Merge branch 'master' into matthew/peeking

Matthew Hodgson 2020-09-01 19:11:51 +03:00
commit d0d5f70105
15 changed files with 32 additions and 26 deletions

View file

@@ -48,6 +48,7 @@ func NewOutputRoomEventConsumer(
 	workerStates []types.ApplicationServiceWorkerState,
 ) *OutputRoomEventConsumer {
 	consumer := internal.ContinualConsumer{
+		ComponentName:  "appservice/roomserver",
 		Topic:          cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
 		Consumer:       kafkaConsumer,
 		PartitionStore: appserviceDB,
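The one-line addition above is the same change repeated across most files in this merge: every internal.ContinualConsumer is labelled with the component that owns it. Its only visible use in this diff is the panic message in internal/consumers.go further down; the following runnable sketch (with a made-up underlying error) shows the difference the label makes:

package main

import "fmt"

func main() {
	// The format string is taken verbatim from the internal/consumers.go
	// hunk later in this diff; the component name and error are examples.
	componentName := "appservice/roomserver"
	err := fmt.Errorf("disk full")
	wrapped := fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", componentName, err)
	fmt.Println(wrapped)
	// Output: the ContinualConsumer in "appservice/roomserver" failed to SetPartitionOffset: disk full
}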

View file

@@ -36,6 +36,7 @@ type OutputRoomEventConsumer struct {
 func NewOutputRoomEventConsumer(topicName string, kafkaConsumer sarama.Consumer, store storage.Database, acls *acls.ServerACLs) *OutputRoomEventConsumer {
 	consumer := &internal.ContinualConsumer{
+		ComponentName:  "currentstateserver/roomserver",
 		Topic:          topicName,
 		Consumer:       kafkaConsumer,
 		PartitionStore: store,

View file

@@ -83,7 +83,6 @@ const selectKnownUsersSQL = "" +
 type currentRoomStateStatements struct {
 	db                              *sql.DB
-	writer                          sqlutil.Writer
 	upsertRoomStateStmt             *sql.Stmt
 	deleteRoomStateByEventIDStmt    *sql.Stmt
 	selectRoomIDsWithMembershipStmt *sql.Stmt
@@ -95,8 +94,7 @@ type currentRoomStateStatements struct {
 func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
 	s := &currentRoomStateStatements{
-		db:     db,
-		writer: sqlutil.NewExclusiveWriter(),
+		db: db,
 	}
 	_, err := db.Exec(currentRoomStateSchema)
 	if err != nil {
@@ -177,11 +175,9 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
 func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
 	ctx context.Context, txn *sql.Tx, eventID string,
 ) error {
-	return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
-		stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
-		_, err := stmt.ExecContext(ctx, eventID)
-		return err
-	})
+	stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
+	_, err := stmt.ExecContext(ctx, eventID)
+	return err
 }
 
 func (s *currentRoomStateStatements) UpsertRoomState(
@@ -194,20 +190,18 @@ func (s *currentRoomStateStatements) UpsertRoomState(
 	}
 
 	// upsert state event
-	return s.writer.Do(s.db, txn, func(txn *sql.Tx) error {
-		stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
-		_, err = stmt.ExecContext(
-			ctx,
-			event.RoomID(),
-			event.EventID(),
-			event.Type(),
-			event.Sender(),
-			*event.StateKey(),
-			headeredJSON,
-			contentVal,
-		)
-		return err
-	})
+	stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
+	_, err = stmt.ExecContext(
+		ctx,
+		event.RoomID(),
+		event.EventID(),
+		event.Type(),
+		event.Sender(),
+		*event.StateKey(),
+		headeredJSON,
+		contentVal,
+	)
+	return err
 }
 
 func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
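The three hunks above unwrap calls that previously went through s.writer.Do, and the struct loses its writer field entirely. Judging from the removed call sites, sqlutil.Writer's job is to run a write closure with exclusive access, which matters for SQLite because it allows many concurrent readers but only one writer. A minimal sketch of that idea, assuming a mutex-based design (Dendrite's actual sqlutil.ExclusiveWriter may be implemented differently, e.g. with a dedicated goroutine):

package example

import (
	"database/sql"
	"sync"
)

// Writer matches the shape implied by the removed call sites:
//
//	s.writer.Do(s.db, txn, func(txn *sql.Tx) error { ... })
//
// Both this interface and exclusiveWriter are illustrative
// reconstructions, not Dendrite's sqlutil source.
type Writer interface {
	Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error
}

// exclusiveWriter lets only one write closure run at a time.
type exclusiveWriter struct {
	mu sync.Mutex
}

func (w *exclusiveWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	// Serialization is the essential idea; the real writer may also
	// open a transaction when txn is nil.
	return f(txn)
}

With the per-table writer gone, write serialization for this table presumably moves to a higher level; the storage-level hunks later in this merge, which hand a single d.writer to shared.Database, point the same way.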

View file

@@ -50,11 +50,13 @@ func NewOutputEDUConsumer(
 ) *OutputEDUConsumer {
 	c := &OutputEDUConsumer{
 		typingConsumer: &internal.ContinualConsumer{
+			ComponentName:  "eduserver/typing",
 			Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
 			Consumer:       kafkaConsumer,
 			PartitionStore: store,
 		},
 		sendToDeviceConsumer: &internal.ContinualConsumer{
+			ComponentName:  "eduserver/sendtodevice",
 			Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
 			Consumer:       kafkaConsumer,
 			PartitionStore: store,

View file

@@ -49,6 +49,7 @@ func NewKeyChangeConsumer(
 ) *KeyChangeConsumer {
 	c := &KeyChangeConsumer{
 		consumer: &internal.ContinualConsumer{
+			ComponentName:  "federationsender/keychange",
 			Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
 			Consumer:       kafkaConsumer,
 			PartitionStore: store,

View file

@@ -48,6 +48,7 @@ func NewOutputRoomEventConsumer(
 	rsAPI api.RoomserverInternalAPI,
 ) *OutputRoomEventConsumer {
 	consumer := internal.ContinualConsumer{
+		ComponentName:  "federationsender/roomserver",
 		Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
 		Consumer:       kafkaConsumer,
 		PartitionStore: store,

View file

@@ -33,6 +33,7 @@ type PartitionStorer interface {
 // A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
 // remember the offset it reached.
 type ContinualConsumer struct {
+	ComponentName string
 	// The kafkaesque topic to consume events from.
 	// This is the name used in kafka to identify the stream to consume events from.
 	Topic string
@@ -111,7 +112,7 @@ func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
 		msgErr := c.ProcessMessage(message)
 		// Advance our position in the stream so that we will start at the right position after a restart.
 		if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil {
-			panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %w", err))
+			panic(fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", c.ComponentName, err))
 		}
 		// Shutdown if we were told to do so.
 		if msgErr == ErrShutdown {
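Taken together, this file shows the consumer lifecycle the rest of the diff plugs into: a component fills in the struct, including the new ComponentName, and a failure to persist the partition offset now names the offender. A hedged wiring sketch, using only names visible in this diff except for Start(), which is assumed to exist elsewhere in the package:

package example

import (
	"github.com/Shopify/sarama"

	"github.com/matrix-org/dendrite/internal"
)

// startConsumer sketches how a component uses ContinualConsumer.
// ProcessMessage is called in the hunk above; it is assumed here to be a
// settable callback field, and Start() is assumed to begin consumption.
func startConsumer(topic string, kafkaConsumer sarama.Consumer, store internal.PartitionStorer) error {
	consumer := internal.ContinualConsumer{
		ComponentName:  "example/component",
		Topic:          topic,
		Consumer:       kafkaConsumer,
		PartitionStore: store,
	}
	consumer.ProcessMessage = func(msg *sarama.ConsumerMessage) error {
		// Handle one message; returning internal.ErrShutdown makes the
		// consumer stop, per the hunk above.
		return nil
	}
	return consumer.Start()
}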

View file

@@ -122,7 +122,7 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches)
 	d.Database = shared.Database{
 		DB:                  d.db,
 		Cache:               cache,
-		Writer:              sqlutil.NewExclusiveWriter(),
+		Writer:              d.writer,
 		EventsTable:         d.events,
 		EventTypesTable:     d.eventTypes,
 		EventStateKeysTable: d.eventStateKeys,
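Here, and in the two sync API storage hunks at the end of this diff, a writer constructed inline is replaced by the writer already stored on the database struct. The practical effect, presumably, is a single serialization point per database: shared.Database and every table funnel writes through the same d.writer, rather than through independent writers that know nothing of each other's locking.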

View file

@@ -44,6 +44,7 @@ func NewOutputClientDataConsumer(
 ) *OutputClientDataConsumer {
 	consumer := internal.ContinualConsumer{
+		ComponentName:  "syncapi/clientapi",
 		Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)),
 		Consumer:       kafkaConsumer,
 		PartitionStore: store,

View file

@@ -48,6 +48,7 @@ func NewOutputSendToDeviceEventConsumer(
 ) *OutputSendToDeviceEventConsumer {
 	consumer := internal.ContinualConsumer{
+		ComponentName:  "syncapi/eduserver/sendtodevice",
 		Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
 		Consumer:       kafkaConsumer,
 		PartitionStore: store,

View file

@@ -44,6 +44,7 @@ func NewOutputTypingEventConsumer(
 ) *OutputTypingEventConsumer {
 	consumer := internal.ContinualConsumer{
+		ComponentName:  "syncapi/eduserver/typing",
 		Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
 		Consumer:       kafkaConsumer,
 		PartitionStore: store,

View file

@@ -56,6 +56,7 @@ func NewOutputKeyChangeEventConsumer(
 ) *OutputKeyChangeEventConsumer {
 	consumer := internal.ContinualConsumer{
+		ComponentName:  "syncapi/keychange",
 		Topic:          topic,
 		Consumer:       kafkaConsumer,
 		PartitionStore: store,

View file

@@ -49,6 +49,7 @@ func NewOutputRoomEventConsumer(
 ) *OutputRoomEventConsumer {
 	consumer := internal.ContinualConsumer{
+		ComponentName:  "syncapi/roomserver",
 		Topic:          string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
 		Consumer:       kafkaConsumer,
 		PartitionStore: store,

View file

@@ -80,7 +80,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
 	}
 	d.Database = shared.Database{
 		DB:           d.db,
-		Writer:       sqlutil.NewDummyWriter(),
+		Writer:       d.writer,
 		Invites:      invites,
 		AccountData:  accountData,
 		OutputEvents: events,

View file

@@ -97,7 +97,7 @@ func (d *SyncServerDatasource) prepare() (err error) {
 	}
 	d.Database = shared.Database{
 		DB:          d.db,
-		Writer:      sqlutil.NewExclusiveWriter(),
+		Writer:      d.writer,
 		Invites:     invites,
 		Peeks:       peeks,
 		AccountData: accountData,
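Note the asymmetry this hunk and the previous one remove: the PostgreSQL sync database passed sqlutil.NewDummyWriter() while the SQLite one passed sqlutil.NewExclusiveWriter(). Both now forward d.writer, so the choice of writer lives wherever the database struct is initialised. The name DummyWriter suggests a no-op wrapper, plausible for PostgreSQL since it handles concurrent writers natively; a speculative sketch, pairing with the Writer interface sketched earlier:

package example

import "database/sql"

// dummyWriter sketches a no-op Writer: it runs the closure immediately,
// with no serialization. This is a guess at the intent behind the name,
// not Dendrite's actual sqlutil source.
type dummyWriter struct{}

func (dummyWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
	return f(txn)
}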