diff --git a/appservice/appservice.go b/appservice/appservice.go index b33d7b701..3e19e09b2 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -59,7 +59,7 @@ func NewInternalAPI( }, }, } - js := jetstream.Prepare(&base.Cfg.Global.JetStream) + js, _ := jetstream.Prepare(&base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 918476674..e4279c220 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -49,7 +49,7 @@ func AddPublicRoutes( extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, ) { - js := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ JetStream: js, diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 9b7e21651..6882399da 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -42,7 +42,7 @@ func NewInternalAPI( ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer - js := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) return &input.EDUServerInputAPI{ Cache: eduCache, diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index a982d8009..9f149d973 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -92,7 +92,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - js := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) queues := queue.NewOutgoingQueues( federationDB, base.ProcessContext, diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index bd36fd9f9..8a0ce6178 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -39,7 +39,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - js := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) db, err := storage.NewDatabase(&cfg.Database) if err != nil { diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 10c8c844e..91001e418 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -43,6 +43,7 @@ type RoomserverInternalAPI struct { ServerACLs *acls.ServerACLs fsAPI fsAPI.FederationInternalAPI asAPI asAPI.AppServiceQueryAPI + NATSClient *nats.Conn JetStream nats.JetStreamContext Durable string InputRoomEventTopic string // JetStream topic for new input room events @@ -52,7 +53,8 @@ type RoomserverInternalAPI struct { func NewRoomserverAPI( processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database, - consumer nats.JetStreamContext, inputRoomEventTopic, outputRoomEventTopic string, + consumer nats.JetStreamContext, nc *nats.Conn, + inputRoomEventTopic, outputRoomEventTopic string, caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName, ) *RoomserverInternalAPI { serverACLs := acls.NewServerACLs(roomserverDB) @@ -66,6 +68,7 @@ func NewRoomserverAPI( InputRoomEventTopic: inputRoomEventTopic, OutputRoomEventTopic: outputRoomEventTopic, JetStream: consumer, + NATSClient: nc, Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"), ServerACLs: serverACLs, Queryer: &query.Queryer{ @@ -92,6 +95,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA InputRoomEventTopic: r.InputRoomEventTopic, OutputRoomEventTopic: r.OutputRoomEventTopic, JetStream: r.JetStream, + NATSClient: r.NATSClient, Durable: nats.Durable(r.Durable), ServerName: r.Cfg.Matrix.ServerName, FSAPI: fsAPI, diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 178533ded..c6e354611 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -48,6 +48,7 @@ var keyContentFields = map[string]string{ type Inputer struct { ProcessContext *process.ProcessContext DB storage.Database + NATSClient *nats.Conn JetStream nats.JetStreamContext Durable nats.SubOpt ServerName gomatrixserverlib.ServerName @@ -103,6 +104,7 @@ func (r *Inputer) Start() error { _ = msg.InProgress() // resets the acknowledgement wait timer defer eventsInProgress.Delete(index) defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() + var errString string if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil { if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { sentry.CaptureException(err) @@ -113,9 +115,19 @@ func (r *Inputer) Start() error { "type": inputRoomEvent.Event.Type(), }).Warn("Roomserver failed to process async event") _ = msg.Term() + errString = err.Error() } else { _ = msg.Ack() } + if replyTo := msg.Header.Get("sync"); replyTo != "" { + if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": roomID, + "event_id": inputRoomEvent.Event.EventID(), + "type": inputRoomEvent.Event.Type(), + }).Warn("Roomserver failed to respond for sync event") + } + } }) }, // NATS wants to acknowledge automatically by default when the message is @@ -131,6 +143,9 @@ func (r *Inputer) Start() error { // Ensure that NATS doesn't try to resend us something that wasn't done // within the period of time that we might still be processing it. nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), + // It is recommended to disable this for pull consumers as per the docs: + // https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers + nats.MaxAckPending(-1), ) return err } @@ -141,74 +156,58 @@ func (r *Inputer) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { - if request.Asynchronous { + var replyTo string + var replySub *nats.Subscription + if !request.Asynchronous { var err error - for _, e := range request.InputRoomEvents { - msg := &nats.Msg{ - Subject: r.InputRoomEventTopic, - Header: nats.Header{}, - } - roomID := e.Event.RoomID() - msg.Header.Set("room_id", roomID) - msg.Data, err = json.Marshal(e) - if err != nil { - response.ErrMsg = err.Error() - return - } - if _, err = r.JetStream.PublishMsg(msg); err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": e.Event.EventID(), - }).Error("Roomserver failed to queue async event") - return - } + replyTo = nats.NewInbox() + replySub, err = r.NATSClient.SubscribeSync(replyTo) + if err != nil { + response.ErrMsg = err.Error() + return } - } else { - responses := make(chan error, len(request.InputRoomEvents)) - for _, e := range request.InputRoomEvents { - inputRoomEvent := e - roomID := inputRoomEvent.Event.RoomID() - index := roomID + "\000" + inputRoomEvent.Event.EventID() - if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok { - // We're already waiting to deal with this event, so there's no - // point in queuing it up again. We've notified NATS that we're - // working on the message still, so that will have deferred the - // redelivery by a bit. - return - } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - worker := r.workerForRoom(roomID) - worker.Act(nil, func() { - defer eventsInProgress.Delete(index) - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - err := r.processRoomEvent(ctx, &inputRoomEvent) - if err != nil { - if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { - sentry.CaptureException(err) - } - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": inputRoomEvent.Event.EventID(), - }).Warn("Roomserver failed to process sync event") - } - select { - case <-ctx.Done(): - default: - responses <- err - } - }) + } + + var err error + for _, e := range request.InputRoomEvents { + msg := &nats.Msg{ + Subject: r.InputRoomEventTopic, + Header: nats.Header{}, + Reply: replyTo, } - for i := 0; i < len(request.InputRoomEvents); i++ { - select { - case <-ctx.Done(): - response.ErrMsg = context.DeadlineExceeded.Error() - return - case err := <-responses: - if err != nil { - response.ErrMsg = err.Error() - return - } - } + roomID := e.Event.RoomID() + msg.Header.Set("room_id", roomID) + if replyTo != "" { + msg.Header.Set("sync", replyTo) + } + msg.Data, err = json.Marshal(e) + if err != nil { + response.ErrMsg = err.Error() + return + } + if _, err = r.JetStream.PublishMsg(msg); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": roomID, + "event_id": e.Event.EventID(), + }).Error("Roomserver failed to queue async event") + return + } + } + + if request.Asynchronous || replySub == nil { + return + } + + defer replySub.Drain() // nolint:errcheck + for i := 0; i < len(request.InputRoomEvents); i++ { + msg, err := replySub.NextMsgWithContext(ctx) + if err != nil { + response.ErrMsg = err.Error() + return + } + if len(msg.Data) > 0 { + response.ErrMsg = string(msg.Data) + return } } } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 950c6b4e7..1992ac335 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -50,10 +50,10 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to room server db") } - js := jetstream.Prepare(&cfg.Matrix.JetStream) + js, nc := jetstream.Prepare(&cfg.Matrix.JetStream) return internal.NewRoomserverAPI( - base.ProcessContext, cfg, roomserverDB, js, + base.ProcessContext, cfg, roomserverDB, js, nc, cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), base.Caches, perspectiveServerNames, diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index d444272d2..78cecb6ae 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -71,8 +71,8 @@ func JetStreamConsumer( continue } if f(ctx, msg) { - if err = msg.Ack(); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Ack: %w", err)) + if err = msg.AckSync(); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) sentry.CaptureException(err) } } else { diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 562b0131e..37597d584 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -15,7 +15,7 @@ import ( var natsServer *natsserver.Server var natsServerMutex sync.Mutex -func Prepare(cfg *config.JetStream) natsclient.JetStreamContext { +func Prepare(cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { return setupNATS(cfg, nil) @@ -48,20 +48,20 @@ func Prepare(cfg *config.JetStream) natsclient.JetStreamContext { return setupNATS(cfg, nc) } -func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) natsclient.JetStreamContext { +func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { if nc == nil { var err error nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ",")) if err != nil { logrus.WithError(err).Panic("Unable to connect to NATS") - return nil + return nil, nil } } s, err := nc.JetStream() if err != nil { logrus.WithError(err).Panic("Unable to get JetStream context") - return nil + return nil, nil } for _, stream := range streams { // streams are defined in streams.go @@ -89,5 +89,5 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) natsclient.JetStreamC } } - return s + return s, nc } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 3f07488f9..aa3e95cb8 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -42,7 +42,7 @@ var streams = []*nats.StreamConfig{ }, { Name: OutputKeyChangeEvent, - Retention: nats.LimitsPolicy, + Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, { diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index cb9890ff7..41635c911 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -49,7 +49,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - js := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { diff --git a/userapi/userapi.go b/userapi/userapi.go index 251a4edad..1e4ebcb2e 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -46,7 +46,7 @@ func NewInternalAPI( appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI, rsAPI rsapi.RoomserverInternalAPI, pgClient pushgateway.Client, ) api.UserInternalAPI { - js := jetstream.Prepare(&cfg.Matrix.JetStream) + js, _ := jetstream.Prepare(&cfg.Matrix.JetStream) syncProducer := producers.NewSyncAPI( db, js,