Store ignore knowledge in the sync API

This commit is contained in:
Neil Alexander 2022-04-07 14:44:35 +01:00
parent 8f25a1021c
commit c192c6096b
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
20 changed files with 250 additions and 36 deletions

View file

@ -40,7 +40,7 @@ type SyncAPIProducer struct {
}
// SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error {
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON, ignoredUsers *types.IgnoredUsers) error {
m := &nats.Msg{
Subject: p.TopicClientData,
Header: nats.Header{},
@ -51,6 +51,7 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string
RoomID: roomID,
Type: dataType,
ReadMarker: readMarker,
IgnoredUsers: ignoredUsers,
}
var err error
m.Data, err = json.Marshal(data)

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/eventutil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
@ -126,8 +127,14 @@ func SaveAccountData(
return util.ErrorResponse(err)
}
var ignoredUsers *types.IgnoredUsers
if dataType == "m.ignored_user_list" {
ignoredUsers = &types.IgnoredUsers{}
_ = json.Unmarshal(body, ignoredUsers)
}
// TODO: user API should do this since it's account data
if err := syncProducer.SendData(userID, roomID, dataType, nil); err != nil {
if err := syncProducer.SendData(userID, roomID, dataType, nil, ignoredUsers); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError()
}
@ -184,7 +191,7 @@ func SaveReadMarker(
return util.ErrorResponse(err)
}
if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r); err != nil {
if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError()
}

View file

@ -98,7 +98,7 @@ func PutTag(
return jsonerror.InternalServerError()
}
if err = syncProducer.SendData(userID, roomID, "m.tag", nil); err != nil {
if err = syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
}
@ -151,7 +151,7 @@ func DeleteTag(
}
// TODO: user API should do this since it's account data
if err := syncProducer.SendData(userID, roomID, "m.tag", nil); err != nil {
if err := syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
}

View file

@ -17,6 +17,8 @@ package eventutil
import (
"errors"
"strconv"
"github.com/matrix-org/dendrite/syncapi/types"
)
// ErrProfileNoExists is returned when trying to lookup a user's profile that
@ -29,6 +31,7 @@ type AccountData struct {
RoomID string `json:"room_id"`
Type string `json:"type"`
ReadMarker *ReadMarkerJSON `json:"read_marker,omitempty"` // optional
IgnoredUsers *types.IgnoredUsers `json:"ignored_users,omitempty"` // optional
}
type ReadMarkerJSON struct {

View file

@ -119,6 +119,15 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg)
return false
}
if output.IgnoredUsers != nil {
if err := s.db.UpdateIgnoresForUser(ctx, userID, output.IgnoredUsers); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"user_id": userID,
}).Errorf("Failed to update ignored users")
sentry.CaptureException(err)
}
}
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})

View file

@ -149,4 +149,7 @@ type Database interface {
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
}

View file

@ -0,0 +1,87 @@
// Copyright 2017 Jan Christian Grünhage
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
const ignoresSchema = `
-- Stores data about ignoress
CREATE TABLE IF NOT EXISTS syncapi_ignores (
-- The user ID whose ignore list this belongs to.
user_id TEXT NOT NULL,
ignores_json TEXT NOT NULL,
PRIMARY KEY(user_id)
);
`
const selectIgnoresSQL = "" +
"SELECT ignores_json FROM syncapi_ignores WHERE user_id = $1"
const upsertIgnoresSQL = "" +
"INSERT INTO syncapi_ignores (user_id, ignores_json) VALUES ($1, $2)" +
" ON CONFLICT (user_id) DO UPDATE set ignores_json = $2"
type ignoresStatements struct {
selectIgnoresStmt *sql.Stmt
upsertIgnoresStmt *sql.Stmt
}
func NewPostgresIgnoresTable(db *sql.DB) (tables.Ignores, error) {
_, err := db.Exec(ignoresSchema)
if err != nil {
return nil, err
}
s := &ignoresStatements{}
if s.selectIgnoresStmt, err = db.Prepare(selectIgnoresSQL); err != nil {
return nil, err
}
if s.upsertIgnoresStmt, err = db.Prepare(upsertIgnoresSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *ignoresStatements) SelectIgnores(
ctx context.Context, userID string,
) (*types.IgnoredUsers, error) {
var ignoresData []byte
err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
if err != nil {
return nil, err
}
var ignores types.IgnoredUsers
if err = json.Unmarshal(ignoresData, &ignores); err != nil {
return nil, err
}
return &ignores, nil
}
func (s *ignoresStatements) UpsertIgnores(
ctx context.Context, userID string, ignores *types.IgnoredUsers,
) error {
ignoresJSON, err := json.Marshal(ignores)
if err != nil {
return err
}
_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
return err
}

View file

@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil {
return nil, err
}
ignores, err := NewPostgresIgnoresTable(d.db)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -111,6 +115,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Receipts: receipts,
Memberships: memberships,
NotificationData: notificationData,
Ignores: ignores,
}
return &d, nil
}

View file

@ -48,6 +48,7 @@ type Database struct {
Receipts tables.Receipts
Memberships tables.Memberships
NotificationData tables.NotificationData
Ignores tables.Ignores
}
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@ -1002,3 +1003,11 @@ func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}
func (s *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) {
return s.Ignores.SelectIgnores(ctx, userID)
}
func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error {
return s.Ignores.UpsertIgnores(ctx, userID, ignores)
}

View file

@ -0,0 +1,87 @@
// Copyright 2017 Jan Christian Grünhage
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
const ignoresSchema = `
-- Stores data about ignoress
CREATE TABLE IF NOT EXISTS syncapi_ignores (
-- The user ID whose ignore list this belongs to.
user_id TEXT NOT NULL,
ignores_json TEXT NOT NULL,
PRIMARY KEY(user_id)
);
`
const selectIgnoresSQL = "" +
"SELECT ignores_json FROM syncapi_ignores WHERE user_id = $1"
const upsertIgnoresSQL = "" +
"INSERT INTO syncapi_ignores (user_id, ignores_json) VALUES ($1, $2)" +
" ON CONFLICT DO UPDATE set ignores_json = $2"
type ignoresStatements struct {
selectIgnoresStmt *sql.Stmt
upsertIgnoresStmt *sql.Stmt
}
func NewSqliteIgnoresTable(db *sql.DB) (tables.Ignores, error) {
_, err := db.Exec(ignoresSchema)
if err != nil {
return nil, err
}
s := &ignoresStatements{}
if s.selectIgnoresStmt, err = db.Prepare(selectIgnoresSQL); err != nil {
return nil, err
}
if s.upsertIgnoresStmt, err = db.Prepare(upsertIgnoresSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *ignoresStatements) SelectIgnores(
ctx context.Context, userID string,
) (*types.IgnoredUsers, error) {
var ignoresData []byte
err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
if err != nil {
return nil, err
}
var ignores types.IgnoredUsers
if err = json.Unmarshal(ignoresData, &ignores); err != nil {
return nil, err
}
return &ignores, nil
}
func (s *ignoresStatements) UpsertIgnores(
ctx context.Context, userID string, ignores *types.IgnoredUsers,
) error {
ignoresJSON, err := json.Marshal(ignores)
if err != nil {
return err
}
_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
return err
}

View file

@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil {
return err
}
ignores, err := NewSqliteIgnoresTable(d.db)
if err != nil {
return err
}
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -121,6 +125,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Receipts: receipts,
Memberships: memberships,
NotificationData: notificationData,
Ignores: ignores,
}
return nil
}

View file

@ -182,3 +182,8 @@ type NotificationData interface {
SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
SelectMaxID(ctx context.Context) (int64, error)
}
type Ignores interface {
SelectIgnores(ctx context.Context, userID string) (*types.IgnoredUsers, error)
UpsertIgnores(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
}

View file

@ -55,7 +55,7 @@ func (p *InviteStreamProvider) IncrementalSync(
for roomID, inviteEvent := range invites {
// skip ignored user events
if _, ok := req.IgnoredUsers[inviteEvent.Sender()]; ok {
if _, ok := req.IgnoredUsers.List[inviteEvent.Sender()]; ok {
continue
}
ir := types.NewInviteResponse(inviteEvent)

View file

@ -2,7 +2,7 @@ package streams
import (
"context"
"encoding/json"
"database/sql"
"sync"
"time"
@ -415,24 +415,17 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
// the syncreq itself for further use in streams.
func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
accountData := userapi.QueryAccountDataResponse{}
err := p.userAPI.QueryAccountData(ctx, &userapi.QueryAccountDataRequest{
UserID: req.Device.UserID, RoomID: "", DataType: "m.ignored_user_list",
}, &accountData)
ignores, err := p.DB.SelectIgnores(ctx, req.Device.UserID)
if err != nil {
req.Log.WithError(err).Error("unable to query ignored users")
if err == sql.ErrNoRows {
return nil
}
return err
}
if data, ok := accountData.GlobalAccountData["m.ignored_user_list"]; ok {
err = json.Unmarshal(data, &req)
if err != nil {
req.Log.WithError(err).Error("unable to parse json")
return err
}
for userID := range req.IgnoredUsers {
req.IgnoredUsers = *ignores
for userID := range ignores.List {
eventFilter.NotSenders = append(eventFilter.NotSenders, userID)
}
}
return nil
}

View file

@ -55,7 +55,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
receiptsByRoom := make(map[string][]types.OutputReceiptEvent)
for _, receipt := range receipts {
// skip ignored user events
if _, ok := req.IgnoredUsers[receipt.UserID]; ok {
if _, ok := req.IgnoredUsers.List[receipt.UserID]; ok {
continue
}
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)

View file

@ -49,7 +49,7 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
// Add the updates into the sync response.
for _, event := range events {
// skip ignored user events
if _, ok := req.IgnoredUsers[event.Sender]; ok {
if _, ok := req.IgnoredUsers.List[event.Sender]; ok {
continue
}
req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)

View file

@ -43,7 +43,7 @@ func (p *TypingStreamProvider) IncrementalSync(
typingUsers := make([]string, 0, len(users))
for i := range users {
// skip ignored user events
if _, ok := req.IgnoredUsers[users[i]]; !ok {
if _, ok := req.IgnoredUsers.List[users[i]]; !ok {
typingUsers = append(typingUsers, users[i])
}
}

View file

@ -22,7 +22,7 @@ type SyncRequest struct {
// Updated by the PDU stream.
Rooms map[string]string
// Updated by the PDU stream.
IgnoredUsers map[string]interface{} `json:"ignored_users"`
IgnoredUsers IgnoredUsers
}
type StreamProvider interface {

View file

@ -510,3 +510,7 @@ type OutputSendToDeviceEvent struct {
DeviceID string `json:"device_id"`
gomatrixserverlib.SendToDeviceEvent
}
type IgnoredUsers struct {
List map[string]interface{} `json:"ignored_users"`
}

View file

@ -395,10 +395,6 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
return nil
}
type ignoredUsers struct {
List map[string]interface{} `json:"ignored_users"`
}
// evaluatePushRules fetches and evaluates the push rules of a local
// user. Returns actions (including dont_notify).
func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) {
@ -414,7 +410,7 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
return nil, err
}
if data != nil {
ignored := ignoredUsers{}
ignored := types.IgnoredUsers{}
err = json.Unmarshal(data, &ignored)
if err != nil {
return nil, err