Neil Alexander 2022-03-22 16:25:38 +00:00
parent 8ab5ad2fb5
commit 08b992b0bc

@@ -84,6 +84,19 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
     consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
     subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
+    // Create the consumer. We do this as a specific step rather than
+    // letting PullSubscribe create it for us because we need the consumer
+    // to outlive the subscription. If we do it this way, we can Bind in the
+    // next step, and when we Unsubscribe, the consumer continues to live. If
+    // we leave PullSubscribe to create the durable consumer, Unsubscribe will
+    // delete it because it thinks it "owns" it, which in turn breaks the
+    // interest-based retention storage policy.
+    // If the durable consumer already exists, this is effectively a no-op.
+    // Another interesting tidbit here: the ACK policy is set to "all" so that
+    // if we acknowledge a message, we also acknowledge everything that comes
+    // before it. This is necessary because otherwise our consumer will never
+    // acknowledge things we filtered out for other subjects and therefore they
+    // will linger around forever.
     if _, err := w.r.JetStream.AddConsumer(
         r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
         &nats.ConsumerConfig{
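As a standalone illustration of the create-then-own pattern described in the new comment, a minimal sketch using github.com/nats-io/nats.go follows. The stream, subject and durable names are invented stand-ins for the prefixed names built above:

import "github.com/nats-io/nats.go"

// Sketch: pre-create the durable consumer so that we, rather than the
// subscription, own its lifecycle. All names here are assumptions.
func ensureRoomConsumer(js nats.JetStreamContext) error {
    _, err := js.AddConsumer("INPUT_ROOM_EVENT", &nats.ConsumerConfig{
        Durable:       "RoomInputExampleRoom",
        AckPolicy:     nats.AckAllPolicy, // acking message N also acks everything before it
        FilterSubject: "INPUT_ROOM_EVENT.exampleroom",
    })
    // Calling AddConsumer again with an identical config succeeds
    // without changing anything, which is why repeat calls are a no-op.
    return err
}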
@@ -98,6 +111,9 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
         return
     }
+    // Bind to our durable consumer. We want to receive all messages waiting
+    // for this subject and we want to manually acknowledge them, so that we
+    // can ensure they are only cleaned up when we are done processing them.
     sub, err := w.r.JetStream.PullSubscribe(
         subject, consumer,
         nats.ManualAck(),
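Because the durable consumer now already exists, PullSubscribe attaches to it rather than creating it, so a later Unsubscribe will not delete it. A matching sketch, with the same invented names as above:

import "github.com/nats-io/nats.go"

// Sketch: bind a pull subscription to the pre-created durable.
// ManualAck means nothing is acknowledged until we say so.
func bindRoomConsumer(js nats.JetStreamContext) (*nats.Subscription, error) {
    return js.PullSubscribe(
        "INPUT_ROOM_EVENT.exampleroom", // subject
        "RoomInputExampleRoom",         // existing durable consumer
        nats.ManualAck(),
    )
}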
@@ -110,20 +126,26 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
         return
     }
-    logrus.Infof("Stream for room %q active", w.roomID)
+    // Go and start pulling messages off the queue.
     w.subscription = sub
-    w.Act(nil, w.next)
+    w.Act(nil, w._next)
     }
 }
 
-// onMessage is called when a new event arrives in the roomserver input stream.
+// Start creates an ephemeral non-durable consumer on the roomserver
+// input topic. It is configured to deliver us headers only because we
+// don't actually care about the contents of the message at this point,
+// we only care about the `room_id` field. Once a message arrives, we
+// will look to see if we have a worker for that room which has its
+// own consumer. If we don't, we'll start one.
 func (r *Inputer) Start() error {
     _, err := r.JetStream.Subscribe(
-        "", // don't supply a subject because we're using BindStream
+        "", // This is blank because we specified it in BindStream.
         func(m *nats.Msg) {
             roomID := m.Header.Get(jetstream.RoomID)
             r.startWorkerForRoom(roomID)
         },
+        nats.HeadersOnly(),
         nats.DeliverAll(),
         nats.AckNone(),
         nats.BindStream(r.InputRoomEventTopic),
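A headers-only consumer can only route on what the publisher puts into headers, so the room ID has to travel as a header and not just inside the JSON body. A hedged sketch of that producer side; the subject layout and the room_id header name are assumptions standing in for Dendrite's jetstream constants:

import "github.com/nats-io/nats.go"

// Sketch: publish an input event so that a headers-only consumer,
// which never sees the payload, can still route it by room.
func publishInputEvent(js nats.JetStreamContext, roomID string, payload []byte) error {
    msg := nats.NewMsg("INPUT_ROOM_EVENT." + roomID)
    msg.Header.Set("room_id", roomID) // what Start()'s handler reads
    msg.Data = payload
    _, err := js.PublishMsg(msg)
    return err
}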
@@ -131,18 +153,30 @@ func (r *Inputer) Start() error {
     return err
 }
 
-func (w *worker) next() {
-    ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Second*30)
+// _next is called by the worker for the room. It must only be called
+// by the actor embedded into the worker.
+func (w *worker) _next() {
+    // Look up what the next event is that's waiting to be processed.
+    ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute)
     defer cancel()
     msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
     switch err {
     case nil:
+        // Make sure that once we're done here, we queue up another call
+        // to _next in the inbox.
+        defer w.Act(nil, w._next)
+        // If no error was reported, but we didn't get exactly one message,
+        // then skip over this and try again on the next iteration.
         if len(msgs) != 1 {
             return
         }
-        defer w.Act(nil, w.next)
     case context.DeadlineExceeded:
-        logrus.Infof("Stream for room %q inactive", w.roomID)
+        // The context deadline was exceeded, so we've been waiting for
+        // more than a minute for activity in this room. At this point we
+        // will shut down the subscriber to free up resources. It'll get
+        // started again if new activity happens.
         if err = w.subscription.Unsubscribe(); err != nil {
             logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
         }
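The Act/_next pairing forms a self-perpetuating loop: each successful fetch queues the next call on the worker's actor inbox, so at most one message per room is ever being processed. A condensed sketch of that lifecycle, assuming the github.com/Arceliar/phony actor library, which matches the Act signature used here:

import (
    "context"
    "time"

    "github.com/Arceliar/phony"
    "github.com/nats-io/nats.go"
)

type worker struct {
    phony.Inbox // provides Act, serialising all calls to _next
    subscription *nats.Subscription
}

func (w *worker) _next() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()
    msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
    switch err {
    case nil:
        // Queue the next iteration before processing this one.
        defer w.Act(nil, w._next)
        if len(msgs) != 1 {
            return
        }
        // ... process msgs[0], then Ack or Term ...
    case context.DeadlineExceeded:
        // A quiet minute: drop the subscription to free resources.
        // The headers-only consumer in Start() will restart the
        // worker when the room sees activity again.
        _ = w.subscription.Unsubscribe()
    default:
        _ = w.subscription.Unsubscribe()
    }
}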
@@ -150,10 +184,12 @@ func (w *worker) next() {
         w.subscription = nil
         w.Unlock()
         return
-    case nats.ErrTimeout:
-        w.Act(nil, w.next)
-        return
     default:
+        // Something went wrong while trying to fetch the next event
+        // from the queue. In which case, we'll shut down the subscriber
+        // and wait to be notified about new room activity again. Maybe
+        // the problem will be corrected by then.
         logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID)
         if err = w.subscription.Unsubscribe(); err != nil {
             logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
@@ -164,6 +200,9 @@ func (w *worker) next() {
         return
     }
+    // Try to unmarshal the input room event. If the JSON unmarshalling
+    // fails then we'll terminate the message: this notifies NATS that
+    // we are done with the message and never want to see it again.
     msg := msgs[0]
     var inputRoomEvent api.InputRoomEvent
     if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
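The terminate step mentioned in the comment corresponds to msg.Term() in nats.go, which, unlike Ack or Nak, tells the server never to redeliver the message. A small sketch of that poison-message handling; the struct and field are illustrative only:

import (
    "encoding/json"

    "github.com/nats-io/nats.go"
)

// Sketch: permanently reject a message that can never be parsed.
func handleRaw(msg *nats.Msg) {
    var event struct {
        RoomID string `json:"room_id"` // illustrative field only
    }
    if err := json.Unmarshal(msg.Data, &event); err != nil {
        _ = msg.Term() // done with it forever: no redelivery
        return
    }
    // ... process the event, then _ = msg.Ack() ...
}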
@@ -174,6 +213,10 @@ func (w *worker) next() {
     roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
     defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
+    // Process the room event. If something goes wrong then we'll tell
+    // NATS to terminate the message. We'll store the error result as
+    // a string, because we might want to return that to the caller if
+    // it was a synchronous request.
     var errString string
     if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil {
         if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
@@ -189,6 +232,13 @@ func (w *worker) next() {
     } else {
         _ = msg.Ack()
     }
+    // If it was a synchronous input request then the "sync" field
+    // will be present in the message. That means that someone is
+    // waiting for a response. The temporary inbox name is present in
+    // that field, so send back the error string (if any). If there
+    // was no error then we'll return a blank message, which means
+    // that everything was OK.
     if replyTo := msg.Header.Get("sync"); replyTo != "" {
         if err = w.r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
             logrus.WithError(err).WithFields(logrus.Fields{
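The reply convention is deliberately simple: an empty payload on the temporary inbox means success, and any bytes are the error string. Sketched from the worker's side; the "sync" header name comes from the diff, while the helper itself is illustrative:

import "github.com/nats-io/nats.go"

// Sketch: answer a synchronous caller, if one is waiting.
func replyIfSynchronous(nc *nats.Conn, msg *nats.Msg, errString string) {
    if replyTo := msg.Header.Get("sync"); replyTo != "" {
        // An empty errString signals success to the caller.
        _ = nc.Publish(replyTo, []byte(errString))
    }
}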
@@ -206,6 +256,10 @@ func (r *Inputer) InputRoomEvents(
     request *api.InputRoomEventsRequest,
     response *api.InputRoomEventsResponse,
 ) {
+    // If the request is synchronous then we need to create a
+    // temporary inbox to wait for responses on, and then create
+    // a subscription to it. If it's asynchronous then we won't
+    // bother, so these values will remain empty.
     var replyTo string
     var replySub *nats.Subscription
     if !request.Asynchronous {
@@ -218,6 +272,8 @@ func (r *Inputer) InputRoomEvents(
         }
     }
+    // For each event, marshal the input room event and then
+    // send it into the input queue.
     var err error
     for _, e := range request.InputRoomEvents {
         roomID := e.Event.RoomID()
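On the requester side, the temporary-inbox round trip sketched end to end might look as follows; nats.NewInbox generates a unique throwaway subject, and the names reuse the assumptions from the earlier sketches:

import (
    "context"
    "errors"

    "github.com/nats-io/nats.go"
)

// Sketch: publish one event and wait for the worker's verdict.
func submitSync(ctx context.Context, nc *nats.Conn, js nats.JetStreamContext, roomID string, payload []byte) error {
    replyTo := nats.NewInbox()
    replySub, err := nc.SubscribeSync(replyTo)
    if err != nil {
        return err
    }
    defer replySub.Drain() // nolint:errcheck

    msg := nats.NewMsg("INPUT_ROOM_EVENT." + roomID)
    msg.Header.Set("room_id", roomID)
    msg.Header.Set("sync", replyTo) // tell the worker where to reply
    msg.Data = payload
    if _, err := js.PublishMsg(msg); err != nil {
        return err
    }

    reply, err := replySub.NextMsgWithContext(ctx)
    if err != nil {
        return err
    }
    if len(reply.Data) > 0 {
        return errors.New(string(reply.Data))
    }
    return nil
}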
@@ -246,10 +302,16 @@ func (r *Inputer) InputRoomEvents(
         }
     }
+    // If we aren't waiting for synchronous responses then we can
+    // give up here, there is nothing further to do.
     if request.Asynchronous || replySub == nil {
         return
     }
+    // Otherwise, we'll want to sit and wait for the responses
+    // from the roomserver. There will be one response for every
+    // input we submitted. The last error value we receive will
+    // be the one returned as the error string.
     defer replySub.Drain() // nolint:errcheck
     for i := 0; i < len(request.InputRoomEvents); i++ {
         msg, err := replySub.NextMsgWithContext(ctx)
@@ -259,7 +321,6 @@ func (r *Inputer) InputRoomEvents(
         }
         if len(msg.Data) > 0 {
             response.ErrMsg = string(msg.Data)
-            return
         }
     }
 }