diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index 076e18d90..0187dd522 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -84,6 +84,19 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
 		consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
 		subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
 
+		// Create the consumer. We do this as a specific step rather than
+		// letting PullSubscribe create it for us because we need the consumer
+		// to outlive the subscription. If we do it this way, we can Bind in the
+		// next step, and when we Unsubscribe, the consumer continues to live. If
+		// we leave PullSubscribe to create the durable consumer, Unsubscribe will
+		// delete it because it thinks it "owns" it, which in turn breaks the
+		// interest-based retention storage policy.
+		// If the durable consumer already exists, this is effectively a no-op.
+		// Another interesting tidbit here: the ACK policy is set to "all" so that
+		// if we acknowledge a message, we also acknowledge everything that comes
+		// before it. This is necessary because otherwise our consumer will never
+		// acknowledge messages that were filtered out for other subjects, and
+		// they would linger around forever.
 		if _, err := w.r.JetStream.AddConsumer(
 			r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
 			&nats.ConsumerConfig{
@@ -98,6 +111,9 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
 			return
 		}
 
+		// Bind to our durable consumer. We want to receive all messages waiting
+		// for this subject and we want to manually acknowledge them, so that we
+		// can ensure they are only cleaned up when we are done processing them.
 		sub, err := w.r.JetStream.PullSubscribe(
 			subject, consumer,
 			nats.ManualAck(),
@@ -110,20 +126,26 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
 			return
 		}
 
-		logrus.Infof("Stream for room %q active", w.roomID)
+		// Go and start pulling messages off the queue.
 		w.subscription = sub
-		w.Act(nil, w.next)
+		w.Act(nil, w._next)
 	}
 }
 
-// onMessage is called when a new event arrives in the roomserver input stream.
+// Start creates an ephemeral, non-durable consumer on the roomserver
+// input topic. It is configured to deliver us headers only, because we
+// don't actually care about the contents of the message at this point;
+// we only care about the `room_id` field. Once a message arrives, we
+// will look to see if we have a worker for that room which has its
+// own consumer. If we don't, we'll start one.
 func (r *Inputer) Start() error {
 	_, err := r.JetStream.Subscribe(
-		"", // don't supply a subject because we're using BindStream
+		"", // This is blank because we specified it in BindStream.
 		func(m *nats.Msg) {
 			roomID := m.Header.Get(jetstream.RoomID)
 			r.startWorkerForRoom(roomID)
 		},
+		nats.HeadersOnly(),
 		nats.DeliverAll(),
 		nats.AckNone(),
 		nats.BindStream(r.InputRoomEventTopic),
@@ -131,18 +153,30 @@ func (r *Inputer) Start() error {
 	return err
 }
 
-func (w *worker) next() {
-	ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Second*30)
+// _next is called by the worker for the room. It must only be called
+// by the actor embedded into the worker.
+func (w *worker) _next() {
+	// Look up what the next event is that's waiting to be processed.
+	ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute)
 	defer cancel()
 	msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
 	switch err {
 	case nil:
+		// Make sure that once we're done here, we queue up another call
+		// to _next in the inbox.
+		defer w.Act(nil, w._next)
+
+		// If no error was reported but we didn't get exactly one message,
+		// then skip over this and try again on the next iteration.
 		if len(msgs) != 1 {
 			return
 		}
-		defer w.Act(nil, w.next)
+
 	case context.DeadlineExceeded:
-		logrus.Infof("Stream for room %q inactive", w.roomID)
+		// The context deadline was exceeded, so we've been waiting for
+		// more than a minute for activity in this room. At this point we
+		// will shut down the subscriber to free up resources. It'll get
+		// started again if new activity happens.
 		if err = w.subscription.Unsubscribe(); err != nil {
 			logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
 		}
@@ -150,10 +184,12 @@
 		w.subscription = nil
 		w.Unlock()
 		return
-	case nats.ErrTimeout:
-		w.Act(nil, w.next)
-		return
+
 	default:
+		// Something went wrong while trying to fetch the next event
+		// from the queue, in which case we'll shut down the subscriber
+		// and wait to be notified about new room activity again. Maybe
+		// the problem will be corrected by then.
 		logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID)
 		if err = w.subscription.Unsubscribe(); err != nil {
 			logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
 		}
@@ -164,6 +200,9 @@
 		return
 	}
 
+	// Try to unmarshal the input room event. If the JSON unmarshalling
+	// fails then we'll terminate the message, which tells NATS that we
+	// are done with the message and never want to see it again.
 	msg := msgs[0]
 	var inputRoomEvent api.InputRoomEvent
 	if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
@@ -174,6 +213,10 @@
 	roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
 	defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
 
+	// Process the room event. If something goes wrong then we'll tell
+	// NATS to terminate the message. We'll store the error result as
+	// a string, because we might want to return that to the caller if
+	// it was a synchronous request.
 	var errString string
 	if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil {
 		if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
@@ -189,6 +232,13 @@
 	} else {
 		_ = msg.Ack()
 	}
+
+	// If it was a synchronous input request then the "sync" field
+	// will be present in the message. That means that someone is
+	// waiting for a response. The temporary inbox name is present in
+	// that field, so send back the error string (if any). If there
+	// was no error then we'll return a blank message, which means
+	// that everything was OK.
 	if replyTo := msg.Header.Get("sync"); replyTo != "" {
 		if err = w.r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
 			logrus.WithError(err).WithFields(logrus.Fields{
@@ -206,6 +256,10 @@ func (r *Inputer) InputRoomEvents(
 	request *api.InputRoomEventsRequest,
 	response *api.InputRoomEventsResponse,
 ) {
+	// If the request is synchronous then we need to create a
+	// temporary inbox to wait for responses on, and then create
+	// a subscription to it. If it's asynchronous then we won't
+	// bother, so these values will remain empty.
 	var replyTo string
 	var replySub *nats.Subscription
 	if !request.Asynchronous {
@@ -218,6 +272,8 @@ func (r *Inputer) InputRoomEvents(
 		}
 	}
 
+	// For each event, marshal the input room event and then
+	// send it into the input queue.
 	var err error
 	for _, e := range request.InputRoomEvents {
 		roomID := e.Event.RoomID()
@@ -246,10 +302,16 @@
 		}
 	}
 
+	// If we aren't waiting for synchronous responses then we can
+	// stop here, as there is nothing further to do.
 	if request.Asynchronous || replySub == nil {
 		return
 	}
 
+	// Otherwise, we'll want to sit and wait for the responses
+	// from the roomserver. There will be one response for every
+	// input we submitted. The last error value we receive will
+	// be the one returned as the error string.
 	defer replySub.Drain() // nolint:errcheck
 	for i := 0; i < len(request.InputRoomEvents); i++ {
 		msg, err := replySub.NextMsgWithContext(ctx)
@@ -259,7 +321,6 @@
 		}
 		if len(msg.Data) > 0 {
 			response.ErrMsg = string(msg.Data)
-			return
 		}
 	}
 }
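
To make the consumer lifecycle above easier to follow outside the patch, here is a minimal sketch of the same pattern against the nats.go client. This is not Dendrite's code: the stream name, per-room subject, durable name and room_id header key are placeholders standing in for the values Dendrite derives from its jetstream package, and error handling is trimmed. The imports cover all three parts of the sketch.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

// Placeholder names; the real ones come from Dendrite's jetstream package.
const (
	stream  = "InputRoomEvent"
	roomHdr = "room_id"
)

// startDispatcher mirrors Start above: an ephemeral, headers-only,
// never-acknowledging subscription across the whole stream, whose only
// job is to notice activity in a room and start a worker for it.
func startDispatcher(js nats.JetStreamContext, startWorker func(roomID string)) error {
	_, err := js.Subscribe(
		"", // blank because the subject comes from BindStream
		func(m *nats.Msg) {
			startWorker(m.Header.Get(roomHdr))
		},
		nats.HeadersOnly(),
		nats.DeliverAll(),
		nats.AckNone(),
		nats.BindStream(stream),
	)
	return err
}

// subscribeRoom mirrors startWorkerForRoom above: create the durable
// consumer explicitly first, then Bind a pull subscription to it. A
// later Unsubscribe then destroys only the subscription, while the
// durable consumer and its delivery state live on.
func subscribeRoom(js nats.JetStreamContext, roomID string) (*nats.Subscription, error) {
	subject := stream + "." + roomID // the real code tokenises the room ID first
	durable := "RoomInput" + roomID

	if _, err := js.AddConsumer(stream, &nats.ConsumerConfig{
		Durable:       durable,
		AckPolicy:     nats.AckAllPolicy, // acking a message acks everything before it
		DeliverPolicy: nats.DeliverAllPolicy,
		FilterSubject: subject,
	}); err != nil {
		return nil, fmt.Errorf("AddConsumer: %w", err)
	}

	return js.PullSubscribe(subject, durable,
		nats.ManualAck(),
		nats.Bind(stream, durable),
	)
}

Creating the consumer with AddConsumer and attaching with Bind is what allows the subscription to be torn down when a room goes idle without deleting the durable consumer, whose delivery state the interest-based retention policy depends on.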
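Continuing the same sketch file, the worker's fetch cycle has roughly this shape, with the actor machinery removed and a hypothetical process callback standing in for processRoomEvent:

// workerLoop is a sketch of the _next cycle above: fetch one message
// at a time, terminate messages that can never be processed,
// acknowledge the ones that were, and post the outcome to the reply
// inbox if the producer asked for one.
func workerLoop(nc *nats.Conn, sub *nats.Subscription, process func([]byte) error) {
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		msgs, err := sub.Fetch(1, nats.Context(ctx))
		cancel()
		if err != nil {
			// On context.DeadlineExceeded the real code unsubscribes and
			// lets the room go idle until the dispatcher restarts it; a
			// sketch can simply stop.
			return
		}
		if len(msgs) != 1 {
			continue
		}
		msg := msgs[0]

		var errString string
		if err := process(msg.Data); err != nil {
			errString = err.Error()
			_ = msg.Term() // tell NATS to never redeliver this message
		} else {
			_ = msg.Ack() // with AckAll, this acks everything before it too
		}

		// An empty reply body signals success to the waiting producer.
		if replyTo := msg.Header.Get("sync"); replyTo != "" {
			_ = nc.Publish(replyTo, []byte(errString))
		}
	}
}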
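Finally, the producer side of the synchronous path mirrors InputRoomEvents: publish each event with the temporary inbox in the "sync" header (the header key the patch actually uses), then collect exactly one reply per event, keeping the last non-empty body as the error message. The function shape is otherwise illustrative:

// submitSync follows the synchronous InputRoomEvents path above.
func submitSync(ctx context.Context, nc *nats.Conn, js nats.JetStreamContext, subject string, events [][]byte) (string, error) {
	replyTo := nats.NewInbox()
	replySub, err := nc.SubscribeSync(replyTo)
	if err != nil {
		return "", err
	}
	defer replySub.Drain() // nolint:errcheck

	for _, data := range events {
		msg := nats.NewMsg(subject)
		msg.Header.Set("sync", replyTo)
		msg.Data = data
		if _, err := js.PublishMsg(msg); err != nil {
			return "", err
		}
	}

	// One reply per input; the last non-empty body wins.
	var errMsg string
	for range events {
		reply, err := replySub.NextMsgWithContext(ctx)
		if err != nil {
			return errMsg, err
		}
		if len(reply.Data) > 0 {
			errMsg = string(reply.Data)
		}
	}
	return errMsg, nil
}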