From 0e911a0eeaa88259ba45e4d0da9e054c475a710d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 24 Nov 2020 11:01:12 +0000 Subject: [PATCH] initial cut of federated threading --- internal/mscs/msc2836/msc2836.go | 62 ++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/internal/mscs/msc2836/msc2836.go b/internal/mscs/msc2836/msc2836.go index 0fc2db823..036a5c75f 100644 --- a/internal/mscs/msc2836/msc2836.go +++ b/internal/mscs/msc2836/msc2836.go @@ -256,7 +256,7 @@ func (rc *reqCtx) includeParent(childEvent *gomatrixserverlib.HeaderedEvent) (pa if parentID == "" { return nil } - return rc.lookForEvent(parentID) + return rc.lookForEvent(parentID, false) } // If include_children: true, lookup all events which have event_id as an m.relationship @@ -271,7 +271,7 @@ func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recen } var childEvents []*gomatrixserverlib.HeaderedEvent for _, child := range children { - childEvent := rc.lookForEvent(child.EventID) + childEvent := rc.lookForEvent(child.EventID, false) if childEvent != nil { childEvents = append(childEvents, childEvent) } @@ -309,11 +309,11 @@ func walkThread( } // Process the event. - // TODO XXX: if event is not found, use remoteEventRelationships to explore that part of the thread remotely. + // if event is not found, use remoteEventRelationships to explore that part of the thread remotely. // This will probably be easiest if the event relationships response is directly pumped into the database // so the next walk will do the right thing. This requires those events to be authed and likely injected as // outliers into the roomserver DB, which will de-dupe appropriately. - event := rc.lookForEvent(wi.EventID) + event := rc.lookForEvent(wi.EventID, true) if event != nil { result = append(result, event) } @@ -438,7 +438,6 @@ func (rc *reqCtx) remoteEvent(eventID string) *gomatrixserverlib.HeaderedEvent { return nil } -// nolint:unused func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MSC2836EventRelationshipsResponse { if rc.isFederatedRequest { return nil // we don't query remote servers for remote requests @@ -458,11 +457,27 @@ func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MS } // lookForEvent returns the event for the event ID given, by trying to auto-join rooms if not authorised and by querying remote servers -// if the event ID is unknown. -func (rc *reqCtx) lookForEvent(eventID string) *gomatrixserverlib.HeaderedEvent { +// if the event ID is unknown. If `exploreThread` is true, remote requests will use /event_relationships instead of /event. This is +// desirable when walking the thread, but is not desirable when satisfying include_parent|children flags. +func (rc *reqCtx) lookForEvent(eventID string, exploreThread bool) *gomatrixserverlib.HeaderedEvent { event := rc.getLocalEvent(eventID) if event == nil { + if exploreThread { + queryRes := rc.remoteEventRelationships(eventID) + if queryRes != nil { + // inject all the events into the roomserver then return the event in question + rc.injectResponseToRoomserver(queryRes) + for _, ev := range queryRes.Events { + if ev.EventID() == eventID { + return ev.Headered(ev.Version()) + } + } + } + // if we fail to query /event_relationships or we don't have the event queried in the response, fallthrough + // to do a /event call. + } // this event may have occurred before we joined the room, so delegate to another server to see if they know anything. + // Only ask for this event though. event = rc.remoteEvent(eventID) if event == nil { return nil @@ -523,6 +538,39 @@ func (rc *reqCtx) getLocalEvent(eventID string) *gomatrixserverlib.HeaderedEvent return queryEventsRes.Events[0] } +func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) { + var stateEvents []*gomatrixserverlib.Event + for _, ev := range res.Events { + if ev.StateKey() != nil { + stateEvents = append(stateEvents, ev) + } + } + respState := gomatrixserverlib.RespState{ + AuthEvents: res.AuthChain, + StateEvents: stateEvents, + } + eventsInOrder, err := respState.Events() + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Error("failed to calculate order to send events in MSC2836EventRelationshipsResponse") + return + } + // everything gets sent as an outlier because auth chain events may be disjoint from the DAG + // as may the threaded events. + var ires []roomserver.InputRoomEvent + for _, outlier := range eventsInOrder { + ires = append(ires, roomserver.InputRoomEvent{ + Kind: roomserver.KindOutlier, + Event: outlier.Headered(outlier.Version()), + AuthEventIDs: outlier.AuthEventIDs(), + }) + } + // we've got the data by this point so use a background context + err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires) + if err != nil { + util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver") + } +} + type walkInfo struct { eventInfo SiblingNumber int