mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 14:33:10 -06:00
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/joined-members
This commit is contained in:
commit
5f74a5cab7
|
|
@ -647,6 +647,8 @@ func MakeFedAPI(
|
||||||
// add the user to Sentry, if enabled
|
// add the user to Sentry, if enabled
|
||||||
hub := sentry.GetHubFromContext(req.Context())
|
hub := sentry.GetHubFromContext(req.Context())
|
||||||
if hub != nil {
|
if hub != nil {
|
||||||
|
// clone the hub, so we don't send garbage events with e.g. mismatching rooms/event_ids
|
||||||
|
hub = hub.Clone()
|
||||||
hub.Scope().SetTag("origin", string(fedReq.Origin()))
|
hub.Scope().SetTag("origin", string(fedReq.Origin()))
|
||||||
hub.Scope().SetTag("uri", fedReq.RequestURI())
|
hub.Scope().SetTag("uri", fedReq.RequestURI())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -76,6 +76,8 @@ func MakeAuthAPI(
|
||||||
// add the user to Sentry, if enabled
|
// add the user to Sentry, if enabled
|
||||||
hub := sentry.GetHubFromContext(req.Context())
|
hub := sentry.GetHubFromContext(req.Context())
|
||||||
if hub != nil {
|
if hub != nil {
|
||||||
|
// clone the hub, so we don't send garbage events with e.g. mismatching rooms/event_ids
|
||||||
|
hub = hub.Clone()
|
||||||
hub.Scope().SetUser(sentry.User{
|
hub.Scope().SetUser(sentry.User{
|
||||||
Username: device.UserID,
|
Username: device.UserID,
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -108,6 +108,8 @@ func MakeRelayAPI(
|
||||||
// add the user to Sentry, if enabled
|
// add the user to Sentry, if enabled
|
||||||
hub := sentry.GetHubFromContext(req.Context())
|
hub := sentry.GetHubFromContext(req.Context())
|
||||||
if hub != nil {
|
if hub != nil {
|
||||||
|
// clone the hub, so we don't send garbage events with e.g. mismatching rooms/event_ids
|
||||||
|
hub = hub.Clone()
|
||||||
hub.Scope().SetTag("origin", string(fedReq.Origin()))
|
hub.Scope().SetTag("origin", string(fedReq.Origin()))
|
||||||
hub.Scope().SetTag("uri", fedReq.RequestURI())
|
hub.Scope().SetTag("uri", fedReq.RequestURI())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -108,12 +108,14 @@ type worker struct {
|
||||||
r *Inputer
|
r *Inputer
|
||||||
roomID string
|
roomID string
|
||||||
subscription *nats.Subscription
|
subscription *nats.Subscription
|
||||||
|
sentryHub *sentry.Hub
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Inputer) startWorkerForRoom(roomID string) {
|
func (r *Inputer) startWorkerForRoom(roomID string) {
|
||||||
v, loaded := r.workers.LoadOrStore(roomID, &worker{
|
v, loaded := r.workers.LoadOrStore(roomID, &worker{
|
||||||
r: r,
|
r: r,
|
||||||
roomID: roomID,
|
roomID: roomID,
|
||||||
|
sentryHub: sentry.CurrentHub().Clone(),
|
||||||
})
|
})
|
||||||
w := v.(*worker)
|
w := v.(*worker)
|
||||||
w.Lock()
|
w.Lock()
|
||||||
|
|
@ -265,9 +267,9 @@ func (w *worker) _next() {
|
||||||
// Look up what the next event is that's waiting to be processed.
|
// Look up what the next event is that's waiting to be processed.
|
||||||
ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute)
|
ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if scope := sentry.CurrentHub().Scope(); scope != nil {
|
w.sentryHub.ConfigureScope(func(scope *sentry.Scope) {
|
||||||
scope.SetTag("room_id", w.roomID)
|
scope.SetTag("room_id", w.roomID)
|
||||||
}
|
})
|
||||||
msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
|
msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
|
|
@ -323,9 +325,9 @@ func (w *worker) _next() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if scope := sentry.CurrentHub().Scope(); scope != nil {
|
w.sentryHub.ConfigureScope(func(scope *sentry.Scope) {
|
||||||
scope.SetTag("event_id", inputRoomEvent.Event.EventID())
|
scope.SetTag("event_id", inputRoomEvent.Event.EventID())
|
||||||
}
|
})
|
||||||
|
|
||||||
// Process the room event. If something goes wrong then we'll tell
|
// Process the room event. If something goes wrong then we'll tell
|
||||||
// NATS to terminate the message. We'll store the error result as
|
// NATS to terminate the message. We'll store the error result as
|
||||||
|
|
@ -347,7 +349,7 @@ func (w *worker) _next() {
|
||||||
}).Warn("Roomserver rejected event")
|
}).Warn("Roomserver rejected event")
|
||||||
default:
|
default:
|
||||||
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
|
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
|
||||||
sentry.CaptureException(err)
|
w.sentryHub.CaptureException(err)
|
||||||
}
|
}
|
||||||
logrus.WithError(err).WithFields(logrus.Fields{
|
logrus.WithError(err).WithFields(logrus.Fields{
|
||||||
"room_id": w.roomID,
|
"room_id": w.roomID,
|
||||||
|
|
|
||||||
|
|
@ -298,6 +298,7 @@ func (u *latestEventsUpdater) latestState() error {
|
||||||
}).Warnf("State reset detected (removing %d events)", removed)
|
}).Warnf("State reset detected (removing %d events)", removed)
|
||||||
sentry.WithScope(func(scope *sentry.Scope) {
|
sentry.WithScope(func(scope *sentry.Scope) {
|
||||||
scope.SetLevel("warning")
|
scope.SetLevel("warning")
|
||||||
|
scope.SetTag("room_id", u.event.RoomID().String())
|
||||||
scope.SetContext("State reset", map[string]interface{}{
|
scope.SetContext("State reset", map[string]interface{}{
|
||||||
"Event ID": u.event.EventID(),
|
"Event ID": u.event.EventID(),
|
||||||
"Old state NID": fmt.Sprintf("%d", u.oldStateNID),
|
"Old state NID": fmt.Sprintf("%d", u.oldStateNID),
|
||||||
|
|
|
||||||
|
|
@ -203,6 +203,12 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, snapshot)
|
||||||
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("unable to get recent events")
|
||||||
|
return r.From
|
||||||
|
}
|
||||||
|
|
||||||
newPos = from
|
newPos = from
|
||||||
for _, delta := range stateDeltas {
|
for _, delta := range stateDeltas {
|
||||||
newRange := r
|
newRange := r
|
||||||
|
|
@ -218,7 +224,7 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var pos types.StreamPosition
|
var pos types.StreamPosition
|
||||||
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil {
|
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req, dbEvents); err != nil {
|
||||||
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
|
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
|
||||||
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
|
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
|
||||||
return newPos
|
return newPos
|
||||||
|
|
@ -240,6 +246,66 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||||
return newPos
|
return newPos
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) {
|
||||||
|
var roomIDs []string
|
||||||
|
var newlyJoinedRoomIDs []string
|
||||||
|
for _, delta := range stateDeltas {
|
||||||
|
if delta.NewlyJoined {
|
||||||
|
newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, delta.RoomID)
|
||||||
|
} else {
|
||||||
|
roomIDs = append(roomIDs, delta.RoomID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dbEvents := make(map[string]types.RecentEvents)
|
||||||
|
if len(roomIDs) > 0 {
|
||||||
|
events, err := snapshot.RecentEvents(
|
||||||
|
ctx, roomIDs, r,
|
||||||
|
&eventFilter, true, true,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
if err != sql.ErrNoRows {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range events {
|
||||||
|
dbEvents[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(newlyJoinedRoomIDs) > 0 {
|
||||||
|
// For rooms that were joined in this sync, try to fetch
|
||||||
|
// as much timeline events as allowed by the filter.
|
||||||
|
|
||||||
|
filter := eventFilter
|
||||||
|
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
|
||||||
|
if eventFilter.Limit < recentEventBackwardsLimit {
|
||||||
|
filter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
|
||||||
|
diff := r.From - r.To
|
||||||
|
if diff > 0 && diff < recentEventBackwardsLimit {
|
||||||
|
filter.Limit = int(diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
events, err := snapshot.RecentEvents(
|
||||||
|
ctx, newlyJoinedRoomIDs, types.Range{
|
||||||
|
From: r.To,
|
||||||
|
To: 0,
|
||||||
|
Backwards: true,
|
||||||
|
},
|
||||||
|
&filter, true, true,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
if err != sql.ErrNoRows {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range events {
|
||||||
|
dbEvents[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return dbEvents, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Limit the recent events to X when going backwards
|
// Limit the recent events to X when going backwards
|
||||||
const recentEventBackwardsLimit = 100
|
const recentEventBackwardsLimit = 100
|
||||||
|
|
||||||
|
|
@ -253,29 +319,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
eventFilter *synctypes.RoomEventFilter,
|
eventFilter *synctypes.RoomEventFilter,
|
||||||
stateFilter *synctypes.StateFilter,
|
stateFilter *synctypes.StateFilter,
|
||||||
req *types.SyncRequest,
|
req *types.SyncRequest,
|
||||||
|
dbEvents map[string]types.RecentEvents,
|
||||||
) (types.StreamPosition, error) {
|
) (types.StreamPosition, error) {
|
||||||
var err error
|
var err error
|
||||||
originalLimit := eventFilter.Limit
|
|
||||||
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
|
|
||||||
if r.Backwards && originalLimit < recentEventBackwardsLimit {
|
|
||||||
eventFilter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
|
|
||||||
diff := r.From - r.To
|
|
||||||
if diff > 0 && diff < recentEventBackwardsLimit {
|
|
||||||
eventFilter.Limit = int(diff)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
dbEvents, err := snapshot.RecentEvents(
|
|
||||||
ctx, []string{delta.RoomID}, r,
|
|
||||||
eventFilter, true, true,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return r.To, nil
|
|
||||||
}
|
|
||||||
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
recentStreamEvents := dbEvents[delta.RoomID].Events
|
recentStreamEvents := dbEvents[delta.RoomID].Events
|
||||||
limited := dbEvents[delta.RoomID].Limited
|
limited := dbEvents[delta.RoomID].Limited
|
||||||
|
|
||||||
|
|
@ -337,9 +383,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
logrus.WithError(err).Error("unable to apply history visibility filter")
|
logrus.WithError(err).Error("unable to apply history visibility filter")
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.Backwards && len(events) > originalLimit {
|
if r.Backwards && len(events) > eventFilter.Limit {
|
||||||
// We're going backwards and the events are ordered chronologically, so take the last `limit` events
|
// We're going backwards and the events are ordered chronologically, so take the last `limit` events
|
||||||
events = events[len(events)-originalLimit:]
|
events = events[len(events)-eventFilter.Limit:]
|
||||||
limited = true
|
limited = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue