mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-18 12:23:09 -06:00
Try to prepare event using room ID in sarama metadata
This commit is contained in:
parent
ba7f0c44a0
commit
8591f3c4b3
|
|
@ -127,7 +127,7 @@ func writeEvent(event gomatrixserverlib.Event) {
|
||||||
ire.Kind = api.KindNew
|
ire.Kind = api.KindNew
|
||||||
ire.Event = event
|
ire.Event = event
|
||||||
authEventIDs := []string{}
|
authEventIDs := []string{}
|
||||||
for _, ref := range b.AuthEvents.([]gomatrixserverlib.EventReference) {
|
for _, ref := range b.AuthEvents {
|
||||||
authEventIDs = append(authEventIDs, ref.EventID)
|
authEventIDs = append(authEventIDs, ref.EventID)
|
||||||
}
|
}
|
||||||
ire.AuthEventIDs = authEventIDs
|
ire.AuthEventIDs = authEventIDs
|
||||||
|
|
|
||||||
|
|
@ -62,14 +62,9 @@ func (s *OutputRoomEventConsumer) Start() error {
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||||
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// Parse out the event JSON
|
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Filter out any messages that aren't new room events
|
||||||
if output.Type != api.OutputTypeNewRoomEvent {
|
if output.Type != api.OutputTypeNewRoomEvent {
|
||||||
log.WithField("type", output.Type).Debug(
|
log.WithField("type", output.Type).Debug(
|
||||||
"roomserver output log: ignoring unknown output type",
|
"roomserver output log: ignoring unknown output type",
|
||||||
|
|
@ -78,18 +73,27 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the room version of the room
|
// Get the room version of the room
|
||||||
vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: output.NewRoomEvent.Event.RoomID()}
|
vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)}
|
||||||
vQueryRes := api.QueryRoomVersionForRoomIDResponse{}
|
vQueryRes := api.QueryRoomVersionForRoomIDResponse{}
|
||||||
if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil {
|
if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepare the room event so that it has the correct field types
|
||||||
|
// for the room version
|
||||||
if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil {
|
if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"room_version": vQueryRes.RoomVersion,
|
"room_version": vQueryRes.RoomVersion,
|
||||||
}).WithError(err).Errorf("can't prepare event to version")
|
}).WithError(err).Errorf("can't prepare event to version")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Parse out the event JSON
|
||||||
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
ev := output.NewRoomEvent.Event
|
ev := output.NewRoomEvent.Event
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"event_id": ev.EventID(),
|
"event_id": ev.EventID(),
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue