nil PDUs/EDUs shouldn't happen but guard against them for safety

This commit is contained in:
Neil Alexander 2020-12-07 09:29:59 +00:00
parent 7faef15339
commit 6029c79dfa
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 65 additions and 63 deletions

View file

@ -69,6 +69,10 @@ type destinationQueue struct {
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
// start sending events to that destination. // start sending events to that destination.
func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, receipt *shared.Receipt) { func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, receipt *shared.Receipt) {
if event == nil {
log.Errorf("attempt to send nil PDU with destination %q", oq.destination)
return
}
// Create a transaction ID. We'll either do this if we don't have // Create a transaction ID. We'll either do this if we don't have
// one made up yet, or if we've exceeded the number of maximum // one made up yet, or if we've exceeded the number of maximum
// events allowed in a single tranaction. We'll reset the counter // events allowed in a single tranaction. We'll reset the counter
@ -116,6 +120,10 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
// start sending events to that destination. // start sending events to that destination.
func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *shared.Receipt) { func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *shared.Receipt) {
if event == nil {
log.Errorf("attempt to send nil EDU with destination %q", oq.destination)
return
}
// Create a database entry that associates the given PDU NID with // Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU // this destination queue. We'll then be able to retrieve the PDU
// later. // later.
@ -370,6 +378,9 @@ func (oq *destinationQueue) nextTransaction(
// Go through PDUs that we retrieved from the database, if any, // Go through PDUs that we retrieved from the database, if any,
// and add them into the transaction. // and add them into the transaction.
for _, pdu := range pdus { for _, pdu := range pdus {
if pdu.pdu == nil {
continue
}
// Append the JSON of the event, since this is a json.RawMessage type in the // Append the JSON of the event, since this is a json.RawMessage type in the
// gomatrixserverlib.Transaction struct // gomatrixserverlib.Transaction struct
t.PDUs = append(t.PDUs, pdu.pdu.JSON()) t.PDUs = append(t.PDUs, pdu.pdu.JSON())
@ -378,6 +389,9 @@ func (oq *destinationQueue) nextTransaction(
// Do the same for pending EDUS in the queue. // Do the same for pending EDUS in the queue.
for _, edu := range edus { for _, edu := range edus {
if edu.edu == nil {
continue
}
t.EDUs = append(t.EDUs, *edu.edu) t.EDUs = append(t.EDUs, *edu.edu)
eduReceipts = append(pduReceipts, edu.receipt) eduReceipts = append(pduReceipts, edu.receipt)
} }

View file

@ -52,42 +52,36 @@ func (d *Database) GetPendingEDUs(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ( ) (map[*Receipt]*gomatrixserverlib.EDU, error) {
edus map[*Receipt]*gomatrixserverlib.EDU, edus := make(map[*Receipt]*gomatrixserverlib.EDU)
err error, nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, nil, serverName, limit)
) { if err != nil {
edus = make(map[*Receipt]*gomatrixserverlib.EDU) return nil, fmt.Errorf("SelectQueueEDUs: %w", err)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { }
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
if err != nil {
return fmt.Errorf("SelectQueueEDUs: %w", err)
}
retrieve := make([]int64, 0, len(nids)) retrieve := make([]int64, 0, len(nids))
for _, nid := range nids { for _, nid := range nids {
if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok { if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok {
edus[&Receipt{nid}] = edu edus[&Receipt{nid}] = edu
} else { } else {
retrieve = append(retrieve, nid) retrieve = append(retrieve, nid)
}
} }
}
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, nil, retrieve)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err) return nil, fmt.Errorf("SelectQueueJSON: %w", err)
}
for nid, blob := range blobs {
var event gomatrixserverlib.EDU
if err := json.Unmarshal(blob, &event); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %w", err)
} }
edus[&Receipt{nid}] = &event
}
for nid, blob := range blobs { return edus, nil
var event gomatrixserverlib.EDU
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
edus[&Receipt{nid}] = &event
}
return nil
})
return
} }
// CleanEDUs cleans up all specified EDUs. This is done when a // CleanEDUs cleans up all specified EDUs. This is done when a

View file

@ -53,48 +53,42 @@ func (d *Database) GetPendingPDUs(
ctx context.Context, ctx context.Context,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
limit int, limit int,
) ( ) (map[*Receipt]*gomatrixserverlib.HeaderedEvent, error) {
events map[*Receipt]*gomatrixserverlib.HeaderedEvent,
err error,
) {
// Strictly speaking this doesn't need to be using the writer // Strictly speaking this doesn't need to be using the writer
// since we are only performing selects, but since we don't have // since we are only performing selects, but since we don't have
// a guarantee of transactional isolation, it's actually useful // a guarantee of transactional isolation, it's actually useful
// to know in SQLite mode that nothing else is trying to modify // to know in SQLite mode that nothing else is trying to modify
// the database. // the database.
events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent) events := make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, nil, serverName, limit)
nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit) if err != nil {
if err != nil { return nil, fmt.Errorf("SelectQueuePDUs: %w", err)
return fmt.Errorf("SelectQueuePDUs: %w", err) }
}
retrieve := make([]int64, 0, len(nids)) retrieve := make([]int64, 0, len(nids))
for _, nid := range nids { for _, nid := range nids {
if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok { if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok {
events[&Receipt{nid}] = event events[&Receipt{nid}] = event
} else { } else {
retrieve = append(retrieve, nid) retrieve = append(retrieve, nid)
}
} }
}
blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, nil, retrieve)
if err != nil { if err != nil {
return fmt.Errorf("SelectQueueJSON: %w", err) return nil, fmt.Errorf("SelectQueueJSON: %w", err)
}
for nid, blob := range blobs {
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(blob, &event); err != nil {
return nil, fmt.Errorf("json.Unmarshal: %w", err)
} }
events[&Receipt{nid}] = &event
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
for nid, blob := range blobs { return events, nil
var event gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(blob, &event); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
events[&Receipt{nid}] = &event
d.Cache.StoreFederationSenderQueuedPDU(nid, &event)
}
return nil
})
return
} }
// CleanTransactionPDUs cleans up all associated events for a // CleanTransactionPDUs cleans up all associated events for a