Jiggle about sync a bit

This commit is contained in:
Neil Alexander 2020-05-29 13:43:37 +01:00
parent 7a073b14fc
commit fc1303bbbe
5 changed files with 46 additions and 35 deletions

View file

@ -93,7 +93,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
"event_type": output.Type, "event_type": output.Type,
}).Info("sync API received send-to-device event from EDU server") }).Info("sync API received send-to-device event from EDU server")
newPos, err := s.db.StoreNewSendForDeviceMessage( _, err = s.db.StoreNewSendForDeviceMessage(
context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent, context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent,
) )
if err != nil { if err != nil {
@ -104,7 +104,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
s.notifier.OnNewSendToDevice( s.notifier.OnNewSendToDevice(
output.UserID, output.UserID,
[]string{output.DeviceID}, // TODO: support wildcard here as per spec []string{output.DeviceID}, // TODO: support wildcard here as per spec
types.NewStreamToken(0, newPos), types.NewStreamToken(0, 1),
) )
return nil return nil

View file

@ -56,9 +56,9 @@ type Database interface {
// transaction IDs associated with the given device. These transaction IDs come // transaction IDs associated with the given device. These transaction IDs come
// from when the device sent the event via an API that included a transaction // from when the device sent the event via an API that included a transaction
// ID. // ID.
IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) IncrementalSync(ctx context.Context, res *types.Response, device authtypes.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
// CompleteSync returns a complete /sync API response for the given user. // CompleteSync returns a complete /sync API response for the given user.
CompleteSync(ctx context.Context, device authtypes.Device, numRecentEventsPerRoom int) (*types.Response, error) CompleteSync(ctx context.Context, res *types.Response, device authtypes.Device, numRecentEventsPerRoom int) (*types.Response, error)
// GetAccountDataInRange returns all account data for a given user inserted or // GetAccountDataInRange returns all account data for a given user inserted or
// updated between two given positions // updated between two given positions
// Returns a map following the format data[roomID] = []dataTypes // Returns a map following the format data[roomID] = []dataTypes

View file

@ -530,14 +530,14 @@ func (d *Database) addEDUDeltaToResponse(
} }
func (d *Database) IncrementalSync( func (d *Database) IncrementalSync(
ctx context.Context, ctx context.Context, res *types.Response,
device authtypes.Device, device authtypes.Device,
fromPos, toPos types.StreamingToken, fromPos, toPos types.StreamingToken,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
wantFullState bool, wantFullState bool,
) (*types.Response, error) { ) (*types.Response, error) {
nextBatchPos := fromPos.WithUpdates(toPos) nextBatchPos := fromPos.WithUpdates(toPos)
res := types.NewResponse(nextBatchPos) res.NextBatch = nextBatchPos.String()
var joinedRoomIDs []string var joinedRoomIDs []string
var err error var err error
@ -571,11 +571,10 @@ func (d *Database) IncrementalSync(
// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed // getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
// to it. It returns toPos and joinedRoomIDs for use of adding EDUs. // to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
func (d *Database) getResponseWithPDUsForCompleteSync( func (d *Database) getResponseWithPDUsForCompleteSync(
ctx context.Context, ctx context.Context, res *types.Response,
userID string, userID string,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
) ( ) (
res *types.Response,
toPos types.StreamingToken, toPos types.StreamingToken,
joinedRoomIDs []string, joinedRoomIDs []string,
err error, err error,
@ -606,7 +605,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
To: toPos.PDUPosition(), To: toPos.PDUPosition(),
} }
res = types.NewResponse(toPos) res.NextBatch = toPos.String()
// Extract room state and recent events for all rooms the user is joined to. // Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
@ -664,14 +663,15 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
} }
succeeded = true succeeded = true
return res, toPos, joinedRoomIDs, err return //res, toPos, joinedRoomIDs, err
} }
func (d *Database) CompleteSync( func (d *Database) CompleteSync(
ctx context.Context, device authtypes.Device, numRecentEventsPerRoom int, ctx context.Context, res *types.Response,
device authtypes.Device, numRecentEventsPerRoom int,
) (*types.Response, error) { ) (*types.Response, error) {
res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
ctx, device.UserID, numRecentEventsPerRoom, ctx, res, device.UserID, numRecentEventsPerRoom,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -1063,21 +1063,27 @@ func (d *Database) SendToDeviceUpdatesForSync(
ctx context.Context, ctx context.Context,
userID, deviceID string, userID, deviceID string,
token types.StreamingToken, token types.StreamingToken,
) (events []types.SendToDeviceEvent, err error) { ) (toReturn []types.SendToDeviceEvent, err error) {
// First of all, get our send-to-device updates for this user. // First of all, get our send-to-device updates for this user.
events, err = d.SendToDevice.SelectSendToDeviceMessages(ctx, nil, userID, deviceID) events, err := d.SendToDevice.SelectSendToDeviceMessages(ctx, nil, userID, deviceID)
if err != nil { if err != nil {
return nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err) return nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err)
} }
// If there's nothing to do then stop here.
if len(events) == 0 {
return nil, nil
}
// Work out whether we need to update any of the database entries. // Work out whether we need to update any of the database entries.
toUpdate := []types.SendToDeviceNID{} toUpdate := []types.SendToDeviceNID{}
toDelete := []types.SendToDeviceNID{} toDelete := []types.SendToDeviceNID{}
for pos, event := range events { for _, event := range events {
if event.SentByToken == nil { if event.SentByToken == nil {
// If the event has no sent-by token yet then we haven't attempted to send // If the event has no sent-by token yet then we haven't attempted to send
// it. Record the current requested sync token in the database. // it. Record the current requested sync token in the database.
toUpdate = append(toUpdate, event.ID) toUpdate = append(toUpdate, event.ID)
toReturn = append(toReturn, event)
event.SentByToken = &token event.SentByToken = &token
} else if token.IsAfter(*event.SentByToken) { } else if token.IsAfter(*event.SentByToken) {
// The event had a sync token, therefore we've sent it before. The current // The event had a sync token, therefore we've sent it before. The current
@ -1085,7 +1091,10 @@ func (d *Database) SendToDeviceUpdatesForSync(
// successfully completed the previous sync (it would re-request it otherwise) // successfully completed the previous sync (it would re-request it otherwise)
// so we can remove the entry from the database. // so we can remove the entry from the database.
toDelete = append(toDelete, event.ID) toDelete = append(toDelete, event.ID)
events = append(events[:pos], events[pos+1:]...) } else {
// It looks like the sync is being re-requested, maybe it timed out or
// failed. Re-send any that should have been acknowledged by now.
toReturn = append(toReturn, event)
} }
} }

View file

@ -136,11 +136,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
} }
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) { func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
res = types.NewResponse()
res, err = rp.appendSendToDeviceMessages(res, req.device.UserID, req, latestPos)
if err != nil {
return
}
if len(res.ToDevice.Events) > 0 {
return
}
// TODO: handle ignored users // TODO: handle ignored users
if req.since == nil { if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, req.device, req.limit) res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
} else { } else {
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState) res, err = rp.db.IncrementalSync(req.ctx, res, req.device, *req.since, latestPos, req.limit, req.wantFullState)
} }
if err != nil { if err != nil {
@ -149,11 +159,6 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter) res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter)
if err != nil {
return
}
res, err = rp.appendSendToDeviceMessages(res, req.device.UserID, req, latestPos)
return return
} }
@ -245,11 +250,6 @@ func (rp *RequestPool) appendAccountData(
func (rp *RequestPool) appendSendToDeviceMessages( func (rp *RequestPool) appendSendToDeviceMessages(
data *types.Response, userID string, req syncRequest, currentPos types.StreamingToken, data *types.Response, userID string, req syncRequest, currentPos types.StreamingToken,
) (*types.Response, error) { ) (*types.Response, error) {
nextPos, err := types.NewStreamTokenFromString(data.NextBatch)
if err != nil {
return nil, err
}
events, err := rp.db.SendToDeviceUpdatesForSync( events, err := rp.db.SendToDeviceUpdatesForSync(
context.TODO(), context.TODO(),
userID, userID,
@ -262,9 +262,13 @@ func (rp *RequestPool) appendSendToDeviceMessages(
for _, event := range events { for _, event := range events {
data.ToDevice.Events = append(data.ToDevice.Events, event.SendToDeviceEvent) data.ToDevice.Events = append(data.ToDevice.Events, event.SendToDeviceEvent)
nextPos.Positions[1]++
} }
data.NextBatch = nextPos.String()
if len(data.ToDevice.Events) > 0 {
currentPos.Positions[1]++
data.NextBatch = currentPos.String()
}
return data, nil return data, nil
} }

View file

@ -302,10 +302,8 @@ type Response struct {
} }
// NewResponse creates an empty response with initialised maps. // NewResponse creates an empty response with initialised maps.
func NewResponse(token StreamingToken) *Response { func NewResponse() *Response {
res := Response{ res := Response{}
NextBatch: token.String(),
}
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section, // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
res.Rooms.Join = make(map[string]JoinResponse) res.Rooms.Join = make(map[string]JoinResponse)