mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 20:03:10 -06:00
Try to construct the events the hard way
This commit is contained in:
parent
5e1bf86ac7
commit
64b57deed2
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/common/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
|
|
@ -99,24 +100,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return errors.New("room version was not in sarama headers")
|
return errors.New("room version was not in sarama headers")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare the room event so that it has the correct field types
|
|
||||||
// for the room version
|
|
||||||
output.NewRoomEvent = &api.OutputNewRoomEvent{
|
|
||||||
Event: gomatrixserverlib.Event{},
|
|
||||||
}
|
|
||||||
if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"room_version": roomVersion,
|
|
||||||
}).WithError(err).Errorf("can't prepare event to version")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
evJSON := gjson.Get(string(msg.Value), "new_room_event.event")
|
||||||
|
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||||
|
output.NewRoomEvent.Event = ev
|
||||||
|
} else {
|
||||||
|
return errors.New("unable to get new_room_event.event")
|
||||||
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"event_id": output.NewRoomEvent.Event.EventID(),
|
"event_id": output.NewRoomEvent.Event.EventID(),
|
||||||
"room_id": output.NewRoomEvent.Event.RoomID(),
|
"room_id": output.NewRoomEvent.Event.RoomID(),
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -110,26 +111,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return errors.New("room version was not in sarama headers")
|
return errors.New("room version was not in sarama headers")
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("room version is", roomVersion)
|
|
||||||
|
|
||||||
// Prepare the room event so that it has the correct field types
|
|
||||||
// for the room version
|
|
||||||
output.NewRoomEvent = &api.OutputNewRoomEvent{
|
|
||||||
Event: gomatrixserverlib.Event{},
|
|
||||||
}
|
|
||||||
if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"room_version": roomVersion,
|
|
||||||
}).WithError(err).Errorf("can't prepare event to version")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
evJSON := gjson.Get(string(msg.Value), "new_room_event.event")
|
||||||
|
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||||
|
output.NewRoomEvent.Event = ev
|
||||||
|
} else {
|
||||||
|
return errors.New("unable to get new_room_event.event")
|
||||||
|
}
|
||||||
|
|
||||||
ev := output.NewRoomEvent.Event
|
ev := output.NewRoomEvent.Event
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"event_id": ev.EventID(),
|
"event_id": ev.EventID(),
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -92,24 +93,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return errors.New("room version was not in sarama headers")
|
return errors.New("room version was not in sarama headers")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare the room event so that it has the correct field types
|
|
||||||
// for the room version
|
|
||||||
output.NewRoomEvent = &api.OutputNewRoomEvent{
|
|
||||||
Event: gomatrixserverlib.Event{},
|
|
||||||
}
|
|
||||||
if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"room_version": roomVersion,
|
|
||||||
}).WithError(err).Errorf("can't prepare event to version")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
evJSON := gjson.Get(string(msg.Value), "new_room_event.event")
|
||||||
|
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||||
|
output.NewRoomEvent.Event = ev
|
||||||
|
} else {
|
||||||
|
return errors.New("unable to get new_room_event.event")
|
||||||
|
}
|
||||||
|
|
||||||
ev := output.NewRoomEvent.Event
|
ev := output.NewRoomEvent.Event
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"event_id": ev.EventID(),
|
"event_id": ev.EventID(),
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -102,46 +103,30 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return errors.New("room version was not in sarama headers")
|
return errors.New("room version was not in sarama headers")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
|
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
switch output.Type {
|
switch output.Type {
|
||||||
case api.OutputTypeNewRoomEvent:
|
case api.OutputTypeNewRoomEvent:
|
||||||
output.NewRoomEvent = &api.OutputNewRoomEvent{
|
evJSON := gjson.Get(string(msg.Value), "new_room_event.event")
|
||||||
Event: gomatrixserverlib.Event{},
|
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||||
}
|
output.NewRoomEvent.Event = ev
|
||||||
if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
|
} else {
|
||||||
log.WithFields(log.Fields{
|
return errors.New("unable to get new_room_event.event")
|
||||||
"room_version": roomVersion,
|
|
||||||
}).WithError(err).Errorf("can't prepare event to version")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
|
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
|
||||||
case api.OutputTypeNewInviteEvent:
|
case api.OutputTypeNewInviteEvent:
|
||||||
output.NewInviteEvent = &api.OutputNewInviteEvent{
|
evJSON := gjson.Get(string(msg.Value), "new_invite_event.event")
|
||||||
Event: gomatrixserverlib.Event{},
|
if ev, err := gomatrixserverlib.NewEventFromUntrustedJSON([]byte(evJSON.String()), roomVersion); err == nil {
|
||||||
}
|
output.NewInviteEvent.Event = ev
|
||||||
if err := output.NewInviteEvent.Event.PrepareAs(roomVersion); err != nil {
|
} else {
|
||||||
log.WithFields(log.Fields{
|
return errors.New("unable to get new_invite_event.event")
|
||||||
"room_version": roomVersion,
|
|
||||||
}).WithError(err).Errorf("can't prepare event to version")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
|
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
|
||||||
case api.OutputTypeRetireInviteEvent:
|
case api.OutputTypeRetireInviteEvent:
|
||||||
output.RetireInviteEvent = &api.OutputRetireInviteEvent{}
|
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
|
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
|
||||||
default:
|
default:
|
||||||
log.WithField("type", output.Type).Debug(
|
log.WithField("type", output.Type).Debug(
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue