Remove rooms table from federation sender (#1751)

* Remove last sent event ID column from federation sender

* Remove EventIDMismatchError

* Remove the federationsender rooms table altogether, it's useless

* Add migration

* Fix migrations

* Fix migrations
This commit is contained in:
Neil Alexander 2021-02-04 11:52:49 +00:00 committed by GitHub
parent b7e3b81a22
commit 6099379ea4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 105 additions and 268 deletions

View file

@ -0,0 +1,46 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable)
}
func LoadRemoveRoomsTable(m *sqlutil.Migrations) {
m.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable)
}
func UpRemoveRoomsTable(tx *sql.Tx) error {
_, err := tx.Exec(`
DROP TABLE IF EXISTS federationsender_rooms;
`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownRemoveRoomsTable(tx *sql.Tx) error {
// We can't reverse this.
return nil
}

View file

@ -1,104 +0,0 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
const roomSchema = `
CREATE TABLE IF NOT EXISTS federationsender_rooms (
-- The string ID of the room
room_id TEXT PRIMARY KEY,
-- The most recent event state by the room server.
-- We can use this to tell if our view of the room state has become
-- desynchronised.
last_event_id TEXT NOT NULL
);`
const insertRoomSQL = "" +
"INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" +
" ON CONFLICT DO NOTHING"
const selectRoomForUpdateSQL = "" +
"SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1 FOR UPDATE"
const updateRoomSQL = "" +
"UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1"
type roomStatements struct {
db *sql.DB
insertRoomStmt *sql.Stmt
selectRoomForUpdateStmt *sql.Stmt
updateRoomStmt *sql.Stmt
}
func NewPostgresRoomsTable(db *sql.DB) (s *roomStatements, err error) {
s = &roomStatements{
db: db,
}
_, err = s.db.Exec(roomSchema)
if err != nil {
return
}
if s.insertRoomStmt, err = s.db.Prepare(insertRoomSQL); err != nil {
return
}
if s.selectRoomForUpdateStmt, err = s.db.Prepare(selectRoomForUpdateSQL); err != nil {
return
}
if s.updateRoomStmt, err = s.db.Prepare(updateRoomSQL); err != nil {
return
}
return
}
// insertRoom inserts the room if it didn't already exist.
// If the room didn't exist then last_event_id is set to the empty string.
func (s *roomStatements) InsertRoom(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID)
return err
}
// selectRoomForUpdate locks the row for the room and returns the last_event_id.
// The row must already exist in the table. Callers can ensure that the row
// exists by calling insertRoom first.
func (s *roomStatements) SelectRoomForUpdate(
ctx context.Context, txn *sql.Tx, roomID string,
) (string, error) {
var lastEventID string
stmt := sqlutil.TxStmt(txn, s.selectRoomForUpdateStmt)
err := stmt.QueryRowContext(ctx, roomID).Scan(&lastEventID)
if err != nil {
return "", err
}
return lastEventID, nil
}
// updateRoom updates the last_event_id for the room. selectRoomForUpdate should
// have already been called earlier within the transaction.
func (s *roomStatements) UpdateRoom(
ctx context.Context, txn *sql.Tx, roomID, lastEventID string,
) error {
stmt := sqlutil.TxStmt(txn, s.updateRoomStmt)
_, err := stmt.ExecContext(ctx, roomID, lastEventID)
return err
}

View file

@ -18,6 +18,7 @@ package postgres
import ( import (
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/federationsender/storage/postgres/deltas"
"github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -56,10 +57,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil { if err != nil {
return nil, err return nil, err
} }
rooms, err := NewPostgresRoomsTable(d.db)
if err != nil {
return nil, err
}
blacklist, err := NewPostgresBlacklistTable(d.db) blacklist, err := NewPostgresBlacklistTable(d.db)
if err != nil { if err != nil {
return nil, err return nil, err
@ -72,6 +69,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil { if err != nil {
return nil, err return nil, err
} }
m := sqlutil.NewMigrations()
deltas.LoadRemoveRoomsTable(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
return nil, err
}
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
Cache: cache, Cache: cache,
@ -80,7 +82,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
FederationSenderQueuePDUs: queuePDUs, FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueEDUs: queueEDUs,
FederationSenderQueueJSON: queueJSON, FederationSenderQueueJSON: queueJSON,
FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist, FederationSenderBlacklist: blacklist,
FederationSenderInboundPeeks: inboundPeeks, FederationSenderInboundPeeks: inboundPeeks,
FederationSenderOutboundPeeks: outboundPeeks, FederationSenderOutboundPeeks: outboundPeeks,

View file

@ -34,7 +34,6 @@ type Database struct {
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
FederationSenderQueueJSON tables.FederationSenderQueueJSON FederationSenderQueueJSON tables.FederationSenderQueueJSON
FederationSenderJoinedHosts tables.FederationSenderJoinedHosts FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
FederationSenderRooms tables.FederationSenderRooms
FederationSenderBlacklist tables.FederationSenderBlacklist FederationSenderBlacklist tables.FederationSenderBlacklist
FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks
FederationSenderInboundPeeks tables.FederationSenderInboundPeeks FederationSenderInboundPeeks tables.FederationSenderInboundPeeks
@ -64,29 +63,6 @@ func (d *Database) UpdateRoom(
removeHosts []string, removeHosts []string,
) (joinedHosts []types.JoinedHost, err error) { ) (joinedHosts []types.JoinedHost, err error) {
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
err = d.FederationSenderRooms.InsertRoom(ctx, txn, roomID)
if err != nil {
return err
}
lastSentEventID, err := d.FederationSenderRooms.SelectRoomForUpdate(ctx, txn, roomID)
if err != nil {
return err
}
if lastSentEventID == newEventID {
// We've handled this message before, so let's just ignore it.
// We can only get a duplicate for the last message we processed,
// so its enough just to compare the newEventID with lastSentEventID
return nil
}
if lastSentEventID != "" && lastSentEventID != oldEventID {
return types.EventIDMismatchError{
DatabaseID: lastSentEventID, RoomServerID: oldEventID,
}
}
joinedHosts, err = d.FederationSenderJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID) joinedHosts, err = d.FederationSenderJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil { if err != nil {
return err return err
@ -101,7 +77,7 @@ func (d *Database) UpdateRoom(
if err = d.FederationSenderJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil { if err = d.FederationSenderJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil {
return err return err
} }
return d.FederationSenderRooms.UpdateRoom(ctx, txn, roomID, newEventID) return nil
}) })
return return
} }

View file

@ -0,0 +1,46 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable)
}
func LoadRemoveRoomsTable(m *sqlutil.Migrations) {
m.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable)
}
func UpRemoveRoomsTable(tx *sql.Tx) error {
_, err := tx.Exec(`
DROP TABLE IF EXISTS federationsender_rooms;
`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownRemoveRoomsTable(tx *sql.Tx) error {
// We can't reverse this.
return nil
}

View file

@ -1,105 +0,0 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
const roomSchema = `
CREATE TABLE IF NOT EXISTS federationsender_rooms (
-- The string ID of the room
room_id TEXT PRIMARY KEY,
-- The most recent event state by the room server.
-- We can use this to tell if our view of the room state has become
-- desynchronised.
last_event_id TEXT NOT NULL
);`
const insertRoomSQL = "" +
"INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" +
" ON CONFLICT DO NOTHING"
const selectRoomForUpdateSQL = "" +
"SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1"
const updateRoomSQL = "" +
"UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1"
type roomStatements struct {
db *sql.DB
insertRoomStmt *sql.Stmt
selectRoomForUpdateStmt *sql.Stmt
updateRoomStmt *sql.Stmt
}
func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) {
s = &roomStatements{
db: db,
}
_, err = db.Exec(roomSchema)
if err != nil {
return
}
if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil {
return
}
if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil {
return
}
if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil {
return
}
return
}
// insertRoom inserts the room if it didn't already exist.
// If the room didn't exist then last_event_id is set to the empty string.
func (s *roomStatements) InsertRoom(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID)
return err
}
// selectRoomForUpdate locks the row for the room and returns the last_event_id.
// The row must already exist in the table. Callers can ensure that the row
// exists by calling insertRoom first.
func (s *roomStatements) SelectRoomForUpdate(
ctx context.Context, txn *sql.Tx, roomID string,
) (string, error) {
var lastEventID string
stmt := sqlutil.TxStmt(txn, s.selectRoomForUpdateStmt)
err := stmt.QueryRowContext(ctx, roomID).Scan(&lastEventID)
if err != nil {
return "", err
}
return lastEventID, nil
}
// updateRoom updates the last_event_id for the room. selectRoomForUpdate should
// have already been called earlier within the transaction.
func (s *roomStatements) UpdateRoom(
ctx context.Context, txn *sql.Tx, roomID, lastEventID string,
) error {
stmt := sqlutil.TxStmt(txn, s.updateRoomStmt)
_, err := stmt.ExecContext(ctx, roomID, lastEventID)
return err
}

View file

@ -21,6 +21,7 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -46,10 +47,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil { if err != nil {
return nil, err return nil, err
} }
rooms, err := NewSQLiteRoomsTable(d.db)
if err != nil {
return nil, err
}
queuePDUs, err := NewSQLiteQueuePDUsTable(d.db) queuePDUs, err := NewSQLiteQueuePDUsTable(d.db)
if err != nil { if err != nil {
return nil, err return nil, err
@ -74,6 +71,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil { if err != nil {
return nil, err return nil, err
} }
m := sqlutil.NewMigrations()
deltas.LoadRemoveRoomsTable(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
return nil, err
}
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
Cache: cache, Cache: cache,
@ -82,7 +84,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
FederationSenderQueuePDUs: queuePDUs, FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueEDUs: queueEDUs,
FederationSenderQueueJSON: queueJSON, FederationSenderQueueJSON: queueJSON,
FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist, FederationSenderBlacklist: blacklist,
FederationSenderOutboundPeeks: outboundPeeks, FederationSenderOutboundPeeks: outboundPeeks,
FederationSenderInboundPeeks: inboundPeeks, FederationSenderInboundPeeks: inboundPeeks,

View file

@ -56,12 +56,6 @@ type FederationSenderJoinedHosts interface {
SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
} }
type FederationSenderRooms interface {
InsertRoom(ctx context.Context, txn *sql.Tx, roomID string) error
SelectRoomForUpdate(ctx context.Context, txn *sql.Tx, roomID string) (string, error)
UpdateRoom(ctx context.Context, txn *sql.Tx, roomID, lastEventID string) error
}
type FederationSenderBlacklist interface { type FederationSenderBlacklist interface {
InsertBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error InsertBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error) SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error)

View file

@ -15,8 +15,6 @@
package types package types
import ( import (
"fmt"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -34,22 +32,6 @@ func (s ServerNames) Len() int { return len(s) }
func (s ServerNames) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s ServerNames) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s ServerNames) Less(i, j int) bool { return s[i] < s[j] } func (s ServerNames) Less(i, j int) bool { return s[i] < s[j] }
// A EventIDMismatchError indicates that we have got out of sync with the
// room server.
type EventIDMismatchError struct {
// The event ID we have stored in our local database.
DatabaseID string
// The event ID received from the room server.
RoomServerID string
}
func (e EventIDMismatchError) Error() string {
return fmt.Sprintf(
"mismatched last sent event ID: had %q in database got %q from room server",
e.DatabaseID, e.RoomServerID,
)
}
// tracks peeks we're performing on another server over federation // tracks peeks we're performing on another server over federation
type OutboundPeek struct { type OutboundPeek struct {
PeekID string PeekID string