mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-15 19:03:09 -06:00
Merge abee5d1305 into b72ed3e38c
This commit is contained in:
commit
da417c0dc9
|
|
@ -17,8 +17,9 @@ package accounts
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrix"
|
||||||
)
|
)
|
||||||
|
|
||||||
const filterSchema = `
|
const filterSchema = `
|
||||||
|
|
@ -71,20 +72,29 @@ func (s *filterStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
|
||||||
func (s *filterStatements) selectFilter(
|
func (s *filterStatements) selectFilter(
|
||||||
ctx context.Context, localpart string, filterID string,
|
ctx context.Context, localpart string, filterID string,
|
||||||
) (filter []byte, err error) {
|
) (*gomatrix.Filter, error) {
|
||||||
err = s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filter)
|
var filterData []byte
|
||||||
return
|
err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var filter gomatrix.Filter
|
||||||
|
if err = json.Unmarshal(filterData, &filter); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &filter, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *filterStatements) insertFilter(
|
func (s *filterStatements) insertFilter(
|
||||||
ctx context.Context, filter []byte, localpart string,
|
ctx context.Context, filter *gomatrix.Filter, localpart string,
|
||||||
) (filterID string, err error) {
|
) (filterID string, err error) {
|
||||||
var existingFilterID string
|
var existingFilterID string
|
||||||
|
|
||||||
// This can result in a race condition when two clients try to insert the
|
// This can result in a race condition when two clients try to insert the
|
||||||
// same filter and localpart at the same time, however this is not a
|
// same filter and localpart at the same time, however this is not a
|
||||||
// problem as both calls will result in the same filterID
|
// problem as both calls will result in the same filterID
|
||||||
filterJSON, err := gomatrixserverlib.CanonicalJSON(filter)
|
filterJSON, err := json.Marshal(filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"golang.org/x/crypto/bcrypt"
|
"golang.org/x/crypto/bcrypt"
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
|
|
@ -335,7 +336,7 @@ func (d *Database) GetThreePIDsForLocalpart(
|
||||||
// no such filter exists or if there was an error talking to the database.
|
// no such filter exists or if there was an error talking to the database.
|
||||||
func (d *Database) GetFilter(
|
func (d *Database) GetFilter(
|
||||||
ctx context.Context, localpart string, filterID string,
|
ctx context.Context, localpart string, filterID string,
|
||||||
) ([]byte, error) {
|
) (*gomatrix.Filter, error) {
|
||||||
return d.filter.selectFilter(ctx, localpart, filterID)
|
return d.filter.selectFilter(ctx, localpart, filterID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -343,7 +344,7 @@ func (d *Database) GetFilter(
|
||||||
// Returns the filterID as a string. Otherwise returns an error if something
|
// Returns the filterID as a string. Otherwise returns an error if something
|
||||||
// goes wrong.
|
// goes wrong.
|
||||||
func (d *Database) PutFilter(
|
func (d *Database) PutFilter(
|
||||||
ctx context.Context, localpart string, filter []byte,
|
ctx context.Context, localpart string, filter *gomatrix.Filter,
|
||||||
) (string, error) {
|
) (string, error) {
|
||||||
return d.filter.insertFilter(ctx, filter, localpart)
|
return d.filter.insertFilter(ctx, filter, localpart)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,8 +17,6 @@ package routing
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
|
|
@ -49,7 +47,7 @@ func GetFilter(
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := accountDB.GetFilter(req.Context(), localpart, filterID)
|
filter, err := accountDB.GetFilter(req.Context(), localpart, filterID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//TODO better error handling. This error message is *probably* right,
|
//TODO better error handling. This error message is *probably* right,
|
||||||
// but if there are obscure db errors, this will also be returned,
|
// but if there are obscure db errors, this will also be returned,
|
||||||
|
|
@ -59,11 +57,6 @@ func GetFilter(
|
||||||
JSON: jsonerror.NotFound("No such filter"),
|
JSON: jsonerror.NotFound("No such filter"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
filter := gomatrix.Filter{}
|
|
||||||
err = json.Unmarshal(res, &filter)
|
|
||||||
if err != nil {
|
|
||||||
httputil.LogThenError(req, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
|
|
@ -103,15 +96,14 @@ func PutFilter(
|
||||||
return *reqErr
|
return *reqErr
|
||||||
}
|
}
|
||||||
|
|
||||||
filterArray, err := json.Marshal(filter)
|
if err = filter.Validate(); err != nil {
|
||||||
if err != nil {
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 400,
|
Code: 400,
|
||||||
JSON: jsonerror.BadJSON("Filter is malformed"),
|
JSON: jsonerror.BadJSON("Invalid filter: " + err.Error()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
filterID, err := accountDB.PutFilter(req.Context(), localpart, filterArray)
|
filterID, err := accountDB.PutFilter(req.Context(), localpart, &filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ package common
|
||||||
type AccountData struct {
|
type AccountData struct {
|
||||||
RoomID string `json:"room_id"`
|
RoomID string `json:"room_id"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
|
Sender string `json:"sender"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProfileResponse is a struct containing all known user profile data
|
// ProfileResponse is a struct containing all known user profile data
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
|
||||||
}).Info("received data from client API server")
|
}).Info("received data from client API server")
|
||||||
|
|
||||||
syncStreamPos, err := s.db.UpsertAccountData(
|
syncStreamPos, err := s.db.UpsertAccountData(
|
||||||
context.TODO(), string(msg.Key), output.RoomID, output.Type,
|
context.TODO(), string(msg.Key), output.RoomID, output.Type, output.Sender,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, r
|
||||||
// TODO(#287): Auth request and handle the case where the user has left (where
|
// TODO(#287): Auth request and handle the case where the user has left (where
|
||||||
// we should return the state at the poin they left)
|
// we should return the state at the poin they left)
|
||||||
|
|
||||||
stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID)
|
stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,9 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
)
|
)
|
||||||
|
|
@ -38,16 +40,17 @@ CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
|
||||||
room_id TEXT NOT NULL,
|
room_id TEXT NOT NULL,
|
||||||
-- Type of the data
|
-- Type of the data
|
||||||
type TEXT NOT NULL,
|
type TEXT NOT NULL,
|
||||||
|
sender TEXT NOT NULL,
|
||||||
|
|
||||||
-- We don't want two entries of the same type for the same user
|
-- We don't want two entries of the same type for the same user
|
||||||
CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type)
|
CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id);
|
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id, type, sender);
|
||||||
`
|
`
|
||||||
|
|
||||||
const insertAccountDataSQL = "" +
|
const insertAccountDataSQL = "" +
|
||||||
"INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" +
|
"INSERT INTO syncapi_account_data_type (user_id, room_id, type, sender) VALUES ($1, $2, $3, $4)" +
|
||||||
" ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" +
|
" ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" +
|
||||||
" DO UPDATE SET id = EXCLUDED.id" +
|
" DO UPDATE SET id = EXCLUDED.id" +
|
||||||
" RETURNING id"
|
" RETURNING id"
|
||||||
|
|
@ -55,7 +58,11 @@ const insertAccountDataSQL = "" +
|
||||||
const selectAccountDataInRangeSQL = "" +
|
const selectAccountDataInRangeSQL = "" +
|
||||||
"SELECT room_id, type FROM syncapi_account_data_type" +
|
"SELECT room_id, type FROM syncapi_account_data_type" +
|
||||||
" WHERE user_id = $1 AND id > $2 AND id <= $3" +
|
" WHERE user_id = $1 AND id > $2 AND id <= $3" +
|
||||||
" ORDER BY id ASC"
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
|
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||||
|
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||||
|
" ORDER BY id ASC LIMIT $8"
|
||||||
|
|
||||||
const selectMaxAccountDataIDSQL = "" +
|
const selectMaxAccountDataIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_account_data_type"
|
"SELECT MAX(id) FROM syncapi_account_data_type"
|
||||||
|
|
@ -85,16 +92,16 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
|
||||||
func (s *accountDataStatements) insertAccountData(
|
func (s *accountDataStatements) insertAccountData(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
userID, roomID, dataType string,
|
userID, roomID, dataType, sender string,
|
||||||
) (pos int64, err error) {
|
) (pos int64, err error) {
|
||||||
err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos)
|
err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType, sender).Scan(&pos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *accountDataStatements) selectAccountDataInRange(
|
func (s *accountDataStatements) selectAccountDataInRange(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
userID string,
|
userID string,
|
||||||
oldPos, newPos types.StreamPosition,
|
oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart,
|
||||||
) (data map[string][]string, err error) {
|
) (data map[string][]string, err error) {
|
||||||
data = make(map[string][]string)
|
data = make(map[string][]string)
|
||||||
|
|
||||||
|
|
@ -105,7 +112,13 @@ func (s *accountDataStatements) selectAccountDataInRange(
|
||||||
oldPos--
|
oldPos--
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos)
|
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos,
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(accountDataFilterPart.Types)),
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(accountDataFilterPart.NotTypes)),
|
||||||
|
pq.StringArray(accountDataFilterPart.Senders),
|
||||||
|
pq.StringArray(accountDataFilterPart.NotSenders),
|
||||||
|
accountDataFilterPart.Limit,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,9 +17,11 @@ package storage
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -32,6 +34,10 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
|
||||||
event_id TEXT NOT NULL,
|
event_id TEXT NOT NULL,
|
||||||
-- The state event type e.g 'm.room.member'
|
-- The state event type e.g 'm.room.member'
|
||||||
type TEXT NOT NULL,
|
type TEXT NOT NULL,
|
||||||
|
-- The 'sender' property for the event.
|
||||||
|
sender TEXT NOT NULL,
|
||||||
|
-- true if the event content contains a url key
|
||||||
|
contains_url BOOL NOT NULL,
|
||||||
-- The state_key value for this state event e.g ''
|
-- The state_key value for this state event e.g ''
|
||||||
state_key TEXT NOT NULL,
|
state_key TEXT NOT NULL,
|
||||||
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
||||||
|
|
@ -46,16 +52,17 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
|
||||||
CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
|
CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
|
||||||
);
|
);
|
||||||
-- for event deletion
|
-- for event deletion
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id);
|
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender, contains_url);
|
||||||
-- for querying membership states of users
|
-- for querying membership states of users
|
||||||
CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
|
CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
|
||||||
|
|
||||||
`
|
`
|
||||||
|
|
||||||
const upsertRoomStateSQL = "" +
|
const upsertRoomStateSQL = "" +
|
||||||
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" +
|
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" +
|
||||||
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" +
|
||||||
" ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
|
" ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
|
||||||
" DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7"
|
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9"
|
||||||
|
|
||||||
const deleteRoomStateByEventIDSQL = "" +
|
const deleteRoomStateByEventIDSQL = "" +
|
||||||
"DELETE FROM syncapi_current_room_state WHERE event_id = $1"
|
"DELETE FROM syncapi_current_room_state WHERE event_id = $1"
|
||||||
|
|
@ -64,7 +71,13 @@ const selectRoomIDsWithMembershipSQL = "" +
|
||||||
"SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
|
"SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
|
||||||
|
|
||||||
const selectCurrentStateSQL = "" +
|
const selectCurrentStateSQL = "" +
|
||||||
"SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1"
|
"SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" +
|
||||||
|
" AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
|
||||||
|
" AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
|
||||||
|
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
|
||||||
|
" AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" +
|
||||||
|
" AND ( $6::bool IS NULL OR contains_url = $6 )" +
|
||||||
|
" LIMIT $7"
|
||||||
|
|
||||||
const selectJoinedUsersSQL = "" +
|
const selectJoinedUsersSQL = "" +
|
||||||
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||||
|
|
@ -165,10 +178,25 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
|
||||||
|
|
||||||
// CurrentState returns all the current state events for the given room.
|
// CurrentState returns all the current state events for the given room.
|
||||||
func (s *currentRoomStateStatements) selectCurrentState(
|
func (s *currentRoomStateStatements) selectCurrentState(
|
||||||
ctx context.Context, txn *sql.Tx, roomID string,
|
ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrix.FilterPart,
|
||||||
) ([]gomatrixserverlib.Event, error) {
|
) ([]gomatrixserverlib.Event, error) {
|
||||||
|
|
||||||
|
var filter gomatrix.FilterPart
|
||||||
|
if stateFilter == nil {
|
||||||
|
filter = gomatrix.DefaultFilterPart()
|
||||||
|
} else {
|
||||||
|
filter = *stateFilter
|
||||||
|
}
|
||||||
|
|
||||||
stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
|
stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
|
||||||
rows, err := stmt.QueryContext(ctx, roomID)
|
rows, err := stmt.QueryContext(ctx, roomID,
|
||||||
|
pq.StringArray(filter.Senders),
|
||||||
|
pq.StringArray(filter.NotSenders),
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(filter.Types)),
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(filter.NotTypes)),
|
||||||
|
filter.ContainsURL,
|
||||||
|
stateFilter.Limit,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -189,12 +217,20 @@ func (s *currentRoomStateStatements) upsertRoomState(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
event gomatrixserverlib.Event, membership *string, addedAt int64,
|
event gomatrixserverlib.Event, membership *string, addedAt int64,
|
||||||
) error {
|
) error {
|
||||||
|
var containsURL bool
|
||||||
|
var content map[string]interface{}
|
||||||
|
if json.Unmarshal(event.Content(), &content) != nil {
|
||||||
|
_, containsURL = content["url"]
|
||||||
|
}
|
||||||
|
|
||||||
stmt := common.TxStmt(txn, s.upsertRoomStateStmt)
|
stmt := common.TxStmt(txn, s.upsertRoomStateStmt)
|
||||||
_, err := stmt.ExecContext(
|
_, err := stmt.ExecContext(
|
||||||
ctx,
|
ctx,
|
||||||
event.RoomID(),
|
event.RoomID(),
|
||||||
event.EventID(),
|
event.EventID(),
|
||||||
event.Type(),
|
event.Type(),
|
||||||
|
event.Sender(),
|
||||||
|
containsURL,
|
||||||
*event.StateKey(),
|
*event.StateKey(),
|
||||||
event.JSON(),
|
event.JSON(),
|
||||||
membership,
|
membership,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,58 @@
|
||||||
|
// Copyright 2017 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
|
)
|
||||||
|
|
||||||
|
func isRoomFiltered(roomID string, filter *gomatrix.Filter, filterPart *gomatrix.FilterPart) bool {
|
||||||
|
if filter != nil {
|
||||||
|
if filter.Room.Rooms != nil && !hasValue(roomID, filter.Room.Rooms) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if filter.Room.NotRooms != nil && hasValue(roomID, filter.Room.NotRooms) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if filterPart != nil {
|
||||||
|
if filterPart.Rooms != nil && !hasValue(roomID, filterPart.Rooms) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if filterPart.NotRooms != nil && hasValue(roomID, filterPart.NotRooms) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasValue(value string, list []string) bool {
|
||||||
|
for i := range list {
|
||||||
|
if list[i] == value {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterConvertWildcardToSQL(values []string) []string {
|
||||||
|
ret := make([]string, len(values))
|
||||||
|
for i := range values {
|
||||||
|
ret[i] = strings.Replace(values[i], "*", "%", -1)
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
@ -3,8 +3,11 @@ package storage
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -13,13 +16,16 @@ CREATE TABLE IF NOT EXISTS syncapi_invite_events (
|
||||||
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
|
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
|
||||||
event_id TEXT NOT NULL,
|
event_id TEXT NOT NULL,
|
||||||
room_id TEXT NOT NULL,
|
room_id TEXT NOT NULL,
|
||||||
|
type TEXT NOT NULL,
|
||||||
|
sender TEXT NOT NULL,
|
||||||
|
contains_url BOOL NOT NULL,
|
||||||
target_user_id TEXT NOT NULL,
|
target_user_id TEXT NOT NULL,
|
||||||
event_json TEXT NOT NULL
|
event_json TEXT NOT NULL
|
||||||
);
|
);
|
||||||
|
|
||||||
-- For looking up the invites for a given user.
|
-- For looking up the invites for a given user.
|
||||||
CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx
|
CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx
|
||||||
ON syncapi_invite_events (target_user_id, id);
|
ON syncapi_invite_events (target_user_id, id, room_id, type, sender, contains_url);
|
||||||
|
|
||||||
-- For deleting old invites
|
-- For deleting old invites
|
||||||
CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
|
CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
|
||||||
|
|
@ -28,8 +34,8 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx
|
||||||
|
|
||||||
const insertInviteEventSQL = "" +
|
const insertInviteEventSQL = "" +
|
||||||
"INSERT INTO syncapi_invite_events (" +
|
"INSERT INTO syncapi_invite_events (" +
|
||||||
" room_id, event_id, target_user_id, event_json" +
|
" room_id, event_id, type, sender, contains_url, target_user_id, event_json" +
|
||||||
") VALUES ($1, $2, $3, $4) RETURNING id"
|
") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id"
|
||||||
|
|
||||||
const deleteInviteEventSQL = "" +
|
const deleteInviteEventSQL = "" +
|
||||||
"DELETE FROM syncapi_invite_events WHERE event_id = $1"
|
"DELETE FROM syncapi_invite_events WHERE event_id = $1"
|
||||||
|
|
@ -37,6 +43,13 @@ const deleteInviteEventSQL = "" +
|
||||||
const selectInviteEventsInRangeSQL = "" +
|
const selectInviteEventsInRangeSQL = "" +
|
||||||
"SELECT room_id, event_json FROM syncapi_invite_events" +
|
"SELECT room_id, event_json FROM syncapi_invite_events" +
|
||||||
" WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
|
" WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
|
||||||
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
|
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||||
|
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||||
|
" AND ( $8::text[] IS NULL OR room_id = ANY($8) )" +
|
||||||
|
" AND ( $9::text[] IS NULL OR NOT(room_id = ANY($9)) )" +
|
||||||
|
" AND ( $10::bool IS NULL OR contains_url = $10 )" +
|
||||||
" ORDER BY id DESC"
|
" ORDER BY id DESC"
|
||||||
|
|
||||||
const selectMaxInviteIDSQL = "" +
|
const selectMaxInviteIDSQL = "" +
|
||||||
|
|
@ -72,10 +85,19 @@ func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) {
|
||||||
func (s *inviteEventsStatements) insertInviteEvent(
|
func (s *inviteEventsStatements) insertInviteEvent(
|
||||||
ctx context.Context, inviteEvent gomatrixserverlib.Event,
|
ctx context.Context, inviteEvent gomatrixserverlib.Event,
|
||||||
) (streamPos int64, err error) {
|
) (streamPos int64, err error) {
|
||||||
|
var containsURL bool
|
||||||
|
var content map[string]interface{}
|
||||||
|
if json.Unmarshal(inviteEvent.Content(), &content) != nil {
|
||||||
|
_, containsURL = content["url"]
|
||||||
|
}
|
||||||
|
|
||||||
err = s.insertInviteEventStmt.QueryRowContext(
|
err = s.insertInviteEventStmt.QueryRowContext(
|
||||||
ctx,
|
ctx,
|
||||||
inviteEvent.RoomID(),
|
inviteEvent.RoomID(),
|
||||||
inviteEvent.EventID(),
|
inviteEvent.EventID(),
|
||||||
|
inviteEvent.Type(),
|
||||||
|
inviteEvent.Sender(),
|
||||||
|
containsURL,
|
||||||
*inviteEvent.StateKey(),
|
*inviteEvent.StateKey(),
|
||||||
inviteEvent.JSON(),
|
inviteEvent.JSON(),
|
||||||
).Scan(&streamPos)
|
).Scan(&streamPos)
|
||||||
|
|
@ -92,10 +114,19 @@ func (s *inviteEventsStatements) deleteInviteEvent(
|
||||||
// selectInviteEventsInRange returns a map of room ID to invite event for the
|
// selectInviteEventsInRange returns a map of room ID to invite event for the
|
||||||
// active invites for the target user ID in the supplied range.
|
// active invites for the target user ID in the supplied range.
|
||||||
func (s *inviteEventsStatements) selectInviteEventsInRange(
|
func (s *inviteEventsStatements) selectInviteEventsInRange(
|
||||||
ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos int64,
|
ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos int64, filter *gomatrix.Filter,
|
||||||
) (map[string]gomatrixserverlib.Event, error) {
|
) (map[string]gomatrixserverlib.Event, error) {
|
||||||
stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
|
stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
|
||||||
rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos,
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(filter.Room.State.Types)),
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(filter.Room.State.NotTypes)),
|
||||||
|
pq.StringArray(filter.Room.State.Senders),
|
||||||
|
pq.StringArray(filter.Room.State.NotSenders),
|
||||||
|
pq.StringArray(append(filter.Room.Rooms, filter.Room.State.Rooms...)),
|
||||||
|
pq.StringArray(append(filter.Room.NotRooms, filter.Room.State.NotRooms...)),
|
||||||
|
filter.Room.State.ContainsURL,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,8 +17,10 @@ package storage
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
|
@ -41,6 +43,12 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
|
||||||
event_id TEXT NOT NULL,
|
event_id TEXT NOT NULL,
|
||||||
-- The 'room_id' key for the event.
|
-- The 'room_id' key for the event.
|
||||||
room_id TEXT NOT NULL,
|
room_id TEXT NOT NULL,
|
||||||
|
-- The 'type' property for the event.
|
||||||
|
type TEXT NOT NULL,
|
||||||
|
-- The 'sender' property for the event.
|
||||||
|
sender TEXT NOT NULL,
|
||||||
|
-- true if the event content contains a url key
|
||||||
|
contains_url BOOL NOT NULL,
|
||||||
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
||||||
event_json TEXT NOT NULL,
|
event_json TEXT NOT NULL,
|
||||||
-- A list of event IDs which represent a delta of added/removed room state. This can be NULL
|
-- A list of event IDs which represent a delta of added/removed room state. This can be NULL
|
||||||
|
|
@ -51,21 +59,32 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
|
||||||
transaction_id TEXT -- The transaction id used to send the event, if any
|
transaction_id TEXT -- The transaction id used to send the event, if any
|
||||||
);
|
);
|
||||||
-- for event selection
|
-- for event selection
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);
|
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(
|
||||||
|
event_id,
|
||||||
|
room_id,
|
||||||
|
type,
|
||||||
|
sender,
|
||||||
|
contains_url);
|
||||||
`
|
`
|
||||||
|
|
||||||
const insertEventSQL = "" +
|
const insertEventSQL = "" +
|
||||||
"INSERT INTO syncapi_output_room_events (" +
|
"INSERT INTO syncapi_output_room_events (" +
|
||||||
" room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" +
|
" room_id, event_id, type, sender, contains_url, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" +
|
||||||
") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id"
|
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id"
|
||||||
|
|
||||||
const selectEventsSQL = "" +
|
const selectEventsSQL = "" +
|
||||||
"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
||||||
|
|
||||||
const selectRecentEventsSQL = "" +
|
const selectRecentEventsSQL = "" +
|
||||||
"SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" +
|
"SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
" WHERE room_id=$1 " +
|
||||||
" ORDER BY id ASC LIMIT $4"
|
" AND id > $2 AND id <= $3" +
|
||||||
|
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||||
|
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||||
|
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||||
|
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||||
|
" AND ( $8::bool IS NULL OR contains_url = $8 )" +
|
||||||
|
" ORDER BY id DESC LIMIT $9"
|
||||||
|
|
||||||
const selectMaxEventIDSQL = "" +
|
const selectMaxEventIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_output_room_events"
|
"SELECT MAX(id) FROM syncapi_output_room_events"
|
||||||
|
|
@ -205,11 +224,20 @@ func (s *outputRoomEventsStatements) insertEvent(
|
||||||
txnID = &transactionID.TransactionID
|
txnID = &transactionID.TransactionID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var containsURL bool
|
||||||
|
var content map[string]interface{}
|
||||||
|
if json.Unmarshal(event.Content(), &content) != nil {
|
||||||
|
_, containsURL = content["url"]
|
||||||
|
}
|
||||||
|
|
||||||
stmt := common.TxStmt(txn, s.insertEventStmt)
|
stmt := common.TxStmt(txn, s.insertEventStmt)
|
||||||
err = stmt.QueryRowContext(
|
err = stmt.QueryRowContext(
|
||||||
ctx,
|
ctx,
|
||||||
event.RoomID(),
|
event.RoomID(),
|
||||||
event.EventID(),
|
event.EventID(),
|
||||||
|
event.Type(),
|
||||||
|
event.Sender(),
|
||||||
|
containsURL,
|
||||||
event.JSON(),
|
event.JSON(),
|
||||||
pq.StringArray(addState),
|
pq.StringArray(addState),
|
||||||
pq.StringArray(removeState),
|
pq.StringArray(removeState),
|
||||||
|
|
@ -219,22 +247,36 @@ func (s *outputRoomEventsStatements) insertEvent(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
|
|
||||||
func (s *outputRoomEventsStatements) selectRecentEvents(
|
func (s *outputRoomEventsStatements) selectRecentEvents(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
roomID string, fromPos, toPos types.StreamPosition, limit int,
|
roomID string, fromPos, toPos types.StreamPosition, timelineFilter *gomatrix.FilterPart,
|
||||||
) ([]streamEvent, error) {
|
) ([]streamEvent, bool, error) {
|
||||||
|
|
||||||
stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
|
stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
|
||||||
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
|
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos,
|
||||||
|
pq.StringArray(timelineFilter.Senders),
|
||||||
|
pq.StringArray(timelineFilter.NotSenders),
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(timelineFilter.Types)),
|
||||||
|
pq.StringArray(filterConvertWildcardToSQL(timelineFilter.NotTypes)),
|
||||||
|
timelineFilter.ContainsURL,
|
||||||
|
timelineFilter.Limit+1, // TODO: limit abusive values? This can also be done in gomatrix.Filter.Validate
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
defer rows.Close() // nolint: errcheck
|
defer rows.Close() // nolint: errcheck
|
||||||
events, err := rowsToStreamEvents(rows)
|
events, err := rowsToStreamEvents(rows)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
return events, nil
|
|
||||||
|
limited := false
|
||||||
|
if len(events) > timelineFilter.Limit {
|
||||||
|
limited = true
|
||||||
|
events = events[:len(events)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
return events, limited, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
|
@ -177,10 +178,11 @@ func (d *SyncServerDatabase) GetStateEvent(
|
||||||
// Returns an empty slice if no state events could be found for this room.
|
// Returns an empty slice if no state events could be found for this room.
|
||||||
// Returns an error if there was an issue with the retrieval.
|
// Returns an error if there was an issue with the retrieval.
|
||||||
func (d *SyncServerDatabase) GetStateEventsForRoom(
|
func (d *SyncServerDatabase) GetStateEventsForRoom(
|
||||||
ctx context.Context, roomID string,
|
ctx context.Context, roomID string, stateFilter *gomatrix.FilterPart,
|
||||||
) (stateEvents []gomatrixserverlib.Event, err error) {
|
) (stateEvents []gomatrixserverlib.Event, err error) {
|
||||||
|
|
||||||
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
|
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
|
|
@ -224,7 +226,7 @@ func (d *SyncServerDatabase) IncrementalSync(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
device authtypes.Device,
|
device authtypes.Device,
|
||||||
fromPos, toPos types.StreamPosition,
|
fromPos, toPos types.StreamPosition,
|
||||||
numRecentEventsPerRoom int,
|
filter *gomatrix.Filter,
|
||||||
) (*types.Response, error) {
|
) (*types.Response, error) {
|
||||||
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
|
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -237,21 +239,23 @@ func (d *SyncServerDatabase) IncrementalSync(
|
||||||
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
|
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
|
||||||
// This works out what the 'state' key should be for each room as well as which membership block
|
// This works out what the 'state' key should be for each room as well as which membership block
|
||||||
// to put the room into.
|
// to put the room into.
|
||||||
deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
|
deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID, &filter.Room.State)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res := types.NewResponse(toPos)
|
res := types.NewResponse(toPos)
|
||||||
for _, delta := range deltas {
|
for _, delta := range deltas {
|
||||||
err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
|
if !isRoomFiltered(delta.roomID, filter, &filter.Room.Timeline) {
|
||||||
if err != nil {
|
err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, filter, res)
|
||||||
return nil, err
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This should be done in getStateDeltas
|
// TODO: This should be done in getStateDeltas
|
||||||
if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
|
if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, filter, res); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -261,7 +265,7 @@ func (d *SyncServerDatabase) IncrementalSync(
|
||||||
|
|
||||||
// CompleteSync a complete /sync API response for the given user.
|
// CompleteSync a complete /sync API response for the given user.
|
||||||
func (d *SyncServerDatabase) CompleteSync(
|
func (d *SyncServerDatabase) CompleteSync(
|
||||||
ctx context.Context, userID string, numRecentEventsPerRoom int,
|
ctx context.Context, userID string, filter *gomatrix.Filter,
|
||||||
) (*types.Response, error) {
|
) (*types.Response, error) {
|
||||||
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
||||||
// a consistent view of the database throughout. This includes extracting the sync stream position.
|
// a consistent view of the database throughout. This includes extracting the sync stream position.
|
||||||
|
|
@ -275,7 +279,8 @@ func (d *SyncServerDatabase) CompleteSync(
|
||||||
defer common.EndTransaction(txn, &succeeded)
|
defer common.EndTransaction(txn, &succeeded)
|
||||||
|
|
||||||
// Get the current stream position which we will base the sync response on.
|
// Get the current stream position which we will base the sync response on.
|
||||||
pos, err := d.syncStreamPositionTx(ctx, txn)
|
posFrom := types.StreamPosition(0)
|
||||||
|
posTo, err := d.syncStreamPositionTx(ctx, txn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -287,39 +292,58 @@ func (d *SyncServerDatabase) CompleteSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build up a /sync response. Add joined rooms.
|
// Build up a /sync response. Add joined rooms.
|
||||||
res := types.NewResponse(pos)
|
res := types.NewResponse(posTo)
|
||||||
for _, roomID := range roomIDs {
|
for _, roomID := range roomIDs {
|
||||||
var stateEvents []gomatrixserverlib.Event
|
|
||||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
|
||||||
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
|
||||||
var recentStreamEvents []streamEvent
|
|
||||||
recentStreamEvents, err = d.events.selectRecentEvents(
|
|
||||||
ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// We don't include a device here as we don't need to send down
|
|
||||||
// transaction IDs for complete syncs
|
|
||||||
recentEvents := streamEventsToEvents(nil, recentStreamEvents)
|
|
||||||
|
|
||||||
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
jr.Timeline.Limited = true
|
//Join response should contain events only if room isn't filtered
|
||||||
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
|
if !isRoomFiltered(roomID, filter, nil) {
|
||||||
|
// Timeline events
|
||||||
|
var recentEvents []gomatrixserverlib.Event
|
||||||
|
if !isRoomFiltered(roomID, nil, &filter.Room.Timeline) {
|
||||||
|
var recentStreamEvents []streamEvent
|
||||||
|
var limited bool
|
||||||
|
recentStreamEvents, limited, err = d.events.selectRecentEvents(
|
||||||
|
ctx, txn, roomID, posFrom, posTo, &filter.Room.Timeline)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
recentEvents = streamEventsToEvents(nil, recentStreamEvents)
|
||||||
|
|
||||||
|
jr.Timeline.Limited = limited
|
||||||
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(
|
||||||
|
recentEvents,
|
||||||
|
gomatrixserverlib.FormatSync)
|
||||||
|
}
|
||||||
|
|
||||||
|
// State events
|
||||||
|
if !isRoomFiltered(roomID, nil, &filter.Room.State) {
|
||||||
|
var stateEvents []gomatrixserverlib.Event
|
||||||
|
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &filter.Room.State)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if recentEvents != nil {
|
||||||
|
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
||||||
|
}
|
||||||
|
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
}
|
||||||
|
//TODO AccountData events
|
||||||
|
//TODO Ephemeral events
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO Handle jr.Timeline.prev_batch
|
||||||
|
|
||||||
res.Rooms.Join[roomID] = *jr
|
res.Rooms.Join[roomID] = *jr
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil {
|
if err = d.addInvitesToResponse(ctx, txn, userID, 0, posTo, filter, res); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO handle res.Room[roomID].Leave
|
||||||
|
|
||||||
succeeded = true
|
succeeded = true
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
@ -340,9 +364,9 @@ var txReadOnlySnapshot = sql.TxOptions{
|
||||||
// If no data is retrieved, returns an empty map
|
// If no data is retrieved, returns an empty map
|
||||||
// If there was an issue with the retrieval, returns an error
|
// If there was an issue with the retrieval, returns an error
|
||||||
func (d *SyncServerDatabase) GetAccountDataInRange(
|
func (d *SyncServerDatabase) GetAccountDataInRange(
|
||||||
ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
|
ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart,
|
||||||
) (map[string][]string, error) {
|
) (map[string][]string, error) {
|
||||||
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos)
|
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpsertAccountData keeps track of new or updated account data, by saving the type
|
// UpsertAccountData keeps track of new or updated account data, by saving the type
|
||||||
|
|
@ -352,9 +376,9 @@ func (d *SyncServerDatabase) GetAccountDataInRange(
|
||||||
// creates a new row, else update the existing one
|
// creates a new row, else update the existing one
|
||||||
// Returns an error if there was an issue with the upsert
|
// Returns an error if there was an issue with the upsert
|
||||||
func (d *SyncServerDatabase) UpsertAccountData(
|
func (d *SyncServerDatabase) UpsertAccountData(
|
||||||
ctx context.Context, userID, roomID, dataType string,
|
ctx context.Context, userID, roomID, dataType, sender string,
|
||||||
) (types.StreamPosition, error) {
|
) (types.StreamPosition, error) {
|
||||||
pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType)
|
pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType, sender)
|
||||||
return types.StreamPosition(pos), err
|
return types.StreamPosition(pos), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -383,10 +407,11 @@ func (d *SyncServerDatabase) addInvitesToResponse(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
userID string,
|
userID string,
|
||||||
fromPos, toPos types.StreamPosition,
|
fromPos, toPos types.StreamPosition,
|
||||||
|
filter *gomatrix.Filter,
|
||||||
res *types.Response,
|
res *types.Response,
|
||||||
) error {
|
) error {
|
||||||
invites, err := d.invites.selectInviteEventsInRange(
|
invites, err := d.invites.selectInviteEventsInRange(
|
||||||
ctx, txn, userID, int64(fromPos), int64(toPos),
|
ctx, txn, userID, int64(fromPos), int64(toPos), filter,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -409,7 +434,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
||||||
txn *sql.Tx,
|
txn *sql.Tx,
|
||||||
fromPos, toPos types.StreamPosition,
|
fromPos, toPos types.StreamPosition,
|
||||||
delta stateDelta,
|
delta stateDelta,
|
||||||
numRecentEventsPerRoom int,
|
filter *gomatrix.Filter,
|
||||||
res *types.Response,
|
res *types.Response,
|
||||||
) error {
|
) error {
|
||||||
endPos := toPos
|
endPos := toPos
|
||||||
|
|
@ -422,8 +447,8 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
||||||
// This is all "okay" assuming history_visibility == "shared" which it is by default.
|
// This is all "okay" assuming history_visibility == "shared" which it is by default.
|
||||||
endPos = delta.membershipPos
|
endPos = delta.membershipPos
|
||||||
}
|
}
|
||||||
recentStreamEvents, err := d.events.selectRecentEvents(
|
recentStreamEvents, limited, err := d.events.selectRecentEvents(
|
||||||
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom,
|
ctx, txn, delta.roomID, fromPos, endPos, &filter.Room.Timeline,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -440,7 +465,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
||||||
case "join":
|
case "join":
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
jr.Timeline.Limited = limited
|
||||||
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Join[delta.roomID] = *jr
|
res.Rooms.Join[delta.roomID] = *jr
|
||||||
case "leave":
|
case "leave":
|
||||||
|
|
@ -450,7 +475,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
||||||
// no longer in the room.
|
// no longer in the room.
|
||||||
lr := types.NewLeaveResponse()
|
lr := types.NewLeaveResponse()
|
||||||
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
lr.Timeline.Limited = limited
|
||||||
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Leave[delta.roomID] = *lr
|
res.Rooms.Leave[delta.roomID] = *lr
|
||||||
}
|
}
|
||||||
|
|
@ -546,7 +571,7 @@ func (d *SyncServerDatabase) fetchMissingStateEvents(
|
||||||
|
|
||||||
func (d *SyncServerDatabase) getStateDeltas(
|
func (d *SyncServerDatabase) getStateDeltas(
|
||||||
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
||||||
fromPos, toPos types.StreamPosition, userID string,
|
fromPos, toPos types.StreamPosition, userID string, stateFilter *gomatrix.FilterPart,
|
||||||
) ([]stateDelta, error) {
|
) ([]stateDelta, error) {
|
||||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||||
// - Get membership list changes for this user in this sync response
|
// - Get membership list changes for this user in this sync response
|
||||||
|
|
@ -579,7 +604,7 @@ func (d *SyncServerDatabase) getStateDeltas(
|
||||||
if membership == "join" {
|
if membership == "join" {
|
||||||
// send full room state down instead of a delta
|
// send full room state down instead of a delta
|
||||||
var allState []gomatrixserverlib.Event
|
var allState []gomatrixserverlib.Event
|
||||||
allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
|
allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -286,8 +287,8 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
|
||||||
timeout: 1 * time.Minute,
|
timeout: 1 * time.Minute,
|
||||||
since: &since,
|
since: &since,
|
||||||
wantFullState: false,
|
wantFullState: false,
|
||||||
limit: defaultTimelineLimit,
|
|
||||||
log: util.GetLogger(context.TODO()),
|
log: util.GetLogger(context.TODO()),
|
||||||
ctx: context.TODO(),
|
ctx: context.TODO(),
|
||||||
|
filter: gomatrix.DefaultFilter(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,11 +16,15 @@ package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
|
@ -28,20 +32,19 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const defaultSyncTimeout = time.Duration(30) * time.Second
|
const defaultSyncTimeout = time.Duration(30) * time.Second
|
||||||
const defaultTimelineLimit = 20
|
|
||||||
|
|
||||||
// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
|
// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
|
||||||
type syncRequest struct {
|
type syncRequest struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
device authtypes.Device
|
device authtypes.Device
|
||||||
limit int
|
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
since *types.StreamPosition // nil means that no since token was supplied
|
since *types.StreamPosition // nil means that no since token was supplied
|
||||||
wantFullState bool
|
wantFullState bool
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
|
filter gomatrix.Filter
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, error) {
|
func newSyncRequest(req *http.Request, device authtypes.Device, accountDB *accounts.Database) (*syncRequest, error) {
|
||||||
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
||||||
fullState := req.URL.Query().Get("full_state")
|
fullState := req.URL.Query().Get("full_state")
|
||||||
wantFullState := fullState != "" && fullState != "false"
|
wantFullState := fullState != "" && fullState != "false"
|
||||||
|
|
@ -49,15 +52,48 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// TODO: Additional query params: set_presence, filter
|
|
||||||
|
filterStr := req.URL.Query().Get("filter")
|
||||||
|
var filter gomatrix.Filter
|
||||||
|
if filterStr != "" {
|
||||||
|
if filterStr[0] == '{' {
|
||||||
|
// Inline filter
|
||||||
|
filter = gomatrix.DefaultFilter()
|
||||||
|
err = json.Unmarshal([]byte(filterStr), &filter)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = filter.Validate()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Filter ID
|
||||||
|
filterID, err := strconv.Atoi(filterStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
recvFilter, err := accountDB.GetFilter(req.Context(), localpart, strconv.Itoa(filterID)) //TODO GetFilter should receive filterID as an int
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
filter = *recvFilter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Additional query params: set_presence
|
||||||
return &syncRequest{
|
return &syncRequest{
|
||||||
ctx: req.Context(),
|
ctx: req.Context(),
|
||||||
device: device,
|
device: device,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
since: since,
|
since: since,
|
||||||
wantFullState: wantFullState,
|
wantFullState: wantFullState,
|
||||||
limit: defaultTimelineLimit, // TODO: read from filter
|
|
||||||
log: util.GetLogger(req.Context()),
|
log: util.GetLogger(req.Context()),
|
||||||
|
filter: filter,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
// Extract values from request
|
// Extract values from request
|
||||||
logger := util.GetLogger(req.Context())
|
logger := util.GetLogger(req.Context())
|
||||||
userID := device.UserID
|
userID := device.UserID
|
||||||
syncReq, err := newSyncRequest(req, *device)
|
syncReq, err := newSyncRequest(req, *device, rp.accountDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 400,
|
Code: 400,
|
||||||
|
|
@ -122,9 +122,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) {
|
||||||
// TODO: handle ignored users
|
// TODO: handle ignored users
|
||||||
if req.since == nil {
|
if req.since == nil {
|
||||||
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
|
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, &req.filter)
|
||||||
} else {
|
} else {
|
||||||
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit)
|
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, &req.filter)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -170,7 +170,7 @@ func (rp *RequestPool) appendAccountData(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync is not initial, get all account data since the latest sync
|
// Sync is not initial, get all account data since the latest sync
|
||||||
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos)
|
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos, &req.filter.AccountData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,8 @@
|
||||||
|
|
||||||
package gomatrix
|
package gomatrix
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
//Filter is used by clients to specify how the server should filter responses to e.g. sync requests
|
//Filter is used by clients to specify how the server should filter responses to e.g. sync requests
|
||||||
//Specified by: https://matrix.org/docs/spec/client_server/r0.2.0.html#filtering
|
//Specified by: https://matrix.org/docs/spec/client_server/r0.2.0.html#filtering
|
||||||
type Filter struct {
|
type Filter struct {
|
||||||
|
|
@ -21,23 +23,62 @@ type Filter struct {
|
||||||
EventFields []string `json:"event_fields,omitempty"`
|
EventFields []string `json:"event_fields,omitempty"`
|
||||||
EventFormat string `json:"event_format,omitempty"`
|
EventFormat string `json:"event_format,omitempty"`
|
||||||
Presence FilterPart `json:"presence,omitempty"`
|
Presence FilterPart `json:"presence,omitempty"`
|
||||||
Room struct {
|
Room FilterRoom `json:"room,omitempty"`
|
||||||
AccountData FilterPart `json:"account_data,omitempty"`
|
}
|
||||||
Ephemeral FilterPart `json:"ephemeral,omitempty"`
|
|
||||||
IncludeLeave bool `json:"include_leave,omitempty"`
|
type FilterRoom struct {
|
||||||
NotRooms []string `json:"not_rooms,omitempty"`
|
AccountData FilterPart `json:"account_data,omitempty"`
|
||||||
Rooms []string `json:"rooms,omitempty"`
|
Ephemeral FilterPart `json:"ephemeral,omitempty"`
|
||||||
State FilterPart `json:"state,omitempty"`
|
IncludeLeave bool `json:"include_leave,omitempty"`
|
||||||
Timeline FilterPart `json:"timeline,omitempty"`
|
NotRooms []string `json:"not_rooms,omitempty"`
|
||||||
} `json:"room,omitempty"`
|
Rooms []string `json:"rooms,omitempty"`
|
||||||
|
State FilterPart `json:"state,omitempty"`
|
||||||
|
Timeline FilterPart `json:"timeline,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type FilterPart struct {
|
type FilterPart struct {
|
||||||
NotRooms []string `json:"not_rooms,omitempty"`
|
NotRooms []string `json:"not_rooms,omitempty"`
|
||||||
Rooms []string `json:"rooms,omitempty"`
|
Rooms []string `json:"rooms,omitempty"`
|
||||||
Limit *int `json:"limit,omitempty"`
|
Limit int `json:"limit,omitempty"`
|
||||||
NotSenders []string `json:"not_senders,omitempty"`
|
NotSenders []string `json:"not_senders,omitempty"`
|
||||||
NotTypes []string `json:"not_types,omitempty"`
|
NotTypes []string `json:"not_types,omitempty"`
|
||||||
Senders []string `json:"senders,omitempty"`
|
Senders []string `json:"senders,omitempty"`
|
||||||
Types []string `json:"types,omitempty"`
|
Types []string `json:"types,omitempty"`
|
||||||
|
ContainsURL *bool `json:"contains_url,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (filter *Filter) Validate() error {
|
||||||
|
if filter.EventFormat != "client" && filter.EventFormat != "federation" {
|
||||||
|
return errors.New("Bad event_format value. Must be any of [\"client\", \"federation\"]")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DefaultFilter() Filter {
|
||||||
|
return Filter{
|
||||||
|
AccountData: DefaultFilterPart(),
|
||||||
|
EventFields: nil,
|
||||||
|
EventFormat: "client",
|
||||||
|
Presence: DefaultFilterPart(),
|
||||||
|
Room: FilterRoom{
|
||||||
|
AccountData: DefaultFilterPart(),
|
||||||
|
Ephemeral: DefaultFilterPart(),
|
||||||
|
IncludeLeave: false,
|
||||||
|
NotRooms: nil,
|
||||||
|
Rooms: nil,
|
||||||
|
State: DefaultFilterPart(),
|
||||||
|
Timeline: DefaultFilterPart(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func DefaultFilterPart() FilterPart {
|
||||||
|
return FilterPart{
|
||||||
|
NotRooms: nil,
|
||||||
|
Rooms: nil,
|
||||||
|
Limit: 20,
|
||||||
|
NotSenders: nil,
|
||||||
|
NotTypes: nil,
|
||||||
|
Senders: nil,
|
||||||
|
Types: nil,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue