From 76e1faeaf81edf9b77432281468ce87c8f372aa9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 14 Jan 2021 01:59:18 -0600 Subject: [PATCH] Try get Sytest working again Currently it's failing way earlier on "Newly left rooms appear in the leave section of incremental sync" and I've commented out all of my new functionality so need to figure out what is interfering in this branch now --- syncapi/streams/stream_pdu.go | 60 +++++++++++++++++++++++++++++++++-- syncapi/sync/request.go | 8 ++--- syncapi/types/provider.go | 1 - 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 953608407..6c51a2b7f 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -8,6 +8,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" ) type PDUStreamProvider struct { @@ -146,7 +147,7 @@ func (p *PDUStreamProvider) IncrementalSync( } for _, delta := range stateDeltas { - if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Limit, req.Response); err != nil { + if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Filter.Room.Timeline.Limit, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return newPos } @@ -179,6 +180,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( if err != nil { return err } + + logrus.WithFields(logrus.Fields{ + "limited": limited, + "delta.stateEvents": len(delta.StateEvents), + "recentStreamEvents": len(recentStreamEvents), + }).Info("isync addRoomDeltaToResponse removeDuplicates") + recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) @@ -250,7 +258,44 @@ func (p *PDUStreamProvider) getResponseForCompleteSync( return } - recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, stateEvents, device, limited) + events := make([]string, len(recentStreamEvents)) + for i, v := range recentStreamEvents { + events[i] = string(v.HeaderedEvent.Event.JSON()) + } + + logrus.WithFields(logrus.Fields{ + "filter.Room.Timeline.Limit": filter.Room.Timeline.Limit, + "recentStreamEvents": fmt.Sprintf("%+v", events), + }).Info("getResponseForCompleteSync") + + //recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, stateEvents, device, limited) + + // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the + // user shouldn't see, we check the recent events and remove any prior to the join event of the user + // which is equiv to history_visibility: joined + joinEventIndex := -1 + for i := len(recentStreamEvents) - 1; i >= 0; i-- { + ev := recentStreamEvents[i] + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) { + membership, _ := ev.Membership() + if membership == "join" { + joinEventIndex = i + if i > 0 { + // the create event happens before the first join, so we should cut it at that point instead + if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") { + joinEventIndex = i - 1 + break + } + } + break + } + } + } + if joinEventIndex != -1 { + // cut all events earlier than the join (but not the join itself) + recentStreamEvents = recentStreamEvents[joinEventIndex:] + limited = false // so clients know not to try to backpaginate + } // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. @@ -382,6 +427,17 @@ func (p *PDUStreamProvider) filterStreamEventsAccordingToHistoryVisibility( sliceEnd = leaveEventIndex } + events := make([]string, len(recentStreamEvents)) + for i, v := range recentStreamEvents { + events[i] = string(v.HeaderedEvent.Event.JSON()) + } + + logrus.WithFields(logrus.Fields{ + "sliceStart": sliceStart, + "sliceEnd": sliceEnd, + "before recentStreamEvents": fmt.Sprintf("%+v", events), + }).Info("cutting down the events") + return recentStreamEvents[sliceStart:sliceEnd], limited } diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 3a1d4d4a1..96d0bfa01 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -30,7 +30,7 @@ import ( ) const defaultSyncTimeout = time.Duration(0) -const DefaultTimelineLimit = 20 +const defaultTimelineLimit = 20 func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) { timeout := getTimeout(req.URL.Query().Get("timeout")) @@ -76,16 +76,15 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } limit := filter.Room.Timeline.Limit if limit == 0 { - limit = DefaultTimelineLimit + filter.Room.Timeline.Limit = defaultTimelineLimit } - // TODO: Additional query params: set_presence, filter + // TODO: Additional query params: set_presence logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{ "user_id": device.UserID, "device_id": device.ID, "since": since, "timeout": timeout, - "limit": limit, }) return &types.SyncRequest{ @@ -96,7 +95,6 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat Filter: filter, // Since: since, // Timeout: timeout, // - Limit: limit, // Rooms: make(map[string]string), // Populated by the PDU stream WantFullState: wantFullState, // }, nil diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 164e59a8d..93ed12661 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -16,7 +16,6 @@ type SyncRequest struct { Response *Response Filter gomatrixserverlib.Filter Since StreamingToken - Limit int Timeout time.Duration WantFullState bool