diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go index 26607cdf1..16474ec6b 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go @@ -29,8 +29,8 @@ import ( // Database represents an account database type Database struct { - db *sql.DB - partitions common.PartitionOffsetStatements + db *sql.DB + common.PartitionOffsetStatements accounts accountsStatements profiles profilesStatements memberships membershipStatements @@ -127,16 +127,6 @@ func (d *Database) CreateAccount( return d.accounts.insertAccount(ctx, localpart, hash) } -// PartitionOffsets implements common.PartitionStorer -func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { - return d.partitions.SelectPartitionOffsets(topic) -} - -// SetPartitionOffset implements common.PartitionStorer -func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { - return d.partitions.UpsertPartitionOffset(topic, partition, offset) -} - // SaveMembership saves the user matching a given localpart as a member of a given // room. It also stores the ID of the membership event and a flag on whether the user // is still in the room. diff --git a/src/github.com/matrix-org/dendrite/common/consumers.go b/src/github.com/matrix-org/dendrite/common/consumers.go index e3f999111..2e58a6241 100644 --- a/src/github.com/matrix-org/dendrite/common/consumers.go +++ b/src/github.com/matrix-org/dendrite/common/consumers.go @@ -15,6 +15,7 @@ package common import ( + "context" "fmt" sarama "gopkg.in/Shopify/sarama.v1" @@ -31,9 +32,9 @@ type PartitionOffset struct { // A PartitionStorer has the storage APIs needed by the consumer. type PartitionStorer interface { // PartitionOffsets returns the offsets the consumer has reached for each partition. - PartitionOffsets(topic string) ([]PartitionOffset, error) + PartitionOffsets(ctx context.Context, topic string) ([]PartitionOffset, error) // SetPartitionOffset records where the consumer has reached for a partition. - SetPartitionOffset(topic string, partition int32, offset int64) error + SetPartitionOffset(ctx context.Context, topic string, partition int32, offset int64) error } // A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to @@ -75,7 +76,7 @@ func (c *ContinualConsumer) Start() error { offsets[partition] = sarama.OffsetOldest } - storedOffsets, err := c.PartitionStore.PartitionOffsets(c.Topic) + storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic) if err != nil { return err } @@ -110,7 +111,7 @@ func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) { for message := range pc.Messages() { msgErr := c.ProcessMessage(message) // Advance our position in the stream so that we will start at the right position after a restart. - if err := c.PartitionStore.SetPartitionOffset(c.Topic, message.Partition, message.Offset); err != nil { + if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil { panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err)) } // Shutdown if we were told to do so. diff --git a/src/github.com/matrix-org/dendrite/common/partition_offset_table.go b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go index 9d727c56e..bb23755c7 100644 --- a/src/github.com/matrix-org/dendrite/common/partition_offset_table.go +++ b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go @@ -14,8 +14,11 @@ package common -import "database/sql" -import "strings" +import ( + "context" + "database/sql" + "strings" +) const partitionOffsetsSchema = ` -- The offsets that the server has processed up to. @@ -66,9 +69,25 @@ func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err erro return } -// SelectPartitionOffsets returns all the partition offsets for the given topic. -func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]PartitionOffset, error) { - rows, err := s.selectPartitionOffsetsStmt.Query(topic) +// PartitionOffsets implements PartitionStorer +func (s *PartitionOffsetStatements) PartitionOffsets( + ctx context.Context, topic string, +) ([]PartitionOffset, error) { + return s.selectPartitionOffsets(ctx, topic) +} + +// SetPartitionOffset implements PartitionStorer +func (s *PartitionOffsetStatements) SetPartitionOffset( + ctx context.Context, topic string, partition int32, offset int64, +) error { + return s.upsertPartitionOffset(ctx, topic, partition, offset) +} + +// selectPartitionOffsets returns all the partition offsets for the given topic. +func (s *PartitionOffsetStatements) selectPartitionOffsets( + ctx context.Context, topic string, +) ([]PartitionOffset, error) { + rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic) if err != nil { return nil, err } @@ -85,7 +104,9 @@ func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]Part } // UpsertPartitionOffset updates or inserts the partition offset for the given topic. -func (s *PartitionOffsetStatements) UpsertPartitionOffset(topic string, partition int32, offset int64) error { - _, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset) +func (s *PartitionOffsetStatements) upsertPartitionOffset( + ctx context.Context, topic string, partition int32, offset int64, +) error { + _, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset) return err } diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go index aa836efb0..fc7f830e4 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go @@ -61,16 +61,6 @@ func (d *Database) prepare() error { return nil } -// PartitionOffsets implements common.PartitionStorer -func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { - return d.SelectPartitionOffsets(topic) -} - -// SetPartitionOffset implements common.PartitionStorer -func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { - return d.UpsertPartitionOffset(topic, partition, offset) -} - // UpdateRoom updates the joined hosts for a room and returns what the joined // hosts were before the update. func (d *Database) UpdateRoom( diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go index c1482e334..eab27041b 100644 --- a/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/storage/storage.go @@ -27,8 +27,8 @@ import ( // PublicRoomsServerDatabase represents a public rooms server database. type PublicRoomsServerDatabase struct { - db *sql.DB - partitions common.PartitionOffsetStatements + db *sql.DB + common.PartitionOffsetStatements statements publicRoomsStatements } @@ -52,16 +52,6 @@ func NewPublicRoomsServerDatabase(dataSourceName string) (*PublicRoomsServerData return &PublicRoomsServerDatabase{db, partitions, statements}, nil } -// PartitionOffsets implements common.PartitionStorer -func (d *PublicRoomsServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { - return d.partitions.SelectPartitionOffsets(topic) -} - -// SetPartitionOffset implements common.PartitionStorer -func (d *PublicRoomsServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error { - return d.partitions.UpsertPartitionOffset(topic, partition, offset) -} - // GetRoomVisibility returns the room visibility as a boolean: true if the room // is publicly visible, false if not. // Returns an error if the retrieval failed. diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 491cf3959..3ab7e1c7c 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -42,8 +42,8 @@ type streamEvent struct { // SyncServerDatabase represents a sync server database type SyncServerDatabase struct { - db *sql.DB - partitions common.PartitionOffsetStatements + db *sql.DB + common.PartitionOffsetStatements accountData accountDataStatements events outputRoomEventsStatements roomstate currentRoomStateStatements @@ -57,7 +57,7 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { if d.db, err = sql.Open("postgres", dataSourceName); err != nil { return nil, err } - if err = d.partitions.Prepare(d.db, "syncapi"); err != nil { + if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { return nil, err } if err = d.accountData.prepare(d.db); err != nil { @@ -162,16 +162,6 @@ func (d *SyncServerDatabase) GetStateEvent( return d.roomstate.selectStateEvent(ctx, evType, roomID, stateKey) } -// PartitionOffsets implements common.PartitionStorer -func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { - return d.partitions.SelectPartitionOffsets(topic) -} - -// SetPartitionOffset implements common.PartitionStorer -func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error { - return d.partitions.UpsertPartitionOffset(topic, partition, offset) -} - // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { return d.syncStreamPositionTx(ctx, nil)