mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 11:23:11 -06:00
Give same treatment to clientapi
This commit is contained in:
parent
df4f3aefdd
commit
1babe2b7ae
|
|
@ -15,8 +15,10 @@
|
|||
package consumers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
|
|
@ -84,7 +86,36 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
ev := output.NewRoomEvent.Event
|
||||
// See if the room version is present in the headers. If it isn't
|
||||
// then we can't process the event as we don't know what the format
|
||||
// will be
|
||||
var roomVersion gomatrixserverlib.RoomVersion
|
||||
for _, header := range msg.Headers {
|
||||
if bytes.Equal(header.Key, []byte("room_version")) {
|
||||
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
|
||||
break
|
||||
}
|
||||
}
|
||||
if roomVersion == "" {
|
||||
return errors.New("room version was not in sarama headers")
|
||||
}
|
||||
|
||||
// Prepare the room event so that it has the correct field types
|
||||
// for the room version
|
||||
ev := gomatrixserverlib.Event{}
|
||||
if err := ev.PrepareAs(roomVersion); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"room_version": roomVersion,
|
||||
}).WithError(err).Errorf("can't prepare event to version")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(msg.Value, &ev); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"room_id": ev.RoomID(),
|
||||
|
|
|
|||
Loading…
Reference in a new issue