From a0715ceb1f8286490ab83891deb34d8e6ba80c05 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Thu, 10 May 2018 18:57:24 +0530 Subject: [PATCH] Add transactions cache, write tests. Add a transactions module in dendrite/common. This is needed for idempotent APIs. Signed-off-by: Anant Prakash --- .../common/transactions/transactions.go | 87 +++++++++++++++++++ .../common/transactions/transactions_test.go | 75 ++++++++++++++++ 2 files changed, 162 insertions(+) create mode 100644 src/github.com/matrix-org/dendrite/common/transactions/transactions.go create mode 100644 src/github.com/matrix-org/dendrite/common/transactions/transactions_test.go diff --git a/src/github.com/matrix-org/dendrite/common/transactions/transactions.go b/src/github.com/matrix-org/dendrite/common/transactions/transactions.go new file mode 100644 index 000000000..349292a4b --- /dev/null +++ b/src/github.com/matrix-org/dendrite/common/transactions/transactions.go @@ -0,0 +1,87 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transactions + +import ( + "errors" + "sync" + "time" + + "github.com/matrix-org/util" +) + +// CleanupPeriod represents time in nanoseconds after which cacheCleanService runs. +const CleanupPeriod time.Duration = 30 * time.Minute + +type entry struct { + value *util.JSONResponse + entryTime time.Time +} + +// Cache represents a temporary store for entries. +// Entries are evicted after a certain period, defined by CleanupPeriod. +type Cache struct { + sync.RWMutex + txns map[string]entry +} + +// New creates a new Cache object, starts cacheCleanService. +// Returns a referance to newly created Cache. +func New() *Cache { + t := Cache{txns: make(map[string]entry)} + + // Start cleanup service as the Cache is created + go cacheCleanService(&t) + return &t +} + +// FetchTransaction looks up an entry for txnID in Cache. +// Returns a JSON response if txnID is found, else returns error. +func (t *Cache) FetchTransaction(txnID string) (*util.JSONResponse, error) { + t.RLock() + res, ok := t.txns[txnID] + t.RUnlock() + + if ok { + return res.value, nil + } + return nil, errors.New("TxnID not present") +} + +// AddTransaction adds a entry for txnID in Cache for later access. +func (t *Cache) AddTransaction(txnID string, res *util.JSONResponse) { + t.Lock() + defer t.Unlock() + + t.txns[txnID] = entry{value: res, entryTime: time.Now()} +} + +// cacheCleanService is responsible for cleaning up transactions older than CleanupPeriod. +// It guarantees that a transaction will be present in cache for at least CleanupPeriod & at most 2 * CleanupPeriod. +func cacheCleanService(t *Cache) { + for { + time.Sleep(CleanupPeriod) + go clean(t) + } +} + +func clean(t *Cache) { + expire := time.Now().Add(-CleanupPeriod) + for key := range t.txns { + t.Lock() + if t.txns[key].entryTime.Before(expire) { + delete(t.txns, key) + } + t.Unlock() + } +} diff --git a/src/github.com/matrix-org/dendrite/common/transactions/transactions_test.go b/src/github.com/matrix-org/dendrite/common/transactions/transactions_test.go new file mode 100644 index 000000000..459cc7617 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/common/transactions/transactions_test.go @@ -0,0 +1,75 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transactions + +import ( + "net/http" + "testing" + + "github.com/matrix-org/util" +) + +type fakeType struct { + ID string `json:"ID"` +} + +var ( + fakeTxnID = "aRandomTxnID" + fakeResponse = &util.JSONResponse{Code: http.StatusOK, JSON: fakeType{ID: "0"}} +) + +// TestCache creates a New Cache and tests AddTransaction & FetchTransaction +func TestCache(t *testing.T) { + + fakeTxnCache := New() + fakeTxnCache.AddTransaction(fakeTxnID, fakeResponse) + + // Add entries for noise. + for i := 1; i <= 100; i++ { + fakeTxnCache.AddTransaction( + fakeTxnID+string(i), + &util.JSONResponse{Code: http.StatusOK, JSON: fakeType{ID: string(i)}}, + ) + } + + testResponse, err := fakeTxnCache.FetchTransaction(fakeTxnID) + if err != nil { + t.Error("Failed to retrieve entry for txnID: ", fakeTxnID) + } else if testResponse.JSON != fakeResponse.JSON { + t.Error("Incorrect fetched response. Expected: ", fakeResponse.JSON, " got: ", testResponse.JSON) + } +} + +// TestConcurentAccess provides some guarantee against corruption. +func TestConcurentAccess(t *testing.T) { + fakeTxnCache := New() + // Signal that this test should run in parallel + t.Parallel() + // Add entries concurrently to test concurrent access. + for i := 1; i <= 1000; i++ { + go check(t, fakeTxnCache, i) + } +} + +// Adds an entry and checks it is retrieved. +func check(t *testing.T, fakeTxnCache *Cache, i int) { + fakeTxnCache.AddTransaction( + fakeTxnID+string(i), + &util.JSONResponse{Code: http.StatusOK, JSON: fakeType{ID: string(i)}}, + ) + _, err := fakeTxnCache.FetchTransaction(fakeTxnID + string(i)) + + if err != nil { + t.Error("Failed to retrieve entry for txnID: ", fakeTxnID+string(i)) + } +}