From 6925e1f2d6a106f405f097dfd03f352b54c7a8da Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Wed, 5 Oct 2022 09:54:07 +0200 Subject: [PATCH] Add a new RoomEventType to avoid unneeded unmarshalling --- appservice/consumers/roomserver.go | 5 +++++ federationapi/consumers/roomserver.go | 7 +++++++ roomserver/api/output.go | 12 ++++++++++++ roomserver/producers/roomevent.go | 13 ++++++------- setup/jetstream/streams.go | 7 ++++--- syncapi/consumers/roomserver.go | 19 ++++++++++++++++++- test/testrig/jetstream.go | 9 ++++----- userapi/consumers/roomserver.go | 7 ++++--- 8 files changed, 60 insertions(+), 19 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index d44f32b38..ac68f4bd4 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -101,6 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage( log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs)) events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs)) for _, msg := range msgs { + // Only handle events we care about + receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType)) + if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInviteEvent { + continue + } // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Data, &output); err != nil { diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 349b50b05..a42733628 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -79,6 +79,13 @@ func (s *OutputRoomEventConsumer) Start() error { // realises that it cannot update the room state using the deltas. func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { msg := msgs[0] // Guaranteed to exist if onMessage is called + receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType)) + + // Only handle events we care about + if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInboundPeek { + return true + } + // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Data, &output); err != nil { diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 36d0625c7..949c76cbf 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -21,6 +21,18 @@ import ( // An OutputType is a type of roomserver output. type OutputType string +// OutputTypes contains all possible output types from below +var OutputTypes = []OutputType{ + OutputTypeNewRoomEvent, + OutputTypeOldRoomEvent, + OutputTypeNewInviteEvent, + OutputTypeRetireInviteEvent, + OutputTypeRedactedEvent, + OutputTypeNewPeek, + OutputTypeNewInboundPeek, + OutputTypeRetirePeek, +} + const ( // OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent OutputTypeNewRoomEvent OutputType = "new_room_event" diff --git a/roomserver/producers/roomevent.go b/roomserver/producers/roomevent.go index 987e6c942..9c4521986 100644 --- a/roomserver/producers/roomevent.go +++ b/roomserver/producers/roomevent.go @@ -17,12 +17,13 @@ package producers import ( "encoding/json" - "github.com/matrix-org/dendrite/roomserver/acls" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/jetstream" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" + + "github.com/matrix-org/dendrite/roomserver/acls" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/jetstream" ) var keyContentFields = map[string]string{ @@ -40,10 +41,8 @@ type RoomEventProducer struct { func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error { var err error for _, update := range updates { - msg := &nats.Msg{ - Subject: r.Topic, - Header: nats.Header{}, - } + msg := nats.NewMsg(r.Topic) + msg.Header.Set(jetstream.RoomEventType, string(update.Type)) msg.Header.Set(jetstream.RoomID, roomID) msg.Data, err = json.Marshal(update) if err != nil { diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index ee9810dae..590f0cbd9 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -9,9 +9,10 @@ import ( ) const ( - UserID = "user_id" - RoomID = "room_id" - EventID = "event_id" + UserID = "user_id" + RoomID = "room_id" + EventID = "event_id" + RoomEventType = "output_room_event_type" ) var ( diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index c7a11dbb4..7d3156fba 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -93,6 +93,23 @@ func (s *OutputRoomEventConsumer) Start() error { // sync stream position may race and be incorrectly calculated. func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { msg := msgs[0] // Guaranteed to exist if onMessage is called + + receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType)) + ok := false + // Only handle events we care about + for _, eventType := range api.OutputTypes { + if receivedType == eventType { + ok = true + break + } + } + if !ok { + log.WithField("type", receivedType).Debug( + "roomserver output log: ignoring unknown output type", + ) + return true + } + // Parse out the event JSON var err error var output api.OutputEvent @@ -102,7 +119,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms return true } - switch output.Type { + switch receivedType { case api.OutputTypeNewRoomEvent: // Ignore redaction events. We will add them to the database when they are // validated (when we receive OutputTypeRedactedEvent) diff --git a/test/testrig/jetstream.go b/test/testrig/jetstream.go index 74cf95062..b880eea43 100644 --- a/test/testrig/jetstream.go +++ b/test/testrig/jetstream.go @@ -4,10 +4,11 @@ import ( "encoding/json" "testing" + "github.com/nats-io/nats.go" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/nats-io/nats.go" ) func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) { @@ -21,10 +22,8 @@ func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Ms func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg { t.Helper() - msg := &nats.Msg{ - Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), - Header: nats.Header{}, - } + msg := nats.NewMsg(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) + msg.Header.Set(jetstream.RoomEventType, string(update.Type)) msg.Header.Set(jetstream.RoomID, roomID) var err error msg.Data, err = json.Marshal(update) diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index 952de98f7..a12876946 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -72,15 +72,16 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { msg := msgs[0] // Guaranteed to exist if onMessage is called + // Only handle events we care about + if rsapi.OutputType(msg.Header.Get(jetstream.RoomEventType)) != rsapi.OutputTypeNewRoomEvent { + return true + } var output rsapi.OutputEvent if err := json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") return true } - if output.Type != rsapi.OutputTypeNewRoomEvent { - return true - } event := output.NewRoomEvent.Event if event == nil { log.Errorf("userapi consumer: expected event")