Send room version of kafka for output events from roomserver

This commit is contained in:
Neil Alexander 2020-03-10 16:09:06 +00:00
parent b7744e3c0c
commit 70fdf9f834
7 changed files with 67 additions and 15 deletions

View file

@ -94,7 +94,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
ev := output.NewRoomEvent.Event
// TODO: Is this trusted here?
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.NewRoomEvent.Event, false, output.NewRoomEvent.RoomVersion)
if err != nil {
return err
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),

View file

@ -84,7 +84,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
ev := output.NewRoomEvent.Event
// TODO: Is this trusted here?
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.NewRoomEvent.Event, false, output.NewRoomEvent.RoomVersion)
if err != nil {
return err
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),

View file

@ -85,7 +85,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
)
return nil
}
ev := &output.NewRoomEvent.Event
// TODO: Is this trusted here?
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.NewRoomEvent.Event, false, output.NewRoomEvent.RoomVersion)
if err != nil {
return err
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
@ -109,7 +115,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event)
// TODO: Is this trusted here?
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(ore.Event, false, ore.RoomVersion)
if err != nil {
return err
}
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ev)
if err != nil {
return err
}
@ -124,9 +136,9 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// talking to the roomserver
oldJoinedHosts, err := s.db.UpdateRoom(
context.TODO(),
ore.Event.RoomID(),
ev.RoomID(),
ore.LastSentEventID,
ore.Event.EventID(),
ev.EventID(),
addsJoinedHosts,
ore.RemovesStateEventIDs,
)
@ -155,7 +167,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// Send the event.
return s.queues.SendEvent(
&ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
&ev, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
)
}
@ -171,6 +183,12 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
) ([]gomatrixserverlib.ServerName, error) {
// TODO: Is this trusted here?
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(ore.Event, false, ore.RoomVersion)
if err != nil {
return nil, err
}
// Combine the delta into a single delta so that the adds and removes can
// cancel each other out. This should reduce the number of times we need
// to fetch a state event from the room server.
@ -178,7 +196,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
ore.AddsStateEventIDs, ore.RemovesStateEventIDs,
ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs,
)
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event)
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ev)
if err != nil {
return nil, err
}

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
@ -77,7 +78,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
ev := output.NewRoomEvent.Event
// TODO: Is this trusted here?
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.NewRoomEvent.Event, false, output.NewRoomEvent.RoomVersion)
if err != nil {
return err
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),

View file

@ -53,8 +53,10 @@ type OutputEvent struct {
// event IDs in the room. The state before an event is the state after its
// prev_events.
type OutputNewRoomEvent struct {
// The Event.
Event gomatrixserverlib.Event `json:"event"`
// The event.
Event []byte `json:"event"`
// The event version.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// The latest events in the room after this event.
// This can be used to set the prev events for new events in the room.
// This also can be used to get the full current state after this event.

View file

@ -67,10 +67,15 @@ func updateLatestEvents(
}
}()
roomVersion, err := db.GetRoomVersionForRoom(ctx, roomNID)
if err != nil {
return err
}
u := latestEventsUpdater{
ctx: ctx, db: db, updater: updater, ow: ow, roomNID: roomNID,
stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
transactionID: transactionID,
roomVersion: roomVersion, stateAtEvent: stateAtEvent, event: event,
sendAsServer: sendAsServer, transactionID: transactionID,
}
if err = u.doUpdateLatestEvents(); err != nil {
return err
@ -90,6 +95,7 @@ type latestEventsUpdater struct {
updater types.RoomRecentEventsUpdater
ow OutputRoomEventWriter
roomNID types.RoomNID
roomVersion gomatrixserverlib.RoomVersion
stateAtEvent types.StateAtEvent
event gomatrixserverlib.Event
transactionID *api.TransactionID
@ -256,8 +262,13 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
latestEventIDs[i] = u.latest[i].EventID
}
eventJSON, err := u.event.MarshalJSON()
if err != nil {
return nil, err
}
ore := api.OutputNewRoomEvent{
Event: u.event,
Event: eventJSON,
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,

View file

@ -98,7 +98,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent,
) error {
ev := msg.Event
// TODO: Is this trusted here?
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(msg.Event, false, msg.RoomVersion)
if err != nil {
return err
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),