diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 6c2271e68..9d3c71d13 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -11,6 +11,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/internal" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -299,19 +300,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( switch delta.Membership { case gomatrixserverlib.Join: // We need to make sure we always include the latest states events, if they are in the timeline - stateEvents, err := p.DB.CurrentState(ctx, delta.RoomID, &gomatrixserverlib.StateFilter{Limit: 1000}, nil) - if err != nil { - logrus.WithError(err).Warnf("failed to get current state") - } - alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents)) - for _, ev := range stateEvents { - alwaysIncludeIDs[ev.EventID()] = struct{}{} - } - - events, err := internal.ApplyHistoryVisibilityFilter(ctx, p.DB, recentEvents, alwaysIncludeIDs, device.UserID) + events, err := applyHistoryVisibilityFilter(ctx, p.DB, delta.RoomID, device.UserID, recentEvents) if err != nil { logrus.WithError(err).Error("unable to apply history visibility filter") - return r.From, err } jr := types.NewJoinResponse() if hasMembershipChange { @@ -324,20 +315,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( res.Rooms.Join[delta.RoomID] = *jr case gomatrixserverlib.Peek: - // We need to make sure we always include the latest states events, if they are in the timeline - stateEvents, err := p.DB.CurrentState(ctx, delta.RoomID, &gomatrixserverlib.StateFilter{Limit: 1000}, nil) - if err != nil { - logrus.WithError(err).Warnf("failed to get current state") - } - alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents)) - for _, ev := range stateEvents { - alwaysIncludeIDs[ev.EventID()] = struct{}{} - } - events, err := internal.ApplyHistoryVisibilityFilter(ctx, p.DB, recentEvents, alwaysIncludeIDs, device.UserID) + events, err := applyHistoryVisibilityFilter(ctx, p.DB, delta.RoomID, device.UserID, recentEvents) if err != nil { logrus.WithError(err).Error("unable to apply history visibility filter") - return r.From, err } + jr := types.NewJoinResponse() jr.Timeline.PrevBatch = &prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) @@ -362,6 +344,31 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return latestPosition, nil } +// applyHistoryVisibilityFilter gets the current room state and supplies it to ApplyHistoryVisibilityFilter, to make +// sure we always return the required events in the timeline. +func applyHistoryVisibilityFilter( + ctx context.Context, + db storage.Database, + roomID, userID string, + recentEvents []*gomatrixserverlib.HeaderedEvent, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + // We need to make sure we always include the latest states events, if they are in the timeline + stateEvents, err := db.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: 1000}, nil) + if err != nil { + logrus.WithError(err).Warnf("failed to get current state") + } + alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents)) + for _, ev := range stateEvents { + alwaysIncludeIDs[ev.EventID()] = struct{}{} + } + events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, recentEvents, alwaysIncludeIDs, userID) + if err != nil { + + return nil, err + } + return events, err +} + func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) { // Work out how many members are in the room. joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition) @@ -466,17 +473,9 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) - fullStateEvents, err := p.DB.CurrentState(ctx, roomID, &gomatrixserverlib.StateFilter{Limit: 1000}, nil) + events, err := applyHistoryVisibilityFilter(ctx, p.DB, roomID, device.UserID, recentEvents) if err != nil { - return - } - alwaysIncludeIDs := make(map[string]struct{}, len(fullStateEvents)) - for _, ev := range stateEvents { - alwaysIncludeIDs[ev.EventID()] = struct{}{} - } - events, err := internal.ApplyHistoryVisibilityFilter(ctx, p.DB, recentEvents, alwaysIncludeIDs, device.UserID) - if err != nil { - return nil, err + logrus.WithError(err).Error("unable to apply history visibility filter") } limited = limited && len(events) == len(recentEvents)