mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 09:23:09 -06:00
Merge dcf04b251c into 8a1f3195ca
This commit is contained in:
commit
1926684ea0
|
|
@ -77,6 +77,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
ctx, span := output.StartSpanAndReplaceContext(context.Background())
|
||||
defer span.Finish()
|
||||
|
||||
if output.Type != api.OutputTypeNewRoomEvent {
|
||||
log.WithField("type", output.Type).Debug(
|
||||
"roomserver output log: ignoring unknown output type",
|
||||
|
|
@ -96,7 +99,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs)
|
||||
return s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs)
|
||||
}
|
||||
|
||||
// lookupStateEvents looks up the state events that are added by a new event.
|
||||
|
|
|
|||
|
|
@ -85,6 +85,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx, span := output.StartSpanAndReplaceContext(context.Background())
|
||||
defer span.Finish()
|
||||
|
||||
ev := &output.NewRoomEvent.Event
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
|
|
@ -92,7 +96,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
"send_as_server": output.NewRoomEvent.SendAsServer,
|
||||
}).Info("received event from roomserver")
|
||||
|
||||
if err := s.processMessage(*output.NewRoomEvent); err != nil {
|
||||
if err := s.processMessage(ctx, *output.NewRoomEvent); err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
|
|
@ -108,8 +112,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
|
||||
// processMessage updates the list of currently joined hosts in the room
|
||||
// and then sends the event to the hosts that were joined before the event.
|
||||
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
||||
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event)
|
||||
func (s *OutputRoomEventConsumer) processMessage(
|
||||
ctx context.Context, ore api.OutputNewRoomEvent,
|
||||
) error {
|
||||
addsStateEvents, err := s.lookupStateEvents(ctx, ore.AddsStateEventIDs, ore.Event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -123,7 +129,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
|
|||
// TODO(#290): handle EventIDMismatchError and recover the current state by
|
||||
// talking to the roomserver
|
||||
oldJoinedHosts, err := s.db.UpdateRoom(
|
||||
context.TODO(),
|
||||
ctx,
|
||||
ore.Event.RoomID(),
|
||||
ore.LastSentEventID,
|
||||
ore.Event.EventID(),
|
||||
|
|
@ -148,7 +154,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
|
|||
}
|
||||
|
||||
// Work out which hosts were joined at the event itself.
|
||||
joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, oldJoinedHosts)
|
||||
joinedHostsAtEvent, err := s.joinedHostsAtEvent(ctx, ore, oldJoinedHosts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -169,7 +175,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
|
|||
// events from the room server.
|
||||
// Returns an error if there was a problem talking to the room server.
|
||||
func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
|
||||
ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
|
||||
ctx context.Context, ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
|
||||
) ([]gomatrixserverlib.ServerName, error) {
|
||||
// Combine the delta into a single delta so that the adds and removes can
|
||||
// cancel each other out. This should reduce the number of times we need
|
||||
|
|
@ -178,7 +184,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
|
|||
ore.AddsStateEventIDs, ore.RemovesStateEventIDs,
|
||||
ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs,
|
||||
)
|
||||
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event)
|
||||
combinedAddsEvents, err := s.lookupStateEvents(ctx, combinedAdds, ore.Event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -288,7 +294,7 @@ func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []s
|
|||
|
||||
// lookupStateEvents looks up the state events that are added by a new event.
|
||||
func (s *OutputRoomEventConsumer) lookupStateEvents(
|
||||
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
||||
ctx context.Context, addsStateEventIDs []string, event gomatrixserverlib.Event,
|
||||
) ([]gomatrixserverlib.Event, error) {
|
||||
// Fast path if there aren't any new state events.
|
||||
if len(addsStateEventIDs) == 0 {
|
||||
|
|
@ -321,7 +327,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
|
|||
// from the roomserver using the query API.
|
||||
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
|
||||
var eventResp api.QueryEventsByIDResponse
|
||||
if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
|
||||
if err := s.query.QueryEventsByID(ctx, &eventReq, &eventResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -77,6 +77,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
ctx, span := output.StartSpanAndReplaceContext(context.Background())
|
||||
defer span.Finish()
|
||||
|
||||
ev := output.NewRoomEvent.Event
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
|
|
@ -86,17 +89,17 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
|
||||
addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs}
|
||||
var addQueryRes api.QueryEventsByIDResponse
|
||||
if err := s.query.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil {
|
||||
if err := s.query.QueryEventsByID(ctx, &addQueryReq, &addQueryRes); err != nil {
|
||||
log.Warn(err)
|
||||
return err
|
||||
}
|
||||
|
||||
remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
|
||||
var remQueryRes api.QueryEventsByIDResponse
|
||||
if err := s.query.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
|
||||
if err := s.query.QueryEventsByID(ctx, &remQueryReq, &remQueryRes); err != nil {
|
||||
log.Warn(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return s.db.UpdateRoomFromEvents(context.TODO(), addQueryRes.Events, remQueryRes.Events)
|
||||
return s.db.UpdateRoomFromEvents(ctx, addQueryRes.Events, remQueryRes.Events)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,7 +15,11 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/ext"
|
||||
)
|
||||
|
||||
// An OutputType is a type of roomserver output.
|
||||
|
|
@ -41,6 +45,45 @@ type OutputEvent struct {
|
|||
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
|
||||
// The content of event with type OutputTypeRetireInviteEvent
|
||||
RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"`
|
||||
// Serialized span context
|
||||
OpentracingCarrier opentracing.TextMapCarrier `json:"opentracing_carrier"`
|
||||
}
|
||||
|
||||
// AddSpanFromContext fills out the OpentracingCarrier field from the given context
|
||||
func (o *OutputEvent) AddSpanFromContext(ctx context.Context) error {
|
||||
span := opentracing.SpanFromContext(ctx)
|
||||
ext.SpanKindProducer.Set(span)
|
||||
carrier := make(opentracing.TextMapCarrier)
|
||||
tracer := opentracing.GlobalTracer()
|
||||
|
||||
err := tracer.Inject(span.Context(), opentracing.TextMap, carrier)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
o.OpentracingCarrier = carrier
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartSpanAndReplaceContext produces a context and opentracing span from the
|
||||
// info embedded in OutputEvent
|
||||
func (o *OutputEvent) StartSpanAndReplaceContext(
|
||||
ctx context.Context,
|
||||
) (context.Context, opentracing.Span) {
|
||||
tracer := opentracing.GlobalTracer()
|
||||
producerContext, err := tracer.Extract(opentracing.TextMap, o.OpentracingCarrier)
|
||||
|
||||
var span opentracing.Span
|
||||
if err != nil {
|
||||
// Default to a span without reference to producer context.
|
||||
span = tracer.StartSpan("output_event_consumer")
|
||||
} else {
|
||||
// Set the producer context.
|
||||
span = tracer.StartSpan("output_event_consumer", opentracing.FollowsFrom(producerContext))
|
||||
}
|
||||
|
||||
return opentracing.ContextWithSpan(ctx, span), span
|
||||
}
|
||||
|
||||
// An OutputNewRoomEvent is written when the roomserver receives a new event.
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ func processInviteEvent(
|
|||
return nil
|
||||
}
|
||||
|
||||
outputUpdates, err := updateToInviteMembership(updater, &input.Event, nil)
|
||||
outputUpdates, err := updateToInviteMembership(ctx, updater, &input.Event, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -280,10 +280,17 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
|||
}
|
||||
ore.SendAsServer = u.sendAsServer
|
||||
|
||||
return &api.OutputEvent{
|
||||
oe := api.OutputEvent{
|
||||
Type: api.OutputTypeNewRoomEvent,
|
||||
NewRoomEvent: &ore,
|
||||
}, nil
|
||||
}
|
||||
|
||||
err = oe.AddSpanFromContext(u.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &oe, nil
|
||||
}
|
||||
|
||||
type eventNIDSorter []types.EventNID
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ func updateMemberships(
|
|||
ae = &ev.Event
|
||||
}
|
||||
}
|
||||
if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil {
|
||||
if updates, err = updateMembership(ctx, updater, targetUserNID, re, ae, updates); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
|
@ -85,6 +85,7 @@ func updateMemberships(
|
|||
}
|
||||
|
||||
func updateMembership(
|
||||
ctx context.Context,
|
||||
updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID,
|
||||
remove, add *gomatrixserverlib.Event,
|
||||
updates []api.OutputEvent,
|
||||
|
|
@ -119,11 +120,11 @@ func updateMembership(
|
|||
|
||||
switch new {
|
||||
case invite:
|
||||
return updateToInviteMembership(mu, add, updates)
|
||||
return updateToInviteMembership(ctx, mu, add, updates)
|
||||
case join:
|
||||
return updateToJoinMembership(mu, add, updates)
|
||||
return updateToJoinMembership(ctx, mu, add, updates)
|
||||
case leave, ban:
|
||||
return updateToLeaveMembership(mu, add, new, updates)
|
||||
return updateToLeaveMembership(ctx, mu, add, new, updates)
|
||||
default:
|
||||
panic(fmt.Errorf(
|
||||
"input: membership %q is not one of the allowed values", new,
|
||||
|
|
@ -132,7 +133,7 @@ func updateMembership(
|
|||
}
|
||||
|
||||
func updateToInviteMembership(
|
||||
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
// We may have already sent the invite to the user, either because we are
|
||||
// reprocessing this event, or because the we received this invite from a
|
||||
|
|
@ -151,16 +152,24 @@ func updateToInviteMembership(
|
|||
onie := api.OutputNewInviteEvent{
|
||||
Event: *add,
|
||||
}
|
||||
updates = append(updates, api.OutputEvent{
|
||||
|
||||
oe := api.OutputEvent{
|
||||
Type: api.OutputTypeNewInviteEvent,
|
||||
NewInviteEvent: &onie,
|
||||
})
|
||||
}
|
||||
|
||||
err = oe.AddSpanFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
updates = append(updates, oe)
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
||||
func updateToJoinMembership(
|
||||
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
// If the user is already marked as being joined, we call SetToJoin to update
|
||||
// the event ID then we can return immediately. Retired is ignored as there
|
||||
|
|
@ -187,15 +196,24 @@ func updateToJoinMembership(
|
|||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
}
|
||||
updates = append(updates, api.OutputEvent{
|
||||
|
||||
oe := api.OutputEvent{
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &orie,
|
||||
})
|
||||
}
|
||||
|
||||
err = oe.AddSpanFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
updates = append(updates, oe)
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
||||
func updateToLeaveMembership(
|
||||
ctx context.Context,
|
||||
mu types.MembershipUpdater, add *gomatrixserverlib.Event,
|
||||
newMembership string, updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
|
|
@ -219,10 +237,18 @@ func updateToLeaveMembership(
|
|||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
}
|
||||
updates = append(updates, api.OutputEvent{
|
||||
|
||||
oe := api.OutputEvent{
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &orie,
|
||||
})
|
||||
}
|
||||
|
||||
err = oe.AddSpanFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
updates = append(updates, oe)
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -80,13 +80,16 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
ctx, span := output.StartSpanAndReplaceContext(context.Background())
|
||||
defer span.Finish()
|
||||
|
||||
switch output.Type {
|
||||
case api.OutputTypeNewRoomEvent:
|
||||
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
|
||||
return s.onNewRoomEvent(ctx, *output.NewRoomEvent)
|
||||
case api.OutputTypeNewInviteEvent:
|
||||
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
|
||||
return s.onNewInviteEvent(ctx, *output.NewInviteEvent)
|
||||
case api.OutputTypeRetireInviteEvent:
|
||||
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
|
||||
return s.onRetireInviteEvent(ctx, *output.RetireInviteEvent)
|
||||
default:
|
||||
log.WithField("type", output.Type).Debug(
|
||||
"roomserver output log: ignoring unknown output type",
|
||||
|
|
|
|||
Loading…
Reference in a new issue