From 01403f9d6210636be2c3dc8253d0409aa8f8a114 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 23 Mar 2022 09:41:58 +0000 Subject: [PATCH] Review comments --- roomserver/internal/input/input.go | 36 +++++++++++++++++++++++++++++- setup/jetstream/nats.go | 6 +++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 988e26614..f57bd3109 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -46,6 +46,33 @@ var keyContentFields = map[string]string{ "m.room.member": "membership", } +// Inputer is responsible for consuming from the roomserver input +// streams and processing the events. All input events are queued +// into a single NATS stream and the order is preserved strictly. +// The `room_id` message header will contain the room ID which will +// be used to assign the pending event to a per-room worker. +// +// The input API maintains an ephemeral headers-only consumer. It +// will speed through the stream working out which room IDs are +// pending and create durable consumers for them. The durable +// consumer will then be used for each room worker goroutine to +// fetch events one by one and process them. Each room having a +// durable consumer of its own means there is no head-of-line +// blocking between rooms. Filtering ensures that each durable +// consumer only receives events for the room it is interested in. +// +// The ephemeral consumer closely tracks the newest events. The +// per-room durable consumers will only progress through the stream +// as events are processed. +// +// A BC * -> positions of each consumer (* = ephemeral) +// ⌄ ⌄⌄ ⌄ +// ABAABCAABCAA -> newest (letter = subject for each message) +// +// In this example, A is still processing an event but has two +// pending events to process afterwards. Both B and C are caught +// up, so they will do nothing until a new event comes in for B +// or C. type Inputer struct { Cfg *config.RoomServer ProcessContext *process.ProcessContext @@ -271,6 +298,13 @@ func (r *Inputer) InputRoomEvents( response.ErrMsg = err.Error() return } + if replySub == nil { + // This shouldn't ever happen, but it doesn't hurt to check + // because we can potentially avoid a nil pointer panic later + // if it did for some reason. + response.ErrMsg = "no subscription to temporary inbox" + return + } } // For each event, marshal the input room event and then @@ -305,7 +339,7 @@ func (r *Inputer) InputRoomEvents( // If we aren't waiting for synchronous responses then we can // give up here, there is nothing further to do. - if request.Asynchronous || replySub == nil { + if request.Asynchronous { return } diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 9e4a61ae1..748c191b0 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -83,6 +83,12 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream } subjects := stream.Subjects if len(subjects) == 0 { + // By default we want each stream to listen for the subjects + // that are either an exact match for the stream name, or where + // the first part of the subject is the stream name. ">" is a + // wildcard in NATS for one or more subject tokens. In the case + // that the stream is called "Foo", this will match any message + // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. subjects = []string{name, name + ".>"} } if info != nil {