Early federationsender code for sending invites

This commit is contained in:
Neil Alexander 2020-04-01 18:21:48 +01:00
parent 72fbd8f388
commit a0fc798978
3 changed files with 109 additions and 18 deletions

View file

@ -79,29 +79,48 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil return nil
} }
if output.Type != api.OutputTypeNewRoomEvent {
switch output.Type {
case api.OutputTypeNewRoomEvent:
ev := &output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"send_as_server": output.NewRoomEvent.SendAsServer,
}).Info("received room event from roomserver")
if err := s.processMessage(*output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
log.ErrorKey: err,
}).Panicf("roomserver output log: write room event failure")
return nil
}
case api.OutputTypeNewInviteEvent:
ev := &output.NewInviteEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"state_key": ev.StateKey(),
}).Info("received invite event from roomserver")
if err := s.processInvite(*output.NewInviteEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
}).Panicf("roomserver output log: write invite event failure")
return nil
}
default:
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",
) )
return nil return nil
} }
ev := &output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"send_as_server": output.NewRoomEvent.SendAsServer,
}).Info("received event from roomserver")
if err := s.processMessage(*output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure")
return nil
}
return nil return nil
} }
@ -159,6 +178,12 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
) )
} }
// processInvite handles an invite event for sending over federation.
func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error {
// Send the event.
return s.queues.SendInvite(&oie.Event)
}
// joinedHostsAtEvent works out a list of matrix servers that were joined to // joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event. // the room at the event.
// It is important to use the state at the event for sending messages because: // It is important to use the state at the event for sending messages because:

View file

@ -42,6 +42,7 @@ type destinationQueue struct {
lastTransactionIDs []gomatrixserverlib.TransactionID lastTransactionIDs []gomatrixserverlib.TransactionID
pendingEvents []*gomatrixserverlib.HeaderedEvent pendingEvents []*gomatrixserverlib.HeaderedEvent
pendingEDUs []*gomatrixserverlib.EDU pendingEDUs []*gomatrixserverlib.EDU
pendingInvites []*gomatrixserverlib.HeaderedEvent
} }
// Send event adds the event to the pending queue for the destination. // Send event adds the event to the pending queue for the destination.
@ -70,6 +71,16 @@ func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) {
} }
} }
func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.HeaderedEvent) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingInvites = append(oq.pendingInvites, ev)
if !oq.running {
oq.running = true
go oq.backgroundSend()
}
}
func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) backgroundSend() {
for { for {
t := oq.next() t := oq.next()
@ -101,6 +112,19 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
oq.runningMutex.Lock() oq.runningMutex.Lock()
defer oq.runningMutex.Unlock() defer oq.runningMutex.Unlock()
if len(oq.pendingInvites) > 0 {
for _, invite := range oq.pendingInvites {
if _, err := oq.client.SendInvite(context.TODO(), oq.destination, invite.Unwrap()); err != nil {
log.WithFields(log.Fields{
"event_id": invite.EventID(),
"state_key": invite.StateKey(),
"destination": oq.destination,
}).Info("failed to send invite")
}
}
oq.pendingInvites = oq.pendingInvites[:0]
}
if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 { if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 {
oq.running = false oq.running = false
return nil return nil

View file

@ -80,6 +80,48 @@ func (oqs *OutgoingQueues) SendEvent(
return nil return nil
} }
// SendEvent sends an event to the destinations
func (oqs *OutgoingQueues) SendInvite(
ev *gomatrixserverlib.HeaderedEvent,
) error {
stateKey := ev.StateKey()
if stateKey == nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
}).Info("invite had no state key, dropping")
return nil
}
_, destination, err := gomatrixserverlib.SplitID('@', *stateKey)
if err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": stateKey,
}).Info("failed to split destination from state key")
return nil
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
}).Info("Sending invite")
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
origin: oqs.origin,
destination: destination,
client: oqs.client,
}
oqs.queues[destination] = oq
}
oq.sendInvite(ev)
return nil
}
// SendEDU sends an EDU event to the destinations // SendEDU sends an EDU event to the destinations
func (oqs *OutgoingQueues) SendEDU( func (oqs *OutgoingQueues) SendEDU(
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,