mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-01-19 10:24:27 -06:00
b189edf4f4
Replaced with types.HeaderedEvent _for now_. In reality we want to move them all to gmsl.Event and only use HeaderedEvent when we _need_ to bundle the version/event ID with the event (serialisation boundaries, and even then only when we don't have the room version). Requires https://github.com/matrix-org/gomatrixserverlib/pull/373
605 lines
20 KiB
Go
605 lines
20 KiB
Go
// Copyright 2017 Vector Creations Ltd
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package queue
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/matrix-org/gomatrix"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
|
"github.com/sirupsen/logrus"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
|
"github.com/matrix-org/dendrite/federationapi/storage"
|
|
"github.com/matrix-org/dendrite/federationapi/storage/shared/receipt"
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/roomserver/types"
|
|
"github.com/matrix-org/dendrite/setup/process"
|
|
)
|
|
|
|
// Limits for a single destination queue: how much goes into one
// transaction, how much is held in memory, and how long an idle
// worker waits before exiting.
const (
	// maxPDUsPerTransaction caps the number of pending PDUs taken for one transaction.
	maxPDUsPerTransaction = 50
	// maxEDUsPerTransaction caps the number of pending EDUs taken for one transaction.
	maxEDUsPerTransaction = 100
	// maxPDUsInMemory is the most PDUs kept in the in-memory pending list;
	// beyond this the queue is flagged as overflowed.
	maxPDUsInMemory = 128
	// maxEDUsInMemory is the most EDUs kept in the in-memory pending list;
	// beyond this the queue is flagged as overflowed.
	maxEDUsInMemory = 128
	// queueIdleTimeout is how long the worker waits for a notification
	// before it stops.
	queueIdleTimeout = 30 * time.Second
)
|
|
|
|
// destinationQueue is a queue of events for a single destination.
|
|
// It is responsible for sending the events to the destination and
|
|
// ensures that only one request is in flight to a given destination
|
|
// at a time.
|
|
type destinationQueue struct {
|
|
queues *OutgoingQueues
|
|
db storage.Database
|
|
process *process.ProcessContext
|
|
signing map[spec.ServerName]*fclient.SigningIdentity
|
|
rsAPI api.FederationRoomserverAPI
|
|
client fclient.FederationClient // federation client
|
|
origin spec.ServerName // origin of requests
|
|
destination spec.ServerName // destination of requests
|
|
running atomic.Bool // is the queue worker running?
|
|
backingOff atomic.Bool // true if we're backing off
|
|
overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more
|
|
statistics *statistics.ServerStatistics // statistics about this remote server
|
|
transactionIDMutex sync.Mutex // protects transactionID
|
|
transactionID gomatrixserverlib.TransactionID // last transaction ID if retrying, or "" if last txn was successful
|
|
notify chan struct{} // interrupts idle wait pending PDUs/EDUs
|
|
pendingPDUs []*queuedPDU // PDUs waiting to be sent
|
|
pendingEDUs []*queuedEDU // EDUs waiting to be sent
|
|
pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs
|
|
}
|
|
|
|
// Send event adds the event to the pending queue for the destination.
|
|
// If the queue is empty then it starts a background goroutine to
|
|
// start sending events to that destination.
|
|
func (oq *destinationQueue) sendEvent(event *types.HeaderedEvent, dbReceipt *receipt.Receipt) {
|
|
if event == nil {
|
|
logrus.Errorf("attempt to send nil PDU with destination %q", oq.destination)
|
|
return
|
|
}
|
|
|
|
// Check if the destination is blacklisted. If it isn't then wake
|
|
// up the queue.
|
|
if !oq.statistics.Blacklisted() {
|
|
// If there's room in memory to hold the event then add it to the
|
|
// list.
|
|
oq.pendingMutex.Lock()
|
|
if len(oq.pendingPDUs) < maxPDUsInMemory {
|
|
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{
|
|
pdu: event,
|
|
dbReceipt: dbReceipt,
|
|
})
|
|
} else {
|
|
oq.overflowed.Store(true)
|
|
}
|
|
oq.pendingMutex.Unlock()
|
|
|
|
if !oq.backingOff.Load() {
|
|
oq.wakeQueueAndNotify()
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendEDU adds the EDU event to the pending queue for the destination.
|
|
// If the queue is empty then it starts a background goroutine to
|
|
// start sending events to that destination.
|
|
func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, dbReceipt *receipt.Receipt) {
|
|
if event == nil {
|
|
logrus.Errorf("attempt to send nil EDU with destination %q", oq.destination)
|
|
return
|
|
}
|
|
|
|
// Check if the destination is blacklisted. If it isn't then wake
|
|
// up the queue.
|
|
if !oq.statistics.Blacklisted() {
|
|
// If there's room in memory to hold the event then add it to the
|
|
// list.
|
|
oq.pendingMutex.Lock()
|
|
if len(oq.pendingEDUs) < maxEDUsInMemory {
|
|
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
|
|
edu: event,
|
|
dbReceipt: dbReceipt,
|
|
})
|
|
} else {
|
|
oq.overflowed.Store(true)
|
|
}
|
|
oq.pendingMutex.Unlock()
|
|
|
|
if !oq.backingOff.Load() {
|
|
oq.wakeQueueAndNotify()
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleBackoffNotifier is registered as the backoff notification
|
|
// callback with Statistics. It will wakeup and notify the queue
|
|
// if the queue is currently backing off.
|
|
func (oq *destinationQueue) handleBackoffNotifier() {
|
|
// Only wake up the queue if it is backing off.
|
|
// Otherwise there is no pending work for the queue to handle
|
|
// so waking the queue would be a waste of resources.
|
|
if oq.backingOff.Load() {
|
|
oq.wakeQueueAndNotify()
|
|
}
|
|
}
|
|
|
|
// wakeQueueIfEventsPending calls wakeQueueAndNotify only if there are
|
|
// pending events or if forceWakeup is true. This prevents starting the
|
|
// queue unnecessarily.
|
|
func (oq *destinationQueue) wakeQueueIfEventsPending(forceWakeup bool) {
|
|
eventsPending := func() bool {
|
|
oq.pendingMutex.Lock()
|
|
defer oq.pendingMutex.Unlock()
|
|
return len(oq.pendingPDUs) > 0 || len(oq.pendingEDUs) > 0
|
|
}
|
|
|
|
// NOTE : Only wakeup and notify the queue if there are pending events
|
|
// or if forceWakeup is true. Otherwise there is no reason to start the
|
|
// queue goroutine and waste resources.
|
|
if forceWakeup || eventsPending() {
|
|
logrus.Info("Starting queue due to pending events or forceWakeup")
|
|
oq.wakeQueueAndNotify()
|
|
}
|
|
}
|
|
|
|
// wakeQueueAndNotify ensures the destination queue is running and notifies it
|
|
// that there is pending work.
|
|
func (oq *destinationQueue) wakeQueueAndNotify() {
|
|
// NOTE : Send notification before waking queue to prevent a race
|
|
// where the queue was running and stops due to a timeout in between
|
|
// checking it and sending the notification.
|
|
|
|
// Notify the queue that there are events ready to send.
|
|
select {
|
|
case oq.notify <- struct{}{}:
|
|
default:
|
|
}
|
|
|
|
// Wake up the queue if it's asleep.
|
|
oq.wakeQueueIfNeeded()
|
|
}
|
|
|
|
// wakeQueueIfNeeded will wake up the destination queue if it is
|
|
// not already running.
|
|
func (oq *destinationQueue) wakeQueueIfNeeded() {
|
|
// Clear the backingOff flag and update the backoff metrics if it was set.
|
|
if oq.backingOff.CompareAndSwap(true, false) {
|
|
destinationQueueBackingOff.Dec()
|
|
}
|
|
|
|
// If we aren't running then wake up the queue.
|
|
if !oq.running.Load() {
|
|
// Start the queue.
|
|
go oq.backgroundSend()
|
|
}
|
|
}
|
|
|
|
// getPendingFromDatabase will look at the database and see if
|
|
// there are any persisted events that haven't been sent to this
|
|
// destination yet. If so, they will be queued up.
|
|
func (oq *destinationQueue) getPendingFromDatabase() {
|
|
// Check to see if there's anything to do for this server
|
|
// in the database.
|
|
retrieved := false
|
|
ctx := oq.process.Context()
|
|
oq.pendingMutex.Lock()
|
|
defer oq.pendingMutex.Unlock()
|
|
|
|
// Take a note of all of the PDUs and EDUs that we already
|
|
// have cached. We will index them based on the receipt,
|
|
// which ultimately just contains the index of the PDU/EDU
|
|
// in the database.
|
|
gotPDUs := map[string]struct{}{}
|
|
gotEDUs := map[string]struct{}{}
|
|
for _, pdu := range oq.pendingPDUs {
|
|
gotPDUs[pdu.dbReceipt.String()] = struct{}{}
|
|
}
|
|
for _, edu := range oq.pendingEDUs {
|
|
gotEDUs[edu.dbReceipt.String()] = struct{}{}
|
|
}
|
|
|
|
overflowed := false
|
|
if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
|
|
// We have room in memory for some PDUs - let's request no more than that.
|
|
if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, maxPDUsInMemory); err == nil {
|
|
if len(pdus) == maxPDUsInMemory {
|
|
overflowed = true
|
|
}
|
|
for receipt, pdu := range pdus {
|
|
if _, ok := gotPDUs[receipt.String()]; ok {
|
|
continue
|
|
}
|
|
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu})
|
|
retrieved = true
|
|
if len(oq.pendingPDUs) == maxPDUsInMemory {
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination)
|
|
}
|
|
}
|
|
|
|
if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 {
|
|
// We have room in memory for some EDUs - let's request no more than that.
|
|
if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, maxEDUsInMemory); err == nil {
|
|
if len(edus) == maxEDUsInMemory {
|
|
overflowed = true
|
|
}
|
|
for receipt, edu := range edus {
|
|
if _, ok := gotEDUs[receipt.String()]; ok {
|
|
continue
|
|
}
|
|
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu})
|
|
retrieved = true
|
|
if len(oq.pendingEDUs) == maxEDUsInMemory {
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination)
|
|
}
|
|
}
|
|
|
|
// If we've retrieved all of the events from the database with room to spare
|
|
// in memory then we'll no longer consider this queue to be overflowed.
|
|
if !overflowed {
|
|
oq.overflowed.Store(false)
|
|
} else {
|
|
}
|
|
// If we've retrieved some events then notify the destination queue goroutine.
|
|
if retrieved {
|
|
select {
|
|
case oq.notify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkNotificationsOnClose checks for any remaining notifications
|
|
// and starts a new backgroundSend goroutine if any exist.
|
|
func (oq *destinationQueue) checkNotificationsOnClose() {
|
|
// NOTE : If we are stopping the queue due to blacklist then it
|
|
// doesn't matter if we have been notified of new work since
|
|
// this queue instance will be deleted anyway.
|
|
if !oq.statistics.Blacklisted() {
|
|
select {
|
|
case <-oq.notify:
|
|
// We received a new notification in between the
|
|
// idle timeout firing and stopping the goroutine.
|
|
// Immediately restart the queue.
|
|
oq.wakeQueueAndNotify()
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// backgroundSend is the worker goroutine for sending events.
|
|
func (oq *destinationQueue) backgroundSend() {
|
|
// Check if a worker is already running, and if it isn't, then
|
|
// mark it as started.
|
|
if !oq.running.CompareAndSwap(false, true) {
|
|
return
|
|
}
|
|
|
|
// Register queue cleanup functions.
|
|
// NOTE : The ordering here is very intentional.
|
|
defer oq.checkNotificationsOnClose()
|
|
defer oq.running.Store(false)
|
|
|
|
destinationQueueRunning.Inc()
|
|
defer destinationQueueRunning.Dec()
|
|
|
|
idleTimeout := time.NewTimer(queueIdleTimeout)
|
|
defer idleTimeout.Stop()
|
|
|
|
// Mark the queue as overflowed, so we will consult the database
|
|
// to see if there's anything new to send.
|
|
oq.overflowed.Store(true)
|
|
|
|
for {
|
|
// If we are overflowing memory and have sent things out to the
|
|
// database then we can look up what those things are.
|
|
if oq.overflowed.Load() {
|
|
oq.getPendingFromDatabase()
|
|
}
|
|
|
|
// Reset the queue idle timeout.
|
|
if !idleTimeout.Stop() {
|
|
select {
|
|
case <-idleTimeout.C:
|
|
default:
|
|
}
|
|
}
|
|
idleTimeout.Reset(queueIdleTimeout)
|
|
|
|
// If we have nothing to do then wait either for incoming events, or
|
|
// until we hit an idle timeout.
|
|
select {
|
|
case <-oq.notify:
|
|
// There's work to do, either because getPendingFromDatabase
|
|
// told us there is, a new event has come in via sendEvent/sendEDU,
|
|
// or we are backing off and it is time to retry.
|
|
case <-idleTimeout.C:
|
|
// The worker is idle so stop the goroutine. It'll get
|
|
// restarted automatically the next time we have an event to
|
|
// send.
|
|
return
|
|
case <-oq.process.Context().Done():
|
|
// The parent process is shutting down, so stop.
|
|
oq.statistics.ClearBackoff()
|
|
return
|
|
}
|
|
|
|
// Work out which PDUs/EDUs to include in the next transaction.
|
|
oq.pendingMutex.RLock()
|
|
pduCount := len(oq.pendingPDUs)
|
|
eduCount := len(oq.pendingEDUs)
|
|
if pduCount > maxPDUsPerTransaction {
|
|
pduCount = maxPDUsPerTransaction
|
|
}
|
|
if eduCount > maxEDUsPerTransaction {
|
|
eduCount = maxEDUsPerTransaction
|
|
}
|
|
toSendPDUs := oq.pendingPDUs[:pduCount]
|
|
toSendEDUs := oq.pendingEDUs[:eduCount]
|
|
oq.pendingMutex.RUnlock()
|
|
|
|
// If we didn't get anything from the database and there are no
|
|
// pending EDUs then there's nothing to do - stop here.
|
|
if pduCount == 0 && eduCount == 0 {
|
|
continue
|
|
}
|
|
|
|
// If we have pending PDUs or EDUs then construct a transaction.
|
|
// Try sending the next transaction and see what happens.
|
|
terr, sendMethod := oq.nextTransaction(toSendPDUs, toSendEDUs)
|
|
if terr != nil {
|
|
// We failed to send the transaction. Mark it as a failure.
|
|
_, blacklisted := oq.statistics.Failure()
|
|
if !blacklisted {
|
|
// Register the backoff state and exit the goroutine.
|
|
// It'll get restarted automatically when the backoff
|
|
// completes.
|
|
oq.backingOff.Store(true)
|
|
destinationQueueBackingOff.Inc()
|
|
return
|
|
} else {
|
|
// Immediately trigger the blacklist logic.
|
|
oq.blacklistDestination()
|
|
return
|
|
}
|
|
} else {
|
|
oq.handleTransactionSuccess(pduCount, eduCount, sendMethod)
|
|
}
|
|
}
|
|
}
|
|
|
|
// nextTransaction creates a new transaction from the pending event
|
|
// queue and sends it.
|
|
// Returns an error if the transaction wasn't sent. And whether the success
|
|
// was to a relay server or not.
|
|
func (oq *destinationQueue) nextTransaction(
|
|
pdus []*queuedPDU,
|
|
edus []*queuedEDU,
|
|
) (err error, sendMethod statistics.SendMethod) {
|
|
// Create the transaction.
|
|
t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus)
|
|
logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
|
|
|
|
// Try to send the transaction to the destination server.
|
|
ctx, cancel := context.WithTimeout(oq.process.Context(), time.Minute*5)
|
|
defer cancel()
|
|
|
|
relayServers := oq.statistics.KnownRelayServers()
|
|
hasRelayServers := len(relayServers) > 0
|
|
shouldSendToRelays := oq.statistics.AssumedOffline() && hasRelayServers
|
|
if !shouldSendToRelays {
|
|
sendMethod = statistics.SendDirect
|
|
_, err = oq.client.SendTransaction(ctx, t)
|
|
} else {
|
|
// Try sending directly to the destination first in case they came back online.
|
|
sendMethod = statistics.SendDirect
|
|
_, err = oq.client.SendTransaction(ctx, t)
|
|
if err != nil {
|
|
// The destination is still offline, try sending to relays.
|
|
sendMethod = statistics.SendViaRelay
|
|
relaySuccess := false
|
|
logrus.Infof("Sending %q to relay servers: %v", t.TransactionID, relayServers)
|
|
// TODO : how to pass through actual userID here?!?!?!?!
|
|
userID, userErr := spec.NewUserID("@user:"+string(oq.destination), false)
|
|
if userErr != nil {
|
|
return userErr, sendMethod
|
|
}
|
|
|
|
// Attempt sending to each known relay server.
|
|
for _, relayServer := range relayServers {
|
|
_, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
|
|
if relayErr != nil {
|
|
err = relayErr
|
|
} else {
|
|
// If sending to one of the relay servers succeeds, consider the send successful.
|
|
relaySuccess = true
|
|
|
|
// TODO : what about if the dest comes back online but can't see their relay?
|
|
// How do I sync with the dest in that case?
|
|
// Should change the database to have a "relay success" flag on events and if
|
|
// I see the node back online, maybe directly send through the backlog of events
|
|
// with "relay success"... could lead to duplicate events, but only those that
|
|
// I sent. And will lead to a much more consistent experience.
|
|
}
|
|
}
|
|
|
|
// Clear the error if sending to any of the relay servers succeeded.
|
|
if relaySuccess {
|
|
err = nil
|
|
}
|
|
}
|
|
}
|
|
switch errResponse := err.(type) {
|
|
case nil:
|
|
// Clean up the transaction in the database.
|
|
if pduReceipts != nil {
|
|
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
|
|
if err = oq.db.CleanPDUs(oq.process.Context(), oq.destination, pduReceipts); err != nil {
|
|
logrus.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
|
|
}
|
|
}
|
|
if eduReceipts != nil {
|
|
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
|
|
if err = oq.db.CleanEDUs(oq.process.Context(), oq.destination, eduReceipts); err != nil {
|
|
logrus.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
|
|
}
|
|
}
|
|
// Reset the transaction ID.
|
|
oq.transactionIDMutex.Lock()
|
|
oq.transactionID = ""
|
|
oq.transactionIDMutex.Unlock()
|
|
return nil, sendMethod
|
|
case gomatrix.HTTPError:
|
|
// Report that we failed to send the transaction and we
|
|
// will retry again, subject to backoff.
|
|
|
|
// TODO: we should check for 500-ish fails vs 400-ish here,
|
|
// since we shouldn't queue things indefinitely in response
|
|
// to a 400-ish error
|
|
code := errResponse.Code
|
|
logrus.Debug("Transaction failed with HTTP", code)
|
|
return err, sendMethod
|
|
default:
|
|
logrus.WithFields(logrus.Fields{
|
|
"destination": oq.destination,
|
|
logrus.ErrorKey: err,
|
|
}).Debugf("Failed to send transaction %q", t.TransactionID)
|
|
return err, sendMethod
|
|
}
|
|
}
|
|
|
|
// createTransaction generates a gomatrixserverlib.Transaction from the provided pdus and edus.
|
|
// It also returns the associated event receipts so they can be cleaned from the database in
|
|
// the case of a successful transaction.
|
|
func (oq *destinationQueue) createTransaction(
|
|
pdus []*queuedPDU,
|
|
edus []*queuedEDU,
|
|
) (gomatrixserverlib.Transaction, []*receipt.Receipt, []*receipt.Receipt) {
|
|
// If there's no projected transaction ID then generate one. If
|
|
// the transaction succeeds then we'll set it back to "" so that
|
|
// we generate a new one next time. If it fails, we'll preserve
|
|
// it so that we retry with the same transaction ID.
|
|
oq.transactionIDMutex.Lock()
|
|
if oq.transactionID == "" {
|
|
now := spec.AsTimestamp(time.Now())
|
|
oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
|
|
}
|
|
oq.transactionIDMutex.Unlock()
|
|
|
|
t := gomatrixserverlib.Transaction{
|
|
PDUs: []json.RawMessage{},
|
|
EDUs: []gomatrixserverlib.EDU{},
|
|
}
|
|
t.Origin = oq.origin
|
|
t.Destination = oq.destination
|
|
t.OriginServerTS = spec.AsTimestamp(time.Now())
|
|
t.TransactionID = oq.transactionID
|
|
|
|
var pduReceipts []*receipt.Receipt
|
|
var eduReceipts []*receipt.Receipt
|
|
|
|
// Go through PDUs that we retrieved from the database, if any,
|
|
// and add them into the transaction.
|
|
for _, pdu := range pdus {
|
|
// These should never be nil.
|
|
if pdu == nil || pdu.pdu == nil {
|
|
continue
|
|
}
|
|
// Append the JSON of the event, since this is a json.RawMessage type in the
|
|
// gomatrixserverlib.Transaction struct
|
|
t.PDUs = append(t.PDUs, pdu.pdu.JSON())
|
|
pduReceipts = append(pduReceipts, pdu.dbReceipt)
|
|
}
|
|
|
|
// Do the same for pending EDUS in the queue.
|
|
for _, edu := range edus {
|
|
// These should never be nil.
|
|
if edu == nil || edu.edu == nil {
|
|
continue
|
|
}
|
|
t.EDUs = append(t.EDUs, *edu.edu)
|
|
eduReceipts = append(eduReceipts, edu.dbReceipt)
|
|
}
|
|
|
|
return t, pduReceipts, eduReceipts
|
|
}
|
|
|
|
// blacklistDestination removes all pending PDUs and EDUs that have been cached
|
|
// and deletes this queue.
|
|
func (oq *destinationQueue) blacklistDestination() {
|
|
// It's been suggested that we should give up because the backoff
|
|
// has exceeded a maximum allowable value. Clean up the in-memory
|
|
// buffers at this point. The PDU clean-up is already on a defer.
|
|
logrus.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
|
|
|
|
oq.pendingMutex.Lock()
|
|
for i := range oq.pendingPDUs {
|
|
oq.pendingPDUs[i] = nil
|
|
}
|
|
for i := range oq.pendingEDUs {
|
|
oq.pendingEDUs[i] = nil
|
|
}
|
|
oq.pendingPDUs = nil
|
|
oq.pendingEDUs = nil
|
|
oq.pendingMutex.Unlock()
|
|
|
|
// Delete this queue as no more messages will be sent to this
|
|
// destination until it is no longer blacklisted.
|
|
oq.statistics.AssignBackoffNotifier(nil)
|
|
oq.queues.clearQueue(oq)
|
|
}
|
|
|
|
// handleTransactionSuccess updates the cached event queues as well as the success and
|
|
// backoff information for this server.
|
|
func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, sendMethod statistics.SendMethod) {
|
|
// If we successfully sent the transaction then clear out
|
|
// the pending events and EDUs, and wipe our transaction ID.
|
|
|
|
oq.statistics.Success(sendMethod)
|
|
oq.pendingMutex.Lock()
|
|
defer oq.pendingMutex.Unlock()
|
|
|
|
for i := range oq.pendingPDUs[:pduCount] {
|
|
oq.pendingPDUs[i] = nil
|
|
}
|
|
for i := range oq.pendingEDUs[:eduCount] {
|
|
oq.pendingEDUs[i] = nil
|
|
}
|
|
oq.pendingPDUs = oq.pendingPDUs[pduCount:]
|
|
oq.pendingEDUs = oq.pendingEDUs[eduCount:]
|
|
|
|
if len(oq.pendingPDUs) > 0 || len(oq.pendingEDUs) > 0 {
|
|
select {
|
|
case oq.notify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|