From 94fce4ed5302f9e987adce938440cfb64569eed6 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Thu, 13 Jul 2023 13:19:47 +0200 Subject: [PATCH] Update comments, add benchmark --- userapi/consumers/roomserver.go | 7 ++--- userapi/consumers/roomserver_test.go | 39 ++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index 1d0956811..1f866ef4d 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -405,19 +405,20 @@ func newLocalMembership(event *synctypes.ClientEvent) (*localMembership, error) // localRoomMembers fetches the current local members of a room, and // the total number of members. func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) { + // Get only locally joined users to avoid unmarshalling and caching + // membership events we only use to calculate the room size. req := &rsapi.QueryMembershipsForRoomRequest{ RoomID: roomID, JoinedOnly: true, LocalOnly: true, } var res rsapi.QueryMembershipsForRoomResponse - - // XXX: This could potentially race if the state for the event is not known yet - // e.g. the event came over federation but we do not have the full state persisted. if err := s.rsAPI.QueryMembershipsForRoom(ctx, req, &res); err != nil { return nil, 0, err } + // Since we only queried locally joined users above, + // we also need to ask the roomserver about the joined user count. totalCount, err := s.rsAPI.JoinedUserCount(ctx, roomID) if err != nil { return nil, 0, err diff --git a/userapi/consumers/roomserver_test.go b/userapi/consumers/roomserver_test.go index c5e68bc05..49dd5b238 100644 --- a/userapi/consumers/roomserver_test.go +++ b/userapi/consumers/roomserver_test.go @@ -299,3 +299,42 @@ func TestMessageStats(t *testing.T) { } }) } + +func BenchmarkLocalRoomMembers(b *testing.B) { + t := &testing.T{} + + cfg, processCtx, close := testrig.CreateConfig(t, test.DBTypePostgres) + defer close() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + natsInstance := &jetstream.NATSInstance{} + caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics) + rsAPI.SetFederationAPI(nil, nil) + db, err := storage.NewUserDatabase(processCtx.Context(), cm, &cfg.UserAPI.AccountDatabase, cfg.Global.ServerName, bcrypt.MinCost, 1000, 1000, "") + assert.NoError(b, err) + + consumer := OutputRoomEventConsumer{db: db, rsAPI: rsAPI, serverName: "test", cfg: &cfg.UserAPI} + _, sk, err := ed25519.GenerateKey(nil) + assert.NoError(b, err) + + alice := test.NewUser(t) + room := test.NewRoom(t, alice) + + for i := 0; i < 100; i++ { + user := test.NewUser(t, test.WithSigningServer("notlocalhost", "ed25519:abc", sk)) + room.CreateAndInsert(t, user, spec.MRoomMember, map[string]string{"membership": spec.Join}, test.WithStateKey(user.ID)) + } + + err = rsapi.SendEvents(processCtx.Context(), rsAPI, rsapi.KindNew, room.Events(), "", "test", "test", nil, false) + assert.NoError(b, err) + + expectedLocalMember := &localMembership{UserID: alice.ID, Localpart: alice.Localpart, Domain: "test", MemberContent: gomatrixserverlib.MemberContent{Membership: spec.Join}} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + members, count, err := consumer.localRoomMembers(processCtx.Context(), room.ID) + assert.NoError(b, err) + assert.Equal(b, 101, count) + assert.Equal(b, expectedLocalMember, members[0]) + } +}