Stream tweaks, use same codepath for sync vs async input room events, wait for error response via NATS messages (#2283)

This commit is contained in:
Neil Alexander 2022-03-16 14:21:11 +00:00 committed by GitHub
parent 485367fcfa
commit e30aa38fb0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 86 additions and 83 deletions

View file

@ -59,7 +59,7 @@ func NewInternalAPI(
},
},
}
js := jetstream.Prepare(&base.Cfg.Global.JetStream)
js, _ := jetstream.Prepare(&base.Cfg.Global.JetStream)
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)

View file

@ -49,7 +49,7 @@ func AddPublicRoutes(
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
js := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
JetStream: js,

View file

@ -42,7 +42,7 @@ func NewInternalAPI(
) api.EDUServerInputAPI {
cfg := &base.Cfg.EDUServer
js := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
return &input.EDUServerInputAPI{
Cache: eduCache,

View file

@ -92,7 +92,7 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
js := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext,

View file

@ -39,7 +39,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
func NewInternalAPI(
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI {
js := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
db, err := storage.NewDatabase(&cfg.Database)
if err != nil {

View file

@ -43,6 +43,7 @@ type RoomserverInternalAPI struct {
ServerACLs *acls.ServerACLs
fsAPI fsAPI.FederationInternalAPI
asAPI asAPI.AppServiceQueryAPI
NATSClient *nats.Conn
JetStream nats.JetStreamContext
Durable string
InputRoomEventTopic string // JetStream topic for new input room events
@ -52,7 +53,8 @@ type RoomserverInternalAPI struct {
func NewRoomserverAPI(
processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
consumer nats.JetStreamContext, inputRoomEventTopic, outputRoomEventTopic string,
consumer nats.JetStreamContext, nc *nats.Conn,
inputRoomEventTopic, outputRoomEventTopic string,
caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
) *RoomserverInternalAPI {
serverACLs := acls.NewServerACLs(roomserverDB)
@ -66,6 +68,7 @@ func NewRoomserverAPI(
InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic,
JetStream: consumer,
NATSClient: nc,
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
ServerACLs: serverACLs,
Queryer: &query.Queryer{
@ -92,6 +95,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
InputRoomEventTopic: r.InputRoomEventTopic,
OutputRoomEventTopic: r.OutputRoomEventTopic,
JetStream: r.JetStream,
NATSClient: r.NATSClient,
Durable: nats.Durable(r.Durable),
ServerName: r.Cfg.Matrix.ServerName,
FSAPI: fsAPI,

View file

@ -48,6 +48,7 @@ var keyContentFields = map[string]string{
type Inputer struct {
ProcessContext *process.ProcessContext
DB storage.Database
NATSClient *nats.Conn
JetStream nats.JetStreamContext
Durable nats.SubOpt
ServerName gomatrixserverlib.ServerName
@ -103,6 +104,7 @@ func (r *Inputer) Start() error {
_ = msg.InProgress() // resets the acknowledgement wait timer
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
var errString string
if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
@ -113,9 +115,19 @@ func (r *Inputer) Start() error {
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process async event")
_ = msg.Term()
errString = err.Error()
} else {
_ = msg.Ack()
}
if replyTo := msg.Header.Get("sync"); replyTo != "" {
if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to respond for sync event")
}
}
})
},
// NATS wants to acknowledge automatically by default when the message is
@ -131,6 +143,9 @@ func (r *Inputer) Start() error {
// Ensure that NATS doesn't try to resend us something that wasn't done
// within the period of time that we might still be processing it.
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
// It is recommended to disable this for pull consumers as per the docs:
// https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers
nats.MaxAckPending(-1),
)
return err
}
@ -141,15 +156,30 @@ func (r *Inputer) InputRoomEvents(
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
) {
if request.Asynchronous {
var replyTo string
var replySub *nats.Subscription
if !request.Asynchronous {
var err error
replyTo = nats.NewInbox()
replySub, err = r.NATSClient.SubscribeSync(replyTo)
if err != nil {
response.ErrMsg = err.Error()
return
}
}
var err error
for _, e := range request.InputRoomEvents {
msg := &nats.Msg{
Subject: r.InputRoomEventTopic,
Header: nats.Header{},
Reply: replyTo,
}
roomID := e.Event.RoomID()
msg.Header.Set("room_id", roomID)
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
msg.Data, err = json.Marshal(e)
if err != nil {
response.ErrMsg = err.Error()
@ -163,52 +193,21 @@ func (r *Inputer) InputRoomEvents(
return
}
}
} else {
responses := make(chan error, len(request.InputRoomEvents))
for _, e := range request.InputRoomEvents {
inputRoomEvent := e
roomID := inputRoomEvent.Event.RoomID()
index := roomID + "\000" + inputRoomEvent.Event.EventID()
if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok {
// We're already waiting to deal with this event, so there's no
// point in queuing it up again. We've notified NATS that we're
// working on the message still, so that will have deferred the
// redelivery by a bit.
if request.Asynchronous || replySub == nil {
return
}
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
worker := r.workerForRoom(roomID)
worker.Act(nil, func() {
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
err := r.processRoomEvent(ctx, &inputRoomEvent)
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
}
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": inputRoomEvent.Event.EventID(),
}).Warn("Roomserver failed to process sync event")
}
select {
case <-ctx.Done():
default:
responses <- err
}
})
}
defer replySub.Drain() // nolint:errcheck
for i := 0; i < len(request.InputRoomEvents); i++ {
select {
case <-ctx.Done():
response.ErrMsg = context.DeadlineExceeded.Error()
return
case err := <-responses:
msg, err := replySub.NextMsgWithContext(ctx)
if err != nil {
response.ErrMsg = err.Error()
return
}
}
if len(msg.Data) > 0 {
response.ErrMsg = string(msg.Data)
return
}
}
}

View file

@ -50,10 +50,10 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to room server db")
}
js := jetstream.Prepare(&cfg.Matrix.JetStream)
js, nc := jetstream.Prepare(&cfg.Matrix.JetStream)
return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js,
base.ProcessContext, cfg, roomserverDB, js, nc,
cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent),
cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames,

View file

@ -71,8 +71,8 @@ func JetStreamConsumer(
continue
}
if f(ctx, msg) {
if err = msg.Ack(); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Ack: %w", err))
if err = msg.AckSync(); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
sentry.CaptureException(err)
}
} else {

View file

@ -15,7 +15,7 @@ import (
var natsServer *natsserver.Server
var natsServerMutex sync.Mutex
func Prepare(cfg *config.JetStream) natsclient.JetStreamContext {
func Prepare(cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(cfg, nil)
@ -48,20 +48,20 @@ func Prepare(cfg *config.JetStream) natsclient.JetStreamContext {
return setupNATS(cfg, nc)
}
func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) natsclient.JetStreamContext {
func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil {
var err error
nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
if err != nil {
logrus.WithError(err).Panic("Unable to connect to NATS")
return nil
return nil, nil
}
}
s, err := nc.JetStream()
if err != nil {
logrus.WithError(err).Panic("Unable to get JetStream context")
return nil
return nil, nil
}
for _, stream := range streams { // streams are defined in streams.go
@ -89,5 +89,5 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) natsclient.JetStreamC
}
}
return s
return s, nc
}

View file

@ -42,7 +42,7 @@ var streams = []*nats.StreamConfig{
},
{
Name: OutputKeyChangeEvent,
Retention: nats.LimitsPolicy,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{

View file

@ -49,7 +49,7 @@ func AddPublicRoutes(
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
js := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {

View file

@ -46,7 +46,7 @@ func NewInternalAPI(
appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, pgClient pushgateway.Client,
) api.UserInternalAPI {
js := jetstream.Prepare(&cfg.Matrix.JetStream)
js, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
syncProducer := producers.NewSyncAPI(
db, js,