From bb5a35edb3a903dcd384626c87867bc193170cf6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 15 Feb 2017 11:50:39 +0000 Subject: [PATCH] Move partition offsets sql to a separate file --- .../roomserver/storage/partitionoffsetsql.go | 67 +++++++++++++++++++ .../dendrite/roomserver/storage/sql.go | 61 +---------------- 2 files changed, 69 insertions(+), 59 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/roomserver/storage/partitionoffsetsql.go diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/partitionoffsetsql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/partitionoffsetsql.go new file mode 100644 index 000000000..2dfe13287 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/partitionoffsetsql.go @@ -0,0 +1,67 @@ +package storage + +import ( + "database/sql" + "github.com/matrix-org/dendrite/roomserver/types" +) + +const partitionOffsetsSchema = ` +-- The offsets that the server has processed up to. +CREATE TABLE IF NOT EXISTS partition_offsets ( + -- The name of the topic. + topic TEXT NOT NULL, + -- The 32-bit partition ID + partition INTEGER NOT NULL, + -- The 64-bit offset. + partition_offset BIGINT NOT NULL, + CONSTRAINT topic_partition_unique UNIQUE (topic, partition) +); +` + +const selectPartitionOffsetsSQL = "" + + "SELECT partition, partition_offset FROM partition_offsets WHERE topic = $1" + +const upsertPartitionOffsetsSQL = "" + + "INSERT INTO partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" + + " ON CONFLICT ON CONSTRAINT topic_partition_unique" + + " DO UPDATE SET partition_offset = $3" + +type partitionOffsetStatements struct { + selectPartitionOffsetsStmt *sql.Stmt + upsertPartitionOffsetStmt *sql.Stmt +} + +func (s *partitionOffsetStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(partitionOffsetsSchema) + if err != nil { + return + } + if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil { + return + } + if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil { + return + } + return +} + +func (s *partitionOffsetStatements) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) { + rows, err := s.selectPartitionOffsetsStmt.Query(topic) + if err != nil { + return nil, err + } + defer rows.Close() + var results []types.PartitionOffset + for rows.Next() { + var offset types.PartitionOffset + if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { + return nil, err + } + } + return results, nil +} + +func (s *partitionOffsetStatements) upsertPartitionOffset(topic string, partition int32, offset int64) error { + _, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset) + return err +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index 6b23f1e36..bfd61a450 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -8,8 +8,7 @@ import ( ) type statements struct { - selectPartitionOffsetsStmt *sql.Stmt - upsertPartitionOffsetStmt *sql.Stmt + partitionOffsetStatements insertEventTypeNIDStmt *sql.Stmt selectEventTypeNIDStmt *sql.Stmt insertEventStateKeyNIDStmt *sql.Stmt @@ -33,7 +32,7 @@ type statements struct { func (s *statements) prepare(db *sql.DB) error { var err error - if err = s.preparePartitionOffsets(db); err != nil { + if err = s.partitionOffsetStatements.prepare(db); err != nil { return err } @@ -60,62 +59,6 @@ func (s *statements) prepare(db *sql.DB) error { return nil } -func (s *statements) preparePartitionOffsets(db *sql.DB) (err error) { - _, err = db.Exec(partitionOffsetsSchema) - if err != nil { - return - } - if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil { - return - } - if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil { - return - } - return -} - -const partitionOffsetsSchema = ` --- The offsets that the server has processed up to. -CREATE TABLE IF NOT EXISTS partition_offsets ( - -- The name of the topic. - topic TEXT NOT NULL, - -- The 32-bit partition ID - partition INTEGER NOT NULL, - -- The 64-bit offset. - partition_offset BIGINT NOT NULL, - CONSTRAINT topic_partition_unique UNIQUE (topic, partition) -); -` - -const selectPartitionOffsetsSQL = "" + - "SELECT partition, partition_offset FROM partition_offsets WHERE topic = $1" - -const upsertPartitionOffsetsSQL = "" + - "INSERT INTO partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" + - " ON CONFLICT ON CONSTRAINT topic_partition_unique" + - " DO UPDATE SET partition_offset = $3" - -func (s *statements) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) { - rows, err := s.selectPartitionOffsetsStmt.Query(topic) - if err != nil { - return nil, err - } - defer rows.Close() - var results []types.PartitionOffset - for rows.Next() { - var offset types.PartitionOffset - if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { - return nil, err - } - } - return results, nil -} - -func (s *statements) upsertPartitionOffset(topic string, partition int32, offset int64) error { - _, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset) - return err -} - func (s *statements) prepareEventTypes(db *sql.DB) (err error) { _, err = db.Exec(eventTypesSchema) if err != nil {