diff --git a/go.mod b/go.mod index 6ec2bac63..dfa210eae 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.4.1 github.com/sirupsen/logrus v1.4.2 - github.com/tidwall/gjson v1.6.0 // indirect + github.com/tidwall/gjson v1.6.0 github.com/tidwall/pretty v1.0.1 // indirect github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 102ff78d3..62d5565f4 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -48,8 +48,10 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( membership TEXT, -- The serial ID of the output_room_events table when this event became -- part of the current state of the room. - added_at BIGINT, - -- Clobber based on 3-uple of room_id, type and state_key + added_at BIGINT, + -- The version of the room + room_version TEXT NOT NULL, + -- Clobber based on 3-uple of room_id, type and state_key CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) ); -- for event deletion @@ -94,6 +96,7 @@ const selectEventsWithEventIDsSQL = "" + " FROM syncapi_current_room_state WHERE event_id = ANY($1)" type currentRoomStateStatements struct { + roomVersions *roomVersionStatements upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt @@ -103,7 +106,8 @@ type currentRoomStateStatements struct { selectStateEventStmt *sql.Stmt } -func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { +func (s *currentRoomStateStatements) prepare(db *sql.DB, rvs *roomVersionStatements) (err error) { + s.roomVersions = rvs _, err = db.Exec(currentRoomStateSchema) if err != nil { return @@ -165,6 +169,9 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership( ) ([]string, error) { stmt := common.TxStmt(txn, s.selectRoomIDsWithMembershipStmt) rows, err := stmt.QueryContext(ctx, userID, membership) + if err == sql.ErrNoRows { + return nil, nil + } if err != nil { return nil, err } @@ -195,12 +202,19 @@ func (s *currentRoomStateStatements) selectCurrentState( stateFilter.ContainsURL, stateFilter.Limit, ) + if err == sql.ErrNoRows { + return nil, nil + } if err != nil { return nil, err } defer rows.Close() // nolint: errcheck - return rowsToEvents(rows) + if roomVersion, e := s.roomVersions.selectRoomVersion(ctx, txn, roomID); e == nil { + return rowsToEvents(rows, roomVersion) + } else { + return nil, e + } } func (s *currentRoomStateStatements) deleteRoomStateByEventID( @@ -249,7 +263,7 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs( return nil, err } defer rows.Close() // nolint: errcheck - return rowsToStreamEvents(rows) + return rowsToStreamEvents(ctx, txn, s.roomVersions, rows) } func rowsToEvents(rows *sql.Rows, roomVersion gomatrixserverlib.RoomVersion) ([]gomatrixserverlib.Event, error) { @@ -270,9 +284,9 @@ func rowsToEvents(rows *sql.Rows, roomVersion gomatrixserverlib.RoomVersion) ([] } func (s *currentRoomStateStatements) selectStateEvent( - ctx context.Context, roomID, evType, stateKey string, + ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string, ) (*gomatrixserverlib.Event, error) { - stmt := s.selectStateEventStmt + stmt := common.TxStmt(txn, s.selectStateEventStmt) var res []byte err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res) if err == sql.ErrNoRows { @@ -281,6 +295,10 @@ func (s *currentRoomStateStatements) selectStateEvent( if err != nil { return nil, err } - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false) - return &ev, err + if roomVersion, e := s.roomVersions.selectRoomVersion(ctx, txn, roomID); e == nil { + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false, roomVersion) + return &ev, err + } else { + return nil, e + } } diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go index 2cb8fb199..15248a7ab 100644 --- a/syncapi/storage/postgres/invites_table.go +++ b/syncapi/storage/postgres/invites_table.go @@ -18,6 +18,7 @@ package postgres import ( "context" "database/sql" + "fmt" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" @@ -59,13 +60,15 @@ const selectMaxInviteIDSQL = "" + "SELECT MAX(id) FROM syncapi_invite_events" type inviteEventsStatements struct { + roomVersions *roomVersionStatements insertInviteEventStmt *sql.Stmt selectInviteEventsInRangeStmt *sql.Stmt deleteInviteEventStmt *sql.Stmt selectMaxInviteIDStmt *sql.Stmt } -func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { +func (s *inviteEventsStatements) prepare(db *sql.DB, rvs *roomVersionStatements) (err error) { + s.roomVersions = rvs _, err = db.Exec(inviteEventsSchema) if err != nil { return @@ -126,7 +129,13 @@ func (s *inviteEventsStatements) selectInviteEventsInRange( return nil, err } - event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false) + roomVersion, err := s.roomVersions.selectRoomVersion(ctx, txn, roomID) + if err != nil { + return nil, err + } + fmt.Println("In invite events range, room version for", roomID, "is", roomVersion) + + event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false, roomVersion) if err != nil { return nil, err } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 2db46c5db..cefad46dd 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -19,10 +19,12 @@ import ( "context" "database/sql" "encoding/json" + "errors" "sort" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/tidwall/gjson" "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -110,6 +112,7 @@ const selectStateInRangeSQL = "" + " LIMIT $8" type outputRoomEventsStatements struct { + roomVersions *roomVersionStatements insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt @@ -119,7 +122,8 @@ type outputRoomEventsStatements struct { selectStateInRangeStmt *sql.Stmt } -func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { +func (s *outputRoomEventsStatements) prepare(db *sql.DB, rvs *roomVersionStatements) (err error) { + s.roomVersions = rvs _, err = db.Exec(outputRoomEventsSchema) if err != nil { return @@ -201,9 +205,16 @@ func (s *outputRoomEventsStatements) selectStateInRange( "dels": delIDs, }).Warn("StateBetween: ignoring deleted state") } - + roomIDFromJSON := gjson.Get(string(eventBytes), "room_id") + if !roomIDFromJSON.Exists() { + return nil, nil, errors.New("room ID not in event") + } + roomVersion, err := s.roomVersions.selectRoomVersion(ctx, txn, roomIDFromJSON.String()) + if err != nil { + return nil, nil, err + } // TODO: Handle redacted events - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion) if err != nil { return nil, nil, err } @@ -304,7 +315,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( return nil, err } defer rows.Close() // nolint: errcheck - events, err := rowsToStreamEvents(rows) + events, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows) if err != nil { return nil, err } @@ -331,7 +342,7 @@ func (s *outputRoomEventsStatements) selectEarlyEvents( return nil, err } defer rows.Close() // nolint: errcheck - events, err := rowsToStreamEvents(rows) + events, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows) if err != nil { return nil, err } @@ -355,10 +366,13 @@ func (s *outputRoomEventsStatements) selectEvents( return nil, err } defer rows.Close() // nolint: errcheck - return rowsToStreamEvents(rows) + return rowsToStreamEvents(ctx, txn, s.roomVersions, rows) } -func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { +func rowsToStreamEvents( + ctx context.Context, txn *sql.Tx, roomVersions *roomVersionStatements, + rows *sql.Rows, +) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { var ( @@ -368,12 +382,21 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { sessionID *int64 txnID *string transactionID *api.TransactionID + roomVersion gomatrixserverlib.RoomVersion ) if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { return nil, err } + roomIDFromJSON := gjson.Get(string(eventBytes), "room_id") + if !roomIDFromJSON.Exists() { + return nil, errors.New("room ID not found in event") + } + roomVersion, err := roomVersions.selectRoomVersion(ctx, txn, roomIDFromJSON.String()) + if err != nil { + return nil, err + } // TODO: Handle redacted events - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion) if err != nil { return nil, err } diff --git a/syncapi/storage/postgres/room_versions.go b/syncapi/storage/postgres/room_versions.go new file mode 100644 index 000000000..27d541572 --- /dev/null +++ b/syncapi/storage/postgres/room_versions.go @@ -0,0 +1,45 @@ +package postgres + +import ( + "context" + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" + "github.com/tidwall/gjson" +) + +type roomVersionStatements struct { + selectStateEventStmt *sql.Stmt +} + +func (s *roomVersionStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(currentRoomStateSchema) + if err != nil { + return + } + if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { + return + } + return +} + +func (s *roomVersionStatements) selectRoomVersion( + ctx context.Context, txn *sql.Tx, roomID string, +) (roomVersion gomatrixserverlib.RoomVersion, err error) { + stmt := common.TxStmt(txn, s.selectStateEventStmt) + var res []byte + err = stmt.QueryRowContext(ctx, roomID, "m.room.create", "").Scan(&res) + if err != nil { + return + } + rv := gjson.Get(string(res), "content.room_version") + if !rv.Exists() { + roomVersion = gomatrixserverlib.RoomVersionV1 + return + } + roomVersion = gomatrixserverlib.RoomVersion(rv.String()) + fmt.Println("room version for", roomID, "is", rv.String()) + return +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index abb5b4a4c..2e7b4923a 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -49,6 +49,7 @@ type stateDelta struct { type SyncServerDatasource struct { db *sql.DB common.PartitionOffsetStatements + roomVersions roomVersionStatements accountData accountDataStatements events outputRoomEventsStatements roomstate currentRoomStateStatements @@ -68,16 +69,19 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { return nil, err } + if err = d.roomVersions.prepare(d.db); err != nil { + return nil, err + } if err = d.accountData.prepare(d.db); err != nil { return nil, err } - if err = d.events.prepare(d.db); err != nil { + if err = d.events.prepare(d.db, &d.roomVersions); err != nil { return nil, err } - if err := d.roomstate.prepare(d.db); err != nil { + if err := d.roomstate.prepare(d.db, &d.roomVersions); err != nil { return nil, err } - if err := d.invites.prepare(d.db); err != nil { + if err := d.invites.prepare(d.db, &d.roomVersions); err != nil { return nil, err } if err := d.topology.prepare(d.db); err != nil { @@ -229,7 +233,7 @@ func (d *SyncServerDatasource) updateRoomState( func (d *SyncServerDatasource) GetStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.Event, error) { - return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) + return d.roomstate.selectStateEvent(ctx, nil, roomID, evType, stateKey) } // GetStateEventsForRoom fetches the state events for a given room. diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index ed76177be..d4388ee90 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -37,7 +37,8 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( state_key TEXT NOT NULL, event_json TEXT NOT NULL, membership TEXT, - added_at BIGINT, + added_at BIGINT, + room_version TEXT NOT NULL, UNIQUE (room_id, type, state_key) ); -- for event deletion @@ -82,6 +83,7 @@ const selectEventsWithEventIDsSQL = "" + " FROM syncapi_current_room_state WHERE event_id IN ($1)" type currentRoomStateStatements struct { + roomVersions *roomVersionStatements streamIDStatements *streamIDStatements upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt @@ -91,7 +93,8 @@ type currentRoomStateStatements struct { selectStateEventStmt *sql.Stmt } -func (s *currentRoomStateStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { +func (s *currentRoomStateStatements) prepare(db *sql.DB, rvs *roomVersionStatements, streamID *streamIDStatements) (err error) { + s.roomVersions = rvs s.streamIDStatements = streamID _, err = db.Exec(currentRoomStateSchema) if err != nil { @@ -186,7 +189,11 @@ func (s *currentRoomStateStatements) selectCurrentState( } defer rows.Close() // nolint: errcheck - return rowsToEvents(rows) + if roomVersion, e := s.roomVersions.selectRoomVersion(ctx, txn, roomID); e == nil { + return rowsToEvents(rows, roomVersion) + } else { + return nil, e + } } func (s *currentRoomStateStatements) deleteRoomStateByEventID( @@ -239,10 +246,10 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs( return nil, err } defer rows.Close() // nolint: errcheck - return rowsToStreamEvents(rows) + return rowsToStreamEvents(ctx, txn, s.roomVersions, rows) } -func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { +func rowsToEvents(rows *sql.Rows, roomVersion gomatrixserverlib.RoomVersion) ([]gomatrixserverlib.Event, error) { result := []gomatrixserverlib.Event{} for rows.Next() { var eventBytes []byte @@ -250,7 +257,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { return nil, err } // TODO: Handle redacted events - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion) if err != nil { return nil, err } @@ -260,7 +267,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { } func (s *currentRoomStateStatements) selectStateEvent( - ctx context.Context, roomID, evType, stateKey string, + ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string, ) (*gomatrixserverlib.Event, error) { stmt := s.selectStateEventStmt var res []byte @@ -271,6 +278,10 @@ func (s *currentRoomStateStatements) selectStateEvent( if err != nil { return nil, err } - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false) - return &ev, err + if roomVersion, e := s.roomVersions.selectRoomVersion(ctx, txn, roomID); e == nil { + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false, roomVersion) + return &ev, err + } else { + return nil, e + } } diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index baf8871bd..bc728dd2c 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -18,6 +18,7 @@ package sqlite3 import ( "context" "database/sql" + "fmt" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" @@ -54,6 +55,7 @@ const selectMaxInviteIDSQL = "" + "SELECT MAX(id) FROM syncapi_invite_events" type inviteEventsStatements struct { + roomVersions *roomVersionStatements streamIDStatements *streamIDStatements insertInviteEventStmt *sql.Stmt selectInviteEventsInRangeStmt *sql.Stmt @@ -61,7 +63,8 @@ type inviteEventsStatements struct { selectMaxInviteIDStmt *sql.Stmt } -func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { +func (s *inviteEventsStatements) prepare(db *sql.DB, rvs *roomVersionStatements, streamID *streamIDStatements) (err error) { + s.roomVersions = rvs s.streamIDStatements = streamID _, err = db.Exec(inviteEventsSchema) if err != nil { @@ -124,7 +127,13 @@ func (s *inviteEventsStatements) selectInviteEventsInRange( return nil, err } - event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false) + roomVersion, err := s.roomVersions.selectRoomVersion(ctx, txn, roomID) + if err != nil { + return nil, err + } + fmt.Println("In invite events range, room version for", roomID, "is", roomVersion) + + event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false, roomVersion) if err != nil { return nil, err } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index be8937435..56cb7cceb 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -19,10 +19,12 @@ import ( "context" "database/sql" "encoding/json" + "errors" "sort" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/tidwall/gjson" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" @@ -99,6 +101,7 @@ const selectStateInRangeSQL = "" + " LIMIT $8" // limit type outputRoomEventsStatements struct { + roomVersions *roomVersionStatements streamIDStatements *streamIDStatements insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt @@ -109,7 +112,7 @@ type outputRoomEventsStatements struct { selectStateInRangeStmt *sql.Stmt } -func (s *outputRoomEventsStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { +func (s *outputRoomEventsStatements) prepare(db *sql.DB, rvs *roomVersionStatements, streamID *streamIDStatements) (err error) { s.streamIDStatements = streamID _, err = db.Exec(outputRoomEventsSchema) if err != nil { @@ -197,9 +200,16 @@ func (s *outputRoomEventsStatements) selectStateInRange( "dels": delIDsJSON, }).Warn("StateBetween: ignoring deleted state") } - + roomIDFromJSON := gjson.Get(string(eventBytes), "room_id") + if !roomIDFromJSON.Exists() { + return nil, nil, errors.New("room ID not in event") + } + roomVersion, err := s.roomVersions.selectRoomVersion(ctx, txn, roomIDFromJSON.String()) + if err != nil { + return nil, nil, err + } // TODO: Handle redacted events - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion) if err != nil { return nil, nil, err } @@ -316,7 +326,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( return nil, err } defer rows.Close() // nolint: errcheck - events, err := rowsToStreamEvents(rows) + events, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows) if err != nil { return nil, err } @@ -343,7 +353,7 @@ func (s *outputRoomEventsStatements) selectEarlyEvents( return nil, err } defer rows.Close() // nolint: errcheck - events, err := rowsToStreamEvents(rows) + events, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows) if err != nil { return nil, err } @@ -368,7 +378,7 @@ func (s *outputRoomEventsStatements) selectEvents( if err != nil { return nil, err } - if streamEvents, err := rowsToStreamEvents(rows); err == nil { + if streamEvents, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows); err == nil { returnEvents = append(returnEvents, streamEvents...) } rows.Close() // nolint: errcheck @@ -376,7 +386,10 @@ func (s *outputRoomEventsStatements) selectEvents( return returnEvents, nil } -func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { +func rowsToStreamEvents( + ctx context.Context, txn *sql.Tx, roomVersions *roomVersionStatements, + rows *sql.Rows, +) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { var ( @@ -390,8 +403,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil { return nil, err } + roomIDFromJSON := gjson.Get(string(eventBytes), "room_id") + if !roomIDFromJSON.Exists() { + return nil, errors.New("room ID not found in event") + } + roomVersion, err := roomVersions.selectRoomVersion(ctx, txn, roomIDFromJSON.String()) + if err != nil { + return nil, err + } // TODO: Handle redacted events - ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion) if err != nil { return nil, err } diff --git a/syncapi/storage/sqlite3/room_versions.go b/syncapi/storage/sqlite3/room_versions.go new file mode 100644 index 000000000..56e2cddf9 --- /dev/null +++ b/syncapi/storage/sqlite3/room_versions.go @@ -0,0 +1,45 @@ +package sqlite3 + +import ( + "context" + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" + "github.com/tidwall/gjson" +) + +type roomVersionStatements struct { + selectStateEventStmt *sql.Stmt +} + +func (s *roomVersionStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(currentRoomStateSchema) + if err != nil { + return + } + if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { + return + } + return +} + +func (s *roomVersionStatements) selectRoomVersion( + ctx context.Context, txn *sql.Tx, roomID string, +) (roomVersion gomatrixserverlib.RoomVersion, err error) { + stmt := common.TxStmt(txn, s.selectStateEventStmt) + var res []byte + err = stmt.QueryRowContext(ctx, roomID, "m.room.create", "").Scan(&res) + if err != nil { + return + } + rv := gjson.Get(string(res), "content.room_version") + if !rv.Exists() { + roomVersion = gomatrixserverlib.RoomVersionV1 + return + } + roomVersion = gomatrixserverlib.RoomVersion(rv.String()) + fmt.Println("room version for", roomID, "is", rv.String()) + return +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 0e84c8c86..67d073734 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -52,6 +52,7 @@ type stateDelta struct { type SyncServerDatasource struct { db *sql.DB common.PartitionOffsetStatements + roomVersions roomVersionStatements streamID streamIDStatements accountData accountDataStatements events outputRoomEventsStatements @@ -92,19 +93,22 @@ func (d *SyncServerDatasource) prepare() (err error) { if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { return err } + if err = d.roomVersions.prepare(d.db); err != nil { + return err + } if err = d.streamID.prepare(d.db); err != nil { return err } if err = d.accountData.prepare(d.db, &d.streamID); err != nil { return err } - if err = d.events.prepare(d.db, &d.streamID); err != nil { + if err = d.events.prepare(d.db, &d.roomVersions, &d.streamID); err != nil { return err } - if err := d.roomstate.prepare(d.db, &d.streamID); err != nil { + if err := d.roomstate.prepare(d.db, &d.roomVersions, &d.streamID); err != nil { return err } - if err := d.invites.prepare(d.db, &d.streamID); err != nil { + if err := d.invites.prepare(d.db, &d.roomVersions, &d.streamID); err != nil { return err } if err := d.topology.prepare(d.db); err != nil { @@ -255,7 +259,7 @@ func (d *SyncServerDatasource) updateRoomState( func (d *SyncServerDatasource) GetStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.Event, error) { - return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) + return d.roomstate.selectStateEvent(ctx, nil, roomID, evType, stateKey) } // GetStateEventsForRoom fetches the state events for a given room. diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go index 02da0f7e6..77f4325fc 100644 --- a/syncapi/sync/notifier_test.go +++ b/syncapi/sync/notifier_test.go @@ -77,7 +77,7 @@ func init() { "room_id": "`+roomID+`", "origin_server_ts": 12345, "event_id": "$randomMessageEvent:localhost" - }`), false) + }`), false, gomatrixserverlib.RoomVersionV1) if err != nil { panic(err) } @@ -91,7 +91,7 @@ func init() { "room_id": "`+roomID+`", "origin_server_ts": 12345, "event_id": "$aliceInviteBobEvent:localhost" - }`), false) + }`), false, gomatrixserverlib.RoomVersionV1) if err != nil { panic(err) } @@ -105,7 +105,7 @@ func init() { "room_id": "`+roomID+`", "origin_server_ts": 12345, "event_id": "$bobLeaveEvent:localhost" - }`), false) + }`), false, gomatrixserverlib.RoomVersionV1) if err != nil { panic(err) }