diff --git a/.github/workflows/dendrite.yml b/.github/workflows/dendrite.yml index 30277ed3d..6b61f2f22 100644 --- a/.github/workflows/dendrite.yml +++ b/.github/workflows/dendrite.yml @@ -29,7 +29,7 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v3 - # run go test with go 1.18 + # run go test with go 1.19 test: timeout-minutes: 5 name: Unit tests (Go ${{ matrix.go }}) @@ -57,7 +57,7 @@ jobs: strategy: fail-fast: false matrix: - go: ["1.18", "1.19"] + go: ["1.19"] steps: - uses: actions/checkout@v3 - name: Setup go @@ -87,7 +87,7 @@ jobs: strategy: fail-fast: false matrix: - go: ["1.18", "1.19"] + go: ["1.19"] goos: ["linux"] goarch: ["amd64"] steps: @@ -126,56 +126,6 @@ jobs: with: jobs: ${{ toJSON(needs) }} - # run database upgrade tests - upgrade_test: - name: Upgrade tests - timeout-minutes: 20 - needs: initial-tests-done - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: Setup go - uses: actions/setup-go@v3 - with: - go-version: "1.18" - - uses: actions/cache@v3 - with: - path: | - ~/.cache/go-build - ~/go/pkg/mod - key: ${{ runner.os }}-go-upgrade-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go-upgrade - - name: Build upgrade-tests - run: go build ./cmd/dendrite-upgrade-tests - - name: Test upgrade - run: ./dendrite-upgrade-tests --head . - - # run database upgrade tests, skipping over one version - upgrade_test_direct: - name: Upgrade tests from HEAD-2 - timeout-minutes: 20 - needs: initial-tests-done - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: Setup go - uses: actions/setup-go@v3 - with: - go-version: "1.18" - - uses: actions/cache@v3 - with: - path: | - ~/.cache/go-build - ~/go/pkg/mod - key: ${{ runner.os }}-go-upgrade-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go-upgrade - - name: Build upgrade-tests - run: go build ./cmd/dendrite-upgrade-tests - - name: Test upgrade - run: ./dendrite-upgrade-tests -direct -from HEAD-2 --head . - # run Sytest in different variations sytest: timeout-minutes: 20 @@ -186,11 +136,6 @@ jobs: fail-fast: false matrix: include: - - label: SQLite - - - label: SQLite, full HTTP APIs - api: full-http - - label: PostgreSQL postgres: postgres @@ -239,11 +184,6 @@ jobs: fail-fast: false matrix: include: - - label: SQLite - - - label: SQLite, full HTTP APIs - api: full-http - - label: PostgreSQL postgres: Postgres @@ -316,8 +256,6 @@ jobs: needs: [ initial-tests-done, - upgrade_test, - upgrade_test_direct, sytest, complement, ] diff --git a/.golangci.yml b/.golangci.yml index a327370e1..bb8d38a8b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -179,7 +179,6 @@ linters-settings: linters: enable: - - deadcode - errcheck - goconst - gocyclo @@ -191,10 +190,8 @@ linters: - misspell # Check code comments, whereas misspell in CI checks *.md files - nakedret - staticcheck - - structcheck - unparam - unused - - varcheck enable-all: false disable: - bodyclose diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 080d4d9fa..bcaae0c3e 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -52,6 +52,7 @@ func AddPublicRoutes( TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + TopicMultiRoomCast: cfg.Matrix.JetStream.Prefixed(jetstream.OutputMultiRoomCast), UserAPI: userAPI, ServerName: cfg.Matrix.ServerName, } diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 2dc0c4843..d0ea08418 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -36,6 +36,7 @@ type SyncAPIProducer struct { TopicSendToDeviceEvent string TopicTypingEvent string TopicPresenceEvent string + TopicMultiRoomCast string JetStream nats.JetStreamContext ServerName gomatrixserverlib.ServerName UserAPI userapi.ClientUserAPI @@ -159,3 +160,14 @@ func (p *SyncAPIProducer) SendPresence( _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) return err } + +func (p *SyncAPIProducer) SendMultiroom( + ctx context.Context, userID string, dataType string, message []byte, +) error { + m := nats.NewMsg(p.TopicMultiRoomCast) + m.Header.Set(jetstream.UserID, userID) + m.Header.Set("type", dataType) + m.Data = message + _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + return err +} diff --git a/clientapi/routing/multiroom.go b/clientapi/routing/multiroom.go new file mode 100644 index 000000000..14d3c29b6 --- /dev/null +++ b/clientapi/routing/multiroom.go @@ -0,0 +1,48 @@ +package routing + +import ( + "io" + "net/http" + + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + log "github.com/sirupsen/logrus" +) + +func PostMultiroom( + req *http.Request, + device *api.Device, + producer *producers.SyncAPIProducer, + dataType string, +) util.JSONResponse { + b, err := io.ReadAll(req.Body) + if err != nil { + log.WithError(err).Errorf("failed to read request body") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + canonicalB, err := gomatrixserverlib.CanonicalJSON(b) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("The request body is not valid canonical JSON." + err.Error()), + } + } + err = producer.SendMultiroom(req.Context(), device.UserID, dataType, canonicalB) + if err != nil { + log.WithError(err).Errorf("failed to send multiroomcast") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index d1b304fd9..b748cf30e 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -430,6 +430,17 @@ func Setup( }), ).Methods(http.MethodPut, http.MethodOptions) + v3mux.Handle("/multiroom/{dataType}", + httputil.MakeAuthAPI("send_multiroom", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + dataType := vars["dataType"] + return PostMultiroom(req, device, syncProducer, dataType) + }), + ).Methods(http.MethodPost, http.MethodOptions) + v3mux.Handle("/register", httputil.MakeExternalAPI("register", func(req *http.Request) util.JSONResponse { if r := rateLimits.Limit(req, nil); r != nil { return *r diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 590f0cbd9..b68ca498d 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -31,6 +31,7 @@ var ( RequestPresence = "GetPresence" OutputPresenceEvent = "OutputPresenceEvent" InputFulltextReindex = "InputFulltextReindex" + OutputMultiRoomCast = "OutputMultiRoomCast" ) var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+") @@ -101,4 +102,9 @@ var streams = []*nats.StreamConfig{ Storage: nats.MemoryStorage, MaxAge: time.Minute * 5, }, + { + Name: OutputMultiRoomCast, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, } diff --git a/syncapi/consumers/multiroomdata.go b/syncapi/consumers/multiroomdata.go new file mode 100644 index 000000000..d58f2b185 --- /dev/null +++ b/syncapi/consumers/multiroomdata.go @@ -0,0 +1,113 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "time" + + "github.com/getsentry/sentry-go" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/storage/mrd" + "github.com/matrix-org/dendrite/syncapi/streams" + "github.com/matrix-org/dendrite/syncapi/types" +) + +// OutputMultiRoomDataConsumer consumes events that originated in the client API server. +type OutputMultiRoomDataConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + db *mrd.Queries + stream streams.StreamProvider + notifier *notifier.Notifier +} + +// NewOutputMultiRoomDataConsumer creates a new OutputMultiRoomDataConsumer consumer. Call Start() to begin consuming from room servers. +func NewOutputMultiRoomDataConsumer( + process *process.ProcessContext, + cfg *config.SyncAPI, + js nats.JetStreamContext, + q *mrd.Queries, + notifier *notifier.Notifier, + stream streams.StreamProvider, +) *OutputMultiRoomDataConsumer { + return &OutputMultiRoomDataConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputMultiRoomCast), + durable: cfg.Matrix.JetStream.Durable("SyncAPIMultiRoomDataConsumer"), + db: q, + notifier: notifier, + stream: stream, + } +} + +func (s *OutputMultiRoomDataConsumer) Start() error { + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, 1, + s.onMessage, nats.DeliverAll(), nats.ManualAck(), + ) +} + +func (s *OutputMultiRoomDataConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { + msg := msgs[0] + userID := msg.Header.Get(jetstream.UserID) + dataType := msg.Header.Get("type") + + log.WithFields(log.Fields{ + "type": dataType, + "user_id": userID, + }).Debug("Received multiroom data from client API server") + + pos, err := s.db.InsertMultiRoomData(ctx, mrd.InsertMultiRoomDataParams{ + UserID: userID, + Type: dataType, + Data: msg.Data, + }) + if err != nil { + sentry.CaptureException(err) + log.WithFields(log.Fields{ + "type": dataType, + "user_id": userID, + }).WithError(err).Errorf("could not insert multi room data") + return false + } + + rooms, err := s.db.SelectMultiRoomVisibilityRooms(ctx, mrd.SelectMultiRoomVisibilityRoomsParams{ + UserID: userID, + ExpireTs: time.Now().Unix(), + }) + if err != nil { + sentry.CaptureException(err) + log.WithFields(log.Fields{ + "type": dataType, + "user_id": userID, + }).WithError(err).Errorf("failed to select multi room visibility") + return false + } + + s.stream.Advance(types.StreamPosition(pos)) + s.notifier.OnNewMultiRoomData(types.StreamingToken{MultiRoomDataPosition: types.StreamPosition(pos)}, rooms) + + return true +} diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 27f7c37ba..32c6f04e0 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -280,6 +280,32 @@ func (n *Notifier) _sharedUsers(userID string) []string { return sharedUsers } +func (n *Notifier) OnNewMultiRoomData( + posUpdate types.StreamingToken, roomIds []string, +) { + n.lock.Lock() + defer n.lock.Unlock() + + n.currPos.ApplyUpdates(posUpdate) + usersInRoom := n._usersInRooms(roomIds) + + n._wakeupUsers(usersInRoom, nil, n.currPos) +} + +func (n *Notifier) _usersInRooms(roomIds []string) []string { + for i := range roomIds { + for _, userID := range n._joinedUsers(roomIds[i]) { + n._sharedUserMap[userID] = struct{}{} + } + } + usersInRooms := make([]string, 0, len(n._sharedUserMap)+1) + for userID := range n._sharedUserMap { + usersInRooms = append(usersInRooms, userID) + delete(n._sharedUserMap, userID) + } + return usersInRooms +} + func (n *Notifier) IsSharedUser(userA, userB string) bool { n.lock.RLock() defer n.lock.RUnlock() diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 2485934cc..3275f9b26 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -109,6 +109,7 @@ type DatabaseTransaction interface { GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (events []types.StreamEvent, prevBatch, nextBatch string, err error) + SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string) (types.MultiRoom, error) } type Database interface { diff --git a/syncapi/storage/mrd/README.md b/syncapi/storage/mrd/README.md new file mode 100644 index 000000000..f2269169b --- /dev/null +++ b/syncapi/storage/mrd/README.md @@ -0,0 +1,3 @@ +## Multiroom storage + +please install `sqlc`: `go install github.com/kyleconroy/sqlc/cmd/sqlc@latest`. Then run `sqlc -f sqlc.yaml generate` in this directory after changing `queries.sql` or `../postgres/schema.sql` files. \ No newline at end of file diff --git a/syncapi/storage/mrd/db.go b/syncapi/storage/mrd/db.go new file mode 100644 index 000000000..8dc9794be --- /dev/null +++ b/syncapi/storage/mrd/db.go @@ -0,0 +1,138 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.15.0 + +package mrd + +import ( + "context" + "database/sql" + "fmt" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +func Prepare(ctx context.Context, db DBTX) (*Queries, error) { + q := Queries{db: db} + var err error + if q.deleteMultiRoomVisibilityStmt, err = db.PrepareContext(ctx, deleteMultiRoomVisibility); err != nil { + return nil, fmt.Errorf("error preparing query DeleteMultiRoomVisibility: %w", err) + } + if q.deleteMultiRoomVisibilityByExpireTSStmt, err = db.PrepareContext(ctx, deleteMultiRoomVisibilityByExpireTS); err != nil { + return nil, fmt.Errorf("error preparing query DeleteMultiRoomVisibilityByExpireTS: %w", err) + } + if q.insertMultiRoomDataStmt, err = db.PrepareContext(ctx, insertMultiRoomData); err != nil { + return nil, fmt.Errorf("error preparing query InsertMultiRoomData: %w", err) + } + if q.insertMultiRoomVisibilityStmt, err = db.PrepareContext(ctx, insertMultiRoomVisibility); err != nil { + return nil, fmt.Errorf("error preparing query InsertMultiRoomVisibility: %w", err) + } + if q.selectMaxIdStmt, err = db.PrepareContext(ctx, selectMaxId); err != nil { + return nil, fmt.Errorf("error preparing query SelectMaxId: %w", err) + } + if q.selectMultiRoomVisibilityRoomsStmt, err = db.PrepareContext(ctx, selectMultiRoomVisibilityRooms); err != nil { + return nil, fmt.Errorf("error preparing query SelectMultiRoomVisibilityRooms: %w", err) + } + return &q, nil +} + +func (q *Queries) Close() error { + var err error + if q.deleteMultiRoomVisibilityStmt != nil { + if cerr := q.deleteMultiRoomVisibilityStmt.Close(); cerr != nil { + err = fmt.Errorf("error closing deleteMultiRoomVisibilityStmt: %w", cerr) + } + } + if q.deleteMultiRoomVisibilityByExpireTSStmt != nil { + if cerr := q.deleteMultiRoomVisibilityByExpireTSStmt.Close(); cerr != nil { + err = fmt.Errorf("error closing deleteMultiRoomVisibilityByExpireTSStmt: %w", cerr) + } + } + if q.insertMultiRoomDataStmt != nil { + if cerr := q.insertMultiRoomDataStmt.Close(); cerr != nil { + err = fmt.Errorf("error closing insertMultiRoomDataStmt: %w", cerr) + } + } + if q.insertMultiRoomVisibilityStmt != nil { + if cerr := q.insertMultiRoomVisibilityStmt.Close(); cerr != nil { + err = fmt.Errorf("error closing insertMultiRoomVisibilityStmt: %w", cerr) + } + } + if q.selectMaxIdStmt != nil { + if cerr := q.selectMaxIdStmt.Close(); cerr != nil { + err = fmt.Errorf("error closing selectMaxIdStmt: %w", cerr) + } + } + if q.selectMultiRoomVisibilityRoomsStmt != nil { + if cerr := q.selectMultiRoomVisibilityRoomsStmt.Close(); cerr != nil { + err = fmt.Errorf("error closing selectMultiRoomVisibilityRoomsStmt: %w", cerr) + } + } + return err +} + +func (q *Queries) exec(ctx context.Context, stmt *sql.Stmt, query string, args ...interface{}) (sql.Result, error) { + switch { + case stmt != nil && q.tx != nil: + return q.tx.StmtContext(ctx, stmt).ExecContext(ctx, args...) + case stmt != nil: + return stmt.ExecContext(ctx, args...) + default: + return q.db.ExecContext(ctx, query, args...) + } +} + +func (q *Queries) query(ctx context.Context, stmt *sql.Stmt, query string, args ...interface{}) (*sql.Rows, error) { + switch { + case stmt != nil && q.tx != nil: + return q.tx.StmtContext(ctx, stmt).QueryContext(ctx, args...) + case stmt != nil: + return stmt.QueryContext(ctx, args...) + default: + return q.db.QueryContext(ctx, query, args...) + } +} + +func (q *Queries) queryRow(ctx context.Context, stmt *sql.Stmt, query string, args ...interface{}) *sql.Row { + switch { + case stmt != nil && q.tx != nil: + return q.tx.StmtContext(ctx, stmt).QueryRowContext(ctx, args...) + case stmt != nil: + return stmt.QueryRowContext(ctx, args...) + default: + return q.db.QueryRowContext(ctx, query, args...) + } +} + +type Queries struct { + db DBTX + tx *sql.Tx + deleteMultiRoomVisibilityStmt *sql.Stmt + deleteMultiRoomVisibilityByExpireTSStmt *sql.Stmt + insertMultiRoomDataStmt *sql.Stmt + insertMultiRoomVisibilityStmt *sql.Stmt + selectMaxIdStmt *sql.Stmt + selectMultiRoomVisibilityRoomsStmt *sql.Stmt +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + tx: tx, + deleteMultiRoomVisibilityStmt: q.deleteMultiRoomVisibilityStmt, + deleteMultiRoomVisibilityByExpireTSStmt: q.deleteMultiRoomVisibilityByExpireTSStmt, + insertMultiRoomDataStmt: q.insertMultiRoomDataStmt, + insertMultiRoomVisibilityStmt: q.insertMultiRoomVisibilityStmt, + selectMaxIdStmt: q.selectMaxIdStmt, + selectMultiRoomVisibilityRoomsStmt: q.selectMultiRoomVisibilityRoomsStmt, + } +} diff --git a/syncapi/storage/mrd/models.go b/syncapi/storage/mrd/models.go new file mode 100644 index 000000000..5f61c2edf --- /dev/null +++ b/syncapi/storage/mrd/models.go @@ -0,0 +1,24 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.15.0 + +package mrd + +import ( + "time" +) + +type SyncapiMultiroomDatum struct { + ID int64 `json:"id"` + UserID string `json:"user_id"` + Type string `json:"type"` + Data []byte `json:"data"` + Ts time.Time `json:"ts"` +} + +type SyncapiMultiroomVisibility struct { + UserID string `json:"user_id"` + Type string `json:"type"` + RoomID string `json:"room_id"` + ExpireTs int64 `json:"expire_ts"` +} diff --git a/syncapi/storage/mrd/queries.sql b/syncapi/storage/mrd/queries.sql new file mode 100644 index 000000000..76d6e6578 --- /dev/null +++ b/syncapi/storage/mrd/queries.sql @@ -0,0 +1,44 @@ +-- name: InsertMultiRoomData :one +INSERT INTO syncapi_multiroom_data ( + user_id, + type, + data +) VALUES ( + $1, + $2, + $3 +) ON CONFLICT (user_id, type) DO UPDATE SET id = nextval('syncapi_multiroom_id'), data = $3, ts = current_timestamp +RETURNING id; + + +-- name: InsertMultiRoomVisibility :exec +INSERT INTO syncapi_multiroom_visibility ( + user_id, + type, + room_id, + expire_ts +) VALUES ( + $1, + $2, + $3, + $4 +) ON CONFLICT (user_id, type, room_id) DO UPDATE SET expire_ts = $4; + +-- name: SelectMultiRoomVisibilityRooms :many +SELECT room_id FROM syncapi_multiroom_visibility +WHERE user_id = $1 +AND expire_ts > $2; + + +-- name: SelectMaxId :one +SELECT MAX(id) FROM syncapi_multiroom_data; + +-- name: DeleteMultiRoomVisibility :exec +DELETE FROM syncapi_multiroom_visibility +WHERE user_id = $1 +AND type = $2 +AND room_id = $3; + +-- name: DeleteMultiRoomVisibilityByExpireTS :execrows +DELETE FROM syncapi_multiroom_visibility +WHERE expire_ts <= $1; \ No newline at end of file diff --git a/syncapi/storage/mrd/queries.sql.go b/syncapi/storage/mrd/queries.sql.go new file mode 100644 index 000000000..fb20b0096 --- /dev/null +++ b/syncapi/storage/mrd/queries.sql.go @@ -0,0 +1,143 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.15.0 +// source: queries.sql + +package mrd + +import ( + "context" +) + +const deleteMultiRoomVisibility = `-- name: DeleteMultiRoomVisibility :exec +DELETE FROM syncapi_multiroom_visibility +WHERE user_id = $1 +AND type = $2 +AND room_id = $3 +` + +type DeleteMultiRoomVisibilityParams struct { + UserID string `json:"user_id"` + Type string `json:"type"` + RoomID string `json:"room_id"` +} + +func (q *Queries) DeleteMultiRoomVisibility(ctx context.Context, arg DeleteMultiRoomVisibilityParams) error { + _, err := q.exec(ctx, q.deleteMultiRoomVisibilityStmt, deleteMultiRoomVisibility, arg.UserID, arg.Type, arg.RoomID) + return err +} + +const deleteMultiRoomVisibilityByExpireTS = `-- name: DeleteMultiRoomVisibilityByExpireTS :execrows +DELETE FROM syncapi_multiroom_visibility +WHERE expire_ts <= $1 +` + +func (q *Queries) DeleteMultiRoomVisibilityByExpireTS(ctx context.Context, expireTs int64) (int64, error) { + result, err := q.exec(ctx, q.deleteMultiRoomVisibilityByExpireTSStmt, deleteMultiRoomVisibilityByExpireTS, expireTs) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const insertMultiRoomData = `-- name: InsertMultiRoomData :one +INSERT INTO syncapi_multiroom_data ( + user_id, + type, + data +) VALUES ( + $1, + $2, + $3 +) ON CONFLICT (user_id, type) DO UPDATE SET id = nextval('syncapi_multiroom_id'), data = $3, ts = current_timestamp +RETURNING id +` + +type InsertMultiRoomDataParams struct { + UserID string `json:"user_id"` + Type string `json:"type"` + Data []byte `json:"data"` +} + +func (q *Queries) InsertMultiRoomData(ctx context.Context, arg InsertMultiRoomDataParams) (int64, error) { + row := q.queryRow(ctx, q.insertMultiRoomDataStmt, insertMultiRoomData, arg.UserID, arg.Type, arg.Data) + var id int64 + err := row.Scan(&id) + return id, err +} + +const insertMultiRoomVisibility = `-- name: InsertMultiRoomVisibility :exec +INSERT INTO syncapi_multiroom_visibility ( + user_id, + type, + room_id, + expire_ts +) VALUES ( + $1, + $2, + $3, + $4 +) ON CONFLICT (user_id, type, room_id) DO UPDATE SET expire_ts = $4 +` + +type InsertMultiRoomVisibilityParams struct { + UserID string `json:"user_id"` + Type string `json:"type"` + RoomID string `json:"room_id"` + ExpireTs int64 `json:"expire_ts"` +} + +func (q *Queries) InsertMultiRoomVisibility(ctx context.Context, arg InsertMultiRoomVisibilityParams) error { + _, err := q.exec(ctx, q.insertMultiRoomVisibilityStmt, insertMultiRoomVisibility, + arg.UserID, + arg.Type, + arg.RoomID, + arg.ExpireTs, + ) + return err +} + +const selectMaxId = `-- name: SelectMaxId :one +SELECT MAX(id) FROM syncapi_multiroom_data +` + +func (q *Queries) SelectMaxId(ctx context.Context) (interface{}, error) { + row := q.queryRow(ctx, q.selectMaxIdStmt, selectMaxId) + var max interface{} + err := row.Scan(&max) + return max, err +} + +const selectMultiRoomVisibilityRooms = `-- name: SelectMultiRoomVisibilityRooms :many +SELECT room_id FROM syncapi_multiroom_visibility +WHERE user_id = $1 +AND expire_ts > $2 +` + +type SelectMultiRoomVisibilityRoomsParams struct { + UserID string `json:"user_id"` + ExpireTs int64 `json:"expire_ts"` +} + +func (q *Queries) SelectMultiRoomVisibilityRooms(ctx context.Context, arg SelectMultiRoomVisibilityRoomsParams) ([]string, error) { + rows, err := q.query(ctx, q.selectMultiRoomVisibilityRoomsStmt, selectMultiRoomVisibilityRooms, arg.UserID, arg.ExpireTs) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var room_id string + if err := rows.Scan(&room_id); err != nil { + return nil, err + } + items = append(items, room_id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/syncapi/storage/mrd/sqlc.yaml b/syncapi/storage/mrd/sqlc.yaml new file mode 100644 index 000000000..080407e19 --- /dev/null +++ b/syncapi/storage/mrd/sqlc.yaml @@ -0,0 +1,8 @@ +version: 1 +packages: + - path: ../mrd + engine: postgresql + schema: ../postgres/schema.sql + queries: queries.sql + emit_json_tags: true + emit_prepared_queries: true \ No newline at end of file diff --git a/syncapi/storage/mrd/types.go b/syncapi/storage/mrd/types.go new file mode 100644 index 000000000..a1bf76f81 --- /dev/null +++ b/syncapi/storage/mrd/types.go @@ -0,0 +1,6 @@ +package mrd + +type StateEvent struct { + Hidden bool `json:"hidden"` + ExpireTs int `json:"expire_ts"` +} diff --git a/syncapi/storage/postgres/multiroomcast_table.go b/syncapi/storage/postgres/multiroomcast_table.go new file mode 100644 index 000000000..7c819433d --- /dev/null +++ b/syncapi/storage/postgres/multiroomcast_table.go @@ -0,0 +1,61 @@ +package postgres + +import ( + "context" + "database/sql" + _ "embed" + "fmt" + "time" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/dendrite/syncapi/types" +) + +//go:embed schema.sql +var schema string + +var selectMultiRoomCastSQL = `SELECT d.user_id, d.type, d.data, d.ts FROM syncapi_multiroom_data AS d +JOIN syncapi_multiroom_visibility AS v +ON d.user_id = v.user_id +AND d.type = v.type +WHERE v.room_id = ANY($1) +AND id > $2 +AND id <= $3` + +type multiRoomStatements struct { + selectMultiRoomCast *sql.Stmt +} + +func NewPostgresMultiRoomCastTable(db *sql.DB) (tables.MultiRoom, error) { + r := &multiRoomStatements{} + _, err := db.Exec(schema) + if err != nil { + return nil, err + } + return r, sqlutil.StatementList{ + {&r.selectMultiRoomCast, selectMultiRoomCastSQL}, + }.Prepare(db) +} + +func (s *multiRoomStatements) SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string, txn *sql.Tx) ([]*types.MultiRoomDataRow, error) { + rows, err := sqlutil.TxStmt(txn, s.selectMultiRoomCast).QueryContext(ctx, pq.StringArray(joinedRooms), r.Low(), r.High()) + if err != nil { + return nil, err + } + data := make([]*types.MultiRoomDataRow, 0) + defer internal.CloseAndLogIfError(ctx, rows, "SelectMultiRoomData: rows.close() failed") + var t time.Time + for rows.Next() { + r := types.MultiRoomDataRow{} + err = rows.Scan(&r.UserId, &r.Type, &r.Data, &t) + r.Timestamp = t.Unix() + if err != nil { + return nil, fmt.Errorf("rows scan: %w", err) + } + data = append(data, &r) + } + return data, rows.Err() +} diff --git a/syncapi/storage/postgres/schema.sql b/syncapi/storage/postgres/schema.sql new file mode 100644 index 000000000..5702fba9d --- /dev/null +++ b/syncapi/storage/postgres/schema.sql @@ -0,0 +1,19 @@ +CREATE SEQUENCE IF NOT EXISTS syncapi_multiroom_id; + +CREATE TABLE IF NOT EXISTS syncapi_multiroom_data ( + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_multiroom_id'), + user_id TEXT NOT NULL, + type TEXT NOT NULL, + data BYTEA NOT NULL, + ts TIMESTAMP NOT NULL DEFAULT current_timestamp +); + +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_multiroom_data_user_id_type_idx ON syncapi_multiroom_data(user_id, type); + +CREATE TABLE IF NOT EXISTS syncapi_multiroom_visibility ( + user_id TEXT NOT NULL, + type TEXT NOT NULL, + room_id TEXT NOT NULL, + expire_ts BIGINT NOT NULL DEFAULT 0, + PRIMARY KEY(user_id, type, room_id) +) diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 850d24a07..2151b0032 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/syncapi/storage/mrd" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" ) @@ -102,6 +103,11 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if err != nil { return nil, err } + mr, err := NewPostgresMultiRoomCastTable(d.db) + if err != nil { + return nil, err + } + mrq := mrd.New(d.db) // apply migrations which need multiple tables m := sqlutil.NewMigrator(d.db) @@ -134,6 +140,8 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) Ignores: ignores, Presence: presence, Relations: relations, + MultiRoom: mr, + MultiRoomQ: mrq, } return &d, nil } diff --git a/syncapi/storage/shared/storage_consumer.go b/syncapi/storage/shared/storage_consumer.go index 8f81205de..b7458c7bd 100644 --- a/syncapi/storage/shared/storage_consumer.go +++ b/syncapi/storage/shared/storage_consumer.go @@ -19,6 +19,7 @@ import ( "database/sql" "encoding/json" "fmt" + "strings" "github.com/tidwall/gjson" @@ -30,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage/mrd" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -54,6 +56,8 @@ type Database struct { Ignores tables.Ignores Presence tables.Presence Relations tables.Relations + MultiRoomQ *mrd.Queries + MultiRoom tables.MultiRoom } func (d *Database) NewDatabaseSnapshot(ctx context.Context) (*DatabaseTransaction, error) { @@ -336,6 +340,13 @@ func (d *Database) updateRoomState( } } + if strings.HasPrefix(event.Type(), "connect.mrd") { + err := d.UpdateMultiRoomVisibility(ctx, event) + if err != nil { + logrus.WithError(err).WithField("event_id", event.EventID()).Error("failed to update multi room visibility") + } + } + if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil { return fmt.Errorf("d.CurrentRoomState.UpsertRoomState: %w", err) } @@ -633,3 +644,33 @@ func (d *Database) PresenceAfter(ctx context.Context, after types.StreamPosition func (s *Database) UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error { return s.Presence.UpdateLastActive(ctx, userId, lastActiveTs) } + +func (d *Database) UpdateMultiRoomVisibility(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { + var mrdEv mrd.StateEvent + err := json.Unmarshal(event.Content(), &mrdEv) + if err != nil { + return fmt.Errorf("unmarshal multiroom visibility failed: %w", err) + } + if mrdEv.Hidden { + err = d.MultiRoomQ.DeleteMultiRoomVisibility(ctx, mrd.DeleteMultiRoomVisibilityParams{ + UserID: event.Sender(), + Type: event.Type(), + RoomID: event.RoomID(), + }) + if err != nil { + return fmt.Errorf("delete multiroom visibility failed: %w", err) + } + } + if mrdEv.ExpireTs > 0 { + err = d.MultiRoomQ.InsertMultiRoomVisibility(ctx, mrd.InsertMultiRoomVisibilityParams{ + UserID: event.Sender(), + Type: event.Type(), + RoomID: event.RoomID(), + ExpireTs: int64(mrdEv.ExpireTs), + }) + if err != nil { + return fmt.Errorf("insert multiroom visibility failed: %w", err) + } + } + return nil +} diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go index 1f66ccc0e..f0896edab 100644 --- a/syncapi/storage/shared/storage_sync.go +++ b/syncapi/storage/shared/storage_sync.go @@ -688,3 +688,22 @@ func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, return events, prevBatch, nextBatch, nil } + +func (d *DatabaseTransaction) SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string) (types.MultiRoom, error) { + rows, err := d.MultiRoom.SelectMultiRoomData(ctx, r, joinedRooms, d.txn) + if err != nil { + return nil, fmt.Errorf("select multi room data: %w", err) + } + mr := make(types.MultiRoom, 3) + for _, row := range rows { + if mr[row.UserId] == nil { + mr[row.UserId] = make(map[string]types.MultiRoomData) + } + mr[row.UserId][row.Type] = types.MultiRoomData{ + Content: row.Data, + Timestamp: row.Timestamp, + } + } + return mr, nil + +} diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index 5b20c6cc2..a47efa38e 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -22,18 +22,26 @@ import ( "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/syncapi/storage/mrd" "github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" ) // NewSyncServerDatasource opens a database connection. -func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { +func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, *mrd.Queries, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(base, dbProperties) + ds, err := sqlite3.NewDatabase(base, dbProperties) + return ds, nil, err + case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(base, dbProperties) + ds, err := postgres.NewDatabase(base, dbProperties) + if err != nil { + return nil, nil, err + } + mrq := mrd.New(ds.DB) + return ds, mrq, nil default: - return nil, fmt.Errorf("unexpected database type") + return nil, nil, fmt.Errorf("unexpected database type") } } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 5ff185a32..74f4c830f 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -21,7 +21,7 @@ var ctx = context.Background() func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func(), func()) { connStr, close := test.PrepareDBConnectionString(t, dbType) base, closeBase := testrig.CreateBaseDendrite(t, dbType) - db, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{ + db, _, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 766e3559c..20070c028 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -227,3 +227,7 @@ type Relations interface { // "from" or want to work forwards and don't have a "to"). SelectMaxRelationID(ctx context.Context, txn *sql.Tx) (id int64, err error) } + +type MultiRoom interface { + SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string, txn *sql.Tx) ([]*types.MultiRoomDataRow, error) +} diff --git a/syncapi/streams/stream_multiroomdata.go b/syncapi/streams/stream_multiroomdata.go new file mode 100644 index 000000000..2ba35191f --- /dev/null +++ b/syncapi/streams/stream_multiroomdata.go @@ -0,0 +1,72 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package streams + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/storage/mrd" + "github.com/matrix-org/dendrite/syncapi/types" +) + +type MultiRoomDataStreamProvider struct { + DefaultStreamProvider + notifier *notifier.Notifier + mrdDb *mrd.Queries +} + +func (p *MultiRoomDataStreamProvider) Setup(ctx context.Context, snapshot storage.DatabaseTransaction) { + p.DefaultStreamProvider.Setup(ctx, snapshot) + + id, err := p.mrdDb.SelectMaxId(context.Background()) + if err != nil && err != sql.ErrNoRows { + panic(err) + } + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + if id == nil { + p.latest = types.StreamPosition(0) + } else { + p.latest = types.StreamPosition(id.(int64)) + } +} + +func (p *MultiRoomDataStreamProvider) CompleteSync( + ctx context.Context, + snapshot storage.DatabaseTransaction, + req *types.SyncRequest, +) types.StreamPosition { + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) +} + +func (p *MultiRoomDataStreamProvider) IncrementalSync( + ctx context.Context, + snapshot storage.DatabaseTransaction, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + mr, err := snapshot.SelectMultiRoomData(ctx, &types.Range{From: from, To: to}, req.JoinedRooms) + if err != nil { + req.Log.WithError(err).Error("GetUserUnreadNotificationCountsForRooms failed") + return from + } + req.Log.Tracef("MultiRoomDataStreamProvider IncrementalSync: %+v", mr) + req.Response.MultiRoom = mr + return to + +} diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 9ec2b61cd..ea1c4018c 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -77,6 +77,7 @@ func (p *PDUStreamProvider) CompleteSync( req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed") return from } + req.JoinedRooms = joinedRoomIDs stateFilter := req.Filter.Room.State eventFilter := req.Filter.Room.Timeline diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index dc8547621..8cc028bdf 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -9,6 +9,7 @@ import ( rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/storage/mrd" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" ) @@ -23,12 +24,14 @@ type Streams struct { DeviceListStreamProvider StreamProvider NotificationDataStreamProvider StreamProvider PresenceStreamProvider StreamProvider + MultiRoomStreamProvider StreamProvider } func NewSyncStreamProviders( d storage.Database, userAPI userapi.SyncUserAPI, rsAPI rsapi.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI, eduCache *caching.EDUCache, lazyLoadCache caching.LazyLoadCache, notifier *notifier.Notifier, + mrdb *mrd.Queries, ) *Streams { streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ @@ -66,6 +69,11 @@ func NewSyncStreamProviders( DefaultStreamProvider: DefaultStreamProvider{DB: d}, notifier: notifier, }, + MultiRoomStreamProvider: &MultiRoomDataStreamProvider{ + DefaultStreamProvider: DefaultStreamProvider{DB: d}, + notifier: notifier, + mrdDb: mrdb, + }, } ctx := context.TODO() @@ -85,6 +93,7 @@ func NewSyncStreamProviders( streams.NotificationDataStreamProvider.Setup(ctx, snapshot) streams.DeviceListStreamProvider.Setup(ctx, snapshot) streams.PresenceStreamProvider.Setup(ctx, snapshot) + streams.MultiRoomStreamProvider.Setup(ctx, snapshot) succeeded = true return streams diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 921eaf91c..77fdebb10 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -407,6 +407,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. ) }, ), + MultiRoomDataPosition: withTransaction( + syncReq.Since.MultiRoomDataPosition, + func(txn storage.DatabaseTransaction) types.StreamPosition { + return rp.streams.MultiRoomStreamProvider.CompleteSync( + syncReq.Context, txn, syncReq, + ) + }, + ), } } else { // Incremental sync @@ -492,6 +500,15 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. ) }, ), + MultiRoomDataPosition: withTransaction( + syncReq.Since.MultiRoomDataPosition, + func(snapshot storage.DatabaseTransaction) types.StreamPosition { + return rp.streams.MultiRoomStreamProvider.IncrementalSync( + syncReq.Context, snapshot, syncReq, + syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition, + ) + }, + ), } // it's possible for there to be no updates for this user even though since < current pos, // e.g busy servers with a quiet user. In this scenario, we don't want to return a no-op diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 6142d1957..8a7216228 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -50,14 +50,26 @@ func AddPublicRoutes( js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database) + syncDB, mrq, err := storage.NewSyncServerDatasource(base, &cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") } + go func() { + var affected int64 + for { + affected, err = mrq.DeleteMultiRoomVisibilityByExpireTS(context.Background(), time.Now().Unix()) + if err != nil { + logrus.WithError(err).Error("failed to expire multiroom visibility") + } + logrus.WithField("rows", affected).Info("expired multiroom visibility") + time.Sleep(time.Minute) + } + }() + eduCache := caching.NewTypingCache() notifier := notifier.NewNotifier() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, base.Caches, notifier) + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, base.Caches, notifier, mrq) notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") @@ -132,6 +144,13 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start receipts consumer") } + multiRoomConsumer := consumers.NewOutputMultiRoomDataConsumer( + base.ProcessContext, cfg, js, mrq, notifier, streams.MultiRoomStreamProvider, + ) + if err = multiRoomConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start multiroom consumer") + } + routing.Setup( base.PublicClientAPIMux, requestPool, syncDB, userAPI, rsAPI, cfg, base.Caches, base.Fulltext, diff --git a/syncapi/types/multiroom.go b/syncapi/types/multiroom.go new file mode 100644 index 000000000..f1b5d7e49 --- /dev/null +++ b/syncapi/types/multiroom.go @@ -0,0 +1,21 @@ +package types + +type MultiRoom map[string]map[string]MultiRoomData + +type MultiRoomContent []byte + +type MultiRoomData struct { + Content MultiRoomContent `json:"content"` + Timestamp int64 `json:"timestamp"` +} + +func (d MultiRoomContent) MarshalJSON() ([]byte, error) { + return d, nil +} + +type MultiRoomDataRow struct { + Data []byte + Type string + UserId string + Timestamp int64 +} diff --git a/syncapi/types/multiroom_test.go b/syncapi/types/multiroom_test.go new file mode 100644 index 000000000..7fa906974 --- /dev/null +++ b/syncapi/types/multiroom_test.go @@ -0,0 +1,21 @@ +package types + +import ( + "encoding/json" + "testing" + + "github.com/matryer/is" +) + +func TestMarshallMultiRoom(t *testing.T) { + is := is.New(t) + m, err := json.Marshal( + MultiRoom{ + "@3:example.com": map[string]MultiRoomData{ + "location": { + Content: MultiRoomContent(`{"foo":"bar"}`), + Timestamp: 123, + }}}) + is.NoErr(err) + is.Equal(m, []byte(`{"@3:example.com":{"location":{"content":{"foo":"bar"},"timestamp":123}}}`)) +} diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 9a533002b..f8d52b479 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -21,7 +21,8 @@ type SyncRequest struct { WantFullState bool // Updated by the PDU stream. - Rooms map[string]string + Rooms map[string]string + JoinedRooms []string // Updated by the PDU stream. MembershipChanges map[string]struct{} // Updated by the PDU stream. diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 1044c17cf..442e3400c 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -115,6 +115,7 @@ type StreamingToken struct { DeviceListPosition StreamPosition NotificationDataPosition StreamPosition PresencePosition StreamPosition + MultiRoomDataPosition StreamPosition } // This will be used as a fallback by json.Marshal. @@ -130,12 +131,12 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) { func (t StreamingToken) String() string { posStr := fmt.Sprintf( - "s%d_%d_%d_%d_%d_%d_%d_%d_%d", + "s%d_%d_%d_%d_%d_%d_%d_%d_%d_%d", t.PDUPosition, t.TypingPosition, t.ReceiptPosition, t.SendToDevicePosition, t.InvitePosition, t.AccountDataPosition, t.DeviceListPosition, t.NotificationDataPosition, - t.PresencePosition, + t.PresencePosition, t.MultiRoomDataPosition, ) return posStr } @@ -161,12 +162,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true case t.PresencePosition > other.PresencePosition: return true + case t.MultiRoomDataPosition > other.MultiRoomDataPosition: + return true } return false } func (t *StreamingToken) IsEmpty() bool { - return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition == 0 + return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition+t.MultiRoomDataPosition == 0 } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. @@ -210,6 +213,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) { if other.PresencePosition > t.PresencePosition { t.PresencePosition = other.PresencePosition } + if other.MultiRoomDataPosition > t.MultiRoomDataPosition { + t.MultiRoomDataPosition = other.MultiRoomDataPosition + } } type TopologyToken struct { @@ -304,7 +310,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { // s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions tok = strings.Split(tok, ".")[0] parts := strings.Split(tok[1:], "_") - var positions [9]StreamPosition + var positions [10]StreamPosition for i, p := range parts { if i >= len(positions) { break @@ -328,6 +334,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { DeviceListPosition: positions[6], NotificationDataPosition: positions[7], PresencePosition: positions[8], + MultiRoomDataPosition: positions[9], } return token, nil } @@ -364,6 +371,7 @@ type Response struct { ToDevice *ToDeviceResponse `json:"to_device,omitempty"` DeviceLists *DeviceLists `json:"device_lists,omitempty"` DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"` + MultiRoom MultiRoom `json:"multiroom,omitempty"` } func (r Response) MarshalJSON() ([]byte, error) { diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 19fcfc150..08614ebb7 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -9,10 +9,10 @@ import ( func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0_0_0_0_0_3": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3}.String(), - "s3_1_0_0_0_0_2_0_5": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0, 5}.String(), - "s3_1_2_3_5_0_0_0_6": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0, 6}.String(), - "t3_1": TopologyToken{3, 1}.String(), + "s4_0_0_0_0_0_0_0_3_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3, 0}.String(), + "s3_1_0_0_0_0_2_0_5_1": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0, 5, 1}.String(), + "s3_1_2_3_5_0_0_0_6_2": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0, 6, 2}.String(), + "t3_1": TopologyToken{3, 1}.String(), } for a, b := range shouldPass { diff --git a/test/db.go b/test/db.go index c7cb919f6..1eb80fb6c 100644 --- a/test/db.go +++ b/test/db.go @@ -171,7 +171,6 @@ func PrepareDBConnectionString(t *testing.T, dbType DBType) (connStr string, clo func WithAllDatabases(t *testing.T, testFn func(t *testing.T, db DBType)) { dbs := map[string]DBType{ "postgres": DBTypePostgres, - "sqlite": DBTypeSQLite, } for dbName, dbType := range dbs { dbt := dbType diff --git a/userapi/api/api_multicast.go b/userapi/api/api_multicast.go new file mode 100644 index 000000000..e98a39a58 --- /dev/null +++ b/userapi/api/api_multicast.go @@ -0,0 +1,6 @@ +package api + +type MulticastMetadata struct { + ExpireMs int + ExcludeRoomIds []string +}