From dc7028b3ab40b87777d628f7f371c2d63089fa09 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 12 Mar 2020 17:49:10 +0000 Subject: [PATCH] Tweaks --- appservice/consumers/roomserver.go | 21 +++++++++------------ federationapi/routing/join.go | 5 ++--- federationsender/consumers/roomserver.go | 3 ++- publicroomsapi/consumers/roomserver.go | 3 ++- syncapi/consumers/roomserver.go | 6 ++++-- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 04113f7ef..c052c21fa 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/tidwall/gjson" log "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" @@ -109,24 +110,20 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return errors.New("room version was not in sarama headers") } - // Prepare the room event so that it has the correct field types - // for the room version - output.NewRoomEvent = &api.OutputNewRoomEvent{ - Event: gomatrixserverlib.Event{}, - } - if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": roomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } + evJSON := gjson.Get(string(msg.Value), "new_room_event.event") + if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { + output.NewRoomEvent.Event = ev + } else { + log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message") + return nil + } + ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index 2fcb929ad..ca50e25f4 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -15,7 +15,6 @@ package routing import ( - "encoding/json" "net/http" "time" @@ -117,8 +116,8 @@ func SendJoin( keys gomatrixserverlib.KeyRing, roomID, eventID string, ) util.JSONResponse { - var event gomatrixserverlib.Event - if err := json.Unmarshal(request.Content(), &event); err != nil { + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content()) + if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()), diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index cf72f6ced..a296bb91f 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -121,7 +121,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { output.NewRoomEvent.Event = ev } else { - return errors.New("unable to get new_room_event.event") + log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message") + return nil } ev := output.NewRoomEvent.Event diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index 0f43278b3..4f5581578 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -103,7 +103,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { output.NewRoomEvent.Event = ev } else { - return errors.New("unable to get new_room_event.event") + log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message") + return nil } ev := output.NewRoomEvent.Event diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 57456815d..8879aa4fc 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -115,7 +115,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { output.NewRoomEvent.Event = ev } else { - return errors.New("unable to get new_room_event.event") + log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message") + return nil } return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) case api.OutputTypeNewInviteEvent: @@ -123,7 +124,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil { output.NewInviteEvent.Event = ev } else { - return errors.New("unable to get new_invite_event.event") + log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message") + return nil } return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: