Fix a race condition which could cause events to not be sent to servers

If a new room event which rewrites state arrives, we remove all joined hosts
then re-calculate them. This wasn't done in a transaction so for a brief period
we would have no joined hosts. During this interim, key change events which arrive
would not be sent to destination servers. This would sporadically fail on sytest.
This commit is contained in:
Kegan Dougal 2022-05-17 10:36:31 +01:00
parent 533a9069e5
commit 4c84c4eeb4
4 changed files with 15 additions and 27 deletions

View file

@ -90,15 +90,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
switch output.Type {
case api.OutputTypeNewRoomEvent:
ev := output.NewRoomEvent.Event
if output.NewRoomEvent.RewritesState {
if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil {
log.WithError(err).Errorf("roomserver output log: purge room state failure")
return false
}
}
if err := s.processMessage(*output.NewRoomEvent); err != nil {
if err := s.processMessage(*output.NewRoomEvent, output.NewRoomEvent.RewritesState); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": ev.EventID(),
@ -146,7 +138,7 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rewritesState bool) error {
addsStateEvents, missingEventIDs := ore.NeededStateEventIDs()
// Ask the roomserver and add in the rest of the results into the set.
@ -180,6 +172,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
ore.Event.RoomID(),
addsJoinedHosts,
ore.RemovesStateEventIDs,
rewritesState, // if we're re-writing state, nuke all joined hosts before adding
)
if err != nil {
return err

View file

@ -247,7 +247,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
}
logrus.WithField("hosts", joinedHosts).WithField("room", roomID).Info("Joined federated room with hosts")
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil); err != nil {
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
}

View file

@ -25,13 +25,12 @@ import (
type Database interface {
gomatrixserverlib.KeyDatabase
UpdateRoom(ctx context.Context, roomID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
UpdateRoom(ctx context.Context, roomID string, addHosts []types.JoinedHost, removeHosts []string, purgeRoomFirst bool) (joinedHosts []types.JoinedHost, err error)
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
// GetJoinedHostsForRooms returns the complete set of servers in the rooms given.
GetJoinedHostsForRooms(ctx context.Context, roomIDs []string, excludeSelf bool) ([]gomatrixserverlib.ServerName, error)
PurgeRoomState(ctx context.Context, roomID string) error
StoreJSON(ctx context.Context, js string) (*shared.Receipt, error)

View file

@ -66,8 +66,18 @@ func (d *Database) UpdateRoom(
roomID string,
addHosts []types.JoinedHost,
removeHosts []string,
purgeRoomFirst bool,
) (joinedHosts []types.JoinedHost, err error) {
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if purgeRoomFirst {
// If the event is a create event then we'll delete all of the existing
// data for the room. The only reason that a create event would be replayed
// to us in this way is if we're about to receive the entire room state.
if err := d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil {
return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err)
}
}
joinedHosts, err = d.FederationJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil {
return err
@ -138,20 +148,6 @@ func (d *Database) StoreJSON(
}, nil
}
func (d *Database) PurgeRoomState(
ctx context.Context, roomID string,
) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
// If the event is a create event then we'll delete all of the existing
// data for the room. The only reason that a create event would be replayed
// to us in this way is if we're about to receive the entire room state.
if err := d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil {
return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err)
}
return nil
})
}
func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationBlacklist.InsertBlacklist(context.TODO(), txn, serverName)