Make the roomserver output format more flexible

This commit is contained in:
Mark Haines 2017-07-11 18:23:24 +01:00
parent 7d36ca03af
commit bd668b6859
7 changed files with 71 additions and 110 deletions

View file

@ -98,15 +98,13 @@ func createTestUser(database, username, token string) error {
// trimmed to the client format and then canonicalised and returned as a string. // trimmed to the client format and then canonicalised and returned as a string.
// Panics if there are any problems. // Panics if there are any problems.
func clientEventJSONForOutputRoomEvent(outputRoomEvent string) string { func clientEventJSONForOutputRoomEvent(outputRoomEvent string) string {
var out api.OutputRoomEvent var out api.OutputEvent
if err := json.Unmarshal([]byte(outputRoomEvent), &out); err != nil { if err := json.Unmarshal([]byte(outputRoomEvent), &out); err != nil {
panic("failed to unmarshal output room event: " + err.Error()) panic("failed to unmarshal output room event: " + err.Error())
} }
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(out.Event, false) clientEvs := gomatrixserverlib.ToClientEvents([]gomatrixserverlib.Event{
if err != nil { out.NewRoomEvent.Event,
panic("failed to convert event field in output room event to Event: " + err.Error()) }, gomatrixserverlib.FormatSync)
}
clientEvs := gomatrixserverlib.ToClientEvents([]gomatrixserverlib.Event{ev}, gomatrixserverlib.FormatSync)
b, err := json.Marshal(clientEvs[0]) b, err := json.Marshal(clientEvs[0])
if err != nil { if err != nil {
panic("failed to marshal client event as json: " + err.Error()) panic("failed to marshal client event as json: " + err.Error())

View file

@ -72,31 +72,32 @@ func (s *OutputRoomEvent) Start() error {
// realises that it cannot update the room state using the deltas. // realises that it cannot update the room state using the deltas.
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputRoomEvent var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil return nil
} }
if output.Type != api.OutputTypeNewRoomEvent {
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false) log.WithField("type", output.Type).Debug(
if err != nil { "roomserver output logignoring unknown output type",
log.WithError(err).Errorf("roomserver output log: event parse failure") )
return nil return nil
} }
ev := &output.NewRoomEvent.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"room_id": ev.RoomID(), "room_id": ev.RoomID(),
"send_as_server": output.SendAsServer, "send_as_server": output.NewRoomEvent.SendAsServer,
}).Info("received event from roomserver") }).Info("received event from roomserver")
if err = s.processMessage(output, ev); err != nil { if err := s.processMessage(*output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event": string(ev.JSON()), "event": string(ev.JSON()),
log.ErrorKey: err, log.ErrorKey: err,
"add": output.AddsStateEventIDs, "add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.RemovesStateEventIDs, "del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure") }).Panicf("roomserver output log: write event failure")
return nil return nil
} }
@ -106,8 +107,8 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixserverlib.Event) error { func (s *OutputRoomEvent) processMessage(ore api.OutputNewRoomEvent) error {
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ev) addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event)
if err != nil { if err != nil {
return err return err
} }
@ -121,7 +122,7 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixser
// TODO: handle EventIDMismatchError and recover the current state by talking // TODO: handle EventIDMismatchError and recover the current state by talking
// to the roomserver // to the roomserver
oldJoinedHosts, err := s.db.UpdateRoom( oldJoinedHosts, err := s.db.UpdateRoom(
ev.RoomID(), ore.LastSentEventID, ev.EventID(), ore.Event.RoomID(), ore.LastSentEventID, ore.Event.EventID(),
addsJoinedHosts, ore.RemovesStateEventIDs, addsJoinedHosts, ore.RemovesStateEventIDs,
) )
if err != nil { if err != nil {
@ -134,14 +135,14 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixser
} }
// Work out which hosts were joined at the event itself. // Work out which hosts were joined at the event itself.
joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, ev, oldJoinedHosts) joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, oldJoinedHosts)
if err != nil { if err != nil {
return err return err
} }
// Send the event. // Send the event.
if err = s.queues.SendEvent( if err = s.queues.SendEvent(
&ev, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, &ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
); err != nil { ); err != nil {
return err return err
} }
@ -159,7 +160,7 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixser
// events from the room server. // events from the room server.
// Returns an error if there was a problem talking to the room server. // Returns an error if there was a problem talking to the room server.
func (s *OutputRoomEvent) joinedHostsAtEvent( func (s *OutputRoomEvent) joinedHostsAtEvent(
ore api.OutputRoomEvent, ev gomatrixserverlib.Event, oldJoinedHosts []types.JoinedHost, ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
) ([]gomatrixserverlib.ServerName, error) { ) ([]gomatrixserverlib.ServerName, error) {
// Combine the delta into a single delta so that the adds and removes can // Combine the delta into a single delta so that the adds and removes can
// cancel each other out. This should reduce the number of times we need // cancel each other out. This should reduce the number of times we need
@ -168,7 +169,7 @@ func (s *OutputRoomEvent) joinedHostsAtEvent(
ore.AddsStateEventIDs, ore.RemovesStateEventIDs, ore.AddsStateEventIDs, ore.RemovesStateEventIDs,
ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs,
) )
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ev) combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -15,10 +15,25 @@
package api package api
import ( import (
"encoding/json" "github.com/matrix-org/gomatrixserverlib"
) )
// An OutputRoomEvent is written when the roomserver receives a new event. // An OutputType is a type of roomserver output.
type OutputType string
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
const OutputTypeNewRoomEvent OutputType = "new_room_event"
// An OutputEvent is an entry in the roomserver output kafka log.
// Consumers should check the type field when consuming this event.
type OutputEvent struct {
// What sort of event this is.
Type OutputType `json:"type"`
// The content of event with type OutputTypeNewRoomEvent
NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"`
}
// An OutputNewRoomEvent is written when the roomserver receives a new event.
// It contains the full matrix room event and enough information for a // It contains the full matrix room event and enough information for a
// consumer to construct the current state of the room and the state before the // consumer to construct the current state of the room and the state before the
// event. // event.
@ -27,19 +42,19 @@ import (
// after a list of events. The current state is the state after the latest // after a list of events. The current state is the state after the latest
// event IDs in the room. The state before an event is the state after its // event IDs in the room. The state before an event is the state after its
// prev_events. // prev_events.
type OutputRoomEvent struct { type OutputNewRoomEvent struct {
// The JSON bytes of the event. // The Event.
Event []byte Event gomatrixserverlib.Event `json:"event"`
// The latest events in the room after this event. // The latest events in the room after this event.
// This can be used to set the prev events for new events in the room. // This can be used to set the prev events for new events in the room.
// This also can be used to get the full current state after this event. // This also can be used to get the full current state after this event.
LatestEventIDs []string LatestEventIDs []string `json:"latest_event_ids"`
// The state event IDs that were added to the state of the room by this event. // The state event IDs that were added to the state of the room by this event.
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date // Together with RemovesStateEventIDs this allows the receiver to keep an up to date
// view of the current state of the room. // view of the current state of the room.
AddsStateEventIDs []string AddsStateEventIDs []string `json:"adds_state_event_ids"`
// The state event IDs that were removed from the state of the room by this event. // The state event IDs that were removed from the state of the room by this event.
RemovesStateEventIDs []string RemovesStateEventIDs []string `json:"removes_state_event_ids"`
// The ID of the event that was output before this event. // The ID of the event that was output before this event.
// Or the empty string if this is the first event output for this room. // Or the empty string if this is the first event output for this room.
// This is used by consumers to check if they can safely update their // This is used by consumers to check if they can safely update their
@ -48,7 +63,7 @@ type OutputRoomEvent struct {
// //
// If the LastSentEventID doesn't match what they were expecting it to be // If the LastSentEventID doesn't match what they were expecting it to be
// they can use the LatestEventIDs to request the full current state. // they can use the LatestEventIDs to request the full current state.
LastSentEventID string LastSentEventID string `json:"last_sent_event_id"`
// The state event IDs that are part of the state at the event, but not // The state event IDs that are part of the state at the event, but not
// part of the current state. Together with the StateBeforeRemovesEventIDs // part of the current state. Together with the StateBeforeRemovesEventIDs
// this can be used to construct the state before the event from the // this can be used to construct the state before the event from the
@ -62,10 +77,10 @@ type OutputRoomEvent struct {
// //
// The state is given as a delta against the current state because they are // The state is given as a delta against the current state because they are
// usually either the same state, or differ by just a couple of events. // usually either the same state, or differ by just a couple of events.
StateBeforeAddsEventIDs []string StateBeforeAddsEventIDs []string `json:"state_before_adds_event_ids"`
// The state event IDs that are part of the current state, but not part // The state event IDs that are part of the current state, but not part
// of the state at the event. // of the state at the event.
StateBeforeRemovesEventIDs []string StateBeforeRemovesEventIDs []string `json:"state_before_removes_event_ids"`
// The server name to use to push this event to other servers. // The server name to use to push this event to other servers.
// Or empty if this event shouldn't be pushed to other servers. // Or empty if this event shouldn't be pushed to other servers.
// //
@ -81,66 +96,5 @@ type OutputRoomEvent struct {
// //
// We encode the server name that the event should be sent using here to // We encode the server name that the event should be sent using here to
// future proof the API for virtual hosting. // future proof the API for virtual hosting.
SendAsServer string SendAsServer string `json:"send_as_server"`
}
// UnmarshalJSON implements json.Unmarshaller
func (ore *OutputRoomEvent) UnmarshalJSON(data []byte) error {
// Create a struct rather than unmarshalling directly into the OutputRoomEvent
// so that we can use json.RawMessage.
// We use json.RawMessage so that the event JSON is sent as JSON rather than
// being base64 encoded which is the default for []byte.
var content struct {
Event *json.RawMessage
LatestEventIDs []string
AddsStateEventIDs []string
RemovesStateEventIDs []string
LastSentEventID string
StateBeforeAddsEventIDs []string
StateBeforeRemovesEventIDs []string
SendAsServer string
}
if err := json.Unmarshal(data, &content); err != nil {
return err
}
if content.Event != nil {
ore.Event = []byte(*content.Event)
}
ore.LatestEventIDs = content.LatestEventIDs
ore.AddsStateEventIDs = content.AddsStateEventIDs
ore.RemovesStateEventIDs = content.RemovesStateEventIDs
ore.LastSentEventID = content.LastSentEventID
ore.StateBeforeAddsEventIDs = content.StateBeforeAddsEventIDs
ore.StateBeforeRemovesEventIDs = content.StateBeforeRemovesEventIDs
ore.SendAsServer = content.SendAsServer
return nil
}
// MarshalJSON implements json.Marshaller
func (ore OutputRoomEvent) MarshalJSON() ([]byte, error) {
// Create a struct rather than marshalling directly from the OutputRoomEvent
// so that we can use json.RawMessage.
// We use json.RawMessage so that the event JSON is sent as JSON rather than
// being base64 encoded which is the default for []byte.
event := json.RawMessage(ore.Event)
content := struct {
Event *json.RawMessage
LatestEventIDs []string
AddsStateEventIDs []string
RemovesStateEventIDs []string
LastSentEventID string
StateBeforeAddsEventIDs []string
StateBeforeRemovesEventIDs []string
SendAsServer string
}{
Event: &event,
LatestEventIDs: ore.LatestEventIDs,
AddsStateEventIDs: ore.AddsStateEventIDs,
RemovesStateEventIDs: ore.RemovesStateEventIDs,
LastSentEventID: ore.LastSentEventID,
StateBeforeAddsEventIDs: ore.StateBeforeAddsEventIDs,
StateBeforeRemovesEventIDs: ore.StateBeforeRemovesEventIDs,
SendAsServer: ore.SendAsServer,
}
return json.Marshal(&content)
} }

View file

@ -63,9 +63,13 @@ type Consumer struct {
} }
// WriteOutputRoomEvent implements OutputRoomEventWriter // WriteOutputRoomEvent implements OutputRoomEventWriter
func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error { func (c *Consumer) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error {
var m sarama.ProducerMessage var m sarama.ProducerMessage
value, err := json.Marshal(output) oe := api.OutputEvent{
Type: api.OutputTypeNewRoomEvent,
NewRoomEvent: &output,
}
value, err := json.Marshal(oe)
if err != nil { if err != nil {
return err return err
} }

View file

@ -44,7 +44,7 @@ type RoomEventDatabase interface {
// OutputRoomEventWriter has the APIs needed to write an event to the output logs. // OutputRoomEventWriter has the APIs needed to write an event to the output logs.
type OutputRoomEventWriter interface { type OutputRoomEventWriter interface {
// Write an event. // Write an event.
WriteOutputRoomEvent(output api.OutputRoomEvent) error WriteOutputRoomEvent(output api.OutputNewRoomEvent) error
} }
func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error { func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error {

View file

@ -16,6 +16,7 @@ package input
import ( import (
"bytes" "bytes"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
@ -201,8 +202,8 @@ func writeEvent(
latestEventIDs[i] = latest[i].EventID latestEventIDs[i] = latest[i].EventID
} }
ore := api.OutputRoomEvent{ ore := api.OutputNewRoomEvent{
Event: event.JSON(), Event: event,
LastSentEventID: lastEventIDSent, LastSentEventID: lastEventIDSent,
LatestEventIDs: latestEventIDs, LatestEventIDs: latestEventIDs,
} }

View file

@ -71,35 +71,38 @@ func (s *OutputRoomEvent) Start() error {
// sync stream position may race and be incorrectly calculated. // sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputRoomEvent var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil return nil
} }
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false) if output.Type != api.OutputTypeNewRoomEvent {
if err != nil { log.WithField("type", output.Type).Debug(
log.WithError(err).Errorf("roomserver output log: event parse failure") "roomserver output log: ignoring unknown output type",
)
return nil return nil
} }
ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"room_id": ev.RoomID(), "room_id": ev.RoomID(),
}).Info("received event from roomserver") }).Info("received event from roomserver")
addsStateEvents, err := s.lookupStateEvents(output.AddsStateEventIDs, ev) addsStateEvents, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event": string(ev.JSON()), "event": string(ev.JSON()),
log.ErrorKey: err, log.ErrorKey: err,
"add": output.AddsStateEventIDs, "add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.RemovesStateEventIDs, "del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: state event lookup failure") }).Panicf("roomserver output log: state event lookup failure")
} }
syncStreamPos, err := s.db.WriteEvent( syncStreamPos, err := s.db.WriteEvent(
&ev, addsStateEvents, output.AddsStateEventIDs, output.RemovesStateEventIDs, &ev, addsStateEvents, output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.RemovesStateEventIDs,
) )
if err != nil { if err != nil {
@ -107,8 +110,8 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event": string(ev.JSON()), "event": string(ev.JSON()),
log.ErrorKey: err, log.ErrorKey: err,
"add": output.AddsStateEventIDs, "add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.RemovesStateEventIDs, "del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure") }).Panicf("roomserver output log: write event failure")
return nil return nil
} }