Merge branch 'master' of github.com:matrix-org/dendrite into anoa/fix_lint_ci

This commit is contained in:
Andrew Morgan 2020-01-09 16:40:55 +00:00
commit dea0e1de6a
76 changed files with 1699 additions and 1315 deletions

View file

@ -1,21 +1,23 @@
# Code Style
We follow the standard Go style using gofmt, but with a few extra
We follow the standard Go style using goimports, but with a few extra
considerations.
## Linters
We use `gometalinter` to run a number of linters, the exact list can be found
in [linter.json](linter.json). Some of these are slow and expensive to run, but
a subset can be found in [linter-fast.json](linter-fast.json) that run quickly
enough to be run as part of an IDE.
We use `golangci-lint` to run a number of linters, the exact list can be found
under linters in [.golangci.yml](.golangci.yml).
[Installation](https://github.com/golangci/golangci-lint#install) and [Editor
Integration](https://github.com/golangci/golangci-lint#editor-integration) for
it can be found in the readme of golangci-lint.
For rare cases where a linter is giving a spurious warning, it can be disabled
for that line or statement using a [comment directive](https://github.com/alecthomas/gometalinter#comment-directives), e.g.
`// nolint: gocyclo`. This should be used sparingly and only when its clear
that the lint warning is spurious.
for that line or statement using a [comment
directive](https://github.com/golangci/golangci-lint#nolint), e.g. `var
bad_name int //nolint:golint,unused`. This should be used sparingly and only
when its clear that the lint warning is spurious.
The linters are vendored, and can be run using [scripts/find-lint.sh](scripts/find-lint.sh)
The linters can be run using [scripts/find-lint.sh](scripts/find-lint.sh)
(see file for docs) or as part of a build/test/lint cycle using
[scripts/build-test-lint.sh](scripts/build-test-lint.sh).
@ -82,9 +84,9 @@ sets up linting correctly:
```json
{
"go.gopath": "${workspaceRoot}:${workspaceRoot}/vendor",
"go.lintOnSave": "workspace",
"go.lintTool": "gometalinter",
"go.lintFlags": ["--config=linter-fast.json", "--concurrency=5"]
"go.lintTool":"golangci-lint",
"go.lintFlags": [
"--fast"
]
}
```

View file

@ -27,7 +27,7 @@ import (
"github.com/matrix-org/util"
)
const pathPrefixApp = "/_matrix/app/r0"
const pathPrefixApp = "/_matrix/app/v1"
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
// to clients which need to make outbound HTTP requests.

View file

@ -1,3 +1,3 @@
#!/bin/sh
GOBIN=$PWD/`dirname $0`/bin go install -v ./cmd/...
GOBIN=$PWD/`dirname $0`/bin go install -v $PWD/`dirname $0`/cmd/...

View file

@ -29,6 +29,7 @@ func main() {
deviceDB := base.CreateDeviceDB()
keyDB := base.CreateKeyDB()
federation := base.CreateFederationClient()
federationSender := base.CreateHTTPFederationSenderAPIs()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := base.CreateHTTPRoomserverAPIs()
@ -36,7 +37,7 @@ func main() {
federationapi.SetupFederationAPIComponent(
base, accountDB, deviceDB, federation, &keyRing,
alias, input, query, asQuery,
alias, input, query, asQuery, federationSender,
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))

View file

@ -67,7 +67,7 @@ func main() {
federation, &keyRing, alias, input, query,
typingInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query)

View file

@ -138,7 +138,7 @@ func (b *BaseDendrite) CreateAccountsDB() *accounts.Database {
// CreateKeyDB creates a new instance of the key database. Should only be called
// once per component.
func (b *BaseDendrite) CreateKeyDB() *keydb.Database {
func (b *BaseDendrite) CreateKeyDB() keydb.Database {
db, err := keydb.NewDatabase(string(b.Cfg.Database.ServerKey))
if err != nil {
logrus.WithError(err).Panicf("failed to connect to keys db: %s", b.Cfg.Database.ServerKey)

View file

@ -1,4 +1,4 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -16,67 +16,29 @@ package keydb
import (
"context"
"database/sql"
"errors"
"net/url"
"github.com/matrix-org/dendrite/common/keydb/postgres"
"github.com/matrix-org/gomatrixserverlib"
)
// A Database implements gomatrixserverlib.KeyDatabase and is used to store
// the public keys for other matrix servers.
type Database struct {
statements serverKeyStatements
type Database interface {
FetcherName() string
FetchKeys(ctx context.Context, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error)
StoreKeys(ctx context.Context, keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error
}
// NewDatabase prepares a new key database.
// It creates the necessary tables if they don't already exist.
// It prepares all the SQL statements that it will use.
// Returns an error if there was a problem talking to the database.
func NewDatabase(dataSourceName string) (*Database, error) {
db, err := sql.Open("postgres", dataSourceName)
// NewDatabase opens a database connection.
func NewDatabase(dataSourceName string) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return nil, err
}
d := &Database{}
err = d.statements.prepare(db)
if err != nil {
return nil, err
switch uri.Scheme {
case "postgres":
return postgres.NewDatabase(dataSourceName)
default:
return nil, errors.New("unknown schema")
}
return d, nil
}
// FetcherName implements KeyFetcher
func (d Database) FetcherName() string {
return "KeyDatabase"
}
// FetchKeys implements gomatrixserverlib.KeyDatabase
func (d *Database) FetchKeys(
ctx context.Context,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
return d.statements.bulkSelectServerKeys(ctx, requests)
}
// StoreKeys implements gomatrixserverlib.KeyDatabase
func (d *Database) StoreKeys(
ctx context.Context,
keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
) error {
// TODO: Inserting all the keys within a single transaction may
// be more efficient since the transaction overhead can be quite
// high for a single insert statement.
var lastErr error
for request, keys := range keyMap {
if err := d.statements.upsertServerKeys(ctx, request, keys); err != nil {
// Rather than returning immediately on error we try to insert the
// remaining keys.
// Since we are inserting the keys outside of a transaction it is
// possible for some of the inserts to succeed even though some
// of the inserts have failed.
// Ensuring that we always insert all the keys we can means that
// this behaviour won't depend on the iteration order of the map.
lastErr = err
}
}
return lastErr
}

View file

@ -0,0 +1,83 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"github.com/matrix-org/gomatrixserverlib"
)
// A Database implements gomatrixserverlib.KeyDatabase and is used to store
// the public keys for other matrix servers.
type Database struct {
statements serverKeyStatements
}
// NewDatabase prepares a new key database.
// It creates the necessary tables if they don't already exist.
// It prepares all the SQL statements that it will use.
// Returns an error if there was a problem talking to the database.
func NewDatabase(dataSourceName string) (*Database, error) {
db, err := sql.Open("postgres", dataSourceName)
if err != nil {
return nil, err
}
d := &Database{}
err = d.statements.prepare(db)
if err != nil {
return nil, err
}
return d, nil
}
// FetcherName implements KeyFetcher
func (d Database) FetcherName() string {
return "KeyDatabase"
}
// FetchKeys implements gomatrixserverlib.KeyDatabase
func (d *Database) FetchKeys(
ctx context.Context,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
return d.statements.bulkSelectServerKeys(ctx, requests)
}
// StoreKeys implements gomatrixserverlib.KeyDatabase
func (d *Database) StoreKeys(
ctx context.Context,
keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
) error {
// TODO: Inserting all the keys within a single transaction may
// be more efficient since the transaction overhead can be quite
// high for a single insert statement.
var lastErr error
for request, keys := range keyMap {
if err := d.statements.upsertServerKeys(ctx, request, keys); err != nil {
// Rather than returning immediately on error we try to insert the
// remaining keys.
// Since we are inserting the keys outside of a transaction it is
// possible for some of the inserts to succeed even though some
// of the inserts have failed.
// Ensuring that we always insert all the keys we can means that
// this behaviour won't depend on the iteration order of the map.
lastErr = err
}
}
return lastErr
}

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package keydb
package postgres
import (
"context"

View file

@ -2,4 +2,4 @@
bash ./docker/build.sh
./bin/dendrite-monolith-server --tls-cert=server.crt --tls-key=server.key
./bin/dendrite-monolith-server --tls-cert=server.crt --tls-key=server.key $@

View file

@ -19,6 +19,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common/basecomponent"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
// TODO: Are we really wanting to pull in the producer from clientapi
@ -39,11 +40,13 @@ func SetupFederationAPIComponent(
inputAPI roomserverAPI.RoomserverInputAPI,
queryAPI roomserverAPI.RoomserverQueryAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
routing.Setup(
base.APIMux, *base.Cfg, queryAPI, aliasAPI, asAPI,
roomserverProducer, *keyRing, federation, accountsDB, deviceDB,
roomserverProducer, federationSenderAPI, *keyRing, federation, accountsDB,
deviceDB,
)
}

View file

@ -17,11 +17,11 @@ package routing
import (
"net/http"
"strconv"
"time"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -90,9 +90,11 @@ func Backfill(
}
}
txn := types.NewTransaction()
txn.Origin = cfg.Matrix.ServerName
txn.PDUs = evs
txn := gomatrixserverlib.Transaction{
Origin: cfg.Matrix.ServerName,
PDUs: evs,
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
// Send the events to the client.
return util.JSONResponse{

View file

@ -154,16 +154,23 @@ func SendJoin(
// Fetch the state and auth chain. We do this before we send the events
// on, in case this fails.
var stateAndAuthChainRepsonse api.QueryStateAndAuthChainResponse
var stateAndAuthChainResponse api.QueryStateAndAuthChainResponse
err = query.QueryStateAndAuthChain(httpReq.Context(), &api.QueryStateAndAuthChainRequest{
PrevEventIDs: event.PrevEventIDs(),
AuthEventIDs: event.AuthEventIDs(),
RoomID: roomID,
}, &stateAndAuthChainRepsonse)
}, &stateAndAuthChainResponse)
if err != nil {
return httputil.LogThenError(httpReq, err)
}
if !stateAndAuthChainResponse.RoomExists {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Room does not exist"),
}
}
// Send the events to the room server.
// We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName
@ -177,8 +184,8 @@ func SendJoin(
return util.JSONResponse{
Code: http.StatusOK,
JSON: map[string]interface{}{
"state": stateAndAuthChainRepsonse.StateEvents,
"auth_chain": stateAndAuthChainRepsonse.AuthChainEvents,
"state": stateAndAuthChainResponse.StateEvents,
"auth_chain": stateAndAuthChainResponse.AuthChainEvents,
},
}
}

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/common/config"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
@ -33,6 +34,7 @@ func RoomAliasToID(
federation *gomatrixserverlib.FederationClient,
cfg config.Dendrite,
aliasAPI roomserverAPI.RoomserverAliasAPI,
senderAPI federationSenderAPI.FederationSenderQueryAPI,
) util.JSONResponse {
roomAlias := httpReq.FormValue("room_alias")
if roomAlias == "" {
@ -59,10 +61,15 @@ func RoomAliasToID(
}
if queryRes.RoomID != "" {
// TODO: List servers that are aware of this room alias
serverQueryReq := federationSenderAPI.QueryJoinedHostServerNamesInRoomRequest{RoomID: queryRes.RoomID}
var serverQueryRes federationSenderAPI.QueryJoinedHostServerNamesInRoomResponse
if err = senderAPI.QueryJoinedHostServerNamesInRoom(httpReq.Context(), &serverQueryReq, &serverQueryRes); err != nil {
return httputil.LogThenError(httpReq, err)
}
resp = gomatrixserverlib.RespDirectory{
RoomID: queryRes.RoomID,
Servers: []gomatrixserverlib.ServerName{},
Servers: serverQueryRes.ServerNames,
}
} else {
// If no alias was found, return an error

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -32,6 +33,7 @@ import (
const (
pathPrefixV2Keys = "/_matrix/key/v2"
pathPrefixV1Federation = "/_matrix/federation/v1"
pathPrefixV2Federation = "/_matrix/federation/v2"
)
// Setup registers HTTP handlers with the given ServeMux.
@ -46,6 +48,7 @@ func Setup(
aliasAPI roomserverAPI.RoomserverAliasAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
accountDB *accounts.Database,
@ -53,6 +56,7 @@ func Setup(
) {
v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter()
v1fedmux := apiMux.PathPrefix(pathPrefixV1Federation).Subrouter()
v2fedmux := apiMux.PathPrefix(pathPrefixV2Federation).Subrouter()
localKeys := common.MakeExternalAPI("localkeys", func(req *http.Request) util.JSONResponse {
return LocalKeys(cfg)
@ -156,7 +160,7 @@ func Setup(
"federation_query_room_alias", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
return RoomAliasToID(
httpReq, federation, cfg, aliasAPI,
httpReq, federation, cfg, aliasAPI, federationSenderAPI,
)
},
)).Methods(http.MethodGet)
@ -198,7 +202,7 @@ func Setup(
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/send_join/{roomID}/{userID}", common.MakeFedAPI(
v2fedmux.Handle("/send_join/{roomID}/{userID}", common.MakeFedAPI(
"federation_send_join", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))
@ -228,7 +232,7 @@ func Setup(
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/send_leave/{roomID}/{userID}", common.MakeFedAPI(
v2fedmux.Handle("/send_leave/{roomID}/{userID}", common.MakeFedAPI(
"federation_send_leave", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))

View file

@ -1,43 +0,0 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"time"
"github.com/matrix-org/gomatrixserverlib"
)
// Transaction is the representation of a transaction from the federation API
// See https://matrix.org/docs/spec/server_server/unstable.html for more info.
type Transaction struct {
// The server_name of the homeserver sending this transaction.
Origin gomatrixserverlib.ServerName `json:"origin"`
// POSIX timestamp in milliseconds on originating homeserver when this
// transaction started.
OriginServerTS int64 `json:"origin_server_ts"`
// List of persistent updates to rooms.
PDUs []gomatrixserverlib.Event `json:"pdus"`
}
// NewTransaction sets the timestamp of a new transaction instance and then
// returns the said instance.
func NewTransaction() Transaction {
// Retrieve the current timestamp in nanoseconds and make it a milliseconds
// one.
ts := time.Now().UnixNano() / int64(time.Millisecond)
return Transaction{OriginServerTS: ts}
}

View file

@ -33,7 +33,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *storage.Database
db storage.Database
queues *queue.OutgoingQueues
query api.RoomserverQueryAPI
}
@ -43,7 +43,7 @@ func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store *storage.Database,
store storage.Database,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{

View file

@ -29,7 +29,7 @@ import (
// OutputTypingEventConsumer consumes events that originate in typing server.
type OutputTypingEventConsumer struct {
consumer *common.ContinualConsumer
db *storage.Database
db storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
}
@ -39,7 +39,7 @@ func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store *storage.Database,
store storage.Database,
) *OutputTypingEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),

View file

@ -45,15 +45,12 @@ func (f *FederationSenderQueryAPI) QueryJoinedHostServerNamesInRoom(
return
}
serverNamesSet := make(map[gomatrixserverlib.ServerName]bool, len(joinedHosts))
response.ServerNames = make([]gomatrixserverlib.ServerName, 0, len(joinedHosts))
for _, host := range joinedHosts {
serverNamesSet[host.ServerName] = true
response.ServerNames = append(response.ServerNames, host.ServerName)
}
response.ServerNames = make([]gomatrixserverlib.ServerName, 0, len(serverNamesSet))
for name := range serverNamesSet {
response.ServerNames = append(response.ServerNames, name)
}
// TODO: remove duplicates?
return
}

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -0,0 +1,122 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/types"
)
// Database stores information needed by the federation sender
type Database struct {
joinedHostsStatements
roomStatements
common.PartitionOffsetStatements
db *sql.DB
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
var result Database
var err error
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = result.prepare(); err != nil {
return nil, err
}
return &result, nil
}
func (d *Database) prepare() error {
var err error
if err = d.joinedHostsStatements.prepare(d.db); err != nil {
return err
}
if err = d.roomStatements.prepare(d.db); err != nil {
return err
}
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
}
// UpdateRoom updates the joined hosts for a room and returns what the joined
// hosts were before the update, or nil if this was a duplicate message.
// This is called when we receive a message from kafka, so we pass in
// oldEventID and newEventID to check that we haven't missed any messages or
// this isn't a duplicate message.
func (d *Database) UpdateRoom(
ctx context.Context,
roomID, oldEventID, newEventID string,
addHosts []types.JoinedHost,
removeHosts []string,
) (joinedHosts []types.JoinedHost, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
err = d.insertRoom(ctx, txn, roomID)
if err != nil {
return err
}
lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
if err != nil {
return err
}
if lastSentEventID == newEventID {
// We've handled this message before, so let's just ignore it.
// We can only get a duplicate for the last message we processed,
// so its enough just to compare the newEventID with lastSentEventID
return nil
}
if lastSentEventID != oldEventID {
return types.EventIDMismatchError{
DatabaseID: lastSentEventID, RoomServerID: oldEventID,
}
}
joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil {
return err
}
for _, add := range addHosts {
err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
if err != nil {
return err
}
}
if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil {
return err
}
return d.updateRoom(ctx, txn, roomID, newEventID)
})
return
}
// GetJoinedHosts returns the currently joined hosts for room,
// as known to federationserver.
// Returns an error if something goes wrong.
func (d *Database) GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}

View file

@ -1,4 +1,4 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -16,106 +16,30 @@ package storage
import (
"context"
"database/sql"
"errors"
"net/url"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/storage/postgres"
"github.com/matrix-org/dendrite/federationsender/types"
)
// Database stores information needed by the federation sender
type Database struct {
joinedHostsStatements
roomStatements
common.PartitionOffsetStatements
db *sql.DB
type Database interface {
common.PartitionStorer
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
var result Database
var err error
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
func NewDatabase(dataSourceName string) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return nil, err
}
if err = result.prepare(); err != nil {
return nil, err
switch uri.Scheme {
case "postgres":
return postgres.NewDatabase(dataSourceName)
default:
return nil, errors.New("unknown schema")
}
return &result, nil
}
func (d *Database) prepare() error {
var err error
if err = d.joinedHostsStatements.prepare(d.db); err != nil {
return err
}
if err = d.roomStatements.prepare(d.db); err != nil {
return err
}
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
}
// UpdateRoom updates the joined hosts for a room and returns what the joined
// hosts were before the update, or nil if this was a duplicate message.
// This is called when we receive a message from kafka, so we pass in
// oldEventID and newEventID to check that we haven't missed any messages or
// this isn't a duplicate message.
func (d *Database) UpdateRoom(
ctx context.Context,
roomID, oldEventID, newEventID string,
addHosts []types.JoinedHost,
removeHosts []string,
) (joinedHosts []types.JoinedHost, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
err = d.insertRoom(ctx, txn, roomID)
if err != nil {
return err
}
lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
if err != nil {
return err
}
if lastSentEventID == newEventID {
// We've handled this message before, so let's just ignore it.
// We can only get a duplicate for the last message we processed,
// so its enough just to compare the newEventID with lastSentEventID
return nil
}
if lastSentEventID != oldEventID {
return types.EventIDMismatchError{
DatabaseID: lastSentEventID, RoomServerID: oldEventID,
}
}
joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil {
return err
}
for _, add := range addHosts {
err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
if err != nil {
return err
}
}
if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil {
return err
}
return d.updateRoom(ctx, txn, roomID, newEventID)
})
return
}
// GetJoinedHosts returns the currently joined hosts for room,
// as known to federationserver.
// Returns an error if something goes wrong.
func (d *Database) GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}

View file

@ -67,7 +67,7 @@ func Download(
origin gomatrixserverlib.ServerName,
mediaID types.MediaID,
cfg *config.Dendrite,
db *storage.Database,
db storage.Database,
client *gomatrixserverlib.Client,
activeRemoteRequests *types.ActiveRemoteRequests,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
@ -192,7 +192,7 @@ func (r *downloadRequest) doDownload(
ctx context.Context,
w http.ResponseWriter,
cfg *config.Dendrite,
db *storage.Database,
db storage.Database,
client *gomatrixserverlib.Client,
activeRemoteRequests *types.ActiveRemoteRequests,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
@ -235,7 +235,7 @@ func (r *downloadRequest) respondFromLocalFile(
absBasePath config.Path,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
maxThumbnailGenerators int,
db *storage.Database,
db storage.Database,
dynamicThumbnails bool,
thumbnailSizes []config.ThumbnailSize,
) (*types.MediaMetadata, error) {
@ -325,7 +325,7 @@ func (r *downloadRequest) getThumbnailFile(
filePath types.Path,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
maxThumbnailGenerators int,
db *storage.Database,
db storage.Database,
dynamicThumbnails bool,
thumbnailSizes []config.ThumbnailSize,
) (*os.File, *types.ThumbnailMetadata, error) {
@ -407,7 +407,7 @@ func (r *downloadRequest) generateThumbnail(
thumbnailSize types.ThumbnailSize,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
maxThumbnailGenerators int,
db *storage.Database,
db storage.Database,
) (*types.ThumbnailMetadata, error) {
r.Logger.WithFields(log.Fields{
"Width": thumbnailSize.Width,
@ -443,7 +443,7 @@ func (r *downloadRequest) getRemoteFile(
ctx context.Context,
client *gomatrixserverlib.Client,
cfg *config.Dendrite,
db *storage.Database,
db storage.Database,
activeRemoteRequests *types.ActiveRemoteRequests,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
) (errorResponse error) {
@ -545,7 +545,7 @@ func (r *downloadRequest) fetchRemoteFileAndStoreMetadata(
client *gomatrixserverlib.Client,
absBasePath config.Path,
maxFileSizeBytes config.FileSizeBytes,
db *storage.Database,
db storage.Database,
thumbnailSizes []config.ThumbnailSize,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
maxThumbnailGenerators int,

View file

@ -43,7 +43,7 @@ const pathPrefixR0 = "/_matrix/media/r0"
func Setup(
apiMux *mux.Router,
cfg *config.Dendrite,
db *storage.Database,
db storage.Database,
deviceDB *devices.Database,
client *gomatrixserverlib.Client,
) {
@ -80,7 +80,7 @@ func Setup(
func makeDownloadAPI(
name string,
cfg *config.Dendrite,
db *storage.Database,
db storage.Database,
client *gomatrixserverlib.Client,
activeRemoteRequests *types.ActiveRemoteRequests,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,

View file

@ -53,7 +53,7 @@ type uploadResponse struct {
// This implementation supports a configurable maximum file size limit in bytes. If a user tries to upload more than this, they will receive an error that their upload is too large.
// Uploaded files are processed piece-wise to avoid DoS attacks which would starve the server of memory.
// TODO: We should time out requests if they have not received any data within a configured timeout period.
func Upload(req *http.Request, cfg *config.Dendrite, db *storage.Database, activeThumbnailGeneration *types.ActiveThumbnailGeneration) util.JSONResponse {
func Upload(req *http.Request, cfg *config.Dendrite, db storage.Database, activeThumbnailGeneration *types.ActiveThumbnailGeneration) util.JSONResponse {
r, resErr := parseAndValidateRequest(req, cfg)
if resErr != nil {
return *resErr
@ -96,7 +96,7 @@ func (r *uploadRequest) doUpload(
ctx context.Context,
reqReader io.Reader,
cfg *config.Dendrite,
db *storage.Database,
db storage.Database,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
) *util.JSONResponse {
r.Logger.WithFields(log.Fields{
@ -214,7 +214,7 @@ func (r *uploadRequest) storeFileAndMetadata(
ctx context.Context,
tmpDir types.Path,
absBasePath config.Path,
db *storage.Database,
db storage.Database,
thumbnailSizes []config.ThumbnailSize,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
maxThumbnailGenerators int,

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -14,7 +15,7 @@
// FIXME: This should be made common!
package storage
package postgres
import (
"database/sql"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"database/sql"

View file

@ -0,0 +1,106 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// Database is used to store metadata about a repository of media files.
type Database struct {
statements statements
db *sql.DB
}
// Open opens a postgres database.
func Open(dataSourceName string) (*Database, error) {
var d Database
var err error
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = d.statements.prepare(d.db); err != nil {
return nil, err
}
return &d, nil
}
// StoreMediaMetadata inserts the metadata about the uploaded media into the database.
// Returns an error if the combination of MediaID and Origin are not unique in the table.
func (d *Database) StoreMediaMetadata(
ctx context.Context, mediaMetadata *types.MediaMetadata,
) error {
return d.statements.media.insertMedia(ctx, mediaMetadata)
}
// GetMediaMetadata returns metadata about media stored on this server.
// The media could have been uploaded to this server or fetched from another server and cached here.
// Returns nil metadata if there is no metadata associated with this media.
func (d *Database) GetMediaMetadata(
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
) (*types.MediaMetadata, error) {
mediaMetadata, err := d.statements.media.selectMedia(ctx, mediaID, mediaOrigin)
if err != nil && err == sql.ErrNoRows {
return nil, nil
}
return mediaMetadata, err
}
// StoreThumbnail inserts the metadata about the thumbnail into the database.
// Returns an error if the combination of MediaID and Origin are not unique in the table.
func (d *Database) StoreThumbnail(
ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata,
) error {
return d.statements.thumbnail.insertThumbnail(ctx, thumbnailMetadata)
}
// GetThumbnail returns metadata about a specific thumbnail.
// The media could have been uploaded to this server or fetched from another server and cached here.
// Returns nil metadata if there is no metadata associated with this thumbnail.
func (d *Database) GetThumbnail(
ctx context.Context,
mediaID types.MediaID,
mediaOrigin gomatrixserverlib.ServerName,
width, height int,
resizeMethod string,
) (*types.ThumbnailMetadata, error) {
thumbnailMetadata, err := d.statements.thumbnail.selectThumbnail(
ctx, mediaID, mediaOrigin, width, height, resizeMethod,
)
if err != nil && err == sql.ErrNoRows {
return nil, nil
}
return thumbnailMetadata, err
}
// GetThumbnails returns metadata about all thumbnails for a specific media stored on this server.
// The media could have been uploaded to this server or fetched from another server and cached here.
// Returns nil metadata if there are no thumbnails associated with this media.
func (d *Database) GetThumbnails(
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
) ([]*types.ThumbnailMetadata, error) {
thumbnails, err := d.statements.thumbnail.selectThumbnails(ctx, mediaID, mediaOrigin)
if err != nil && err == sql.ErrNoRows {
return nil, nil
}
return thumbnails, err
}

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,4 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -16,90 +16,32 @@ package storage
import (
"context"
"database/sql"
"errors"
"net/url"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/mediaapi/storage/postgres"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// Database is used to store metadata about a repository of media files.
type Database struct {
statements statements
db *sql.DB
type Database interface {
StoreMediaMetadata(ctx context.Context, mediaMetadata *types.MediaMetadata) error
GetMediaMetadata(ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName) (*types.MediaMetadata, error)
StoreThumbnail(ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata) error
GetThumbnail(ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName, width, height int, resizeMethod string) (*types.ThumbnailMetadata, error)
GetThumbnails(ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName) ([]*types.ThumbnailMetadata, error)
}
// Open opens a postgres database.
func Open(dataSourceName string) (*Database, error) {
var d Database
var err error
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
func Open(dataSourceName string) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return nil, err
}
if err = d.statements.prepare(d.db); err != nil {
return nil, err
switch uri.Scheme {
case "postgres":
return postgres.Open(dataSourceName)
default:
return nil, errors.New("unknown schema")
}
return &d, nil
}
// StoreMediaMetadata inserts the metadata about the uploaded media into the database.
// Returns an error if the combination of MediaID and Origin are not unique in the table.
func (d *Database) StoreMediaMetadata(
ctx context.Context, mediaMetadata *types.MediaMetadata,
) error {
return d.statements.media.insertMedia(ctx, mediaMetadata)
}
// GetMediaMetadata returns metadata about media stored on this server.
// The media could have been uploaded to this server or fetched from another server and cached here.
// Returns nil metadata if there is no metadata associated with this media.
func (d *Database) GetMediaMetadata(
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
) (*types.MediaMetadata, error) {
mediaMetadata, err := d.statements.media.selectMedia(ctx, mediaID, mediaOrigin)
if err != nil && err == sql.ErrNoRows {
return nil, nil
}
return mediaMetadata, err
}
// StoreThumbnail inserts the metadata about the thumbnail into the database.
// Returns an error if the combination of MediaID and Origin are not unique in the table.
func (d *Database) StoreThumbnail(
ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata,
) error {
return d.statements.thumbnail.insertThumbnail(ctx, thumbnailMetadata)
}
// GetThumbnail returns metadata about a specific thumbnail.
// The media could have been uploaded to this server or fetched from another server and cached here.
// Returns nil metadata if there is no metadata associated with this thumbnail.
func (d *Database) GetThumbnail(
ctx context.Context,
mediaID types.MediaID,
mediaOrigin gomatrixserverlib.ServerName,
width, height int,
resizeMethod string,
) (*types.ThumbnailMetadata, error) {
thumbnailMetadata, err := d.statements.thumbnail.selectThumbnail(
ctx, mediaID, mediaOrigin, width, height, resizeMethod,
)
if err != nil && err == sql.ErrNoRows {
return nil, nil
}
return thumbnailMetadata, err
}
// GetThumbnails returns metadata about all thumbnails for a specific media stored on this server.
// The media could have been uploaded to this server or fetched from another server and cached here.
// Returns nil metadata if there are no thumbnails associated with this media.
func (d *Database) GetThumbnails(
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
) ([]*types.ThumbnailMetadata, error) {
thumbnails, err := d.statements.thumbnail.selectThumbnails(ctx, mediaID, mediaOrigin)
if err != nil && err == sql.ErrNoRows {
return nil, nil
}
return thumbnails, err
}

View file

@ -136,7 +136,7 @@ func isThumbnailExists(
dst types.Path,
config types.ThumbnailSize,
mediaMetadata *types.MediaMetadata,
db *storage.Database,
db storage.Database,
logger *log.Entry,
) (bool, error) {
thumbnailMetadata, err := db.GetThumbnail(

View file

@ -45,7 +45,7 @@ func GenerateThumbnails(
mediaMetadata *types.MediaMetadata,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
maxThumbnailGenerators int,
db *storage.Database,
db storage.Database,
logger *log.Entry,
) (busy bool, errorReturn error) {
img, err := readFile(string(src))
@ -78,7 +78,7 @@ func GenerateThumbnail(
mediaMetadata *types.MediaMetadata,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
maxThumbnailGenerators int,
db *storage.Database,
db storage.Database,
logger *log.Entry,
) (busy bool, errorReturn error) {
img, err := readFile(string(src))
@ -142,7 +142,7 @@ func createThumbnail(
mediaMetadata *types.MediaMetadata,
activeThumbnailGeneration *types.ActiveThumbnailGeneration,
maxThumbnailGenerators int,
db *storage.Database,
db storage.Database,
logger *log.Entry,
) (busy bool, errorReturn error) {
logger = logger.WithFields(log.Fields{

View file

@ -29,7 +29,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *storage.PublicRoomsServerDatabase
db storage.Database
query api.RoomserverQueryAPI
}
@ -37,7 +37,7 @@ type OutputRoomEventConsumer struct {
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
store *storage.PublicRoomsServerDatabase,
store storage.Database,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{

View file

@ -30,7 +30,7 @@ type roomVisibility struct {
// GetVisibility implements GET /directory/list/room/{roomID}
func GetVisibility(
req *http.Request, publicRoomsDatabase *storage.PublicRoomsServerDatabase,
req *http.Request, publicRoomsDatabase storage.Database,
roomID string,
) util.JSONResponse {
isPublic, err := publicRoomsDatabase.GetRoomVisibility(req.Context(), roomID)
@ -54,7 +54,7 @@ func GetVisibility(
// SetVisibility implements PUT /directory/list/room/{roomID}
// TODO: Check if user has the power level to edit the room visibility
func SetVisibility(
req *http.Request, publicRoomsDatabase *storage.PublicRoomsServerDatabase,
req *http.Request, publicRoomsDatabase storage.Database,
roomID string,
) util.JSONResponse {
var v roomVisibility

View file

@ -44,7 +44,7 @@ type publicRoomRes struct {
// GetPostPublicRooms implements GET and POST /publicRooms
func GetPostPublicRooms(
req *http.Request, publicRoomDatabase *storage.PublicRoomsServerDatabase,
req *http.Request, publicRoomDatabase storage.Database,
) util.JSONResponse {
var limit int16
var offset int64

View file

@ -34,7 +34,7 @@ const pathPrefixR0 = "/_matrix/client/r0"
// Due to Setup being used to call many other functions, a gocyclo nolint is
// applied:
// nolint: gocyclo
func Setup(apiMux *mux.Router, deviceDB *devices.Database, publicRoomsDB *storage.PublicRoomsServerDatabase) {
func Setup(apiMux *mux.Router, deviceDB *devices.Database, publicRoomsDB storage.Database) {
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
authData := auth.Data{

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"database/sql"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -0,0 +1,253 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/publicroomsapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// PublicRoomsServerDatabase represents a public rooms server database.
type PublicRoomsServerDatabase struct {
db *sql.DB
common.PartitionOffsetStatements
statements publicRoomsStatements
}
type attributeValue interface{}
// NewPublicRoomsServerDatabase creates a new public rooms server database.
func NewPublicRoomsServerDatabase(dataSourceName string) (*PublicRoomsServerDatabase, error) {
var db *sql.DB
var err error
if db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
partitions := common.PartitionOffsetStatements{}
if err = partitions.Prepare(db, "publicroomsapi"); err != nil {
return nil, err
}
statements := publicRoomsStatements{}
if err = statements.prepare(db); err != nil {
return nil, err
}
return &PublicRoomsServerDatabase{db, partitions, statements}, nil
}
// GetRoomVisibility returns the room visibility as a boolean: true if the room
// is publicly visible, false if not.
// Returns an error if the retrieval failed.
func (d *PublicRoomsServerDatabase) GetRoomVisibility(
ctx context.Context, roomID string,
) (bool, error) {
return d.statements.selectRoomVisibility(ctx, roomID)
}
// SetRoomVisibility updates the visibility attribute of a room. This attribute
// must be set to true if the room is publicly visible, false if not.
// Returns an error if the update failed.
func (d *PublicRoomsServerDatabase) SetRoomVisibility(
ctx context.Context, visible bool, roomID string,
) error {
return d.statements.updateRoomAttribute(ctx, "visibility", visible, roomID)
}
// CountPublicRooms returns the number of room set as publicly visible on the server.
// Returns an error if the retrieval failed.
func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) {
return d.statements.countPublicRooms(ctx)
}
// GetPublicRooms returns an array containing the local rooms set as publicly visible, ordered by their number
// of joined members. This array can be limited by a given number of elements, and offset by a given value.
// If the limit is 0, doesn't limit the number of results. If the offset is 0 too, the array contains all
// the rooms set as publicly visible on the server.
// Returns an error if the retrieval failed.
func (d *PublicRoomsServerDatabase) GetPublicRooms(
ctx context.Context, offset int64, limit int16, filter string,
) ([]types.PublicRoom, error) {
return d.statements.selectPublicRooms(ctx, offset, limit, filter)
}
// UpdateRoomFromEvents iterate over a slice of state events and call
// UpdateRoomFromEvent on each of them to update the database representation of
// the rooms updated by each event.
// The slice of events to remove is used to update the number of joined members
// for the room in the database.
// If the update triggered by one of the events failed, aborts the process and
// returns an error.
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(
ctx context.Context,
eventsToAdd []gomatrixserverlib.Event,
eventsToRemove []gomatrixserverlib.Event,
) error {
for _, event := range eventsToAdd {
if err := d.UpdateRoomFromEvent(ctx, event); err != nil {
return err
}
}
for _, event := range eventsToRemove {
if event.Type() == "m.room.member" {
if err := d.updateNumJoinedUsers(ctx, event, true); err != nil {
return err
}
}
}
return nil
}
// UpdateRoomFromEvent updates the database representation of a room from a Matrix event, by
// checking the event's type to know which attribute to change and using the event's content
// to define the new value of the attribute.
// If the event doesn't match with any property used to compute the public room directory,
// does nothing.
// If something went wrong during the process, returns an error.
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(
ctx context.Context, event gomatrixserverlib.Event,
) error {
// Process the event according to its type
switch event.Type() {
case "m.room.create":
return d.statements.insertNewRoom(ctx, event.RoomID())
case "m.room.member":
return d.updateNumJoinedUsers(ctx, event, false)
case "m.room.aliases":
return d.updateRoomAliases(ctx, event)
case "m.room.canonical_alias":
var content common.CanonicalAliasContent
field := &(content.Alias)
attrName := "canonical_alias"
return d.updateStringAttribute(ctx, attrName, event, &content, field)
case "m.room.name":
var content common.NameContent
field := &(content.Name)
attrName := "name"
return d.updateStringAttribute(ctx, attrName, event, &content, field)
case "m.room.topic":
var content common.TopicContent
field := &(content.Topic)
attrName := "topic"
return d.updateStringAttribute(ctx, attrName, event, &content, field)
case "m.room.avatar":
var content common.AvatarContent
field := &(content.URL)
attrName := "avatar_url"
return d.updateStringAttribute(ctx, attrName, event, &content, field)
case "m.room.history_visibility":
var content common.HistoryVisibilityContent
field := &(content.HistoryVisibility)
attrName := "world_readable"
strForTrue := "world_readable"
return d.updateBooleanAttribute(ctx, attrName, event, &content, field, strForTrue)
case "m.room.guest_access":
var content common.GuestAccessContent
field := &(content.GuestAccess)
attrName := "guest_can_join"
strForTrue := "can_join"
return d.updateBooleanAttribute(ctx, attrName, event, &content, field, strForTrue)
}
// If the event type didn't match, return with no error
return nil
}
// updateNumJoinedUsers updates the number of joined user in the database representation
// of a room using a given "m.room.member" Matrix event.
// If the membership property of the event isn't "join", ignores it and returs nil.
// If the remove parameter is set to false, increments the joined members counter in the
// database, if set to truem decrements it.
// Returns an error if the update failed.
func (d *PublicRoomsServerDatabase) updateNumJoinedUsers(
ctx context.Context, membershipEvent gomatrixserverlib.Event, remove bool,
) error {
membership, err := membershipEvent.Membership()
if err != nil {
return err
}
if membership != gomatrixserverlib.Join {
return nil
}
if remove {
return d.statements.decrementJoinedMembersInRoom(ctx, membershipEvent.RoomID())
}
return d.statements.incrementJoinedMembersInRoom(ctx, membershipEvent.RoomID())
}
// updateStringAttribute updates a given string attribute in the database
// representation of a room using a given string data field from content of the
// Matrix event triggering the update.
// Returns an error if decoding the Matrix event's content or updating the attribute
// failed.
func (d *PublicRoomsServerDatabase) updateStringAttribute(
ctx context.Context, attrName string, event gomatrixserverlib.Event,
content interface{}, field *string,
) error {
if err := json.Unmarshal(event.Content(), content); err != nil {
return err
}
return d.statements.updateRoomAttribute(ctx, attrName, *field, event.RoomID())
}
// updateBooleanAttribute updates a given boolean attribute in the database
// representation of a room using a given string data field from content of the
// Matrix event triggering the update.
// The attribute is set to true if the field matches a given string, false if not.
// Returns an error if decoding the Matrix event's content or updating the attribute
// failed.
func (d *PublicRoomsServerDatabase) updateBooleanAttribute(
ctx context.Context, attrName string, event gomatrixserverlib.Event,
content interface{}, field *string, strForTrue string,
) error {
if err := json.Unmarshal(event.Content(), content); err != nil {
return err
}
var attrValue bool
if *field == strForTrue {
attrValue = true
} else {
attrValue = false
}
return d.statements.updateRoomAttribute(ctx, attrName, attrValue, event.RoomID())
}
// updateRoomAliases decodes the content of a "m.room.aliases" Matrix event and update the list of aliases of
// a given room with it.
// Returns an error if decoding the Matrix event or updating the list failed.
func (d *PublicRoomsServerDatabase) updateRoomAliases(
ctx context.Context, aliasesEvent gomatrixserverlib.Event,
) error {
var content common.AliasesContent
if err := json.Unmarshal(aliasesEvent.Content(), &content); err != nil {
return err
}
return d.statements.updateRoomAttribute(
ctx, "aliases", content.Aliases, aliasesEvent.RoomID(),
)
}

View file

@ -1,4 +1,4 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -16,237 +16,35 @@ package storage
import (
"context"
"database/sql"
"encoding/json"
"errors"
"net/url"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/publicroomsapi/storage/postgres"
"github.com/matrix-org/dendrite/publicroomsapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// PublicRoomsServerDatabase represents a public rooms server database.
type PublicRoomsServerDatabase struct {
db *sql.DB
common.PartitionOffsetStatements
statements publicRoomsStatements
type Database interface {
common.PartitionStorer
GetRoomVisibility(ctx context.Context, roomID string) (bool, error)
SetRoomVisibility(ctx context.Context, visible bool, roomID string) error
CountPublicRooms(ctx context.Context) (int64, error)
GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]types.PublicRoom, error)
UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error
UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error
}
type attributeValue interface{}
// NewPublicRoomsServerDatabase creates a new public rooms server database.
func NewPublicRoomsServerDatabase(dataSourceName string) (*PublicRoomsServerDatabase, error) {
var db *sql.DB
var err error
if db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
partitions := common.PartitionOffsetStatements{}
if err = partitions.Prepare(db, "publicroomsapi"); err != nil {
return nil, err
}
statements := publicRoomsStatements{}
if err = statements.prepare(db); err != nil {
return nil, err
}
return &PublicRoomsServerDatabase{db, partitions, statements}, nil
}
// GetRoomVisibility returns the room visibility as a boolean: true if the room
// is publicly visible, false if not.
// Returns an error if the retrieval failed.
func (d *PublicRoomsServerDatabase) GetRoomVisibility(
ctx context.Context, roomID string,
) (bool, error) {
return d.statements.selectRoomVisibility(ctx, roomID)
}
// SetRoomVisibility updates the visibility attribute of a room. This attribute
// must be set to true if the room is publicly visible, false if not.
// Returns an error if the update failed.
func (d *PublicRoomsServerDatabase) SetRoomVisibility(
ctx context.Context, visible bool, roomID string,
) error {
return d.statements.updateRoomAttribute(ctx, "visibility", visible, roomID)
}
// CountPublicRooms returns the number of room set as publicly visible on the server.
// Returns an error if the retrieval failed.
func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) {
return d.statements.countPublicRooms(ctx)
}
// GetPublicRooms returns an array containing the local rooms set as publicly visible, ordered by their number
// of joined members. This array can be limited by a given number of elements, and offset by a given value.
// If the limit is 0, doesn't limit the number of results. If the offset is 0 too, the array contains all
// the rooms set as publicly visible on the server.
// Returns an error if the retrieval failed.
func (d *PublicRoomsServerDatabase) GetPublicRooms(
ctx context.Context, offset int64, limit int16, filter string,
) ([]types.PublicRoom, error) {
return d.statements.selectPublicRooms(ctx, offset, limit, filter)
}
// UpdateRoomFromEvents iterate over a slice of state events and call
// UpdateRoomFromEvent on each of them to update the database representation of
// the rooms updated by each event.
// The slice of events to remove is used to update the number of joined members
// for the room in the database.
// If the update triggered by one of the events failed, aborts the process and
// returns an error.
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(
ctx context.Context,
eventsToAdd []gomatrixserverlib.Event,
eventsToRemove []gomatrixserverlib.Event,
) error {
for _, event := range eventsToAdd {
if err := d.UpdateRoomFromEvent(ctx, event); err != nil {
return err
}
}
for _, event := range eventsToRemove {
if event.Type() == "m.room.member" {
if err := d.updateNumJoinedUsers(ctx, event, true); err != nil {
return err
}
}
}
return nil
}
// UpdateRoomFromEvent updates the database representation of a room from a Matrix event, by
// checking the event's type to know which attribute to change and using the event's content
// to define the new value of the attribute.
// If the event doesn't match with any property used to compute the public room directory,
// does nothing.
// If something went wrong during the process, returns an error.
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(
ctx context.Context, event gomatrixserverlib.Event,
) error {
// Process the event according to its type
switch event.Type() {
case "m.room.create":
return d.statements.insertNewRoom(ctx, event.RoomID())
case "m.room.member":
return d.updateNumJoinedUsers(ctx, event, false)
case "m.room.aliases":
return d.updateRoomAliases(ctx, event)
case "m.room.canonical_alias":
var content common.CanonicalAliasContent
field := &(content.Alias)
attrName := "canonical_alias"
return d.updateStringAttribute(ctx, attrName, event, &content, field)
case "m.room.name":
var content common.NameContent
field := &(content.Name)
attrName := "name"
return d.updateStringAttribute(ctx, attrName, event, &content, field)
case "m.room.topic":
var content common.TopicContent
field := &(content.Topic)
attrName := "topic"
return d.updateStringAttribute(ctx, attrName, event, &content, field)
case "m.room.avatar":
var content common.AvatarContent
field := &(content.URL)
attrName := "avatar_url"
return d.updateStringAttribute(ctx, attrName, event, &content, field)
case "m.room.history_visibility":
var content common.HistoryVisibilityContent
field := &(content.HistoryVisibility)
attrName := "world_readable"
strForTrue := "world_readable"
return d.updateBooleanAttribute(ctx, attrName, event, &content, field, strForTrue)
case "m.room.guest_access":
var content common.GuestAccessContent
field := &(content.GuestAccess)
attrName := "guest_can_join"
strForTrue := "can_join"
return d.updateBooleanAttribute(ctx, attrName, event, &content, field, strForTrue)
}
// If the event type didn't match, return with no error
return nil
}
// updateNumJoinedUsers updates the number of joined user in the database representation
// of a room using a given "m.room.member" Matrix event.
// If the membership property of the event isn't "join", ignores it and returs nil.
// If the remove parameter is set to false, increments the joined members counter in the
// database, if set to truem decrements it.
// Returns an error if the update failed.
func (d *PublicRoomsServerDatabase) updateNumJoinedUsers(
ctx context.Context, membershipEvent gomatrixserverlib.Event, remove bool,
) error {
membership, err := membershipEvent.Membership()
// NewPublicRoomsServerDatabase opens a database connection.
func NewPublicRoomsServerDatabase(dataSourceName string) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return err
return nil, err
}
if membership != gomatrixserverlib.Join {
return nil
switch uri.Scheme {
case "postgres":
return postgres.NewPublicRoomsServerDatabase(dataSourceName)
default:
return nil, errors.New("unknown schema")
}
if remove {
return d.statements.decrementJoinedMembersInRoom(ctx, membershipEvent.RoomID())
}
return d.statements.incrementJoinedMembersInRoom(ctx, membershipEvent.RoomID())
}
// updateStringAttribute updates a given string attribute in the database
// representation of a room using a given string data field from content of the
// Matrix event triggering the update.
// Returns an error if decoding the Matrix event's content or updating the attribute
// failed.
func (d *PublicRoomsServerDatabase) updateStringAttribute(
ctx context.Context, attrName string, event gomatrixserverlib.Event,
content interface{}, field *string,
) error {
if err := json.Unmarshal(event.Content(), content); err != nil {
return err
}
return d.statements.updateRoomAttribute(ctx, attrName, *field, event.RoomID())
}
// updateBooleanAttribute updates a given boolean attribute in the database
// representation of a room using a given string data field from content of the
// Matrix event triggering the update.
// The attribute is set to true if the field matches a given string, false if not.
// Returns an error if decoding the Matrix event's content or updating the attribute
// failed.
func (d *PublicRoomsServerDatabase) updateBooleanAttribute(
ctx context.Context, attrName string, event gomatrixserverlib.Event,
content interface{}, field *string, strForTrue string,
) error {
if err := json.Unmarshal(event.Content(), content); err != nil {
return err
}
var attrValue bool
if *field == strForTrue {
attrValue = true
} else {
attrValue = false
}
return d.statements.updateRoomAttribute(ctx, attrName, attrValue, event.RoomID())
}
// updateRoomAliases decodes the content of a "m.room.aliases" Matrix event and update the list of aliases of
// a given room with it.
// Returns an error if decoding the Matrix event or updating the list failed.
func (d *PublicRoomsServerDatabase) updateRoomAliases(
ctx context.Context, aliasesEvent gomatrixserverlib.Event,
) error {
var content common.AliasesContent
if err := json.Unmarshal(aliasesEvent.Content(), &content); err != nil {
return err
}
return d.statements.updateRoomAttribute(
ctx, "aliases", content.Aliases, aliasesEvent.RoomID(),
)
}

View file

@ -332,7 +332,7 @@ const RoomserverQueryMissingEventsPath = "/api/roomserver/queryMissingEvents"
// RoomserverQueryStateAndAuthChainPath is the HTTP path for the QueryStateAndAuthChain API
const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain"
// RoomserverQueryBackfillPath is the HTTP path for the QueryMissingEvents API
// RoomserverQueryBackfillPath is the HTTP path for the QueryBackfill API
const RoomserverQueryBackfillPath = "/api/roomserver/QueryBackfill"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
@ -475,6 +475,6 @@ func (h *httpRoomserverQueryAPI) QueryBackfill(
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryMissingEventsPath
apiURL := h.roomserverURL + RoomserverQueryBackfillPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"database/sql"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"database/sql"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"sort"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -0,0 +1,713 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
// A Database is used to store room events and stream offsets.
type Database struct {
statements statements
db *sql.DB
}
// Open a postgres database.
func Open(dataSourceName string) (*Database, error) {
var d Database
var err error
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = d.statements.prepare(d.db); err != nil {
return nil, err
}
return &d, nil
}
// StoreEvent implements input.EventDatabase
func (d *Database) StoreEvent(
ctx context.Context, event gomatrixserverlib.Event,
txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID,
) (types.RoomNID, types.StateAtEvent, error) {
var (
roomNID types.RoomNID
eventTypeNID types.EventTypeNID
eventStateKeyNID types.EventStateKeyNID
eventNID types.EventNID
stateNID types.StateSnapshotNID
err error
)
if txnAndSessionID != nil {
if err = d.statements.insertTransaction(
ctx, txnAndSessionID.TransactionID,
txnAndSessionID.SessionID, event.Sender(), event.EventID(),
); err != nil {
return 0, types.StateAtEvent{}, err
}
}
if roomNID, err = d.assignRoomNID(ctx, nil, event.RoomID()); err != nil {
return 0, types.StateAtEvent{}, err
}
if eventTypeNID, err = d.assignEventTypeNID(ctx, event.Type()); err != nil {
return 0, types.StateAtEvent{}, err
}
eventStateKey := event.StateKey()
// Assigned a numeric ID for the state_key if there is one present.
// Otherwise set the numeric ID for the state_key to 0.
if eventStateKey != nil {
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, nil, *eventStateKey); err != nil {
return 0, types.StateAtEvent{}, err
}
}
if eventNID, stateNID, err = d.statements.insertEvent(
ctx,
roomNID,
eventTypeNID,
eventStateKeyNID,
event.EventID(),
event.EventReference().EventSHA256,
authEventNIDs,
event.Depth(),
); err != nil {
if err == sql.ErrNoRows {
// We've already inserted the event so select the numeric event ID
eventNID, stateNID, err = d.statements.selectEvent(ctx, event.EventID())
}
if err != nil {
return 0, types.StateAtEvent{}, err
}
}
if err = d.statements.insertEventJSON(ctx, eventNID, event.JSON()); err != nil {
return 0, types.StateAtEvent{}, err
}
return roomNID, types.StateAtEvent{
BeforeStateSnapshotNID: stateNID,
StateEntry: types.StateEntry{
StateKeyTuple: types.StateKeyTuple{
EventTypeNID: eventTypeNID,
EventStateKeyNID: eventStateKeyNID,
},
EventNID: eventNID,
},
}, nil
}
func (d *Database) assignRoomNID(
ctx context.Context, txn *sql.Tx, roomID string,
) (types.RoomNID, error) {
// Check if we already have a numeric ID in the database.
roomNID, err := d.statements.selectRoomNID(ctx, txn, roomID)
if err == sql.ErrNoRows {
// We don't have a numeric ID so insert one into the database.
roomNID, err = d.statements.insertRoomNID(ctx, txn, roomID)
if err == sql.ErrNoRows {
// We raced with another insert so run the select again.
roomNID, err = d.statements.selectRoomNID(ctx, txn, roomID)
}
}
return roomNID, err
}
func (d *Database) assignEventTypeNID(
ctx context.Context, eventType string,
) (types.EventTypeNID, error) {
// Check if we already have a numeric ID in the database.
eventTypeNID, err := d.statements.selectEventTypeNID(ctx, eventType)
if err == sql.ErrNoRows {
// We don't have a numeric ID so insert one into the database.
eventTypeNID, err = d.statements.insertEventTypeNID(ctx, eventType)
if err == sql.ErrNoRows {
// We raced with another insert so run the select again.
eventTypeNID, err = d.statements.selectEventTypeNID(ctx, eventType)
}
}
return eventTypeNID, err
}
func (d *Database) assignStateKeyNID(
ctx context.Context, txn *sql.Tx, eventStateKey string,
) (types.EventStateKeyNID, error) {
// Check if we already have a numeric ID in the database.
eventStateKeyNID, err := d.statements.selectEventStateKeyNID(ctx, txn, eventStateKey)
if err == sql.ErrNoRows {
// We don't have a numeric ID so insert one into the database.
eventStateKeyNID, err = d.statements.insertEventStateKeyNID(ctx, txn, eventStateKey)
if err == sql.ErrNoRows {
// We raced with another insert so run the select again.
eventStateKeyNID, err = d.statements.selectEventStateKeyNID(ctx, txn, eventStateKey)
}
}
return eventStateKeyNID, err
}
// StateEntriesForEventIDs implements input.EventDatabase
func (d *Database) StateEntriesForEventIDs(
ctx context.Context, eventIDs []string,
) ([]types.StateEntry, error) {
return d.statements.bulkSelectStateEventByID(ctx, eventIDs)
}
// EventTypeNIDs implements state.RoomStateDatabase
func (d *Database) EventTypeNIDs(
ctx context.Context, eventTypes []string,
) (map[string]types.EventTypeNID, error) {
return d.statements.bulkSelectEventTypeNID(ctx, eventTypes)
}
// EventStateKeyNIDs implements state.RoomStateDatabase
func (d *Database) EventStateKeyNIDs(
ctx context.Context, eventStateKeys []string,
) (map[string]types.EventStateKeyNID, error) {
return d.statements.bulkSelectEventStateKeyNID(ctx, eventStateKeys)
}
// EventStateKeys implements query.RoomserverQueryAPIDatabase
func (d *Database) EventStateKeys(
ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID,
) (map[types.EventStateKeyNID]string, error) {
return d.statements.bulkSelectEventStateKey(ctx, eventStateKeyNIDs)
}
// EventNIDs implements query.RoomserverQueryAPIDatabase
func (d *Database) EventNIDs(
ctx context.Context, eventIDs []string,
) (map[string]types.EventNID, error) {
return d.statements.bulkSelectEventNID(ctx, eventIDs)
}
// Events implements input.EventDatabase
func (d *Database) Events(
ctx context.Context, eventNIDs []types.EventNID,
) ([]types.Event, error) {
eventJSONs, err := d.statements.bulkSelectEventJSON(ctx, eventNIDs)
if err != nil {
return nil, err
}
results := make([]types.Event, len(eventJSONs))
for i, eventJSON := range eventJSONs {
result := &results[i]
result.EventNID = eventJSON.EventNID
// TODO: Use NewEventFromTrustedJSON for efficiency
result.Event, err = gomatrixserverlib.NewEventFromUntrustedJSON(eventJSON.EventJSON)
if err != nil {
return nil, err
}
}
return results, nil
}
// AddState implements input.EventDatabase
func (d *Database) AddState(
ctx context.Context,
roomNID types.RoomNID,
stateBlockNIDs []types.StateBlockNID,
state []types.StateEntry,
) (types.StateSnapshotNID, error) {
if len(state) > 0 {
stateBlockNID, err := d.statements.selectNextStateBlockNID(ctx)
if err != nil {
return 0, err
}
if err = d.statements.bulkInsertStateData(ctx, stateBlockNID, state); err != nil {
return 0, err
}
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)
}
return d.statements.insertState(ctx, roomNID, stateBlockNIDs)
}
// SetState implements input.EventDatabase
func (d *Database) SetState(
ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID,
) error {
return d.statements.updateEventState(ctx, eventNID, stateNID)
}
// StateAtEventIDs implements input.EventDatabase
func (d *Database) StateAtEventIDs(
ctx context.Context, eventIDs []string,
) ([]types.StateAtEvent, error) {
return d.statements.bulkSelectStateAtEventByID(ctx, eventIDs)
}
// StateBlockNIDs implements state.RoomStateDatabase
func (d *Database) StateBlockNIDs(
ctx context.Context, stateNIDs []types.StateSnapshotNID,
) ([]types.StateBlockNIDList, error) {
return d.statements.bulkSelectStateBlockNIDs(ctx, stateNIDs)
}
// StateEntries implements state.RoomStateDatabase
func (d *Database) StateEntries(
ctx context.Context, stateBlockNIDs []types.StateBlockNID,
) ([]types.StateEntryList, error) {
return d.statements.bulkSelectStateBlockEntries(ctx, stateBlockNIDs)
}
// SnapshotNIDFromEventID implements state.RoomStateDatabase
func (d *Database) SnapshotNIDFromEventID(
ctx context.Context, eventID string,
) (types.StateSnapshotNID, error) {
_, stateNID, err := d.statements.selectEvent(ctx, eventID)
return stateNID, err
}
// EventIDs implements input.RoomEventDatabase
func (d *Database) EventIDs(
ctx context.Context, eventNIDs []types.EventNID,
) (map[types.EventNID]string, error) {
return d.statements.bulkSelectEventID(ctx, eventNIDs)
}
// GetLatestEventsForUpdate implements input.EventDatabase
func (d *Database) GetLatestEventsForUpdate(
ctx context.Context, roomNID types.RoomNID,
) (types.RoomRecentEventsUpdater, error) {
txn, err := d.db.Begin()
if err != nil {
return nil, err
}
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
d.statements.selectLatestEventsNIDsForUpdate(ctx, txn, roomNID)
if err != nil {
txn.Rollback() // nolint: errcheck
return nil, err
}
stateAndRefs, err := d.statements.bulkSelectStateAtEventAndReference(ctx, txn, eventNIDs)
if err != nil {
txn.Rollback() // nolint: errcheck
return nil, err
}
var lastEventIDSent string
if lastEventNIDSent != 0 {
lastEventIDSent, err = d.statements.selectEventID(ctx, txn, lastEventNIDSent)
if err != nil {
txn.Rollback() // nolint: errcheck
return nil, err
}
}
return &roomRecentEventsUpdater{
transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
}, nil
}
// GetTransactionEventID implements input.EventDatabase
func (d *Database) GetTransactionEventID(
ctx context.Context, transactionID string,
sessionID int64, userID string,
) (string, error) {
eventID, err := d.statements.selectTransactionEventID(ctx, transactionID, sessionID, userID)
if err == sql.ErrNoRows {
return "", nil
}
return eventID, err
}
type roomRecentEventsUpdater struct {
transaction
d *Database
roomNID types.RoomNID
latestEvents []types.StateAtEventAndReference
lastEventIDSent string
currentStateSnapshotNID types.StateSnapshotNID
}
// LatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference {
return u.latestEvents
}
// LastEventIDSent implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LastEventIDSent() string {
return u.lastEventIDSent
}
// CurrentStateSnapshotNID implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID {
return u.currentStateSnapshotNID
}
// StorePreviousEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
for _, ref := range previousEventReferences {
if err := u.d.statements.insertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
return err
}
}
return nil
}
// IsReferenced implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) {
err := u.d.statements.selectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256)
if err == nil {
return true, nil
}
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
// SetLatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) SetLatestEvents(
roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID,
currentStateSnapshotNID types.StateSnapshotNID,
) error {
eventNIDs := make([]types.EventNID, len(latest))
for i := range latest {
eventNIDs[i] = latest[i].EventNID
}
return u.d.statements.updateLatestEventNIDs(u.ctx, u.txn, roomNID, eventNIDs, lastEventNIDSent, currentStateSnapshotNID)
}
// HasEventBeenSent implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) {
return u.d.statements.selectEventSentToOutput(u.ctx, u.txn, eventNID)
}
// MarkEventAsSent implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error {
return u.d.statements.updateEventSentToOutput(u.ctx, u.txn, eventNID)
}
func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID) (types.MembershipUpdater, error) {
return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomNID, targetUserNID)
}
// RoomNID implements query.RoomserverQueryAPIDB
func (d *Database) RoomNID(ctx context.Context, roomID string) (types.RoomNID, error) {
roomNID, err := d.statements.selectRoomNID(ctx, nil, roomID)
if err == sql.ErrNoRows {
return 0, nil
}
return roomNID, err
}
// LatestEventIDs implements query.RoomserverQueryAPIDatabase
func (d *Database) LatestEventIDs(
ctx context.Context, roomNID types.RoomNID,
) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) {
eventNIDs, currentStateSnapshotNID, err := d.statements.selectLatestEventNIDs(ctx, roomNID)
if err != nil {
return nil, 0, 0, err
}
references, err := d.statements.bulkSelectEventReference(ctx, eventNIDs)
if err != nil {
return nil, 0, 0, err
}
depth, err := d.statements.selectMaxEventDepth(ctx, eventNIDs)
if err != nil {
return nil, 0, 0, err
}
return references, currentStateSnapshotNID, depth, nil
}
// GetInvitesForUser implements query.RoomserverQueryAPIDatabase
func (d *Database) GetInvitesForUser(
ctx context.Context,
roomNID types.RoomNID,
targetUserNID types.EventStateKeyNID,
) (senderUserIDs []types.EventStateKeyNID, err error) {
return d.statements.selectInviteActiveForUserInRoom(ctx, targetUserNID, roomNID)
}
// SetRoomAlias implements alias.RoomserverAliasAPIDB
func (d *Database) SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error {
return d.statements.insertRoomAlias(ctx, alias, roomID, creatorUserID)
}
// GetRoomIDForAlias implements alias.RoomserverAliasAPIDB
func (d *Database) GetRoomIDForAlias(ctx context.Context, alias string) (string, error) {
return d.statements.selectRoomIDFromAlias(ctx, alias)
}
// GetAliasesForRoomID implements alias.RoomserverAliasAPIDB
func (d *Database) GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error) {
return d.statements.selectAliasesFromRoomID(ctx, roomID)
}
// GetCreatorIDForAlias implements alias.RoomserverAliasAPIDB
func (d *Database) GetCreatorIDForAlias(
ctx context.Context, alias string,
) (string, error) {
return d.statements.selectCreatorIDFromAlias(ctx, alias)
}
// RemoveRoomAlias implements alias.RoomserverAliasAPIDB
func (d *Database) RemoveRoomAlias(ctx context.Context, alias string) error {
return d.statements.deleteRoomAlias(ctx, alias)
}
// StateEntriesForTuples implements state.RoomStateDatabase
func (d *Database) StateEntriesForTuples(
ctx context.Context,
stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntryList, error) {
return d.statements.bulkSelectFilteredStateBlockEntries(
ctx, stateBlockNIDs, stateKeyTuples,
)
}
// MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string,
) (types.MembershipUpdater, error) {
txn, err := d.db.Begin()
if err != nil {
return nil, err
}
succeeded := false
defer func() {
if !succeeded {
txn.Rollback() // nolint: errcheck
}
}()
roomNID, err := d.assignRoomNID(ctx, txn, roomID)
if err != nil {
return nil, err
}
targetUserNID, err := d.assignStateKeyNID(ctx, txn, targetUserID)
if err != nil {
return nil, err
}
updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID)
if err != nil {
return nil, err
}
succeeded = true
return updater, nil
}
type membershipUpdater struct {
transaction
d *Database
roomNID types.RoomNID
targetUserNID types.EventStateKeyNID
membership membershipState
}
func (d *Database) membershipUpdaterTxn(
ctx context.Context,
txn *sql.Tx,
roomNID types.RoomNID,
targetUserNID types.EventStateKeyNID,
) (types.MembershipUpdater, error) {
if err := d.statements.insertMembership(ctx, txn, roomNID, targetUserNID); err != nil {
return nil, err
}
membership, err := d.statements.selectMembershipForUpdate(ctx, txn, roomNID, targetUserNID)
if err != nil {
return nil, err
}
return &membershipUpdater{
transaction{ctx, txn}, d, roomNID, targetUserNID, membership,
}, nil
}
// IsInvite implements types.MembershipUpdater
func (u *membershipUpdater) IsInvite() bool {
return u.membership == membershipStateInvite
}
// IsJoin implements types.MembershipUpdater
func (u *membershipUpdater) IsJoin() bool {
return u.membership == membershipStateJoin
}
// IsLeave implements types.MembershipUpdater
func (u *membershipUpdater) IsLeave() bool {
return u.membership == membershipStateLeaveOrBan
}
// SetToInvite implements types.MembershipUpdater
func (u *membershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, error) {
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.Sender())
if err != nil {
return false, err
}
inserted, err := u.d.statements.insertInviteEvent(
u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
)
if err != nil {
return false, err
}
if u.membership != membershipStateInvite {
if err = u.d.statements.updateMembership(
u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateInvite, 0,
); err != nil {
return false, err
}
}
return inserted, nil
}
// SetToJoin implements types.MembershipUpdater
func (u *membershipUpdater) SetToJoin(senderUserID string, eventID string, isUpdate bool) ([]string, error) {
var inviteEventIDs []string
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID)
if err != nil {
return nil, err
}
// If this is a join event update, there is no invite to update
if !isUpdate {
inviteEventIDs, err = u.d.statements.updateInviteRetired(
u.ctx, u.txn, u.roomNID, u.targetUserNID,
)
if err != nil {
return nil, err
}
}
// Look up the NID of the new join event
nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID})
if err != nil {
return nil, err
}
if u.membership != membershipStateJoin || isUpdate {
if err = u.d.statements.updateMembership(
u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID,
membershipStateJoin, nIDs[eventID],
); err != nil {
return nil, err
}
}
return inviteEventIDs, nil
}
// SetToLeave implements types.MembershipUpdater
func (u *membershipUpdater) SetToLeave(senderUserID string, eventID string) ([]string, error) {
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID)
if err != nil {
return nil, err
}
inviteEventIDs, err := u.d.statements.updateInviteRetired(
u.ctx, u.txn, u.roomNID, u.targetUserNID,
)
if err != nil {
return nil, err
}
// Look up the NID of the new leave event
nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID})
if err != nil {
return nil, err
}
if u.membership != membershipStateLeaveOrBan {
if err = u.d.statements.updateMembership(
u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID,
membershipStateLeaveOrBan, nIDs[eventID],
); err != nil {
return nil, err
}
}
return inviteEventIDs, nil
}
// GetMembership implements query.RoomserverQueryAPIDB
func (d *Database) GetMembership(
ctx context.Context, roomNID types.RoomNID, requestSenderUserID string,
) (membershipEventNID types.EventNID, stillInRoom bool, err error) {
requestSenderUserNID, err := d.assignStateKeyNID(ctx, nil, requestSenderUserID)
if err != nil {
return
}
senderMembershipEventNID, senderMembership, err :=
d.statements.selectMembershipFromRoomAndTarget(
ctx, roomNID, requestSenderUserNID,
)
if err == sql.ErrNoRows {
// The user has never been a member of that room
return 0, false, nil
} else if err != nil {
return
}
return senderMembershipEventNID, senderMembership == membershipStateJoin, nil
}
// GetMembershipEventNIDsForRoom implements query.RoomserverQueryAPIDB
func (d *Database) GetMembershipEventNIDsForRoom(
ctx context.Context, roomNID types.RoomNID, joinOnly bool,
) ([]types.EventNID, error) {
if joinOnly {
return d.statements.selectMembershipsFromRoomAndMembership(
ctx, roomNID, membershipStateJoin,
)
}
return d.statements.selectMembershipsFromRoom(ctx, roomNID)
}
// EventsFromIDs implements query.RoomserverQueryAPIEventDB
func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
nidMap, err := d.EventNIDs(ctx, eventIDs)
if err != nil {
return nil, err
}
var nids []types.EventNID
for _, nid := range nidMap {
nids = append(nids, nid)
}
return d.Events(ctx, nids)
}
type transaction struct {
ctx context.Context
txn *sql.Tx
}
// Commit implements types.Transaction
func (t *transaction) Commit() error {
return t.txn.Commit()
}
// Rollback implements types.Transaction
func (t *transaction) Rollback() error {
return t.txn.Rollback()
}

View file

@ -1,3 +1,6 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -10,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,4 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -16,697 +16,57 @@ package storage
import (
"context"
"database/sql"
"errors"
"net/url"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage/postgres"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
// A Database is used to store room events and stream offsets.
type Database struct {
statements statements
db *sql.DB
type Database interface {
StoreEvent(ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error)
StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error)
EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)
EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error)
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
SetState(ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID) error
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
EventIDs(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error)
GetTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (string, error)
RoomNID(ctx context.Context, roomID string) (types.RoomNID, error)
LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error)
GetInvitesForUser(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (senderUserIDs []types.EventStateKeyNID, err error)
SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error
GetRoomIDForAlias(ctx context.Context, alias string) (string, error)
GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error)
GetCreatorIDForAlias(ctx context.Context, alias string) (string, error)
RemoveRoomAlias(ctx context.Context, alias string) error
StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
MembershipUpdater(ctx context.Context, roomID, targetUserID string) (types.MembershipUpdater, error)
GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error)
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error)
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
}
// Open a postgres database.
func Open(dataSourceName string) (*Database, error) {
var d Database
var err error
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = d.statements.prepare(d.db); err != nil {
return nil, err
}
return &d, nil
}
// StoreEvent implements input.EventDatabase
func (d *Database) StoreEvent(
ctx context.Context, event gomatrixserverlib.Event,
txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID,
) (types.RoomNID, types.StateAtEvent, error) {
var (
roomNID types.RoomNID
eventTypeNID types.EventTypeNID
eventStateKeyNID types.EventStateKeyNID
eventNID types.EventNID
stateNID types.StateSnapshotNID
err error
)
if txnAndSessionID != nil {
if err = d.statements.insertTransaction(
ctx, txnAndSessionID.TransactionID,
txnAndSessionID.SessionID, event.Sender(), event.EventID(),
); err != nil {
return 0, types.StateAtEvent{}, err
}
}
if roomNID, err = d.assignRoomNID(ctx, nil, event.RoomID()); err != nil {
return 0, types.StateAtEvent{}, err
}
if eventTypeNID, err = d.assignEventTypeNID(ctx, event.Type()); err != nil {
return 0, types.StateAtEvent{}, err
}
eventStateKey := event.StateKey()
// Assigned a numeric ID for the state_key if there is one present.
// Otherwise set the numeric ID for the state_key to 0.
if eventStateKey != nil {
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, nil, *eventStateKey); err != nil {
return 0, types.StateAtEvent{}, err
}
}
if eventNID, stateNID, err = d.statements.insertEvent(
ctx,
roomNID,
eventTypeNID,
eventStateKeyNID,
event.EventID(),
event.EventReference().EventSHA256,
authEventNIDs,
event.Depth(),
); err != nil {
if err == sql.ErrNoRows {
// We've already inserted the event so select the numeric event ID
eventNID, stateNID, err = d.statements.selectEvent(ctx, event.EventID())
}
if err != nil {
return 0, types.StateAtEvent{}, err
}
}
if err = d.statements.insertEventJSON(ctx, eventNID, event.JSON()); err != nil {
return 0, types.StateAtEvent{}, err
}
return roomNID, types.StateAtEvent{
BeforeStateSnapshotNID: stateNID,
StateEntry: types.StateEntry{
StateKeyTuple: types.StateKeyTuple{
EventTypeNID: eventTypeNID,
EventStateKeyNID: eventStateKeyNID,
},
EventNID: eventNID,
},
}, nil
}
func (d *Database) assignRoomNID(
ctx context.Context, txn *sql.Tx, roomID string,
) (types.RoomNID, error) {
// Check if we already have a numeric ID in the database.
roomNID, err := d.statements.selectRoomNID(ctx, txn, roomID)
if err == sql.ErrNoRows {
// We don't have a numeric ID so insert one into the database.
roomNID, err = d.statements.insertRoomNID(ctx, txn, roomID)
if err == sql.ErrNoRows {
// We raced with another insert so run the select again.
roomNID, err = d.statements.selectRoomNID(ctx, txn, roomID)
}
}
return roomNID, err
}
func (d *Database) assignEventTypeNID(
ctx context.Context, eventType string,
) (types.EventTypeNID, error) {
// Check if we already have a numeric ID in the database.
eventTypeNID, err := d.statements.selectEventTypeNID(ctx, eventType)
if err == sql.ErrNoRows {
// We don't have a numeric ID so insert one into the database.
eventTypeNID, err = d.statements.insertEventTypeNID(ctx, eventType)
if err == sql.ErrNoRows {
// We raced with another insert so run the select again.
eventTypeNID, err = d.statements.selectEventTypeNID(ctx, eventType)
}
}
return eventTypeNID, err
}
func (d *Database) assignStateKeyNID(
ctx context.Context, txn *sql.Tx, eventStateKey string,
) (types.EventStateKeyNID, error) {
// Check if we already have a numeric ID in the database.
eventStateKeyNID, err := d.statements.selectEventStateKeyNID(ctx, txn, eventStateKey)
if err == sql.ErrNoRows {
// We don't have a numeric ID so insert one into the database.
eventStateKeyNID, err = d.statements.insertEventStateKeyNID(ctx, txn, eventStateKey)
if err == sql.ErrNoRows {
// We raced with another insert so run the select again.
eventStateKeyNID, err = d.statements.selectEventStateKeyNID(ctx, txn, eventStateKey)
}
}
return eventStateKeyNID, err
}
// StateEntriesForEventIDs implements input.EventDatabase
func (d *Database) StateEntriesForEventIDs(
ctx context.Context, eventIDs []string,
) ([]types.StateEntry, error) {
return d.statements.bulkSelectStateEventByID(ctx, eventIDs)
}
// EventTypeNIDs implements state.RoomStateDatabase
func (d *Database) EventTypeNIDs(
ctx context.Context, eventTypes []string,
) (map[string]types.EventTypeNID, error) {
return d.statements.bulkSelectEventTypeNID(ctx, eventTypes)
}
// EventStateKeyNIDs implements state.RoomStateDatabase
func (d *Database) EventStateKeyNIDs(
ctx context.Context, eventStateKeys []string,
) (map[string]types.EventStateKeyNID, error) {
return d.statements.bulkSelectEventStateKeyNID(ctx, eventStateKeys)
}
// EventStateKeys implements query.RoomserverQueryAPIDatabase
func (d *Database) EventStateKeys(
ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID,
) (map[types.EventStateKeyNID]string, error) {
return d.statements.bulkSelectEventStateKey(ctx, eventStateKeyNIDs)
}
// EventNIDs implements query.RoomserverQueryAPIDatabase
func (d *Database) EventNIDs(
ctx context.Context, eventIDs []string,
) (map[string]types.EventNID, error) {
return d.statements.bulkSelectEventNID(ctx, eventIDs)
}
// Events implements input.EventDatabase
func (d *Database) Events(
ctx context.Context, eventNIDs []types.EventNID,
) ([]types.Event, error) {
eventJSONs, err := d.statements.bulkSelectEventJSON(ctx, eventNIDs)
// NewPublicRoomsServerDatabase opens a database connection.
func Open(dataSourceName string) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return nil, err
}
results := make([]types.Event, len(eventJSONs))
for i, eventJSON := range eventJSONs {
result := &results[i]
result.EventNID = eventJSON.EventNID
// TODO: Use NewEventFromTrustedJSON for efficiency
result.Event, err = gomatrixserverlib.NewEventFromUntrustedJSON(eventJSON.EventJSON)
if err != nil {
return nil, err
switch uri.Scheme {
case "postgres":
return postgres.Open(dataSourceName)
default:
return nil, errors.New("unknown schema")
}
}
return results, nil
}
// AddState implements input.EventDatabase
func (d *Database) AddState(
ctx context.Context,
roomNID types.RoomNID,
stateBlockNIDs []types.StateBlockNID,
state []types.StateEntry,
) (types.StateSnapshotNID, error) {
if len(state) > 0 {
stateBlockNID, err := d.statements.selectNextStateBlockNID(ctx)
if err != nil {
return 0, err
}
if err = d.statements.bulkInsertStateData(ctx, stateBlockNID, state); err != nil {
return 0, err
}
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)
}
return d.statements.insertState(ctx, roomNID, stateBlockNIDs)
}
// SetState implements input.EventDatabase
func (d *Database) SetState(
ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID,
) error {
return d.statements.updateEventState(ctx, eventNID, stateNID)
}
// StateAtEventIDs implements input.EventDatabase
func (d *Database) StateAtEventIDs(
ctx context.Context, eventIDs []string,
) ([]types.StateAtEvent, error) {
return d.statements.bulkSelectStateAtEventByID(ctx, eventIDs)
}
// StateBlockNIDs implements state.RoomStateDatabase
func (d *Database) StateBlockNIDs(
ctx context.Context, stateNIDs []types.StateSnapshotNID,
) ([]types.StateBlockNIDList, error) {
return d.statements.bulkSelectStateBlockNIDs(ctx, stateNIDs)
}
// StateEntries implements state.RoomStateDatabase
func (d *Database) StateEntries(
ctx context.Context, stateBlockNIDs []types.StateBlockNID,
) ([]types.StateEntryList, error) {
return d.statements.bulkSelectStateBlockEntries(ctx, stateBlockNIDs)
}
// SnapshotNIDFromEventID implements state.RoomStateDatabase
func (d *Database) SnapshotNIDFromEventID(
ctx context.Context, eventID string,
) (types.StateSnapshotNID, error) {
_, stateNID, err := d.statements.selectEvent(ctx, eventID)
return stateNID, err
}
// EventIDs implements input.RoomEventDatabase
func (d *Database) EventIDs(
ctx context.Context, eventNIDs []types.EventNID,
) (map[types.EventNID]string, error) {
return d.statements.bulkSelectEventID(ctx, eventNIDs)
}
// GetLatestEventsForUpdate implements input.EventDatabase
func (d *Database) GetLatestEventsForUpdate(
ctx context.Context, roomNID types.RoomNID,
) (types.RoomRecentEventsUpdater, error) {
txn, err := d.db.Begin()
if err != nil {
return nil, err
}
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
d.statements.selectLatestEventsNIDsForUpdate(ctx, txn, roomNID)
if err != nil {
txn.Rollback() // nolint: errcheck
return nil, err
}
stateAndRefs, err := d.statements.bulkSelectStateAtEventAndReference(ctx, txn, eventNIDs)
if err != nil {
txn.Rollback() // nolint: errcheck
return nil, err
}
var lastEventIDSent string
if lastEventNIDSent != 0 {
lastEventIDSent, err = d.statements.selectEventID(ctx, txn, lastEventNIDSent)
if err != nil {
txn.Rollback() // nolint: errcheck
return nil, err
}
}
return &roomRecentEventsUpdater{
transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
}, nil
}
// GetTransactionEventID implements input.EventDatabase
func (d *Database) GetTransactionEventID(
ctx context.Context, transactionID string,
sessionID int64, userID string,
) (string, error) {
eventID, err := d.statements.selectTransactionEventID(ctx, transactionID, sessionID, userID)
if err == sql.ErrNoRows {
return "", nil
}
return eventID, err
}
type roomRecentEventsUpdater struct {
transaction
d *Database
roomNID types.RoomNID
latestEvents []types.StateAtEventAndReference
lastEventIDSent string
currentStateSnapshotNID types.StateSnapshotNID
}
// LatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference {
return u.latestEvents
}
// LastEventIDSent implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LastEventIDSent() string {
return u.lastEventIDSent
}
// CurrentStateSnapshotNID implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID {
return u.currentStateSnapshotNID
}
// StorePreviousEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
for _, ref := range previousEventReferences {
if err := u.d.statements.insertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
return err
}
}
return nil
}
// IsReferenced implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) {
err := u.d.statements.selectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256)
if err == nil {
return true, nil
}
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
// SetLatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) SetLatestEvents(
roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID,
currentStateSnapshotNID types.StateSnapshotNID,
) error {
eventNIDs := make([]types.EventNID, len(latest))
for i := range latest {
eventNIDs[i] = latest[i].EventNID
}
return u.d.statements.updateLatestEventNIDs(u.ctx, u.txn, roomNID, eventNIDs, lastEventNIDSent, currentStateSnapshotNID)
}
// HasEventBeenSent implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) {
return u.d.statements.selectEventSentToOutput(u.ctx, u.txn, eventNID)
}
// MarkEventAsSent implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error {
return u.d.statements.updateEventSentToOutput(u.ctx, u.txn, eventNID)
}
func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID) (types.MembershipUpdater, error) {
return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomNID, targetUserNID)
}
// RoomNID implements query.RoomserverQueryAPIDB
func (d *Database) RoomNID(ctx context.Context, roomID string) (types.RoomNID, error) {
roomNID, err := d.statements.selectRoomNID(ctx, nil, roomID)
if err == sql.ErrNoRows {
return 0, nil
}
return roomNID, err
}
// LatestEventIDs implements query.RoomserverQueryAPIDatabase
func (d *Database) LatestEventIDs(
ctx context.Context, roomNID types.RoomNID,
) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) {
eventNIDs, currentStateSnapshotNID, err := d.statements.selectLatestEventNIDs(ctx, roomNID)
if err != nil {
return nil, 0, 0, err
}
references, err := d.statements.bulkSelectEventReference(ctx, eventNIDs)
if err != nil {
return nil, 0, 0, err
}
depth, err := d.statements.selectMaxEventDepth(ctx, eventNIDs)
if err != nil {
return nil, 0, 0, err
}
return references, currentStateSnapshotNID, depth, nil
}
// GetInvitesForUser implements query.RoomserverQueryAPIDatabase
func (d *Database) GetInvitesForUser(
ctx context.Context,
roomNID types.RoomNID,
targetUserNID types.EventStateKeyNID,
) (senderUserIDs []types.EventStateKeyNID, err error) {
return d.statements.selectInviteActiveForUserInRoom(ctx, targetUserNID, roomNID)
}
// SetRoomAlias implements alias.RoomserverAliasAPIDB
func (d *Database) SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error {
return d.statements.insertRoomAlias(ctx, alias, roomID, creatorUserID)
}
// GetRoomIDForAlias implements alias.RoomserverAliasAPIDB
func (d *Database) GetRoomIDForAlias(ctx context.Context, alias string) (string, error) {
return d.statements.selectRoomIDFromAlias(ctx, alias)
}
// GetAliasesForRoomID implements alias.RoomserverAliasAPIDB
func (d *Database) GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error) {
return d.statements.selectAliasesFromRoomID(ctx, roomID)
}
// GetCreatorIDForAlias implements alias.RoomserverAliasAPIDB
func (d *Database) GetCreatorIDForAlias(
ctx context.Context, alias string,
) (string, error) {
return d.statements.selectCreatorIDFromAlias(ctx, alias)
}
// RemoveRoomAlias implements alias.RoomserverAliasAPIDB
func (d *Database) RemoveRoomAlias(ctx context.Context, alias string) error {
return d.statements.deleteRoomAlias(ctx, alias)
}
// StateEntriesForTuples implements state.RoomStateDatabase
func (d *Database) StateEntriesForTuples(
ctx context.Context,
stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntryList, error) {
return d.statements.bulkSelectFilteredStateBlockEntries(
ctx, stateBlockNIDs, stateKeyTuples,
)
}
// MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string,
) (types.MembershipUpdater, error) {
txn, err := d.db.Begin()
if err != nil {
return nil, err
}
succeeded := false
defer func() {
if !succeeded {
txn.Rollback() // nolint: errcheck
}
}()
roomNID, err := d.assignRoomNID(ctx, txn, roomID)
if err != nil {
return nil, err
}
targetUserNID, err := d.assignStateKeyNID(ctx, txn, targetUserID)
if err != nil {
return nil, err
}
updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID)
if err != nil {
return nil, err
}
succeeded = true
return updater, nil
}
type membershipUpdater struct {
transaction
d *Database
roomNID types.RoomNID
targetUserNID types.EventStateKeyNID
membership membershipState
}
func (d *Database) membershipUpdaterTxn(
ctx context.Context,
txn *sql.Tx,
roomNID types.RoomNID,
targetUserNID types.EventStateKeyNID,
) (types.MembershipUpdater, error) {
if err := d.statements.insertMembership(ctx, txn, roomNID, targetUserNID); err != nil {
return nil, err
}
membership, err := d.statements.selectMembershipForUpdate(ctx, txn, roomNID, targetUserNID)
if err != nil {
return nil, err
}
return &membershipUpdater{
transaction{ctx, txn}, d, roomNID, targetUserNID, membership,
}, nil
}
// IsInvite implements types.MembershipUpdater
func (u *membershipUpdater) IsInvite() bool {
return u.membership == membershipStateInvite
}
// IsJoin implements types.MembershipUpdater
func (u *membershipUpdater) IsJoin() bool {
return u.membership == membershipStateJoin
}
// IsLeave implements types.MembershipUpdater
func (u *membershipUpdater) IsLeave() bool {
return u.membership == membershipStateLeaveOrBan
}
// SetToInvite implements types.MembershipUpdater
func (u *membershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, error) {
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.Sender())
if err != nil {
return false, err
}
inserted, err := u.d.statements.insertInviteEvent(
u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
)
if err != nil {
return false, err
}
if u.membership != membershipStateInvite {
if err = u.d.statements.updateMembership(
u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, membershipStateInvite, 0,
); err != nil {
return false, err
}
}
return inserted, nil
}
// SetToJoin implements types.MembershipUpdater
func (u *membershipUpdater) SetToJoin(senderUserID string, eventID string, isUpdate bool) ([]string, error) {
var inviteEventIDs []string
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID)
if err != nil {
return nil, err
}
// If this is a join event update, there is no invite to update
if !isUpdate {
inviteEventIDs, err = u.d.statements.updateInviteRetired(
u.ctx, u.txn, u.roomNID, u.targetUserNID,
)
if err != nil {
return nil, err
}
}
// Look up the NID of the new join event
nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID})
if err != nil {
return nil, err
}
if u.membership != membershipStateJoin || isUpdate {
if err = u.d.statements.updateMembership(
u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID,
membershipStateJoin, nIDs[eventID],
); err != nil {
return nil, err
}
}
return inviteEventIDs, nil
}
// SetToLeave implements types.MembershipUpdater
func (u *membershipUpdater) SetToLeave(senderUserID string, eventID string) ([]string, error) {
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID)
if err != nil {
return nil, err
}
inviteEventIDs, err := u.d.statements.updateInviteRetired(
u.ctx, u.txn, u.roomNID, u.targetUserNID,
)
if err != nil {
return nil, err
}
// Look up the NID of the new leave event
nIDs, err := u.d.EventNIDs(u.ctx, []string{eventID})
if err != nil {
return nil, err
}
if u.membership != membershipStateLeaveOrBan {
if err = u.d.statements.updateMembership(
u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID,
membershipStateLeaveOrBan, nIDs[eventID],
); err != nil {
return nil, err
}
}
return inviteEventIDs, nil
}
// GetMembership implements query.RoomserverQueryAPIDB
func (d *Database) GetMembership(
ctx context.Context, roomNID types.RoomNID, requestSenderUserID string,
) (membershipEventNID types.EventNID, stillInRoom bool, err error) {
requestSenderUserNID, err := d.assignStateKeyNID(ctx, nil, requestSenderUserID)
if err != nil {
return
}
senderMembershipEventNID, senderMembership, err :=
d.statements.selectMembershipFromRoomAndTarget(
ctx, roomNID, requestSenderUserNID,
)
if err == sql.ErrNoRows {
// The user has never been a member of that room
return 0, false, nil
} else if err != nil {
return
}
return senderMembershipEventNID, senderMembership == membershipStateJoin, nil
}
// GetMembershipEventNIDsForRoom implements query.RoomserverQueryAPIDB
func (d *Database) GetMembershipEventNIDsForRoom(
ctx context.Context, roomNID types.RoomNID, joinOnly bool,
) ([]types.EventNID, error) {
if joinOnly {
return d.statements.selectMembershipsFromRoomAndMembership(
ctx, roomNID, membershipStateJoin,
)
}
return d.statements.selectMembershipsFromRoom(ctx, roomNID)
}
// EventsFromIDs implements query.RoomserverQueryAPIEventDB
func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
nidMap, err := d.EventNIDs(ctx, eventIDs)
if err != nil {
return nil, err
}
var nids []types.EventNID
for _, nid := range nidMap {
nids = append(nids, nid)
}
return d.Events(ctx, nids)
}
type transaction struct {
ctx context.Context
txn *sql.Tx
}
// Commit implements types.Transaction
func (t *transaction) Commit() error {
return t.txn.Commit()
}
// Rollback implements types.Transaction
func (t *transaction) Rollback() error {
return t.txn.Rollback()
}

View file

@ -27,10 +27,17 @@ echo "Installing golangci-lint..."
# TODO: Once go 1.13 is out, use go get's -mod=readonly option
# https://github.com/golang/go/issues/30667
cp go.mod go.mod.bak && cp go.sum go.sum.bak
go get github.com/golangci/golangci-lint/cmd/golangci-lint
go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.19.1
# Run linting
echo "Looking for lint..."
golangci-lint run $args
# Capture exit code to ensure go.{mod,sum} is restored before exiting
exit_code=0
golangci-lint run $args || exit_code=1
# Restore go.{mod,sum}
mv go.mod.bak go.mod && mv go.sum.bak go.sum
exit $exit_code

View file

@ -30,7 +30,7 @@ import (
// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
clientAPIConsumer *common.ContinualConsumer
db *storage.SyncServerDatasource
db storage.Database
notifier *sync.Notifier
}
@ -39,7 +39,7 @@ func NewOutputClientDataConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatasource,
store storage.Database,
) *OutputClientDataConsumer {
consumer := common.ContinualConsumer{

View file

@ -33,7 +33,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *storage.SyncServerDatasource
db storage.Database
notifier *sync.Notifier
query api.RoomserverQueryAPI
}
@ -43,7 +43,7 @@ func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatasource,
store storage.Database,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {

View file

@ -30,7 +30,7 @@ import (
// OutputTypingEventConsumer consumes events that originated in the typing server.
type OutputTypingEventConsumer struct {
typingConsumer *common.ContinualConsumer
db *storage.SyncServerDatasource
db storage.Database
notifier *sync.Notifier
}
@ -40,7 +40,7 @@ func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatasource,
store storage.Database,
) *OutputTypingEventConsumer {
consumer := common.ContinualConsumer{

View file

@ -34,7 +34,7 @@ const pathPrefixR0 = "/_matrix/client/r0"
// Due to Setup being used to call many other functions, a gocyclo nolint is
// applied:
// nolint: gocyclo
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatasource, deviceDB *devices.Database) {
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, deviceDB *devices.Database) {
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
authData := auth.Data{

View file

@ -40,7 +40,7 @@ type stateEventInStateResp struct {
// TODO: Check if the user is in the room. If not, check if the room's history
// is publicly visible. Current behaviour is returning an empty array if the
// user cannot see the room's history.
func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string) util.JSONResponse {
func OnIncomingStateRequest(req *http.Request, db storage.Database, roomID string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left)
@ -87,7 +87,7 @@ func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatasource,
// /rooms/{roomID}/state/{type}/{statekey} request. It will look in current
// state to see if there is an event with that type and state key, if there
// is then (by default) we return the content, otherwise a 404.
func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string, evType, stateKey string) util.JSONResponse {
func OnIncomingStateTypeRequest(req *http.Request, db storage.Database, roomID string, evType, stateKey string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left)

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"
@ -84,7 +85,12 @@ const selectStateEventSQL = "" +
"SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
"SELECT added_at, event_json FROM syncapi_current_room_state WHERE event_id = ANY($1)"
// TODO: The session_id and transaction_id blanks are here because otherwise
// the rowsToStreamEvents expects there to be exactly four columns. We need to
// figure out if these really need to be in the DB, and if so, we need a
// better permanent fix for this. - neilalexander, 2 Jan 2020
"SELECT added_at, event_json, 0 AS session_id, '' AS transaction_id" +
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
type currentRoomStateStatements struct {
upsertRoomStateStmt *sql.Stmt

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"strings"

View file

@ -1,4 +1,19 @@
package storage
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"
@ -67,7 +68,7 @@ const insertEventSQL = "" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id"
const selectEventsSQL = "" +
"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
"SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
const selectRecentEventsSQL = "" +
"SELECT id, event_json, session_id, transaction_id FROM syncapi_output_room_events" +

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -0,0 +1,63 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"errors"
"net/url"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
common.PartitionStorer
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error)
WriteEvent(ctx context.Context, ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, transactionID *api.TransactionID) (pduPosition int64, returnErr error)
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error)
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.FilterPart) (stateEvents []gomatrixserverlib.Event, err error)
SyncPosition(ctx context.Context) (types.SyncPosition, error)
IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.SyncPosition, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error)
GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos int64, accountDataFilterPart *gomatrixserverlib.FilterPart) (map[string][]string, error)
UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (int64, error)
AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (int64, error)
RetireInviteEvent(ctx context.Context, inviteEventID string) error
SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn)
AddTypingUser(userID, roomID string, expireTime *time.Time) int64
RemoveTypingUser(userID, roomID string) int64
}
// NewPublicRoomsServerDatabase opens a database connection.
func NewSyncServerDatasource(dataSourceName string) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return nil, err
}
switch uri.Scheme {
case "postgres":
return postgres.NewSyncServerDatasource(dataSourceName)
default:
return nil, errors.New("unknown schema")
}
}

View file

@ -141,7 +141,7 @@ func (n *Notifier) GetListener(req syncRequest) UserStreamListener {
}
// Load the membership states required to notify users correctly.
func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatasource) error {
func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
if err != nil {
return err

View file

@ -31,13 +31,13 @@ import (
// RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct {
db *storage.SyncServerDatasource
db storage.Database
accountDB *accounts.Database
notifier *Notifier
}
// NewRequestPool makes a new RequestPool
func NewRequestPool(db *storage.SyncServerDatasource, n *Notifier, adb *accounts.Database) *RequestPool {
func NewRequestPool(db storage.Database, n *Notifier, adb *accounts.Database) *RequestPool {
return &RequestPool{db, adb, n}
}

View file

@ -126,8 +126,14 @@ Checking local federation server
Inbound federation can query profile data
Outbound federation can send room-join requests
Outbound federation can send events
Inbound federation can backfill events
Backfill checks the events requested belong to the room
# SyTest currently only implements the v1 endpoints for /send_join and /send_leave,
# whereas Dendrite only supports the v2 endpoints for those, so let's ignore this
# test for now.
#Inbound federation can backfill events
# SyTest currently only implements the v1 endpoints for /send_join and /send_leave,
# whereas Dendrite only supports the v2 endpoints for those, so let's ignore this
# test for now.
#Backfill checks the events requested belong to the room
Can upload without a file name
Can download without a file name locally
Can upload with ASCII file name
@ -143,7 +149,10 @@ Trying to get push rules with unknown rule_id fails with 404
Events come down the correct room
local user can join room with version 5
User can invite local user to room with version 5
Inbound federation can receive v1 room-join requests
# SyTest currently only implements the v1 endpoints for /send_join and /send_leave,
# whereas Dendrite only supports the v2 endpoints for those, so let's ignore this
# test for now.
#Inbound federation can receive v1 room-join requests
Typing events appear in initial sync
Typing events appear in incremental sync
Typing events appear in gapped sync
@ -156,8 +165,14 @@ User can create and send/receive messages in a room with version 1
POST /createRoom ignores attempts to set the room version via creation_content
Inbound federation rejects remote attempts to join local users to rooms
Inbound federation rejects remote attempts to kick local users to rooms
An event which redacts itself should be ignored
A pair of events which redact each other should be ignored
# SyTest currently only implements the v1 endpoints for /send_join and /send_leave,
# whereas Dendrite only supports the v2 endpoints for those, so let's ignore this
# test for now.
#An event which redacts itself should be ignored
# SyTest currently only implements the v1 endpoints for /send_join and /send_leave,
# whereas Dendrite only supports the v2 endpoints for those, so let's ignore this
# test for now.
#A pair of events which redact each other should be ignored
Full state sync includes joined rooms
A message sent after an initial sync appears in the timeline of an incremental sync.
Can add tag
@ -182,3 +197,9 @@ Regular users cannot create room aliases within the AS namespace
Deleting a non-existent alias should return a 404
Users can't delete other's aliases
Outbound federation can query room alias directory
After deactivating account, can't log in with an email
Remote room alias queries can handle Unicode
Newly joined room is included in an incremental sync after invite
Outbound federation can query v1 /send_join
Inbound /v1/make_join rejects remote attempts to join local users to rooms
Inbound federation rejects invites which are not signed by the sender