initial cut of federated threading

This commit is contained in:
Kegan Dougal 2020-11-24 11:01:12 +00:00
parent 7bfd7bbeb1
commit 0e911a0eea

View file

@ -256,7 +256,7 @@ func (rc *reqCtx) includeParent(childEvent *gomatrixserverlib.HeaderedEvent) (pa
if parentID == "" {
return nil
}
return rc.lookForEvent(parentID)
return rc.lookForEvent(parentID, false)
}
// If include_children: true, lookup all events which have event_id as an m.relationship
@ -271,7 +271,7 @@ func (rc *reqCtx) includeChildren(db Database, parentID string, limit int, recen
}
var childEvents []*gomatrixserverlib.HeaderedEvent
for _, child := range children {
childEvent := rc.lookForEvent(child.EventID)
childEvent := rc.lookForEvent(child.EventID, false)
if childEvent != nil {
childEvents = append(childEvents, childEvent)
}
@ -309,11 +309,11 @@ func walkThread(
}
// Process the event.
// TODO XXX: if event is not found, use remoteEventRelationships to explore that part of the thread remotely.
// if event is not found, use remoteEventRelationships to explore that part of the thread remotely.
// This will probably be easiest if the event relationships response is directly pumped into the database
// so the next walk will do the right thing. This requires those events to be authed and likely injected as
// outliers into the roomserver DB, which will de-dupe appropriately.
event := rc.lookForEvent(wi.EventID)
event := rc.lookForEvent(wi.EventID, true)
if event != nil {
result = append(result, event)
}
@ -438,7 +438,6 @@ func (rc *reqCtx) remoteEvent(eventID string) *gomatrixserverlib.HeaderedEvent {
return nil
}
// nolint:unused
func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MSC2836EventRelationshipsResponse {
if rc.isFederatedRequest {
return nil // we don't query remote servers for remote requests
@ -458,11 +457,27 @@ func (rc *reqCtx) remoteEventRelationships(eventID string) *gomatrixserverlib.MS
}
// lookForEvent returns the event for the event ID given, by trying to auto-join rooms if not authorised and by querying remote servers
// if the event ID is unknown.
func (rc *reqCtx) lookForEvent(eventID string) *gomatrixserverlib.HeaderedEvent {
// if the event ID is unknown. If `exploreThread` is true, remote requests will use /event_relationships instead of /event. This is
// desirable when walking the thread, but is not desirable when satisfying include_parent|children flags.
func (rc *reqCtx) lookForEvent(eventID string, exploreThread bool) *gomatrixserverlib.HeaderedEvent {
event := rc.getLocalEvent(eventID)
if event == nil {
if exploreThread {
queryRes := rc.remoteEventRelationships(eventID)
if queryRes != nil {
// inject all the events into the roomserver then return the event in question
rc.injectResponseToRoomserver(queryRes)
for _, ev := range queryRes.Events {
if ev.EventID() == eventID {
return ev.Headered(ev.Version())
}
}
}
// if we fail to query /event_relationships or we don't have the event queried in the response, fallthrough
// to do a /event call.
}
// this event may have occurred before we joined the room, so delegate to another server to see if they know anything.
// Only ask for this event though.
event = rc.remoteEvent(eventID)
if event == nil {
return nil
@ -523,6 +538,39 @@ func (rc *reqCtx) getLocalEvent(eventID string) *gomatrixserverlib.HeaderedEvent
return queryEventsRes.Events[0]
}
func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) {
var stateEvents []*gomatrixserverlib.Event
for _, ev := range res.Events {
if ev.StateKey() != nil {
stateEvents = append(stateEvents, ev)
}
}
respState := gomatrixserverlib.RespState{
AuthEvents: res.AuthChain,
StateEvents: stateEvents,
}
eventsInOrder, err := respState.Events()
if err != nil {
util.GetLogger(rc.ctx).WithError(err).Error("failed to calculate order to send events in MSC2836EventRelationshipsResponse")
return
}
// everything gets sent as an outlier because auth chain events may be disjoint from the DAG
// as may the threaded events.
var ires []roomserver.InputRoomEvent
for _, outlier := range eventsInOrder {
ires = append(ires, roomserver.InputRoomEvent{
Kind: roomserver.KindOutlier,
Event: outlier.Headered(outlier.Version()),
AuthEventIDs: outlier.AuthEventIDs(),
})
}
// we've got the data by this point so use a background context
err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires)
if err != nil {
util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver")
}
}
type walkInfo struct {
eventInfo
SiblingNumber int