mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-01-18 18:04:27 -06:00
Deflake currentstateserver integration tests (#1263)
This commit is contained in:
parent
b8b854d642
commit
0835107f5b
|
@ -29,7 +29,9 @@ import (
|
|||
"github.com/Shopify/sarama"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/internal"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
@ -76,7 +78,24 @@ func init() {
|
|||
}
|
||||
}
|
||||
|
||||
func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *roomserverAPI.OutputNewRoomEvent) error {
|
||||
func waitForOffsetProcessed(t *testing.T, db storage.Database, offset int64) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
for {
|
||||
poffsets, err := db.PartitionOffsets(ctx, kafkaTopic)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to PartitionOffsets: %s", err)
|
||||
}
|
||||
for _, partition := range poffsets {
|
||||
if partition.Offset >= offset {
|
||||
return
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *roomserverAPI.OutputNewRoomEvent) int64 {
|
||||
value, err := json.Marshal(roomserverAPI.OutputEvent{
|
||||
Type: roomserverAPI.OutputTypeNewRoomEvent,
|
||||
NewRoomEvent: out,
|
||||
|
@ -84,7 +103,7 @@ func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *rooms
|
|||
if err != nil {
|
||||
t.Fatalf("failed to marshal output event: %s", err)
|
||||
}
|
||||
_, _, err = producer.SendMessage(&sarama.ProducerMessage{
|
||||
_, offset, err := producer.SendMessage(&sarama.ProducerMessage{
|
||||
Topic: kafkaTopic,
|
||||
Key: sarama.StringEncoder(out.Event.RoomID()),
|
||||
Value: sarama.ByteEncoder(value),
|
||||
|
@ -92,10 +111,10 @@ func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *rooms
|
|||
if err != nil {
|
||||
t.Fatalf("failed to send message: %s", err)
|
||||
}
|
||||
return nil
|
||||
return offset
|
||||
}
|
||||
|
||||
func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.SyncProducer, func()) {
|
||||
func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, storage.Database, sarama.SyncProducer, func()) {
|
||||
cfg := &config.Dendrite{}
|
||||
cfg.Defaults()
|
||||
stateDBName := "test_state.db"
|
||||
|
@ -117,26 +136,28 @@ func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.Sync
|
|||
if err != nil {
|
||||
t.Fatalf("Failed to create naffka consumer: %s", err)
|
||||
}
|
||||
return NewInternalAPI(&cfg.CurrentStateServer, naff), naff, func() {
|
||||
stateAPI := NewInternalAPI(&cfg.CurrentStateServer, naff)
|
||||
// type-cast to pull out the DB
|
||||
stateAPIVal := stateAPI.(*internal.CurrentStateInternalAPI)
|
||||
return stateAPI, stateAPIVal.DB, naff, func() {
|
||||
os.Remove(naffkaDBName)
|
||||
os.Remove(stateDBName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryCurrentState(t *testing.T) {
|
||||
currStateAPI, producer, cancel := MustMakeInternalAPI(t)
|
||||
currStateAPI, db, producer, cancel := MustMakeInternalAPI(t)
|
||||
defer cancel()
|
||||
plTuple := gomatrixserverlib.StateKeyTuple{
|
||||
EventType: "m.room.power_levels",
|
||||
StateKey: "",
|
||||
}
|
||||
plEvent := testEvents[4]
|
||||
MustWriteOutputEvent(t, producer, &roomserverAPI.OutputNewRoomEvent{
|
||||
offset := MustWriteOutputEvent(t, producer, &roomserverAPI.OutputNewRoomEvent{
|
||||
Event: plEvent,
|
||||
AddsStateEventIDs: []string{plEvent.EventID()},
|
||||
})
|
||||
// we have no good way to know /when/ the server has consumed the event
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
waitForOffsetProcessed(t, db, offset)
|
||||
|
||||
testCases := []struct {
|
||||
req api.QueryCurrentStateRequest
|
||||
|
@ -228,7 +249,7 @@ func mustMakeMembershipEvent(t *testing.T, roomID, userID, membership string) *r
|
|||
|
||||
// This test makes sure that QuerySharedUsers is returning the correct users for a range of sets.
|
||||
func TestQuerySharedUsers(t *testing.T) {
|
||||
currStateAPI, producer, cancel := MustMakeInternalAPI(t)
|
||||
currStateAPI, db, producer, cancel := MustMakeInternalAPI(t)
|
||||
defer cancel()
|
||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo:bar", "@alice:localhost", "join"))
|
||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo:bar", "@bob:localhost", "join"))
|
||||
|
@ -240,10 +261,8 @@ func TestQuerySharedUsers(t *testing.T) {
|
|||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@bob:localhost", "join"))
|
||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@dave:localhost", "leave"))
|
||||
|
||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo4:bar", "@alice:localhost", "join"))
|
||||
|
||||
// we don't know when the server has processed the events
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
offset := MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo4:bar", "@alice:localhost", "join"))
|
||||
waitForOffsetProcessed(t, db, offset)
|
||||
|
||||
testCases := []struct {
|
||||
req api.QuerySharedUsersRequest
|
||||
|
|
Loading…
Reference in a new issue