From fcaac928e89fcea63873a1e2e25f7a8291b0838c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 10 Sep 2020 16:00:20 +0100 Subject: [PATCH] Rewrite room state in sync API storage --- syncapi/consumers/roomserver.go | 17 ++++++++++++++- syncapi/storage/interface.go | 3 +++ .../postgres/current_room_state_table.go | 15 +++++++++++++ syncapi/storage/shared/syncserver.go | 21 +++++++++++++++++++ .../sqlite3/current_room_state_table.go | 15 +++++++++++++ syncapi/storage/tables/interface.go | 1 + 6 files changed, 71 insertions(+), 1 deletion(-) diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 5ca1be118..21122abdb 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -17,6 +17,7 @@ package consumers import ( "context" "encoding/json" + "fmt" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" @@ -143,6 +144,20 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } } + if msg.Type == api.OutputRoomState { + err = s.db.RewriteState( + ctx, + &ev, + addsStateEvents, + msg.AddsStateEventIDs, + msg.TransactionID, + ) + if err != nil { + return fmt.Errorf("s.db.RewriteState: %w", err) + } + return nil + } + pduPos, err := s.db.WriteEvent( ctx, &ev, @@ -150,7 +165,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, - msg.Type == api.OutputRoomState, + false, ) if err != nil { // panic rather than continue with an inconsistent database diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 838fd547d..04f2eb346 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -41,6 +41,9 @@ type Database interface { // Returns an error if there was a problem inserting this event. WriteEvent(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []gomatrixserverlib.HeaderedEvent, addStateEventIDs []string, removeStateEventIDs []string, transactionID *api.TransactionID, excludeFromSync bool) (types.StreamPosition, error) + // RewriteState rewrites a current room state event. If the state event is a create event then all existing + // state in the room will be deleted before rewriting the event. + RewriteState(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, addStateEvents []gomatrixserverlib.HeaderedEvent, addStateEventIDs []string, transactionID *api.TransactionID) error // GetStateEvent returns the Matrix state event of a given type for a given room with a given state key // If no event could be found, returns nil // If there was an issue during the retrieval, returns an error diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 5cb7baadf..f1d2b70ac 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -69,6 +69,9 @@ const upsertRoomStateSQL = "" + const deleteRoomStateByEventIDSQL = "" + "DELETE FROM syncapi_current_room_state WHERE event_id = $1" +const deleteRoomStateByRoomIDSQL = "" + + "DELETE FROM syncapi_current_room_state WHERE event_id = $1" + const selectRoomIDsWithMembershipSQL = "" + "SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" @@ -98,6 +101,7 @@ const selectEventsWithEventIDsSQL = "" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt + deleteRoomStateByRoomIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt @@ -117,6 +121,9 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil { return nil, err } + if s.deleteRoomStateByRoomIDStmt, err = db.Prepare(deleteRoomStateByRoomIDSQL); err != nil { + return nil, err + } if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil { return nil, err } @@ -214,6 +221,14 @@ func (s *currentRoomStateStatements) DeleteRoomStateByEventID( return err } +func (s *currentRoomStateStatements) DeleteRoomStateByRoomID( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByRoomIDStmt) + _, err := stmt.ExecContext(ctx, roomID) + return err +} + func (s *currentRoomStateStatements) UpsertRoomState( ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition, diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 6267dd3aa..627932e83 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -241,6 +241,27 @@ func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, e return nil } +func (d *Database) RewriteState( + ctx context.Context, + ev *gomatrixserverlib.HeaderedEvent, + addStateEvents []gomatrixserverlib.HeaderedEvent, + addStateEventIDs []string, + transactionID *api.TransactionID, +) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + // If the event is a create event then we'll delete all of the existing + // state in the room. The only reason that a create event would be replayed + // to us in this way is if we're about to receive the entire room state. + if ev.Type() == gomatrixserverlib.MRoomCreate { + if err := d.CurrentRoomState.DeleteRoomStateByRoomID(ctx, txn, ev.RoomID()); err != nil { + return fmt.Errorf("d.CurrentRoomState.DeleteRoomStateByRoomID: %w", err) + } + } + + return d.updateRoomState(ctx, txn, []string{}, addStateEvents, types.StreamPosition(0)) + }) +} + func (d *Database) WriteEvent( ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index cb19639df..9f1755abf 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -57,6 +57,9 @@ const upsertRoomStateSQL = "" + const deleteRoomStateByEventIDSQL = "" + "DELETE FROM syncapi_current_room_state WHERE event_id = $1" +const deleteRoomStateByRoomIDSQL = "" + + "DELETE FROM syncapi_current_room_state WHERE event_id = $1" + const selectRoomIDsWithMembershipSQL = "" + "SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" @@ -88,6 +91,7 @@ type currentRoomStateStatements struct { streamIDStatements *streamIDStatements upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt + deleteRoomStateByRoomIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt @@ -109,6 +113,9 @@ func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *streamIDStatements) (t if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil { return nil, err } + if s.deleteRoomStateByRoomIDStmt, err = db.Prepare(deleteRoomStateByRoomIDSQL); err != nil { + return nil, err + } if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil { return nil, err } @@ -203,6 +210,14 @@ func (s *currentRoomStateStatements) DeleteRoomStateByEventID( return err } +func (s *currentRoomStateStatements) DeleteRoomStateByRoomID( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByRoomIDStmt) + _, err := stmt.ExecContext(ctx, roomID) + return err +} + func (s *currentRoomStateStatements) UpsertRoomState( ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition, diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 38f6d8482..50038b54a 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -75,6 +75,7 @@ type CurrentRoomState interface { SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error + DeleteRoomStateByRoomID(ctx context.Context, txn *sql.Tx, roomID string) error // SelectCurrentState returns all the current state events for the given room. SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]gomatrixserverlib.HeaderedEvent, error) // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.