From 0eceb3bcd7495356417e7fa4efb152dd5e56c510 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 24 Feb 2017 16:46:04 +0000 Subject: [PATCH] Write output events --- .../dendrite/roomserver/api/output.go | 85 +++++++++++++++++++ .../dendrite/roomserver/input/consumer.go | 31 +++++-- .../dendrite/roomserver/input/events.go | 8 +- .../roomserver/input/latest_events.go | 27 +++++- .../roomserver/roomserver/roomserver.go | 20 +++-- 5 files changed, 152 insertions(+), 19 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/roomserver/api/output.go diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go new file mode 100644 index 000000000..ba514ec10 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -0,0 +1,85 @@ +package api + +import ( + "encoding/json" +) + +// An OutputRoomEvent is written when the roomserver receives a new event. +type OutputRoomEvent struct { + // The JSON bytes of the event. + Event []byte + // The state event IDs needed to determine who can see this event. + // This can be used to tell which users to send the event to. + VisibilityEventIDs []string + // The latest events in the room after this event. + // This can be used to set the prev events for new events in the room. + // This also can be used to get the full current state after this event. + LatestEventIDs []string + // The state event IDs that were added to the state of the room by this event. + // Together with RemovesStateEventIDs this allows the receiver to keep an up to date + // view of the current state of the room. + AddsStateEventIDs []string + // The state event IDs that were removed from the state of the room by this event. + RemovesStateEventIDs []string + // The ID of the event that was output before this event. + // Or the empty string if this is the first event output for this room. 
+ // This is used by consumers to check if they can safely update their + // current state using the delta supplied in AddsStateEventIDs and + // RemovesStateEventIDs. + // If the LastSentEventID doesn't match what they were expecting it to + // be, they can use the LatestEventIDs to request the full current state. + LastSentEventID string +} + +// UnmarshalJSON implements json.Unmarshaler +func (ore *OutputRoomEvent) UnmarshalJSON(data []byte) error { + // Create a struct rather than unmarshalling directly into the OutputRoomEvent + // so that we can use json.RawMessage. + // We use json.RawMessage so that the event JSON is sent as JSON rather than + // being base64 encoded which is the default for []byte. + var content struct { + Event *json.RawMessage + VisibilityEventIDs []string + LatestEventIDs []string + AddsStateEventIDs []string + RemovesStateEventIDs []string + LastSentEventID string + } + if err := json.Unmarshal(data, &content); err != nil { + return err + } + if content.Event != nil { + ore.Event = []byte(*content.Event) + } + ore.VisibilityEventIDs = content.VisibilityEventIDs + ore.LatestEventIDs = content.LatestEventIDs + ore.AddsStateEventIDs = content.AddsStateEventIDs + ore.RemovesStateEventIDs = content.RemovesStateEventIDs + ore.LastSentEventID = content.LastSentEventID + return nil +} + +// MarshalJSON implements json.Marshaler +func (ore OutputRoomEvent) MarshalJSON() ([]byte, error) { + // Create a struct rather than marshalling directly from the OutputRoomEvent + // so that we can use json.RawMessage. + // We use json.RawMessage so that the event JSON is sent as JSON rather than + // being base64 encoded which is the default for []byte. 
+ event := json.RawMessage(ore.Event) + content := struct { + Event *json.RawMessage + VisibilityEventIDs []string + LatestEventIDs []string + AddsStateEventIDs []string + RemovesStateEventIDs []string + LastSentEventID string + }{ + Event: &event, + VisibilityEventIDs: ore.VisibilityEventIDs, + LatestEventIDs: ore.LatestEventIDs, + AddsStateEventIDs: ore.AddsStateEventIDs, + RemovesStateEventIDs: ore.RemovesStateEventIDs, + LastSentEventID: ore.LastSentEventID, + } + return json.Marshal(&content) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go index 14a3ce15f..f299861a0 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go @@ -34,15 +34,32 @@ type Consumer struct { // But any equivalent event streaming protocol could be made to implement the same interface. Consumer sarama.Consumer // The database used to store the room events. - DB ConsumerDatabase + DB ConsumerDatabase + Producer sarama.SyncProducer // The kafkaesque topic to consume room events from. // This is the name used in kafka to identify the stream to consume events from. - RoomEventTopic string + InputRoomEventTopic string + // The kafkaesque topic to output new room events to. + // This is the name used in kafka to identify the stream to write events to. + OutputRoomEventTopic string // The ErrorLogger for this consumer. // If left as nil then the consumer will panic when it encounters an error ErrorLogger ErrorLogger } +func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error { + var m sarama.ProducerMessage + value, err := json.Marshal(output) + if err != nil { + return err + } + m.Topic = c.OutputRoomEventTopic + m.Key = sarama.StringEncoder("") + m.Value = sarama.ByteEncoder(value) + _, _, err = c.Producer.SendMessage(&m) + return err +} + // Start starts the consumer consuming. 
// Starts up a goroutine for each partition in the kafka stream. // Returns nil once all the goroutines are started. @@ -50,7 +67,7 @@ type Consumer struct { func (c *Consumer) Start() error { offsets := map[int32]int64{} - partitions, err := c.Consumer.Partitions(c.RoomEventTopic) + partitions, err := c.Consumer.Partitions(c.InputRoomEventTopic) if err != nil { return err } @@ -59,7 +76,7 @@ func (c *Consumer) Start() error { offsets[partition] = sarama.OffsetOldest } - storedOffsets, err := c.DB.PartitionOffsets(c.RoomEventTopic) + storedOffsets, err := c.DB.PartitionOffsets(c.InputRoomEventTopic) if err != nil { return err } @@ -70,7 +87,7 @@ func (c *Consumer) Start() error { var partitionConsumers []sarama.PartitionConsumer for partition, offset := range offsets { - pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset) + pc, err := c.Consumer.ConsumePartition(c.InputRoomEventTopic, partition, offset) if err != nil { for _, p := range partitionConsumers { p.Close() @@ -95,7 +112,7 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { // If the message is invalid then log it and move onto the next message in the stream. c.logError(message, err) } else { - if err := processRoomEvent(c.DB, input); err != nil { + if err := processRoomEvent(c.DB, c, input); err != nil { // If there was an error processing the message then log it and // move onto the next message in the stream. // TODO: If the error was due to a problem talking to the database @@ -105,7 +122,7 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { } } // Advance our position in the stream so that we will start at the right position after a restart. 
- if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil { + if err := c.DB.SetPartitionOffset(c.InputRoomEventTopic, message.Partition, message.Offset); err != nil { c.logError(message, err) } } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 36a9f9975..f2cdc2304 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -40,7 +40,11 @@ type RoomEventDatabase interface { GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, string, types.RoomRecentEventsUpdater, error) } -func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error { +type OutputRoomEventWriter interface { + WriteOutputRoomEvent(output api.OutputRoomEvent) error +} + +func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error { // Parse and validate the event JSON event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event) if err != nil { @@ -95,7 +99,7 @@ func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error { } // Update the extremities of the event graph for the room - if err := updateLatestEvents(db, roomNID, stateAtEvent, event); err != nil { + if err := updateLatestEvents(db, ow, roomNID, stateAtEvent, event); err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 983ca1efd..bb8748c26 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -2,6 +2,7 @@ package input import ( "bytes" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/types" 
"github.com/matrix-org/gomatrixserverlib" ) @@ -22,7 +23,7 @@ import ( // 7 <----- latest // func updateLatestEvents( - db RoomEventDatabase, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, + db RoomEventDatabase, ow OutputRoomEventWriter, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, ) (err error) { oldLatest, lastEventIDSent, updater, err := db.GetLatestEventsForUpdate(roomNID) if err != nil { @@ -42,12 +43,12 @@ func updateLatestEvents( } }() - err = doUpdateLatestEvents(updater, oldLatest, lastEventIDSent, roomNID, stateAtEvent, event) + err = doUpdateLatestEvents(updater, ow, oldLatest, lastEventIDSent, roomNID, stateAtEvent, event) return } func doUpdateLatestEvents( - updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, lastEventIDSent string, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, + updater types.RoomRecentEventsUpdater, ow OutputRoomEventWriter, oldLatest []types.StateAtEventAndReference, lastEventIDSent string, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, ) error { var err error var prevEvents []gomatrixserverlib.EventReference @@ -76,7 +77,11 @@ func doUpdateLatestEvents( StateAtEvent: stateAtEvent, }) - // TODO: Send the event to the output logs. + // Send the event to the output logs. + // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. 
+ if err = writeEvent(ow, lastEventIDSent, event, newLatest); err != nil { + return err + } if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID); err != nil { return err @@ -121,3 +126,17 @@ func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenc return newLatest } + +func writeEvent(ow OutputRoomEventWriter, lastEventIDSent string, event gomatrixserverlib.Event, latest []types.StateAtEventAndReference) error { + + latestEventIDs := make([]string, len(latest)) + for i := range latest { + latestEventIDs[i] = latest[i].EventID + } + + return ow.WriteOutputRoomEvent(api.OutputRoomEvent{ + Event: event.JSON(), + LastSentEventID: lastEventIDSent, + LatestEventIDs: latestEventIDs, + }) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go index 0205ff007..d2f126bf5 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go +++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go @@ -10,9 +10,10 @@ import ( ) var ( - database = os.Getenv("DATABASE") - kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") - roomEventTopic = os.Getenv("TOPIC_ROOM_EVENT") + database = os.Getenv("DATABASE") + kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") + inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT") + outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT") ) func main() { @@ -26,10 +27,17 @@ func main() { panic(err) } + kafkaProducer, err := sarama.NewSyncProducer(kafkaURIs, nil) + if err != nil { + panic(err) + } + consumer := input.Consumer{ - Consumer: kafkaConsumer, - DB: db, - RoomEventTopic: roomEventTopic, + Consumer: kafkaConsumer, + DB: db, + Producer: kafkaProducer, + InputRoomEventTopic: inputRoomEventTopic, + OutputRoomEventTopic: outputRoomEventTopic, } if err = consumer.Start(); err != nil {