Add MembershipChanges to send presence down /sync

This commit is contained in:
Till Faelligen 2022-10-19 11:03:05 +02:00
parent 6f7c57e6bd
commit 1d19da0cfd
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
4 changed files with 63 additions and 27 deletions

View file

@ -185,7 +185,6 @@ func (p *PDUStreamProvider) IncrementalSync(
// If this room was joined in this sync, try to fetch // If this room was joined in this sync, try to fetch
// as much timeline events as allowed by the filter. // as much timeline events as allowed by the filter.
if delta.NewlyJoined { if delta.NewlyJoined {
req.NewlyJoined[delta.RoomID] = struct{}{}
// Reverse the range, so we get the most recent first. // Reverse the range, so we get the most recent first.
// This will be limited by the eventFilter. // This will be limited by the eventFilter.
newRange = types.Range{ newRange = types.Range{
@ -195,7 +194,7 @@ func (p *PDUStreamProvider) IncrementalSync(
} }
} }
var pos types.StreamPosition var pos types.StreamPosition
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil { if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone { if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
return newPos return newPos
@ -226,7 +225,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
delta types.StateDelta, delta types.StateDelta,
eventFilter *gomatrixserverlib.RoomEventFilter, eventFilter *gomatrixserverlib.RoomEventFilter,
stateFilter *gomatrixserverlib.StateFilter, stateFilter *gomatrixserverlib.StateFilter,
res *types.Response, req *types.SyncRequest,
) (types.StreamPosition, error) { ) (types.StreamPosition, error) {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event. // make sure we don't leak recent events after the leave event.
@ -291,8 +290,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
hasMembershipChange := false hasMembershipChange := false
for _, recentEvent := range recentStreamEvents { for _, recentEvent := range recentStreamEvents {
if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil { if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil {
if membership, _ := recentEvent.Membership(); membership == gomatrixserverlib.Join {
req.MembershipChanges[*recentEvent.StateKey()] = struct{}{}
}
hasMembershipChange = true hasMembershipChange = true
break
} }
} }
@ -321,7 +322,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// didn't "remove" events, return that the response is limited. // didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.RoomID] = jr req.Response.Rooms.Join[delta.RoomID] = jr
case gomatrixserverlib.Peek: case gomatrixserverlib.Peek:
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
@ -330,7 +331,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.RoomID] = jr req.Response.Rooms.Peek[delta.RoomID] = jr
case gomatrixserverlib.Leave: case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban fallthrough // transitions to leave are the same as ban
@ -343,7 +344,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// didn't "remove" events, return that the response is limited. // didn't "remove" events, return that the response is limited.
lr.Timeline.Limited = limited && len(events) == len(recentEvents) lr.Timeline.Limited = limited && len(events) == len(recentEvents)
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.RoomID] = lr req.Response.Rooms.Leave[delta.RoomID] = lr
} }
return latestPosition, nil return latestPosition, nil

View file

@ -20,6 +20,7 @@ import (
"sync" "sync"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
@ -69,19 +70,20 @@ func (p *PresenceStreamProvider) IncrementalSync(
return from return from
} }
// add newly joined rooms user presences if len(presences) == 0 {
if len(req.NewlyJoined) > 0 { return to
newlyJoinedRoomIDs := make([]string, 0, len(req.NewlyJoined))
for roomID := range req.NewlyJoined {
newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, roomID)
} }
// add newly joined rooms user presences
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
if len(newlyJoined) > 0 {
// TODO: Check if this is working better than before. // TODO: Check if this is working better than before.
if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoinedRoomIDs); err != nil { if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil {
req.Log.WithError(err).Error("unable to refresh notifier lists") req.Log.WithError(err).Error("unable to refresh notifier lists")
return from return from
} }
NewlyJoinedLoop: NewlyJoinedLoop:
for _, roomID := range newlyJoinedRoomIDs { for _, roomID := range newlyJoined {
roomUsers := p.notifier.JoinedUsers(roomID) roomUsers := p.notifier.JoinedUsers(roomID)
for i := range roomUsers { for i := range roomUsers {
// we already got a presence from this user // we already got a presence from this user
@ -101,8 +103,6 @@ func (p *PresenceStreamProvider) IncrementalSync(
} }
} }
} }
} else {
return to
} }
lastPos := from lastPos := from
@ -121,7 +121,8 @@ func (p *PresenceStreamProvider) IncrementalSync(
prevPresence := pres.(*types.PresenceInternal) prevPresence := pres.(*types.PresenceInternal)
currentlyActive := prevPresence.CurrentlyActive() currentlyActive := prevPresence.CurrentlyActive()
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
if skip { _, membershipChange := req.MembershipChanges[presence.UserID]
if skip && !membershipChange {
req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID) req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID)
continue continue
} }
@ -162,3 +163,36 @@ func (p *PresenceStreamProvider) IncrementalSync(
return lastPos return lastPos
} }
func joinedRooms(res *types.Response, userID string) []string {
var roomIDs []string
for roomID, join := range res.Rooms.Join {
// we would expect to see our join event somewhere if we newly joined the room.
// Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
newlyJoined := membershipEventPresent(join.State.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
continue
}
newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
}
}
return roomIDs
}
func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
for _, ev := range events {
// it's enough to know that we have our member event here, don't need to check membership content
// as it's implied by being in the respective section of the sync response.
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
// ignore e.g. join -> join changes
if gjson.GetBytes(ev.Unsigned, "prev_content.membership").Str == gjson.GetBytes(ev.Content, "membership").Str {
continue
}
return true
}
}
return false
}

View file

@ -100,6 +100,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
Timeout: timeout, // Timeout: timeout, //
Rooms: make(map[string]string), // Populated by the PDU stream Rooms: make(map[string]string), // Populated by the PDU stream
WantFullState: wantFullState, // WantFullState: wantFullState, //
MembershipChanges: make(map[string]struct{}), // Populated by the PDU stream
}, nil }, nil
} }

View file

@ -23,7 +23,7 @@ type SyncRequest struct {
// Updated by the PDU stream. // Updated by the PDU stream.
Rooms map[string]string Rooms map[string]string
// Updated by the PDU stream. // Updated by the PDU stream.
NewlyJoined map[string]struct{} MembershipChanges map[string]struct{}
// Updated by the PDU stream. // Updated by the PDU stream.
IgnoredUsers IgnoredUsers IgnoredUsers IgnoredUsers
} }