diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 2926cf275..83e399ac3 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -285,6 +285,8 @@ func Setup( }), ).Methods(http.MethodPut, http.MethodOptions) + // This is only here because sytest refers to /unstable for this endpoint + // rather than r0. It's an exact duplicate of the above handler. // TODO: Remove this if/when sytest is fixed! unstableMux.Handle("/sendToDevice/{eventType}/{txnID}", internal.MakeAuthAPI("send_to_device", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index add130aa8..6cebb2e91 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -176,6 +176,7 @@ func main() { cfg.Database.SyncAPI = "file:/idb/dendritejs_syncapi.db" cfg.Kafka.Topics.UserUpdates = "user_updates" cfg.Kafka.Topics.OutputTypingEvent = "output_typing_event" + cfg.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event" cfg.Kafka.Topics.OutputClientData = "output_client_data" cfg.Kafka.Topics.OutputRoomEvent = "output_room_event" cfg.Matrix.TrustedIDServers = []string{ diff --git a/eduserver/api/output.go b/eduserver/api/output.go index c0c514d52..e6ded8413 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -41,9 +41,9 @@ type TypingEvent struct { Typing bool `json:"typing"` } -// OutputTypingEvent is an entry in typing server output kafka log. -// This contains the event with extra fields used to create 'm.typing' event -// in clientapi & federation. +// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log. +// This contains the full event content, along with the user ID and device ID +// to which it is destined. type OutputSendToDeviceEvent struct { UserID string `json:"user_id"` DeviceID string `json:"device_id"` diff --git a/eduserver/cache/cache.go b/eduserver/cache/cache.go index f02216c59..dd535a6d2 100644 --- a/eduserver/cache/cache.go +++ b/eduserver/cache/cache.go @@ -115,7 +115,9 @@ func (t *EDUCache) AddTypingUser( // AddSendToDeviceMessage increases the sync position for // send-to-device updates. -// Returns the latest sync position for typing after update. +// Returns the sync position before update, as the caller +// will use this to record the current stream position +// at the time that the send-to-device message was sent. func (t *EDUCache) AddSendToDeviceMessage() int64 { t.Lock() defer t.Unlock() diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 013bf6859..6f664eb67 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -38,7 +38,7 @@ func SetupEDUServerComponent( DeviceDB: deviceDB, Producer: base.KafkaProducer, OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), - OutputSendToDeviceEventTopic: string(base.Cfg.Kafka.Topics.OutputSendToDeviceEventTopic), + OutputSendToDeviceEventTopic: string(base.Cfg.Kafka.Topics.OutputSendToDeviceEvent), ServerName: base.Cfg.Matrix.ServerName, } diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 0d5d0ce5d..74b4c0144 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -276,7 +276,11 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { for deviceID, message := range byUser { // TODO: check that the user and the device actually exist here if err := t.eduProducer.SendToDevice(t.context, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { - util.GetLogger(t.context).WithError(err).Error("Failed to send send-to-device event to edu server") + util.GetLogger(t.context).WithError(err).WithFields(logrus.Fields{ + "sender": directPayload.Sender, + "user_id": userID, + "device_id": deviceID, + }).Error("Failed to send send-to-device event to edu server") } } } diff --git a/internal/config/config.go b/internal/config/config.go index 54f12c5b3..a20cc0ead 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -153,7 +153,7 @@ type Dendrite struct { // Topic for eduserver/api.OutputTypingEvent events. OutputTypingEvent Topic `yaml:"output_typing_event"` // Topic for eduserver/api.OutputSendToDeviceEvent events. - OutputSendToDeviceEventTopic Topic `yaml:"output_send_to_device_event"` + OutputSendToDeviceEvent Topic `yaml:"output_send_to_device_event"` // Topic for user updates (profile, presence) UserUpdates Topic `yaml:"user_updates"` } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index bffc9bd84..487018031 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -48,7 +48,7 @@ func NewOutputSendToDeviceEventConsumer( ) *OutputSendToDeviceEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEventTopic), + Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index f83aa7f97..b6d4d2df2 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -16,7 +16,6 @@ package storage import ( "context" - "database/sql" "time" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" @@ -56,9 +55,11 @@ type Database interface { // sync response for the given user. Events returned will include any client // transaction IDs associated with the given device. These transaction IDs come // from when the device sent the event via an API that included a transaction - // ID. + // ID. A response object must be provided for IncrementaSync to populate - it + // will not create one. IncrementalSync(ctx context.Context, res *types.Response, device authtypes.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) - // CompleteSync returns a complete /sync API response for the given user. + // CompleteSync returns a complete /sync API response for the given user. A response object + // must be provided for CompleteSync to populate - it will not create one. CompleteSync(ctx context.Context, res *types.Response, device authtypes.Device, numRecentEventsPerRoom int) (*types.Response, error) // GetAccountDataInRange returns all account data for a given user inserted or // updated between two given positions @@ -107,14 +108,24 @@ type Database interface { SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) // AddSendToDevice increases the EDU position in the cache and returns the stream position. AddSendToDevice() types.StreamPosition - // SendToDeviceUpdatesForSync returns a list of send-to-device updates, after having completed - // updates and deletions for previous events. The sync token should be supplied to this function so - // that we can clean up old events properly. - SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, token types.StreamingToken) ([]types.SendToDeviceEvent, []types.SendToDeviceNID, []types.SendToDeviceNID, error) + // SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns three lists: + // - "events": a list of send-to-device events that should be included in the sync + // - "changes": a list of send-to-device events that should be updated in the database by + // CleanSendToDeviceUpdates + // - "defletions": a list of send-to-device events which have been confirmed as sent and + // can be deleted altogether by CleanSendToDeviceUpdates + // The token supplied should be the current requested sync token, e.g. from the "since" + // parameter. + SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, token types.StreamingToken) (events []types.SendToDeviceEvent, changes []types.SendToDeviceNID, deletions []types.SendToDeviceNID, err error) // StoreNewSendForDeviceMessage stores a new send-to-device event for a user's device. StoreNewSendForDeviceMessage(ctx context.Context, streamPos types.StreamPosition, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent) (types.StreamPosition, error) - // CleanSendToDeviceUpdates will update or remove any send-to-device updates based on the given sync. + // CleanSendToDeviceUpdates will update or remove any send-to-device updates based on the + // result to a previous call to SendDeviceUpdatesForSync. This is separate as it allows + // SendToDeviceUpdatesForSync to be called multiple times if needed (e.g. before and after + // starting to wait for an incremental sync with timeout). + // The token supplied should be the current requested sync token, e.g. from the "since" + // parameter. CleanSendToDeviceUpdates(ctx context.Context, toUpdate, toDelete []types.SendToDeviceNID, token types.StreamingToken) (err error) // SendToDeviceUpdatesWaiting returns true if there are send-to-device updates waiting to be sent. - SendToDeviceUpdatesWaiting(ctx context.Context, txn *sql.Tx, userID, deviceID string) (bool, error) + SendToDeviceUpdatesWaiting(ctx context.Context, userID, deviceID string) (bool, error) } diff --git a/syncapi/storage/postgres/send_to_device_table.go b/syncapi/storage/postgres/send_to_device_table.go index d6192f21e..335a05ef1 100644 --- a/syncapi/storage/postgres/send_to_device_table.go +++ b/syncapi/storage/postgres/send_to_device_table.go @@ -38,8 +38,9 @@ CREATE TABLE IF NOT EXISTS syncapi_send_to_device ( device_id TEXT NOT NULL, -- The event content JSON. content TEXT NOT NULL, - -- The sync token that was supplied when we tried to send the message, - -- or NULL if we haven't tried to send it yet. + -- The token that was supplied to the /sync at the time that this + -- message was included in a sync response, or NULL if we haven't + -- included it in a /sync response yet. sent_by_token TEXT ); ` diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 86747f874..d160f7f6c 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1050,10 +1050,9 @@ func (d *Database) currentStateStreamEventsForRoom( } func (d *Database) SendToDeviceUpdatesWaiting( - ctx context.Context, txn *sql.Tx, - userID, deviceID string, + ctx context.Context, userID, deviceID string, ) (bool, error) { - count, err := d.SendToDevice.CountSendToDeviceMessages(ctx, txn, userID, deviceID) + count, err := d.SendToDevice.CountSendToDeviceMessages(ctx, nil, userID, deviceID) if err != nil { return false, err } diff --git a/syncapi/storage/sqlite3/send_to_device_table.go b/syncapi/storage/sqlite3/send_to_device_table.go index 211ac8004..0d03f23ef 100644 --- a/syncapi/storage/sqlite3/send_to_device_table.go +++ b/syncapi/storage/sqlite3/send_to_device_table.go @@ -36,8 +36,9 @@ CREATE TABLE IF NOT EXISTS syncapi_send_to_device ( device_id TEXT NOT NULL, -- The event content JSON. content TEXT NOT NULL, - -- The sync token that was supplied when we tried to send the message, - -- or NULL if we haven't tried to send it yet. + -- The token that was supplied to the /sync at the time that this + -- message was included in a sync response, or NULL if we haven't + -- included it in a /sync response yet. sent_by_token TEXT ); ` diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 579582e00..8b93cad45 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -68,16 +68,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype currPos := rp.notifier.CurrentPosition() - returnImmediately := shouldReturnImmediately(syncReq) - if !returnImmediately { - if waiting, werr := rp.db.SendToDeviceUpdatesWaiting( - context.TODO(), nil, device.UserID, device.ID, - ); werr == nil { - returnImmediately = waiting - } - } - - if returnImmediately { + if rp.shouldReturnImmediately(syncReq) { syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { logger.WithError(err).Error("rp.currentSyncForUser failed") @@ -159,32 +150,6 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea return nil, err } - // Before we return the sync response, make sure that we take action on - // any send-to-device database updates or deletions that we need to do. - // Then add the updates into the sync response. - defer func() { - if len(updates) > 0 || len(deletions) > 0 { - // Handle the updates and deletions in the database. - err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, since) - if err != nil { - return - } - } - if len(events) > 0 { - // Add the updates into the sync response. - for _, event := range events { - res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent) - } - - // Get the next_batch from the sync response and increase the - // EDU counter. - if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil { - pos.Positions[1]++ - res.NextBatch = pos.String() - } - } - }() - // TODO: handle ignored users if req.since == nil { res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit) @@ -197,6 +162,34 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter) + if err != nil { + return + } + + // Before we return the sync response, make sure that we take action on + // any send-to-device database updates or deletions that we need to do. + // Then add the updates into the sync response. + if len(updates) > 0 || len(deletions) > 0 { + // Handle the updates and deletions in the database. + err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, since) + if err != nil { + return + } + } + if len(events) > 0 { + // Add the updates into the sync response. + for _, event := range events { + res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent) + } + + // Get the next_batch from the sync response and increase the + // EDU counter. + if pos, perr := types.NewStreamTokenFromString(res.NextBatch); perr == nil { + pos.Positions[1]++ + res.NextBatch = pos.String() + } + } + return } @@ -288,6 +281,10 @@ func (rp *RequestPool) appendAccountData( // shouldReturnImmediately returns whether the /sync request is an initial sync, // or timeout=0, or full_state=true, in any of the cases the request should // return immediately. -func shouldReturnImmediately(syncReq *syncRequest) bool { - return syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState +func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool { + if syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState { + return true + } + waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID) + return werr == nil && waiting }