From 54ece78a12023171b17968b293aa0f05c9af1b83 Mon Sep 17 00:00:00 2001 From: Tommie Gannert Date: Thu, 28 Oct 2021 11:34:08 +0200 Subject: [PATCH] Output rooms.join.unread_notifications in /sync. This is the read-side. Pushserver will be the write-side. --- internal/eventutil/types.go | 16 +++ setup/config/config_kafka.go | 1 + syncapi/consumers/pushserver.go | 111 ++++++++++++++++++ syncapi/notifier/notifier.go | 11 ++ syncapi/storage/interface.go | 8 ++ .../postgres/notification_data_table.go | 40 +++++++ syncapi/storage/postgres/syncserver.go | 5 + .../storage/shared/notification_data_table.go | 94 +++++++++++++++ syncapi/storage/shared/syncserver.go | 21 ++++ .../sqlite3/notification_data_table.go | 40 +++++++ syncapi/storage/sqlite3/syncserver.go | 5 + syncapi/storage/tables/interface.go | 7 ++ syncapi/streams/stream_notificationdata.go | 55 +++++++++ syncapi/streams/streams.go | 34 +++--- syncapi/sync/requestpool.go | 9 +- syncapi/syncapi.go | 7 ++ syncapi/types/types.go | 44 ++++--- syncapi/types/types_test.go | 12 +- 18 files changed, 483 insertions(+), 37 deletions(-) create mode 100644 syncapi/consumers/pushserver.go create mode 100644 syncapi/storage/postgres/notification_data_table.go create mode 100644 syncapi/storage/shared/notification_data_table.go create mode 100644 syncapi/storage/sqlite3/notification_data_table.go create mode 100644 syncapi/streams/stream_notificationdata.go diff --git a/internal/eventutil/types.go b/internal/eventutil/types.go index 6d119ce6d..d0405a515 100644 --- a/internal/eventutil/types.go +++ b/internal/eventutil/types.go @@ -30,6 +30,22 @@ type AccountData struct { Type string `json:"type"` } +// NotificationData contains statistics about notifications, sent from +// the Push Server to the Sync API server. +type NotificationData struct { + // RoomID identifies the scope of the statistics, together with + // MXID (which is encoded in the Kafka key). + RoomID string `json:"room_id"` + + // HighlightCount is the number of unread notifications with the + // highlight tweak. + UnreadHighlightCount int `json:"unread_highlight_count"` + + // UnreadNotificationCount is the total number of unread + // notifications. + UnreadNotificationCount int `json:"unread_notification_count"` +} + // ProfileResponse is a struct containing all known user profile data type ProfileResponse struct { AvatarURL string `json:"avatar_url"` diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go index 5a61f17eb..52257a16c 100644 --- a/setup/config/config_kafka.go +++ b/setup/config/config_kafka.go @@ -10,6 +10,7 @@ const ( TopicOutputRoomEvent = "OutputRoomEvent" TopicOutputClientData = "OutputClientData" TopicOutputReceiptEvent = "OutputReceiptEvent" + TopicOutputNotificationData = "OutputNotificationData" ) type Kafka struct { diff --git a/syncapi/consumers/pushserver.go b/syncapi/consumers/pushserver.go new file mode 100644 index 000000000..3b35dd4d7 --- /dev/null +++ b/syncapi/consumers/pushserver.go @@ -0,0 +1,111 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/Shopify/sarama" + "github.com/getsentry/sentry-go" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + log "github.com/sirupsen/logrus" +) + +// OutputNotificationDataConsumer consumes events that originated in +// the Push server. +type OutputNotificationDataConsumer struct { + consumer *internal.ContinualConsumer + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier +} + +// NewOutputNotificationDataConsumer creates a new consumer. Call +// Start() to begin consuming. +func NewOutputNotificationDataConsumer( + process *process.ProcessContext, + cfg *config.SyncAPI, + kafkaConsumer sarama.Consumer, + store storage.Database, + notifier *notifier.Notifier, + stream types.StreamProvider, +) *OutputNotificationDataConsumer { + consumer := internal.ContinualConsumer{ + Process: process, + ComponentName: "syncapi/pushserver", + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputNotificationData)), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputNotificationDataConsumer{ + consumer: &consumer, + db: store, + notifier: notifier, + stream: stream, + } + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start starts consumption. +func (s *OutputNotificationDataConsumer) Start() error { + return s.consumer.Start() +} + +// onMessage is called when the Sync server receives a new event from +// the push server. It is not safe for this function to be called from +// multiple goroutines, or else the sync stream position may race and +// be incorrectly calculated. +func (s *OutputNotificationDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { + ctx := context.Background() + + userID := string(msg.Key) + + // Parse out the event JSON + var data eventutil.NotificationData + if err := json.Unmarshal(msg.Value, &data); err != nil { + sentry.CaptureException(err) + log.WithField("user_id", userID).WithError(err).Error("push server consumer: message parse failure") + return nil + } + + streamPos, err := s.db.UpsertRoomUnreadNotificationCounts(ctx, userID, data.RoomID, data.UnreadNotificationCount, data.UnreadHighlightCount) + if err != nil { + sentry.CaptureException(err) + log.WithFields(log.Fields{ + "user_id": userID, + "room_id": data.RoomID, + }).WithError(err).Panic("Could not save notification counts") + } + + s.stream.Advance(streamPos) + s.notifier.OnNewNotificationData(userID, types.StreamingToken{NotificationDataPosition: streamPos}) + + log.WithFields(log.Fields{ + "user_id": userID, + "room_id": data.RoomID, + "streamPos": streamPos, + }).Info("Received data from Push server") + + return nil +} diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index d853cc0e4..6a641e6f8 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -217,6 +217,17 @@ func (n *Notifier) OnNewInvite( n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) } +func (n *Notifier) OnNewNotificationData( + userID string, + posUpdate types.StreamingToken, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUsers([]string{userID}, nil, n.currPos) +} + // GetListener returns a UserStreamListener that can be used to wait for // updates for a user. Must be closed. // notify for anything before sincePos diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 9cff4cad1..596b55fe9 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -20,6 +20,7 @@ import ( eduAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -34,6 +35,7 @@ type Database interface { MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error) + MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) @@ -140,4 +142,10 @@ type Database interface { StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) // GetRoomReceipts gets all receipts for a given roomID GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) + + // UpsertRoomUnreadNotificationCounts updates the notification statistics about a (user, room) key. + UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error) + + // GetUserUnreadNotificationCounts returns statistics per room a user is interested in. + GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error) } diff --git a/syncapi/storage/postgres/notification_data_table.go b/syncapi/storage/postgres/notification_data_table.go new file mode 100644 index 000000000..4f2f0f3a5 --- /dev/null +++ b/syncapi/storage/postgres/notification_data_table.go @@ -0,0 +1,40 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/matrix-org/dendrite/syncapi/storage/tables" +) + +const notificationDataSchema = ` +CREATE TABLE IF NOT EXISTS syncapi_notification_data ( + id BIGSERIAL PRIMARY KEY, + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + notification_count BIGINT NOT NULL DEFAULT 0, + highlight_count BIGINT NOT NULL DEFAULT 0, + CONSTRAINT syncapi_notification_data_unique UNIQUE (user_id, room_id) +);` + +func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { + _, err := db.Exec(notificationDataSchema) + if err != nil { + return nil, err + } + return shared.NewNotificationDataTable(db) +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 6f4e7749d..60fe5b54d 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e if err != nil { return nil, err } + notificationData, err := NewPostgresNotificationDataTable(d.db) + if err != nil { + return nil, err + } m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) @@ -110,6 +114,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e SendToDevice: sendToDevice, Receipts: receipts, Memberships: memberships, + NotificationData: notificationData, } return &d, nil } diff --git a/syncapi/storage/shared/notification_data_table.go b/syncapi/storage/shared/notification_data_table.go new file mode 100644 index 000000000..8b9d341b3 --- /dev/null +++ b/syncapi/storage/shared/notification_data_table.go @@ -0,0 +1,94 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/dendrite/syncapi/types" +) + +type notificationDataStatements struct { + upsertRoomUnreadCounts *sql.Stmt + selectUserUnreadCounts *sql.Stmt + selectMaxID *sql.Stmt +} + +func NewNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { + r := ¬ificationDataStatements{} + return r, sqlutil.StatementList{ + {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL}, + {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL}, + {&r.selectMaxID, selectMaxNotificationIDSQL}, + }.Prepare(db) +} + +const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data + (user_id, room_id, notification_count, highlight_count) + VALUES ($1, $2, $3, $4) + ON CONFLICT (user_id, room_id) + DO UPDATE SET notification_count = $3, highlight_count = $4 + RETURNING id` + +func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) { + err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos) + return +} + +const selectUserUnreadNotificationCountsSQL = `SELECT + id, room_id, notification_count, highlight_count + FROM syncapi_notification_data + WHERE + user_id = $1 AND + id BETWEEN $2 + 1 AND $3` + +func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) { + rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed") + + roomCounts := map[string]*eventutil.NotificationData{} + for rows.Next() { + var id types.StreamPosition + var roomID string + var notificationCount, highlightCount int + + if err = rows.Scan(&id, &roomID, ¬ificationCount, &highlightCount); err != nil { + return nil, err + } + + roomCounts[roomID] = &eventutil.NotificationData{ + RoomID: roomID, + UnreadNotificationCount: notificationCount, + UnreadHighlightCount: highlightCount, + } + } + return roomCounts, rows.Err() +} + +const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data` + +func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) { + var id int64 + err := r.selectMaxID.QueryRowContext(ctx).Scan(&id) + return id, err +} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e6c681832..ca87969e4 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -48,6 +48,7 @@ type Database struct { Filter tables.Filter Receipts tables.Receipts Memberships tables.Memberships + NotificationData tables.NotificationData } func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) { @@ -102,6 +103,14 @@ func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.S return types.StreamPosition(id), nil } +func (d *Database) MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) { + id, err := d.NotificationData.SelectMaxID(ctx) + if err != nil { + return 0, fmt.Errorf("d.NotificationData.SelectMaxID: %w", err) + } + return types.StreamPosition(id), nil +} + func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart, excludeEventIDs) } @@ -955,3 +964,15 @@ func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, stream _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) return receipts, err } + +func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) { + err = d.Writer.Do(nil, nil, func(_ *sql.Tx) error { + pos, err = d.NotificationData.UpsertRoomUnreadCounts(ctx, userID, roomID, notificationCount, highlightCount) + return err + }) + return +} + +func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error) { + return d.NotificationData.SelectUserUnreadCounts(ctx, userID, from, to) +} diff --git a/syncapi/storage/sqlite3/notification_data_table.go b/syncapi/storage/sqlite3/notification_data_table.go new file mode 100644 index 000000000..36292e23e --- /dev/null +++ b/syncapi/storage/sqlite3/notification_data_table.go @@ -0,0 +1,40 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/matrix-org/dendrite/syncapi/storage/tables" +) + +const notificationDataSchema = ` +CREATE TABLE IF NOT EXISTS syncapi_notification_data ( + id INTEGER PRIMARY KEY, + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + notification_count BIGINT NOT NULL DEFAULT 0, + highlight_count BIGINT NOT NULL DEFAULT 0, + CONSTRAINT syncapi_notifications_unique UNIQUE (user_id, room_id) +);` + +func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { + _, err := db.Exec(notificationDataSchema) + if err != nil { + return nil, err + } + return shared.NewNotificationDataTable(db) +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 706d43f81..f5ae9fdd7 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er if err != nil { return err } + notificationData, err := NewSqliteNotificationDataTable(d.db) + if err != nil { + return err + } m := sqlutil.NewMigrations() deltas.LoadFixSequences(m) deltas.LoadRemoveSendToDeviceSentColumn(m) @@ -120,6 +124,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er SendToDevice: sendToDevice, Receipts: receipts, Memberships: memberships, + NotificationData: notificationData, } return nil } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 028872716..f2fa7a254 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -19,6 +19,7 @@ import ( "database/sql" eduAPI "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -167,3 +168,9 @@ type Memberships interface { UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error) } + +type NotificationData interface { + UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error) + SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) + SelectMaxID(ctx context.Context) (int64, error) +} diff --git a/syncapi/streams/stream_notificationdata.go b/syncapi/streams/stream_notificationdata.go new file mode 100644 index 000000000..c2ad0823d --- /dev/null +++ b/syncapi/streams/stream_notificationdata.go @@ -0,0 +1,55 @@ +package streams + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +type NotificationDataStreamProvider struct { + StreamProvider +} + +func (p *NotificationDataStreamProvider) Setup() { + p.StreamProvider.Setup() + + id, err := p.DB.MaxStreamPositionForNotificationData(context.Background()) + if err != nil { + panic(err) + } + p.latest = id +} + +func (p *NotificationDataStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) +} + +func (p *NotificationDataStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + // We want counts for all possible rooms, so always start from zero. + countsByRoom, err := p.DB.GetUserUnreadNotificationCounts(ctx, req.Device.UserID, 0, to) + if err != nil { + req.Log.WithError(err).Error("GetUserUnreadNotificationCounts failed") + return from + } + + // We're merely decorating existing rooms. Note that the Join map + // values are not pointers. + for roomID, jr := range req.Response.Rooms.Join { + counts := countsByRoom[roomID] + if counts == nil { + continue + } + + jr.UnreadNotifications.HighlightCount = counts.UnreadHighlightCount + jr.UnreadNotifications.NotificationCount = counts.UnreadNotificationCount + req.Response.Rooms.Join[roomID] = jr + } + return to +} diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index ba4118df5..8a5f9c070 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -12,13 +12,14 @@ import ( ) type Streams struct { - PDUStreamProvider types.StreamProvider - TypingStreamProvider types.StreamProvider - ReceiptStreamProvider types.StreamProvider - InviteStreamProvider types.StreamProvider - SendToDeviceStreamProvider types.StreamProvider - AccountDataStreamProvider types.StreamProvider - DeviceListStreamProvider types.PartitionedStreamProvider + PDUStreamProvider types.StreamProvider + TypingStreamProvider types.StreamProvider + ReceiptStreamProvider types.StreamProvider + InviteStreamProvider types.StreamProvider + SendToDeviceStreamProvider types.StreamProvider + AccountDataStreamProvider types.StreamProvider + NotificationDataStreamProvider types.StreamProvider + DeviceListStreamProvider types.PartitionedStreamProvider } func NewSyncStreamProviders( @@ -47,6 +48,9 @@ func NewSyncStreamProviders( StreamProvider: StreamProvider{DB: d}, userAPI: userAPI, }, + NotificationDataStreamProvider: &NotificationDataStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, DeviceListStreamProvider: &DeviceListStreamProvider{ PartitionedStreamProvider: PartitionedStreamProvider{DB: d}, rsAPI: rsAPI, @@ -60,6 +64,7 @@ func NewSyncStreamProviders( streams.InviteStreamProvider.Setup() streams.SendToDeviceStreamProvider.Setup() streams.AccountDataStreamProvider.Setup() + streams.NotificationDataStreamProvider.Setup() streams.DeviceListStreamProvider.Setup() return streams @@ -67,12 +72,13 @@ func NewSyncStreamProviders( func (s *Streams) Latest(ctx context.Context) types.StreamingToken { return types.StreamingToken{ - PDUPosition: s.PDUStreamProvider.LatestPosition(ctx), - TypingPosition: s.TypingStreamProvider.LatestPosition(ctx), - ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx), - InvitePosition: s.InviteStreamProvider.LatestPosition(ctx), - SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx), - AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx), - DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx), + PDUPosition: s.PDUStreamProvider.LatestPosition(ctx), + TypingPosition: s.TypingStreamProvider.LatestPosition(ctx), + ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx), + InvitePosition: s.InviteStreamProvider.LatestPosition(ctx), + SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx), + AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx), + NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx), + DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx), } } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index a45736106..43063321a 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -183,7 +183,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. currentPos.ApplyUpdates(userStreamListener.GetSyncPosition()) } } else { - syncReq.Log.Debugln("Responding to sync immediately") + syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately") } if syncReq.Since.IsEmpty() { @@ -207,6 +207,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync( syncReq.Context, syncReq, ), + NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync( syncReq.Context, syncReq, ), @@ -238,6 +241,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. syncReq.Context, syncReq, syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition, ), + NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition, + ), DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync( syncReq.Context, syncReq, syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 84c7140ca..9a734d000 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -87,6 +87,13 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start client data consumer") } + notificationConsumer := consumers.NewOutputNotificationDataConsumer( + process, cfg, consumer, syncDB, notifier, streams.NotificationDataStreamProvider, + ) + if err = notificationConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start notification data consumer") + } + typingConsumer := consumers.NewOutputTypingEventConsumer( process, cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider, ) diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 44e718b38..1993be2d0 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -118,13 +118,14 @@ const ( ) type StreamingToken struct { - PDUPosition StreamPosition - TypingPosition StreamPosition - ReceiptPosition StreamPosition - SendToDevicePosition StreamPosition - InvitePosition StreamPosition - AccountDataPosition StreamPosition - DeviceListPosition LogPosition + PDUPosition StreamPosition + TypingPosition StreamPosition + ReceiptPosition StreamPosition + SendToDevicePosition StreamPosition + InvitePosition StreamPosition + AccountDataPosition StreamPosition + NotificationDataPosition StreamPosition + DeviceListPosition LogPosition } // This will be used as a fallback by json.Marshal. @@ -140,10 +141,11 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) { func (t StreamingToken) String() string { posStr := fmt.Sprintf( - "s%d_%d_%d_%d_%d_%d", + "s%d_%d_%d_%d_%d_%d_%d", t.PDUPosition, t.TypingPosition, t.ReceiptPosition, t.SendToDevicePosition, t.InvitePosition, t.AccountDataPosition, + t.NotificationDataPosition, ) if dl := t.DeviceListPosition; !dl.IsEmpty() { posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset) @@ -166,6 +168,8 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true case t.AccountDataPosition > other.AccountDataPosition: return true + case t.NotificationDataPosition > other.NotificationDataPosition: + return true case t.DeviceListPosition.IsAfter(&other.DeviceListPosition): return true } @@ -173,7 +177,7 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { } func (t *StreamingToken) IsEmpty() bool { - return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty() + return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.NotificationDataPosition == 0 && t.DeviceListPosition.IsEmpty() } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. @@ -208,6 +212,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) { if other.AccountDataPosition > t.AccountDataPosition { t.AccountDataPosition = other.AccountDataPosition } + if other.NotificationDataPosition > t.NotificationDataPosition { + t.NotificationDataPosition = other.NotificationDataPosition + } if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) { t.DeviceListPosition = other.DeviceListPosition } @@ -301,7 +308,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { } categories := strings.Split(tok[1:], ".") parts := strings.Split(categories[0], "_") - var positions [6]StreamPosition + var positions [7]StreamPosition for i, p := range parts { if i > len(positions) { break @@ -314,12 +321,13 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { positions[i] = StreamPosition(pos) } token = StreamingToken{ - PDUPosition: positions[0], - TypingPosition: positions[1], - ReceiptPosition: positions[2], - SendToDevicePosition: positions[3], - InvitePosition: positions[4], - AccountDataPosition: positions[5], + PDUPosition: positions[0], + TypingPosition: positions[1], + ReceiptPosition: positions[2], + SendToDevicePosition: positions[3], + InvitePosition: positions[4], + AccountDataPosition: positions[5], + NotificationDataPosition: positions[6], } // dl-0-1234 // $log_name-$partition-$offset @@ -430,6 +438,10 @@ type JoinResponse struct { AccountData struct { Events []gomatrixserverlib.ClientEvent `json:"events"` } `json:"account_data"` + UnreadNotifications struct { + HighlightCount int `json:"highlight_count"` + NotificationCount int `json:"notification_count"` + } `json:"unread_notifications"` } // NewJoinResponse creates an empty response with initialised arrays. diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 3e5777888..4712e9485 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -10,10 +10,10 @@ import ( func TestNewSyncTokenWithLogs(t *testing.T) { tests := map[string]*StreamingToken{ - "s4_0_0_0_0_0": { + "s4_0_0_0_0_0_0": { PDUPosition: 4, }, - "s4_0_0_0_0_0.dl-0-123": { + "s4_0_0_0_0_0_0.dl-0-123": { PDUPosition: 4, DeviceListPosition: LogPosition{ Partition: 0, @@ -42,10 +42,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) { func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, LogPosition{}}.String(), - "s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, LogPosition{1, 2}}.String(), - "s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, LogPosition{}}.String(), - "t3_1": TopologyToken{3, 1}.String(), + "s4_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, LogPosition{}}.String(), + "s3_1_0_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, 0, LogPosition{1, 2}}.String(), + "s1_2_3_4_5_6_7": StreamingToken{1, 2, 3, 4, 5, 6, 7, LogPosition{}}.String(), + "t3_1": TopologyToken{3, 1}.String(), } for a, b := range shouldPass {