Add input API for adding invites to the roomserver.

This API handles invites received over federation that occur outside of a room.
This commit is contained in:
Mark Haines 2017-08-17 14:32:40 +01:00
parent 877ea5cb62
commit 47a23e1a0c
6 changed files with 136 additions and 33 deletions

View file

@ -68,9 +68,17 @@ type InputRoomEvent struct {
SendAsServer string `json:"send_as_server"` SendAsServer string `json:"send_as_server"`
} }
// InputInviteEvent is a matrix invite event received over federation without
// the usual context a matrix room event would have. We usually do not have
// access to the events needed to check the event auth rules for the invite.
type InputInviteEvent struct {
Event gomatrixserverlib.Event `json:"event"`
}
// InputRoomEventsRequest is a request to InputRoomEvents // InputRoomEventsRequest is a request to InputRoomEvents
type InputRoomEventsRequest struct { type InputRoomEventsRequest struct {
InputRoomEvents []InputRoomEvent `json:"input_room_events"` InputRoomEvents []InputRoomEvent `json:"input_room_events"`
InputInviteEvents []InputInviteEvent `json:"input_invite_events"`
} }
// InputRoomEventsResponse is a response to InputRoomEvents // InputRoomEventsResponse is a response to InputRoomEvents

View file

@ -15,6 +15,8 @@
package input package input
import ( import (
"fmt"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
@ -39,6 +41,8 @@ type RoomEventDatabase interface {
GetLatestEventsForUpdate(roomNID types.RoomNID) (updater types.RoomRecentEventsUpdater, err error) GetLatestEventsForUpdate(roomNID types.RoomNID) (updater types.RoomRecentEventsUpdater, err error)
// Lookup the string event IDs for a list of numeric event IDs // Lookup the string event IDs for a list of numeric event IDs
EventIDs(eventNIDs []types.EventNID) (map[types.EventNID]string, error) EventIDs(eventNIDs []types.EventNID) (map[types.EventNID]string, error)
// Build a membership updater for the target user in a room.
MembershipUpdater(roomID, targerUserID string) (types.MembershipUpdater, error)
} }
// OutputRoomEventWriter has the APIs needed to write an event to the output logs. // OutputRoomEventWriter has the APIs needed to write an event to the output logs.
@ -103,13 +107,81 @@ func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.
return err return err
} }
// TODO:
// * Caculate the new current state for the room if the forward extremities have changed.
// * Work out the delta between the new current state and the previous current state.
// * Work out the visibility of the event.
// * Write a message to the output logs containing:
// - The event itself
// - The visiblity of the event, i.e. who is allowed to see the event.
// - The changes to the current state of the room.
return nil return nil
} }
func processInviteEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputInviteEvent) (err error) {
if input.Event.StateKey() == nil {
return fmt.Errorf("invite must be a state event")
}
roomID := input.Event.RoomID()
targetUserID := *input.Event.StateKey()
updater, err := db.MembershipUpdater(roomID, targetUserID)
if err != nil {
return err
}
return withTransaction(updater, func() error {
if updater.IsJoin() {
// If the user is joined to the room then that takes precedence over this
// invite event. It makes little sense to move a user that is already
// joined to the room into the invite state.
// This could plausibly happen if an invite request raced with a join
// request for a user. For example if a user was invited to a public
// room and they joined the room at the same time as the invite was sent.
// The other way this could plausibly happen is if an invite raced with
// a kick. For example if a user was kicked from a room in error and in
// response someone else in the room re-invited them then it is possible
// for the invite request to race with the leave event so that the
// target receives invite before it learns that it has been kicked.
// There are a few ways this could be plausibly handled in the roomserver.
// 1) Store the invite, but mark it as retired. That will result in the
// permanent rejection of that invite event. So even if the target
// user leaves the room and the invite is retransmitted it will be
// ignored. However a new invite with a new event ID would still be
// accepted.
// 2) Silently discard the invite event. This means that if the event
// was retransmitted at a later date after the target user had left
// the room we would accept the invite. However since we hadn't told
// the sending server that the invite had been discarded it would
// have no reason to attempt to retry.
// 3) Signal the sending server that the user is already joined to the
// room.
// For now we will implement option 2. Since in the abesence of a retry
// mechanism it will be equivalent to option 1, and we don't have a
// signalling mechanism to implement option 3.
return nil
}
outputUpdates, err := updateToInviteMembership(updater, &input.Event, nil)
if err != nil {
return err
}
return ow.WriteOutputEvents(roomID, outputUpdates)
})
}
func withTransaction(t types.Transaction, f func() error) (err error) {
defer func() {
if r := recover(); r != nil {
t.Rollback()
panic(r)
}
if err != nil {
// Ignore any error we get rolling back since we don't want to
// clobber the current error
// TODO: log the error here.
t.Rollback()
} else {
// Commit if there wasn't an error.
// Set the returned err value if we encounter an error committing.
// This only works because err is a named return.
err = t.Commit()
}
}()
return f()
}

View file

@ -61,6 +61,11 @@ func (r *RoomserverInputAPI) InputRoomEvents(
return err return err
} }
} }
for i := range request.InputInviteEvents {
if err := processInviteEvent(r.DB, r, request.InputInviteEvents[i]); err != nil {
return err
}
}
return nil return nil
} }

View file

@ -52,25 +52,12 @@ func updateLatestEvents(
if err != nil { if err != nil {
return return
} }
defer func() {
if err == nil {
// Commit if there wasn't an error.
// Set the returned err value if we encounter an error committing.
// This only works because err is a named return.
err = updater.Commit()
} else {
// Ignore any error we get rolling back since we don't want to
// clobber the current error
// TODO: log the error here.
updater.Rollback()
}
}()
u := latestEventsUpdater{ u := latestEventsUpdater{
db: db, updater: updater, ow: ow, roomNID: roomNID, db: db, updater: updater, ow: ow, roomNID: roomNID,
stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer, stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
} }
return u.doUpdateLatestEvents() return withTransaction(updater, u.doUpdateLatestEvents)
} }
// latestEventsUpdater tracks the state used to update the latest events in the // latestEventsUpdater tracks the state used to update the latest events in the

View file

@ -80,15 +80,23 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) {
}.prepare(db) }.prepare(db)
} }
func (s *roomStatements) insertRoomNID(roomID string) (types.RoomNID, error) { func (s *roomStatements) insertRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) {
var roomNID int64 var roomNID int64
err := s.insertRoomNIDStmt.QueryRow(roomID).Scan(&roomNID) stmt := s.insertRoomNIDStmt
if txn != nil {
stmt = txn.Stmt(stmt)
}
err := stmt.QueryRow(roomID).Scan(&roomNID)
return types.RoomNID(roomNID), err return types.RoomNID(roomNID), err
} }
func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) { func (s *roomStatements) selectRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) {
var roomNID int64 var roomNID int64
err := s.selectRoomNIDStmt.QueryRow(roomID).Scan(&roomNID) stmt := s.selectRoomNIDStmt
if txn != nil {
stmt = txn.Stmt(stmt)
}
err := stmt.QueryRow(roomID).Scan(&roomNID)
return types.RoomNID(roomNID), err return types.RoomNID(roomNID), err
} }

View file

@ -17,6 +17,8 @@ package storage
import ( import (
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/common"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
@ -53,7 +55,7 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ
err error err error
) )
if roomNID, err = d.assignRoomNID(event.RoomID()); err != nil { if roomNID, err = d.assignRoomNID(nil, event.RoomID()); err != nil {
return 0, types.StateAtEvent{}, err return 0, types.StateAtEvent{}, err
} }
@ -104,15 +106,15 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ
}, nil }, nil
} }
func (d *Database) assignRoomNID(roomID string) (types.RoomNID, error) { func (d *Database) assignRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) {
// Check if we already have a numeric ID in the database. // Check if we already have a numeric ID in the database.
roomNID, err := d.statements.selectRoomNID(roomID) roomNID, err := d.statements.selectRoomNID(txn, roomID)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
// We don't have a numeric ID so insert one into the database. // We don't have a numeric ID so insert one into the database.
roomNID, err = d.statements.insertRoomNID(roomID) roomNID, err = d.statements.insertRoomNID(txn, roomID)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
// We raced with another insert so run the select again. // We raced with another insert so run the select again.
roomNID, err = d.statements.selectRoomNID(roomID) roomNID, err = d.statements.selectRoomNID(txn, roomID)
} }
} }
return roomNID, err return roomNID, err
@ -329,7 +331,7 @@ func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventSta
// RoomNID implements query.RoomserverQueryAPIDB // RoomNID implements query.RoomserverQueryAPIDB
func (d *Database) RoomNID(roomID string) (types.RoomNID, error) { func (d *Database) RoomNID(roomID string) (types.RoomNID, error) {
roomNID, err := d.statements.selectRoomNID(roomID) roomNID, err := d.statements.selectRoomNID(nil, roomID)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
return 0, nil return 0, nil
} }
@ -380,6 +382,27 @@ func (d *Database) StateEntriesForTuples(
return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples) return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples)
} }
// MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater(roomID, targetUserID string) (updater types.MembershipUpdater, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
var (
roomNID types.RoomNID
targetUserNID types.EventStateKeyNID
)
roomNID, err = d.assignRoomNID(txn, roomID)
if err != nil {
return err
}
targetUserNID, err = d.assignStateKeyNID(txn, targetUserID)
if err != nil {
return err
}
updater, err = d.membershipUpdaterTxn(txn, roomNID, targetUserNID)
return err
})
return
}
type membershipUpdater struct { type membershipUpdater struct {
transaction transaction
d *Database d *Database