mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-02 19:53:10 -06:00
Re-jig control flow a bit
This commit is contained in:
parent
e62ee5cd8e
commit
527ef5df05
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -283,9 +284,26 @@ func (w *worker) _next() {
|
||||||
func (r *Inputer) queueInputRoomEvents(
|
func (r *Inputer) queueInputRoomEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.InputRoomEventsRequest,
|
request *api.InputRoomEventsRequest,
|
||||||
response *api.InputRoomEventsResponse,
|
) (replySub *nats.Subscription, err error) {
|
||||||
replyTo string, // empty string if not synchronous
|
// If the request is synchronous then we need to create a
|
||||||
) (err error) {
|
// temporary inbox to wait for responses on, and then create
|
||||||
|
// a subscription to it. If it's asynchronous then we won't
|
||||||
|
// bother, so these values will remain empty.
|
||||||
|
var replyTo string
|
||||||
|
if !request.Asynchronous {
|
||||||
|
replyTo = nats.NewInbox()
|
||||||
|
replySub, err = r.NATSClient.SubscribeSync(replyTo)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("r.NATSClient.SubscribeSync: %w", err)
|
||||||
|
}
|
||||||
|
if replySub == nil {
|
||||||
|
// This shouldn't ever happen, but it doesn't hurt to check
|
||||||
|
// because we can potentially avoid a nil pointer panic later
|
||||||
|
// if it did for some reason.
|
||||||
|
return nil, fmt.Errorf("expected a subscription to the temporary inbox")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// For each event, marshal the input room event and then
|
// For each event, marshal the input room event and then
|
||||||
// send it into the input queue.
|
// send it into the input queue.
|
||||||
for _, e := range request.InputRoomEvents {
|
for _, e := range request.InputRoomEvents {
|
||||||
|
|
@ -301,8 +319,7 @@ func (r *Inputer) queueInputRoomEvents(
|
||||||
}
|
}
|
||||||
msg.Data, err = json.Marshal(e)
|
msg.Data, err = json.Marshal(e)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.ErrMsg = err.Error()
|
return nil, fmt.Errorf("json.Marshal: %w", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil {
|
if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil {
|
||||||
logrus.WithError(err).WithFields(logrus.Fields{
|
logrus.WithError(err).WithFields(logrus.Fields{
|
||||||
|
|
@ -310,7 +327,7 @@ func (r *Inputer) queueInputRoomEvents(
|
||||||
"event_id": e.Event.EventID(),
|
"event_id": e.Event.EventID(),
|
||||||
"subj": subj,
|
"subj": subj,
|
||||||
}).Error("Roomserver failed to queue async event")
|
}).Error("Roomserver failed to queue async event")
|
||||||
return
|
return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|
@ -322,38 +339,16 @@ func (r *Inputer) InputRoomEvents(
|
||||||
request *api.InputRoomEventsRequest,
|
request *api.InputRoomEventsRequest,
|
||||||
response *api.InputRoomEventsResponse,
|
response *api.InputRoomEventsResponse,
|
||||||
) {
|
) {
|
||||||
// If the request is synchronous then we need to create a
|
|
||||||
// temporary inbox to wait for responses on, and then create
|
|
||||||
// a subscription to it. If it's asynchronous then we won't
|
|
||||||
// bother, so these values will remain empty.
|
|
||||||
var replyTo string
|
|
||||||
var replySub *nats.Subscription
|
|
||||||
if !request.Asynchronous {
|
|
||||||
var err error
|
|
||||||
replyTo = nats.NewInbox()
|
|
||||||
replySub, err = r.NATSClient.SubscribeSync(replyTo)
|
|
||||||
if err != nil {
|
|
||||||
response.ErrMsg = err.Error()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if replySub == nil {
|
|
||||||
// This shouldn't ever happen, but it doesn't hurt to check
|
|
||||||
// because we can potentially avoid a nil pointer panic later
|
|
||||||
// if it did for some reason.
|
|
||||||
response.ErrMsg = "no subscription to temporary inbox"
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Queue up the event into the roomserver.
|
// Queue up the event into the roomserver.
|
||||||
if err := r.queueInputRoomEvents(ctx, request, response, replyTo); err != nil {
|
replySub, err := r.queueInputRoomEvents(ctx, request)
|
||||||
|
if err != nil {
|
||||||
response.ErrMsg = err.Error()
|
response.ErrMsg = err.Error()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we aren't waiting for synchronous responses then we can
|
// If we aren't waiting for synchronous responses then we can
|
||||||
// give up here, there is nothing further to do.
|
// give up here, there is nothing further to do.
|
||||||
if request.Asynchronous {
|
if replySub == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue