mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 03:03:10 -06:00
Merge branch 'matrix-org:main' into main
This commit is contained in:
commit
993922d0b6
21
CHANGES.md
21
CHANGES.md
|
|
@ -1,5 +1,26 @@
|
|||
# Changelog
|
||||
|
||||
## Dendrite 0.8.0 (2022-04-07)
|
||||
|
||||
### Features
|
||||
|
||||
* Support for presence has been added
|
||||
* Presence is not enabled by default
|
||||
* The `global.presence.enable_inbound` and `global.presence.enable_outbound` configuration options allow configuring inbound and outbound presence separately
|
||||
* Support for room upgrades via the `/room/{roomID}/upgrade` endpoint has been added (contributed by [DavidSpenler](https://github.com/DavidSpenler), [alexkursell](https://github.com/alexkursell))
|
||||
* Support for ignoring users has been added
|
||||
* Joined and invite user counts are now sent in the `/sync` room summaries
|
||||
* Queued federation and stale device list updates will now be staggered at startup over an up-to 2 minute warm-up period, rather than happening all at once
|
||||
* Memory pressure created by the sync notifier has been reduced
|
||||
* The EDU server component has now been removed, with the work being moved to more relevant components
|
||||
|
||||
### Fixes
|
||||
|
||||
* It is now possible to set the `power_level_content_override` when creating a room to include power levels over 100
|
||||
* `/send_join` and `/state` responses will now not unmarshal the JSON twice
|
||||
* The stream event consumer for push notifications will no longer request membership events that are irrelevant
|
||||
* Appservices will no longer incorrectly receive state events twice
|
||||
|
||||
## Dendrite 0.7.0 (2022-03-25)
|
||||
|
||||
### Features
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ type SyncAPIProducer struct {
|
|||
}
|
||||
|
||||
// SendData sends account data to the sync API server
|
||||
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error {
|
||||
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON, ignoredUsers *types.IgnoredUsers) error {
|
||||
m := &nats.Msg{
|
||||
Subject: p.TopicClientData,
|
||||
Header: nats.Header{},
|
||||
|
|
@ -53,6 +53,7 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string
|
|||
RoomID: roomID,
|
||||
Type: dataType,
|
||||
ReadMarker: readMarker,
|
||||
IgnoredUsers: ignoredUsers,
|
||||
}
|
||||
var err error
|
||||
m.Data, err = json.Marshal(data)
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/dendrite/userapi/api"
|
||||
|
||||
"github.com/matrix-org/util"
|
||||
|
|
@ -126,8 +127,14 @@ func SaveAccountData(
|
|||
return util.ErrorResponse(err)
|
||||
}
|
||||
|
||||
var ignoredUsers *types.IgnoredUsers
|
||||
if dataType == "m.ignored_user_list" {
|
||||
ignoredUsers = &types.IgnoredUsers{}
|
||||
_ = json.Unmarshal(body, ignoredUsers)
|
||||
}
|
||||
|
||||
// TODO: user API should do this since it's account data
|
||||
if err := syncProducer.SendData(userID, roomID, dataType, nil); err != nil {
|
||||
if err := syncProducer.SendData(userID, roomID, dataType, nil, ignoredUsers); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
|
@ -184,7 +191,7 @@ func SaveReadMarker(
|
|||
return util.ErrorResponse(err)
|
||||
}
|
||||
|
||||
if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r); err != nil {
|
||||
if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r, nil); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,7 +70,6 @@ func SetPresence(
|
|||
JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", presence.Presence)),
|
||||
}
|
||||
}
|
||||
|
||||
err := producer.SendPresence(req.Context(), userID, presenceStatus, presence.StatusMsg)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("failed to update presence")
|
||||
|
|
|
|||
|
|
@ -180,7 +180,7 @@ func SetAvatarURL(
|
|||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, cfg.Matrix.ServerName, nil, false); err != nil {
|
||||
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, cfg.Matrix.ServerName, nil, true); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ func PutTag(
|
|||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
if err = syncProducer.SendData(userID, roomID, "m.tag", nil); err != nil {
|
||||
if err = syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
|
||||
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
|
||||
}
|
||||
|
||||
|
|
@ -151,7 +151,7 @@ func DeleteTag(
|
|||
}
|
||||
|
||||
// TODO: user API should do this since it's account data
|
||||
if err := syncProducer.SendData(userID, roomID, "m.tag", nil); err != nil {
|
||||
if err := syncProducer.SendData(userID, roomID, "m.tag", nil, nil); err != nil {
|
||||
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -413,7 +413,6 @@ func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) e
|
|||
for _, content := range payload.Push {
|
||||
presence, ok := syncTypes.PresenceFromString(content.Presence)
|
||||
if !ok {
|
||||
logrus.Warnf("invalid presence '%s', skipping.", content.Presence)
|
||||
continue
|
||||
}
|
||||
if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ package eventutil
|
|||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
||||
// ErrProfileNoExists is returned when trying to lookup a user's profile that
|
||||
|
|
@ -29,6 +31,7 @@ type AccountData struct {
|
|||
RoomID string `json:"room_id"`
|
||||
Type string `json:"type"`
|
||||
ReadMarker *ReadMarkerJSON `json:"read_marker,omitempty"` // optional
|
||||
IgnoredUsers *types.IgnoredUsers `json:"ignored_users,omitempty"` // optional
|
||||
}
|
||||
|
||||
type ReadMarkerJSON struct {
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ var build string
|
|||
|
||||
const (
|
||||
VersionMajor = 0
|
||||
VersionMinor = 7
|
||||
VersionMinor = 8
|
||||
VersionPatch = 0
|
||||
VersionTag = "" // example: "rc1"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -119,6 +119,15 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg)
|
|||
return false
|
||||
}
|
||||
|
||||
if output.IgnoredUsers != nil {
|
||||
if err := s.db.UpdateIgnoresForUser(ctx, userID, output.IgnoredUsers); err != nil {
|
||||
log.WithError(err).WithFields(logrus.Fields{
|
||||
"user_id": userID,
|
||||
}).Errorf("Failed to update ignored users")
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
}
|
||||
|
||||
s.stream.Advance(streamPos)
|
||||
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
|
||||
|
||||
|
|
|
|||
|
|
@ -150,6 +150,9 @@ type Database interface {
|
|||
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
||||
|
||||
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
|
||||
|
||||
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
|
||||
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
|
||||
}
|
||||
|
||||
type Presence interface {
|
||||
|
|
|
|||
87
syncapi/storage/postgres/ignores_table.go
Normal file
87
syncapi/storage/postgres/ignores_table.go
Normal file
|
|
@ -0,0 +1,87 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
||||
const ignoresSchema = `
|
||||
-- Stores data about ignoress
|
||||
CREATE TABLE IF NOT EXISTS syncapi_ignores (
|
||||
-- The user ID whose ignore list this belongs to.
|
||||
user_id TEXT NOT NULL,
|
||||
ignores_json TEXT NOT NULL,
|
||||
PRIMARY KEY(user_id)
|
||||
);
|
||||
`
|
||||
|
||||
const selectIgnoresSQL = "" +
|
||||
"SELECT ignores_json FROM syncapi_ignores WHERE user_id = $1"
|
||||
|
||||
const upsertIgnoresSQL = "" +
|
||||
"INSERT INTO syncapi_ignores (user_id, ignores_json) VALUES ($1, $2)" +
|
||||
" ON CONFLICT (user_id) DO UPDATE set ignores_json = $2"
|
||||
|
||||
type ignoresStatements struct {
|
||||
selectIgnoresStmt *sql.Stmt
|
||||
upsertIgnoresStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresIgnoresTable(db *sql.DB) (tables.Ignores, error) {
|
||||
_, err := db.Exec(ignoresSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := &ignoresStatements{}
|
||||
if s.selectIgnoresStmt, err = db.Prepare(selectIgnoresSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertIgnoresStmt, err = db.Prepare(upsertIgnoresSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *ignoresStatements) SelectIgnores(
|
||||
ctx context.Context, userID string,
|
||||
) (*types.IgnoredUsers, error) {
|
||||
var ignoresData []byte
|
||||
err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ignores types.IgnoredUsers
|
||||
if err = json.Unmarshal(ignoresData, &ignores); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ignores, nil
|
||||
}
|
||||
|
||||
func (s *ignoresStatements) UpsertIgnores(
|
||||
ctx context.Context, userID string, ignores *types.IgnoredUsers,
|
||||
) error {
|
||||
ignoresJSON, err := json.Marshal(ignores)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
|
||||
return err
|
||||
}
|
||||
|
|
@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ignores, err := NewPostgresIgnoresTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
presence, err := NewPostgresPresenceTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -115,6 +119,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
|||
Receipts: receipts,
|
||||
Memberships: memberships,
|
||||
NotificationData: notificationData,
|
||||
Ignores: ignores,
|
||||
Presence: presence,
|
||||
}
|
||||
return &d, nil
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ type Database struct {
|
|||
Receipts tables.Receipts
|
||||
Memberships tables.Memberships
|
||||
NotificationData tables.NotificationData
|
||||
Ignores tables.Ignores
|
||||
Presence tables.Presence
|
||||
}
|
||||
|
||||
|
|
@ -1004,6 +1005,14 @@ func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID s
|
|||
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
|
||||
}
|
||||
|
||||
func (s *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) {
|
||||
return s.Ignores.SelectIgnores(ctx, userID)
|
||||
}
|
||||
|
||||
func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error {
|
||||
return s.Ignores.UpsertIgnores(ctx, userID, ignores)
|
||||
}
|
||||
|
||||
func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||
return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
|
||||
}
|
||||
|
|
|
|||
87
syncapi/storage/sqlite3/ignores_table.go
Normal file
87
syncapi/storage/sqlite3/ignores_table.go
Normal file
|
|
@ -0,0 +1,87 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
||||
const ignoresSchema = `
|
||||
-- Stores data about ignoress
|
||||
CREATE TABLE IF NOT EXISTS syncapi_ignores (
|
||||
-- The user ID whose ignore list this belongs to.
|
||||
user_id TEXT NOT NULL,
|
||||
ignores_json TEXT NOT NULL,
|
||||
PRIMARY KEY(user_id)
|
||||
);
|
||||
`
|
||||
|
||||
const selectIgnoresSQL = "" +
|
||||
"SELECT ignores_json FROM syncapi_ignores WHERE user_id = $1"
|
||||
|
||||
const upsertIgnoresSQL = "" +
|
||||
"INSERT INTO syncapi_ignores (user_id, ignores_json) VALUES ($1, $2)" +
|
||||
" ON CONFLICT DO UPDATE set ignores_json = $2"
|
||||
|
||||
type ignoresStatements struct {
|
||||
selectIgnoresStmt *sql.Stmt
|
||||
upsertIgnoresStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewSqliteIgnoresTable(db *sql.DB) (tables.Ignores, error) {
|
||||
_, err := db.Exec(ignoresSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := &ignoresStatements{}
|
||||
if s.selectIgnoresStmt, err = db.Prepare(selectIgnoresSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertIgnoresStmt, err = db.Prepare(upsertIgnoresSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *ignoresStatements) SelectIgnores(
|
||||
ctx context.Context, userID string,
|
||||
) (*types.IgnoredUsers, error) {
|
||||
var ignoresData []byte
|
||||
err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ignores types.IgnoredUsers
|
||||
if err = json.Unmarshal(ignoresData, &ignores); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ignores, nil
|
||||
}
|
||||
|
||||
func (s *ignoresStatements) UpsertIgnores(
|
||||
ctx context.Context, userID string, ignores *types.IgnoredUsers,
|
||||
) error {
|
||||
ignoresJSON, err := json.Marshal(ignores)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
|
||||
return err
|
||||
}
|
||||
|
|
@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ignores, err := NewSqliteIgnoresTable(d.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
presence, err := NewSqlitePresenceTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -125,6 +129,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
Receipts: receipts,
|
||||
Memberships: memberships,
|
||||
NotificationData: notificationData,
|
||||
Ignores: ignores,
|
||||
Presence: presence,
|
||||
}
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -183,6 +183,11 @@ type NotificationData interface {
|
|||
SelectMaxID(ctx context.Context) (int64, error)
|
||||
}
|
||||
|
||||
type Ignores interface {
|
||||
SelectIgnores(ctx context.Context, userID string) (*types.IgnoredUsers, error)
|
||||
UpsertIgnores(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
|
||||
}
|
||||
|
||||
type Presence interface {
|
||||
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
||||
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
|
||||
|
|
|
|||
|
|
@ -54,6 +54,10 @@ func (p *InviteStreamProvider) IncrementalSync(
|
|||
}
|
||||
|
||||
for roomID, inviteEvent := range invites {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers.List[inviteEvent.Sender()]; ok {
|
||||
continue
|
||||
}
|
||||
ir := types.NewInviteResponse(inviteEvent)
|
||||
req.Response.Rooms.Invite[roomID] = *ir
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package streams
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
|
@ -25,6 +26,7 @@ type PDUStreamProvider struct {
|
|||
|
||||
tasks chan func()
|
||||
workers atomic.Int32
|
||||
userAPI userapi.UserInternalAPI
|
||||
}
|
||||
|
||||
func (p *PDUStreamProvider) worker() {
|
||||
|
|
@ -87,6 +89,10 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
stateFilter := req.Filter.Room.State
|
||||
eventFilter := req.Filter.Room.Timeline
|
||||
|
||||
if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
|
||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||
}
|
||||
|
||||
// Build up a /sync response. Add joined rooms.
|
||||
var reqMutex sync.Mutex
|
||||
var reqWaitGroup sync.WaitGroup
|
||||
|
|
@ -175,6 +181,10 @@ func (p *PDUStreamProvider) IncrementalSync(
|
|||
return to
|
||||
}
|
||||
|
||||
if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
|
||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||
}
|
||||
|
||||
newPos = from
|
||||
for _, delta := range stateDeltas {
|
||||
var pos types.StreamPosition
|
||||
|
|
@ -402,6 +412,23 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
|||
return jr, nil
|
||||
}
|
||||
|
||||
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
|
||||
// the syncreq itself for further use in streams.
|
||||
func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
|
||||
ignores, err := p.DB.IgnoresForUser(ctx, req.Device.UserID)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
req.IgnoredUsers = *ignores
|
||||
for userID := range ignores.List {
|
||||
eventFilter.NotSenders = append(eventFilter.NotSenders, userID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
|
||||
for _, recentEv := range recentEvents {
|
||||
if recentEv.StateKey() == nil {
|
||||
|
|
|
|||
|
|
@ -111,11 +111,16 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
|||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if _, known := types.PresenceFromString(presence.ClientFields.Presence); known {
|
||||
presence.ClientFields.LastActiveAgo = presence.LastActiveAgo()
|
||||
if presence.ClientFields.Presence == "online" {
|
||||
currentlyActive := presence.CurrentlyActive()
|
||||
presence.ClientFields.CurrentlyActive = ¤tlyActive
|
||||
}
|
||||
} else {
|
||||
presence.ClientFields.Presence = "offline"
|
||||
}
|
||||
|
||||
content, err := json.Marshal(presence.ClientFields)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -54,6 +54,10 @@ func (p *ReceiptStreamProvider) IncrementalSync(
|
|||
// Group receipts by room, so we can create one ClientEvent for every room
|
||||
receiptsByRoom := make(map[string][]types.OutputReceiptEvent)
|
||||
for _, receipt := range receipts {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers.List[receipt.UserID]; ok {
|
||||
continue
|
||||
}
|
||||
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -48,6 +48,10 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
|
|||
|
||||
// Add the updates into the sync response.
|
||||
for _, event := range events {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers.List[event.Sender]; ok {
|
||||
continue
|
||||
}
|
||||
req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,11 +40,18 @@ func (p *TypingStreamProvider) IncrementalSync(
|
|||
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
|
||||
roomID, int64(from),
|
||||
); updated {
|
||||
typingUsers := make([]string, 0, len(users))
|
||||
for i := range users {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers.List[users[i]]; !ok {
|
||||
typingUsers = append(typingUsers, users[i])
|
||||
}
|
||||
}
|
||||
ev := gomatrixserverlib.ClientEvent{
|
||||
Type: gomatrixserverlib.MTyping,
|
||||
}
|
||||
ev.Content, err = json.Marshal(map[string]interface{}{
|
||||
"user_ids": users,
|
||||
"user_ids": typingUsers,
|
||||
})
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("json.Marshal failed")
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ func NewSyncStreamProviders(
|
|||
streams := &Streams{
|
||||
PDUStreamProvider: &PDUStreamProvider{
|
||||
StreamProvider: StreamProvider{DB: d},
|
||||
userAPI: userAPI,
|
||||
},
|
||||
TypingStreamProvider: &TypingStreamProvider{
|
||||
StreamProvider: StreamProvider{DB: d},
|
||||
|
|
|
|||
|
|
@ -122,10 +122,8 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
|||
|
||||
presenceID, ok := types.PresenceFromString(presence)
|
||||
if !ok { // this should almost never happen
|
||||
logrus.Errorf("unknown presence '%s'", presence)
|
||||
return
|
||||
}
|
||||
|
||||
newPresence := types.PresenceInternal{
|
||||
ClientFields: types.PresenceClientResponse{
|
||||
Presence: presenceID.String(),
|
||||
|
|
|
|||
|
|
@ -21,25 +21,41 @@ import (
|
|||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
//go:generate stringer -type=Presence -linecomment
|
||||
type Presence uint8
|
||||
|
||||
const (
|
||||
PresenceUnavailable Presence = iota + 1 // unavailable
|
||||
PresenceUnknown Presence = iota
|
||||
PresenceUnavailable // unavailable
|
||||
PresenceOnline // online
|
||||
PresenceOffline // offline
|
||||
)
|
||||
|
||||
func (p Presence) String() string {
|
||||
switch p {
|
||||
case PresenceUnavailable:
|
||||
return "unavailable"
|
||||
case PresenceOnline:
|
||||
return "online"
|
||||
case PresenceOffline:
|
||||
return "offline"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// PresenceFromString returns the integer representation of the given input presence.
|
||||
// Returns false for ok, if input is not a valid presence value.
|
||||
func PresenceFromString(input string) (p Presence, ok bool) {
|
||||
for i := 0; i < len(_Presence_index)-1; i++ {
|
||||
l, r := _Presence_index[i], _Presence_index[i+1]
|
||||
if strings.EqualFold(input, _Presence_name[l:r]) {
|
||||
return Presence(i + 1), true
|
||||
func PresenceFromString(input string) (Presence, bool) {
|
||||
switch strings.ToLower(input) {
|
||||
case "unavailable":
|
||||
return PresenceUnavailable, true
|
||||
case "online":
|
||||
return PresenceOnline, true
|
||||
case "offline":
|
||||
return PresenceOffline, true
|
||||
default:
|
||||
return PresenceUnknown, false
|
||||
}
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
type PresenceInternal struct {
|
||||
|
|
|
|||
|
|
@ -1,26 +0,0 @@
|
|||
// Code generated by "stringer -type=Presence -linecomment"; DO NOT EDIT.
|
||||
|
||||
package types
|
||||
|
||||
import "strconv"
|
||||
|
||||
func _() {
|
||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||
// Re-run the stringer command to generate them again.
|
||||
var x [1]struct{}
|
||||
_ = x[PresenceUnavailable-1]
|
||||
_ = x[PresenceOnline-2]
|
||||
_ = x[PresenceOffline-3]
|
||||
}
|
||||
|
||||
const _Presence_name = "unavailableonlineoffline"
|
||||
|
||||
var _Presence_index = [...]uint8{0, 11, 17, 24}
|
||||
|
||||
func (i Presence) String() string {
|
||||
i -= 1
|
||||
if i >= Presence(len(_Presence_index)-1) {
|
||||
return "Presence(" + strconv.FormatInt(int64(i+1), 10) + ")"
|
||||
}
|
||||
return _Presence_name[_Presence_index[i]:_Presence_index[i+1]]
|
||||
}
|
||||
|
|
@ -1,42 +0,0 @@
|
|||
package types
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestPresenceFromString(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
wantStatus Presence
|
||||
wantOk bool
|
||||
}{
|
||||
{
|
||||
name: "presence unavailable",
|
||||
input: "unavailable",
|
||||
wantStatus: PresenceUnavailable,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "presence online",
|
||||
input: "OnLINE",
|
||||
wantStatus: PresenceOnline,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "unknown presence",
|
||||
input: "unknown",
|
||||
wantStatus: 0,
|
||||
wantOk: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, got1 := PresenceFromString(tt.input)
|
||||
if got != tt.wantStatus {
|
||||
t.Errorf("PresenceFromString() got = %v, want %v", got, tt.wantStatus)
|
||||
}
|
||||
if got1 != tt.wantOk {
|
||||
t.Errorf("PresenceFromString() got1 = %v, want %v", got1, tt.wantOk)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -21,6 +21,8 @@ type SyncRequest struct {
|
|||
|
||||
// Updated by the PDU stream.
|
||||
Rooms map[string]string
|
||||
// Updated by the PDU stream.
|
||||
IgnoredUsers IgnoredUsers
|
||||
}
|
||||
|
||||
type StreamProvider interface {
|
||||
|
|
|
|||
|
|
@ -518,3 +518,7 @@ type OutputSendToDeviceEvent struct {
|
|||
DeviceID string `json:"device_id"`
|
||||
gomatrixserverlib.SendToDeviceEvent
|
||||
}
|
||||
|
||||
type IgnoredUsers struct {
|
||||
List map[string]interface{} `json:"ignored_users"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,10 +2,6 @@
|
|||
|
||||
Latest account data appears in v2 /sync
|
||||
|
||||
# Blacklisted because we don't support ignores yet
|
||||
|
||||
Ignore invite in incremental sync
|
||||
|
||||
# Relies on a rejected PL event which will never be accepted into the DAG
|
||||
|
||||
# Caused by <https://github.com/matrix-org/sytest/pull/911>
|
||||
|
|
|
|||
|
|
@ -284,8 +284,6 @@ local user can join room with version 4
|
|||
remote user can join room with version 3
|
||||
remote user can join room with version 4
|
||||
Remote user can backfill in a room with version 4
|
||||
# We don't support ignores yet, so ignore this for now - ha ha.
|
||||
# Ignore invite in incremental sync
|
||||
Outbound federation can send invites via v2 API
|
||||
User can invite local user to room with version 3
|
||||
User can invite local user to room with version 4
|
||||
|
|
@ -696,3 +694,6 @@ New federated private chats get full presence information (SYN-115)
|
|||
/upgrade copies >100 power levels to the new room
|
||||
Room state after a rejected message event is the same as before
|
||||
Room state after a rejected state event is the same as before
|
||||
Ignore user in existing room
|
||||
Ignore invite in full sync
|
||||
Ignore invite in incremental sync
|
||||
|
|
@ -404,8 +404,24 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// Get accountdata to check if the event.Sender() is ignored by mem.LocalPart
|
||||
data, err := s.db.GetAccountDataByType(ctx, mem.Localpart, "", "m.ignored_user_list")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if data != nil {
|
||||
ignored := types.IgnoredUsers{}
|
||||
err = json.Unmarshal(data, &ignored)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sender := event.Sender()
|
||||
if _, ok := ignored.List[sender]; ok {
|
||||
return nil, fmt.Errorf("user %s is ignored", sender)
|
||||
}
|
||||
}
|
||||
var res api.QueryPushRulesResponse
|
||||
if err := s.userAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil {
|
||||
if err = s.userAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue