Merge branch 'master' of https://github.com/matrix-org/dendrite into basicauth-metrics

This commit is contained in:
Till Faelligen 2020-02-21 12:17:30 +01:00
commit 8323948c5f
31 changed files with 1197 additions and 255 deletions

View file

@ -34,7 +34,7 @@ import (
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer roomServerConsumer *common.ContinualConsumer
db accounts.Database db accounts.Database
asDB *storage.Database asDB storage.Database
query api.RoomserverQueryAPI query api.RoomserverQueryAPI
alias api.RoomserverAliasAPI alias api.RoomserverAliasAPI
serverName string serverName string
@ -47,7 +47,7 @@ func NewOutputRoomEventConsumer(
cfg *config.Dendrite, cfg *config.Dendrite,
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
store accounts.Database, store accounts.Database,
appserviceDB *storage.Database, appserviceDB storage.Database,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI, aliasAPI api.RoomserverAliasAPI,
workerStates []types.ApplicationServiceWorkerState, workerStates []types.ApplicationServiceWorkerState,

View file

@ -1,4 +1,5 @@
// Copyright 2018 New Vector Ltd // Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package storage package postgres
import ( import (
"context" "context"

View file

@ -0,0 +1,111 @@
// Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
// Import postgres database driver
_ "github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
)
// Database stores events intended to be later sent to application services
type Database struct {
events eventsStatements
txnID txnStatements
db *sql.DB
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
var result Database
var err error
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = result.prepare(); err != nil {
return nil, err
}
return &result, nil
}
func (d *Database) prepare() error {
if err := d.events.prepare(d.db); err != nil {
return err
}
return d.txnID.prepare(d.db)
}
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
// for a transaction worker to pull and later send to an application service.
func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) error {
return d.events.insertEvent(ctx, appServiceID, event)
}
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
// be sent to an application service given its ID.
func (d *Database) GetEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
limit int,
) (int, int, []gomatrixserverlib.Event, bool, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
// CountEventsWithAppServiceID returns the number of events destined for an
// application service given its ID.
func (d *Database) CountEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
}
// UpdateTxnIDForEvents takes in an application service ID and a
// and stores them in the DB, unless the pair already exists, in
// which case it updates them.
func (d *Database) UpdateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) error {
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
}
// RemoveEventsBeforeAndIncludingID removes all events from the database that
// are less than or equal to a given maximum ID. IDs here are implemented as a
// serial, thus this should always delete events in chronological order.
func (d *Database) RemoveEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) error {
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
}
// GetLatestTxnID returns the latest available transaction id
func (d *Database) GetLatestTxnID(
ctx context.Context,
) (int, error) {
return d.txnID.selectTxnID(ctx)
}

View file

@ -1,4 +1,5 @@
// Copyright 2018 New Vector Ltd // Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package storage package postgres
import ( import (
"context" "context"

View file

@ -0,0 +1,249 @@
// Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"encoding/json"
"time"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
const appserviceEventsSchema = `
-- Stores events to be sent to application services
CREATE TABLE IF NOT EXISTS appservice_events (
-- An auto-incrementing id unique to each event in the table
id INTEGER PRIMARY KEY AUTOINCREMENT,
-- The ID of the application service the event will be sent to
as_id TEXT NOT NULL,
-- JSON representation of the event
event_json TEXT NOT NULL,
-- The ID of the transaction that this event is a part of
txn_id INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
`
const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_json, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
const insertEventSQL = "" +
"INSERT INTO appservice_events(as_id, event_json, txn_id) " +
"VALUES ($1, $2, $3)"
const updateTxnIDForEventsSQL = "" +
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
const deleteEventsBeforeAndIncludingIDSQL = "" +
"DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
const (
// A transaction ID number that no transaction should ever have. Used for
// checking again the default value.
invalidTxnID = -2
)
type eventsStatements struct {
selectEventsByApplicationServiceIDStmt *sql.Stmt
countEventsByApplicationServiceIDStmt *sql.Stmt
insertEventStmt *sql.Stmt
updateTxnIDForEventsStmt *sql.Stmt
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
}
func (s *eventsStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(appserviceEventsSchema)
if err != nil {
return
}
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
return
}
if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
return
}
if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
return
}
return
}
// selectEventsByApplicationServiceID takes in an application service ID and
// returns a slice of events that need to be sent to that application service,
// as well as an int later used to remove these same events from the database
// once successfully sent to an application service.
func (s *eventsStatements) selectEventsByApplicationServiceID(
ctx context.Context,
applicationServiceID string,
limit int,
) (
txnID, maxID int,
events []gomatrixserverlib.Event,
eventsRemaining bool,
err error,
) {
// Retrieve events from the database. Unsuccessfully sent events first
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
if err != nil {
return
}
defer func() {
err = eventRows.Close()
if err != nil {
log.WithFields(log.Fields{
"appservice": applicationServiceID,
}).WithError(err).Fatalf("appservice unable to select new events to send")
}
}()
events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
if err != nil {
return
}
return
}
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
// Iterate through each row and store event contents
// If txn_id changes dramatically, we've switched from collecting old events to
// new ones. Send back those events first.
lastTxnID := invalidTxnID
for eventsProcessed := 0; eventRows.Next(); {
var event gomatrixserverlib.Event
var eventJSON []byte
var id int
err = eventRows.Scan(
&id,
&eventJSON,
&txnID,
)
if err != nil {
return nil, 0, 0, false, err
}
// Unmarshal eventJSON
if err = json.Unmarshal(eventJSON, &event); err != nil {
return nil, 0, 0, false, err
}
// If txnID has changed on this event from the previous event, then we've
// reached the end of a transaction's events. Return only those events.
if lastTxnID > invalidTxnID && lastTxnID != txnID {
return events, maxID, lastTxnID, true, nil
}
lastTxnID = txnID
// Limit events that aren't part of an old transaction
if txnID == -1 {
// Return if we've hit the limit
if eventsProcessed++; eventsProcessed > limit {
return events, maxID, lastTxnID, true, nil
}
}
if id > maxID {
maxID = id
}
// Portion of the event that is unsigned due to rapid change
// TODO: Consider removing age as not many app services use it
if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
return nil, 0, 0, false, err
}
events = append(events, event)
}
return
}
// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) countEventsByApplicationServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
var count int
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return count, nil
}
// insertEvent inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) insertEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) (err error) {
// Convert event to JSON before inserting
eventJSON, err := json.Marshal(event)
if err != nil {
return err
}
_, err = s.insertEventStmt.ExecContext(
ctx,
appServiceID,
eventJSON,
-1, // No transaction ID yet
)
return
}
// updateTxnIDForEvents sets the transactionID for a collection of events. Done
// before sending them to an AppService. Referenced before sending to make sure
// we aren't constructing multiple transactions with the same events.
func (s *eventsStatements) updateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) (err error) {
_, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
return
}
// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) (err error) {
_, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
return
}

View file

@ -0,0 +1,111 @@
// Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
// Import SQLite database driver
"github.com/matrix-org/gomatrixserverlib"
_ "github.com/mattn/go-sqlite3"
)
// Database stores events intended to be later sent to application services
type Database struct {
events eventsStatements
txnID txnStatements
db *sql.DB
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
var result Database
var err error
if result.db, err = sql.Open("sqlite3", dataSourceName); err != nil {
return nil, err
}
if err = result.prepare(); err != nil {
return nil, err
}
return &result, nil
}
func (d *Database) prepare() error {
if err := d.events.prepare(d.db); err != nil {
return err
}
return d.txnID.prepare(d.db)
}
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
// for a transaction worker to pull and later send to an application service.
func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) error {
return d.events.insertEvent(ctx, appServiceID, event)
}
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
// be sent to an application service given its ID.
func (d *Database) GetEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
limit int,
) (int, int, []gomatrixserverlib.Event, bool, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
// CountEventsWithAppServiceID returns the number of events destined for an
// application service given its ID.
func (d *Database) CountEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
}
// UpdateTxnIDForEvents takes in an application service ID and a
// and stores them in the DB, unless the pair already exists, in
// which case it updates them.
func (d *Database) UpdateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) error {
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
}
// RemoveEventsBeforeAndIncludingID removes all events from the database that
// are less than or equal to a given maximum ID. IDs here are implemented as a
// serial, thus this should always delete events in chronological order.
func (d *Database) RemoveEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) error {
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
}
// GetLatestTxnID returns the latest available transaction id
func (d *Database) GetLatestTxnID(
ctx context.Context,
) (int, error) {
return d.txnID.selectTxnID(ctx)
}

View file

@ -0,0 +1,60 @@
// Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
)
const txnIDSchema = `
-- Keeps a count of the current transaction ID
CREATE TABLE IF NOT EXISTS appservice_counters (
name TEXT PRIMARY KEY NOT NULL,
last_id INTEGER DEFAULT 1
);
INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1);
`
const selectTxnIDSQL = `
SELECT last_id FROM appservice_counters WHERE name='txn_id';
UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id';
`
type txnStatements struct {
selectTxnIDStmt *sql.Stmt
}
func (s *txnStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(txnIDSchema)
if err != nil {
return
}
if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
return
}
return
}
// selectTxnID selects the latest ascending transaction ID
func (s *txnStatements) selectTxnID(
ctx context.Context,
) (txnID int, err error) {
err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
return
}

View file

@ -1,4 +1,4 @@
// Copyright 2018 New Vector Ltd // Copyright 2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -16,95 +16,33 @@ package storage
import ( import (
"context" "context"
"database/sql" "net/url"
// Import postgres database driver "github.com/matrix-org/dendrite/appservice/storage/postgres"
_ "github.com/lib/pq" "github.com/matrix-org/dendrite/appservice/storage/sqlite3"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
// Database stores events intended to be later sent to application services type Database interface {
type Database struct { StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.Event) error
events eventsStatements GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.Event, bool, error)
txnID txnStatements CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
db *sql.DB UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error
RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error
GetLatestTxnID(ctx context.Context) (int, error)
} }
// NewDatabase opens a new database func NewDatabase(dataSourceName string) (Database, error) {
func NewDatabase(dataSourceName string) (*Database, error) { uri, err := url.Parse(dataSourceName)
var result Database if err != nil {
var err error return postgres.NewDatabase(dataSourceName)
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
} }
if err = result.prepare(); err != nil { switch uri.Scheme {
return nil, err case "postgres":
return postgres.NewDatabase(dataSourceName)
case "file":
return sqlite3.NewDatabase(dataSourceName)
default:
return postgres.NewDatabase(dataSourceName)
} }
return &result, nil
}
func (d *Database) prepare() error {
if err := d.events.prepare(d.db); err != nil {
return err
}
return d.txnID.prepare(d.db)
}
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
// for a transaction worker to pull and later send to an application service.
func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) error {
return d.events.insertEvent(ctx, appServiceID, event)
}
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
// be sent to an application service given its ID.
func (d *Database) GetEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
limit int,
) (int, int, []gomatrixserverlib.Event, bool, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
// CountEventsWithAppServiceID returns the number of events destined for an
// application service given its ID.
func (d *Database) CountEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
}
// UpdateTxnIDForEvents takes in an application service ID and a
// and stores them in the DB, unless the pair already exists, in
// which case it updates them.
func (d *Database) UpdateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) error {
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
}
// RemoveEventsBeforeAndIncludingID removes all events from the database that
// are less than or equal to a given maximum ID. IDs here are implemented as a
// serial, thus this should always delete events in chronological order.
func (d *Database) RemoveEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) error {
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
}
// GetLatestTxnID returns the latest available transaction id
func (d *Database) GetLatestTxnID(
ctx context.Context,
) (int, error) {
return d.txnID.selectTxnID(ctx)
} }

View file

@ -43,7 +43,7 @@ var (
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also // size), then send that off to the AS's /transactions/{txnID} endpoint. It also
// handles exponentially backing off in case the AS isn't currently available. // handles exponentially backing off in case the AS isn't currently available.
func SetupTransactionWorkers( func SetupTransactionWorkers(
appserviceDB *storage.Database, appserviceDB storage.Database,
workerStates []types.ApplicationServiceWorkerState, workerStates []types.ApplicationServiceWorkerState,
) error { ) error {
// Create a worker that handles transmitting events to a single homeserver // Create a worker that handles transmitting events to a single homeserver
@ -58,7 +58,7 @@ func SetupTransactionWorkers(
// worker is a goroutine that sends any queued events to the application service // worker is a goroutine that sends any queued events to the application service
// it is given. // it is given.
func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { func worker(db storage.Database, ws types.ApplicationServiceWorkerState) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"appservice": ws.AppService.ID, "appservice": ws.AppService.ID,
}).Info("starting application service") }).Info("starting application service")
@ -149,7 +149,7 @@ func backoff(ws *types.ApplicationServiceWorkerState, err error) {
// transaction, and JSON-encodes the results. // transaction, and JSON-encodes the results.
func createTransaction( func createTransaction(
ctx context.Context, ctx context.Context,
db *storage.Database, db storage.Database,
appserviceID string, appserviceID string,
) ( ) (
transactionJSON []byte, transactionJSON []byte,

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -47,12 +48,8 @@ const selectFilterIDByContentSQL = "" +
const insertFilterSQL = "" + const insertFilterSQL = "" +
"INSERT INTO account_filter (filter, localpart) VALUES ($1, $2)" "INSERT INTO account_filter (filter, localpart) VALUES ($1, $2)"
const selectLastInsertedFilterIDSQL = "" +
"SELECT id FROM account_filter WHERE rowid = last_insert_rowid()"
type filterStatements struct { type filterStatements struct {
selectFilterStmt *sql.Stmt selectFilterStmt *sql.Stmt
selectLastInsertedFilterIDStmt *sql.Stmt
selectFilterIDByContentStmt *sql.Stmt selectFilterIDByContentStmt *sql.Stmt
insertFilterStmt *sql.Stmt insertFilterStmt *sql.Stmt
} }
@ -65,9 +62,6 @@ func (s *filterStatements) prepare(db *sql.DB) (err error) {
if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil { if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil {
return return
} }
if s.selectLastInsertedFilterIDStmt, err = db.Prepare(selectLastInsertedFilterIDSQL); err != nil {
return
}
if s.selectFilterIDByContentStmt, err = db.Prepare(selectFilterIDByContentSQL); err != nil { if s.selectFilterIDByContentStmt, err = db.Prepare(selectFilterIDByContentSQL); err != nil {
return return
} }
@ -128,12 +122,14 @@ func (s *filterStatements) insertFilter(
} }
// Otherwise insert the filter and return the new ID // Otherwise insert the filter and return the new ID
if _, err = s.insertFilterStmt.ExecContext(ctx, filterJSON, localpart); err != nil { res, err := s.insertFilterStmt.ExecContext(ctx, filterJSON, localpart)
if err != nil {
return "", err return "", err
} }
row := s.selectLastInsertedFilterIDStmt.QueryRowContext(ctx) rowid, err := res.LastInsertId()
if err := row.Scan(&filterID); err != nil { if err != nil {
return "", err return "", err
} }
filterID = fmt.Sprintf("%d", rowid)
return return
} }

View file

@ -1,8 +1,4 @@
<<<<<<< HEAD
FROM docker.io/golang:1.13.7-alpine3.11 FROM docker.io/golang:1.13.7-alpine3.11
=======
FROM docker.io/golang:1.13.6-alpine
>>>>>>> master
RUN mkdir /build RUN mkdir /build

View file

@ -0,0 +1,115 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
const mediaSchema = `
-- The media_repository table holds metadata for each media file stored and accessible to the local server,
-- the actual file is stored separately.
CREATE TABLE IF NOT EXISTS mediaapi_media_repository (
-- The id used to refer to the media.
-- For uploads to this server this is a base64-encoded sha256 hash of the file data
-- For media from remote servers, this can be any unique identifier string
media_id TEXT NOT NULL,
-- The origin of the media as requested by the client. Should be a homeserver domain.
media_origin TEXT NOT NULL,
-- The MIME-type of the media file as specified when uploading.
content_type TEXT NOT NULL,
-- Size of the media file in bytes.
file_size_bytes INTEGER NOT NULL,
-- When the content was uploaded in UNIX epoch ms.
creation_ts INTEGER NOT NULL,
-- The file name with which the media was uploaded.
upload_name TEXT NOT NULL,
-- Alternate RFC 4648 unpadded base64 encoding string representation of a SHA-256 hash sum of the file data.
base64hash TEXT NOT NULL,
-- The user who uploaded the file. Should be a Matrix user ID.
user_id TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_media_repository_index ON mediaapi_media_repository (media_id, media_origin);
`
const insertMediaSQL = `
INSERT INTO mediaapi_media_repository (media_id, media_origin, content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`
const selectMediaSQL = `
SELECT content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id FROM mediaapi_media_repository WHERE media_id = $1 AND media_origin = $2
`
type mediaStatements struct {
insertMediaStmt *sql.Stmt
selectMediaStmt *sql.Stmt
}
func (s *mediaStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(mediaSchema)
if err != nil {
return
}
return statementList{
{&s.insertMediaStmt, insertMediaSQL},
{&s.selectMediaStmt, selectMediaSQL},
}.prepare(db)
}
func (s *mediaStatements) insertMedia(
ctx context.Context, mediaMetadata *types.MediaMetadata,
) error {
mediaMetadata.CreationTimestamp = types.UnixMs(time.Now().UnixNano() / 1000000)
_, err := s.insertMediaStmt.ExecContext(
ctx,
mediaMetadata.MediaID,
mediaMetadata.Origin,
mediaMetadata.ContentType,
mediaMetadata.FileSizeBytes,
mediaMetadata.CreationTimestamp,
mediaMetadata.UploadName,
mediaMetadata.Base64Hash,
mediaMetadata.UserID,
)
return err
}
func (s *mediaStatements) selectMedia(
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
) (*types.MediaMetadata, error) {
mediaMetadata := types.MediaMetadata{
MediaID: mediaID,
Origin: mediaOrigin,
}
err := s.selectMediaStmt.QueryRowContext(
ctx, mediaMetadata.MediaID, mediaMetadata.Origin,
).Scan(
&mediaMetadata.ContentType,
&mediaMetadata.FileSizeBytes,
&mediaMetadata.CreationTimestamp,
&mediaMetadata.UploadName,
&mediaMetadata.Base64Hash,
&mediaMetadata.UserID,
)
return &mediaMetadata, err
}

View file

@ -0,0 +1,38 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// FIXME: This should be made common!
package sqlite3
import (
"database/sql"
)
// a statementList is a list of SQL statements to prepare and a pointer to where to store the resulting prepared statement.
type statementList []struct {
statement **sql.Stmt
sql string
}
// prepare the SQL for each statement in the list and assign the result to the prepared statement.
func (s statementList) prepare(db *sql.DB) (err error) {
for _, statement := range s {
if *statement.statement, err = db.Prepare(statement.sql); err != nil {
return
}
}
return
}

View file

@ -0,0 +1,36 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"database/sql"
)
type statements struct {
media mediaStatements
thumbnail thumbnailStatements
}
func (s *statements) prepare(db *sql.DB) (err error) {
if err = s.media.prepare(db); err != nil {
return
}
if err = s.thumbnail.prepare(db); err != nil {
return
}
return
}

View file

@ -0,0 +1,106 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
// Import the postgres database driver.
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/gomatrixserverlib"
_ "github.com/mattn/go-sqlite3"
)
// Database is used to store metadata about a repository of media files.
type Database struct {
statements statements
db *sql.DB
}
// Open opens a postgres database.
func Open(dataSourceName string) (*Database, error) {
var d Database
var err error
if d.db, err = sql.Open("sqlite3", dataSourceName); err != nil {
return nil, err
}
if err = d.statements.prepare(d.db); err != nil {
return nil, err
}
return &d, nil
}
// StoreMediaMetadata inserts the metadata about the uploaded media into the database.
// Returns an error if the combination of MediaID and Origin are not unique in the table.
func (d *Database) StoreMediaMetadata(
ctx context.Context, mediaMetadata *types.MediaMetadata,
) error {
return d.statements.media.insertMedia(ctx, mediaMetadata)
}
// GetMediaMetadata returns metadata about media stored on this server.
// The media could have been uploaded to this server or fetched from another server and cached here.
// Returns nil metadata if there is no metadata associated with this media.
func (d *Database) GetMediaMetadata(
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
) (*types.MediaMetadata, error) {
mediaMetadata, err := d.statements.media.selectMedia(ctx, mediaID, mediaOrigin)
if err != nil && err == sql.ErrNoRows {
return nil, nil
}
return mediaMetadata, err
}
// StoreThumbnail inserts the metadata about the thumbnail into the database.
// Returns an error if the combination of MediaID and Origin are not unique in the table.
func (d *Database) StoreThumbnail(
ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata,
) error {
return d.statements.thumbnail.insertThumbnail(ctx, thumbnailMetadata)
}
// GetThumbnail returns metadata about a specific thumbnail.
// The media could have been uploaded to this server or fetched from another server and cached here.
// Returns nil metadata if there is no metadata associated with this thumbnail.
func (d *Database) GetThumbnail(
ctx context.Context,
mediaID types.MediaID,
mediaOrigin gomatrixserverlib.ServerName,
width, height int,
resizeMethod string,
) (*types.ThumbnailMetadata, error) {
thumbnailMetadata, err := d.statements.thumbnail.selectThumbnail(
ctx, mediaID, mediaOrigin, width, height, resizeMethod,
)
if err != nil && err == sql.ErrNoRows {
return nil, nil
}
return thumbnailMetadata, err
}
// GetThumbnails returns metadata about all thumbnails for a specific media stored on this server.
// The media could have been uploaded to this server or fetched from another server and cached here.
// Returns nil metadata if there are no thumbnails associated with this media.
func (d *Database) GetThumbnails(
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
) ([]*types.ThumbnailMetadata, error) {
thumbnails, err := d.statements.thumbnail.selectThumbnails(ctx, mediaID, mediaOrigin)
if err != nil && err == sql.ErrNoRows {
return nil, nil
}
return thumbnails, err
}

View file

@ -0,0 +1,162 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"time"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
const thumbnailSchema = `
-- The mediaapi_thumbnail table holds metadata for each thumbnail file stored and accessible to the local server,
-- the actual file is stored separately.
CREATE TABLE IF NOT EXISTS mediaapi_thumbnail (
media_id TEXT NOT NULL,
media_origin TEXT NOT NULL,
content_type TEXT NOT NULL,
file_size_bytes INTEGER NOT NULL,
creation_ts INTEGER NOT NULL,
width INTEGER NOT NULL,
height INTEGER NOT NULL,
resize_method TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_thumbnail_index ON mediaapi_thumbnail (media_id, media_origin, width, height, resize_method);
`
const insertThumbnailSQL = `
INSERT INTO mediaapi_thumbnail (media_id, media_origin, content_type, file_size_bytes, creation_ts, width, height, resize_method)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`
// Note: this selects one specific thumbnail
const selectThumbnailSQL = `
SELECT content_type, file_size_bytes, creation_ts FROM mediaapi_thumbnail WHERE media_id = $1 AND media_origin = $2 AND width = $3 AND height = $4 AND resize_method = $5
`
// Note: this selects all thumbnails for a media_origin and media_id
const selectThumbnailsSQL = `
SELECT content_type, file_size_bytes, creation_ts, width, height, resize_method FROM mediaapi_thumbnail WHERE media_id = $1 AND media_origin = $2
`
type thumbnailStatements struct {
insertThumbnailStmt *sql.Stmt
selectThumbnailStmt *sql.Stmt
selectThumbnailsStmt *sql.Stmt
}
func (s *thumbnailStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(thumbnailSchema)
if err != nil {
return
}
return statementList{
{&s.insertThumbnailStmt, insertThumbnailSQL},
{&s.selectThumbnailStmt, selectThumbnailSQL},
{&s.selectThumbnailsStmt, selectThumbnailsSQL},
}.prepare(db)
}
func (s *thumbnailStatements) insertThumbnail(
ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata,
) error {
thumbnailMetadata.MediaMetadata.CreationTimestamp = types.UnixMs(time.Now().UnixNano() / 1000000)
_, err := s.insertThumbnailStmt.ExecContext(
ctx,
thumbnailMetadata.MediaMetadata.MediaID,
thumbnailMetadata.MediaMetadata.Origin,
thumbnailMetadata.MediaMetadata.ContentType,
thumbnailMetadata.MediaMetadata.FileSizeBytes,
thumbnailMetadata.MediaMetadata.CreationTimestamp,
thumbnailMetadata.ThumbnailSize.Width,
thumbnailMetadata.ThumbnailSize.Height,
thumbnailMetadata.ThumbnailSize.ResizeMethod,
)
return err
}
func (s *thumbnailStatements) selectThumbnail(
ctx context.Context,
mediaID types.MediaID,
mediaOrigin gomatrixserverlib.ServerName,
width, height int,
resizeMethod string,
) (*types.ThumbnailMetadata, error) {
thumbnailMetadata := types.ThumbnailMetadata{
MediaMetadata: &types.MediaMetadata{
MediaID: mediaID,
Origin: mediaOrigin,
},
ThumbnailSize: types.ThumbnailSize{
Width: width,
Height: height,
ResizeMethod: resizeMethod,
},
}
err := s.selectThumbnailStmt.QueryRowContext(
ctx,
thumbnailMetadata.MediaMetadata.MediaID,
thumbnailMetadata.MediaMetadata.Origin,
thumbnailMetadata.ThumbnailSize.Width,
thumbnailMetadata.ThumbnailSize.Height,
thumbnailMetadata.ThumbnailSize.ResizeMethod,
).Scan(
&thumbnailMetadata.MediaMetadata.ContentType,
&thumbnailMetadata.MediaMetadata.FileSizeBytes,
&thumbnailMetadata.MediaMetadata.CreationTimestamp,
)
return &thumbnailMetadata, err
}
func (s *thumbnailStatements) selectThumbnails(
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
) ([]*types.ThumbnailMetadata, error) {
rows, err := s.selectThumbnailsStmt.QueryContext(
ctx, mediaID, mediaOrigin,
)
if err != nil {
return nil, err
}
defer rows.Close() // nolint: errcheck
var thumbnails []*types.ThumbnailMetadata
for rows.Next() {
thumbnailMetadata := types.ThumbnailMetadata{
MediaMetadata: &types.MediaMetadata{
MediaID: mediaID,
Origin: mediaOrigin,
},
}
err = rows.Scan(
&thumbnailMetadata.MediaMetadata.ContentType,
&thumbnailMetadata.MediaMetadata.FileSizeBytes,
&thumbnailMetadata.MediaMetadata.CreationTimestamp,
&thumbnailMetadata.ThumbnailSize.Width,
&thumbnailMetadata.ThumbnailSize.Height,
&thumbnailMetadata.ThumbnailSize.ResizeMethod,
)
if err != nil {
return nil, err
}
thumbnails = append(thumbnails, &thumbnailMetadata)
}
return thumbnails, rows.Err()
}

View file

@ -19,6 +19,7 @@ import (
"net/url" "net/url"
"github.com/matrix-org/dendrite/mediaapi/storage/postgres" "github.com/matrix-org/dendrite/mediaapi/storage/postgres"
"github.com/matrix-org/dendrite/mediaapi/storage/sqlite3"
"github.com/matrix-org/dendrite/mediaapi/types" "github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -40,6 +41,8 @@ func Open(dataSourceName string) (Database, error) {
switch uri.Scheme { switch uri.Scheme {
case "postgres": case "postgres":
return postgres.Open(dataSourceName) return postgres.Open(dataSourceName)
case "file":
return sqlite3.Open(dataSourceName)
default: default:
return postgres.Open(dataSourceName) return postgres.Open(dataSourceName)
} }

View file

@ -39,26 +39,15 @@ var editableAttributes = []string{
const publicRoomsSchema = ` const publicRoomsSchema = `
-- Stores all of the rooms with data needed to create the server's room directory -- Stores all of the rooms with data needed to create the server's room directory
CREATE TABLE IF NOT EXISTS publicroomsapi_public_rooms( CREATE TABLE IF NOT EXISTS publicroomsapi_public_rooms(
-- The room's ID
room_id TEXT NOT NULL PRIMARY KEY, room_id TEXT NOT NULL PRIMARY KEY,
-- Number of joined members in the room
joined_members INTEGER NOT NULL DEFAULT 0, joined_members INTEGER NOT NULL DEFAULT 0,
-- Aliases of the room (empty array if none) aliases TEXT NOT NULL DEFAULT '',
aliases TEXT[] NOT NULL DEFAULT '{}'::TEXT[],
-- Canonical alias of the room (empty string if none)
canonical_alias TEXT NOT NULL DEFAULT '', canonical_alias TEXT NOT NULL DEFAULT '',
-- Name of the room (empty string if none)
name TEXT NOT NULL DEFAULT '', name TEXT NOT NULL DEFAULT '',
-- Topic of the room (empty string if none)
topic TEXT NOT NULL DEFAULT '', topic TEXT NOT NULL DEFAULT '',
-- Is the room world readable?
world_readable BOOLEAN NOT NULL DEFAULT false, world_readable BOOLEAN NOT NULL DEFAULT false,
-- Can guest join the room?
guest_can_join BOOLEAN NOT NULL DEFAULT false, guest_can_join BOOLEAN NOT NULL DEFAULT false,
-- URL of the room avatar (empty string if none)
avatar_url TEXT NOT NULL DEFAULT '', avatar_url TEXT NOT NULL DEFAULT '',
-- Visibility of the room: true means the room is publicly visible, false
-- means the room is private
visibility BOOLEAN NOT NULL DEFAULT false visibility BOOLEAN NOT NULL DEFAULT false
); );
` `
@ -71,13 +60,13 @@ const selectPublicRoomsSQL = "" +
"SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" +
" FROM publicroomsapi_public_rooms WHERE visibility = true" + " FROM publicroomsapi_public_rooms WHERE visibility = true" +
" ORDER BY joined_members DESC" + " ORDER BY joined_members DESC" +
" OFFSET $1" " LIMIT 30 OFFSET $1"
const selectPublicRoomsWithLimitSQL = "" + const selectPublicRoomsWithLimitSQL = "" +
"SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" +
" FROM publicroomsapi_public_rooms WHERE visibility = true" + " FROM publicroomsapi_public_rooms WHERE visibility = true" +
" ORDER BY joined_members DESC" + " ORDER BY joined_members DESC" +
" OFFSET $1 LIMIT $2" " LIMIT $2 OFFSET $1"
const selectPublicRoomsWithFilterSQL = "" + const selectPublicRoomsWithFilterSQL = "" +
"SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" +
@ -85,9 +74,9 @@ const selectPublicRoomsWithFilterSQL = "" +
" WHERE visibility = true" + " WHERE visibility = true" +
" AND (LOWER(name) LIKE LOWER($1)" + " AND (LOWER(name) LIKE LOWER($1)" +
" OR LOWER(topic) LIKE LOWER($1)" + " OR LOWER(topic) LIKE LOWER($1)" +
" OR LOWER(ARRAY_TO_STRING(aliases, ',')) LIKE LOWER($1))" + " OR LOWER(aliases) LIKE LOWER($1))" + // TODO: Is there a better way to search aliases?
" ORDER BY joined_members DESC" + " ORDER BY joined_members DESC" +
" OFFSET $2" " LIMIT 30 OFFSET $2"
const selectPublicRoomsWithLimitAndFilterSQL = "" + const selectPublicRoomsWithLimitAndFilterSQL = "" +
"SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" + "SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" +
@ -95,9 +84,9 @@ const selectPublicRoomsWithLimitAndFilterSQL = "" +
" WHERE visibility = true" + " WHERE visibility = true" +
" AND (LOWER(name) LIKE LOWER($1)" + " AND (LOWER(name) LIKE LOWER($1)" +
" OR LOWER(topic) LIKE LOWER($1)" + " OR LOWER(topic) LIKE LOWER($1)" +
" OR LOWER(ARRAY_TO_STRING(aliases, ',')) LIKE LOWER($1))" + " OR LOWER(aliases) LIKE LOWER($1))" + // TODO: Is there a better way to search aliases?
" ORDER BY joined_members DESC" + " ORDER BY joined_members DESC" +
" OFFSET $2 LIMIT $3" " LIMIT $3 OFFSET $2"
const selectRoomVisibilitySQL = "" + const selectRoomVisibilitySQL = "" +
"SELECT visibility FROM publicroomsapi_public_rooms" + "SELECT visibility FROM publicroomsapi_public_rooms" +
@ -187,7 +176,7 @@ func (s *publicRoomsStatements) selectPublicRooms(
) )
} else { } else {
rows, err = s.selectPublicRoomsWithLimitAndFilterStmt.QueryContext( rows, err = s.selectPublicRoomsWithLimitAndFilterStmt.QueryContext(
ctx, pattern, offset, limit, ctx, pattern, limit, offset,
) )
} }
} else { } else {
@ -195,7 +184,7 @@ func (s *publicRoomsStatements) selectPublicRooms(
rows, err = s.selectPublicRoomsStmt.QueryContext(ctx, offset) rows, err = s.selectPublicRoomsStmt.QueryContext(ctx, offset)
} else { } else {
rows, err = s.selectPublicRoomsWithLimitStmt.QueryContext( rows, err = s.selectPublicRoomsWithLimitStmt.QueryContext(
ctx, offset, limit, ctx, limit, offset,
) )
} }
} }

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/publicroomsapi/storage/postgres" "github.com/matrix-org/dendrite/publicroomsapi/storage/postgres"
"github.com/matrix-org/dendrite/publicroomsapi/storage/sqlite3"
"github.com/matrix-org/dendrite/publicroomsapi/types" "github.com/matrix-org/dendrite/publicroomsapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -43,6 +44,8 @@ func NewPublicRoomsServerDatabase(dataSourceName string) (Database, error) {
switch uri.Scheme { switch uri.Scheme {
case "postgres": case "postgres":
return postgres.NewPublicRoomsServerDatabase(dataSourceName) return postgres.NewPublicRoomsServerDatabase(dataSourceName)
case "file":
return sqlite3.NewPublicRoomsServerDatabase(dataSourceName)
default: default:
return postgres.NewPublicRoomsServerDatabase(dataSourceName) return postgres.NewPublicRoomsServerDatabase(dataSourceName)
} }

View file

@ -52,16 +52,18 @@ const selectInviteActiveForUserInRoomSQL = "" +
// However the matrix protocol doesn't give us a way to reliably identify the // However the matrix protocol doesn't give us a way to reliably identify the
// invites that were retired, so we are forced to retire all of them. // invites that were retired, so we are forced to retire all of them.
const updateInviteRetiredSQL = ` const updateInviteRetiredSQL = `
UPDATE roomserver_invites SET retired = TRUE UPDATE roomserver_invites SET retired = TRUE WHERE room_nid = $1 AND target_nid = $2 AND NOT retired
WHERE room_nid = $1 AND target_nid = $2 AND NOT retired; `
SELECT invite_event_id FROM roomserver_invites
WHERE rowid = last_insert_rowid(); const selectInvitesAboutToRetireSQL = `
SELECT invite_event_id FROM roomserver_invites WHERE room_nid = $1 AND target_nid = $2 AND NOT retired
` `
type inviteStatements struct { type inviteStatements struct {
insertInviteEventStmt *sql.Stmt insertInviteEventStmt *sql.Stmt
selectInviteActiveForUserInRoomStmt *sql.Stmt selectInviteActiveForUserInRoomStmt *sql.Stmt
updateInviteRetiredStmt *sql.Stmt updateInviteRetiredStmt *sql.Stmt
selectInvitesAboutToRetireStmt *sql.Stmt
} }
func (s *inviteStatements) prepare(db *sql.DB) (err error) { func (s *inviteStatements) prepare(db *sql.DB) (err error) {
@ -74,6 +76,7 @@ func (s *inviteStatements) prepare(db *sql.DB) (err error) {
{&s.insertInviteEventStmt, insertInviteEventSQL}, {&s.insertInviteEventStmt, insertInviteEventSQL},
{&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL}, {&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL},
{&s.updateInviteRetiredStmt, updateInviteRetiredSQL}, {&s.updateInviteRetiredStmt, updateInviteRetiredSQL},
{&s.selectInvitesAboutToRetireStmt, selectInvitesAboutToRetireSQL},
}.prepare(db) }.prepare(db)
} }
@ -102,7 +105,8 @@ func (s *inviteStatements) updateInviteRetired(
ctx context.Context, ctx context.Context,
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
) (eventIDs []string, err error) { ) (eventIDs []string, err error) {
stmt := common.TxStmt(txn, s.updateInviteRetiredStmt) // gather all the event IDs we will retire
stmt := txn.Stmt(s.selectInvitesAboutToRetireStmt)
rows, err := stmt.QueryContext(ctx, roomNID, targetUserNID) rows, err := stmt.QueryContext(ctx, roomNID, targetUserNID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -110,11 +114,15 @@ func (s *inviteStatements) updateInviteRetired(
defer (func() { err = rows.Close() })() defer (func() { err = rows.Close() })()
for rows.Next() { for rows.Next() {
var inviteEventID string var inviteEventID string
if err := rows.Scan(&inviteEventID); err != nil { if err = rows.Scan(&inviteEventID); err != nil {
return nil, err return nil, err
} }
eventIDs = append(eventIDs, inviteEventID) eventIDs = append(eventIDs, inviteEventID)
} }
// now retire the invites
stmt = txn.Stmt(s.updateInviteRetiredStmt)
_, err = stmt.ExecContext(ctx, roomNID, targetUserNID)
return return
} }

View file

@ -103,6 +103,8 @@ func (s *roomAliasesStatements) selectAliasesFromRoomID(
return return
} }
defer rows.Close() // nolint: errcheck
for rows.Next() { for rows.Next() {
var alias string var alias string
if err = rows.Scan(&alias); err != nil { if err = rows.Scan(&alias); err != nil {

View file

@ -30,7 +30,7 @@ import (
const stateDataSchema = ` const stateDataSchema = `
CREATE TABLE IF NOT EXISTS roomserver_state_block ( CREATE TABLE IF NOT EXISTS roomserver_state_block (
state_block_nid INTEGER PRIMARY KEY AUTOINCREMENT, state_block_nid INTEGER NOT NULL,
event_type_nid INTEGER NOT NULL, event_type_nid INTEGER NOT NULL,
event_state_key_nid INTEGER NOT NULL, event_state_key_nid INTEGER NOT NULL,
event_nid INTEGER NOT NULL, event_nid INTEGER NOT NULL,
@ -43,10 +43,7 @@ const insertStateDataSQL = "" +
" VALUES ($1, $2, $3, $4)" " VALUES ($1, $2, $3, $4)"
const selectNextStateBlockNIDSQL = ` const selectNextStateBlockNIDSQL = `
SELECT COALESCE(( SELECT IFNULL(MAX(state_block_nid), 0) + 1 FROM roomserver_state_block
SELECT seq+1 AS state_block_nid FROM sqlite_sequence
WHERE name = 'roomserver_state_block'), 1
) AS state_block_nid
` `
// Bulk state lookup by numeric state block ID. // Bulk state lookup by numeric state block ID.
@ -98,11 +95,19 @@ func (s *stateBlockStatements) prepare(db *sql.DB) (err error) {
func (s *stateBlockStatements) bulkInsertStateData( func (s *stateBlockStatements) bulkInsertStateData(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
stateBlockNID types.StateBlockNID,
entries []types.StateEntry, entries []types.StateEntry,
) error { ) (types.StateBlockNID, error) {
if len(entries) == 0 {
return 0, nil
}
var stateBlockNID types.StateBlockNID
err := txn.Stmt(s.selectNextStateBlockNIDStmt).QueryRowContext(ctx).Scan(&stateBlockNID)
if err != nil {
return 0, err
}
for _, entry := range entries { for _, entry := range entries {
_, err := common.TxStmt(txn, s.insertStateDataStmt).ExecContext( _, err := txn.Stmt(s.insertStateDataStmt).ExecContext(
ctx, ctx,
int64(stateBlockNID), int64(stateBlockNID),
int64(entry.EventTypeNID), int64(entry.EventTypeNID),
@ -110,20 +115,10 @@ func (s *stateBlockStatements) bulkInsertStateData(
int64(entry.EventNID), int64(entry.EventNID),
) )
if err != nil { if err != nil {
return err return 0, err
} }
} }
return nil return stateBlockNID, nil
}
func (s *stateBlockStatements) selectNextStateBlockNID(
ctx context.Context,
txn *sql.Tx,
) (types.StateBlockNID, error) {
var stateBlockNID int64
selectStmt := common.TxStmt(txn, s.selectNextStateBlockNIDStmt)
err := selectStmt.QueryRowContext(ctx).Scan(&stateBlockNID)
return types.StateBlockNID(stateBlockNID), err
} }
func (s *stateBlockStatements) bulkSelectStateBlockEntries( func (s *stateBlockStatements) bulkSelectStateBlockEntries(

View file

@ -54,7 +54,12 @@ func Open(dataSourceName string) (*Database, error) {
} }
//d.db.Exec("PRAGMA journal_mode=WAL;") //d.db.Exec("PRAGMA journal_mode=WAL;")
//d.db.Exec("PRAGMA read_uncommitted = true;") //d.db.Exec("PRAGMA read_uncommitted = true;")
d.db.SetMaxOpenConns(2)
// FIXME: We are leaking connections somewhere. Setting this to 2 will eventually
// cause the roomserver to be unresponsive to new events because something will
// acquire the global mutex and never unlock it because it is waiting for a connection
// which it will never obtain.
d.db.SetMaxOpenConns(20)
if err = d.statements.prepare(d.db); err != nil { if err = d.statements.prepare(d.db); err != nil {
return nil, err return nil, err
} }
@ -253,12 +258,13 @@ func (d *Database) Events(
) ([]types.Event, error) { ) ([]types.Event, error) {
var eventJSONs []eventJSONPair var eventJSONs []eventJSONPair
var err error var err error
results := make([]types.Event, len(eventNIDs)) var results []types.Event
err = common.WithTransaction(d.db, func(txn *sql.Tx) error { err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
eventJSONs, err = d.statements.bulkSelectEventJSON(ctx, txn, eventNIDs) eventJSONs, err = d.statements.bulkSelectEventJSON(ctx, txn, eventNIDs)
if err != nil || len(eventJSONs) == 0 { if err != nil || len(eventJSONs) == 0 {
return nil return nil
} }
results = make([]types.Event, len(eventJSONs))
for i, eventJSON := range eventJSONs { for i, eventJSON := range eventJSONs {
result := &results[i] result := &results[i]
result.EventNID = eventJSON.EventNID result.EventNID = eventJSON.EventNID
@ -286,13 +292,10 @@ func (d *Database) AddState(
err = common.WithTransaction(d.db, func(txn *sql.Tx) error { err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
if len(state) > 0 { if len(state) > 0 {
var stateBlockNID types.StateBlockNID var stateBlockNID types.StateBlockNID
stateBlockNID, err = d.statements.selectNextStateBlockNID(ctx, txn) stateBlockNID, err = d.statements.bulkInsertStateData(ctx, txn, state)
if err != nil { if err != nil {
return err return err
} }
if err = d.statements.bulkInsertStateData(ctx, txn, stateBlockNID, state); err != nil {
return err
}
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID) stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)
} }
stateNID, err = d.statements.insertState(ctx, txn, roomNID, stateBlockNIDs) stateNID, err = d.statements.insertState(ctx, txn, roomNID, stateBlockNIDs)
@ -602,8 +605,9 @@ func (d *Database) StateEntriesForTuples(
// MembershipUpdater implements input.RoomEventDatabase // MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater( func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string, ctx context.Context, roomID, targetUserID string,
) (types.MembershipUpdater, error) { ) (updater types.MembershipUpdater, err error) {
txn, err := d.db.Begin() var txn *sql.Tx
txn, err = d.db.Begin()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -611,6 +615,18 @@ func (d *Database) MembershipUpdater(
defer func() { defer func() {
if !succeeded { if !succeeded {
txn.Rollback() // nolint: errcheck txn.Rollback() // nolint: errcheck
} else {
// TODO: We should be holding open this transaction but we cannot have
// multiple write transactions on sqlite. The code will perform additional
// write transactions independent of this one which will consistently cause
// 'database is locked' errors. For now, we'll break up the transaction and
// hope we don't race too catastrophically. Long term, we should be able to
// thread in txn objects where appropriate (either at the interface level or
// bring matrix business logic into the storage layer).
txerr := txn.Commit()
if err == nil && txerr != nil {
err = txerr
}
} }
}() }()
@ -624,7 +640,7 @@ func (d *Database) MembershipUpdater(
return nil, err return nil, err
} }
updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID) updater, err = d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -658,7 +674,8 @@ func (d *Database) membershipUpdaterTxn(
} }
return &membershipUpdater{ return &membershipUpdater{
transaction{ctx, txn}, d, roomNID, targetUserNID, membership, // purposefully set the txn to nil so if we try to use it we panic and fail fast
transaction{ctx, nil}, d, roomNID, targetUserNID, membership,
}, nil }, nil
} }

View file

@ -76,14 +76,30 @@ while read -r test_name; do
fi fi
done <<< "${passed_but_expected_fail}" done <<< "${passed_but_expected_fail}"
# TODO: Check that the same test doesn't exist in both the whitelist and blacklist
# TODO: Check that the same test doesn't appear twice in the whitelist|blacklist
# Trim test output strings
tests_to_add=$(echo -e $tests_to_add | xargs)
already_in_whitelist=$(echo -e $already_in_whitelist | xargs)
# Format output with markdown for buildkite annotation rendering purposes
if [ -n "${tests_to_add}" ] && [ -n "${already_in_whitelist}" ]; then
echo "### 📜 SyTest Whitelist Maintenance"
fi
if [ -n "${tests_to_add}" ]; then if [ -n "${tests_to_add}" ]; then
echo "ERROR: The following passed tests are not present in $2. Please append them to the file:" echo "**ERROR**: The following tests passed but are not present in \`$2\`. Please append them to the file:"
echo "\`\`\`"
echo -e "${tests_to_add}" echo -e "${tests_to_add}"
echo "\`\`\`"
fi fi
if [ -n "${already_in_whitelist}" ]; then if [ -n "${already_in_whitelist}" ]; then
echo "WARN: Tests in the whitelist still marked as expected fail:" echo "**WARN**: Tests in the whitelist still marked as **expected fail**:"
echo "\`\`\`"
echo -e "${already_in_whitelist}" echo -e "${already_in_whitelist}"
echo "\`\`\`"
fi fi
exit ${fail_build} exit ${fail_build}

View file

@ -158,6 +158,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event": string(msg.Event.JSON()), "event": string(msg.Event.JSON()),
"pdupos": pduPos,
log.ErrorKey: err, log.ErrorKey: err,
}).Panicf("roomserver output log: write invite failure") }).Panicf("roomserver output log: write invite failure")
return nil return nil

View file

@ -19,15 +19,13 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
const accountDataSchema = ` const accountDataSchema = `
CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY,
user_id TEXT NOT NULL, user_id TEXT NOT NULL,
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
type TEXT NOT NULL, type TEXT NOT NULL,
@ -43,9 +41,7 @@ const insertAccountDataSQL = "" +
const selectAccountDataInRangeSQL = "" + const selectAccountDataInRangeSQL = "" +
"SELECT room_id, type FROM syncapi_account_data_type" + "SELECT room_id, type FROM syncapi_account_data_type" +
" WHERE user_id = $1 AND id > $2 AND id <= $3" + " WHERE user_id = $1 AND id > $2 AND id <= $3" +
" AND ( $4 IS NULL OR type IN ($4) )" + " ORDER BY id ASC"
" AND ( $5 IS NULL OR NOT(type IN ($5)) )" +
" ORDER BY id ASC LIMIT $6"
const selectMaxAccountDataIDSQL = "" + const selectMaxAccountDataIDSQL = "" +
"SELECT MAX(id) FROM syncapi_account_data_type" "SELECT MAX(id) FROM syncapi_account_data_type"
@ -53,8 +49,8 @@ const selectMaxAccountDataIDSQL = "" +
type accountDataStatements struct { type accountDataStatements struct {
streamIDStatements *streamIDStatements streamIDStatements *streamIDStatements
insertAccountDataStmt *sql.Stmt insertAccountDataStmt *sql.Stmt
selectAccountDataInRangeStmt *sql.Stmt
selectMaxAccountDataIDStmt *sql.Stmt selectMaxAccountDataIDStmt *sql.Stmt
selectAccountDataInRangeStmt *sql.Stmt
} }
func (s *accountDataStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { func (s *accountDataStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) {
@ -66,10 +62,10 @@ func (s *accountDataStatements) prepare(db *sql.DB, streamID *streamIDStatements
if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil {
return return
} }
if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil {
return return
} }
if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil {
return return
} }
return return
@ -83,8 +79,7 @@ func (s *accountDataStatements) insertAccountData(
if err != nil { if err != nil {
return return
} }
insertStmt := common.TxStmt(txn, s.insertAccountDataStmt) _, err = txn.Stmt(s.insertAccountDataStmt).ExecContext(ctx, pos, userID, roomID, dataType)
_, err = insertStmt.ExecContext(ctx, pos, userID, roomID, dataType)
return return
} }
@ -103,14 +98,13 @@ func (s *accountDataStatements) selectAccountDataInRange(
oldPos-- oldPos--
} }
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos, rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos)
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.NotTypes)),
accountDataFilterPart.Limit,
)
if err != nil { if err != nil {
return return
} }
defer rows.Close() // nolint: errcheck
var entries int
for rows.Next() { for rows.Next() {
var dataType string var dataType string
@ -120,22 +114,41 @@ func (s *accountDataStatements) selectAccountDataInRange(
return return
} }
// check if we should add this by looking at the filter.
// It would be nice if we could do this in SQL-land, but the mix of variadic
// and positional parameters makes the query annoyingly hard to do, it's easier
// and clearer to do it in Go-land. If there are no filters for [not]types then
// this gets skipped.
for _, includeType := range accountDataFilterPart.Types {
if includeType != dataType { // TODO: wildcard support
continue
}
}
for _, excludeType := range accountDataFilterPart.NotTypes {
if excludeType == dataType { // TODO: wildcard support
continue
}
}
if len(data[roomID]) > 0 { if len(data[roomID]) > 0 {
data[roomID] = append(data[roomID], dataType) data[roomID] = append(data[roomID], dataType)
} else { } else {
data[roomID] = []string{dataType} data[roomID] = []string{dataType}
} }
entries++
if entries >= accountDataFilterPart.Limit {
break
}
} }
return return data, nil
} }
func (s *accountDataStatements) selectMaxAccountDataID( func (s *accountDataStatements) selectMaxAccountDataID(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
) (id int64, err error) { ) (id int64, err error) {
var nullableID sql.NullInt64 var nullableID sql.NullInt64
stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt) err = txn.Stmt(s.selectMaxAccountDataIDStmt).QueryRowContext(ctx).Scan(&nullableID)
err = stmt.QueryRowContext(ctx).Scan(&nullableID)
if nullableID.Valid { if nullableID.Valid {
id = nullableID.Int64 id = nullableID.Int64
} }

View file

@ -19,6 +19,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"strings"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -88,7 +89,6 @@ type currentRoomStateStatements struct {
selectRoomIDsWithMembershipStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt
selectCurrentStateStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt
selectJoinedUsersStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt
selectEventsWithEventIDsStmt *sql.Stmt
selectStateEventStmt *sql.Stmt selectStateEventStmt *sql.Stmt
} }
@ -113,9 +113,6 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB, streamID *streamIDState
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
return return
} }
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
return
}
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
return return
} }
@ -233,8 +230,12 @@ func (s *currentRoomStateStatements) upsertRoomState(
func (s *currentRoomStateStatements) selectEventsWithEventIDs( func (s *currentRoomStateStatements) selectEventsWithEventIDs(
ctx context.Context, txn *sql.Tx, eventIDs []string, ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]types.StreamEvent, error) { ) ([]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt) iEventIDs := make([]interface{}, len(eventIDs))
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) for k, v := range eventIDs {
iEventIDs[k] = v
}
query := strings.Replace(selectEventsWithEventIDsSQL, "($1)", common.QueryVariadic(len(iEventIDs)), 1)
rows, err := txn.QueryContext(ctx, query, iEventIDs...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -26,7 +26,7 @@ import (
const inviteEventsSchema = ` const inviteEventsSchema = `
CREATE TABLE IF NOT EXISTS syncapi_invite_events ( CREATE TABLE IF NOT EXISTS syncapi_invite_events (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY,
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
target_user_id TEXT NOT NULL, target_user_id TEXT NOT NULL,
@ -39,11 +39,8 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx ON syncapi_invite_events
const insertInviteEventSQL = "" + const insertInviteEventSQL = "" +
"INSERT INTO syncapi_invite_events" + "INSERT INTO syncapi_invite_events" +
" (room_id, event_id, target_user_id, event_json)" + " (id, room_id, event_id, target_user_id, event_json)" +
" VALUES ($1, $2, $3, $4)" " VALUES ($1, $2, $3, $4, $5)"
const selectLastInsertedInviteEventSQL = "" +
"SELECT id FROM syncapi_invite_events WHERE rowid = last_insert_rowid()"
const deleteInviteEventSQL = "" + const deleteInviteEventSQL = "" +
"DELETE FROM syncapi_invite_events WHERE event_id = $1" "DELETE FROM syncapi_invite_events WHERE event_id = $1"
@ -59,7 +56,6 @@ const selectMaxInviteIDSQL = "" +
type inviteEventsStatements struct { type inviteEventsStatements struct {
streamIDStatements *streamIDStatements streamIDStatements *streamIDStatements
insertInviteEventStmt *sql.Stmt insertInviteEventStmt *sql.Stmt
selectLastInsertedInviteEventStmt *sql.Stmt
selectInviteEventsInRangeStmt *sql.Stmt selectInviteEventsInRangeStmt *sql.Stmt
deleteInviteEventStmt *sql.Stmt deleteInviteEventStmt *sql.Stmt
selectMaxInviteIDStmt *sql.Stmt selectMaxInviteIDStmt *sql.Stmt
@ -74,9 +70,6 @@ func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatement
if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil { if s.insertInviteEventStmt, err = db.Prepare(insertInviteEventSQL); err != nil {
return return
} }
if s.selectLastInsertedInviteEventStmt, err = db.Prepare(selectLastInsertedInviteEventSQL); err != nil {
return
}
if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil { if s.selectInviteEventsInRangeStmt, err = db.Prepare(selectInviteEventsInRangeSQL); err != nil {
return return
} }
@ -90,19 +83,16 @@ func (s *inviteEventsStatements) prepare(db *sql.DB, streamID *streamIDStatement
} }
func (s *inviteEventsStatements) insertInviteEvent( func (s *inviteEventsStatements) insertInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event, ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.Event, streamPos types.StreamPosition,
) (streamPos types.StreamPosition, err error) { ) (err error) {
_, err = s.insertInviteEventStmt.ExecContext( _, err = txn.Stmt(s.insertInviteEventStmt).ExecContext(
ctx, ctx,
streamPos,
inviteEvent.RoomID(), inviteEvent.RoomID(),
inviteEvent.EventID(), inviteEvent.EventID(),
*inviteEvent.StateKey(), *inviteEvent.StateKey(),
inviteEvent.JSON(), inviteEvent.JSON(),
) )
if err != nil {
return
}
err = s.selectLastInsertedInviteEventStmt.QueryRowContext(ctx).Scan(&streamPos)
return return
} }

View file

@ -54,9 +54,6 @@ const insertEventSQL = "" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
"ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = $11" "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = $11"
const selectLastInsertedEventSQL = "" +
"SELECT id FROM syncapi_output_room_events WHERE rowid = last_insert_rowid()"
const selectEventsSQL = "" + const selectEventsSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1" "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1"
@ -105,7 +102,6 @@ const selectStateInRangeSQL = "" +
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
streamIDStatements *streamIDStatements streamIDStatements *streamIDStatements
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectLastInsertedEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt
@ -123,9 +119,6 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB, streamID *streamIDState
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
return return
} }
if s.selectLastInsertedEventStmt, err = db.Prepare(selectLastInsertedEventSQL); err != nil {
return
}
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
return return
} }
@ -270,7 +263,6 @@ func (s *outputRoomEventsStatements) insertEvent(
} }
insertStmt := common.TxStmt(txn, s.insertEventStmt) insertStmt := common.TxStmt(txn, s.insertEventStmt)
selectStmt := common.TxStmt(txn, s.selectLastInsertedEventStmt)
_, err = insertStmt.ExecContext( _, err = insertStmt.ExecContext(
ctx, ctx,
streamPos, streamPos,
@ -286,10 +278,6 @@ func (s *outputRoomEventsStatements) insertEvent(
txnID, txnID,
excludeFromSync, excludeFromSync,
) )
if err != nil {
return
}
err = selectStmt.QueryRowContext(ctx).Scan(&streamPos)
return return
} }

View file

@ -193,24 +193,20 @@ func (d *SyncServerDatasource) WriteEvent(
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
) )
if err != nil { if err != nil {
fmt.Println("d.events.insertEvent:", err)
return err return err
} }
pduPosition = pos pduPosition = pos
if err = d.topology.insertEventInTopology(ctx, txn, ev); err != nil { if err = d.topology.insertEventInTopology(ctx, txn, ev); err != nil {
fmt.Println("d.topology.insertEventInTopology:", err)
return err return err
} }
if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil {
fmt.Println("d.handleBackwardExtremities:", err)
return err return err
} }
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event. // Nothing to do, the event may have just been a message event.
fmt.Println("nothing to do")
return nil return nil
} }
@ -340,8 +336,12 @@ func (d *SyncServerDatasource) GetEventsInRange(
} }
// SyncPosition returns the latest positions for syncing. // SyncPosition returns the latest positions for syncing.
func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.PaginationToken, error) { func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.PaginationToken, err error) {
return d.syncPositionTx(ctx, nil) err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
tok, err = d.syncPositionTx(ctx, txn)
return err
})
return
} }
// BackwardExtremitiesForRoom returns the event IDs of all of the backward // BackwardExtremitiesForRoom returns the event IDs of all of the backward
@ -380,8 +380,12 @@ func (d *SyncServerDatasource) EventPositionInTopology(
} }
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (pos types.StreamPosition, err error) {
return d.syncStreamPositionTx(ctx, nil) err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
pos, err = d.syncStreamPositionTx(ctx, txn)
return err
})
return
} }
func (d *SyncServerDatasource) syncStreamPositionTx( func (d *SyncServerDatasource) syncStreamPositionTx(
@ -625,18 +629,15 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
if err != nil { if err != nil {
return return
} }
fmt.Println("Joined rooms:", joinedRoomIDs)
stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
fmt.Println("WE'RE ON", roomID)
var stateEvents []gomatrixserverlib.Event var stateEvents []gomatrixserverlib.Event
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart) stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart)
if err != nil { if err != nil {
fmt.Println("d.roomstate.selectCurrentState:", err)
return return
} }
//fmt.Println("State events:", stateEvents) //fmt.Println("State events:", stateEvents)
@ -648,7 +649,6 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
numRecentEventsPerRoom, true, true, numRecentEventsPerRoom, true, true,
) )
if err != nil { if err != nil {
fmt.Println("d.events.selectRecentEvents:", err)
return return
} }
//fmt.Println("Recent stream events:", recentStreamEvents) //fmt.Println("Recent stream events:", recentStreamEvents)
@ -658,10 +658,9 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
var backwardTopologyPos types.StreamPosition var backwardTopologyPos types.StreamPosition
backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
if err != nil { if err != nil {
fmt.Println("d.topology.selectPositionInTopology:", err)
return nil, types.PaginationToken{}, []string{}, err return nil, types.PaginationToken{}, []string{}, err
} }
fmt.Println("Backward topology position:", backwardTopologyPos)
if backwardTopologyPos-1 <= 0 { if backwardTopologyPos-1 <= 0 {
backwardTopologyPos = types.StreamPosition(1) backwardTopologyPos = types.StreamPosition(1)
} else { } else {
@ -683,7 +682,6 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
} }
if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil { if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil {
fmt.Println("d.addInvitesToResponse:", err)
return return
} }
@ -744,18 +742,10 @@ func (d *SyncServerDatasource) GetAccountDataInRange(
func (d *SyncServerDatasource) UpsertAccountData( func (d *SyncServerDatasource) UpsertAccountData(
ctx context.Context, userID, roomID, dataType string, ctx context.Context, userID, roomID, dataType string,
) (sp types.StreamPosition, err error) { ) (sp types.StreamPosition, err error) {
txn, err := d.db.BeginTx(ctx, nil) err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
if err != nil {
return types.StreamPosition(0), err
}
var succeeded bool
defer func() {
txerr := common.EndTransaction(txn, &succeeded)
if err == nil && txerr != nil {
err = txerr
}
}()
sp, err = d.accountData.insertAccountData(ctx, txn, userID, roomID, dataType) sp, err = d.accountData.insertAccountData(ctx, txn, userID, roomID, dataType)
return err
})
return return
} }
@ -764,8 +754,15 @@ func (d *SyncServerDatasource) UpsertAccountData(
// Returns an error if there was a problem communicating with the database. // Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatasource) AddInviteEvent( func (d *SyncServerDatasource) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event, ctx context.Context, inviteEvent gomatrixserverlib.Event,
) (types.StreamPosition, error) { ) (streamPos types.StreamPosition, err error) {
return d.invites.insertInviteEvent(ctx, inviteEvent) err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
streamPos, err = d.streamID.nextStreamID(ctx, txn)
if err != nil {
return err
}
return d.invites.insertInviteEvent(ctx, txn, inviteEvent, streamPos)
})
return
} }
// RetireInviteEvent removes an old invite event from the database. // RetireInviteEvent removes an old invite event from the database.

View file

@ -183,7 +183,6 @@ GET /directory/room/:room_alias yields room ID
PUT /directory/room/:room_alias creates alias PUT /directory/room/:room_alias creates alias
Room aliases can contain Unicode Room aliases can contain Unicode
Creators can delete alias Creators can delete alias
Alias creators can delete canonical alias with no ops
Regular users cannot create room aliases within the AS namespace Regular users cannot create room aliases within the AS namespace
Deleting a non-existent alias should return a 404 Deleting a non-existent alias should return a 404
Users can't delete other's aliases Users can't delete other's aliases