5caae6f3a0
* bugfix: fix panic on new invite events from sytest I'm unsure why the previous code didn't work, but it's clearer, quicker and easier to read the `LastInsertID()` way. Previously, the code would panic as the SELECT would fail to find the last inserted row ID. * sqlite: Fix UNIQUE violations and close more cursors - Add missing `defer rows.Close()` - Do not have the state block NID as a PRIMARY KEY else it breaks for blocks with >1 state event in them. Instead, rejig the queries so we can still have monotonically increasing integers without using AUTOINCREMENT (which mandates PRIMARY KEY). * sqlite: Add missing variadic function * Use LastInsertId because empirically it works over the SELECT form (though I don't know why that is) * sqlite: Fix invite table by using the global stream pos rather than one specific to invites If we don't use the global, clients don't get notified about any invites because the position is too low. * linting: shadowing * sqlite: do not use last rowid, we already know the stream pos! * sqlite: Fix account data table in syncapi by commiting insert txns! * sqlite: Fix failing federation invite Was failing with 'database is locked' due to multiple write txns being taken out. * sqlite: Ensure we return exactly the number of events found in the database Previously we would return exactly the number of *requested* events, which meant that several zero-initialised events would bubble through the system, failing at JSON serialisation time. * sqlite: let's just ignore the problem for now.... * linting
157 lines
4.4 KiB
Go
157 lines
4.4 KiB
Go
// Copyright 2017-2018 New Vector Ltd
|
|
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package sqlite3
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
)
|
|
|
|
const accountDataSchema = `
|
|
CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
|
|
id INTEGER PRIMARY KEY,
|
|
user_id TEXT NOT NULL,
|
|
room_id TEXT NOT NULL,
|
|
type TEXT NOT NULL,
|
|
UNIQUE (user_id, room_id, type)
|
|
);
|
|
`
|
|
|
|
const insertAccountDataSQL = "" +
|
|
"INSERT INTO syncapi_account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" +
|
|
" ON CONFLICT (user_id, room_id, type) DO UPDATE" +
|
|
" SET id = EXCLUDED.id"
|
|
|
|
const selectAccountDataInRangeSQL = "" +
|
|
"SELECT room_id, type FROM syncapi_account_data_type" +
|
|
" WHERE user_id = $1 AND id > $2 AND id <= $3" +
|
|
" ORDER BY id ASC"
|
|
|
|
const selectMaxAccountDataIDSQL = "" +
|
|
"SELECT MAX(id) FROM syncapi_account_data_type"
|
|
|
|
type accountDataStatements struct {
|
|
streamIDStatements *streamIDStatements
|
|
insertAccountDataStmt *sql.Stmt
|
|
selectMaxAccountDataIDStmt *sql.Stmt
|
|
selectAccountDataInRangeStmt *sql.Stmt
|
|
}
|
|
|
|
func (s *accountDataStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) {
|
|
s.streamIDStatements = streamID
|
|
_, err = db.Exec(accountDataSchema)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil {
|
|
return
|
|
}
|
|
if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil {
|
|
return
|
|
}
|
|
if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *accountDataStatements) insertAccountData(
|
|
ctx context.Context, txn *sql.Tx,
|
|
userID, roomID, dataType string,
|
|
) (pos types.StreamPosition, err error) {
|
|
pos, err = s.streamIDStatements.nextStreamID(ctx, txn)
|
|
if err != nil {
|
|
return
|
|
}
|
|
_, err = txn.Stmt(s.insertAccountDataStmt).ExecContext(ctx, pos, userID, roomID, dataType)
|
|
return
|
|
}
|
|
|
|
func (s *accountDataStatements) selectAccountDataInRange(
|
|
ctx context.Context,
|
|
userID string,
|
|
oldPos, newPos types.StreamPosition,
|
|
accountDataFilterPart *gomatrixserverlib.EventFilter,
|
|
) (data map[string][]string, err error) {
|
|
data = make(map[string][]string)
|
|
|
|
// If both positions are the same, it means that the data was saved after the
|
|
// latest room event. In that case, we need to decrement the old position as
|
|
// it would prevent the SQL request from returning anything.
|
|
if oldPos == newPos {
|
|
oldPos--
|
|
}
|
|
|
|
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer rows.Close() // nolint: errcheck
|
|
|
|
var entries int
|
|
|
|
for rows.Next() {
|
|
var dataType string
|
|
var roomID string
|
|
|
|
if err = rows.Scan(&roomID, &dataType); err != nil {
|
|
return
|
|
}
|
|
|
|
// check if we should add this by looking at the filter.
|
|
// It would be nice if we could do this in SQL-land, but the mix of variadic
|
|
// and positional parameters makes the query annoyingly hard to do, it's easier
|
|
// and clearer to do it in Go-land. If there are no filters for [not]types then
|
|
// this gets skipped.
|
|
for _, includeType := range accountDataFilterPart.Types {
|
|
if includeType != dataType { // TODO: wildcard support
|
|
continue
|
|
}
|
|
}
|
|
for _, excludeType := range accountDataFilterPart.NotTypes {
|
|
if excludeType == dataType { // TODO: wildcard support
|
|
continue
|
|
}
|
|
}
|
|
|
|
if len(data[roomID]) > 0 {
|
|
data[roomID] = append(data[roomID], dataType)
|
|
} else {
|
|
data[roomID] = []string{dataType}
|
|
}
|
|
entries++
|
|
if entries >= accountDataFilterPart.Limit {
|
|
break
|
|
}
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
func (s *accountDataStatements) selectMaxAccountDataID(
|
|
ctx context.Context, txn *sql.Tx,
|
|
) (id int64, err error) {
|
|
var nullableID sql.NullInt64
|
|
err = txn.Stmt(s.selectMaxAccountDataIDStmt).QueryRowContext(ctx).Scan(&nullableID)
|
|
if nullableID.Valid {
|
|
id = nullableID.Int64
|
|
}
|
|
return
|
|
}
|