From 134ec186146352ffff3a935aadd5c000007db520 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 3 Nov 2021 15:57:36 +0000 Subject: [PATCH] Ack tweaks --- appservice/consumers/roomserver.go | 5 ++--- federationsender/consumers/eduserver.go | 24 ++++++++++----------- federationsender/consumers/roomserver.go | 4 ++-- roomserver/internal/input/input.go | 6 +++--- syncapi/consumers/clientapi.go | 2 +- syncapi/consumers/eduserver_receipts.go | 2 +- syncapi/consumers/eduserver_sendtodevice.go | 6 +++--- syncapi/consumers/eduserver_typing.go | 2 +- syncapi/consumers/roomserver.go | 5 +++-- 9 files changed, 28 insertions(+), 28 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index cde02348d..ddcc478c6 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -74,12 +74,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { if err := json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") - _ = msg.Nak() + _ = msg.Ack() return } if output.Type != api.OutputTypeNewRoomEvent { - _ = msg.Nak() + _ = msg.Ack() return } @@ -89,7 +89,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { // Send event to any relevant application services if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { log.WithError(err).Errorf("roomserver output log: filter error") - _ = msg.Nak() return } diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index 5fee5b48f..bedc51f2f 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -81,7 +81,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) { var ote api.OutputSendToDeviceEvent if err := json.Unmarshal(msg.Data, &ote); err != nil { log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)") - _ = msg.Nak() + _ = msg.Ack() return } @@ -89,19 +89,19 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) { _, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender) if err != nil { log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender") - _ = msg.Nak() + _ = msg.Ack() return } if originServerName != t.ServerName { log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere") - _ = msg.Nak() + _ = msg.Ack() return } _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID) if err != nil { log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination") - _ = msg.Nak() + _ = msg.Ack() return } @@ -122,7 +122,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) { } if edu.Content, err = json.Marshal(tdm); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") - _ = msg.Nak() + _ = msg.Ack() return } @@ -142,7 +142,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) { if err := json.Unmarshal(msg.Data, &ote); err != nil { // Skip this msg but continue processing messages. log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)") - _ = msg.Nak() + _ = msg.Ack() return } @@ -150,7 +150,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) { _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID) if err != nil { log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender") - _ = msg.Nak() + _ = msg.Ack() return } if typingServerName != t.ServerName { @@ -175,7 +175,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) { "typing": ote.Event.Typing, }); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") - _ = msg.Nak() + _ = msg.Ack() return } @@ -194,7 +194,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) { if err := json.Unmarshal(msg.Data, &receipt); err != nil { // Skip this msg but continue processing messages. log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)") - _ = msg.Nak() + _ = msg.Ack() return } @@ -202,11 +202,11 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) { _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID) if err != nil { log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender") - _ = msg.Nak() + _ = msg.Ack() return } if receiptServerName != t.ServerName { - _ = msg.Nak() + _ = msg.Ack() return // don't log, very spammy as it logs for each remote receipt } @@ -239,7 +239,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) { } if edu.Content, err = json.Marshal(content); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") - _ = msg.Nak() + _ = msg.Ack() return } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 61db7e84c..af509d084 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -76,7 +76,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { if err := json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") - _ = msg.Nak() + _ = msg.Ack() return } @@ -97,7 +97,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { log.WithField("error", output.Type).Info( err.Error(), ) - _ = msg.Nak() + _ = msg.Ack() default: // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index b5a45470c..05803a40d 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -60,18 +60,18 @@ func (r *Inputer) Start() error { defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() var inputRoomEvent api.InputRoomEvent if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - _ = msg.Nak() + _ = msg.Ack() return } inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) inbox.(*phony.Inbox).Act(nil, func() { if _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) - _ = msg.Nak() + } else { hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event) - _ = msg.Ack() } + _ = msg.Ack() }) }, nats.ManualAck(), diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 9f80f0600..06756fb56 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -74,7 +74,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("client API server output log: message parse failure") sentry.CaptureException(err) - _ = msg.Nak() + _ = msg.Ack() return } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 7b3e984af..99189a785 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -70,7 +70,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("EDU server output log: message parse failure") sentry.CaptureException(err) - _ = msg.Nak() + _ = msg.Ack() return } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index d8c43a856..9ffd572e6 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -74,18 +74,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("EDU server output log: message parse failure") sentry.CaptureException(err) - _ = msg.Nak() + _ = msg.Ack() return } _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) if err != nil { sentry.CaptureException(err) - _ = msg.Nak() + _ = msg.Ack() return } if domain != s.serverName { - _ = msg.Nak() + _ = msg.Ack() return } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 47c5da685..b69293c6d 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -71,7 +71,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("EDU server output log: message parse failure") sentry.CaptureException(err) - _ = msg.Nak() + _ = msg.Ack() return } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index c6e1c23a7..e03d874b7 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -83,7 +83,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { if err = json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") - _ = msg.Nak() + _ = msg.Ack() return } @@ -96,6 +96,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { // in the special case where the event redacts itself, just pass the message through because // we will never see the other part of the pair if event.Redacts() != event.EventID() { + _ = msg.Ack() return } } @@ -116,7 +117,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", ) - _ = msg.Nak() + _ = msg.Ack() } if err != nil { log.WithError(err).Error("roomserver output log: failed to process event")