Shuffle things around a bit

This commit is contained in:
Neil Alexander 2020-07-01 10:12:14 +01:00
parent eaa7a679ac
commit 3419a7450b

View file

@ -51,7 +51,6 @@ type destinationQueue struct {
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
transactionID gomatrixserverlib.TransactionID // last transaction ID transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount int // how many events in this transaction so far transactionCount int // how many events in this transaction so far
pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
retryServerCh chan bool // interrupts backoff retryServerCh chan bool // interrupts backoff
@ -160,85 +159,55 @@ func (oq *destinationQueue) backgroundSend() {
} }
defer oq.running.Store(false) defer oq.running.Store(false)
// Since the PDU buffer can be rehydrated from the database, we
// dont need to keep it in memory after the worker stops running.
// transactionID and transactionCount are enough to help queue up
// additional event in the same transaction.
defer oq.cleanPendingPDUs()
for { for {
pendingPDUs := false
// For now we don't know the next transaction ID. Set it to an // For now we don't know the next transaction ID. Set it to an
// empty one. The next step will populate it if we have pending // empty one. The next step will populate it if we have pending
// PDUs in the database. Otherwise we'll generate one later on, // PDUs in the database. Otherwise we'll generate one later on,
// e.g. in response to EDUs. // e.g. in response to EDUs.
transactionID := gomatrixserverlib.TransactionID("") transactionID := gomatrixserverlib.TransactionID("")
// Retrieve the events in the next transaction. Note that this
// does *not* necessarily mean we will fill the buffer - the
// important thing here is that we still continue to send
// transactions in order.
hydrate := func() {
txid, pdus, err := oq.db.GetNextTransactionPDUs(
context.TODO(), // context
oq.destination, // server name
maxPDUsPerTransaction-len(oq.pendingPDUs), // how many events to retrieve
)
if err != nil {
log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
return
}
transactionID = txid
oq.pendingPDUs = append(oq.pendingPDUs, pdus...)
}
// If we haven't reached the PDU limit yet then rehydrate the
// PDU queue from the database.
if len(oq.pendingPDUs) < maxPDUsPerTransaction {
hydrate()
}
// Wait either for incoming events, or until we hit an // Wait either for incoming events, or until we hit an
// idle timeout. // idle timeout.
if len(oq.pendingPDUs) == 0 { select {
select { case <-oq.incomingPDUs:
case <-oq.incomingPDUs: // There are new PDUs waiting in the database.
// There are new PDUs waiting in the database. pendingPDUs = true
hydrate() case edu := <-oq.incomingEDUs:
case edu := <-oq.incomingEDUs: // EDUs are handled in-memory for now. We will try to keep
// EDUs are handled in-memory for now. We will try to keep // the ordering intact.
// the ordering intact. // TODO: Certain EDU types need persistence, e.g. send-to-device
// TODO: Certain EDU types need persistence, e.g. send-to-device oq.pendingEDUs = append(oq.pendingEDUs, edu)
oq.pendingEDUs = append(oq.pendingEDUs, edu) // If there are any more things waiting in the channel queue
// If there are any more things waiting in the channel queue // then read them. This is safe because we guarantee only
// then read them. This is safe because we guarantee only // having one goroutine per destination queue, so the channel
// having one goroutine per destination queue, so the channel // isn't being consumed anywhere else.
// isn't being consumed anywhere else. for len(oq.incomingEDUs) > 0 {
for len(oq.incomingEDUs) > 0 { oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
}
case invite := <-oq.incomingInvites:
// There's no strict ordering requirement for invites like
// there is for transactions, so we put the invite onto the
// front of the queue. This means that if an invite that is
// stuck failing already, that it won't block our new invite
// from being sent.
oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{invite},
oq.pendingInvites...,
)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingInvites) > 0 {
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
}
case <-time.After(time.Second * 30):
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
// send.
return
} }
case invite := <-oq.incomingInvites:
// There's no strict ordering requirement for invites like
// there is for transactions, so we put the invite onto the
// front of the queue. This means that if an invite that is
// stuck failing already, that it won't block our new invite
// from being sent.
oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{invite},
oq.pendingInvites...,
)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingInvites) > 0 {
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
}
case <-time.After(time.Second * 30):
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
// send.
return
} }
// If we are backing off this server then wait for the // If we are backing off this server then wait for the
@ -254,7 +223,7 @@ func (oq *destinationQueue) backgroundSend() {
} }
// If we have pending PDUs or EDUs then construct a transaction. // If we have pending PDUs or EDUs then construct a transaction.
if len(oq.pendingPDUs) > 0 || len(oq.pendingEDUs) > 0 { if pendingPDUs || len(oq.pendingEDUs) > 0 {
// If we haven't got a transaction ID then we should generate // If we haven't got a transaction ID then we should generate
// one. Ideally we'd know this already because something queued // one. Ideally we'd know this already because something queued
// in the database would give us one, but if we're dealing with // in the database would give us one, but if we're dealing with
@ -265,7 +234,7 @@ func (oq *destinationQueue) backgroundSend() {
} }
// Try sending the next transaction and see what happens. // Try sending the next transaction and see what happens.
transaction, terr := oq.nextTransaction(transactionID, oq.pendingPDUs, oq.pendingEDUs) transaction, terr := oq.nextTransaction(transactionID, oq.pendingEDUs)
if terr != nil { if terr != nil {
// We failed to send the transaction. // We failed to send the transaction.
if giveUp := oq.statistics.Failure(); giveUp { if giveUp := oq.statistics.Failure(); giveUp {
@ -282,7 +251,6 @@ func (oq *destinationQueue) backgroundSend() {
oq.statistics.Success() oq.statistics.Success()
oq.transactionID = "" oq.transactionID = ""
// Clean up the in-memory buffers. // Clean up the in-memory buffers.
oq.cleanPendingPDUs()
oq.cleanPendingEDUs() oq.cleanPendingEDUs()
// Clean up the transaction in the database. // Clean up the transaction in the database.
if err := oq.db.CleanTransactionPDUs( if err := oq.db.CleanTransactionPDUs(
@ -318,15 +286,6 @@ func (oq *destinationQueue) backgroundSend() {
} }
} }
// cleanPendingPDUs cleans out the pending PDU buffer, removing
// all references so that the underlying objects can be GC'd.
func (oq *destinationQueue) cleanPendingPDUs() {
for i := 0; i < len(oq.pendingPDUs); i++ {
oq.pendingPDUs[i] = nil
}
oq.pendingPDUs = []*gomatrixserverlib.HeaderedEvent{}
}
// cleanPendingEDUs cleans out the pending EDU buffer, removing // cleanPendingEDUs cleans out the pending EDU buffer, removing
// all references so that the underlying objects can be GC'd. // all references so that the underlying objects can be GC'd.
func (oq *destinationQueue) cleanPendingEDUs() { func (oq *destinationQueue) cleanPendingEDUs() {
@ -351,19 +310,37 @@ func (oq *destinationQueue) cleanPendingInvites() {
// false otherwise. // false otherwise.
func (oq *destinationQueue) nextTransaction( func (oq *destinationQueue) nextTransaction(
transactionID gomatrixserverlib.TransactionID, transactionID gomatrixserverlib.TransactionID,
pendingPDUs []*gomatrixserverlib.HeaderedEvent,
pendingEDUs []*gomatrixserverlib.EDU, pendingEDUs []*gomatrixserverlib.EDU,
) (bool, error) { ) (bool, error) {
t := gomatrixserverlib.Transaction{ t := gomatrixserverlib.Transaction{
PDUs: []json.RawMessage{}, PDUs: []json.RawMessage{},
EDUs: []gomatrixserverlib.EDU{}, EDUs: []gomatrixserverlib.EDU{},
} }
t.TransactionID = transactionID
t.Origin = oq.origin t.Origin = oq.origin
t.Destination = oq.destination t.Destination = oq.destination
t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now()) t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())
for _, pdu := range pendingPDUs { txid, pdus, err := oq.db.GetNextTransactionPDUs(
context.TODO(), // context
oq.destination, // server name
maxPDUsPerTransaction, // how many events to retrieve
)
if err != nil {
log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
}
if txid != "" {
// The database supplied us with a transaction ID to use
// from a failed PDU so use that.
t.TransactionID = txid
} else {
// Otherwise, use the one that the function call gave us.
// This would happen if it's EDUs only.
t.TransactionID = transactionID
}
for _, pdu := range pdus {
// Append the JSON of the event, since this is a json.RawMessage type in the // Append the JSON of the event, since this is a json.RawMessage type in the
// gomatrixserverlib.Transaction struct // gomatrixserverlib.Transaction struct
t.PDUs = append(t.PDUs, (*pdu).JSON()) t.PDUs = append(t.PDUs, (*pdu).JSON())
@ -378,7 +355,7 @@ func (oq *destinationQueue) nextTransaction(
// TODO: we should check for 500-ish fails vs 400-ish here, // TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response // since we shouldn't queue things indefinitely in response
// to a 400-ish error // to a 400-ish error
_, err := oq.client.SendTransaction(context.TODO(), t) _, err = oq.client.SendTransaction(context.TODO(), t)
switch e := err.(type) { switch e := err.(type) {
case nil: case nil:
// No error was returned so the transaction looks to have // No error was returned so the transaction looks to have