mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 22:43:10 -06:00
Multiroom feature (#45)
* Multiroom feature * Run multiroom visibility expiration conditionally Remove SQLite and go 1.18 for tests matrixes * Remove sqlite from unit tests * Fix linter errors * Do not build with go1.18 * Do not run upgrade tests * Fix dendrite workflow * Add forgotten content and timestamp fields to multiroom in sync response * Fix syncapi multiroom unit tests * Review adjustments in queries and naming * Remove no longer maintained linters from golangci-lint configuration * Document sqlc code generation
This commit is contained in:
parent
bc17086f63
commit
369890c5d1
68
.github/workflows/dendrite.yml
vendored
68
.github/workflows/dendrite.yml
vendored
|
|
@ -29,7 +29,7 @@ jobs:
|
|||
- name: golangci-lint
|
||||
uses: golangci/golangci-lint-action@v3
|
||||
|
||||
# run go test with go 1.18
|
||||
# run go test with go 1.19
|
||||
test:
|
||||
timeout-minutes: 5
|
||||
name: Unit tests (Go ${{ matrix.go }})
|
||||
|
|
@ -57,7 +57,7 @@ jobs:
|
|||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
go: ["1.18", "1.19"]
|
||||
go: ["1.19"]
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Setup go
|
||||
|
|
@ -87,7 +87,7 @@ jobs:
|
|||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
go: ["1.18", "1.19"]
|
||||
go: ["1.19"]
|
||||
goos: ["linux"]
|
||||
goarch: ["amd64"]
|
||||
steps:
|
||||
|
|
@ -126,56 +126,6 @@ jobs:
|
|||
with:
|
||||
jobs: ${{ toJSON(needs) }}
|
||||
|
||||
# run database upgrade tests
|
||||
upgrade_test:
|
||||
name: Upgrade tests
|
||||
timeout-minutes: 20
|
||||
needs: initial-tests-done
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Setup go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: "1.18"
|
||||
- uses: actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
~/.cache/go-build
|
||||
~/go/pkg/mod
|
||||
key: ${{ runner.os }}-go-upgrade-${{ hashFiles('**/go.sum') }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-go-upgrade
|
||||
- name: Build upgrade-tests
|
||||
run: go build ./cmd/dendrite-upgrade-tests
|
||||
- name: Test upgrade
|
||||
run: ./dendrite-upgrade-tests --head .
|
||||
|
||||
# run database upgrade tests, skipping over one version
|
||||
upgrade_test_direct:
|
||||
name: Upgrade tests from HEAD-2
|
||||
timeout-minutes: 20
|
||||
needs: initial-tests-done
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Setup go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: "1.18"
|
||||
- uses: actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
~/.cache/go-build
|
||||
~/go/pkg/mod
|
||||
key: ${{ runner.os }}-go-upgrade-${{ hashFiles('**/go.sum') }}
|
||||
restore-keys: |
|
||||
${{ runner.os }}-go-upgrade
|
||||
- name: Build upgrade-tests
|
||||
run: go build ./cmd/dendrite-upgrade-tests
|
||||
- name: Test upgrade
|
||||
run: ./dendrite-upgrade-tests -direct -from HEAD-2 --head .
|
||||
|
||||
# run Sytest in different variations
|
||||
sytest:
|
||||
timeout-minutes: 20
|
||||
|
|
@ -186,11 +136,6 @@ jobs:
|
|||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
- label: SQLite
|
||||
|
||||
- label: SQLite, full HTTP APIs
|
||||
api: full-http
|
||||
|
||||
- label: PostgreSQL
|
||||
postgres: postgres
|
||||
|
||||
|
|
@ -239,11 +184,6 @@ jobs:
|
|||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
- label: SQLite
|
||||
|
||||
- label: SQLite, full HTTP APIs
|
||||
api: full-http
|
||||
|
||||
- label: PostgreSQL
|
||||
postgres: Postgres
|
||||
|
||||
|
|
@ -316,8 +256,6 @@ jobs:
|
|||
needs:
|
||||
[
|
||||
initial-tests-done,
|
||||
upgrade_test,
|
||||
upgrade_test_direct,
|
||||
sytest,
|
||||
complement,
|
||||
]
|
||||
|
|
|
|||
|
|
@ -179,7 +179,6 @@ linters-settings:
|
|||
|
||||
linters:
|
||||
enable:
|
||||
- deadcode
|
||||
- errcheck
|
||||
- goconst
|
||||
- gocyclo
|
||||
|
|
@ -191,10 +190,8 @@ linters:
|
|||
- misspell # Check code comments, whereas misspell in CI checks *.md files
|
||||
- nakedret
|
||||
- staticcheck
|
||||
- structcheck
|
||||
- unparam
|
||||
- unused
|
||||
- varcheck
|
||||
enable-all: false
|
||||
disable:
|
||||
- bodyclose
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ func AddPublicRoutes(
|
|||
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
||||
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
||||
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
||||
TopicMultiRoomCast: cfg.Matrix.JetStream.Prefixed(jetstream.OutputMultiRoomCast),
|
||||
UserAPI: userAPI,
|
||||
ServerName: cfg.Matrix.ServerName,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ type SyncAPIProducer struct {
|
|||
TopicSendToDeviceEvent string
|
||||
TopicTypingEvent string
|
||||
TopicPresenceEvent string
|
||||
TopicMultiRoomCast string
|
||||
JetStream nats.JetStreamContext
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
UserAPI userapi.ClientUserAPI
|
||||
|
|
@ -159,3 +160,14 @@ func (p *SyncAPIProducer) SendPresence(
|
|||
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *SyncAPIProducer) SendMultiroom(
|
||||
ctx context.Context, userID string, dataType string, message []byte,
|
||||
) error {
|
||||
m := nats.NewMsg(p.TopicMultiRoomCast)
|
||||
m.Header.Set(jetstream.UserID, userID)
|
||||
m.Header.Set("type", dataType)
|
||||
m.Data = message
|
||||
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
48
clientapi/routing/multiroom.go
Normal file
48
clientapi/routing/multiroom.go
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
package routing
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func PostMultiroom(
|
||||
req *http.Request,
|
||||
device *api.Device,
|
||||
producer *producers.SyncAPIProducer,
|
||||
dataType string,
|
||||
) util.JSONResponse {
|
||||
b, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("failed to read request body")
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusInternalServerError,
|
||||
JSON: jsonerror.InternalServerError(),
|
||||
}
|
||||
}
|
||||
canonicalB, err := gomatrixserverlib.CanonicalJSON(b)
|
||||
if err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.BadJSON("The request body is not valid canonical JSON." + err.Error()),
|
||||
}
|
||||
}
|
||||
err = producer.SendMultiroom(req.Context(), device.UserID, dataType, canonicalB)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("failed to send multiroomcast")
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusInternalServerError,
|
||||
JSON: jsonerror.InternalServerError(),
|
||||
}
|
||||
}
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
|
@ -430,6 +430,17 @@ func Setup(
|
|||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
|
||||
v3mux.Handle("/multiroom/{dataType}",
|
||||
httputil.MakeAuthAPI("send_multiroom", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
dataType := vars["dataType"]
|
||||
return PostMultiroom(req, device, syncProducer, dataType)
|
||||
}),
|
||||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
|
||||
v3mux.Handle("/register", httputil.MakeExternalAPI("register", func(req *http.Request) util.JSONResponse {
|
||||
if r := rateLimits.Limit(req, nil); r != nil {
|
||||
return *r
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ var (
|
|||
RequestPresence = "GetPresence"
|
||||
OutputPresenceEvent = "OutputPresenceEvent"
|
||||
InputFulltextReindex = "InputFulltextReindex"
|
||||
OutputMultiRoomCast = "OutputMultiRoomCast"
|
||||
)
|
||||
|
||||
var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+")
|
||||
|
|
@ -101,4 +102,9 @@ var streams = []*nats.StreamConfig{
|
|||
Storage: nats.MemoryStorage,
|
||||
MaxAge: time.Minute * 5,
|
||||
},
|
||||
{
|
||||
Name: OutputMultiRoomCast,
|
||||
Retention: nats.InterestPolicy,
|
||||
Storage: nats.FileStorage,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
113
syncapi/consumers/multiroomdata.go
Normal file
113
syncapi/consumers/multiroomdata.go
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/nats-io/nats.go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/mrd"
|
||||
"github.com/matrix-org/dendrite/syncapi/streams"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
||||
// OutputMultiRoomDataConsumer consumes events that originated in the client API server.
|
||||
type OutputMultiRoomDataConsumer struct {
|
||||
ctx context.Context
|
||||
jetstream nats.JetStreamContext
|
||||
durable string
|
||||
topic string
|
||||
db *mrd.Queries
|
||||
stream streams.StreamProvider
|
||||
notifier *notifier.Notifier
|
||||
}
|
||||
|
||||
// NewOutputMultiRoomDataConsumer creates a new OutputMultiRoomDataConsumer consumer. Call Start() to begin consuming from room servers.
|
||||
func NewOutputMultiRoomDataConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.SyncAPI,
|
||||
js nats.JetStreamContext,
|
||||
q *mrd.Queries,
|
||||
notifier *notifier.Notifier,
|
||||
stream streams.StreamProvider,
|
||||
) *OutputMultiRoomDataConsumer {
|
||||
return &OutputMultiRoomDataConsumer{
|
||||
ctx: process.Context(),
|
||||
jetstream: js,
|
||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputMultiRoomCast),
|
||||
durable: cfg.Matrix.JetStream.Durable("SyncAPIMultiRoomDataConsumer"),
|
||||
db: q,
|
||||
notifier: notifier,
|
||||
stream: stream,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *OutputMultiRoomDataConsumer) Start() error {
|
||||
return jetstream.JetStreamConsumer(
|
||||
s.ctx, s.jetstream, s.topic, s.durable, 1,
|
||||
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *OutputMultiRoomDataConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||
msg := msgs[0]
|
||||
userID := msg.Header.Get(jetstream.UserID)
|
||||
dataType := msg.Header.Get("type")
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"type": dataType,
|
||||
"user_id": userID,
|
||||
}).Debug("Received multiroom data from client API server")
|
||||
|
||||
pos, err := s.db.InsertMultiRoomData(ctx, mrd.InsertMultiRoomDataParams{
|
||||
UserID: userID,
|
||||
Type: dataType,
|
||||
Data: msg.Data,
|
||||
})
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithFields(log.Fields{
|
||||
"type": dataType,
|
||||
"user_id": userID,
|
||||
}).WithError(err).Errorf("could not insert multi room data")
|
||||
return false
|
||||
}
|
||||
|
||||
rooms, err := s.db.SelectMultiRoomVisibilityRooms(ctx, mrd.SelectMultiRoomVisibilityRoomsParams{
|
||||
UserID: userID,
|
||||
ExpireTs: time.Now().Unix(),
|
||||
})
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithFields(log.Fields{
|
||||
"type": dataType,
|
||||
"user_id": userID,
|
||||
}).WithError(err).Errorf("failed to select multi room visibility")
|
||||
return false
|
||||
}
|
||||
|
||||
s.stream.Advance(types.StreamPosition(pos))
|
||||
s.notifier.OnNewMultiRoomData(types.StreamingToken{MultiRoomDataPosition: types.StreamPosition(pos)}, rooms)
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
@ -280,6 +280,32 @@ func (n *Notifier) _sharedUsers(userID string) []string {
|
|||
return sharedUsers
|
||||
}
|
||||
|
||||
func (n *Notifier) OnNewMultiRoomData(
|
||||
posUpdate types.StreamingToken, roomIds []string,
|
||||
) {
|
||||
n.lock.Lock()
|
||||
defer n.lock.Unlock()
|
||||
|
||||
n.currPos.ApplyUpdates(posUpdate)
|
||||
usersInRoom := n._usersInRooms(roomIds)
|
||||
|
||||
n._wakeupUsers(usersInRoom, nil, n.currPos)
|
||||
}
|
||||
|
||||
func (n *Notifier) _usersInRooms(roomIds []string) []string {
|
||||
for i := range roomIds {
|
||||
for _, userID := range n._joinedUsers(roomIds[i]) {
|
||||
n._sharedUserMap[userID] = struct{}{}
|
||||
}
|
||||
}
|
||||
usersInRooms := make([]string, 0, len(n._sharedUserMap)+1)
|
||||
for userID := range n._sharedUserMap {
|
||||
usersInRooms = append(usersInRooms, userID)
|
||||
delete(n._sharedUserMap, userID)
|
||||
}
|
||||
return usersInRooms
|
||||
}
|
||||
|
||||
func (n *Notifier) IsSharedUser(userA, userB string) bool {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
|
|
|
|||
|
|
@ -109,6 +109,7 @@ type DatabaseTransaction interface {
|
|||
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
|
||||
PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error)
|
||||
RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (events []types.StreamEvent, prevBatch, nextBatch string, err error)
|
||||
SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string) (types.MultiRoom, error)
|
||||
}
|
||||
|
||||
type Database interface {
|
||||
|
|
|
|||
3
syncapi/storage/mrd/README.md
Normal file
3
syncapi/storage/mrd/README.md
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
## Multiroom storage
|
||||
|
||||
please install `sqlc`: `go install github.com/kyleconroy/sqlc/cmd/sqlc@latest`. Then run `sqlc -f sqlc.yaml generate` in this directory after changing `queries.sql` or `../postgres/schema.sql` files.
|
||||
138
syncapi/storage/mrd/db.go
Normal file
138
syncapi/storage/mrd/db.go
Normal file
|
|
@ -0,0 +1,138 @@
|
|||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.15.0
|
||||
|
||||
package mrd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type DBTX interface {
|
||||
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
|
||||
PrepareContext(context.Context, string) (*sql.Stmt, error)
|
||||
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
|
||||
}
|
||||
|
||||
func New(db DBTX) *Queries {
|
||||
return &Queries{db: db}
|
||||
}
|
||||
|
||||
func Prepare(ctx context.Context, db DBTX) (*Queries, error) {
|
||||
q := Queries{db: db}
|
||||
var err error
|
||||
if q.deleteMultiRoomVisibilityStmt, err = db.PrepareContext(ctx, deleteMultiRoomVisibility); err != nil {
|
||||
return nil, fmt.Errorf("error preparing query DeleteMultiRoomVisibility: %w", err)
|
||||
}
|
||||
if q.deleteMultiRoomVisibilityByExpireTSStmt, err = db.PrepareContext(ctx, deleteMultiRoomVisibilityByExpireTS); err != nil {
|
||||
return nil, fmt.Errorf("error preparing query DeleteMultiRoomVisibilityByExpireTS: %w", err)
|
||||
}
|
||||
if q.insertMultiRoomDataStmt, err = db.PrepareContext(ctx, insertMultiRoomData); err != nil {
|
||||
return nil, fmt.Errorf("error preparing query InsertMultiRoomData: %w", err)
|
||||
}
|
||||
if q.insertMultiRoomVisibilityStmt, err = db.PrepareContext(ctx, insertMultiRoomVisibility); err != nil {
|
||||
return nil, fmt.Errorf("error preparing query InsertMultiRoomVisibility: %w", err)
|
||||
}
|
||||
if q.selectMaxIdStmt, err = db.PrepareContext(ctx, selectMaxId); err != nil {
|
||||
return nil, fmt.Errorf("error preparing query SelectMaxId: %w", err)
|
||||
}
|
||||
if q.selectMultiRoomVisibilityRoomsStmt, err = db.PrepareContext(ctx, selectMultiRoomVisibilityRooms); err != nil {
|
||||
return nil, fmt.Errorf("error preparing query SelectMultiRoomVisibilityRooms: %w", err)
|
||||
}
|
||||
return &q, nil
|
||||
}
|
||||
|
||||
func (q *Queries) Close() error {
|
||||
var err error
|
||||
if q.deleteMultiRoomVisibilityStmt != nil {
|
||||
if cerr := q.deleteMultiRoomVisibilityStmt.Close(); cerr != nil {
|
||||
err = fmt.Errorf("error closing deleteMultiRoomVisibilityStmt: %w", cerr)
|
||||
}
|
||||
}
|
||||
if q.deleteMultiRoomVisibilityByExpireTSStmt != nil {
|
||||
if cerr := q.deleteMultiRoomVisibilityByExpireTSStmt.Close(); cerr != nil {
|
||||
err = fmt.Errorf("error closing deleteMultiRoomVisibilityByExpireTSStmt: %w", cerr)
|
||||
}
|
||||
}
|
||||
if q.insertMultiRoomDataStmt != nil {
|
||||
if cerr := q.insertMultiRoomDataStmt.Close(); cerr != nil {
|
||||
err = fmt.Errorf("error closing insertMultiRoomDataStmt: %w", cerr)
|
||||
}
|
||||
}
|
||||
if q.insertMultiRoomVisibilityStmt != nil {
|
||||
if cerr := q.insertMultiRoomVisibilityStmt.Close(); cerr != nil {
|
||||
err = fmt.Errorf("error closing insertMultiRoomVisibilityStmt: %w", cerr)
|
||||
}
|
||||
}
|
||||
if q.selectMaxIdStmt != nil {
|
||||
if cerr := q.selectMaxIdStmt.Close(); cerr != nil {
|
||||
err = fmt.Errorf("error closing selectMaxIdStmt: %w", cerr)
|
||||
}
|
||||
}
|
||||
if q.selectMultiRoomVisibilityRoomsStmt != nil {
|
||||
if cerr := q.selectMultiRoomVisibilityRoomsStmt.Close(); cerr != nil {
|
||||
err = fmt.Errorf("error closing selectMultiRoomVisibilityRoomsStmt: %w", cerr)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (q *Queries) exec(ctx context.Context, stmt *sql.Stmt, query string, args ...interface{}) (sql.Result, error) {
|
||||
switch {
|
||||
case stmt != nil && q.tx != nil:
|
||||
return q.tx.StmtContext(ctx, stmt).ExecContext(ctx, args...)
|
||||
case stmt != nil:
|
||||
return stmt.ExecContext(ctx, args...)
|
||||
default:
|
||||
return q.db.ExecContext(ctx, query, args...)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Queries) query(ctx context.Context, stmt *sql.Stmt, query string, args ...interface{}) (*sql.Rows, error) {
|
||||
switch {
|
||||
case stmt != nil && q.tx != nil:
|
||||
return q.tx.StmtContext(ctx, stmt).QueryContext(ctx, args...)
|
||||
case stmt != nil:
|
||||
return stmt.QueryContext(ctx, args...)
|
||||
default:
|
||||
return q.db.QueryContext(ctx, query, args...)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Queries) queryRow(ctx context.Context, stmt *sql.Stmt, query string, args ...interface{}) *sql.Row {
|
||||
switch {
|
||||
case stmt != nil && q.tx != nil:
|
||||
return q.tx.StmtContext(ctx, stmt).QueryRowContext(ctx, args...)
|
||||
case stmt != nil:
|
||||
return stmt.QueryRowContext(ctx, args...)
|
||||
default:
|
||||
return q.db.QueryRowContext(ctx, query, args...)
|
||||
}
|
||||
}
|
||||
|
||||
type Queries struct {
|
||||
db DBTX
|
||||
tx *sql.Tx
|
||||
deleteMultiRoomVisibilityStmt *sql.Stmt
|
||||
deleteMultiRoomVisibilityByExpireTSStmt *sql.Stmt
|
||||
insertMultiRoomDataStmt *sql.Stmt
|
||||
insertMultiRoomVisibilityStmt *sql.Stmt
|
||||
selectMaxIdStmt *sql.Stmt
|
||||
selectMultiRoomVisibilityRoomsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (q *Queries) WithTx(tx *sql.Tx) *Queries {
|
||||
return &Queries{
|
||||
db: tx,
|
||||
tx: tx,
|
||||
deleteMultiRoomVisibilityStmt: q.deleteMultiRoomVisibilityStmt,
|
||||
deleteMultiRoomVisibilityByExpireTSStmt: q.deleteMultiRoomVisibilityByExpireTSStmt,
|
||||
insertMultiRoomDataStmt: q.insertMultiRoomDataStmt,
|
||||
insertMultiRoomVisibilityStmt: q.insertMultiRoomVisibilityStmt,
|
||||
selectMaxIdStmt: q.selectMaxIdStmt,
|
||||
selectMultiRoomVisibilityRoomsStmt: q.selectMultiRoomVisibilityRoomsStmt,
|
||||
}
|
||||
}
|
||||
24
syncapi/storage/mrd/models.go
Normal file
24
syncapi/storage/mrd/models.go
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.15.0
|
||||
|
||||
package mrd
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type SyncapiMultiroomDatum struct {
|
||||
ID int64 `json:"id"`
|
||||
UserID string `json:"user_id"`
|
||||
Type string `json:"type"`
|
||||
Data []byte `json:"data"`
|
||||
Ts time.Time `json:"ts"`
|
||||
}
|
||||
|
||||
type SyncapiMultiroomVisibility struct {
|
||||
UserID string `json:"user_id"`
|
||||
Type string `json:"type"`
|
||||
RoomID string `json:"room_id"`
|
||||
ExpireTs int64 `json:"expire_ts"`
|
||||
}
|
||||
44
syncapi/storage/mrd/queries.sql
Normal file
44
syncapi/storage/mrd/queries.sql
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
-- name: InsertMultiRoomData :one
|
||||
INSERT INTO syncapi_multiroom_data (
|
||||
user_id,
|
||||
type,
|
||||
data
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3
|
||||
) ON CONFLICT (user_id, type) DO UPDATE SET id = nextval('syncapi_multiroom_id'), data = $3, ts = current_timestamp
|
||||
RETURNING id;
|
||||
|
||||
|
||||
-- name: InsertMultiRoomVisibility :exec
|
||||
INSERT INTO syncapi_multiroom_visibility (
|
||||
user_id,
|
||||
type,
|
||||
room_id,
|
||||
expire_ts
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4
|
||||
) ON CONFLICT (user_id, type, room_id) DO UPDATE SET expire_ts = $4;
|
||||
|
||||
-- name: SelectMultiRoomVisibilityRooms :many
|
||||
SELECT room_id FROM syncapi_multiroom_visibility
|
||||
WHERE user_id = $1
|
||||
AND expire_ts > $2;
|
||||
|
||||
|
||||
-- name: SelectMaxId :one
|
||||
SELECT MAX(id) FROM syncapi_multiroom_data;
|
||||
|
||||
-- name: DeleteMultiRoomVisibility :exec
|
||||
DELETE FROM syncapi_multiroom_visibility
|
||||
WHERE user_id = $1
|
||||
AND type = $2
|
||||
AND room_id = $3;
|
||||
|
||||
-- name: DeleteMultiRoomVisibilityByExpireTS :execrows
|
||||
DELETE FROM syncapi_multiroom_visibility
|
||||
WHERE expire_ts <= $1;
|
||||
143
syncapi/storage/mrd/queries.sql.go
Normal file
143
syncapi/storage/mrd/queries.sql.go
Normal file
|
|
@ -0,0 +1,143 @@
|
|||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.15.0
|
||||
// source: queries.sql
|
||||
|
||||
package mrd
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
const deleteMultiRoomVisibility = `-- name: DeleteMultiRoomVisibility :exec
|
||||
DELETE FROM syncapi_multiroom_visibility
|
||||
WHERE user_id = $1
|
||||
AND type = $2
|
||||
AND room_id = $3
|
||||
`
|
||||
|
||||
type DeleteMultiRoomVisibilityParams struct {
|
||||
UserID string `json:"user_id"`
|
||||
Type string `json:"type"`
|
||||
RoomID string `json:"room_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) DeleteMultiRoomVisibility(ctx context.Context, arg DeleteMultiRoomVisibilityParams) error {
|
||||
_, err := q.exec(ctx, q.deleteMultiRoomVisibilityStmt, deleteMultiRoomVisibility, arg.UserID, arg.Type, arg.RoomID)
|
||||
return err
|
||||
}
|
||||
|
||||
const deleteMultiRoomVisibilityByExpireTS = `-- name: DeleteMultiRoomVisibilityByExpireTS :execrows
|
||||
DELETE FROM syncapi_multiroom_visibility
|
||||
WHERE expire_ts <= $1
|
||||
`
|
||||
|
||||
func (q *Queries) DeleteMultiRoomVisibilityByExpireTS(ctx context.Context, expireTs int64) (int64, error) {
|
||||
result, err := q.exec(ctx, q.deleteMultiRoomVisibilityByExpireTSStmt, deleteMultiRoomVisibilityByExpireTS, expireTs)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected()
|
||||
}
|
||||
|
||||
const insertMultiRoomData = `-- name: InsertMultiRoomData :one
|
||||
INSERT INTO syncapi_multiroom_data (
|
||||
user_id,
|
||||
type,
|
||||
data
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3
|
||||
) ON CONFLICT (user_id, type) DO UPDATE SET id = nextval('syncapi_multiroom_id'), data = $3, ts = current_timestamp
|
||||
RETURNING id
|
||||
`
|
||||
|
||||
type InsertMultiRoomDataParams struct {
|
||||
UserID string `json:"user_id"`
|
||||
Type string `json:"type"`
|
||||
Data []byte `json:"data"`
|
||||
}
|
||||
|
||||
func (q *Queries) InsertMultiRoomData(ctx context.Context, arg InsertMultiRoomDataParams) (int64, error) {
|
||||
row := q.queryRow(ctx, q.insertMultiRoomDataStmt, insertMultiRoomData, arg.UserID, arg.Type, arg.Data)
|
||||
var id int64
|
||||
err := row.Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
|
||||
const insertMultiRoomVisibility = `-- name: InsertMultiRoomVisibility :exec
|
||||
INSERT INTO syncapi_multiroom_visibility (
|
||||
user_id,
|
||||
type,
|
||||
room_id,
|
||||
expire_ts
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4
|
||||
) ON CONFLICT (user_id, type, room_id) DO UPDATE SET expire_ts = $4
|
||||
`
|
||||
|
||||
type InsertMultiRoomVisibilityParams struct {
|
||||
UserID string `json:"user_id"`
|
||||
Type string `json:"type"`
|
||||
RoomID string `json:"room_id"`
|
||||
ExpireTs int64 `json:"expire_ts"`
|
||||
}
|
||||
|
||||
func (q *Queries) InsertMultiRoomVisibility(ctx context.Context, arg InsertMultiRoomVisibilityParams) error {
|
||||
_, err := q.exec(ctx, q.insertMultiRoomVisibilityStmt, insertMultiRoomVisibility,
|
||||
arg.UserID,
|
||||
arg.Type,
|
||||
arg.RoomID,
|
||||
arg.ExpireTs,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const selectMaxId = `-- name: SelectMaxId :one
|
||||
SELECT MAX(id) FROM syncapi_multiroom_data
|
||||
`
|
||||
|
||||
func (q *Queries) SelectMaxId(ctx context.Context) (interface{}, error) {
|
||||
row := q.queryRow(ctx, q.selectMaxIdStmt, selectMaxId)
|
||||
var max interface{}
|
||||
err := row.Scan(&max)
|
||||
return max, err
|
||||
}
|
||||
|
||||
const selectMultiRoomVisibilityRooms = `-- name: SelectMultiRoomVisibilityRooms :many
|
||||
SELECT room_id FROM syncapi_multiroom_visibility
|
||||
WHERE user_id = $1
|
||||
AND expire_ts > $2
|
||||
`
|
||||
|
||||
type SelectMultiRoomVisibilityRoomsParams struct {
|
||||
UserID string `json:"user_id"`
|
||||
ExpireTs int64 `json:"expire_ts"`
|
||||
}
|
||||
|
||||
func (q *Queries) SelectMultiRoomVisibilityRooms(ctx context.Context, arg SelectMultiRoomVisibilityRoomsParams) ([]string, error) {
|
||||
rows, err := q.query(ctx, q.selectMultiRoomVisibilityRoomsStmt, selectMultiRoomVisibilityRooms, arg.UserID, arg.ExpireTs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []string
|
||||
for rows.Next() {
|
||||
var room_id string
|
||||
if err := rows.Scan(&room_id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, room_id)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
8
syncapi/storage/mrd/sqlc.yaml
Normal file
8
syncapi/storage/mrd/sqlc.yaml
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
version: 1
|
||||
packages:
|
||||
- path: ../mrd
|
||||
engine: postgresql
|
||||
schema: ../postgres/schema.sql
|
||||
queries: queries.sql
|
||||
emit_json_tags: true
|
||||
emit_prepared_queries: true
|
||||
6
syncapi/storage/mrd/types.go
Normal file
6
syncapi/storage/mrd/types.go
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
package mrd
|
||||
|
||||
type StateEvent struct {
|
||||
Hidden bool `json:"hidden"`
|
||||
ExpireTs int `json:"expire_ts"`
|
||||
}
|
||||
61
syncapi/storage/postgres/multiroomcast_table.go
Normal file
61
syncapi/storage/postgres/multiroomcast_table.go
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
||||
//go:embed schema.sql
|
||||
var schema string
|
||||
|
||||
var selectMultiRoomCastSQL = `SELECT d.user_id, d.type, d.data, d.ts FROM syncapi_multiroom_data AS d
|
||||
JOIN syncapi_multiroom_visibility AS v
|
||||
ON d.user_id = v.user_id
|
||||
AND d.type = v.type
|
||||
WHERE v.room_id = ANY($1)
|
||||
AND id > $2
|
||||
AND id <= $3`
|
||||
|
||||
type multiRoomStatements struct {
|
||||
selectMultiRoomCast *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresMultiRoomCastTable(db *sql.DB) (tables.MultiRoom, error) {
|
||||
r := &multiRoomStatements{}
|
||||
_, err := db.Exec(schema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r, sqlutil.StatementList{
|
||||
{&r.selectMultiRoomCast, selectMultiRoomCastSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
func (s *multiRoomStatements) SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string, txn *sql.Tx) ([]*types.MultiRoomDataRow, error) {
|
||||
rows, err := sqlutil.TxStmt(txn, s.selectMultiRoomCast).QueryContext(ctx, pq.StringArray(joinedRooms), r.Low(), r.High())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
data := make([]*types.MultiRoomDataRow, 0)
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectMultiRoomData: rows.close() failed")
|
||||
var t time.Time
|
||||
for rows.Next() {
|
||||
r := types.MultiRoomDataRow{}
|
||||
err = rows.Scan(&r.UserId, &r.Type, &r.Data, &t)
|
||||
r.Timestamp = t.Unix()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rows scan: %w", err)
|
||||
}
|
||||
data = append(data, &r)
|
||||
}
|
||||
return data, rows.Err()
|
||||
}
|
||||
19
syncapi/storage/postgres/schema.sql
Normal file
19
syncapi/storage/postgres/schema.sql
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
CREATE SEQUENCE IF NOT EXISTS syncapi_multiroom_id;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS syncapi_multiroom_data (
|
||||
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_multiroom_id'),
|
||||
user_id TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
data BYTEA NOT NULL,
|
||||
ts TIMESTAMP NOT NULL DEFAULT current_timestamp
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_multiroom_data_user_id_type_idx ON syncapi_multiroom_data(user_id, type);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS syncapi_multiroom_visibility (
|
||||
user_id TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
room_id TEXT NOT NULL,
|
||||
expire_ts BIGINT NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY(user_id, type, room_id)
|
||||
)
|
||||
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/mrd"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||
)
|
||||
|
|
@ -102,6 +103,11 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mr, err := NewPostgresMultiRoomCastTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mrq := mrd.New(d.db)
|
||||
|
||||
// apply migrations which need multiple tables
|
||||
m := sqlutil.NewMigrator(d.db)
|
||||
|
|
@ -134,6 +140,8 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
Ignores: ignores,
|
||||
Presence: presence,
|
||||
Relations: relations,
|
||||
MultiRoom: mr,
|
||||
MultiRoomQ: mrq,
|
||||
}
|
||||
return &d, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
|
|
@ -30,6 +31,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/mrd"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
|
@ -54,6 +56,8 @@ type Database struct {
|
|||
Ignores tables.Ignores
|
||||
Presence tables.Presence
|
||||
Relations tables.Relations
|
||||
MultiRoomQ *mrd.Queries
|
||||
MultiRoom tables.MultiRoom
|
||||
}
|
||||
|
||||
func (d *Database) NewDatabaseSnapshot(ctx context.Context) (*DatabaseTransaction, error) {
|
||||
|
|
@ -336,6 +340,13 @@ func (d *Database) updateRoomState(
|
|||
}
|
||||
}
|
||||
|
||||
if strings.HasPrefix(event.Type(), "connect.mrd") {
|
||||
err := d.UpdateMultiRoomVisibility(ctx, event)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("event_id", event.EventID()).Error("failed to update multi room visibility")
|
||||
}
|
||||
}
|
||||
|
||||
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
|
||||
return fmt.Errorf("d.CurrentRoomState.UpsertRoomState: %w", err)
|
||||
}
|
||||
|
|
@ -633,3 +644,33 @@ func (d *Database) PresenceAfter(ctx context.Context, after types.StreamPosition
|
|||
func (s *Database) UpdateLastActive(ctx context.Context, userId string, lastActiveTs uint64) error {
|
||||
return s.Presence.UpdateLastActive(ctx, userId, lastActiveTs)
|
||||
}
|
||||
|
||||
func (d *Database) UpdateMultiRoomVisibility(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
|
||||
var mrdEv mrd.StateEvent
|
||||
err := json.Unmarshal(event.Content(), &mrdEv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unmarshal multiroom visibility failed: %w", err)
|
||||
}
|
||||
if mrdEv.Hidden {
|
||||
err = d.MultiRoomQ.DeleteMultiRoomVisibility(ctx, mrd.DeleteMultiRoomVisibilityParams{
|
||||
UserID: event.Sender(),
|
||||
Type: event.Type(),
|
||||
RoomID: event.RoomID(),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete multiroom visibility failed: %w", err)
|
||||
}
|
||||
}
|
||||
if mrdEv.ExpireTs > 0 {
|
||||
err = d.MultiRoomQ.InsertMultiRoomVisibility(ctx, mrd.InsertMultiRoomVisibilityParams{
|
||||
UserID: event.Sender(),
|
||||
Type: event.Type(),
|
||||
RoomID: event.RoomID(),
|
||||
ExpireTs: int64(mrdEv.ExpireTs),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert multiroom visibility failed: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -688,3 +688,22 @@ func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID,
|
|||
|
||||
return events, prevBatch, nextBatch, nil
|
||||
}
|
||||
|
||||
func (d *DatabaseTransaction) SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string) (types.MultiRoom, error) {
|
||||
rows, err := d.MultiRoom.SelectMultiRoomData(ctx, r, joinedRooms, d.txn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("select multi room data: %w", err)
|
||||
}
|
||||
mr := make(types.MultiRoom, 3)
|
||||
for _, row := range rows {
|
||||
if mr[row.UserId] == nil {
|
||||
mr[row.UserId] = make(map[string]types.MultiRoomData)
|
||||
}
|
||||
mr[row.UserId][row.Type] = types.MultiRoomData{
|
||||
Content: row.Data,
|
||||
Timestamp: row.Timestamp,
|
||||
}
|
||||
}
|
||||
return mr, nil
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,18 +22,26 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/mrd"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
|
||||
)
|
||||
|
||||
// NewSyncServerDatasource opens a database connection.
|
||||
func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) {
|
||||
func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, *mrd.Queries, error) {
|
||||
switch {
|
||||
case dbProperties.ConnectionString.IsSQLite():
|
||||
return sqlite3.NewDatabase(base, dbProperties)
|
||||
ds, err := sqlite3.NewDatabase(base, dbProperties)
|
||||
return ds, nil, err
|
||||
|
||||
case dbProperties.ConnectionString.IsPostgres():
|
||||
return postgres.NewDatabase(base, dbProperties)
|
||||
ds, err := postgres.NewDatabase(base, dbProperties)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
mrq := mrd.New(ds.DB)
|
||||
return ds, mrq, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected database type")
|
||||
return nil, nil, fmt.Errorf("unexpected database type")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ var ctx = context.Background()
|
|||
func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func(), func()) {
|
||||
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||
base, closeBase := testrig.CreateBaseDendrite(t, dbType)
|
||||
db, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{
|
||||
db, _, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{
|
||||
ConnectionString: config.DataSource(connStr),
|
||||
})
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -227,3 +227,7 @@ type Relations interface {
|
|||
// "from" or want to work forwards and don't have a "to").
|
||||
SelectMaxRelationID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||
}
|
||||
|
||||
type MultiRoom interface {
|
||||
SelectMultiRoomData(ctx context.Context, r *types.Range, joinedRooms []string, txn *sql.Tx) ([]*types.MultiRoomDataRow, error)
|
||||
}
|
||||
|
|
|
|||
72
syncapi/streams/stream_multiroomdata.go
Normal file
72
syncapi/streams/stream_multiroomdata.go
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package streams
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/mrd"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
||||
type MultiRoomDataStreamProvider struct {
|
||||
DefaultStreamProvider
|
||||
notifier *notifier.Notifier
|
||||
mrdDb *mrd.Queries
|
||||
}
|
||||
|
||||
func (p *MultiRoomDataStreamProvider) Setup(ctx context.Context, snapshot storage.DatabaseTransaction) {
|
||||
p.DefaultStreamProvider.Setup(ctx, snapshot)
|
||||
|
||||
id, err := p.mrdDb.SelectMaxId(context.Background())
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
panic(err)
|
||||
}
|
||||
p.latestMutex.Lock()
|
||||
defer p.latestMutex.Unlock()
|
||||
if id == nil {
|
||||
p.latest = types.StreamPosition(0)
|
||||
} else {
|
||||
p.latest = types.StreamPosition(id.(int64))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *MultiRoomDataStreamProvider) CompleteSync(
|
||||
ctx context.Context,
|
||||
snapshot storage.DatabaseTransaction,
|
||||
req *types.SyncRequest,
|
||||
) types.StreamPosition {
|
||||
return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
|
||||
}
|
||||
|
||||
func (p *MultiRoomDataStreamProvider) IncrementalSync(
|
||||
ctx context.Context,
|
||||
snapshot storage.DatabaseTransaction,
|
||||
req *types.SyncRequest,
|
||||
from, to types.StreamPosition,
|
||||
) types.StreamPosition {
|
||||
mr, err := snapshot.SelectMultiRoomData(ctx, &types.Range{From: from, To: to}, req.JoinedRooms)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("GetUserUnreadNotificationCountsForRooms failed")
|
||||
return from
|
||||
}
|
||||
req.Log.Tracef("MultiRoomDataStreamProvider IncrementalSync: %+v", mr)
|
||||
req.Response.MultiRoom = mr
|
||||
return to
|
||||
|
||||
}
|
||||
|
|
@ -77,6 +77,7 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
|
||||
return from
|
||||
}
|
||||
req.JoinedRooms = joinedRoomIDs
|
||||
|
||||
stateFilter := req.Filter.Room.State
|
||||
eventFilter := req.Filter.Room.Timeline
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import (
|
|||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/mrd"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
)
|
||||
|
|
@ -23,12 +24,14 @@ type Streams struct {
|
|||
DeviceListStreamProvider StreamProvider
|
||||
NotificationDataStreamProvider StreamProvider
|
||||
PresenceStreamProvider StreamProvider
|
||||
MultiRoomStreamProvider StreamProvider
|
||||
}
|
||||
|
||||
func NewSyncStreamProviders(
|
||||
d storage.Database, userAPI userapi.SyncUserAPI,
|
||||
rsAPI rsapi.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI,
|
||||
eduCache *caching.EDUCache, lazyLoadCache caching.LazyLoadCache, notifier *notifier.Notifier,
|
||||
mrdb *mrd.Queries,
|
||||
) *Streams {
|
||||
streams := &Streams{
|
||||
PDUStreamProvider: &PDUStreamProvider{
|
||||
|
|
@ -66,6 +69,11 @@ func NewSyncStreamProviders(
|
|||
DefaultStreamProvider: DefaultStreamProvider{DB: d},
|
||||
notifier: notifier,
|
||||
},
|
||||
MultiRoomStreamProvider: &MultiRoomDataStreamProvider{
|
||||
DefaultStreamProvider: DefaultStreamProvider{DB: d},
|
||||
notifier: notifier,
|
||||
mrdDb: mrdb,
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
|
|
@ -85,6 +93,7 @@ func NewSyncStreamProviders(
|
|||
streams.NotificationDataStreamProvider.Setup(ctx, snapshot)
|
||||
streams.DeviceListStreamProvider.Setup(ctx, snapshot)
|
||||
streams.PresenceStreamProvider.Setup(ctx, snapshot)
|
||||
streams.MultiRoomStreamProvider.Setup(ctx, snapshot)
|
||||
|
||||
succeeded = true
|
||||
return streams
|
||||
|
|
|
|||
|
|
@ -407,6 +407,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|||
)
|
||||
},
|
||||
),
|
||||
MultiRoomDataPosition: withTransaction(
|
||||
syncReq.Since.MultiRoomDataPosition,
|
||||
func(txn storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.MultiRoomStreamProvider.CompleteSync(
|
||||
syncReq.Context, txn, syncReq,
|
||||
)
|
||||
},
|
||||
),
|
||||
}
|
||||
} else {
|
||||
// Incremental sync
|
||||
|
|
@ -492,6 +500,15 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|||
)
|
||||
},
|
||||
),
|
||||
MultiRoomDataPosition: withTransaction(
|
||||
syncReq.Since.MultiRoomDataPosition,
|
||||
func(snapshot storage.DatabaseTransaction) types.StreamPosition {
|
||||
return rp.streams.MultiRoomStreamProvider.IncrementalSync(
|
||||
syncReq.Context, snapshot, syncReq,
|
||||
syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition,
|
||||
)
|
||||
},
|
||||
),
|
||||
}
|
||||
// it's possible for there to be no updates for this user even though since < current pos,
|
||||
// e.g busy servers with a quiet user. In this scenario, we don't want to return a no-op
|
||||
|
|
|
|||
|
|
@ -50,14 +50,26 @@ func AddPublicRoutes(
|
|||
|
||||
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
|
||||
syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database)
|
||||
syncDB, mrq, err := storage.NewSyncServerDatasource(base, &cfg.Database)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panicf("failed to connect to sync db")
|
||||
}
|
||||
|
||||
go func() {
|
||||
var affected int64
|
||||
for {
|
||||
affected, err = mrq.DeleteMultiRoomVisibilityByExpireTS(context.Background(), time.Now().Unix())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to expire multiroom visibility")
|
||||
}
|
||||
logrus.WithField("rows", affected).Info("expired multiroom visibility")
|
||||
time.Sleep(time.Minute)
|
||||
}
|
||||
}()
|
||||
|
||||
eduCache := caching.NewTypingCache()
|
||||
notifier := notifier.NewNotifier()
|
||||
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, base.Caches, notifier)
|
||||
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, base.Caches, notifier, mrq)
|
||||
notifier.SetCurrentPosition(streams.Latest(context.Background()))
|
||||
if err = notifier.Load(context.Background(), syncDB); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to load notifier ")
|
||||
|
|
@ -132,6 +144,13 @@ func AddPublicRoutes(
|
|||
logrus.WithError(err).Panicf("failed to start receipts consumer")
|
||||
}
|
||||
|
||||
multiRoomConsumer := consumers.NewOutputMultiRoomDataConsumer(
|
||||
base.ProcessContext, cfg, js, mrq, notifier, streams.MultiRoomStreamProvider,
|
||||
)
|
||||
if err = multiRoomConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start multiroom consumer")
|
||||
}
|
||||
|
||||
routing.Setup(
|
||||
base.PublicClientAPIMux, requestPool, syncDB, userAPI,
|
||||
rsAPI, cfg, base.Caches, base.Fulltext,
|
||||
|
|
|
|||
21
syncapi/types/multiroom.go
Normal file
21
syncapi/types/multiroom.go
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
package types
|
||||
|
||||
type MultiRoom map[string]map[string]MultiRoomData
|
||||
|
||||
type MultiRoomContent []byte
|
||||
|
||||
type MultiRoomData struct {
|
||||
Content MultiRoomContent `json:"content"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
}
|
||||
|
||||
func (d MultiRoomContent) MarshalJSON() ([]byte, error) {
|
||||
return d, nil
|
||||
}
|
||||
|
||||
type MultiRoomDataRow struct {
|
||||
Data []byte
|
||||
Type string
|
||||
UserId string
|
||||
Timestamp int64
|
||||
}
|
||||
21
syncapi/types/multiroom_test.go
Normal file
21
syncapi/types/multiroom_test.go
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/matryer/is"
|
||||
)
|
||||
|
||||
func TestMarshallMultiRoom(t *testing.T) {
|
||||
is := is.New(t)
|
||||
m, err := json.Marshal(
|
||||
MultiRoom{
|
||||
"@3:example.com": map[string]MultiRoomData{
|
||||
"location": {
|
||||
Content: MultiRoomContent(`{"foo":"bar"}`),
|
||||
Timestamp: 123,
|
||||
}}})
|
||||
is.NoErr(err)
|
||||
is.Equal(m, []byte(`{"@3:example.com":{"location":{"content":{"foo":"bar"},"timestamp":123}}}`))
|
||||
}
|
||||
|
|
@ -22,6 +22,7 @@ type SyncRequest struct {
|
|||
|
||||
// Updated by the PDU stream.
|
||||
Rooms map[string]string
|
||||
JoinedRooms []string
|
||||
// Updated by the PDU stream.
|
||||
MembershipChanges map[string]struct{}
|
||||
// Updated by the PDU stream.
|
||||
|
|
|
|||
|
|
@ -115,6 +115,7 @@ type StreamingToken struct {
|
|||
DeviceListPosition StreamPosition
|
||||
NotificationDataPosition StreamPosition
|
||||
PresencePosition StreamPosition
|
||||
MultiRoomDataPosition StreamPosition
|
||||
}
|
||||
|
||||
// This will be used as a fallback by json.Marshal.
|
||||
|
|
@ -130,12 +131,12 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
|
|||
|
||||
func (t StreamingToken) String() string {
|
||||
posStr := fmt.Sprintf(
|
||||
"s%d_%d_%d_%d_%d_%d_%d_%d_%d",
|
||||
"s%d_%d_%d_%d_%d_%d_%d_%d_%d_%d",
|
||||
t.PDUPosition, t.TypingPosition,
|
||||
t.ReceiptPosition, t.SendToDevicePosition,
|
||||
t.InvitePosition, t.AccountDataPosition,
|
||||
t.DeviceListPosition, t.NotificationDataPosition,
|
||||
t.PresencePosition,
|
||||
t.PresencePosition, t.MultiRoomDataPosition,
|
||||
)
|
||||
return posStr
|
||||
}
|
||||
|
|
@ -161,12 +162,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
|
|||
return true
|
||||
case t.PresencePosition > other.PresencePosition:
|
||||
return true
|
||||
case t.MultiRoomDataPosition > other.MultiRoomDataPosition:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *StreamingToken) IsEmpty() bool {
|
||||
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition == 0
|
||||
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition+t.MultiRoomDataPosition == 0
|
||||
}
|
||||
|
||||
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
|
||||
|
|
@ -210,6 +213,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
|
|||
if other.PresencePosition > t.PresencePosition {
|
||||
t.PresencePosition = other.PresencePosition
|
||||
}
|
||||
if other.MultiRoomDataPosition > t.MultiRoomDataPosition {
|
||||
t.MultiRoomDataPosition = other.MultiRoomDataPosition
|
||||
}
|
||||
}
|
||||
|
||||
type TopologyToken struct {
|
||||
|
|
@ -304,7 +310,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
|
|||
// s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions
|
||||
tok = strings.Split(tok, ".")[0]
|
||||
parts := strings.Split(tok[1:], "_")
|
||||
var positions [9]StreamPosition
|
||||
var positions [10]StreamPosition
|
||||
for i, p := range parts {
|
||||
if i >= len(positions) {
|
||||
break
|
||||
|
|
@ -328,6 +334,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
|
|||
DeviceListPosition: positions[6],
|
||||
NotificationDataPosition: positions[7],
|
||||
PresencePosition: positions[8],
|
||||
MultiRoomDataPosition: positions[9],
|
||||
}
|
||||
return token, nil
|
||||
}
|
||||
|
|
@ -364,6 +371,7 @@ type Response struct {
|
|||
ToDevice *ToDeviceResponse `json:"to_device,omitempty"`
|
||||
DeviceLists *DeviceLists `json:"device_lists,omitempty"`
|
||||
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
|
||||
MultiRoom MultiRoom `json:"multiroom,omitempty"`
|
||||
}
|
||||
|
||||
func (r Response) MarshalJSON() ([]byte, error) {
|
||||
|
|
|
|||
|
|
@ -9,9 +9,9 @@ import (
|
|||
|
||||
func TestSyncTokens(t *testing.T) {
|
||||
shouldPass := map[string]string{
|
||||
"s4_0_0_0_0_0_0_0_3": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3}.String(),
|
||||
"s3_1_0_0_0_0_2_0_5": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0, 5}.String(),
|
||||
"s3_1_2_3_5_0_0_0_6": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0, 6}.String(),
|
||||
"s4_0_0_0_0_0_0_0_3_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3, 0}.String(),
|
||||
"s3_1_0_0_0_0_2_0_5_1": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0, 5, 1}.String(),
|
||||
"s3_1_2_3_5_0_0_0_6_2": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0, 6, 2}.String(),
|
||||
"t3_1": TopologyToken{3, 1}.String(),
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -171,7 +171,6 @@ func PrepareDBConnectionString(t *testing.T, dbType DBType) (connStr string, clo
|
|||
func WithAllDatabases(t *testing.T, testFn func(t *testing.T, db DBType)) {
|
||||
dbs := map[string]DBType{
|
||||
"postgres": DBTypePostgres,
|
||||
"sqlite": DBTypeSQLite,
|
||||
}
|
||||
for dbName, dbType := range dbs {
|
||||
dbt := dbType
|
||||
|
|
|
|||
6
userapi/api/api_multicast.go
Normal file
6
userapi/api/api_multicast.go
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
package api
|
||||
|
||||
type MulticastMetadata struct {
|
||||
ExpireMs int
|
||||
ExcludeRoomIds []string
|
||||
}
|
||||
Loading…
Reference in a new issue