Use PushGatewayClient and the pushrules module in Pushserver's room consumer.

* Use one goroutine per user to avoid locking up the entire server for
  one bad push gateway.
* Split pushing by format.
* Send one device per push. Sytest does not support coalescing
  multiple devices into one push. Matches Synapse. Either we change
  Sytest, or remove the group-by-url-and-format logic.
* Write OutputNotificationData from push server. Sync API is already
  the consumer.
This commit is contained in:
Tommie Gannert 2021-10-10 11:51:34 +02:00
parent 926252671b
commit ce1255e7db
14 changed files with 1183 additions and 17 deletions

View file

@ -604,7 +604,7 @@ func Setup(
r0mux.Handle("/pushrules/{scope}/{kind}/{ruleID}",
httputil.MakeAuthAPI("push_rules", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))

View file

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/pushserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/config"
@ -172,6 +173,9 @@ func main() {
base, keyRing,
)
pgClient := base.Base.PushGatewayHTTPClient()
psAPI := pushserver.NewInternalAPI(&cfg.PushServer, base.Base.ProcessContext, pgClient, rsAPI, userAPI)
monolith := setup.Monolith{
Config: base.Base.Cfg,
AccountDB: accountDB,
@ -182,6 +186,7 @@ func main() {
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
FederationAPI: fsAPI,
PushserverAPI: psAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,

View file

@ -36,6 +36,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/pushserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base"
@ -124,6 +125,13 @@ func main() {
rsComponent.SetFederationAPI(fsAPI)
rsComponent.SetKeyring(keyRing)
pgClient := base.PushGatewayHTTPClient()
psAPI := pushserver.NewInternalAPI(&cfg.PushServer, base.ProcessContext, pgClient, rsAPI, userAPI)
if base.UseHTTPAPIs {
pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI)
psAPI = base.PushServerHTTPClient()
}
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
@ -134,6 +142,7 @@ func main() {
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
FederationAPI: fsAPI,
PushserverAPI: psAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,

View file

@ -148,7 +148,8 @@ func main() {
eduInputAPI = base.EDUServerClient()
}
psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, rsAPI, userAPI)
pgClient := base.PushGatewayHTTPClient()
psAPI := pushserver.NewInternalAPI(&base.Cfg.PushServer, base.ProcessContext, pgClient, rsAPI, userAPI)
if base.UseHTTPAPIs {
pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI)
psAPI = base.PushServerHTTPClient()

View file

@ -22,7 +22,8 @@ import (
)
func PushServer(base *basepkg.BaseDendrite, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI) {
intAPI := pushserver.NewInternalAPI(&cfg.PushServer, rsAPI, base.UserAPIClient())
pgClient := base.PushGatewayHTTPClient()
intAPI := pushserver.NewInternalAPI(&cfg.PushServer, base.ProcessContext, pgClient, rsAPI, base.UserAPIClient())
pushserver.AddInternalRoutes(base.InternalAPIMux, intAPI)

View file

@ -0,0 +1,629 @@
package consumers
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/producers"
"github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/pushserver/storage/tables"
"github.com/matrix-org/dendrite/pushserver/util"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
type OutputRoomEventConsumer struct {
cfg *config.PushServer
rsAPI rsapi.RoomserverInternalAPI
psAPI api.PushserverInternalAPI
pgClient pushgateway.Client
rsConsumer *internal.ContinualConsumer
db storage.Database
syncProducer *producers.SyncAPI
}
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.PushServer,
kafkaConsumer sarama.Consumer,
store storage.Database,
pgClient pushgateway.Client,
psAPI api.PushserverInternalAPI,
rsAPI rsapi.RoomserverInternalAPI,
syncProducer *producers.SyncAPI,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "pushserver/roomserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
cfg: cfg,
rsConsumer: &consumer,
db: store,
rsAPI: rsAPI,
psAPI: psAPI,
pgClient: pgClient,
syncProducer: syncProducer,
}
consumer.ProcessMessage = s.onMessage
return s
}
func (s *OutputRoomEventConsumer) Start() error {
return s.rsConsumer.Start()
}
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
ctx := context.Background()
var output rsapi.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
log.WithError(err).Errorf("pushserver consumer: message parse failure")
return nil
}
log.WithFields(log.Fields{
"event_type": output.Type,
}).Tracef("Received message from room server: %#v", output)
switch output.Type {
case rsapi.OutputTypeNewRoomEvent:
ev := output.NewRoomEvent.Event
if err := s.processMessage(ctx, output.NewRoomEvent.Event); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"event": string(ev.JSON()),
}).WithError(err).Errorf("pushserver consumer: process room event failure")
}
case rsapi.OutputTypeNewInviteEvent:
ev := output.NewInviteEvent.Event
if err := s.processMessage(ctx, output.NewInviteEvent.Event); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"event": string(ev.JSON()),
}).WithError(err).Errorf("pushserver consumer: process invite event failure")
}
default:
// Ignore old events, peeks, so on.
}
return nil
}
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
log.WithFields(log.Fields{
"event_type": event.Type(),
}).Tracef("Received event from room server: %#v", event)
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
if err != nil {
return err
}
if event.Type() == gomatrixserverlib.MRoomMember {
cevent := gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatAll)
member, err := newLocalMembership(&cevent)
if err != nil {
return err
}
if member.Membership == gomatrixserverlib.Invite && member.Domain == s.cfg.Matrix.ServerName {
// localRoomMembers only adds joined members. An invite
// should also be pushed to the target user.
members = append(members, member)
}
}
// TODO: run in parallel with localRoomMembers.
roomName, err := s.roomName(ctx, event)
if err != nil {
return err
}
log.WithFields(log.Fields{
"event_id": event.EventID(),
"room_id": event.RoomID(),
"num_members": len(members),
"room_size": roomSize,
}).Tracef("Notifying members")
// Notification.UserIsTarget is a per-member field, so we
// cannot group all users in a single request.
//
// TODO: does it have to be set? It's not required, and
// removing it means we can send all notifications to
// e.g. Element's Push gateway in one go.
for _, mem := range members {
if err := s.notifyLocal(ctx, event, mem, roomSize, roomName); err != nil {
log.WithFields(log.Fields{
"localpart": mem.Localpart,
}).WithError(err).Errorf("Unable to push to local user")
continue
}
}
return nil
}
type localMembership struct {
gomatrixserverlib.MemberContent
UserID string
Localpart string
Domain gomatrixserverlib.ServerName
}
func newLocalMembership(event *gomatrixserverlib.ClientEvent) (*localMembership, error) {
if event.StateKey == nil {
return nil, fmt.Errorf("missing state_key")
}
var member localMembership
if err := json.Unmarshal(event.Content, &member.MemberContent); err != nil {
return nil, err
}
localpart, domain, err := gomatrixserverlib.SplitID('@', *event.StateKey)
if err != nil {
return nil, err
}
member.UserID = *event.StateKey
member.Localpart = localpart
member.Domain = domain
return &member, nil
}
// localRoomMembers fetches the current local members of a room, and
// the total number of members.
func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) {
req := &rsapi.QueryMembershipsForRoomRequest{
RoomID: roomID,
JoinedOnly: true,
}
var res rsapi.QueryMembershipsForRoomResponse
// XXX: This could potentially race if the state for the event is not known yet
// e.g. the event came over federation but we do not have the full state persisted.
if err := s.rsAPI.QueryMembershipsForRoom(ctx, req, &res); err != nil {
return nil, 0, err
}
var members []*localMembership
var ntotal int
for _, event := range res.JoinEvents {
member, err := newLocalMembership(&event)
if err != nil {
log.WithError(err).Errorf("Parsing MemberContent")
continue
}
if member.Membership != gomatrixserverlib.Join {
continue
}
if member.Domain != s.cfg.Matrix.ServerName {
continue
}
ntotal++
members = append(members, member)
}
return members, ntotal, nil
}
// roomName returns the name in the event (if type==m.room.name), or
// looks it up in roomserver. If there is no name,
// m.room.canonical_alias is consulted. Returns an empty string if the
// room has no name.
func (s *OutputRoomEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) {
if event.Type() == gomatrixserverlib.MRoomName {
name, err := unmarshalRoomName(event)
if err != nil {
return "", err
}
if name != "" {
return name, nil
}
}
req := &rsapi.QueryCurrentStateRequest{
RoomID: event.RoomID(),
StateTuples: []gomatrixserverlib.StateKeyTuple{roomNameTuple, canonicalAliasTuple},
}
var res rsapi.QueryCurrentStateResponse
if err := s.rsAPI.QueryCurrentState(ctx, req, &res); err != nil {
return "", err
}
if event := res.StateEvents[roomNameTuple]; event != nil {
return unmarshalRoomName(event)
}
if event.Type() == gomatrixserverlib.MRoomCanonicalAlias {
alias, err := unmarshalCanonicalAlias(event)
if err != nil {
return "", err
}
if alias != "" {
return alias, nil
}
}
if event := res.StateEvents[canonicalAliasTuple]; event != nil {
return unmarshalCanonicalAlias(event)
}
return "", nil
}
var (
canonicalAliasTuple = gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias}
roomNameTuple = gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomName}
)
func unmarshalRoomName(event *gomatrixserverlib.HeaderedEvent) (string, error) {
var nc eventutil.NameContent
if err := json.Unmarshal(event.Content(), &nc); err != nil {
return "", fmt.Errorf("unmarshaling NameContent: %w", err)
}
return nc.Name, nil
}
func unmarshalCanonicalAlias(event *gomatrixserverlib.HeaderedEvent) (string, error) {
var cac eventutil.CanonicalAliasContent
if err := json.Unmarshal(event.Content(), &cac); err != nil {
return "", fmt.Errorf("unmarshaling CanonicalAliasContent: %w", err)
}
return cac.Alias, nil
}
// notifyLocal finds the right push actions for a local user, given an event.
func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int, roomName string) error {
actions, err := s.evaluatePushRules(ctx, event, mem, roomSize)
if err != nil {
return err
}
a, tweaks, err := pushrules.ActionsToTweaks(actions)
if err != nil {
return err
}
// TODO: support coalescing.
if a != pushrules.NotifyAction && a != pushrules.CoalesceAction {
log.WithFields(log.Fields{
"event_id": event.EventID(),
"room_id": event.RoomID(),
"localpart": mem.Localpart,
}).Tracef("Push rule evaluation rejected the event")
return nil
}
devicesByURLAndFormat, profileTag, err := s.localPushDevices(ctx, mem.Localpart, tweaks)
if err != nil {
return err
}
n := &api.Notification{
Actions: actions,
// UNSPEC: the spec doesn't say this is a ClientEvent, but the
// fields seem to match. room_id should be missing, which
// matches the behavior of FormatSync.
Event: gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatSync),
// TODO: this is per-device, but it's not part of the primary
// key. So inserting one notification per profile tag doesn't
// make sense. What is this supposed to be? Sytests require it
// to "work", but they only use a single device.
ProfileTag: profileTag,
RoomID: event.RoomID(),
TS: gomatrixserverlib.AsTimestamp(time.Now()),
}
if err := s.db.InsertNotification(ctx, mem.Localpart, event.EventID(), tweaks, n); err != nil {
return err
}
if err := s.syncProducer.GetAndSendNotificationData(ctx, mem.UserID, event.RoomID()); err != nil {
return err
}
// We do this after InsertNotification. Thus, this should always return >=1.
userNumUnreadNotifs, err := s.db.GetNotificationCount(ctx, mem.Localpart, tables.AllNotifications)
if err != nil {
return err
}
log.WithFields(log.Fields{
"event_id": event.EventID(),
"room_id": event.RoomID(),
"localpart": mem.Localpart,
"num_urls": len(devicesByURLAndFormat),
"num_unread": userNumUnreadNotifs,
}).Tracef("Notifying single member")
// Push gateways are out of our control, and we cannot risk
// looking up the server on a misbehaving push gateway. Each user
// receives a goroutine now that all internal API calls have been
// made.
//
// TODO: think about bounding this to one per user, and what
// ordering guarantees we must provide.
go func() {
// This background processing cannot be tied to a request.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
var rejected []*pushgateway.Device
for url, fmts := range devicesByURLAndFormat {
for format, devices := range fmts {
// TODO: support "email".
if !strings.HasPrefix(url, "http") {
continue
}
// UNSPEC: the specification suggests there can be
// more than one device per request. There is at least
// one Sytest that expects one HTTP request per
// device, rather than per URL. For now, we must
// notify each one separately.
for _, dev := range devices {
rej, err := s.notifyHTTP(ctx, event, url, format, []*pushgateway.Device{dev}, mem.Localpart, roomName, int(userNumUnreadNotifs))
if err != nil {
log.WithFields(log.Fields{
"event_id": event.EventID(),
"localpart": mem.Localpart,
}).WithError(err).Errorf("Unable to notify HTTP pusher")
continue
}
rejected = append(rejected, rej...)
}
}
}
if len(rejected) > 0 {
if err := s.deleteRejectedPushers(ctx, rejected, mem.Localpart); err != nil {
log.WithFields(log.Fields{
"localpart": mem.Localpart,
"num_pushers": len(rejected),
}).WithError(err).Errorf("Unable to delete rejected pushers")
}
}
}()
return nil
}
// evaluatePushRules fetches and evaluates the push rules of a local
// user. Returns actions (including dont_notify).
func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) {
if event.Sender() == mem.UserID {
// SPEC: Homeservers MUST NOT notify the Push Gateway for
// events that the user has sent themselves.
return nil, nil
}
var res api.QueryPushRulesResponse
if err := s.psAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil {
return nil, err
}
ec := &ruleSetEvalContext{
ctx: ctx,
rsAPI: s.rsAPI,
mem: mem,
roomID: event.RoomID(),
roomSize: roomSize,
}
eval := pushrules.NewRuleSetEvaluator(ec, &res.RuleSets.Global)
rule, err := eval.MatchEvent(event.Event)
if err != nil {
return nil, err
}
if rule == nil {
// SPEC: If no rules match an event, the homeserver MUST NOT
// notify the Push Gateway for that event.
return nil, err
}
log.WithFields(log.Fields{
"event_id": event.EventID(),
"room_id": event.RoomID(),
"localpart": mem.Localpart,
"rule_id": rule.RuleID,
}).Tracef("Matched a push rule")
return rule.Actions, nil
}
type ruleSetEvalContext struct {
ctx context.Context
rsAPI rsapi.RoomserverInternalAPI
mem *localMembership
roomID string
roomSize int
}
func (rse *ruleSetEvalContext) UserDisplayName() string { return rse.mem.DisplayName }
func (rse *ruleSetEvalContext) RoomMemberCount() (int, error) { return rse.roomSize, nil }
func (rse *ruleSetEvalContext) HasPowerLevel(userID, levelKey string) (bool, error) {
req := &rsapi.QueryLatestEventsAndStateRequest{
RoomID: rse.roomID,
StateToFetch: []gomatrixserverlib.StateKeyTuple{
{EventType: "m.room.power_levels"},
},
}
var res rsapi.QueryLatestEventsAndStateResponse
if err := rse.rsAPI.QueryLatestEventsAndState(rse.ctx, req, &res); err != nil {
return false, err
}
for _, ev := range res.StateEvents {
if ev.Type() != gomatrixserverlib.MRoomPowerLevels {
continue
}
plc, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev.Event)
if err != nil {
return false, err
}
return plc.UserLevel(userID) >= plc.NotificationLevel(levelKey), nil
}
return true, nil
}
// localPushDevices pushes to the configured devices of a local
// user. The map keys are [url][format].
func (s *OutputRoomEventConsumer) localPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) {
pusherDevices, err := util.GetPushDevices(ctx, localpart, tweaks, s.db)
if err != nil {
return nil, "", err
}
var profileTag string
devicesByURL := make(map[string]map[string][]*pushgateway.Device, len(pusherDevices))
for _, pusherDevice := range pusherDevices {
if profileTag == "" {
profileTag = pusherDevice.Pusher.ProfileTag
}
url := pusherDevice.URL
if devicesByURL[url] == nil {
devicesByURL[url] = make(map[string][]*pushgateway.Device, 2)
}
devicesByURL[url][pusherDevice.Format] = append(devicesByURL[url][pusherDevice.Format], &pusherDevice.Device)
}
return devicesByURL, profileTag, nil
}
// notifyHTTP performs a notificatation to a Push Gateway.
func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, url, format string, devices []*pushgateway.Device, localpart, roomName string, userNumUnreadNotifs int) ([]*pushgateway.Device, error) {
var req pushgateway.NotifyRequest
switch format {
case "event_id_only":
req = pushgateway.NotifyRequest{
Notification: pushgateway.Notification{
Counts: &pushgateway.Counts{},
Devices: devices,
EventID: event.EventID(),
RoomID: event.RoomID(),
},
}
default:
req = pushgateway.NotifyRequest{
Notification: pushgateway.Notification{
Content: event.Content(),
Counts: &pushgateway.Counts{
Unread: userNumUnreadNotifs,
},
Devices: devices,
EventID: event.EventID(),
ID: event.EventID(),
RoomID: event.RoomID(),
RoomName: roomName,
Sender: event.Sender(),
Type: event.Type(),
},
}
if mem, err := event.Membership(); err == nil {
req.Notification.Membership = mem
}
if event.StateKey() != nil && *event.StateKey() == fmt.Sprintf("@%s:%s", localpart, s.cfg.Matrix.ServerName) {
req.Notification.UserIsTarget = true
}
}
log.WithFields(log.Fields{
"event_id": event.EventID(),
"url": url,
"localpart": localpart,
"app_id0": devices[0].AppID,
"pushkey": devices[0].PushKey,
"num_devices": len(devices),
}).Debugf("Notifying HTTP push gateway")
var res pushgateway.NotifyResponse
if err := s.pgClient.Notify(ctx, url, &req, &res); err != nil {
return nil, err
}
log.WithFields(log.Fields{
"event_id": event.EventID(),
"url": url,
"localpart": localpart,
"num_rejected": len(res.Rejected),
}).Tracef("HTTP push gateway result")
if len(res.Rejected) == 0 {
return nil, nil
}
devMap := make(map[string]*pushgateway.Device, len(devices))
for _, d := range devices {
devMap[d.PushKey] = d
}
rejected := make([]*pushgateway.Device, 0, len(res.Rejected))
for _, pushKey := range res.Rejected {
d := devMap[pushKey]
if d != nil {
rejected = append(rejected, d)
}
}
return rejected, nil
}
// deleteRejectedPushers deletes the pushers associated with the given devices.
func (s *OutputRoomEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string) error {
log.WithFields(log.Fields{
"localpart": localpart,
"app_id0": devices[0].AppID,
"num_devices": len(devices),
}).Infof("Deleting pushers rejected by the HTTP push gateway")
for _, d := range devices {
if err := s.db.RemovePusher(ctx, d.AppID, d.PushKey, localpart); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
}).WithError(err).Errorf("Unable to delete rejected pusher")
}
}
return nil
}
// mapWithout returns a shallow copy of the map, without the given
// key. Returns nil if the resulting map is empty.
func mapWithout(m map[string]interface{}, key string) map[string]interface{} {
ret := make(map[string]interface{}, len(m))
for k, v := range m {
// The specification says we do not send "url".
if k == key {
continue
}
ret[k] = v
}
if len(ret) == 0 {
return nil
}
return ret
}

View file

@ -0,0 +1,260 @@
package consumers
import (
"context"
"encoding/json"
"sync"
"testing"
"github.com/Shopify/sarama"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/producers"
"github.com/matrix-org/dendrite/pushserver/storage"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
)
const serverName = gomatrixserverlib.ServerName("example.org")
func TestOutputRoomEventConsumer(t *testing.T) {
ctx := context.Background()
dbopts := &config.DatabaseOptions{
ConnectionString: "file::memory:",
MaxOpenConnections: 1,
MaxIdleConnections: 1,
}
db, err := storage.Open(dbopts)
if err != nil {
t.Fatalf("NewDatabase failed: %v", err)
}
err = db.CreatePusher(ctx,
api.Pusher{
PushKey: "apushkey",
Kind: api.HTTPKind,
AppID: "anappid",
Data: map[string]interface{}{
"url": "http://example.org/pusher/notify",
"extra": "someextra",
},
},
"alice")
if err != nil {
t.Fatalf("CreatePusher failed: %v", err)
}
var rsAPI fakeRoomServerInternalAPI
var psAPI fakePushserverInternalAPI
var messageSender fakeMessageSender
var wg sync.WaitGroup
wg.Add(1)
pgClient := fakePushGatewayClient{
WG: &wg,
}
s := &OutputRoomEventConsumer{
cfg: &config.PushServer{
Matrix: &config.Global{
ServerName: serverName,
},
},
db: db,
rsAPI: &rsAPI,
psAPI: &psAPI,
pgClient: &pgClient,
syncProducer: producers.NewSyncAPI(db, &messageSender, "clientDataTopic", "notificationDataTopic"),
}
event, err := gomatrixserverlib.NewEventFromTrustedJSONWithEventID("$143273582443PhrSn:example.org", []byte(`{
"content": {
"body": "This is an example text message",
"format": "org.matrix.custom.html",
"formatted_body": "\u003cb\u003eThis is an example text message\u003c/b\u003e",
"msgtype": "m.text"
},
"origin_server_ts": 1432735824653,
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"sender": "@example:example.org",
"type": "m.room.message",
"unsigned": {
"age": 1234
}
}`), false, gomatrixserverlib.RoomVersionV7)
if err != nil {
t.Fatalf("NewEventFromTrustedJSON failed: %v", err)
}
ev := &gomatrixserverlib.HeaderedEvent{
Event: event,
}
if err := s.processMessage(ctx, ev); err != nil {
t.Fatalf("processMessage failed: %v", err)
}
t.Log("Waiting for backend calls to finish.")
wg.Wait()
if diff := cmp.Diff([]*rsapi.QueryMembershipsForRoomRequest{{JoinedOnly: true, RoomID: "!jEsUZKDJdhlrceRyVU:example.org"}}, rsAPI.MembershipReqs); diff != "" {
t.Errorf("rsAPI.QueryMembershipsForRoom Reqs: +got -want:\n%s", diff)
}
if diff := cmp.Diff([]*pushgateway.NotifyRequest{{
Notification: pushgateway.Notification{
Type: "m.room.message",
Content: event.Content(),
Counts: &pushgateway.Counts{
Unread: 1,
},
Devices: []*pushgateway.Device{{
AppID: "anappid",
PushKey: "apushkey",
Data: map[string]interface{}{
"extra": "someextra",
},
}},
EventID: "$143273582443PhrSn:example.org",
ID: "$143273582443PhrSn:example.org",
RoomID: "!jEsUZKDJdhlrceRyVU:example.org",
RoomName: "aname",
Sender: "@example:example.org",
},
}}, pgClient.Reqs); diff != "" {
t.Errorf("pgClient.NotifyHTTP Reqs: +got -want:\n%s", diff)
}
if diff := cmp.Diff([]sarama.ProducerMessage{{
Topic: "notificationDataTopic",
Key: sarama.StringEncoder("@alice:example.org"),
Value: sarama.ByteEncoder([]byte(`{"room_id":"!jEsUZKDJdhlrceRyVU:example.org","unread_highlight_count":0,"unread_notification_count":1}`)),
}}, messageSender.Messages, cmpopts.IgnoreUnexported(sarama.ProducerMessage{})); diff != "" {
t.Errorf("SendMessage Messages: +got -want:\n%s", diff)
}
}
type fakeRoomServerInternalAPI struct {
rsapi.RoomserverInternalAPI
MembershipReqs []*rsapi.QueryMembershipsForRoomRequest
}
func (s *fakeRoomServerInternalAPI) QueryCurrentState(
ctx context.Context,
req *rsapi.QueryCurrentStateRequest,
res *rsapi.QueryCurrentStateResponse,
) error {
*res = rsapi.QueryCurrentStateResponse{
StateEvents: map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent{
roomNameTuple: mustParseHeaderedEventJSON(`{
"_room_version": "7",
"content": {
"name": "aname"
},
"event_id": "$3957tyerfgewrf382:example.org",
"origin_server_ts": 1432735824652,
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"sender": "@example:example.org",
"state_key": "@alice:example.org",
"type": "m.room.name"
}`),
},
}
return nil
}
func (s *fakeRoomServerInternalAPI) QueryMembershipsForRoom(
ctx context.Context,
req *rsapi.QueryMembershipsForRoomRequest,
res *rsapi.QueryMembershipsForRoomResponse,
) error {
s.MembershipReqs = append(s.MembershipReqs, req)
*res = rsapi.QueryMembershipsForRoomResponse{
JoinEvents: []gomatrixserverlib.ClientEvent{
mustParseClientEventJSON(`{
"content": {
"avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
"displayname": "Alice Margatroid",
"membership": "join",
"reason": "Looking for support"
},
"event_id": "$3957tyerfgewrf384:example.org",
"origin_server_ts": 1432735824653,
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"sender": "@example:example.org",
"state_key": "@alice:example.org",
"type": "m.room.member",
"unsigned": {
"age": 1234
}
}`),
},
}
return nil
}
type fakePushserverInternalAPI struct {
api.PushserverInternalAPI
}
func (s *fakePushserverInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error {
localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil {
return err
}
res.RuleSets = pushrules.DefaultAccountRuleSets(localpart, "example.org")
return nil
}
type fakePushGatewayClient struct {
pushgateway.Client
WG *sync.WaitGroup
Reqs []*pushgateway.NotifyRequest
}
func (c *fakePushGatewayClient) Notify(ctx context.Context, url string, req *pushgateway.NotifyRequest, res *pushgateway.NotifyResponse) error {
c.Reqs = append(c.Reqs, req)
if c.WG != nil {
c.WG.Done()
}
*res = pushgateway.NotifyResponse{
Rejected: []string{
"apushkey",
},
}
return nil
}
func mustMarshalJSON(v interface{}) []byte {
bs, err := json.Marshal(v)
if err != nil {
panic(err)
}
return bs
}
func mustParseClientEventJSON(s string) gomatrixserverlib.ClientEvent {
var ev gomatrixserverlib.ClientEvent
if err := json.Unmarshal([]byte(s), &ev); err != nil {
panic(err)
}
return ev
}
func mustParseHeaderedEventJSON(s string) *gomatrixserverlib.HeaderedEvent {
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal([]byte(s), &ev); err != nil {
panic(err)
}
return &ev
}
type fakeMessageSender struct {
Messages []sarama.ProducerMessage
}
func (s *fakeMessageSender) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
s.Messages = append(s.Messages, *msg)
return 0, 0, nil
}

View file

@ -22,11 +22,11 @@ type PushserverInternalAPI struct {
Cfg *config.PushServer
DB storage.Database
userAPI uapi.UserInternalAPI
syncProducer *producers.SyncAPIProducer
syncProducer *producers.SyncAPI
}
func NewPushserverAPI(
cfg *config.PushServer, pushserverDB storage.Database, userAPI uapi.UserInternalAPI, syncProducer *producers.SyncAPIProducer,
cfg *config.PushServer, pushserverDB storage.Database, userAPI uapi.UserInternalAPI, syncProducer *producers.SyncAPI,
) *PushserverInternalAPI {
a := &PushserverInternalAPI{
Cfg: cfg,

View file

@ -1,21 +1,35 @@
package producers
import (
"context"
"encoding/json"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
// SyncAPIProducer produces messages for the Sync API server to consume.
type SyncAPIProducer struct {
Producer sarama.SyncProducer
ClientDataTopic string
// SyncAPI produces messages for the Sync API server to consume.
type SyncAPI struct {
db storage.Database
producer MessageSender
clientDataTopic string
notificationDataTopic string
}
func NewSyncAPI(db storage.Database, producer MessageSender, clientDataTopic string, notificationDataTopic string) *SyncAPI {
return &SyncAPI{
db: db,
producer: producer,
clientDataTopic: clientDataTopic,
notificationDataTopic: notificationDataTopic,
}
}
// SendAccountData sends account data to the Sync API server.
func (p *SyncAPIProducer) SendAccountData(userID string, roomID string, dataType string) error {
func (p *SyncAPI) SendAccountData(userID string, roomID string, dataType string) error {
var m sarama.ProducerMessage
data := eventutil.AccountData{
@ -27,15 +41,59 @@ func (p *SyncAPIProducer) SendAccountData(userID string, roomID string, dataType
return err
}
m.Topic = string(p.ClientDataTopic)
m.Topic = string(p.clientDataTopic)
m.Key = sarama.StringEncoder(userID)
m.Value = sarama.ByteEncoder(value)
log.WithFields(log.Fields{
"user_id": userID,
"room_id": roomID,
"data_type": dataType,
}).Infof("Producing to topic '%s'", m.Topic)
}).Infof("Producing to topic %q", m.Topic)
_, _, err = p.Producer.SendMessage(&m)
_, _, err = p.producer.SendMessage(&m)
return err
}
// GetAndSendNotificationData reads the database and sends data about unread
// notifications to the Sync API server.
func (p *SyncAPI) GetAndSendNotificationData(ctx context.Context, userID, roomID string) error {
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return err
}
ntotal, nhighlight, err := p.db.GetRoomNotificationCounts(ctx, localpart, roomID)
if err != nil {
return err
}
return p.sendNotificationData(userID, &eventutil.NotificationData{
RoomID: roomID,
UnreadHighlightCount: int(nhighlight),
UnreadNotificationCount: int(ntotal),
})
}
// sendNotificationData sends data about unread notifications to the Sync API server.
func (p *SyncAPI) sendNotificationData(userID string, data *eventutil.NotificationData) error {
value, err := json.Marshal(data)
if err != nil {
return err
}
var m sarama.ProducerMessage
m.Topic = string(p.notificationDataTopic)
m.Key = sarama.StringEncoder(userID)
m.Value = sarama.ByteEncoder(value)
log.WithFields(log.Fields{
"user_id": userID,
"room_id": data.RoomID,
}).Infof("Producing to topic %q", m.Topic)
_, _, err = p.producer.SendMessage(&m)
return err
}
type MessageSender interface {
SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
}

View file

@ -2,7 +2,9 @@ package pushserver
import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/consumers"
"github.com/matrix-org/dendrite/pushserver/internal"
"github.com/matrix-org/dendrite/pushserver/inthttp"
"github.com/matrix-org/dendrite/pushserver/producers"
@ -10,6 +12,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/kafka"
"github.com/matrix-org/dendrite/setup/process"
uapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
)
@ -24,27 +27,40 @@ func AddInternalRoutes(router *mux.Router, intAPI api.PushserverInternalAPI) {
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
cfg *config.PushServer,
process *process.ProcessContext,
pgClient pushgateway.Client,
rsAPI roomserverAPI.RoomserverInternalAPI,
userAPI uapi.UserInternalAPI,
) api.PushserverInternalAPI {
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
db, err := storage.Open(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to push server db")
}
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
syncProducer := &producers.SyncAPIProducer{
Producer: producer,
syncProducer := producers.NewSyncAPI(
db,
producer,
// TODO: user API should handle syncs for account data. Right now,
// it's handled by clientapi, and hence uses its topic. When user
// API handles it for all account data, we can remove it from
// here.
ClientDataTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData),
}
cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData),
cfg.Matrix.Kafka.TopicFor(config.TopicOutputNotificationData),
)
psAPI := internal.NewPushserverAPI(
cfg, db, userAPI, syncProducer,
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
process, cfg, consumer, db, pgClient, psAPI, rsAPI, syncProducer,
)
if err := rsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start push server room server consumer")
}
return psAPI
}

100
pushserver/util/devices.go Normal file
View file

@ -0,0 +1,100 @@
package util
import (
"context"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/storage"
log "github.com/sirupsen/logrus"
)
type PusherDevice struct {
Device pushgateway.Device
Pusher *api.Pusher
URL string
Format string
}
// GetPushDevices pushes to the configured devices of a local user.
func GetPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}, db storage.Database) ([]*PusherDevice, error) {
pushers, err := db.GetPushers(ctx, localpart)
if err != nil {
return nil, err
}
devices := make([]*PusherDevice, 0, len(pushers))
for _, pusher := range pushers {
var url, format string
data := pusher.Data
switch pusher.Kind {
case api.EmailKind:
url = "mailto:"
case api.HTTPKind:
// TODO: The spec says only event_id_only is supported,
// but Sytests assume "" means "full notification".
fmtIface := pusher.Data["format"]
var ok bool
format, ok = fmtIface.(string)
if ok && format != "event_id_only" {
log.WithFields(log.Fields{
"localpart": localpart,
"app_id": pusher.AppID,
}).Errorf("Only data.format event_id_only or empty is supported")
continue
}
urlIface := pusher.Data["url"]
url, ok = urlIface.(string)
if !ok {
log.WithFields(log.Fields{
"localpart": localpart,
"app_id": pusher.AppID,
}).Errorf("No data.url configured for HTTP Pusher")
continue
}
data = mapWithout(data, "url")
default:
log.WithFields(log.Fields{
"localpart": localpart,
"app_id": pusher.AppID,
"kind": pusher.Kind,
}).Errorf("Unhandled pusher kind")
continue
}
devices = append(devices, &PusherDevice{
Device: pushgateway.Device{
AppID: pusher.AppID,
Data: data,
PushKey: pusher.PushKey,
PushKeyTS: pusher.PushKeyTS,
Tweaks: tweaks,
},
Pusher: &pusher,
URL: url,
Format: format,
})
}
return devices, nil
}
// mapWithout returns a shallow copy of the map, without the given
// key. Returns nil if the resulting map is empty.
func mapWithout(m map[string]interface{}, key string) map[string]interface{} {
ret := make(map[string]interface{}, len(m))
for k, v := range m {
// The specification says we do not send "url".
if k == key {
continue
}
ret[k] = v
}
if len(ret) == 0 {
return nil
}
return ret
}

76
pushserver/util/notify.go Normal file
View file

@ -0,0 +1,76 @@
package util
import (
"context"
"strings"
"time"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/pushserver/storage/tables"
log "github.com/sirupsen/logrus"
)
// NotifyUserCountsAsync sends notifications to a local user's
// notification destinations. Database lookups run synchronously, but
// a single goroutine is started when talking to the Push
// gateways. There is no way to know when the background goroutine has
// finished.
func NotifyUserCountsAsync(ctx context.Context, pgClient pushgateway.Client, localpart string, db storage.Database) error {
pusherDevices, err := GetPushDevices(ctx, localpart, nil, db)
if err != nil {
return err
}
if len(pusherDevices) == 0 {
return nil
}
userNumUnreadNotifs, err := db.GetNotificationCount(ctx, localpart, tables.AllNotifications)
if err != nil {
return err
}
log.WithFields(log.Fields{
"localpart": localpart,
"app_id0": pusherDevices[0].Device.AppID,
"pushkey": pusherDevices[0].Device.PushKey,
}).Tracef("Notifying HTTP push gateway about notification counts")
// TODO: think about bounding this to one per user, and what
// ordering guarantees we must provide.
go func() {
// This background processing cannot be tied to a request.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// TODO: we could batch all devices with the same URL, but
// Sytest requires consumers/roomserver.go to do it
// one-by-one, so we do the same here.
for _, pusherDevice := range pusherDevices {
// TODO: support "email".
if !strings.HasPrefix(pusherDevice.URL, "http") {
continue
}
req := pushgateway.NotifyRequest{
Notification: pushgateway.Notification{
Counts: &pushgateway.Counts{
Unread: int(userNumUnreadNotifs),
},
Devices: []*pushgateway.Device{&pusherDevice.Device},
},
}
if err := pgClient.Notify(ctx, pusherDevice.URL, &req, &pushgateway.NotifyResponse{}); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"app_id0": pusherDevice.Device.AppID,
"pushkey": pusherDevice.Device.PushKey,
}).WithError(err).Error("HTTP push gateway request failed")
return
}
}
}()
return nil
}

View file

@ -30,6 +30,7 @@ import (
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic"
@ -284,6 +285,11 @@ func (b *BaseDendrite) PushServerHTTPClient() pushserverAPI.PushserverInternalAP
return f
}
// PushGatewayHTTPClient returns a new client for interacting with (external) Push Gateways.
func (b *BaseDendrite) PushGatewayHTTPClient() pushgateway.Client {
return pushgateway.NewHTTPClient(b.Cfg.PushServer.DisableTLSValidation)
}
// CreateAccountsDB creates a new instance of the accounts database. Should only
// be called once per component.
func (b *BaseDendrite) CreateAccountsDB() accounts.Database {

View file

@ -6,6 +6,11 @@ type PushServer struct {
InternalAPI InternalAPIOptions `yaml:"internal_api"`
Database DatabaseOptions `yaml:"database"`
// DisableTLSValidation disables the validation of X.509 TLS certs
// on remote Push gateway endpoints. This is not recommended in
// production!
DisableTLSValidation bool `yaml:"disable_tls_validation"`
}
func (c *PushServer) Defaults() {