From e385c8d4c74790f0df26eb1bcc11b1dc5d91d95d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 15 Jun 2022 12:15:28 +0100 Subject: [PATCH] Cache expiries, async publishing of EDUs --- federationapi/producers/syncapi.go | 8 ++++---- internal/caching/impl_ristretto.go | 16 ++++++++++++---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go index 494150036..be773f23c 100644 --- a/federationapi/producers/syncapi.go +++ b/federationapi/producers/syncapi.go @@ -54,7 +54,7 @@ func (p *SyncAPIProducer) SendReceipt( m.Header.Set("timestamp", strconv.Itoa(int(timestamp))) log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent) - _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + _, err := p.JetStream.PublishMsgAsync(m, nats.Context(ctx)) return err } @@ -122,7 +122,7 @@ func (p *SyncAPIProducer) SendToDevice( m.Header.Set("sender", sender) m.Header.Set(jetstream.UserID, userID) - if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { + if _, err = p.JetStream.PublishMsgAsync(m, nats.Context(ctx)); err != nil { log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") return err } @@ -141,7 +141,7 @@ func (p *SyncAPIProducer) SendTyping( m.Header.Set(jetstream.RoomID, roomID) m.Header.Set("typing", strconv.FormatBool(typing)) m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS))) - _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + _, err := p.JetStream.PublishMsgAsync(m, nats.Context(ctx)) return err } @@ -158,6 +158,6 @@ func (p *SyncAPIProducer) SendPresence( m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS))) log.Debugf("Sending presence to syncAPI: %+v", m.Header) - _, err := p.JetStream.PublishMsg(m, nats.Context(ctx)) + _, err := p.JetStream.PublishMsgAsync(m, nats.Context(ctx)) return err } diff --git a/internal/caching/impl_ristretto.go b/internal/caching/impl_ristretto.go index 9ee9c99cf..11e871fb2 100644 --- a/internal/caching/impl_ristretto.go +++ b/internal/caching/impl_ristretto.go @@ -44,38 +44,46 @@ func NewRistrettoCache(maxCost CacheSize, enablePrometheus bool) (*Caches, error return &Caches{ RoomVersions: &RistrettoCachePartition[string, gomatrixserverlib.RoomVersion]{ - cache: MustCreateCache("room_versions", 1*MB, enablePrometheus), + cache: MustCreateCache("room_versions", 1*MB, enablePrometheus), + MaxAge: time.Hour, }, ServerKeys: &RistrettoCachePartition[string, gomatrixserverlib.PublicKeyLookupResult]{ cache: MustCreateCache("server_keys", 32*MB, enablePrometheus), Mutable: true, + MaxAge: time.Hour, }, RoomServerRoomIDs: &RistrettoCachePartition[int64, string]{ - cache: MustCreateCache("room_ids", 1*MB, enablePrometheus), + cache: MustCreateCache("room_ids", 1*MB, enablePrometheus), + MaxAge: time.Hour, }, RoomServerEvents: &RistrettoCachePartition[int64, *gomatrixserverlib.Event]{ - cache: MustCreateCache("room_events", 1*GB, enablePrometheus), + cache: MustCreateCache("room_events", 1*GB, enablePrometheus), + MaxAge: time.Hour, }, RoomInfos: &RistrettoCachePartition[string, types.RoomInfo]{ cache: MustCreateCache("room_infos", 16*MB, enablePrometheus), Mutable: true, - MaxAge: time.Minute * 5, + MaxAge: time.Hour, }, FederationPDUs: &RistrettoCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{ cache: MustCreateCache("federation_pdus", 128*MB, enablePrometheus), Mutable: true, + MaxAge: time.Hour / 2, }, FederationEDUs: &RistrettoCachePartition[int64, *gomatrixserverlib.EDU]{ cache: MustCreateCache("federation_edus", 128*MB, enablePrometheus), Mutable: true, + MaxAge: time.Hour / 2, }, SpaceSummaryRooms: &RistrettoCachePartition[string, gomatrixserverlib.MSC2946SpacesResponse]{ cache: MustCreateCache("space_summary_rooms", 128, enablePrometheus), // TODO: not costed Mutable: true, + MaxAge: time.Hour, }, LazyLoading: &RistrettoCachePartition[string, any]{ // TODO: type cache: MustCreateCache("lazy_loading", 256, enablePrometheus), // TODO: not costed Mutable: true, + MaxAge: time.Hour, }, }, nil }