Add context to the partition offset table (#249)
This commit is contained in:
parent
e7cf2ae095
commit
a7773d3d3d
|
@ -30,7 +30,7 @@ import (
|
||||||
// Database represents an account database
|
// Database represents an account database
|
||||||
type Database struct {
|
type Database struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
partitions common.PartitionOffsetStatements
|
common.PartitionOffsetStatements
|
||||||
accounts accountsStatements
|
accounts accountsStatements
|
||||||
profiles profilesStatements
|
profiles profilesStatements
|
||||||
memberships membershipStatements
|
memberships membershipStatements
|
||||||
|
@ -127,16 +127,6 @@ func (d *Database) CreateAccount(
|
||||||
return d.accounts.insertAccount(ctx, localpart, hash)
|
return d.accounts.insertAccount(ctx, localpart, hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PartitionOffsets implements common.PartitionStorer
|
|
||||||
func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
|
|
||||||
return d.partitions.SelectPartitionOffsets(topic)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetPartitionOffset implements common.PartitionStorer
|
|
||||||
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
|
||||||
return d.partitions.UpsertPartitionOffset(topic, partition, offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveMembership saves the user matching a given localpart as a member of a given
|
// SaveMembership saves the user matching a given localpart as a member of a given
|
||||||
// room. It also stores the ID of the membership event and a flag on whether the user
|
// room. It also stores the ID of the membership event and a flag on whether the user
|
||||||
// is still in the room.
|
// is still in the room.
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
package common
|
package common
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
|
@ -31,9 +32,9 @@ type PartitionOffset struct {
|
||||||
// A PartitionStorer has the storage APIs needed by the consumer.
|
// A PartitionStorer has the storage APIs needed by the consumer.
|
||||||
type PartitionStorer interface {
|
type PartitionStorer interface {
|
||||||
// PartitionOffsets returns the offsets the consumer has reached for each partition.
|
// PartitionOffsets returns the offsets the consumer has reached for each partition.
|
||||||
PartitionOffsets(topic string) ([]PartitionOffset, error)
|
PartitionOffsets(ctx context.Context, topic string) ([]PartitionOffset, error)
|
||||||
// SetPartitionOffset records where the consumer has reached for a partition.
|
// SetPartitionOffset records where the consumer has reached for a partition.
|
||||||
SetPartitionOffset(topic string, partition int32, offset int64) error
|
SetPartitionOffset(ctx context.Context, topic string, partition int32, offset int64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
|
// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
|
||||||
|
@ -75,7 +76,7 @@ func (c *ContinualConsumer) Start() error {
|
||||||
offsets[partition] = sarama.OffsetOldest
|
offsets[partition] = sarama.OffsetOldest
|
||||||
}
|
}
|
||||||
|
|
||||||
storedOffsets, err := c.PartitionStore.PartitionOffsets(c.Topic)
|
storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -110,7 +111,7 @@ func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
|
||||||
for message := range pc.Messages() {
|
for message := range pc.Messages() {
|
||||||
msgErr := c.ProcessMessage(message)
|
msgErr := c.ProcessMessage(message)
|
||||||
// Advance our position in the stream so that we will start at the right position after a restart.
|
// Advance our position in the stream so that we will start at the right position after a restart.
|
||||||
if err := c.PartitionStore.SetPartitionOffset(c.Topic, message.Partition, message.Offset); err != nil {
|
if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil {
|
||||||
panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
|
panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
|
||||||
}
|
}
|
||||||
// Shutdown if we were told to do so.
|
// Shutdown if we were told to do so.
|
||||||
|
|
|
@ -14,8 +14,11 @@
|
||||||
|
|
||||||
package common
|
package common
|
||||||
|
|
||||||
import "database/sql"
|
import (
|
||||||
import "strings"
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
const partitionOffsetsSchema = `
|
const partitionOffsetsSchema = `
|
||||||
-- The offsets that the server has processed up to.
|
-- The offsets that the server has processed up to.
|
||||||
|
@ -66,9 +69,25 @@ func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err erro
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectPartitionOffsets returns all the partition offsets for the given topic.
|
// PartitionOffsets implements PartitionStorer
|
||||||
func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]PartitionOffset, error) {
|
func (s *PartitionOffsetStatements) PartitionOffsets(
|
||||||
rows, err := s.selectPartitionOffsetsStmt.Query(topic)
|
ctx context.Context, topic string,
|
||||||
|
) ([]PartitionOffset, error) {
|
||||||
|
return s.selectPartitionOffsets(ctx, topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPartitionOffset implements PartitionStorer
|
||||||
|
func (s *PartitionOffsetStatements) SetPartitionOffset(
|
||||||
|
ctx context.Context, topic string, partition int32, offset int64,
|
||||||
|
) error {
|
||||||
|
return s.upsertPartitionOffset(ctx, topic, partition, offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
// selectPartitionOffsets returns all the partition offsets for the given topic.
|
||||||
|
func (s *PartitionOffsetStatements) selectPartitionOffsets(
|
||||||
|
ctx context.Context, topic string,
|
||||||
|
) ([]PartitionOffset, error) {
|
||||||
|
rows, err := s.selectPartitionOffsetsStmt.QueryContext(ctx, topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -85,7 +104,9 @@ func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]Part
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpsertPartitionOffset updates or inserts the partition offset for the given topic.
|
// UpsertPartitionOffset updates or inserts the partition offset for the given topic.
|
||||||
func (s *PartitionOffsetStatements) UpsertPartitionOffset(topic string, partition int32, offset int64) error {
|
func (s *PartitionOffsetStatements) upsertPartitionOffset(
|
||||||
_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
|
ctx context.Context, topic string, partition int32, offset int64,
|
||||||
|
) error {
|
||||||
|
_, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,16 +61,6 @@ func (d *Database) prepare() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PartitionOffsets implements common.PartitionStorer
|
|
||||||
func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
|
|
||||||
return d.SelectPartitionOffsets(topic)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetPartitionOffset implements common.PartitionStorer
|
|
||||||
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
|
||||||
return d.UpsertPartitionOffset(topic, partition, offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateRoom updates the joined hosts for a room and returns what the joined
|
// UpdateRoom updates the joined hosts for a room and returns what the joined
|
||||||
// hosts were before the update.
|
// hosts were before the update.
|
||||||
func (d *Database) UpdateRoom(
|
func (d *Database) UpdateRoom(
|
||||||
|
|
|
@ -28,7 +28,7 @@ import (
|
||||||
// PublicRoomsServerDatabase represents a public rooms server database.
|
// PublicRoomsServerDatabase represents a public rooms server database.
|
||||||
type PublicRoomsServerDatabase struct {
|
type PublicRoomsServerDatabase struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
partitions common.PartitionOffsetStatements
|
common.PartitionOffsetStatements
|
||||||
statements publicRoomsStatements
|
statements publicRoomsStatements
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,16 +52,6 @@ func NewPublicRoomsServerDatabase(dataSourceName string) (*PublicRoomsServerData
|
||||||
return &PublicRoomsServerDatabase{db, partitions, statements}, nil
|
return &PublicRoomsServerDatabase{db, partitions, statements}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PartitionOffsets implements common.PartitionStorer
|
|
||||||
func (d *PublicRoomsServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
|
|
||||||
return d.partitions.SelectPartitionOffsets(topic)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetPartitionOffset implements common.PartitionStorer
|
|
||||||
func (d *PublicRoomsServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
|
||||||
return d.partitions.UpsertPartitionOffset(topic, partition, offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRoomVisibility returns the room visibility as a boolean: true if the room
|
// GetRoomVisibility returns the room visibility as a boolean: true if the room
|
||||||
// is publicly visible, false if not.
|
// is publicly visible, false if not.
|
||||||
// Returns an error if the retrieval failed.
|
// Returns an error if the retrieval failed.
|
||||||
|
|
|
@ -43,7 +43,7 @@ type streamEvent struct {
|
||||||
// SyncServerDatabase represents a sync server database
|
// SyncServerDatabase represents a sync server database
|
||||||
type SyncServerDatabase struct {
|
type SyncServerDatabase struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
partitions common.PartitionOffsetStatements
|
common.PartitionOffsetStatements
|
||||||
accountData accountDataStatements
|
accountData accountDataStatements
|
||||||
events outputRoomEventsStatements
|
events outputRoomEventsStatements
|
||||||
roomstate currentRoomStateStatements
|
roomstate currentRoomStateStatements
|
||||||
|
@ -57,7 +57,7 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
|
||||||
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
|
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = d.partitions.Prepare(d.db, "syncapi"); err != nil {
|
if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = d.accountData.prepare(d.db); err != nil {
|
if err = d.accountData.prepare(d.db); err != nil {
|
||||||
|
@ -162,16 +162,6 @@ func (d *SyncServerDatabase) GetStateEvent(
|
||||||
return d.roomstate.selectStateEvent(ctx, evType, roomID, stateKey)
|
return d.roomstate.selectStateEvent(ctx, evType, roomID, stateKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PartitionOffsets implements common.PartitionStorer
|
|
||||||
func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
|
|
||||||
return d.partitions.SelectPartitionOffsets(topic)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetPartitionOffset implements common.PartitionStorer
|
|
||||||
func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
|
||||||
return d.partitions.UpsertPartitionOffset(topic, partition, offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
||||||
func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
|
func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
|
||||||
return d.syncStreamPositionTx(ctx, nil)
|
return d.syncStreamPositionTx(ctx, nil)
|
||||||
|
|
Loading…
Reference in a new issue