mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-03-04 01:13:10 -06:00
Merge branch 'main' into neilalexander/asrefactor
This commit is contained in:
commit
93df9fd254
|
|
@ -72,9 +72,9 @@ func (s *OutputRoomEventConsumer) Start() error {
|
||||||
appsvc, token := as, jetstream.Tokenise(as.ID)
|
appsvc, token := as, jetstream.Tokenise(as.ID)
|
||||||
if err := jetstream.JetStreamConsumer(
|
if err := jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic,
|
s.ctx, s.jetstream, s.topic,
|
||||||
s.cfg.Matrix.JetStream.Durable("Appservice_"+token),
|
s.cfg.Matrix.JetStream.Durable("Appservice_"+token), 10,
|
||||||
func(ctx context.Context, msg *nats.Msg) bool {
|
func(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
return s.onMessage(ctx, &appsvc, msg)
|
return s.onMessage(ctx, &appsvc, msgs)
|
||||||
},
|
},
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
nats.DeliverAll(), nats.ManualAck(),
|
||||||
); err != nil {
|
); err != nil {
|
||||||
|
|
@ -86,54 +86,58 @@ func (s *OutputRoomEventConsumer) Start() error {
|
||||||
|
|
||||||
// onMessage is called when the appservice component receives a new event from
|
// onMessage is called when the appservice component receives a new event from
|
||||||
// the room server output log.
|
// the room server output log.
|
||||||
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, as *config.ApplicationService, msg *nats.Msg) bool {
|
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, as *config.ApplicationService, msgs []*nats.Msg) bool {
|
||||||
// Parse out the event JSON
|
log.WithField("appservice", as.ID).Debugf("Appservice worker received %d message(s) from roomserver", len(msgs))
|
||||||
var output api.OutputEvent
|
events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs))
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
for _, msg := range msgs {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// Parse out the event JSON
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
var output api.OutputEvent
|
||||||
return true
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
}
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
log.WithFields(log.Fields{
|
continue
|
||||||
"type": output.Type,
|
}
|
||||||
}).Debug("Got a message in OutputRoomEventConsumer")
|
switch output.Type {
|
||||||
|
case api.OutputTypeNewRoomEvent:
|
||||||
events := []*gomatrixserverlib.HeaderedEvent{}
|
if output.NewRoomEvent == nil {
|
||||||
if output.Type == api.OutputTypeNewRoomEvent && output.NewRoomEvent != nil {
|
continue
|
||||||
newEventID := output.NewRoomEvent.Event.EventID()
|
}
|
||||||
events = append(events, output.NewRoomEvent.Event)
|
events = append(events, output.NewRoomEvent.Event)
|
||||||
if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
|
if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
|
||||||
eventsReq := &api.QueryEventsByIDRequest{
|
newEventID := output.NewRoomEvent.Event.EventID()
|
||||||
EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)),
|
eventsReq := &api.QueryEventsByIDRequest{
|
||||||
}
|
EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)),
|
||||||
eventsRes := &api.QueryEventsByIDResponse{}
|
}
|
||||||
for _, eventID := range output.NewRoomEvent.AddsStateEventIDs {
|
eventsRes := &api.QueryEventsByIDResponse{}
|
||||||
if eventID != newEventID {
|
for _, eventID := range output.NewRoomEvent.AddsStateEventIDs {
|
||||||
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
|
if eventID != newEventID {
|
||||||
}
|
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
|
||||||
}
|
}
|
||||||
if len(eventsReq.EventIDs) > 0 {
|
}
|
||||||
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
if len(eventsReq.EventIDs) > 0 {
|
||||||
log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
|
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||||
return false
|
log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
|
||||||
}
|
return false
|
||||||
events = append(events, eventsRes.Events...)
|
}
|
||||||
}
|
events = append(events, eventsRes.Events...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case api.OutputTypeNewInviteEvent:
|
||||||
|
if output.NewInviteEvent == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
events = append(events, output.NewInviteEvent.Event)
|
||||||
|
|
||||||
|
default:
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
} else if output.Type == api.OutputTypeNewInviteEvent && output.NewInviteEvent != nil {
|
|
||||||
events = append(events, output.NewInviteEvent.Event)
|
|
||||||
} else {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"type": output.Type,
|
|
||||||
}).Debug("appservice OutputRoomEventConsumer ignoring event", string(msg.Data))
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send event to any relevant application services
|
// Send event to any relevant application services
|
||||||
if err := s.filterRoomserverEvents(ctx, events); err != nil {
|
if err := s.filterRoomserverEvents(ctx, events); err != nil {
|
||||||
log.WithError(err).Errorf("roomserver output log: filter error")
|
log.WithError(err).Errorf("roomserver output log: filter error")
|
||||||
return true
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
|
||||||
|
|
@ -67,14 +67,15 @@ func NewKeyChangeConsumer(
|
||||||
// Start consuming from key servers
|
// Start consuming from key servers
|
||||||
func (t *KeyChangeConsumer) Start() error {
|
func (t *KeyChangeConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
|
t.ctx, t.jetstream, t.topic, t.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
t.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called in response to a message received on the
|
// onMessage is called in response to a message received on the
|
||||||
// key change events topic from the key server.
|
// key change events topic from the key server.
|
||||||
func (t *KeyChangeConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (t *KeyChangeConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
var m api.DeviceMessage
|
var m api.DeviceMessage
|
||||||
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||||
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||||
|
|
|
||||||
|
|
@ -69,14 +69,15 @@ func (t *OutputPresenceConsumer) Start() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
|
t.ctx, t.jetstream, t.topic, t.durable, 1, t.onMessage,
|
||||||
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called in response to a message received on the presence
|
// onMessage is called in response to a message received on the presence
|
||||||
// events topic from the client api.
|
// events topic from the client api.
|
||||||
func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
// only send presence events which originated from us
|
// only send presence events which originated from us
|
||||||
userID := msg.Header.Get(jetstream.UserID)
|
userID := msg.Header.Get(jetstream.UserID)
|
||||||
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
|
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
|
|
|
||||||
|
|
@ -65,14 +65,15 @@ func NewOutputReceiptConsumer(
|
||||||
// Start consuming from the clientapi
|
// Start consuming from the clientapi
|
||||||
func (t *OutputReceiptConsumer) Start() error {
|
func (t *OutputReceiptConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
|
t.ctx, t.jetstream, t.topic, t.durable, 1, t.onMessage,
|
||||||
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called in response to a message received on the receipt
|
// onMessage is called in response to a message received on the receipt
|
||||||
// events topic from the client api.
|
// events topic from the client api.
|
||||||
func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
receipt := syncTypes.OutputReceiptEvent{
|
receipt := syncTypes.OutputReceiptEvent{
|
||||||
UserID: msg.Header.Get(jetstream.UserID),
|
UserID: msg.Header.Get(jetstream.UserID),
|
||||||
RoomID: msg.Header.Get(jetstream.RoomID),
|
RoomID: msg.Header.Get(jetstream.RoomID),
|
||||||
|
|
|
||||||
|
|
@ -68,8 +68,8 @@ func NewOutputRoomEventConsumer(
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputRoomEventConsumer) Start() error {
|
func (s *OutputRoomEventConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -77,7 +77,8 @@ func (s *OutputRoomEventConsumer) Start() error {
|
||||||
// It is unsafe to call this with messages for the same room in multiple gorountines
|
// It is unsafe to call this with messages for the same room in multiple gorountines
|
||||||
// because updates it will likely fail with a types.EventIDMismatchError when it
|
// because updates it will likely fail with a types.EventIDMismatchError when it
|
||||||
// realises that it cannot update the room state using the deltas.
|
// realises that it cannot update the room state using the deltas.
|
||||||
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -63,14 +63,15 @@ func NewOutputSendToDeviceConsumer(
|
||||||
// Start consuming from the client api
|
// Start consuming from the client api
|
||||||
func (t *OutputSendToDeviceConsumer) Start() error {
|
func (t *OutputSendToDeviceConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
|
t.ctx, t.jetstream, t.topic, t.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
t.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called in response to a message received on the
|
// onMessage is called in response to a message received on the
|
||||||
// send-to-device events topic from the client api.
|
// send-to-device events topic from the client api.
|
||||||
func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
// only send send-to-device events which originated from us
|
// only send send-to-device events which originated from us
|
||||||
sender := msg.Header.Get("sender")
|
sender := msg.Header.Get("sender")
|
||||||
_, originServerName, err := gomatrixserverlib.SplitID('@', sender)
|
_, originServerName, err := gomatrixserverlib.SplitID('@', sender)
|
||||||
|
|
|
||||||
|
|
@ -62,14 +62,15 @@ func NewOutputTypingConsumer(
|
||||||
// Start consuming from the clientapi
|
// Start consuming from the clientapi
|
||||||
func (t *OutputTypingConsumer) Start() error {
|
func (t *OutputTypingConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
|
t.ctx, t.jetstream, t.topic, t.durable, 1, t.onMessage,
|
||||||
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called in response to a message received on the typing
|
// onMessage is called in response to a message received on the typing
|
||||||
// events topic from the client api.
|
// events topic from the client api.
|
||||||
func (t *OutputTypingConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
// Extract the typing event from msg.
|
// Extract the typing event from msg.
|
||||||
roomID := msg.Header.Get(jetstream.RoomID)
|
roomID := msg.Header.Get(jetstream.RoomID)
|
||||||
userID := msg.Header.Get(jetstream.UserID)
|
userID := msg.Header.Get(jetstream.UserID)
|
||||||
|
|
|
||||||
|
|
@ -55,14 +55,15 @@ func NewDeviceListUpdateConsumer(
|
||||||
// Start consuming from key servers
|
// Start consuming from key servers
|
||||||
func (t *DeviceListUpdateConsumer) Start() error {
|
func (t *DeviceListUpdateConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
|
t.ctx, t.jetstream, t.topic, t.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
t.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called in response to a message received on the
|
// onMessage is called in response to a message received on the
|
||||||
// key change events topic from the key server.
|
// key change events topic from the key server.
|
||||||
func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
var m gomatrixserverlib.DeviceListUpdateEvent
|
var m gomatrixserverlib.DeviceListUpdateEvent
|
||||||
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to read from device list update input topic")
|
logrus.WithError(err).Errorf("Failed to read from device list update input topic")
|
||||||
|
|
|
||||||
|
|
@ -9,9 +9,16 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// JetStreamConsumer starts a durable consumer on the given subject with the
|
||||||
|
// given durable name. The function will be called when one or more messages
|
||||||
|
// is available, up to the maximum batch size specified. If the batch is set to
|
||||||
|
// 1 then messages will be delivered one at a time. If the function is called,
|
||||||
|
// the messages array is guaranteed to be at least 1 in size. Any provided NATS
|
||||||
|
// options will be passed through to the pull subscriber creation. The consumer
|
||||||
|
// will continue to run until the context expires, at which point it will stop.
|
||||||
func JetStreamConsumer(
|
func JetStreamConsumer(
|
||||||
ctx context.Context, js nats.JetStreamContext, subj, durable string,
|
ctx context.Context, js nats.JetStreamContext, subj, durable string, batch int,
|
||||||
f func(ctx context.Context, msg *nats.Msg) bool,
|
f func(ctx context.Context, msgs []*nats.Msg) bool,
|
||||||
opts ...nats.SubOpt,
|
opts ...nats.SubOpt,
|
||||||
) error {
|
) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
@ -27,6 +34,14 @@ func JetStreamConsumer(
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// If the batch size is greater than 1, we will want to acknowledge all
|
||||||
|
// received messages in the batch. Below we will send an acknowledgement
|
||||||
|
// for the most recent message in the batch and AckAll will ensure that
|
||||||
|
// all messages that came before it are also acknowledged implicitly.
|
||||||
|
if batch > 1 {
|
||||||
|
opts = append(opts, nats.AckAll())
|
||||||
|
}
|
||||||
|
|
||||||
name := durable + "Pull"
|
name := durable + "Pull"
|
||||||
sub, err := js.PullSubscribe(subj, name, opts...)
|
sub, err := js.PullSubscribe(subj, name, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -50,7 +65,7 @@ func JetStreamConsumer(
|
||||||
// enforce its own deadline (roughly 5 seconds by default). Therefore
|
// enforce its own deadline (roughly 5 seconds by default). Therefore
|
||||||
// it is our responsibility to check whether our context expired or
|
// it is our responsibility to check whether our context expired or
|
||||||
// not when a context error is returned. Footguns. Footguns everywhere.
|
// not when a context error is returned. Footguns. Footguns everywhere.
|
||||||
msgs, err := sub.Fetch(1, nats.Context(ctx))
|
msgs, err := sub.Fetch(batch, nats.Context(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == context.Canceled || err == context.DeadlineExceeded {
|
if err == context.Canceled || err == context.DeadlineExceeded {
|
||||||
// Work out whether it was the JetStream context that expired
|
// Work out whether it was the JetStream context that expired
|
||||||
|
|
@ -74,13 +89,13 @@ func JetStreamConsumer(
|
||||||
if len(msgs) < 1 {
|
if len(msgs) < 1 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
msg := msgs[0]
|
msg := msgs[len(msgs)-1] // most recent message, in case of AckAll
|
||||||
if err = msg.InProgress(nats.Context(ctx)); err != nil {
|
if err = msg.InProgress(nats.Context(ctx)); err != nil {
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if f(ctx, msg) {
|
if f(ctx, msgs) {
|
||||||
if err = msg.AckSync(nats.Context(ctx)); err != nil {
|
if err = msg.AckSync(nats.Context(ctx)); err != nil {
|
||||||
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
|
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
|
|
||||||
|
|
@ -75,15 +75,16 @@ func NewOutputClientDataConsumer(
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputClientDataConsumer) Start() error {
|
func (s *OutputClientDataConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the client API server output log.
|
// onMessage is called when the sync server receives a new event from the client API server output log.
|
||||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||||
// sync stream position may race and be incorrectly calculated.
|
// sync stream position may race and be incorrectly calculated.
|
||||||
func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
userID := msg.Header.Get(jetstream.UserID)
|
userID := msg.Header.Get(jetstream.UserID)
|
||||||
var output eventutil.AccountData
|
var output eventutil.AccountData
|
||||||
|
|
|
||||||
|
|
@ -75,12 +75,13 @@ func NewOutputKeyChangeEventConsumer(
|
||||||
// Start consuming from the key server
|
// Start consuming from the key server
|
||||||
func (s *OutputKeyChangeEventConsumer) Start() error {
|
func (s *OutputKeyChangeEventConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputKeyChangeEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputKeyChangeEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
var m api.DeviceMessage
|
var m api.DeviceMessage
|
||||||
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||||
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
||||||
|
|
|
||||||
|
|
@ -128,12 +128,13 @@ func (s *PresenceConsumer) Start() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.presenceTopic, s.durable, 1, s.onMessage,
|
||||||
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *PresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
userID := msg.Header.Get(jetstream.UserID)
|
userID := msg.Header.Get(jetstream.UserID)
|
||||||
presence := msg.Header.Get("presence")
|
presence := msg.Header.Get("presence")
|
||||||
timestamp := msg.Header.Get("last_active_ts")
|
timestamp := msg.Header.Get("last_active_ts")
|
||||||
|
|
|
||||||
|
|
@ -74,12 +74,13 @@ func NewOutputReceiptEventConsumer(
|
||||||
// Start consuming receipts events.
|
// Start consuming receipts events.
|
||||||
func (s *OutputReceiptEventConsumer) Start() error {
|
func (s *OutputReceiptEventConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
output := types.OutputReceiptEvent{
|
output := types.OutputReceiptEvent{
|
||||||
UserID: msg.Header.Get(jetstream.UserID),
|
UserID: msg.Header.Get(jetstream.UserID),
|
||||||
RoomID: msg.Header.Get(jetstream.RoomID),
|
RoomID: msg.Header.Get(jetstream.RoomID),
|
||||||
|
|
|
||||||
|
|
@ -79,15 +79,16 @@ func NewOutputRoomEventConsumer(
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputRoomEventConsumer) Start() error {
|
func (s *OutputRoomEventConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||||
// sync stream position may race and be incorrectly calculated.
|
// sync stream position may race and be incorrectly calculated.
|
||||||
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var err error
|
var err error
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
|
|
|
||||||
|
|
@ -68,12 +68,13 @@ func NewOutputSendToDeviceEventConsumer(
|
||||||
// Start consuming send-to-device events.
|
// Start consuming send-to-device events.
|
||||||
func (s *OutputSendToDeviceEventConsumer) Start() error {
|
func (s *OutputSendToDeviceEventConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
userID := msg.Header.Get(jetstream.UserID)
|
userID := msg.Header.Get(jetstream.UserID)
|
||||||
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -64,12 +64,13 @@ func NewOutputTypingEventConsumer(
|
||||||
// Start consuming typing events.
|
// Start consuming typing events.
|
||||||
func (s *OutputTypingEventConsumer) Start() error {
|
func (s *OutputTypingEventConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
roomID := msg.Header.Get(jetstream.RoomID)
|
roomID := msg.Header.Get(jetstream.RoomID)
|
||||||
userID := msg.Header.Get(jetstream.UserID)
|
userID := msg.Header.Get(jetstream.UserID)
|
||||||
typing, err := strconv.ParseBool(msg.Header.Get("typing"))
|
typing, err := strconv.ParseBool(msg.Header.Get("typing"))
|
||||||
|
|
|
||||||
|
|
@ -67,8 +67,8 @@ func NewOutputNotificationDataConsumer(
|
||||||
// Start starts consumption.
|
// Start starts consumption.
|
||||||
func (s *OutputNotificationDataConsumer) Start() error {
|
func (s *OutputNotificationDataConsumer) Start() error {
|
||||||
return jetstream.JetStreamConsumer(
|
return jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -76,7 +76,8 @@ func (s *OutputNotificationDataConsumer) Start() error {
|
||||||
// the push server. It is not safe for this function to be called from
|
// the push server. It is not safe for this function to be called from
|
||||||
// multiple goroutines, or else the sync stream position may race and
|
// multiple goroutines, or else the sync stream position may race and
|
||||||
// be incorrectly calculated.
|
// be incorrectly calculated.
|
||||||
func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
userID := string(msg.Header.Get(jetstream.UserID))
|
userID := string(msg.Header.Get(jetstream.UserID))
|
||||||
|
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
|
|
|
||||||
|
|
@ -56,15 +56,16 @@ func NewOutputReadUpdateConsumer(
|
||||||
|
|
||||||
func (s *OutputReadUpdateConsumer) Start() error {
|
func (s *OutputReadUpdateConsumer) Start() error {
|
||||||
if err := jetstream.JetStreamConsumer(
|
if err := jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
var read types.ReadUpdate
|
var read types.ReadUpdate
|
||||||
if err := json.Unmarshal(msg.Data, &read); err != nil {
|
if err := json.Unmarshal(msg.Data, &read); err != nil {
|
||||||
log.WithError(err).Error("userapi clientapi consumer: message parse failure")
|
log.WithError(err).Error("userapi clientapi consumer: message parse failure")
|
||||||
|
|
|
||||||
|
|
@ -65,15 +65,16 @@ func NewOutputStreamEventConsumer(
|
||||||
|
|
||||||
func (s *OutputStreamEventConsumer) Start() error {
|
func (s *OutputStreamEventConsumer) Start() error {
|
||||||
if err := jetstream.JetStreamConsumer(
|
if err := jetstream.JetStreamConsumer(
|
||||||
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
|
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||||
nats.DeliverAll(), nats.ManualAck(),
|
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||||
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
var output types.StreamedEvent
|
var output types.StreamedEvent
|
||||||
output.Event = &gomatrixserverlib.HeaderedEvent{}
|
output.Event = &gomatrixserverlib.HeaderedEvent{}
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue