Ack tweaks

This commit is contained in:
Neil Alexander 2021-11-03 15:57:36 +00:00
parent eb07c2d5d7
commit 134ec18614
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
9 changed files with 28 additions and 28 deletions

View file

@ -74,12 +74,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
_ = msg.Nak() _ = msg.Ack()
return return
} }
if output.Type != api.OutputTypeNewRoomEvent { if output.Type != api.OutputTypeNewRoomEvent {
_ = msg.Nak() _ = msg.Ack()
return return
} }
@ -89,7 +89,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
// Send event to any relevant application services // Send event to any relevant application services
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
log.WithError(err).Errorf("roomserver output log: filter error") log.WithError(err).Errorf("roomserver output log: filter error")
_ = msg.Nak()
return return
} }

View file

@ -81,7 +81,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
var ote api.OutputSendToDeviceEvent var ote api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil { if err := json.Unmarshal(msg.Data, &ote); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)") log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
_ = msg.Nak() _ = msg.Ack()
return return
} }
@ -89,19 +89,19 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
_, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender) _, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender)
if err != nil { if err != nil {
log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender") log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender")
_ = msg.Nak() _ = msg.Ack()
return return
} }
if originServerName != t.ServerName { if originServerName != t.ServerName {
log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere") log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
_ = msg.Nak() _ = msg.Ack()
return return
} }
_, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID) _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
if err != nil { if err != nil {
log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination") log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
_ = msg.Nak() _ = msg.Ack()
return return
} }
@ -122,7 +122,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
} }
if edu.Content, err = json.Marshal(tdm); err != nil { if edu.Content, err = json.Marshal(tdm); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON") log.WithError(err).Error("failed to marshal EDU JSON")
_ = msg.Nak() _ = msg.Ack()
return return
} }
@ -142,7 +142,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
if err := json.Unmarshal(msg.Data, &ote); err != nil { if err := json.Unmarshal(msg.Data, &ote); err != nil {
// Skip this msg but continue processing messages. // Skip this msg but continue processing messages.
log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)") log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
_ = msg.Nak() _ = msg.Ack()
return return
} }
@ -150,7 +150,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
_, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID) _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
if err != nil { if err != nil {
log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender") log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
_ = msg.Nak() _ = msg.Ack()
return return
} }
if typingServerName != t.ServerName { if typingServerName != t.ServerName {
@ -175,7 +175,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
"typing": ote.Event.Typing, "typing": ote.Event.Typing,
}); err != nil { }); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON") log.WithError(err).Error("failed to marshal EDU JSON")
_ = msg.Nak() _ = msg.Ack()
return return
} }
@ -194,7 +194,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
if err := json.Unmarshal(msg.Data, &receipt); err != nil { if err := json.Unmarshal(msg.Data, &receipt); err != nil {
// Skip this msg but continue processing messages. // Skip this msg but continue processing messages.
log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)") log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)")
_ = msg.Nak() _ = msg.Ack()
return return
} }
@ -202,11 +202,11 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
_, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID) _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
if err != nil { if err != nil {
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender") log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
_ = msg.Nak() _ = msg.Ack()
return return
} }
if receiptServerName != t.ServerName { if receiptServerName != t.ServerName {
_ = msg.Nak() _ = msg.Ack()
return // don't log, very spammy as it logs for each remote receipt return // don't log, very spammy as it logs for each remote receipt
} }
@ -239,7 +239,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
} }
if edu.Content, err = json.Marshal(content); err != nil { if edu.Content, err = json.Marshal(content); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON") log.WithError(err).Error("failed to marshal EDU JSON")
_ = msg.Nak() _ = msg.Ack()
return return
} }

View file

@ -76,7 +76,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
_ = msg.Nak() _ = msg.Ack()
return return
} }
@ -97,7 +97,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
log.WithField("error", output.Type).Info( log.WithField("error", output.Type).Info(
err.Error(), err.Error(),
) )
_ = msg.Nak() _ = msg.Ack()
default: default:
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{

View file

@ -60,18 +60,18 @@ func (r *Inputer) Start() error {
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
var inputRoomEvent api.InputRoomEvent var inputRoomEvent api.InputRoomEvent
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Nak() _ = msg.Ack()
return return
} }
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
inbox.(*phony.Inbox).Act(nil, func() { inbox.(*phony.Inbox).Act(nil, func() {
if _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { if _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
_ = msg.Nak()
} else { } else {
hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event) hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event)
_ = msg.Ack()
} }
_ = msg.Ack()
}) })
}, },
nats.ManualAck(), nats.ManualAck(),

View file

@ -74,7 +74,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("client API server output log: message parse failure") log.WithError(err).Errorf("client API server output log: message parse failure")
sentry.CaptureException(err) sentry.CaptureException(err)
_ = msg.Nak() _ = msg.Ack()
return return
} }

View file

@ -70,7 +70,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err) sentry.CaptureException(err)
_ = msg.Nak() _ = msg.Ack()
return return
} }

View file

@ -74,18 +74,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err) sentry.CaptureException(err)
_ = msg.Nak() _ = msg.Ack()
return return
} }
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID) _, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil { if err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
_ = msg.Nak() _ = msg.Ack()
return return
} }
if domain != s.serverName { if domain != s.serverName {
_ = msg.Nak() _ = msg.Ack()
return return
} }

View file

@ -71,7 +71,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err) sentry.CaptureException(err)
_ = msg.Nak() _ = msg.Ack()
return return
} }

View file

@ -83,7 +83,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
if err = json.Unmarshal(msg.Data, &output); err != nil { if err = json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
_ = msg.Nak() _ = msg.Ack()
return return
} }
@ -96,6 +96,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
// in the special case where the event redacts itself, just pass the message through because // in the special case where the event redacts itself, just pass the message through because
// we will never see the other part of the pair // we will never see the other part of the pair
if event.Redacts() != event.EventID() { if event.Redacts() != event.EventID() {
_ = msg.Ack()
return return
} }
} }
@ -116,7 +117,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",
) )
_ = msg.Nak() _ = msg.Ack()
} }
if err != nil { if err != nil {
log.WithError(err).Error("roomserver output log: failed to process event") log.WithError(err).Error("roomserver output log: failed to process event")