Persist backfilled events with state snapshots

This commit is contained in:
Kegan Dougal 2020-04-27 18:44:07 +01:00
parent 1ff251e16f
commit 2b10a5ca23
7 changed files with 147 additions and 82 deletions

View file

@ -98,7 +98,7 @@ func Backfill(
} }
var eventJSONs []json.RawMessage var eventJSONs []json.RawMessage
for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(evs) { for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(evs, gomatrixserverlib.TopologicalOrderByPrevEvents) {
eventJSONs = append(eventJSONs, e.JSON()) eventJSONs = append(eventJSONs, e.JSON())
} }

3
go.mod
View file

@ -17,7 +17,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible
@ -26,6 +26,7 @@ require (
github.com/opentracing/opentracing-go v1.1.0 github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.4.1 github.com/prometheus/client_golang v1.4.1
github.com/prometheus/common v0.9.1
github.com/sirupsen/logrus v1.4.2 github.com/sirupsen/logrus v1.4.2
github.com/uber/jaeger-client-go v2.15.0+incompatible github.com/uber/jaeger-client-go v2.15.0+incompatible
github.com/uber/jaeger-lib v1.5.0 github.com/uber/jaeger-lib v1.5.0

5
go.sum
View file

@ -373,6 +373,10 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799 h1:Os
github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200423090438-562549dbe799/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd h1:243fMfK0XqTQsdUY3IIqtxPX5g9MfPTaAP92CseqOek= github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd h1:243fMfK0XqTQsdUY3IIqtxPX5g9MfPTaAP92CseqOek=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200424154222-2827b39252bd/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 h1:aJMAKjfXG5I8TqPxJQbQIkGSWM770oxkpgsPHE8C06E=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a h1:tlXCVU3eab9kksGYBRA3oyrmIRwD/aPujo5KJCdlCVQ=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=
@ -681,6 +685,7 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/Shopify/sarama.v1 v1.20.1 h1:Gi09A3fJXm0Jgt8kuKZ8YK+r60GfYn7MQuEmI3oq6hE= gopkg.in/Shopify/sarama.v1 v1.20.1 h1:Gi09A3fJXm0Jgt8kuKZ8YK+r60GfYn7MQuEmI3oq6hE=
gopkg.in/Shopify/sarama.v1 v1.20.1/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc= gopkg.in/Shopify/sarama.v1 v1.20.1/go.mod h1:AxnvoaevB2nBjNK17cG61A3LleFcWFwVBHBt+cot4Oc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View file

@ -92,11 +92,6 @@ func processRoomEvent(
} }
} }
if input.Kind == api.KindBackfill {
// Backfill is not implemented.
panic("Not implemented")
}
// Update the extremities of the event graph for the room // Update the extremities of the event graph for the room
return event.EventID(), updateLatestEvents( return event.EventID(), updateLatestEvents(
ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID, ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,

View file

@ -17,27 +17,126 @@ type backfillRequester struct {
// per-request state // per-request state
servers []gomatrixserverlib.ServerName servers []gomatrixserverlib.ServerName
stateIDs []string eventIDToBeforeStateIDs map[string][]string
eventIDMap map[string]gomatrixserverlib.Event
} }
func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, roomID, atEventID string) ([]string, error) { func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName) *backfillRequester {
c := gomatrixserverlib.FederatedStateProvider{ return &backfillRequester{
FedClient: b.fedClient, db: db,
AuthEventsOnly: true, fedClient: fedClient,
Server: b.servers[0], thisServer: thisServer,
eventIDToBeforeStateIDs: make(map[string][]string),
eventIDMap: make(map[string]gomatrixserverlib.Event),
} }
res, err := c.StateIDsAtEvent(ctx, roomID, atEventID)
b.stateIDs = res
return res, err
} }
func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, roomID, atEventID string, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) { func (b *backfillRequester) StateIDsBeforeEvent(ctx context.Context, targetEvent gomatrixserverlib.HeaderedEvent) ([]string, error) {
b.eventIDMap[targetEvent.EventID()] = targetEvent.Unwrap()
if ids, ok := b.eventIDToBeforeStateIDs[targetEvent.EventID()]; ok {
return ids, nil
}
// if we have exactly 1 prev event and we know the state of the room at that prev event, then just roll forward the prev event.
// Else, we have to hit /state_ids because either we don't know the state at all at this event (new backwards extremity) or
// we don't know the result of state res to merge forks (2 or more prev_events)
if len(targetEvent.PrevEventIDs()) == 1 {
	prevEventID := targetEvent.PrevEventIDs()[0]
	prevEvent, ok := b.eventIDMap[prevEventID]
	if !ok {
		goto FederationHit
	}
	prevEventStateIDs, ok := b.eventIDToBeforeStateIDs[prevEventID]
	if !ok {
		goto FederationHit
	}
	// The state IDs BEFORE the target event are the state IDs BEFORE the prev_event PLUS the prev_event itself
	prevStateKey := prevEvent.StateKey()
	if prevStateKey == nil {
		// The prev event is not a state event, so the state is unchanged.
		// Sharing the cached slice is safe because the code below never
		// writes through it.
		b.eventIDToBeforeStateIDs[targetEvent.EventID()] = prevEventStateIDs
		return prevEventStateIDs, nil
	}
	// Work on a private copy. Re-slicing with [:] would alias the slice cached
	// for the prev event, and the in-place replacement below would silently
	// rewrite that cached entry too. One spare slot is reserved for the
	// "brand new state" append.
	newStateIDs := make([]string, len(prevEventStateIDs), len(prevEventStateIDs)+1)
	copy(newStateIDs, prevEventStateIDs)

	missingState := false // true if we are missing the info for a state event ID
	foundEvent := false   // true if we found a (type, state_key) match
	// find which state ID to replace, if any
	for i, id := range newStateIDs {
		ev, known := b.eventIDMap[id]
		if !known {
			missingState = true
			continue
		}
		// Compare state keys by value. StateKey() returns a pointer, so ==
		// on the two results would only be true for the very same pointer and
		// an existing (type, state_key) entry would never be matched.
		stateKey := ev.StateKey()
		if stateKey != nil && ev.Type() == prevEvent.Type() && *stateKey == *prevStateKey {
			newStateIDs[i] = prevEvent.EventID()
			foundEvent = true
			break
		}
	}
	if !foundEvent && !missingState {
		// Every existing state event was inspected and none matched, so we can
		// be certain that this is new state.
		newStateIDs = append(newStateIDs, prevEvent.EventID())
		foundEvent = true
	}
	if foundEvent {
		b.eventIDToBeforeStateIDs[targetEvent.EventID()] = newStateIDs
		return newStateIDs, nil
	}
	// else fallthrough because we don't know if one of the missing state IDs was the one we could replace.
}
FederationHit:
var lastErr error
logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event")
for _, srv := range b.servers { // hit any valid server
c := gomatrixserverlib.FederatedStateProvider{ c := gomatrixserverlib.FederatedStateProvider{
FedClient: b.fedClient, FedClient: b.fedClient,
AuthEventsOnly: true, AuthEventsOnly: false,
Server: srv,
}
res, err := c.StateIDsBeforeEvent(ctx, targetEvent)
if err != nil {
lastErr = err
continue
}
b.eventIDToBeforeStateIDs[targetEvent.EventID()] = res
return res, nil
}
return nil, lastErr
}
func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, event gomatrixserverlib.HeaderedEvent, eventIDs []string) (map[string]*gomatrixserverlib.Event, error) {
// try to fetch the events from the database first
events, err := b.ProvideEvents(roomVer, eventIDs)
if err != nil {
// non-fatal, fallthrough
logrus.WithError(err).Info("Failed to fetch events banana")
} else {
logrus.Infof("Fetched %d/%d events from the database banana", len(events), len(eventIDs))
if len(events) == len(eventIDs) {
result := make(map[string]*gomatrixserverlib.Event)
for i := range events {
result[events[i].EventID()] = &events[i]
b.eventIDMap[events[i].EventID()] = events[i]
}
return result, nil
}
}
logrus.WithField("event_id", event.EventID()).Info("Requesting /state at event banana")
c := gomatrixserverlib.FederatedStateProvider{
FedClient: b.fedClient,
AuthEventsOnly: false,
Server: b.servers[0], Server: b.servers[0],
} }
return c.StateAtEvent(ctx, roomVer, roomID, atEventID, eventIDs) result, err := c.StateBeforeEvent(ctx, roomVer, event, eventIDs)
if err != nil {
return nil, err
}
for eventID, ev := range result {
b.eventIDMap[eventID] = *ev
}
return result, nil
} }
// ServersAtEvent is called when trying to determine which server to request from. // ServersAtEvent is called when trying to determine which server to request from.
@ -85,7 +184,6 @@ func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverl
} }
func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) { func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) {
logrus.Info("backfillRequester.ProvideEvents ", eventIDs)
ctx := context.Background() ctx := context.Background()
nidMap, err := b.db.EventNIDs(ctx, eventIDs) nidMap, err := b.db.EventNIDs(ctx, eventIDs)
if err != nil { if err != nil {
@ -107,6 +205,5 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
for i := range eventsWithNids { for i := range eventsWithNids {
events[i] = eventsWithNids[i].Event events[i] = eventsWithNids[i].Event
} }
logrus.Infof("backfillRequester.ProvideEvents Returning %+v", events)
return events, nil return events, nil
} }

View file

@ -543,15 +543,17 @@ func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api
if err != nil { if err != nil {
return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err) return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
} }
events, err := gomatrixserverlib.RequestBackfill(ctx, &backfillRequester{ requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName)
db: r.DB, events, err := gomatrixserverlib.RequestBackfill(
fedClient: r.FedClient, ctx, requester,
thisServer: r.ServerName, r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit)
}, r.KeyRing, req.RoomID, roomVer, req.EarliestEventsIDs, req.Limit)
if err != nil { if err != nil {
return err return err
} }
logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events))
backfilledEventMap := make(map[string]types.Event) backfilledEventMap := make(map[string]types.Event)
var roomNID types.RoomNID
// persist these new events - auth checks have already been done // persist these new events - auth checks have already been done
for _, ev := range events { for _, ev := range events {
nidMap, err := r.DB.EventNIDs(ctx, ev.AuthEventIDs()) nidMap, err := r.DB.EventNIDs(ctx, ev.AuthEventIDs())
@ -565,7 +567,8 @@ func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api
authNids[i] = nid authNids[i] = nid
i++ i++
} }
_, stateAtEvent, err := r.DB.StoreEvent(ctx, ev.Unwrap(), nil, authNids) var stateAtEvent types.StateAtEvent
roomNID, stateAtEvent, err = r.DB.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
if err != nil { if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to store backfilled event") logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to store backfilled event")
continue continue
@ -576,61 +579,25 @@ func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api
} }
} }
// TODO: This needs tests around it and should probably live in the state package. for _, ev := range backfilledEventMap {
// Update the state db so we can get state snapshots at these new backfilled events // now add state for these events
// This will be important if we want to backfill multiple times as we get the join memberships from state snapshots. stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()]
// The steps for this are:
// - load the events `req.EarliestEventIDs`
// - get the []state at `req.EarliestEventIDs` which we should know as worst case it's the join event with state from send_join
// - loop each earliest event and for each:
// - backwardsStateSnapshot = earliest event ID's state snapshot
// - loop its prev_events, and for each:
// - try to find the prev event in the backfill response. If found:
// * is it a state event?
// YES: create a new state snapshot and use that. backwardsStateSnapshot = this new snapshot.
// NO: use the same state snapshot as backwardsStateSnapshot.
// - the remaining backfilled events are outliers so don't need anything done to them.
earliestEvents, err := r.DB.EventsFromIDs(ctx, req.EarliestEventsIDs)
if err != nil || len(earliestEvents) != len(req.EarliestEventsIDs) { // this should never happen
logrus.WithError(err).Error("Cannot find earliest event IDs for backfilling")
return err
}
stateAtEvents, err := r.DB.StateAtEventIDs(ctx, req.EarliestEventsIDs)
if err != nil {
logrus.WithError(err).Error("Cannot get state at earliest event IDs for backfilling")
return err
}
if len(stateAtEvents) != len(earliestEvents) {
err = fmt.Errorf("backfill: loaded %d events but only have state for %d of them", len(earliestEvents), len(stateAtEvents))
logrus.Errorf("Cannot calculate state at backfilled events: %s", err)
return err
}
// the best struct here bundles together state and event
for i := range earliestEvents {
currEventExtremity := earliestEvents[i]
//currState := stateAtEvents[i]
// start working our way back up the DAG
for _, prevEventID := range currEventExtremity.PrevEventIDs() {
prevEvent, ok := backfilledEventMap[prevEventID]
if !ok { if !ok {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to find state IDs for event which passed auth checks")
continue continue
} }
if prevEvent.StateKey() == nil { var entries []types.StateEntry
// simple case, the state snapshot is the same as the current if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
snapshotNID, err := r.DB.SnapshotNIDFromEventID(ctx, currEventExtremity.EventID()) return err
if err != nil {
logrus.WithError(err).WithField("event_id", currEventExtremity.EventID()).Error("backfill: Failed to lookup state snapshot for event")
continue
}
if err := r.DB.SetState(ctx, prevEvent.EventNID, snapshotNID); err != nil {
logrus.WithError(err).Error("Failed to store state snapshot for backfilled message event")
}
continue
}
} }
var beforeStateSnapshotNID types.StateSnapshotNID
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
return err
}
if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set state before event")
}
} }
// TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point. // TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.

View file

@ -210,7 +210,7 @@ func (r *messagesReq) retrieveEvents() (
} }
// Sort the events to ensure we send them in the right order. // Sort the events to ensure we send them in the right order.
events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(events) events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(events, gomatrixserverlib.TopologicalOrderByPrevEvents)
if r.backwardOrdering { if r.backwardOrdering {
// This reverses the array from old->new to new->old // This reverses the array from old->new to new->old
sort.SliceStable(events, func(i, j int) bool { sort.SliceStable(events, func(i, j int) bool {