Merge branch 'main' into main

This commit is contained in:
X. Ding 2022-10-28 11:01:35 +08:00 committed by GitHub
commit 839af200b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 522 additions and 116 deletions

128
.github/workflows/schedules.yaml vendored Normal file
View file

@ -0,0 +1,128 @@
name: Scheduled
on:
schedule:
- cron: '0 0 * * *' # every day at midnight
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
# run go test with different go versions
test:
timeout-minutes: 20
name: Unit tests (Go ${{ matrix.go }})
runs-on: ubuntu-latest
# Service containers to run with `container-job`
services:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres:13-alpine
# Provide the password for postgres
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: dendrite
ports:
# Maps tcp port 5432 on service container to the host
- 5432:5432
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
strategy:
fail-fast: false
matrix:
go: ["1.18", "1.19"]
steps:
- uses: actions/checkout@v3
- name: Setup go
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}
- uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go${{ matrix.go }}-test-race-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go${{ matrix.go }}-test-race-
- run: go test -race ./...
env:
POSTGRES_HOST: localhost
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: dendrite
# Dummy step to gate other tests on without repeating the whole list
initial-tests-done:
name: Initial tests passed
needs: [test]
runs-on: ubuntu-latest
if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
steps:
- name: Check initial tests passed
uses: re-actors/alls-green@release/v1
with:
jobs: ${{ toJSON(needs) }}
# run Sytest in different variations
sytest:
timeout-minutes: 60
needs: initial-tests-done
name: "Sytest (${{ matrix.label }})"
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
- label: SQLite
- label: SQLite, full HTTP APIs
api: full-http
- label: PostgreSQL
postgres: postgres
- label: PostgreSQL, full HTTP APIs
postgres: postgres
api: full-http
container:
image: matrixdotorg/sytest-dendrite:latest
volumes:
- ${{ github.workspace }}:/src
env:
POSTGRES: ${{ matrix.postgres && 1}}
API: ${{ matrix.api && 1 }}
SYTEST_BRANCH: ${{ github.head_ref }}
RACE_DETECTION: 1
steps:
- uses: actions/checkout@v2
- name: Run Sytest
run: /bootstrap.sh dendrite
working-directory: /src
- name: Summarise results.tap
if: ${{ always() }}
run: /sytest/scripts/tap_to_gha.pl /logs/results.tap
- name: Sytest List Maintenance
if: ${{ always() }}
run: /src/show-expected-fail-tests.sh /logs/results.tap /src/sytest-whitelist /src/sytest-blacklist
continue-on-error: true # not fatal
- name: Are We Synapse Yet?
if: ${{ always() }}
run: /src/are-we-synapse-yet.py /logs/results.tap -v
continue-on-error: true # not fatal
- name: Upload Sytest logs
uses: actions/upload-artifact@v2
if: ${{ always() }}
with:
name: Sytest Logs - ${{ job.status }} - (Dendrite, ${{ join(matrix.*, ', ') }})
path: |
/logs/results.tap
/logs/**/*.log*

View file

@ -18,14 +18,15 @@ import (
"fmt"
"net/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
type roomDirectoryResponse struct {
@ -318,3 +319,43 @@ func SetVisibility(
JSON: struct{}{},
}
}
func SetVisibilityAS(
req *http.Request, rsAPI roomserverAPI.ClientRoomserverAPI, dev *userapi.Device,
networkID, roomID string,
) util.JSONResponse {
if dev.AccountType != userapi.AccountTypeAppService {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("Only appservice may use this endpoint"),
}
}
var v roomVisibility
// If the method is delete, we simply mark the visibility as private
if req.Method == http.MethodDelete {
v.Visibility = "private"
} else {
if reqErr := httputil.UnmarshalJSONRequest(req, &v); reqErr != nil {
return *reqErr
}
}
var publishRes roomserverAPI.PerformPublishResponse
if err := rsAPI.PerformPublish(req.Context(), &roomserverAPI.PerformPublishRequest{
RoomID: roomID,
Visibility: v.Visibility,
NetworkID: networkID,
AppserviceID: dev.AppserviceID,
}, &publishRes); err != nil {
return jsonerror.InternalAPIError(req.Context(), err)
}
if publishRes.Error != nil {
util.GetLogger(req.Context()).WithError(publishRes.Error).Error("PerformPublish failed")
return publishRes.Error.JSONResponse()
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: struct{}{},
}
}

View file

@ -39,14 +39,17 @@ var (
)
type PublicRoomReq struct {
Since string `json:"since,omitempty"`
Limit int16 `json:"limit,omitempty"`
Filter filter `json:"filter,omitempty"`
Server string `json:"server,omitempty"`
Since string `json:"since,omitempty"`
Limit int64 `json:"limit,omitempty"`
Filter filter `json:"filter,omitempty"`
Server string `json:"server,omitempty"`
IncludeAllNetworks bool `json:"include_all_networks,omitempty"`
NetworkID string `json:"third_party_instance_id,omitempty"`
}
type filter struct {
SearchTerms string `json:"generic_search_term,omitempty"`
SearchTerms string `json:"generic_search_term,omitempty"`
RoomTypes []string `json:"room_types,omitempty"` // TODO: Implement filter on this
}
// GetPostPublicRooms implements GET and POST /publicRooms
@ -61,6 +64,13 @@ func GetPostPublicRooms(
return *fillErr
}
if request.IncludeAllNetworks && request.NetworkID != "" {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidParam("include_all_networks and third_party_instance_id can not be used together"),
}
}
serverName := gomatrixserverlib.ServerName(request.Server)
if serverName != "" && !cfg.Matrix.IsLocalServerName(serverName) {
res, err := federation.GetPublicRoomsFiltered(
@ -97,7 +107,7 @@ func publicRooms(
response := gomatrixserverlib.RespPublicRooms{
Chunk: []gomatrixserverlib.PublicRoom{},
}
var limit int16
var limit int64
var offset int64
limit = request.Limit
if limit == 0 {
@ -114,7 +124,7 @@ func publicRooms(
var rooms []gomatrixserverlib.PublicRoom
if request.Since == "" {
rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider)
rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider, request)
} else {
rooms = getPublicRoomsFromCache()
}
@ -176,7 +186,7 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
JSON: jsonerror.BadJSON("limit param is not a number"),
}
}
request.Limit = int16(limit)
request.Limit = int64(limit)
request.Since = httpReq.FormValue("since")
request.Server = httpReq.FormValue("server")
} else {
@ -204,7 +214,7 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
// limit=3&since=6 => G (prev='3', next='')
//
// A value of '-1' for prev/next indicates no position.
func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (subset []gomatrixserverlib.PublicRoom, prev, next int) {
func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int64) (subset []gomatrixserverlib.PublicRoom, prev, next int) {
prev = -1
next = -1
@ -230,6 +240,7 @@ func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (
func refreshPublicRoomCache(
ctx context.Context, rsAPI roomserverAPI.ClientRoomserverAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
request PublicRoomReq,
) []gomatrixserverlib.PublicRoom {
cacheMu.Lock()
defer cacheMu.Unlock()
@ -238,8 +249,17 @@ func refreshPublicRoomCache(
extraRooms = extRoomsProvider.Rooms()
}
// TODO: this is only here to make Sytest happy, for now.
ns := strings.Split(request.NetworkID, "|")
if len(ns) == 2 {
request.NetworkID = ns[1]
}
var queryRes roomserverAPI.QueryPublishedRoomsResponse
err := rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes)
err := rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{
NetworkID: request.NetworkID,
IncludeAllNetworks: request.IncludeAllNetworks,
}, &queryRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return publicRoomsCache

View file

@ -17,7 +17,7 @@ func TestSliceInto(t *testing.T) {
slice := []gomatrixserverlib.PublicRoom{
pubRoom("a"), pubRoom("b"), pubRoom("c"), pubRoom("d"), pubRoom("e"), pubRoom("f"), pubRoom("g"),
}
limit := int16(3)
limit := int64(3)
testCases := []struct {
since int64
wantPrev int

View file

@ -480,7 +480,7 @@ func Setup(
return GetVisibility(req, rsAPI, vars["roomID"])
}),
).Methods(http.MethodGet, http.MethodOptions)
// TODO: Add AS support
v3mux.Handle("/directory/list/room/{roomID}",
httputil.MakeAuthAPI("directory_list", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -490,6 +490,27 @@ func Setup(
return SetVisibility(req, rsAPI, device, vars["roomID"])
}),
).Methods(http.MethodPut, http.MethodOptions)
v3mux.Handle("/directory/list/appservice/{networkID}/{roomID}",
httputil.MakeAuthAPI("directory_list", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return SetVisibilityAS(req, rsAPI, device, vars["networkID"], vars["roomID"])
}),
).Methods(http.MethodPut, http.MethodOptions)
// Undocumented endpoint
v3mux.Handle("/directory/list/appservice/{networkID}/{roomID}",
httputil.MakeAuthAPI("directory_list", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return SetVisibilityAS(req, rsAPI, device, vars["networkID"], vars["roomID"])
}),
).Methods(http.MethodDelete, http.MethodOptions)
v3mux.Handle("/publicRooms",
httputil.MakeExternalAPI("public_rooms", func(req *http.Request) util.JSONResponse {
return GetPostPublicRooms(req, rsAPI, extRoomsProvider, federation, cfg)

View file

@ -44,7 +44,7 @@ func (a *FederationInternalAPI) ClaimKeys(
) (gomatrixserverlib.RespClaimKeys, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequestIfNotBackingOffOrBlacklisted(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.ClaimKeys(ctx, s, oneTimeKeys)
})
if err != nil {

View file

@ -2,24 +2,29 @@ package routing
import (
"context"
"fmt"
"net/http"
"strconv"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
type PublicRoomReq struct {
Since string `json:"since,omitempty"`
Limit int16 `json:"limit,omitempty"`
Filter filter `json:"filter,omitempty"`
Since string `json:"since,omitempty"`
Limit int16 `json:"limit,omitempty"`
Filter filter `json:"filter,omitempty"`
IncludeAllNetworks bool `json:"include_all_networks,omitempty"`
NetworkID string `json:"third_party_instance_id,omitempty"`
}
type filter struct {
SearchTerms string `json:"generic_search_term,omitempty"`
SearchTerms string `json:"generic_search_term,omitempty"`
RoomTypes []string `json:"room_types,omitempty"`
}
// GetPostPublicRooms implements GET and POST /publicRooms
@ -57,8 +62,14 @@ func publicRooms(
return nil, err
}
if request.IncludeAllNetworks && request.NetworkID != "" {
return nil, fmt.Errorf("include_all_networks and third_party_instance_id can not be used together")
}
var queryRes roomserverAPI.QueryPublishedRoomsResponse
err = rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes)
err = rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{
NetworkID: request.NetworkID,
}, &queryRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return nil, err

View file

@ -128,58 +128,49 @@ func (a *KeyInternalAPI) PerformClaimKeys(ctx context.Context, req *api.PerformC
func (a *KeyInternalAPI) claimRemoteKeys(
ctx context.Context, timeout time.Duration, res *api.PerformClaimKeysResponse, domainToDeviceKeys map[string]map[string]map[string]string,
) {
resultCh := make(chan *gomatrixserverlib.RespClaimKeys, len(domainToDeviceKeys))
// allows us to wait until all federation servers have been poked
var wg sync.WaitGroup
wg.Add(len(domainToDeviceKeys))
// mutex for failures
var failMu sync.Mutex
util.GetLogger(ctx).WithField("num_servers", len(domainToDeviceKeys)).Info("Claiming remote keys from servers")
var wg sync.WaitGroup // Wait for fan-out goroutines to finish
var mu sync.Mutex // Protects the response struct
var claimed int // Number of keys claimed in total
var failures int // Number of servers we failed to ask
util.GetLogger(ctx).Infof("Claiming remote keys from %d server(s)", len(domainToDeviceKeys))
wg.Add(len(domainToDeviceKeys))
// fan out
for d, k := range domainToDeviceKeys {
go func(domain string, keysToClaim map[string]map[string]string) {
defer wg.Done()
fedCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
defer wg.Done()
claimKeyRes, err := a.FedClient.ClaimKeys(fedCtx, gomatrixserverlib.ServerName(domain), keysToClaim)
mu.Lock()
defer mu.Unlock()
if err != nil {
util.GetLogger(ctx).WithError(err).WithField("server", domain).Error("ClaimKeys failed")
failMu.Lock()
res.Failures[domain] = map[string]interface{}{
"message": err.Error(),
}
failMu.Unlock()
failures++
return
}
resultCh <- &claimKeyRes
for userID, deviceIDToKeys := range claimKeyRes.OneTimeKeys {
res.OneTimeKeys[userID] = make(map[string]map[string]json.RawMessage)
for deviceID, keys := range deviceIDToKeys {
res.OneTimeKeys[userID][deviceID] = keys
claimed += len(keys)
}
}
}(d, k)
}
// Close the result channel when the goroutines have quit so the for .. range exits
go func() {
wg.Wait()
close(resultCh)
}()
keysClaimed := 0
for result := range resultCh {
for userID, nest := range result.OneTimeKeys {
res.OneTimeKeys[userID] = make(map[string]map[string]json.RawMessage)
for deviceID, nest2 := range nest {
res.OneTimeKeys[userID][deviceID] = make(map[string]json.RawMessage)
for keyIDWithAlgo, otk := range nest2 {
keyJSON, err := json.Marshal(otk)
if err != nil {
continue
}
res.OneTimeKeys[userID][deviceID][keyIDWithAlgo] = keyJSON
keysClaimed++
}
}
}
}
util.GetLogger(ctx).WithField("num_keys", keysClaimed).Info("Claimed remote keys")
wg.Wait()
util.GetLogger(ctx).WithFields(logrus.Fields{
"num_keys": claimed,
"num_failures": failures,
}).Infof("Claimed remote keys from %d server(s)", len(domainToDeviceKeys))
}
func (a *KeyInternalAPI) PerformDeleteKeys(ctx context.Context, req *api.PerformDeleteKeysRequest, res *api.PerformDeleteKeysResponse) error {

View file

@ -168,8 +168,10 @@ type PerformBackfillResponse struct {
}
type PerformPublishRequest struct {
RoomID string
Visibility string
RoomID string
Visibility string
AppserviceID string
NetworkID string
}
type PerformPublishResponse struct {

View file

@ -21,8 +21,9 @@ import (
"fmt"
"strings"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
)
// QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState
@ -257,7 +258,9 @@ type QueryRoomVersionForRoomResponse struct {
type QueryPublishedRoomsRequest struct {
// Optional. If specified, returns whether this room is published or not.
RoomID string
RoomID string
NetworkID string
IncludeAllNetworks bool
}
type QueryPublishedRoomsResponse struct {

View file

@ -30,7 +30,7 @@ func (r *Publisher) PerformPublish(
req *api.PerformPublishRequest,
res *api.PerformPublishResponse,
) error {
err := r.DB.PublishRoom(ctx, req.RoomID, req.Visibility == "public")
err := r.DB.PublishRoom(ctx, req.RoomID, req.AppserviceID, req.NetworkID, req.Visibility == "public")
if err != nil {
res.Error = &api.PerformError{
Msg: err.Error(),

View file

@ -702,7 +702,7 @@ func (r *Queryer) QueryPublishedRooms(
}
return err
}
rooms, err := r.DB.GetPublishedRooms(ctx)
rooms, err := r.DB.GetPublishedRooms(ctx, req.NetworkID, req.IncludeAllNetworks)
if err != nil {
return err
}

View file

@ -139,9 +139,9 @@ type Database interface {
// Returns an error if the retrieval went wrong.
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
// Publish or unpublish a room from the room directory.
PublishRoom(ctx context.Context, roomID string, publish bool) error
PublishRoom(ctx context.Context, roomID, appserviceID, networkID string, publish bool) error
// Returns a list of room IDs for rooms which are published.
GetPublishedRooms(ctx context.Context) ([]string, error)
GetPublishedRooms(ctx context.Context, networkID string, includeAllNetworks bool) ([]string, error)
// Returns whether a given room is published or not.
GetPublishedRoom(ctx context.Context, roomID string) (bool, error)

View file

@ -0,0 +1,45 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deltas
import (
"context"
"database/sql"
"fmt"
)
func UpPulishedAppservice(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_published ADD COLUMN IF NOT EXISTS appservice_id TEXT NOT NULL DEFAULT '';`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
_, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_published ADD COLUMN IF NOT EXISTS network_id TEXT NOT NULL DEFAULT '';`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownPublishedAppservice(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_published DROP COLUMN IF EXISTS appservice_id;`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
_, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_published DROP COLUMN IF EXISTS network_id;`)
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
)
@ -27,31 +28,48 @@ const publishedSchema = `
-- Stores which rooms are published in the room directory
CREATE TABLE IF NOT EXISTS roomserver_published (
-- The room ID of the room
room_id TEXT NOT NULL PRIMARY KEY,
room_id TEXT NOT NULL,
-- The appservice ID of the room
appservice_id TEXT NOT NULL,
-- The network_id of the room
network_id TEXT NOT NULL,
-- Whether it is published or not
published BOOLEAN NOT NULL DEFAULT false
published BOOLEAN NOT NULL DEFAULT false,
PRIMARY KEY (room_id, appservice_id, network_id)
);
`
const upsertPublishedSQL = "" +
"INSERT INTO roomserver_published (room_id, published) VALUES ($1, $2) " +
"ON CONFLICT (room_id) DO UPDATE SET published=$2"
"INSERT INTO roomserver_published (room_id, appservice_id, network_id, published) VALUES ($1, $2, $3, $4) " +
"ON CONFLICT (room_id, appservice_id, network_id) DO UPDATE SET published=$4"
const selectAllPublishedSQL = "" +
"SELECT room_id FROM roomserver_published WHERE published = $1 ORDER BY room_id ASC"
"SELECT room_id FROM roomserver_published WHERE published = $1 AND CASE WHEN $2 THEN 1=1 ELSE network_id = '' END ORDER BY room_id ASC"
const selectNetworkPublishedSQL = "" +
"SELECT room_id FROM roomserver_published WHERE published = $1 AND network_id = $2 ORDER BY room_id ASC"
const selectPublishedSQL = "" +
"SELECT published FROM roomserver_published WHERE room_id = $1"
type publishedStatements struct {
upsertPublishedStmt *sql.Stmt
selectAllPublishedStmt *sql.Stmt
selectPublishedStmt *sql.Stmt
upsertPublishedStmt *sql.Stmt
selectAllPublishedStmt *sql.Stmt
selectPublishedStmt *sql.Stmt
selectNetworkPublishedStmt *sql.Stmt
}
func CreatePublishedTable(db *sql.DB) error {
_, err := db.Exec(publishedSchema)
return err
if err != nil {
return err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "roomserver: published appservice",
Up: deltas.UpPulishedAppservice,
})
return m.Up(context.Background())
}
func PreparePublishedTable(db *sql.DB) (tables.Published, error) {
@ -61,14 +79,15 @@ func PreparePublishedTable(db *sql.DB) (tables.Published, error) {
{&s.upsertPublishedStmt, upsertPublishedSQL},
{&s.selectAllPublishedStmt, selectAllPublishedSQL},
{&s.selectPublishedStmt, selectPublishedSQL},
{&s.selectNetworkPublishedStmt, selectNetworkPublishedSQL},
}.Prepare(db)
}
func (s *publishedStatements) UpsertRoomPublished(
ctx context.Context, txn *sql.Tx, roomID string, published bool,
ctx context.Context, txn *sql.Tx, roomID, appserviceID, networkID string, published bool,
) (err error) {
stmt := sqlutil.TxStmt(txn, s.upsertPublishedStmt)
_, err = stmt.ExecContext(ctx, roomID, published)
_, err = stmt.ExecContext(ctx, roomID, appserviceID, networkID, published)
return
}
@ -84,10 +103,18 @@ func (s *publishedStatements) SelectPublishedFromRoomID(
}
func (s *publishedStatements) SelectAllPublishedRooms(
ctx context.Context, txn *sql.Tx, published bool,
ctx context.Context, txn *sql.Tx, networkID string, published, includeAllNetworks bool,
) ([]string, error) {
stmt := sqlutil.TxStmt(txn, s.selectAllPublishedStmt)
rows, err := stmt.QueryContext(ctx, published)
var rows *sql.Rows
var err error
if networkID != "" {
stmt := sqlutil.TxStmt(txn, s.selectNetworkPublishedStmt)
rows, err = stmt.QueryContext(ctx, published, networkID)
} else {
stmt := sqlutil.TxStmt(txn, s.selectAllPublishedStmt)
rows, err = stmt.QueryContext(ctx, published, includeAllNetworks)
}
if err != nil {
return nil, err
}

View file

@ -722,9 +722,9 @@ func (d *Database) storeEvent(
}, redactionEvent, redactedEventID, err
}
func (d *Database) PublishRoom(ctx context.Context, roomID string, publish bool) error {
func (d *Database) PublishRoom(ctx context.Context, roomID, appserviceID, networkID string, publish bool) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.PublishedTable.UpsertRoomPublished(ctx, txn, roomID, publish)
return d.PublishedTable.UpsertRoomPublished(ctx, txn, roomID, appserviceID, networkID, publish)
})
}
@ -732,8 +732,8 @@ func (d *Database) GetPublishedRoom(ctx context.Context, roomID string) (bool, e
return d.PublishedTable.SelectPublishedFromRoomID(ctx, nil, roomID)
}
func (d *Database) GetPublishedRooms(ctx context.Context) ([]string, error) {
return d.PublishedTable.SelectAllPublishedRooms(ctx, nil, true)
func (d *Database) GetPublishedRooms(ctx context.Context, networkID string, includeAllNetworks bool) ([]string, error) {
return d.PublishedTable.SelectAllPublishedRooms(ctx, nil, networkID, true, includeAllNetworks)
}
func (d *Database) MissingAuthPrevEvents(

View file

@ -0,0 +1,64 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deltas
import (
"context"
"database/sql"
"fmt"
)
func UpPulishedAppservice(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, ` ALTER TABLE roomserver_published RENAME TO roomserver_published_tmp;
CREATE TABLE IF NOT EXISTS roomserver_published (
room_id TEXT NOT NULL,
appservice_id TEXT NOT NULL,
network_id TEXT NOT NULL,
published BOOLEAN NOT NULL DEFAULT false,
CONSTRAINT unique_published_idx PRIMARY KEY (room_id, appservice_id, network_id)
);
INSERT
INTO roomserver_published (
room_id, published
) SELECT
room_id, published
FROM roomserver_published_tmp
;
DROP TABLE roomserver_published_tmp;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownPublishedAppservice(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, ` ALTER TABLE roomserver_published RENAME TO roomserver_published_tmp;
CREATE TABLE IF NOT EXISTS roomserver_published (
room_id TEXT NOT NULL PRIMARY KEY,
published BOOLEAN NOT NULL DEFAULT false
);
INSERT
INTO roomserver_published (
room_id, published
) SELECT
room_id, published
FROM roomserver_published_tmp
;
DROP TABLE roomserver_published_tmp;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
)
@ -27,31 +28,49 @@ const publishedSchema = `
-- Stores which rooms are published in the room directory
CREATE TABLE IF NOT EXISTS roomserver_published (
-- The room ID of the room
room_id TEXT NOT NULL PRIMARY KEY,
room_id TEXT NOT NULL,
-- The appservice ID of the room
appservice_id TEXT NOT NULL,
-- The network_id of the room
network_id TEXT NOT NULL,
-- Whether it is published or not
published BOOLEAN NOT NULL DEFAULT false
published BOOLEAN NOT NULL DEFAULT false,
PRIMARY KEY (room_id, appservice_id, network_id)
);
`
const upsertPublishedSQL = "" +
"INSERT OR REPLACE INTO roomserver_published (room_id, published) VALUES ($1, $2)"
"INSERT INTO roomserver_published (room_id, appservice_id, network_id, published) VALUES ($1, $2, $3, $4)" +
" ON CONFLICT (room_id, appservice_id, network_id) DO UPDATE SET published = $4"
const selectAllPublishedSQL = "" +
"SELECT room_id FROM roomserver_published WHERE published = $1 ORDER BY room_id ASC"
"SELECT room_id FROM roomserver_published WHERE published = $1 AND CASE WHEN $2 THEN 1=1 ELSE network_id = '' END ORDER BY room_id ASC"
const selectNetworkPublishedSQL = "" +
"SELECT room_id FROM roomserver_published WHERE published = $1 AND network_id = $2 ORDER BY room_id ASC"
const selectPublishedSQL = "" +
"SELECT published FROM roomserver_published WHERE room_id = $1"
type publishedStatements struct {
db *sql.DB
upsertPublishedStmt *sql.Stmt
selectAllPublishedStmt *sql.Stmt
selectPublishedStmt *sql.Stmt
db *sql.DB
upsertPublishedStmt *sql.Stmt
selectAllPublishedStmt *sql.Stmt
selectPublishedStmt *sql.Stmt
selectNetworkPublishedStmt *sql.Stmt
}
func CreatePublishedTable(db *sql.DB) error {
_, err := db.Exec(publishedSchema)
return err
if err != nil {
return err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(sqlutil.Migration{
Version: "roomserver: published appservice",
Up: deltas.UpPulishedAppservice,
})
return m.Up(context.Background())
}
func PreparePublishedTable(db *sql.DB) (tables.Published, error) {
@ -63,14 +82,15 @@ func PreparePublishedTable(db *sql.DB) (tables.Published, error) {
{&s.upsertPublishedStmt, upsertPublishedSQL},
{&s.selectAllPublishedStmt, selectAllPublishedSQL},
{&s.selectPublishedStmt, selectPublishedSQL},
{&s.selectNetworkPublishedStmt, selectNetworkPublishedSQL},
}.Prepare(db)
}
func (s *publishedStatements) UpsertRoomPublished(
ctx context.Context, txn *sql.Tx, roomID string, published bool,
ctx context.Context, txn *sql.Tx, roomID, appserviceID, networkID string, published bool,
) error {
stmt := sqlutil.TxStmt(txn, s.upsertPublishedStmt)
_, err := stmt.ExecContext(ctx, roomID, published)
_, err := stmt.ExecContext(ctx, roomID, appserviceID, networkID, published)
return err
}
@ -86,10 +106,17 @@ func (s *publishedStatements) SelectPublishedFromRoomID(
}
func (s *publishedStatements) SelectAllPublishedRooms(
ctx context.Context, txn *sql.Tx, published bool,
ctx context.Context, txn *sql.Tx, networkID string, published, includeAllNetworks bool,
) ([]string, error) {
stmt := sqlutil.TxStmt(txn, s.selectAllPublishedStmt)
rows, err := stmt.QueryContext(ctx, published)
var rows *sql.Rows
var err error
if networkID != "" {
stmt := sqlutil.TxStmt(txn, s.selectNetworkPublishedStmt)
rows, err = stmt.QueryContext(ctx, published, networkID)
} else {
stmt := sqlutil.TxStmt(txn, s.selectAllPublishedStmt)
rows, err = stmt.QueryContext(ctx, published, includeAllNetworks)
}
if err != nil {
return nil, err
}

View file

@ -146,9 +146,9 @@ type Membership interface {
}
type Published interface {
UpsertRoomPublished(ctx context.Context, txn *sql.Tx, roomID string, published bool) (err error)
UpsertRoomPublished(ctx context.Context, txn *sql.Tx, roomID, appserviceID, networkID string, published bool) (err error)
SelectPublishedFromRoomID(ctx context.Context, txn *sql.Tx, roomID string) (published bool, err error)
SelectAllPublishedRooms(ctx context.Context, txn *sql.Tx, published bool) ([]string, error)
SelectAllPublishedRooms(ctx context.Context, txn *sql.Tx, networkdID string, published, includeAllNetworks bool) ([]string, error)
}
type RedactionInfo struct {

View file

@ -2,16 +2,18 @@ package tables_test
import (
"context"
"fmt"
"sort"
"testing"
"github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/postgres"
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test"
"github.com/stretchr/testify/assert"
)
func mustCreatePublishedTable(t *testing.T, dbType test.DBType) (tab tables.Published, close func()) {
@ -46,10 +48,12 @@ func TestPublishedTable(t *testing.T) {
// Publish some rooms
publishedRooms := []string{}
asID := ""
nwID := ""
for i := 0; i < 10; i++ {
room := test.NewRoom(t, alice)
published := i%2 == 0
err := tab.UpsertRoomPublished(ctx, nil, room.ID, published)
err := tab.UpsertRoomPublished(ctx, nil, room.ID, asID, nwID, published)
assert.NoError(t, err)
if published {
publishedRooms = append(publishedRooms, room.ID)
@ -61,19 +65,36 @@ func TestPublishedTable(t *testing.T) {
sort.Strings(publishedRooms)
// check that we get the expected published rooms
roomIDs, err := tab.SelectAllPublishedRooms(ctx, nil, true)
roomIDs, err := tab.SelectAllPublishedRooms(ctx, nil, "", true, true)
assert.NoError(t, err)
assert.Equal(t, publishedRooms, roomIDs)
// test an actual upsert
room := test.NewRoom(t, alice)
err = tab.UpsertRoomPublished(ctx, nil, room.ID, true)
err = tab.UpsertRoomPublished(ctx, nil, room.ID, asID, nwID, true)
assert.NoError(t, err)
err = tab.UpsertRoomPublished(ctx, nil, room.ID, false)
err = tab.UpsertRoomPublished(ctx, nil, room.ID, asID, nwID, false)
assert.NoError(t, err)
// should now be false, due to the upsert
publishedRes, err := tab.SelectPublishedFromRoomID(ctx, nil, room.ID)
assert.NoError(t, err)
assert.False(t, publishedRes)
assert.False(t, publishedRes, fmt.Sprintf("expected room %s to be unpublished", room.ID))
// network specific test
nwID = "irc"
room = test.NewRoom(t, alice)
err = tab.UpsertRoomPublished(ctx, nil, room.ID, asID, nwID, true)
assert.NoError(t, err)
publishedRooms = append(publishedRooms, room.ID)
sort.Strings(publishedRooms)
// should only return the room for network "irc"
allNWPublished, err := tab.SelectAllPublishedRooms(ctx, nil, nwID, true, true)
assert.NoError(t, err)
assert.Equal(t, []string{room.ID}, allNWPublished)
// check that we still get all published rooms regardless networkID
roomIDs, err = tab.SelectAllPublishedRooms(ctx, nil, "", true, true)
assert.NoError(t, err)
assert.Equal(t, publishedRooms, roomIDs)
})
}

View file

@ -76,6 +76,13 @@ func GetMemberships(
}
}
if joinedOnly && !queryRes.IsInRoom {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("You aren't a member of the room and weren't previously a member of the room."),
}
}
db, err := syncDB.NewDatabaseSnapshot(req.Context())
if err != nil {
return jsonerror.InternalServerError()
@ -102,19 +109,15 @@ func GetMemberships(
return jsonerror.InternalServerError()
}
result, err := db.Events(req.Context(), eventIDs)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("db.Events failed")
qryRes := &api.QueryEventsByIDResponse{}
if err := rsAPI.QueryEventsByID(req.Context(), &api.QueryEventsByIDRequest{EventIDs: eventIDs}, qryRes); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rsAPI.QueryEventsByID failed")
return jsonerror.InternalServerError()
}
result := qryRes.Events
if joinedOnly {
if !queryRes.IsInRoom {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("You aren't a member of the room and weren't previously a member of the room."),
}
}
var res getJoinedMembersResponse
res.Joined = make(map[string]joinedMember)
for _, ev := range result {

View file

@ -758,3 +758,5 @@ Local device key changes appear in /keys/changes
Can get rooms/{roomId}/members at a given point
Can filter rooms/{roomId}/members
Current state appears in timeline in private history with many messages after
AS can publish rooms in their own list
AS and main public room lists are separate