Pull consumers

This commit is contained in:
Neil Alexander 2022-02-02 10:50:21 +00:00
parent a09d71d231
commit 9f969498ce
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
9 changed files with 476 additions and 445 deletions

View file

@ -66,37 +66,37 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called when the appservice component receives a new event from
// the room server output log.
func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
}
if output.Type != api.OutputTypeNewRoomEvent {
return true
}
events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
events = append(events, output.NewRoomEvent.AddStateEvents...)
// Send event to any relevant application services
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
log.WithError(err).Errorf("roomserver output log: filter error")
return true
}
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
})
}
if output.Type != api.OutputTypeNewRoomEvent {
return true
}
events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
events = append(events, output.NewRoomEvent.AddStateEvents...)
// Send event to any relevant application services
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
log.WithError(err).Errorf("roomserver output log: filter error")
return true
}
return true
}
// filterRoomserverEvents takes in events and decides whether any of them need

View file

@ -66,13 +66,22 @@ func NewOutputEDUConsumer(
// Start consuming from EDU servers
func (t *OutputEDUConsumer) Start() error {
if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent, t.durable); err != nil {
if err := jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.typingTopic, t.onTypingEvent,
t.durable, nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent, t.durable); err != nil {
if err := jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.sendToDeviceTopic, t.onSendToDeviceEvent,
t.durable, nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent, t.durable); err != nil {
if err := jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.receiptTopic, t.onReceiptEvent,
t.durable, nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
@ -80,175 +89,169 @@ func (t *OutputEDUConsumer) Start() error {
// onSendToDeviceEvent is called in response to a message received on the
// send-to-device events topic from the EDU server.
func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.Msg) bool {
// Extract the send-to-device event from msg.
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
var ote api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
return true
}
// only send send-to-device events which originated from us
_, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender)
if err != nil {
log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender")
return true
}
if originServerName != t.ServerName {
log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
return true
}
_, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
if err != nil {
log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
return true
}
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDirectToDevice,
Origin: string(t.ServerName),
}
tdm := gomatrixserverlib.ToDeviceMessage{
Sender: ote.Sender,
Type: ote.Type,
MessageID: util.RandomString(32),
Messages: map[string]map[string]json.RawMessage{
ote.UserID: {
ote.DeviceID: ote.Content,
},
},
}
if edu.Content, err = json.Marshal(tdm); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
return true
}
log.Infof("Sending send-to-device message into %q destination queue", destServerName)
if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
var ote api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
return true
})
}
// only send send-to-device events which originated from us
_, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender)
if err != nil {
log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender")
return true
}
if originServerName != t.ServerName {
log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
return true
}
_, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
if err != nil {
log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
return true
}
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDirectToDevice,
Origin: string(t.ServerName),
}
tdm := gomatrixserverlib.ToDeviceMessage{
Sender: ote.Sender,
Type: ote.Type,
MessageID: util.RandomString(32),
Messages: map[string]map[string]json.RawMessage{
ote.UserID: {
ote.DeviceID: ote.Content,
},
},
}
if edu.Content, err = json.Marshal(tdm); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
return true
}
log.Infof("Sending send-to-device message into %q destination queue", destServerName)
if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
return true
}
// onTypingEvent is called in response to a message received on the typing
// events topic from the EDU server.
func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
// Extract the typing event from msg.
var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
_ = msg.Ack()
return true
}
// only send typing events which originated from us
_, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
if err != nil {
log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
_ = msg.Ack()
return true
}
if typingServerName != t.ServerName {
return true
}
joined, err := t.db.GetJoinedHosts(t.ctx, ote.Event.RoomID)
if err != nil {
log.WithError(err).WithField("room_id", ote.Event.RoomID).Error("failed to get joined hosts for room")
return false
}
names := make([]gomatrixserverlib.ServerName, len(joined))
for i := range joined {
names[i] = joined[i].ServerName
}
edu := &gomatrixserverlib.EDU{Type: ote.Event.Type}
if edu.Content, err = json.Marshal(map[string]interface{}{
"room_id": ote.Event.RoomID,
"user_id": ote.Event.UserID,
"typing": ote.Event.Typing,
}); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
return true
}
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bool {
// Extract the typing event from msg.
var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
_ = msg.Ack()
return true
})
}
// only send typing events which originated from us
_, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
if err != nil {
log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
_ = msg.Ack()
return true
}
if typingServerName != t.ServerName {
return true
}
joined, err := t.db.GetJoinedHosts(ctx, ote.Event.RoomID)
if err != nil {
log.WithError(err).WithField("room_id", ote.Event.RoomID).Error("failed to get joined hosts for room")
return false
}
names := make([]gomatrixserverlib.ServerName, len(joined))
for i := range joined {
names[i] = joined[i].ServerName
}
edu := &gomatrixserverlib.EDU{Type: ote.Event.Type}
if edu.Content, err = json.Marshal(map[string]interface{}{
"room_id": ote.Event.RoomID,
"user_id": ote.Event.UserID,
"typing": ote.Event.Typing,
}); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
return true
}
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
return true
}
// onReceiptEvent is called in response to a message received on the receipt
// events topic from the EDU server.
func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
// Extract the typing event from msg.
var receipt api.OutputReceiptEvent
if err := json.Unmarshal(msg.Data, &receipt); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)")
return true
}
// only send receipt events which originated from us
_, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
if err != nil {
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
return true
}
if receiptServerName != t.ServerName {
return true
}
joined, err := t.db.GetJoinedHosts(t.ctx, receipt.RoomID)
if err != nil {
log.WithError(err).WithField("room_id", receipt.RoomID).Error("failed to get joined hosts for room")
return false
}
names := make([]gomatrixserverlib.ServerName, len(joined))
for i := range joined {
names[i] = joined[i].ServerName
}
content := map[string]api.FederationReceiptMRead{}
content[receipt.RoomID] = api.FederationReceiptMRead{
User: map[string]api.FederationReceiptData{
receipt.UserID: {
Data: api.ReceiptTS{
TS: receipt.Timestamp,
},
EventIDs: []string{receipt.EventID},
},
},
}
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MReceipt,
Origin: string(t.ServerName),
}
if edu.Content, err = json.Marshal(content); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
return true
}
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) bool {
// Extract the typing event from msg.
var receipt api.OutputReceiptEvent
if err := json.Unmarshal(msg.Data, &receipt); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)")
return true
})
}
// only send receipt events which originated from us
_, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
if err != nil {
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
return true
}
if receiptServerName != t.ServerName {
return true
}
joined, err := t.db.GetJoinedHosts(ctx, receipt.RoomID)
if err != nil {
log.WithError(err).WithField("room_id", receipt.RoomID).Error("failed to get joined hosts for room")
return false
}
names := make([]gomatrixserverlib.ServerName, len(joined))
for i := range joined {
names[i] = joined[i].ServerName
}
content := map[string]api.FederationReceiptMRead{}
content[receipt.RoomID] = api.FederationReceiptMRead{
User: map[string]api.FederationReceiptData{
receipt.UserID: {
Data: api.ReceiptTS{
TS: receipt.Timestamp,
},
EventIDs: []string{receipt.EventID},
},
},
}
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MReceipt,
Origin: string(t.ServerName),
}
if edu.Content, err = json.Marshal(content); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
return true
}
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
return true
}

View file

@ -66,74 +66,70 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(
s.topic, s.onMessage, s.durable,
nats.DeliverAll(),
nats.ManualAck(),
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
)
return err
}
// onMessage is called when the federation server receives a new event from the room server output log.
// It is unsafe to call this with messages for the same room in multiple gorountines
// because updates it will likely fail with a types.EventIDMismatchError when it
// realises that it cannot update the room state using the deltas.
func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
}
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
}
switch output.Type {
case api.OutputTypeNewRoomEvent:
ev := output.NewRoomEvent.Event
switch output.Type {
case api.OutputTypeNewRoomEvent:
ev := output.NewRoomEvent.Event
if output.NewRoomEvent.RewritesState {
if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil {
log.WithError(err).Errorf("roomserver output log: purge room state failure")
return false
}
}
if err := s.processMessage(*output.NewRoomEvent); err != nil {
switch err.(type) {
case *queue.ErrorFederationDisabled:
log.WithField("error", output.Type).Info(
err.Error(),
)
default:
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"event": string(ev.JSON()),
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
log.ErrorKey: err,
}).Panicf("roomserver output log: write room event failure")
}
}
case api.OutputTypeNewInboundPeek:
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{
"event": output.NewInboundPeek,
log.ErrorKey: err,
}).Panicf("roomserver output log: remote peek event failure")
if output.NewRoomEvent.RewritesState {
if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil {
log.WithError(err).Errorf("roomserver output log: purge room state failure")
return false
}
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
}
return true
})
if err := s.processMessage(*output.NewRoomEvent); err != nil {
switch err.(type) {
case *queue.ErrorFederationDisabled:
log.WithField("error", output.Type).Info(
err.Error(),
)
default:
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"event": string(ev.JSON()),
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
log.ErrorKey: err,
}).Panicf("roomserver output log: write room event failure")
}
}
case api.OutputTypeNewInboundPeek:
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{
"event": output.NewInboundPeek,
log.ErrorKey: err,
}).Panicf("roomserver output log: remote peek event failure")
return false
}
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
}
return true
}
// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)

View file

@ -1,12 +1,48 @@
package jetstream
import "github.com/nats-io/nats.go"
import (
"context"
"fmt"
func WithJetStreamMessage(msg *nats.Msg, f func(msg *nats.Msg) bool) {
_ = msg.InProgress()
if f(msg) {
_ = msg.Ack()
} else {
_ = msg.Nak()
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
func JetStreamConsumer(
ctx context.Context, nats nats.JetStreamContext, subj string,
f func(ctx context.Context, msg *nats.Msg) bool,
opts ...nats.SubOpt,
) error {
sub, err := nats.SubscribeSync(subj, opts...)
if err != nil {
return fmt.Errorf("nats.SubscribeSync: %w", err)
}
go func() {
handle := func(err error) {
if err == context.Canceled || err == context.DeadlineExceeded {
logrus.WithContext(ctx).WithField("subject", subj).Warn(err)
return
}
logrus.WithContext(ctx).WithField("subject", subj).Fatal(err)
}
for {
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
handle(fmt.Errorf("sub.NextMsgWithContext: %w", err))
}
if err = msg.InProgress(); err != nil {
handle(fmt.Errorf("msg.InProgress: %w", err))
}
if f(ctx, msg) {
if err = msg.Ack(); err != nil {
handle(fmt.Errorf("msg.Ack: %w", err))
}
} else {
if err = msg.Nak(); err != nil {
handle(fmt.Errorf("msg.Nak: %w", err))
}
}
}
}()
return nil
}

View file

@ -63,45 +63,45 @@ func NewOutputClientDataConsumer(
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called when the sync server receives a new event from the client API server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
// Parse out the event JSON
userID := msg.Header.Get(jetstream.UserID)
var output eventutil.AccountData
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("client API server output log: message parse failure")
sentry.CaptureException(err)
return true
}
log.WithFields(log.Fields{
"type": output.Type,
"room_id": output.RoomID,
}).Debug("Received data from client API server")
streamPos, err := s.db.UpsertAccountData(
s.ctx, userID, output.RoomID, output.Type,
)
if err != nil {
sentry.CaptureException(err)
log.WithFields(log.Fields{
"type": output.Type,
"room_id": output.RoomID,
log.ErrorKey: err,
}).Panicf("could not save account data")
}
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// Parse out the event JSON
userID := msg.Header.Get(jetstream.UserID)
var output eventutil.AccountData
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("client API server output log: message parse failure")
sentry.CaptureException(err)
return true
})
}
log.WithFields(log.Fields{
"type": output.Type,
"room_id": output.RoomID,
}).Debug("Received data from client API server")
streamPos, err := s.db.UpsertAccountData(
s.ctx, userID, output.RoomID, output.Type,
)
if err != nil {
sentry.CaptureException(err)
log.WithFields(log.Fields{
"type": output.Type,
"room_id": output.RoomID,
log.ErrorKey: err,
}).Panicf("could not save account data")
}
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
return true
}

View file

@ -64,36 +64,36 @@ func NewOutputReceiptEventConsumer(
// Start consuming from EDU api
func (s *OutputReceiptEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
)
}
func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
var output api.OutputReceiptEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true
}
streamPos, err := s.db.StoreReceipt(
s.ctx,
output.RoomID,
output.Type,
output.UserID,
output.EventID,
output.Timestamp,
)
if err != nil {
sentry.CaptureException(err)
return true
}
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var output api.OutputReceiptEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true
})
}
streamPos, err := s.db.StoreReceipt(
s.ctx,
output.RoomID,
output.Type,
output.UserID,
output.EventID,
output.Timestamp,
)
if err != nil {
sentry.CaptureException(err)
return true
}
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return true
}

View file

@ -68,52 +68,52 @@ func NewOutputSendToDeviceEventConsumer(
// Start consuming from EDU api
func (s *OutputSendToDeviceEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
)
}
func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
var output api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true
}
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
sentry.CaptureException(err)
return true
}
if domain != s.serverName {
return true
}
util.GetLogger(context.TODO()).WithFields(log.Fields{
"sender": output.Sender,
"user_id": output.UserID,
"device_id": output.DeviceID,
"event_type": output.Type,
}).Info("sync API received send-to-device event from EDU server")
streamPos, err := s.db.StoreNewSendForDeviceMessage(
s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent,
)
if err != nil {
sentry.CaptureException(err)
log.WithError(err).Errorf("failed to store send-to-device message")
return false
}
s.stream.Advance(streamPos)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},
types.StreamingToken{SendToDevicePosition: streamPos},
)
func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var output api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true
})
}
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
sentry.CaptureException(err)
return true
}
if domain != s.serverName {
return true
}
util.GetLogger(context.TODO()).WithFields(log.Fields{
"sender": output.Sender,
"user_id": output.UserID,
"device_id": output.DeviceID,
"event_type": output.Type,
}).Info("sync API received send-to-device event from EDU server")
streamPos, err := s.db.StoreNewSendForDeviceMessage(
s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent,
)
if err != nil {
sentry.CaptureException(err)
log.WithError(err).Errorf("failed to store send-to-device message")
return false
}
s.stream.Advance(streamPos)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},
types.StreamingToken{SendToDevicePosition: streamPos},
)
return true
}

View file

@ -66,41 +66,41 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
)
}
func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
var output api.OutputTypingEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true
}
log.WithFields(log.Fields{
"room_id": output.Event.RoomID,
"user_id": output.Event.UserID,
"typing": output.Event.Typing,
}).Debug("received data from EDU server")
var typingPos types.StreamPosition
typingEvent := output.Event
if typingEvent.Typing {
typingPos = types.StreamPosition(
s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime),
)
} else {
typingPos = types.StreamPosition(
s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID),
)
}
s.stream.Advance(typingPos)
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var output api.OutputTypingEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true
})
}
log.WithFields(log.Fields{
"room_id": output.Event.RoomID,
"user_id": output.Event.UserID,
"typing": output.Event.Typing,
}).Debug("received data from EDU server")
var typingPos types.StreamPosition
typingEvent := output.Event
if typingEvent.Typing {
typingPos = types.StreamPosition(
s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime),
)
} else {
typingPos = types.StreamPosition(
s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID),
)
}
s.stream.Advance(typingPos)
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
return true
}

View file

@ -73,65 +73,61 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(
s.topic, s.onMessage, s.durable,
nats.DeliverAll(),
nats.ManualAck(),
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.onMessage,
s.durable, nats.DeliverAll(), nats.ManualAck(),
)
return err
}
// onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
// Parse out the event JSON
var err error
var output api.OutputEvent
if err = json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
}
switch output.Type {
case api.OutputTypeNewRoomEvent:
// Ignore redaction events. We will add them to the database when they are
// validated (when we receive OutputTypeRedactedEvent)
event := output.NewRoomEvent.Event
if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil {
// in the special case where the event redacts itself, just pass the message through because
// we will never see the other part of the pair
if event.Redacts() != event.EventID() {
return true
}
}
err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent)
case api.OutputTypeOldRoomEvent:
err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
s.onNewInviteEvent(s.ctx, *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
s.onRetireInviteEvent(s.ctx, *output.RetireInviteEvent)
case api.OutputTypeNewPeek:
s.onNewPeek(s.ctx, *output.NewPeek)
case api.OutputTypeRetirePeek:
s.onRetirePeek(s.ctx, *output.RetirePeek)
case api.OutputTypeRedactedEvent:
err = s.onRedactEvent(s.ctx, *output.RedactedEvent)
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
}
if err != nil {
log.WithError(err).Error("roomserver output log: failed to process event")
return false
}
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
// Parse out the event JSON
var err error
var output api.OutputEvent
if err = json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
})
}
switch output.Type {
case api.OutputTypeNewRoomEvent:
// Ignore redaction events. We will add them to the database when they are
// validated (when we receive OutputTypeRedactedEvent)
event := output.NewRoomEvent.Event
if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil {
// in the special case where the event redacts itself, just pass the message through because
// we will never see the other part of the pair
if event.Redacts() != event.EventID() {
return true
}
}
err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent)
case api.OutputTypeOldRoomEvent:
err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
s.onNewInviteEvent(s.ctx, *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
s.onRetireInviteEvent(s.ctx, *output.RetireInviteEvent)
case api.OutputTypeNewPeek:
s.onNewPeek(s.ctx, *output.NewPeek)
case api.OutputTypeRetirePeek:
s.onRetirePeek(s.ctx, *output.RetirePeek)
case api.OutputTypeRedactedEvent:
err = s.onRedactEvent(s.ctx, *output.RedactedEvent)
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
}
if err != nil {
log.WithError(err).Error("roomserver output log: failed to process event")
return false
}
return true
}
func (s *OutputRoomEventConsumer) onRedactEvent(