From 1d19da0cfdacb313d636066662162f2508f5d8a9 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Wed, 19 Oct 2022 11:03:05 +0200 Subject: [PATCH] Add MembershipChanges to send presence down /sync --- syncapi/streams/stream_pdu.go | 15 +++++---- syncapi/streams/stream_presence.go | 54 ++++++++++++++++++++++++------ syncapi/sync/request.go | 19 ++++++----- syncapi/types/provider.go | 2 +- 4 files changed, 63 insertions(+), 27 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 67d5981bb..9ec2b61cd 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -185,7 +185,6 @@ func (p *PDUStreamProvider) IncrementalSync( // If this room was joined in this sync, try to fetch // as much timeline events as allowed by the filter. if delta.NewlyJoined { - req.NewlyJoined[delta.RoomID] = struct{}{} // Reverse the range, so we get the most recent first. // This will be limited by the eventFilter. newRange = types.Range{ @@ -195,7 +194,7 @@ func (p *PDUStreamProvider) IncrementalSync( } } var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone { return newPos @@ -226,7 +225,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( delta types.StateDelta, eventFilter *gomatrixserverlib.RoomEventFilter, stateFilter *gomatrixserverlib.StateFilter, - res *types.Response, + req *types.SyncRequest, ) (types.StreamPosition, error) { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { // make sure we don't leak recent events after the leave event. @@ -291,8 +290,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( hasMembershipChange := false for _, recentEvent := range recentStreamEvents { if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil { + if membership, _ := recentEvent.Membership(); membership == gomatrixserverlib.Join { + req.MembershipChanges[*recentEvent.StateKey()] = struct{}{} + } hasMembershipChange = true - break } } @@ -321,7 +322,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // didn't "remove" events, return that the response is limited. jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[delta.RoomID] = jr + req.Response.Rooms.Join[delta.RoomID] = jr case gomatrixserverlib.Peek: jr := types.NewJoinResponse() @@ -330,7 +331,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Peek[delta.RoomID] = jr + req.Response.Rooms.Peek[delta.RoomID] = jr case gomatrixserverlib.Leave: fallthrough // transitions to leave are the same as ban @@ -343,7 +344,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // didn't "remove" events, return that the response is limited. lr.Timeline.Limited = limited && len(events) == len(recentEvents) lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Leave[delta.RoomID] = lr + req.Response.Rooms.Leave[delta.RoomID] = lr } return latestPosition, nil diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go index b90b1b97a..030b7c5d5 100644 --- a/syncapi/streams/stream_presence.go +++ b/syncapi/streams/stream_presence.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/matrix-org/gomatrixserverlib" + "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -69,19 +70,20 @@ func (p *PresenceStreamProvider) IncrementalSync( return from } + if len(presences) == 0 { + return to + } + // add newly joined rooms user presences - if len(req.NewlyJoined) > 0 { - newlyJoinedRoomIDs := make([]string, 0, len(req.NewlyJoined)) - for roomID := range req.NewlyJoined { - newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, roomID) - } + newlyJoined := joinedRooms(req.Response, req.Device.UserID) + if len(newlyJoined) > 0 { // TODO: Check if this is working better than before. - if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoinedRoomIDs); err != nil { + if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil { req.Log.WithError(err).Error("unable to refresh notifier lists") return from } NewlyJoinedLoop: - for _, roomID := range newlyJoinedRoomIDs { + for _, roomID := range newlyJoined { roomUsers := p.notifier.JoinedUsers(roomID) for i := range roomUsers { // we already got a presence from this user @@ -101,8 +103,6 @@ func (p *PresenceStreamProvider) IncrementalSync( } } } - } else { - return to } lastPos := from @@ -121,7 +121,8 @@ func (p *PresenceStreamProvider) IncrementalSync( prevPresence := pres.(*types.PresenceInternal) currentlyActive := prevPresence.CurrentlyActive() skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID - if skip { + _, membershipChange := req.MembershipChanges[presence.UserID] + if skip && !membershipChange { req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID) continue } @@ -162,3 +163,36 @@ func (p *PresenceStreamProvider) IncrementalSync( return lastPos } + +func joinedRooms(res *types.Response, userID string) []string { + var roomIDs []string + for roomID, join := range res.Rooms.Join { + // we would expect to see our join event somewhere if we newly joined the room. + // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. + newlyJoined := membershipEventPresent(join.State.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + continue + } + newlyJoined = membershipEventPresent(join.Timeline.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + } + } + return roomIDs +} + +func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { + for _, ev := range events { + // it's enough to know that we have our member event here, don't need to check membership content + // as it's implied by being in the respective section of the sync response. + if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { + // ignore e.g. join -> join changes + if gjson.GetBytes(ev.Unsigned, "prev_content.membership").Str == gjson.GetBytes(ev.Content, "membership").Str { + continue + } + return true + } + } + return false +} diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 268ed70c6..620dfdcdb 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -91,15 +91,16 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat }) return &types.SyncRequest{ - Context: req.Context(), // - Log: logger, // - Device: &device, // - Response: types.NewResponse(), // Populated by all streams - Filter: filter, // - Since: since, // - Timeout: timeout, // - Rooms: make(map[string]string), // Populated by the PDU stream - WantFullState: wantFullState, // + Context: req.Context(), // + Log: logger, // + Device: &device, // + Response: types.NewResponse(), // Populated by all streams + Filter: filter, // + Since: since, // + Timeout: timeout, // + Rooms: make(map[string]string), // Populated by the PDU stream + WantFullState: wantFullState, // + MembershipChanges: make(map[string]struct{}), // Populated by the PDU stream }, nil } diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 5310e2604..9a533002b 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -23,7 +23,7 @@ type SyncRequest struct { // Updated by the PDU stream. Rooms map[string]string // Updated by the PDU stream. - NewlyJoined map[string]struct{} + MembershipChanges map[string]struct{} // Updated by the PDU stream. IgnoredUsers IgnoredUsers }