diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index daaaf06c8..2e9176745 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -93,7 +93,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) "event_type": output.Type, }).Info("sync API received send-to-device event from EDU server") - newPos, err := s.db.StoreNewSendForDeviceMessage( + _, err = s.db.StoreNewSendForDeviceMessage( context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent, ) if err != nil { @@ -104,7 +104,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) s.notifier.OnNewSendToDevice( output.UserID, []string{output.DeviceID}, // TODO: support wildcard here as per spec - types.NewStreamToken(0, newPos), + types.NewStreamToken(0, 1), ) return nil diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 51bcd5e9b..7378d3bd9 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -56,9 +56,9 @@ type Database interface { // transaction IDs associated with the given device. These transaction IDs come // from when the device sent the event via an API that included a transaction // ID. - IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) + IncrementalSync(ctx context.Context, res *types.Response, device authtypes.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) // CompleteSync returns a complete /sync API response for the given user. - CompleteSync(ctx context.Context, device authtypes.Device, numRecentEventsPerRoom int) (*types.Response, error) + CompleteSync(ctx context.Context, res *types.Response, device authtypes.Device, numRecentEventsPerRoom int) (*types.Response, error) // GetAccountDataInRange returns all account data for a given user inserted or // updated between two given positions // Returns a map following the format data[roomID] = []dataTypes diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 44c0fbca5..ea5b2ad24 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -530,14 +530,14 @@ func (d *Database) addEDUDeltaToResponse( } func (d *Database) IncrementalSync( - ctx context.Context, + ctx context.Context, res *types.Response, device authtypes.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool, ) (*types.Response, error) { nextBatchPos := fromPos.WithUpdates(toPos) - res := types.NewResponse(nextBatchPos) + res.NextBatch = nextBatchPos.String() var joinedRoomIDs []string var err error @@ -571,11 +571,10 @@ func (d *Database) IncrementalSync( // getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed // to it. It returns toPos and joinedRoomIDs for use of adding EDUs. func (d *Database) getResponseWithPDUsForCompleteSync( - ctx context.Context, + ctx context.Context, res *types.Response, userID string, numRecentEventsPerRoom int, ) ( - res *types.Response, toPos types.StreamingToken, joinedRoomIDs []string, err error, @@ -606,7 +605,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( To: toPos.PDUPosition(), } - res = types.NewResponse(toPos) + res.NextBatch = toPos.String() // Extract room state and recent events for all rooms the user is joined to. joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) @@ -664,14 +663,15 @@ func (d *Database) getResponseWithPDUsForCompleteSync( } succeeded = true - return res, toPos, joinedRoomIDs, err + return //res, toPos, joinedRoomIDs, err } func (d *Database) CompleteSync( - ctx context.Context, device authtypes.Device, numRecentEventsPerRoom int, + ctx context.Context, res *types.Response, + device authtypes.Device, numRecentEventsPerRoom int, ) (*types.Response, error) { - res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( - ctx, device.UserID, numRecentEventsPerRoom, + toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( + ctx, res, device.UserID, numRecentEventsPerRoom, ) if err != nil { return nil, err @@ -1063,21 +1063,27 @@ func (d *Database) SendToDeviceUpdatesForSync( ctx context.Context, userID, deviceID string, token types.StreamingToken, -) (events []types.SendToDeviceEvent, err error) { +) (toReturn []types.SendToDeviceEvent, err error) { // First of all, get our send-to-device updates for this user. - events, err = d.SendToDevice.SelectSendToDeviceMessages(ctx, nil, userID, deviceID) + events, err := d.SendToDevice.SelectSendToDeviceMessages(ctx, nil, userID, deviceID) if err != nil { return nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err) } + // If there's nothing to do then stop here. + if len(events) == 0 { + return nil, nil + } + // Work out whether we need to update any of the database entries. toUpdate := []types.SendToDeviceNID{} toDelete := []types.SendToDeviceNID{} - for pos, event := range events { + for _, event := range events { if event.SentByToken == nil { // If the event has no sent-by token yet then we haven't attempted to send // it. Record the current requested sync token in the database. toUpdate = append(toUpdate, event.ID) + toReturn = append(toReturn, event) event.SentByToken = &token } else if token.IsAfter(*event.SentByToken) { // The event had a sync token, therefore we've sent it before. The current @@ -1085,7 +1091,10 @@ func (d *Database) SendToDeviceUpdatesForSync( // successfully completed the previous sync (it would re-request it otherwise) // so we can remove the entry from the database. toDelete = append(toDelete, event.ID) - events = append(events[:pos], events[pos+1:]...) + } else { + // It looks like the sync is being re-requested, maybe it timed out or + // failed. Re-send any that should have been acknowledged by now. + toReturn = append(toReturn, event) } } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 9952825ed..bf5dcd896 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -136,11 +136,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype } func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) { + res = types.NewResponse() + + res, err = rp.appendSendToDeviceMessages(res, req.device.UserID, req, latestPos) + if err != nil { + return + } + if len(res.ToDevice.Events) > 0 { + return + } + // TODO: handle ignored users if req.since == nil { - res, err = rp.db.CompleteSync(req.ctx, req.device, req.limit) + res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState) + res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState) } if err != nil { @@ -149,11 +159,6 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter) - if err != nil { - return - } - - res, err = rp.appendSendToDeviceMessages(res, req.device.UserID, req, latestPos) return } @@ -245,11 +250,6 @@ func (rp *RequestPool) appendAccountData( func (rp *RequestPool) appendSendToDeviceMessages( data *types.Response, userID string, req syncRequest, currentPos types.StreamingToken, ) (*types.Response, error) { - nextPos, err := types.NewStreamTokenFromString(data.NextBatch) - if err != nil { - return nil, err - } - events, err := rp.db.SendToDeviceUpdatesForSync( context.TODO(), userID, @@ -262,9 +262,13 @@ func (rp *RequestPool) appendSendToDeviceMessages( for _, event := range events { data.ToDevice.Events = append(data.ToDevice.Events, event.SendToDeviceEvent) - nextPos.Positions[1]++ } - data.NextBatch = nextPos.String() + + if len(data.ToDevice.Events) > 0 { + currentPos.Positions[1]++ + data.NextBatch = currentPos.String() + } + return data, nil } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 2c8082c45..c1f09fba5 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -302,10 +302,8 @@ type Response struct { } // NewResponse creates an empty response with initialised maps. -func NewResponse(token StreamingToken) *Response { - res := Response{ - NextBatch: token.String(), - } +func NewResponse() *Response { + res := Response{} // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section, // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. res.Rooms.Join = make(map[string]JoinResponse)