diff --git a/cmd/resolve-state/main.go b/cmd/resolve-state/main.go new file mode 100644 index 000000000..9fb14f056 --- /dev/null +++ b/cmd/resolve-state/main.go @@ -0,0 +1,132 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strconv" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/roomserver/state" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" +) + +// This is a utility for inspecting state snapshots and running state resolution +// against real snapshots in an actual database. +// It takes one or more state snapshot NIDs as arguments, along with a room version +// to use for unmarshalling events, and will produce resolved output. +// +// Usage: ./resolve-state --roomversion=version snapshot [snapshot ...] +// e.g. ./resolve-state --roomversion=5 1254 1235 1282 + +var roomVersion = flag.String("roomversion", "5", "the room version to parse events as") + +// nolint:gocyclo +func main() { + ctx := context.Background() + cfg := setup.ParseFlags(true) + args := os.Args[1:] + + fmt.Println("Room version", *roomVersion) + + snapshotNIDs := []types.StateSnapshotNID{} + for _, arg := range args { + if i, err := strconv.Atoi(arg); err == nil { + snapshotNIDs = append(snapshotNIDs, types.StateSnapshotNID(i)) + } + } + + fmt.Println("Fetching", len(snapshotNIDs), "snapshot NIDs") + + cache, err := caching.NewInMemoryLRUCache(true) + if err != nil { + panic(err) + } + + roomserverDB, err := storage.Open(&cfg.RoomServer.Database, cache) + if err != nil { + panic(err) + } + + blockNIDs, err := roomserverDB.StateBlockNIDs(ctx, snapshotNIDs) + if err != nil { + panic(err) + } + + var stateEntries []types.StateEntryList + for _, list := range blockNIDs { + entries, err2 := roomserverDB.StateEntries(ctx, list.StateBlockNIDs) + if err2 != nil { + panic(err2) + } + stateEntries = append(stateEntries, entries...) + } + + var eventNIDs []types.EventNID + for _, entry := range stateEntries { + for _, e := range entry.StateEntries { + eventNIDs = append(eventNIDs, e.EventNID) + } + } + + fmt.Println("Fetching", len(eventNIDs), "state events") + eventEntries, err := roomserverDB.Events(ctx, eventNIDs) + if err != nil { + panic(err) + } + + authEventIDMap := make(map[string]struct{}) + eventPtrs := make([]*gomatrixserverlib.Event, len(eventEntries)) + for i := range eventEntries { + eventPtrs[i] = &eventEntries[i].Event + for _, authEventID := range eventEntries[i].AuthEventIDs() { + authEventIDMap[authEventID] = struct{}{} + } + } + + authEventIDs := make([]string, 0, len(authEventIDMap)) + for authEventID := range authEventIDMap { + authEventIDs = append(authEventIDs, authEventID) + } + + fmt.Println("Fetching", len(authEventIDs), "auth events") + authEventEntries, err := roomserverDB.EventsFromIDs(ctx, authEventIDs) + if err != nil { + panic(err) + } + + authEventPtrs := make([]*gomatrixserverlib.Event, len(authEventEntries)) + for i := range authEventEntries { + authEventPtrs[i] = &authEventEntries[i].Event + } + + events := make([]gomatrixserverlib.Event, len(eventEntries)) + authEvents := make([]gomatrixserverlib.Event, len(authEventEntries)) + for i, ptr := range eventPtrs { + events[i] = *ptr + } + for i, ptr := range authEventPtrs { + authEvents[i] = *ptr + } + + fmt.Println("Resolving state") + resolved, err := state.ResolveConflictsAdhoc( + gomatrixserverlib.RoomVersion(*roomVersion), + events, + authEvents, + ) + if err != nil { + panic(err) + } + + fmt.Println("Resolved state contains", len(resolved), "events") + for _, event := range resolved { + fmt.Println() + fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey()) + fmt.Printf(" %s\n", string(event.Content())) + } +} diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index efeb53fa6..ef945694c 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -87,6 +87,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { case api.OutputTypeNewRoomEvent: ev := &output.NewRoomEvent.Event + if output.NewRoomEvent.RewritesState { + if err := s.db.PurgeRoomState(context.TODO(), ev.RoomID()); err != nil { + return fmt.Errorf("s.db.PurgeRoom: %w", err) + } + } + if err := s.processMessage(*output.NewRoomEvent); err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index 734b368fe..a3f5073f9 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -32,6 +32,7 @@ type Database interface { GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) // GetJoinedHostsForRooms returns the complete set of servers in the rooms given. GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) + PurgeRoomState(ctx context.Context, roomID string) error StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) diff --git a/federationsender/storage/postgres/joined_hosts_table.go b/federationsender/storage/postgres/joined_hosts_table.go index cc112b7f3..0bc9335dd 100644 --- a/federationsender/storage/postgres/joined_hosts_table.go +++ b/federationsender/storage/postgres/joined_hosts_table.go @@ -53,6 +53,9 @@ const insertJoinedHostsSQL = "" + const deleteJoinedHostsSQL = "" + "DELETE FROM federationsender_joined_hosts WHERE event_id = ANY($1)" +const deleteJoinedHostsForRoomSQL = "" + + "DELETE FROM federationsender_joined_hosts WHERE room_id = $1" + const selectJoinedHostsSQL = "" + "SELECT event_id, server_name FROM federationsender_joined_hosts" + " WHERE room_id = $1" @@ -67,6 +70,7 @@ type joinedHostsStatements struct { db *sql.DB insertJoinedHostsStmt *sql.Stmt deleteJoinedHostsStmt *sql.Stmt + deleteJoinedHostsForRoomStmt *sql.Stmt selectJoinedHostsStmt *sql.Stmt selectAllJoinedHostsStmt *sql.Stmt selectJoinedHostsForRoomsStmt *sql.Stmt @@ -86,6 +90,9 @@ func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err erro if s.deleteJoinedHostsStmt, err = s.db.Prepare(deleteJoinedHostsSQL); err != nil { return } + if s.deleteJoinedHostsForRoomStmt, err = s.db.Prepare(deleteJoinedHostsForRoomSQL); err != nil { + return + } if s.selectJoinedHostsStmt, err = s.db.Prepare(selectJoinedHostsSQL); err != nil { return } @@ -117,6 +124,14 @@ func (s *joinedHostsStatements) DeleteJoinedHosts( return err } +func (s *joinedHostsStatements) DeleteJoinedHostsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsForRoomStmt) + _, err := stmt.ExecContext(ctx, roomID) + return err +} + func (s *joinedHostsStatements) SelectJoinedHostsWithTx( ctx context.Context, txn *sql.Tx, roomID string, ) ([]types.JoinedHost, error) { diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 4c80c0792..d5731f31c 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -148,6 +148,20 @@ func (d *Database) StoreJSON( }, nil } +func (d *Database) PurgeRoomState( + ctx context.Context, roomID string, +) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + // If the event is a create event then we'll delete all of the existing + // data for the room. The only reason that a create event would be replayed + // to us in this way is if we're about to receive the entire room state. + if err := d.FederationSenderJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil { + return fmt.Errorf("d.FederationSenderJoinedHosts.DeleteJoinedHosts: %w", err) + } + return nil + }) +} + func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), txn, serverName) diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationsender/storage/sqlite3/joined_hosts_table.go index 3bc45e7d5..1f906808d 100644 --- a/federationsender/storage/sqlite3/joined_hosts_table.go +++ b/federationsender/storage/sqlite3/joined_hosts_table.go @@ -53,6 +53,9 @@ const insertJoinedHostsSQL = "" + const deleteJoinedHostsSQL = "" + "DELETE FROM federationsender_joined_hosts WHERE event_id = $1" +const deleteJoinedHostsForRoomSQL = "" + + "DELETE FROM federationsender_joined_hosts WHERE room_id = $1" + const selectJoinedHostsSQL = "" + "SELECT event_id, server_name FROM federationsender_joined_hosts" + " WHERE room_id = $1" @@ -64,11 +67,12 @@ const selectJoinedHostsForRoomsSQL = "" + "SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id IN ($1)" type joinedHostsStatements struct { - db *sql.DB - insertJoinedHostsStmt *sql.Stmt - deleteJoinedHostsStmt *sql.Stmt - selectJoinedHostsStmt *sql.Stmt - selectAllJoinedHostsStmt *sql.Stmt + db *sql.DB + insertJoinedHostsStmt *sql.Stmt + deleteJoinedHostsStmt *sql.Stmt + deleteJoinedHostsForRoomStmt *sql.Stmt + selectJoinedHostsStmt *sql.Stmt + selectAllJoinedHostsStmt *sql.Stmt // selectJoinedHostsForRoomsStmt *sql.Stmt - prepared at runtime due to variadic } @@ -86,6 +90,9 @@ func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil { return } + if s.deleteJoinedHostsForRoomStmt, err = s.db.Prepare(deleteJoinedHostsForRoomSQL); err != nil { + return + } if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil { return } @@ -118,6 +125,14 @@ func (s *joinedHostsStatements) DeleteJoinedHosts( return nil } +func (s *joinedHostsStatements) DeleteJoinedHostsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsForRoomStmt) + _, err := stmt.ExecContext(ctx, roomID) + return err +} + func (s *joinedHostsStatements) SelectJoinedHostsWithTx( ctx context.Context, txn *sql.Tx, roomID string, ) ([]types.JoinedHost, error) { diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index c6f8a2d52..1167a212a 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -50,6 +50,7 @@ type FederationSenderQueueJSON interface { type FederationSenderJoinedHosts interface { InsertJoinedHosts(ctx context.Context, txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName) error DeleteJoinedHosts(ctx context.Context, txn *sql.Tx, eventIDs []string) error + DeleteJoinedHostsForRoom(ctx context.Context, txn *sql.Tx, roomID string) error SelectJoinedHostsWithTx(ctx context.Context, txn *sql.Tx, roomID string) ([]types.JoinedHost, error) SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 5631959b7..2bf6b9f8a 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -17,7 +17,6 @@ package input import ( - "bytes" "context" "fmt" @@ -28,7 +27,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "github.com/sirupsen/logrus" ) // updateLatestEvents updates the list of latest events for this room in the database and writes the @@ -118,7 +116,6 @@ type latestEventsUpdater struct { func (u *latestEventsUpdater) doUpdateLatestEvents() error { u.lastEventIDSent = u.updater.LastEventIDSent() - u.oldStateNID = u.updater.CurrentStateSnapshotNID() // If we are doing a regular event update then we will get the // previous latest events to use as a part of the calculation. If @@ -127,7 +124,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // then start with an empty set - none of the forward extremities // that we knew about before matter anymore. oldLatest := []types.StateAtEventAndReference{} - if !u.stateAtEvent.Overwrite { + if !u.rewritesState { + u.oldStateNID = u.updater.CurrentStateSnapshotNID() oldLatest = u.updater.LatestEvents() } @@ -141,27 +139,32 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // Work out what the latest events are. This will include the new // event if it is not already referenced. - if err := u.calculateLatest( - oldLatest, + extremitiesChanged, err := u.calculateLatest( + oldLatest, &u.event, types.StateAtEventAndReference{ EventReference: u.event.EventReference(), StateAtEvent: u.stateAtEvent, }, - ); err != nil { + ) + if err != nil { return fmt.Errorf("u.calculateLatest: %w", err) } // Now that we know what the latest events are, it's time to get the // latest state. - if err := u.latestState(); err != nil { - return fmt.Errorf("u.latestState: %w", err) - } + var updates []api.OutputEvent + if extremitiesChanged || u.rewritesState { + if err = u.latestState(); err != nil { + return fmt.Errorf("u.latestState: %w", err) + } - // If we need to generate any output events then here's where we do it. - // TODO: Move this! - updates, err := u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added) - if err != nil { - return fmt.Errorf("u.api.updateMemberships: %w", err) + // If we need to generate any output events then here's where we do it. + // TODO: Move this! + if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil { + return fmt.Errorf("u.api.updateMemberships: %w", err) + } + } else { + u.newStateNID = u.oldStateNID } update, err := u.makeOutputNewRoomEvent() @@ -250,54 +253,77 @@ func (u *latestEventsUpdater) latestState() error { // true if the new event is included in those extremites, false otherwise. func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, - newEvent types.StateAtEventAndReference, -) error { - var newLatest []types.StateAtEventAndReference - - // First of all, let's see if any of the existing forward extremities - // now have entries in the previous events table. If they do then we - // will no longer include them as forward extremities. - for _, l := range oldLatest { - referenced, err := u.updater.IsReferenced(l.EventReference) - if err != nil { - logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID) - return fmt.Errorf("u.updater.IsReferenced (old): %w", err) - } else if !referenced { - newLatest = append(newLatest, l) - } + newEvent *gomatrixserverlib.Event, + newStateAndRef types.StateAtEventAndReference, +) (bool, error) { + // First of all, get a list of all of the events in our current + // set of forward extremities. + existingRefs := make(map[string]*types.StateAtEventAndReference) + existingNIDs := make([]types.EventNID, len(oldLatest)) + for i, old := range oldLatest { + existingRefs[old.EventID] = &oldLatest[i] + existingNIDs[i] = old.EventNID } - // Then check and see if our new event is already included in that set. - // This ordinarily won't happen but it covers the edge-case that we've - // already seen this event before and it's a forward extremity, so rather - // than adding a duplicate, we'll just return the set as complete. - for _, l := range newLatest { - if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) { - // We've already referenced this new event so we can just return - // the newly completed extremities at this point. - u.latest = newLatest - return nil - } - } - - // At this point we've processed the old extremities, and we've checked - // that our new event isn't already in that set. Therefore now we can - // check if our *new* event is a forward extremity, and if it is, add - // it in. - referenced, err := u.updater.IsReferenced(newEvent.EventReference) + // Look up the old extremity events. This allows us to find their + // prev events. + events, err := u.api.DB.Events(u.ctx, existingNIDs) if err != nil { - logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID) - return fmt.Errorf("u.updater.IsReferenced (new): %w", err) - } else if !referenced || len(newLatest) == 0 { - newLatest = append(newLatest, newEvent) + return false, fmt.Errorf("u.api.DB.Events: %w", err) + } + + // Make a list of all of the prev events as referenced by all of + // the current forward extremities. + existingPrevs := make(map[string]struct{}) + for _, old := range events { + for _, prevEventID := range old.PrevEventIDs() { + existingPrevs[prevEventID] = struct{}{} + } + } + + // If the "new" event is already referenced by a forward extremity + // then do nothing - it's not a candidate to be a new extremity if + // it has been referenced. + if _, ok := existingPrevs[newEvent.EventID()]; ok { + return false, nil + } + + // If the "new" event is already a forward extremity then stop, as + // nothing changes. + for _, event := range events { + if event.EventID() == newEvent.EventID() { + return false, nil + } + } + + // Include our new event in the extremities. + newLatest := []types.StateAtEventAndReference{newStateAndRef} + + // Then run through and see if the other extremities are still valid. + // If our new event references them then they are no longer good + // candidates. + for _, prevEventID := range newEvent.PrevEventIDs() { + delete(existingRefs, prevEventID) + } + + // Ensure that we don't add any candidate forward extremities from + // the old set that are, themselves, referenced by the old set of + // forward extremities. This shouldn't happen but guards against + // the possibility anyway. + for prevEventID := range existingPrevs { + delete(existingRefs, prevEventID) + } + + // Then re-add any old extremities that are still valid after all. + for _, old := range existingRefs { + newLatest = append(newLatest, *old) } u.latest = newLatest - return nil + return true, nil } func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { - latestEventIDs := make([]string, len(u.latest)) for i := range u.latest { latestEventIDs[i] = u.latest[i].EventID @@ -338,11 +364,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) return nil, fmt.Errorf("failed to load add_state_events from db: %w", err) } } - // State is rewritten if the input room event HasState and we actually produced a delta on state events. - // Without this check, /get_missing_events which produce events with associated (but not complete) state - // will incorrectly purge the room and set it to no state. TODO: This is likely flakey, as if /gme produced - // a state conflict res which just so happens to include 2+ events we might purge the room state downstream. - ore.RewritesState = len(ore.AddsStateEventIDs) > 1 return &api.OutputEvent{ Type: api.OutputTypeNewRoomEvent, diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index c8e60efa3..41cbd2637 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -379,7 +379,7 @@ func TestOutputRewritesState(t *testing.T) { if len(producer.producedMessages) != 1 { t.Fatalf("Rewritten events got output, want only 1 got %d", len(producer.producedMessages)) } - outputEvent := producer.producedMessages[0] + outputEvent := producer.producedMessages[len(producer.producedMessages)-1] if !outputEvent.NewRoomEvent.RewritesState { t.Errorf("RewritesState flag not set on output event") } diff --git a/roomserver/state/state.go b/roomserver/state/state.go index 2944f71c1..d23f14c84 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -526,13 +526,7 @@ func (v StateResolution) CalculateAndStoreStateBeforeEvent( isRejected bool, ) (types.StateSnapshotNID, error) { // Load the state at the prev events. - prevEventRefs := event.PrevEvents() - prevEventIDs := make([]string, len(prevEventRefs)) - for i := range prevEventRefs { - prevEventIDs[i] = prevEventRefs[i].EventID - } - - prevStates, err := v.db.StateAtEventIDs(ctx, prevEventIDs) + prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs()) if err != nil { return 0, err } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 7b694f31f..5d315cdab 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -149,7 +149,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } if msg.RewritesState { - if err = s.db.PurgeRoom(ctx, ev.RoomID()); err != nil { + if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil { return fmt.Errorf("s.db.PurgeRoom: %w", err) } } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index ce7f1c152..e12a1166e 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -43,9 +43,9 @@ type Database interface { // Returns an error if there was a problem inserting this event. WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []gomatrixserverlib.HeaderedEvent, addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error) - // PurgeRoom completely purges room state from the sync API. This is done when + // PurgeRoomState completely purges room state from the sync API. This is done when // receiving an output event that completely resets the state. - PurgeRoom(ctx context.Context, roomID string) error + PurgeRoomState(ctx context.Context, roomID string) error // GetStateEvent returns the Matrix state event of a given type for a given room with a given state key // If no event could be found, returns nil // If there was an issue during the retrieval, returns an error diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index b9f21913e..a7c07f943 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -276,7 +276,7 @@ func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, e return nil } -func (d *Database) PurgeRoom( +func (d *Database) PurgeRoomState( ctx context.Context, roomID string, ) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { @@ -286,15 +286,6 @@ func (d *Database) PurgeRoom( if err := d.CurrentRoomState.DeleteRoomStateForRoom(ctx, txn, roomID); err != nil { return fmt.Errorf("d.CurrentRoomState.DeleteRoomStateForRoom: %w", err) } - if err := d.OutputEvents.DeleteEventsForRoom(ctx, txn, roomID); err != nil { - return fmt.Errorf("d.Events.DeleteEventsForRoom: %w", err) - } - if err := d.Topology.DeleteTopologyForRoom(ctx, txn, roomID); err != nil { - return fmt.Errorf("d.Topology.DeleteTopologyForRoom: %w", err) - } - if err := d.BackwardExtremities.DeleteBackwardExtremitiesForRoom(ctx, txn, roomID); err != nil { - return fmt.Errorf("d.BackwardExtremities.DeleteBackwardExtremitiesForRoom: %w", err) - } return nil }) }