From c5aef24d02964219873ac8c0e87de9c72ddc01e8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 24 Feb 2017 11:24:44 +0000 Subject: [PATCH] Track which events have been sent and the last event sent for each room --- .../dendrite/roomserver/input/events.go | 2 +- .../roomserver/input/latest_events.go | 60 ++++++++++++------- .../roomserver/storage/events_table.go | 38 ++++++++++++ .../roomserver/storage/rooms_table.go | 21 ++++--- .../dendrite/roomserver/storage/storage.go | 32 +++++++--- .../dendrite/roomserver/types/types.go | 6 +- 6 files changed, 119 insertions(+), 40 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 91999916e..36a9f9975 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -37,7 +37,7 @@ type RoomEventDatabase interface { // Lookup the latest events in a room in preparation for an update. // The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error. // If this returns an error then no further action is required. - GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error) + GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, string, types.RoomRecentEventsUpdater, error) } func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error { diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 0bd18ff75..983ca1efd 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -24,7 +24,7 @@ import ( func updateLatestEvents( db RoomEventDatabase, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, ) (err error) { - oldLatest, updater, err := db.GetLatestEventsForUpdate(roomNID) + oldLatest, lastEventIDSent, updater, err := db.GetLatestEventsForUpdate(roomNID) if err != nil { return } @@ -42,22 +42,54 @@ func updateLatestEvents( } }() - err = doUpdateLatestEvents(updater, oldLatest, roomNID, stateAtEvent, event) + err = doUpdateLatestEvents(updater, oldLatest, lastEventIDSent, roomNID, stateAtEvent, event) return } func doUpdateLatestEvents( - updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, + updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, lastEventIDSent string, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, ) error { var err error var prevEvents []gomatrixserverlib.EventReference prevEvents = event.PrevEvents() + if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil { + return err + } else if hasBeenSent { + // Already sent this event so we can stop processing + return nil + } + if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil { return err } - // Check if this event references any of the latest events in the room. + eventReference := event.EventReference() + // Check if this event is already referenced by another event in the room. + var alreadyReferenced bool + if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil { + return err + } + + newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{ + EventReference: eventReference, + StateAtEvent: stateAtEvent, + }) + + // TODO: Send the event to the output logs. + + if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID); err != nil { + return err + } + + if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil { + return err + } + + return nil +} + +func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference { var alreadyInLatest bool var newLatest []types.StateAtEventAndReference for _, l := range oldLatest { @@ -71,7 +103,7 @@ func doUpdateLatestEvents( break } } - if l.EventNID == stateAtEvent.EventNID { + if l.EventNID == newEvent.EventNID { alreadyInLatest = true } if keep { @@ -80,26 +112,12 @@ func doUpdateLatestEvents( } } - eventReference := event.EventReference() - // Check if this event is already referenced by another event in the room. - var alreadyReferenced bool - if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil { - return err - } - if !alreadyReferenced && !alreadyInLatest { // This event is not referenced by any of the events in the room // and the event is not already in the latest events. // Add it to the latest events - newLatest = append(newLatest, types.StateAtEventAndReference{ - StateAtEvent: stateAtEvent, - EventReference: eventReference, - }) + newLatest = append(newLatest, newEvent) } - if err = updater.SetLatestEvents(roomNID, newLatest); err != nil { - return err - } - - return nil + return newLatest } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index d89a1a0dd..a8c5f9c8f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS events ( -- Local numeric ID for the state_key of the event -- This is 0 if the event is not a state event. event_state_key_nid BIGINT NOT NULL, + -- Whether the event has been written to the output log. + sent_to_output BOOLEAN NOT NULL DEFAULT FALSE, -- Local numeric ID for the state at the event. -- This is 0 if we don't know the state at the event. -- If the state is not 0 then this event is part of the contiguous @@ -68,6 +70,15 @@ const bulkSelectStateAtEventByIDSQL = "" + const updateEventStateSQL = "" + "UPDATE events SET state_snapshot_nid = $2 WHERE event_nid = $1" +const selectEventSentToOutputSQL = "" + + "SELECT sent_to_output FROM events WHERE event_nid = $1" + +const updateEventSentToOutputSQL = "" + + "UPDATE events SET sent_to_output = TRUE WHERE event_nid = $1" + +const selectEventIDSQL = "" + + "SELECT event_id FROM events WHERE event_nid = $1" + const bulkSelectStateAtEventAndReferenceSQL = "" + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" + " FROM events WHERE event_nid = ANY($1)" @@ -78,6 +89,9 @@ type eventStatements struct { bulkSelectStateEventByIDStmt *sql.Stmt bulkSelectStateAtEventByIDStmt *sql.Stmt updateEventStateStmt *sql.Stmt + selectEventSentToOutputStmt *sql.Stmt + updateEventSentToOutputStmt *sql.Stmt + selectEventIDStmt *sql.Stmt bulkSelectStateAtEventAndReferenceStmt *sql.Stmt } @@ -101,6 +115,15 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) { if s.updateEventStateStmt, err = db.Prepare(updateEventStateSQL); err != nil { return } + if s.updateEventSentToOutputStmt, err = db.Prepare(updateEventSentToOutputSQL); err != nil { + return + } + if s.selectEventSentToOutputStmt, err = db.Prepare(selectEventSentToOutputSQL); err != nil { + return + } + if s.selectEventIDStmt, err = db.Prepare(selectEventIDSQL); err != nil { + return + } if s.bulkSelectStateAtEventAndReferenceStmt, err = db.Prepare(bulkSelectStateAtEventAndReferenceSQL); err != nil { return } @@ -199,6 +222,21 @@ func (s *eventStatements) updateEventState(eventNID types.EventNID, stateNID typ return err } +func (s *eventStatements) selectEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) (sentToOutput bool, err error) { + err = txn.Stmt(s.selectEventSentToOutputStmt).QueryRow(int64(eventNID)).Scan(&sentToOutput) + return +} + +func (s *eventStatements) updateEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) error { + _, err := txn.Stmt(s.updateEventSentToOutputStmt).Exec(int64(eventNID)) + return err +} + +func (s *eventStatements) selectEventID(txn *sql.Tx, eventNID types.EventNID) (eventID string, err error) { + err = txn.Stmt(s.selectEventIDStmt).QueryRow(int64(eventNID)).Scan(&eventID) + return +} + func (s *eventStatements) bulkSelectStateAtEventAndReference(txn *sql.Tx, eventNIDs []types.EventNID) ([]types.StateAtEventAndReference, error) { nids := make([]int64, len(eventNIDs)) for i := range eventNIDs { diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go index 78c1c883f..ff932344b 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go @@ -16,7 +16,9 @@ CREATE TABLE IF NOT EXISTS rooms ( -- The most recent events in the room that aren't referenced by another event. -- This list may empty if the server hasn't joined the room yet. -- (The server will be in that state while it stores the events for the initial state of the room) - latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[] + latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[], + -- The last event written to the output log for this room. + last_event_sent_nid BIGINT NOT NULL DEFAULT 0 ); ` @@ -30,10 +32,10 @@ const selectRoomNIDSQL = "" + "SELECT room_nid FROM rooms WHERE room_id = $1" const selectLatestEventNIDsSQL = "" + - "SELECT latest_event_nids FROM rooms WHERE room_nid = $1 FOR UPDATE" + "SELECT latest_event_nids, last_event_sent_nid FROM rooms WHERE room_nid = $1 FOR UPDATE" const updateLatestEventNIDsSQL = "" + - "UPDATE rooms SET latest_event_nids = $2 WHERE room_nid = $1" + "UPDATE rooms SET latest_event_nids = $2, last_event_sent_nid = $3 WHERE room_nid = $1" type roomStatements struct { insertRoomNIDStmt *sql.Stmt @@ -74,24 +76,25 @@ func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) { return types.RoomNID(roomNID), err } -func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, error) { +func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, error) { var nids pq.Int64Array - err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids) + var lastEventSentNID int64 + err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids, &lastEventSentNID) if err != nil { - return nil, err + return nil, 0, err } eventNIDs := make([]types.EventNID, len(nids)) for i := range nids { eventNIDs[i] = types.EventNID(nids[i]) } - return eventNIDs, nil + return eventNIDs, types.EventNID(lastEventSentNID), nil } -func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID) error { +func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID) error { nids := make([]int64, len(eventNIDs)) for i := range eventNIDs { nids[i] = int64(eventNIDs[i]) } - _, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids)) + _, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids), int64(lastEventSentNID)) return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 11fc9f6c4..9db84085e 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -206,22 +206,30 @@ func (d *Database) StateEntries(stateBlockNIDs []types.StateBlockNID) ([]types.S } // GetLatestEventsForUpdate implements input.EventDatabase -func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error) { +func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, string, types.RoomRecentEventsUpdater, error) { txn, err := d.db.Begin() if err != nil { - return nil, nil, err + return nil, "", nil, err } - eventNIDs, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID) + eventNIDs, lastEventNIDSent, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID) if err != nil { txn.Rollback() - return nil, nil, err + return nil, "", nil, err } stateAndRefs, err := d.statements.bulkSelectStateAtEventAndReference(txn, eventNIDs) if err != nil { txn.Rollback() - return nil, nil, err + return nil, "", nil, err } - return stateAndRefs, &roomRecentEventsUpdater{txn, d}, nil + var lastEventIDSent string + if lastEventNIDSent != 0 { + lastEventIDSent, err = d.statements.selectEventID(txn, lastEventNIDSent) + if err != nil { + txn.Rollback() + return nil, "", nil, err + } + } + return stateAndRefs, lastEventIDSent, &roomRecentEventsUpdater{txn, d}, nil } type roomRecentEventsUpdater struct { @@ -249,12 +257,20 @@ func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib. return false, err } -func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference) error { +func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID) error { eventNIDs := make([]types.EventNID, len(latest)) for i := range latest { eventNIDs[i] = latest[i].EventNID } - return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs) + return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs, lastEventNIDSent) +} + +func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) { + return u.d.statements.selectEventSentToOutput(u.txn, eventNID) +} + +func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error { + return u.d.statements.updateEventSentToOutput(u.txn, eventNID) } func (u *roomRecentEventsUpdater) Commit() error { diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index 718c4deb1..0a90d40d1 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -143,7 +143,11 @@ type RoomRecentEventsUpdater interface { IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) // Set the list of latest events for the room. // This replaces the current list stored in the database with the given list - SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference) error + SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference, lastEventNIDSent EventNID) error + // Check if the event has already be written to the output logs. + HasEventBeenSent(eventNID EventNID) (bool, error) + // Mark the event as having been sent to the output logs. + MarkEventAsSent(eventNID EventNID) error // Commit the transaction Commit() error // Rollback the transaction.