Review comments

This commit is contained in:
Neil Alexander 2020-07-03 11:26:35 +01:00
parent b300aaa0ed
commit 4267c6fd0a
2 changed files with 7 additions and 3 deletions

View file

@ -251,8 +251,10 @@ func (oq *destinationQueue) backgroundSend() {
oq.cleanPendingInvites() oq.cleanPendingInvites()
return return
} else { } else {
// We haven't been told to give up terminally yet // We haven't been told to give up terminally yet but we still have
// but we still need to wake up the next iteration. // PDUs waiting to be sent. By sending a message into the wake chan,
// the next loop iteration will try processing these PDUs again,
// subject to the backoff.
oq.wakeServerCh <- true oq.wakeServerCh <- true
} }
} else if transaction { } else if transaction {

View file

@ -61,10 +61,12 @@ func NewOutgoingQueues(
queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
} }
// Look up which servers we have pending items for and then rehydrate those queues. // Look up which servers we have pending items for and then rehydrate those queues.
if serverNames, err := db.GetPendingServerNames(context.TODO()); err == nil { if serverNames, err := db.GetPendingServerNames(context.Background()); err == nil {
for _, serverName := range serverNames { for _, serverName := range serverNames {
queues.getQueue(serverName).wakeQueueIfNeeded() queues.getQueue(serverName).wakeQueueIfNeeded()
} }
} else {
log.WithError(err).Error("Failed to get server names for destination queue hydration")
} }
return queues return queues
} }