diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index 953fe3c8f..6a5c924c6 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -123,6 +123,8 @@ type OutputNewInviteEvent struct { type OutputRetireInviteEvent struct { // The ID of the "m.room.member" invite event. EventID string + // The target user ID of the "m.room.member" invite event that was retired. + TargetUserID string // Optional event ID of the event that replaced the invite. // This can be empty if the invite was rejected locally and we were unable // to reach the server that originally sent the invite. diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go index cd09001e1..4c42cadd9 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go @@ -182,11 +182,10 @@ func updateToJoinMembership( } for _, eventID := range retired { orie := api.OutputRetireInviteEvent{ - EventID: eventID, - Membership: join, - } - if add != nil { - orie.RetiredByEventID = add.EventID() + EventID: eventID, + Membership: join, + RetiredByEventID: add.EventID(), + TargetUserID: *add.StateKey(), } updates = append(updates, api.OutputEvent{ Type: api.OutputTypeRetireInviteEvent, @@ -215,11 +214,10 @@ func updateToLeaveMembership( } for _, eventID := range retired { orie := api.OutputRetireInviteEvent{ - EventID: eventID, - Membership: newMembership, - } - if add != nil { - orie.RetiredByEventID = add.EventID() + EventID: eventID, + Membership: newMembership, + RetiredByEventID: add.EventID(), + TargetUserID: *add.StateKey(), } updates = append(updates, api.OutputEvent{ Type: api.OutputTypeRetireInviteEvent, diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index fc547e92b..911ccd0d9 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -86,26 +86,37 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { return nil } - if output.Type != api.OutputTypeNewRoomEvent { + switch output.Type { + case api.OutputTypeNewRoomEvent: + return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + case api.OutputTypeNewInviteEvent: + return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) + case api.OutputTypeRetireInviteEvent: + return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) + default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", ) return nil } +} - ev := output.NewRoomEvent.Event +func (s *OutputRoomEvent) onNewRoomEvent( + ctx context.Context, msg api.OutputNewRoomEvent, +) error { + ev := msg.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), }).Info("received event from roomserver") - addsStateEvents, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev) if err != nil { log.WithFields(log.Fields{ "event": string(ev.JSON()), log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, + "add": msg.AddsStateEventIDs, + "del": msg.RemovesStateEventIDs, }).Panicf("roomserver output log: state event lookup failure") } @@ -122,20 +133,23 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { } syncStreamPos, err := s.db.WriteEvent( - context.TODO(), + ctx, &ev, addsStateEvents, - output.NewRoomEvent.AddsStateEventIDs, - output.NewRoomEvent.RemovesStateEventIDs, + msg.AddsStateEventIDs, + msg.RemovesStateEventIDs, ) + if err != nil { + return err + } if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ "event": string(ev.JSON()), log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, + "add": msg.AddsStateEventIDs, + "del": msg.RemovesStateEventIDs, }).Panicf("roomserver output log: write event failure") return nil } @@ -144,6 +158,39 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { return nil } +func (s *OutputRoomEvent) onNewInviteEvent( + ctx context.Context, msg api.OutputNewInviteEvent, +) error { + syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(msg.Event.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write invite failure") + return nil + } + s.notifier.OnNewEvent(&msg.Event, "", syncStreamPos) + return nil +} + +func (s *OutputRoomEvent) onRetireInviteEvent( + ctx context.Context, msg api.OutputRetireInviteEvent, +) error { + err := s.db.RetireInviteEvent(ctx, msg.EventID) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event_id": msg.EventID, + log.ErrorKey: err, + }).Panicf("roomserver output log: remove invite failure") + return nil + } + // TODO: Notify any active sync requests that the invite has been retired. + // s.notifier.OnNewEvent(nil, msg.TargetUserID, syncStreamPos) + return nil +} + // lookupStateEvents looks up the state events that are added by a new event. func (s *OutputRoomEvent) lookupStateEvents( addsStateEventIDs []string, event gomatrixserverlib.Event, diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index e5430d14d..4ea06808a 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -140,7 +140,10 @@ func (s *currentRoomStateStatements) selectJoinedUsers( // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. func (s *currentRoomStateStatements) selectRoomIDsWithMembership( - ctx context.Context, txn *sql.Tx, userID, membership string, + ctx context.Context, + txn *sql.Tx, + userID string, + membership string, // nolint: unparam ) ([]string, error) { stmt := common.TxStmt(txn, s.selectRoomIDsWithMembershipStmt) rows, err := stmt.QueryContext(ctx, userID, membership) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go new file mode 100644 index 000000000..88c98f7e3 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go @@ -0,0 +1,133 @@ +package storage + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" +) + +const inviteEventsSchema = ` +CREATE TABLE IF NOT EXISTS syncapi_invite_events ( + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + event_json TEXT NOT NULL +); + +-- For looking up the invites for a given user. +CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx + ON syncapi_invite_events (target_user_id, id); + +-- For deleting old invites +CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx + ON syncapi_invite_events(target_user_id, id); +` + +const insertInviteEventSQL = "" + + "INSERT INTO syncapi_invite_events (" + + " room_id, event_id, target_user_id, event_json" + + ") VALUES ($1, $2, $3, $4) RETURNING id" + +const deleteInviteEventSQL = "" + + "DELETE FROM syncapi_invite_events WHERE event_id = $1" + +const selectInviteEventsInRangeSQL = "" + + "SELECT room_id, event_json FROM syncapi_invite_events" + + " WHERE target_user_id = $1 AND id > $2 AND id <= $3" + + " ORDER BY id DESC" + +const selectMaxInviteIDSQL = "" + + "SELECT MAX(id) FROM syncapi_invite_events" + +type inviteEventsStatements struct { + insertInviteEventStmt *sql.Stmt + selectInviteEventsInRangeStmt *sql.Stmt + deleteInviteEventStmt *sql.Stmt + selectMaxInviteIDStmt *sql.Stmt +} + +func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(inviteEventsSchema) + if err != nil { + return + } + if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil { + return + } + if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { + return + } + if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil { + return + } + if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil { + return + } + return +} + +func (s *inviteEventsStatements) insertInviteEvent( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) (streamPos int64, err error) { + err = s.insertInviteEventStmt.QueryRowContext( + ctx, + inviteEvent.RoomID(), + inviteEvent.EventID(), + *inviteEvent.StateKey(), + inviteEvent.JSON(), + ).Scan(&streamPos) + return +} + +func (s *inviteEventsStatements) deleteInviteEvent( + ctx context.Context, inviteEventID string, +) error { + _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID) + return err +} + +// selectInviteEventsInRange returns a map of room ID to invite event for the +// active invites for the target user ID in the supplied range. +func (s *inviteEventsStatements) selectInviteEventsInRange( + ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos int64, +) (map[string]gomatrixserverlib.Event, error) { + stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt) + rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + result := map[string]gomatrixserverlib.Event{} + for rows.Next() { + var ( + roomID string + eventJSON []byte + ) + if err = rows.Scan(&roomID, &eventJSON); err != nil { + return nil, err + } + + event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false) + if err != nil { + return nil, err + } + + result[roomID] = event + } + return result, nil +} + +func (s *inviteEventsStatements) selectMaxInviteID( + ctx context.Context, txn *sql.Tx, +) (id int64, err error) { + var nullableID sql.NullInt64 + stmt := common.TxStmt(txn, s.selectMaxInviteIDStmt) + err = stmt.QueryRowContext(ctx).Scan(&nullableID) + if nullableID.Valid { + id = nullableID.Int64 + } + return +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index e65639eb7..491cf3959 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -47,32 +47,32 @@ type SyncServerDatabase struct { accountData accountDataStatements events outputRoomEventsStatements roomstate currentRoomStateStatements + invites inviteEventsStatements } // NewSyncServerDatabase creates a new sync server database func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { - var db *sql.DB + var d SyncServerDatabase var err error - if db, err = sql.Open("postgres", dataSourceName); err != nil { + if d.db, err = sql.Open("postgres", dataSourceName); err != nil { return nil, err } - partitions := common.PartitionOffsetStatements{} - if err = partitions.Prepare(db, "syncapi"); err != nil { + if err = d.partitions.Prepare(d.db, "syncapi"); err != nil { return nil, err } - accountData := accountDataStatements{} - if err = accountData.prepare(db); err != nil { + if err = d.accountData.prepare(d.db); err != nil { return nil, err } - events := outputRoomEventsStatements{} - if err = events.prepare(db); err != nil { + if err = d.events.prepare(d.db); err != nil { return nil, err } - state := currentRoomStateStatements{} - if err := state.prepare(db); err != nil { + if err := d.roomstate.prepare(d.db); err != nil { return nil, err } - return &SyncServerDatabase{db, partitions, accountData, events, state}, nil + if err := d.invites.prepare(d.db); err != nil { + return nil, err + } + return &d, nil } // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. @@ -191,6 +191,13 @@ func (d *SyncServerDatabase) syncStreamPositionTx( if maxAccountDataID > maxID { maxID = maxAccountDataID } + maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn) + if err != nil { + return 0, err + } + if maxInviteID > maxID { + maxID = maxInviteID + } return types.StreamPosition(maxID), nil } @@ -260,7 +267,7 @@ func (d *SyncServerDatabase) IncrementalSync( } // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, userID, fromPos, toPos, res); err != nil { return nil, err } @@ -322,7 +329,7 @@ func (d *SyncServerDatabase) CompleteSync( res.Rooms.Join[roomID] = *jr } - if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil { return nil, err } @@ -364,16 +371,45 @@ func (d *SyncServerDatabase) UpsertAccountData( return types.StreamPosition(pos), err } +// AddInviteEvent stores a new invite event for a user. +// If the invite was successfully stored this returns the stream ID it was stored at. +// Returns an error if there was a problem communicating with the database. +func (d *SyncServerDatabase) AddInviteEvent( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) (types.StreamPosition, error) { + pos, err := d.invites.insertInviteEvent(ctx, inviteEvent) + return types.StreamPosition(pos), err +} + +// RetireInviteEvent removes an old invite event from the database. +// Returns an error if there was a problem communicating with the database. +func (d *SyncServerDatabase) RetireInviteEvent( + ctx context.Context, inviteEventID string, +) error { + // TODO: Record that invite has been retired in a stream so that we can + // notify the user in an incremental sync. + err := d.invites.deleteInviteEvent(ctx, inviteEventID) + return err +} + func (d *SyncServerDatabase) addInvitesToResponse( - ctx context.Context, txn *sql.Tx, userID string, res *types.Response) error { - // Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark. - roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "invite") + ctx context.Context, txn *sql.Tx, + userID string, + fromPos, toPos types.StreamPosition, + res *types.Response, +) error { + invites, err := d.invites.selectInviteEventsInRange( + ctx, txn, userID, int64(fromPos), int64(toPos), + ) if err != nil { return err } - for _, roomID := range roomIDs { + for roomID, inviteEvent := range invites { ir := types.NewInviteResponse() - // TODO: invite_state. The state won't be in the current state table in cases where you get invited over federation + ir.InviteState.Events = gomatrixserverlib.ToClientEvents( + []gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync, + ) + // TODO: add the invite state from the invite event. res.Rooms.Invite[roomID] = *ir } return nil