diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 345fdb67d..0c8ba4e3d 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -39,7 +39,7 @@ type Database interface { CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) - GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, map[string]struct{}, error) + GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index f7ee9da4b..4e4dcfead 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -686,7 +686,7 @@ func (d *Database) GetStateDeltas( ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, -) (deltas []types.StateDelta, joinedRoomsIDs []string, newlyJoinedRooms map[string]struct{}, err error) { +) (deltas []types.StateDelta, joinedRoomsIDs []string, err error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -697,7 +697,7 @@ func (d *Database) GetStateDeltas( // - Get all CURRENTLY joined rooms, and add them to 'joined' block. txn, err := d.readOnlySnapshot(ctx) if err != nil { - return nil, nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err) + return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err) } var succeeded bool defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err) @@ -707,9 +707,9 @@ func (d *Database) GetStateDeltas( memberships, err := d.CurrentRoomState.SelectRoomIDsWithAnyMembership(ctx, txn, userID) if err != nil { if err == sql.ErrNoRows { - return nil, nil, nil, nil + return nil, nil, nil } - return nil, nil, nil, err + return nil, nil, err } allRoomIDs := make([]string, 0, len(memberships)) @@ -725,23 +725,23 @@ func (d *Database) GetStateDeltas( stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter, allRoomIDs) if err != nil { if err == sql.ErrNoRows { - return nil, nil, nil, nil + return nil, nil, nil } - return nil, nil, nil, err + return nil, nil, err } state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) if err != nil { if err == sql.ErrNoRows { - return nil, nil, nil, nil + return nil, nil, nil } - return nil, nil, nil, err + return nil, nil, err } // find out which rooms this user is peeking, if any. // We do this before joins so any peeks get overwritten peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r) if err != nil && err != sql.ErrNoRows { - return nil, nil, nil, err + return nil, nil, err } // add peek blocks @@ -754,7 +754,7 @@ func (d *Database) GetStateDeltas( if err == sql.ErrNoRows { continue } - return nil, nil, nil, err + return nil, nil, err } state[peek.RoomID] = s } @@ -768,12 +768,11 @@ func (d *Database) GetStateDeltas( } // handle newly joined rooms and non-joined rooms - newlyJoinedRoomIDs := make(map[string]struct{}, len(memberships)) + newlyJoinedRooms := make(map[string]struct{}, len(state)) for roomID, stateStreamEvents := range state { for _, ev := range stateStreamEvents { if membership, prevMembership := getMembershipFromEvent(ev.Event, userID); membership != "" { if membership == gomatrixserverlib.Join && prevMembership != membership { - newlyJoinedRoomIDs[roomID] = struct{}{} // send full room state down instead of a delta var s []types.StreamEvent s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter) @@ -781,9 +780,10 @@ func (d *Database) GetStateDeltas( if err == sql.ErrNoRows { continue } - return nil, nil, nil, err + return nil, nil, err } state[roomID] = s + newlyJoinedRooms[roomID] = struct{}{} continue // we'll add this room in when we do joined rooms } @@ -800,15 +800,19 @@ func (d *Database) GetStateDeltas( // Add in currently joined rooms for _, joinedRoomID := range joinedRoomIDs { - deltas = append(deltas, types.StateDelta{ + delta := types.StateDelta{ Membership: gomatrixserverlib.Join, StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), RoomID: joinedRoomID, - }) + } + if _, ok := newlyJoinedRooms[joinedRoomID]; ok { + delta.NewlyJoined = true + } + deltas = append(deltas, delta) } succeeded = true - return deltas, joinedRoomIDs, newlyJoinedRoomIDs, nil + return deltas, joinedRoomIDs, nil } // GetStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 7b54571da..fa4c722ce 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -179,7 +179,6 @@ func (p *PDUStreamProvider) IncrementalSync( var err error var stateDeltas []types.StateDelta var syncJoinedRooms []string - var newlyJoinedRooms map[string]struct{} stateFilter := req.Filter.Room.State eventFilter := req.Filter.Room.Timeline @@ -190,7 +189,7 @@ func (p *PDUStreamProvider) IncrementalSync( return } } else { - if stateDeltas, syncJoinedRooms, newlyJoinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, syncJoinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { req.Log.WithError(err).Error("p.DB.GetStateDeltas failed") return } @@ -213,7 +212,7 @@ func (p *PDUStreamProvider) IncrementalSync( newRange := r // If this room was joined in this sync, try to fetch // as much timeline events as allowed by the filter. - if _, ok := newlyJoinedRooms[delta.RoomID]; ok { + if delta.NewlyJoined { // Reverse the range, so we get the most recent first. // This will be limited by the eventFilter. newRange = types.Range{ @@ -227,6 +226,10 @@ func (p *PDUStreamProvider) IncrementalSync( req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return to } + // Reset the position, as it is only for the special case of newly joined rooms + if delta.NewlyJoined { + pos = newRange.From + } switch { case r.Backwards && pos < newPos: fallthrough @@ -400,6 +403,8 @@ func applyHistoryVisibilityFilter( logrus.WithFields(logrus.Fields{ "duration": time.Since(startTime), "room_id": roomID, + "before": len(recentEvents), + "after": len(events), }).Debug("applied history visibility (sync)") return events, nil } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 39b085d9c..d75d53ca9 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -37,6 +37,7 @@ var ( type StateDelta struct { RoomID string StateEvents []*gomatrixserverlib.HeaderedEvent + NewlyJoined bool Membership string // The PDU stream position of the latest membership event for this user, if applicable. // Can be 0 if there is no membership event in this delta.