Cache expiries, async publishing of EDUs

This commit is contained in:
Neil Alexander 2022-06-15 12:15:28 +01:00
parent 4e8f1a7d3f
commit e385c8d4c7
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 16 additions and 8 deletions

View file

@ -54,7 +54,7 @@ func (p *SyncAPIProducer) SendReceipt(
m.Header.Set("timestamp", strconv.Itoa(int(timestamp)))
log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent)
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
_, err := p.JetStream.PublishMsgAsync(m, nats.Context(ctx))
return err
}
@ -122,7 +122,7 @@ func (p *SyncAPIProducer) SendToDevice(
m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID)
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
if _, err = p.JetStream.PublishMsgAsync(m, nats.Context(ctx)); err != nil {
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
return err
}
@ -141,7 +141,7 @@ func (p *SyncAPIProducer) SendTyping(
m.Header.Set(jetstream.RoomID, roomID)
m.Header.Set("typing", strconv.FormatBool(typing))
m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS)))
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
_, err := p.JetStream.PublishMsgAsync(m, nats.Context(ctx))
return err
}
@ -158,6 +158,6 @@ func (p *SyncAPIProducer) SendPresence(
m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS)))
log.Debugf("Sending presence to syncAPI: %+v", m.Header)
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
_, err := p.JetStream.PublishMsgAsync(m, nats.Context(ctx))
return err
}

View file

@ -45,37 +45,45 @@ func NewRistrettoCache(maxCost CacheSize, enablePrometheus bool) (*Caches, error
return &Caches{
RoomVersions: &RistrettoCachePartition[string, gomatrixserverlib.RoomVersion]{
cache: MustCreateCache("room_versions", 1*MB, enablePrometheus),
MaxAge: time.Hour,
},
ServerKeys: &RistrettoCachePartition[string, gomatrixserverlib.PublicKeyLookupResult]{
cache: MustCreateCache("server_keys", 32*MB, enablePrometheus),
Mutable: true,
MaxAge: time.Hour,
},
RoomServerRoomIDs: &RistrettoCachePartition[int64, string]{
cache: MustCreateCache("room_ids", 1*MB, enablePrometheus),
MaxAge: time.Hour,
},
RoomServerEvents: &RistrettoCachePartition[int64, *gomatrixserverlib.Event]{
cache: MustCreateCache("room_events", 1*GB, enablePrometheus),
MaxAge: time.Hour,
},
RoomInfos: &RistrettoCachePartition[string, types.RoomInfo]{
cache: MustCreateCache("room_infos", 16*MB, enablePrometheus),
Mutable: true,
MaxAge: time.Minute * 5,
MaxAge: time.Hour,
},
FederationPDUs: &RistrettoCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{
cache: MustCreateCache("federation_pdus", 128*MB, enablePrometheus),
Mutable: true,
MaxAge: time.Hour / 2,
},
FederationEDUs: &RistrettoCachePartition[int64, *gomatrixserverlib.EDU]{
cache: MustCreateCache("federation_edus", 128*MB, enablePrometheus),
Mutable: true,
MaxAge: time.Hour / 2,
},
SpaceSummaryRooms: &RistrettoCachePartition[string, gomatrixserverlib.MSC2946SpacesResponse]{
cache: MustCreateCache("space_summary_rooms", 128, enablePrometheus), // TODO: not costed
Mutable: true,
MaxAge: time.Hour,
},
LazyLoading: &RistrettoCachePartition[string, any]{ // TODO: type
cache: MustCreateCache("lazy_loading", 256, enablePrometheus), // TODO: not costed
Mutable: true,
MaxAge: time.Hour,
},
}, nil
}