Compare commits
20 commits
main
...
s7evink/la
Author | SHA1 | Date | |
---|---|---|---|
9e2afb5586 | |||
329d15ef44 | |||
925843d05b | |||
2c81b060d6 | |||
d5204ef67f | |||
a165781f4a | |||
4ae7335b01 | |||
143285708b | |||
4fec392cc4 | |||
2f99680c38 | |||
94960c507a | |||
7db3e9f689 | |||
0f74cbfb27 | |||
7999ef09d5 | |||
2047cca580 | |||
416dbcee76 | |||
06064e0b48 | |||
8bbd9406f8 | |||
53c474f93e | |||
a4e111cd40 |
|
@ -21,16 +21,15 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/state"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// updateLatestEvents updates the list of latest events for this room in the database and writes the
|
||||
|
@ -58,7 +57,7 @@ func (r *Inputer) updateLatestEvents(
|
|||
transactionID *api.TransactionID,
|
||||
rewritesState bool,
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (err error) {
|
||||
) error {
|
||||
trace, ctx := internal.StartRegion(ctx, "updateLatestEvents")
|
||||
defer trace.EndRegion()
|
||||
|
||||
|
@ -70,25 +69,60 @@ func (r *Inputer) updateLatestEvents(
|
|||
|
||||
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
||||
|
||||
u := latestEventsUpdater{
|
||||
ctx: ctx,
|
||||
api: r,
|
||||
updater: updater,
|
||||
roomInfo: roomInfo,
|
||||
stateAtEvent: stateAtEvent,
|
||||
event: event,
|
||||
sendAsServer: sendAsServer,
|
||||
transactionID: transactionID,
|
||||
rewritesState: rewritesState,
|
||||
historyVisibility: historyVisibility,
|
||||
// If the event has already been written to the output log then we
|
||||
// don't need to do anything, as we've handled it already.
|
||||
hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.updater.HasEventBeenSent: %w", err)
|
||||
}
|
||||
if hasBeenSent {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err = u.doUpdateLatestEvents(); err != nil {
|
||||
u := latestEventsUpdater{
|
||||
api: r,
|
||||
updater: updater,
|
||||
stateAtEvent: stateAtEvent,
|
||||
event: event,
|
||||
rewritesState: rewritesState,
|
||||
}
|
||||
|
||||
var updates []api.OutputEvent
|
||||
updates, err = u.doUpdateLatestEvents(ctx, roomInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.doUpdateLatestEvents: %w", err)
|
||||
}
|
||||
|
||||
update, err := u.makeOutputNewRoomEvent(ctx, transactionID, sendAsServer, updater.LastEventIDSent(), historyVisibility)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
|
||||
}
|
||||
updates = append(updates, *update)
|
||||
|
||||
// Send the event to the output logs.
|
||||
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
||||
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
|
||||
// the write to the output log succeeds)
|
||||
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
|
||||
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
||||
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
||||
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
||||
if len(updates) > 0 {
|
||||
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil {
|
||||
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
||||
}
|
||||
|
||||
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
|
||||
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err = u.updater.SetLatestEvents(roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
|
||||
return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
|
||||
}
|
||||
|
||||
succeeded = true
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// latestEventsUpdater tracks the state used to update the latest events in the
|
||||
|
@ -96,18 +130,11 @@ func (r *Inputer) updateLatestEvents(
|
|||
// The state could be passed using function arguments, but it becomes impractical
|
||||
// when there are so many variables to pass around.
|
||||
type latestEventsUpdater struct {
|
||||
ctx context.Context
|
||||
api *Inputer
|
||||
updater *shared.RoomUpdater
|
||||
roomInfo *types.RoomInfo
|
||||
stateAtEvent types.StateAtEvent
|
||||
event gomatrixserverlib.PDU
|
||||
transactionID *api.TransactionID
|
||||
rewritesState bool
|
||||
// Which server to send this event as.
|
||||
sendAsServer string
|
||||
// The eventID of the event that was processed before this one.
|
||||
lastEventIDSent string
|
||||
// The latest events in the room after processing this event.
|
||||
oldLatest types.StateAtEventAndReferences
|
||||
latest types.StateAtEventAndReferences
|
||||
|
@ -122,13 +149,9 @@ type latestEventsUpdater struct {
|
|||
// The snapshots of current state before and after processing this event
|
||||
oldStateNID types.StateSnapshotNID
|
||||
newStateNID types.StateSnapshotNID
|
||||
// The history visibility of the event itself (from the state before the event).
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
}
|
||||
|
||||
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||
u.lastEventIDSent = u.updater.LastEventIDSent()
|
||||
|
||||
func (u *latestEventsUpdater) doUpdateLatestEvents(ctx context.Context, roomInfo *types.RoomInfo) ([]api.OutputEvent, error) {
|
||||
// If we are doing a regular event update then we will get the
|
||||
// previous latest events to use as a part of the calculation. If
|
||||
// we are overwriting the latest events because we have a complete
|
||||
|
@ -141,17 +164,10 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
|||
u.oldLatest = u.updater.LatestEvents()
|
||||
}
|
||||
|
||||
// If the event has already been written to the output log then we
|
||||
// don't need to do anything, as we've handled it already.
|
||||
if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil {
|
||||
return fmt.Errorf("u.updater.HasEventBeenSent: %w", err)
|
||||
} else if hasBeenSent {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Work out what the latest events are. This will include the new
|
||||
// event if it is not already referenced.
|
||||
extremitiesChanged, err := u.calculateLatest(
|
||||
ctx,
|
||||
u.oldLatest, u.event,
|
||||
types.StateAtEventAndReference{
|
||||
EventID: u.event.EventID(),
|
||||
|
@ -159,61 +175,35 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
|||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.calculateLatest: %w", err)
|
||||
return nil, fmt.Errorf("u.calculateLatest: %w", err)
|
||||
}
|
||||
|
||||
// Now that we know what the latest events are, it's time to get the
|
||||
// latest state.
|
||||
var updates []api.OutputEvent
|
||||
var membershipUpdates []api.OutputEvent
|
||||
if extremitiesChanged || u.rewritesState {
|
||||
if err = u.latestState(); err != nil {
|
||||
return fmt.Errorf("u.latestState: %w", err)
|
||||
if err = u.latestState(ctx, roomInfo); err != nil {
|
||||
return nil, fmt.Errorf("u.latestState: %w", err)
|
||||
}
|
||||
|
||||
// If we need to generate any output events then here's where we do it.
|
||||
// TODO: Move this!
|
||||
if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil {
|
||||
return fmt.Errorf("u.api.updateMemberships: %w", err)
|
||||
if membershipUpdates, err = u.api.updateMemberships(ctx, u.updater, u.removed, u.added); err != nil {
|
||||
return nil, fmt.Errorf("u.api.updateMemberships: %w", err)
|
||||
}
|
||||
} else {
|
||||
u.newStateNID = u.oldStateNID
|
||||
}
|
||||
|
||||
if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
|
||||
return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
|
||||
}
|
||||
|
||||
update, err := u.makeOutputNewRoomEvent()
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
|
||||
}
|
||||
updates = append(updates, *update)
|
||||
|
||||
// Send the event to the output logs.
|
||||
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
||||
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
|
||||
// the write to the output log succeeds)
|
||||
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
|
||||
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
||||
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
||||
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
||||
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil {
|
||||
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
||||
}
|
||||
|
||||
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
|
||||
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return membershipUpdates, nil
|
||||
}
|
||||
|
||||
func (u *latestEventsUpdater) latestState() error {
|
||||
trace, ctx := internal.StartRegion(u.ctx, "processEventWithMissingState")
|
||||
func (u *latestEventsUpdater) latestState(ctx context.Context, roomInfo *types.RoomInfo) error {
|
||||
trace, ctx := internal.StartRegion(ctx, "processEventWithMissingState")
|
||||
defer trace.EndRegion()
|
||||
|
||||
var err error
|
||||
roomState := state.NewStateResolution(u.updater, u.roomInfo, u.api.Queryer)
|
||||
roomState := state.NewStateResolution(u.updater, roomInfo, u.api.Queryer)
|
||||
|
||||
// Work out if the state at the extremities has actually changed
|
||||
// or not. If they haven't then we won't bother doing all of the
|
||||
|
@ -289,22 +279,26 @@ func (u *latestEventsUpdater) latestState() error {
|
|||
|
||||
if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"event_id": u.event.EventID(),
|
||||
"room_id": u.event.RoomID().String(),
|
||||
"old_state_nid": u.oldStateNID,
|
||||
"new_state_nid": u.newStateNID,
|
||||
"old_latest": u.oldLatest.EventIDs(),
|
||||
"new_latest": u.latest.EventIDs(),
|
||||
"event_id": u.event.EventID(),
|
||||
"room_id": u.event.RoomID().String(),
|
||||
"old_state_nid": u.oldStateNID,
|
||||
"new_state_nid": u.newStateNID,
|
||||
"old_latest": u.oldLatest.EventIDs(),
|
||||
"new_latest": u.latest.EventIDs(),
|
||||
"rewrites_state": u.rewritesState,
|
||||
"state_at_event": fmt.Sprintf("%#v", u.stateAtEvent),
|
||||
}).Warnf("State reset detected (removing %d events)", removed)
|
||||
sentry.WithScope(func(scope *sentry.Scope) {
|
||||
scope.SetLevel("warning")
|
||||
scope.SetContext("State reset", map[string]interface{}{
|
||||
"Event ID": u.event.EventID(),
|
||||
"Old state NID": fmt.Sprintf("%d", u.oldStateNID),
|
||||
"New state NID": fmt.Sprintf("%d", u.newStateNID),
|
||||
"Old latest": u.oldLatest.EventIDs(),
|
||||
"New latest": u.latest.EventIDs(),
|
||||
"State removed": removed,
|
||||
"Event ID": u.event.EventID(),
|
||||
"Old state NID": fmt.Sprintf("%d", u.oldStateNID),
|
||||
"New state NID": fmt.Sprintf("%d", u.newStateNID),
|
||||
"Old latest": u.oldLatest.EventIDs(),
|
||||
"New latest": u.latest.EventIDs(),
|
||||
"State removed": removed,
|
||||
"State rewritten": fmt.Sprintf("%v", u.rewritesState),
|
||||
"State at event": fmt.Sprintf("%#v", u.stateAtEvent),
|
||||
})
|
||||
sentry.CaptureMessage("State reset detected")
|
||||
})
|
||||
|
@ -325,11 +319,12 @@ func (u *latestEventsUpdater) latestState() error {
|
|||
// calculateLatest works out the new set of forward extremities. Returns
|
||||
// true if the new event is included in those extremites, false otherwise.
|
||||
func (u *latestEventsUpdater) calculateLatest(
|
||||
ctx context.Context,
|
||||
oldLatest []types.StateAtEventAndReference,
|
||||
newEvent gomatrixserverlib.PDU,
|
||||
newStateAndRef types.StateAtEventAndReference,
|
||||
) (bool, error) {
|
||||
trace, _ := internal.StartRegion(u.ctx, "calculateLatest")
|
||||
trace, _ := internal.StartRegion(ctx, "calculateLatest")
|
||||
defer trace.EndRegion()
|
||||
|
||||
// First of all, get a list of all of the events in our current
|
||||
|
@ -386,7 +381,13 @@ func (u *latestEventsUpdater) calculateLatest(
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
|
||||
func (u *latestEventsUpdater) makeOutputNewRoomEvent(
|
||||
ctx context.Context,
|
||||
transactionID *api.TransactionID,
|
||||
sendAsServer string,
|
||||
lastEventIDSent string,
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (*api.OutputEvent, error) {
|
||||
latestEventIDs := make([]string, len(u.latest))
|
||||
for i := range u.latest {
|
||||
latestEventIDs[i] = u.latest[i].EventID
|
||||
|
@ -395,14 +396,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
|||
ore := api.OutputNewRoomEvent{
|
||||
Event: &types.HeaderedEvent{PDU: u.event},
|
||||
RewritesState: u.rewritesState,
|
||||
LastSentEventID: u.lastEventIDSent,
|
||||
LastSentEventID: lastEventIDSent,
|
||||
LatestEventIDs: latestEventIDs,
|
||||
TransactionID: u.transactionID,
|
||||
SendAsServer: u.sendAsServer,
|
||||
HistoryVisibility: u.historyVisibility,
|
||||
TransactionID: transactionID,
|
||||
SendAsServer: sendAsServer,
|
||||
HistoryVisibility: historyVisibility,
|
||||
}
|
||||
|
||||
eventIDMap, err := u.stateEventMap()
|
||||
eventIDMap, err := u.stateEventMap(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -426,7 +427,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
|||
}
|
||||
|
||||
// retrieve an event nid -> event ID map for all events that need updating
|
||||
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
|
||||
func (u *latestEventsUpdater) stateEventMap(ctx context.Context) (map[types.EventNID]string, error) {
|
||||
cap := len(u.added) + len(u.removed) + len(u.stateBeforeEventRemoves) + len(u.stateBeforeEventAdds)
|
||||
stateEventNIDs := make(types.EventNIDs, 0, cap)
|
||||
allStateEntries := make([]types.StateEntry, 0, cap)
|
||||
|
@ -438,5 +439,5 @@ func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error)
|
|||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||
}
|
||||
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(stateEventNIDs)]
|
||||
return u.updater.EventIDs(u.ctx, stateEventNIDs)
|
||||
return u.updater.EventIDs(ctx, stateEventNIDs)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue