From 237b912cf5133245e62e02346e8afa25842be9ae Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 27 Feb 2017 11:25:35 +0000 Subject: [PATCH] Write to an output kafka log for new room events in the roomserver. (#20) * Track which events have been sent and the last event sent for each room * Write output events * comment that an event could be sent twice --- .../dendrite/roomserver/api/output.go | 85 +++++++++++++++++ .../dendrite/roomserver/input/consumer.go | 32 +++++-- .../dendrite/roomserver/input/events.go | 15 ++- .../roomserver/input/latest_events.go | 92 ++++++++++++++----- .../roomserver/roomserver/roomserver.go | 20 ++-- .../roomserver/storage/events_table.go | 38 ++++++++ .../roomserver/storage/rooms_table.go | 21 +++-- .../dendrite/roomserver/storage/storage.go | 32 +++++-- .../dendrite/roomserver/types/types.go | 6 +- 9 files changed, 284 insertions(+), 57 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/roomserver/api/output.go diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go new file mode 100644 index 000000000..8cbe9b30d --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -0,0 +1,85 @@ +package api + +import ( + "encoding/json" +) + +// An OutputRoomEvent is written when the roomserver receives a new event. +type OutputRoomEvent struct { + // The JSON bytes of the event. + Event []byte + // The state event IDs needed to determine who can see this event. + // This can be used to tell which users to send the event to. + VisibilityEventIDs []string + // The latest events in the room after this event. + // This can be used to set the prev events for new events in the room. + // This also can be used to get the full current state after this event. + LatestEventIDs []string + // The state event IDs that were added to the state of the room by this event. + // Together with RemovesStateEventIDs this allows the receiver to keep an up to date + // view of the current state of the room. + AddsStateEventIDs []string + // The state event IDs that were removed from the state of the room by this event. + RemovesStateEventIDs []string + // The ID of the event that was output before this event. + // Or the empty string if this is the first event output for this room. + // This is used by consumers to check if they can safely update their + // current state using the delta supplied in AddsStateEventIDs and + // RemovesStateEventIDs. + // If the LastSentEventID doesn't match what they were expecting it to be + // they can use the LatestEventIDs to request the full current state. + LastSentEventID string +} + +// UnmarshalJSON implements json.Unmarshaller +func (ore *OutputRoomEvent) UnmarshalJSON(data []byte) error { + // Create a struct rather than unmarshalling directly into the OutputRoomEvent + // so that we can use json.RawMessage. + // We use json.RawMessage so that the event JSON is sent as JSON rather than + // being base64 encoded which is the default for []byte. + var content struct { + Event *json.RawMessage + VisibilityEventIDs []string + LatestEventIDs []string + AddsStateEventIDs []string + RemovesStateEventIDs []string + LastSentEventID string + } + if err := json.Unmarshal(data, &content); err != nil { + return err + } + if content.Event != nil { + ore.Event = []byte(*content.Event) + } + ore.VisibilityEventIDs = content.VisibilityEventIDs + ore.LatestEventIDs = content.LatestEventIDs + ore.AddsStateEventIDs = content.AddsStateEventIDs + ore.RemovesStateEventIDs = content.RemovesStateEventIDs + ore.LastSentEventID = content.LastSentEventID + return nil +} + +// MarshalJSON implements json.Marshaller +func (ore OutputRoomEvent) MarshalJSON() ([]byte, error) { + // Create a struct rather than marshalling directly from the OutputRoomEvent + // so that we can use json.RawMessage. + // We use json.RawMessage so that the event JSON is sent as JSON rather than + // being base64 encoded which is the default for []byte. + event := json.RawMessage(ore.Event) + content := struct { + Event *json.RawMessage + VisibilityEventIDs []string + LatestEventIDs []string + AddsStateEventIDs []string + RemovesStateEventIDs []string + LastSentEventID string + }{ + Event: &event, + VisibilityEventIDs: ore.VisibilityEventIDs, + LatestEventIDs: ore.LatestEventIDs, + AddsStateEventIDs: ore.AddsStateEventIDs, + RemovesStateEventIDs: ore.RemovesStateEventIDs, + LastSentEventID: ore.LastSentEventID, + } + return json.Marshal(&content) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go index 14a3ce15f..2c50103b4 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go @@ -34,15 +34,33 @@ type Consumer struct { // But any equivalent event streaming protocol could be made to implement the same interface. Consumer sarama.Consumer // The database used to store the room events. - DB ConsumerDatabase + DB ConsumerDatabase + Producer sarama.SyncProducer // The kafkaesque topic to consume room events from. // This is the name used in kafka to identify the stream to consume events from. - RoomEventTopic string + InputRoomEventTopic string + // The kafkaesque topic to output new room events to. + // This is the name used in kafka to identify the stream to write events to. + OutputRoomEventTopic string // The ErrorLogger for this consumer. // If left as nil then the consumer will panic when it encounters an error ErrorLogger ErrorLogger } +// WriteOutputRoomEvent implements OutputRoomEventWriter +func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error { + var m sarama.ProducerMessage + value, err := json.Marshal(output) + if err != nil { + return err + } + m.Topic = c.OutputRoomEventTopic + m.Key = sarama.StringEncoder("") + m.Value = sarama.ByteEncoder(value) + _, _, err = c.Producer.SendMessage(&m) + return err +} + // Start starts the consumer consuming. // Starts up a goroutine for each partition in the kafka stream. // Returns nil once all the goroutines are started. @@ -50,7 +68,7 @@ type Consumer struct { func (c *Consumer) Start() error { offsets := map[int32]int64{} - partitions, err := c.Consumer.Partitions(c.RoomEventTopic) + partitions, err := c.Consumer.Partitions(c.InputRoomEventTopic) if err != nil { return err } @@ -59,7 +77,7 @@ func (c *Consumer) Start() error { offsets[partition] = sarama.OffsetOldest } - storedOffsets, err := c.DB.PartitionOffsets(c.RoomEventTopic) + storedOffsets, err := c.DB.PartitionOffsets(c.InputRoomEventTopic) if err != nil { return err } @@ -70,7 +88,7 @@ func (c *Consumer) Start() error { var partitionConsumers []sarama.PartitionConsumer for partition, offset := range offsets { - pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset) + pc, err := c.Consumer.ConsumePartition(c.InputRoomEventTopic, partition, offset) if err != nil { for _, p := range partitionConsumers { p.Close() @@ -95,7 +113,7 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { // If the message is invalid then log it and move onto the next message in the stream. c.logError(message, err) } else { - if err := processRoomEvent(c.DB, input); err != nil { + if err := processRoomEvent(c.DB, c, input); err != nil { // If there was an error processing the message then log it and // move onto the next message in the stream. // TODO: If the error was due to a problem talking to the database @@ -105,7 +123,7 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) { } } // Advance our position in the stream so that we will start at the right position after a restart. - if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil { + if err := c.DB.SetPartitionOffset(c.InputRoomEventTopic, message.Partition, message.Offset); err != nil { c.logError(message, err) } } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 91999916e..d26c95d1c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -36,11 +36,20 @@ type RoomEventDatabase interface { SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error // Lookup the latest events in a room in preparation for an update. // The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error. + // Returns the latest events in the room and the last eventID sent to the log along with an updater. // If this returns an error then no further action is required. - GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error) + GetLatestEventsForUpdate(roomNID types.RoomNID) ( + latestEvents []types.StateAtEventAndReference, lastEventIDSent string, updater types.RoomRecentEventsUpdater, err error, + ) } -func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error { +// OutputRoomEventWriter has the APIs needed to write an event to the output logs. +type OutputRoomEventWriter interface { + // Write an event. + WriteOutputRoomEvent(output api.OutputRoomEvent) error +} + +func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error { // Parse and validate the event JSON event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event) if err != nil { @@ -95,7 +104,7 @@ func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error { } // Update the extremities of the event graph for the room - if err := updateLatestEvents(db, roomNID, stateAtEvent, event); err != nil { + if err := updateLatestEvents(db, ow, roomNID, stateAtEvent, event); err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 0bd18ff75..55b712d82 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -2,11 +2,13 @@ package input import ( "bytes" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) -// updateLatestEvents updates the list of latest events for this room. +// updateLatestEvents updates the list of latest events for this room in the database and writes the +// event to the output log. // The latest events are the events that aren't referenced by another event in the database: // // Time goes down the page. 1 is the m.room.create event (root). @@ -22,9 +24,9 @@ import ( // 7 <----- latest // func updateLatestEvents( - db RoomEventDatabase, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, + db RoomEventDatabase, ow OutputRoomEventWriter, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, ) (err error) { - oldLatest, updater, err := db.GetLatestEventsForUpdate(roomNID) + oldLatest, lastEventIDSent, updater, err := db.GetLatestEventsForUpdate(roomNID) if err != nil { return } @@ -42,22 +44,64 @@ func updateLatestEvents( } }() - err = doUpdateLatestEvents(updater, oldLatest, roomNID, stateAtEvent, event) + err = doUpdateLatestEvents(updater, ow, oldLatest, lastEventIDSent, roomNID, stateAtEvent, event) return } func doUpdateLatestEvents( - updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, + updater types.RoomRecentEventsUpdater, ow OutputRoomEventWriter, oldLatest []types.StateAtEventAndReference, lastEventIDSent string, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, ) error { var err error var prevEvents []gomatrixserverlib.EventReference prevEvents = event.PrevEvents() + if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil { + return err + } else if hasBeenSent { + // Already sent this event so we can stop processing + return nil + } + if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil { return err } - // Check if this event references any of the latest events in the room. + eventReference := event.EventReference() + // Check if this event is already referenced by another event in the room. + var alreadyReferenced bool + if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil { + return err + } + + newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{ + EventReference: eventReference, + StateAtEvent: stateAtEvent, + }) + + // Send the event to the output logs. + // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. + // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but + // the write to the output log succeeds) + // TODO: This assumes that writing the event to the output log is synchronous. It should be possible to + // send the event asynchronously but we would need to ensure that 1) the events are written to the log in + // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the + // necessary bookkeeping we'll keep the event sending synchronous for now. + if err = writeEvent(ow, lastEventIDSent, event, newLatest); err != nil { + return err + } + + if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID); err != nil { + return err + } + + if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil { + return err + } + + return nil +} + +func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference { var alreadyInLatest bool var newLatest []types.StateAtEventAndReference for _, l := range oldLatest { @@ -71,7 +115,7 @@ func doUpdateLatestEvents( break } } - if l.EventNID == stateAtEvent.EventNID { + if l.EventNID == newEvent.EventNID { alreadyInLatest = true } if keep { @@ -80,26 +124,28 @@ func doUpdateLatestEvents( } } - eventReference := event.EventReference() - // Check if this event is already referenced by another event in the room. - var alreadyReferenced bool - if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil { - return err - } - if !alreadyReferenced && !alreadyInLatest { // This event is not referenced by any of the events in the room // and the event is not already in the latest events. // Add it to the latest events - newLatest = append(newLatest, types.StateAtEventAndReference{ - StateAtEvent: stateAtEvent, - EventReference: eventReference, - }) + newLatest = append(newLatest, newEvent) } - if err = updater.SetLatestEvents(roomNID, newLatest); err != nil { - return err - } - - return nil + return newLatest +} + +func writeEvent(ow OutputRoomEventWriter, lastEventIDSent string, event gomatrixserverlib.Event, latest []types.StateAtEventAndReference) error { + + latestEventIDs := make([]string, len(latest)) + for i := range latest { + latestEventIDs[i] = latest[i].EventID + } + + // TODO: Fill out AddsStateEventIDs and RemovesStateEventIDs + // TODO: Fill out VisibilityStateIDs + return ow.WriteOutputRoomEvent(api.OutputRoomEvent{ + Event: event.JSON(), + LastSentEventID: lastEventIDSent, + LatestEventIDs: latestEventIDs, + }) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go index 0205ff007..d2f126bf5 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go +++ b/src/github.com/matrix-org/dendrite/roomserver/roomserver/roomserver.go @@ -10,9 +10,10 @@ import ( ) var ( - database = os.Getenv("DATABASE") - kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") - roomEventTopic = os.Getenv("TOPIC_ROOM_EVENT") + database = os.Getenv("DATABASE") + kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") + inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT") + outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT") ) func main() { @@ -26,10 +27,17 @@ func main() { panic(err) } + kafkaProducer, err := sarama.NewSyncProducer(kafkaURIs, nil) + if err != nil { + panic(err) + } + consumer := input.Consumer{ - Consumer: kafkaConsumer, - DB: db, - RoomEventTopic: roomEventTopic, + Consumer: kafkaConsumer, + DB: db, + Producer: kafkaProducer, + InputRoomEventTopic: inputRoomEventTopic, + OutputRoomEventTopic: outputRoomEventTopic, } if err = consumer.Start(); err != nil { diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index d89a1a0dd..2a2f7e20f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS events ( -- Local numeric ID for the state_key of the event -- This is 0 if the event is not a state event. event_state_key_nid BIGINT NOT NULL, + -- Whether the event has been written to the output log. + sent_to_output BOOLEAN NOT NULL DEFAULT FALSE, -- Local numeric ID for the state at the event. -- This is 0 if we don't know the state at the event. -- If the state is not 0 then this event is part of the contiguous @@ -68,6 +70,15 @@ const bulkSelectStateAtEventByIDSQL = "" + const updateEventStateSQL = "" + "UPDATE events SET state_snapshot_nid = $2 WHERE event_nid = $1" +const selectEventSentToOutputSQL = "" + + "SELECT sent_to_output FROM events WHERE event_nid = $1" + +const updateEventSentToOutputSQL = "" + + "UPDATE events SET sent_to_output = TRUE WHERE event_nid = $1" + +const selectEventIDSQL = "" + + "SELECT event_id FROM events WHERE event_nid = $1" + const bulkSelectStateAtEventAndReferenceSQL = "" + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" + " FROM events WHERE event_nid = ANY($1)" @@ -78,6 +89,9 @@ type eventStatements struct { bulkSelectStateEventByIDStmt *sql.Stmt bulkSelectStateAtEventByIDStmt *sql.Stmt updateEventStateStmt *sql.Stmt + selectEventSentToOutputStmt *sql.Stmt + updateEventSentToOutputStmt *sql.Stmt + selectEventIDStmt *sql.Stmt bulkSelectStateAtEventAndReferenceStmt *sql.Stmt } @@ -101,6 +115,15 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) { if s.updateEventStateStmt, err = db.Prepare(updateEventStateSQL); err != nil { return } + if s.updateEventSentToOutputStmt, err = db.Prepare(updateEventSentToOutputSQL); err != nil { + return + } + if s.selectEventSentToOutputStmt, err = db.Prepare(selectEventSentToOutputSQL); err != nil { + return + } + if s.selectEventIDStmt, err = db.Prepare(selectEventIDSQL); err != nil { + return + } if s.bulkSelectStateAtEventAndReferenceStmt, err = db.Prepare(bulkSelectStateAtEventAndReferenceSQL); err != nil { return } @@ -199,6 +222,21 @@ func (s *eventStatements) updateEventState(eventNID types.EventNID, stateNID typ return err } +func (s *eventStatements) selectEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) (sentToOutput bool, err error) { + err = txn.Stmt(s.selectEventSentToOutputStmt).QueryRow(int64(eventNID)).Scan(&sentToOutput) + return +} + +func (s *eventStatements) updateEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) error { + _, err := txn.Stmt(s.updateEventSentToOutputStmt).Exec(int64(eventNID)) + return err +} + +func (s *eventStatements) selectEventID(txn *sql.Tx, eventNID types.EventNID) (eventID string, err error) { + err = txn.Stmt(s.selectEventIDStmt).QueryRow(int64(eventNID)).Scan(&eventID) + return +} + func (s *eventStatements) bulkSelectStateAtEventAndReference(txn *sql.Tx, eventNIDs []types.EventNID) ([]types.StateAtEventAndReference, error) { nids := make([]int64, len(eventNIDs)) for i := range eventNIDs { diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go index 78c1c883f..ff932344b 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go @@ -16,7 +16,9 @@ CREATE TABLE IF NOT EXISTS rooms ( -- The most recent events in the room that aren't referenced by another event. -- This list may empty if the server hasn't joined the room yet. -- (The server will be in that state while it stores the events for the initial state of the room) - latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[] + latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[], + -- The last event written to the output log for this room. + last_event_sent_nid BIGINT NOT NULL DEFAULT 0 ); ` @@ -30,10 +32,10 @@ const selectRoomNIDSQL = "" + "SELECT room_nid FROM rooms WHERE room_id = $1" const selectLatestEventNIDsSQL = "" + - "SELECT latest_event_nids FROM rooms WHERE room_nid = $1 FOR UPDATE" + "SELECT latest_event_nids, last_event_sent_nid FROM rooms WHERE room_nid = $1 FOR UPDATE" const updateLatestEventNIDsSQL = "" + - "UPDATE rooms SET latest_event_nids = $2 WHERE room_nid = $1" + "UPDATE rooms SET latest_event_nids = $2, last_event_sent_nid = $3 WHERE room_nid = $1" type roomStatements struct { insertRoomNIDStmt *sql.Stmt @@ -74,24 +76,25 @@ func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) { return types.RoomNID(roomNID), err } -func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, error) { +func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, error) { var nids pq.Int64Array - err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids) + var lastEventSentNID int64 + err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids, &lastEventSentNID) if err != nil { - return nil, err + return nil, 0, err } eventNIDs := make([]types.EventNID, len(nids)) for i := range nids { eventNIDs[i] = types.EventNID(nids[i]) } - return eventNIDs, nil + return eventNIDs, types.EventNID(lastEventSentNID), nil } -func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID) error { +func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID) error { nids := make([]int64, len(eventNIDs)) for i := range eventNIDs { nids[i] = int64(eventNIDs[i]) } - _, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids)) + _, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids), int64(lastEventSentNID)) return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 11fc9f6c4..9db84085e 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -206,22 +206,30 @@ func (d *Database) StateEntries(stateBlockNIDs []types.StateBlockNID) ([]types.S } // GetLatestEventsForUpdate implements input.EventDatabase -func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error) { +func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, string, types.RoomRecentEventsUpdater, error) { txn, err := d.db.Begin() if err != nil { - return nil, nil, err + return nil, "", nil, err } - eventNIDs, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID) + eventNIDs, lastEventNIDSent, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID) if err != nil { txn.Rollback() - return nil, nil, err + return nil, "", nil, err } stateAndRefs, err := d.statements.bulkSelectStateAtEventAndReference(txn, eventNIDs) if err != nil { txn.Rollback() - return nil, nil, err + return nil, "", nil, err } - return stateAndRefs, &roomRecentEventsUpdater{txn, d}, nil + var lastEventIDSent string + if lastEventNIDSent != 0 { + lastEventIDSent, err = d.statements.selectEventID(txn, lastEventNIDSent) + if err != nil { + txn.Rollback() + return nil, "", nil, err + } + } + return stateAndRefs, lastEventIDSent, &roomRecentEventsUpdater{txn, d}, nil } type roomRecentEventsUpdater struct { @@ -249,12 +257,20 @@ func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib. return false, err } -func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference) error { +func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID) error { eventNIDs := make([]types.EventNID, len(latest)) for i := range latest { eventNIDs[i] = latest[i].EventNID } - return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs) + return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs, lastEventNIDSent) +} + +func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) { + return u.d.statements.selectEventSentToOutput(u.txn, eventNID) +} + +func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error { + return u.d.statements.updateEventSentToOutput(u.txn, eventNID) } func (u *roomRecentEventsUpdater) Commit() error { diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index 718c4deb1..0a90d40d1 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -143,7 +143,11 @@ type RoomRecentEventsUpdater interface { IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) // Set the list of latest events for the room. // This replaces the current list stored in the database with the given list - SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference) error + SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference, lastEventNIDSent EventNID) error + // Check if the event has already be written to the output logs. + HasEventBeenSent(eventNID EventNID) (bool, error) + // Mark the event as having been sent to the output logs. + MarkEventAsSent(eventNID EventNID) error // Commit the transaction Commit() error // Rollback the transaction.