mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-22 14:21:55 -06:00
Add AS invite test, fix issue with invitations being processed twice (#3020)
The AS roomserver consumer would receive the events twice, one time as type `OutputTypeNewInviteEvent` and the other time as `OutputTypeNewRoomEvent`. [skip ci]
This commit is contained in:
parent
e2d2482ca6
commit
aa1bda4c58
|
@ -16,10 +16,12 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver"
|
"github.com/matrix-org/dendrite/roomserver"
|
||||||
|
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/test"
|
"github.com/matrix-org/dendrite/test"
|
||||||
"github.com/matrix-org/dendrite/userapi"
|
"github.com/matrix-org/dendrite/userapi"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/test/testrig"
|
"github.com/matrix-org/dendrite/test/testrig"
|
||||||
)
|
)
|
||||||
|
@ -212,3 +214,86 @@ func testProtocol(t *testing.T, asAPI api.AppServiceInternalAPI, proto string, w
|
||||||
t.Errorf("unexpected result for Protocols(%s): %+v, expected %+v", proto, protoResp.Protocols[proto], wantResult)
|
t.Errorf("unexpected result for Protocols(%s): %+v, expected %+v", proto, protoResp.Protocols[proto], wantResult)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that the roomserver consumer only receives one invite
|
||||||
|
func TestRoomserverConsumerOneInvite(t *testing.T) {
|
||||||
|
|
||||||
|
alice := test.NewUser(t)
|
||||||
|
bob := test.NewUser(t)
|
||||||
|
room := test.NewRoom(t, alice)
|
||||||
|
|
||||||
|
// Invite Bob
|
||||||
|
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomMember, map[string]interface{}{
|
||||||
|
"membership": "invite",
|
||||||
|
}, test.WithStateKey(bob.ID))
|
||||||
|
|
||||||
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||||
|
cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType)
|
||||||
|
defer closeDB()
|
||||||
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
|
natsInstance := &jetstream.NATSInstance{}
|
||||||
|
|
||||||
|
evChan := make(chan struct{})
|
||||||
|
// create a dummy AS url, handling the events
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var txn gomatrixserverlib.ApplicationServiceTransaction
|
||||||
|
err := json.NewDecoder(r.Body).Decode(&txn)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
for _, ev := range txn.Events {
|
||||||
|
if ev.Type != gomatrixserverlib.MRoomMember {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Usually we would check the event content for the membership, but since
|
||||||
|
// we only invited bob, this should be fine for this test.
|
||||||
|
if ev.StateKey != nil && *ev.StateKey == bob.ID {
|
||||||
|
evChan <- struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
// Create a dummy application service
|
||||||
|
cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{
|
||||||
|
{
|
||||||
|
ID: "someID",
|
||||||
|
URL: srv.URL,
|
||||||
|
ASToken: "",
|
||||||
|
HSToken: "",
|
||||||
|
SenderLocalpart: "senderLocalPart",
|
||||||
|
NamespaceMap: map[string][]config.ApplicationServiceNamespace{
|
||||||
|
"users": {{RegexpObject: regexp.MustCompile(bob.ID)}},
|
||||||
|
"aliases": {{RegexpObject: regexp.MustCompile(room.ID)}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
|
// Create required internal APIs
|
||||||
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
|
||||||
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
|
usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil)
|
||||||
|
// start the consumer
|
||||||
|
appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)
|
||||||
|
|
||||||
|
// Create the room
|
||||||
|
if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
|
||||||
|
t.Fatalf("failed to send events: %v", err)
|
||||||
|
}
|
||||||
|
var seenInvitesForBob int
|
||||||
|
waitLoop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Millisecond * 50): // wait for the AS to process the events
|
||||||
|
break waitLoop
|
||||||
|
case <-evChan:
|
||||||
|
seenInvitesForBob++
|
||||||
|
if seenInvitesForBob != 1 {
|
||||||
|
t.Fatalf("received unexpected invites: %d", seenInvitesForBob)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(evChan)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -140,12 +140,6 @@ func (s *OutputRoomEventConsumer) onMessage(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case api.OutputTypeNewInviteEvent:
|
|
||||||
if output.NewInviteEvent == nil || !s.appserviceIsInterestedInEvent(ctx, output.NewInviteEvent.Event, state.ApplicationService) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
events = append(events, output.NewInviteEvent.Event)
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue