diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 3c9e52daf..560cd2373 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -48,6 +48,7 @@ func NewOutputRoomEventConsumer( workerStates []types.ApplicationServiceWorkerState, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ + ComponentName: "appservice/roomserver", Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: appserviceDB, diff --git a/currentstateserver/consumers/roomserver.go b/currentstateserver/consumers/roomserver.go index 23495b24f..cb054481c 100644 --- a/currentstateserver/consumers/roomserver.go +++ b/currentstateserver/consumers/roomserver.go @@ -36,6 +36,7 @@ type OutputRoomEventConsumer struct { func NewOutputRoomEventConsumer(topicName string, kafkaConsumer sarama.Consumer, store storage.Database, acls *acls.ServerACLs) *OutputRoomEventConsumer { consumer := &internal.ContinualConsumer{ + ComponentName: "currentstateserver/roomserver", Topic: topicName, Consumer: kafkaConsumer, PartitionStore: store, diff --git a/currentstateserver/storage/sqlite3/current_room_state_table.go b/currentstateserver/storage/sqlite3/current_room_state_table.go index c6cf40ede..1985bf422 100644 --- a/currentstateserver/storage/sqlite3/current_room_state_table.go +++ b/currentstateserver/storage/sqlite3/current_room_state_table.go @@ -83,7 +83,6 @@ const selectKnownUsersSQL = "" + type currentRoomStateStatements struct { db *sql.DB - writer sqlutil.Writer upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt @@ -95,8 +94,7 @@ type currentRoomStateStatements struct { func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { s := ¤tRoomStateStatements{ - db: db, - writer: sqlutil.NewExclusiveWriter(), + db: db, } _, err := db.Exec(currentRoomStateSchema) if err != nil { @@ -177,11 +175,9 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( func (s *currentRoomStateStatements) DeleteRoomStateByEventID( ctx context.Context, txn *sql.Tx, eventID string, ) error { - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt) - _, err := stmt.ExecContext(ctx, eventID) - return err - }) + stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt) + _, err := stmt.ExecContext(ctx, eventID) + return err } func (s *currentRoomStateStatements) UpsertRoomState( @@ -194,20 +190,18 @@ func (s *currentRoomStateStatements) UpsertRoomState( } // upsert state event - return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt) - _, err = stmt.ExecContext( - ctx, - event.RoomID(), - event.EventID(), - event.Type(), - event.Sender(), - *event.StateKey(), - headeredJSON, - contentVal, - ) - return err - }) + stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt) + _, err = stmt.ExecContext( + ctx, + event.RoomID(), + event.EventID(), + event.Type(), + event.Sender(), + *event.StateKey(), + headeredJSON, + contentVal, + ) + return err } func (s *currentRoomStateStatements) SelectEventsWithEventIDs( diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index e1a42f074..d9ac41b3b 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -50,11 +50,13 @@ func NewOutputEDUConsumer( ) *OutputEDUConsumer { c := &OutputEDUConsumer{ typingConsumer: &internal.ContinualConsumer{ + ComponentName: "eduserver/typing", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), Consumer: kafkaConsumer, PartitionStore: store, }, sendToDeviceConsumer: &internal.ContinualConsumer{ + ComponentName: "eduserver/sendtodevice", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), Consumer: kafkaConsumer, PartitionStore: store, diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index c1136f10c..4f206f5fb 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -49,6 +49,7 @@ func NewKeyChangeConsumer( ) *KeyChangeConsumer { c := &KeyChangeConsumer{ consumer: &internal.ContinualConsumer{ + ComponentName: "federationsender/keychange", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), Consumer: kafkaConsumer, PartitionStore: store, diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 92b4d6f40..efeb53fa6 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -48,6 +48,7 @@ func NewOutputRoomEventConsumer( rsAPI api.RoomserverInternalAPI, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ + ComponentName: "federationsender/roomserver", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, diff --git a/internal/consumers.go b/internal/consumers.go index c000c1719..807cf5899 100644 --- a/internal/consumers.go +++ b/internal/consumers.go @@ -33,6 +33,7 @@ type PartitionStorer interface { // A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to // remember the offset it reached. type ContinualConsumer struct { + ComponentName string // The kafkaesque topic to consume events from. // This is the name used in kafka to identify the stream to consume events from. Topic string @@ -111,7 +112,7 @@ func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) { msgErr := c.ProcessMessage(message) // Advance our position in the stream so that we will start at the right position after a restart. if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil { - panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %w", err)) + panic(fmt.Errorf("the ContinualConsumer in %q failed to SetPartitionOffset: %w", c.ComponentName, err)) } // Shutdown if we were told to do so. if msgErr == ErrShutdown { diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 87dce6ad1..33782171e 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -122,7 +122,7 @@ func Open(dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) d.Database = shared.Database{ DB: d.db, Cache: cache, - Writer: sqlutil.NewExclusiveWriter(), + Writer: d.writer, EventsTable: d.events, EventTypesTable: d.eventTypes, EventStateKeysTable: d.eventStateKeys, diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 6a1e590aa..d03dd2c46 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -44,6 +44,7 @@ func NewOutputClientDataConsumer( ) *OutputClientDataConsumer { consumer := internal.ContinualConsumer{ + ComponentName: "syncapi/clientapi", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)), Consumer: kafkaConsumer, PartitionStore: store, diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 90bfe3e5a..f880f3f20 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -48,6 +48,7 @@ func NewOutputSendToDeviceEventConsumer( ) *OutputSendToDeviceEventConsumer { consumer := internal.ContinualConsumer{ + ComponentName: "syncapi/eduserver/sendtodevice", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), Consumer: kafkaConsumer, PartitionStore: store, diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 523728cda..80d1d000b 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -44,6 +44,7 @@ func NewOutputTypingEventConsumer( ) *OutputTypingEventConsumer { consumer := internal.ContinualConsumer{ + ComponentName: "syncapi/eduserver/typing", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), Consumer: kafkaConsumer, PartitionStore: store, diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index ee95e09d3..93fa822df 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -56,6 +56,7 @@ func NewOutputKeyChangeEventConsumer( ) *OutputKeyChangeEventConsumer { consumer := internal.ContinualConsumer{ + ComponentName: "syncapi/keychange", Topic: topic, Consumer: kafkaConsumer, PartitionStore: store, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index da656a4cf..b205e1e90 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -49,6 +49,7 @@ func NewOutputRoomEventConsumer( ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ + ComponentName: "syncapi/roomserver", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 36e8de67f..e7f2c9440 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -80,7 +80,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e } d.Database = shared.Database{ DB: d.db, - Writer: sqlutil.NewDummyWriter(), + Writer: d.writer, Invites: invites, AccountData: accountData, OutputEvents: events, diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 5db9939a3..2cbab26e4 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -97,7 +97,7 @@ func (d *SyncServerDatasource) prepare() (err error) { } d.Database = shared.Database{ DB: d.db, - Writer: sqlutil.NewExclusiveWriter(), + Writer: d.writer, Invites: invites, Peeks: peeks, AccountData: accountData,