diff --git a/syncapi/internal/history_visibility.go b/syncapi/internal/history_visibility.go index 98fc8bfc4..a37484dbf 100644 --- a/syncapi/internal/history_visibility.go +++ b/syncapi/internal/history_visibility.go @@ -17,12 +17,36 @@ package internal import ( "context" "math" + "sync" + "time" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus" "github.com/tidwall/gjson" ) +var registerOnce = &sync.Once{} + +// calculateHistoryVisibilityDuration stores the time it takes to +// calculate the history visibility. In polylith mode the roundtrip +// to the roomserver is included in this time. +var calculateHistoryVisibilityDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dendrite", + Subsystem: "syncapi", + Name: "calculateHistoryVisibility_duration_millis", + Help: "How long it takes to calculate the history visibility", + Buckets: []float64{ // milliseconds + 5, 10, 25, 50, 75, 100, 250, 500, + 1000, 2000, 3000, 4000, 5000, 6000, + 7000, 8000, 9000, 10000, 15000, 20000, + }, + }, + []string{"api"}, +) + var historyVisibilityPriority = map[gomatrixserverlib.HistoryVisibility]uint8{ gomatrixserverlib.WorldReadable: 0, gomatrixserverlib.HistoryVisibilityShared: 1, @@ -75,14 +99,18 @@ func (ev eventVisibility) allowed() (allowed bool) { func ApplyHistoryVisibilityFilter( ctx context.Context, syncDB storage.Database, + rsAPI api.SyncRoomserverAPI, events []*gomatrixserverlib.HeaderedEvent, alwaysIncludeEventIDs map[string]struct{}, - userID string, + userID, endpoint string, ) ([]*gomatrixserverlib.HeaderedEvent, error) { - eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events)) + registerOnce.Do(func() { + prometheus.MustRegister(calculateHistoryVisibilityDuration) + }) if len(events) == 0 { return events, nil } + start := time.Now() // try to get the current membership of the user membershipCurrent, _, err := syncDB.SelectMembershipForUser(ctx, events[0].RoomID(), userID, math.MaxInt64) @@ -90,12 +118,21 @@ func ApplyHistoryVisibilityFilter( return nil, err } + eventIDs := make([]string, len(events)) + for i := range events { + eventIDs[i] = events[i].EventID() + } + + // Get the mapping from eventID -> eventVisibility + eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events)) + event, err := visibilityForEvents(ctx, rsAPI, eventIDs, userID, events[0].RoomID()) + if err != nil { + return eventsFiltered, err + } for _, ev := range events { - event, err := visibilityForEvent(ctx, syncDB, ev, userID) - if err != nil { - return eventsFiltered, err - } - event.membershipCurrent = membershipCurrent + d := event[ev.EventID()] + d.membershipCurrent = membershipCurrent + d.visibility = ev.Visibility // Always include specific state events for /sync responses if alwaysIncludeEventIDs != nil { if _, ok := alwaysIncludeEventIDs[ev.EventID()]; ok { @@ -115,31 +152,53 @@ func ApplyHistoryVisibilityFilter( // no OK check, since this should have been validated when setting the value newPrio := historyVisibilityPriority[hisVis] if oldPrio < newPrio { - event.visibility = gomatrixserverlib.HistoryVisibility(prevHisVis) + d.visibility = gomatrixserverlib.HistoryVisibility(prevHisVis) } } } // do the actual check - allowed := event.allowed() + allowed := d.allowed() if allowed { eventsFiltered = append(eventsFiltered, ev) } } + calculateHistoryVisibilityDuration.With(prometheus.Labels{"api": endpoint}).Observe(float64(time.Since(start).Milliseconds())) return eventsFiltered, nil } -// visibilityForEvent returns an eventVisibility containing the visibility and the membership at the given event. -// Returns an error if the database returns an error. -func visibilityForEvent(ctx context.Context, db storage.Database, event *gomatrixserverlib.HeaderedEvent, userID string) (eventVisibility, error) { - // get the membership event - var membershipAtEvent string - membershipAtEvent, _, err := db.SelectMembershipForUser(ctx, event.RoomID(), userID, event.Depth()) +// visibilityForEvents returns a map from eventID to eventVisibility containing the visibility and the membership at the given event. +// Returns an error if the roomserver can't calculate the memberships. +func visibilityForEvents(ctx context.Context, rsAPI api.SyncRoomserverAPI, eventIDs []string, userID, roomID string) (map[string]eventVisibility, error) { + res := make(map[string]eventVisibility, len(eventIDs)) + + // get the membership events for all eventIDs + resp := &api.QueryMembersipAtEventResponse{} + err := rsAPI.QueryMembershipAtEvent(ctx, &api.QueryMembersipAtEventRequest{ + RoomID: roomID, + EventIDs: eventIDs, + UserID: userID, + }, resp) if err != nil { - return eventVisibility{}, err + return res, err } - return eventVisibility{ - visibility: event.Visibility, - membershipAtEvent: membershipAtEvent, - }, nil + // Create a map from eventID -> eventVisibility + for _, eventID := range eventIDs { + vis := eventVisibility{membershipAtEvent: gomatrixserverlib.Leave} + events, ok := resp.Memberships[eventID] + if !ok { + res[eventID] = vis + continue + } + for _, ev := range events { + membership, err := ev.Membership() + if err != nil { + return res, err + } + vis.membershipAtEvent = membership + } + res[eventID] = vis + } + + return res, nil } diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 79ed440e7..526f2c17a 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -53,6 +53,11 @@ type mockRoomserverAPI struct { roomIDToJoinedMembers map[string][]string } +func (s *mockRoomserverAPI) QueryMembershipAtEvent(ctx context.Context, request *api.QueryMembersipAtEventRequest, response *api.QueryMembersipAtEventResponse) error { + //TODO implement me + panic("implement me") +} + // QueryRoomsForUser retrieves a list of room IDs matching the given query. func (s *mockRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error { return nil diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 48ad4be37..769eb4f79 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strconv" + "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/caching" @@ -109,11 +110,16 @@ func Context( } // verify the user is allowed to see the context for this room/event - filteredEvent, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID) + startTime := time.Now() + filteredEvent, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context") if err != nil { logrus.WithError(err).Error("unable to apply history visibility filter") return jsonerror.InternalServerError() } + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": roomID, + }).Debug("applied history visibility (context)") if len(filteredEvent) == 0 { return util.JSONResponse{ Code: http.StatusForbidden, @@ -133,17 +139,18 @@ func Context( return jsonerror.InternalServerError() } - eventsBeforeFiltered, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, eventsBefore, nil, device.UserID) - if err != nil { - logrus.WithError(err).Error("unable to apply history visibility filter") - return jsonerror.InternalServerError() - } - eventsAfterFiltered, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, eventsAfter, nil, device.UserID) + startTime = time.Now() + eventsBeforeFiltered, eventsAfterFiltered, err := applyHistoryVisibilityOnContextEvents(ctx, syncDB, rsAPI, eventsBefore, eventsAfter, device.UserID) if err != nil { logrus.WithError(err).Error("unable to apply history visibility filter") return jsonerror.InternalServerError() } + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": roomID, + }).Debug("applied history visibility (context eventsBefore/eventsAfter)") + // TODO: Get the actual state at the last event returned by SelectContextAfterEvent state, err := syncDB.CurrentState(ctx, roomID, &stateFilter, nil) if err != nil { @@ -176,6 +183,44 @@ func Context( } } +// applyHistoryVisibilityOnContextEvents is a helper function to avoid roundtrips to the roomserver +// by combining the events before and after the context event. Returns the filtered events, +// and an error, if any. +func applyHistoryVisibilityOnContextEvents( + ctx context.Context, syncDB storage.Database, rsAPI roomserver.SyncRoomserverAPI, + eventsBefore, eventsAfter []*gomatrixserverlib.HeaderedEvent, + userID string, +) (filteredBefore, filteredAfter []*gomatrixserverlib.HeaderedEvent, err error) { + eventIDsBefore := make(map[string]struct{}, len(eventsBefore)) + eventIDsAfter := make(map[string]struct{}, len(eventsAfter)) + + // Remember before/after eventIDs, so we can restore them + // after applying history visibility checks + for _, ev := range eventsBefore { + eventIDsBefore[ev.EventID()] = struct{}{} + } + for _, ev := range eventsAfter { + eventIDsAfter[ev.EventID()] = struct{}{} + } + + allEvents := append(eventsBefore, eventsAfter...) + filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, allEvents, nil, userID, "context") + if err != nil { + return nil, nil, err + } + + // "Restore" events in the correct context + for _, ev := range filteredEvents { + if _, ok := eventIDsBefore[ev.EventID()]; ok { + filteredBefore = append(filteredBefore, ev) + } + if _, ok := eventIDsAfter[ev.EventID()]; ok { + filteredAfter = append(filteredAfter, ev) + } + } + return filteredBefore, filteredAfter, nil +} + func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { if len(startEvents) > 0 { start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID()) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index fab9ce7e2..e693d2d8d 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -19,6 +19,7 @@ import ( "fmt" "net/http" "sort" + "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/caching" @@ -345,8 +346,12 @@ func (r *messagesReq) retrieveEvents() ( } // Apply room history visibility filter - filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, events, nil, r.device.UserID) - + startTime := time.Now() + filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages") + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": r.roomID, + }).Debug("applied history visibility (messages)") return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 9d3c71d13..d05958726 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -114,7 +114,7 @@ func (p *PDUStreamProvider) CompleteSync( var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, + ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -140,7 +140,7 @@ func (p *PDUStreamProvider) CompleteSync( if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, + ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -300,7 +300,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( switch delta.Membership { case gomatrixserverlib.Join: // We need to make sure we always include the latest states events, if they are in the timeline - events, err := applyHistoryVisibilityFilter(ctx, p.DB, delta.RoomID, device.UserID, recentEvents) + events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, recentEvents) if err != nil { logrus.WithError(err).Error("unable to apply history visibility filter") } @@ -315,15 +315,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( res.Rooms.Join[delta.RoomID] = *jr case gomatrixserverlib.Peek: - events, err := applyHistoryVisibilityFilter(ctx, p.DB, delta.RoomID, device.UserID, recentEvents) - if err != nil { - logrus.WithError(err).Error("unable to apply history visibility filter") - } - jr := types.NewJoinResponse() jr.Timeline.PrevBatch = &prevBatch - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = limited && len(events) == len(recentEvents) + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) res.Rooms.Peek[delta.RoomID] = *jr @@ -349,6 +344,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( func applyHistoryVisibilityFilter( ctx context.Context, db storage.Database, + rsAPI roomserverAPI.SyncRoomserverAPI, roomID, userID string, recentEvents []*gomatrixserverlib.HeaderedEvent, ) ([]*gomatrixserverlib.HeaderedEvent, error) { @@ -361,11 +357,16 @@ func applyHistoryVisibilityFilter( for _, ev := range stateEvents { alwaysIncludeIDs[ev.EventID()] = struct{}{} } - events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, recentEvents, alwaysIncludeIDs, userID) + startTime := time.Now() + events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync") if err != nil { return nil, err } + logrus.WithFields(logrus.Fields{ + "duration": time.Since(startTime), + "room_id": roomID, + }).Debug("applied history visibility (sync)") return events, err } @@ -416,6 +417,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( eventFilter *gomatrixserverlib.RoomEventFilter, wantFullState bool, device *userapi.Device, + isPeek bool, ) (jr *types.JoinResponse, err error) { jr = types.NewJoinResponse() // TODO: When filters are added, we may need to call this multiple times to get enough events. @@ -473,10 +475,15 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) - events, err := applyHistoryVisibilityFilter(ctx, p.DB, roomID, device.UserID, recentEvents) - if err != nil { - logrus.WithError(err).Error("unable to apply history visibility filter") + events := recentEvents + // Only apply history visibility checks if the response is for joined rooms + if !isPeek { + events, err = applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, roomID, device.UserID, recentEvents) + if err != nil { + logrus.WithError(err).Error("unable to apply history visibility filter") + } } + limited = limited && len(events) == len(recentEvents) if stateFilter.LazyLoadMembers {