Add table for tracking membership state

This commit is contained in:
Mark Haines 2017-07-05 11:41:16 +01:00
parent 3ea0644002
commit f68d31d011
4 changed files with 245 additions and 112 deletions

View file

@ -22,73 +22,88 @@ import (
const inviteSchema = ` const inviteSchema = `
CREATE TABLE invites ( CREATE TABLE invites (
-- The numeric ID of the invite event itself. -- The string ID of the invite event itself.
invite_event_nid BIGINT NOT NULL CONSTRAINT invite_event_nid_unique UNIQUE, -- We can't use a numeric event ID here because we don't always have
-- enough information to store an invite in the event table.
-- In particular we don't always have a chain of auth_events for invites
-- received over federation.
invite_event_id TEXT PRIMARY KEY,
-- The numeric ID of the room the invite m.room.member event is in. -- The numeric ID of the room the invite m.room.member event is in.
room_nid BIGINT NOT NULL, room_nid BIGINT NOT NULL,
-- The numeric ID for the state key of the invite m.room.member event. -- The numeric ID for the state key of the invite m.room.member event.
-- This tells us who the invite is for. -- This tells us who the invite is for.
-- This is used to query the active invites for a user. -- This is used to query the active invites for a user.
target_state_key_nid BIGINT NOT NULL, target_nid BIGINT NOT NULL,
-- The numeric ID for the sender of the invite m.room.member event. -- The numeric ID for the sender of the invite m.room.member event.
-- This tells us who sent the invite. -- This tells us who sent the invite.
-- This is used to work out which matrix server we should talk to when -- This is used to work out which matrix server we should talk to when
-- we try to join the room. -- we try to join the room.
sender_state_key_nid BIGINT NOT NULL DEFAULT 0, sender_nid BIGINT NOT NULL DEFAULT 0,
-- The numeric ID of the join or leave event that replaced the invite event.
-- This is used to track whether the invite is still active. -- This is used to track whether the invite is still active.
-- This is set implicitly when processing a KIND_NEW events and explici -- This is set implicitly when processing KIND_NEW events and explicitly
-- is set -- when rejecting events over federation.
replaced_by_event_nid BIGINT NOT NULL DEFAULT 0, retired BOOLEAN NOT NULL DEFAULT FALSE,
-- Whether the invite has been sent to the output stream. -- Whether the invite has been sent to the output stream.
-- We maintain a separate output stream of invite events since they don't -- We maintain a separate output stream of invite events since they don't
-- always occur within a room we have state for. -- always occur within a room we have state for.
sent_invite_to_output BOOLEAN NOT NULL DEFAULT FALSE, sent_invite_to_output BOOLEAN NOT NULL DEFAULT FALSE,
-- Whether the replacement has been sent to the output stream. -- Whether the retirement has been sent to the output stream.
sent_replaced_to_output BOOLEAN NOT NULL DEFAULT FALSE, sent_retired_to_output BOOLEAN NOT NULL DEFAULT FALSE,
-- The invite event JSON.
invite_event_json TEXT NOT NULL
); );
CREATE INDEX invites_active_idx ON invites (target_state_key_nid, room_nid) CREATE INDEX invites_active_idx ON invites (target_state_key_nid, room_nid)
WHERE replaced_by_event_nid == 0; WHERE NOT retired;
CREATE INDEX invites_unsent_retired_idx ON invites (target_state_key_nid, room_nid)
WHERE retired AND NOT sent_retired_to_output;
` `
const upsertInviteEventSQL = "" + const insertInviteEventSQL = "" +
"INSERT INTO invites (invite_event_nid, room_nid, target_state_key_nid," + "INSERT INTO invites (invite_event_id, room_nid, target_state_key_nid," +
" sender_state_key_nid) VALUES ($1, $2, $3, $4)" + " sender_state_key_nid, invite_event_json) VALUES ($1, $2, $3, $4, $5)" +
" ON CONFLICT ON invite_event_nid_unique" + " ON CONFLICT DO NOTHING"
" DO UPDATE SET sender_state_key_nid = $4"
const upsertInviteReplacedBySQL = "" +
"INSERT INTO invites (invite_event_nid, room_nid, target_state_key_nid," +
" replaced_by_event_nid) VALUES ($1, $2, $3, $4)" +
" ON CONFLICT ON invite_event_nid_unique" +
" DO UPDATE SET replaced_by_event_nid = $4"
const selectInviteSQL = "" + const selectInviteSQL = "" +
"SELECT replaced_by_event_nid, sent_invite_to_output," + "SELECT retired, sent_invite_to_output FROM invites" +
" sent_replaced_to_output FROM invites" + " WHERE invite_event_id = $1"
" WHERE invite_event_nid = $1"
const selectActiveInviteForUserInRoomSQL = "" +
"SELECT invite_event_nid, sender_state_key_nid FROM invites" +
" WHERE target_state_key_nid = $1 AND room_nid = $2" +
" AND replaced_by_event_nid = 0"
const updateInviteSentInviteToOutputSQL = "" + const updateInviteSentInviteToOutputSQL = "" +
"UPDATE invites SET sent_invite_to_output = TRUE" + "UPDATE invites SET sent_invite_to_output = TRUE" +
" WHERE invite_event_nid = $1" " WHERE invite_event_id = $1"
const updateInviteSentReplacedToOutputSQL = "" + const selectInviteActiveForUserInRoomSQL = "" +
"UPDATE invites SET sent_replaced_to_output = TRUE" + "SELECT invite_event_id, sender_state_key_nid FROM invites" +
" WHERE invite_event_nid = $1" " WHERE target_state_key_id = $1 AND room_nid = $2" +
" AND NOT retired"
// Retire every active invite.
// Ideally we'd know which invite events were retired by a given update so we
// wouldn't need to remove every active invite.
// However the matrix protocol doesn't give us a way to reliably identify the
// invites that were retired, so we are forced to retire all of them.
const updateInviteRetiredSQL = "" +
"UPDATE invites SET retired_by_event_nid = TRUE" +
" WHERE room_nid = $1 AND target_state_key_nid = $2 AND NOT retired"
const selectInviteUnsentRetiredSQL = "" +
"SELECT invite_event_id FROM invites" +
" WHERE target_state_key_id = $1 AND room_nid = $2" +
" AND retired AND NOT sent_retired_to_output"
const updateInviteSentRetiredToOutputSQL = "" +
"UPDATE invites SET sent_retired_to_output = TRUE" +
" WHERE invite_event_id = $1"
type inviteStatements struct { type inviteStatements struct {
upsertInviteEventStmt *sql.Stmt insertInviteEventStmt *sql.Stmt
upsertInviteReplacedByStmt *sql.Stmt selectInviteStmt *sql.Stmt
selectInviteStmt *sql.Stmt selectInviteActiveForUserInRoomStmt *sql.Stmt
selectActiveInviteForUserInRoomStmt *sql.Stmt updateInviteRetiredStmt *sql.Stmt
updateInviteSentInviteToOutputStmt *sql.Stmt selectInviteUnsentRetiredStmt *sql.Stmt
updateInviteSentReplacedToOutputStmt *sql.Stmt updateInviteSentInviteToOutputStmt *sql.Stmt
updateInviteSentRetiredToOutputStmt *sql.Stmt
} }
func (s *inviteStatements) prepare(db *sql.DB) (err error) { func (s *inviteStatements) prepare(db *sql.DB) (err error) {
@ -98,52 +113,63 @@ func (s *inviteStatements) prepare(db *sql.DB) (err error) {
} }
return statementList{ return statementList{
{&s.upsertInviteEventStmt, upsertInviteEventSQL}, {&s.insertInviteEventStmt, insertInviteEventSQL},
{&s.upsertInviteReplacedByStmt, upsertInviteReplacedBySQL},
{&s.selectInviteStmt, selectInviteSQL}, {&s.selectInviteStmt, selectInviteSQL},
{&s.selectActiveInviteForUserInRoomStmt, selectActiveInviteForUserInRoomSQL},
{&s.updateInviteSentInviteToOutputStmt, updateInviteSentInviteToOutputSQL}, {&s.updateInviteSentInviteToOutputStmt, updateInviteSentInviteToOutputSQL},
{&s.updateInviteSentReplacedToOutputStmt, updateInviteSentReplacedToOutputSQL}, {&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL},
{&s.updateInviteRetiredStmt, updateInviteRetiredSQL},
{&s.updateInviteSentRetiredToOutputStmt, updateInviteSentRetiredToOutputSQL},
}.prepare(db) }.prepare(db)
} }
func (s *inviteStatements) upsertInviteEvent( func (s *inviteStatements) insertInviteEvent(
inviteEventNID types.EventNID, roomNID types.RoomNID, txn *sql.Tx, inviteEventNID types.EventNID, roomNID types.RoomNID,
targetStateKeyNID, senderStateKeyNID types.EventStateKeyNID, targetNID, senderNID types.EventStateKeyNID,
inviteEventJSON []byte,
) error { ) error {
_, err := s.upsertInviteEventStmt.Exec( _, err := txn.Stmt(s.insertInviteEventStmt).Exec(
inviteEventNID, roomNID, targetStateKeyNID, senderStateKeyNID, inviteEventNID, roomNID, targetNID, senderNID, inviteEventJSON,
) )
return err return err
} }
func (s *inviteStatements) upsertInviteReplacedBy( func (s *inviteStatements) updateInviteRetired(
inviteEventNID types.EventNID, roomNID types.RoomNID, txn *sql.Tx, roomNID types.RoomNID, targetNID types.EventStateKeyNID,
targetStateKeyNID types.EventStateKeyNID,
replacedByEventNID types.EventNID,
) error { ) error {
_, err := s.upsertInviteReplacedByStmt.Exec( _, err := txn.Stmt(s.updateInviteRetiredStmt).Exec(roomNID, targetNID)
inviteEventNID, roomNID, targetStateKeyNID, replacedByEventNID,
)
return err return err
} }
func (s *inviteStatements) selectInvite( func (s *inviteStatements) selectInvite(
inviteEventNID types.EventNID, txn *sql.Tx, inviteEventNID types.EventNID,
) (replacedByNID types.EventNID, sentInviteToOutput, sentReplacedToOutput bool, err error) { ) (RetiredByNID types.EventNID, sentInviteToOutput, sentRetiredToOutput bool, err error) {
err = s.selectInviteStmt.QueryRow(inviteEventNID).Scan( err = txn.Stmt(s.selectInviteStmt).QueryRow(inviteEventNID).Scan(
&replacedByNID, &sentInviteToOutput, &sentReplacedToOutput, &RetiredByNID, &sentInviteToOutput, &sentRetiredToOutput,
) )
return return
} }
func (s *inviteStatements) selectActiveInviteForUserInRoom( // selectInviteActiveForUserInRoom returns a list of sender state key NIDs
targetStateKeyNID types.EventStateKeyNID, roomNID types.RoomNID, func (s *inviteStatements) selectInviteActiveForUserInRoom(
) (inviteEventNID types.EventNID, senderStateKeyNID types.EventStateKeyNID, err error) { targetNID types.EventStateKeyNID, roomNID types.RoomNID,
err = s.selectActiveInviteForUserInRoomStmt.QueryRow( ) ([]types.EventStateKeyNID, error) {
targetStateKeyNID, roomNID, rows, err := s.selectInviteActiveForUserInRoomStmt.Query(
).Scan(&inviteEventNID, &senderStateKeyNID) targetNID, roomNID,
return )
if err != nil {
return nil, err
}
defer rows.Close()
var result []types.EventStateKeyNID
for rows.Next() {
var senderNID int64
if err := rows.Scan(&senderNID); err != nil {
return nil, err
}
result = append(result, types.EventStateKeyNID(senderNID))
}
return result, nil
} }
func (s *inviteStatements) updateInviteSentInviteToOutput( func (s *inviteStatements) updateInviteSentInviteToOutput(
@ -153,9 +179,9 @@ func (s *inviteStatements) updateInviteSentInviteToOutput(
return err return err
} }
func (s *inviteStatements) updateInviteSentReplacedToOutput( func (s *inviteStatements) updateInviteSentRetiredToOutput(
inviteEventNID types.EventNID, inviteEventNID types.EventNID,
) error { ) error {
_, err := s.updateInviteSentReplacedToOutputStmt.Exec(inviteEventNID) _, err := s.updateInviteSentRetiredToOutputStmt.Exec(inviteEventNID)
return err return err
} }

View file

@ -0,0 +1,106 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"database/sql"
"github.com/matrix-org/dendrite/roomserver/types"
)
type membershipState int64
const (
membershipStateLeaveOrBan membershipState = 0
membershipStateInvite membershipState = 1
membershipStateJoin membershipState = 2
)
const membershipSchema = `
-- The membership table is used to coordinate updates between the invite table
-- and the room state tables.
-- This table is updated in one of 3 ways:
-- 1) The membership of a user changes within the current state of the room.
-- 2) An invite is received outside of a room over federation.
-- 3) An invite is rejected outside of a room over federation.
CREATE TABLE membership IF NOT EXISTS (
room_nid BIGINT NOT NULL,
-- Numeric state key ID for the user ID this state is for.
target_nid BIGINT NOT NULL,
-- Numeric state key ID for the user ID who changed the state.
sender_nid BIGINT NOT NULL DEFAULT 0,
-- The state the user is in within this room.
membership_nid BIGINT NOT NULL DEFAULT 0,
UNIQUE (room_nid, target_nid)
);
`
const insertMembershipSQL = "" +
"INSERT INTO membership (room_nid, target_nid)" +
" VALUES ($1, $2)" +
" ON CONFLICT DO NOTHING"
const selectMembershipForUpdateSQL = "" +
"SELECT membership_nid FROM membership" +
" WHERE room_nid = $1, target_nid = $2 FOR UPDATE"
const updateMembershipSQL = "" +
"UPDATE membership SET membership_nid = $3, sender_nid = $4" +
" WHERE room_nid = $1, target_nid = $2"
type membershipStatements struct {
insertMembershipStmt *sql.Stmt
selectMembershipForUpdateStmt *sql.Stmt
updateMembershipStmt *sql.Stmt
}
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(membershipSchema)
if err != nil {
return
}
return statementList{
{&s.insertMembershipStmt, insertMembershipSQL},
{&s.selectMembershipForUpdateStmt, selectMembershipForUpdateSQL},
{&s.updateMembershipStmt, updateMembershipSQL},
}.prepare(db)
}
func (s *membershipStatements) insertMembership(
txn *sql.Tx, roomNID types.RoomNID, targetNID types.EventStateKeyNID,
) error {
_, err := txn.Stmt(s.insertMembershipStmt).Exec(roomNID, targetNID)
return err
}
func (s *membershipStatements) selectMembershipForUpdate(
txn *sql.Tx, roomNID types.RoomNID, targetNID types.EventStateKeyNID,
) (membership membershipState, err error) {
err = txn.Stmt(s.selectMembershipForUpdateStmt).QueryRow(
roomNID, targetNID,
).Scan(&membership)
return
}
func (s *membershipStatements) updateMembershipSQL(
txn *sql.Tx, roomNID types.RoomNID, targetNID types.EventStateKeyNID,
senderNID types.EventStateKeyNID, membership membershipState,
) error {
_, err := txn.Stmt(s.updateMembershipStmt).Exec(
roomNID, targetNID, senderNID, membership,
)
return err
}

View file

@ -16,6 +16,8 @@ package storage
import ( import (
"database/sql" "database/sql"
"fmt"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -340,6 +342,10 @@ func (u *roomRecentEventsUpdater) Rollback() error {
return u.txn.Rollback() return u.txn.Rollback()
} }
func (u *roomRecentEventsUpdater) MembershipUpdater(targetNID types.EventStateKeyNID) (types.MembershipUpdater, error) {
panic(fmt.Errorf("Not implemented"))
}
// RoomNID implements query.RoomserverQueryAPIDB // RoomNID implements query.RoomserverQueryAPIDB
func (d *Database) RoomNID(roomID string) (types.RoomNID, error) { func (d *Database) RoomNID(roomID string) (types.RoomNID, error) {
roomNID, err := d.statements.selectRoomNID(roomID) roomNID, err := d.statements.selectRoomNID(roomID)
@ -372,41 +378,3 @@ func (d *Database) StateEntriesForTuples(
) ([]types.StateEntryList, error) { ) ([]types.StateEntryList, error) {
return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples) return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples)
} }
// StoreInvite implements input.EventDatabase
func (d *Database) StoreInvite(
roomNID types.RoomNID, inviteEventNID types.EventNID,
targetNID types.EventStateKeyNID, senderID string,
) (replacedByNID types.EventNID, sentInviteToOutput bool, err error) {
var senderNID types.EventStateKeyNID
if senderNID, err = d.assignStateKeyNID(senderID); err != nil {
return
}
if err = d.statements.upsertInviteEvent(inviteEventNID, roomNID, targetNID, senderNID); err != nil {
return
}
if replacedByNID, sentInviteToOutput, _, err = d.statements.selectInvite(inviteEventNID); err != nil {
return
}
return
}
// MarkInviteAsSent implements input.EventDatabase
func (d *Database) MarkInviteAsSent(inviteEventNID types.EventNID) error {
return d.statements.updateInviteSentInviteToOutput(inviteEventNID)
}
// MarkInviteReplacedAsSent implements input.EventDatabase
func (d *Database) MarkInviteReplacedAsSent(inviteEventNID types.EventNID) error {
return d.statements.updateInviteSentReplacedToOutput(inviteEventNID)
}
// LookupInviteForUserInRoom implements query.RoomserverQueryAPIDB
func (d *Database) LookupInviteForUserInRoom(
roomNID types.RoomNID, targetNID types.EventStateKeyNID,
) (eventNID types.EventNID, senderNID types.EventStateKeyNID, err error) {
return d.statements.selectActiveInviteForUserInRoom(targetNID, roomNID)
}

View file

@ -135,9 +135,17 @@ type StateEntryList struct {
StateEntries []StateEntry StateEntries []StateEntry
} }
// A Transaction is something that can be committed or rolledback.
type Transaction interface {
// Commit the transaction
Commit() error
// Rollback the transaction.
Rollback() error
}
// A RoomRecentEventsUpdater is used to update the recent events in a room. // A RoomRecentEventsUpdater is used to update the recent events in a room.
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE" // (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
// lock on the row holding the latest events for the room.) // lock on the row in the rooms table holding the latest events for the room.)
type RoomRecentEventsUpdater interface { type RoomRecentEventsUpdater interface {
// The latest event IDs and state in the room. // The latest event IDs and state in the room.
LatestEvents() []StateAtEventAndReference LatestEvents() []StateAtEventAndReference
@ -163,10 +171,35 @@ type RoomRecentEventsUpdater interface {
HasEventBeenSent(eventNID EventNID) (bool, error) HasEventBeenSent(eventNID EventNID) (bool, error)
// Mark the event as having been sent to the output logs. // Mark the event as having been sent to the output logs.
MarkEventAsSent(eventNID EventNID) error MarkEventAsSent(eventNID EventNID) error
// Commit the transaction // Build a membership updater for the target user in this room.
Commit() error // It will share the same transaction as this updater.
// Rollback the transaction. MembershipUpdater(targetNID EventStateKeyNID) (MembershipUpdater, error)
Rollback() error // Implements Transaction so it can be committed or rolledback
Transaction
}
// A MembershipUpdater is used to update the membership of a user in a room.
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
// lock on the row in the membership table for this user in the room)
type MembershipUpdater interface {
// True if the target user is invited to the room.
IsInvite() bool
// True if the target user is joined to the room.
IsJoin() bool
// Set the state to invite.
// Returns whether this invite needs to be sent
SetToInviteFrom(senderID string, event gomatrixserverlib.Event) (needsSending bool, err error)
// Set the state to join.
SetToJoinFrom(senderID string) (inviteIDs []string, err error)
// Set the state to leave.
// Returns a list of invite event IDs that this state change retired.
SetToLeaveFrom(senderID string) (inviteIDs []string, err error)
// Mark the invite as sent.
MarkInviteAsSent(inviteID string) error
// Mark the invite retirement as sent.
MarkInviteRetirementAsSent(inviteIDs []string) error
// Implements Transaction so it can be committed or rolledback.
Transaction
} }
// A MissingEventError is an error that happened because the roomserver was // A MissingEventError is an error that happened because the roomserver was