Compare commits

...

20 commits

Author SHA1 Message Date
Till Faelligen 9e2afb5586
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/latest-event-updater 2024-01-22 07:59:56 +01:00
Till Faelligen 329d15ef44
Revert "Try to fix state reset sentry messages, maybe?"
This reverts commit 925843d05b.
2023-12-29 23:03:53 +01:00
Till Faelligen 925843d05b
Try to fix state reset sentry messages, maybe? 2023-12-29 22:19:45 +01:00
Till Faelligen 2c81b060d6
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/latest-event-updater 2023-12-29 19:20:49 +01:00
Till d5204ef67f
Merge branch 'main' into s7evink/latest-event-updater 2023-12-13 10:13:50 +01:00
Till Faelligen a165781f4a
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/latest-event-updater 2023-11-29 09:17:33 +01:00
Till Faelligen 4ae7335b01
Be more verbose when logging state resets 2023-11-29 09:15:04 +01:00
Till Faelligen 143285708b
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/latest-event-updater 2023-11-25 19:13:24 +01:00
Till Faelligen 4fec392cc4
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/latest-event-updater 2023-11-24 23:38:36 +01:00
Till Faelligen 2f99680c38
Move checking if the event has been sent as well 2023-11-24 10:05:30 +01:00
Till Faelligen 94960c507a
Move SetLatestEvents 2023-11-24 08:11:50 +01:00
Till Faelligen 7db3e9f689
Move context 2023-11-23 20:34:48 +01:00
Till Faelligen 0f74cbfb27
Move historyVisibility 2023-11-23 19:48:53 +01:00
Till Faelligen 7999ef09d5
Move lastEventIDSent 2023-11-23 19:46:28 +01:00
Till Faelligen 2047cca580
Move roomInfo 2023-11-23 19:44:48 +01:00
Till Faelligen 416dbcee76
No naked returns 2023-11-23 19:38:55 +01:00
Till Faelligen 06064e0b48
Move sendAsServer 2023-11-23 19:37:23 +01:00
Till Faelligen 8bbd9406f8
Remove TransactionID from struct 2023-11-23 19:34:57 +01:00
Till Faelligen 53c474f93e
Move makeOutputNewRoomEvent 2023-11-23 19:33:08 +01:00
Till Faelligen a4e111cd40
Let doUpdateLatestEvents return the updates to send to NATS 2023-11-23 19:28:59 +01:00

View file

@ -21,16 +21,15 @@ import (
"fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// updateLatestEvents updates the list of latest events for this room in the database and writes the
@ -58,7 +57,7 @@ func (r *Inputer) updateLatestEvents(
transactionID *api.TransactionID,
rewritesState bool,
historyVisibility gomatrixserverlib.HistoryVisibility,
) (err error) {
) error {
trace, ctx := internal.StartRegion(ctx, "updateLatestEvents")
defer trace.EndRegion()
@ -70,25 +69,60 @@ func (r *Inputer) updateLatestEvents(
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
u := latestEventsUpdater{
ctx: ctx,
api: r,
updater: updater,
roomInfo: roomInfo,
stateAtEvent: stateAtEvent,
event: event,
sendAsServer: sendAsServer,
transactionID: transactionID,
rewritesState: rewritesState,
historyVisibility: historyVisibility,
// If the event has already been written to the output log then we
// don't need to do anything, as we've handled it already.
hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID)
if err != nil {
return fmt.Errorf("u.updater.HasEventBeenSent: %w", err)
}
if hasBeenSent {
return nil
}
if err = u.doUpdateLatestEvents(); err != nil {
u := latestEventsUpdater{
api: r,
updater: updater,
stateAtEvent: stateAtEvent,
event: event,
rewritesState: rewritesState,
}
var updates []api.OutputEvent
updates, err = u.doUpdateLatestEvents(ctx, roomInfo)
if err != nil {
return fmt.Errorf("u.doUpdateLatestEvents: %w", err)
}
update, err := u.makeOutputNewRoomEvent(ctx, transactionID, sendAsServer, updater.LastEventIDSent(), historyVisibility)
if err != nil {
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
}
updates = append(updates, *update)
// Send the event to the output logs.
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
// the write to the output log succeeds)
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now.
if len(updates) > 0 {
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil {
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
}
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
}
}
if err = u.updater.SetLatestEvents(roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
}
succeeded = true
return
return nil
}
// latestEventsUpdater tracks the state used to update the latest events in the
@ -96,18 +130,11 @@ func (r *Inputer) updateLatestEvents(
// The state could be passed using function arguments, but it becomes impractical
// when there are so many variables to pass around.
type latestEventsUpdater struct {
ctx context.Context
api *Inputer
updater *shared.RoomUpdater
roomInfo *types.RoomInfo
stateAtEvent types.StateAtEvent
event gomatrixserverlib.PDU
transactionID *api.TransactionID
rewritesState bool
// Which server to send this event as.
sendAsServer string
// The eventID of the event that was processed before this one.
lastEventIDSent string
// The latest events in the room after processing this event.
oldLatest types.StateAtEventAndReferences
latest types.StateAtEventAndReferences
@ -122,13 +149,9 @@ type latestEventsUpdater struct {
// The snapshots of current state before and after processing this event
oldStateNID types.StateSnapshotNID
newStateNID types.StateSnapshotNID
// The history visibility of the event itself (from the state before the event).
historyVisibility gomatrixserverlib.HistoryVisibility
}
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
u.lastEventIDSent = u.updater.LastEventIDSent()
func (u *latestEventsUpdater) doUpdateLatestEvents(ctx context.Context, roomInfo *types.RoomInfo) ([]api.OutputEvent, error) {
// If we are doing a regular event update then we will get the
// previous latest events to use as a part of the calculation. If
// we are overwriting the latest events because we have a complete
@ -141,17 +164,10 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
u.oldLatest = u.updater.LatestEvents()
}
// If the event has already been written to the output log then we
// don't need to do anything, as we've handled it already.
if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil {
return fmt.Errorf("u.updater.HasEventBeenSent: %w", err)
} else if hasBeenSent {
return nil
}
// Work out what the latest events are. This will include the new
// event if it is not already referenced.
extremitiesChanged, err := u.calculateLatest(
ctx,
u.oldLatest, u.event,
types.StateAtEventAndReference{
EventID: u.event.EventID(),
@ -159,61 +175,35 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
},
)
if err != nil {
return fmt.Errorf("u.calculateLatest: %w", err)
return nil, fmt.Errorf("u.calculateLatest: %w", err)
}
// Now that we know what the latest events are, it's time to get the
// latest state.
var updates []api.OutputEvent
var membershipUpdates []api.OutputEvent
if extremitiesChanged || u.rewritesState {
if err = u.latestState(); err != nil {
return fmt.Errorf("u.latestState: %w", err)
if err = u.latestState(ctx, roomInfo); err != nil {
return nil, fmt.Errorf("u.latestState: %w", err)
}
// If we need to generate any output events then here's where we do it.
// TODO: Move this!
if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil {
return fmt.Errorf("u.api.updateMemberships: %w", err)
if membershipUpdates, err = u.api.updateMemberships(ctx, u.updater, u.removed, u.added); err != nil {
return nil, fmt.Errorf("u.api.updateMemberships: %w", err)
}
} else {
u.newStateNID = u.oldStateNID
}
if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
return membershipUpdates, nil
}
update, err := u.makeOutputNewRoomEvent()
if err != nil {
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
}
updates = append(updates, *update)
// Send the event to the output logs.
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
// the write to the output log succeeds)
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now.
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil {
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
}
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
}
return nil
}
func (u *latestEventsUpdater) latestState() error {
trace, ctx := internal.StartRegion(u.ctx, "processEventWithMissingState")
func (u *latestEventsUpdater) latestState(ctx context.Context, roomInfo *types.RoomInfo) error {
trace, ctx := internal.StartRegion(ctx, "processEventWithMissingState")
defer trace.EndRegion()
var err error
roomState := state.NewStateResolution(u.updater, u.roomInfo, u.api.Queryer)
roomState := state.NewStateResolution(u.updater, roomInfo, u.api.Queryer)
// Work out if the state at the extremities has actually changed
// or not. If they haven't then we won't bother doing all of the
@ -295,6 +285,8 @@ func (u *latestEventsUpdater) latestState() error {
"new_state_nid": u.newStateNID,
"old_latest": u.oldLatest.EventIDs(),
"new_latest": u.latest.EventIDs(),
"rewrites_state": u.rewritesState,
"state_at_event": fmt.Sprintf("%#v", u.stateAtEvent),
}).Warnf("State reset detected (removing %d events)", removed)
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetLevel("warning")
@ -305,6 +297,8 @@ func (u *latestEventsUpdater) latestState() error {
"Old latest": u.oldLatest.EventIDs(),
"New latest": u.latest.EventIDs(),
"State removed": removed,
"State rewritten": fmt.Sprintf("%v", u.rewritesState),
"State at event": fmt.Sprintf("%#v", u.stateAtEvent),
})
sentry.CaptureMessage("State reset detected")
})
@ -325,11 +319,12 @@ func (u *latestEventsUpdater) latestState() error {
// calculateLatest works out the new set of forward extremities. Returns
// true if the new event is included in those extremites, false otherwise.
func (u *latestEventsUpdater) calculateLatest(
ctx context.Context,
oldLatest []types.StateAtEventAndReference,
newEvent gomatrixserverlib.PDU,
newStateAndRef types.StateAtEventAndReference,
) (bool, error) {
trace, _ := internal.StartRegion(u.ctx, "calculateLatest")
trace, _ := internal.StartRegion(ctx, "calculateLatest")
defer trace.EndRegion()
// First of all, get a list of all of the events in our current
@ -386,7 +381,13 @@ func (u *latestEventsUpdater) calculateLatest(
return true, nil
}
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
func (u *latestEventsUpdater) makeOutputNewRoomEvent(
ctx context.Context,
transactionID *api.TransactionID,
sendAsServer string,
lastEventIDSent string,
historyVisibility gomatrixserverlib.HistoryVisibility,
) (*api.OutputEvent, error) {
latestEventIDs := make([]string, len(u.latest))
for i := range u.latest {
latestEventIDs[i] = u.latest[i].EventID
@ -395,14 +396,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
ore := api.OutputNewRoomEvent{
Event: &types.HeaderedEvent{PDU: u.event},
RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent,
LastSentEventID: lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
SendAsServer: u.sendAsServer,
HistoryVisibility: u.historyVisibility,
TransactionID: transactionID,
SendAsServer: sendAsServer,
HistoryVisibility: historyVisibility,
}
eventIDMap, err := u.stateEventMap()
eventIDMap, err := u.stateEventMap(ctx)
if err != nil {
return nil, err
}
@ -426,7 +427,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
}
// retrieve an event nid -> event ID map for all events that need updating
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
func (u *latestEventsUpdater) stateEventMap(ctx context.Context) (map[types.EventNID]string, error) {
cap := len(u.added) + len(u.removed) + len(u.stateBeforeEventRemoves) + len(u.stateBeforeEventAdds)
stateEventNIDs := make(types.EventNIDs, 0, cap)
allStateEntries := make([]types.StateEntry, 0, cap)
@ -438,5 +439,5 @@ func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error)
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(stateEventNIDs)]
return u.updater.EventIDs(u.ctx, stateEventNIDs)
return u.updater.EventIDs(ctx, stateEventNIDs)
}