From 23d100214290f02cd5228ffe088ba32c5c50597e Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Wed, 21 Dec 2022 16:49:10 +0100 Subject: [PATCH] Fix FederationAPI not processing purges; start SyncAPI/FedAPI consumer --- federationapi/consumers/roomserver.go | 6 ++++-- roomserver/roomserver_test.go | 16 +++++++++++++++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 8f3f81f8a..a04b5128d 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -91,8 +91,10 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms msg := msgs[0] // Guaranteed to exist if onMessage is called receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType)) - // Only handle events we care about - if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInboundPeek { + // Only handle events we care about, avoids unneeded unmarshalling + switch receivedType { + case api.OutputTypeNewRoomEvent, api.OutputTypeNewInboundPeek, api.OutputTypePurgeRoom: + default: return true } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 6d68bca60..437c4540c 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -7,6 +7,10 @@ import ( "time" "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/syncapi" + "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/internal/httputil" @@ -160,9 +164,16 @@ func TestPurgeRoom(t *testing.T) { base, db, close := mustCreateDatabase(t, dbType) defer close() + fedClient := base.CreateFederationClient() rsAPI := roomserver.NewInternalAPI(base) - // SetFederationAPI starts the room event input consumer + keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fedClient, rsAPI) + userAPI := userapi.NewInternalAPI(base, &base.Cfg.UserAPI, nil, keyAPI, rsAPI, nil) + + // this starts the JetStream consumers + syncapi.AddPublicRoutes(base, userAPI, rsAPI, keyAPI) + federationapi.NewInternalAPI(base, fedClient, rsAPI, base.Caches, nil, true) rsAPI.SetFederationAPI(nil, nil) + // Create the room if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err) @@ -237,6 +248,9 @@ func TestPurgeRoom(t *testing.T) { if err = rsAPI.PerformAdminPurgeRoom(ctx, &api.PerformAdminPurgeRoomRequest{RoomID: room.ID}, purgeResp); err != nil { t.Fatal(err) } + // TODO: Find a better solution, e.g. "hook" into the JetStream stream. + // Gives the SyncAPI and FederationAPI some time to delete their entries + time.Sleep(time.Millisecond * 100) roomInfo, err = db.RoomInfo(ctx, room.ID) if err != nil {