mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-21 21:13:09 -06:00
Tweak logging, add tests
This commit is contained in:
parent
c61e941762
commit
26a639fa19
|
|
@ -29,7 +29,6 @@ import (
|
||||||
type OutputStreamEventConsumer struct {
|
type OutputStreamEventConsumer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cfg *config.UserAPI
|
cfg *config.UserAPI
|
||||||
userAPI api.UserInternalAPI
|
|
||||||
rsAPI rsapi.UserRoomserverAPI
|
rsAPI rsapi.UserRoomserverAPI
|
||||||
jetstream nats.JetStreamContext
|
jetstream nats.JetStreamContext
|
||||||
durable string
|
durable string
|
||||||
|
|
@ -45,7 +44,6 @@ func NewOutputStreamEventConsumer(
|
||||||
js nats.JetStreamContext,
|
js nats.JetStreamContext,
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
pgClient pushgateway.Client,
|
pgClient pushgateway.Client,
|
||||||
userAPI api.UserInternalAPI,
|
|
||||||
rsAPI rsapi.UserRoomserverAPI,
|
rsAPI rsapi.UserRoomserverAPI,
|
||||||
syncProducer *producers.SyncAPI,
|
syncProducer *producers.SyncAPI,
|
||||||
) *OutputStreamEventConsumer {
|
) *OutputStreamEventConsumer {
|
||||||
|
|
@ -57,7 +55,6 @@ func NewOutputStreamEventConsumer(
|
||||||
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
|
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
|
||||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
|
||||||
pgClient: pgClient,
|
pgClient: pgClient,
|
||||||
userAPI: userAPI,
|
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
syncProducer: syncProducer,
|
syncProducer: syncProducer,
|
||||||
}
|
}
|
||||||
|
|
@ -305,7 +302,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
|
||||||
"event_id": event.EventID(),
|
"event_id": event.EventID(),
|
||||||
"room_id": event.RoomID(),
|
"room_id": event.RoomID(),
|
||||||
"localpart": mem.Localpart,
|
"localpart": mem.Localpart,
|
||||||
}).Tracef("Push rule evaluation rejected the event")
|
}).Debugf("Push rule evaluation rejected the event")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -348,7 +345,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
|
||||||
"localpart": mem.Localpart,
|
"localpart": mem.Localpart,
|
||||||
"num_urls": len(devicesByURLAndFormat),
|
"num_urls": len(devicesByURLAndFormat),
|
||||||
"num_unread": userNumUnreadNotifs,
|
"num_unread": userNumUnreadNotifs,
|
||||||
}).Tracef("Notifying single member")
|
}).Debugf("Notifying single member")
|
||||||
|
|
||||||
// Push gateways are out of our control, and we cannot risk
|
// Push gateways are out of our control, and we cannot risk
|
||||||
// looking up the server on a misbehaving push gateway. Each user
|
// looking up the server on a misbehaving push gateway. Each user
|
||||||
|
|
@ -422,8 +419,8 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
|
||||||
return nil, fmt.Errorf("user %s is ignored", sender)
|
return nil, fmt.Errorf("user %s is ignored", sender)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var res api.QueryPushRulesResponse
|
ruleSets, err := s.db.QueryPushRules(ctx, mem.Localpart)
|
||||||
if err = s.userAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -434,7 +431,7 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
|
||||||
roomID: event.RoomID(),
|
roomID: event.RoomID(),
|
||||||
roomSize: roomSize,
|
roomSize: roomSize,
|
||||||
}
|
}
|
||||||
eval := pushrules.NewRuleSetEvaluator(ec, &res.RuleSets.Global)
|
eval := pushrules.NewRuleSetEvaluator(ec, &ruleSets.Global)
|
||||||
rule, err := eval.MatchEvent(event.Event)
|
rule, err := eval.MatchEvent(event.Event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
129
userapi/consumers/syncapi_streamevent_test.go
Normal file
129
userapi/consumers/syncapi_streamevent_test.go
Normal file
|
|
@ -0,0 +1,129 @@
|
||||||
|
package consumers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/pushrules"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/test"
|
||||||
|
"github.com/matrix-org/dendrite/userapi/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
|
||||||
|
t.Helper()
|
||||||
|
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||||
|
db, err := storage.NewUserAPIDatabase(nil, &config.DatabaseOptions{
|
||||||
|
ConnectionString: config.DataSource(connStr),
|
||||||
|
}, "", 4, 0, 0, "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create new user db: %v", err)
|
||||||
|
}
|
||||||
|
return db, close
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustCreateEvent(t *testing.T, content string) *gomatrixserverlib.HeaderedEvent {
|
||||||
|
t.Helper()
|
||||||
|
ev, err := gomatrixserverlib.NewEventFromTrustedJSON([]byte(content), false, gomatrixserverlib.RoomVersionV10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create event: %v", err)
|
||||||
|
}
|
||||||
|
return ev.Headered(gomatrixserverlib.RoomVersionV10)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_evaluatePushRules(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||||
|
db, close := mustCreateDatabase(t, dbType)
|
||||||
|
defer close()
|
||||||
|
consumer := OutputStreamEventConsumer{db: db}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
eventContent string
|
||||||
|
wantAction pushrules.ActionKind
|
||||||
|
wantActions []*pushrules.Action
|
||||||
|
wantNotify bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "m.receipt doesn't notify",
|
||||||
|
eventContent: `{"type":"m.receipt"}`,
|
||||||
|
wantAction: pushrules.UnknownAction,
|
||||||
|
wantActions: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "m.reaction doesn't notify",
|
||||||
|
eventContent: `{"type":"m.reaction"}`,
|
||||||
|
wantAction: pushrules.DontNotifyAction,
|
||||||
|
wantActions: []*pushrules.Action{
|
||||||
|
{
|
||||||
|
Kind: pushrules.DontNotifyAction,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "m.room.message notifies",
|
||||||
|
eventContent: `{"type":"m.room.message"}`,
|
||||||
|
wantNotify: true,
|
||||||
|
wantAction: pushrules.NotifyAction,
|
||||||
|
wantActions: []*pushrules.Action{
|
||||||
|
{Kind: pushrules.NotifyAction},
|
||||||
|
{
|
||||||
|
Kind: pushrules.SetTweakAction,
|
||||||
|
Tweak: pushrules.HighlightTweak,
|
||||||
|
Value: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "m.room.message highlights",
|
||||||
|
eventContent: `{"type":"m.room.message", "content": {"body": "test"} }`,
|
||||||
|
wantNotify: true,
|
||||||
|
wantAction: pushrules.NotifyAction,
|
||||||
|
wantActions: []*pushrules.Action{
|
||||||
|
{Kind: pushrules.NotifyAction},
|
||||||
|
{
|
||||||
|
Kind: pushrules.SetTweakAction,
|
||||||
|
Tweak: pushrules.SoundTweak,
|
||||||
|
Value: "default",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Kind: pushrules.SetTweakAction,
|
||||||
|
Tweak: pushrules.HighlightTweak,
|
||||||
|
Value: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
actions, err := consumer.evaluatePushRules(ctx, mustCreateEvent(t, tc.eventContent), &localMembership{
|
||||||
|
UserID: "@test:localhost",
|
||||||
|
Localpart: "test",
|
||||||
|
Domain: "localhost",
|
||||||
|
}, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to evaluate push rules: %v", err)
|
||||||
|
}
|
||||||
|
assert.Equal(t, tc.wantActions, actions)
|
||||||
|
gotAction, _, err := pushrules.ActionsToTweaks(actions)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to get actions: %v", err)
|
||||||
|
}
|
||||||
|
if gotAction != tc.wantAction {
|
||||||
|
t.Fatalf("expected action to be '%s', got '%s'", tc.wantAction, gotAction)
|
||||||
|
}
|
||||||
|
// this is taken from `notifyLocal`
|
||||||
|
if tc.wantNotify && gotAction != pushrules.NotifyAction && gotAction != pushrules.CoalesceAction {
|
||||||
|
t.Fatalf("expected to notify but didn't")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue