From 8591f3c4b35910de07286c73258be68aea056304 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 11 Mar 2020 11:20:27 +0000 Subject: [PATCH] Try to prepare event using room ID in sarama metadata --- cmd/create-room-events/main.go | 2 +- publicroomsapi/consumers/roomserver.go | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/cmd/create-room-events/main.go b/cmd/create-room-events/main.go index 0bfd9d758..f1e8c8d8d 100644 --- a/cmd/create-room-events/main.go +++ b/cmd/create-room-events/main.go @@ -127,7 +127,7 @@ func writeEvent(event gomatrixserverlib.Event) { ire.Kind = api.KindNew ire.Event = event authEventIDs := []string{} - for _, ref := range b.AuthEvents.([]gomatrixserverlib.EventReference) { + for _, ref := range b.AuthEvents { authEventIDs = append(authEventIDs, ref.EventID) } ire.AuthEventIDs = authEventIDs diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index 6c93ad78a..d9a678929 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -62,14 +62,9 @@ func (s *OutputRoomEventConsumer) Start() error { // onMessage is called when the sync server receives a new event from the room server output log. func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - // Parse out the event JSON var output api.OutputEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } + // Filter out any messages that aren't new room events if output.Type != api.OutputTypeNewRoomEvent { log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", @@ -78,18 +73,27 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } // Get the room version of the room - vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: output.NewRoomEvent.Event.RoomID()} + vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)} vQueryRes := api.QueryRoomVersionForRoomIDResponse{} if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil { return err } + // Prepare the room event so that it has the correct field types + // for the room version if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil { log.WithFields(log.Fields{ "room_version": vQueryRes.RoomVersion, }).WithError(err).Errorf("can't prepare event to version") } + // Parse out the event JSON + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(),