diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index d95067921..6e774b5b4 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -105,8 +105,6 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - util.GetLogger(context.TODO()).Infof("Stored send-to-device message at position %d", streamPos) - s.stream.Advance(streamPos) s.notifier.OnNewSendToDevice( output.UserID, diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 8c6c5c120..a51ab4e0d 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -119,11 +119,13 @@ type Database interface { // added to the unsigned section of the output event. StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent // SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns the - // relevant events, and it automatically truncates old events once we advance past the - // stream position of the old send-to-device messages. + // relevant events within the given ranges for the supplied user ID and device ID. SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, from, to types.StreamPosition) (pos types.StreamPosition, events []types.SendToDeviceEvent, err error) // StoreNewSendForDeviceMessage stores a new send-to-device event for a user's device. StoreNewSendForDeviceMessage(ctx context.Context, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent) (types.StreamPosition, error) + // CleanSendToDeviceUpdates removes all send-to-device messages BEFORE the specified + // from position, preventing the send-to-device table from growing indefinitely. + CleanSendToDeviceUpdates(ctx context.Context, userID, deviceID string, before types.StreamPosition) (err error) // GetFilter looks up the filter associated with a given local user and filter ID. // Returns a filter structure. Otherwise returns an error if no such filter exists // or if there was an error talking to the database. diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 572d60ae3..5b06aabcd 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -907,21 +907,24 @@ func (d *Database) SendToDeviceUpdatesForSync( if err != nil { return from, nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err) } - // If there's nothing to do then stop here. if len(events) == 0 { return to, nil, nil } + return lastPos, events, nil +} - // If we've advanced past this stream position for this - // user+device combo then clean up behind. +func (d *Database) CleanSendToDeviceUpdates( + ctx context.Context, + userID, deviceID string, before types.StreamPosition, +) (err error) { if err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.SendToDevice.DeleteSendToDeviceMessages(ctx, txn, userID, deviceID, from) + return d.SendToDevice.DeleteSendToDeviceMessages(ctx, txn, userID, deviceID, before) }); err != nil { logrus.WithError(err).Errorf("Failed to clean up old send-to-device messages for user %q device %q", userID, deviceID) + return err } - - return lastPos, events, nil + return nil } // getMembershipFromEvent returns the value of content.membership iff the event is a state event diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 8ad7b456a..a3aaf3d7d 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -38,9 +38,18 @@ func (p *SendToDeviceStreamProvider) IncrementalSync( req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed") return from } - // Add the updates into the sync response. - for _, event := range events { - req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent) + + if len(events) > 0 { + // Clean up old send-to-device messages from before this stream position. + if err := p.DB.CleanSendToDeviceUpdates(req.Context, req.Device.UserID, req.Device.ID, from); err != nil { + req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed") + return from + } + + // Add the updates into the sync response. + for _, event := range events { + req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent) + } } return lastPos