mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 09:23:09 -06:00
Merge 11c11a4fb0 into 8861437c26
This commit is contained in:
commit
5ddcae353d
|
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/consumers"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/clientapi/routing"
|
||||
"github.com/matrix-org/dendrite/clientapi/transactions"
|
||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -37,6 +38,7 @@ func SetupClientAPIComponent(
|
|||
aliasAPI api.RoomserverAliasAPI,
|
||||
inputAPI api.RoomserverInputAPI,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
transactionsCache *transactions.Cache,
|
||||
) {
|
||||
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
|
||||
|
||||
|
|
@ -62,5 +64,6 @@ func SetupClientAPIComponent(
|
|||
queryAPI, aliasAPI, accountsDB, deviceDB,
|
||||
federation, *keyRing,
|
||||
userUpdateProducer, syncProducer,
|
||||
transactionsCache,
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/clientapi/transactions"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
|
@ -48,6 +49,7 @@ func Setup(
|
|||
keyRing gomatrixserverlib.KeyRing,
|
||||
userUpdateProducer *producers.UserUpdateProducer,
|
||||
syncProducer *producers.SyncAPIProducer,
|
||||
transactionsCache *transactions.Cache,
|
||||
) {
|
||||
|
||||
apiMux.Handle("/_matrix/client/versions",
|
||||
|
|
@ -92,14 +94,14 @@ func Setup(
|
|||
r0mux.Handle("/rooms/{roomID}/send/{eventType}",
|
||||
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer)
|
||||
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer, nil)
|
||||
}),
|
||||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
|
||||
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
txnID := vars["txnID"]
|
||||
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, nil, cfg, queryAPI, producer)
|
||||
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, nil, cfg, queryAPI, producer, transactionsCache)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}",
|
||||
|
|
@ -111,14 +113,14 @@ func Setup(
|
|||
if strings.HasSuffix(eventType, "/") {
|
||||
eventType = eventType[:len(eventType)-1]
|
||||
}
|
||||
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer)
|
||||
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer, nil)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}",
|
||||
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
stateKey := vars["stateKey"]
|
||||
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer)
|
||||
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer, nil)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/clientapi/transactions"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
|
@ -45,7 +46,18 @@ func SendEvent(
|
|||
cfg config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
producer *producers.RoomserverProducer,
|
||||
transactionsCache *transactions.Cache,
|
||||
) util.JSONResponse {
|
||||
|
||||
if txnID != nil {
|
||||
// Try to fetch response from transactionsCache
|
||||
res, err := transactionsCache.FetchTransaction(*txnID)
|
||||
|
||||
if err != nil {
|
||||
return *res
|
||||
}
|
||||
}
|
||||
|
||||
// parse the incoming http request
|
||||
userID := device.UserID
|
||||
var r map[string]interface{} // must be a JSON object
|
||||
|
|
@ -105,8 +117,15 @@ func SendEvent(
|
|||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
res := util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: sendEventResponse{e.EventID()},
|
||||
}
|
||||
|
||||
// Add response to transactionsCache for later access
|
||||
if txnID != nil {
|
||||
transactionsCache.AddTransaction(*txnID, &res)
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,91 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package transactions
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
// CleanupPeriod represents time in nanoseconds after which cacheCleanService runs.
|
||||
const CleanupPeriod time.Duration = 30 * time.Minute
|
||||
|
||||
type response struct {
|
||||
cache *util.JSONResponse
|
||||
cacheTime time.Time
|
||||
}
|
||||
|
||||
// Cache represents a temporary store for responses.
|
||||
// This is used to ensure idempotency of requests.
|
||||
type Cache struct {
|
||||
sync.RWMutex
|
||||
txns map[string]response
|
||||
}
|
||||
|
||||
// CreateCache creates and returns a initialized Cache object.
|
||||
// This cache is automatically cleaned every `CleanupPeriod`.
|
||||
func CreateCache() *Cache {
|
||||
t := Cache{txns: make(map[string]response)}
|
||||
|
||||
// Start cleanup service as the Cache is created
|
||||
go cacheCleanService(&t)
|
||||
|
||||
return &t
|
||||
}
|
||||
|
||||
// FetchTransaction looks up response for txnID in Cache.
|
||||
// Returns a JSON response if txnID is found, which can be sent to client,
|
||||
// else returns error.
|
||||
func (t *Cache) FetchTransaction(txnID string) (*util.JSONResponse, error) {
|
||||
t.RLock()
|
||||
res, ok := t.txns[txnID]
|
||||
t.RUnlock()
|
||||
|
||||
if ok {
|
||||
return res.cache, nil
|
||||
}
|
||||
|
||||
return nil, errors.New("TxnID not present")
|
||||
}
|
||||
|
||||
// AddTransaction adds a response for txnID in Cache for later access.
|
||||
func (t *Cache) AddTransaction(txnID string, res *util.JSONResponse) {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
t.txns[txnID] = response{cache: res, cacheTime: time.Now()}
|
||||
}
|
||||
|
||||
// cacheCleanService is responsible for cleaning up transactions older than 30 min.
|
||||
// It guarantees that a transaction will be present in cache for at least 30 min & at most 60 min.
|
||||
func cacheCleanService(t *Cache) {
|
||||
for {
|
||||
time.Sleep(CleanupPeriod)
|
||||
go clean(t)
|
||||
}
|
||||
}
|
||||
|
||||
func clean(t *Cache) {
|
||||
expire := time.Now().Add(-CleanupPeriod)
|
||||
for key := range t.txns {
|
||||
t.Lock()
|
||||
if t.txns[key].cacheTime.Before(expire) {
|
||||
delete(t.txns, key)
|
||||
}
|
||||
t.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
@ -16,6 +16,7 @@ package main
|
|||
|
||||
import (
|
||||
"github.com/matrix-org/dendrite/clientapi"
|
||||
"github.com/matrix-org/dendrite/clientapi/transactions"
|
||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||
"github.com/matrix-org/dendrite/common/keydb"
|
||||
)
|
||||
|
|
@ -33,10 +34,11 @@ func main() {
|
|||
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
|
||||
|
||||
alias, input, query := base.CreateHTTPRoomserverAPIs()
|
||||
txnCache := transactions.CreateCache()
|
||||
|
||||
clientapi.SetupClientAPIComponent(
|
||||
base, deviceDB, accountDB, federation, &keyRing,
|
||||
alias, input, query,
|
||||
alias, input, query, txnCache,
|
||||
)
|
||||
|
||||
base.SetupAndServeHTTP(string(base.Cfg.Listen.ClientAPI))
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/common/keydb"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi"
|
||||
"github.com/matrix-org/dendrite/clientapi/transactions"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
|
|
@ -51,8 +52,9 @@ func main() {
|
|||
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
|
||||
|
||||
alias, input, query := roomserver.SetupRoomServerComponent(base)
|
||||
txnCache := transactions.CreateCache()
|
||||
|
||||
clientapi.SetupClientAPIComponent(base, deviceDB, accountDB, federation, &keyRing, alias, input, query)
|
||||
clientapi.SetupClientAPIComponent(base, deviceDB, accountDB, federation, &keyRing, alias, input, query, txnCache)
|
||||
federationapi.SetupFederationAPIComponent(base, accountDB, federation, &keyRing, alias, input, query)
|
||||
federationsender.SetupFederationSenderComponent(base, federation, query)
|
||||
mediaapi.SetupMediaAPIComponent(base, deviceDB)
|
||||
|
|
|
|||
Loading…
Reference in a new issue