Use HeaderedEvent in roomserver kafka output

This commit is contained in:
Neil Alexander 2020-03-16 17:40:31 +00:00
parent 1882719dc6
commit 25f3dbf7ea
7 changed files with 17 additions and 13 deletions

View file

@ -101,11 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
"type": ev.Type(), "type": ev.Type(),
}).Info("appservice received an event from roomserver") }).Info("appservice received an event from roomserver")
missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event)
if err != nil { if err != nil {
return err return err
} }
events := append(missingEvents, ev) events := append(missingEvents, ev.Event)
// Send event to any relevant application services // Send event to any relevant application services
return s.filterRoomserverEvents(context.TODO(), events) return s.filterRoomserverEvents(context.TODO(), events)

View file

@ -91,7 +91,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
"type": ev.Type(), "type": ev.Type(),
}).Info("received event from roomserver") }).Info("received event from roomserver")
events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event)
if err != nil { if err != nil {
return err return err
} }

View file

@ -109,7 +109,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event) addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event.Event)
if err != nil { if err != nil {
return err return err
} }
@ -155,7 +155,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// Send the event. // Send the event.
return s.queues.SendEvent( return s.queues.SendEvent(
&ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, &ore.Event.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
) )
} }
@ -178,7 +178,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
ore.AddsStateEventIDs, ore.RemovesStateEventIDs, ore.AddsStateEventIDs, ore.RemovesStateEventIDs,
ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs,
) )
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event) combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event.Event)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -54,7 +54,7 @@ type OutputEvent struct {
// prev_events. // prev_events.
type OutputNewRoomEvent struct { type OutputNewRoomEvent struct {
// The Event. // The Event.
Event gomatrixserverlib.Event `json:"event"` Event gomatrixserverlib.HeaderedEvent `json:"event"`
// The latest events in the room after this event. // The latest events in the room after this event.
// This can be used to set the prev events for new events in the room. // This can be used to set the prev events for new events in the room.
// This also can be used to get the full current state after this event. // This also can be used to get the full current state after this event.
@ -117,7 +117,7 @@ type OutputNewRoomEvent struct {
// tracked separately from the room events themselves. // tracked separately from the room events themselves.
type OutputNewInviteEvent struct { type OutputNewInviteEvent struct {
// The "m.room.member" invite event. // The "m.room.member" invite event.
Event gomatrixserverlib.Event `json:"event"` Event gomatrixserverlib.HeaderedEvent `json:"event"`
} }
// An OutputRetireInviteEvent is written whenever an existing invite is no longer // An OutputRetireInviteEvent is written whenever an existing invite is no longer

View file

@ -253,8 +253,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
latestEventIDs[i] = u.latest[i].EventID latestEventIDs[i] = u.latest[i].EventID
} }
// TODO: Room version here
roomVersion := gomatrixserverlib.RoomVersionV1
ore := api.OutputNewRoomEvent{ ore := api.OutputNewRoomEvent{
Event: u.event, Event: u.event.Headered(roomVersion),
LastSentEventID: u.lastEventIDSent, LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs, LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID, TransactionID: u.transactionID,

View file

@ -136,13 +136,14 @@ func updateToInviteMembership(
return nil, err return nil, err
} }
if needsSending { if needsSending {
roomVersion := gomatrixserverlib.RoomVersionV1
// We notify the consumers using a special event even though we will // We notify the consumers using a special event even though we will
// notify them about the change in current state as part of the normal // notify them about the change in current state as part of the normal
// room event stream. This ensures that the consumers only have to // room event stream. This ensures that the consumers only have to
// consider a single stream of events when determining whether a user // consider a single stream of events when determining whether a user
// is invited, rather than having to combine multiple streams themselves. // is invited, rather than having to combine multiple streams themselves.
onie := api.OutputNewInviteEvent{ onie := api.OutputNewInviteEvent{
Event: *add, Event: (*add).Headered(roomVersion),
} }
updates = append(updates, api.OutputEvent{ updates = append(updates, api.OutputEvent{
Type: api.OutputTypeNewInviteEvent, Type: api.OutputTypeNewInviteEvent,

View file

@ -98,7 +98,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputRoomEventConsumer) onNewRoomEvent( func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent, ctx context.Context, msg api.OutputNewRoomEvent,
) error { ) error {
ev := msg.Event ev := msg.Event.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"room_id": ev.RoomID(), "room_id": ev.RoomID(),
@ -153,7 +153,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
func (s *OutputRoomEventConsumer) onNewInviteEvent( func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent, ctx context.Context, msg api.OutputNewInviteEvent,
) error { ) error {
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) pduPos, err := s.db.AddInviteEvent(ctx, msg.Event.Event)
if err != nil { if err != nil {
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -163,7 +163,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure") }).Panicf("roomserver output log: write invite failure")
return nil return nil
} }
s.notifier.OnNewEvent(&msg.Event, "", nil, types.PaginationToken{PDUPosition: pduPos}) s.notifier.OnNewEvent(&msg.Event.Event, "", nil, types.PaginationToken{PDUPosition: pduPos})
return nil return nil
} }