From c370ae1217659e68621b7decc92cc8aef1d4f624 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 20 Sep 2017 12:16:32 +0100 Subject: [PATCH] Update the invites table from the roomserver stream --- .../dendrite/roomserver/api/output.go | 2 + .../dendrite/roomserver/input/membership.go | 18 +++-- .../dendrite/syncapi/consumers/roomserver.go | 67 ++++++++++++++++--- .../dendrite/syncapi/storage/invites_table.go | 14 ++++ .../storage/output_room_events_table.go | 4 -- .../dendrite/syncapi/storage/syncserver.go | 23 ++++++- 6 files changed, 103 insertions(+), 25 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index 953fe3c8f..6a5c924c6 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -123,6 +123,8 @@ type OutputNewInviteEvent struct { type OutputRetireInviteEvent struct { // The ID of the "m.room.member" invite event. EventID string + // The target user ID of the "m.room.member" invite event that was retired. + TargetUserID string // Optional event ID of the event that replaced the invite. // This can be empty if the invite was rejected locally and we were unable // to reach the server that originally sent the invite. diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go index f4d8e02ca..944a6cb1c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go @@ -175,11 +175,10 @@ func updateToJoinMembership( } for _, eventID := range retired { orie := api.OutputRetireInviteEvent{ - EventID: eventID, - Membership: "join", - } - if add != nil { - orie.RetiredByEventID = add.EventID() + EventID: eventID, + Membership: "join", + RetiredByEventID: add.EventID(), + TargetUserID: *add.StateKey(), } updates = append(updates, api.OutputEvent{ Type: api.OutputTypeRetireInviteEvent, @@ -208,11 +207,10 @@ func updateToLeaveMembership( } for _, eventID := range retired { orie := api.OutputRetireInviteEvent{ - EventID: eventID, - Membership: newMembership, - } - if add != nil { - orie.RetiredByEventID = add.EventID() + EventID: eventID, + Membership: newMembership, + RetiredByEventID: add.EventID(), + TargetUserID: *add.StateKey(), } updates = append(updates, api.OutputEvent{ Type: api.OutputTypeRetireInviteEvent, diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index fc547e92b..911ccd0d9 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -86,26 +86,37 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { return nil } - if output.Type != api.OutputTypeNewRoomEvent { + switch output.Type { + case api.OutputTypeNewRoomEvent: + return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + case api.OutputTypeNewInviteEvent: + return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) + case api.OutputTypeRetireInviteEvent: + return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) + default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", ) return nil } +} - ev := output.NewRoomEvent.Event +func (s *OutputRoomEvent) onNewRoomEvent( + ctx context.Context, msg api.OutputNewRoomEvent, +) error { + ev := msg.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), }).Info("received event from roomserver") - addsStateEvents, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev) if err != nil { log.WithFields(log.Fields{ "event": string(ev.JSON()), log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, + "add": msg.AddsStateEventIDs, + "del": msg.RemovesStateEventIDs, }).Panicf("roomserver output log: state event lookup failure") } @@ -122,20 +133,23 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { } syncStreamPos, err := s.db.WriteEvent( - context.TODO(), + ctx, &ev, addsStateEvents, - output.NewRoomEvent.AddsStateEventIDs, - output.NewRoomEvent.RemovesStateEventIDs, + msg.AddsStateEventIDs, + msg.RemovesStateEventIDs, ) + if err != nil { + return err + } if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ "event": string(ev.JSON()), log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, + "add": msg.AddsStateEventIDs, + "del": msg.RemovesStateEventIDs, }).Panicf("roomserver output log: write event failure") return nil } @@ -144,6 +158,39 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { return nil } +func (s *OutputRoomEvent) onNewInviteEvent( + ctx context.Context, msg api.OutputNewInviteEvent, +) error { + syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(msg.Event.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write invite failure") + return nil + } + s.notifier.OnNewEvent(&msg.Event, "", syncStreamPos) + return nil +} + +func (s *OutputRoomEvent) onRetireInviteEvent( + ctx context.Context, msg api.OutputRetireInviteEvent, +) error { + err := s.db.RetireInviteEvent(ctx, msg.EventID) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event_id": msg.EventID, + log.ErrorKey: err, + }).Panicf("roomserver output log: remove invite failure") + return nil + } + // TODO: Notify any active sync requests that the invite has been retired. + // s.notifier.OnNewEvent(nil, msg.TargetUserID, syncStreamPos) + return nil +} + // lookupStateEvents looks up the state events that are added by a new event. func (s *OutputRoomEvent) lookupStateEvents( addsStateEventIDs []string, event gomatrixserverlib.Event, diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go index 48b9e4aba..68ca4168c 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go @@ -29,6 +29,9 @@ const insertInviteEventSQL = "" + " room_id, event_id, target_user_id, event_json" + ") VALUES ($1, $2, $3, $4) RETURNING id" +const deleteInviteEventSQL = "" + + "DELETE FROM syncapi_invite_events WHERE event_id = $1" + const selectInviteEventsInRangeSQL = "" + "SELECT room_id, event_json FROM syncapi_invite_events" + " WHERE target_user_id = $1 AND id > $2 AND id <= $3" + @@ -37,6 +40,7 @@ const selectInviteEventsInRangeSQL = "" + type inviteEventsStatements struct { insertInviteEventStmt *sql.Stmt selectInviteEventsInRangeStmt *sql.Stmt + deleteInviteEventStmt *sql.Stmt } func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { @@ -50,6 +54,9 @@ func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { return } + if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil { + return + } return } @@ -66,6 +73,13 @@ func (s *inviteEventsStatements) insertInviteEvent( return } +func (s *inviteEventsStatements) deleteInviteEvent( + ctx context.Context, inviteEventID string, +) error { + _, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID) + return err +} + // selectInviteEventsInRange returns a map of room ID to invite event for the // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) selectInviteEventsInRange( diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 3ba9d3b48..7a55d3ee5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -35,11 +35,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- An incrementing ID which denotes the position in the log that this event resides at. -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. -- This isn't a problem for us since we just want to order by this field. -<<<<<<< HEAD id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -======= - id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_output_room_event_id_seq'), ->>>>>>> 960083b099c96b25b0c81ea602b372470e9cf889 -- The event ID for the event event_id TEXT NOT NULL, -- The 'room_id' key for the event. diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index f8dca4d94..4b7d7e4c8 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -364,6 +364,27 @@ func (d *SyncServerDatabase) UpsertAccountData( return types.StreamPosition(pos), err } +// AddInviteEvent stores a new invite event for a user. +// If the invite was successfully stored this returns the stream ID it was stored at. +// Returns an error if there was a problem communicating with the database. +func (d *SyncServerDatabase) AddInviteEvent( + ctx context.Context, inviteEvent gomatrixserverlib.Event, +) (types.StreamPosition, error) { + pos, err := d.invites.insertInviteEvent(ctx, inviteEvent) + return types.StreamPosition(pos), err +} + +// RetireInviteEvent removes an old invite event from the database. +// Returns an error if there was a problem communicating with the database. +func (d *SyncServerDatabase) RetireInviteEvent( + ctx context.Context, inviteEventID string, +) error { + // TODO: Record that invite has been retired in a stream so that we can + // notify the user in an incremental sync. + err := d.invites.deleteInviteEvent(ctx, inviteEventID) + return err +} + func (d *SyncServerDatabase) addInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, @@ -381,7 +402,7 @@ func (d *SyncServerDatabase) addInvitesToResponse( ir.InviteState.Events = gomatrixserverlib.ToClientEvents( []gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync, ) - // TODO: invite_state. The state won't be in the current state table in cases where you get invited over federation + // TODO: add the invite state from the invite event. res.Rooms.Invite[roomID] = *ir } return nil