Tweaks and logging

This commit is contained in:
Till Faelligen 2022-07-22 07:32:29 +02:00
parent e24dcaa205
commit 2502e66592
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
5 changed files with 164 additions and 43 deletions

View file

@ -17,12 +17,36 @@ package internal
import ( import (
"context" "context"
"math" "math"
"sync"
"time"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
) )
var registerOnce = &sync.Once{}
// calculateHistoryVisibilityDuration stores the time it takes to
// calculate the history visibility. In polylith mode the roundtrip
// to the roomserver is included in this time.
var calculateHistoryVisibilityDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dendrite",
Subsystem: "syncapi",
Name: "calculateHistoryVisibility_duration_millis",
Help: "How long it takes to calculate the history visibility",
Buckets: []float64{ // milliseconds
5, 10, 25, 50, 75, 100, 250, 500,
1000, 2000, 3000, 4000, 5000, 6000,
7000, 8000, 9000, 10000, 15000, 20000,
},
},
[]string{"api"},
)
var historyVisibilityPriority = map[gomatrixserverlib.HistoryVisibility]uint8{ var historyVisibilityPriority = map[gomatrixserverlib.HistoryVisibility]uint8{
gomatrixserverlib.WorldReadable: 0, gomatrixserverlib.WorldReadable: 0,
gomatrixserverlib.HistoryVisibilityShared: 1, gomatrixserverlib.HistoryVisibilityShared: 1,
@ -75,14 +99,18 @@ func (ev eventVisibility) allowed() (allowed bool) {
func ApplyHistoryVisibilityFilter( func ApplyHistoryVisibilityFilter(
ctx context.Context, ctx context.Context,
syncDB storage.Database, syncDB storage.Database,
rsAPI api.SyncRoomserverAPI,
events []*gomatrixserverlib.HeaderedEvent, events []*gomatrixserverlib.HeaderedEvent,
alwaysIncludeEventIDs map[string]struct{}, alwaysIncludeEventIDs map[string]struct{},
userID string, userID, endpoint string,
) ([]*gomatrixserverlib.HeaderedEvent, error) { ) ([]*gomatrixserverlib.HeaderedEvent, error) {
eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events)) registerOnce.Do(func() {
prometheus.MustRegister(calculateHistoryVisibilityDuration)
})
if len(events) == 0 { if len(events) == 0 {
return events, nil return events, nil
} }
start := time.Now()
// try to get the current membership of the user // try to get the current membership of the user
membershipCurrent, _, err := syncDB.SelectMembershipForUser(ctx, events[0].RoomID(), userID, math.MaxInt64) membershipCurrent, _, err := syncDB.SelectMembershipForUser(ctx, events[0].RoomID(), userID, math.MaxInt64)
@ -90,12 +118,21 @@ func ApplyHistoryVisibilityFilter(
return nil, err return nil, err
} }
for _, ev := range events { eventIDs := make([]string, len(events))
event, err := visibilityForEvent(ctx, syncDB, ev, userID) for i := range events {
eventIDs[i] = events[i].EventID()
}
// Get the mapping from eventID -> eventVisibility
eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
event, err := visibilityForEvents(ctx, rsAPI, eventIDs, userID, events[0].RoomID())
if err != nil { if err != nil {
return eventsFiltered, err return eventsFiltered, err
} }
event.membershipCurrent = membershipCurrent for _, ev := range events {
d := event[ev.EventID()]
d.membershipCurrent = membershipCurrent
d.visibility = ev.Visibility
// Always include specific state events for /sync responses // Always include specific state events for /sync responses
if alwaysIncludeEventIDs != nil { if alwaysIncludeEventIDs != nil {
if _, ok := alwaysIncludeEventIDs[ev.EventID()]; ok { if _, ok := alwaysIncludeEventIDs[ev.EventID()]; ok {
@ -115,31 +152,53 @@ func ApplyHistoryVisibilityFilter(
// no OK check, since this should have been validated when setting the value // no OK check, since this should have been validated when setting the value
newPrio := historyVisibilityPriority[hisVis] newPrio := historyVisibilityPriority[hisVis]
if oldPrio < newPrio { if oldPrio < newPrio {
event.visibility = gomatrixserverlib.HistoryVisibility(prevHisVis) d.visibility = gomatrixserverlib.HistoryVisibility(prevHisVis)
} }
} }
} }
// do the actual check // do the actual check
allowed := event.allowed() allowed := d.allowed()
if allowed { if allowed {
eventsFiltered = append(eventsFiltered, ev) eventsFiltered = append(eventsFiltered, ev)
} }
} }
calculateHistoryVisibilityDuration.With(prometheus.Labels{"api": endpoint}).Observe(float64(time.Since(start).Milliseconds()))
return eventsFiltered, nil return eventsFiltered, nil
} }
// visibilityForEvent returns an eventVisibility containing the visibility and the membership at the given event. // visibilityForEvents returns a map from eventID to eventVisibility containing the visibility and the membership at the given event.
// Returns an error if the database returns an error. // Returns an error if the roomserver can't calculate the memberships.
func visibilityForEvent(ctx context.Context, db storage.Database, event *gomatrixserverlib.HeaderedEvent, userID string) (eventVisibility, error) { func visibilityForEvents(ctx context.Context, rsAPI api.SyncRoomserverAPI, eventIDs []string, userID, roomID string) (map[string]eventVisibility, error) {
// get the membership event res := make(map[string]eventVisibility, len(eventIDs))
var membershipAtEvent string
membershipAtEvent, _, err := db.SelectMembershipForUser(ctx, event.RoomID(), userID, event.Depth()) // get the membership events for all eventIDs
resp := &api.QueryMembersipAtEventResponse{}
err := rsAPI.QueryMembershipAtEvent(ctx, &api.QueryMembersipAtEventRequest{
RoomID: roomID,
EventIDs: eventIDs,
UserID: userID,
}, resp)
if err != nil { if err != nil {
return eventVisibility{}, err return res, err
} }
return eventVisibility{ // Create a map from eventID -> eventVisibility
visibility: event.Visibility, for _, eventID := range eventIDs {
membershipAtEvent: membershipAtEvent, vis := eventVisibility{membershipAtEvent: gomatrixserverlib.Leave}
}, nil events, ok := resp.Memberships[eventID]
if !ok {
res[eventID] = vis
continue
}
for _, ev := range events {
membership, err := ev.Membership()
if err != nil {
return res, err
}
vis.membershipAtEvent = membership
}
res[eventID] = vis
}
return res, nil
} }

View file

@ -53,6 +53,11 @@ type mockRoomserverAPI struct {
roomIDToJoinedMembers map[string][]string roomIDToJoinedMembers map[string][]string
} }
func (s *mockRoomserverAPI) QueryMembershipAtEvent(ctx context.Context, request *api.QueryMembersipAtEventRequest, response *api.QueryMembersipAtEventResponse) error {
//TODO implement me
panic("implement me")
}
// QueryRoomsForUser retrieves a list of room IDs matching the given query. // QueryRoomsForUser retrieves a list of room IDs matching the given query.
func (s *mockRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error { func (s *mockRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
return nil return nil

View file

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
@ -109,11 +110,16 @@ func Context(
} }
// verify the user is allowed to see the context for this room/event // verify the user is allowed to see the context for this room/event
filteredEvent, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID) startTime := time.Now()
filteredEvent, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, []*gomatrixserverlib.HeaderedEvent{&requestedEvent}, nil, device.UserID, "context")
if err != nil { if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter") logrus.WithError(err).Error("unable to apply history visibility filter")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": roomID,
}).Debug("applied history visibility (context)")
if len(filteredEvent) == 0 { if len(filteredEvent) == 0 {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusForbidden, Code: http.StatusForbidden,
@ -133,17 +139,18 @@ func Context(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
eventsBeforeFiltered, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, eventsBefore, nil, device.UserID) startTime = time.Now()
if err != nil { eventsBeforeFiltered, eventsAfterFiltered, err := applyHistoryVisibilityOnContextEvents(ctx, syncDB, rsAPI, eventsBefore, eventsAfter, device.UserID)
logrus.WithError(err).Error("unable to apply history visibility filter")
return jsonerror.InternalServerError()
}
eventsAfterFiltered, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, eventsAfter, nil, device.UserID)
if err != nil { if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter") logrus.WithError(err).Error("unable to apply history visibility filter")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": roomID,
}).Debug("applied history visibility (context eventsBefore/eventsAfter)")
// TODO: Get the actual state at the last event returned by SelectContextAfterEvent // TODO: Get the actual state at the last event returned by SelectContextAfterEvent
state, err := syncDB.CurrentState(ctx, roomID, &stateFilter, nil) state, err := syncDB.CurrentState(ctx, roomID, &stateFilter, nil)
if err != nil { if err != nil {
@ -176,6 +183,44 @@ func Context(
} }
} }
// applyHistoryVisibilityOnContextEvents is a helper function to avoid roundtrips to the roomserver
// by combining the events before and after the context event. Returns the filtered events,
// and an error, if any.
func applyHistoryVisibilityOnContextEvents(
ctx context.Context, syncDB storage.Database, rsAPI roomserver.SyncRoomserverAPI,
eventsBefore, eventsAfter []*gomatrixserverlib.HeaderedEvent,
userID string,
) (filteredBefore, filteredAfter []*gomatrixserverlib.HeaderedEvent, err error) {
eventIDsBefore := make(map[string]struct{}, len(eventsBefore))
eventIDsAfter := make(map[string]struct{}, len(eventsAfter))
// Remember before/after eventIDs, so we can restore them
// after applying history visibility checks
for _, ev := range eventsBefore {
eventIDsBefore[ev.EventID()] = struct{}{}
}
for _, ev := range eventsAfter {
eventIDsAfter[ev.EventID()] = struct{}{}
}
allEvents := append(eventsBefore, eventsAfter...)
filteredEvents, err := internal.ApplyHistoryVisibilityFilter(ctx, syncDB, rsAPI, allEvents, nil, userID, "context")
if err != nil {
return nil, nil, err
}
// "Restore" events in the correct context
for _, ev := range filteredEvents {
if _, ok := eventIDsBefore[ev.EventID()]; ok {
filteredBefore = append(filteredBefore, ev)
}
if _, ok := eventIDsAfter[ev.EventID()]; ok {
filteredAfter = append(filteredAfter, ev)
}
}
return filteredBefore, filteredAfter, nil
}
func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { func getStartEnd(ctx context.Context, syncDB storage.Database, startEvents, endEvents []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
if len(startEvents) > 0 { if len(startEvents) > 0 {
start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID()) start, err = syncDB.EventPositionInTopology(ctx, startEvents[0].EventID())

View file

@ -19,6 +19,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"sort" "sort"
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
@ -345,8 +346,12 @@ func (r *messagesReq) retrieveEvents() (
} }
// Apply room history visibility filter // Apply room history visibility filter
filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, events, nil, r.device.UserID) startTime := time.Now()
filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages")
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": r.roomID,
}).Debug("applied history visibility (messages)")
return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err
} }

View file

@ -114,7 +114,7 @@ func (p *PDUStreamProvider) CompleteSync(
var jr *types.JoinResponse var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync( jr, err = p.getJoinResponseForCompleteSync(
ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
) )
if err != nil { if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -140,7 +140,7 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted { if !peek.Deleted {
var jr *types.JoinResponse var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync( jr, err = p.getJoinResponseForCompleteSync(
ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true,
) )
if err != nil { if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -300,7 +300,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
switch delta.Membership { switch delta.Membership {
case gomatrixserverlib.Join: case gomatrixserverlib.Join:
// We need to make sure we always include the latest states events, if they are in the timeline // We need to make sure we always include the latest states events, if they are in the timeline
events, err := applyHistoryVisibilityFilter(ctx, p.DB, delta.RoomID, device.UserID, recentEvents) events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, recentEvents)
if err != nil { if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter") logrus.WithError(err).Error("unable to apply history visibility filter")
} }
@ -315,15 +315,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
res.Rooms.Join[delta.RoomID] = *jr res.Rooms.Join[delta.RoomID] = *jr
case gomatrixserverlib.Peek: case gomatrixserverlib.Peek:
events, err := applyHistoryVisibilityFilter(ctx, p.DB, delta.RoomID, device.UserID, recentEvents)
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
}
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited && len(events) == len(recentEvents) jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.RoomID] = *jr res.Rooms.Peek[delta.RoomID] = *jr
@ -349,6 +344,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
func applyHistoryVisibilityFilter( func applyHistoryVisibilityFilter(
ctx context.Context, ctx context.Context,
db storage.Database, db storage.Database,
rsAPI roomserverAPI.SyncRoomserverAPI,
roomID, userID string, roomID, userID string,
recentEvents []*gomatrixserverlib.HeaderedEvent, recentEvents []*gomatrixserverlib.HeaderedEvent,
) ([]*gomatrixserverlib.HeaderedEvent, error) { ) ([]*gomatrixserverlib.HeaderedEvent, error) {
@ -361,11 +357,16 @@ func applyHistoryVisibilityFilter(
for _, ev := range stateEvents { for _, ev := range stateEvents {
alwaysIncludeIDs[ev.EventID()] = struct{}{} alwaysIncludeIDs[ev.EventID()] = struct{}{}
} }
events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, recentEvents, alwaysIncludeIDs, userID) startTime := time.Now()
events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
if err != nil { if err != nil {
return nil, err return nil, err
} }
logrus.WithFields(logrus.Fields{
"duration": time.Since(startTime),
"room_id": roomID,
}).Debug("applied history visibility (sync)")
return events, err return events, err
} }
@ -416,6 +417,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
eventFilter *gomatrixserverlib.RoomEventFilter, eventFilter *gomatrixserverlib.RoomEventFilter,
wantFullState bool, wantFullState bool,
device *userapi.Device, device *userapi.Device,
isPeek bool,
) (jr *types.JoinResponse, err error) { ) (jr *types.JoinResponse, err error) {
jr = types.NewJoinResponse() jr = types.NewJoinResponse()
// TODO: When filters are added, we may need to call this multiple times to get enough events. // TODO: When filters are added, we may need to call this multiple times to get enough events.
@ -473,10 +475,15 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents) stateEvents = removeDuplicates(stateEvents, recentEvents)
events, err := applyHistoryVisibilityFilter(ctx, p.DB, roomID, device.UserID, recentEvents) events := recentEvents
// Only apply history visibility checks if the response is for joined rooms
if !isPeek {
events, err = applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, roomID, device.UserID, recentEvents)
if err != nil { if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter") logrus.WithError(err).Error("unable to apply history visibility filter")
} }
}
limited = limited && len(events) == len(recentEvents) limited = limited && len(events) == len(recentEvents)
if stateFilter.LazyLoadMembers { if stateFilter.LazyLoadMembers {