From 7c3991ee2f749108fb99a3a9cd1b35acc8c3b643 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 28 Jun 2021 15:11:36 +0100 Subject: [PATCH 01/11] Use a custom FIFO queue for the RS input API (#1888) * Use a FIFO queue instead of a channel to reduce backpressure * Make sure someone wakes up * Tweaks * Add comments --- roomserver/internal/input/input.go | 15 +++--- roomserver/internal/input/input_fifo.go | 64 +++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 6 deletions(-) create mode 100644 roomserver/internal/input/input_fifo.go diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 82ece2307..b8279a866 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -38,8 +38,7 @@ type Inputer struct { ServerName gomatrixserverlib.ServerName ACLs *acls.ServerACLs OutputRoomEventTopic string - - workers sync.Map // room ID -> *inputWorker + workers sync.Map // room ID -> *inputWorker } type inputTask struct { @@ -52,7 +51,7 @@ type inputTask struct { type inputWorker struct { r *Inputer running atomic.Bool - input chan *inputTask + input *fifoQueue } // Guarded by a CAS on w.running @@ -60,7 +59,11 @@ func (w *inputWorker) start() { defer w.running.Store(false) for { select { - case task := <-w.input: + case <-w.input.wait(): + task, ok := w.input.pop() + if !ok { + continue + } hooks.Run(hooks.KindNewEventReceived, task.event.Event) _, task.err = w.r.processRoomEvent(task.ctx, task.event) if task.err == nil { @@ -143,7 +146,7 @@ func (r *Inputer) InputRoomEvents( // room - the channel will be quite small as it's just pointer types. w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ r: r, - input: make(chan *inputTask, 32), + input: newFIFOQueue(), }) worker := w.(*inputWorker) @@ -160,7 +163,7 @@ func (r *Inputer) InputRoomEvents( if worker.running.CAS(false, true) { go worker.start() } - worker.input <- tasks[i] + worker.input.push(tasks[i]) } // Wait for all of the workers to return results about our tasks. diff --git a/roomserver/internal/input/input_fifo.go b/roomserver/internal/input/input_fifo.go new file mode 100644 index 000000000..694b17245 --- /dev/null +++ b/roomserver/internal/input/input_fifo.go @@ -0,0 +1,64 @@ +package input + +import ( + "sync" +) + +type fifoQueue struct { + tasks []*inputTask + count int + mutex sync.Mutex + notifs chan struct{} +} + +func newFIFOQueue() *fifoQueue { + q := &fifoQueue{ + notifs: make(chan struct{}, 1), + } + return q +} + +func (q *fifoQueue) push(frame *inputTask) { + q.mutex.Lock() + defer q.mutex.Unlock() + q.tasks = append(q.tasks, frame) + q.count++ + select { + case q.notifs <- struct{}{}: + default: + } +} + +// pop returns the first item of the queue, if there is one. +// The second return value will indicate if a task was returned. +// You must check this value, even after calling wait(). +func (q *fifoQueue) pop() (*inputTask, bool) { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.count == 0 { + return nil, false + } + frame := q.tasks[0] + q.tasks[0] = nil + q.tasks = q.tasks[1:] + q.count-- + if q.count == 0 { + // Force a GC of the underlying array, since it might have + // grown significantly if the queue was hammered for some reason + q.tasks = nil + } + return frame, true +} + +// wait returns a channel which can be used to detect when an +// item is waiting in the queue. +func (q *fifoQueue) wait() <-chan struct{} { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.count > 0 && len(q.notifs) == 0 { + ch := make(chan struct{}) + close(ch) + return ch + } + return q.notifs +} From 4417f24678988b823a1dbf6ee118b08eef95c645 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 28 Jun 2021 15:11:59 +0100 Subject: [PATCH 02/11] Protect processEventWithMissingState with per-room mutex, to prevent mass CPU burn/RAM usage Squashed commit of the following: commit 7fad77c10e3c1c78feddb37351812b209d9c0f25 Author: Neil Alexander Date: Mon Jun 28 15:06:52 2021 +0100 Fix processEventWithMissingStateMutexes commit 138cddcac7b8373a8e1816a232f84a7bda6adcdf Author: Neil Alexander Date: Mon Jun 28 13:59:44 2021 +0100 Use internal.MutexByRoom commit 6e6f026cfad31da391ad261cfec16d41dff1b15b Author: Neil Alexander Date: Mon Jun 28 13:50:18 2021 +0100 Try to slow things down per room commit b97d406dff2e11769a9202fbf58b138a541ca449 Author: Neil Alexander Date: Mon Jun 28 13:41:27 2021 +0100 Try to slow things down commit 8866120ebf880b4fd8a456937f69903e233c19a2 Merge: 9f2de8a2 4a37b19a Author: Neil Alexander Date: Mon Jun 28 13:40:33 2021 +0100 Merge branch 'neilalexander/rsinputfifo' into neilalexander/rsinputfifo2 commit 4a37b19a8f6fe8af02e979827253d22a0ccdedb8 Author: Neil Alexander Date: Mon Jun 28 13:34:54 2021 +0100 Add comments commit f9ab3f4b8157a42d657735101bc2c768c663e814 Author: Neil Alexander Date: Mon Jun 28 13:31:21 2021 +0100 Tweaks commit 9f2de8a29cadec4c785d9c2e4e74c1138305f759 Author: Neil Alexander Date: Mon Jun 28 13:15:59 2021 +0100 Ask origin only for missing things for now commit 8fd878c75a4066abb21597d524a4eb4670a392d4 Author: Neil Alexander Date: Mon Jun 28 11:18:11 2021 +0100 Make sure someone wakes up commit b63f699f1b74948d180885449398f999fafb18c8 Author: Neil Alexander Date: Mon Jun 28 11:12:58 2021 +0100 Use a FIFO queue instead of a channel to reduce backpressure --- federationapi/routing/send.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 708ba38ec..40d4b0338 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -486,14 +486,16 @@ func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserver return t.servers } t.servers = []gomatrixserverlib.ServerName{t.Origin} - serverReq := &api.QueryServerJoinedToRoomRequest{ - RoomID: roomID, - } - serverRes := &api.QueryServerJoinedToRoomResponse{} - if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil { - t.servers = append(t.servers, serverRes.ServerNames...) - util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID) - } + /* + serverReq := &api.QueryServerJoinedToRoomRequest{ + RoomID: roomID, + } + serverRes := &api.QueryServerJoinedToRoomResponse{} + if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil { + t.servers = append(t.servers, serverRes.ServerNames...) + util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID) + } + */ return t.servers } @@ -618,13 +620,18 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv return gomatrixserverlib.Allowed(e, &authUsingState) } +var processEventWithMissingStateMutexes = internal.NewMutexByRoom() + func (t *txnReq) processEventWithMissingState( ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ) error { + processEventWithMissingStateMutexes.Lock(e.RoomID()) + defer processEventWithMissingStateMutexes.Unlock(e.RoomID()) + // Do this with a fresh context, so that we keep working even if the // original request times out. With any luck, by the time the remote // side retries, we'll have fetched the missing state. - gmectx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + gmectx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the From f645646ca90eb361c5673dc65bc82673de474559 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 29 Jun 2021 09:37:28 +0100 Subject: [PATCH 03/11] Restore the getServers RS query (needs optimisation) --- federationapi/routing/send.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 40d4b0338..d9d28fdcf 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -486,16 +486,14 @@ func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserver return t.servers } t.servers = []gomatrixserverlib.ServerName{t.Origin} - /* - serverReq := &api.QueryServerJoinedToRoomRequest{ - RoomID: roomID, - } - serverRes := &api.QueryServerJoinedToRoomResponse{} - if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil { - t.servers = append(t.servers, serverRes.ServerNames...) - util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID) - } - */ + serverReq := &api.QueryServerJoinedToRoomRequest{ + RoomID: roomID, + } + serverRes := &api.QueryServerJoinedToRoomResponse{} + if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil { + t.servers = append(t.servers, serverRes.ServerNames...) + util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID) + } return t.servers } From e2b6a90d90a5f4bfd2658110ae8c2edf5777efb3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 29 Jun 2021 10:22:26 +0100 Subject: [PATCH 04/11] Put gmectx back to 5 minutes --- federationapi/routing/send.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index d9d28fdcf..6ef565130 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -629,7 +629,7 @@ func (t *txnReq) processEventWithMissingState( // Do this with a fresh context, so that we keep working even if the // original request times out. With any luck, by the time the remote // side retries, we'll have fetched the missing state. - gmectx, cancel := context.WithTimeout(context.Background(), time.Minute) + gmectx, cancel := context.WithTimeout(context.Background(), time.Minute*5) defer cancel() // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the From c849e74dfc9aabfd0d98db1310230aa362f6df2a Mon Sep 17 00:00:00 2001 From: kegsay Date: Tue, 29 Jun 2021 11:25:17 +0100 Subject: [PATCH 05/11] db migration: fix #1844 and add additional assertions (#1889) * db migration: fix #1844 and add additional assertions - Migration scripts will now check to see if there are any unconverted snapshot IDs and fail the migration if there are any. This should prevent people from getting a corrupt database in the event the root cause is still unknown. - Add an ORDER BY clause when doing batch queries in the postgres migration. LIMIT and OFFSET without ORDER BY are undefined and must not be relied upon to produce a deterministic ordering (e.g row order). See https://www.postgresql.org/docs/current/queries-limit.html * Linting Co-authored-by: Neil Alexander --- .../2021041615092700_state_blocks_refactor.go | 25 +++++++++++++++++-- .../2021041615092700_state_blocks_refactor.go | 23 ++++++++++++++++- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go index 84da96149..d87ae052b 100644 --- a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go +++ b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go @@ -119,11 +119,15 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { _roomserver_state_snapshots JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids) WHERE - _roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT DISTINCT + _roomserver_state_snapshots.state_snapshot_nid = ANY ( + SELECT _roomserver_state_snapshots.state_snapshot_nid FROM _roomserver_state_snapshots - LIMIT $1 OFFSET $2)) AS _roomserver_state_block + ORDER BY _roomserver_state_snapshots.state_snapshot_nid ASC + LIMIT $1 OFFSET $2 + ) + ) AS _roomserver_state_block GROUP BY state_snapshot_nid, room_nid, @@ -202,6 +206,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { } } + // By this point we should have no more state_snapshot_nids below maxsnapshotid in either roomserver_rooms or roomserver_events + // If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist + // in roomserver_state_snapshots + var count int64 + if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil { + return fmt.Errorf("assertion query failed: %s", err) + } + if count > 0 { + return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count) + } + if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil { + return fmt.Errorf("assertion query failed: %s", err) + } + if count > 0 { + return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count) + } + if _, err = tx.Exec(` DROP TABLE _roomserver_state_snapshots; DROP SEQUENCE roomserver_state_snapshot_nid_seq; diff --git a/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go b/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go index 3b93b3fa6..42edbbc6f 100644 --- a/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go +++ b/roomserver/storage/sqlite3/deltas/2021041615092700_state_blocks_refactor.go @@ -31,6 +31,7 @@ func LoadStateBlocksRefactor(m *sqlutil.Migrations) { m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor) } +// nolint:gocyclo func UpStateBlocksRefactor(tx *sql.Tx) error { logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!") defer logrus.Warn("State storage upgrade complete") @@ -45,6 +46,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { } maxsnapshotid++ maxblockid++ + oldMaxSnapshotID := maxsnapshotid if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil { return fmt.Errorf("tx.Exec: %w", err) @@ -133,6 +135,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { if jerr != nil { return fmt.Errorf("json.Marshal (new blocks): %w", jerr) } + var newsnapshot types.StateSnapshotNID err = tx.QueryRow(` INSERT INTO roomserver_state_snapshots (state_snapshot_nid, state_snapshot_hash, room_nid, state_block_nids) @@ -144,7 +147,8 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err) } maxsnapshotid++ - if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil { + _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid) + if err != nil { return fmt.Errorf("tx.Exec (update events): %w", err) } if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil { @@ -153,6 +157,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { } } + // By this point we should have no more state_snapshot_nids below oldMaxSnapshotID in either roomserver_rooms or roomserver_events + // If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist + // in roomserver_state_snapshots + var count int64 + if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil { + return fmt.Errorf("assertion query failed: %s", err) + } + if count > 0 { + return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count) + } + if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil { + return fmt.Errorf("assertion query failed: %s", err) + } + if count > 0 { + return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count) + } + if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil { return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err) } From 3afb1613522891e68a8c2f21807bb83762c4122f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 30 Jun 2021 10:01:56 +0100 Subject: [PATCH 06/11] Reduce memory usage in federation /send endpoint (#1890) * More aggressive event caching * Deduplicate /state results * Deduplicate more * Ensure we use the correct list of events when excluding repeated state * Fixes * Ensure we track all events we already knew about properly --- federationapi/routing/send.go | 69 +++++++++++++++++------------- federationapi/routing/send_test.go | 2 +- 2 files changed, 41 insertions(+), 30 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 6ef565130..06a38b9c0 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -106,8 +106,8 @@ func Send( eduAPI: eduAPI, keys: keys, federation: federation, + hadEvents: make(map[string]bool), haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), - newEvents: make(map[string]bool), keyAPI: keyAPI, roomsMu: mu, } @@ -167,13 +167,12 @@ type txnReq struct { servers []gomatrixserverlib.ServerName serversMutex sync.RWMutex roomsMu *internal.MutexByRoom + // a list of events from the auth and prev events which we already had + hadEvents map[string]bool // local cache of events for auth checks, etc - this may include events // which the roomserver is unaware of. haveEvents map[string]*gomatrixserverlib.HeaderedEvent - // new events which the roomserver does not know about - newEvents map[string]bool - newEventsMutex sync.RWMutex - work string // metrics + work string // metrics } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -340,19 +339,6 @@ func (e missingPrevEventsError) Error() string { return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err) } -func (t *txnReq) haveEventIDs() map[string]bool { - t.newEventsMutex.RLock() - defer t.newEventsMutex.RUnlock() - result := make(map[string]bool, len(t.haveEvents)) - for eventID := range t.haveEvents { - if t.newEvents[eventID] { - continue - } - result[eventID] = true - } - return result -} - func (t *txnReq) processEDUs(ctx context.Context) { for _, e := range t.EDUs { eduCountTotal.Inc() @@ -527,6 +513,15 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e return roomNotFoundError{e.RoomID()} } + // Prepare a map of all the events we already had before this point, so + // that we don't send them to the roomserver again. + for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) { + t.hadEvents[eventID] = true + } + for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) { + t.hadEvents[eventID] = false + } + if len(stateResp.MissingAuthEventIDs) > 0 { t.work = MetricsWorkMissingAuthEvents logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs)) @@ -596,6 +591,8 @@ withNextEvent: ); err != nil { return fmt.Errorf("api.SendEvents: %w", err) } + t.hadEvents[ev.EventID()] = true // if the roomserver didn't know about the event before, it does now + t.cacheAndReturn(ev.Headered(stateResp.RoomVersion)) delete(missingAuthEvents, missingAuthEventID) continue withNextEvent } @@ -739,7 +736,7 @@ func (t *txnReq) processEventWithMissingState( api.KindOld, resolvedState, backwardsExtremity.Headered(roomVersion), - t.haveEventIDs(), + t.hadEvents, ) if err != nil { return fmt.Errorf("api.SendEventWithState: %w", err) @@ -791,7 +788,7 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix default: return nil, false, fmt.Errorf("t.lookupEvent: %w", err) } - t.cacheAndReturn(h) + h = t.cacheAndReturn(h) if h.StateKey() != nil { addedToState := false for i := range respState.StateEvents { @@ -833,6 +830,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event // set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this // processEvent request, which is better for memory. stateEvents[i] = t.cacheAndReturn(ev) + t.hadEvents[ev.EventID()] = true } // we should never access res.StateEvents again so we delete it here to make GC faster res.StateEvents = nil @@ -863,8 +861,9 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil { return nil } - for i := range queryRes.Events { + for i, ev := range queryRes.Events { authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap()) + t.hadEvents[ev.EventID()] = true } queryRes.Events = nil } @@ -939,8 +938,9 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even return nil, err } latestEvents := make([]string, len(res.LatestEvents)) - for i := range res.LatestEvents { + for i, ev := range res.LatestEvents { latestEvents[i] = res.LatestEvents[i].EventID + t.hadEvents[ev.EventID] = true } var missingResp *gomatrixserverlib.RespMissingEvents @@ -985,6 +985,12 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even // For now, we do not allow Case B, so reject the event. logger.Infof("get_missing_events returned %d events", len(missingResp.Events)) + // Make sure events from the missingResp are using the cache - missing events + // will be added and duplicates will be removed. + for i, ev := range missingResp.Events { + missingResp.Events[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() + } + // topologically sort and sanity check that we are making forward progress newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents) shouldHaveSomeEventIDs := e.PrevEventIDs() @@ -1023,6 +1029,14 @@ func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID if err := state.Check(ctx, t.keys, nil); err != nil { return nil, err } + // Cache the results of this state lookup and deduplicate anything we already + // have in the cache, freeing up memory. + for i, ev := range state.AuthEvents { + state.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() + } + for i, ev := range state.StateEvents { + state.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap() + } return &state, nil } @@ -1055,9 +1069,10 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil { return nil, err } - for i := range queryRes.Events { + for i, ev := range queryRes.Events { + queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i]) + t.hadEvents[ev.EventID()] = true evID := queryRes.Events[i].EventID() - t.cacheAndReturn(queryRes.Events[i]) if missing[evID] { delete(missing, evID) } @@ -1221,9 +1236,5 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib. util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) return nil, verifySigError{event.EventID(), err} } - h := event.Headered(roomVersion) - t.newEventsMutex.Lock() - t.newEvents[h.EventID()] = true - t.newEventsMutex.Unlock() - return h, nil + return t.cacheAndReturn(event.Headered(roomVersion)), nil } diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index b14cbd35a..98ff1a0a3 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -370,7 +370,7 @@ func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederat keys: &test.NopJSONVerifier{}, federation: fedClient, haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), - newEvents: make(map[string]bool), + hadEvents: make(map[string]bool), roomsMu: internal.NewMutexByRoom(), } t.PDUs = pdus From 0e69212206d7abbe5d3e4c65b4ae369a0067bdc9 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 30 Jun 2021 10:39:47 +0100 Subject: [PATCH 07/11] Give up on loops when the context expires (#1891) --- federationapi/routing/send.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 06a38b9c0..032c0c3b4 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "net/http" "sync" @@ -570,6 +571,9 @@ withNextEvent: tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID) if err != nil { logger.WithError(err).Warnf("Failed to retrieve auth event %q", missingAuthEventID) + if errors.Is(err, context.DeadlineExceeded) { + return err + } continue withNextServer } ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion) @@ -958,6 +962,9 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even break } else { logger.WithError(err).Errorf("%s pushed us an event but %q did not respond to /get_missing_events", t.Origin, server) + if errors.Is(err, context.DeadlineExceeded) { + break + } } } @@ -1218,6 +1225,9 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib. txn, err := t.federation.GetEvent(ctx, serverName, missingEventID) if err != nil || len(txn.PDUs) == 0 { util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("Failed to get missing /event for event ID") + if errors.Is(err, context.DeadlineExceeded) { + break + } continue } event, err = gomatrixserverlib.NewEventFromUntrustedJSON(txn.PDUs[0], roomVersion) From b7a2d369c0b4c18208db5f44a29a4829dd7e39f3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 30 Jun 2021 12:05:58 +0100 Subject: [PATCH 08/11] Change how servers are selected for missing auth/prev events (#1892) * Change how servers are selected for missing auth/prev events * Shuffle order * Move ServersInRoomProvider into api package --- .../personalities/federationapi.go | 2 +- federationapi/api/servers.go | 11 ++++ federationapi/federationapi.go | 3 + federationapi/federationapi_test.go | 2 +- federationapi/routing/routing.go | 4 +- federationapi/routing/send.go | 55 ++++++++++--------- setup/monolith.go | 2 +- 7 files changed, 50 insertions(+), 29 deletions(-) create mode 100644 federationapi/api/servers.go diff --git a/cmd/dendrite-polylith-multi/personalities/federationapi.go b/cmd/dendrite-polylith-multi/personalities/federationapi.go index 498be3c43..5ff085282 100644 --- a/cmd/dendrite-polylith-multi/personalities/federationapi.go +++ b/cmd/dendrite-polylith-multi/personalities/federationapi.go @@ -33,7 +33,7 @@ func FederationAPI(base *setup.BaseDendrite, cfg *config.Dendrite) { base.PublicFederationAPIMux, base.PublicKeyAPIMux, &base.Cfg.FederationAPI, userAPI, federation, keyRing, rsAPI, fsAPI, base.EDUServerClient(), keyAPI, - &base.Cfg.MSCs, + &base.Cfg.MSCs, nil, ) base.SetupAndServeHTTP( diff --git a/federationapi/api/servers.go b/federationapi/api/servers.go new file mode 100644 index 000000000..6bb15763d --- /dev/null +++ b/federationapi/api/servers.go @@ -0,0 +1,11 @@ +package api + +import ( + "context" + + "github.com/matrix-org/gomatrixserverlib" +) + +type ServersInRoomProvider interface { + GetServersForRoom(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName +} diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 6188b283e..b3297434a 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -17,6 +17,7 @@ package federationapi import ( "github.com/gorilla/mux" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -39,10 +40,12 @@ func AddPublicRoutes( eduAPI eduserverAPI.EDUServerInputAPI, keyAPI keyserverAPI.KeyInternalAPI, mscCfg *config.MSCs, + servers federationAPI.ServersInRoomProvider, ) { routing.Setup( fedRouter, keyRouter, cfg, rsAPI, eduAPI, federationSenderAPI, keyRing, federation, userAPI, keyAPI, mscCfg, + servers, ) } diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index b97876d3d..505a11dae 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -31,7 +31,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) { fsAPI := base.FederationSenderHTTPClient() // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. - federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs) + federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil) baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true) defer cancel() serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://")) diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 07a28c3fc..8f33c7660 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -20,6 +20,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/jsonerror" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" @@ -50,6 +51,7 @@ func Setup( userAPI userapi.UserInternalAPI, keyAPI keyserverAPI.KeyInternalAPI, mscCfg *config.MSCs, + servers federationAPI.ServersInRoomProvider, ) { v2keysmux := keyMux.PathPrefix("/v2").Subrouter() v1fedmux := fedMux.PathPrefix("/v1").Subrouter() @@ -99,7 +101,7 @@ func Setup( func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { return Send( httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), - cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, + cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers, ) }, )).Methods(http.MethodPut, http.MethodOptions) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 032c0c3b4..96932fac2 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -27,6 +27,7 @@ import ( "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/clientapi/jsonerror" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" @@ -101,6 +102,7 @@ func Send( keys gomatrixserverlib.JSONVerifier, federation *gomatrixserverlib.FederationClient, mu *internal.MutexByRoom, + servers federationAPI.ServersInRoomProvider, ) util.JSONResponse { t := txnReq{ rsAPI: rsAPI, @@ -109,6 +111,7 @@ func Send( federation: federation, hadEvents: make(map[string]bool), haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), + servers: servers, keyAPI: keyAPI, roomsMu: mu, } @@ -160,14 +163,14 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - rsAPI api.RoomserverInternalAPI - eduAPI eduserverAPI.EDUServerInputAPI - keyAPI keyapi.KeyInternalAPI - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient - servers []gomatrixserverlib.ServerName - serversMutex sync.RWMutex - roomsMu *internal.MutexByRoom + rsAPI api.RoomserverInternalAPI + eduAPI eduserverAPI.EDUServerInputAPI + keyAPI keyapi.KeyInternalAPI + keys gomatrixserverlib.JSONVerifier + federation txnFederationClient + roomsMu *internal.MutexByRoom + // something that can tell us about which servers are in a room right now + servers federationAPI.ServersInRoomProvider // a list of events from the auth and prev events which we already had hadEvents map[string]bool // local cache of events for auth checks, etc - this may include events @@ -466,22 +469,24 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli } } -func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName { - t.serversMutex.Lock() - defer t.serversMutex.Unlock() +func (t *txnReq) getServers(ctx context.Context, roomID string, event *gomatrixserverlib.Event) []gomatrixserverlib.ServerName { + // The server that sent us the event should be sufficient to tell us about missing + // prev and auth events. + servers := []gomatrixserverlib.ServerName{t.Origin} + // If the event origin is different to the transaction origin then we can use + // this as a last resort. The origin server that created the event would have + // had to know the auth and prev events. + if event != nil { + if origin := event.Origin(); origin != t.Origin { + servers = append(servers, origin) + } + } + // If a specific room-to-server provider exists then use that. This will primarily + // be used for the P2P demos. if t.servers != nil { - return t.servers + servers = append(servers, t.servers.GetServersForRoom(ctx, roomID, event)...) } - t.servers = []gomatrixserverlib.ServerName{t.Origin} - serverReq := &api.QueryServerJoinedToRoomRequest{ - RoomID: roomID, - } - serverRes := &api.QueryServerJoinedToRoomResponse{} - if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil { - t.servers = append(t.servers, serverRes.ServerNames...) - util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID) - } - return t.servers + return servers } func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error { @@ -566,7 +571,7 @@ func (t *txnReq) retrieveMissingAuthEvents( withNextEvent: for missingAuthEventID := range missingAuthEvents { withNextServer: - for _, server := range t.getServers(ctx, e.RoomID()) { + for _, server := range t.getServers(ctx, e.RoomID(), e) { logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server) tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID) if err != nil { @@ -948,7 +953,7 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even } var missingResp *gomatrixserverlib.RespMissingEvents - servers := t.getServers(ctx, e.RoomID()) + servers := t.getServers(ctx, e.RoomID(), e) for _, server := range servers { var m gomatrixserverlib.RespMissingEvents if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{ @@ -1220,7 +1225,7 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib. } var event *gomatrixserverlib.Event found := false - servers := t.getServers(ctx, roomID) + servers := t.getServers(ctx, roomID, nil) for _, serverName := range servers { txn, err := t.federation.GetEvent(ctx, serverName, missingEventID) if err != nil || len(txn.PDUs) == 0 { diff --git a/setup/monolith.go b/setup/monolith.go index a740ebb7f..235be4474 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss federationapi.AddPublicRoutes( ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI, - m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, + m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil, ) mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client) syncapi.AddPublicRoutes( From 2647f6e9c5887db30104bc303c326f79ab4a0dae Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 30 Jun 2021 12:32:20 +0100 Subject: [PATCH 09/11] Fix concurrent map read/write on haveEvents (#1893) --- federationapi/routing/send.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 96932fac2..a514127ce 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -175,8 +175,9 @@ type txnReq struct { hadEvents map[string]bool // local cache of events for auth checks, etc - this may include events // which the roomserver is unaware of. - haveEvents map[string]*gomatrixserverlib.HeaderedEvent - work string // metrics + haveEvents map[string]*gomatrixserverlib.HeaderedEvent + haveEventsMutex sync.Mutex + work string // metrics } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -817,6 +818,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix } func (t *txnReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *gomatrixserverlib.HeaderedEvent { + t.haveEventsMutex.Lock() + defer t.haveEventsMutex.Unlock() if cached, exists := t.haveEvents[ev.EventID()]; exists { return cached } @@ -847,6 +850,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event var authEvents []*gomatrixserverlib.Event missingAuthEvents := map[string]bool{} for _, ev := range stateEvents { + t.haveEventsMutex.Lock() for _, ae := range ev.AuthEventIDs() { if aev, ok := t.haveEvents[ae]; ok { authEvents = append(authEvents, aev.Unwrap()) @@ -854,6 +858,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event missingAuthEvents[ae] = true } } + t.haveEventsMutex.Unlock() } // QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't // have stored the event. @@ -1064,6 +1069,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...) missing := make(map[string]bool) var missingEventList []string + t.haveEventsMutex.Lock() for _, sid := range wantIDs { if _, ok := t.haveEvents[sid]; !ok { if !missing[sid] { @@ -1072,6 +1078,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even } } } + t.haveEventsMutex.Unlock() // fetch as many as we can from the roomserver queryReq := api.QueryEventsByIDRequest{ @@ -1185,6 +1192,9 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) ( *gomatrixserverlib.RespState, error) { // nolint:unparam + t.haveEventsMutex.Lock() + defer t.haveEventsMutex.Unlock() + // create a RespState response using the response to /state_ids as a guide respState := gomatrixserverlib.RespState{} From 192a7a792320d97fd3da0903d6b09620f0e05b35 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 2 Jul 2021 09:48:55 +0100 Subject: [PATCH 10/11] Roomserver input backpressure metric Squashed commit of the following: commit 56e934ac0aeedcfb2c072010959ba49734d4e0cb Author: Neil Alexander Date: Fri Jul 2 09:39:30 2021 +0100 Fix metric commit 3911f3a0c17b164b012e881c085ceca30f5de408 Author: Neil Alexander Date: Fri Jul 2 09:36:29 2021 +0100 Register correct metric commit a9ddbfaed421538a701151801e9451198a8be4f3 Author: Neil Alexander Date: Fri Jul 2 09:33:33 2021 +0100 Try to capture RS input backpressure metric --- roomserver/internal/input/input.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index b8279a866..6bc43c9cd 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "go.uber.org/atomic" ) @@ -64,6 +65,9 @@ func (w *inputWorker) start() { if !ok { continue } + roomserverInputBackpressure.With(prometheus.Labels{ + "room_id": task.event.Event.RoomID(), + }).Dec() hooks.Run(hooks.KindNewEventReceived, task.event.Event) _, task.err = w.r.processRoomEvent(task.ctx, task.event) if task.err == nil { @@ -120,6 +124,20 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er return errs } +func init() { + prometheus.MustRegister(roomserverInputBackpressure) +} + +var roomserverInputBackpressure = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dendrite", + Subsystem: "roomserver", + Name: "input_backpressure", + Help: "How many events are queued for input for a given room", + }, + []string{"room_id"}, +) + // InputRoomEvents implements api.RoomserverInternalAPI func (r *Inputer) InputRoomEvents( _ context.Context, @@ -164,6 +182,9 @@ func (r *Inputer) InputRoomEvents( go worker.start() } worker.input.push(tasks[i]) + roomserverInputBackpressure.With(prometheus.Labels{ + "room_id": roomID, + }).Inc() } // Wait for all of the workers to return results about our tasks. From 57320897cba655046d47d2cddc7a1381a04d5c66 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 2 Jul 2021 12:33:27 +0100 Subject: [PATCH 11/11] Federation API workers for /send to reduce memory usage (#1897) * Try to process rooms concurrently in FS /send * Clean up * Use request context so that dead things don't linger for so long * Remove mutex * Free up pdus slice so only references remaining are in channel * Revert "Remove mutex" This reverts commit 8558075e8c9bab3c1d8b2252b4ab40c7eaf774e8. * Process EDUs in parallel * Try refactoring /send concurrency * Fix waitgroup * Release on waitgroup * Respond to transaction * Reduce CPU usage, fix unit tests * Tweaks * Move into one file --- federationapi/routing/send.go | 221 ++++++++++++++++++++++------------ 1 file changed, 147 insertions(+), 74 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index a514127ce..ae9a63fc2 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -16,7 +16,6 @@ package routing import ( "context" - "database/sql" "encoding/json" "errors" "fmt" @@ -24,7 +23,6 @@ import ( "sync" "time" - "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/clientapi/jsonerror" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" @@ -36,6 +34,7 @@ import ( "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) const ( @@ -90,6 +89,67 @@ func init() { ) } +type sendFIFOQueue struct { + tasks []*inputTask + count int + mutex sync.Mutex + notifs chan struct{} +} + +func newSendFIFOQueue() *sendFIFOQueue { + q := &sendFIFOQueue{ + notifs: make(chan struct{}, 1), + } + return q +} + +func (q *sendFIFOQueue) push(frame *inputTask) { + q.mutex.Lock() + defer q.mutex.Unlock() + q.tasks = append(q.tasks, frame) + q.count++ + select { + case q.notifs <- struct{}{}: + default: + } +} + +// pop returns the first item of the queue, if there is one. +// The second return value will indicate if a task was returned. +func (q *sendFIFOQueue) pop() (*inputTask, bool) { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.count == 0 { + return nil, false + } + frame := q.tasks[0] + q.tasks[0] = nil + q.tasks = q.tasks[1:] + q.count-- + if q.count == 0 { + // Force a GC of the underlying array, since it might have + // grown significantly if the queue was hammered for some reason + q.tasks = nil + } + return frame, true +} + +type inputTask struct { + ctx context.Context + t *txnReq + event *gomatrixserverlib.Event + wg *sync.WaitGroup + err error // written back by worker, only safe to read when all tasks are done + duration time.Duration // written back by worker, only safe to read when all tasks are done +} + +type inputWorker struct { + running atomic.Bool + input *sendFIFOQueue +} + +var inputWorkers sync.Map // room ID -> *inputWorker + // Send implements /_matrix/federation/v1/send/{txnID} func Send( httpReq *http.Request, @@ -193,8 +253,12 @@ type txnFederationClient interface { func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { results := make(map[string]gomatrixserverlib.PDUResult) + //var resultsMutex sync.Mutex + + var wg sync.WaitGroup + var tasks []*inputTask + wg.Add(1) // for processEDUs - pdus := []*gomatrixserverlib.HeaderedEvent{} for _, pdu := range t.PDUs { pduCountTotal.WithLabelValues("total").Inc() var header struct { @@ -245,83 +309,97 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res } continue } - pdus = append(pdus, event.Headered(verRes.RoomVersion)) + v, _ := inputWorkers.LoadOrStore(event.RoomID(), &inputWorker{ + input: newSendFIFOQueue(), + }) + worker := v.(*inputWorker) + if !worker.running.Load() { + go worker.run() + } + wg.Add(1) + task := &inputTask{ + ctx: ctx, + t: t, + event: event, + wg: &wg, + } + tasks = append(tasks, task) + worker.input.push(task) } - // Process the events. - for _, e := range pdus { - evStart := time.Now() - if err := t.processEvent(ctx, e.Unwrap()); err != nil { - // If the error is due to the event itself being bad then we skip - // it and move onto the next event. We report an error so that the - // sender knows that we have skipped processing it. - // - // However if the event is due to a temporary failure in our server - // such as a database being unavailable then we should bail, and - // hope that the sender will retry when we are feeling better. - // - // It is uncertain what we should do if an event fails because - // we failed to fetch more information from the sending server. - // For example if a request to /state fails. - // If we skip the event then we risk missing the event until we - // receive another event referencing it. - // If we bail and stop processing then we risk wedging incoming - // transactions from that server forever. - if isProcessingErrorFatal(err) { - sentry.CaptureException(err) - // Any other error should be the result of a temporary error in - // our server so we should bail processing the transaction entirely. - util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) - jsonErr := util.ErrorResponse(err) - processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - return nil, &jsonErr - } else { - // Auth errors mean the event is 'rejected' which have to be silent to appease sytest - errMsg := "" - outcome := MetricsOutcomeRejected - _, rejected := err.(*gomatrixserverlib.NotAllowed) - if !rejected { - errMsg = err.Error() - outcome = MetricsOutcomeFail - } - util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( - "Failed to process incoming federation event, skipping", - ) - processEventSummary.WithLabelValues(t.work, outcome).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - results[e.EventID()] = gomatrixserverlib.PDUResult{ - Error: errMsg, - } + go func() { + defer wg.Done() + t.processEDUs(ctx) + }() + + wg.Wait() + + for _, task := range tasks { + if task.err != nil { + results[task.event.EventID()] = gomatrixserverlib.PDUResult{ + Error: task.err.Error(), } } else { - results[e.EventID()] = gomatrixserverlib.PDUResult{} - pduCountTotal.WithLabelValues("success").Inc() - processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) + results[task.event.EventID()] = gomatrixserverlib.PDUResult{} } } - t.processEDUs(ctx) if c := len(results); c > 0 { util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID) } return &gomatrixserverlib.RespSend{PDUs: results}, nil } -// isProcessingErrorFatal returns true if the error is really bad and -// we should stop processing the transaction, and returns false if it -// is just some less serious error about a specific event. -func isProcessingErrorFatal(err error) bool { - switch err { - case sql.ErrConnDone: - case sql.ErrTxDone: - return true +func (t *inputWorker) run() { + if !t.running.CAS(false, true) { + return + } + defer t.running.Store(false) + for { + task, ok := t.input.pop() + if !ok { + return + } + if task == nil { + continue + } + func() { + defer task.wg.Done() + select { + case <-task.ctx.Done(): + task.err = context.DeadlineExceeded + return + default: + evStart := time.Now() + task.err = task.t.processEvent(task.ctx, task.event) + task.duration = time.Since(evStart) + if err := task.err; err != nil { + switch err.(type) { + case *gomatrixserverlib.NotAllowed: + processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeRejected).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", true).Warn( + "Failed to process incoming federation event, skipping", + ) + task.err = nil // make "rejected" failures silent + default: + processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeFail).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", false).Warn( + "Failed to process incoming federation event, skipping", + ) + } + } else { + pduCountTotal.WithLabelValues("success").Inc() + processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeOK).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + } + } + }() } - return false } type roomNotFoundError struct { @@ -633,11 +711,6 @@ func (t *txnReq) processEventWithMissingState( processEventWithMissingStateMutexes.Lock(e.RoomID()) defer processEventWithMissingStateMutexes.Unlock(e.RoomID()) - // Do this with a fresh context, so that we keep working even if the - // original request times out. With any luck, by the time the remote - // side retries, we'll have fetched the missing state. - gmectx, cancel := context.WithTimeout(context.Background(), time.Minute*5) - defer cancel() // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the // room. There two ways that we can handle such a gap: @@ -658,7 +731,7 @@ func (t *txnReq) processEventWithMissingState( // - fill in the gap completely then process event `e` returning no backwards extremity // - fail to fill in the gap and tell us to terminate the transaction err=not nil // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction - newEvents, err := t.getMissingEvents(gmectx, e, roomVersion) + newEvents, err := t.getMissingEvents(ctx, e, roomVersion) if err != nil { return err } @@ -685,7 +758,7 @@ func (t *txnReq) processEventWithMissingState( // Look up what the state is after the backward extremity. This will either // come from the roomserver, if we know all the required events, or it will // come from a remote server via /state_ids if not. - prevState, trustworthy, lerr := t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID) + prevState, trustworthy, lerr := t.lookupStateAfterEvent(ctx, roomVersion, backwardsExtremity.RoomID(), prevEventID) if lerr != nil { util.GetLogger(ctx).WithError(lerr).Errorf("Failed to lookup state after prev_event: %s", prevEventID) return lerr @@ -729,7 +802,7 @@ func (t *txnReq) processEventWithMissingState( } // There's more than one previous state - run them all through state res t.roomsMu.Lock(e.RoomID()) - resolvedState, err = t.resolveStatesAndCheck(gmectx, roomVersion, respStates, backwardsExtremity) + resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, backwardsExtremity) t.roomsMu.Unlock(e.RoomID()) if err != nil { util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())