mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 03:43:11 -06:00
WIP backfill, doesn't work
This commit is contained in:
parent
4ad946957a
commit
366ed43c9c
|
|
@ -3,6 +3,7 @@ package query
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
@ -10,7 +11,7 @@ import (
|
||||||
|
|
||||||
// backfillRequester implements gomatrixserverlib.BackfillRequester
|
// backfillRequester implements gomatrixserverlib.BackfillRequester
|
||||||
type backfillRequester struct {
|
type backfillRequester struct {
|
||||||
db RoomserverQueryAPIDatabase
|
db storage.Database
|
||||||
fedClient *gomatrixserverlib.FederationClient
|
fedClient *gomatrixserverlib.FederationClient
|
||||||
thisServer gomatrixserverlib.ServerName
|
thisServer gomatrixserverlib.ServerName
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -306,7 +306,7 @@ func (r *RoomserverQueryAPI) QueryMembershipsForRoom(
|
||||||
// only keep the "m.room.member" events with a "join" membership. These events are returned.
|
// only keep the "m.room.member" events with a "join" membership. These events are returned.
|
||||||
// Returns an error if there was an issue fetching the events.
|
// Returns an error if there was an issue fetching the events.
|
||||||
func getMembershipsBeforeEventNID(
|
func getMembershipsBeforeEventNID(
|
||||||
ctx context.Context, db RoomserverQueryAPIDatabase, eventNID types.EventNID, joinedOnly bool,
|
ctx context.Context, db storage.Database, eventNID types.EventNID, joinedOnly bool,
|
||||||
) ([]types.Event, error) {
|
) ([]types.Event, error) {
|
||||||
roomState := state.NewStateResolution(db)
|
roomState := state.NewStateResolution(db)
|
||||||
events := []types.Event{}
|
events := []types.Event{}
|
||||||
|
|
@ -496,6 +496,7 @@ func (r *RoomserverQueryAPI) QueryBackfill(
|
||||||
if request.ServerName == r.ServerName {
|
if request.ServerName == r.ServerName {
|
||||||
return r.backfillViaFederation(ctx, request, response)
|
return r.backfillViaFederation(ctx, request, response)
|
||||||
}
|
}
|
||||||
|
// someone else is requesting the backfill, try to service their request.
|
||||||
var err error
|
var err error
|
||||||
var front []string
|
var front []string
|
||||||
|
|
||||||
|
|
@ -550,8 +551,89 @@ func (r *RoomserverQueryAPI) backfillViaFederation(ctx context.Context, req *api
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// TODO: persist these new events and update the state db so we can get state snapshots at these new backfilled events
|
backfilledEventMap := make(map[string]types.Event)
|
||||||
// this will be important if we want to backfill multiple times as we get the join memberships from state snapshots.
|
// persist these new events - auth checks have already been done
|
||||||
|
for _, ev := range events {
|
||||||
|
nidMap, err := r.DB.EventNIDs(ctx, ev.AuthEventIDs())
|
||||||
|
if err != nil { // this shouldn't happen as RequestBackill already found them
|
||||||
|
logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
authNids := make([]types.EventNID, len(nidMap))
|
||||||
|
i := 0
|
||||||
|
for _, nid := range nidMap {
|
||||||
|
authNids[i] = nid
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
_, stateAtEvent, err := r.DB.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to store backfilled event")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
backfilledEventMap[ev.EventID()] = types.Event{
|
||||||
|
EventNID: stateAtEvent.StateEntry.EventNID,
|
||||||
|
Event: ev.Unwrap(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: This needs tests around it and should probably live in the state package.
|
||||||
|
// Update the state db so we can get state snapshots at these new backfilled events
|
||||||
|
// This will be important if we want to backfill multiple times as we get the join memberships from state snapshots.
|
||||||
|
// The steps for this are:
|
||||||
|
// - load the events `req.EarliestEventIDs`
|
||||||
|
// - get the []state at `req.EarliestEventIDs` which we should know as worst case it's the join event with state from send_join
|
||||||
|
// - loop each earliest event and for each:
|
||||||
|
// - backwardsStateSnapshot = earliest event ID's state snapshot
|
||||||
|
// - loop its prev_events, and for each:
|
||||||
|
// - try to find the prev event in the backfill response. If found:
|
||||||
|
// * is it a state event?
|
||||||
|
// YES: create a new state snapshot and use that. backwardsStateSnapshot = this new snapshot.
|
||||||
|
// NO: use the same state snapshot as backwardsStateSnapshot.
|
||||||
|
// - the remaining backfilled events are outliers so don't need anything done to them.
|
||||||
|
earliestEvents, err := r.DB.EventsFromIDs(ctx, req.EarliestEventsIDs)
|
||||||
|
if err != nil || len(earliestEvents) != len(req.EarliestEventsIDs) { // this should never happen
|
||||||
|
logrus.WithError(err).Error("Cannot find earliest event IDs for backfilling")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
stateAtEvents, err := r.DB.StateAtEventIDs(ctx, req.EarliestEventsIDs)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Error("Cannot get state at earliest event IDs for backfilling")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(stateAtEvents) != len(earliestEvents) {
|
||||||
|
err = fmt.Errorf("backfill: loaded %d events but only have state for %d of them", len(earliestEvents), len(stateAtEvents))
|
||||||
|
logrus.Errorf("Cannot calculate state at backfilled events: %s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// the best struct here bundles together state and event
|
||||||
|
|
||||||
|
for i := range earliestEvents {
|
||||||
|
currEventExtremity := earliestEvents[i]
|
||||||
|
//currState := stateAtEvents[i]
|
||||||
|
// start working our way back up the DAG
|
||||||
|
|
||||||
|
for _, prevEventID := range currEventExtremity.PrevEventIDs() {
|
||||||
|
prevEvent, ok := backfilledEventMap[prevEventID]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if prevEvent.StateKey() == nil {
|
||||||
|
// simple case, the state snapshot is the same as the current
|
||||||
|
snapshotNID, err := r.DB.SnapshotNIDFromEventID(ctx, currEventExtremity.EventID())
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).WithField("event_id", currEventExtremity.EventID()).Error("backfill: Failed to lookup state snapshot for event")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := r.DB.SetState(ctx, prevEvent.EventNID, snapshotNID); err != nil {
|
||||||
|
logrus.WithError(err).Error("Failed to store state snapshot for backfilled message event")
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
|
||||||
|
|
||||||
res.Events = events
|
res.Events = events
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,8 @@ type Database interface {
|
||||||
state []types.StateEntry,
|
state []types.StateEntry,
|
||||||
) (types.StateSnapshotNID, error)
|
) (types.StateSnapshotNID, error)
|
||||||
// Look up the state of a room at each event for a list of string event IDs.
|
// Look up the state of a room at each event for a list of string event IDs.
|
||||||
// Returns an error if there is an error talking to the database
|
// Returns an error if there is an error talking to the database.
|
||||||
|
// The length of []types.StateAtEvent is guaranteed to equal the length of eventIDs if no error is returned.
|
||||||
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database
|
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database
|
||||||
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
|
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
|
||||||
// Look up the numeric IDs for a list of string event types.
|
// Look up the numeric IDs for a list of string event types.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue