mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 22:43:10 -06:00
Track which events have been sent and the last event sent for each room
This commit is contained in:
parent
a9b296c522
commit
c5aef24d02
|
|
@ -37,7 +37,7 @@ type RoomEventDatabase interface {
|
|||
// Lookup the latest events in a room in preparation for an update.
|
||||
// The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error.
|
||||
// If this returns an error then no further action is required.
|
||||
GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error)
|
||||
GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, string, types.RoomRecentEventsUpdater, error)
|
||||
}
|
||||
|
||||
func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ import (
|
|||
func updateLatestEvents(
|
||||
db RoomEventDatabase, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||
) (err error) {
|
||||
oldLatest, updater, err := db.GetLatestEventsForUpdate(roomNID)
|
||||
oldLatest, lastEventIDSent, updater, err := db.GetLatestEventsForUpdate(roomNID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
@ -42,22 +42,54 @@ func updateLatestEvents(
|
|||
}
|
||||
}()
|
||||
|
||||
err = doUpdateLatestEvents(updater, oldLatest, roomNID, stateAtEvent, event)
|
||||
err = doUpdateLatestEvents(updater, oldLatest, lastEventIDSent, roomNID, stateAtEvent, event)
|
||||
return
|
||||
}
|
||||
|
||||
func doUpdateLatestEvents(
|
||||
updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||
updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, lastEventIDSent string, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||
) error {
|
||||
var err error
|
||||
var prevEvents []gomatrixserverlib.EventReference
|
||||
prevEvents = event.PrevEvents()
|
||||
|
||||
if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil {
|
||||
return err
|
||||
} else if hasBeenSent {
|
||||
// Already sent this event so we can stop processing
|
||||
return nil
|
||||
}
|
||||
|
||||
if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if this event references any of the latest events in the room.
|
||||
eventReference := event.EventReference()
|
||||
// Check if this event is already referenced by another event in the room.
|
||||
var alreadyReferenced bool
|
||||
if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
|
||||
EventReference: eventReference,
|
||||
StateAtEvent: stateAtEvent,
|
||||
})
|
||||
|
||||
// TODO: Send the event to the output logs.
|
||||
|
||||
if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference {
|
||||
var alreadyInLatest bool
|
||||
var newLatest []types.StateAtEventAndReference
|
||||
for _, l := range oldLatest {
|
||||
|
|
@ -71,7 +103,7 @@ func doUpdateLatestEvents(
|
|||
break
|
||||
}
|
||||
}
|
||||
if l.EventNID == stateAtEvent.EventNID {
|
||||
if l.EventNID == newEvent.EventNID {
|
||||
alreadyInLatest = true
|
||||
}
|
||||
if keep {
|
||||
|
|
@ -80,26 +112,12 @@ func doUpdateLatestEvents(
|
|||
}
|
||||
}
|
||||
|
||||
eventReference := event.EventReference()
|
||||
// Check if this event is already referenced by another event in the room.
|
||||
var alreadyReferenced bool
|
||||
if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !alreadyReferenced && !alreadyInLatest {
|
||||
// This event is not referenced by any of the events in the room
|
||||
// and the event is not already in the latest events.
|
||||
// Add it to the latest events
|
||||
newLatest = append(newLatest, types.StateAtEventAndReference{
|
||||
StateAtEvent: stateAtEvent,
|
||||
EventReference: eventReference,
|
||||
})
|
||||
newLatest = append(newLatest, newEvent)
|
||||
}
|
||||
|
||||
if err = updater.SetLatestEvents(roomNID, newLatest); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return newLatest
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS events (
|
|||
-- Local numeric ID for the state_key of the event
|
||||
-- This is 0 if the event is not a state event.
|
||||
event_state_key_nid BIGINT NOT NULL,
|
||||
-- Whether the event has been written to the output log.
|
||||
sent_to_output BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
-- Local numeric ID for the state at the event.
|
||||
-- This is 0 if we don't know the state at the event.
|
||||
-- If the state is not 0 then this event is part of the contiguous
|
||||
|
|
@ -68,6 +70,15 @@ const bulkSelectStateAtEventByIDSQL = "" +
|
|||
const updateEventStateSQL = "" +
|
||||
"UPDATE events SET state_snapshot_nid = $2 WHERE event_nid = $1"
|
||||
|
||||
const selectEventSentToOutputSQL = "" +
|
||||
"SELECT sent_to_output FROM events WHERE event_nid = $1"
|
||||
|
||||
const updateEventSentToOutputSQL = "" +
|
||||
"UPDATE events SET sent_to_output = TRUE WHERE event_nid = $1"
|
||||
|
||||
const selectEventIDSQL = "" +
|
||||
"SELECT event_id FROM events WHERE event_nid = $1"
|
||||
|
||||
const bulkSelectStateAtEventAndReferenceSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" +
|
||||
" FROM events WHERE event_nid = ANY($1)"
|
||||
|
|
@ -78,6 +89,9 @@ type eventStatements struct {
|
|||
bulkSelectStateEventByIDStmt *sql.Stmt
|
||||
bulkSelectStateAtEventByIDStmt *sql.Stmt
|
||||
updateEventStateStmt *sql.Stmt
|
||||
selectEventSentToOutputStmt *sql.Stmt
|
||||
updateEventSentToOutputStmt *sql.Stmt
|
||||
selectEventIDStmt *sql.Stmt
|
||||
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
|
||||
}
|
||||
|
||||
|
|
@ -101,6 +115,15 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.updateEventStateStmt, err = db.Prepare(updateEventStateSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.updateEventSentToOutputStmt, err = db.Prepare(updateEventSentToOutputSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectEventSentToOutputStmt, err = db.Prepare(selectEventSentToOutputSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectEventIDStmt, err = db.Prepare(selectEventIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.bulkSelectStateAtEventAndReferenceStmt, err = db.Prepare(bulkSelectStateAtEventAndReferenceSQL); err != nil {
|
||||
return
|
||||
}
|
||||
|
|
@ -199,6 +222,21 @@ func (s *eventStatements) updateEventState(eventNID types.EventNID, stateNID typ
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *eventStatements) selectEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) (sentToOutput bool, err error) {
|
||||
err = txn.Stmt(s.selectEventSentToOutputStmt).QueryRow(int64(eventNID)).Scan(&sentToOutput)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *eventStatements) updateEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) error {
|
||||
_, err := txn.Stmt(s.updateEventSentToOutputStmt).Exec(int64(eventNID))
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *eventStatements) selectEventID(txn *sql.Tx, eventNID types.EventNID) (eventID string, err error) {
|
||||
err = txn.Stmt(s.selectEventIDStmt).QueryRow(int64(eventNID)).Scan(&eventID)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *eventStatements) bulkSelectStateAtEventAndReference(txn *sql.Tx, eventNIDs []types.EventNID) ([]types.StateAtEventAndReference, error) {
|
||||
nids := make([]int64, len(eventNIDs))
|
||||
for i := range eventNIDs {
|
||||
|
|
|
|||
|
|
@ -16,7 +16,9 @@ CREATE TABLE IF NOT EXISTS rooms (
|
|||
-- The most recent events in the room that aren't referenced by another event.
|
||||
-- This list may empty if the server hasn't joined the room yet.
|
||||
-- (The server will be in that state while it stores the events for the initial state of the room)
|
||||
latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[]
|
||||
latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[],
|
||||
-- The last event written to the output log for this room.
|
||||
last_event_sent_nid BIGINT NOT NULL DEFAULT 0
|
||||
);
|
||||
`
|
||||
|
||||
|
|
@ -30,10 +32,10 @@ const selectRoomNIDSQL = "" +
|
|||
"SELECT room_nid FROM rooms WHERE room_id = $1"
|
||||
|
||||
const selectLatestEventNIDsSQL = "" +
|
||||
"SELECT latest_event_nids FROM rooms WHERE room_nid = $1 FOR UPDATE"
|
||||
"SELECT latest_event_nids, last_event_sent_nid FROM rooms WHERE room_nid = $1 FOR UPDATE"
|
||||
|
||||
const updateLatestEventNIDsSQL = "" +
|
||||
"UPDATE rooms SET latest_event_nids = $2 WHERE room_nid = $1"
|
||||
"UPDATE rooms SET latest_event_nids = $2, last_event_sent_nid = $3 WHERE room_nid = $1"
|
||||
|
||||
type roomStatements struct {
|
||||
insertRoomNIDStmt *sql.Stmt
|
||||
|
|
@ -74,24 +76,25 @@ func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) {
|
|||
return types.RoomNID(roomNID), err
|
||||
}
|
||||
|
||||
func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, error) {
|
||||
func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, error) {
|
||||
var nids pq.Int64Array
|
||||
err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids)
|
||||
var lastEventSentNID int64
|
||||
err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids, &lastEventSentNID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, 0, err
|
||||
}
|
||||
eventNIDs := make([]types.EventNID, len(nids))
|
||||
for i := range nids {
|
||||
eventNIDs[i] = types.EventNID(nids[i])
|
||||
}
|
||||
return eventNIDs, nil
|
||||
return eventNIDs, types.EventNID(lastEventSentNID), nil
|
||||
}
|
||||
|
||||
func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID) error {
|
||||
func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID) error {
|
||||
nids := make([]int64, len(eventNIDs))
|
||||
for i := range eventNIDs {
|
||||
nids[i] = int64(eventNIDs[i])
|
||||
}
|
||||
_, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids))
|
||||
_, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids), int64(lastEventSentNID))
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -206,22 +206,30 @@ func (d *Database) StateEntries(stateBlockNIDs []types.StateBlockNID) ([]types.S
|
|||
}
|
||||
|
||||
// GetLatestEventsForUpdate implements input.EventDatabase
|
||||
func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error) {
|
||||
func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, string, types.RoomRecentEventsUpdater, error) {
|
||||
txn, err := d.db.Begin()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, "", nil, err
|
||||
}
|
||||
eventNIDs, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID)
|
||||
eventNIDs, lastEventNIDSent, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID)
|
||||
if err != nil {
|
||||
txn.Rollback()
|
||||
return nil, nil, err
|
||||
return nil, "", nil, err
|
||||
}
|
||||
stateAndRefs, err := d.statements.bulkSelectStateAtEventAndReference(txn, eventNIDs)
|
||||
if err != nil {
|
||||
txn.Rollback()
|
||||
return nil, nil, err
|
||||
return nil, "", nil, err
|
||||
}
|
||||
return stateAndRefs, &roomRecentEventsUpdater{txn, d}, nil
|
||||
var lastEventIDSent string
|
||||
if lastEventNIDSent != 0 {
|
||||
lastEventIDSent, err = d.statements.selectEventID(txn, lastEventNIDSent)
|
||||
if err != nil {
|
||||
txn.Rollback()
|
||||
return nil, "", nil, err
|
||||
}
|
||||
}
|
||||
return stateAndRefs, lastEventIDSent, &roomRecentEventsUpdater{txn, d}, nil
|
||||
}
|
||||
|
||||
type roomRecentEventsUpdater struct {
|
||||
|
|
@ -249,12 +257,20 @@ func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.
|
|||
return false, err
|
||||
}
|
||||
|
||||
func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference) error {
|
||||
func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID) error {
|
||||
eventNIDs := make([]types.EventNID, len(latest))
|
||||
for i := range latest {
|
||||
eventNIDs[i] = latest[i].EventNID
|
||||
}
|
||||
return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs)
|
||||
return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs, lastEventNIDSent)
|
||||
}
|
||||
|
||||
func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) {
|
||||
return u.d.statements.selectEventSentToOutput(u.txn, eventNID)
|
||||
}
|
||||
|
||||
func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error {
|
||||
return u.d.statements.updateEventSentToOutput(u.txn, eventNID)
|
||||
}
|
||||
|
||||
func (u *roomRecentEventsUpdater) Commit() error {
|
||||
|
|
|
|||
|
|
@ -143,7 +143,11 @@ type RoomRecentEventsUpdater interface {
|
|||
IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error)
|
||||
// Set the list of latest events for the room.
|
||||
// This replaces the current list stored in the database with the given list
|
||||
SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference) error
|
||||
SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference, lastEventNIDSent EventNID) error
|
||||
// Check if the event has already be written to the output logs.
|
||||
HasEventBeenSent(eventNID EventNID) (bool, error)
|
||||
// Mark the event as having been sent to the output logs.
|
||||
MarkEventAsSent(eventNID EventNID) error
|
||||
// Commit the transaction
|
||||
Commit() error
|
||||
// Rollback the transaction.
|
||||
|
|
|
|||
Loading…
Reference in a new issue