From 20e2cb4b7eb09ce5106f82472ff8bdeaed7d83c4 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 22 Sep 2020 23:42:19 +0100 Subject: [PATCH] track latestevent on /peek --- federationapi/routing/peek.go | 15 +++++++------ federationapi/routing/send.go | 2 +- federationsender/consumers/roomserver.go | 3 ++- federationsender/internal/perform.go | 2 +- roomserver/api/output.go | 4 ++++ roomserver/api/perform.go | 2 ++ roomserver/api/wrapper.go | 21 ++++++++----------- .../internal/perform/perform_inbound_peek.go | 16 ++++++++++---- 8 files changed, 40 insertions(+), 25 deletions(-) diff --git a/federationapi/routing/peek.go b/federationapi/routing/peek.go index 6fd3e8296..65df54079 100644 --- a/federationapi/routing/peek.go +++ b/federationapi/routing/peek.go @@ -87,14 +87,17 @@ func Peek( return util.JSONResponse{Code: http.StatusNotFound, JSON: nil} } + respPeek := gomatrixserverlib.RespPeek{ + StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), + AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), + RoomVersion: response.RoomVersion, + LatestEvent: response.LatestEvent.Unwrap(), + RenewalInterval: renewalInterval, + } + return util.JSONResponse{ Code: http.StatusOK, - JSON: gomatrixserverlib.RespPeek{ - StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents), - AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), - RoomVersion: response.RoomVersion, - RenewalInterval: renewalInterval, - }, + JSON: respPeek, } } diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index de60e59c6..690372414 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -456,7 +456,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser // pass the event along with the state to the roomserver using a background context so we don't // needlessly expire headeredEvent := e.Headered(roomVersion) - return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, &headeredEvent, t.haveEventIDs(), roomVersion) + return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, headeredEvent, t.haveEventIDs(), roomVersion) } // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 182feb49c..fef706865 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -118,6 +118,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any) // causing the federationsender to start sending messages to the peeking server func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error { + // FIXME: do something with orp.LatestEventID to prevent races return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval) } @@ -227,7 +228,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( } // handle peeking hosts - inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.RoomID()) + inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.Event.RoomID()) if err != nil { return nil, err } diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 120df4e85..ec2fbe1ec 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -357,7 +357,7 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer( if err = roomserverAPI.SendEventWithState( ctx, r.rsAPI, &respState, - nil, nil, + respPeek.LatestEvent.Headered(respPeek.RoomVersion), nil, respPeek.RoomVersion, ); err != nil { return fmt.Errorf("r.producer.SendEventWithState: %w", err) diff --git a/roomserver/api/output.go b/roomserver/api/output.go index be19b8d5f..bc1a55d04 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -232,6 +232,10 @@ type OutputNewPeek struct { type OutputNewInboundPeek struct { RoomID string PeekID string + // the event ID at which the peek begins (so we can avoid + // a race between tracking the state returned by /peek and emitting subsequent + // peeked events) + LatestEventID string ServerName gomatrixserverlib.ServerName // how often we told the peeking server to renew the peek RenewalInterval int64 diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index 3b002474e..f2556d7b3 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -178,4 +178,6 @@ type PerformInboundPeekResponse struct { // The lists will be in an arbitrary order. StateEvents []gomatrixserverlib.HeaderedEvent `json:"state_events"` AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"` + // The event at which this state was captured + LatestEvent gomatrixserverlib.HeaderedEvent `json:"latest_event"` } \ No newline at end of file diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index f1e0c40fd..8fc7f138d 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -42,11 +42,10 @@ func SendEvents( // SendEventWithState writes an event with KindNew to the roomserver // with the state at the event as KindOutlier before it. Will not send any event that is -// marked as `true` in haveEventIDs. The event itself is optional in case -// hou just want to write outliers to the roomserver. +// marked as `true` in haveEventIDs. func SendEventWithState( ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState, - event *gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, + event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, roomVersion gomatrixserverlib.RoomVersion, ) error { outliers, err := state.Events() @@ -71,15 +70,13 @@ func SendEventWithState( stateEventIDs[i] = state.StateEvents[i].EventID() } - if event != nil { - ires = append(ires, InputRoomEvent{ - Kind: KindNew, - Event: *event, - AuthEventIDs: event.AuthEventIDs(), - HasState: true, - StateEventIDs: stateEventIDs, - }) - } + ires = append(ires, InputRoomEvent{ + Kind: KindNew, + Event: event, + AuthEventIDs: event.AuthEventIDs(), + HasState: true, + StateEventIDs: stateEventIDs, + }) return SendInputRoomEvents(ctx, rsAPI, ires) } diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go index e6520b573..1cb0181ad 100644 --- a/roomserver/internal/perform/perform_inbound_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -58,15 +58,22 @@ func (r *InboundPeeker) PerformInboundPeek( var stateEvents []gomatrixserverlib.Event - // XXX: is this right? - roomState := state.NewStateResolution(r.DB, *info) - var currentStateSnapshotNID types.StateSnapshotNID - _, currentStateSnapshotNID, _, err = + latestEventRefs, currentStateSnapshotNID, _, err := r.DB.LatestEventIDs(ctx, info.RoomNID) if err != nil { return err } + // XXX: is this actually the latest of the latest events? + latestEvents, err := r.DB.EventsFromIDs(ctx, []string{ latestEventRefs[0].EventID }) + if err != nil { + return err + } + response.LatestEvent = latestEvents[0].Headered(info.RoomVersion) + + // XXX: do we actually need to do a state resolution here? + roomState := state.NewStateResolution(r.DB, *info) + var stateEntries []types.StateEntry stateEntries, err = roomState.LoadStateAtSnapshot( ctx, currentStateSnapshotNID, @@ -109,6 +116,7 @@ func (r *InboundPeeker) PerformInboundPeek( NewInboundPeek: &api.OutputNewInboundPeek{ RoomID: request.RoomID, PeekID: request.PeekID, + LatestEventID: latestEvents[0].EventID(), ServerName: request.ServerName, RenewalInterval: request.RenewalInterval, },