From 0ab4bc9e8ef536e722ef8aeda8d04a9d2598f6c5 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 23 Sep 2020 00:11:58 +0100 Subject: [PATCH] document the peek send stream race better --- federationsender/consumers/roomserver.go | 10 +++++++++- roomserver/internal/perform/perform_inbound_peek.go | 4 ---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 5663cc473..4d5e2ab4d 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -118,7 +118,15 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any) // causing the federationsender to start sending messages to the peeking server func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error { - // FIXME: do something with orp.LatestEventID to prevent races + + // FIXME: there's a race here - we should start /sending new peeked events + // atomically after the orp.LatestEventID to ensure there are no gaps between + // the peek beginning and the send stream beginning. + // + // We probably need to track orp.LatestEventID on the inbound peek, but it's + // unclear how we then use that to prevent the race when we start the send + // stream. + return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval) } diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go index 99342a90f..8599ad204 100644 --- a/roomserver/internal/perform/perform_inbound_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -106,10 +106,6 @@ func (r *InboundPeeker) PerformInboundPeek( response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion)) } - // FIXME: there's a race here - we really should be atomically telling the - // federationsender to start sending peek events alongside having captured - // the current state, but it's unclear if/how we can do that. - err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{ { Type: api.OutputTypeNewInboundPeek,