diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index eee590eca..e7621683d 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -98,7 +98,7 @@ func Backfill( } var eventJSONs []json.RawMessage - for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(evs) { + for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(evs, gomatrixserverlib.TopologicalOrderByPrevEvents) { eventJSONs = append(eventJSONs, e.JSON()) } diff --git a/go.mod b/go.mod index f707d7c93..ce33b6c19 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd + github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible @@ -26,6 +26,7 @@ require ( github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.4.1 + github.com/prometheus/common v0.9.1 github.com/sirupsen/logrus v1.4.2 github.com/uber/jaeger-client-go v2.15.0+incompatible github.com/uber/jaeger-lib v1.5.0 diff --git a/go.sum b/go.sum index 6ead661ed..c5c608aa8 100644 --- a/go.sum +++ b/go.sum @@ -373,6 +373,10 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799 h1:Os github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd h1:243fMfK0XqTQsdUY3IIqtxPX5g9MfPTaAP92CseqOek= github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd/go.mod 
h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 h1:aJMAKjfXG5I8TqPxJQbQIkGSWM770oxkpgsPHE8C06E= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a h1:tlXCVU3eab9kksGYBRA3oyrmIRwD/aPujo5KJCdlCVQ= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= @@ -681,6 +685,7 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= gopkg.in/Shopify/sarama.v1 v1.20.1 h1:Gi09A3fJXm0Jgt8kuKZ8YK+r60GfYn7MQuEmI3oq6hE= gopkg.in/Shopify/sarama.v1 v1.20.1/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/roomserver/input/events.go b/roomserver/input/events.go index bc729b0fd..b0d5f13fe 100644 --- a/roomserver/input/events.go +++ b/roomserver/input/events.go @@ -92,11 +92,6 @@ func processRoomEvent( } } - if input.Kind == api.KindBackfill { - // Backfill is not implemented. 
-		panic("Not implemented")
-	}
-
 	// Update the extremities of the event graph for the room
 	return event.EventID(), updateLatestEvents(
 		ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,
diff --git a/roomserver/query/backfill.go b/roomserver/query/backfill.go
index 45bf5b593..980385fa7 100644
--- a/roomserver/query/backfill.go
+++ b/roomserver/query/backfill.go
@@ -16,28 +16,152 @@ type backfillRequester struct {
 	thisServer gomatrixserverlib.ServerName
 
 	// per-request state
-	servers  []gomatrixserverlib.ServerName
-	stateIDs []string
+	servers []gomatrixserverlib.ServerName
+	// eventIDToBeforeStateIDs maps an event ID to the state event IDs before that event.
+	eventIDToBeforeStateIDs map[string][]string
+	// eventIDMap holds the events recorded by StateIDsBeforeEvent and StateBeforeEvent, by event ID.
+	eventIDMap map[string]gomatrixserverlib.Event
 }
 
-func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, roomID, atEventID string) ([]string, error) {
-	c := gomatrixserverlib.FederatedStateProvider{
-		FedClient:      b.fedClient,
-		AuthEventsOnly: true,
-		Server:         b.servers[0],
+// newBackfillRequester returns a backfillRequester for a single backfill request.
+// Both per-request maps are allocated up front because the state callbacks
+// write to them, and writing to a nil map panics.
+func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName) *backfillRequester {
+	return &backfillRequester{
+		db:                      db,
+		fedClient:               fedClient,
+		thisServer:              thisServer,
+		eventIDToBeforeStateIDs: make(map[string][]string),
+		eventIDMap:              make(map[string]gomatrixserverlib.Event),
 	}
-	res, err := c.StateIDsAtEvent(ctx, roomID, atEventID)
-	b.stateIDs = res
-	return res, err
 }
 
-func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, roomID, atEventID string, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) {
+// StateIDsBeforeEvent returns the IDs of the state events in the room before targetEvent.
+//
+// targetEvent is recorded in eventIDMap, and a previously computed answer is served
+// from eventIDToBeforeStateIDs. Otherwise, when targetEvent has exactly one prev event
+// whose own before-state is cached, the answer is derived locally by rolling that prev
+// event forward. In every other case each server in b.servers is asked via /state_ids
+// until one succeeds; the error from the last failing server is returned if none do.
+func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent gomatrixserverlib.HeaderedEvent) ([]string, error) {
+	b.eventIDMap[targetEvent.EventID()] = targetEvent.Unwrap()
+	if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok {
+		return ids, nil
+	}
+	// if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event.
+	// Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or
+	// we don't know the result of state res to merge forks (2 or more prev_events)
+	if len(targetEvent.PrevEventIDs()) == 1 {
+		prevEventID := targetEvent.PrevEventIDs()[0]
+		prevEvent, ok := b.eventIDMap[prevEventID]
+		if !ok {
+			goto FederationHit
+		}
+		prevEventStateIDs, ok := b.eventIDToBeforeStateIDs[prevEventID]
+		if !ok {
+			goto FederationHit
+		}
+		// The state IDs BEFORE the target event are the state IDs BEFORE the prev_event PLUS the prev_event itself.
+		// Work on a copy: re-slicing with [:] would share the backing array with the cached
+		// slice for prevEventID, so the replace/append below would corrupt that cached entry.
+		newStateIDs := make([]string, len(prevEventStateIDs), len(prevEventStateIDs)+1)
+		copy(newStateIDs, prevEventStateIDs)
+		if prevEvent.StateKey() == nil {
+			// state is the same as the previous event
+			b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
+			return newStateIDs, nil
+		}
+
+		missingState := false // true if we are missing the info for a state event ID
+		foundEvent := false   // true if we found a (type, state_key) match
+		// find which state ID to replace, if any
+		for i, id := range newStateIDs {
+			ev, ok := b.eventIDMap[id]
+			if !ok {
+				missingState = true
+				continue
+			}
+			// Compare the state key values, not the pointers returned by StateKey: pointer
+			// inequality says nothing about whether the keys themselves are equal.
+			if ev.Type() == prevEvent.Type() && ev.StateKey() != nil && *ev.StateKey() == *prevEvent.StateKey() {
+				newStateIDs[i] = prevEvent.EventID()
+				foundEvent = true
+				break
+			}
+		}
+		if !foundEvent && !missingState {
+			// we can be certain that this is new state
+			newStateIDs = append(newStateIDs, prevEvent.EventID())
+			foundEvent = true
+		}
+
+		if foundEvent {
+			b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
+			return newStateIDs, nil
+		}
+		// else fallthrough because we don't know if one of the missing state IDs was the one we could replace.
+	}
+
+FederationHit:
+	var lastErr error
+	logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event")
+	for _, srv := range b.servers { // hit any valid server
+		c := gomatrixserverlib.FederatedStateProvider{
+			FedClient:      b.fedClient,
+			AuthEventsOnly: false,
+			Server:         srv,
+		}
+		res, err := c.StateIDsBeforeEvent(ctx, targetEvent)
+		if err != nil {
+			lastErr = err
+			continue
+		}
+		b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
+		return res, nil
+	}
+	// NOTE(review): with an empty b.servers the loop never runs and this returns (nil, nil).
+	return nil, lastErr
+}
+
+// StateBeforeEvent returns the state events named by eventIDs, keyed by event ID.
+//
+// The local database is tried first and used only when it has every requested event;
+// otherwise the state is requested over federation from the first server in b.servers.
+// Every event returned is also recorded in eventIDMap.
+func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, event gomatrixserverlib.HeaderedEvent, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) {
+	// try to fetch the events from the database first
+	events, err := b.ProvideEvents(roomVer, eventIDs)
+	if err != nil {
+		// non-fatal, fallthrough
+		logrus.WithError(err).Info("Failed to fetch events")
+	} else {
+		logrus.Infof("Fetched %d/%d events from the database", len(events), len(eventIDs))
+		if len(events) == len(eventIDs) {
+			result := make(map[string]*gomatrixserverlib.Event)
+			for i := range events {
+				result[events[i].EventID()] = &events[i]
+				b.eventIDMap[events[i].EventID()] = events[i]
+			}
+			return result, nil
+		}
+	}
+
+	logrus.WithField("event_id", event.EventID()).Info("Requesting /state at event")
+	// NOTE(review): b.servers[0] panics if b.servers is empty, and unlike StateIDsBeforeEvent
+	// there is no failover to the remaining servers — confirm callers always populate it.
 	c := gomatrixserverlib.FederatedStateProvider{
 		FedClient:      b.fedClient,
-		AuthEventsOnly: true,
+		AuthEventsOnly: false,
 		Server:         b.servers[0],
 	}
-	return c.StateAtEvent(ctx, roomVer, roomID, atEventID, eventIDs)
+	result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
+	if err != nil {
+		return nil, err
+	}
+	for eventID, ev := range result {
+		b.eventIDMap[eventID] = *ev
+	}
+	return result, nil
 }
 
 // ServersAtEvent is called when trying to determine which server to request from.
@@ -85,7 +184,6 @@ func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverl } func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) { - logrus.Info("backfillRequester.ProvideEvents ", eventIDs) ctx := context.Background() nidMap, err := b.db.EventNIDs(ctx, eventIDs) if err != nil { @@ -107,6 +205,5 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, for i := range eventsWithNids { events[i] = eventsWithNids[i].Event } - logrus.Infof("backfillRequester.ProvideEvents Returning %+v", events) return events, nil } diff --git a/roomserver/query/query.go b/roomserver/query/query.go index 2c110c90c..25b2498b2 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -543,15 +543,17 @@ func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api if err != nil { return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err) } - events, err := gomatrixserverlib.RequestBackfill(ctx, &backfillRequester{ - db: r.DB, - fedClient: r.FedClient, - thisServer: r.ServerName, - }, r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit) + requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName) + events, err := gomatrixserverlib.RequestBackfill( + ctx, requester, + r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit) if err != nil { return err } + logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events)) + backfilledEventMap := make(map[string]types.Event) + var roomNID types.RoomNID // persist these new events - auth checks have already been done for _, ev := range events { nidMap, err := r.DB.EventNIDs(ctx, ev.AuthEventIDs()) @@ -565,7 +567,8 @@ func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api authNids[i] = nid i++ } - _, stateAtEvent, err := r.DB.StoreEvent(ctx, ev.Unwrap(), nil, authNids) 
+ var stateAtEvent types.StateAtEvent + roomNID, stateAtEvent, err = r.DB.StoreEvent(ctx, ev.Unwrap(), nil, authNids) if err != nil { logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to store backfilled event") continue @@ -576,61 +579,25 @@ func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api } } - // TODO: This needs tests around it and should probably live in the state package. - // Update the state db so we can get state snapshots at these new backfilled events - // This will be important if we want to backfill multiple times as we get the join memberships from state snapshots. - // The steps for this are: - // - load the events `req.EarliestEventIDs` - // - get the []state at `req.EarliestEventIDs` which we should know as worst case it's the join event with state from send_join - // - loop each earliest event and for each: - // - backwardsStateSnapshot = earliest event ID's state snapshot - // - loop its prev_events, and for each: - // - try to find the prev event in the backfill response. If found: - // * is it a state event? - // YES: create a new state snapshot and use that. backwardsStateSnapshot = this new snapshot. - // NO: use the same state snapshot as backwardsStateSnapshot. - // - the remaining backfilled events are outliers so don't need anything done to them. 
- earliestEvents, err := r.DB.EventsFromIDs(ctx, req.EarliestEventsIDs) - if err != nil || len(earliestEvents) != len(req.EarliestEventsIDs) { // this should never happen - logrus.WithError(err).Error("Cannot find earliest event IDs for backfilling") - return err - } - stateAtEvents, err := r.DB.StateAtEventIDs(ctx, req.EarliestEventsIDs) - if err != nil { - logrus.WithError(err).Error("Cannot get state at earliest event IDs for backfilling") - return err - } - if len(stateAtEvents) != len(earliestEvents) { - err = fmt.Errorf("backfill: loaded %d events but only have state for %d of them", len(earliestEvents), len(stateAtEvents)) - logrus.Errorf("Cannot calculate state at backfilled events: %s", err) - return err - } - // the best struct here bundles together state and event - - for i := range earliestEvents { - currEventExtremity := earliestEvents[i] - //currState := stateAtEvents[i] - // start working our way back up the DAG - - for _, prevEventID := range currEventExtremity.PrevEventIDs() { - prevEvent, ok := backfilledEventMap[prevEventID] - if !ok { - continue - } - if prevEvent.StateKey() == nil { - // simple case, the state snapshot is the same as the current - snapshotNID, err := r.DB.SnapshotNIDFromEventID(ctx, currEventExtremity.EventID()) - if err != nil { - logrus.WithError(err).WithField("event_id", currEventExtremity.EventID()).Error("backfill: Failed to lookup state snapshot for event") - continue - } - if err := r.DB.SetState(ctx, prevEvent.EventNID, snapshotNID); err != nil { - logrus.WithError(err).Error("Failed to store state snapshot for backfilled message event") - } - continue - } + for _, ev := range backfilledEventMap { + // now add state for these events + stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()] + if !ok { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to find state IDs for event which passed auth checks") + continue + } + var entries []types.StateEntry + if entries, err = 
r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil { + return err } + var beforeStateSnapshotNID types.StateSnapshotNID + if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil { + return err + } + if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set state before event") + } } // TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point. diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index d32a73c7a..55aa79191 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -210,7 +210,7 @@ func (r *messagesReq) retrieveEvents() ( } // Sort the events to ensure we send them in the right order. - events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(events) + events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(events, gomatrixserverlib.TopologicalOrderByPrevEvents) if r.backwardOrdering { // This reverses the array from old->new to new->old sort.SliceStable(events, func(i, j int) bool {