mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-20 05:13:11 -06:00
Add some logic for send-to-device messages, add them into sync stream
This commit is contained in:
parent
8b7e81b423
commit
d3bf9cb31b
|
|
@ -104,4 +104,8 @@ type Database interface {
|
|||
StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent
|
||||
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
||||
SyncStreamPosition(ctx context.Context) (types.StreamPosition, error)
|
||||
// SendToDeviceUpdatesForSync returns a list of send-to-device updates, after having completed
|
||||
// updates and deletions for previous events. The sync token should be supplied to this function so
|
||||
// that we can clean up old events properly.
|
||||
SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, token types.StreamingToken) ([]types.SendToDeviceEvent, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -117,15 +117,20 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages(
|
|||
if err = rows.Scan(&id, &userID, &deviceID, &eventType, &message, &sentByToken); err != nil {
|
||||
return
|
||||
}
|
||||
events = append(events, types.SendToDeviceEvent{
|
||||
event := types.SendToDeviceEvent{
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: deviceID,
|
||||
EventType: eventType,
|
||||
Message: json.RawMessage(message),
|
||||
},
|
||||
SentByToken: sentByToken,
|
||||
})
|
||||
}
|
||||
if sentByToken != nil {
|
||||
if token, err := types.NewStreamTokenFromString(*sentByToken); err == nil {
|
||||
event.SentByToken = &token
|
||||
}
|
||||
}
|
||||
events = append(events, event)
|
||||
}
|
||||
|
||||
return events, rows.Err()
|
||||
|
|
|
|||
|
|
@ -1029,6 +1029,59 @@ func (d *Database) currentStateStreamEventsForRoom(
|
|||
return s, nil
|
||||
}
|
||||
|
||||
func (d *Database) AddSendToDeviceEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
userID, deviceID, eventType, message string,
|
||||
) error {
|
||||
return d.SendToDevice.InsertSendToDeviceMessage(
|
||||
ctx, txn, userID, deviceID, eventType, message,
|
||||
)
|
||||
}
|
||||
|
||||
func (d *Database) SendToDeviceUpdatesForSync(
|
||||
ctx context.Context,
|
||||
userID, deviceID string,
|
||||
token types.StreamingToken,
|
||||
) (events []types.SendToDeviceEvent, err error) {
|
||||
err = internal.WithTransaction(d.DB, func(txn *sql.Tx) error {
|
||||
// First of all, get our send-to-device updates for this user.
|
||||
events, err := d.SendToDevice.SelectSendToDeviceMessages(ctx, userID, deviceID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err)
|
||||
}
|
||||
|
||||
// Start by cleaning up any send-to-device messages that have older sent-by-tokens.
|
||||
// This means that they were sent in a previous /sync and the client has happily
|
||||
// progressed onto newer sync tokens.
|
||||
toUpdate := []types.SendToDeviceNID{}
|
||||
toDelete := []types.SendToDeviceNID{}
|
||||
for pos, event := range events {
|
||||
if event.SentByToken != nil && token.IsAfter(*event.SentByToken) {
|
||||
// Mark the event for deletion and remove it from our list of return events.
|
||||
toDelete = append(toDelete, event.ID)
|
||||
events = append(events[:pos], events[pos+1:]...)
|
||||
} else {
|
||||
// Mark the event for update and keep it in our list of return events.
|
||||
toUpdate = append(toUpdate, event.ID)
|
||||
event.SentByToken = &token
|
||||
}
|
||||
}
|
||||
|
||||
// Delete any send-to-device messages marked for deletion.
|
||||
if err := d.SendToDevice.DeleteSendToDeviceMessages(ctx, txn, toDelete); err != nil {
|
||||
return fmt.Errorf("d.SendToDevice.DeleteSendToDeviceMessages: %w", err)
|
||||
}
|
||||
|
||||
// Now update any outstanding send-to-device messages with the new sync token.
|
||||
if err := d.SendToDevice.UpdateSentSendToDeviceMessages(ctx, txn, token.String(), toUpdate); err != nil {
|
||||
return fmt.Errorf("d.SendToDevice.UpdateSentSendToDeviceMessages: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
||||
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
||||
// only, so clients get to the correct state once they have rolled forward.
|
||||
|
|
|
|||
|
|
@ -108,15 +108,20 @@ func (s *sendToDeviceStatements) SelectSendToDeviceMessages(
|
|||
if err = rows.Scan(&id, &userID, &deviceID, &eventType, &message, &sentByToken); err != nil {
|
||||
return
|
||||
}
|
||||
events = append(events, types.SendToDeviceEvent{
|
||||
event := types.SendToDeviceEvent{
|
||||
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
|
||||
UserID: userID,
|
||||
DeviceID: deviceID,
|
||||
EventType: eventType,
|
||||
Message: json.RawMessage(message),
|
||||
},
|
||||
SentByToken: sentByToken,
|
||||
})
|
||||
}
|
||||
if sentByToken != nil {
|
||||
if token, err := types.NewStreamTokenFromString(*sentByToken); err == nil {
|
||||
event.SentByToken = &token
|
||||
}
|
||||
}
|
||||
events = append(events, event)
|
||||
}
|
||||
|
||||
return events, rows.Err()
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
|
|
@ -147,6 +148,11 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
|||
|
||||
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
|
||||
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition(), &accountDataFilter)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
res, err = rp.appendSendToDeviceMessages(res, req.device.UserID, req, latestPos)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -235,6 +241,26 @@ func (rp *RequestPool) appendAccountData(
|
|||
return data, nil
|
||||
}
|
||||
|
||||
func (rp *RequestPool) appendSendToDeviceMessages(
|
||||
data *types.Response, userID string, req syncRequest, currentPos types.StreamingToken,
|
||||
) (*types.Response, error) {
|
||||
events, err := rp.db.SendToDeviceUpdatesForSync(
|
||||
context.TODO(),
|
||||
userID,
|
||||
req.device.ID,
|
||||
currentPos,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
data.ToDevice.Events = append(data.ToDevice.Events, event.SendToDeviceEvent)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// shouldReturnImmediately returns whether the /sync request is an initial sync,
|
||||
// or timeout=0, or full_state=true, in any of the cases the request should
|
||||
// return immediately.
|
||||
|
|
|
|||
|
|
@ -296,6 +296,9 @@ type Response struct {
|
|||
Invite map[string]InviteResponse `json:"invite"`
|
||||
Leave map[string]LeaveResponse `json:"leave"`
|
||||
} `json:"rooms"`
|
||||
ToDevice struct {
|
||||
Events []gomatrixserverlib.SendToDeviceEvent `json:"events"`
|
||||
} `json:"to_device"`
|
||||
}
|
||||
|
||||
// NewResponse creates an empty response with initialised maps.
|
||||
|
|
@ -398,5 +401,6 @@ type SendToDeviceNID int
|
|||
|
||||
type SendToDeviceEvent struct {
|
||||
gomatrixserverlib.SendToDeviceEvent
|
||||
SentByToken *string
|
||||
ID SendToDeviceNID
|
||||
SentByToken *StreamingToken
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue