From 4c84c4eeb4576e4a666d6dab4584795c3b0e0984 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 17 May 2022 10:36:31 +0100 Subject: [PATCH] Fix a race condition which could cause events to not be sent to servers If a new room event which rewrites state arrives, we remove all joined hosts then re-calculate them. This wasn't done in a transaction so for a brief period we would have no joined hosts. During this interim, key change events which arrive would not be sent to destination servers. This would sporadically fail on sytest. --- federationapi/consumers/roomserver.go | 13 +++---------- federationapi/internal/perform.go | 2 +- federationapi/storage/interface.go | 3 +-- federationapi/storage/shared/storage.go | 24 ++++++++++-------------- 4 files changed, 15 insertions(+), 27 deletions(-) diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index deba5b2f1..7a0816ff2 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -90,15 +90,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) switch output.Type { case api.OutputTypeNewRoomEvent: ev := output.NewRoomEvent.Event - - if output.NewRoomEvent.RewritesState { - if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil { - log.WithError(err).Errorf("roomserver output log: purge room state failure") - return false - } - } - - if err := s.processMessage(*output.NewRoomEvent); err != nil { + if err := s.processMessage(*output.NewRoomEvent, output.NewRoomEvent.RewritesState); err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ "event_id": ev.EventID(), @@ -146,7 +138,7 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. -func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { +func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rewritesState bool) error { addsStateEvents, missingEventIDs := ore.NeededStateEventIDs() // Ask the roomserver and add in the rest of the results into the set. @@ -180,6 +172,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err ore.Event.RoomID(), addsJoinedHosts, ore.RemovesStateEventIDs, + rewritesState, // if we're re-writing state, nuke all joined hosts before adding ) if err != nil { return err diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index ac21573df..7ccd68ef0 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -247,7 +247,7 @@ func (r *FederationInternalAPI) performJoinUsingServer( return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) } logrus.WithField("hosts", joinedHosts).WithField("room", roomID).Info("Joined federated room with hosts") - if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil); err != nil { + if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) } diff --git a/federationapi/storage/interface.go b/federationapi/storage/interface.go index fb90cae35..29254948b 100644 --- a/federationapi/storage/interface.go +++ b/federationapi/storage/interface.go @@ -25,13 +25,12 @@ import ( type Database interface { gomatrixserverlib.KeyDatabase - UpdateRoom(ctx context.Context, roomID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) + UpdateRoom(ctx context.Context, roomID string, addHosts []types.JoinedHost, removeHosts []string, purgeRoomFirst bool) (joinedHosts []types.JoinedHost, err error) GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) // GetJoinedHostsForRooms returns the complete set of servers in the rooms given. GetJoinedHostsForRooms(ctx context.Context, roomIDs []string, excludeSelf bool) ([]gomatrixserverlib.ServerName, error) - PurgeRoomState(ctx context.Context, roomID string) error StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) diff --git a/federationapi/storage/shared/storage.go b/federationapi/storage/shared/storage.go index b6e851cc1..01bf940e1 100644 --- a/federationapi/storage/shared/storage.go +++ b/federationapi/storage/shared/storage.go @@ -66,8 +66,18 @@ func (d *Database) UpdateRoom( roomID string, addHosts []types.JoinedHost, removeHosts []string, + purgeRoomFirst bool, ) (joinedHosts []types.JoinedHost, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + if purgeRoomFirst { + // If the event is a create event then we'll delete all of the existing + // data for the room. The only reason that a create event would be replayed + // to us in this way is if we're about to receive the entire room state. + if err := d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil { + return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err) + } + } + joinedHosts, err = d.FederationJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID) if err != nil { return err @@ -138,20 +148,6 @@ func (d *Database) StoreJSON( }, nil } -func (d *Database) PurgeRoomState( - ctx context.Context, roomID string, -) error { - return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - // If the event is a create event then we'll delete all of the existing - // data for the room. The only reason that a create event would be replayed - // to us in this way is if we're about to receive the entire room state. - if err := d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil { - return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err) - } - return nil - }) -} - func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.FederationBlacklist.InsertBlacklist(context.TODO(), txn, serverName)