Try to process sarama message type from headers

This commit is contained in:
Neil Alexander 2020-03-12 10:23:59 +00:00
parent 1babe2b7ae
commit 67f6f74876
8 changed files with 83 additions and 87 deletions

View file

@ -15,7 +15,6 @@
package consumers package consumers
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
@ -83,29 +82,23 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
headers := common.SaramaHeaders(msg.Headers)
if err := json.Unmarshal(msg.Value, &output); err != nil { if msgtype, ok := headers["type"]; ok {
// If the message was invalid, log it and move on to the next message in the stream if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent {
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithField("type", msgtype).Debug(
return nil "roomserver output log: ignoring unknown output type",
} )
return nil
if output.Type != api.OutputTypeNewRoomEvent { }
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
} }
// See if the room version is present in the headers. If it isn't // See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format // then we can't process the event as we don't know what the format
// will be // will be
var roomVersion gomatrixserverlib.RoomVersion var roomVersion gomatrixserverlib.RoomVersion
for _, header := range msg.Headers { if rv, ok := headers["room_version"]; ok {
if bytes.Equal(header.Key, []byte("room_version")) { roomVersion = gomatrixserverlib.RoomVersion(rv)
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
break
}
} }
if roomVersion == "" { if roomVersion == "" {
return errors.New("room version was not in sarama headers") return errors.New("room version was not in sarama headers")

View file

@ -15,7 +15,6 @@
package consumers package consumers
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
@ -73,28 +72,23 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil { headers := common.SaramaHeaders(msg.Headers)
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
if output.Type != api.OutputTypeNewRoomEvent { if msgtype, ok := headers["type"]; ok {
log.WithField("type", output.Type).Debug( if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent {
"roomserver output log: ignoring unknown output type", log.WithField("type", msgtype).Debug(
) "roomserver output log: ignoring unknown output type",
return nil )
return nil
}
} }
// See if the room version is present in the headers. If it isn't // See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format // then we can't process the event as we don't know what the format
// will be // will be
var roomVersion gomatrixserverlib.RoomVersion var roomVersion gomatrixserverlib.RoomVersion
for _, header := range msg.Headers { if rv, ok := headers["room_version"]; ok {
if bytes.Equal(header.Key, []byte("room_version")) { roomVersion = gomatrixserverlib.RoomVersion(rv)
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
break
}
} }
if roomVersion == "" { if roomVersion == "" {
return errors.New("room version was not in sarama headers") return errors.New("room version was not in sarama headers")
@ -102,27 +96,27 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Prepare the room event so that it has the correct field types // Prepare the room event so that it has the correct field types
// for the room version // for the room version
ev := gomatrixserverlib.Event{} output.NewRoomEvent.Event = gomatrixserverlib.Event{}
if err := ev.PrepareAs(roomVersion); err != nil { if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"room_version": roomVersion, "room_version": roomVersion,
}).WithError(err).Errorf("can't prepare event to version") }).WithError(err).Errorf("can't prepare event to version")
return err return err
} }
if err := json.Unmarshal(msg.Value, &ev); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil return nil
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": output.NewRoomEvent.Event.EventID(),
"room_id": ev.RoomID(), "room_id": output.NewRoomEvent.Event.RoomID(),
"type": ev.Type(), "type": output.NewRoomEvent.Event.Type(),
}).Info("received event from roomserver") }).Info("received event from roomserver")
events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.Event)
if err != nil { if err != nil {
return err return err
} }

14
common/sarama.go Normal file
View file

@ -0,0 +1,14 @@
package common
import "gopkg.in/Shopify/sarama.v1"
func SaramaHeaders(headers []*sarama.RecordHeader) map[string][]byte {
result := make(map[string][]byte)
for _, header := range headers {
if header == nil {
continue
}
result[string(header.Key)] = header.Value
}
return result
}

View file

@ -15,7 +15,6 @@
package consumers package consumers
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
@ -76,29 +75,23 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
headers := common.SaramaHeaders(msg.Headers)
if err := json.Unmarshal(msg.Value, &output); err != nil { if msgtype, ok := headers["type"]; ok {
// If the message was invalid, log it and move on to the next message in the stream if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent {
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithField("type", msgtype).Debug(
return nil "roomserver output log: ignoring unknown output type",
} )
return nil
if output.Type != api.OutputTypeNewRoomEvent { }
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
} }
// See if the room version is present in the headers. If it isn't // See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format // then we can't process the event as we don't know what the format
// will be // will be
var roomVersion gomatrixserverlib.RoomVersion var roomVersion gomatrixserverlib.RoomVersion
for _, header := range msg.Headers { if rv, ok := headers["room_version"]; ok {
if bytes.Equal(header.Key, []byte("room_version")) { roomVersion = gomatrixserverlib.RoomVersion(rv)
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
break
}
} }
if roomVersion == "" { if roomVersion == "" {
return errors.New("room version was not in sarama headers") return errors.New("room version was not in sarama headers")

View file

@ -15,7 +15,6 @@
package consumers package consumers
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
@ -66,30 +65,23 @@ func (s *OutputRoomEventConsumer) Start() error {
// onMessage is called when the sync server receives a new event from the room server output log. // onMessage is called when the sync server receives a new event from the room server output log.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output api.OutputEvent var output api.OutputEvent
headers := common.SaramaHeaders(msg.Headers)
if err := json.Unmarshal(msg.Value, &output); err != nil { if msgtype, ok := headers["type"]; ok {
// If the message was invalid, log it and move on to the next message in the stream if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent {
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithField("type", msgtype).Debug(
return nil "roomserver output log: ignoring unknown output type",
} )
return nil
// Filter out any messages that aren't new room events }
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
} }
// See if the room version is present in the headers. If it isn't // See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format // then we can't process the event as we don't know what the format
// will be // will be
var roomVersion gomatrixserverlib.RoomVersion var roomVersion gomatrixserverlib.RoomVersion
for _, header := range msg.Headers { if rv, ok := headers["room_version"]; ok {
if bytes.Equal(header.Key, []byte("room_version")) { roomVersion = gomatrixserverlib.RoomVersion(rv)
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
break
}
} }
if roomVersion == "" { if roomVersion == "" {
return errors.New("room version was not in sarama headers") return errors.New("room version was not in sarama headers")

View file

@ -44,16 +44,21 @@ func (r *RoomserverInputAPI) WriteOutputEvents(
roomID string, roomVersion gomatrixserverlib.RoomVersion, updates []api.OutputEvent, roomID string, roomVersion gomatrixserverlib.RoomVersion, updates []api.OutputEvent,
) error { ) error {
messages := make([]*sarama.ProducerMessage, len(updates)) messages := make([]*sarama.ProducerMessage, len(updates))
for i := range updates { for i, update := range updates {
value, err := json.Marshal(updates[i]) value, err := json.Marshal(update)
if err != nil { if err != nil {
return err return err
} }
messages[i] = &sarama.ProducerMessage{ messages[i] = &sarama.ProducerMessage{
Topic: r.OutputRoomEventTopic, Topic: r.OutputRoomEventTopic,
Key: sarama.StringEncoder(roomID), Key: sarama.StringEncoder(roomID),
Value: sarama.ByteEncoder(value), Value: sarama.ByteEncoder(value),
Headers: []sarama.RecordHeader{ Headers: []sarama.RecordHeader{
sarama.RecordHeader{
Key: []byte("type"),
Value: []byte(update.Type),
},
sarama.RecordHeader{ sarama.RecordHeader{
Key: []byte("room_version"), Key: []byte("room_version"),
Value: []byte(roomVersion), Value: []byte(roomVersion),

View file

@ -19,6 +19,7 @@ package input
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -266,9 +267,13 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
roomVersion, err := u.db.GetRoomVersionForRoomNID(context.Background(), u.roomNID) roomVersion, err := u.db.GetRoomVersionForRoomNID(context.Background(), u.roomNID)
if err != nil { if err != nil {
fmt.Println("FAILED TO GET ROOM VERSION:", err)
return nil, err return nil, err
} }
fmt.Println("GOT ROOM VERSION", roomVersion)
fmt.Println("EVENT IS", u.event)
ore := api.OutputNewRoomEvent{ ore := api.OutputNewRoomEvent{
Event: u.event, Event: u.event,
RoomVersion: roomVersion, RoomVersion: roomVersion,

View file

@ -15,7 +15,6 @@
package consumers package consumers
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
@ -76,22 +75,23 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
headers := common.SaramaHeaders(msg.Headers)
if err := json.Unmarshal(msg.Value, &output); err != nil { if msgtype, ok := headers["type"]; ok {
// If the message was invalid, log it and move on to the next message in the stream if api.OutputType(msgtype) != api.OutputTypeNewRoomEvent {
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithField("type", msgtype).Debug(
return nil "roomserver output log: ignoring unknown output type",
)
return nil
}
} }
// See if the room version is present in the headers. If it isn't // See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format // then we can't process the event as we don't know what the format
// will be // will be
var roomVersion gomatrixserverlib.RoomVersion var roomVersion gomatrixserverlib.RoomVersion
for _, header := range msg.Headers { if rv, ok := headers["room_version"]; ok {
if bytes.Equal(header.Key, []byte("room_version")) { roomVersion = gomatrixserverlib.RoomVersion(rv)
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
break
}
} }
if roomVersion == "" { if roomVersion == "" {
return errors.New("room version was not in sarama headers") return errors.New("room version was not in sarama headers")