From 2c0273a36eff80241fce478a9535f5be8150eb2d Mon Sep 17 00:00:00 2001 From: DavidSpenler Date: Wed, 2 Dec 2020 19:43:27 -0500 Subject: [PATCH] Implement recognition of 'include_leave' parameter in filter for sync endpoint --- syncapi/storage/interface.go | 2 +- syncapi/storage/shared/syncserver.go | 103 ++++++++++++++++++++++++++- syncapi/storage/storage_test.go | 4 +- syncapi/sync/request.go | 8 +++ syncapi/sync/requestpool.go | 2 +- 5 files changed, 114 insertions(+), 5 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 456ca1b1d..726796665 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -67,7 +67,7 @@ type Database interface { IncrementalSync(ctx context.Context, res *types.Response, device userapi.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) // CompleteSync returns a complete /sync API response for the given user. A response object // must be provided for CompleteSync to populate - it will not create one. - CompleteSync(ctx context.Context, res *types.Response, device userapi.Device, numRecentEventsPerRoom int) (*types.Response, error) + CompleteSync(ctx context.Context, res *types.Response, device userapi.Device, numRecentEventsPerRoom int, includeLeave bool) (*types.Response, error) // GetAccountDataInRange returns all account data for a given user inserted or // updated between two given positions // Returns a map following the format data[roomID] = []dataTypes diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 6c35a7653..f7ee481b4 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -745,6 +745,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( ctx context.Context, res *types.Response, userID string, device userapi.Device, numRecentEventsPerRoom int, + includeLeave bool, ) ( toPos types.StreamingToken, joinedRoomIDs []string, @@ -811,6 +812,19 @@ func (d *Database) getResponseWithPDUsForCompleteSync( } } + //Add left rooms + leftRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Leave) + for _, roomID := range leftRoomIDs { + var lr *types.LeaveResponse + lr, err = d.getLeaveResponseForCompleteSync( + ctx, txn, roomID, r, &stateFilter, numRecentEventsPerRoom, device, + ) + if err != nil { + return + } + res.Rooms.Leave[roomID] = *lr + } + if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil { return } @@ -896,12 +910,97 @@ func (d *Database) getJoinResponseForCompleteSync( return jr, nil } +func (d *Database) getLeaveResponseForCompleteSync( + ctx context.Context, txn *sql.Tx, + roomID string, + r types.Range, + stateFilter *gomatrixserverlib.StateFilter, + numRecentEventsPerRoom int, device userapi.Device, +) (lr *types.LeaveResponse, err error) { + var stateEvents []*gomatrixserverlib.HeaderedEvent + stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter) + if err != nil { + return + } + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + var recentStreamEvents []types.StreamEvent + var limited bool + recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents( + ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, + ) + if err != nil { + return + } + + // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the + // user shouldn't see, we check the recent events and remove any prior to the join event of the user + // which is equiv to history_visibility: joined + joinEventIndex := -1 + leaveEventIndex := -1 + for i := len(recentStreamEvents) - 1; i >= 0; i-- { + ev := recentStreamEvents[i] + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) { + membership, _ := ev.Membership() + if leaveEventIndex == -1 && membership != "leave" { + continue + } else if membership == "leave"{ + leaveEventIndex = i + } else if membership == "join" { + joinEventIndex = i + if i > 0 { + // the create event happens before the first join, so we should cut it at that point instead + if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") { + joinEventIndex = i - 1 + break + } + } + break + } + } + } + recentStreamEvents = recentStreamEvents[:leaveEventIndex+1] + if joinEventIndex != -1 { + // cut all events earlier than the join (but not the join itself) + recentStreamEvents = recentStreamEvents[joinEventIndex:] + limited = false // so clients know not to try to backpaginate + } + + // Retrieve the backward topology position, i.e. the position of the + // oldest event in the room's topology. + var prevBatchStr string + if len(recentStreamEvents) > 0 { + var backwardTopologyPos, backwardStreamPos types.StreamPosition + backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) + if err != nil { + return + } + prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) + prevBatch.Decrement() + prevBatchStr = prevBatch.String() + } + + // We don't include a device here as we don't need to send down + // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for: + // "Can sync a room with a message with a transaction id" - which does a complete sync to check. + recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents) + stateEvents = removeDuplicates(stateEvents, recentEvents) + lr = types.NewLeaveResponse() + lr.Timeline.PrevBatch = prevBatchStr + lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = limited + lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + return lr, nil +} + + func (d *Database) CompleteSync( ctx context.Context, res *types.Response, device userapi.Device, numRecentEventsPerRoom int, + includeLeave bool, ) (*types.Response, error) { toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( - ctx, res, device.UserID, device, numRecentEventsPerRoom, + ctx, res, device.UserID, device, numRecentEventsPerRoom, includeLeave, ) if err != nil { return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err) @@ -948,8 +1047,10 @@ func (d *Database) addInvitesToResponse( } for roomID := range retiredInvites { if _, ok := res.Rooms.Join[roomID]; !ok { + /* lr := types.NewLeaveResponse() res.Rooms.Leave[roomID] = *lr + */ } } return nil diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index b1b0d2543..a8eb15b14 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -195,7 +195,7 @@ func TestSyncResponse(t *testing.T) { DoSync: func() (*types.Response, error) { res := types.NewResponse() // limit set to 5 - return db.CompleteSync(ctx, res, testUserDeviceA, 5) + return db.CompleteSync(ctx, res, testUserDeviceA, 5, false) }, // want the last 5 events WantTimeline: events[len(events)-5:], @@ -208,7 +208,7 @@ func TestSyncResponse(t *testing.T) { Name: "CompleteSync", DoSync: func() (*types.Response, error) { res := types.NewResponse() - return db.CompleteSync(ctx, res, testUserDeviceA, len(events)+1) + return db.CompleteSync(ctx, res, testUserDeviceA, len(events)+1, false) }, WantTimeline: events, // We want no state at all as that field in /sync is the delta between the token (beginning of time) diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 0996729e6..afaab199d 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -31,6 +31,7 @@ import ( const defaultSyncTimeout = time.Duration(0) const DefaultTimelineLimit = 20 +const DefaultIncludeLeave = false type filter struct { Room struct { @@ -46,6 +47,7 @@ type syncRequest struct { device userapi.Device limit int timeout time.Duration + includeLeave bool since *types.StreamingToken // nil means that no since token was supplied wantFullState bool log *log.Entry @@ -69,6 +71,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat since = &tok } timelineLimit := DefaultTimelineLimit + includeLeave := DefaultIncludeLeave // TODO: read from stored filters too filterQuery := req.URL.Query().Get("filter") if filterQuery != "" { @@ -88,7 +91,11 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery) if err == nil { + util.GetLogger(req.Context()).Infof("Filter was retrieved") timelineLimit = f.Room.Timeline.Limit + includeLeave = f.Room.IncludeLeave + } else { + util.GetLogger(req.Context()).Infof("Filter could not be retrieved") } } } @@ -97,6 +104,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat ctx: req.Context(), device: device, timeout: timeout, + includeLeave: includeLeave, since: since, wantFullState: wantFullState, limit: timelineLimit, diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 0cb6efe7a..8a75f2123 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -255,7 +255,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea // TODO: handle ignored users if req.since.PDUPosition() == 0 && req.since.EDUPosition() == 0 { - res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit) + res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit, req.includeLeave) if err != nil { return res, fmt.Errorf("rp.db.CompleteSync: %w", err) }