Review comments

This commit is contained in:
Neil Alexander 2020-07-01 09:48:48 +01:00
parent b41308bd51
commit eaa7a679ac
3 changed files with 17 additions and 29 deletions

View file

@ -205,10 +205,9 @@ func (oq *destinationQueue) backgroundSend() {
// There are new PDUs waiting in the database. // There are new PDUs waiting in the database.
hydrate() hydrate()
case edu := <-oq.incomingEDUs: case edu := <-oq.incomingEDUs:
// Likewise for EDUs, although we should probably not try // EDUs are handled in-memory for now. We will try to keep
// too hard with some EDUs (like typing notifications) after // the ordering intact.
// a certain amount of time has passed. // TODO: Certain EDU types need persistence, e.g. send-to-device
// TODO: think about EDU expiry some more
oq.pendingEDUs = append(oq.pendingEDUs, edu) oq.pendingEDUs = append(oq.pendingEDUs, edu)
// If there are any more things waiting in the channel queue // If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only // then read them. This is safe because we guarantee only
@ -322,41 +321,29 @@ func (oq *destinationQueue) backgroundSend() {
// cleanPendingPDUs cleans out the pending PDU buffer, removing // cleanPendingPDUs cleans out the pending PDU buffer, removing
// all references so that the underlying objects can be GC'd. // all references so that the underlying objects can be GC'd.
func (oq *destinationQueue) cleanPendingPDUs() { func (oq *destinationQueue) cleanPendingPDUs() {
numPDUs := len(oq.pendingPDUs) for i := 0; i < len(oq.pendingPDUs); i++ {
for i := 0; i < numPDUs; i++ {
oq.pendingPDUs[i] = nil oq.pendingPDUs[i] = nil
} }
oq.pendingPDUs = append( oq.pendingPDUs = []*gomatrixserverlib.HeaderedEvent{}
[]*gomatrixserverlib.HeaderedEvent{},
oq.pendingPDUs[numPDUs:]...,
)
} }
// cleanPendingEDUs cleans out the pending EDU buffer, removing // cleanPendingEDUs cleans out the pending EDU buffer, removing
// all references so that the underlying objects can be GC'd. // all references so that the underlying objects can be GC'd.
func (oq *destinationQueue) cleanPendingEDUs() { func (oq *destinationQueue) cleanPendingEDUs() {
numEDUs := len(oq.pendingEDUs) for i := 0; i < len(oq.pendingEDUs); i++ {
for i := 0; i < numEDUs; i++ {
oq.pendingEDUs[i] = nil oq.pendingEDUs[i] = nil
} }
oq.pendingEDUs = append( oq.pendingEDUs = []*gomatrixserverlib.EDU{}
[]*gomatrixserverlib.EDU{},
oq.pendingEDUs[numEDUs:]...,
)
} }
// cleanPendingInvites cleans out the pending invite buffer, // cleanPendingInvites cleans out the pending invite buffer,
// removing all references so that the underlying objects can // removing all references so that the underlying objects can
// be GC'd. // be GC'd.
func (oq *destinationQueue) cleanPendingInvites() { func (oq *destinationQueue) cleanPendingInvites() {
numInvites := len(oq.pendingInvites) for i := 0; i < len(oq.pendingInvites); i++ {
for i := 0; i < numInvites; i++ {
oq.pendingInvites[i] = nil oq.pendingInvites[i] = nil
} }
oq.pendingInvites = append( oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{}
[]*gomatrixserverlib.InviteV2Request{},
oq.pendingInvites[numInvites:]...,
)
} }
// nextTransaction creates a new transaction from the pending event // nextTransaction creates a new transaction from the pending event

View file

@ -1,5 +1,4 @@
// Copyright 2017-2018 New Vector Ltd // Copyright 2020 The Matrix.org Foundation C.I.C.
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -25,7 +24,7 @@ import (
) )
const queueJSONSchema = ` const queueJSONSchema = `
-- The queue_retry_json table contains event contents that -- The federationsender_queue_json table contains event contents that
-- we failed to send. -- we failed to send.
CREATE TABLE IF NOT EXISTS federationsender_queue_json ( CREATE TABLE IF NOT EXISTS federationsender_queue_json (
-- The JSON NID. This allows the federationsender_queue_retry table to -- The JSON NID. This allows the federationsender_queue_retry table to

View file

@ -1,5 +1,4 @@
// Copyright 2017-2018 New Vector Ltd // Copyright 2020 The Matrix.org Foundation C.I.C.
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -28,7 +27,7 @@ const queuePDUsSchema = `
CREATE TABLE IF NOT EXISTS federationsender_queue_pdus ( CREATE TABLE IF NOT EXISTS federationsender_queue_pdus (
-- The transaction ID that was generated before persisting the event. -- The transaction ID that was generated before persisting the event.
transaction_id TEXT NOT NULL, transaction_id TEXT NOT NULL,
-- The domain part of the user ID the m.room.member event is for. -- The destination server that we will send the event to.
server_name TEXT NOT NULL, server_name TEXT NOT NULL,
-- The JSON NID from the federationsender_queue_pdus_json table. -- The JSON NID from the federationsender_queue_pdus_json table.
json_nid BIGINT NOT NULL json_nid BIGINT NOT NULL
@ -137,7 +136,10 @@ func (s *queuePDUsStatements) selectQueueReferenceJSONCount(
stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt) stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt)
err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
return -1, nil // It's acceptable for there to be no rows referencing a given
// JSON NID but it's not an error condition. Just return as if
// there's a zero count.
return 0, nil
} }
return count, err return count, err
} }