NOTSPEC: Make ?from= optional in /messages (#1647)
This commit is contained in:
parent
56b5847c74
commit
42e9cbf342
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -65,6 +66,7 @@ func OnIncomingMessagesRequest(
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
rsAPI api.RoomserverInternalAPI,
|
rsAPI api.RoomserverInternalAPI,
|
||||||
cfg *config.SyncAPI,
|
cfg *config.SyncAPI,
|
||||||
|
srp *sync.RequestPool,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -84,9 +86,17 @@ func OnIncomingMessagesRequest(
|
||||||
// Extract parameters from the request's URL.
|
// Extract parameters from the request's URL.
|
||||||
// Pagination tokens.
|
// Pagination tokens.
|
||||||
var fromStream *types.StreamingToken
|
var fromStream *types.StreamingToken
|
||||||
from, err := types.NewTopologyTokenFromString(req.URL.Query().Get("from"))
|
fromQuery := req.URL.Query().Get("from")
|
||||||
|
if fromQuery == "" {
|
||||||
|
// NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided.
|
||||||
|
// We do this to allow clients to get messages without having to call `/sync` e.g Cerulean
|
||||||
|
currPos := srp.Notifier.CurrentPosition()
|
||||||
|
fromQuery = currPos.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
from, err := types.NewTopologyTokenFromString(fromQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs, err2 := types.NewStreamTokenFromString(req.URL.Query().Get("from"))
|
fs, err2 := types.NewStreamTokenFromString(fromQuery)
|
||||||
fromStream = &fs
|
fromStream = &fs
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
|
|
@ -51,7 +51,7 @@ func Setup(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, federation, rsAPI, cfg)
|
return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], device, federation, rsAPI, cfg, srp)
|
||||||
})).Methods(http.MethodGet, http.MethodOptions)
|
})).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
||||||
r0mux.Handle("/user/{userId}/filter",
|
r0mux.Handle("/user/{userId}/filter",
|
||||||
|
|
|
@ -44,7 +44,7 @@ type RequestPool struct {
|
||||||
db storage.Database
|
db storage.Database
|
||||||
cfg *config.SyncAPI
|
cfg *config.SyncAPI
|
||||||
userAPI userapi.UserInternalAPI
|
userAPI userapi.UserInternalAPI
|
||||||
notifier *Notifier
|
Notifier *Notifier
|
||||||
keyAPI keyapi.KeyInternalAPI
|
keyAPI keyapi.KeyInternalAPI
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||||
lastseen sync.Map
|
lastseen sync.Map
|
||||||
|
@ -152,7 +152,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
|
|
||||||
rp.updateLastSeen(req, device)
|
rp.updateLastSeen(req, device)
|
||||||
|
|
||||||
currPos := rp.notifier.CurrentPosition()
|
currPos := rp.Notifier.CurrentPosition()
|
||||||
|
|
||||||
if rp.shouldReturnImmediately(syncReq) {
|
if rp.shouldReturnImmediately(syncReq) {
|
||||||
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
||||||
|
@ -176,7 +176,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
|
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
|
|
||||||
userStreamListener := rp.notifier.GetListener(*syncReq)
|
userStreamListener := rp.Notifier.GetListener(*syncReq)
|
||||||
defer userStreamListener.Close()
|
defer userStreamListener.Close()
|
||||||
|
|
||||||
// We need the loop in case userStreamListener wakes up even if there isn't
|
// We need the loop in case userStreamListener wakes up even if there isn't
|
||||||
|
|
Loading…
Reference in a new issue