From a0fc7989784ea393f22fcd1db2fb6470e8e19326 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 1 Apr 2020 18:21:48 +0100 Subject: [PATCH] Early federationsender code for sending invites --- federationsender/consumers/roomserver.go | 61 +++++++++++++++------- federationsender/queue/destinationqueue.go | 24 +++++++++ federationsender/queue/queue.go | 42 +++++++++++++++ 3 files changed, 109 insertions(+), 18 deletions(-) diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 8ab2affe2..e43294ae0 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -79,29 +79,48 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } - if output.Type != api.OutputTypeNewRoomEvent { + + switch output.Type { + case api.OutputTypeNewRoomEvent: + ev := &output.NewRoomEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "send_as_server": output.NewRoomEvent.SendAsServer, + }).Info("received room event from roomserver") + + if err := s.processMessage(*output.NewRoomEvent); err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + "add": output.NewRoomEvent.AddsStateEventIDs, + "del": output.NewRoomEvent.RemovesStateEventIDs, + log.ErrorKey: err, + }).Panicf("roomserver output log: write room event failure") + return nil + } + case api.OutputTypeNewInviteEvent: + ev := &output.NewInviteEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "state_key": ev.StateKey(), + }).Info("received invite event from roomserver") + + if err := s.processInvite(*output.NewInviteEvent); err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write invite event failure") + return nil + } + default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", ) return nil } - ev := &output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "send_as_server": output.NewRoomEvent.SendAsServer, - }).Info("received event from roomserver") - - if err := s.processMessage(*output.NewRoomEvent); err != nil { - // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, - }).Panicf("roomserver output log: write event failure") - return nil - } return nil } @@ -159,6 +178,12 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err ) } +// processInvite handles an invite event for sending over federation. +func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error { + // Send the event. + return s.queues.SendInvite(&oie.Event) +} + // joinedHostsAtEvent works out a list of matrix servers that were joined to // the room at the event. // It is important to use the state at the event for sending messages because: diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index b4a6da1a3..04ca8a631 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -42,6 +42,7 @@ type destinationQueue struct { lastTransactionIDs []gomatrixserverlib.TransactionID pendingEvents []*gomatrixserverlib.HeaderedEvent pendingEDUs []*gomatrixserverlib.EDU + pendingInvites []*gomatrixserverlib.HeaderedEvent } // Send event adds the event to the pending queue for the destination. @@ -70,6 +71,16 @@ func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { } } +func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.HeaderedEvent) { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + oq.pendingInvites = append(oq.pendingInvites, ev) + if !oq.running { + oq.running = true + go oq.backgroundSend() + } +} + func (oq *destinationQueue) backgroundSend() { for { t := oq.next() @@ -101,6 +112,19 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() + if len(oq.pendingInvites) > 0 { + for _, invite := range oq.pendingInvites { + if _, err := oq.client.SendInvite(context.TODO(), oq.destination, invite.Unwrap()); err != nil { + log.WithFields(log.Fields{ + "event_id": invite.EventID(), + "state_key": invite.StateKey(), + "destination": oq.destination, + }).Info("failed to send invite") + } + } + oq.pendingInvites = oq.pendingInvites[:0] + } + if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 { oq.running = false return nil diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 840fe4afe..9de253318 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -80,6 +80,48 @@ func (oqs *OutgoingQueues) SendEvent( return nil } +// SendEvent sends an event to the destinations +func (oqs *OutgoingQueues) SendInvite( + ev *gomatrixserverlib.HeaderedEvent, +) error { + stateKey := ev.StateKey() + if stateKey == nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + }).Info("invite had no state key, dropping") + return nil + } + + _, destination, err := gomatrixserverlib.SplitID('@', *stateKey) + if err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "state_key": stateKey, + }).Info("failed to split destination from state key") + return nil + } + + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + }).Info("Sending invite") + + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + oq := oqs.queues[destination] + if oq == nil { + oq = &destinationQueue{ + origin: oqs.origin, + destination: destination, + client: oqs.client, + } + oqs.queues[destination] = oq + } + + oq.sendInvite(ev) + + return nil +} + // SendEDU sends an EDU event to the destinations func (oqs *OutgoingQueues) SendEDU( e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,