diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index e8e1c1792..1bf9ad339 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -188,11 +188,6 @@ func (r *messagesReq) retrieveEvents() ( err = fmt.Errorf("GetEventsInRange: %w", err) return } - for i := range streamEvents { - s := streamEvents[i] - util.GetLogger(r.ctx).Info("spos=", s.StreamPosition, " content=", string(s.Content())) - } - util.GetLogger(r.ctx).Info("Found ", len(streamEvents), " in the database") var events []gomatrixserverlib.HeaderedEvent diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 1f4bb31ca..0b53dfa9e 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/util" "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -289,7 +288,6 @@ func (s *outputRoomEventsStatements) insertEvent( txnID, excludeFromSync, ).Scan(&streamPos) - util.GetLogger(ctx).Info("INSERT EVENT: ", event.RoomID(), " Type:", event.Type(), " pos:", streamPos) return } @@ -307,7 +305,6 @@ func (s *outputRoomEventsStatements) selectRecentEvents( } else { stmt = common.TxStmt(txn, s.selectRecentEventsStmt) } - util.GetLogger(ctx).Info("SELECT ", roomID, " from=", fromPos, " to=", toPos, " limit=", limit) rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) if err != nil { return nil, err @@ -317,9 +314,6 @@ func (s *outputRoomEventsStatements) selectRecentEvents( if err != nil { return nil, err } - for _, e := range events { - util.GetLogger(ctx).Info("SELECTED ", e.RoomID(), " Type:", e.Type(), " pos:", e.StreamPosition) - } if chronologicalOrder { // The events need to be returned from oldest to latest, which isn't // necessary the way the SQL query returns them, so a sort is necessary to diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index b63926f0a..f3f1aabc7 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -26,7 +26,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/util" // Import the postgres database driver. _ "github.com/lib/pq" @@ -115,41 +114,35 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ // handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of // the events listed in the event's 'prev_events'. This function also updates the backwards extremities table // to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. -func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error { - return common.WithTransaction(d.db, func(txn *sql.Tx) error { - if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { - util.GetLogger(ctx).Error("DELETE FAILED: ", err) - return err - } +func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { + if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { + return err + } - // Check if we have all of the event's previous events. If an event is - // missing, add it to the room's backward extremities. - prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs()) - if err != nil { - return err - } - var found bool - for _, eID := range ev.PrevEventIDs() { - found = false - for _, prevEv := range prevEvents { - if eID == prevEv.EventID() { - found = true - } - } - - // If the event is missing, consider it a backward extremity. - if !found { - util.GetLogger(ctx).Info(eID, " is a backwards extremity for event ", ev.EventID()) - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { - return err - } - } else { - util.GetLogger(ctx).Info(eID, " is NOT a backwards extremity ", ev.EventID()) + // Check if we have all of the event's previous events. If an event is + // missing, add it to the room's backward extremities. + prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs()) + if err != nil { + return err + } + var found bool + for _, eID := range ev.PrevEventIDs() { + found = false + for _, prevEv := range prevEvents { + if eID == prevEv.EventID() { + found = true } } - return nil - }) + // If the event is missing, consider it a backward extremity. + if !found { + if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { + return err + } + } + } + + return nil } // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races @@ -176,7 +169,7 @@ func (d *SyncServerDatasource) WriteEvent( return err } - if err = d.handleBackwardExtremities(ctx, ev); err != nil { + if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { return err } @@ -273,7 +266,6 @@ func (d *SyncServerDatasource) GetEventsInRange( forwardLimit = to.PDUPosition } - util.GetLogger(ctx).Info("TOPOLOGY SELECT from >", backwardLimit, " to <=", forwardLimit) // Select the event IDs from the defined range. var eIDs []string eIDs, err = d.topology.selectEventIDsInRange( @@ -282,7 +274,6 @@ func (d *SyncServerDatasource) GetEventsInRange( if err != nil { return } - util.GetLogger(ctx).Info("TOPOLOGY SELECTED ", eIDs) // Retrieve the events' contents using their IDs. events, err = d.events.selectEvents(ctx, nil, eIDs) @@ -300,7 +291,6 @@ func (d *SyncServerDatasource) GetEventsInRange( ); err != nil { return } - util.GetLogger(ctx).Info("selectRecentEvents from token: ", from.PDUPosition) } else { // When using forward ordering, we want the least recent events first. if events, err = d.events.selectEarlyEvents( diff --git a/syncapi/storage/sqlite3/backward_extremities_table.go b/syncapi/storage/sqlite3/backward_extremities_table.go index fcf15da25..0663e2a12 100644 --- a/syncapi/storage/sqlite3/backward_extremities_table.go +++ b/syncapi/storage/sqlite3/backward_extremities_table.go @@ -21,38 +21,53 @@ import ( "github.com/matrix-org/dendrite/common" ) +// The purpose of this table is to keep track of backwards extremities for a room. +// Backwards extremities are the earliest (DAG-wise) known events which we have +// the entire event JSON. These event IDs are used in federation requests to fetch +// even earlier events. +// +// We persist the previous event IDs as well, one per row, so when we do fetch even +// earlier events we can simply delete rows which referenced it. Consider the graph: +// A +// | Event C has 1 prev_event ID: A. +// B C +// |___| Event D has 2 prev_event IDs: B and C. +// | +// D +// The earliest known event we have is D, so this table has 2 rows. +// A backfill request gives us C but not B. We delete rows where prev_event=C. This +// still means that D is a backwards extremity as we do not have event B. However, event +// C is *also* a backwards extremity at this point as we do not have event A. Later, +// when we fetch event B, we delete rows where prev_event=B. This then removes D as +// a backwards extremity because there are no more rows with event_id=B. const backwardExtremitiesSchema = ` -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( + -- The 'room_id' key for the event. room_id TEXT NOT NULL, + -- The event ID for the last known event. This is the backwards extremity. event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, - PRIMARY KEY(room_id, event_id) + PRIMARY KEY(room_id, event_id, prev_event_id) ); ` const insertBackwardExtremitySQL = "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id)" + - " VALUES ($1, $2)" + - " ON CONFLICT (room_id, event_id) DO NOTHING" + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + + " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + "SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1" -const isBackwardExtremitySQL = "" + - "SELECT EXISTS (" + - " SELECT TRUE FROM syncapi_backward_extremities" + - " WHERE room_id = $1 AND event_id = $2" + - ")" - const deleteBackwardExtremitySQL = "" + - "DELETE FROM syncapi_backward_extremities" + - " WHERE room_id = $1 AND event_id = $2" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" type backwardExtremitiesStatements struct { insertBackwardExtremityStmt *sql.Stmt selectBackwardExtremitiesForRoomStmt *sql.Stmt - isBackwardExtremityStmt *sql.Stmt deleteBackwardExtremityStmt *sql.Stmt } @@ -67,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { return } - if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil { - return - } if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { return } @@ -77,23 +89,20 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { } func (s *backwardExtremitiesStatements) insertsBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, ) (err error) { - stmt := common.TxStmt(txn, s.insertBackwardExtremityStmt) - _, err = stmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) return } func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( - ctx context.Context, txn *sql.Tx, roomID string, + ctx context.Context, roomID string, ) (eventIDs []string, err error) { - eventIDs = make([]string, 0) - - stmt := common.TxStmt(txn, s.selectBackwardExtremitiesForRoomStmt) - rows, err := stmt.QueryContext(ctx, roomID) + rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return } + defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") for rows.Next() { var eID string @@ -104,21 +113,12 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( eventIDs = append(eventIDs, eID) } - return -} - -func (s *backwardExtremitiesStatements) isBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, -) (isBE bool, err error) { - stmt := common.TxStmt(txn, s.isBackwardExtremityStmt) - err = stmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE) - return + return eventIDs, rows.Err() } func (s *backwardExtremitiesStatements) deleteBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, knownEventID string, ) (err error) { - stmt := common.TxStmt(txn, s.deleteBackwardExtremityStmt) - _, err = stmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) return } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index a06fc91f5..8ff189007 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -137,18 +137,13 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ return d.StreamEventsToEvents(nil, streamEvents), nil } +// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of +// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table +// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { - // If the event is already known as a backward extremity, don't consider - // it as such anymore now that we have it. - isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()) - if err != nil { + if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { return err } - if isBackwardExtremity { - if err = d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { - return err - } - } // Check if we have all of the event's previous events. If an event is // missing, add it to the room's backward extremities. @@ -167,7 +162,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx // If the event is missing, consider it a backward extremity. if !found { - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { + if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { return err } } @@ -348,7 +343,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.Pagi func (d *SyncServerDatasource) BackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (backwardExtremities []string, err error) { - return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, nil, roomID) + return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID) } // MaxTopologicalPosition returns the highest topological position for a given