Implement transaction cache to ensure idempotency of sendEvents

Signed-off-by: Anant Prakash <anantprakashjsr@gmail.com>
This commit is contained in:
Anant Prakash 2018-03-10 01:05:23 +05:30
parent 6b55972183
commit c3221556e1
No known key found for this signature in database
GPG key ID: C5D399F626523045
6 changed files with 126 additions and 7 deletions

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/consumers"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/routing" "github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -37,6 +38,7 @@ func SetupClientAPIComponent(
aliasAPI api.RoomserverAliasAPI, aliasAPI api.RoomserverAliasAPI,
inputAPI api.RoomserverInputAPI, inputAPI api.RoomserverInputAPI,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
transactionsCache *transactions.Cache,
) { ) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI) roomserverProducer := producers.NewRoomserverProducer(inputAPI)
@ -62,5 +64,6 @@ func SetupClientAPIComponent(
queryAPI, aliasAPI, accountsDB, deviceDB, queryAPI, aliasAPI, accountsDB, deviceDB,
federation, *keyRing, federation, *keyRing,
userUpdateProducer, syncProducer, userUpdateProducer, syncProducer,
transactionsCache,
) )
} }

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -48,6 +49,7 @@ func Setup(
keyRing gomatrixserverlib.KeyRing, keyRing gomatrixserverlib.KeyRing,
userUpdateProducer *producers.UserUpdateProducer, userUpdateProducer *producers.UserUpdateProducer,
syncProducer *producers.SyncAPIProducer, syncProducer *producers.SyncAPIProducer,
transactionsCache *transactions.Cache,
) { ) {
apiMux.Handle("/_matrix/client/versions", apiMux.Handle("/_matrix/client/versions",
@ -91,14 +93,14 @@ func Setup(
r0mux.Handle("/rooms/{roomID}/send/{eventType}", r0mux.Handle("/rooms/{roomID}/send/{eventType}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req) vars := mux.Vars(req)
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer) return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, queryAPI, producer, nil)
}), }),
).Methods("POST", "OPTIONS") ).Methods("POST", "OPTIONS")
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req) vars := mux.Vars(req)
txnID := vars["txnID"] txnID := vars["txnID"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, nil, cfg, queryAPI, producer) return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, nil, cfg, queryAPI, producer, transactionsCache)
}), }),
).Methods("PUT", "OPTIONS") ).Methods("PUT", "OPTIONS")
r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}", r0mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}",
@ -110,14 +112,14 @@ func Setup(
if strings.HasSuffix(eventType, "/") { if strings.HasSuffix(eventType, "/") {
eventType = eventType[:len(eventType)-1] eventType = eventType[:len(eventType)-1]
} }
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer) return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, queryAPI, producer, nil)
}), }),
).Methods("PUT", "OPTIONS") ).Methods("PUT", "OPTIONS")
r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}", r0mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req) vars := mux.Vars(req)
stateKey := vars["stateKey"] stateKey := vars["stateKey"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer) return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, queryAPI, producer, nil)
}), }),
).Methods("PUT", "OPTIONS") ).Methods("PUT", "OPTIONS")

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -45,7 +46,18 @@ func SendEvent(
cfg config.Dendrite, cfg config.Dendrite,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer,
transactionsCache *transactions.Cache,
) util.JSONResponse { ) util.JSONResponse {
if txnID != nil {
// Try to fetch response from transactionsCache
res, err := transactionsCache.FetchTransaction(*txnID)
if err != nil {
return *res
}
}
// parse the incoming http request // parse the incoming http request
userID := device.UserID userID := device.UserID
var r map[string]interface{} // must be a JSON object var r map[string]interface{} // must be a JSON object
@ -105,8 +117,15 @@ func SendEvent(
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
return util.JSONResponse{ res := util.JSONResponse{
Code: 200, Code: 200,
JSON: sendEventResponse{e.EventID()}, JSON: sendEventResponse{e.EventID()},
} }
// Add response to transactionsCache for later access
if txnID != nil {
transactionsCache.AddTransaction(*txnID, &res)
}
return res
} }

View file

@ -0,0 +1,91 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transactions
import (
"errors"
"sync"
"time"
"github.com/matrix-org/util"
)
// CleanupPeriod represents time in nanoseconds after which cacheCleanService runs.
const CleanupPeriod time.Duration = 30 * time.Minute
type response struct {
cache *util.JSONResponse
cacheTime time.Time
}
// Cache represents a temporary store for responses.
// This is used to ensure idempotency of requests.
type Cache struct {
sync.RWMutex
txns map[string]response
}
// CreateCache creates and returns a initialized Cache object.
// This cache is automatically cleaned every `CleanupPeriod`.
func CreateCache() *Cache {
t := Cache{txns: make(map[string]response)}
// Start cleanup service as the Cache is created
go cacheCleanService(&t)
return &t
}
// FetchTransaction looks up response for txnID in Cache.
// Returns a JSON response if txnID is found, which can be sent to client,
// else returns error.
func (t *Cache) FetchTransaction(txnID string) (*util.JSONResponse, error) {
t.RLock()
res, ok := t.txns[txnID]
t.RUnlock()
if ok {
return res.cache, nil
}
return nil, errors.New("TxnID not present")
}
// AddTransaction adds a response for txnID in Cache for later access.
func (t *Cache) AddTransaction(txnID string, res *util.JSONResponse) {
t.Lock()
defer t.Unlock()
t.txns[txnID] = response{cache: res, cacheTime: time.Now()}
}
// cacheCleanService is responsible for cleaning up transactions older than 30 min.
// It guarantees that a transaction will be present in cache for at least 30 min & at most 60 min.
func cacheCleanService(t *Cache) {
for {
time.Sleep(CleanupPeriod)
go clean(t)
}
}
func clean(t *Cache) {
expire := time.Now().Add(-CleanupPeriod)
for key := range t.txns {
t.Lock()
if t.txns[key].cacheTime.Before(expire) {
delete(t.txns, key)
}
t.Unlock()
}
}

View file

@ -16,6 +16,7 @@ package main
import ( import (
"github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/keydb"
) )
@ -33,10 +34,11 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB) keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := base.CreateHTTPRoomserverAPIs() alias, input, query := base.CreateHTTPRoomserverAPIs()
txnCache := transactions.CreateCache()
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, federation, &keyRing, base, deviceDB, accountDB, federation, &keyRing,
alias, input, query, alias, input, query, txnCache,
) )
base.SetupAndServeHTTP(string(base.Cfg.Listen.ClientAPI)) base.SetupAndServeHTTP(string(base.Cfg.Listen.ClientAPI))

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/transactions"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
@ -51,8 +52,9 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB) keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := roomserver.SetupRoomServerComponent(base) alias, input, query := roomserver.SetupRoomServerComponent(base)
txnCache := transactions.CreateCache()
clientapi.SetupClientAPIComponent(base, deviceDB, accountDB, federation, &keyRing, alias, input, query) clientapi.SetupClientAPIComponent(base, deviceDB, accountDB, federation, &keyRing, alias, input, query, txnCache)
federationapi.SetupFederationAPIComponent(base, accountDB, federation, &keyRing, alias, input, query) federationapi.SetupFederationAPIComponent(base, accountDB, federation, &keyRing, alias, input, query)
federationsender.SetupFederationSenderComponent(base, federation, query) federationsender.SetupFederationSenderComponent(base, federation, query)
mediaapi.SetupMediaAPIComponent(base, deviceDB) mediaapi.SetupMediaAPIComponent(base, deviceDB)