From 11b8dc0d0b75e340383fd3de848104731169a142 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Nov 2017 15:39:43 +0000 Subject: [PATCH 1/3] Add span to kafka streams --- .../clientapi/consumers/roomserver.go | 5 +- .../dendrite/roomserver/api/output.go | 41 +++++++++++++++ .../dendrite/roomserver/input/events.go | 2 +- .../roomserver/input/latest_events.go | 11 +++- .../dendrite/roomserver/input/membership.go | 50 ++++++++++++++----- 5 files changed, 93 insertions(+), 16 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go index 0ee7c6bf0..ad8acacff 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -77,6 +77,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } + ctx, span := output.StartSpanAndReplaceContext(context.Background()) + defer span.Finish() + if output.Type != api.OutputTypeNewRoomEvent { log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", @@ -96,7 +99,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return err } - return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs) + return s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs) } // lookupStateEvents looks up the state events that are added by a new event. diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index 6a5c924c6..522f1b613 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -15,7 +15,11 @@ package api import ( + "context" + "github.com/matrix-org/gomatrixserverlib" + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" ) // An OutputType is a type of roomserver output. @@ -41,6 +45,43 @@ type OutputEvent struct { NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` // The content of event with type OutputTypeRetireInviteEvent RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"` + // Serialized span context + OpentracingCarrier opentracing.TextMapCarrier `json:"opentracing_carrier"` +} + +// AddSpanFromContext fills out the OpentracingCarrier field from the given context +func (o *OutputEvent) AddSpanFromContext(ctx context.Context) error { + span := opentracing.SpanFromContext(ctx) + ext.SpanKindProducer.Set(span) + var carrier opentracing.TextMapCarrier + tracer := opentracing.GlobalTracer() + + err := tracer.Inject(span.Context(), opentracing.TextMap, carrier) + if err != nil { + return err + } + + o.OpentracingCarrier = carrier + + return nil +} + +func (o *OutputEvent) StartSpanAndReplaceContext( + ctx context.Context, +) (context.Context, opentracing.Span) { + tracer := opentracing.GlobalTracer() + producerContext, err := tracer.Extract(opentracing.TextMap, o.OpentracingCarrier) + + var span opentracing.Span + if err == nil { + // Default to a span without reference to producer context. + span = tracer.StartSpan("room_event_consumer") + } else { + // Set the producer context. + span = tracer.StartSpan("room_event_consumer", opentracing.FollowsFrom(producerContext)) + } + + return opentracing.ContextWithSpan(ctx, span), span } // An OutputNewRoomEvent is written when the roomserver receives a new event. diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 9032219ee..31fd5f2f8 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -183,7 +183,7 @@ func processInviteEvent( return nil } - outputUpdates, err := updateToInviteMembership(updater, &input.Event, nil) + outputUpdates, err := updateToInviteMembership(ctx, updater, &input.Event, nil) if err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 5767daab8..9fb945c41 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -275,10 +275,17 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) } ore.SendAsServer = u.sendAsServer - return &api.OutputEvent{ + oe := api.OutputEvent{ Type: api.OutputTypeNewRoomEvent, NewRoomEvent: &ore, - }, nil + } + + err = oe.AddSpanFromContext(u.ctx) + if err != nil { + return nil, err + } + + return &oe, nil } type eventNIDSorter []types.EventNID diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go index 4c42cadd9..5e1d0f935 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go @@ -77,7 +77,7 @@ func updateMemberships( ae = &ev.Event } } - if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil { + if updates, err = updateMembership(ctx, updater, targetUserNID, re, ae, updates); err != nil { return nil, err } } @@ -85,6 +85,7 @@ func updateMemberships( } func updateMembership( + ctx context.Context, updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID, remove, add *gomatrixserverlib.Event, updates []api.OutputEvent, @@ -119,11 +120,11 @@ func updateMembership( switch new { case invite: - return updateToInviteMembership(mu, add, updates) + return updateToInviteMembership(ctx, mu, add, updates) case join: - return updateToJoinMembership(mu, add, updates) + return updateToJoinMembership(ctx, mu, add, updates) case leave, ban: - return updateToLeaveMembership(mu, add, new, updates) + return updateToLeaveMembership(ctx, mu, add, new, updates) default: panic(fmt.Errorf( "input: membership %q is not one of the allowed values", new, @@ -132,7 +133,7 @@ func updateMembership( } func updateToInviteMembership( - mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, + ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { // We may have already sent the invite to the user, either because we are // reprocessing this event, or because the we received this invite from a @@ -151,16 +152,24 @@ func updateToInviteMembership( onie := api.OutputNewInviteEvent{ Event: *add, } - updates = append(updates, api.OutputEvent{ + + oe := api.OutputEvent{ Type: api.OutputTypeNewInviteEvent, NewInviteEvent: &onie, - }) + } + + err = oe.AddSpanFromContext(ctx) + if err != nil { + return nil, err + } + + updates = append(updates, oe) } return updates, nil } func updateToJoinMembership( - mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, + ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { // If the user is already marked as being joined, we call SetToJoin to update // the event ID then we can return immediately. Retired is ignored as there @@ -187,15 +196,24 @@ func updateToJoinMembership( RetiredByEventID: add.EventID(), TargetUserID: *add.StateKey(), } - updates = append(updates, api.OutputEvent{ + + oe := api.OutputEvent{ Type: api.OutputTypeRetireInviteEvent, RetireInviteEvent: &orie, - }) + } + + err = oe.AddSpanFromContext(ctx) + if err != nil { + return nil, err + } + + updates = append(updates, oe) } return updates, nil } func updateToLeaveMembership( + ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, newMembership string, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { @@ -219,10 +237,18 @@ func updateToLeaveMembership( RetiredByEventID: add.EventID(), TargetUserID: *add.StateKey(), } - updates = append(updates, api.OutputEvent{ + + oe := api.OutputEvent{ Type: api.OutputTypeRetireInviteEvent, RetireInviteEvent: &orie, - }) + } + + err = oe.AddSpanFromContext(ctx) + if err != nil { + return nil, err + } + + updates = append(updates, oe) } return updates, nil } From d843b515548915c1664435cf2a9cd0eb5217e757 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Nov 2017 16:19:22 +0000 Subject: [PATCH 2/3] Create opentracing spans on kafka consumers --- .../federationsender/consumers/roomserver.go | 24 ++++++++++++------- .../publicroomsapi/consumers/roomserver.go | 9 ++++--- .../dendrite/roomserver/api/output.go | 8 +++---- .../dendrite/syncapi/consumers/roomserver.go | 9 ++++--- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go index 45e48f166..5e856ebbf 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -85,6 +85,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { ) return nil } + + ctx, span := output.StartSpanAndReplaceContext(context.Background()) + defer span.Finish() + ev := &output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), @@ -92,7 +96,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { "send_as_server": output.NewRoomEvent.SendAsServer, }).Info("received event from roomserver") - if err := s.processMessage(*output.NewRoomEvent); err != nil { + if err := s.processMessage(ctx, *output.NewRoomEvent); err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ "event": string(ev.JSON()), @@ -108,8 +112,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. -func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event) +func (s *OutputRoomEventConsumer) processMessage( + ctx context.Context, ore api.OutputNewRoomEvent, +) error { + addsStateEvents, err := s.lookupStateEvents(ctx, ore.AddsStateEventIDs, ore.Event) if err != nil { return err } @@ -123,7 +129,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err // TODO(#290): handle EventIDMismatchError and recover the current state by // talking to the roomserver oldJoinedHosts, err := s.db.UpdateRoom( - context.TODO(), + ctx, ore.Event.RoomID(), ore.LastSentEventID, ore.Event.EventID(), @@ -148,7 +154,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err } // Work out which hosts were joined at the event itself. - joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, oldJoinedHosts) + joinedHostsAtEvent, err := s.joinedHostsAtEvent(ctx, ore, oldJoinedHosts) if err != nil { return err } @@ -169,7 +175,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err // events from the room server. // Returns an error if there was a problem talking to the room server. func (s *OutputRoomEventConsumer) joinedHostsAtEvent( - ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost, + ctx context.Context, ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost, ) ([]gomatrixserverlib.ServerName, error) { // Combine the delta into a single delta so that the adds and removes can // cancel each other out. This should reduce the number of times we need @@ -178,7 +184,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( ore.AddsStateEventIDs, ore.RemovesStateEventIDs, ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, ) - combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event) + combinedAddsEvents, err := s.lookupStateEvents(ctx, combinedAdds, ore.Event) if err != nil { return nil, err } @@ -288,7 +294,7 @@ func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []s // lookupStateEvents looks up the state events that are added by a new event. func (s *OutputRoomEventConsumer) lookupStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.Event, + ctx context.Context, addsStateEventIDs []string, event gomatrixserverlib.Event, ) ([]gomatrixserverlib.Event, error) { // Fast path if there aren't any new state events. if len(addsStateEventIDs) == 0 { @@ -321,7 +327,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents( // from the roomserver using the query API. eventReq := api.QueryEventsByIDRequest{EventIDs: missing} var eventResp api.QueryEventsByIDResponse - if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { + if err := s.query.QueryEventsByID(ctx, &eventReq, &eventResp); err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go index b7d42b111..3a83ba670 100644 --- a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go @@ -77,6 +77,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } + ctx, span := output.StartSpanAndReplaceContext(context.Background()) + defer span.Finish() + ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), @@ -86,17 +89,17 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs} var addQueryRes api.QueryEventsByIDResponse - if err := s.query.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil { + if err := s.query.QueryEventsByID(ctx, &addQueryReq, &addQueryRes); err != nil { log.Warn(err) return err } remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs} var remQueryRes api.QueryEventsByIDResponse - if err := s.query.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil { + if err := s.query.QueryEventsByID(ctx, &remQueryReq, &remQueryRes); err != nil { log.Warn(err) return err } - return s.db.UpdateRoomFromEvents(context.TODO(), addQueryRes.Events, remQueryRes.Events) + return s.db.UpdateRoomFromEvents(ctx, addQueryRes.Events, remQueryRes.Events) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index 522f1b613..3c928202f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -53,7 +53,7 @@ type OutputEvent struct { func (o *OutputEvent) AddSpanFromContext(ctx context.Context) error { span := opentracing.SpanFromContext(ctx) ext.SpanKindProducer.Set(span) - var carrier opentracing.TextMapCarrier + carrier := make(opentracing.TextMapCarrier) tracer := opentracing.GlobalTracer() err := tracer.Inject(span.Context(), opentracing.TextMap, carrier) @@ -73,12 +73,12 @@ func (o *OutputEvent) StartSpanAndReplaceContext( producerContext, err := tracer.Extract(opentracing.TextMap, o.OpentracingCarrier) var span opentracing.Span - if err == nil { + if err != nil { // Default to a span without reference to producer context. - span = tracer.StartSpan("room_event_consumer") + span = tracer.StartSpan("output_event_consumer") } else { // Set the producer context. - span = tracer.StartSpan("room_event_consumer", opentracing.FollowsFrom(producerContext)) + span = tracer.StartSpan("output_event_consumer", opentracing.FollowsFrom(producerContext)) } return opentracing.ContextWithSpan(ctx, span), span diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 677eeb42b..a02c4be43 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -80,13 +80,16 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } + ctx, span := output.StartSpanAndReplaceContext(context.Background()) + defer span.Finish() + switch output.Type { case api.OutputTypeNewRoomEvent: - return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + return s.onNewRoomEvent(ctx, *output.NewRoomEvent) case api.OutputTypeNewInviteEvent: - return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) + return s.onNewInviteEvent(ctx, *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: - return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) + return s.onRetireInviteEvent(ctx, *output.RetireInviteEvent) default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", From dcf04b251c39d8de098e599678ec55d0878903ee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Nov 2017 16:39:19 +0000 Subject: [PATCH 3/3] Add comment --- src/github.com/matrix-org/dendrite/roomserver/api/output.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index 3c928202f..4f35dbbc7 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -66,6 +66,8 @@ func (o *OutputEvent) AddSpanFromContext(ctx context.Context) error { return nil } +// StartSpanAndReplaceContext produces a context and opentracing span from the +// info embedded in OutputEvent func (o *OutputEvent) StartSpanAndReplaceContext( ctx context.Context, ) (context.Context, opentracing.Span) {