From 6db4d967184aacc853b4b3f126438a3f8bc1d4fc Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Wed, 24 Aug 2022 16:31:21 +0200 Subject: [PATCH] Handle newly joined rooms --- syncapi/storage/interface.go | 5 +++-- syncapi/storage/shared/syncserver.go | 28 ++++++++++++++-------------- syncapi/streams/stream_pdu.go | 23 ++++++++++++++++++----- 3 files changed, 35 insertions(+), 21 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 43a75da95..345fdb67d 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -19,10 +19,11 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" ) type Database interface { @@ -38,7 +39,7 @@ type Database interface { CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) - GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) + GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, map[string]struct{}, error) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 65b2dc424..f7ee9da4b 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -686,7 +686,7 @@ func (d *Database) GetStateDeltas( ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, -) ([]types.StateDelta, []string, error) { +) (deltas []types.StateDelta, joinedRoomsIDs []string, newlyJoinedRooms map[string]struct{}, err error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -697,7 +697,7 @@ func (d *Database) GetStateDeltas( // - Get all CURRENTLY joined rooms, and add them to 'joined' block. txn, err := d.readOnlySnapshot(ctx) if err != nil { - return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err) + return nil, nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err) } var succeeded bool defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err) @@ -707,9 +707,9 @@ func (d *Database) GetStateDeltas( memberships, err := d.CurrentRoomState.SelectRoomIDsWithAnyMembership(ctx, txn, userID) if err != nil { if err == sql.ErrNoRows { - return nil, nil, nil + return nil, nil, nil, nil } - return nil, nil, err + return nil, nil, nil, err } allRoomIDs := make([]string, 0, len(memberships)) @@ -721,29 +721,27 @@ func (d *Database) GetStateDeltas( } } - var deltas []types.StateDelta - // get all the state events ever (i.e. for all available rooms) between these two positions stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter, allRoomIDs) if err != nil { if err == sql.ErrNoRows { - return nil, nil, nil + return nil, nil, nil, nil } - return nil, nil, err + return nil, nil, nil, err } state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) if err != nil { if err == sql.ErrNoRows { - return nil, nil, nil + return nil, nil, nil, nil } - return nil, nil, err + return nil, nil, nil, err } // find out which rooms this user is peeking, if any. // We do this before joins so any peeks get overwritten peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r) if err != nil && err != sql.ErrNoRows { - return nil, nil, err + return nil, nil, nil, err } // add peek blocks @@ -756,7 +754,7 @@ func (d *Database) GetStateDeltas( if err == sql.ErrNoRows { continue } - return nil, nil, err + return nil, nil, nil, err } state[peek.RoomID] = s } @@ -770,10 +768,12 @@ func (d *Database) GetStateDeltas( } // handle newly joined rooms and non-joined rooms + newlyJoinedRoomIDs := make(map[string]struct{}, len(memberships)) for roomID, stateStreamEvents := range state { for _, ev := range stateStreamEvents { if membership, prevMembership := getMembershipFromEvent(ev.Event, userID); membership != "" { if membership == gomatrixserverlib.Join && prevMembership != membership { + newlyJoinedRoomIDs[roomID] = struct{}{} // send full room state down instead of a delta var s []types.StreamEvent s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter) @@ -781,7 +781,7 @@ func (d *Database) GetStateDeltas( if err == sql.ErrNoRows { continue } - return nil, nil, err + return nil, nil, nil, err } state[roomID] = s continue // we'll add this room in when we do joined rooms @@ -808,7 +808,7 @@ func (d *Database) GetStateDeltas( } succeeded = true - return deltas, joinedRoomIDs, nil + return deltas, joinedRoomIDs, newlyJoinedRoomIDs, nil } // GetStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index d2b9f95fb..7b54571da 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -178,24 +178,25 @@ func (p *PDUStreamProvider) IncrementalSync( var err error var stateDeltas []types.StateDelta - var joinedRooms []string + var syncJoinedRooms []string + var newlyJoinedRooms map[string]struct{} stateFilter := req.Filter.Room.State eventFilter := req.Filter.Room.Timeline if req.WantFullState { - if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed") return } } else { - if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, syncJoinedRooms, newlyJoinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { req.Log.WithError(err).Error("p.DB.GetStateDeltas failed") return } } - for _, roomID := range joinedRooms { + for _, roomID := range syncJoinedRooms { req.Rooms[roomID] = gomatrixserverlib.Join } @@ -209,8 +210,20 @@ func (p *PDUStreamProvider) IncrementalSync( newPos = from for _, delta := range stateDeltas { + newRange := r + // If this room was joined in this sync, try to fetch + // as much timeline events as allowed by the filter. + if _, ok := newlyJoinedRooms[delta.RoomID]; ok { + // Reverse the range, so we get the most recent first. + // This will be limited by the eventFilter. + newRange = types.Range{ + From: r.To, + To: 0, + Backwards: true, + } + } var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return to }