From bd668b68592fe011fbd31864558319f705dde4a2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 11 Jul 2017 18:23:24 +0100 Subject: [PATCH] Make the roomserver output format more flexible --- .../cmd/syncserver-integration-tests/main.go | 10 +- .../federationsender/consumers/roomserver.go | 33 +++--- .../dendrite/roomserver/api/output.go | 100 +++++------------- .../dendrite/roomserver/input/consumer.go | 8 +- .../dendrite/roomserver/input/events.go | 2 +- .../roomserver/input/latest_events.go | 5 +- .../dendrite/syncapi/consumers/roomserver.go | 23 ++-- 7 files changed, 71 insertions(+), 110 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index d1cf9fd12..6bc59456b 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -98,15 +98,13 @@ func createTestUser(database, username, token string) error { // trimmed to the client format and then canonicalised and returned as a string. // Panics if there are any problems. func clientEventJSONForOutputRoomEvent(outputRoomEvent string) string { - var out api.OutputRoomEvent + var out api.OutputEvent if err := json.Unmarshal([]byte(outputRoomEvent), &out); err != nil { panic("failed to unmarshal output room event: " + err.Error()) } - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(out.Event, false) - if err != nil { - panic("failed to convert event field in output room event to Event: " + err.Error()) - } - clientEvs := gomatrixserverlib.ToClientEvents([]gomatrixserverlib.Event{ev}, gomatrixserverlib.FormatSync) + clientEvs := gomatrixserverlib.ToClientEvents([]gomatrixserverlib.Event{ + out.NewRoomEvent.Event, + }, gomatrixserverlib.FormatSync) b, err := json.Marshal(clientEvs[0]) if err != nil { panic("failed to marshal client event as json: " + err.Error()) diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go index 0a0af2b32..8cccd0760 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -72,31 +72,32 @@ func (s *OutputRoomEvent) Start() error { // realises that it cannot update the room state using the deltas. func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON - var output api.OutputRoomEvent + var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } - - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false) - if err != nil { - log.WithError(err).Errorf("roomserver output log: event parse failure") + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output logignoring unknown output type", + ) return nil } + ev := &output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), - "send_as_server": output.SendAsServer, + "send_as_server": output.NewRoomEvent.SendAsServer, }).Info("received event from roomserver") - if err = s.processMessage(output, ev); err != nil { + if err := s.processMessage(*output.NewRoomEvent); err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ "event": string(ev.JSON()), log.ErrorKey: err, - "add": output.AddsStateEventIDs, - "del": output.RemovesStateEventIDs, + "add": output.NewRoomEvent.AddsStateEventIDs, + "del": output.NewRoomEvent.RemovesStateEventIDs, }).Panicf("roomserver output log: write event failure") return nil } @@ -106,8 +107,8 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. -func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixserverlib.Event) error { - addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ev) +func (s *OutputRoomEvent) processMessage(ore api.OutputNewRoomEvent) error { + addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event) if err != nil { return err } @@ -121,7 +122,7 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixser // TODO: handle EventIDMismatchError and recover the current state by talking // to the roomserver oldJoinedHosts, err := s.db.UpdateRoom( - ev.RoomID(), ore.LastSentEventID, ev.EventID(), + ore.Event.RoomID(), ore.LastSentEventID, ore.Event.EventID(), addsJoinedHosts, ore.RemovesStateEventIDs, ) if err != nil { @@ -134,14 +135,14 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixser } // Work out which hosts were joined at the event itself. - joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, ev, oldJoinedHosts) + joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, oldJoinedHosts) if err != nil { return err } // Send the event. if err = s.queues.SendEvent( - &ev, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, + &ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, ); err != nil { return err } @@ -159,7 +160,7 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixser // events from the room server. // Returns an error if there was a problem talking to the room server. func (s *OutputRoomEvent) joinedHostsAtEvent( - ore api.OutputRoomEvent, ev gomatrixserverlib.Event, oldJoinedHosts []types.JoinedHost, + ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost, ) ([]gomatrixserverlib.ServerName, error) { // Combine the delta into a single delta so that the adds and removes can // cancel each other out. This should reduce the number of times we need @@ -168,7 +169,7 @@ func (s *OutputRoomEvent) joinedHostsAtEvent( ore.AddsStateEventIDs, ore.RemovesStateEventIDs, ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, ) - combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ev) + combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index 8055ce1b7..f1b402315 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -15,10 +15,25 @@ package api import ( - "encoding/json" + "github.com/matrix-org/gomatrixserverlib" ) -// An OutputRoomEvent is written when the roomserver receives a new event. +// An OutputType is a type of roomserver output. +type OutputType string + +// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent +const OutputTypeNewRoomEvent OutputType = "new_room_event" + +// An OutputEvent is an entry in the roomserver output kafka log. +// Consumers should check the type field when consuming this event. +type OutputEvent struct { + // What sort of event this is. + Type OutputType `json:"type"` + // The content of event with type OutputTypeNewRoomEvent + NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"` +} + +// An OutputNewRoomEvent is written when the roomserver receives a new event. // It contains the full matrix room event and enough information for a // consumer to construct the current state of the room and the state before the // event. @@ -27,19 +42,19 @@ import ( // after a list of events. The current state is the state after the latest // event IDs in the room. The state before an event is the state after its // prev_events. -type OutputRoomEvent struct { - // The JSON bytes of the event. - Event []byte +type OutputNewRoomEvent struct { + // The Event. + Event gomatrixserverlib.Event `json:"event"` // The latest events in the room after this event. // This can be used to set the prev events for new events in the room. // This also can be used to get the full current state after this event. - LatestEventIDs []string + LatestEventIDs []string `json:"latest_event_ids"` // The state event IDs that were added to the state of the room by this event. // Together with RemovesStateEventIDs this allows the receiver to keep an up to date // view of the current state of the room. - AddsStateEventIDs []string + AddsStateEventIDs []string `json:"adds_state_event_ids"` // The state event IDs that were removed from the state of the room by this event. - RemovesStateEventIDs []string + RemovesStateEventIDs []string `json:"removes_state_event_ids"` // The ID of the event that was output before this event. // Or the empty string if this is the first event output for this room. // This is used by consumers to check if they can safely update their @@ -48,7 +63,7 @@ type OutputRoomEvent struct { // // If the LastSentEventID doesn't match what they were expecting it to be // they can use the LatestEventIDs to request the full current state. - LastSentEventID string + LastSentEventID string `json:"last_sent_event_id"` // The state event IDs that are part of the state at the event, but not // part of the current state. Together with the StateBeforeRemovesEventIDs // this can be used to construct the state before the event from the @@ -62,10 +77,10 @@ type OutputRoomEvent struct { // // The state is given as a delta against the current state because they are // usually either the same state, or differ by just a couple of events. - StateBeforeAddsEventIDs []string + StateBeforeAddsEventIDs []string `json:"state_before_adds_event_ids"` // The state event IDs that are part of the current state, but not part // of the state at the event. - StateBeforeRemovesEventIDs []string + StateBeforeRemovesEventIDs []string `json:"state_before_removes_event_ids"` // The server name to use to push this event to other servers. // Or empty if this event shouldn't be pushed to other servers. // @@ -81,66 +96,5 @@ type OutputRoomEvent struct { // // We encode the server name that the event should be sent using here to // future proof the API for virtual hosting. - SendAsServer string -} - -// UnmarshalJSON implements json.Unmarshaller -func (ore *OutputRoomEvent) UnmarshalJSON(data []byte) error { - // Create a struct rather than unmarshalling directly into the OutputRoomEvent - // so that we can use json.RawMessage. - // We use json.RawMessage so that the event JSON is sent as JSON rather than - // being base64 encoded which is the default for []byte. - var content struct { - Event *json.RawMessage - LatestEventIDs []string - AddsStateEventIDs []string - RemovesStateEventIDs []string - LastSentEventID string - StateBeforeAddsEventIDs []string - StateBeforeRemovesEventIDs []string - SendAsServer string - } - if err := json.Unmarshal(data, &content); err != nil { - return err - } - if content.Event != nil { - ore.Event = []byte(*content.Event) - } - ore.LatestEventIDs = content.LatestEventIDs - ore.AddsStateEventIDs = content.AddsStateEventIDs - ore.RemovesStateEventIDs = content.RemovesStateEventIDs - ore.LastSentEventID = content.LastSentEventID - ore.StateBeforeAddsEventIDs = content.StateBeforeAddsEventIDs - ore.StateBeforeRemovesEventIDs = content.StateBeforeRemovesEventIDs - ore.SendAsServer = content.SendAsServer - return nil -} - -// MarshalJSON implements json.Marshaller -func (ore OutputRoomEvent) MarshalJSON() ([]byte, error) { - // Create a struct rather than marshalling directly from the OutputRoomEvent - // so that we can use json.RawMessage. - // We use json.RawMessage so that the event JSON is sent as JSON rather than - // being base64 encoded which is the default for []byte. - event := json.RawMessage(ore.Event) - content := struct { - Event *json.RawMessage - LatestEventIDs []string - AddsStateEventIDs []string - RemovesStateEventIDs []string - LastSentEventID string - StateBeforeAddsEventIDs []string - StateBeforeRemovesEventIDs []string - SendAsServer string - }{ - Event: &event, - LatestEventIDs: ore.LatestEventIDs, - AddsStateEventIDs: ore.AddsStateEventIDs, - RemovesStateEventIDs: ore.RemovesStateEventIDs, - LastSentEventID: ore.LastSentEventID, - StateBeforeAddsEventIDs: ore.StateBeforeAddsEventIDs, - StateBeforeRemovesEventIDs: ore.StateBeforeRemovesEventIDs, - SendAsServer: ore.SendAsServer, - } - return json.Marshal(&content) + SendAsServer string `json:"send_as_server"` } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go index 743e0b692..efe450381 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go @@ -63,9 +63,13 @@ type Consumer struct { } // WriteOutputRoomEvent implements OutputRoomEventWriter -func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error { +func (c *Consumer) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error { var m sarama.ProducerMessage - value, err := json.Marshal(output) + oe := api.OutputEvent{ + Type: api.OutputTypeNewRoomEvent, + NewRoomEvent: &output, + } + value, err := json.Marshal(oe) if err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 8031a7957..acf302212 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -44,7 +44,7 @@ type RoomEventDatabase interface { // OutputRoomEventWriter has the APIs needed to write an event to the output logs. type OutputRoomEventWriter interface { // Write an event. - WriteOutputRoomEvent(output api.OutputRoomEvent) error + WriteOutputRoomEvent(output api.OutputNewRoomEvent) error } func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error { diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index d8de51b8f..6b5f39679 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -16,6 +16,7 @@ package input import ( "bytes" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" @@ -201,8 +202,8 @@ func writeEvent( latestEventIDs[i] = latest[i].EventID } - ore := api.OutputRoomEvent{ - Event: event.JSON(), + ore := api.OutputNewRoomEvent{ + Event: event, LastSentEventID: lastEventIDSent, LatestEventIDs: latestEventIDs, } diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index e5a971c85..70f42e1b7 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -71,35 +71,38 @@ func (s *OutputRoomEvent) Start() error { // sync stream position may race and be incorrectly calculated. func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON - var output api.OutputRoomEvent + var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false) - if err != nil { - log.WithError(err).Errorf("roomserver output log: event parse failure") + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) return nil } + + ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), }).Info("received event from roomserver") - addsStateEvents, err := s.lookupStateEvents(output.AddsStateEventIDs, ev) + addsStateEvents, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) if err != nil { log.WithFields(log.Fields{ "event": string(ev.JSON()), log.ErrorKey: err, - "add": output.AddsStateEventIDs, - "del": output.RemovesStateEventIDs, + "add": output.NewRoomEvent.AddsStateEventIDs, + "del": output.NewRoomEvent.RemovesStateEventIDs, }).Panicf("roomserver output log: state event lookup failure") } syncStreamPos, err := s.db.WriteEvent( - &ev, addsStateEvents, output.AddsStateEventIDs, output.RemovesStateEventIDs, + &ev, addsStateEvents, output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.RemovesStateEventIDs, ) if err != nil { @@ -107,8 +110,8 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { log.WithFields(log.Fields{ "event": string(ev.JSON()), log.ErrorKey: err, - "add": output.AddsStateEventIDs, - "del": output.RemovesStateEventIDs, + "add": output.NewRoomEvent.AddsStateEventIDs, + "del": output.NewRoomEvent.RemovesStateEventIDs, }).Panicf("roomserver output log: write event failure") return nil }