From dab501854a3167c7d7afeb289c8af43346421625 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 24 Jul 2017 13:41:20 +0100 Subject: [PATCH] Write membership updates --- .../dendrite/roomserver/api/output.go | 18 +- .../dendrite/roomserver/input/events.go | 4 +- .../dendrite/roomserver/input/input.go | 29 ++-- .../roomserver/input/latest_events.go | 154 +++++++++++------- .../dendrite/roomserver/input/membership.go | 100 +++++++++--- 5 files changed, 201 insertions(+), 104 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index f84ed7766..953fe3c8f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -21,8 +21,14 @@ import ( // An OutputType is a type of roomserver output. type OutputType string -// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent -const OutputTypeNewRoomEvent OutputType = "new_room_event" +const ( + // OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent + OutputTypeNewRoomEvent OutputType = "new_room_event" + // OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent + OutputTypeNewInviteEvent OutputType = "new_invite_event" + // OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent + OutputTypeRetireInviteEvent OutputType = "retire_invite_event" +) // An OutputEvent is an entry in the roomserver output kafka log. // Consumers should check the type field when consuming this event. @@ -31,6 +37,10 @@ type OutputEvent struct { Type OutputType `json:"type"` // The content of event with type OutputTypeNewRoomEvent NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"` + // The content of event with type OutputTypeNewInviteEvent + NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` + // The content of event with type OutputTypeRetireInviteEvent + RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"` } // An OutputNewRoomEvent is written when the roomserver receives a new event. @@ -111,14 +121,12 @@ type OutputNewInviteEvent struct { // active. An invite stops being active if the user joins the room or if the // invite is rejected by the user. type OutputRetireInviteEvent struct { - // The room ID of the "m.room.member" invite event. - RoomID string // The ID of the "m.room.member" invite event. EventID string // Optional event ID of the event that replaced the invite. // This can be empty if the invite was rejected locally and we were unable // to reach the server that originally sent the invite. - ReplacedByEventID string + RetiredByEventID string // The "membership" of the user after retiring the invite. One of "join" // "leave" or "ban". Membership string diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index f8acff476..c1eee4c96 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -43,8 +43,8 @@ type RoomEventDatabase interface { // OutputRoomEventWriter has the APIs needed to write an event to the output logs. type OutputRoomEventWriter interface { - // Write an event. - WriteOutputRoomEvent(output api.OutputNewRoomEvent) error + // Write a list of events for a room + WriteOutputEvents(roomID string, updates []api.OutputEvent) error } func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error { diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go index ffbebd0c7..c8ac58d3a 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -46,22 +46,21 @@ type RoomserverInputAPI struct { processed int64 } -// WriteOutputRoomEvent implements OutputRoomEventWriter -func (r *RoomserverInputAPI) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error { - var m sarama.ProducerMessage - oe := api.OutputEvent{ - Type: api.OutputTypeNewRoomEvent, - NewRoomEvent: &output, +// WriteOutputEvents implements OutputRoomEventWriter +func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { + messages := make([]*sarama.ProducerMessage, len(updates)) + for i := range updates { + value, err := json.Marshal(updates[i]) + if err != nil { + return err + } + messages[i] = &sarama.ProducerMessage{ + Topic: r.OutputRoomEventTopic, + Key: sarama.StringEncoder(roomID), + Value: sarama.ByteEncoder(value), + } } - value, err := json.Marshal(oe) - if err != nil { - return err - } - m.Topic = r.OutputRoomEventTopic - m.Key = sarama.StringEncoder("") - m.Value = sarama.ByteEncoder(value) - _, _, err = r.Producer.SendMessage(&m) - return err + return r.Producer.SendMessages(messages) } // InputRoomEvents implements api.RoomserverInputAPI diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 6b5f39679..c1e90cf62 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -66,69 +66,76 @@ func updateLatestEvents( } }() - err = doUpdateLatestEvents(db, updater, ow, roomNID, stateAtEvent, event, sendAsServer) - return + u := latestEventsUpdater{ + db: db, updater: updater, ow: ow, roomNID: roomNID, + stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer, + } + return u.doUpdateLatestEvents() } -func doUpdateLatestEvents( - db RoomEventDatabase, - updater types.RoomRecentEventsUpdater, - ow OutputRoomEventWriter, - roomNID types.RoomNID, - stateAtEvent types.StateAtEvent, - event gomatrixserverlib.Event, - sendAsServer string, -) error { +type latestEventsUpdater struct { + db RoomEventDatabase + updater types.RoomRecentEventsUpdater + ow OutputRoomEventWriter + roomNID types.RoomNID + stateAtEvent types.StateAtEvent + event gomatrixserverlib.Event + sendAsServer string + lastEventIDSent string + latest []types.StateAtEventAndReference + removed []types.StateEntry + added []types.StateEntry + stateBeforeEventRemoves []types.StateEntry + stateBeforeEventAdds []types.StateEntry + oldStateNID types.StateSnapshotNID + newStateNID types.StateSnapshotNID +} + +func (u *latestEventsUpdater) doUpdateLatestEvents() error { var err error var prevEvents []gomatrixserverlib.EventReference - prevEvents = event.PrevEvents() - oldLatest := updater.LatestEvents() - lastEventIDSent := updater.LastEventIDSent() - oldStateNID := updater.CurrentStateSnapshotNID() + prevEvents = u.event.PrevEvents() + oldLatest := u.updater.LatestEvents() + u.lastEventIDSent = u.updater.LastEventIDSent() + u.oldStateNID = u.updater.CurrentStateSnapshotNID() - if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil { + if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil { return err } else if hasBeenSent { // Already sent this event so we can stop processing return nil } - if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil { + if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil { return err } - eventReference := event.EventReference() + eventReference := u.event.EventReference() // Check if this event is already referenced by another event in the room. var alreadyReferenced bool - if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil { + if alreadyReferenced, err = u.updater.IsReferenced(eventReference); err != nil { return err } - newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{ + u.latest = calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{ EventReference: eventReference, - StateAtEvent: stateAtEvent, + StateAtEvent: u.stateAtEvent, }) - latestStateAtEvents := make([]types.StateAtEvent, len(newLatest)) - for i := range newLatest { - latestStateAtEvents[i] = newLatest[i].StateAtEvent + if err = u.latestState(); err != nil { + return err } - newStateNID, err := state.CalculateAndStoreStateAfterEvents(db, roomNID, latestStateAtEvents) + + updates, err := updateMemberships(u.db, u.updater, u.removed, u.added) if err != nil { return err } - removed, added, err := state.DifferenceBetweeenStateSnapshots(db, oldStateNID, newStateNID) - if err != nil { - return err - } - - stateBeforeEventRemoves, stateBeforeEventAdds, err := state.DifferenceBetweeenStateSnapshots( - db, newStateNID, stateAtEvent.BeforeStateSnapshotNID, - ) + update, err := u.makeOutputNewRoomEvent() if err != nil { return err } + updates = append(updates, *update) // Send the event to the output logs. // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. @@ -138,24 +145,47 @@ func doUpdateLatestEvents( // send the event asynchronously but we would need to ensure that 1) the events are written to the log in // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = writeEvent( - db, ow, lastEventIDSent, event, newLatest, removed, added, - stateBeforeEventRemoves, stateBeforeEventAdds, sendAsServer, - ); err != nil { + if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil { return err } - if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID, newStateNID); err != nil { + if err = u.updater.SetLatestEvents(u.roomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil { return err } - if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil { + if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil { return err } return nil } +func (u *latestEventsUpdater) latestState() error { + var err error + + latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)) + for i := range u.latest { + latestStateAtEvents[i] = u.latest[i].StateAtEvent + } + u.newStateNID, err = state.CalculateAndStoreStateAfterEvents(u.db, u.roomNID, latestStateAtEvents) + if err != nil { + return err + } + + u.removed, u.added, err = state.DifferenceBetweeenStateSnapshots(u.db, u.oldStateNID, u.newStateNID) + if err != nil { + return err + } + + u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = state.DifferenceBetweeenStateSnapshots( + u.db, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID, + ) + if err != nil { + return err + } + return nil +} + func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference { var alreadyInLatest bool var newLatest []types.StateAtEventAndReference @@ -189,57 +219,55 @@ func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenc return newLatest } -func writeEvent( - db RoomEventDatabase, ow OutputRoomEventWriter, lastEventIDSent string, - event gomatrixserverlib.Event, latest []types.StateAtEventAndReference, - removed, added []types.StateEntry, - stateBeforeEventRemoves, stateBeforeEventAdds []types.StateEntry, - sendAsServer string, -) error { +func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { - latestEventIDs := make([]string, len(latest)) - for i := range latest { - latestEventIDs[i] = latest[i].EventID + latestEventIDs := make([]string, len(u.latest)) + for i := range u.latest { + latestEventIDs[i] = u.latest[i].EventID } ore := api.OutputNewRoomEvent{ - Event: event, - LastSentEventID: lastEventIDSent, + Event: u.event, + LastSentEventID: u.lastEventIDSent, LatestEventIDs: latestEventIDs, } var stateEventNIDs []types.EventNID - for _, entry := range added { + for _, entry := range u.added { stateEventNIDs = append(stateEventNIDs, entry.EventNID) } - for _, entry := range removed { + for _, entry := range u.removed { stateEventNIDs = append(stateEventNIDs, entry.EventNID) } - for _, entry := range stateBeforeEventRemoves { + for _, entry := range u.stateBeforeEventRemoves { stateEventNIDs = append(stateEventNIDs, entry.EventNID) } - for _, entry := range stateBeforeEventAdds { + for _, entry := range u.stateBeforeEventAdds { stateEventNIDs = append(stateEventNIDs, entry.EventNID) } stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] - eventIDMap, err := db.EventIDs(stateEventNIDs) + eventIDMap, err := u.db.EventIDs(stateEventNIDs) if err != nil { - return err + return nil, err } - for _, entry := range added { + for _, entry := range u.added { ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) } - for _, entry := range removed { + for _, entry := range u.removed { ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID]) } - for _, entry := range stateBeforeEventRemoves { + for _, entry := range u.stateBeforeEventRemoves { ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID]) } - for _, entry := range stateBeforeEventAdds { + for _, entry := range u.stateBeforeEventAdds { ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID]) } - ore.SendAsServer = sendAsServer - return ow.WriteOutputRoomEvent(ore) + ore.SendAsServer = u.sendAsServer + + return &api.OutputEvent{ + Type: api.OutputTypeNewRoomEvent, + NewRoomEvent: &ore, + }, nil } type eventNIDSorter []types.EventNID diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go index 575230d9c..35c39e9d2 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go @@ -15,13 +15,16 @@ package input import ( + "fmt" + + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) func updateMemberships( db RoomEventDatabase, updater types.RoomRecentEventsUpdater, removed, added []types.StateEntry, -) error { +) ([]api.OutputEvent, error) { changes := membershipChanges(removed, added) var eventNIDs []types.EventNID for _, change := range changes { @@ -34,9 +37,11 @@ func updateMemberships( } events, err := db.Events(eventNIDs) if err != nil { - return err + return nil, err } + var updates []api.OutputEvent + for _, change := range changes { var ae *gomatrixserverlib.Event var re *gomatrixserverlib.Event @@ -55,16 +60,18 @@ func updateMemberships( } targetNID = change.added.EventStateKeyNID } - if err := updateMembership(updater, targetNID, re, ae); err != nil { - return err + if updates, err = updateMembership(updater, targetNID, re, ae, updates); err != nil { + return nil, err } } - return nil + return nil, nil } func updateMembership( - updater types.RoomRecentEventsUpdater, targetNID types.EventStateKeyNID, remove *gomatrixserverlib.Event, add *gomatrixserverlib.Event, -) error { + updater types.RoomRecentEventsUpdater, targetNID types.EventStateKeyNID, + remove *gomatrixserverlib.Event, add *gomatrixserverlib.Event, + updates []api.OutputEvent, +) ([]api.OutputEvent, error) { var err error old := "leave" new := "leave" @@ -72,40 +79,95 @@ func updateMembership( if remove != nil { old, err = remove.Membership() if err != nil { - return err + return nil, err } } if add != nil { new, err = add.Membership() if err != nil { - return err + return nil, err } } if old == new { - return nil + return updates, nil } mu, err := updater.MembershipUpdater(targetNID) if err != nil { - return err + return nil, err } - switch new { + return updateWithNewMembership(mu, new, add, updates) +} + +func updateWithNewMembership( + mu types.MembershipUpdater, newMembership string, add *gomatrixserverlib.Event, + updates []api.OutputEvent, +) ([]api.OutputEvent, error) { + switch newMembership { case "invite": - _, err := mu.SetToInvite(*add) + needsSending, err := mu.SetToInvite(*add) if err != nil { - return err + return nil, err } + if needsSending { + updates = appendInviteUpdate(updates, add) + } + return updates, nil case "join": - if !mu.IsJoin() { - mu.SetToJoin(add.Sender()) + if mu.IsJoin() { + return updates, nil } - case "leave": - if !mu.IsLeave() { + retired, err := mu.SetToJoin(add.Sender()) + if err != nil { + return nil, err + } + return appendRetireUpdates(updates, add, newMembership, retired), nil + case "leave", "ban": + if mu.IsLeave() { + return updates, nil + } + retired, err := mu.SetToLeave(add.Sender()) + if err != nil { mu.SetToLeave(add.Sender()) } + return appendRetireUpdates(updates, add, newMembership, retired), nil + default: + panic(fmt.Errorf( + "input: membership %q is not one of the allowed values", newMembership, + )) } - return nil +} + +func appendInviteUpdate( + updates []api.OutputEvent, add *gomatrixserverlib.Event, +) []api.OutputEvent { + onie := api.OutputNewInviteEvent{ + Event: *add, + } + return append(updates, api.OutputEvent{ + Type: api.OutputTypeNewInviteEvent, + NewInviteEvent: &onie, + }) +} + +func appendRetireUpdates( + updates []api.OutputEvent, add *gomatrixserverlib.Event, membership string, retired []string, +) []api.OutputEvent { + for _, eventID := range retired { + orie := api.OutputRetireInviteEvent{ + EventID: eventID, + Membership: membership, + } + if add != nil { + orie.RetiredByEventID = add.EventID() + } + updates = append(updates, api.OutputEvent{ + Type: api.OutputTypeRetireInviteEvent, + RetireInviteEvent: &orie, + }) + } + return updates } type stateChange struct {