mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 11:23:11 -06:00
Tweaks
This commit is contained in:
parent
64b57deed2
commit
dc7028b3ab
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
|
|
@ -109,24 +110,20 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return errors.New("room version was not in sarama headers")
|
||||
}
|
||||
|
||||
// Prepare the room event so that it has the correct field types
|
||||
// for the room version
|
||||
output.NewRoomEvent = &api.OutputNewRoomEvent{
|
||||
Event: gomatrixserverlib.Event{},
|
||||
}
|
||||
if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"room_version": roomVersion,
|
||||
}).WithError(err).Errorf("can't prepare event to version")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
evJSON := gjson.Get(string(msg.Value), "new_room_event.event")
|
||||
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||
output.NewRoomEvent.Event = ev
|
||||
} else {
|
||||
log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message")
|
||||
return nil
|
||||
}
|
||||
|
||||
ev := output.NewRoomEvent.Event
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@
|
|||
package routing
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
|
|
@ -117,8 +116,8 @@ func SendJoin(
|
|||
keys gomatrixserverlib.KeyRing,
|
||||
roomID, eventID string,
|
||||
) util.JSONResponse {
|
||||
var event gomatrixserverlib.Event
|
||||
if err := json.Unmarshal(request.Content(), &event); err != nil {
|
||||
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content())
|
||||
if err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
|
||||
|
|
|
|||
|
|
@ -121,7 +121,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||
output.NewRoomEvent.Event = ev
|
||||
} else {
|
||||
return errors.New("unable to get new_room_event.event")
|
||||
log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message")
|
||||
return nil
|
||||
}
|
||||
|
||||
ev := output.NewRoomEvent.Event
|
||||
|
|
|
|||
|
|
@ -103,7 +103,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||
output.NewRoomEvent.Event = ev
|
||||
} else {
|
||||
return errors.New("unable to get new_room_event.event")
|
||||
log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message")
|
||||
return nil
|
||||
}
|
||||
|
||||
ev := output.NewRoomEvent.Event
|
||||
|
|
|
|||
|
|
@ -115,7 +115,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||
output.NewRoomEvent.Event = ev
|
||||
} else {
|
||||
return errors.New("unable to get new_room_event.event")
|
||||
log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message")
|
||||
return nil
|
||||
}
|
||||
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
|
||||
case api.OutputTypeNewInviteEvent:
|
||||
|
|
@ -123,7 +124,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||
output.NewInviteEvent.Event = ev
|
||||
} else {
|
||||
return errors.New("unable to get new_invite_event.event")
|
||||
log.WithError(err).Errorf("roomserver output log: unable to find event from kafka message")
|
||||
return nil
|
||||
}
|
||||
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
|
||||
case api.OutputTypeRetireInviteEvent:
|
||||
|
|
|
|||
Loading…
Reference in a new issue