From 9ead2d96e2947faa4c636e68859788ed50195fc5 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Jan 2021 16:07:59 +0000 Subject: [PATCH] Use addRoomDeltaToResponse --- syncapi/storage/shared/stream_pdu.go | 124 +++++++++++++++++---------- 1 file changed, 77 insertions(+), 47 deletions(-) diff --git a/syncapi/storage/shared/stream_pdu.go b/syncapi/storage/shared/stream_pdu.go index 9ff168ebb..7d58ebbc0 100644 --- a/syncapi/storage/shared/stream_pdu.go +++ b/syncapi/storage/shared/stream_pdu.go @@ -118,7 +118,7 @@ func (p *PDUStreamProvider) IncrementalSync( newPos = to var err error - var events []types.StreamEvent + //var events []types.StreamEvent var stateDeltas []stateDelta var joinedRooms []string @@ -139,55 +139,85 @@ func (p *PDUStreamProvider) IncrementalSync( req.Rooms[roomID] = gomatrixserverlib.Join } - for _, stateDelta := range stateDeltas { - roomID := stateDelta.roomID - room := types.JoinResponse{} - - if r.Backwards { - // When using backward ordering, we want the most recent events first. - if events, _, err = p.DB.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, req.Filter.Limit, false, false); err != nil { - return - } - } else { - // When using forward ordering, we want the least recent events first. - if events, err = p.DB.OutputEvents.SelectEarlyEvents(ctx, nil, roomID, r, req.Filter.Limit); err != nil { - return - } + for _, delta := range stateDeltas { + err = p.addRoomDeltaToResponse(ctx, req.Device, nil, r, delta, 20, req.Response) + if err != nil { + return newPos // nil, fmt.Errorf("d.addRoomDeltaToResponse: %w", err) } - - for _, event := range p.DB.StreamEventsToEvents(req.Device, events) { - room.Timeline.Events = append( - room.Timeline.Events, - gomatrixserverlib.ToClientEvent( - event.Event, - gomatrixserverlib.FormatSync, - ), - ) - } - - for _, event := range events { - if event.StreamPosition > newPos { - newPos = event.StreamPosition - } - } - - room.State.Events = gomatrixserverlib.HeaderedToClientEvents( - stateDelta.stateEvents, - gomatrixserverlib.FormatSync, - ) - - if len(events) > 0 { - prevBatch, err := p.DB.getBackwardTopologyPos(ctx, nil, events) - if err != nil { - return - } - room.Timeline.PrevBatch = &prevBatch - } - - req.Response.Rooms.Join[roomID] = room } - return newPos + return r.To +} + +func (p *PDUStreamProvider) addRoomDeltaToResponse( + ctx context.Context, + device *userapi.Device, + txn *sql.Tx, + r types.Range, + delta stateDelta, + numRecentEventsPerRoom int, + res *types.Response, +) error { + if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { + // make sure we don't leak recent events after the leave event. + // TODO: History visibility makes this somewhat complex to handle correctly. For example: + // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). + // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave + // in a single /sync request + // This is all "okay" assuming history_visibility == "shared" which it is by default. + r.To = delta.membershipPos + } + recentStreamEvents, limited, err := p.DB.OutputEvents.SelectRecentEvents( + ctx, txn, delta.roomID, r, + numRecentEventsPerRoom, true, true, + ) + if err != nil { + return err + } + recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) + delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back + prevBatch, err := p.DB.getBackwardTopologyPos(ctx, txn, recentStreamEvents) + if err != nil { + return err + } + + // XXX: should we ever get this far if we have no recent events or state in this room? + // in practice we do for peeks, but possibly not joins? + if len(recentEvents) == 0 && len(delta.stateEvents) == 0 { + return nil + } + + switch delta.membership { + case gomatrixserverlib.Join: + jr := types.NewJoinResponse() + + jr.Timeline.PrevBatch = &prevBatch + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.roomID] = *jr + case gomatrixserverlib.Peek: + jr := types.NewJoinResponse() + + jr.Timeline.PrevBatch = &prevBatch + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Peek[delta.roomID] = *jr + case gomatrixserverlib.Leave: + fallthrough // transitions to leave are the same as ban + case gomatrixserverlib.Ban: + // TODO: recentEvents may contain events that this user is not allowed to see because they are + // no longer in the room. + lr := types.NewLeaveResponse() + lr.Timeline.PrevBatch = &prevBatch + lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.roomID] = *lr + } + + return nil } func (p *PDUStreamProvider) getJoinResponseForCompleteSync(