track latestevent on /peek

This commit is contained in:
Matthew Hodgson 2020-09-22 23:42:19 +01:00
parent 41b9b663a5
commit 20e2cb4b7e
8 changed files with 40 additions and 25 deletions

View file

@ -87,14 +87,17 @@ func Peek(
return util.JSONResponse{Code: http.StatusNotFound, JSON: nil} return util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
} }
respPeek := gomatrixserverlib.RespPeek{
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
RoomVersion: response.RoomVersion,
LatestEvent: response.LatestEvent.Unwrap(),
RenewalInterval: renewalInterval,
}
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: gomatrixserverlib.RespPeek{ JSON: respPeek,
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
RoomVersion: response.RoomVersion,
RenewalInterval: renewalInterval,
},
} }
} }

View file

@ -456,7 +456,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
// pass the event along with the state to the roomserver using a background context so we don't // pass the event along with the state to the roomserver using a background context so we don't
// needlessly expire // needlessly expire
headeredEvent := e.Headered(roomVersion) headeredEvent := e.Headered(roomVersion)
return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, &headeredEvent, t.haveEventIDs(), roomVersion) return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, headeredEvent, t.haveEventIDs(), roomVersion)
} }
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)

View file

@ -118,6 +118,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any) // processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)
// causing the federationsender to start sending messages to the peeking server // causing the federationsender to start sending messages to the peeking server
func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error { func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error {
// FIXME: do something with orp.LatestEventID to prevent races
return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval) return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
} }
@ -227,7 +228,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
} }
// handle peeking hosts // handle peeking hosts
inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.RoomID()) inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.Event.RoomID())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -357,7 +357,7 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer(
if err = roomserverAPI.SendEventWithState( if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI, ctx, r.rsAPI,
&respState, &respState,
nil, nil, respPeek.LatestEvent.Headered(respPeek.RoomVersion), nil,
respPeek.RoomVersion, respPeek.RoomVersion,
); err != nil { ); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err) return fmt.Errorf("r.producer.SendEventWithState: %w", err)

View file

@ -232,6 +232,10 @@ type OutputNewPeek struct {
type OutputNewInboundPeek struct { type OutputNewInboundPeek struct {
RoomID string RoomID string
PeekID string PeekID string
// the event ID at which the peek begins (so we can avoid
// a race between tracking the state returned by /peek and emitting subsequent
// peeked events)
LatestEventID string
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
// how often we told the peeking server to renew the peek // how often we told the peeking server to renew the peek
RenewalInterval int64 RenewalInterval int64

View file

@ -178,4 +178,6 @@ type PerformInboundPeekResponse struct {
// The lists will be in an arbitrary order. // The lists will be in an arbitrary order.
StateEvents []gomatrixserverlib.HeaderedEvent `json:"state_events"` StateEvents []gomatrixserverlib.HeaderedEvent `json:"state_events"`
AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"` AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"`
// The event at which this state was captured
LatestEvent gomatrixserverlib.HeaderedEvent `json:"latest_event"`
} }

View file

@ -42,11 +42,10 @@ func SendEvents(
// SendEventWithState writes an event with KindNew to the roomserver // SendEventWithState writes an event with KindNew to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is // with the state at the event as KindOutlier before it. Will not send any event that is
// marked as `true` in haveEventIDs. The event itself is optional in case // marked as `true` in haveEventIDs.
// hou just want to write outliers to the roomserver.
func SendEventWithState( func SendEventWithState(
ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState, ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState,
event *gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
roomVersion gomatrixserverlib.RoomVersion, roomVersion gomatrixserverlib.RoomVersion,
) error { ) error {
outliers, err := state.Events() outliers, err := state.Events()
@ -71,15 +70,13 @@ func SendEventWithState(
stateEventIDs[i] = state.StateEvents[i].EventID() stateEventIDs[i] = state.StateEvents[i].EventID()
} }
if event != nil { ires = append(ires, InputRoomEvent{
ires = append(ires, InputRoomEvent{ Kind: KindNew,
Kind: KindNew, Event: event,
Event: *event, AuthEventIDs: event.AuthEventIDs(),
AuthEventIDs: event.AuthEventIDs(), HasState: true,
HasState: true, StateEventIDs: stateEventIDs,
StateEventIDs: stateEventIDs, })
})
}
return SendInputRoomEvents(ctx, rsAPI, ires) return SendInputRoomEvents(ctx, rsAPI, ires)
} }

View file

@ -58,15 +58,22 @@ func (r *InboundPeeker) PerformInboundPeek(
var stateEvents []gomatrixserverlib.Event var stateEvents []gomatrixserverlib.Event
// XXX: is this right?
roomState := state.NewStateResolution(r.DB, *info)
var currentStateSnapshotNID types.StateSnapshotNID var currentStateSnapshotNID types.StateSnapshotNID
_, currentStateSnapshotNID, _, err = latestEventRefs, currentStateSnapshotNID, _, err :=
r.DB.LatestEventIDs(ctx, info.RoomNID) r.DB.LatestEventIDs(ctx, info.RoomNID)
if err != nil { if err != nil {
return err return err
} }
// XXX: is this actually the latest of the latest events?
latestEvents, err := r.DB.EventsFromIDs(ctx, []string{ latestEventRefs[0].EventID })
if err != nil {
return err
}
response.LatestEvent = latestEvents[0].Headered(info.RoomVersion)
// XXX: do we actually need to do a state resolution here?
roomState := state.NewStateResolution(r.DB, *info)
var stateEntries []types.StateEntry var stateEntries []types.StateEntry
stateEntries, err = roomState.LoadStateAtSnapshot( stateEntries, err = roomState.LoadStateAtSnapshot(
ctx, currentStateSnapshotNID, ctx, currentStateSnapshotNID,
@ -109,6 +116,7 @@ func (r *InboundPeeker) PerformInboundPeek(
NewInboundPeek: &api.OutputNewInboundPeek{ NewInboundPeek: &api.OutputNewInboundPeek{
RoomID: request.RoomID, RoomID: request.RoomID,
PeekID: request.PeekID, PeekID: request.PeekID,
LatestEventID: latestEvents[0].EventID(),
ServerName: request.ServerName, ServerName: request.ServerName,
RenewalInterval: request.RenewalInterval, RenewalInterval: request.RenewalInterval,
}, },