mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 11:23:11 -06:00
Hopefully finish wiring room versions into syncapi
This commit is contained in:
parent
ea2fa4a401
commit
a78ecad80c
2
go.mod
2
go.mod
|
|
@ -17,7 +17,7 @@ require (
|
|||
github.com/pkg/errors v0.8.1
|
||||
github.com/prometheus/client_golang v1.4.1
|
||||
github.com/sirupsen/logrus v1.4.2
|
||||
github.com/tidwall/gjson v1.6.0 // indirect
|
||||
github.com/tidwall/gjson v1.6.0
|
||||
github.com/tidwall/pretty v1.0.1 // indirect
|
||||
github.com/uber/jaeger-client-go v2.22.1+incompatible
|
||||
github.com/uber/jaeger-lib v2.2.0+incompatible
|
||||
|
|
|
|||
|
|
@ -48,8 +48,10 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
|
|||
membership TEXT,
|
||||
-- The serial ID of the output_room_events table when this event became
|
||||
-- part of the current state of the room.
|
||||
added_at BIGINT,
|
||||
-- Clobber based on 3-uple of room_id, type and state_key
|
||||
added_at BIGINT,
|
||||
-- The version of the room
|
||||
room_version TEXT NOT NULL,
|
||||
-- Clobber based on 3-uple of room_id, type and state_key
|
||||
CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
-- for event deletion
|
||||
|
|
@ -94,6 +96,7 @@ const selectEventsWithEventIDsSQL = "" +
|
|||
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
roomVersions *roomVersionStatements
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||
|
|
@ -103,7 +106,8 @@ type currentRoomStateStatements struct {
|
|||
selectStateEventStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||
func (s *currentRoomStateStatements) prepare(db *sql.DB, rvs *roomVersionStatements) (err error) {
|
||||
s.roomVersions = rvs
|
||||
_, err = db.Exec(currentRoomStateSchema)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
@ -165,6 +169,9 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
|
|||
) ([]string, error) {
|
||||
stmt := common.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
||||
rows, err := stmt.QueryContext(ctx, userID, membership)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -195,12 +202,19 @@ func (s *currentRoomStateStatements) selectCurrentState(
|
|||
stateFilter.ContainsURL,
|
||||
stateFilter.Limit,
|
||||
)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
return rowsToEvents(rows)
|
||||
if roomVersion, e := s.roomVersions.selectRoomVersion(ctx, txn, roomID); e == nil {
|
||||
return rowsToEvents(rows, roomVersion)
|
||||
} else {
|
||||
return nil, e
|
||||
}
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) deleteRoomStateByEventID(
|
||||
|
|
@ -249,7 +263,7 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs(
|
|||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
return rowsToStreamEvents(rows)
|
||||
return rowsToStreamEvents(ctx, txn, s.roomVersions, rows)
|
||||
}
|
||||
|
||||
func rowsToEvents(rows *sql.Rows, roomVersion gomatrixserverlib.RoomVersion) ([]gomatrixserverlib.Event, error) {
|
||||
|
|
@ -270,9 +284,9 @@ func rowsToEvents(rows *sql.Rows, roomVersion gomatrixserverlib.RoomVersion) ([]
|
|||
}
|
||||
|
||||
func (s *currentRoomStateStatements) selectStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
stmt := s.selectStateEventStmt
|
||||
stmt := common.TxStmt(txn, s.selectStateEventStmt)
|
||||
var res []byte
|
||||
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -281,6 +295,10 @@ func (s *currentRoomStateStatements) selectStateEvent(
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false)
|
||||
return &ev, err
|
||||
if roomVersion, e := s.roomVersions.selectRoomVersion(ctx, txn, roomID); e == nil {
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false, roomVersion)
|
||||
return &ev, err
|
||||
} else {
|
||||
return nil, e
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package postgres
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
|
@ -59,13 +60,15 @@ const selectMaxInviteIDSQL = "" +
|
|||
"SELECT MAX(id) FROM syncapi_invite_events"
|
||||
|
||||
type inviteEventsStatements struct {
|
||||
roomVersions *roomVersionStatements
|
||||
insertInviteEventStmt *sql.Stmt
|
||||
selectInviteEventsInRangeStmt *sql.Stmt
|
||||
deleteInviteEventStmt *sql.Stmt
|
||||
selectMaxInviteIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) {
|
||||
func (s *inviteEventsStatements) prepare(db *sql.DB, rvs *roomVersionStatements) (err error) {
|
||||
s.roomVersions = rvs
|
||||
_, err = db.Exec(inviteEventsSchema)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
@ -126,7 +129,13 @@ func (s *inviteEventsStatements) selectInviteEventsInRange(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false)
|
||||
roomVersion, err := s.roomVersions.selectRoomVersion(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Println("In invite events range, room version for", roomID, "is", roomVersion)
|
||||
|
||||
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false, roomVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,10 +19,12 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sort"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
|
|
@ -110,6 +112,7 @@ const selectStateInRangeSQL = "" +
|
|||
" LIMIT $8"
|
||||
|
||||
type outputRoomEventsStatements struct {
|
||||
roomVersions *roomVersionStatements
|
||||
insertEventStmt *sql.Stmt
|
||||
selectEventsStmt *sql.Stmt
|
||||
selectMaxEventIDStmt *sql.Stmt
|
||||
|
|
@ -119,7 +122,8 @@ type outputRoomEventsStatements struct {
|
|||
selectStateInRangeStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||
func (s *outputRoomEventsStatements) prepare(db *sql.DB, rvs *roomVersionStatements) (err error) {
|
||||
s.roomVersions = rvs
|
||||
_, err = db.Exec(outputRoomEventsSchema)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
@ -201,9 +205,16 @@ func (s *outputRoomEventsStatements) selectStateInRange(
|
|||
"dels": delIDs,
|
||||
}).Warn("StateBetween: ignoring deleted state")
|
||||
}
|
||||
|
||||
roomIDFromJSON := gjson.Get(string(eventBytes), "room_id")
|
||||
if !roomIDFromJSON.Exists() {
|
||||
return nil, nil, errors.New("room ID not in event")
|
||||
}
|
||||
roomVersion, err := s.roomVersions.selectRoomVersion(ctx, txn, roomIDFromJSON.String())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
@ -304,7 +315,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
|
|||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
events, err := rowsToStreamEvents(rows)
|
||||
events, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -331,7 +342,7 @@ func (s *outputRoomEventsStatements) selectEarlyEvents(
|
|||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
events, err := rowsToStreamEvents(rows)
|
||||
events, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -355,10 +366,13 @@ func (s *outputRoomEventsStatements) selectEvents(
|
|||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
return rowsToStreamEvents(rows)
|
||||
return rowsToStreamEvents(ctx, txn, s.roomVersions, rows)
|
||||
}
|
||||
|
||||
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||
func rowsToStreamEvents(
|
||||
ctx context.Context, txn *sql.Tx, roomVersions *roomVersionStatements,
|
||||
rows *sql.Rows,
|
||||
) ([]types.StreamEvent, error) {
|
||||
var result []types.StreamEvent
|
||||
for rows.Next() {
|
||||
var (
|
||||
|
|
@ -368,12 +382,21 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
sessionID *int64
|
||||
txnID *string
|
||||
transactionID *api.TransactionID
|
||||
roomVersion gomatrixserverlib.RoomVersion
|
||||
)
|
||||
if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomIDFromJSON := gjson.Get(string(eventBytes), "room_id")
|
||||
if !roomIDFromJSON.Exists() {
|
||||
return nil, errors.New("room ID not found in event")
|
||||
}
|
||||
roomVersion, err := roomVersions.selectRoomVersion(ctx, txn, roomIDFromJSON.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
45
syncapi/storage/postgres/room_versions.go
Normal file
45
syncapi/storage/postgres/room_versions.go
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
type roomVersionStatements struct {
|
||||
selectStateEventStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *roomVersionStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(currentRoomStateSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *roomVersionStatements) selectRoomVersion(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
) (roomVersion gomatrixserverlib.RoomVersion, err error) {
|
||||
stmt := common.TxStmt(txn, s.selectStateEventStmt)
|
||||
var res []byte
|
||||
err = stmt.QueryRowContext(ctx, roomID, "m.room.create", "").Scan(&res)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
rv := gjson.Get(string(res), "content.room_version")
|
||||
if !rv.Exists() {
|
||||
roomVersion = gomatrixserverlib.RoomVersionV1
|
||||
return
|
||||
}
|
||||
roomVersion = gomatrixserverlib.RoomVersion(rv.String())
|
||||
fmt.Println("room version for", roomID, "is", rv.String())
|
||||
return
|
||||
}
|
||||
|
|
@ -49,6 +49,7 @@ type stateDelta struct {
|
|||
type SyncServerDatasource struct {
|
||||
db *sql.DB
|
||||
common.PartitionOffsetStatements
|
||||
roomVersions roomVersionStatements
|
||||
accountData accountDataStatements
|
||||
events outputRoomEventsStatements
|
||||
roomstate currentRoomStateStatements
|
||||
|
|
@ -68,16 +69,19 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er
|
|||
if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = d.roomVersions.prepare(d.db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = d.accountData.prepare(d.db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = d.events.prepare(d.db); err != nil {
|
||||
if err = d.events.prepare(d.db, &d.roomVersions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := d.roomstate.prepare(d.db); err != nil {
|
||||
if err := d.roomstate.prepare(d.db, &d.roomVersions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := d.invites.prepare(d.db); err != nil {
|
||||
if err := d.invites.prepare(d.db, &d.roomVersions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := d.topology.prepare(d.db); err != nil {
|
||||
|
|
@ -229,7 +233,7 @@ func (d *SyncServerDatasource) updateRoomState(
|
|||
func (d *SyncServerDatasource) GetStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
|
||||
return d.roomstate.selectStateEvent(ctx, nil, roomID, evType, stateKey)
|
||||
}
|
||||
|
||||
// GetStateEventsForRoom fetches the state events for a given room.
|
||||
|
|
|
|||
|
|
@ -37,7 +37,8 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
|
|||
state_key TEXT NOT NULL,
|
||||
event_json TEXT NOT NULL,
|
||||
membership TEXT,
|
||||
added_at BIGINT,
|
||||
added_at BIGINT,
|
||||
room_version TEXT NOT NULL,
|
||||
UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
-- for event deletion
|
||||
|
|
@ -82,6 +83,7 @@ const selectEventsWithEventIDsSQL = "" +
|
|||
" FROM syncapi_current_room_state WHERE event_id IN ($1)"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
roomVersions *roomVersionStatements
|
||||
streamIDStatements *streamIDStatements
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
|
|
@ -91,7 +93,8 @@ type currentRoomStateStatements struct {
|
|||
selectStateEventStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) {
|
||||
func (s *currentRoomStateStatements) prepare(db *sql.DB, rvs *roomVersionStatements, streamID *streamIDStatements) (err error) {
|
||||
s.roomVersions = rvs
|
||||
s.streamIDStatements = streamID
|
||||
_, err = db.Exec(currentRoomStateSchema)
|
||||
if err != nil {
|
||||
|
|
@ -186,7 +189,11 @@ func (s *currentRoomStateStatements) selectCurrentState(
|
|||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
return rowsToEvents(rows)
|
||||
if roomVersion, e := s.roomVersions.selectRoomVersion(ctx, txn, roomID); e == nil {
|
||||
return rowsToEvents(rows, roomVersion)
|
||||
} else {
|
||||
return nil, e
|
||||
}
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) deleteRoomStateByEventID(
|
||||
|
|
@ -239,10 +246,10 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs(
|
|||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
return rowsToStreamEvents(rows)
|
||||
return rowsToStreamEvents(ctx, txn, s.roomVersions, rows)
|
||||
}
|
||||
|
||||
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
||||
func rowsToEvents(rows *sql.Rows, roomVersion gomatrixserverlib.RoomVersion) ([]gomatrixserverlib.Event, error) {
|
||||
result := []gomatrixserverlib.Event{}
|
||||
for rows.Next() {
|
||||
var eventBytes []byte
|
||||
|
|
@ -250,7 +257,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
|||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -260,7 +267,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
|||
}
|
||||
|
||||
func (s *currentRoomStateStatements) selectStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
stmt := s.selectStateEventStmt
|
||||
var res []byte
|
||||
|
|
@ -271,6 +278,10 @@ func (s *currentRoomStateStatements) selectStateEvent(
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false)
|
||||
return &ev, err
|
||||
if roomVersion, e := s.roomVersions.selectRoomVersion(ctx, txn, roomID); e == nil {
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false, roomVersion)
|
||||
return &ev, err
|
||||
} else {
|
||||
return nil, e
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package sqlite3
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
|
@ -54,6 +55,7 @@ const selectMaxInviteIDSQL = "" +
|
|||
"SELECT MAX(id) FROM syncapi_invite_events"
|
||||
|
||||
type inviteEventsStatements struct {
|
||||
roomVersions *roomVersionStatements
|
||||
streamIDStatements *streamIDStatements
|
||||
insertInviteEventStmt *sql.Stmt
|
||||
selectInviteEventsInRangeStmt *sql.Stmt
|
||||
|
|
@ -61,7 +63,8 @@ type inviteEventsStatements struct {
|
|||
selectMaxInviteIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) {
|
||||
func (s *inviteEventsStatements) prepare(db *sql.DB, rvs *roomVersionStatements, streamID *streamIDStatements) (err error) {
|
||||
s.roomVersions = rvs
|
||||
s.streamIDStatements = streamID
|
||||
_, err = db.Exec(inviteEventsSchema)
|
||||
if err != nil {
|
||||
|
|
@ -124,7 +127,13 @@ func (s *inviteEventsStatements) selectInviteEventsInRange(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false)
|
||||
roomVersion, err := s.roomVersions.selectRoomVersion(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Println("In invite events range, room version for", roomID, "is", roomVersion)
|
||||
|
||||
event, err := gomatrixserverlib.NewEventFromTrustedJSON(eventJSON, false, roomVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,10 +19,12 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sort"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -99,6 +101,7 @@ const selectStateInRangeSQL = "" +
|
|||
" LIMIT $8" // limit
|
||||
|
||||
type outputRoomEventsStatements struct {
|
||||
roomVersions *roomVersionStatements
|
||||
streamIDStatements *streamIDStatements
|
||||
insertEventStmt *sql.Stmt
|
||||
selectEventsStmt *sql.Stmt
|
||||
|
|
@ -109,7 +112,7 @@ type outputRoomEventsStatements struct {
|
|||
selectStateInRangeStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) {
|
||||
func (s *outputRoomEventsStatements) prepare(db *sql.DB, rvs *roomVersionStatements, streamID *streamIDStatements) (err error) {
|
||||
s.streamIDStatements = streamID
|
||||
_, err = db.Exec(outputRoomEventsSchema)
|
||||
if err != nil {
|
||||
|
|
@ -197,9 +200,16 @@ func (s *outputRoomEventsStatements) selectStateInRange(
|
|||
"dels": delIDsJSON,
|
||||
}).Warn("StateBetween: ignoring deleted state")
|
||||
}
|
||||
|
||||
roomIDFromJSON := gjson.Get(string(eventBytes), "room_id")
|
||||
if !roomIDFromJSON.Exists() {
|
||||
return nil, nil, errors.New("room ID not in event")
|
||||
}
|
||||
roomVersion, err := s.roomVersions.selectRoomVersion(ctx, txn, roomIDFromJSON.String())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
@ -316,7 +326,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
|
|||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
events, err := rowsToStreamEvents(rows)
|
||||
events, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -343,7 +353,7 @@ func (s *outputRoomEventsStatements) selectEarlyEvents(
|
|||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
events, err := rowsToStreamEvents(rows)
|
||||
events, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -368,7 +378,7 @@ func (s *outputRoomEventsStatements) selectEvents(
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if streamEvents, err := rowsToStreamEvents(rows); err == nil {
|
||||
if streamEvents, err := rowsToStreamEvents(ctx, txn, s.roomVersions, rows); err == nil {
|
||||
returnEvents = append(returnEvents, streamEvents...)
|
||||
}
|
||||
rows.Close() // nolint: errcheck
|
||||
|
|
@ -376,7 +386,10 @@ func (s *outputRoomEventsStatements) selectEvents(
|
|||
return returnEvents, nil
|
||||
}
|
||||
|
||||
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||
func rowsToStreamEvents(
|
||||
ctx context.Context, txn *sql.Tx, roomVersions *roomVersionStatements,
|
||||
rows *sql.Rows,
|
||||
) ([]types.StreamEvent, error) {
|
||||
var result []types.StreamEvent
|
||||
for rows.Next() {
|
||||
var (
|
||||
|
|
@ -390,8 +403,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
if err := rows.Scan(&streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomIDFromJSON := gjson.Get(string(eventBytes), "room_id")
|
||||
if !roomIDFromJSON.Exists() {
|
||||
return nil, errors.New("room ID not found in event")
|
||||
}
|
||||
roomVersion, err := roomVersions.selectRoomVersion(ctx, txn, roomIDFromJSON.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false, roomVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
45
syncapi/storage/sqlite3/room_versions.go
Normal file
45
syncapi/storage/sqlite3/room_versions.go
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
type roomVersionStatements struct {
|
||||
selectStateEventStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *roomVersionStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(currentRoomStateSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *roomVersionStatements) selectRoomVersion(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
) (roomVersion gomatrixserverlib.RoomVersion, err error) {
|
||||
stmt := common.TxStmt(txn, s.selectStateEventStmt)
|
||||
var res []byte
|
||||
err = stmt.QueryRowContext(ctx, roomID, "m.room.create", "").Scan(&res)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
rv := gjson.Get(string(res), "content.room_version")
|
||||
if !rv.Exists() {
|
||||
roomVersion = gomatrixserverlib.RoomVersionV1
|
||||
return
|
||||
}
|
||||
roomVersion = gomatrixserverlib.RoomVersion(rv.String())
|
||||
fmt.Println("room version for", roomID, "is", rv.String())
|
||||
return
|
||||
}
|
||||
|
|
@ -52,6 +52,7 @@ type stateDelta struct {
|
|||
type SyncServerDatasource struct {
|
||||
db *sql.DB
|
||||
common.PartitionOffsetStatements
|
||||
roomVersions roomVersionStatements
|
||||
streamID streamIDStatements
|
||||
accountData accountDataStatements
|
||||
events outputRoomEventsStatements
|
||||
|
|
@ -92,19 +93,22 @@ func (d *SyncServerDatasource) prepare() (err error) {
|
|||
if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = d.roomVersions.prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = d.streamID.prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = d.accountData.prepare(d.db, &d.streamID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = d.events.prepare(d.db, &d.streamID); err != nil {
|
||||
if err = d.events.prepare(d.db, &d.roomVersions, &d.streamID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.roomstate.prepare(d.db, &d.streamID); err != nil {
|
||||
if err := d.roomstate.prepare(d.db, &d.roomVersions, &d.streamID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.invites.prepare(d.db, &d.streamID); err != nil {
|
||||
if err := d.invites.prepare(d.db, &d.roomVersions, &d.streamID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := d.topology.prepare(d.db); err != nil {
|
||||
|
|
@ -255,7 +259,7 @@ func (d *SyncServerDatasource) updateRoomState(
|
|||
func (d *SyncServerDatasource) GetStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
|
||||
return d.roomstate.selectStateEvent(ctx, nil, roomID, evType, stateKey)
|
||||
}
|
||||
|
||||
// GetStateEventsForRoom fetches the state events for a given room.
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ func init() {
|
|||
"room_id": "`+roomID+`",
|
||||
"origin_server_ts": 12345,
|
||||
"event_id": "$randomMessageEvent:localhost"
|
||||
}`), false)
|
||||
}`), false, gomatrixserverlib.RoomVersionV1)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
@ -91,7 +91,7 @@ func init() {
|
|||
"room_id": "`+roomID+`",
|
||||
"origin_server_ts": 12345,
|
||||
"event_id": "$aliceInviteBobEvent:localhost"
|
||||
}`), false)
|
||||
}`), false, gomatrixserverlib.RoomVersionV1)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
@ -105,7 +105,7 @@ func init() {
|
|||
"room_id": "`+roomID+`",
|
||||
"origin_server_ts": 12345,
|
||||
"event_id": "$bobLeaveEvent:localhost"
|
||||
}`), false)
|
||||
}`), false, gomatrixserverlib.RoomVersionV1)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue