diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index e1bf8e0a0..e27325435 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -98,8 +99,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onNewRoomEvent( ctx context.Context, msg api.OutputNewRoomEvent, ) error { + roomVersion := gomatrixserverlib.RoomVersionV1 + if rv := gjson.Get(string(msg.Event), "content.room_version"); rv.Exists() { + roomVersion = gomatrixserverlib.RoomVersion(rv.String()) + } + // TODO: Is this trusted here? - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(msg.Event, false, msg.RoomVersion) + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(msg.Event, false, roomVersion) if err != nil { log.WithError(err).WithField("roomversion", msg.RoomVersion).Errorf( "roomserver output log: couldn't create event from trusted JSON (%d bytes)", diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 62d5565f4..2e50d87b9 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -50,7 +50,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( -- part of the current state of the room. added_at BIGINT, -- The version of the room - room_version TEXT NOT NULL, + room_version TEXT, -- Clobber based on 3-uple of room_id, type and state_key CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) ); @@ -61,8 +61,8 @@ CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state( ` const upsertRoomStateSQL = "" + - "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at, room_version)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" + " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" + " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9" @@ -229,6 +229,8 @@ func (s *currentRoomStateStatements) upsertRoomState( ctx context.Context, txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition, ) error { + roomVersionString := "" + // Parse content as JSON and search for an "url" key containsURL := false var content map[string]interface{} @@ -237,6 +239,12 @@ func (s *currentRoomStateStatements) upsertRoomState( _, containsURL = content["url"] } + if event.Type() == gomatrixserverlib.MRoomCreate { + if rv, hasRv := content["room_version"]; hasRv { + roomVersionString = rv.(string) + } + } + // upsert state event stmt := common.TxStmt(txn, s.upsertRoomStateStmt) _, err := stmt.ExecContext( @@ -250,6 +258,7 @@ func (s *currentRoomStateStatements) upsertRoomState( event.JSON(), membership, addedAt, + roomVersionString, ) return err } diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 2e7b4923a..b23a4a99b 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -170,23 +170,28 @@ func (d *SyncServerDatasource) WriteEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ) if err != nil { + fmt.Println("d.events.insertEvent:", err) return err } pduPosition = pos if err = d.topology.insertEventInTopology(ctx, ev); err != nil { + fmt.Println("d.topology.insertEventInTopology:", err) return err } if err = d.handleBackwardExtremities(ctx, ev); err != nil { + fmt.Println("d.handleBackwardExtremities:", err) return err } if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. + fmt.Println("not a state event") return nil } + fmt.Println("d.updateRoomState") return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) }) diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index d4388ee90..0d76a2ac4 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -38,7 +38,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( event_json TEXT NOT NULL, membership TEXT, added_at BIGINT, - room_version TEXT NOT NULL, + room_version TEXT, -- only set for m.room.create UNIQUE (room_id, type, state_key) ); -- for event deletion @@ -48,8 +48,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_s ` const upsertRoomStateSQL = "" + - "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at, room_version)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" + " ON CONFLICT (event_id, room_id, type, sender, contains_url)" + " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9" @@ -208,12 +208,20 @@ func (s *currentRoomStateStatements) upsertRoomState( ctx context.Context, txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition, ) error { + roomVersionString := "" + // Parse content as JSON and search for an "url" key containsURL := false var content map[string]interface{} if json.Unmarshal(event.Content(), &content) != nil { // Set containsURL to true if url is present _, containsURL = content["url"] + + if event.Type() == gomatrixserverlib.MRoomCreate { + if rv, hasRv := content["room_version"]; hasRv { + roomVersionString = rv.(string) + } + } } // upsert state event @@ -229,6 +237,7 @@ func (s *currentRoomStateStatements) upsertRoomState( event.JSON(), membership, addedAt, + roomVersionString, ) return err } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 67d073734..e0d2758d7 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -196,15 +196,18 @@ func (d *SyncServerDatasource) WriteEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ) if err != nil { + fmt.Println("d.events.insertEvent:", err) return err } pduPosition = pos if err = d.topology.insertEventInTopology(ctx, txn, ev); err != nil { + fmt.Println("d.topology.insertEventInTopology:", err) return err } if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { + fmt.Println("d.handleBackwardExtremities:", err) return err } @@ -213,6 +216,7 @@ func (d *SyncServerDatasource) WriteEvent( return nil } + fmt.Println("d.updateRoomState...") return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) })