mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-14 10:23:46 -06:00
Merge branch 'master' of https://github.com/matrix-org/dendrite into recaptcha
This commit is contained in:
commit
6c58fd4f8d
|
|
@ -52,6 +52,29 @@ github. These can be added just before merging of the PR to master, and the
|
||||||
issue number should be added to the comment, e.g. `// TODO(#324): ...`
|
issue number should be added to the comment, e.g. `// TODO(#324): ...`
|
||||||
|
|
||||||
|
|
||||||
|
## Logging
|
||||||
|
|
||||||
|
We generally prefer to log with static log messages and include any dynamic
|
||||||
|
information in fields.
|
||||||
|
|
||||||
|
```golang
|
||||||
|
logger := util.GetLogger(ctx)
|
||||||
|
|
||||||
|
// Not recommended
|
||||||
|
logger.Infof("Finished processing keys for %s, number of keys %d", name, numKeys)
|
||||||
|
|
||||||
|
// Recommended
|
||||||
|
logger.WithFields(logrus.Fields{
|
||||||
|
"numberOfKeys": numKeys,
|
||||||
|
"entityName": name,
|
||||||
|
}).Info("Finished processing keys")
|
||||||
|
```
|
||||||
|
|
||||||
|
This is useful when logging to systems that natively understand log fields, as
|
||||||
|
it allows people to search and process the fields without having to parse the
|
||||||
|
log message.
|
||||||
|
|
||||||
|
|
||||||
## Visual Studio Code
|
## Visual Studio Code
|
||||||
|
|
||||||
If you use VSCode then the following is an example of a workspace setting that
|
If you use VSCode then the following is an example of a workspace setting that
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ Dendrite requires a postgres database engine, version 9.5 or later.
|
||||||
```
|
```
|
||||||
* Create databases:
|
* Create databases:
|
||||||
```bash
|
```bash
|
||||||
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi; do
|
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi naffka; do
|
||||||
sudo -u postgres createdb -O dendrite dendrite_$i
|
sudo -u postgres createdb -O dendrite dendrite_$i
|
||||||
done
|
done
|
||||||
```
|
```
|
||||||
|
|
|
||||||
|
|
@ -80,6 +80,7 @@ kafka:
|
||||||
# Naffka can only be used when running dendrite as a single monolithic server.
|
# Naffka can only be used when running dendrite as a single monolithic server.
|
||||||
# Kafka can be used both with a monolithic server and when running the
|
# Kafka can be used both with a monolithic server and when running the
|
||||||
# components as separate servers.
|
# components as separate servers.
|
||||||
|
# If enabled database.naffka must also be specified.
|
||||||
use_naffka: false
|
use_naffka: false
|
||||||
# The names of the kafka topics to use.
|
# The names of the kafka topics to use.
|
||||||
topics:
|
topics:
|
||||||
|
|
@ -97,6 +98,8 @@ database:
|
||||||
server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable"
|
server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable"
|
||||||
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
|
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
|
||||||
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
|
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
|
||||||
|
# If using naffka you need to specify a naffka database
|
||||||
|
# naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable"
|
||||||
|
|
||||||
# The TCP host:port pairs to bind the internal HTTP APIs to.
|
# The TCP host:port pairs to bind the internal HTTP APIs to.
|
||||||
# These shouldn't be exposed to the public internet.
|
# These shouldn't be exposed to the public internet.
|
||||||
|
|
|
||||||
|
|
@ -5,9 +5,9 @@
|
||||||
set -eu
|
set -eu
|
||||||
|
|
||||||
# The mirror to download kafka from is picked from the list of mirrors at
|
# The mirror to download kafka from is picked from the list of mirrors at
|
||||||
# https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
|
# https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.11.0.2.tgz
|
||||||
# TODO: Check the signature since we are downloading over HTTP.
|
# TODO: Check the signature since we are downloading over HTTP.
|
||||||
MIRROR=http://apache.mirror.anlx.net/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
|
MIRROR=http://apache.mirror.anlx.net/kafka/0.11.0.2/kafka_2.11-0.11.0.2.tgz
|
||||||
|
|
||||||
# Only download the kafka if it isn't already downloaded.
|
# Only download the kafka if it isn't already downloaded.
|
||||||
test -f kafka.tgz || wget $MIRROR -O kafka.tgz
|
test -f kafka.tgz || wget $MIRROR -O kafka.tgz
|
||||||
|
|
@ -18,7 +18,7 @@ mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1
|
||||||
kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties
|
kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties
|
||||||
# Enable topic deletion so that the integration tests can create a fresh topic
|
# Enable topic deletion so that the integration tests can create a fresh topic
|
||||||
# for each test run.
|
# for each test run.
|
||||||
echo "delete.topic.enable=true" >> kafka/config/server.properties
|
echo -e "\n\ndelete.topic.enable=true" >> kafka/config/server.properties
|
||||||
# Start the kafka server running in the background.
|
# Start the kafka server running in the background.
|
||||||
# By default the kafka listens on localhost:9092
|
# By default the kafka listens on localhost:9092
|
||||||
kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties
|
kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties
|
||||||
|
|
|
||||||
|
|
@ -92,11 +92,11 @@ func (s *filterStatements) insertFilter(
|
||||||
// Check if filter already exists in the database
|
// Check if filter already exists in the database
|
||||||
err = s.selectFilterIDByContentStmt.QueryRowContext(ctx,
|
err = s.selectFilterIDByContentStmt.QueryRowContext(ctx,
|
||||||
localpart, filterJSON).Scan(&existingFilterID)
|
localpart, filterJSON).Scan(&existingFilterID)
|
||||||
if err != nil {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
// If it does, return the existing ID
|
// If it does, return the existing ID
|
||||||
if len(existingFilterID) != 0 {
|
if existingFilterID != "" {
|
||||||
return existingFilterID, err
|
return existingFilterID, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,16 +26,10 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/common/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type turnServerResponse struct {
|
|
||||||
Username string `json:"username"`
|
|
||||||
Password string `json:"password"`
|
|
||||||
URIs []string `json:"uris"`
|
|
||||||
TTL int `json:"ttl"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestTurnServer implements:
|
// RequestTurnServer implements:
|
||||||
// GET /voip/turnServer
|
// GET /voip/turnServer
|
||||||
func RequestTurnServer(req *http.Request, device *authtypes.Device, cfg config.Dendrite) util.JSONResponse {
|
func RequestTurnServer(req *http.Request, device *authtypes.Device, cfg config.Dendrite) util.JSONResponse {
|
||||||
|
|
@ -52,7 +46,7 @@ func RequestTurnServer(req *http.Request, device *authtypes.Device, cfg config.D
|
||||||
// Duration checked at startup, err not possible
|
// Duration checked at startup, err not possible
|
||||||
duration, _ := time.ParseDuration(turnConfig.UserLifetime)
|
duration, _ := time.ParseDuration(turnConfig.UserLifetime)
|
||||||
|
|
||||||
resp := turnServerResponse{
|
resp := gomatrix.RespTurnServer{
|
||||||
URIs: turnConfig.URIs,
|
URIs: turnConfig.URIs,
|
||||||
TTL: int(duration.Seconds()),
|
TTL: int(duration.Seconds()),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -80,6 +80,7 @@ func main() {
|
||||||
|
|
||||||
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||||
inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil)
|
inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil)
|
||||||
|
aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil)
|
||||||
|
|
||||||
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
|
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
|
||||||
|
|
||||||
|
|
@ -90,7 +91,7 @@ func main() {
|
||||||
log.Info("Starting federation API server on ", cfg.Listen.FederationAPI)
|
log.Info("Starting federation API server on ", cfg.Listen.FederationAPI)
|
||||||
|
|
||||||
api := mux.NewRouter()
|
api := mux.NewRouter()
|
||||||
routing.Setup(api, *cfg, queryAPI, roomserverProducer, keyRing, federation, accountDB)
|
routing.Setup(api, *cfg, queryAPI, aliasAPI, roomserverProducer, keyRing, federation, accountDB)
|
||||||
common.SetupHTTPAPI(http.DefaultServeMux, api)
|
common.SetupHTTPAPI(http.DefaultServeMux, api)
|
||||||
|
|
||||||
log.Fatal(http.ListenAndServe(string(cfg.Listen.FederationAPI), nil))
|
log.Fatal(http.ListenAndServe(string(cfg.Listen.FederationAPI), nil))
|
||||||
|
|
|
||||||
|
|
@ -348,7 +348,7 @@ func (m *monolith) setupAPIs() {
|
||||||
), m.syncAPIDB, m.deviceDB)
|
), m.syncAPIDB, m.deviceDB)
|
||||||
|
|
||||||
federationapi_routing.Setup(
|
federationapi_routing.Setup(
|
||||||
m.api, *m.cfg, m.queryAPI, m.roomServerProducer, m.keyRing, m.federation,
|
m.api, *m.cfg, m.queryAPI, m.aliasAPI, m.roomServerProducer, m.keyRing, m.federation,
|
||||||
m.accountDB,
|
m.accountDB,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,96 @@
|
||||||
|
// Copyright 2017 New Vector Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package routing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RoomAliasToID converts the queried alias into a room ID and returns it
|
||||||
|
func RoomAliasToID(
|
||||||
|
httpReq *http.Request,
|
||||||
|
federation *gomatrixserverlib.FederationClient,
|
||||||
|
cfg config.Dendrite,
|
||||||
|
aliasAPI api.RoomserverAliasAPI,
|
||||||
|
) util.JSONResponse {
|
||||||
|
roomAlias := httpReq.FormValue("alias")
|
||||||
|
if roomAlias == "" {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 400,
|
||||||
|
JSON: jsonerror.BadJSON("Must supply room alias parameter."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, domain, err := gomatrixserverlib.SplitID('#', roomAlias)
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 400,
|
||||||
|
JSON: jsonerror.BadJSON("Room alias must be in the form '#localpart:domain'"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var resp gomatrixserverlib.RespDirectory
|
||||||
|
|
||||||
|
if domain == cfg.Matrix.ServerName {
|
||||||
|
queryReq := api.GetAliasRoomIDRequest{Alias: roomAlias}
|
||||||
|
var queryRes api.GetAliasRoomIDResponse
|
||||||
|
if err = aliasAPI.GetAliasRoomID(httpReq.Context(), &queryReq, &queryRes); err != nil {
|
||||||
|
return httputil.LogThenError(httpReq, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if queryRes.RoomID == "" {
|
||||||
|
// TODO: List servers that are aware of this room alias
|
||||||
|
resp = gomatrixserverlib.RespDirectory{
|
||||||
|
RoomID: queryRes.RoomID,
|
||||||
|
Servers: []gomatrixserverlib.ServerName{},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// If the response doesn't contain a non-empty string, return an error
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 404,
|
||||||
|
JSON: jsonerror.NotFound(fmt.Sprintf("Room alias %s not found", roomAlias)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
resp, err = federation.LookupRoomAlias(httpReq.Context(), domain, roomAlias)
|
||||||
|
if err != nil {
|
||||||
|
switch x := err.(type) {
|
||||||
|
case gomatrix.HTTPError:
|
||||||
|
if x.Code == 404 {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 404,
|
||||||
|
JSON: jsonerror.NotFound("Room alias not found"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO: Return 502 if the remote server errored.
|
||||||
|
// TODO: Return 504 if the remote server timed out.
|
||||||
|
return httputil.LogThenError(httpReq, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 200,
|
||||||
|
JSON: resp,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -38,6 +38,7 @@ func Setup(
|
||||||
apiMux *mux.Router,
|
apiMux *mux.Router,
|
||||||
cfg config.Dendrite,
|
cfg config.Dendrite,
|
||||||
query api.RoomserverQueryAPI,
|
query api.RoomserverQueryAPI,
|
||||||
|
aliasAPI api.RoomserverAliasAPI,
|
||||||
producer *producers.RoomserverProducer,
|
producer *producers.RoomserverProducer,
|
||||||
keys gomatrixserverlib.KeyRing,
|
keys gomatrixserverlib.KeyRing,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
|
|
@ -105,6 +106,15 @@ func Setup(
|
||||||
},
|
},
|
||||||
)).Methods("GET")
|
)).Methods("GET")
|
||||||
|
|
||||||
|
v1fedmux.Handle("/query/directory/", common.MakeFedAPI(
|
||||||
|
"federation_query_room_alias", cfg.Matrix.ServerName, keys,
|
||||||
|
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
|
||||||
|
return RoomAliasToID(
|
||||||
|
httpReq, federation, cfg, aliasAPI,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
)).Methods("GET")
|
||||||
|
|
||||||
v1fedmux.Handle("/query/profile", common.MakeFedAPI(
|
v1fedmux.Handle("/query/profile", common.MakeFedAPI(
|
||||||
"federation_query_profile", cfg.Matrix.ServerName, keys,
|
"federation_query_profile", cfg.Matrix.ServerName, keys,
|
||||||
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
|
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/common/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
|
|
@ -188,7 +189,7 @@ func (r *uploadRequest) Validate(maxFileSizeBytes config.FileSizeBytes) *util.JS
|
||||||
JSON: jsonerror.Unknown("HTTP Content-Type request header must be set."),
|
JSON: jsonerror.Unknown("HTTP Content-Type request header must be set."),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if r.MediaMetadata.UploadName[0] == '~' {
|
if strings.HasPrefix(string(r.MediaMetadata.UploadName), "~") {
|
||||||
return &util.JSONResponse{
|
return &util.JSONResponse{
|
||||||
Code: 400,
|
Code: 400,
|
||||||
JSON: jsonerror.Unknown("File name must not begin with '~'."),
|
JSON: jsonerror.Unknown("File name must not begin with '~'."),
|
||||||
|
|
|
||||||
|
|
@ -264,7 +264,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
|
||||||
return types.StreamPosition(0), fmt.Errorf(
|
return types.StreamPosition(0), fmt.Errorf(
|
||||||
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
|
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
|
||||||
)
|
)
|
||||||
case <-listener.GetNotifyChannel(req.since):
|
case <-listener.GetNotifyChannel(*req.since):
|
||||||
p := listener.GetStreamPosition()
|
p := listener.GetStreamPosition()
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
@ -282,7 +282,7 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
|
||||||
return syncRequest{
|
return syncRequest{
|
||||||
userID: userID,
|
userID: userID,
|
||||||
timeout: 1 * time.Minute,
|
timeout: 1 * time.Minute,
|
||||||
since: since,
|
since: &since,
|
||||||
wantFullState: false,
|
wantFullState: false,
|
||||||
limit: defaultTimelineLimit,
|
limit: defaultTimelineLimit,
|
||||||
log: util.GetLogger(context.TODO()),
|
log: util.GetLogger(context.TODO()),
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ type syncRequest struct {
|
||||||
userID string
|
userID string
|
||||||
limit int
|
limit int
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
since types.StreamPosition
|
since *types.StreamPosition // nil means that no since token was supplied
|
||||||
wantFullState bool
|
wantFullState bool
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
}
|
}
|
||||||
|
|
@ -70,13 +70,16 @@ func getTimeout(timeoutMS string) time.Duration {
|
||||||
return time.Duration(i) * time.Millisecond
|
return time.Duration(i) * time.Millisecond
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSyncStreamPosition(since string) (types.StreamPosition, error) {
|
// getSyncStreamPosition tries to parse a 'since' token taken from the API to a
|
||||||
|
// stream position. If the string is empty then (nil, nil) is returned.
|
||||||
|
func getSyncStreamPosition(since string) (*types.StreamPosition, error) {
|
||||||
if since == "" {
|
if since == "" {
|
||||||
return types.StreamPosition(0), nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
i, err := strconv.Atoi(since)
|
i, err := strconv.Atoi(since)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.StreamPosition(0), err
|
return nil, err
|
||||||
}
|
}
|
||||||
return types.StreamPosition(i), nil
|
token := types.StreamPosition(i)
|
||||||
|
return &token, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
currPos := rp.notifier.CurrentPosition()
|
currPos := rp.notifier.CurrentPosition()
|
||||||
|
|
||||||
// If this is an initial sync or timeout=0 we return immediately
|
// If this is an initial sync or timeout=0 we return immediately
|
||||||
if syncReq.since == types.StreamPosition(0) || syncReq.timeout == 0 {
|
if syncReq.since == nil || syncReq.timeout == 0 {
|
||||||
syncData, err := rp.currentSyncForUser(*syncReq, currPos)
|
syncData, err := rp.currentSyncForUser(*syncReq, currPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
|
|
@ -93,7 +93,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
JSON: types.NewResponse(syncReq.since),
|
JSON: types.NewResponse(currPos),
|
||||||
}
|
}
|
||||||
// Or for the request to be cancelled
|
// Or for the request to be cancelled
|
||||||
case <-req.Context().Done():
|
case <-req.Context().Done():
|
||||||
|
|
@ -121,10 +121,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) {
|
||||||
// TODO: handle ignored users
|
// TODO: handle ignored users
|
||||||
if req.since == types.StreamPosition(0) {
|
if req.since == nil {
|
||||||
res, err = rp.db.CompleteSync(req.ctx, req.userID, req.limit)
|
res, err = rp.db.CompleteSync(req.ctx, req.userID, req.limit)
|
||||||
} else {
|
} else {
|
||||||
res, err = rp.db.IncrementalSync(req.ctx, req.userID, req.since, currentPos, req.limit)
|
res, err = rp.db.IncrementalSync(req.ctx, req.userID, *req.since, currentPos, req.limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -148,7 +148,7 @@ func (rp *RequestPool) appendAccountData(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.since == types.StreamPosition(0) {
|
if req.since == nil {
|
||||||
// If this is the initial sync, we don't need to check if a data has
|
// If this is the initial sync, we don't need to check if a data has
|
||||||
// already been sent. Instead, we send the whole batch.
|
// already been sent. Instead, we send the whole batch.
|
||||||
var global []gomatrixserverlib.ClientEvent
|
var global []gomatrixserverlib.ClientEvent
|
||||||
|
|
@ -170,7 +170,7 @@ func (rp *RequestPool) appendAccountData(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync is not initial, get all account data since the latest sync
|
// Sync is not initial, get all account data since the latest sync
|
||||||
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, req.since, currentPos)
|
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue