Write membership updates

This commit is contained in:
Mark Haines 2017-07-24 13:41:20 +01:00
parent 21e47b8fd2
commit dab501854a
5 changed files with 201 additions and 104 deletions

View file

@ -21,8 +21,14 @@ import (
// An OutputType is a type of roomserver output.
type OutputType string
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
const OutputTypeNewRoomEvent OutputType = "new_room_event"
const (
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
OutputTypeNewRoomEvent OutputType = "new_room_event"
// OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent
OutputTypeNewInviteEvent OutputType = "new_invite_event"
// OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent
OutputTypeRetireInviteEvent OutputType = "retire_invite_event"
)
// An OutputEvent is an entry in the roomserver output kafka log.
// Consumers should check the type field when consuming this event.
@ -31,6 +37,10 @@ type OutputEvent struct {
Type OutputType `json:"type"`
// The content of event with type OutputTypeNewRoomEvent
NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"`
// The content of event with type OutputTypeNewInviteEvent
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
// The content of event with type OutputTypeRetireInviteEvent
RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"`
}
// An OutputNewRoomEvent is written when the roomserver receives a new event.
@ -111,14 +121,12 @@ type OutputNewInviteEvent struct {
// active. An invite stops being active if the user joins the room or if the
// invite is rejected by the user.
type OutputRetireInviteEvent struct {
// The room ID of the "m.room.member" invite event.
RoomID string
// The ID of the "m.room.member" invite event.
EventID string
// Optional event ID of the event that replaced the invite.
// This can be empty if the invite was rejected locally and we were unable
// to reach the server that originally sent the invite.
ReplacedByEventID string
RetiredByEventID string
// The "membership" of the user after retiring the invite. One of "join"
// "leave" or "ban".
Membership string

View file

@ -43,8 +43,8 @@ type RoomEventDatabase interface {
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
type OutputRoomEventWriter interface {
// Write an event.
WriteOutputRoomEvent(output api.OutputNewRoomEvent) error
// Write a list of events for a room
WriteOutputEvents(roomID string, updates []api.OutputEvent) error
}
func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error {

View file

@ -46,22 +46,21 @@ type RoomserverInputAPI struct {
processed int64
}
// WriteOutputRoomEvent implements OutputRoomEventWriter
func (r *RoomserverInputAPI) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error {
var m sarama.ProducerMessage
oe := api.OutputEvent{
Type: api.OutputTypeNewRoomEvent,
NewRoomEvent: &output,
}
value, err := json.Marshal(oe)
// WriteOutputEvents implements OutputRoomEventWriter
func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
messages := make([]*sarama.ProducerMessage, len(updates))
for i := range updates {
value, err := json.Marshal(updates[i])
if err != nil {
return err
}
m.Topic = r.OutputRoomEventTopic
m.Key = sarama.StringEncoder("")
m.Value = sarama.ByteEncoder(value)
_, _, err = r.Producer.SendMessage(&m)
return err
messages[i] = &sarama.ProducerMessage{
Topic: r.OutputRoomEventTopic,
Key: sarama.StringEncoder(roomID),
Value: sarama.ByteEncoder(value),
}
}
return r.Producer.SendMessages(messages)
}
// InputRoomEvents implements api.RoomserverInputAPI

View file

@ -66,69 +66,76 @@ func updateLatestEvents(
}
}()
err = doUpdateLatestEvents(db, updater, ow, roomNID, stateAtEvent, event, sendAsServer)
return
u := latestEventsUpdater{
db: db, updater: updater, ow: ow, roomNID: roomNID,
stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
}
return u.doUpdateLatestEvents()
}
func doUpdateLatestEvents(
db RoomEventDatabase,
updater types.RoomRecentEventsUpdater,
ow OutputRoomEventWriter,
roomNID types.RoomNID,
stateAtEvent types.StateAtEvent,
event gomatrixserverlib.Event,
sendAsServer string,
) error {
type latestEventsUpdater struct {
db RoomEventDatabase
updater types.RoomRecentEventsUpdater
ow OutputRoomEventWriter
roomNID types.RoomNID
stateAtEvent types.StateAtEvent
event gomatrixserverlib.Event
sendAsServer string
lastEventIDSent string
latest []types.StateAtEventAndReference
removed []types.StateEntry
added []types.StateEntry
stateBeforeEventRemoves []types.StateEntry
stateBeforeEventAdds []types.StateEntry
oldStateNID types.StateSnapshotNID
newStateNID types.StateSnapshotNID
}
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
var err error
var prevEvents []gomatrixserverlib.EventReference
prevEvents = event.PrevEvents()
oldLatest := updater.LatestEvents()
lastEventIDSent := updater.LastEventIDSent()
oldStateNID := updater.CurrentStateSnapshotNID()
prevEvents = u.event.PrevEvents()
oldLatest := u.updater.LatestEvents()
u.lastEventIDSent = u.updater.LastEventIDSent()
u.oldStateNID = u.updater.CurrentStateSnapshotNID()
if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil {
if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil {
return err
} else if hasBeenSent {
// Already sent this event so we can stop processing
return nil
}
if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil {
if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil {
return err
}
eventReference := event.EventReference()
eventReference := u.event.EventReference()
// Check if this event is already referenced by another event in the room.
var alreadyReferenced bool
if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
if alreadyReferenced, err = u.updater.IsReferenced(eventReference); err != nil {
return err
}
newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
u.latest = calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
EventReference: eventReference,
StateAtEvent: stateAtEvent,
StateAtEvent: u.stateAtEvent,
})
latestStateAtEvents := make([]types.StateAtEvent, len(newLatest))
for i := range newLatest {
latestStateAtEvents[i] = newLatest[i].StateAtEvent
if err = u.latestState(); err != nil {
return err
}
newStateNID, err := state.CalculateAndStoreStateAfterEvents(db, roomNID, latestStateAtEvents)
updates, err := updateMemberships(u.db, u.updater, u.removed, u.added)
if err != nil {
return err
}
removed, added, err := state.DifferenceBetweeenStateSnapshots(db, oldStateNID, newStateNID)
if err != nil {
return err
}
stateBeforeEventRemoves, stateBeforeEventAdds, err := state.DifferenceBetweeenStateSnapshots(
db, newStateNID, stateAtEvent.BeforeStateSnapshotNID,
)
update, err := u.makeOutputNewRoomEvent()
if err != nil {
return err
}
updates = append(updates, *update)
// Send the event to the output logs.
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
@ -138,24 +145,47 @@ func doUpdateLatestEvents(
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now.
if err = writeEvent(
db, ow, lastEventIDSent, event, newLatest, removed, added,
stateBeforeEventRemoves, stateBeforeEventAdds, sendAsServer,
); err != nil {
if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
return err
}
if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID, newStateNID); err != nil {
if err = u.updater.SetLatestEvents(u.roomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
return err
}
if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil {
if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
return err
}
return nil
}
func (u *latestEventsUpdater) latestState() error {
var err error
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
for i := range u.latest {
latestStateAtEvents[i] = u.latest[i].StateAtEvent
}
u.newStateNID, err = state.CalculateAndStoreStateAfterEvents(u.db, u.roomNID, latestStateAtEvents)
if err != nil {
return err
}
u.removed, u.added, err = state.DifferenceBetweeenStateSnapshots(u.db, u.oldStateNID, u.newStateNID)
if err != nil {
return err
}
u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = state.DifferenceBetweeenStateSnapshots(
u.db, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
)
if err != nil {
return err
}
return nil
}
func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference {
var alreadyInLatest bool
var newLatest []types.StateAtEventAndReference
@ -189,57 +219,55 @@ func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenc
return newLatest
}
func writeEvent(
db RoomEventDatabase, ow OutputRoomEventWriter, lastEventIDSent string,
event gomatrixserverlib.Event, latest []types.StateAtEventAndReference,
removed, added []types.StateEntry,
stateBeforeEventRemoves, stateBeforeEventAdds []types.StateEntry,
sendAsServer string,
) error {
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
latestEventIDs := make([]string, len(latest))
for i := range latest {
latestEventIDs[i] = latest[i].EventID
latestEventIDs := make([]string, len(u.latest))
for i := range u.latest {
latestEventIDs[i] = u.latest[i].EventID
}
ore := api.OutputNewRoomEvent{
Event: event,
LastSentEventID: lastEventIDSent,
Event: u.event,
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
}
var stateEventNIDs []types.EventNID
for _, entry := range added {
for _, entry := range u.added {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
for _, entry := range removed {
for _, entry := range u.removed {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
for _, entry := range stateBeforeEventRemoves {
for _, entry := range u.stateBeforeEventRemoves {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
for _, entry := range stateBeforeEventAdds {
for _, entry := range u.stateBeforeEventAdds {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
eventIDMap, err := db.EventIDs(stateEventNIDs)
eventIDMap, err := u.db.EventIDs(stateEventNIDs)
if err != nil {
return err
return nil, err
}
for _, entry := range added {
for _, entry := range u.added {
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
}
for _, entry := range removed {
for _, entry := range u.removed {
ore.RemovesStateEventIDs = append(ore.RemovesStateEventIDs, eventIDMap[entry.EventNID])
}
for _, entry := range stateBeforeEventRemoves {
for _, entry := range u.stateBeforeEventRemoves {
ore.StateBeforeRemovesEventIDs = append(ore.StateBeforeRemovesEventIDs, eventIDMap[entry.EventNID])
}
for _, entry := range stateBeforeEventAdds {
for _, entry := range u.stateBeforeEventAdds {
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
}
ore.SendAsServer = sendAsServer
return ow.WriteOutputRoomEvent(ore)
ore.SendAsServer = u.sendAsServer
return &api.OutputEvent{
Type: api.OutputTypeNewRoomEvent,
NewRoomEvent: &ore,
}, nil
}
type eventNIDSorter []types.EventNID

View file

@ -15,13 +15,16 @@
package input
import (
"fmt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
func updateMemberships(
db RoomEventDatabase, updater types.RoomRecentEventsUpdater, removed, added []types.StateEntry,
) error {
) ([]api.OutputEvent, error) {
changes := membershipChanges(removed, added)
var eventNIDs []types.EventNID
for _, change := range changes {
@ -34,9 +37,11 @@ func updateMemberships(
}
events, err := db.Events(eventNIDs)
if err != nil {
return err
return nil, err
}
var updates []api.OutputEvent
for _, change := range changes {
var ae *gomatrixserverlib.Event
var re *gomatrixserverlib.Event
@ -55,16 +60,18 @@ func updateMemberships(
}
targetNID = change.added.EventStateKeyNID
}
if err := updateMembership(updater, targetNID, re, ae); err != nil {
return err
if updates, err = updateMembership(updater, targetNID, re, ae, updates); err != nil {
return nil, err
}
}
return nil
return nil, nil
}
func updateMembership(
updater types.RoomRecentEventsUpdater, targetNID types.EventStateKeyNID, remove *gomatrixserverlib.Event, add *gomatrixserverlib.Event,
) error {
updater types.RoomRecentEventsUpdater, targetNID types.EventStateKeyNID,
remove *gomatrixserverlib.Event, add *gomatrixserverlib.Event,
updates []api.OutputEvent,
) ([]api.OutputEvent, error) {
var err error
old := "leave"
new := "leave"
@ -72,40 +79,95 @@ func updateMembership(
if remove != nil {
old, err = remove.Membership()
if err != nil {
return err
return nil, err
}
}
if add != nil {
new, err = add.Membership()
if err != nil {
return err
return nil, err
}
}
if old == new {
return nil
return updates, nil
}
mu, err := updater.MembershipUpdater(targetNID)
if err != nil {
return err
return nil, err
}
switch new {
return updateWithNewMembership(mu, new, add, updates)
}
func updateWithNewMembership(
mu types.MembershipUpdater, newMembership string, add *gomatrixserverlib.Event,
updates []api.OutputEvent,
) ([]api.OutputEvent, error) {
switch newMembership {
case "invite":
_, err := mu.SetToInvite(*add)
needsSending, err := mu.SetToInvite(*add)
if err != nil {
return err
return nil, err
}
if needsSending {
updates = appendInviteUpdate(updates, add)
}
return updates, nil
case "join":
if !mu.IsJoin() {
mu.SetToJoin(add.Sender())
if mu.IsJoin() {
return updates, nil
}
case "leave":
if !mu.IsLeave() {
retired, err := mu.SetToJoin(add.Sender())
if err != nil {
return nil, err
}
return appendRetireUpdates(updates, add, newMembership, retired), nil
case "leave", "ban":
if mu.IsLeave() {
return updates, nil
}
retired, err := mu.SetToLeave(add.Sender())
if err != nil {
mu.SetToLeave(add.Sender())
}
return appendRetireUpdates(updates, add, newMembership, retired), nil
default:
panic(fmt.Errorf(
"input: membership %q is not one of the allowed values", newMembership,
))
}
return nil
}
func appendInviteUpdate(
updates []api.OutputEvent, add *gomatrixserverlib.Event,
) []api.OutputEvent {
onie := api.OutputNewInviteEvent{
Event: *add,
}
return append(updates, api.OutputEvent{
Type: api.OutputTypeNewInviteEvent,
NewInviteEvent: &onie,
})
}
func appendRetireUpdates(
updates []api.OutputEvent, add *gomatrixserverlib.Event, membership string, retired []string,
) []api.OutputEvent {
for _, eventID := range retired {
orie := api.OutputRetireInviteEvent{
EventID: eventID,
Membership: membership,
}
if add != nil {
orie.RetiredByEventID = add.EventID()
}
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &orie,
})
}
return updates
}
type stateChange struct {