From c3221556e18497211ce54c71765ef69c49cc6d73 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Sat, 10 Mar 2018 01:05:23 +0530 Subject: [PATCH] Implement transaction cache to ensure idempotency of sendEvents Signed-off-by: Anant Prakash --- .../dendrite/clientapi/clientapi.go | 3 + .../dendrite/clientapi/routing/routing.go | 10 +- .../dendrite/clientapi/routing/sendevent.go | 21 ++++- .../clientapi/transactions/transactions.go | 91 +++++++++++++++++++ .../cmd/dendrite-client-api-server/main.go | 4 +- .../cmd/dendrite-monolith-server/main.go | 4 +- 6 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/transactions/transactions.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go index 11177ab08..1d62e3267 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go +++ b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/routing" + "github.com/matrix-org/dendrite/clientapi/transactions" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -37,6 +38,7 @@ func SetupClientAPIComponent( aliasAPI api.RoomserverAliasAPI, inputAPI api.RoomserverInputAPI, queryAPI api.RoomserverQueryAPI, + transactionsCache *transactions.Cache, ) { roomserverProducer := producers.NewRoomserverProducer(inputAPI) @@ -62,5 +64,6 @@ func SetupClientAPIComponent( queryAPI, aliasAPI, accountsDB, deviceDB, federation, *keyRing, userUpdateProducer, syncProducer, + transactionsCache, ) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index a7999bf49..f447c7e95 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/clientapi/transactions" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -48,6 +49,7 @@ func Setup( keyRing gomatrixserverlib.KeyRing, userUpdateProducer *producers.UserUpdateProducer, syncProducer *producers.SyncAPIProducer, + transactionsCache *transactions.Cache, ) { apiMux.Handle("/_matrix/client/versions", @@ -91,14 +93,14 @@ func Setup( r0mux.Handle("/rooms/{roomID}/send/{eventType}", common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer) + return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer, nil) }), ).Methods("POST", "OPTIONS") r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) txnID := vars["txnID"] - return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, nil, cfg, queryAPI, producer) + return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, nil, cfg, queryAPI, producer, transactionsCache) }), ).Methods("PUT", "OPTIONS") r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}", @@ -110,14 +112,14 @@ func Setup( if strings.HasSuffix(eventType, "/") { eventType = eventType[:len(eventType)-1] } - return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer) + return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer, nil) }), ).Methods("PUT", "OPTIONS") r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}", common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) stateKey := vars["stateKey"] - return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer) + return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer, nil) }), ).Methods("PUT", "OPTIONS") diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go b/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go index 5b3803bb1..ead25c546 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/sendevent.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/clientapi/transactions" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -45,7 +46,18 @@ func SendEvent( cfg config.Dendrite, queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer, + transactionsCache *transactions.Cache, ) util.JSONResponse { + + if txnID != nil { + // Try to fetch response from transactionsCache + res, err := transactionsCache.FetchTransaction(*txnID) + + if err != nil { + return *res + } + } + // parse the incoming http request userID := device.UserID var r map[string]interface{} // must be a JSON object @@ -105,8 +117,15 @@ func SendEvent( return httputil.LogThenError(req, err) } - return util.JSONResponse{ + res := util.JSONResponse{ Code: 200, JSON: sendEventResponse{e.EventID()}, } + + // Add response to transactionsCache for later access + if txnID != nil { + transactionsCache.AddTransaction(*txnID, &res) + } + + return res } diff --git a/src/github.com/matrix-org/dendrite/clientapi/transactions/transactions.go b/src/github.com/matrix-org/dendrite/clientapi/transactions/transactions.go new file mode 100644 index 000000000..11c51b29e --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/transactions/transactions.go @@ -0,0 +1,91 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transactions + +import ( + "errors" + "sync" + "time" + + "github.com/matrix-org/util" +) + +// CleanupPeriod represents time in nanoseconds after which cacheCleanService runs. +const CleanupPeriod time.Duration = 30 * time.Minute + +type response struct { + cache *util.JSONResponse + cacheTime time.Time +} + +// Cache represents a temporary store for responses. +// This is used to ensure idempotency of requests. +type Cache struct { + sync.RWMutex + txns map[string]response +} + +// CreateCache creates and returns a initialized Cache object. +// This cache is automatically cleaned every `CleanupPeriod`. +func CreateCache() *Cache { + t := Cache{txns: make(map[string]response)} + + // Start cleanup service as the Cache is created + go cacheCleanService(&t) + + return &t +} + +// FetchTransaction looks up response for txnID in Cache. +// Returns a JSON response if txnID is found, which can be sent to client, +// else returns error. +func (t *Cache) FetchTransaction(txnID string) (*util.JSONResponse, error) { + t.RLock() + res, ok := t.txns[txnID] + t.RUnlock() + + if ok { + return res.cache, nil + } + + return nil, errors.New("TxnID not present") +} + +// AddTransaction adds a response for txnID in Cache for later access. +func (t *Cache) AddTransaction(txnID string, res *util.JSONResponse) { + t.Lock() + defer t.Unlock() + t.txns[txnID] = response{cache: res, cacheTime: time.Now()} +} + +// cacheCleanService is responsible for cleaning up transactions older than 30 min. +// It guarantees that a transaction will be present in cache for at least 30 min & at most 60 min. +func cacheCleanService(t *Cache) { + for { + time.Sleep(CleanupPeriod) + go clean(t) + } +} + +func clean(t *Cache) { + expire := time.Now().Add(-CleanupPeriod) + for key := range t.txns { + t.Lock() + if t.txns[key].cacheTime.Before(expire) { + delete(t.txns, key) + } + t.Unlock() + } +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 2845eb364..6cb3453a0 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -16,6 +16,7 @@ package main import ( "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/transactions" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/keydb" ) @@ -33,10 +34,11 @@ func main() { keyRing := keydb.CreateKeyRing(federation.Client, keyDB) alias, input, query := base.CreateHTTPRoomserverAPIs() + txnCache := transactions.CreateCache() clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, - alias, input, query, + alias, input, query, txnCache, ) base.SetupAndServeHTTP(string(base.Cfg.Listen.ClientAPI)) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index 89005c9d3..5475c41ac 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/transactions" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/federationapi" @@ -51,8 +52,9 @@ func main() { keyRing := keydb.CreateKeyRing(federation.Client, keyDB) alias, input, query := roomserver.SetupRoomServerComponent(base) + txnCache := transactions.CreateCache() - clientapi.SetupClientAPIComponent(base, deviceDB, accountDB, federation, &keyRing, alias, input, query) + clientapi.SetupClientAPIComponent(base, deviceDB, accountDB, federation, &keyRing, alias, input, query, txnCache) federationapi.SetupFederationAPIComponent(base, accountDB, federation, &keyRing, alias, input, query) federationsender.SetupFederationSenderComponent(base, federation, query) mediaapi.SetupMediaAPIComponent(base, deviceDB)