From ff90b77a1618327d38de91a4df94819a0efc713c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 20 Mar 2020 16:11:36 +0000 Subject: [PATCH] Fix two backfill bugs which prevented backfill from working correctly - When receiving backfill requests, do not send the event that was in the original request. - When storing backfill results, correctly update the backwards extremity for the room. --- roomserver/query/query.go | 16 ++++- .../postgres/backward_extremities_table.go | 58 ++++++++--------- syncapi/storage/postgres/syncserver.go | 63 ++++++++++--------- 3 files changed, 77 insertions(+), 60 deletions(-) diff --git a/roomserver/query/query.go b/roomserver/query/query.go index ff875c15f..32e37bfe6 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -607,6 +607,15 @@ func (r *RoomserverQueryAPI) scanEventTree( var next []string var pre string + // TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be) + // Currently, callers like QueryBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing + // so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in + // duplicate events being sent in response to /backfill requests. + initialIgnoreList := make(map[string]bool, len(visited)) + for k, v := range visited { + initialIgnoreList[k] = v + } + resultNIDs = make([]types.EventNID, 0, limit) var checkedServerInRoom bool @@ -643,8 +652,11 @@ BFSLoop: if len(resultNIDs) == limit { break BFSLoop } - // Update the list of events to retrieve. - resultNIDs = append(resultNIDs, ev.EventNID) + + if !initialIgnoreList[ev.EventID()] { + // Update the list of events to retrieve. + resultNIDs = append(resultNIDs, ev.EventNID) + } // Loop through the event's parents. for _, pre = range ev.PrevEventIDs() { // Only add an event to the list of next events to process if it diff --git a/syncapi/storage/postgres/backward_extremities_table.go b/syncapi/storage/postgres/backward_extremities_table.go index 8286ca434..b3ee28e01 100644 --- a/syncapi/storage/postgres/backward_extremities_table.go +++ b/syncapi/storage/postgres/backward_extremities_table.go @@ -21,39 +21,53 @@ import ( "github.com/matrix-org/dendrite/common" ) +// The purpose of this table is to keep track of backwards extremities for a room. +// Backwards extremities are the earliest (DAG-wise) known events which we have +// the entire event JSON. These event IDs are used in federation requests to fetch +// even earlier events. +// +// We persist the previous event IDs as well, one per row, so when we do fetch even +// earlier events we can simply delete rows which referenced it. Consider the graph: +// A +// | Event C has 1 prev_event ID: A. +// B C +// |___| Event D has 2 prev_event IDs: B and C. +// | +// D +// The earliest known event we have is D, so this table has 2 rows. +// A backfill request gives us C but not B. We delete rows where prev_event=C. This +// still means that D is a backwards extremity as we do not have event B. However, event +// C is *also* a backwards extremity at this point as we do not have event A. Later, +// when we fetch event B, we delete rows where prev_event=B. This then removes D as +// a backwards extremity because there are no more rows with event_id=B. const backwardExtremitiesSchema = ` -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( -- The 'room_id' key for the event. room_id TEXT NOT NULL, - -- The event ID for the event. + -- The event ID for the last known event. This is the backwards extremity. event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, - PRIMARY KEY(room_id, event_id) + PRIMARY KEY(room_id, event_id, prev_event_id) ); ` const insertBackwardExtremitySQL = "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id)" + - " VALUES ($1, $2)" + + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + " ON CONFLICT DO NOTHING" const selectBackwardExtremitiesForRoomSQL = "" + "SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1" -const isBackwardExtremitySQL = "" + - "SELECT EXISTS (" + - " SELECT TRUE FROM syncapi_backward_extremities" + - " WHERE room_id = $1 AND event_id = $2" + - ")" - const deleteBackwardExtremitySQL = "" + - "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND event_id = $2" + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" type backwardExtremitiesStatements struct { insertBackwardExtremityStmt *sql.Stmt selectBackwardExtremitiesForRoomStmt *sql.Stmt - isBackwardExtremityStmt *sql.Stmt deleteBackwardExtremityStmt *sql.Stmt } @@ -68,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { return } - if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil { - return - } if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { return } @@ -78,17 +89,15 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { } func (s *backwardExtremitiesStatements) insertsBackwardExtremity( - ctx context.Context, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, ) (err error) { - _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) return } func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (eventIDs []string, err error) { - eventIDs = make([]string, 0) - rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) if err != nil { return @@ -107,16 +116,9 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( return eventIDs, rows.Err() } -func (s *backwardExtremitiesStatements) isBackwardExtremity( - ctx context.Context, roomID, eventID string, -) (isBE bool, err error) { - err = s.isBackwardExtremityStmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE) - return -} - func (s *backwardExtremitiesStatements) deleteBackwardExtremity( - ctx context.Context, roomID, eventID string, + ctx context.Context, txn *sql.Tx, roomID, knownEventID string, ) (err error) { - _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID) + _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) return } diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 80af06d59..b63926f0a 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -112,43 +112,44 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ return d.StreamEventsToEvents(nil, streamEvents), nil } +// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of +// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table +// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error { - // If the event is already known as a backward extremity, don't consider - // it as such anymore now that we have it. - isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID()) - if err != nil { - return err - } - if isBackwardExtremity { - if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { + return common.WithTransaction(d.db, func(txn *sql.Tx) error { + if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { + util.GetLogger(ctx).Error("DELETE FAILED: ", err) return err } - } - // Check if we have all of the event's previous events. If an event is - // missing, add it to the room's backward extremities. - prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs()) - if err != nil { - return err - } - var found bool - for _, eID := range ev.PrevEventIDs() { - found = false - for _, prevEv := range prevEvents { - if eID == prevEv.EventID() { - found = true + // Check if we have all of the event's previous events. If an event is + // missing, add it to the room's backward extremities. + prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs()) + if err != nil { + return err + } + var found bool + for _, eID := range ev.PrevEventIDs() { + found = false + for _, prevEv := range prevEvents { + if eID == prevEv.EventID() { + found = true + } + } + + // If the event is missing, consider it a backward extremity. + if !found { + util.GetLogger(ctx).Info(eID, " is a backwards extremity for event ", ev.EventID()) + if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { + return err + } + } else { + util.GetLogger(ctx).Info(eID, " is NOT a backwards extremity ", ev.EventID()) } } - // If the event is missing, consider it a backward extremity. - if !found { - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { - return err - } - } - } - - return nil + return nil + }) } // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races @@ -272,6 +273,7 @@ func (d *SyncServerDatasource) GetEventsInRange( forwardLimit = to.PDUPosition } + util.GetLogger(ctx).Info("TOPOLOGY SELECT from >", backwardLimit, " to <=", forwardLimit) // Select the event IDs from the defined range. var eIDs []string eIDs, err = d.topology.selectEventIDsInRange( @@ -280,6 +282,7 @@ func (d *SyncServerDatasource) GetEventsInRange( if err != nil { return } + util.GetLogger(ctx).Info("TOPOLOGY SELECTED ", eIDs) // Retrieve the events' contents using their IDs. events, err = d.events.selectEvents(ctx, nil, eIDs)