From f26dad5dcedf0212bd9392ba650131c2b47e403b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 28 Jun 2017 13:59:35 +0100 Subject: [PATCH] Review comments --- .../consumers/roomserver_test.go | 1 - .../queue/destinationqueue.go | 102 ++++++++++++++++++ .../dendrite/federationsender/queue/queue.go | 95 ++-------------- 3 files changed, 111 insertions(+), 87 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go index d16310c07..bb659b9cd 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go @@ -50,5 +50,4 @@ func TestCombineDedup(t *testing.T) { if len(gotDel) != 1 || gotDel[0] != "b" { t.Errorf("wanted combined removes to be %#v, got %#v", []string{"b"}, gotDel) } - } diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go new file mode 100644 index 000000000..c1bce8efc --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -0,0 +1,102 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "fmt" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/gomatrixserverlib" +) + +// destinationQueue is a queue of events for a single destination. +// It is responsible for sending the events to the destination and +// ensures that only one request is in flight to a given destination +// at a time. +type destinationQueue struct { + mutex sync.Mutex + client *gomatrixserverlib.FederationClient + origin gomatrixserverlib.ServerName + destination gomatrixserverlib.ServerName + running bool + sentCounter int + lastTransactionIDs []gomatrixserverlib.TransactionID + pendingEvents []*gomatrixserverlib.Event +} + +// Send event adds the event to the pending queue for the destination. +// If the queue is empty then it starts a background goroutine to +// start sending events to that destination. +func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) { + oq.mutex.Lock() + defer oq.mutex.Unlock() + oq.pendingEvents = append(oq.pendingEvents, ev) + if !oq.running { + go oq.backgroundSend() + } +} + +func (oq *destinationQueue) backgroundSend() { + for { + t := oq.next() + if t == nil { + // If the queue is empty then stop processing for this destination. + // TODO: Remove this destination from the queue map. + return + } + + // TODO: handle retries. + // TODO: blacklist uncooperative servers. + + _, err := oq.client.SendTransaction(*t) + if err != nil { + log.WithFields(log.Fields{ + "destination": oq.destination, + log.ErrorKey: err, + }).Info("problem sending transaction") + } + } +} + +// next creates a new transaction from the pending event queue +// and flushes the queue. +// Returns nil if the queue was empty. +func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { + oq.mutex.Lock() + defer oq.mutex.Unlock() + if len(oq.pendingEvents) == 0 { + oq.running = false + return nil + } + var t gomatrixserverlib.Transaction + now := gomatrixserverlib.AsTimestamp(time.Now()) + t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter)) + t.Origin = oq.origin + t.Destination = oq.destination + t.OriginServerTS = now + t.PreviousIDs = oq.lastTransactionIDs + if t.PreviousIDs == nil { + t.PreviousIDs = []gomatrixserverlib.TransactionID{} + } + oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} + for _, pdu := range oq.pendingEvents { + t.PDUs = append(t.PDUs, *pdu) + } + oq.pendingEvents = nil + oq.sentCounter += len(t.PDUs) + return &t +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go index 473c8634c..52692d92d 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go @@ -16,17 +16,17 @@ package queue import ( "fmt" + "sync" + log "github.com/Sirupsen/logrus" "github.com/matrix-org/gomatrixserverlib" - "sync" - "time" ) // OutgoingQueues is a collection of queues for sending transactions to other // matrix servers type OutgoingQueues struct { mutex sync.Mutex - queues map[gomatrixserverlib.ServerName]*outgoingQueue + queues map[gomatrixserverlib.ServerName]*destinationQueue origin gomatrixserverlib.ServerName client *gomatrixserverlib.FederationClient } @@ -36,7 +36,7 @@ func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserv return &OutgoingQueues{ origin: origin, client: client, - queues: map[gomatrixserverlib.ServerName]*outgoingQueue{}, + queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } } @@ -46,6 +46,10 @@ func (oqs *OutgoingQueues) SendEvent( destinations []gomatrixserverlib.ServerName, ) error { if origin != oqs.origin { + // TODO: Support virtual hosting by allowing us to send events using + // different origin server names. + // For now assume we are always asked to send as the single server configured + // in the dendrite config. return fmt.Errorf( "sendevent: unexpected server to send as: got %q expected %q", origin, oqs.origin, @@ -62,12 +66,9 @@ func (oqs *OutgoingQueues) SendEvent( oqs.mutex.Lock() defer oqs.mutex.Unlock() for _, destination := range destinations { - if destination == oqs.origin { - continue - } oq := oqs.queues[destination] if oq == nil { - oq = &outgoingQueue{ + oq = &destinationQueue{ origin: oqs.origin, destination: destination, client: oqs.client, @@ -91,81 +92,3 @@ func filterDestinations(origin gomatrixserverlib.ServerName, destinations []goma } return result } - -// outgoingQueue is a queue of events for a single destination. -// It is responsible for sending the events to the destination and -// ensures that only one request is in flight to a given destination -// at a time. -type outgoingQueue struct { - mutex sync.Mutex - client *gomatrixserverlib.FederationClient - origin gomatrixserverlib.ServerName - destination gomatrixserverlib.ServerName - running bool - sentCounter int - lastTransactionIDs []gomatrixserverlib.TransactionID - pendingEvents []*gomatrixserverlib.Event -} - -// Send event adds the event to the pending queue for the destination. -// If the queue is empty then it starts a background goroutine to -// start sending events to that destination. -func (oq *outgoingQueue) sendEvent(ev *gomatrixserverlib.Event) { - oq.mutex.Lock() - defer oq.mutex.Unlock() - oq.pendingEvents = append(oq.pendingEvents, ev) - if !oq.running { - go oq.backgroundSend() - } -} - -func (oq *outgoingQueue) backgroundSend() { - for { - t := oq.next() - if t == nil { - // If the queue is empty then stop processing for this destination. - // TODO: Remove this destination from the queue map. - return - } - - // TODO: handle retries. - // TODO: blacklist uncooperative servers. - - _, err := oq.client.SendTransaction(*t) - if err != nil { - log.WithFields(log.Fields{ - "destination": oq.destination, - log.ErrorKey: err, - }).Info("problem sending transaction") - } - } -} - -// next creates a new transaction from the pending event queue -// and flushes the queue. -// Returns nil if the queue was empty. -func (oq *outgoingQueue) next() *gomatrixserverlib.Transaction { - oq.mutex.Lock() - defer oq.mutex.Unlock() - if len(oq.pendingEvents) == 0 { - oq.running = false - return nil - } - var t gomatrixserverlib.Transaction - now := gomatrixserverlib.AsTimestamp(time.Now()) - t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter)) - t.Origin = oq.origin - t.Destination = oq.destination - t.OriginServerTS = now - t.PreviousIDs = oq.lastTransactionIDs - if t.PreviousIDs == nil { - t.PreviousIDs = []gomatrixserverlib.TransactionID{} - } - oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} - for _, pdu := range oq.pendingEvents { - t.PDUs = append(t.PDUs, *pdu) - } - oq.pendingEvents = nil - oq.sentCounter += len(t.PDUs) - return &t -}