mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 11:23:11 -06:00
Merge branch 'master' into user_directory
This commit is contained in:
commit
af591d3848
|
|
@ -100,7 +100,7 @@ func SetupAppServiceAPIComponent(
|
|||
|
||||
// Set up HTTP Endpoints
|
||||
routing.Setup(
|
||||
base.APIMux, *base.Cfg, roomserverQueryAPI, roomserverAliasAPI,
|
||||
base.APIMux, base.Cfg, roomserverQueryAPI, roomserverAliasAPI,
|
||||
accountsDB, federation, transactionsCache,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ const pathPrefixApp = "/_matrix/app/v1"
|
|||
// applied:
|
||||
// nolint: gocyclo
|
||||
func Setup(
|
||||
apiMux *mux.Router, cfg config.Dendrite, // nolint: unparam
|
||||
apiMux *mux.Router, cfg *config.Dendrite, // nolint: unparam
|
||||
queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, // nolint: unparam
|
||||
accountDB *accounts.Database, // nolint: unparam
|
||||
federation *gomatrixserverlib.FederationClient, // nolint: unparam
|
||||
|
|
|
|||
|
|
@ -90,6 +90,7 @@ func (s *accountDataStatements) selectAccountData(
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
global = []gomatrixserverlib.ClientEvent{}
|
||||
rooms = make(map[string][]gomatrixserverlib.ClientEvent)
|
||||
|
|
@ -114,8 +115,7 @@ func (s *accountDataStatements) selectAccountData(
|
|||
global = append(global, ac)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
return global, rooms, rows.Err()
|
||||
}
|
||||
|
||||
func (s *accountDataStatements) selectAccountDataByType(
|
||||
|
|
|
|||
|
|
@ -122,11 +122,10 @@ func (s *membershipStatements) selectMembershipsByLocalpart(
|
|||
for rows.Next() {
|
||||
var m authtypes.Membership
|
||||
m.Localpart = localpart
|
||||
if err := rows.Scan(&m.RoomID, &m.EventID); err != nil {
|
||||
return nil, err
|
||||
if err = rows.Scan(&m.RoomID, &m.EventID); err != nil {
|
||||
return
|
||||
}
|
||||
memberships = append(memberships, m)
|
||||
}
|
||||
|
||||
return
|
||||
return memberships, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -97,6 +97,7 @@ func (s *threepidStatements) selectThreePIDsForLocalpart(
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
threepids = []authtypes.ThreePID{}
|
||||
for rows.Next() {
|
||||
|
|
@ -110,8 +111,7 @@ func (s *threepidStatements) selectThreePIDsForLocalpart(
|
|||
Medium: medium,
|
||||
})
|
||||
}
|
||||
|
||||
return
|
||||
return threepids, rows.Err()
|
||||
}
|
||||
|
||||
func (s *threepidStatements) insertThreePID(
|
||||
|
|
|
|||
|
|
@ -19,10 +19,10 @@ import (
|
|||
"database/sql"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/userutil"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
|
|
@ -80,6 +80,9 @@ const deleteDeviceSQL = "" +
|
|||
const deleteDevicesByLocalpartSQL = "" +
|
||||
"DELETE FROM device_devices WHERE localpart = $1"
|
||||
|
||||
const deleteDevicesSQL = "" +
|
||||
"DELETE FROM device_devices WHERE localpart = $1 AND device_id = ANY($2)"
|
||||
|
||||
type devicesStatements struct {
|
||||
insertDeviceStmt *sql.Stmt
|
||||
selectDeviceByTokenStmt *sql.Stmt
|
||||
|
|
@ -88,6 +91,7 @@ type devicesStatements struct {
|
|||
updateDeviceNameStmt *sql.Stmt
|
||||
deleteDeviceStmt *sql.Stmt
|
||||
deleteDevicesByLocalpartStmt *sql.Stmt
|
||||
deleteDevicesStmt *sql.Stmt
|
||||
serverName gomatrixserverlib.ServerName
|
||||
}
|
||||
|
||||
|
|
@ -117,6 +121,9 @@ func (s *devicesStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerN
|
|||
if s.deleteDevicesByLocalpartStmt, err = db.Prepare(deleteDevicesByLocalpartSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.deleteDevicesStmt, err = db.Prepare(deleteDevicesSQL); err != nil {
|
||||
return
|
||||
}
|
||||
s.serverName = server
|
||||
return
|
||||
}
|
||||
|
|
@ -142,6 +149,7 @@ func (s *devicesStatements) insertDevice(
|
|||
}, nil
|
||||
}
|
||||
|
||||
// deleteDevice removes a single device by id and user localpart.
|
||||
func (s *devicesStatements) deleteDevice(
|
||||
ctx context.Context, txn *sql.Tx, id, localpart string,
|
||||
) error {
|
||||
|
|
@ -150,6 +158,18 @@ func (s *devicesStatements) deleteDevice(
|
|||
return err
|
||||
}
|
||||
|
||||
// deleteDevices removes a single or multiple devices by ids and user localpart.
|
||||
// Returns an error if the execution failed.
|
||||
func (s *devicesStatements) deleteDevices(
|
||||
ctx context.Context, txn *sql.Tx, localpart string, devices []string,
|
||||
) error {
|
||||
stmt := common.TxStmt(txn, s.deleteDevicesStmt)
|
||||
_, err := stmt.ExecContext(ctx, localpart, pq.Array(devices))
|
||||
return err
|
||||
}
|
||||
|
||||
// deleteDevicesByLocalpart removes all devices for the
|
||||
// given user localpart.
|
||||
func (s *devicesStatements) deleteDevicesByLocalpart(
|
||||
ctx context.Context, txn *sql.Tx, localpart string,
|
||||
) error {
|
||||
|
|
@ -206,6 +226,7 @@ func (s *devicesStatements) selectDevicesByLocalpart(
|
|||
if err != nil {
|
||||
return devices, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
for rows.Next() {
|
||||
var dev authtypes.Device
|
||||
|
|
@ -217,5 +238,5 @@ func (s *devicesStatements) selectDevicesByLocalpart(
|
|||
devices = append(devices, dev)
|
||||
}
|
||||
|
||||
return devices, nil
|
||||
return devices, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -152,6 +152,21 @@ func (d *Database) RemoveDevice(
|
|||
})
|
||||
}
|
||||
|
||||
// RemoveDevices revokes one or more devices by deleting the entry in the database
|
||||
// matching with the given device IDs and user ID localpart.
|
||||
// If the devices don't exist, it will not return an error
|
||||
// If something went wrong during the deletion, it will return the SQL error.
|
||||
func (d *Database) RemoveDevices(
|
||||
ctx context.Context, localpart string, devices []string,
|
||||
) error {
|
||||
return common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||
if err := d.devices.deleteDevices(ctx, txn, localpart, devices); err != sql.ErrNoRows {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// RemoveAllDevices revokes devices by deleting the entry in the
|
||||
// database matching the given user ID localpart.
|
||||
// If something went wrong during the deletion, it will return the SQL error.
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ func SetupClientAPIComponent(
|
|||
}
|
||||
|
||||
routing.Setup(
|
||||
base.APIMux, *base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI,
|
||||
base.APIMux, base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI,
|
||||
accountsDB, deviceDB, federation, *keyRing, userUpdateProducer,
|
||||
syncProducer, typingProducer, transactionsCache, fedSenderAPI,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ func serveTemplate(w http.ResponseWriter, templateHTML string, data map[string]s
|
|||
// AuthFallback implements GET and POST /auth/{authType}/fallback/web?session={sessionID}
|
||||
func AuthFallback(
|
||||
w http.ResponseWriter, req *http.Request, authType string,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
) *util.JSONResponse {
|
||||
sessionID := req.URL.Query().Get("session")
|
||||
|
||||
|
|
@ -130,7 +130,7 @@ func AuthFallback(
|
|||
if req.Method == http.MethodGet {
|
||||
// Handle Recaptcha
|
||||
if authType == authtypes.LoginTypeRecaptcha {
|
||||
if err := checkRecaptchaEnabled(&cfg, w, req); err != nil {
|
||||
if err := checkRecaptchaEnabled(cfg, w, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -144,7 +144,7 @@ func AuthFallback(
|
|||
} else if req.Method == http.MethodPost {
|
||||
// Handle Recaptcha
|
||||
if authType == authtypes.LoginTypeRecaptcha {
|
||||
if err := checkRecaptchaEnabled(&cfg, w, req); err != nil {
|
||||
if err := checkRecaptchaEnabled(cfg, w, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -156,7 +156,7 @@ func AuthFallback(
|
|||
}
|
||||
|
||||
response := req.Form.Get("g-recaptcha-response")
|
||||
if err := validateRecaptcha(&cfg, response, clientIP); err != nil {
|
||||
if err := validateRecaptcha(cfg, response, clientIP); err != nil {
|
||||
util.GetLogger(req.Context()).Error(err)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -134,7 +134,7 @@ type fledglingEvent struct {
|
|||
// CreateRoom implements /createRoom
|
||||
func CreateRoom(
|
||||
req *http.Request, device *authtypes.Device,
|
||||
cfg config.Dendrite, producer *producers.RoomserverProducer,
|
||||
cfg *config.Dendrite, producer *producers.RoomserverProducer,
|
||||
accountDB *accounts.Database, aliasAPI roomserverAPI.RoomserverAliasAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
) util.JSONResponse {
|
||||
|
|
@ -148,7 +148,7 @@ func CreateRoom(
|
|||
// nolint: gocyclo
|
||||
func createRoom(
|
||||
req *http.Request, device *authtypes.Device,
|
||||
cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer,
|
||||
cfg *config.Dendrite, roomID string, producer *producers.RoomserverProducer,
|
||||
accountDB *accounts.Database, aliasAPI roomserverAPI.RoomserverAliasAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
) util.JSONResponse {
|
||||
|
|
@ -344,7 +344,7 @@ func createRoom(
|
|||
func buildEvent(
|
||||
builder *gomatrixserverlib.EventBuilder,
|
||||
provider gomatrixserverlib.AuthEventProvider,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
evTime time.Time,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
|
||||
|
|
|
|||
|
|
@ -40,6 +40,10 @@ type deviceUpdateJSON struct {
|
|||
DisplayName *string `json:"display_name"`
|
||||
}
|
||||
|
||||
type devicesDeleteJSON struct {
|
||||
Devices []string `json:"devices"`
|
||||
}
|
||||
|
||||
// GetDeviceByID handles /devices/{deviceID}
|
||||
func GetDeviceByID(
|
||||
req *http.Request, deviceDB *devices.Database, device *authtypes.Device,
|
||||
|
|
@ -146,3 +150,54 @@ func UpdateDeviceByID(
|
|||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteDeviceById handles DELETE requests to /devices/{deviceId}
|
||||
func DeleteDeviceById(
|
||||
req *http.Request, deviceDB *devices.Database, device *authtypes.Device,
|
||||
deviceID string,
|
||||
) util.JSONResponse {
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
ctx := req.Context()
|
||||
|
||||
defer req.Body.Close() // nolint: errcheck
|
||||
|
||||
if err := deviceDB.RemoveDevice(ctx, deviceID, localpart); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteDevices handles POST requests to /delete_devices
|
||||
func DeleteDevices(
|
||||
req *http.Request, deviceDB *devices.Database, device *authtypes.Device,
|
||||
) util.JSONResponse {
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
ctx := req.Context()
|
||||
payload := devicesDeleteJSON{}
|
||||
|
||||
if err := json.NewDecoder(req.Body).Decode(&payload); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
defer req.Body.Close() // nolint: errcheck
|
||||
|
||||
if err := deviceDB.RemoveDevices(ctx, localpart, payload.Devices); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ type getEventRequest struct {
|
|||
device *authtypes.Device
|
||||
roomID string
|
||||
eventID string
|
||||
cfg config.Dendrite
|
||||
cfg *config.Dendrite
|
||||
federation *gomatrixserverlib.FederationClient
|
||||
keyRing gomatrixserverlib.KeyRing
|
||||
requestedEvent gomatrixserverlib.Event
|
||||
|
|
@ -44,7 +44,7 @@ func GetEvent(
|
|||
device *authtypes.Device,
|
||||
roomID string,
|
||||
eventID string,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
keyRing gomatrixserverlib.KeyRing,
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ func JoinRoomByIDOrAlias(
|
|||
req *http.Request,
|
||||
device *authtypes.Device,
|
||||
roomIDOrAlias string,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
producer *producers.RoomserverProducer,
|
||||
queryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
|
|
@ -98,7 +98,7 @@ type joinRoomReq struct {
|
|||
evTime time.Time
|
||||
content map[string]interface{}
|
||||
userID string
|
||||
cfg config.Dendrite
|
||||
cfg *config.Dendrite
|
||||
federation *gomatrixserverlib.FederationClient
|
||||
producer *producers.RoomserverProducer
|
||||
queryAPI roomserverAPI.RoomserverQueryAPI
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ func passwordLogin() loginFlows {
|
|||
// Login implements GET and POST /login
|
||||
func Login(
|
||||
req *http.Request, accountDB *accounts.Database, deviceDB *devices.Database,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
) util.JSONResponse {
|
||||
if req.Method == http.MethodGet { // TODO: support other forms of login other than password, depending on config options
|
||||
return util.JSONResponse{
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ var errMissingUserID = errors.New("'user_id' must be supplied")
|
|||
// by building a m.room.member event then sending it to the room server
|
||||
func SendMembership(
|
||||
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
|
||||
roomID string, membership string, cfg config.Dendrite,
|
||||
roomID string, membership string, cfg *config.Dendrite,
|
||||
queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
producer *producers.RoomserverProducer,
|
||||
) util.JSONResponse {
|
||||
|
|
@ -119,7 +119,7 @@ func buildMembershipEvent(
|
|||
body threepid.MembershipRequest, accountDB *accounts.Database,
|
||||
device *authtypes.Device,
|
||||
membership, roomID string,
|
||||
cfg config.Dendrite, evTime time.Time,
|
||||
cfg *config.Dendrite, evTime time.Time,
|
||||
queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
stateKey, reason, err := getMembershipStateKey(body, device, membership)
|
||||
|
|
@ -165,7 +165,7 @@ func buildMembershipEvent(
|
|||
func loadProfile(
|
||||
ctx context.Context,
|
||||
userID string,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
accountDB *accounts.Database,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
) (*authtypes.Profile, error) {
|
||||
|
|
@ -214,7 +214,7 @@ func checkAndProcessThreepid(
|
|||
req *http.Request,
|
||||
device *authtypes.Device,
|
||||
body *threepid.MembershipRequest,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
queryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
accountDB *accounts.Database,
|
||||
producer *producers.RoomserverProducer,
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ type response struct {
|
|||
// GetMemberships implements GET /rooms/{roomId}/members
|
||||
func GetMemberships(
|
||||
req *http.Request, device *authtypes.Device, roomID string, joinedOnly bool,
|
||||
_ config.Dendrite,
|
||||
_ *config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
) util.JSONResponse {
|
||||
queryReq := api.QueryMembershipsForRoomRequest{
|
||||
|
|
|
|||
|
|
@ -343,7 +343,7 @@ func buildMembershipEvents(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
event, err := common.BuildEvent(ctx, &builder, *cfg, evTime, queryAPI, nil)
|
||||
event, err := common.BuildEvent(ctx, &builder, cfg, evTime, queryAPI, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -990,7 +990,7 @@ type availableResponse struct {
|
|||
// RegisterAvailable checks if the username is already taken or invalid.
|
||||
func RegisterAvailable(
|
||||
req *http.Request,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
accountDB *accounts.Database,
|
||||
) util.JSONResponse {
|
||||
username := req.URL.Query().Get("username")
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ const pathPrefixUnstable = "/_matrix/client/unstable"
|
|||
// applied:
|
||||
// nolint: gocyclo
|
||||
func Setup(
|
||||
apiMux *mux.Router, cfg config.Dendrite,
|
||||
apiMux *mux.Router, cfg *config.Dendrite,
|
||||
producer *producers.RoomserverProducer,
|
||||
queryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
aliasAPI roomserverAPI.RoomserverAliasAPI,
|
||||
|
|
@ -170,11 +170,11 @@ func Setup(
|
|||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
|
||||
r0mux.Handle("/register", common.MakeExternalAPI("register", func(req *http.Request) util.JSONResponse {
|
||||
return Register(req, accountDB, deviceDB, &cfg)
|
||||
return Register(req, accountDB, deviceDB, cfg)
|
||||
})).Methods(http.MethodPost, http.MethodOptions)
|
||||
|
||||
v1mux.Handle("/register", common.MakeExternalAPI("register", func(req *http.Request) util.JSONResponse {
|
||||
return LegacyRegister(req, accountDB, deviceDB, &cfg)
|
||||
return LegacyRegister(req, accountDB, deviceDB, cfg)
|
||||
})).Methods(http.MethodPost, http.MethodOptions)
|
||||
|
||||
r0mux.Handle("/register/available", common.MakeExternalAPI("registerAvailable", func(req *http.Request) util.JSONResponse {
|
||||
|
|
@ -187,7 +187,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return DirectoryRoom(req, vars["roomAlias"], federation, &cfg, aliasAPI, federationSender)
|
||||
return DirectoryRoom(req, vars["roomAlias"], federation, cfg, aliasAPI, federationSender)
|
||||
}),
|
||||
).Methods(http.MethodGet, http.MethodOptions)
|
||||
|
||||
|
|
@ -197,7 +197,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SetLocalAlias(req, device, vars["roomAlias"], &cfg, aliasAPI)
|
||||
return SetLocalAlias(req, device, vars["roomAlias"], cfg, aliasAPI)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
|
||||
|
|
@ -301,7 +301,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return GetProfile(req, accountDB, &cfg, vars["userID"], asAPI, federation)
|
||||
return GetProfile(req, accountDB, cfg, vars["userID"], asAPI, federation)
|
||||
}),
|
||||
).Methods(http.MethodGet, http.MethodOptions)
|
||||
|
||||
|
|
@ -311,7 +311,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return GetAvatarURL(req, accountDB, &cfg, vars["userID"], asAPI, federation)
|
||||
return GetAvatarURL(req, accountDB, cfg, vars["userID"], asAPI, federation)
|
||||
}),
|
||||
).Methods(http.MethodGet, http.MethodOptions)
|
||||
|
||||
|
|
@ -321,7 +321,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SetAvatarURL(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI)
|
||||
return SetAvatarURL(req, accountDB, device, vars["userID"], userUpdateProducer, cfg, producer, queryAPI)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
|
|
@ -333,7 +333,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return GetDisplayName(req, accountDB, &cfg, vars["userID"], asAPI, federation)
|
||||
return GetDisplayName(req, accountDB, cfg, vars["userID"], asAPI, federation)
|
||||
}),
|
||||
).Methods(http.MethodGet, http.MethodOptions)
|
||||
|
||||
|
|
@ -343,7 +343,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SetDisplayName(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI)
|
||||
return SetDisplayName(req, accountDB, device, vars["userID"], userUpdateProducer, cfg, producer, queryAPI)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
|
|
@ -503,6 +503,22 @@ func Setup(
|
|||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
|
||||
r0mux.Handle("/devices/{deviceID}",
|
||||
common.MakeAuthAPI("delete_device", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars, err := common.URLDecodeMapValues(mux.Vars(req))
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return DeleteDeviceById(req, deviceDB, device, vars["deviceID"])
|
||||
}),
|
||||
).Methods(http.MethodDelete, http.MethodOptions)
|
||||
|
||||
r0mux.Handle("/delete_devices",
|
||||
common.MakeAuthAPI("delete_devices", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
return DeleteDevices(req, deviceDB, device)
|
||||
}),
|
||||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
|
||||
// Stub implementations for sytest
|
||||
r0mux.Handle("/events",
|
||||
common.MakeExternalAPI("events", func(req *http.Request) util.JSONResponse {
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ func SendEvent(
|
|||
req *http.Request,
|
||||
device *authtypes.Device,
|
||||
roomID, eventType string, txnID, stateKey *string,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
producer *producers.RoomserverProducer,
|
||||
txnCache *transactions.Cache,
|
||||
|
|
@ -93,7 +93,7 @@ func generateSendEvent(
|
|||
req *http.Request,
|
||||
device *authtypes.Device,
|
||||
roomID, eventType string, stateKey *string,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
) (*gomatrixserverlib.Event, *util.JSONResponse) {
|
||||
// parse the incoming http request
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ type threePIDsResponse struct {
|
|||
// RequestEmailToken implements:
|
||||
// POST /account/3pid/email/requestToken
|
||||
// POST /register/email/requestToken
|
||||
func RequestEmailToken(req *http.Request, accountDB *accounts.Database, cfg config.Dendrite) util.JSONResponse {
|
||||
func RequestEmailToken(req *http.Request, accountDB *accounts.Database, cfg *config.Dendrite) util.JSONResponse {
|
||||
var body threepid.EmailAssociationRequest
|
||||
if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil {
|
||||
return *reqErr
|
||||
|
|
@ -83,7 +83,7 @@ func RequestEmailToken(req *http.Request, accountDB *accounts.Database, cfg conf
|
|||
// CheckAndSave3PIDAssociation implements POST /account/3pid
|
||||
func CheckAndSave3PIDAssociation(
|
||||
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
) util.JSONResponse {
|
||||
var body threepid.EmailAssociationCheckRequest
|
||||
if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil {
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ import (
|
|||
|
||||
// RequestTurnServer implements:
|
||||
// GET /voip/turnServer
|
||||
func RequestTurnServer(req *http.Request, device *authtypes.Device, cfg config.Dendrite) util.JSONResponse {
|
||||
func RequestTurnServer(req *http.Request, device *authtypes.Device, cfg *config.Dendrite) util.JSONResponse {
|
||||
turnConfig := cfg.TURN
|
||||
|
||||
// TODO Guest Support
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ var (
|
|||
// can be emitted.
|
||||
func CheckAndProcessInvite(
|
||||
ctx context.Context,
|
||||
device *authtypes.Device, body *MembershipRequest, cfg config.Dendrite,
|
||||
device *authtypes.Device, body *MembershipRequest, cfg *config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI, db *accounts.Database,
|
||||
producer *producers.RoomserverProducer, membership string, roomID string,
|
||||
evTime time.Time,
|
||||
|
|
@ -137,7 +137,7 @@ func CheckAndProcessInvite(
|
|||
// Returns an error if a check or a request failed.
|
||||
func queryIDServer(
|
||||
ctx context.Context,
|
||||
db *accounts.Database, cfg config.Dendrite, device *authtypes.Device,
|
||||
db *accounts.Database, cfg *config.Dendrite, device *authtypes.Device,
|
||||
body *MembershipRequest, roomID string,
|
||||
) (lookupRes *idServerLookupResponse, storeInviteRes *idServerStoreInviteResponse, err error) {
|
||||
if err = isTrusted(body.IDServer, cfg); err != nil {
|
||||
|
|
@ -206,7 +206,7 @@ func queryIDServerLookup(ctx context.Context, body *MembershipRequest) (*idServe
|
|||
// Returns an error if the request failed to send or if the response couldn't be parsed.
|
||||
func queryIDServerStoreInvite(
|
||||
ctx context.Context,
|
||||
db *accounts.Database, cfg config.Dendrite, device *authtypes.Device,
|
||||
db *accounts.Database, cfg *config.Dendrite, device *authtypes.Device,
|
||||
body *MembershipRequest, roomID string,
|
||||
) (*idServerStoreInviteResponse, error) {
|
||||
// Retrieve the sender's profile to get their display name
|
||||
|
|
@ -330,7 +330,7 @@ func checkIDServerSignatures(
|
|||
func emit3PIDInviteEvent(
|
||||
ctx context.Context,
|
||||
body *MembershipRequest, res *idServerStoreInviteResponse,
|
||||
device *authtypes.Device, roomID string, cfg config.Dendrite,
|
||||
device *authtypes.Device, roomID string, cfg *config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer,
|
||||
evTime time.Time,
|
||||
) error {
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ type Credentials struct {
|
|||
// Returns an error if there was a problem sending the request or decoding the
|
||||
// response, or if the identity server responded with a non-OK status.
|
||||
func CreateSession(
|
||||
ctx context.Context, req EmailAssociationRequest, cfg config.Dendrite,
|
||||
ctx context.Context, req EmailAssociationRequest, cfg *config.Dendrite,
|
||||
) (string, error) {
|
||||
if err := isTrusted(req.IDServer, cfg); err != nil {
|
||||
return "", err
|
||||
|
|
@ -101,7 +101,7 @@ func CreateSession(
|
|||
// Returns an error if there was a problem sending the request or decoding the
|
||||
// response, or if the identity server responded with a non-OK status.
|
||||
func CheckAssociation(
|
||||
ctx context.Context, creds Credentials, cfg config.Dendrite,
|
||||
ctx context.Context, creds Credentials, cfg *config.Dendrite,
|
||||
) (bool, string, string, error) {
|
||||
if err := isTrusted(creds.IDServer, cfg); err != nil {
|
||||
return false, "", "", err
|
||||
|
|
@ -142,7 +142,7 @@ func CheckAssociation(
|
|||
// identifier and a Matrix ID.
|
||||
// Returns an error if there was a problem sending the request or decoding the
|
||||
// response, or if the identity server responded with a non-OK status.
|
||||
func PublishAssociation(creds Credentials, userID string, cfg config.Dendrite) error {
|
||||
func PublishAssociation(creds Credentials, userID string, cfg *config.Dendrite) error {
|
||||
if err := isTrusted(creds.IDServer, cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -177,7 +177,7 @@ func PublishAssociation(creds Credentials, userID string, cfg config.Dendrite) e
|
|||
// isTrusted checks if a given identity server is part of the list of trusted
|
||||
// identity servers in the configuration file.
|
||||
// Returns an error if the server isn't trusted.
|
||||
func isTrusted(idServer string, cfg config.Dendrite) error {
|
||||
func isTrusted(idServer string, cfg *config.Dendrite) error {
|
||||
for _, server := range cfg.Matrix.TrustedIDServers {
|
||||
if idServer == server {
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ var ErrRoomNoExists = errors.New("Room does not exist")
|
|||
// Returns an error if something else went wrong
|
||||
func BuildEvent(
|
||||
ctx context.Context,
|
||||
builder *gomatrixserverlib.EventBuilder, cfg config.Dendrite, evTime time.Time,
|
||||
builder *gomatrixserverlib.EventBuilder, cfg *config.Dendrite, evTime time.Time,
|
||||
queryAPI api.RoomserverQueryAPI, queryRes *api.QueryLatestEventsAndStateResponse,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
err := AddPrevEventsToEvent(ctx, builder, queryAPI, queryRes)
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ func (s *serverKeyStatements) bulkSelectServerKeys(
|
|||
ExpiredTS: gomatrixserverlib.Timestamp(expiredTS),
|
||||
}
|
||||
}
|
||||
return results, nil
|
||||
return results, rows.Err()
|
||||
}
|
||||
|
||||
func (s *serverKeyStatements) upsertServerKeys(
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ func (s *PartitionOffsetStatements) selectPartitionOffsets(
|
|||
}
|
||||
results = append(results, offset)
|
||||
}
|
||||
return results, nil
|
||||
return results, rows.Err()
|
||||
}
|
||||
|
||||
// UpsertPartitionOffset updates or inserts the partition offset for the given topic.
|
||||
|
|
|
|||
|
|
@ -111,6 +111,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
|
|||
// Bind to the same address as the listen address
|
||||
// All microservices are run on the same host in testing
|
||||
cfg.Bind.ClientAPI = cfg.Listen.ClientAPI
|
||||
cfg.Bind.AppServiceAPI = cfg.Listen.AppServiceAPI
|
||||
cfg.Bind.FederationAPI = cfg.Listen.FederationAPI
|
||||
cfg.Bind.MediaAPI = cfg.Listen.MediaAPI
|
||||
cfg.Bind.RoomServer = cfg.Listen.RoomServer
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ args:
|
|||
user: dendrite
|
||||
database: dendrite
|
||||
host: 127.0.0.1
|
||||
sslmode: disable
|
||||
type: pg
|
||||
EOF
|
||||
```
|
||||
|
|
|
|||
|
|
@ -45,8 +45,8 @@ func SetupFederationAPIComponent(
|
|||
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
|
||||
|
||||
routing.Setup(
|
||||
base.APIMux, *base.Cfg, queryAPI, aliasAPI, asAPI,
|
||||
roomserverProducer, federationSenderAPI, *keyRing, federation, accountsDB,
|
||||
deviceDB,
|
||||
base.APIMux, base.Cfg, queryAPI, aliasAPI, asAPI,
|
||||
roomserverProducer, federationSenderAPI, *keyRing,
|
||||
federation, accountsDB, deviceDB,
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ func Backfill(
|
|||
request *gomatrixserverlib.FederationRequest,
|
||||
query api.RoomserverQueryAPI,
|
||||
roomID string,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
) util.JSONResponse {
|
||||
var res api.QueryBackfillResponse
|
||||
var eIDs []string
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ func Invite(
|
|||
request *gomatrixserverlib.FederationRequest,
|
||||
roomID string,
|
||||
eventID string,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
producer *producers.RoomserverProducer,
|
||||
keys gomatrixserverlib.KeyRing,
|
||||
) util.JSONResponse {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ import (
|
|||
func MakeJoin(
|
||||
httpReq *http.Request,
|
||||
request *gomatrixserverlib.FederationRequest,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
query api.RoomserverQueryAPI,
|
||||
roomID, userID string,
|
||||
) util.JSONResponse {
|
||||
|
|
@ -97,7 +97,7 @@ func MakeJoin(
|
|||
func SendJoin(
|
||||
httpReq *http.Request,
|
||||
request *gomatrixserverlib.FederationRequest,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
query api.RoomserverQueryAPI,
|
||||
producer *producers.RoomserverProducer,
|
||||
keys gomatrixserverlib.KeyRing,
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ import (
|
|||
|
||||
// LocalKeys returns the local keys for the server.
|
||||
// See https://matrix.org/docs/spec/server_server/unstable.html#publishing-keys
|
||||
func LocalKeys(cfg config.Dendrite) util.JSONResponse {
|
||||
func LocalKeys(cfg *config.Dendrite) util.JSONResponse {
|
||||
keys, err := localKeys(cfg, time.Now().Add(cfg.Matrix.KeyValidityPeriod))
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
|
|
@ -35,7 +35,7 @@ func LocalKeys(cfg config.Dendrite) util.JSONResponse {
|
|||
return util.JSONResponse{Code: http.StatusOK, JSON: keys}
|
||||
}
|
||||
|
||||
func localKeys(cfg config.Dendrite, validUntil time.Time) (*gomatrixserverlib.ServerKeys, error) {
|
||||
func localKeys(cfg *config.Dendrite, validUntil time.Time) (*gomatrixserverlib.ServerKeys, error) {
|
||||
var keys gomatrixserverlib.ServerKeys
|
||||
|
||||
keys.ServerName = cfg.Matrix.ServerName
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ import (
|
|||
func MakeLeave(
|
||||
httpReq *http.Request,
|
||||
request *gomatrixserverlib.FederationRequest,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
query api.RoomserverQueryAPI,
|
||||
roomID, userID string,
|
||||
) util.JSONResponse {
|
||||
|
|
@ -95,7 +95,7 @@ func MakeLeave(
|
|||
func SendLeave(
|
||||
httpReq *http.Request,
|
||||
request *gomatrixserverlib.FederationRequest,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
producer *producers.RoomserverProducer,
|
||||
keys gomatrixserverlib.KeyRing,
|
||||
roomID, eventID string,
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ import (
|
|||
func GetProfile(
|
||||
httpReq *http.Request,
|
||||
accountDB *accounts.Database,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
) util.JSONResponse {
|
||||
userID, field := httpReq.FormValue("user_id"), httpReq.FormValue("field")
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ import (
|
|||
func RoomAliasToID(
|
||||
httpReq *http.Request,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
aliasAPI roomserverAPI.RoomserverAliasAPI,
|
||||
senderAPI federationSenderAPI.FederationSenderQueryAPI,
|
||||
) util.JSONResponse {
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ const (
|
|||
// nolint: gocyclo
|
||||
func Setup(
|
||||
apiMux *mux.Router,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
query roomserverAPI.RoomserverQueryAPI,
|
||||
aliasAPI roomserverAPI.RoomserverAliasAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ func Send(
|
|||
httpReq *http.Request,
|
||||
request *gomatrixserverlib.FederationRequest,
|
||||
txnID gomatrixserverlib.TransactionID,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
query api.RoomserverQueryAPI,
|
||||
producer *producers.RoomserverProducer,
|
||||
keys gomatrixserverlib.KeyRing,
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ var (
|
|||
// CreateInvitesFrom3PIDInvites implements POST /_matrix/federation/v1/3pid/onbind
|
||||
func CreateInvitesFrom3PIDInvites(
|
||||
req *http.Request, queryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI, cfg config.Dendrite,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI, cfg *config.Dendrite,
|
||||
producer *producers.RoomserverProducer, federation *gomatrixserverlib.FederationClient,
|
||||
accountDB *accounts.Database,
|
||||
) util.JSONResponse {
|
||||
|
|
@ -98,7 +98,7 @@ func ExchangeThirdPartyInvite(
|
|||
request *gomatrixserverlib.FederationRequest,
|
||||
roomID string,
|
||||
queryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
producer *producers.RoomserverProducer,
|
||||
) util.JSONResponse {
|
||||
|
|
@ -172,7 +172,7 @@ func ExchangeThirdPartyInvite(
|
|||
// necessary data to do so.
|
||||
func createInviteFrom3PIDInvite(
|
||||
ctx context.Context, queryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI, cfg config.Dendrite,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI, cfg *config.Dendrite,
|
||||
inv invite, federation *gomatrixserverlib.FederationClient,
|
||||
accountDB *accounts.Database,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
|
|
@ -230,7 +230,7 @@ func createInviteFrom3PIDInvite(
|
|||
func buildMembershipEvent(
|
||||
ctx context.Context,
|
||||
builder *gomatrixserverlib.EventBuilder, queryAPI roomserverAPI.RoomserverQueryAPI,
|
||||
cfg config.Dendrite,
|
||||
cfg *config.Dendrite,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
|
||||
if err != nil {
|
||||
|
|
@ -290,7 +290,7 @@ func buildMembershipEvent(
|
|||
// them responded with an error.
|
||||
func sendToRemoteServer(
|
||||
ctx context.Context, inv invite,
|
||||
federation *gomatrixserverlib.FederationClient, _ config.Dendrite,
|
||||
federation *gomatrixserverlib.FederationClient, _ *config.Dendrite,
|
||||
builder gomatrixserverlib.EventBuilder,
|
||||
) (err error) {
|
||||
remoteServers := make([]gomatrixserverlib.ServerName, 2)
|
||||
|
|
|
|||
|
|
@ -132,5 +132,5 @@ func joinedHostsFromStmt(
|
|||
})
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -18,8 +18,6 @@ require (
|
|||
github.com/matrix-org/naffka v0.0.0-20171115094957-662bfd0841d0
|
||||
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5
|
||||
github.com/miekg/dns v1.1.12 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5
|
||||
github.com/opentracing/opentracing-go v1.0.2
|
||||
github.com/pierrec/lz4 v0.0.0-20161206202305-5c9560bfa9ac // indirect
|
||||
|
|
|
|||
1
go.sum
1
go.sum
|
|
@ -183,6 +183,7 @@ gopkg.in/h2non/bimg.v1 v1.0.18 h1:qn6/RpBHt+7WQqoBcK+aF2puc6nC78eZj5LexxoalT4=
|
|||
gopkg.in/h2non/bimg.v1 v1.0.18/go.mod h1:PgsZL7dLwUbsGm1NYps320GxGgvQNTnecMCZqxV11So=
|
||||
gopkg.in/h2non/gock.v1 v1.0.14 h1:fTeu9fcUvSnLNacYvYI54h+1/XEteDyHvrVCZEEEYNM=
|
||||
gopkg.in/h2non/gock.v1 v1.0.14/go.mod h1:sX4zAkdYX1TRGJ2JY156cFspQn4yRWn6p9EMdODlynE=
|
||||
gopkg.in/macaroon.v2 v2.1.0 h1:HZcsjBCzq9t0eBPMKqTN/uSN6JOm78ZJ2INbqcBQOUI=
|
||||
gopkg.in/macaroon.v2 v2.1.0/go.mod h1:OUb+TQP/OP0WOerC2Jp/3CwhIKyIa9kQjuc7H24e6/o=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
|
|
|
|||
|
|
@ -144,6 +144,7 @@ func (s *thumbnailStatements) selectThumbnails(
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
var thumbnails []*types.ThumbnailMetadata
|
||||
for rows.Next() {
|
||||
|
|
@ -167,5 +168,5 @@ func (s *thumbnailStatements) selectThumbnails(
|
|||
thumbnails = append(thumbnails, &thumbnailMetadata)
|
||||
}
|
||||
|
||||
return thumbnails, err
|
||||
return thumbnails, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -203,6 +203,7 @@ func (s *publicRoomsStatements) selectPublicRooms(
|
|||
if err != nil {
|
||||
return []types.PublicRoom{}, nil
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
rooms := []types.PublicRoom{}
|
||||
for rows.Next() {
|
||||
|
|
@ -222,7 +223,7 @@ func (s *publicRoomsStatements) selectPublicRooms(
|
|||
rooms = append(rooms, r)
|
||||
}
|
||||
|
||||
return rooms, nil
|
||||
return rooms, rows.Err()
|
||||
}
|
||||
|
||||
func (s *publicRoomsStatements) selectRoomVisibility(
|
||||
|
|
|
|||
|
|
@ -102,5 +102,5 @@ func (s *eventJSONStatements) bulkSelectEventJSON(
|
|||
}
|
||||
result.EventNID = types.EventNID(eventNID)
|
||||
}
|
||||
return results[:i], nil
|
||||
return results[:i], rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -125,7 +125,7 @@ func (s *eventStateKeyStatements) bulkSelectEventStateKeyNID(
|
|||
}
|
||||
result[stateKey] = types.EventStateKeyNID(stateKeyNID)
|
||||
}
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *eventStateKeyStatements) bulkSelectEventStateKey(
|
||||
|
|
@ -150,5 +150,5 @@ func (s *eventStateKeyStatements) bulkSelectEventStateKey(
|
|||
}
|
||||
result[types.EventStateKeyNID(stateKeyNID)] = stateKey
|
||||
}
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -143,5 +143,5 @@ func (s *eventTypeStatements) bulkSelectEventTypeNID(
|
|||
}
|
||||
result[eventType] = types.EventTypeNID(eventTypeNID)
|
||||
}
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -209,6 +209,9 @@ func (s *eventStatements) bulkSelectStateEventByID(
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i != len(eventIDs) {
|
||||
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
|
||||
// We don't know which ones were missing because we don't return the string IDs in the query.
|
||||
|
|
@ -219,7 +222,7 @@ func (s *eventStatements) bulkSelectStateEventByID(
|
|||
fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)),
|
||||
)
|
||||
}
|
||||
return results, err
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
||||
|
|
@ -251,12 +254,15 @@ func (s *eventStatements) bulkSelectStateAtEventByID(
|
|||
)
|
||||
}
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i != len(eventIDs) {
|
||||
return nil, types.MissingEventError(
|
||||
fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)),
|
||||
)
|
||||
}
|
||||
return results, err
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (s *eventStatements) updateEventState(
|
||||
|
|
@ -321,6 +327,9 @@ func (s *eventStatements) bulkSelectStateAtEventAndReference(
|
|||
result.EventID = eventID
|
||||
result.EventSHA256 = eventSHA256
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i != len(eventNIDs) {
|
||||
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
|
||||
}
|
||||
|
|
@ -343,6 +352,9 @@ func (s *eventStatements) bulkSelectEventReference(
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i != len(eventNIDs) {
|
||||
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
|
||||
}
|
||||
|
|
@ -366,6 +378,9 @@ func (s *eventStatements) bulkSelectEventID(ctx context.Context, eventNIDs []typ
|
|||
}
|
||||
results[types.EventNID(eventNID)] = eventID
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i != len(eventNIDs) {
|
||||
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
|
||||
}
|
||||
|
|
@ -389,7 +404,7 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, eventIDs []str
|
|||
}
|
||||
results[eventID] = types.EventNID(eventNID)
|
||||
}
|
||||
return results, nil
|
||||
return results, rows.Err()
|
||||
}
|
||||
|
||||
func (s *eventStatements) selectMaxEventDepth(ctx context.Context, eventNIDs []types.EventNID) (int64, error) {
|
||||
|
|
|
|||
|
|
@ -114,21 +114,23 @@ func (s *inviteStatements) insertInviteEvent(
|
|||
func (s *inviteStatements) updateInviteRetired(
|
||||
ctx context.Context,
|
||||
txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||
) (eventIDs []string, err error) {
|
||||
) ([]string, error) {
|
||||
stmt := common.TxStmt(txn, s.updateInviteRetiredStmt)
|
||||
rows, err := stmt.QueryContext(ctx, roomNID, targetUserNID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer (func() { err = rows.Close() })()
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
var eventIDs []string
|
||||
for rows.Next() {
|
||||
var inviteEventID string
|
||||
if err := rows.Scan(&inviteEventID); err != nil {
|
||||
if err = rows.Scan(&inviteEventID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eventIDs = append(eventIDs, inviteEventID)
|
||||
}
|
||||
return
|
||||
return eventIDs, rows.Err()
|
||||
}
|
||||
|
||||
// selectInviteActiveForUserInRoom returns a list of sender state key NIDs
|
||||
|
|
@ -151,5 +153,5 @@ func (s *inviteStatements) selectInviteActiveForUserInRoom(
|
|||
}
|
||||
result = append(result, types.EventStateKeyNID(senderUserNID))
|
||||
}
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -151,6 +151,7 @@ func (s *membershipStatements) selectMembershipsFromRoom(
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
for rows.Next() {
|
||||
var eNID types.EventNID
|
||||
|
|
@ -159,8 +160,9 @@ func (s *membershipStatements) selectMembershipsFromRoom(
|
|||
}
|
||||
eventNIDs = append(eventNIDs, eNID)
|
||||
}
|
||||
return
|
||||
return eventNIDs, rows.Err()
|
||||
}
|
||||
|
||||
func (s *membershipStatements) selectMembershipsFromRoomAndMembership(
|
||||
ctx context.Context,
|
||||
roomNID types.RoomNID, membership membershipState,
|
||||
|
|
@ -170,6 +172,7 @@ func (s *membershipStatements) selectMembershipsFromRoomAndMembership(
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
for rows.Next() {
|
||||
var eNID types.EventNID
|
||||
|
|
@ -178,7 +181,7 @@ func (s *membershipStatements) selectMembershipsFromRoomAndMembership(
|
|||
}
|
||||
eventNIDs = append(eventNIDs, eNID)
|
||||
}
|
||||
return
|
||||
return eventNIDs, rows.Err()
|
||||
}
|
||||
|
||||
func (s *membershipStatements) updateMembership(
|
||||
|
|
|
|||
|
|
@ -90,23 +90,23 @@ func (s *roomAliasesStatements) selectRoomIDFromAlias(
|
|||
|
||||
func (s *roomAliasesStatements) selectAliasesFromRoomID(
|
||||
ctx context.Context, roomID string,
|
||||
) (aliases []string, err error) {
|
||||
aliases = []string{}
|
||||
) ([]string, error) {
|
||||
rows, err := s.selectAliasesFromRoomIDStmt.QueryContext(ctx, roomID)
|
||||
if err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
var aliases []string
|
||||
for rows.Next() {
|
||||
var alias string
|
||||
if err = rows.Scan(&alias); err != nil {
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
|
||||
aliases = append(aliases, alias)
|
||||
}
|
||||
|
||||
return
|
||||
return aliases, rows.Err()
|
||||
}
|
||||
|
||||
func (s *roomAliasesStatements) selectCreatorIDFromAlias(
|
||||
|
|
|
|||
|
|
@ -152,7 +152,7 @@ func (s *stateBlockStatements) bulkSelectStateBlockEntries(
|
|||
eventNID int64
|
||||
entry types.StateEntry
|
||||
)
|
||||
if err := rows.Scan(
|
||||
if err = rows.Scan(
|
||||
&stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -169,10 +169,13 @@ func (s *stateBlockStatements) bulkSelectStateBlockEntries(
|
|||
}
|
||||
current.StateEntries = append(current.StateEntries, entry)
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i != len(stateBlockNIDs) {
|
||||
return nil, fmt.Errorf("storage: state data NIDs missing from the database (%d != %d)", i, len(stateBlockNIDs))
|
||||
}
|
||||
return results, nil
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (s *stateBlockStatements) bulkSelectFilteredStateBlockEntries(
|
||||
|
|
@ -237,7 +240,7 @@ func (s *stateBlockStatements) bulkSelectFilteredStateBlockEntries(
|
|||
if current.StateEntries != nil {
|
||||
results = append(results, current)
|
||||
}
|
||||
return results, nil
|
||||
return results, rows.Err()
|
||||
}
|
||||
|
||||
func stateBlockNIDsAsArray(stateBlockNIDs []types.StateBlockNID) pq.Int64Array {
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ func (s *stateSnapshotStatements) bulkSelectStateBlockNIDs(
|
|||
for ; rows.Next(); i++ {
|
||||
result := &results[i]
|
||||
var stateBlockNIDs pq.Int64Array
|
||||
if err := rows.Scan(&result.StateSnapshotNID, &stateBlockNIDs); err != nil {
|
||||
if err = rows.Scan(&result.StateSnapshotNID, &stateBlockNIDs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.StateBlockNIDs = make([]types.StateBlockNID, len(stateBlockNIDs))
|
||||
|
|
@ -112,6 +112,9 @@ func (s *stateSnapshotStatements) bulkSelectStateBlockNIDs(
|
|||
result.StateBlockNIDs[k] = types.StateBlockNID(stateBlockNIDs[k])
|
||||
}
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i != len(stateNIDs) {
|
||||
return nil, fmt.Errorf("storage: state NIDs missing from the database (%d != %d)", i, len(stateNIDs))
|
||||
}
|
||||
|
|
|
|||
123
syncapi/api/query.go
Normal file
123
syncapi/api/query.go
Normal file
|
|
@ -0,0 +1,123 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
commonHTTP "github.com/matrix-org/dendrite/common/http"
|
||||
"github.com/matrix-org/util"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
||||
const (
|
||||
SyncAPIQuerySyncPath = "/api/syncapi/querySync"
|
||||
SyncAPIQueryStatePath = "/api/syncapi/queryState"
|
||||
SyncAPIQueryStateTypePath = "/api/syncapi/queryStateType"
|
||||
SyncAPIQueryMessagesPath = "/api/syncapi/queryMessages"
|
||||
)
|
||||
|
||||
func NewSyncQueryAPIHTTP(syncapiURL string, httpClient *http.Client) SyncQueryAPI {
|
||||
if httpClient == nil {
|
||||
httpClient = http.DefaultClient
|
||||
}
|
||||
return &httpSyncQueryAPI{syncapiURL, httpClient}
|
||||
}
|
||||
|
||||
type httpSyncQueryAPI struct {
|
||||
syncapiURL string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
type SyncQueryAPI interface {
|
||||
QuerySync(ctx context.Context, request *QuerySyncRequest, response *QuerySyncResponse) error
|
||||
QueryState(ctx context.Context, request *QueryStateRequest, response *QueryStateResponse) error
|
||||
QueryStateType(ctx context.Context, request *QueryStateTypeRequest, response *QueryStateTypeResponse) error
|
||||
QueryMessages(ctx context.Context, request *QueryMessagesRequest, response *QueryMessagesResponse) error
|
||||
}
|
||||
|
||||
type QuerySyncRequest struct{}
|
||||
|
||||
type QueryStateRequest struct {
|
||||
RoomID string
|
||||
}
|
||||
|
||||
type QueryStateTypeRequest struct {
|
||||
RoomID string
|
||||
EventType string
|
||||
StateKey string
|
||||
}
|
||||
|
||||
type QueryMessagesRequest struct {
|
||||
RoomID string
|
||||
}
|
||||
|
||||
type QuerySyncResponse util.JSONResponse
|
||||
type QueryStateResponse util.JSONResponse
|
||||
type QueryStateTypeResponse util.JSONResponse
|
||||
type QueryMessagesResponse util.JSONResponse
|
||||
|
||||
// QueryLatestEventsAndState implements SyncQueryAPI
|
||||
func (h *httpSyncQueryAPI) QuerySync(
|
||||
ctx context.Context,
|
||||
request *QuerySyncRequest,
|
||||
response *QuerySyncResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QuerySync")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.syncapiURL + SyncAPIQuerySyncPath
|
||||
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
// QueryStateAfterEvents implements SyncQueryAPI
|
||||
func (h *httpSyncQueryAPI) QueryState(
|
||||
ctx context.Context,
|
||||
request *QueryStateRequest,
|
||||
response *QueryStateResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryState")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.syncapiURL + SyncAPIQueryStatePath
|
||||
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
// QueryEventsByID implements SyncQueryAPI
|
||||
func (h *httpSyncQueryAPI) QueryStateType(
|
||||
ctx context.Context,
|
||||
request *QueryStateTypeRequest,
|
||||
response *QueryStateTypeResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryStateType")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.syncapiURL + SyncAPIQueryStateTypePath
|
||||
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
// QueryMembershipForUser implements SyncQueryAPI
|
||||
func (h *httpSyncQueryAPI) QueryMessages(
|
||||
ctx context.Context,
|
||||
request *QueryMessagesRequest,
|
||||
response *QueryMessagesResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryMessages")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.syncapiURL + SyncAPIQueryMessagesPath
|
||||
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
|
@ -22,7 +22,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
|
@ -45,10 +44,10 @@ func OnIncomingStateRequest(req *http.Request, db storage.Database, roomID strin
|
|||
// TODO(#287): Auth request and handle the case where the user has left (where
|
||||
// we should return the state at the poin they left)
|
||||
|
||||
stateFilterPart := gomatrix.DefaultFilterPart()
|
||||
// TODO: stateFilterPart should not limit the number of state events (or only limits abusive number of events)
|
||||
stateFilter := gomatrixserverlib.DefaultStateFilter()
|
||||
// TODO: stateFilter should not limit the number of state events (or only limits abusive number of events)
|
||||
|
||||
stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID, &stateFilterPart)
|
||||
stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID, &stateFilter)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import (
|
|||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const accountDataSchema = `
|
||||
|
|
@ -99,7 +99,7 @@ func (s *accountDataStatements) selectAccountDataInRange(
|
|||
ctx context.Context,
|
||||
userID string,
|
||||
oldPos, newPos types.StreamPosition,
|
||||
accountDataFilterPart *gomatrix.FilterPart,
|
||||
accountDataEventFilter *gomatrixserverlib.EventFilter,
|
||||
) (data map[string][]string, err error) {
|
||||
data = make(map[string][]string)
|
||||
|
||||
|
|
@ -111,13 +111,14 @@ func (s *accountDataStatements) selectAccountDataInRange(
|
|||
}
|
||||
|
||||
rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos,
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.Types)),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataFilterPart.NotTypes)),
|
||||
accountDataFilterPart.Limit,
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.NotTypes)),
|
||||
accountDataEventFilter.Limit,
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
for rows.Next() {
|
||||
var dataType string
|
||||
|
|
@ -133,8 +134,7 @@ func (s *accountDataStatements) selectAccountDataInRange(
|
|||
data[roomID] = []string{dataType}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
return data, rows.Err()
|
||||
}
|
||||
|
||||
func (s *accountDataStatements) selectMaxAccountDataID(
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
for rows.Next() {
|
||||
var eID string
|
||||
|
|
@ -101,7 +102,7 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
|||
eventIDs = append(eventIDs, eID)
|
||||
}
|
||||
|
||||
return
|
||||
return eventIDs, rows.Err()
|
||||
}
|
||||
|
||||
func (s *backwardExtremitiesStatements) isBackwardExtremity(
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ import (
|
|||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
|
|
@ -154,7 +153,7 @@ func (s *currentRoomStateStatements) selectJoinedUsers(
|
|||
users = append(users, userID)
|
||||
result[roomID] = users
|
||||
}
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||
|
|
@ -179,22 +178,22 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership(
|
|||
}
|
||||
result = append(result, roomID)
|
||||
}
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// CurrentState returns all the current state events for the given room.
|
||||
func (s *currentRoomStateStatements) selectCurrentState(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
stateFilterPart *gomatrix.FilterPart,
|
||||
stateFilter *gomatrixserverlib.StateFilter,
|
||||
) ([]gomatrixserverlib.Event, error) {
|
||||
stmt := common.TxStmt(txn, s.selectCurrentStateStmt)
|
||||
rows, err := stmt.QueryContext(ctx, roomID,
|
||||
pq.StringArray(stateFilterPart.Senders),
|
||||
pq.StringArray(stateFilterPart.NotSenders),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
|
||||
stateFilterPart.ContainsURL,
|
||||
stateFilterPart.Limit,
|
||||
pq.StringArray(stateFilter.Senders),
|
||||
pq.StringArray(stateFilter.NotSenders),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
|
||||
stateFilter.ContainsURL,
|
||||
stateFilter.Limit,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -267,7 +266,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
|||
}
|
||||
result = append(result, ev)
|
||||
}
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) selectStateEvent(
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ func (s *inviteEventsStatements) selectInviteEventsInRange(
|
|||
|
||||
result[roomID] = event
|
||||
}
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *inviteEventsStatements) selectMaxInviteID(
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
|
|
@ -154,22 +153,23 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
|||
// two positions, only the most recent state is returned.
|
||||
func (s *outputRoomEventsStatements) selectStateInRange(
|
||||
ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
|
||||
stateFilterPart *gomatrix.FilterPart,
|
||||
stateFilter *gomatrixserverlib.StateFilter,
|
||||
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
|
||||
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
|
||||
|
||||
rows, err := stmt.QueryContext(
|
||||
ctx, oldPos, newPos,
|
||||
pq.StringArray(stateFilterPart.Senders),
|
||||
pq.StringArray(stateFilterPart.NotSenders),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
|
||||
stateFilterPart.ContainsURL,
|
||||
stateFilterPart.Limit,
|
||||
pq.StringArray(stateFilter.Senders),
|
||||
pq.StringArray(stateFilter.NotSenders),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
|
||||
stateFilter.ContainsURL,
|
||||
stateFilter.Limit,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
// Fetch all the state change events for all rooms between the two positions then loop each event and:
|
||||
// - Keep a cache of the event by ID (99% of state change events are for the event itself)
|
||||
// - For each room ID, build up an array of event IDs which represents cumulative adds/removes
|
||||
|
|
@ -226,7 +226,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
|
|||
}
|
||||
}
|
||||
|
||||
return stateNeeded, eventIDToEvent, nil
|
||||
return stateNeeded, eventIDToEvent, rows.Err()
|
||||
}
|
||||
|
||||
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
|
||||
|
|
@ -392,5 +392,5 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
ExcludeFromSync: excludeFromSync,
|
||||
})
|
||||
}
|
||||
return result, nil
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -134,6 +134,7 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
|
|||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
// Return the IDs.
|
||||
var eventID string
|
||||
|
|
@ -144,7 +145,7 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
|
|||
eventIDs = append(eventIDs, eventID)
|
||||
}
|
||||
|
||||
return
|
||||
return eventIDs, rows.Err()
|
||||
}
|
||||
|
||||
// selectPositionInTopology returns the position of a given event in the
|
||||
|
|
@ -176,6 +177,7 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsFromPosition(
|
|||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
// Return the IDs.
|
||||
var eventID string
|
||||
for rows.Next() {
|
||||
|
|
@ -184,5 +186,5 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsFromPosition(
|
|||
}
|
||||
eventIDs = append(eventIDs, eventID)
|
||||
}
|
||||
return
|
||||
return eventIDs, rows.Err()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
|
||||
// Import the postgres database driver.
|
||||
_ "github.com/lib/pq"
|
||||
|
|
@ -237,10 +236,10 @@ func (d *SyncServerDatasource) GetStateEvent(
|
|||
// Returns an empty slice if no state events could be found for this room.
|
||||
// Returns an error if there was an issue with the retrieval.
|
||||
func (d *SyncServerDatasource) GetStateEventsForRoom(
|
||||
ctx context.Context, roomID string, stateFilterPart *gomatrix.FilterPart,
|
||||
ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter,
|
||||
) (stateEvents []gomatrixserverlib.Event, err error) {
|
||||
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
|
||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
|
||||
return err
|
||||
})
|
||||
return
|
||||
|
|
@ -422,7 +421,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
|
|||
var succeeded bool
|
||||
defer common.EndTransaction(txn, &succeeded)
|
||||
|
||||
stateFilterPart := gomatrix.DefaultFilterPart() // TODO: use filter provided in request
|
||||
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
|
||||
|
||||
// Work out which rooms to return in the response. This is done by getting not only the currently
|
||||
// joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
|
||||
|
|
@ -432,11 +431,11 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
|
|||
var joinedRoomIDs []string
|
||||
if !wantFullState {
|
||||
deltas, joinedRoomIDs, err = d.getStateDeltas(
|
||||
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
|
||||
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
|
||||
)
|
||||
} else {
|
||||
deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
|
||||
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
|
||||
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
|
|
@ -587,12 +586,12 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
|
|||
return
|
||||
}
|
||||
|
||||
stateFilterPart := gomatrix.DefaultFilterPart() // TODO: use filter provided in request
|
||||
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
|
||||
|
||||
// Build up a /sync response. Add joined rooms.
|
||||
for _, roomID := range joinedRoomIDs {
|
||||
var stateEvents []gomatrixserverlib.Event
|
||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart)
|
||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilter)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
@ -681,7 +680,7 @@ var txReadOnlySnapshot = sql.TxOptions{
|
|||
// If there was an issue with the retrieval, returns an error
|
||||
func (d *SyncServerDatasource) GetAccountDataInRange(
|
||||
ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
|
||||
accountDataFilterPart *gomatrix.FilterPart,
|
||||
accountDataFilterPart *gomatrixserverlib.EventFilter,
|
||||
) (map[string][]string, error) {
|
||||
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart)
|
||||
}
|
||||
|
|
@ -931,7 +930,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents(
|
|||
func (d *SyncServerDatasource) getStateDeltas(
|
||||
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
||||
fromPos, toPos types.StreamPosition, userID string,
|
||||
stateFilterPart *gomatrix.FilterPart,
|
||||
stateFilter *gomatrixserverlib.StateFilter,
|
||||
) ([]stateDelta, []string, error) {
|
||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||
// - Get membership list changes for this user in this sync response
|
||||
|
|
@ -944,7 +943,7 @@ func (d *SyncServerDatasource) getStateDeltas(
|
|||
var deltas []stateDelta
|
||||
|
||||
// get all the state events ever between these two positions
|
||||
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
|
||||
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
@ -964,7 +963,7 @@ func (d *SyncServerDatasource) getStateDeltas(
|
|||
if membership == gomatrixserverlib.Join {
|
||||
// send full room state down instead of a delta
|
||||
var s []types.StreamEvent
|
||||
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilterPart)
|
||||
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
@ -1006,7 +1005,7 @@ func (d *SyncServerDatasource) getStateDeltas(
|
|||
func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
||||
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
||||
fromPos, toPos types.StreamPosition, userID string,
|
||||
stateFilterPart *gomatrix.FilterPart,
|
||||
stateFilter *gomatrixserverlib.StateFilter,
|
||||
) ([]stateDelta, []string, error) {
|
||||
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
|
||||
if err != nil {
|
||||
|
|
@ -1018,7 +1017,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
|||
|
||||
// Add full states for all joined rooms
|
||||
for _, joinedRoomID := range joinedRoomIDs {
|
||||
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilterPart)
|
||||
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilter)
|
||||
if stateErr != nil {
|
||||
return nil, nil, stateErr
|
||||
}
|
||||
|
|
@ -1030,7 +1029,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
|||
}
|
||||
|
||||
// Get all the state events ever between these two positions
|
||||
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
|
||||
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
@ -1061,9 +1060,9 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
|||
|
||||
func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
stateFilterPart *gomatrix.FilterPart,
|
||||
stateFilter *gomatrixserverlib.StateFilter,
|
||||
) ([]types.StreamEvent, error) {
|
||||
allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart)
|
||||
allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/dendrite/typingserver/cache"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
|
|
@ -35,11 +34,11 @@ type Database interface {
|
|||
Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error)
|
||||
WriteEvent(context.Context, *gomatrixserverlib.Event, []gomatrixserverlib.Event, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error)
|
||||
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error)
|
||||
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrix.FilterPart) (stateEvents []gomatrixserverlib.Event, err error)
|
||||
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.Event, err error)
|
||||
SyncPosition(ctx context.Context) (types.PaginationToken, error)
|
||||
IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
|
||||
CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error)
|
||||
GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart) (map[string][]string, error)
|
||||
GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error)
|
||||
UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error)
|
||||
AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (types.StreamPosition, error)
|
||||
RetireInviteEvent(ctx context.Context, inviteEventID string) error
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
|
@ -142,14 +141,14 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Pagin
|
|||
return
|
||||
}
|
||||
|
||||
accountDataFilter := gomatrix.DefaultFilterPart() // TODO: use filter provided in req instead
|
||||
res, err = rp.appendAccountData(res, req.device.UserID, req, int64(latestPos.PDUPosition), &accountDataFilter)
|
||||
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
|
||||
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition, &accountDataFilter)
|
||||
return
|
||||
}
|
||||
|
||||
func (rp *RequestPool) appendAccountData(
|
||||
data *types.Response, userID string, req syncRequest, currentPos int64,
|
||||
accountDataFilter *gomatrix.FilterPart,
|
||||
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
|
||||
accountDataFilter *gomatrixserverlib.EventFilter,
|
||||
) (*types.Response, error) {
|
||||
// TODO: Account data doesn't have a sync position of its own, meaning that
|
||||
// account data might be sent multiple time to the client if multiple account
|
||||
|
|
|
|||
|
|
@ -22,3 +22,9 @@ Real non-joined users can get individual state for world_readable rooms after le
|
|||
|
||||
# Blacklisted until matrix-org/dendrite#862 is reverted due to Riot bug
|
||||
Latest account data appears in v2 /sync
|
||||
|
||||
# Blacklisted due to flakiness
|
||||
Outbound federation can backfill events
|
||||
|
||||
# Blacklisted due to alias work on Synapse
|
||||
Alias creators can delete canonical alias with no ops
|
||||
|
|
|
|||
|
|
@ -81,6 +81,7 @@ Can't forget room you're still in
|
|||
Can get rooms/{roomId}/members
|
||||
Can create filter
|
||||
Can download filter
|
||||
Lazy loading parameters in the filter are strictly boolean
|
||||
Can sync
|
||||
Can sync a joined room
|
||||
Newly joined room is included in an incremental sync
|
||||
|
|
|
|||
Loading…
Reference in a new issue