diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationsender/storage/sqlite3/joined_hosts_table.go new file mode 100644 index 000000000..1437a062b --- /dev/null +++ b/federationsender/storage/sqlite3/joined_hosts_table.go @@ -0,0 +1,139 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const joinedHostsSchema = ` +-- The joined_hosts table stores a list of m.room.member event ids in the +-- current state for each room where the membership is "join". +-- There will be an entry for every user that is joined to the room. +CREATE TABLE IF NOT EXISTS federationsender_joined_hosts ( + -- The string ID of the room. + room_id TEXT NOT NULL, + -- The event ID of the m.room.member join event. + event_id TEXT NOT NULL, + -- The domain part of the user ID the m.room.member event is for. + server_name TEXT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS federatonsender_joined_hosts_event_id_idx + ON federationsender_joined_hosts (event_id); + +CREATE INDEX IF NOT EXISTS federatonsender_joined_hosts_room_id_idx + ON federationsender_joined_hosts (room_id) +` + +const insertJoinedHostsSQL = "" + + "INSERT INTO federationsender_joined_hosts (room_id, event_id, server_name)" + + " VALUES ($1, $2, $3)" + +const deleteJoinedHostsSQL = "" + + "DELETE FROM federationsender_joined_hosts WHERE event_id = $1" + +const selectJoinedHostsSQL = "" + + "SELECT event_id, server_name FROM federationsender_joined_hosts" + + " WHERE room_id = $1" + +type joinedHostsStatements struct { + insertJoinedHostsStmt *sql.Stmt + deleteJoinedHostsStmt *sql.Stmt + selectJoinedHostsStmt *sql.Stmt +} + +func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(joinedHostsSchema) + if err != nil { + return + } + if s.insertJoinedHostsStmt, err = db.Prepare(insertJoinedHostsSQL); err != nil { + return + } + if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil { + return + } + if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil { + return + } + return +} + +func (s *joinedHostsStatements) insertJoinedHosts( + ctx context.Context, + txn *sql.Tx, + roomID, eventID string, + serverName gomatrixserverlib.ServerName, +) error { + stmt := common.TxStmt(txn, s.insertJoinedHostsStmt) + _, err := stmt.ExecContext(ctx, roomID, eventID, serverName) + return err +} + +func (s *joinedHostsStatements) deleteJoinedHosts( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) error { + for _, eventID := range eventIDs { + stmt := common.TxStmt(txn, s.deleteJoinedHostsStmt) + if _, err := stmt.ExecContext(ctx, eventID); err != nil { + return err + } + } + return nil +} + +func (s *joinedHostsStatements) selectJoinedHostsWithTx( + ctx context.Context, txn *sql.Tx, roomID string, +) ([]types.JoinedHost, error) { + stmt := common.TxStmt(txn, s.selectJoinedHostsStmt) + return joinedHostsFromStmt(ctx, stmt, roomID) +} + +func (s *joinedHostsStatements) selectJoinedHosts( + ctx context.Context, roomID string, +) ([]types.JoinedHost, error) { + return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID) +} + +func joinedHostsFromStmt( + ctx context.Context, stmt *sql.Stmt, roomID string, +) ([]types.JoinedHost, error) { + rows, err := stmt.QueryContext(ctx, roomID) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + var result []types.JoinedHost + for rows.Next() { + var eventID, serverName string + if err = rows.Scan(&eventID, &serverName); err != nil { + return nil, err + } + result = append(result, types.JoinedHost{ + MemberEventID: eventID, + ServerName: gomatrixserverlib.ServerName(serverName), + }) + } + + return result, nil +} diff --git a/federationsender/storage/sqlite3/room_table.go b/federationsender/storage/sqlite3/room_table.go new file mode 100644 index 000000000..6361400d3 --- /dev/null +++ b/federationsender/storage/sqlite3/room_table.go @@ -0,0 +1,101 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" +) + +const roomSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_rooms ( + -- The string ID of the room + room_id TEXT PRIMARY KEY, + -- The most recent event state by the room server. + -- We can use this to tell if our view of the room state has become + -- desynchronised. + last_event_id TEXT NOT NULL +);` + +const insertRoomSQL = "" + + "INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" + + " ON CONFLICT DO NOTHING" + +const selectRoomForUpdateSQL = "" + + "SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1" + +const updateRoomSQL = "" + + "UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1" + +type roomStatements struct { + insertRoomStmt *sql.Stmt + selectRoomForUpdateStmt *sql.Stmt + updateRoomStmt *sql.Stmt +} + +func (s *roomStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(roomSchema) + if err != nil { + return + } + + if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil { + return + } + if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil { + return + } + if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil { + return + } + return +} + +// insertRoom inserts the room if it didn't already exist. +// If the room didn't exist then last_event_id is set to the empty string. +func (s *roomStatements) insertRoom( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + _, err := common.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID) + return err +} + +// selectRoomForUpdate locks the row for the room and returns the last_event_id. +// The row must already exist in the table. Callers can ensure that the row +// exists by calling insertRoom first. +func (s *roomStatements) selectRoomForUpdate( + ctx context.Context, txn *sql.Tx, roomID string, +) (string, error) { + var lastEventID string + stmt := common.TxStmt(txn, s.selectRoomForUpdateStmt) + err := stmt.QueryRowContext(ctx, roomID).Scan(&lastEventID) + if err != nil { + return "", err + } + return lastEventID, nil +} + +// updateRoom updates the last_event_id for the room. selectRoomForUpdate should +// have already been called earlier within the transaction. +func (s *roomStatements) updateRoom( + ctx context.Context, txn *sql.Tx, roomID, lastEventID string, +) error { + stmt := common.TxStmt(txn, s.updateRoomStmt) + _, err := stmt.ExecContext(ctx, roomID, lastEventID) + return err +} diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go new file mode 100644 index 000000000..f9cfaa99d --- /dev/null +++ b/federationsender/storage/sqlite3/storage.go @@ -0,0 +1,124 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + _ "github.com/mattn/go-sqlite3" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/federationsender/types" +) + +// Database stores information needed by the federation sender +type Database struct { + joinedHostsStatements + roomStatements + common.PartitionOffsetStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("sqlite3", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + var err error + + if err = d.joinedHostsStatements.prepare(d.db); err != nil { + return err + } + + if err = d.roomStatements.prepare(d.db); err != nil { + return err + } + + return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") +} + +// UpdateRoom updates the joined hosts for a room and returns what the joined +// hosts were before the update, or nil if this was a duplicate message. +// This is called when we receive a message from kafka, so we pass in +// oldEventID and newEventID to check that we haven't missed any messages or +// this isn't a duplicate message. +func (d *Database) UpdateRoom( + ctx context.Context, + roomID, oldEventID, newEventID string, + addHosts []types.JoinedHost, + removeHosts []string, +) (joinedHosts []types.JoinedHost, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + err = d.insertRoom(ctx, txn, roomID) + if err != nil { + return err + } + + lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID) + if err != nil { + return err + } + + if lastSentEventID == newEventID { + // We've handled this message before, so let's just ignore it. + // We can only get a duplicate for the last message we processed, + // so its enough just to compare the newEventID with lastSentEventID + return nil + } + + if lastSentEventID != "" && lastSentEventID != oldEventID { + return types.EventIDMismatchError{ + DatabaseID: lastSentEventID, RoomServerID: oldEventID, + } + } + + joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID) + if err != nil { + return err + } + + for _, add := range addHosts { + err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName) + if err != nil { + return err + } + } + if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil { + return err + } + return d.updateRoom(ctx, txn, roomID, newEventID) + }) + return +} + +// GetJoinedHosts returns the currently joined hosts for room, +// as known to federationserver. +// Returns an error if something goes wrong. +func (d *Database) GetJoinedHosts( + ctx context.Context, roomID string, +) ([]types.JoinedHost, error) { + return d.selectJoinedHosts(ctx, roomID) +} diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go index 4ce151c7a..e83c1e9d2 100644 --- a/federationsender/storage/storage.go +++ b/federationsender/storage/storage.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/federationsender/storage/postgres" + "github.com/matrix-org/dendrite/federationsender/storage/sqlite3" "github.com/matrix-org/dendrite/federationsender/types" ) @@ -36,6 +37,8 @@ func NewDatabase(dataSourceName string) (Database, error) { return postgres.NewDatabase(dataSourceName) } switch uri.Scheme { + case "file": + return sqlite3.NewDatabase(dataSourceName) case "postgres": return postgres.NewDatabase(dataSourceName) default: