From ce1255e7dbac56f06cbc947965662a97cde44e98 Mon Sep 17 00:00:00 2001 From: Tommie Gannert Date: Sun, 10 Oct 2021 11:51:34 +0200 Subject: [PATCH] Use PushGatewayClient and the pushrules module in Pushserver's room consumer. * Use one goroutine per user to avoid locking up the entire server for one bad push gateway. * Split pushing by format. * Send one device per push. Sytest does not support coalescing multiple devices into one push. Matches Synapse. Either we change Sytest, or remove the group-by-url-and-format logic. * Write OutputNotificationData from push server. Sync API is already the consumer. --- clientapi/routing/routing.go | 2 +- cmd/dendrite-demo-libp2p/main.go | 5 + cmd/dendrite-demo-yggdrasil/main.go | 9 + cmd/dendrite-monolith-server/main.go | 3 +- .../personalities/pushserver.go | 3 +- pushserver/consumers/roomserver.go | 629 ++++++++++++++++++ pushserver/consumers/roomserver_test.go | 260 ++++++++ pushserver/internal/api.go | 4 +- pushserver/producers/syncapi.go | 74 ++- pushserver/pushserver.go | 24 +- pushserver/util/devices.go | 100 +++ pushserver/util/notify.go | 76 +++ setup/base/base.go | 6 + setup/config/config_pushserver.go | 5 + 14 files changed, 1183 insertions(+), 17 deletions(-) create mode 100644 pushserver/consumers/roomserver.go create mode 100644 pushserver/consumers/roomserver_test.go create mode 100644 pushserver/util/devices.go create mode 100644 pushserver/util/notify.go diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 32556500f..67814bf88 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -604,7 +604,7 @@ func Setup( r0mux.Handle("/pushrules/{scope}/{kind}/{ruleID}", httputil.MakeAuthAPI("push_rules", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { - if r := rateLimits.rateLimit(req); r != nil { + if r := rateLimits.Limit(req); r != nil { return *r } vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index d518d1586..541dc7cc6 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/pushserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" @@ -172,6 +173,9 @@ func main() { base, keyRing, ) + pgClient := base.Base.PushGatewayHTTPClient() + psAPI := pushserver.NewInternalAPI(&cfg.PushServer, base.Base.ProcessContext, pgClient, rsAPI, userAPI) + monolith := setup.Monolith{ Config: base.Base.Cfg, AccountDB: accountDB, @@ -182,6 +186,7 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, FederationAPI: fsAPI, + PushserverAPI: psAPI, RoomserverAPI: rsAPI, UserAPI: userAPI, KeyAPI: keyAPI, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 1e0a7d03e..9ac06df72 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -36,6 +36,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/pushserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/base" @@ -124,6 +125,13 @@ func main() { rsComponent.SetFederationAPI(fsAPI) rsComponent.SetKeyring(keyRing) + pgClient := base.PushGatewayHTTPClient() + psAPI := pushserver.NewInternalAPI(&cfg.PushServer, base.ProcessContext, pgClient, rsAPI, userAPI) + if base.UseHTTPAPIs { + pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI) + psAPI = base.PushServerHTTPClient() + } + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -134,6 +142,7 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, FederationAPI: fsAPI, + PushserverAPI: psAPI, RoomserverAPI: rsAPI, UserAPI: userAPI, KeyAPI: keyAPI, diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 89168301c..5d4e1a15f 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -148,7 +148,8 @@ func main() { eduInputAPI = base.EDUServerClient() } - psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, rsAPI, userAPI) + pgClient := base.PushGatewayHTTPClient() + psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, base.ProcessContext, pgClient, rsAPI, userAPI) if base.UseHTTPAPIs { pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI) psAPI = base.PushServerHTTPClient() diff --git a/cmd/dendrite-polylith-multi/personalities/pushserver.go b/cmd/dendrite-polylith-multi/personalities/pushserver.go index 414450af3..fbd19f86c 100644 --- a/cmd/dendrite-polylith-multi/personalities/pushserver.go +++ b/cmd/dendrite-polylith-multi/personalities/pushserver.go @@ -22,7 +22,8 @@ import ( ) func PushServer(base *basepkg.BaseDendrite, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI) { - intAPI := pushserver.NewInternalAPI(&cfg.PushServer, rsAPI, base.UserAPIClient()) + pgClient := base.PushGatewayHTTPClient() + intAPI := pushserver.NewInternalAPI(&cfg.PushServer, base.ProcessContext, pgClient, rsAPI, base.UserAPIClient()) pushserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/pushserver/consumers/roomserver.go b/pushserver/consumers/roomserver.go new file mode 100644 index 000000000..d55705ed4 --- /dev/null +++ b/pushserver/consumers/roomserver.go @@ -0,0 +1,629 @@ +package consumers + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/internal/pushgateway" + "github.com/matrix-org/dendrite/internal/pushrules" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/producers" + "github.com/matrix-org/dendrite/pushserver/storage" + "github.com/matrix-org/dendrite/pushserver/storage/tables" + "github.com/matrix-org/dendrite/pushserver/util" + rsapi "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +type OutputRoomEventConsumer struct { + cfg *config.PushServer + rsAPI rsapi.RoomserverInternalAPI + psAPI api.PushserverInternalAPI + pgClient pushgateway.Client + rsConsumer *internal.ContinualConsumer + db storage.Database + syncProducer *producers.SyncAPI +} + +func NewOutputRoomEventConsumer( + process *process.ProcessContext, + cfg *config.PushServer, + kafkaConsumer sarama.Consumer, + store storage.Database, + pgClient pushgateway.Client, + psAPI api.PushserverInternalAPI, + rsAPI rsapi.RoomserverInternalAPI, + syncProducer *producers.SyncAPI, +) *OutputRoomEventConsumer { + consumer := internal.ContinualConsumer{ + Process: process, + ComponentName: "pushserver/roomserver", + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEventConsumer{ + cfg: cfg, + rsConsumer: &consumer, + db: store, + rsAPI: rsAPI, + psAPI: psAPI, + pgClient: pgClient, + syncProducer: syncProducer, + } + consumer.ProcessMessage = s.onMessage + return s +} + +func (s *OutputRoomEventConsumer) Start() error { + return s.rsConsumer.Start() +} + +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + ctx := context.Background() + + var output rsapi.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + log.WithError(err).Errorf("pushserver consumer: message parse failure") + return nil + } + + log.WithFields(log.Fields{ + "event_type": output.Type, + }).Tracef("Received message from room server: %#v", output) + + switch output.Type { + case rsapi.OutputTypeNewRoomEvent: + ev := output.NewRoomEvent.Event + if err := s.processMessage(ctx, output.NewRoomEvent.Event); err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "event": string(ev.JSON()), + }).WithError(err).Errorf("pushserver consumer: process room event failure") + } + + case rsapi.OutputTypeNewInviteEvent: + ev := output.NewInviteEvent.Event + if err := s.processMessage(ctx, output.NewInviteEvent.Event); err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "event": string(ev.JSON()), + }).WithError(err).Errorf("pushserver consumer: process invite event failure") + } + + default: + // Ignore old events, peeks, so on. + } + + return nil +} + +func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { + log.WithFields(log.Fields{ + "event_type": event.Type(), + }).Tracef("Received event from room server: %#v", event) + + members, roomSize, err := s.localRoomMembers(ctx, event.RoomID()) + if err != nil { + return err + } + + if event.Type() == gomatrixserverlib.MRoomMember { + cevent := gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatAll) + member, err := newLocalMembership(&cevent) + if err != nil { + return err + } + if member.Membership == gomatrixserverlib.Invite && member.Domain == s.cfg.Matrix.ServerName { + // localRoomMembers only adds joined members. An invite + // should also be pushed to the target user. + members = append(members, member) + } + } + + // TODO: run in parallel with localRoomMembers. + roomName, err := s.roomName(ctx, event) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "event_id": event.EventID(), + "room_id": event.RoomID(), + "num_members": len(members), + "room_size": roomSize, + }).Tracef("Notifying members") + + // Notification.UserIsTarget is a per-member field, so we + // cannot group all users in a single request. + // + // TODO: does it have to be set? It's not required, and + // removing it means we can send all notifications to + // e.g. Element's Push gateway in one go. + for _, mem := range members { + if err := s.notifyLocal(ctx, event, mem, roomSize, roomName); err != nil { + log.WithFields(log.Fields{ + "localpart": mem.Localpart, + }).WithError(err).Errorf("Unable to push to local user") + continue + } + } + + return nil +} + +type localMembership struct { + gomatrixserverlib.MemberContent + UserID string + Localpart string + Domain gomatrixserverlib.ServerName +} + +func newLocalMembership(event *gomatrixserverlib.ClientEvent) (*localMembership, error) { + if event.StateKey == nil { + return nil, fmt.Errorf("missing state_key") + } + + var member localMembership + if err := json.Unmarshal(event.Content, &member.MemberContent); err != nil { + return nil, err + } + + localpart, domain, err := gomatrixserverlib.SplitID('@', *event.StateKey) + if err != nil { + return nil, err + } + + member.UserID = *event.StateKey + member.Localpart = localpart + member.Domain = domain + return &member, nil +} + +// localRoomMembers fetches the current local members of a room, and +// the total number of members. +func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) { + req := &rsapi.QueryMembershipsForRoomRequest{ + RoomID: roomID, + JoinedOnly: true, + } + var res rsapi.QueryMembershipsForRoomResponse + + // XXX: This could potentially race if the state for the event is not known yet + // e.g. the event came over federation but we do not have the full state persisted. + if err := s.rsAPI.QueryMembershipsForRoom(ctx, req, &res); err != nil { + return nil, 0, err + } + + var members []*localMembership + var ntotal int + for _, event := range res.JoinEvents { + member, err := newLocalMembership(&event) + if err != nil { + log.WithError(err).Errorf("Parsing MemberContent") + continue + } + if member.Membership != gomatrixserverlib.Join { + continue + } + if member.Domain != s.cfg.Matrix.ServerName { + continue + } + + ntotal++ + members = append(members, member) + } + + return members, ntotal, nil +} + +// roomName returns the name in the event (if type==m.room.name), or +// looks it up in roomserver. If there is no name, +// m.room.canonical_alias is consulted. Returns an empty string if the +// room has no name. +func (s *OutputRoomEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) { + if event.Type() == gomatrixserverlib.MRoomName { + name, err := unmarshalRoomName(event) + if err != nil { + return "", err + } + + if name != "" { + return name, nil + } + } + + req := &rsapi.QueryCurrentStateRequest{ + RoomID: event.RoomID(), + StateTuples: []gomatrixserverlib.StateKeyTuple{roomNameTuple, canonicalAliasTuple}, + } + var res rsapi.QueryCurrentStateResponse + + if err := s.rsAPI.QueryCurrentState(ctx, req, &res); err != nil { + return "", err + } + + if event := res.StateEvents[roomNameTuple]; event != nil { + return unmarshalRoomName(event) + } + + if event.Type() == gomatrixserverlib.MRoomCanonicalAlias { + alias, err := unmarshalCanonicalAlias(event) + if err != nil { + return "", err + } + + if alias != "" { + return alias, nil + } + } + + if event := res.StateEvents[canonicalAliasTuple]; event != nil { + return unmarshalCanonicalAlias(event) + } + + return "", nil +} + +var ( + canonicalAliasTuple = gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias} + roomNameTuple = gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomName} +) + +func unmarshalRoomName(event *gomatrixserverlib.HeaderedEvent) (string, error) { + var nc eventutil.NameContent + if err := json.Unmarshal(event.Content(), &nc); err != nil { + return "", fmt.Errorf("unmarshaling NameContent: %w", err) + } + + return nc.Name, nil +} + +func unmarshalCanonicalAlias(event *gomatrixserverlib.HeaderedEvent) (string, error) { + var cac eventutil.CanonicalAliasContent + if err := json.Unmarshal(event.Content(), &cac); err != nil { + return "", fmt.Errorf("unmarshaling CanonicalAliasContent: %w", err) + } + + return cac.Alias, nil +} + +// notifyLocal finds the right push actions for a local user, given an event. +func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int, roomName string) error { + actions, err := s.evaluatePushRules(ctx, event, mem, roomSize) + if err != nil { + return err + } + a, tweaks, err := pushrules.ActionsToTweaks(actions) + if err != nil { + return err + } + // TODO: support coalescing. + if a != pushrules.NotifyAction && a != pushrules.CoalesceAction { + log.WithFields(log.Fields{ + "event_id": event.EventID(), + "room_id": event.RoomID(), + "localpart": mem.Localpart, + }).Tracef("Push rule evaluation rejected the event") + return nil + } + + devicesByURLAndFormat, profileTag, err := s.localPushDevices(ctx, mem.Localpart, tweaks) + if err != nil { + return err + } + + n := &api.Notification{ + Actions: actions, + // UNSPEC: the spec doesn't say this is a ClientEvent, but the + // fields seem to match. room_id should be missing, which + // matches the behavior of FormatSync. + Event: gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatSync), + // TODO: this is per-device, but it's not part of the primary + // key. So inserting one notification per profile tag doesn't + // make sense. What is this supposed to be? Sytests require it + // to "work", but they only use a single device. + ProfileTag: profileTag, + RoomID: event.RoomID(), + TS: gomatrixserverlib.AsTimestamp(time.Now()), + } + if err := s.db.InsertNotification(ctx, mem.Localpart, event.EventID(), tweaks, n); err != nil { + return err + } + + if err := s.syncProducer.GetAndSendNotificationData(ctx, mem.UserID, event.RoomID()); err != nil { + return err + } + + // We do this after InsertNotification. Thus, this should always return >=1. + userNumUnreadNotifs, err := s.db.GetNotificationCount(ctx, mem.Localpart, tables.AllNotifications) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "event_id": event.EventID(), + "room_id": event.RoomID(), + "localpart": mem.Localpart, + "num_urls": len(devicesByURLAndFormat), + "num_unread": userNumUnreadNotifs, + }).Tracef("Notifying single member") + + // Push gateways are out of our control, and we cannot risk + // looking up the server on a misbehaving push gateway. Each user + // receives a goroutine now that all internal API calls have been + // made. + // + // TODO: think about bounding this to one per user, and what + // ordering guarantees we must provide. + go func() { + // This background processing cannot be tied to a request. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var rejected []*pushgateway.Device + for url, fmts := range devicesByURLAndFormat { + for format, devices := range fmts { + // TODO: support "email". + if !strings.HasPrefix(url, "http") { + continue + } + + // UNSPEC: the specification suggests there can be + // more than one device per request. There is at least + // one Sytest that expects one HTTP request per + // device, rather than per URL. For now, we must + // notify each one separately. + for _, dev := range devices { + rej, err := s.notifyHTTP(ctx, event, url, format, []*pushgateway.Device{dev}, mem.Localpart, roomName, int(userNumUnreadNotifs)) + if err != nil { + log.WithFields(log.Fields{ + "event_id": event.EventID(), + "localpart": mem.Localpart, + }).WithError(err).Errorf("Unable to notify HTTP pusher") + continue + } + rejected = append(rejected, rej...) + } + } + } + + if len(rejected) > 0 { + if err := s.deleteRejectedPushers(ctx, rejected, mem.Localpart); err != nil { + log.WithFields(log.Fields{ + "localpart": mem.Localpart, + "num_pushers": len(rejected), + }).WithError(err).Errorf("Unable to delete rejected pushers") + } + } + }() + + return nil +} + +// evaluatePushRules fetches and evaluates the push rules of a local +// user. Returns actions (including dont_notify). +func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) { + if event.Sender() == mem.UserID { + // SPEC: Homeservers MUST NOT notify the Push Gateway for + // events that the user has sent themselves. + return nil, nil + } + + var res api.QueryPushRulesResponse + if err := s.psAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil { + return nil, err + } + + ec := &ruleSetEvalContext{ + ctx: ctx, + rsAPI: s.rsAPI, + mem: mem, + roomID: event.RoomID(), + roomSize: roomSize, + } + eval := pushrules.NewRuleSetEvaluator(ec, &res.RuleSets.Global) + rule, err := eval.MatchEvent(event.Event) + if err != nil { + return nil, err + } + if rule == nil { + // SPEC: If no rules match an event, the homeserver MUST NOT + // notify the Push Gateway for that event. + return nil, err + } + + log.WithFields(log.Fields{ + "event_id": event.EventID(), + "room_id": event.RoomID(), + "localpart": mem.Localpart, + "rule_id": rule.RuleID, + }).Tracef("Matched a push rule") + + return rule.Actions, nil +} + +type ruleSetEvalContext struct { + ctx context.Context + rsAPI rsapi.RoomserverInternalAPI + mem *localMembership + roomID string + roomSize int +} + +func (rse *ruleSetEvalContext) UserDisplayName() string { return rse.mem.DisplayName } + +func (rse *ruleSetEvalContext) RoomMemberCount() (int, error) { return rse.roomSize, nil } + +func (rse *ruleSetEvalContext) HasPowerLevel(userID, levelKey string) (bool, error) { + req := &rsapi.QueryLatestEventsAndStateRequest{ + RoomID: rse.roomID, + StateToFetch: []gomatrixserverlib.StateKeyTuple{ + {EventType: "m.room.power_levels"}, + }, + } + var res rsapi.QueryLatestEventsAndStateResponse + if err := rse.rsAPI.QueryLatestEventsAndState(rse.ctx, req, &res); err != nil { + return false, err + } + for _, ev := range res.StateEvents { + if ev.Type() != gomatrixserverlib.MRoomPowerLevels { + continue + } + + plc, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev.Event) + if err != nil { + return false, err + } + return plc.UserLevel(userID) >= plc.NotificationLevel(levelKey), nil + } + return true, nil +} + +// localPushDevices pushes to the configured devices of a local +// user. The map keys are [url][format]. +func (s *OutputRoomEventConsumer) localPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) { + pusherDevices, err := util.GetPushDevices(ctx, localpart, tweaks, s.db) + if err != nil { + return nil, "", err + } + + var profileTag string + devicesByURL := make(map[string]map[string][]*pushgateway.Device, len(pusherDevices)) + for _, pusherDevice := range pusherDevices { + if profileTag == "" { + profileTag = pusherDevice.Pusher.ProfileTag + } + + url := pusherDevice.URL + if devicesByURL[url] == nil { + devicesByURL[url] = make(map[string][]*pushgateway.Device, 2) + } + devicesByURL[url][pusherDevice.Format] = append(devicesByURL[url][pusherDevice.Format], &pusherDevice.Device) + } + + return devicesByURL, profileTag, nil +} + +// notifyHTTP performs a notificatation to a Push Gateway. +func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, url, format string, devices []*pushgateway.Device, localpart, roomName string, userNumUnreadNotifs int) ([]*pushgateway.Device, error) { + var req pushgateway.NotifyRequest + switch format { + case "event_id_only": + req = pushgateway.NotifyRequest{ + Notification: pushgateway.Notification{ + Counts: &pushgateway.Counts{}, + Devices: devices, + EventID: event.EventID(), + RoomID: event.RoomID(), + }, + } + + default: + req = pushgateway.NotifyRequest{ + Notification: pushgateway.Notification{ + Content: event.Content(), + Counts: &pushgateway.Counts{ + Unread: userNumUnreadNotifs, + }, + Devices: devices, + EventID: event.EventID(), + ID: event.EventID(), + RoomID: event.RoomID(), + RoomName: roomName, + Sender: event.Sender(), + Type: event.Type(), + }, + } + if mem, err := event.Membership(); err == nil { + req.Notification.Membership = mem + } + if event.StateKey() != nil && *event.StateKey() == fmt.Sprintf("@%s:%s", localpart, s.cfg.Matrix.ServerName) { + req.Notification.UserIsTarget = true + } + } + + log.WithFields(log.Fields{ + "event_id": event.EventID(), + "url": url, + "localpart": localpart, + "app_id0": devices[0].AppID, + "pushkey": devices[0].PushKey, + "num_devices": len(devices), + }).Debugf("Notifying HTTP push gateway") + + var res pushgateway.NotifyResponse + if err := s.pgClient.Notify(ctx, url, &req, &res); err != nil { + return nil, err + } + + log.WithFields(log.Fields{ + "event_id": event.EventID(), + "url": url, + "localpart": localpart, + "num_rejected": len(res.Rejected), + }).Tracef("HTTP push gateway result") + + if len(res.Rejected) == 0 { + return nil, nil + } + + devMap := make(map[string]*pushgateway.Device, len(devices)) + for _, d := range devices { + devMap[d.PushKey] = d + } + rejected := make([]*pushgateway.Device, 0, len(res.Rejected)) + for _, pushKey := range res.Rejected { + d := devMap[pushKey] + if d != nil { + rejected = append(rejected, d) + } + } + + return rejected, nil +} + +// deleteRejectedPushers deletes the pushers associated with the given devices. +func (s *OutputRoomEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string) error { + log.WithFields(log.Fields{ + "localpart": localpart, + "app_id0": devices[0].AppID, + "num_devices": len(devices), + }).Infof("Deleting pushers rejected by the HTTP push gateway") + + for _, d := range devices { + if err := s.db.RemovePusher(ctx, d.AppID, d.PushKey, localpart); err != nil { + log.WithFields(log.Fields{ + "localpart": localpart, + }).WithError(err).Errorf("Unable to delete rejected pusher") + } + } + + return nil +} + +// mapWithout returns a shallow copy of the map, without the given +// key. Returns nil if the resulting map is empty. +func mapWithout(m map[string]interface{}, key string) map[string]interface{} { + ret := make(map[string]interface{}, len(m)) + for k, v := range m { + // The specification says we do not send "url". + if k == key { + continue + } + ret[k] = v + } + if len(ret) == 0 { + return nil + } + return ret +} diff --git a/pushserver/consumers/roomserver_test.go b/pushserver/consumers/roomserver_test.go new file mode 100644 index 000000000..a7a565421 --- /dev/null +++ b/pushserver/consumers/roomserver_test.go @@ -0,0 +1,260 @@ +package consumers + +import ( + "context" + "encoding/json" + "sync" + "testing" + + "github.com/Shopify/sarama" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/matrix-org/dendrite/internal/pushgateway" + "github.com/matrix-org/dendrite/internal/pushrules" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/producers" + "github.com/matrix-org/dendrite/pushserver/storage" + rsapi "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" +) + +const serverName = gomatrixserverlib.ServerName("example.org") + +func TestOutputRoomEventConsumer(t *testing.T) { + ctx := context.Background() + + dbopts := &config.DatabaseOptions{ + ConnectionString: "file::memory:", + MaxOpenConnections: 1, + MaxIdleConnections: 1, + } + db, err := storage.Open(dbopts) + if err != nil { + t.Fatalf("NewDatabase failed: %v", err) + } + err = db.CreatePusher(ctx, + api.Pusher{ + PushKey: "apushkey", + Kind: api.HTTPKind, + AppID: "anappid", + Data: map[string]interface{}{ + "url": "http://example.org/pusher/notify", + "extra": "someextra", + }, + }, + "alice") + if err != nil { + t.Fatalf("CreatePusher failed: %v", err) + } + + var rsAPI fakeRoomServerInternalAPI + var psAPI fakePushserverInternalAPI + var messageSender fakeMessageSender + var wg sync.WaitGroup + wg.Add(1) + pgClient := fakePushGatewayClient{ + WG: &wg, + } + s := &OutputRoomEventConsumer{ + cfg: &config.PushServer{ + Matrix: &config.Global{ + ServerName: serverName, + }, + }, + db: db, + rsAPI: &rsAPI, + psAPI: &psAPI, + pgClient: &pgClient, + syncProducer: producers.NewSyncAPI(db, &messageSender, "clientDataTopic", "notificationDataTopic"), + } + + event, err := gomatrixserverlib.NewEventFromTrustedJSONWithEventID("$143273582443PhrSn:example.org", []byte(`{ + "content": { + "body": "This is an example text message", + "format": "org.matrix.custom.html", + "formatted_body": "\u003cb\u003eThis is an example text message\u003c/b\u003e", + "msgtype": "m.text" + }, + "origin_server_ts": 1432735824653, + "room_id": "!jEsUZKDJdhlrceRyVU:example.org", + "sender": "@example:example.org", + "type": "m.room.message", + "unsigned": { + "age": 1234 + } +}`), false, gomatrixserverlib.RoomVersionV7) + if err != nil { + t.Fatalf("NewEventFromTrustedJSON failed: %v", err) + } + + ev := &gomatrixserverlib.HeaderedEvent{ + Event: event, + } + if err := s.processMessage(ctx, ev); err != nil { + t.Fatalf("processMessage failed: %v", err) + } + + t.Log("Waiting for backend calls to finish.") + wg.Wait() + + if diff := cmp.Diff([]*rsapi.QueryMembershipsForRoomRequest{{JoinedOnly: true, RoomID: "!jEsUZKDJdhlrceRyVU:example.org"}}, rsAPI.MembershipReqs); diff != "" { + t.Errorf("rsAPI.QueryMembershipsForRoom Reqs: +got -want:\n%s", diff) + } + if diff := cmp.Diff([]*pushgateway.NotifyRequest{{ + Notification: pushgateway.Notification{ + Type: "m.room.message", + Content: event.Content(), + Counts: &pushgateway.Counts{ + Unread: 1, + }, + Devices: []*pushgateway.Device{{ + AppID: "anappid", + PushKey: "apushkey", + Data: map[string]interface{}{ + "extra": "someextra", + }, + }}, + EventID: "$143273582443PhrSn:example.org", + ID: "$143273582443PhrSn:example.org", + RoomID: "!jEsUZKDJdhlrceRyVU:example.org", + RoomName: "aname", + Sender: "@example:example.org", + }, + }}, pgClient.Reqs); diff != "" { + t.Errorf("pgClient.NotifyHTTP Reqs: +got -want:\n%s", diff) + } + if diff := cmp.Diff([]sarama.ProducerMessage{{ + Topic: "notificationDataTopic", + Key: sarama.StringEncoder("@alice:example.org"), + Value: sarama.ByteEncoder([]byte(`{"room_id":"!jEsUZKDJdhlrceRyVU:example.org","unread_highlight_count":0,"unread_notification_count":1}`)), + }}, messageSender.Messages, cmpopts.IgnoreUnexported(sarama.ProducerMessage{})); diff != "" { + t.Errorf("SendMessage Messages: +got -want:\n%s", diff) + } +} + +type fakeRoomServerInternalAPI struct { + rsapi.RoomserverInternalAPI + + MembershipReqs []*rsapi.QueryMembershipsForRoomRequest +} + +func (s *fakeRoomServerInternalAPI) QueryCurrentState( + ctx context.Context, + req *rsapi.QueryCurrentStateRequest, + res *rsapi.QueryCurrentStateResponse, +) error { + *res = rsapi.QueryCurrentStateResponse{ + StateEvents: map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent{ + roomNameTuple: mustParseHeaderedEventJSON(`{ + "_room_version": "7", + "content": { + "name": "aname" + }, + "event_id": "$3957tyerfgewrf382:example.org", + "origin_server_ts": 1432735824652, + "room_id": "!jEsUZKDJdhlrceRyVU:example.org", + "sender": "@example:example.org", + "state_key": "@alice:example.org", + "type": "m.room.name" +}`), + }, + } + return nil +} + +func (s *fakeRoomServerInternalAPI) QueryMembershipsForRoom( + ctx context.Context, + req *rsapi.QueryMembershipsForRoomRequest, + res *rsapi.QueryMembershipsForRoomResponse, +) error { + s.MembershipReqs = append(s.MembershipReqs, req) + *res = rsapi.QueryMembershipsForRoomResponse{ + JoinEvents: []gomatrixserverlib.ClientEvent{ + mustParseClientEventJSON(`{ + "content": { + "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF", + "displayname": "Alice Margatroid", + "membership": "join", + "reason": "Looking for support" + }, + "event_id": "$3957tyerfgewrf384:example.org", + "origin_server_ts": 1432735824653, + "room_id": "!jEsUZKDJdhlrceRyVU:example.org", + "sender": "@example:example.org", + "state_key": "@alice:example.org", + "type": "m.room.member", + "unsigned": { + "age": 1234 + } +}`), + }, + } + return nil +} + +type fakePushserverInternalAPI struct { + api.PushserverInternalAPI +} + +func (s *fakePushserverInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error { + localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + return err + } + res.RuleSets = pushrules.DefaultAccountRuleSets(localpart, "example.org") + return nil +} + +type fakePushGatewayClient struct { + pushgateway.Client + + WG *sync.WaitGroup + Reqs []*pushgateway.NotifyRequest +} + +func (c *fakePushGatewayClient) Notify(ctx context.Context, url string, req *pushgateway.NotifyRequest, res *pushgateway.NotifyResponse) error { + c.Reqs = append(c.Reqs, req) + if c.WG != nil { + c.WG.Done() + } + *res = pushgateway.NotifyResponse{ + Rejected: []string{ + "apushkey", + }, + } + return nil +} + +func mustMarshalJSON(v interface{}) []byte { + bs, err := json.Marshal(v) + if err != nil { + panic(err) + } + return bs +} + +func mustParseClientEventJSON(s string) gomatrixserverlib.ClientEvent { + var ev gomatrixserverlib.ClientEvent + if err := json.Unmarshal([]byte(s), &ev); err != nil { + panic(err) + } + return ev +} + +func mustParseHeaderedEventJSON(s string) *gomatrixserverlib.HeaderedEvent { + var ev gomatrixserverlib.HeaderedEvent + if err := json.Unmarshal([]byte(s), &ev); err != nil { + panic(err) + } + return &ev +} + +type fakeMessageSender struct { + Messages []sarama.ProducerMessage +} + +func (s *fakeMessageSender) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + s.Messages = append(s.Messages, *msg) + return 0, 0, nil +} diff --git a/pushserver/internal/api.go b/pushserver/internal/api.go index 2362e3877..a1bdfac75 100644 --- a/pushserver/internal/api.go +++ b/pushserver/internal/api.go @@ -22,11 +22,11 @@ type PushserverInternalAPI struct { Cfg *config.PushServer DB storage.Database userAPI uapi.UserInternalAPI - syncProducer *producers.SyncAPIProducer + syncProducer *producers.SyncAPI } func NewPushserverAPI( - cfg *config.PushServer, pushserverDB storage.Database, userAPI uapi.UserInternalAPI, syncProducer *producers.SyncAPIProducer, + cfg *config.PushServer, pushserverDB storage.Database, userAPI uapi.UserInternalAPI, syncProducer *producers.SyncAPI, ) *PushserverInternalAPI { a := &PushserverInternalAPI{ Cfg: cfg, diff --git a/pushserver/producers/syncapi.go b/pushserver/producers/syncapi.go index 279d08674..61ad89527 100644 --- a/pushserver/producers/syncapi.go +++ b/pushserver/producers/syncapi.go @@ -1,21 +1,35 @@ package producers import ( + "context" "encoding/json" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/pushserver/storage" + "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" ) -// SyncAPIProducer produces messages for the Sync API server to consume. -type SyncAPIProducer struct { - Producer sarama.SyncProducer - ClientDataTopic string +// SyncAPI produces messages for the Sync API server to consume. +type SyncAPI struct { + db storage.Database + producer MessageSender + clientDataTopic string + notificationDataTopic string +} + +func NewSyncAPI(db storage.Database, producer MessageSender, clientDataTopic string, notificationDataTopic string) *SyncAPI { + return &SyncAPI{ + db: db, + producer: producer, + clientDataTopic: clientDataTopic, + notificationDataTopic: notificationDataTopic, + } } // SendAccountData sends account data to the Sync API server. -func (p *SyncAPIProducer) SendAccountData(userID string, roomID string, dataType string) error { +func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error { var m sarama.ProducerMessage data := eventutil.AccountData{ @@ -27,15 +41,59 @@ func (p *SyncAPIProducer) SendAccountData(userID string, roomID string, dataType return err } - m.Topic = string(p.ClientDataTopic) + m.Topic = string(p.clientDataTopic) m.Key = sarama.StringEncoder(userID) m.Value = sarama.ByteEncoder(value) log.WithFields(log.Fields{ "user_id": userID, "room_id": roomID, "data_type": dataType, - }).Infof("Producing to topic '%s'", m.Topic) + }).Infof("Producing to topic %q", m.Topic) - _, _, err = p.Producer.SendMessage(&m) + _, _, err = p.producer.SendMessage(&m) return err } + +// GetAndSendNotificationData reads the database and sends data about unread +// notifications to the Sync API server. +func (p *SyncAPI) GetAndSendNotificationData(ctx context.Context, userID, roomID string) error { + localpart, _, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return err + } + + ntotal, nhighlight, err := p.db.GetRoomNotificationCounts(ctx, localpart, roomID) + if err != nil { + return err + } + + return p.sendNotificationData(userID, &eventutil.NotificationData{ + RoomID: roomID, + UnreadHighlightCount: int(nhighlight), + UnreadNotificationCount: int(ntotal), + }) +} + +// sendNotificationData sends data about unread notifications to the Sync API server. +func (p *SyncAPI) sendNotificationData(userID string, data *eventutil.NotificationData) error { + value, err := json.Marshal(data) + if err != nil { + return err + } + + var m sarama.ProducerMessage + m.Topic = string(p.notificationDataTopic) + m.Key = sarama.StringEncoder(userID) + m.Value = sarama.ByteEncoder(value) + log.WithFields(log.Fields{ + "user_id": userID, + "room_id": data.RoomID, + }).Infof("Producing to topic %q", m.Topic) + + _, _, err = p.producer.SendMessage(&m) + return err +} + +type MessageSender interface { + SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) +} diff --git a/pushserver/pushserver.go b/pushserver/pushserver.go index 4fb8098d4..03e2766ea 100644 --- a/pushserver/pushserver.go +++ b/pushserver/pushserver.go @@ -2,7 +2,9 @@ package pushserver import ( "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/consumers" "github.com/matrix-org/dendrite/pushserver/internal" "github.com/matrix-org/dendrite/pushserver/inthttp" "github.com/matrix-org/dendrite/pushserver/producers" @@ -10,6 +12,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/process" uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" ) @@ -24,27 +27,40 @@ func AddInternalRoutes(router *mux.Router, intAPI api.PushserverInternalAPI) { // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( cfg *config.PushServer, + process *process.ProcessContext, + pgClient pushgateway.Client, rsAPI roomserverAPI.RoomserverInternalAPI, userAPI uapi.UserInternalAPI, ) api.PushserverInternalAPI { + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + db, err := storage.Open(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to push server db") } _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) - syncProducer := &producers.SyncAPIProducer{ - Producer: producer, + syncProducer := producers.NewSyncAPI( + db, + producer, // TODO: user API should handle syncs for account data. Right now, // it's handled by clientapi, and hence uses its topic. When user // API handles it for all account data, we can remove it from // here. - ClientDataTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), - } + cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), + cfg.Matrix.Kafka.TopicFor(config.TopicOutputNotificationData), + ) psAPI := internal.NewPushserverAPI( cfg, db, userAPI, syncProducer, ) + rsConsumer := consumers.NewOutputRoomEventConsumer( + process, cfg, consumer, db, pgClient, psAPI, rsAPI, syncProducer, + ) + if err := rsConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start push server room server consumer") + } + return psAPI } diff --git a/pushserver/util/devices.go b/pushserver/util/devices.go new file mode 100644 index 000000000..8447e4d54 --- /dev/null +++ b/pushserver/util/devices.go @@ -0,0 +1,100 @@ +package util + +import ( + "context" + + "github.com/matrix-org/dendrite/internal/pushgateway" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/storage" + log "github.com/sirupsen/logrus" +) + +type PusherDevice struct { + Device pushgateway.Device + Pusher *api.Pusher + URL string + Format string +} + +// GetPushDevices pushes to the configured devices of a local user. +func GetPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}, db storage.Database) ([]*PusherDevice, error) { + pushers, err := db.GetPushers(ctx, localpart) + if err != nil { + return nil, err + } + + devices := make([]*PusherDevice, 0, len(pushers)) + for _, pusher := range pushers { + var url, format string + data := pusher.Data + switch pusher.Kind { + case api.EmailKind: + url = "mailto:" + + case api.HTTPKind: + // TODO: The spec says only event_id_only is supported, + // but Sytests assume "" means "full notification". + fmtIface := pusher.Data["format"] + var ok bool + format, ok = fmtIface.(string) + if ok && format != "event_id_only" { + log.WithFields(log.Fields{ + "localpart": localpart, + "app_id": pusher.AppID, + }).Errorf("Only data.format event_id_only or empty is supported") + continue + } + + urlIface := pusher.Data["url"] + url, ok = urlIface.(string) + if !ok { + log.WithFields(log.Fields{ + "localpart": localpart, + "app_id": pusher.AppID, + }).Errorf("No data.url configured for HTTP Pusher") + continue + } + data = mapWithout(data, "url") + + default: + log.WithFields(log.Fields{ + "localpart": localpart, + "app_id": pusher.AppID, + "kind": pusher.Kind, + }).Errorf("Unhandled pusher kind") + continue + } + + devices = append(devices, &PusherDevice{ + Device: pushgateway.Device{ + AppID: pusher.AppID, + Data: data, + PushKey: pusher.PushKey, + PushKeyTS: pusher.PushKeyTS, + Tweaks: tweaks, + }, + Pusher: &pusher, + URL: url, + Format: format, + }) + } + + return devices, nil +} + +// mapWithout returns a shallow copy of the map, without the given +// key. Returns nil if the resulting map is empty. +func mapWithout(m map[string]interface{}, key string) map[string]interface{} { + ret := make(map[string]interface{}, len(m)) + for k, v := range m { + // The specification says we do not send "url". + if k == key { + continue + } + ret[k] = v + } + if len(ret) == 0 { + return nil + } + return ret +} diff --git a/pushserver/util/notify.go b/pushserver/util/notify.go new file mode 100644 index 000000000..198f5b7aa --- /dev/null +++ b/pushserver/util/notify.go @@ -0,0 +1,76 @@ +package util + +import ( + "context" + "strings" + "time" + + "github.com/matrix-org/dendrite/internal/pushgateway" + "github.com/matrix-org/dendrite/pushserver/storage" + "github.com/matrix-org/dendrite/pushserver/storage/tables" + log "github.com/sirupsen/logrus" +) + +// NotifyUserCountsAsync sends notifications to a local user's +// notification destinations. Database lookups run synchronously, but +// a single goroutine is started when talking to the Push +// gateways. There is no way to know when the background goroutine has +// finished. +func NotifyUserCountsAsync(ctx context.Context, pgClient pushgateway.Client, localpart string, db storage.Database) error { + pusherDevices, err := GetPushDevices(ctx, localpart, nil, db) + if err != nil { + return err + } + + if len(pusherDevices) == 0 { + return nil + } + + userNumUnreadNotifs, err := db.GetNotificationCount(ctx, localpart, tables.AllNotifications) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "localpart": localpart, + "app_id0": pusherDevices[0].Device.AppID, + "pushkey": pusherDevices[0].Device.PushKey, + }).Tracef("Notifying HTTP push gateway about notification counts") + + // TODO: think about bounding this to one per user, and what + // ordering guarantees we must provide. + go func() { + // This background processing cannot be tied to a request. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // TODO: we could batch all devices with the same URL, but + // Sytest requires consumers/roomserver.go to do it + // one-by-one, so we do the same here. + for _, pusherDevice := range pusherDevices { + // TODO: support "email". + if !strings.HasPrefix(pusherDevice.URL, "http") { + continue + } + + req := pushgateway.NotifyRequest{ + Notification: pushgateway.Notification{ + Counts: &pushgateway.Counts{ + Unread: int(userNumUnreadNotifs), + }, + Devices: []*pushgateway.Device{&pusherDevice.Device}, + }, + } + if err := pgClient.Notify(ctx, pusherDevice.URL, &req, &pushgateway.NotifyResponse{}); err != nil { + log.WithFields(log.Fields{ + "localpart": localpart, + "app_id0": pusherDevice.Device.AppID, + "pushkey": pusherDevice.Device.PushKey, + }).WithError(err).Error("HTTP push gateway request failed") + return + } + } + }() + + return nil +} diff --git a/setup/base/base.go b/setup/base/base.go index 93c65d34f..cda8cc8f1 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -30,6 +30,7 @@ import ( sentryhttp "github.com/getsentry/sentry-go/http" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/atomic" @@ -284,6 +285,11 @@ func (b *BaseDendrite) PushServerHTTPClient() pushserverAPI.PushserverInternalAP return f } +// PushGatewayHTTPClient returns a new client for interacting with (external) Push Gateways. +func (b *BaseDendrite) PushGatewayHTTPClient() pushgateway.Client { + return pushgateway.NewHTTPClient(b.Cfg.PushServer.DisableTLSValidation) +} + // CreateAccountsDB creates a new instance of the accounts database. Should only // be called once per component. func (b *BaseDendrite) CreateAccountsDB() accounts.Database { diff --git a/setup/config/config_pushserver.go b/setup/config/config_pushserver.go index 08ac08f76..437d72635 100644 --- a/setup/config/config_pushserver.go +++ b/setup/config/config_pushserver.go @@ -6,6 +6,11 @@ type PushServer struct { InternalAPI InternalAPIOptions `yaml:"internal_api"` Database DatabaseOptions `yaml:"database"` + + // DisableTLSValidation disables the validation of X.509 TLS certs + // on remote Push gateway endpoints. This is not recommended in + // production! + DisableTLSValidation bool `yaml:"disable_tls_validation"` } func (c *PushServer) Defaults() {