Merge branch 'master' of github.com:matrix-org/dendrite into erikj/errcheck_lint

This commit is contained in:
Erik Johnston 2017-09-20 10:37:39 +01:00
commit fd14726686
3 changed files with 61 additions and 28 deletions

View file

@ -18,14 +18,20 @@ import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
)
const accountDataSchema = `
-- Stores the users account data
-- This sequence is shared between all the tables generated from kafka logs.
CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
-- Stores the types of account data that a user set has globally and in each room
-- and the stream ID when that type was last updated.
CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
-- The highest numeric ID from the output_room_events at the time of saving the data
id BIGINT,
-- An incrementing ID which denotes the position in the log that this event resides at.
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- ID of the user the data belongs to
user_id TEXT NOT NULL,
-- ID of the room the data is related to (empty string if not related to a specific room)
@ -33,8 +39,6 @@ CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
-- Type of the data
type TEXT NOT NULL,
PRIMARY KEY(user_id, room_id, type),
-- We don't want two entries of the same type for the same user
CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type)
);
@ -43,18 +47,23 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account
`
const insertAccountDataSQL = "" +
"INSERT INTO syncapi_account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" +
"INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" +
" ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" +
" DO UPDATE SET id = EXCLUDED.id"
" DO UPDATE SET id = EXCLUDED.id" +
" RETURNING id"
const selectAccountDataInRangeSQL = "" +
"SELECT room_id, type FROM syncapi_account_data_type" +
" WHERE user_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC"
const selectMaxAccountDataIDSQL = "" +
"SELECT MAX(id) FROM syncapi_account_data_type"
type accountDataStatements struct {
insertAccountDataStmt *sql.Stmt
selectAccountDataInRangeStmt *sql.Stmt
selectMaxAccountDataIDStmt *sql.Stmt
}
func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
@ -68,15 +77,17 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil {
return
}
if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil {
return
}
return
}
func (s *accountDataStatements) insertAccountData(
ctx context.Context,
pos types.StreamPosition,
userID, roomID, dataType string,
) (err error) {
_, err = s.insertAccountDataStmt.ExecContext(ctx, pos, userID, roomID, dataType)
) (pos int64, err error) {
s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos)
return
}
@ -116,3 +127,15 @@ func (s *accountDataStatements) selectAccountDataInRange(
return
}
func (s *accountDataStatements) selectMaxAccountDataID(
ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
var nullableID sql.NullInt64
stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt)
err = stmt.QueryRowContext(ctx).Scan(&nullableID)
if nullableID.Valid {
id = nullableID.Int64
}
return
}

View file

@ -26,12 +26,15 @@ import (
)
const outputRoomEventsSchema = `
-- This sequence is shared between all the tables generated from kafka logs.
CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- An incrementing ID which denotes the position in the log that this event resides at.
-- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
-- This isn't a problem for us since we just want to order by this field.
id BIGSERIAL PRIMARY KEY,
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- The event ID for the event
event_id TEXT NOT NULL,
-- The 'room_id' key for the event.
@ -60,7 +63,7 @@ const selectRecentEventsSQL = "" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4"
const selectMaxIDSQL = "" +
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
@ -73,7 +76,7 @@ const selectStateInRangeSQL = "" +
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxIDStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
}
@ -89,7 +92,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
return
}
if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil {
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
return
}
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
@ -170,11 +173,11 @@ func (s *outputRoomEventsStatements) selectStateInRange(
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
// then this function should only ever be used at startup, as it will race with inserting events if it is
// done afterwards. If there are no inserted events, 0 is returned.
func (s *outputRoomEventsStatements) selectMaxID(
func (s *outputRoomEventsStatements) selectMaxEventID(
ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
var nullableID sql.NullInt64
stmt := common.TxStmt(txn, s.selectMaxIDStmt)
stmt := common.TxStmt(txn, s.selectMaxEventIDStmt)
err = stmt.QueryRowContext(ctx).Scan(&nullableID)
if nullableID.Valid {
id = nullableID.Int64

View file

@ -174,11 +174,24 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
id, err := d.events.selectMaxID(ctx, nil)
return d.syncStreamPositionTx(ctx, nil)
}
func (d *SyncServerDatabase) syncStreamPositionTx(
ctx context.Context, txn *sql.Tx,
) (types.StreamPosition, error) {
maxID, err := d.events.selectMaxEventID(ctx, txn)
if err != nil {
return types.StreamPosition(0), err
return 0, err
}
return types.StreamPosition(id), nil
maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn)
if err != nil {
return 0, err
}
if maxAccountDataID > maxID {
maxID = maxAccountDataID
}
return types.StreamPosition(maxID), nil
}
// IncrementalSync returns all the data needed in order to create an incremental sync response.
@ -271,11 +284,10 @@ func (d *SyncServerDatabase) CompleteSync(
defer common.EndTransaction(txn, &succeeded)
// Get the current stream position which we will base the sync response on.
id, err := d.events.selectMaxID(ctx, txn)
pos, err := d.syncStreamPositionTx(ctx, txn)
if err != nil {
return nil, err
}
pos := types.StreamPosition(id)
// Extract room state and recent events for all rooms the user is joined to.
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
@ -348,13 +360,8 @@ func (d *SyncServerDatabase) GetAccountDataInRange(
func (d *SyncServerDatabase) UpsertAccountData(
ctx context.Context, userID, roomID, dataType string,
) (types.StreamPosition, error) {
pos, err := d.SyncStreamPosition(ctx)
if err != nil {
return pos, err
}
err = d.accountData.insertAccountData(ctx, pos, userID, roomID, dataType)
return pos, err
pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType)
return types.StreamPosition(pos), err
}
func (d *SyncServerDatabase) addInvitesToResponse(