diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 6a00abb4a..dd8243845 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -205,10 +205,9 @@ func (oq *destinationQueue) backgroundSend() { // There are new PDUs waiting in the database. hydrate() case edu := <-oq.incomingEDUs: - // Likewise for EDUs, although we should probably not try - // too hard with some EDUs (like typing notifications) after - // a certain amount of time has passed. - // TODO: think about EDU expiry some more + // EDUs are handled in-memory for now. We will try to keep + // the ordering intact. + // TODO: Certain EDU types need persistence, e.g. send-to-device oq.pendingEDUs = append(oq.pendingEDUs, edu) // If there are any more things waiting in the channel queue // then read them. This is safe because we guarantee only @@ -322,41 +321,29 @@ func (oq *destinationQueue) backgroundSend() { // cleanPendingPDUs cleans out the pending PDU buffer, removing // all references so that the underlying objects can be GC'd. func (oq *destinationQueue) cleanPendingPDUs() { - numPDUs := len(oq.pendingPDUs) - for i := 0; i < numPDUs; i++ { + for i := 0; i < len(oq.pendingPDUs); i++ { oq.pendingPDUs[i] = nil } - oq.pendingPDUs = append( - []*gomatrixserverlib.HeaderedEvent{}, - oq.pendingPDUs[numPDUs:]..., - ) + oq.pendingPDUs = []*gomatrixserverlib.HeaderedEvent{} } // cleanPendingEDUs cleans out the pending EDU buffer, removing // all references so that the underlying objects can be GC'd. func (oq *destinationQueue) cleanPendingEDUs() { - numEDUs := len(oq.pendingEDUs) - for i := 0; i < numEDUs; i++ { + for i := 0; i < len(oq.pendingEDUs); i++ { oq.pendingEDUs[i] = nil } - oq.pendingEDUs = append( - []*gomatrixserverlib.EDU{}, - oq.pendingEDUs[numEDUs:]..., - ) + oq.pendingEDUs = []*gomatrixserverlib.EDU{} } // cleanPendingInvites cleans out the pending invite buffer, // removing all references so that the underlying objects can // be GC'd. func (oq *destinationQueue) cleanPendingInvites() { - numInvites := len(oq.pendingInvites) - for i := 0; i < numInvites; i++ { + for i := 0; i < len(oq.pendingInvites); i++ { oq.pendingInvites[i] = nil } - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{}, - oq.pendingInvites[numInvites:]..., - ) + oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{} } // nextTransaction creates a new transaction from the pending event diff --git a/federationsender/storage/postgres/queue_json_table.go b/federationsender/storage/postgres/queue_json_table.go index c2fa46455..eac2ea988 100644 --- a/federationsender/storage/postgres/queue_json_table.go +++ b/federationsender/storage/postgres/queue_json_table.go @@ -1,5 +1,4 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,7 +24,7 @@ import ( ) const queueJSONSchema = ` --- The queue_retry_json table contains event contents that +-- The federationsender_queue_json table contains event contents that -- we failed to send. CREATE TABLE IF NOT EXISTS federationsender_queue_json ( -- The JSON NID. This allows the federationsender_queue_retry table to diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go index 13b3b49c2..ef7a9f41e 100644 --- a/federationsender/storage/postgres/queue_pdus_table.go +++ b/federationsender/storage/postgres/queue_pdus_table.go @@ -1,5 +1,4 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -28,7 +27,7 @@ const queuePDUsSchema = ` CREATE TABLE IF NOT EXISTS federationsender_queue_pdus ( -- The transaction ID that was generated before persisting the event. transaction_id TEXT NOT NULL, - -- The domain part of the user ID the m.room.member event is for. + -- The destination server that we will send the event to. server_name TEXT NOT NULL, -- The JSON NID from the federationsender_queue_pdus_json table. json_nid BIGINT NOT NULL @@ -137,7 +136,10 @@ func (s *queuePDUsStatements) selectQueueReferenceJSONCount( stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) if err == sql.ErrNoRows { - return -1, nil + // It's acceptable for there to be no rows referencing a given + // JSON NID but it's not an error condition. Just return as if + // there's a zero count. + return 0, nil } return count, err }