From 67f6f74876b807ee5e86d2038cf2a29de90ad4a8 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 12 Mar 2020 10:23:59 +0000 Subject: [PATCH] Try to process sarama message type from headers --- appservice/consumers/roomserver.go | 27 ++++++---------- clientapi/consumers/roomserver.go | 40 ++++++++++-------------- common/sarama.go | 14 +++++++++ federationsender/consumers/roomserver.go | 27 ++++++---------- publicroomsapi/consumers/roomserver.go | 28 ++++++----------- roomserver/input/input.go | 9 ++++-- roomserver/input/latest_events.go | 5 +++ syncapi/consumers/roomserver.go | 20 ++++++------ 8 files changed, 83 insertions(+), 87 deletions(-) create mode 100644 common/sarama.go diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 766d4b277..b9e94f3bd 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -15,7 +15,6 @@ package consumers import ( - "bytes" "context" "encoding/json" "errors" @@ -83,29 +82,23 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent + headers := common.SaramaHeaders(msg.Headers) - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } - - if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil + if msgtype, ok := headers["type"]; ok { + if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent { + log.WithField("type", msgtype).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } } // See if the room version is present in the headers. If it isn't // then we can't process the event as we don't know what the format // will be var roomVersion gomatrixserverlib.RoomVersion - for _, header := range msg.Headers { - if bytes.Equal(header.Key, []byte("room_version")) { - roomVersion = gomatrixserverlib.RoomVersion(header.Value) - break - } + if rv, ok := headers["room_version"]; ok { + roomVersion = gomatrixserverlib.RoomVersion(rv) } if roomVersion == "" { return errors.New("room version was not in sarama headers") diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go index 98202dc69..aa9adf246 100644 --- a/clientapi/consumers/roomserver.go +++ b/clientapi/consumers/roomserver.go @@ -15,7 +15,6 @@ package consumers import ( - "bytes" "context" "encoding/json" "errors" @@ -73,28 +72,23 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } + headers := common.SaramaHeaders(msg.Headers) - if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil + if msgtype, ok := headers["type"]; ok { + if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent { + log.WithField("type", msgtype).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } } // See if the room version is present in the headers. If it isn't // then we can't process the event as we don't know what the format // will be var roomVersion gomatrixserverlib.RoomVersion - for _, header := range msg.Headers { - if bytes.Equal(header.Key, []byte("room_version")) { - roomVersion = gomatrixserverlib.RoomVersion(header.Value) - break - } + if rv, ok := headers["room_version"]; ok { + roomVersion = gomatrixserverlib.RoomVersion(rv) } if roomVersion == "" { return errors.New("room version was not in sarama headers") @@ -102,27 +96,27 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Prepare the room event so that it has the correct field types // for the room version - ev := gomatrixserverlib.Event{} - if err := ev.PrepareAs(roomVersion); err != nil { + output.NewRoomEvent.Event = gomatrixserverlib.Event{} + if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { log.WithFields(log.Fields{ "room_version": roomVersion, }).WithError(err).Errorf("can't prepare event to version") return err } - if err := json.Unmarshal(msg.Value, &ev); err != nil { + if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "type": ev.Type(), + "event_id": output.NewRoomEvent.Event.EventID(), + "room_id": output.NewRoomEvent.Event.RoomID(), + "type": output.NewRoomEvent.Event.Type(), }).Info("received event from roomserver") - events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.Event) if err != nil { return err } diff --git a/common/sarama.go b/common/sarama.go new file mode 100644 index 000000000..0c761418b --- /dev/null +++ b/common/sarama.go @@ -0,0 +1,14 @@ +package common + +import "gopkg.in/Shopify/sarama.v1" + +func SaramaHeaders(headers []*sarama.RecordHeader) map[string][]byte { + result := make(map[string][]byte) + for _, header := range headers { + if header == nil { + continue + } + result[string(header.Key)] = header.Value + } + return result +} diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 6940c0d51..9fdde072e 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -15,7 +15,6 @@ package consumers import ( - "bytes" "context" "encoding/json" "errors" @@ -76,29 +75,23 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent + headers := common.SaramaHeaders(msg.Headers) - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } - - if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil + if msgtype, ok := headers["type"]; ok { + if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent { + log.WithField("type", msgtype).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } } // See if the room version is present in the headers. If it isn't // then we can't process the event as we don't know what the format // will be var roomVersion gomatrixserverlib.RoomVersion - for _, header := range msg.Headers { - if bytes.Equal(header.Key, []byte("room_version")) { - roomVersion = gomatrixserverlib.RoomVersion(header.Value) - break - } + if rv, ok := headers["room_version"]; ok { + roomVersion = gomatrixserverlib.RoomVersion(rv) } if roomVersion == "" { return errors.New("room version was not in sarama headers") diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index aae12bf50..b47be5c9d 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -15,7 +15,6 @@ package consumers import ( - "bytes" "context" "encoding/json" "errors" @@ -66,30 +65,23 @@ func (s *OutputRoomEventConsumer) Start() error { // onMessage is called when the sync server receives a new event from the room server output log. func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { var output api.OutputEvent + headers := common.SaramaHeaders(msg.Headers) - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } - - // Filter out any messages that aren't new room events - if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil + if msgtype, ok := headers["type"]; ok { + if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent { + log.WithField("type", msgtype).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } } // See if the room version is present in the headers. If it isn't // then we can't process the event as we don't know what the format // will be var roomVersion gomatrixserverlib.RoomVersion - for _, header := range msg.Headers { - if bytes.Equal(header.Key, []byte("room_version")) { - roomVersion = gomatrixserverlib.RoomVersion(header.Value) - break - } + if rv, ok := headers["room_version"]; ok { + roomVersion = gomatrixserverlib.RoomVersion(rv) } if roomVersion == "" { return errors.New("room version was not in sarama headers") diff --git a/roomserver/input/input.go b/roomserver/input/input.go index 52c7e4d10..ea544c6d0 100644 --- a/roomserver/input/input.go +++ b/roomserver/input/input.go @@ -44,16 +44,21 @@ func (r *RoomserverInputAPI) WriteOutputEvents( roomID string, roomVersion gomatrixserverlib.RoomVersion, updates []api.OutputEvent, ) error { messages := make([]*sarama.ProducerMessage, len(updates)) - for i := range updates { - value, err := json.Marshal(updates[i]) + for i, update := range updates { + value, err := json.Marshal(update) if err != nil { return err } + messages[i] = &sarama.ProducerMessage{ Topic: r.OutputRoomEventTopic, Key: sarama.StringEncoder(roomID), Value: sarama.ByteEncoder(value), Headers: []sarama.RecordHeader{ + sarama.RecordHeader{ + Key: []byte("type"), + Value: []byte(update.Type), + }, sarama.RecordHeader{ Key: []byte("room_version"), Value: []byte(roomVersion), diff --git a/roomserver/input/latest_events.go b/roomserver/input/latest_events.go index eab971547..8f5454f18 100644 --- a/roomserver/input/latest_events.go +++ b/roomserver/input/latest_events.go @@ -19,6 +19,7 @@ package input import ( "bytes" "context" + "fmt" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" @@ -266,9 +267,13 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) roomVersion, err := u.db.GetRoomVersionForRoomNID(context.Background(), u.roomNID) if err != nil { + fmt.Println("FAILED TO GET ROOM VERSION:", err) return nil, err } + fmt.Println("GOT ROOM VERSION", roomVersion) + fmt.Println("EVENT IS", u.event) + ore := api.OutputNewRoomEvent{ Event: u.event, RoomVersion: roomVersion, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 4731e830c..dec4d644a 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -15,7 +15,6 @@ package consumers import ( - "bytes" "context" "encoding/json" "errors" @@ -76,22 +75,23 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent + headers := common.SaramaHeaders(msg.Headers) - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil + if msgtype, ok := headers["type"]; ok { + if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent { + log.WithField("type", msgtype).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } } // See if the room version is present in the headers. If it isn't // then we can't process the event as we don't know what the format // will be var roomVersion gomatrixserverlib.RoomVersion - for _, header := range msg.Headers { - if bytes.Equal(header.Key, []byte("room_version")) { - roomVersion = gomatrixserverlib.RoomVersion(header.Value) - break - } + if rv, ok := headers["room_version"]; ok { + roomVersion = gomatrixserverlib.RoomVersion(rv) } if roomVersion == "" { return errors.New("room version was not in sarama headers")