From f68d31d0119f0c6c91bc86c0de91a2ba2abd746d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 5 Jul 2017 11:41:16 +0100 Subject: [PATCH] Add table for tracking membership state --- .../roomserver/storage/invite_table.go | 164 ++++++++++-------- .../roomserver/storage/membership_table.go | 106 +++++++++++ .../dendrite/roomserver/storage/storage.go | 44 +---- .../dendrite/roomserver/types/types.go | 43 ++++- 4 files changed, 245 insertions(+), 112 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go index 059e12293..c41f9eb11 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/invite_table.go @@ -22,73 +22,88 @@ import ( const inviteSchema = ` CREATE TABLE invites ( - -- The numeric ID of the invite event itself. - invite_event_nid BIGINT NOT NULL CONSTRAINT invite_event_nid_unique UNIQUE, + -- The string ID of the invite event itself. + -- We can't use a numeric event ID here because we don't always have + -- enough information to store an invite in the event table. + -- In particular we don't always have a chain of auth_events for invites + -- received over federation. + invite_event_id TEXT PRIMARY KEY, -- The numeric ID of the room the invite m.room.member event is in. room_nid BIGINT NOT NULL, -- The numeric ID for the state key of the invite m.room.member event. -- This tells us who the invite is for. -- This is used to query the active invites for a user. - target_state_key_nid BIGINT NOT NULL, + target_nid BIGINT NOT NULL, -- The numeric ID for the sender of the invite m.room.member event. -- This tells us who sent the invite. -- This is used to work out which matrix server we should talk to when -- we try to join the room. - sender_state_key_nid BIGINT NOT NULL DEFAULT 0, - -- The numeric ID of the join or leave event that replaced the invite event. + sender_nid BIGINT NOT NULL DEFAULT 0, -- This is used to track whether the invite is still active. - -- This is set implicitly when processing a KIND_NEW events and explici - -- is set - replaced_by_event_nid BIGINT NOT NULL DEFAULT 0, + -- This is set implicitly when processing KIND_NEW events and explicitly + -- when rejecting events over federation. + retired BOOLEAN NOT NULL DEFAULT FALSE, -- Whether the invite has been sent to the output stream. -- We maintain a separate output stream of invite events since they don't -- always occur within a room we have state for. sent_invite_to_output BOOLEAN NOT NULL DEFAULT FALSE, - -- Whether the replacement has been sent to the output stream. - sent_replaced_to_output BOOLEAN NOT NULL DEFAULT FALSE, + -- Whether the retirement has been sent to the output stream. + sent_retired_to_output BOOLEAN NOT NULL DEFAULT FALSE, + -- The invite event JSON. + invite_event_json TEXT NOT NULL ); CREATE INDEX invites_active_idx ON invites (target_state_key_nid, room_nid) - WHERE replaced_by_event_nid == 0; + WHERE NOT retired; + +CREATE INDEX invites_unsent_retired_idx ON invites (target_state_key_nid, room_nid) + WHERE retired AND NOT sent_retired_to_output; ` -const upsertInviteEventSQL = "" + - "INSERT INTO invites (invite_event_nid, room_nid, target_state_key_nid," + - " sender_state_key_nid) VALUES ($1, $2, $3, $4)" + - " ON CONFLICT ON invite_event_nid_unique" + - " DO UPDATE SET sender_state_key_nid = $4" - -const upsertInviteReplacedBySQL = "" + - "INSERT INTO invites (invite_event_nid, room_nid, target_state_key_nid," + - " replaced_by_event_nid) VALUES ($1, $2, $3, $4)" + - " ON CONFLICT ON invite_event_nid_unique" + - " DO UPDATE SET replaced_by_event_nid = $4" +const insertInviteEventSQL = "" + + "INSERT INTO invites (invite_event_id, room_nid, target_state_key_nid," + + " sender_state_key_nid, invite_event_json) VALUES ($1, $2, $3, $4, $5)" + + " ON CONFLICT DO NOTHING" const selectInviteSQL = "" + - "SELECT replaced_by_event_nid, sent_invite_to_output," + - " sent_replaced_to_output FROM invites" + - " WHERE invite_event_nid = $1" - -const selectActiveInviteForUserInRoomSQL = "" + - "SELECT invite_event_nid, sender_state_key_nid FROM invites" + - " WHERE target_state_key_nid = $1 AND room_nid = $2" + - " AND replaced_by_event_nid = 0" + "SELECT retired, sent_invite_to_output FROM invites" + + " WHERE invite_event_id = $1" const updateInviteSentInviteToOutputSQL = "" + "UPDATE invites SET sent_invite_to_output = TRUE" + - " WHERE invite_event_nid = $1" + " WHERE invite_event_id = $1" -const updateInviteSentReplacedToOutputSQL = "" + - "UPDATE invites SET sent_replaced_to_output = TRUE" + - " WHERE invite_event_nid = $1" +const selectInviteActiveForUserInRoomSQL = "" + + "SELECT invite_event_id, sender_state_key_nid FROM invites" + + " WHERE target_state_key_id = $1 AND room_nid = $2" + + " AND NOT retired" + +// Retire every active invite. +// Ideally we'd know which invite events were retired by a given update so we +// wouldn't need to remove every active invite. +// However the matrix protocol doesn't give us a way to reliably identify the +// invites that were retired, so we are forced to retire all of them. +const updateInviteRetiredSQL = "" + + "UPDATE invites SET retired_by_event_nid = TRUE" + + " WHERE room_nid = $1 AND target_state_key_nid = $2 AND NOT retired" + +const selectInviteUnsentRetiredSQL = "" + + "SELECT invite_event_id FROM invites" + + " WHERE target_state_key_id = $1 AND room_nid = $2" + + " AND retired AND NOT sent_retired_to_output" + +const updateInviteSentRetiredToOutputSQL = "" + + "UPDATE invites SET sent_retired_to_output = TRUE" + + " WHERE invite_event_id = $1" type inviteStatements struct { - upsertInviteEventStmt *sql.Stmt - upsertInviteReplacedByStmt *sql.Stmt - selectInviteStmt *sql.Stmt - selectActiveInviteForUserInRoomStmt *sql.Stmt - updateInviteSentInviteToOutputStmt *sql.Stmt - updateInviteSentReplacedToOutputStmt *sql.Stmt + insertInviteEventStmt *sql.Stmt + selectInviteStmt *sql.Stmt + selectInviteActiveForUserInRoomStmt *sql.Stmt + updateInviteRetiredStmt *sql.Stmt + selectInviteUnsentRetiredStmt *sql.Stmt + updateInviteSentInviteToOutputStmt *sql.Stmt + updateInviteSentRetiredToOutputStmt *sql.Stmt } func (s *inviteStatements) prepare(db *sql.DB) (err error) { @@ -98,52 +113,63 @@ func (s *inviteStatements) prepare(db *sql.DB) (err error) { } return statementList{ - {&s.upsertInviteEventStmt, upsertInviteEventSQL}, - {&s.upsertInviteReplacedByStmt, upsertInviteReplacedBySQL}, + {&s.insertInviteEventStmt, insertInviteEventSQL}, {&s.selectInviteStmt, selectInviteSQL}, - {&s.selectActiveInviteForUserInRoomStmt, selectActiveInviteForUserInRoomSQL}, {&s.updateInviteSentInviteToOutputStmt, updateInviteSentInviteToOutputSQL}, - {&s.updateInviteSentReplacedToOutputStmt, updateInviteSentReplacedToOutputSQL}, + {&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL}, + {&s.updateInviteRetiredStmt, updateInviteRetiredSQL}, + + {&s.updateInviteSentRetiredToOutputStmt, updateInviteSentRetiredToOutputSQL}, }.prepare(db) } -func (s *inviteStatements) upsertInviteEvent( - inviteEventNID types.EventNID, roomNID types.RoomNID, - targetStateKeyNID, senderStateKeyNID types.EventStateKeyNID, +func (s *inviteStatements) insertInviteEvent( + txn *sql.Tx, inviteEventNID types.EventNID, roomNID types.RoomNID, + targetNID, senderNID types.EventStateKeyNID, + inviteEventJSON []byte, ) error { - _, err := s.upsertInviteEventStmt.Exec( - inviteEventNID, roomNID, targetStateKeyNID, senderStateKeyNID, + _, err := txn.Stmt(s.insertInviteEventStmt).Exec( + inviteEventNID, roomNID, targetNID, senderNID, inviteEventJSON, ) return err } -func (s *inviteStatements) upsertInviteReplacedBy( - inviteEventNID types.EventNID, roomNID types.RoomNID, - targetStateKeyNID types.EventStateKeyNID, - replacedByEventNID types.EventNID, +func (s *inviteStatements) updateInviteRetired( + txn *sql.Tx, roomNID types.RoomNID, targetNID types.EventStateKeyNID, ) error { - _, err := s.upsertInviteReplacedByStmt.Exec( - inviteEventNID, roomNID, targetStateKeyNID, replacedByEventNID, - ) + _, err := txn.Stmt(s.updateInviteRetiredStmt).Exec(roomNID, targetNID) return err } func (s *inviteStatements) selectInvite( - inviteEventNID types.EventNID, -) (replacedByNID types.EventNID, sentInviteToOutput, sentReplacedToOutput bool, err error) { - err = s.selectInviteStmt.QueryRow(inviteEventNID).Scan( - &replacedByNID, &sentInviteToOutput, &sentReplacedToOutput, + txn *sql.Tx, inviteEventNID types.EventNID, +) (RetiredByNID types.EventNID, sentInviteToOutput, sentRetiredToOutput bool, err error) { + err = txn.Stmt(s.selectInviteStmt).QueryRow(inviteEventNID).Scan( + &RetiredByNID, &sentInviteToOutput, &sentRetiredToOutput, ) return } -func (s *inviteStatements) selectActiveInviteForUserInRoom( - targetStateKeyNID types.EventStateKeyNID, roomNID types.RoomNID, -) (inviteEventNID types.EventNID, senderStateKeyNID types.EventStateKeyNID, err error) { - err = s.selectActiveInviteForUserInRoomStmt.QueryRow( - targetStateKeyNID, roomNID, - ).Scan(&inviteEventNID, &senderStateKeyNID) - return +// selectInviteActiveForUserInRoom returns a list of sender state key NIDs +func (s *inviteStatements) selectInviteActiveForUserInRoom( + targetNID types.EventStateKeyNID, roomNID types.RoomNID, +) ([]types.EventStateKeyNID, error) { + rows, err := s.selectInviteActiveForUserInRoomStmt.Query( + targetNID, roomNID, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var result []types.EventStateKeyNID + for rows.Next() { + var senderNID int64 + if err := rows.Scan(&senderNID); err != nil { + return nil, err + } + result = append(result, types.EventStateKeyNID(senderNID)) + } + return result, nil } func (s *inviteStatements) updateInviteSentInviteToOutput( @@ -153,9 +179,9 @@ func (s *inviteStatements) updateInviteSentInviteToOutput( return err } -func (s *inviteStatements) updateInviteSentReplacedToOutput( +func (s *inviteStatements) updateInviteSentRetiredToOutput( inviteEventNID types.EventNID, ) error { - _, err := s.updateInviteSentReplacedToOutputStmt.Exec(inviteEventNID) + _, err := s.updateInviteSentRetiredToOutputStmt.Exec(inviteEventNID) return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go new file mode 100644 index 000000000..eada1a925 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/membership_table.go @@ -0,0 +1,106 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/roomserver/types" +) + +type membershipState int64 + +const ( + membershipStateLeaveOrBan membershipState = 0 + membershipStateInvite membershipState = 1 + membershipStateJoin membershipState = 2 +) + +const membershipSchema = ` +-- The membership table is used to coordinate updates between the invite table +-- and the room state tables. +-- This table is updated in one of 3 ways: +-- 1) The membership of a user changes within the current state of the room. +-- 2) An invite is received outside of a room over federation. +-- 3) An invite is rejected outside of a room over federation. +CREATE TABLE membership IF NOT EXISTS ( + room_nid BIGINT NOT NULL, + -- Numeric state key ID for the user ID this state is for. + target_nid BIGINT NOT NULL, + -- Numeric state key ID for the user ID who changed the state. + sender_nid BIGINT NOT NULL DEFAULT 0, + -- The state the user is in within this room. + membership_nid BIGINT NOT NULL DEFAULT 0, + UNIQUE (room_nid, target_nid) +); +` + +const insertMembershipSQL = "" + + "INSERT INTO membership (room_nid, target_nid)" + + " VALUES ($1, $2)" + + " ON CONFLICT DO NOTHING" + +const selectMembershipForUpdateSQL = "" + + "SELECT membership_nid FROM membership" + + " WHERE room_nid = $1, target_nid = $2 FOR UPDATE" + +const updateMembershipSQL = "" + + "UPDATE membership SET membership_nid = $3, sender_nid = $4" + + " WHERE room_nid = $1, target_nid = $2" + +type membershipStatements struct { + insertMembershipStmt *sql.Stmt + selectMembershipForUpdateStmt *sql.Stmt + updateMembershipStmt *sql.Stmt +} + +func (s *membershipStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(membershipSchema) + if err != nil { + return + } + + return statementList{ + {&s.insertMembershipStmt, insertMembershipSQL}, + {&s.selectMembershipForUpdateStmt, selectMembershipForUpdateSQL}, + {&s.updateMembershipStmt, updateMembershipSQL}, + }.prepare(db) +} + +func (s *membershipStatements) insertMembership( + txn *sql.Tx, roomNID types.RoomNID, targetNID types.EventStateKeyNID, +) error { + _, err := txn.Stmt(s.insertMembershipStmt).Exec(roomNID, targetNID) + return err +} + +func (s *membershipStatements) selectMembershipForUpdate( + txn *sql.Tx, roomNID types.RoomNID, targetNID types.EventStateKeyNID, +) (membership membershipState, err error) { + err = txn.Stmt(s.selectMembershipForUpdateStmt).QueryRow( + roomNID, targetNID, + ).Scan(&membership) + return +} + +func (s *membershipStatements) updateMembershipSQL( + txn *sql.Tx, roomNID types.RoomNID, targetNID types.EventStateKeyNID, + senderNID types.EventStateKeyNID, membership membershipState, +) error { + _, err := txn.Stmt(s.updateMembershipStmt).Exec( + roomNID, targetNID, senderNID, membership, + ) + return err +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 162dd7fda..22a33b008 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -16,6 +16,8 @@ package storage import ( "database/sql" + "fmt" + // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -340,6 +342,10 @@ func (u *roomRecentEventsUpdater) Rollback() error { return u.txn.Rollback() } +func (u *roomRecentEventsUpdater) MembershipUpdater(targetNID types.EventStateKeyNID) (types.MembershipUpdater, error) { + panic(fmt.Errorf("Not implemented")) +} + // RoomNID implements query.RoomserverQueryAPIDB func (d *Database) RoomNID(roomID string) (types.RoomNID, error) { roomNID, err := d.statements.selectRoomNID(roomID) @@ -372,41 +378,3 @@ func (d *Database) StateEntriesForTuples( ) ([]types.StateEntryList, error) { return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples) } - -// StoreInvite implements input.EventDatabase -func (d *Database) StoreInvite( - roomNID types.RoomNID, inviteEventNID types.EventNID, - targetNID types.EventStateKeyNID, senderID string, -) (replacedByNID types.EventNID, sentInviteToOutput bool, err error) { - var senderNID types.EventStateKeyNID - if senderNID, err = d.assignStateKeyNID(senderID); err != nil { - return - } - - if err = d.statements.upsertInviteEvent(inviteEventNID, roomNID, targetNID, senderNID); err != nil { - return - } - - if replacedByNID, sentInviteToOutput, _, err = d.statements.selectInvite(inviteEventNID); err != nil { - return - } - - return -} - -// MarkInviteAsSent implements input.EventDatabase -func (d *Database) MarkInviteAsSent(inviteEventNID types.EventNID) error { - return d.statements.updateInviteSentInviteToOutput(inviteEventNID) -} - -// MarkInviteReplacedAsSent implements input.EventDatabase -func (d *Database) MarkInviteReplacedAsSent(inviteEventNID types.EventNID) error { - return d.statements.updateInviteSentReplacedToOutput(inviteEventNID) -} - -// LookupInviteForUserInRoom implements query.RoomserverQueryAPIDB -func (d *Database) LookupInviteForUserInRoom( - roomNID types.RoomNID, targetNID types.EventStateKeyNID, -) (eventNID types.EventNID, senderNID types.EventStateKeyNID, err error) { - return d.statements.selectActiveInviteForUserInRoom(targetNID, roomNID) -} diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index b255b64b9..c01d18a09 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -135,9 +135,17 @@ type StateEntryList struct { StateEntries []StateEntry } +// A Transaction is something that can be committed or rolledback. +type Transaction interface { + // Commit the transaction + Commit() error + // Rollback the transaction. + Rollback() error +} + // A RoomRecentEventsUpdater is used to update the recent events in a room. // (On postgresql this wraps a database transaction that holds a "FOR UPDATE" -// lock on the row holding the latest events for the room.) +// lock on the row in the rooms table holding the latest events for the room.) type RoomRecentEventsUpdater interface { // The latest event IDs and state in the room. LatestEvents() []StateAtEventAndReference @@ -163,10 +171,35 @@ type RoomRecentEventsUpdater interface { HasEventBeenSent(eventNID EventNID) (bool, error) // Mark the event as having been sent to the output logs. MarkEventAsSent(eventNID EventNID) error - // Commit the transaction - Commit() error - // Rollback the transaction. - Rollback() error + // Build a membership updater for the target user in this room. + // It will share the same transaction as this updater. + MembershipUpdater(targetNID EventStateKeyNID) (MembershipUpdater, error) + // Implements Transaction so it can be committed or rolledback + Transaction +} + +// A MembershipUpdater is used to update the membership of a user in a room. +// (On postgresql this wraps a database transaction that holds a "FOR UPDATE" +// lock on the row in the membership table for this user in the room) +type MembershipUpdater interface { + // True if the target user is invited to the room. + IsInvite() bool + // True if the target user is joined to the room. + IsJoin() bool + // Set the state to invite. + // Returns whether this invite needs to be sent + SetToInviteFrom(senderID string, event gomatrixserverlib.Event) (needsSending bool, err error) + // Set the state to join. + SetToJoinFrom(senderID string) (inviteIDs []string, err error) + // Set the state to leave. + // Returns a list of invite event IDs that this state change retired. + SetToLeaveFrom(senderID string) (inviteIDs []string, err error) + // Mark the invite as sent. + MarkInviteAsSent(inviteID string) error + // Mark the invite retirement as sent. + MarkInviteRetirementAsSent(inviteIDs []string) error + // Implements Transaction so it can be committed or rolledback. + Transaction } // A MissingEventError is an error that happened because the roomserver was