Add workerForRoom
for tidiness
This commit is contained in:
parent
95eb3545d6
commit
8db25eaa65
|
@ -57,6 +57,11 @@ type Inputer struct {
|
||||||
Queryer *query.Queryer
|
Queryer *query.Queryer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Inputer) workerForRoom(roomID string) *phony.Inbox {
|
||||||
|
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
|
||||||
|
return inbox.(*phony.Inbox)
|
||||||
|
}
|
||||||
|
|
||||||
// onMessage is called when a new event arrives in the roomserver input stream.
|
// onMessage is called when a new event arrives in the roomserver input stream.
|
||||||
func (r *Inputer) Start() error {
|
func (r *Inputer) Start() error {
|
||||||
_, err := r.JetStream.Subscribe(
|
_, err := r.JetStream.Subscribe(
|
||||||
|
@ -71,9 +76,8 @@ func (r *Inputer) Start() error {
|
||||||
_ = msg.Term()
|
_ = msg.Term()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
|
|
||||||
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||||
inbox.(*phony.Inbox).Act(nil, func() {
|
r.workerForRoom(roomID).Act(nil, func() {
|
||||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||||
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
|
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
@ -104,7 +108,7 @@ func (r *Inputer) InputRoomEvents(
|
||||||
request *api.InputRoomEventsRequest,
|
request *api.InputRoomEventsRequest,
|
||||||
response *api.InputRoomEventsResponse,
|
response *api.InputRoomEventsResponse,
|
||||||
) {
|
) {
|
||||||
if request.Asynchronous {
|
if false && request.Asynchronous {
|
||||||
var err error
|
var err error
|
||||||
for _, e := range request.InputRoomEvents {
|
for _, e := range request.InputRoomEvents {
|
||||||
msg := &nats.Msg{
|
msg := &nats.Msg{
|
||||||
|
@ -128,9 +132,8 @@ func (r *Inputer) InputRoomEvents(
|
||||||
for _, e := range request.InputRoomEvents {
|
for _, e := range request.InputRoomEvents {
|
||||||
inputRoomEvent := e
|
inputRoomEvent := e
|
||||||
roomID := inputRoomEvent.Event.RoomID()
|
roomID := inputRoomEvent.Event.RoomID()
|
||||||
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
|
|
||||||
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||||
inbox.(*phony.Inbox).Act(nil, func() {
|
r.workerForRoom(roomID).Act(nil, func() {
|
||||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||||
err := r.processRoomEvent(ctx, &inputRoomEvent)
|
err := r.processRoomEvent(ctx, &inputRoomEvent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue