Update the invites table from the roomserver stream

This commit is contained in:
Mark Haines 2017-09-20 12:16:32 +01:00
parent ddd9841cb2
commit c370ae1217
6 changed files with 103 additions and 25 deletions

View file

@ -123,6 +123,8 @@ type OutputNewInviteEvent struct {
type OutputRetireInviteEvent struct { type OutputRetireInviteEvent struct {
// The ID of the "m.room.member" invite event. // The ID of the "m.room.member" invite event.
EventID string EventID string
// The target user ID of the "m.room.member" invite event that was retired.
TargetUserID string
// Optional event ID of the event that replaced the invite. // Optional event ID of the event that replaced the invite.
// This can be empty if the invite was rejected locally and we were unable // This can be empty if the invite was rejected locally and we were unable
// to reach the server that originally sent the invite. // to reach the server that originally sent the invite.

View file

@ -175,11 +175,10 @@ func updateToJoinMembership(
} }
for _, eventID := range retired { for _, eventID := range retired {
orie := api.OutputRetireInviteEvent{ orie := api.OutputRetireInviteEvent{
EventID: eventID, EventID: eventID,
Membership: "join", Membership: "join",
} RetiredByEventID: add.EventID(),
if add != nil { TargetUserID: *add.StateKey(),
orie.RetiredByEventID = add.EventID()
} }
updates = append(updates, api.OutputEvent{ updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent, Type: api.OutputTypeRetireInviteEvent,
@ -208,11 +207,10 @@ func updateToLeaveMembership(
} }
for _, eventID := range retired { for _, eventID := range retired {
orie := api.OutputRetireInviteEvent{ orie := api.OutputRetireInviteEvent{
EventID: eventID, EventID: eventID,
Membership: newMembership, Membership: newMembership,
} RetiredByEventID: add.EventID(),
if add != nil { TargetUserID: *add.StateKey(),
orie.RetiredByEventID = add.EventID()
} }
updates = append(updates, api.OutputEvent{ updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent, Type: api.OutputTypeRetireInviteEvent,

View file

@ -86,26 +86,37 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
if output.Type != api.OutputTypeNewRoomEvent { switch output.Type {
case api.OutputTypeNewRoomEvent:
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
default:
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",
) )
return nil return nil
} }
}
ev := output.NewRoomEvent.Event func (s *OutputRoomEvent) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent,
) error {
ev := msg.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"room_id": ev.RoomID(), "room_id": ev.RoomID(),
}).Info("received event from roomserver") }).Info("received event from roomserver")
addsStateEvents, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event": string(ev.JSON()), "event": string(ev.JSON()),
log.ErrorKey: err, log.ErrorKey: err,
"add": output.NewRoomEvent.AddsStateEventIDs, "add": msg.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs, "del": msg.RemovesStateEventIDs,
}).Panicf("roomserver output log: state event lookup failure") }).Panicf("roomserver output log: state event lookup failure")
} }
@ -122,20 +133,23 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
} }
syncStreamPos, err := s.db.WriteEvent( syncStreamPos, err := s.db.WriteEvent(
context.TODO(), ctx,
&ev, &ev,
addsStateEvents, addsStateEvents,
output.NewRoomEvent.AddsStateEventIDs, msg.AddsStateEventIDs,
output.NewRoomEvent.RemovesStateEventIDs, msg.RemovesStateEventIDs,
) )
if err != nil {
return err
}
if err != nil { if err != nil {
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event": string(ev.JSON()), "event": string(ev.JSON()),
log.ErrorKey: err, log.ErrorKey: err,
"add": output.NewRoomEvent.AddsStateEventIDs, "add": msg.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs, "del": msg.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure") }).Panicf("roomserver output log: write event failure")
return nil return nil
} }
@ -144,6 +158,39 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
func (s *OutputRoomEvent) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(msg.Event.JSON()),
log.ErrorKey: err,
}).Panicf("roomserver output log: write invite failure")
return nil
}
s.notifier.OnNewEvent(&msg.Event, "", syncStreamPos)
return nil
}
func (s *OutputRoomEvent) onRetireInviteEvent(
ctx context.Context, msg api.OutputRetireInviteEvent,
) error {
err := s.db.RetireInviteEvent(ctx, msg.EventID)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": msg.EventID,
log.ErrorKey: err,
}).Panicf("roomserver output log: remove invite failure")
return nil
}
// TODO: Notify any active sync requests that the invite has been retired.
// s.notifier.OnNewEvent(nil, msg.TargetUserID, syncStreamPos)
return nil
}
// lookupStateEvents looks up the state events that are added by a new event. // lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEvent) lookupStateEvents( func (s *OutputRoomEvent) lookupStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event, addsStateEventIDs []string, event gomatrixserverlib.Event,

View file

@ -29,6 +29,9 @@ const insertInviteEventSQL = "" +
" room_id, event_id, target_user_id, event_json" + " room_id, event_id, target_user_id, event_json" +
") VALUES ($1, $2, $3, $4) RETURNING id" ") VALUES ($1, $2, $3, $4) RETURNING id"
const deleteInviteEventSQL = "" +
"DELETE FROM syncapi_invite_events WHERE event_id = $1"
const selectInviteEventsInRangeSQL = "" + const selectInviteEventsInRangeSQL = "" +
"SELECT room_id, event_json FROM syncapi_invite_events" + "SELECT room_id, event_json FROM syncapi_invite_events" +
" WHERE target_user_id = $1 AND id > $2 AND id <= $3" + " WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
@ -37,6 +40,7 @@ const selectInviteEventsInRangeSQL = "" +
type inviteEventsStatements struct { type inviteEventsStatements struct {
insertInviteEventStmt *sql.Stmt insertInviteEventStmt *sql.Stmt
selectInviteEventsInRangeStmt *sql.Stmt selectInviteEventsInRangeStmt *sql.Stmt
deleteInviteEventStmt *sql.Stmt
} }
func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) {
@ -50,6 +54,9 @@ func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil {
return return
} }
if s.deleteInviteEventStmt, err = db.Prepare(deleteInviteEventSQL); err != nil {
return
}
return return
} }
@ -66,6 +73,13 @@ func (s *inviteEventsStatements) insertInviteEvent(
return return
} }
func (s *inviteEventsStatements) deleteInviteEvent(
ctx context.Context, inviteEventID string,
) error {
_, err := s.deleteInviteEventStmt.ExecContext(ctx, inviteEventID)
return err
}
// selectInviteEventsInRange returns a map of room ID to invite event for the // selectInviteEventsInRange returns a map of room ID to invite event for the
// active invites for the target user ID in the supplied range. // active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) selectInviteEventsInRange( func (s *inviteEventsStatements) selectInviteEventsInRange(

View file

@ -35,11 +35,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- An incrementing ID which denotes the position in the log that this event resides at. -- An incrementing ID which denotes the position in the log that this event resides at.
-- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
-- This isn't a problem for us since we just want to order by this field. -- This isn't a problem for us since we just want to order by this field.
<<<<<<< HEAD
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
=======
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_output_room_event_id_seq'),
>>>>>>> 960083b099c96b25b0c81ea602b372470e9cf889
-- The event ID for the event -- The event ID for the event
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
-- The 'room_id' key for the event. -- The 'room_id' key for the event.

View file

@ -364,6 +364,27 @@ func (d *SyncServerDatabase) UpsertAccountData(
return types.StreamPosition(pos), err return types.StreamPosition(pos), err
} }
// AddInviteEvent stores a new invite event for a user.
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatabase) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event,
) (types.StreamPosition, error) {
pos, err := d.invites.insertInviteEvent(ctx, inviteEvent)
return types.StreamPosition(pos), err
}
// RetireInviteEvent removes an old invite event from the database.
// Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatabase) RetireInviteEvent(
ctx context.Context, inviteEventID string,
) error {
// TODO: Record that invite has been retired in a stream so that we can
// notify the user in an incremental sync.
err := d.invites.deleteInviteEvent(ctx, inviteEventID)
return err
}
func (d *SyncServerDatabase) addInvitesToResponse( func (d *SyncServerDatabase) addInvitesToResponse(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
userID string, userID string,
@ -381,7 +402,7 @@ func (d *SyncServerDatabase) addInvitesToResponse(
ir.InviteState.Events = gomatrixserverlib.ToClientEvents( ir.InviteState.Events = gomatrixserverlib.ToClientEvents(
[]gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync, []gomatrixserverlib.Event{inviteEvent}, gomatrixserverlib.FormatSync,
) )
// TODO: invite_state. The state won't be in the current state table in cases where you get invited over federation // TODO: add the invite state from the invite event.
res.Rooms.Invite[roomID] = *ir res.Rooms.Invite[roomID] = *ir
} }
return nil return nil