Add a new RoomEventType to avoid unneeded unmarshalling

This commit is contained in:
Till Faelligen 2022-10-05 09:54:07 +02:00
parent 21f8881985
commit 6925e1f2d6
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
8 changed files with 60 additions and 19 deletions

View file

@ -101,6 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage(
log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs)) log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs))
events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs)) events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs))
for _, msg := range msgs { for _, msg := range msgs {
// Only handle events we care about
receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType))
if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInviteEvent {
continue
}
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {

View file

@ -79,6 +79,13 @@ func (s *OutputRoomEventConsumer) Start() error {
// realises that it cannot update the room state using the deltas. // realises that it cannot update the room state using the deltas.
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called msg := msgs[0] // Guaranteed to exist if onMessage is called
receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType))
// Only handle events we care about
if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInboundPeek {
return true
}
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {

View file

@ -21,6 +21,18 @@ import (
// An OutputType is a type of roomserver output. // An OutputType is a type of roomserver output.
type OutputType string type OutputType string
// OutputTypes contains all possible output types from below
var OutputTypes = []OutputType{
OutputTypeNewRoomEvent,
OutputTypeOldRoomEvent,
OutputTypeNewInviteEvent,
OutputTypeRetireInviteEvent,
OutputTypeRedactedEvent,
OutputTypeNewPeek,
OutputTypeNewInboundPeek,
OutputTypeRetirePeek,
}
const ( const (
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent // OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
OutputTypeNewRoomEvent OutputType = "new_room_event" OutputTypeNewRoomEvent OutputType = "new_room_event"

View file

@ -17,12 +17,13 @@ package producers
import ( import (
"encoding/json" "encoding/json"
"github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/jetstream"
) )
var keyContentFields = map[string]string{ var keyContentFields = map[string]string{
@ -40,10 +41,8 @@ type RoomEventProducer struct {
func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error { func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
var err error var err error
for _, update := range updates { for _, update := range updates {
msg := &nats.Msg{ msg := nats.NewMsg(r.Topic)
Subject: r.Topic, msg.Header.Set(jetstream.RoomEventType, string(update.Type))
Header: nats.Header{},
}
msg.Header.Set(jetstream.RoomID, roomID) msg.Header.Set(jetstream.RoomID, roomID)
msg.Data, err = json.Marshal(update) msg.Data, err = json.Marshal(update)
if err != nil { if err != nil {

View file

@ -9,9 +9,10 @@ import (
) )
const ( const (
UserID = "user_id" UserID = "user_id"
RoomID = "room_id" RoomID = "room_id"
EventID = "event_id" EventID = "event_id"
RoomEventType = "output_room_event_type"
) )
var ( var (

View file

@ -93,6 +93,23 @@ func (s *OutputRoomEventConsumer) Start() error {
// sync stream position may race and be incorrectly calculated. // sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called msg := msgs[0] // Guaranteed to exist if onMessage is called
receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType))
ok := false
// Only handle events we care about
for _, eventType := range api.OutputTypes {
if receivedType == eventType {
ok = true
break
}
}
if !ok {
log.WithField("type", receivedType).Debug(
"roomserver output log: ignoring unknown output type",
)
return true
}
// Parse out the event JSON // Parse out the event JSON
var err error var err error
var output api.OutputEvent var output api.OutputEvent
@ -102,7 +119,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return true return true
} }
switch output.Type { switch receivedType {
case api.OutputTypeNewRoomEvent: case api.OutputTypeNewRoomEvent:
// Ignore redaction events. We will add them to the database when they are // Ignore redaction events. We will add them to the database when they are
// validated (when we receive OutputTypeRedactedEvent) // validated (when we receive OutputTypeRedactedEvent)

View file

@ -4,10 +4,11 @@ import (
"encoding/json" "encoding/json"
"testing" "testing"
"github.com/nats-io/nats.go"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/nats-io/nats.go"
) )
func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) { func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) {
@ -21,10 +22,8 @@ func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Ms
func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg { func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg {
t.Helper() t.Helper()
msg := &nats.Msg{ msg := nats.NewMsg(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent))
Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), msg.Header.Set(jetstream.RoomEventType, string(update.Type))
Header: nats.Header{},
}
msg.Header.Set(jetstream.RoomID, roomID) msg.Header.Set(jetstream.RoomID, roomID)
var err error var err error
msg.Data, err = json.Marshal(update) msg.Data, err = json.Marshal(update)

View file

@ -72,15 +72,16 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called msg := msgs[0] // Guaranteed to exist if onMessage is called
// Only handle events we care about
if rsapi.OutputType(msg.Header.Get(jetstream.RoomEventType)) != rsapi.OutputTypeNewRoomEvent {
return true
}
var output rsapi.OutputEvent var output rsapi.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
return true return true
} }
if output.Type != rsapi.OutputTypeNewRoomEvent {
return true
}
event := output.NewRoomEvent.Event event := output.NewRoomEvent.Event
if event == nil { if event == nil {
log.Errorf("userapi consumer: expected event") log.Errorf("userapi consumer: expected event")