dendrite/federationapi/queue/queue_test.go
kegsay 1647213fac
Implement new RoomVersionImpl API (#3062)
As outlined in https://github.com/matrix-org/gomatrixserverlib/pull/368

The main change Dendrite side is that `RoomVersion` no longer has any
methods on it. Instead, you need to bounce via `gmsl.GetRoomVersion`.

It's very interesting to see where exactly Dendrite cares about this.
For some places it's creating events (fine) but others are way more
specific. Those areas will need to migrate to GMSL at some point.
2023-04-21 17:06:29 +01:00

986 lines
37 KiB
Go

// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib/fclient"
"github.com/matrix-org/gomatrixserverlib/spec"
"go.uber.org/atomic"
"gotest.tools/v3/poll"
"github.com/matrix-org/gomatrixserverlib"
"github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/test"
)
func mustCreateFederationDatabase(t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *process.ProcessContext, func()) {
if realDatabase {
// Real Database/s
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
connStr, dbClose := test.PrepareDBConnectionString(t, dbType)
db, err := storage.NewDatabase(processCtx.Context(), cm, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
}, caches, cfg.Global.IsLocalServerName)
if err != nil {
t.Fatalf("NewDatabase returned %s", err)
}
return db, processCtx, func() {
close()
dbClose()
}
} else {
// Fake Database
db := test.NewInMemoryFederationDatabase()
return db, process.NewProcessContext(), func() {}
}
}
type stubFederationRoomServerAPI struct {
rsapi.FederationRoomserverAPI
}
func (r *stubFederationRoomServerAPI) QueryServerBannedFromRoom(ctx context.Context, req *rsapi.QueryServerBannedFromRoomRequest, res *rsapi.QueryServerBannedFromRoomResponse) error {
res.Banned = false
return nil
}
type stubFederationClient struct {
api.FederationClient
shouldTxSucceed bool
shouldTxRelaySucceed bool
txCount atomic.Uint32
txRelayCount atomic.Uint32
}
func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res fclient.RespSend, err error) {
var result error
if !f.shouldTxSucceed {
result = fmt.Errorf("transaction failed")
}
f.txCount.Add(1)
return fclient.RespSend{}, result
}
func (f *stubFederationClient) P2PSendTransactionToRelay(ctx context.Context, u spec.UserID, t gomatrixserverlib.Transaction, forwardingServer spec.ServerName) (res fclient.EmptyResp, err error) {
var result error
if !f.shouldTxRelaySucceed {
result = fmt.Errorf("relay transaction failed")
}
f.txRelayCount.Add(1)
return fclient.EmptyResp{}, result
}
func mustCreatePDU(t *testing.T) *gomatrixserverlib.HeaderedEvent {
t.Helper()
content := `{"type":"m.room.message"}`
ev, err := gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionV10).NewEventFromTrustedJSON([]byte(content), false)
if err != nil {
t.Fatalf("failed to create event: %v", err)
}
return ev.Headered(gomatrixserverlib.RoomVersionV10)
}
func mustCreateEDU(t *testing.T) *gomatrixserverlib.EDU {
t.Helper()
return &gomatrixserverlib.EDU{Type: spec.MTyping}
}
func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, shouldTxSucceed bool, shouldTxRelaySucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) {
db, processContext, close := mustCreateFederationDatabase(t, dbType, realDatabase)
fc := &stubFederationClient{
shouldTxSucceed: shouldTxSucceed,
shouldTxRelaySucceed: shouldTxRelaySucceed,
txCount: *atomic.NewUint32(0),
txRelayCount: *atomic.NewUint32(0),
}
rs := &stubFederationRoomServerAPI{}
stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline)
signingInfo := []*fclient.SigningIdentity{
{
KeyID: "ed21019:auto",
PrivateKey: test.PrivateKeyA,
ServerName: "localhost",
},
}
queues := NewOutgoingQueues(db, processContext, false, "localhost", fc, rs, &stats, signingInfo)
return db, fc, queues, processContext, close
}
func TestSendPDUOnSuccessRemovedFromDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == 1 {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Success()
}
return poll.Continue("waiting for event to be removed from database. Currently present PDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendEDUOnSuccessRemovedFromDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == 1 {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Success()
}
return poll.Continue("waiting for event to be removed from database. Currently present EDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendPDUOnFailStoredInDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
// Wait for 2 backoff attempts to ensure there was adequate time to attempt sending
if fc.txCount.Load() >= 2 {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
return poll.Success()
}
return poll.Continue("waiting for event to be added to database. Currently present PDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendEDUOnFailStoredInDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
// Wait for 2 backoff attempts to ensure there was adequate time to attempt sending
if fc.txCount.Load() >= 2 {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
return poll.Success()
}
return poll.Continue("waiting for event to be added to database. Currently present EDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendPDUAgainDoesntInterruptBackoff(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
// Wait for 2 backoff attempts to ensure there was adequate time to attempt sending
if fc.txCount.Load() >= 2 {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
return poll.Success()
}
return poll.Continue("waiting for event to be added to database. Currently present PDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true
ev = mustCreatePDU(t)
err = queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
pollEnd := time.Now().Add(1 * time.Second)
immediateCheck := func(log poll.LogT) poll.Result {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Error(fmt.Errorf("The backoff was interrupted early"))
}
if time.Now().After(pollEnd) {
// Allow more than enough time for the backoff to be interrupted before
// reporting that it wasn't.
return poll.Success()
}
return poll.Continue("waiting for events to be removed from database. Currently present PDU: %d", len(data))
}
poll.WaitOn(t, immediateCheck, poll.WithTimeout(2*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendEDUAgainDoesntInterruptBackoff(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
// Wait for 2 backoff attempts to ensure there was adequate time to attempt sending
if fc.txCount.Load() >= 2 {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
return poll.Success()
}
return poll.Continue("waiting for event to be added to database. Currently present EDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true
ev = mustCreateEDU(t)
err = queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
pollEnd := time.Now().Add(1 * time.Second)
immediateCheck := func(log poll.LogT) poll.Result {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Error(fmt.Errorf("The backoff was interrupted early"))
}
if time.Now().After(pollEnd) {
// Allow more than enough time for the backoff to be interrupted before
// reporting that it wasn't.
return poll.Success()
}
return poll.Continue("waiting for events to be removed from database. Currently present EDU: %d", len(data))
}
poll.WaitOn(t, immediateCheck, poll.WithTimeout(2*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendPDUMultipleFailuresBlacklisted(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(2)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == failuresUntilBlacklist {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
if val, _ := db.IsServerBlacklisted(destination); val {
return poll.Success()
}
return poll.Continue("waiting for server to be blacklisted")
}
return poll.Continue("waiting for event to be added to database. Currently present PDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendEDUMultipleFailuresBlacklisted(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(2)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == failuresUntilBlacklist {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
if val, _ := db.IsServerBlacklisted(destination); val {
return poll.Success()
}
return poll.Continue("waiting for server to be blacklisted")
}
return poll.Continue("waiting for event to be added to database. Currently present EDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendPDUBlacklistedWithPriorExternalFailure(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(2)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
queues.statistics.ForServer(destination).Failure()
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == failuresUntilBlacklist {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
if val, _ := db.IsServerBlacklisted(destination); val {
return poll.Success()
}
return poll.Continue("waiting for server to be blacklisted")
}
return poll.Continue("waiting for event to be added to database. Currently present PDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendEDUBlacklistedWithPriorExternalFailure(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(2)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
queues.statistics.ForServer(destination).Failure()
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == failuresUntilBlacklist {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
if val, _ := db.IsServerBlacklisted(destination); val {
return poll.Success()
}
return poll.Continue("waiting for server to be blacklisted")
}
return poll.Continue("waiting for event to be added to database. Currently present EDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestRetryServerSendsPDUSuccessfully(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(1)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
// NOTE : getQueue before sending event to ensure we grab the same queue reference
// before it is blacklisted and deleted.
dest := queues.getQueue(destination)
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
checkBlacklisted := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == failuresUntilBlacklist {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
if val, _ := db.IsServerBlacklisted(destination); val {
if !dest.running.Load() {
return poll.Success()
}
return poll.Continue("waiting for queue to stop completely")
}
return poll.Continue("waiting for server to be blacklisted")
}
return poll.Continue("waiting for event to be added to database. Currently present PDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true
wasBlacklisted := dest.statistics.MarkServerAlive()
queues.RetryServer(destination, wasBlacklisted)
checkRetry := func(log poll.LogT) poll.Result {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Success()
}
return poll.Continue("waiting for event to be removed from database. Currently present PDU: %d", len(data))
}
poll.WaitOn(t, checkRetry, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestRetryServerSendsEDUSuccessfully(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(1)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
// NOTE : getQueue before sending event to ensure we grab the same queue reference
// before it is blacklisted and deleted.
dest := queues.getQueue(destination)
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
checkBlacklisted := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == failuresUntilBlacklist {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
if val, _ := db.IsServerBlacklisted(destination); val {
if !dest.running.Load() {
return poll.Success()
}
return poll.Continue("waiting for queue to stop completely")
}
return poll.Continue("waiting for server to be blacklisted")
}
return poll.Continue("waiting for event to be added to database. Currently present EDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true
wasBlacklisted := dest.statistics.MarkServerAlive()
queues.RetryServer(destination, wasBlacklisted)
checkRetry := func(log poll.LogT) poll.Result {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Success()
}
return poll.Continue("waiting for event to be removed from database. Currently present EDU: %d", len(data))
}
poll.WaitOn(t, checkRetry, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendPDUBatches(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
// test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
// db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
destinations := map[spec.ServerName]struct{}{destination: {}}
// Populate database with > maxPDUsPerTransaction
pduMultiplier := uint32(3)
for i := 0; i < maxPDUsPerTransaction*int(pduMultiplier); i++ {
ev := mustCreatePDU(t)
headeredJSON, _ := json.Marshal(ev)
nid, _ := db.StoreJSON(pc.Context(), string(headeredJSON))
err := db.AssociatePDUWithDestinations(pc.Context(), destinations, nid)
assert.NoError(t, err, "failed to associate PDU with destinations")
}
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == pduMultiplier+1 { // +1 for the extra SendEvent()
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Success()
}
return poll.Continue("waiting for all events to be removed from database. Currently present PDU: %d", len(data))
}
return poll.Continue("waiting for the right amount of send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
// })
}
func TestSendEDUBatches(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
// test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
// db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
destinations := map[spec.ServerName]struct{}{destination: {}}
// Populate database with > maxEDUsPerTransaction
eduMultiplier := uint32(3)
for i := 0; i < maxEDUsPerTransaction*int(eduMultiplier); i++ {
ev := mustCreateEDU(t)
ephemeralJSON, _ := json.Marshal(ev)
nid, _ := db.StoreJSON(pc.Context(), string(ephemeralJSON))
err := db.AssociateEDUWithDestinations(pc.Context(), destinations, nid, ev.Type, nil)
assert.NoError(t, err, "failed to associate EDU with destinations")
}
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == eduMultiplier+1 { // +1 for the extra SendEvent()
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Success()
}
return poll.Continue("waiting for all events to be removed from database. Currently present EDU: %d", len(data))
}
return poll.Continue("waiting for the right amount of send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
// })
}
func TestSendPDUAndEDUBatches(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
// test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
// db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
destinations := map[spec.ServerName]struct{}{destination: {}}
// Populate database with > maxEDUsPerTransaction
multiplier := uint32(3)
for i := 0; i < maxPDUsPerTransaction*int(multiplier)+1; i++ {
ev := mustCreatePDU(t)
headeredJSON, _ := json.Marshal(ev)
nid, _ := db.StoreJSON(pc.Context(), string(headeredJSON))
err := db.AssociatePDUWithDestinations(pc.Context(), destinations, nid)
assert.NoError(t, err, "failed to associate PDU with destinations")
}
for i := 0; i < maxEDUsPerTransaction*int(multiplier); i++ {
ev := mustCreateEDU(t)
ephemeralJSON, _ := json.Marshal(ev)
nid, _ := db.StoreJSON(pc.Context(), string(ephemeralJSON))
err := db.AssociateEDUWithDestinations(pc.Context(), destinations, nid, ev.Type, nil)
assert.NoError(t, err, "failed to associate EDU with destinations")
}
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == multiplier+1 { // +1 for the extra SendEvent()
pduData, dbErrPDU := db.GetPendingPDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErrPDU)
eduData, dbErrEDU := db.GetPendingEDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErrEDU)
if len(pduData) == 0 && len(eduData) == 0 {
return poll.Success()
}
return poll.Continue("waiting for all events to be removed from database. Currently present PDU: %d EDU: %d", len(pduData), len(eduData))
}
return poll.Continue("waiting for the right amount of send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
// })
}
func TestExternalFailureBackoffDoesntStartQueue(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
dest := queues.getQueue(destination)
queues.statistics.ForServer(destination).Failure()
destinations := map[spec.ServerName]struct{}{destination: {}}
ev := mustCreatePDU(t)
headeredJSON, _ := json.Marshal(ev)
nid, _ := db.StoreJSON(pc.Context(), string(headeredJSON))
err := db.AssociatePDUWithDestinations(pc.Context(), destinations, nid)
assert.NoError(t, err, "failed to associate PDU with destinations")
pollEnd := time.Now().Add(3 * time.Second)
runningCheck := func(log poll.LogT) poll.Result {
if dest.running.Load() || fc.txCount.Load() > 0 {
return poll.Error(fmt.Errorf("The queue was started"))
}
if time.Now().After(pollEnd) {
// Allow more than enough time for the queue to be started in the case
// of backoff triggering it to start.
return poll.Success()
}
return poll.Continue("waiting to ensure queue doesn't start.")
}
poll.WaitOn(t, runningCheck, poll.WithTimeout(4*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestQueueInteractsWithRealDatabasePDUAndEDU(t *testing.T) {
// NOTE : Only one test case against real databases can be run at a time.
t.Parallel()
failuresUntilBlacklist := uint32(1)
destination := spec.ServerName("remotehost")
destinations := map[spec.ServerName]struct{}{destination: {}}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, dbType, true)
// NOTE : These defers aren't called if go test is killed so the dbs may not get cleaned up.
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
// NOTE : getQueue before sending event to ensure we grab the same queue reference
// before it is blacklisted and deleted.
dest := queues.getQueue(destination)
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
// NOTE : The server can be blacklisted before this, so manually inject the event
// into the database.
edu := mustCreateEDU(t)
ephemeralJSON, _ := json.Marshal(edu)
nid, _ := db.StoreJSON(pc.Context(), string(ephemeralJSON))
err = db.AssociateEDUWithDestinations(pc.Context(), destinations, nid, edu.Type, nil)
assert.NoError(t, err, "failed to associate EDU with destinations")
checkBlacklisted := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == failuresUntilBlacklist {
pduData, dbErrPDU := db.GetPendingPDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErrPDU)
eduData, dbErrEDU := db.GetPendingEDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErrEDU)
if len(pduData) == 1 && len(eduData) == 1 {
if val, _ := db.IsServerBlacklisted(destination); val {
if !dest.running.Load() {
return poll.Success()
}
return poll.Continue("waiting for queue to stop completely")
}
return poll.Continue("waiting for server to be blacklisted")
}
return poll.Continue("waiting for events to be added to database. Currently present PDU: %d EDU: %d", len(pduData), len(eduData))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(10*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true
wasBlacklisted := dest.statistics.MarkServerAlive()
queues.RetryServer(destination, wasBlacklisted)
checkRetry := func(log poll.LogT) poll.Result {
pduData, dbErrPDU := db.GetPendingPDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErrPDU)
eduData, dbErrEDU := db.GetPendingEDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErrEDU)
if len(pduData) == 0 && len(eduData) == 0 {
return poll.Success()
}
return poll.Continue("waiting for events to be removed from database. Currently present PDU: %d EDU: %d", len(pduData), len(eduData))
}
poll.WaitOn(t, checkRetry, poll.WithTimeout(10*time.Second), poll.WithDelay(100*time.Millisecond))
})
}
func TestSendPDUMultipleFailuresAssumedOffline(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(7)
failuresUntilAssumedOffline := uint32(2)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == failuresUntilAssumedOffline {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
if val, _ := db.IsServerAssumedOffline(context.Background(), destination); val {
return poll.Success()
}
return poll.Continue("waiting for server to be assumed offline")
}
return poll.Continue("waiting for event to be added to database. Currently present PDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendEDUMultipleFailuresAssumedOffline(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(7)
failuresUntilAssumedOffline := uint32(2)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() == failuresUntilAssumedOffline {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 1 {
if val, _ := db.IsServerAssumedOffline(context.Background(), destination); val {
return poll.Success()
}
return poll.Continue("waiting for server to be assumed offline")
}
return poll.Continue("waiting for event to be added to database. Currently present EDU: %d", len(data))
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
}
func TestSendPDUOnRelaySuccessRemovedFromDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
failuresUntilAssumedOffline := uint32(1)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, true, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
relayServers := []spec.ServerName{"relayserver"}
queues.statistics.ForServer(destination).AddRelayServers(relayServers)
ev := mustCreatePDU(t)
err := queues.SendEvent(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() >= 1 {
if fc.txRelayCount.Load() == 1 {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Success()
}
return poll.Continue("waiting for event to be removed from database. Currently present PDU: %d", len(data))
}
return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load())
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
assumedOffline, _ := db.IsServerAssumedOffline(context.Background(), destination)
assert.Equal(t, true, assumedOffline)
}
func TestSendEDUOnRelaySuccessRemovedFromDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
failuresUntilAssumedOffline := uint32(1)
destination := spec.ServerName("remotehost")
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, true, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
<-pc.WaitForShutdown()
}()
relayServers := []spec.ServerName{"relayserver"}
queues.statistics.ForServer(destination).AddRelayServers(relayServers)
ev := mustCreateEDU(t)
err := queues.SendEDU(ev, "localhost", []spec.ServerName{destination})
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
if fc.txCount.Load() >= 1 {
if fc.txRelayCount.Load() == 1 {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
if len(data) == 0 {
return poll.Success()
}
return poll.Continue("waiting for event to be removed from database. Currently present EDU: %d", len(data))
}
return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load())
}
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
}
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
assumedOffline, _ := db.IsServerAssumedOffline(context.Background(), destination)
assert.Equal(t, true, assumedOffline)
}