diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index 53864b9d7..c6b23ac46 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -50,7 +50,7 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); ` const selectEventsByApplicationServiceIDSQL = "" + - "SELECT event_id, origin_server_ts, room_id, type, sender, event_content FROM appservice_events " + + "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content FROM appservice_events " + "WHERE as_id = $1 ORDER BY id ASC LIMIT $2" const countEventsByApplicationServiceIDSQL = "" + @@ -61,7 +61,7 @@ const insertEventSQL = "" + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" const deleteEventsBeforeAndIncludingIDSQL = "" + - "DELETE FROM appservice_events WHERE event_id <= $1" + "DELETE FROM appservice_events WHERE id <= $1" type eventsStatements struct { selectEventsByApplicationServiceIDStmt *sql.Stmt @@ -93,19 +93,21 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) { } // selectEventsByApplicationServiceID takes in an application service ID and -// returns a slice of events that need to be sent to that application service. +// returns a slice of events that need to be sent to that application service, +// as well as an int later used to remove these same events from the database +// once successfully sent to an application service. func (s *eventsStatements) selectEventsByApplicationServiceID( ctx context.Context, applicationServiceID string, limit int, ) ( - eventIDs []string, + maxID int, events []gomatrixserverlib.ApplicationServiceEvent, err error, ) { eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) if err != nil { - return nil, nil, err + return 0, nil, err } defer func() { err = eventRows.Close() @@ -119,7 +121,9 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( for eventRows.Next() { var event gomatrixserverlib.ApplicationServiceEvent var eventContent sql.NullString + var id int err = eventRows.Scan( + &id, &event.EventID, &event.OriginServerTimestamp, &event.RoomID, @@ -128,12 +132,14 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( &eventContent, ) if err != nil { - return nil, nil, err + return 0, nil, err } if eventContent.Valid { event.Content = gomatrixserverlib.RawJSON(eventContent.String) } - eventIDs = append(eventIDs, event.EventID) + if id > maxID { + maxID = id + } // Get age of the event from original timestamp and current time ageMilli := time.Now().UnixNano() / int64(time.Millisecond) @@ -187,8 +193,8 @@ func (s *eventsStatements) insertEvent( // deleteEventsBeforeAndIncludingID removes events matching given IDs from the database. func (s *eventsStatements) deleteEventsBeforeAndIncludingID( ctx context.Context, - eventID string, + eventTableID int, ) (err error) { - _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, eventID) + _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, eventTableID) return err } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go index 12b8f0016..6046f1e51 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go @@ -67,7 +67,7 @@ func (d *Database) GetEventsWithAppServiceID( ctx context.Context, appServiceID string, limit int, -) ([]string, []gomatrixserverlib.ApplicationServiceEvent, error) { +) (int, []gomatrixserverlib.ApplicationServiceEvent, error) { return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) } @@ -80,13 +80,14 @@ func (d *Database) CountEventsWithAppServiceID( return d.events.countEventsByApplicationServiceID(ctx, appServiceID) } -// RemoveEventsBeforeAndIncludingID removes events from the database given a slice of their -// event IDs. +// RemoveEventsBeforeAndIncludingID removes all events from the database that +// are less than or equal to a given maximum ID. IDs here are implemented as a +// serial, thus this should always delete events in chronological order. func (d *Database) RemoveEventsBeforeAndIncludingID( ctx context.Context, - eventID string, + eventTableID int, ) error { - return d.events.deleteEventsBeforeAndIncludingID(ctx, eventID) + return d.events.deleteEventsBeforeAndIncludingID(ctx, eventTableID) } // GetTxnIDWithAppServiceID takes in an application service ID and returns the diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index 671824a1c..5a511c43c 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -101,7 +101,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { ws.EventsReady = false ws.Cond.L.Unlock() - eventIDs, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize) + maxID, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize) if err != nil { log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB", ws.AppService.ID) @@ -132,7 +132,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { ws.Backoff = 0 // Remove sent events from the DB - err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1]) + err = db.RemoveEventsBeforeAndIncludingID(ctx, maxID) if err != nil { log.WithError(err).Fatalf("unable to remove appservice events from the database for %s", ws.AppService.ID)