Try to line up sync tokens again

This commit is contained in:
Neil Alexander 2020-05-29 16:22:52 +01:00
parent 2ef53ff726
commit 43a83d375e
6 changed files with 18 additions and 8 deletions

View file

@ -93,8 +93,10 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
"event_type": output.Type,
}).Info("sync API received send-to-device event from EDU server")
streamPos := s.db.AddSendToDevice()
_, err = s.db.StoreNewSendForDeviceMessage(
context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent,
context.TODO(), streamPos, output.UserID, output.DeviceID, output.SendToDeviceEvent,
)
if err != nil {
log.WithError(err).Errorf("failed to store send-to-device message")
@ -104,7 +106,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID}, // TODO: support wildcard here as per spec
types.NewStreamToken(0, 1),
types.NewStreamToken(0, streamPos),
)
return nil

View file

@ -104,12 +104,14 @@ type Database interface {
StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
SyncStreamPosition(ctx context.Context) (types.StreamPosition, error)
// AddSendToDevice increases the EDU position in the cache and returns the stream position.
AddSendToDevice() types.StreamPosition
// SendToDeviceUpdatesForSync returns a list of send-to-device updates, after having completed
// updates and deletions for previous events. The sync token should be supplied to this function so
// that we can clean up old events properly.
SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, token types.StreamingToken) ([]types.SendToDeviceEvent, []types.SendToDeviceNID, []types.SendToDeviceNID, error)
// StoreNewSendForDeviceMessage stores a new send-to-device event for a user's device.
StoreNewSendForDeviceMessage(ctx context.Context, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent) (types.StreamPosition, error)
StoreNewSendForDeviceMessage(ctx context.Context, streamPos types.StreamPosition, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent) (types.StreamPosition, error)
// CleanSendToDeviceUpdates will update or remove any send-to-device updates based on the given sync.
CleanSendToDeviceUpdates(ctx context.Context, toUpdate, toDelete []types.SendToDeviceNID, token types.StreamingToken) (err error)
}

View file

@ -53,6 +53,7 @@ const selectSendToDeviceMessagesSQL = `
SELECT id, user_id, device_id, content, sent_by_token
FROM syncapi_send_to_device
WHERE user_id = $1 AND device_id = $2
ORDER BY id DESC
`
const updateSentSendToDeviceMessagesSQL = `

View file

@ -91,6 +91,10 @@ func (d *Database) RemoveTypingUser(
return types.StreamPosition(d.EDUCache.RemoveUser(userID, roomID))
}
func (d *Database) AddSendToDevice() types.StreamPosition {
return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage())
}
func (d *Database) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.EDUCache.SetTimeoutCallback(fn)
}
@ -1041,11 +1045,11 @@ func (d *Database) AddSendToDeviceEvent(
}
func (d *Database) StoreNewSendForDeviceMessage(
ctx context.Context, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent,
ctx context.Context, streamPos types.StreamPosition, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent,
) (types.StreamPosition, error) {
j, err := json.Marshal(event)
if err != nil {
return 0, err
return streamPos, err
}
// Delegate the database write task to the SendToDeviceWriter. It'll guarantee
// that we don't lock the table for writes in more than one place.
@ -1055,9 +1059,9 @@ func (d *Database) StoreNewSendForDeviceMessage(
)
})
if err != nil {
return 0, err
return streamPos, err
}
return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage()), nil
return streamPos, nil
}
func (d *Database) SendToDeviceUpdatesForSync(

View file

@ -51,6 +51,7 @@ const selectSendToDeviceMessagesSQL = `
SELECT id, user_id, device_id, content, sent_by_token
FROM syncapi_send_to_device
WHERE user_id = $1 AND device_id = $2
ORDER BY id DESC
`
const updateSentSendToDeviceMessagesSQL = `

View file

@ -537,7 +537,7 @@ func TestSendToDeviceBehaviour(t *testing.T) {
}
// Try sending a message.
streamPos, err := db.StoreNewSendForDeviceMessage(ctx, "alice", "one", gomatrixserverlib.SendToDeviceEvent{
streamPos, err := db.StoreNewSendForDeviceMessage(ctx, types.StreamPosition(0), "alice", "one", gomatrixserverlib.SendToDeviceEvent{
Sender: "bob",
Type: "m.type",
Content: json.RawMessage("{}"),