Message acknowledgements
This commit is contained in:
parent
fb365a1bec
commit
eb07c2d5d7
|
@ -74,10 +74,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if output.Type != api.OutputTypeNewRoomEvent {
|
if output.Type != api.OutputTypeNewRoomEvent {
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +89,11 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
// Send event to any relevant application services
|
// Send event to any relevant application services
|
||||||
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
|
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
|
||||||
log.WithError(err).Errorf("roomserver output log: filter error")
|
log.WithError(err).Errorf("roomserver output log: filter error")
|
||||||
|
_ = msg.Nak()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterRoomserverEvents takes in events and decides whether any of them need
|
// filterRoomserverEvents takes in events and decides whether any of them need
|
||||||
|
|
|
@ -81,6 +81,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
|
||||||
var ote api.OutputSendToDeviceEvent
|
var ote api.OutputSendToDeviceEvent
|
||||||
if err := json.Unmarshal(msg.Data, &ote); err != nil {
|
if err := json.Unmarshal(msg.Data, &ote); err != nil {
|
||||||
log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
|
log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,16 +89,19 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
|
||||||
_, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender)
|
_, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender")
|
log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if originServerName != t.ServerName {
|
if originServerName != t.ServerName {
|
||||||
log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
|
log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
|
_, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
|
log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,6 +122,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(tdm); err != nil {
|
if edu.Content, err = json.Marshal(tdm); err != nil {
|
||||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
log.WithError(err).Error("failed to marshal EDU JSON")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,6 +130,8 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
|
||||||
if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
|
if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.WithError(err).Error("failed to send EDU")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
// onTypingEvent is called in response to a message received on the typing
|
// onTypingEvent is called in response to a message received on the typing
|
||||||
|
@ -135,6 +142,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
|
||||||
if err := json.Unmarshal(msg.Data, &ote); err != nil {
|
if err := json.Unmarshal(msg.Data, &ote); err != nil {
|
||||||
// Skip this msg but continue processing messages.
|
// Skip this msg but continue processing messages.
|
||||||
log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
|
log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,6 +150,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
|
||||||
_, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
|
_, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
|
log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if typingServerName != t.ServerName {
|
if typingServerName != t.ServerName {
|
||||||
|
@ -166,12 +175,15 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
|
||||||
"typing": ote.Event.Typing,
|
"typing": ote.Event.Typing,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
log.WithError(err).Error("failed to marshal EDU JSON")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
|
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.WithError(err).Error("failed to send EDU")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
// onReceiptEvent is called in response to a message received on the receipt
|
// onReceiptEvent is called in response to a message received on the receipt
|
||||||
|
@ -182,6 +194,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
|
||||||
if err := json.Unmarshal(msg.Data, &receipt); err != nil {
|
if err := json.Unmarshal(msg.Data, &receipt); err != nil {
|
||||||
// Skip this msg but continue processing messages.
|
// Skip this msg but continue processing messages.
|
||||||
log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)")
|
log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,9 +202,11 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
|
||||||
_, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
|
_, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
|
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if receiptServerName != t.ServerName {
|
if receiptServerName != t.ServerName {
|
||||||
|
_ = msg.Nak()
|
||||||
return // don't log, very spammy as it logs for each remote receipt
|
return // don't log, very spammy as it logs for each remote receipt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,10 +239,13 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(content); err != nil {
|
if edu.Content, err = json.Marshal(content); err != nil {
|
||||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
log.WithError(err).Error("failed to marshal EDU JSON")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
|
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.WithError(err).Error("failed to send EDU")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,6 +97,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
log.WithField("error", output.Type).Info(
|
log.WithField("error", output.Type).Info(
|
||||||
err.Error(),
|
err.Error(),
|
||||||
)
|
)
|
||||||
|
_ = msg.Nak()
|
||||||
default:
|
default:
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
|
@ -108,6 +110,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
|
|
||||||
case api.OutputTypeNewInboundPeek:
|
case api.OutputTypeNewInboundPeek:
|
||||||
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
|
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
|
@ -116,10 +121,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
}).Panicf("roomserver output log: remote peek event failure")
|
}).Panicf("roomserver output log: remote peek event failure")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
_ = msg.Ack()
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.WithField("type", output.Type).Debug(
|
log.WithField("type", output.Type).Debug(
|
||||||
"roomserver output log: ignoring unknown output type",
|
"roomserver output log: ignoring unknown output type",
|
||||||
)
|
)
|
||||||
|
_ = msg.Ack()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("client API server output log: message parse failure")
|
log.WithError(err).Errorf("client API server output log: message parse failure")
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,4 +97,6 @@ func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) {
|
||||||
|
|
||||||
s.stream.Advance(streamPos)
|
s.stream.Advance(streamPos)
|
||||||
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
|
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,4 +89,6 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
|
|
||||||
s.stream.Advance(streamPos)
|
s.stream.Advance(streamPos)
|
||||||
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
|
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,15 +74,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
|
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if domain != s.serverName {
|
if domain != s.serverName {
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,4 +111,6 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
[]string{output.DeviceID},
|
[]string{output.DeviceID},
|
||||||
types.StreamingToken{SendToDevicePosition: streamPos},
|
types.StreamingToken{SendToDevicePosition: streamPos},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,6 +71,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,4 +95,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
|
|
||||||
s.stream.Advance(typingPos)
|
s.stream.Advance(typingPos)
|
||||||
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
|
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
|
_ = msg.Nak()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,10 +116,15 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
|
||||||
log.WithField("type", output.Type).Debug(
|
log.WithField("type", output.Type).Debug(
|
||||||
"roomserver output log: ignoring unknown output type",
|
"roomserver output log: ignoring unknown output type",
|
||||||
)
|
)
|
||||||
|
_ = msg.Nak()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("roomserver output log: failed to process event")
|
log.WithError(err).Error("roomserver output log: failed to process event")
|
||||||
|
_ = msg.Nak()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = msg.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) onRedactEvent(
|
func (s *OutputRoomEventConsumer) onRedactEvent(
|
||||||
|
|
Loading…
Reference in a new issue