From 173b1e8d3e800c0029725bcda321a240b5352f7d Mon Sep 17 00:00:00 2001 From: kegsay Date: Thu, 6 Jan 2022 17:13:34 +0000 Subject: [PATCH 1/2] Fix #2084 - incorrect /event_auth response (#2085) * Fix #2084 * Return early * Linting --- federationapi/routing/eventauth.go | 38 ++++++++++++++++++++++++++---- roomserver/api/query.go | 4 ++++ roomserver/internal/query/query.go | 15 ++++++++++++ 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/federationapi/routing/eventauth.go b/federationapi/routing/eventauth.go index 34eaad1c5..d92b66f4b 100644 --- a/federationapi/routing/eventauth.go +++ b/federationapi/routing/eventauth.go @@ -16,6 +16,7 @@ import ( "context" "net/http" + "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -29,15 +30,42 @@ func GetEventAuth( roomID string, eventID string, ) util.JSONResponse { - // TODO: Optimisation: we shouldn't be querying all the room state - // that is in state.StateEvents - we just ignore it. - state, err := getState(ctx, request, rsAPI, roomID, eventID) + event, resErr := fetchEvent(ctx, rsAPI, eventID) + if resErr != nil { + return *resErr + } + + if event.RoomID() != roomID { + return util.JSONResponse{Code: http.StatusNotFound, JSON: jsonerror.NotFound("event does not belong to this room")} + } + resErr = allowedToSeeEvent(ctx, request.Origin(), rsAPI, eventID) + if resErr != nil { + return *resErr + } + + var response api.QueryStateAndAuthChainResponse + err := rsAPI.QueryStateAndAuthChain( + ctx, + &api.QueryStateAndAuthChainRequest{ + RoomID: roomID, + PrevEventIDs: []string{eventID}, + AuthEventIDs: event.AuthEventIDs(), + OnlyFetchAuthChain: true, + }, + &response, + ) if err != nil { - return *err + return util.ErrorResponse(err) + } + + if !response.RoomExists { + return util.JSONResponse{Code: http.StatusNotFound, JSON: nil} } return util.JSONResponse{ Code: http.StatusOK, - JSON: gomatrixserverlib.RespEventAuth{AuthEvents: state.AuthEvents}, + JSON: gomatrixserverlib.RespEventAuth{ + AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents), + }, } } diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 599156bb1..283217157 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -226,6 +226,10 @@ type QueryStateAndAuthChainRequest struct { PrevEventIDs []string `json:"prev_event_ids"` // The list of auth events for the event. Used to calculate the auth chain AuthEventIDs []string `json:"auth_event_ids"` + // If true, the auth chain events for the auth event IDs given will be fetched only. Prev event IDs are ignored. + // If false, state and auth chain events for the prev event IDs and entire current state will be included. + // TODO: not a great API shape. It serves 2 main uses: false=>response for send_join, true=>response for /event_auth + OnlyFetchAuthChain bool `json:"only_fetch_auth_chain"` // Should state resolution be ran on the result events? // TODO: check call sites and remove if we always want to do state res ResolveState bool `json:"resolve_state"` diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index b80f08ab6..28b140c76 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -457,6 +457,21 @@ func (r *Queryer) QueryStateAndAuthChain( response.RoomExists = true response.RoomVersion = info.RoomVersion + // handle this entirely separately to the other case so we don't have to pull out + // the entire current state of the room + // TODO: this probably means it should be a different query operation... + if request.OnlyFetchAuthChain { + var authEvents []*gomatrixserverlib.Event + authEvents, err = GetAuthChain(ctx, r.DB.EventsFromIDs, request.AuthEventIDs) + if err != nil { + return err + } + for _, event := range authEvents { + response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion)) + } + return nil + } + var stateEvents []*gomatrixserverlib.Event stateEvents, err = r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs) if err != nil { From a42232143526de360309b112b57cf0d95adf47cb Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Jan 2022 13:41:53 +0000 Subject: [PATCH 2/2] Fix panic at startup if roomserver was not given federation API reference by the time NATS consumes an event, tweak backpressure metrics --- roomserver/internal/api.go | 7 ++++--- roomserver/internal/input/input.go | 9 ++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 5cfe68daa..e370f7e44 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -72,9 +72,6 @@ func NewRoomserverAPI( }, // perform-er structs get initialised when we have a federation sender to use } - if err := a.Inputer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start roomserver input API") - } return a } @@ -140,6 +137,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA r.Forgetter = &perform.Forgetter{ DB: r.DB, } + + if err := r.Inputer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start roomserver input API") + } } func (r *RoomserverInternalAPI) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) { diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 1eab67802..dbff5fdda 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -59,14 +59,15 @@ func (r *Inputer) Start() error { // later, possibly with an error response to the inputter if synchronous. func(msg *nats.Msg) { roomID := msg.Header.Get("room_id") - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() var inputRoomEvent api.InputRoomEvent if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { _ = msg.Term() return } inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() inbox.(*phony.Inbox).Act(nil, func() { + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) } else { @@ -111,15 +112,17 @@ func (r *Inputer) InputRoomEvents( if _, err = r.JetStream.PublishMsg(msg); err != nil { return } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } } else { responses := make(chan error, len(request.InputRoomEvents)) defer close(responses) for _, e := range request.InputRoomEvents { inputRoomEvent := e - inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) + roomID := inputRoomEvent.Event.RoomID() + inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() inbox.(*phony.Inbox).Act(nil, func() { + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() err := r.processRoomEvent(context.TODO(), &inputRoomEvent) if err != nil { sentry.CaptureException(err)