From 0dc4ceaa2d8e46aa0134c1aabe96389ba4c1591d Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 12 Jun 2020 15:11:33 +0100 Subject: [PATCH] Minor perf/debugging improvements (#1121) * Minor perf/debugging improvements - publicroomsapi: Don't call QueryEventsByID with no event IDs - appservice: Consume only if there are 1 or more ASes - roomserver: don't keep a copy of the request "for debugging" - we trace now * fedsender: return early if we have no destinations * Unbreak tests --- appservice/appservice.go | 16 ++++++++++------ federationapi/routing/send_test.go | 3 --- federationsender/queue/queue.go | 3 +++ publicroomsapi/consumers/roomserver.go | 11 ++++++----- roomserver/api/query.go | 8 -------- roomserver/internal/query.go | 5 ----- 6 files changed, 19 insertions(+), 27 deletions(-) diff --git a/appservice/appservice.go b/appservice/appservice.go index 0fbe3f20b..bd261ff9b 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -86,12 +86,16 @@ func NewInternalAPI( Cfg: base.Cfg, } - consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB, - rsAPI, workerStates, - ) - if err := consumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start appservice roomserver consumer") + // Only consume if we actually have ASes to track, else we'll just chew cycles needlessly. + // We can't add ASes at runtime so this is safe to do. + if len(workerStates) > 0 { + consumer := consumers.NewOutputRoomEventConsumer( + base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB, + rsAPI, workerStates, + ) + if err := consumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start appservice roomserver consumer") + } } // Create application service transaction workers diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index adae7c221..3123b55c9 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -128,7 +128,6 @@ func (t *testRoomserverAPI) QueryLatestEventsAndState( response *api.QueryLatestEventsAndStateResponse, ) error { r := t.queryLatestEventsAndState(request) - response.QueryLatestEventsAndStateRequest = *request response.RoomExists = r.RoomExists response.RoomVersion = testRoomVersion response.LatestEvents = r.LatestEvents @@ -144,7 +143,6 @@ func (t *testRoomserverAPI) QueryStateAfterEvents( response *api.QueryStateAfterEventsResponse, ) error { response.RoomVersion = testRoomVersion - response.QueryStateAfterEventsRequest = *request res := t.queryStateAfterEvents(request) response.PrevEventsExist = res.PrevEventsExist response.RoomExists = res.RoomExists @@ -612,7 +610,6 @@ func TestTransactionFetchMissingStateByStateIDs(t *testing.T) { } } } - res.QueryEventsByIDRequest = *req return res }, } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 5b8fc3c56..240343559 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -107,6 +107,9 @@ func (oqs *OutgoingQueues) SendEvent( // Remove our own server from the list of destinations. destinations = filterAndDedupeDests(oqs.origin, destinations) + if len(destinations) == 0 { + return nil + } log.WithFields(log.Fields{ "destinations": destinations, "event": ev.EventID(), diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index ba187cb11..b9686d56d 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -78,18 +78,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs} var remQueryRes api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil { - log.Warn(err) - return err + if len(output.NewRoomEvent.RemovesStateEventIDs) > 0 { + remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs} + if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil { + log.Warn(err) + return err + } } var addQueryEvents, remQueryEvents []gomatrixserverlib.Event for _, headeredEvent := range output.NewRoomEvent.AddsState() { addQueryEvents = append(addQueryEvents, headeredEvent.Event) } - addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap()) for _, headeredEvent := range remQueryRes.Events { remQueryEvents = append(remQueryEvents, headeredEvent.Event) } diff --git a/roomserver/api/query.go b/roomserver/api/query.go index b1525342e..6586b1af3 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -33,8 +33,6 @@ type QueryLatestEventsAndStateRequest struct { // This is used when sending events to set the prev_events, auth_events and depth. // It is also used to tell whether the event is allowed by the event auth rules. type QueryLatestEventsAndStateResponse struct { - // Copy of the request for debugging. - QueryLatestEventsAndStateRequest // Does the room exist? // If the room doesn't exist this will be false and LatestEvents will be empty. RoomExists bool `json:"room_exists"` @@ -66,8 +64,6 @@ type QueryStateAfterEventsRequest struct { // QueryStateAfterEventsResponse is a response to QueryStateAfterEvents type QueryStateAfterEventsResponse struct { - // Copy of the request for debugging. - QueryStateAfterEventsRequest // Does the room exist on this roomserver? // If the room doesn't exist this will be false and StateEvents will be empty. RoomExists bool `json:"room_exists"` @@ -89,8 +85,6 @@ type QueryEventsByIDRequest struct { // QueryEventsByIDResponse is a response to QueryEventsByID type QueryEventsByIDResponse struct { - // Copy of the request for debugging. - QueryEventsByIDRequest // A list of events with the requested IDs. // If the roomserver does not have a copy of a requested event // then it will omit that event from the list. @@ -187,8 +181,6 @@ type QueryStateAndAuthChainRequest struct { // QueryStateAndAuthChainResponse is a response to QueryStateAndAuthChain type QueryStateAndAuthChainResponse struct { - // Copy of the request for debugging. - QueryStateAndAuthChainRequest // Does the room exist on this roomserver? // If the room doesn't exist this will be false and StateEvents will be empty. RoomExists bool `json:"room_exists"` diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index 375ddc23c..4fc8e4c25 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -45,7 +45,6 @@ func (r *RoomserverInternalAPI) QueryLatestEventsAndState( roomState := state.NewStateResolution(r.DB) - response.QueryLatestEventsAndStateRequest = *request roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID) if err != nil { return err @@ -105,7 +104,6 @@ func (r *RoomserverInternalAPI) QueryStateAfterEvents( roomState := state.NewStateResolution(r.DB) - response.QueryStateAfterEventsRequest = *request roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID) if err != nil { return err @@ -153,8 +151,6 @@ func (r *RoomserverInternalAPI) QueryEventsByID( request *api.QueryEventsByIDRequest, response *api.QueryEventsByIDResponse, ) error { - response.QueryEventsByIDRequest = *request - eventNIDMap, err := r.DB.EventNIDs(ctx, request.EventIDs) if err != nil { return err @@ -734,7 +730,6 @@ func (r *RoomserverInternalAPI) QueryStateAndAuthChain( request *api.QueryStateAndAuthChainRequest, response *api.QueryStateAndAuthChainResponse, ) error { - response.QueryStateAndAuthChainRequest = *request roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID) if err != nil { return err