This commit is contained in:
Mark Haines 2017-06-27 16:46:19 +01:00
parent 589c198ef8
commit c48e0eda71

View file

@ -65,6 +65,8 @@ func (oqs *OutgoingQueues) SendEvent(
return nil return nil
} }
// filterDestinations removes our own server from the list of destinations.
// Otherwise we could end up trying to talk to ourselves.
func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName { func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName {
var result []gomatrixserverlib.ServerName var result []gomatrixserverlib.ServerName
for _, destination := range destinations { for _, destination := range destinations {
@ -76,6 +78,10 @@ func filterDestinations(origin gomatrixserverlib.ServerName, destinations []goma
return result return result
} }
// outgoingQueue is a queue of events for a single destination.
// It is responsible for sending the events to the destination and
// ensures that only one request is in flight to a given destination
// at a time.
type outgoingQueue struct { type outgoingQueue struct {
mutex sync.Mutex mutex sync.Mutex
client *gomatrixserverlib.FederationClient client *gomatrixserverlib.FederationClient
@ -87,6 +93,9 @@ type outgoingQueue struct {
pendingEvents []*gomatrixserverlib.Event pendingEvents []*gomatrixserverlib.Event
} }
// Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *outgoingQueue) sendEvent(ev *gomatrixserverlib.Event) { func (oq *outgoingQueue) sendEvent(ev *gomatrixserverlib.Event) {
oq.mutex.Lock() oq.mutex.Lock()
defer oq.mutex.Unlock() defer oq.mutex.Unlock()
@ -118,6 +127,9 @@ func (oq *outgoingQueue) backgroundSend() {
} }
} }
// next creates a new transaction from the pending event queue
// and flushes the queue.
// Returns nil if the queue was empty.
func (oq *outgoingQueue) next() *gomatrixserverlib.Transaction { func (oq *outgoingQueue) next() *gomatrixserverlib.Transaction {
oq.mutex.Lock() oq.mutex.Lock()
defer oq.mutex.Unlock() defer oq.mutex.Unlock()