Review comments

This commit is contained in:
Neil Alexander 2022-03-23 09:41:58 +00:00
parent 1f10bce999
commit 01403f9d62
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 41 additions and 1 deletions

View file

@ -46,6 +46,33 @@ var keyContentFields = map[string]string{
"m.room.member": "membership", "m.room.member": "membership",
} }
// Inputer is responsible for consuming from the roomserver input
// streams and processing the events. All input events are queued
// into a single NATS stream and the order is preserved strictly.
// The `room_id` message header will contain the room ID which will
// be used to assign the pending event to a per-room worker.
//
// The input API maintains an ephemeral headers-only consumer. It
// will speed through the stream working out which room IDs are
// pending and create durable consumers for them. The durable
// consumer will then be used for each room worker goroutine to
// fetch events one by one and process them. Each room having a
// durable consumer of its own means there is no head-of-line
// blocking between rooms. Filtering ensures that each durable
// consumer only receives events for the room it is interested in.
//
// The ephemeral consumer closely tracks the newest events. The
// per-room durable consumers will only progress through the stream
// as events are processed.
//
// A BC * -> positions of each consumer (* = ephemeral)
// ⌄ ⌄⌄ ⌄
// ABAABCAABCAA -> newest (letter = subject for each message)
//
// In this example, A is still processing an event but has two
// pending events to process afterwards. Both B and C are caught
// up, so they will do nothing until a new event comes in for B
// or C.
type Inputer struct { type Inputer struct {
Cfg *config.RoomServer Cfg *config.RoomServer
ProcessContext *process.ProcessContext ProcessContext *process.ProcessContext
@ -271,6 +298,13 @@ func (r *Inputer) InputRoomEvents(
response.ErrMsg = err.Error() response.ErrMsg = err.Error()
return return
} }
if replySub == nil {
// This shouldn't ever happen, but it doesn't hurt to check
// because we can potentially avoid a nil pointer panic later
// if it did for some reason.
response.ErrMsg = "no subscription to temporary inbox"
return
}
} }
// For each event, marshal the input room event and then // For each event, marshal the input room event and then
@ -305,7 +339,7 @@ func (r *Inputer) InputRoomEvents(
// If we aren't waiting for synchronous responses then we can // If we aren't waiting for synchronous responses then we can
// give up here, there is nothing further to do. // give up here, there is nothing further to do.
if request.Asynchronous || replySub == nil { if request.Asynchronous {
return return
} }

View file

@ -83,6 +83,12 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
} }
subjects := stream.Subjects subjects := stream.Subjects
if len(subjects) == 0 { if len(subjects) == 0 {
// By default we want each stream to listen for the subjects
// that are either an exact match for the stream name, or where
// the first part of the subject is the stream name. ">" is a
// wildcard in NATS for one or more subject tokens. In the case
// that the stream is called "Foo", this will match any message
// with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc.
subjects = []string{name, name + ".>"} subjects = []string{name, name + ".>"}
} }
if info != nil { if info != nil {