From 527ef5df0555d366c8cf6c012a023933dcd7fdb4 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 23 Mar 2022 09:56:43 +0000 Subject: [PATCH] Re-jig control flow a bit --- roomserver/internal/input/input.go | 57 ++++++++++++++---------------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index c90c3536f..6a8ae6d00 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "sync" "time" @@ -283,9 +284,26 @@ func (w *worker) _next() { func (r *Inputer) queueInputRoomEvents( ctx context.Context, request *api.InputRoomEventsRequest, - response *api.InputRoomEventsResponse, - replyTo string, // empty string if not synchronous -) (err error) { +) (replySub *nats.Subscription, err error) { + // If the request is synchronous then we need to create a + // temporary inbox to wait for responses on, and then create + // a subscription to it. If it's asynchronous then we won't + // bother, so these values will remain empty. + var replyTo string + if !request.Asynchronous { + replyTo = nats.NewInbox() + replySub, err = r.NATSClient.SubscribeSync(replyTo) + if err != nil { + return nil, fmt.Errorf("r.NATSClient.SubscribeSync: %w", err) + } + if replySub == nil { + // This shouldn't ever happen, but it doesn't hurt to check + // because we can potentially avoid a nil pointer panic later + // if it did for some reason. + return nil, fmt.Errorf("expected a subscription to the temporary inbox") + } + } + // For each event, marshal the input room event and then // send it into the input queue. for _, e := range request.InputRoomEvents { @@ -301,8 +319,7 @@ func (r *Inputer) queueInputRoomEvents( } msg.Data, err = json.Marshal(e) if err != nil { - response.ErrMsg = err.Error() - return + return nil, fmt.Errorf("json.Marshal: %w", err) } if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil { logrus.WithError(err).WithFields(logrus.Fields{ @@ -310,7 +327,7 @@ func (r *Inputer) queueInputRoomEvents( "event_id": e.Event.EventID(), "subj": subj, }).Error("Roomserver failed to queue async event") - return + return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err) } } return @@ -322,38 +339,16 @@ func (r *Inputer) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { - // If the request is synchronous then we need to create a - // temporary inbox to wait for responses on, and then create - // a subscription to it. If it's asynchronous then we won't - // bother, so these values will remain empty. - var replyTo string - var replySub *nats.Subscription - if !request.Asynchronous { - var err error - replyTo = nats.NewInbox() - replySub, err = r.NATSClient.SubscribeSync(replyTo) - if err != nil { - response.ErrMsg = err.Error() - return - } - if replySub == nil { - // This shouldn't ever happen, but it doesn't hurt to check - // because we can potentially avoid a nil pointer panic later - // if it did for some reason. - response.ErrMsg = "no subscription to temporary inbox" - return - } - } - // Queue up the event into the roomserver. - if err := r.queueInputRoomEvents(ctx, request, response, replyTo); err != nil { + replySub, err := r.queueInputRoomEvents(ctx, request) + if err != nil { response.ErrMsg = err.Error() return } // If we aren't waiting for synchronous responses then we can // give up here, there is nothing further to do. - if request.Asynchronous { + if replySub == nil { return }