From 61dffdacf0bf8c770a3caf64e47e031667f49d7c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 11 Mar 2020 17:34:06 +0000 Subject: [PATCH] Mangle everything so that room versions are now sent in the kafka events --- appservice/consumers/roomserver.go | 51 +++++++++++---------- federationsender/consumers/roomserver.go | 51 +++++++++++---------- publicroomsapi/consumers/roomserver.go | 53 +++++++++++----------- roomserver/input/events.go | 12 +++-- roomserver/input/input.go | 11 ++++- roomserver/input/latest_events.go | 10 ++-- roomserver/query/query.go | 23 ++++------ roomserver/storage/interface.go | 3 +- roomserver/storage/postgres/rooms_table.go | 14 ++++++ roomserver/storage/postgres/storage.go | 12 ++++- roomserver/storage/sqlite3/rooms_table.go | 14 ++++++ roomserver/storage/sqlite3/storage.go | 12 ++++- syncapi/consumers/roomserver.go | 53 +++++++++++++++++----- 13 files changed, 205 insertions(+), 114 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 395fa994c..cebb917bb 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -15,8 +15,10 @@ package consumers import ( + "bytes" "context" "encoding/json" + "errors" "github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/types" @@ -81,6 +83,30 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent + + // See if the room version is present in the headers. If it isn't + // then we can't process the event as we don't know what the format + // will be + var roomVersion gomatrixserverlib.RoomVersion + for _, header := range msg.Headers { + if bytes.Equal(header.Key, []byte("room_version")) { + roomVersion = gomatrixserverlib.RoomVersion(header.Value) + break + } + } + if roomVersion == "" { + return errors.New("room version was not in sarama headers") + } + + // Prepare the room event so that it has the correct field types + // for the room version + if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { + log.WithFields(log.Fields{ + "room_version": roomVersion, + }).WithError(err).Errorf("can't prepare event to version") + return err + } + if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") @@ -94,31 +120,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - // Get the room version of the room - vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)} - vQueryRes := api.QueryRoomVersionForRoomIDResponse{} - if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil { - log.WithFields(log.Fields{ - "room_id": string(msg.Key), - }).WithError(err).Errorf("can't query room version") - return err - } - - // Prepare the room event so that it has the correct field types - // for the room version - if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": vQueryRes.RoomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } - ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index d47254efd..e68f0beee 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -15,8 +15,10 @@ package consumers import ( + "bytes" "context" "encoding/json" + "errors" "fmt" "github.com/matrix-org/dendrite/common" @@ -74,6 +76,30 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent + + // See if the room version is present in the headers. If it isn't + // then we can't process the event as we don't know what the format + // will be + var roomVersion gomatrixserverlib.RoomVersion + for _, header := range msg.Headers { + if bytes.Equal(header.Key, []byte("room_version")) { + roomVersion = gomatrixserverlib.RoomVersion(header.Value) + break + } + } + if roomVersion == "" { + return errors.New("room version was not in sarama headers") + } + + // Prepare the room event so that it has the correct field types + // for the room version + if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { + log.WithFields(log.Fields{ + "room_version": roomVersion, + }).WithError(err).Errorf("can't prepare event to version") + return err + } + if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") @@ -87,31 +113,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - // Get the room version of the room - vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)} - vQueryRes := api.QueryRoomVersionForRoomIDResponse{} - if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil { - log.WithFields(log.Fields{ - "room_id": string(msg.Key), - }).WithError(err).Errorf("can't query room version") - return err - } - - // Prepare the room event so that it has the correct field types - // for the room version - if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": vQueryRes.RoomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } - ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index c56d8215b..099c1337e 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -15,13 +15,16 @@ package consumers import ( + "bytes" "context" "encoding/json" + "errors" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -63,6 +66,30 @@ func (s *OutputRoomEventConsumer) Start() error { // onMessage is called when the sync server receives a new event from the room server output log. func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { var output api.OutputEvent + + // See if the room version is present in the headers. If it isn't + // then we can't process the event as we don't know what the format + // will be + var roomVersion gomatrixserverlib.RoomVersion + for _, header := range msg.Headers { + if bytes.Equal(header.Key, []byte("room_version")) { + roomVersion = gomatrixserverlib.RoomVersion(header.Value) + break + } + } + if roomVersion == "" { + return errors.New("room version was not in sarama headers") + } + + // Prepare the room event so that it has the correct field types + // for the room version + if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { + log.WithFields(log.Fields{ + "room_version": roomVersion, + }).WithError(err).Errorf("can't prepare event to version") + return err + } + if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") @@ -77,32 +104,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - // Get the room version of the room - vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)} - vQueryRes := api.QueryRoomVersionForRoomIDResponse{} - if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil { - log.WithFields(log.Fields{ - "room_id": string(msg.Key), - }).WithError(err).Errorf("can't query room version") - return err - } - - // Prepare the room event so that it has the correct field types - // for the room version - if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": vQueryRes.RoomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - - // Parse out the event JSON - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } - ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), diff --git a/roomserver/input/events.go b/roomserver/input/events.go index 17c92917f..c170a1c93 100644 --- a/roomserver/input/events.go +++ b/roomserver/input/events.go @@ -73,7 +73,10 @@ type RoomEventDatabase interface { sessionID int64, userID string, ) (string, error) // Look up the room version from the database. - GetRoomVersionForRoom( + GetRoomVersionForRoomID( + ctx context.Context, roomID string, + ) (gomatrixserverlib.RoomVersion, error) + GetRoomVersionForRoomNID( ctx context.Context, roomNID types.RoomNID, ) (gomatrixserverlib.RoomVersion, error) } @@ -81,7 +84,7 @@ type RoomEventDatabase interface { // OutputRoomEventWriter has the APIs needed to write an event to the output logs. type OutputRoomEventWriter interface { // Write a list of events for a room - WriteOutputEvents(roomID string, updates []api.OutputEvent) error + WriteOutputEvents(roomID string, roomVersion gomatrixserverlib.RoomVersion, updates []api.OutputEvent) error } // processRoomEvent can only be called once at a time @@ -156,7 +159,7 @@ func calculateAndSetState( stateAtEvent *types.StateAtEvent, event gomatrixserverlib.Event, ) error { - roomVersion, err := db.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := db.GetRoomVersionForRoomNID(ctx, roomNID) if err != nil { return err } @@ -196,6 +199,7 @@ func processInviteEvent( } roomID := input.Event.RoomID() + roomVersion := gomatrixserverlib.RoomVersionV1 // TODO: Feeeeeeex targetUserID := *input.Event.StateKey() updater, err := db.MembershipUpdater(ctx, roomID, targetUserID) @@ -246,7 +250,7 @@ func processInviteEvent( return err } - if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil { + if err = ow.WriteOutputEvents(roomID, roomVersion, outputUpdates); err != nil { return err } diff --git a/roomserver/input/input.go b/roomserver/input/input.go index bd029d8df..52c7e4d10 100644 --- a/roomserver/input/input.go +++ b/roomserver/input/input.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -39,7 +40,9 @@ type RoomserverInputAPI struct { } // WriteOutputEvents implements OutputRoomEventWriter -func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { +func (r *RoomserverInputAPI) WriteOutputEvents( + roomID string, roomVersion gomatrixserverlib.RoomVersion, updates []api.OutputEvent, +) error { messages := make([]*sarama.ProducerMessage, len(updates)) for i := range updates { value, err := json.Marshal(updates[i]) @@ -50,6 +53,12 @@ func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.Outp Topic: r.OutputRoomEventTopic, Key: sarama.StringEncoder(roomID), Value: sarama.ByteEncoder(value), + Headers: []sarama.RecordHeader{ + sarama.RecordHeader{ + Key: []byte("room_version"), + Value: []byte(roomVersion), + }, + }, } } return r.Producer.SendMessages(messages) diff --git a/roomserver/input/latest_events.go b/roomserver/input/latest_events.go index eb31b4779..eab971547 100644 --- a/roomserver/input/latest_events.go +++ b/roomserver/input/latest_events.go @@ -67,7 +67,7 @@ func updateLatestEvents( } }() - roomVersion, err := db.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := db.GetRoomVersionForRoomNID(ctx, roomNID) if err != nil { return err } @@ -163,6 +163,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { } updates = append(updates, *update) + roomVersion := gomatrixserverlib.RoomVersionV1 + // Send the event to the output logs. // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but @@ -171,7 +173,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // send the event asynchronously but we would need to ensure that 1) the events are written to the log in // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // necessary bookkeeping we'll keep the event sending synchronous for now. - if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil { + if err = u.ow.WriteOutputEvents(u.event.RoomID(), roomVersion, updates); err != nil { return err } @@ -184,7 +186,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { func (u *latestEventsUpdater) latestState() error { var err error - roomVersion, err := u.db.GetRoomVersionForRoom(u.ctx, u.roomNID) + roomVersion, err := u.db.GetRoomVersionForRoomNID(u.ctx, u.roomNID) if err != nil { return err } @@ -262,7 +264,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) latestEventIDs[i] = u.latest[i].EventID } - roomVersion, err := u.db.GetRoomVersionForRoom(context.Background(), u.roomNID) + roomVersion, err := u.db.GetRoomVersionForRoomNID(context.Background(), u.roomNID) if err != nil { return nil, err } diff --git a/roomserver/query/query.go b/roomserver/query/query.go index 7ac9e7389..c018cd71a 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -90,7 +90,10 @@ type RoomserverQueryAPIDatabase interface { context.Context, []types.EventStateKeyNID, ) (map[types.EventStateKeyNID]string, error) // Look up the room version from the database. - GetRoomVersionForRoom( + GetRoomVersionForRoomID( + ctx context.Context, roomID string, + ) (gomatrixserverlib.RoomVersion, error) + GetRoomVersionForRoomNID( ctx context.Context, roomNID types.RoomNID, ) (gomatrixserverlib.RoomVersion, error) // Get the room NID for a given event ID. @@ -123,7 +126,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( return nil } response.RoomExists = true - roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := r.DB.GetRoomVersionForRoomID(ctx, request.RoomID) if err != nil { return err } @@ -170,7 +173,7 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents( return nil } response.RoomExists = true - roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := r.DB.GetRoomVersionForRoomID(ctx, request.RoomID) if err != nil { return err } @@ -350,7 +353,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID( if err != nil { return nil, err } - roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := r.DB.GetRoomVersionForRoomNID(ctx, roomNID) if err != nil { return nil, err } @@ -464,7 +467,7 @@ func (r *RoomserverQueryAPI) checkServerAllowedToSeeEvent( if err != nil { return false, err } - roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := r.DB.GetRoomVersionForRoomNID(ctx, roomNID) if err != nil { return false, err } @@ -631,7 +634,7 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain( if err != nil { return err } - roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := r.DB.GetRoomVersionForRoomID(ctx, request.RoomID) if err != nil { return err } @@ -786,14 +789,8 @@ func (r *RoomserverQueryAPI) QueryRoomVersionForRoomID( request *api.QueryRoomVersionForRoomIDRequest, response *api.QueryRoomVersionForRoomIDResponse, ) error { - // Get the room NID for the given room ID - roomNID, err := r.DB.RoomNID(ctx, request.RoomID) - if err != nil { - return err - } - // Then look up the room version for that room NID - roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := r.DB.GetRoomVersionForRoomID(ctx, request.RoomID) if err != nil { return err } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 0f7b9e614..77a08e2f6 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -45,7 +45,8 @@ type Database interface { GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error) GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) - GetRoomVersionForRoom(ctx context.Context, roomNID types.RoomNID) (gomatrixserverlib.RoomVersion, error) + GetRoomVersionForRoomID(ctx context.Context, roomID string) (gomatrixserverlib.RoomVersion, error) + GetRoomVersionForRoomNID(ctx context.Context, roomNID types.RoomNID) (gomatrixserverlib.RoomVersion, error) RoomNIDForEventID(ctx context.Context, eventID string) (types.RoomNID, error) RoomNIDForEventNID(ctx context.Context, eventNID types.EventNID) (types.RoomNID, error) } diff --git a/roomserver/storage/postgres/rooms_table.go b/roomserver/storage/postgres/rooms_table.go index d42117309..169252c17 100644 --- a/roomserver/storage/postgres/rooms_table.go +++ b/roomserver/storage/postgres/rooms_table.go @@ -65,6 +65,9 @@ const selectLatestEventNIDsForUpdateSQL = "" + const updateLatestEventNIDsSQL = "" + "UPDATE roomserver_rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1" +const selectRoomVersionForRoomIDSQL = "" + + "SELECT room_version FROM roomserver_rooms WHERE room_id = $1" + const selectRoomVersionForRoomNIDSQL = "" + "SELECT room_version FROM roomserver_rooms WHERE room_nid = $1" @@ -74,6 +77,7 @@ type roomStatements struct { selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsForUpdateStmt *sql.Stmt updateLatestEventNIDsStmt *sql.Stmt + selectRoomVersionForRoomIDStmt *sql.Stmt selectRoomVersionForRoomNIDStmt *sql.Stmt } @@ -88,6 +92,7 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) { {&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL}, {&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL}, {&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL}, + {&s.selectRoomVersionForRoomIDStmt, selectRoomVersionForRoomIDSQL}, {&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL}, }.prepare(db) } @@ -173,3 +178,12 @@ func (s *roomStatements) selectRoomVersionForRoomNID( err := stmt.QueryRowContext(ctx, roomNID).Scan(&roomVersion) return roomVersion, err } + +func (s *roomStatements) selectRoomVersionForRoomID( + ctx context.Context, txn *sql.Tx, roomID string, +) (gomatrixserverlib.RoomVersion, error) { + var roomVersion gomatrixserverlib.RoomVersion + stmt := common.TxStmt(txn, s.selectRoomVersionForRoomIDStmt) + err := stmt.QueryRowContext(ctx, roomID).Scan(&roomVersion) + return roomVersion, err +} diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index 8bd974f45..222ddd2fa 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -237,7 +237,7 @@ func (d *Database) Events( if err != nil { return nil, err } - roomVersion, err := d.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := d.GetRoomVersionForRoomNID(ctx, roomNID) if err != nil { return nil, err } @@ -743,7 +743,15 @@ func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]type return d.Events(ctx, nids) } -func (d *Database) GetRoomVersionForRoom( +func (d *Database) GetRoomVersionForRoomID( + ctx context.Context, roomID string, +) (gomatrixserverlib.RoomVersion, error) { + return d.statements.selectRoomVersionForRoomID( + ctx, nil, roomID, + ) +} + +func (d *Database) GetRoomVersionForRoomNID( ctx context.Context, roomNID types.RoomNID, ) (gomatrixserverlib.RoomVersion, error) { return d.statements.selectRoomVersionForRoomNID( diff --git a/roomserver/storage/sqlite3/rooms_table.go b/roomserver/storage/sqlite3/rooms_table.go index 2f721a222..d2496c053 100644 --- a/roomserver/storage/sqlite3/rooms_table.go +++ b/roomserver/storage/sqlite3/rooms_table.go @@ -54,6 +54,9 @@ const selectLatestEventNIDsForUpdateSQL = "" + const updateLatestEventNIDsSQL = "" + "UPDATE roomserver_rooms SET latest_event_nids = $1, last_event_sent_nid = $2, state_snapshot_nid = $3 WHERE room_nid = $4" +const selectRoomVersionForRoomIDSQL = "" + + "SELECT room_version FROM roomserver_rooms WHERE room_id = $1" + const selectRoomVersionForRoomNIDSQL = "" + "SELECT room_version FROM roomserver_rooms WHERE room_nid = $1" @@ -63,6 +66,7 @@ type roomStatements struct { selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsForUpdateStmt *sql.Stmt updateLatestEventNIDsStmt *sql.Stmt + selectRoomVersionForRoomIDStmt *sql.Stmt selectRoomVersionForRoomNIDStmt *sql.Stmt } @@ -77,6 +81,7 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) { {&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL}, {&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL}, {&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL}, + {&s.selectRoomVersionForRoomIDStmt, selectRoomVersionForRoomIDSQL}, {&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL}, }.prepare(db) } @@ -157,6 +162,15 @@ func (s *roomStatements) updateLatestEventNIDs( return err } +func (s *roomStatements) selectRoomVersionForRoomID( + ctx context.Context, txn *sql.Tx, roomID string, +) (gomatrixserverlib.RoomVersion, error) { + var roomVersion gomatrixserverlib.RoomVersion + stmt := common.TxStmt(txn, s.selectRoomVersionForRoomIDStmt) + err := stmt.QueryRowContext(ctx, roomID).Scan(&roomVersion) + return roomVersion, err +} + func (s *roomStatements) selectRoomVersionForRoomNID( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, ) (gomatrixserverlib.RoomVersion, error) { diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index f5904798e..7fe57a083 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -289,7 +289,7 @@ func (d *Database) Events( if err != nil { return err } - roomVersion, err := d.GetRoomVersionForRoom(ctx, roomNID) + roomVersion, err := d.GetRoomVersionForRoomNID(ctx, roomNID) if err != nil { return err } @@ -896,7 +896,15 @@ func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]type return d.Events(ctx, nids) } -func (d *Database) GetRoomVersionForRoom( +func (d *Database) GetRoomVersionForRoomID( + ctx context.Context, roomID string, +) (gomatrixserverlib.RoomVersion, error) { + return d.statements.selectRoomVersionForRoomID( + ctx, nil, roomID, + ) +} + +func (d *Database) GetRoomVersionForRoomNID( ctx context.Context, roomNID types.RoomNID, ) (gomatrixserverlib.RoomVersion, error) { return d.statements.selectRoomVersionForRoomNID( diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 587398d93..8184d1bcc 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -15,8 +15,10 @@ package consumers import ( + "bytes" "context" "encoding/json" + "errors" "fmt" "github.com/matrix-org/dendrite/common" @@ -74,18 +76,54 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil + + // See if the room version is present in the headers. If it isn't + // then we can't process the event as we don't know what the format + // will be + var roomVersion gomatrixserverlib.RoomVersion + for _, header := range msg.Headers { + if bytes.Equal(header.Key, []byte("room_version")) { + roomVersion = gomatrixserverlib.RoomVersion(header.Value) + break + } + } + if roomVersion == "" { + return errors.New("room version was not in sarama headers") } switch output.Type { case api.OutputTypeNewRoomEvent: + if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil { + log.WithFields(log.Fields{ + "room_version": roomVersion, + }).WithError(err).Errorf("can't prepare event to version") + return err + } + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) case api.OutputTypeNewInviteEvent: + if err := output.NewInviteEvent.Event.PrepareAs(roomVersion); err != nil { + log.WithFields(log.Fields{ + "room_version": roomVersion, + }).WithError(err).Errorf("can't prepare event to version") + return err + } + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) default: log.WithField("type", output.Type).Debug( @@ -100,13 +138,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( ) error { ev := msg.Event - if err := msg.Event.PrepareAs(msg.RoomVersion); err != nil { - log.WithFields(log.Fields{ - "room_version": msg.RoomVersion, - }).WithError(err).Errorf("can't prepare event to version") - return err - } - log.WithFields(log.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(),