Output rooms.join.unread_notifications in /sync.

This is the read-side. Pushserver will be the write-side.
This commit is contained in:
Tommie Gannert 2021-10-28 11:34:08 +02:00
parent 0548a884ec
commit 54ece78a12
18 changed files with 483 additions and 37 deletions

View file

@ -30,6 +30,22 @@ type AccountData struct {
Type string `json:"type"` Type string `json:"type"`
} }
// NotificationData contains statistics about notifications, sent from
// the Push Server to the Sync API server.
type NotificationData struct {
// RoomID identifies the scope of the statistics, together with
// MXID (which is encoded in the Kafka key).
RoomID string `json:"room_id"`
// HighlightCount is the number of unread notifications with the
// highlight tweak.
UnreadHighlightCount int `json:"unread_highlight_count"`
// UnreadNotificationCount is the total number of unread
// notifications.
UnreadNotificationCount int `json:"unread_notification_count"`
}
// ProfileResponse is a struct containing all known user profile data // ProfileResponse is a struct containing all known user profile data
type ProfileResponse struct { type ProfileResponse struct {
AvatarURL string `json:"avatar_url"` AvatarURL string `json:"avatar_url"`

View file

@ -10,6 +10,7 @@ const (
TopicOutputRoomEvent = "OutputRoomEvent" TopicOutputRoomEvent = "OutputRoomEvent"
TopicOutputClientData = "OutputClientData" TopicOutputClientData = "OutputClientData"
TopicOutputReceiptEvent = "OutputReceiptEvent" TopicOutputReceiptEvent = "OutputReceiptEvent"
TopicOutputNotificationData = "OutputNotificationData"
) )
type Kafka struct { type Kafka struct {

View file

@ -0,0 +1,111 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
// OutputNotificationDataConsumer consumes events that originated in
// the Push server.
type OutputNotificationDataConsumer struct {
consumer *internal.ContinualConsumer
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputNotificationDataConsumer creates a new consumer. Call
// Start() to begin consuming.
func NewOutputNotificationDataConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputNotificationDataConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/pushserver",
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputNotificationData)),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputNotificationDataConsumer{
consumer: &consumer,
db: store,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start starts consumption.
func (s *OutputNotificationDataConsumer) Start() error {
return s.consumer.Start()
}
// onMessage is called when the Sync server receives a new event from
// the push server. It is not safe for this function to be called from
// multiple goroutines, or else the sync stream position may race and
// be incorrectly calculated.
func (s *OutputNotificationDataConsumer) onMessage(msg *sarama.ConsumerMessage) error {
ctx := context.Background()
userID := string(msg.Key)
// Parse out the event JSON
var data eventutil.NotificationData
if err := json.Unmarshal(msg.Value, &data); err != nil {
sentry.CaptureException(err)
log.WithField("user_id", userID).WithError(err).Error("push server consumer: message parse failure")
return nil
}
streamPos, err := s.db.UpsertRoomUnreadNotificationCounts(ctx, userID, data.RoomID, data.UnreadNotificationCount, data.UnreadHighlightCount)
if err != nil {
sentry.CaptureException(err)
log.WithFields(log.Fields{
"user_id": userID,
"room_id": data.RoomID,
}).WithError(err).Panic("Could not save notification counts")
}
s.stream.Advance(streamPos)
s.notifier.OnNewNotificationData(userID, types.StreamingToken{NotificationDataPosition: streamPos})
log.WithFields(log.Fields{
"user_id": userID,
"room_id": data.RoomID,
"streamPos": streamPos,
}).Info("Received data from Push server")
return nil
}

View file

@ -217,6 +217,17 @@ func (n *Notifier) OnNewInvite(
n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
} }
func (n *Notifier) OnNewNotificationData(
userID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers([]string{userID}, nil, n.currPos)
}
// GetListener returns a UserStreamListener that can be used to wait for // GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed. // updates for a user. Must be closed.
// notify for anything before sincePos // notify for anything before sincePos

View file

@ -20,6 +20,7 @@ import (
eduAPI "github.com/matrix-org/dendrite/eduserver/api" eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
@ -34,6 +35,7 @@ type Database interface {
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
@ -140,4 +142,10 @@ type Database interface {
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
// GetRoomReceipts gets all receipts for a given roomID // GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
// UpsertRoomUnreadNotificationCounts updates the notification statistics about a (user, room) key.
UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
// GetUserUnreadNotificationCounts returns statistics per room a user is interested in.
GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error)
} }

View file

@ -0,0 +1,40 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"database/sql"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
)
const notificationDataSchema = `
CREATE TABLE IF NOT EXISTS syncapi_notification_data (
id BIGSERIAL PRIMARY KEY,
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
notification_count BIGINT NOT NULL DEFAULT 0,
highlight_count BIGINT NOT NULL DEFAULT 0,
CONSTRAINT syncapi_notification_data_unique UNIQUE (user_id, room_id)
);`
func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, error) {
_, err := db.Exec(notificationDataSchema)
if err != nil {
return nil, err
}
return shared.NewNotificationDataTable(db)
}

View file

@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
notificationData, err := NewPostgresNotificationDataTable(d.db)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations() m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m) deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m) deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -110,6 +114,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
SendToDevice: sendToDevice, SendToDevice: sendToDevice,
Receipts: receipts, Receipts: receipts,
Memberships: memberships, Memberships: memberships,
NotificationData: notificationData,
} }
return &d, nil return &d, nil
} }

View file

@ -0,0 +1,94 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package shared
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
type notificationDataStatements struct {
upsertRoomUnreadCounts *sql.Stmt
selectUserUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt
}
func NewNotificationDataTable(db *sql.DB) (tables.NotificationData, error) {
r := &notificationDataStatements{}
return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
{&r.selectMaxID, selectMaxNotificationIDSQL},
}.Prepare(db)
}
const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data
(user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4
RETURNING id`
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
return
}
const selectUserUnreadNotificationCountsSQL = `SELECT
id, room_id, notification_count, highlight_count
FROM syncapi_notification_data
WHERE
user_id = $1 AND
id BETWEEN $2 + 1 AND $3`
func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed")
roomCounts := map[string]*eventutil.NotificationData{}
for rows.Next() {
var id types.StreamPosition
var roomID string
var notificationCount, highlightCount int
if err = rows.Scan(&id, &roomID, &notificationCount, &highlightCount); err != nil {
return nil, err
}
roomCounts[roomID] = &eventutil.NotificationData{
RoomID: roomID,
UnreadNotificationCount: notificationCount,
UnreadHighlightCount: highlightCount,
}
}
return roomCounts, rows.Err()
}
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
var id int64
err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
return id, err
}

View file

@ -48,6 +48,7 @@ type Database struct {
Filter tables.Filter Filter tables.Filter
Receipts tables.Receipts Receipts tables.Receipts
Memberships tables.Memberships Memberships tables.Memberships
NotificationData tables.NotificationData
} }
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) { func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@ -102,6 +103,14 @@ func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.S
return types.StreamPosition(id), nil return types.StreamPosition(id), nil
} }
func (d *Database) MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) {
id, err := d.NotificationData.SelectMaxID(ctx)
if err != nil {
return 0, fmt.Errorf("d.NotificationData.SelectMaxID: %w", err)
}
return types.StreamPosition(id), nil
}
func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart, excludeEventIDs) return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart, excludeEventIDs)
} }
@ -955,3 +964,15 @@ func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, stream
_, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
return receipts, err return receipts, err
} }
func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
pos, err = d.NotificationData.UpsertRoomUnreadCounts(ctx, userID, roomID, notificationCount, highlightCount)
return err
})
return
}
func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
return d.NotificationData.SelectUserUnreadCounts(ctx, userID, from, to)
}

View file

@ -0,0 +1,40 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"database/sql"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
)
const notificationDataSchema = `
CREATE TABLE IF NOT EXISTS syncapi_notification_data (
id INTEGER PRIMARY KEY,
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
notification_count BIGINT NOT NULL DEFAULT 0,
highlight_count BIGINT NOT NULL DEFAULT 0,
CONSTRAINT syncapi_notifications_unique UNIQUE (user_id, room_id)
);`
func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) {
_, err := db.Exec(notificationDataSchema)
if err != nil {
return nil, err
}
return shared.NewNotificationDataTable(db)
}

View file

@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil { if err != nil {
return err return err
} }
notificationData, err := NewSqliteNotificationDataTable(d.db)
if err != nil {
return err
}
m := sqlutil.NewMigrations() m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m) deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m) deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -120,6 +124,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
SendToDevice: sendToDevice, SendToDevice: sendToDevice,
Receipts: receipts, Receipts: receipts,
Memberships: memberships, Memberships: memberships,
NotificationData: notificationData,
} }
return nil return nil
} }

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
eduAPI "github.com/matrix-org/dendrite/eduserver/api" eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -167,3 +168,9 @@ type Memberships interface {
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error) SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error)
} }
type NotificationData interface {
UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
SelectMaxID(ctx context.Context) (int64, error)
}

View file

@ -0,0 +1,55 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
)
type NotificationDataStreamProvider struct {
StreamProvider
}
func (p *NotificationDataStreamProvider) Setup() {
p.StreamProvider.Setup()
id, err := p.DB.MaxStreamPositionForNotificationData(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *NotificationDataStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *NotificationDataStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
// We want counts for all possible rooms, so always start from zero.
countsByRoom, err := p.DB.GetUserUnreadNotificationCounts(ctx, req.Device.UserID, 0, to)
if err != nil {
req.Log.WithError(err).Error("GetUserUnreadNotificationCounts failed")
return from
}
// We're merely decorating existing rooms. Note that the Join map
// values are not pointers.
for roomID, jr := range req.Response.Rooms.Join {
counts := countsByRoom[roomID]
if counts == nil {
continue
}
jr.UnreadNotifications.HighlightCount = counts.UnreadHighlightCount
jr.UnreadNotifications.NotificationCount = counts.UnreadNotificationCount
req.Response.Rooms.Join[roomID] = jr
}
return to
}

View file

@ -12,13 +12,14 @@ import (
) )
type Streams struct { type Streams struct {
PDUStreamProvider types.StreamProvider PDUStreamProvider types.StreamProvider
TypingStreamProvider types.StreamProvider TypingStreamProvider types.StreamProvider
ReceiptStreamProvider types.StreamProvider ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.PartitionedStreamProvider NotificationDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.PartitionedStreamProvider
} }
func NewSyncStreamProviders( func NewSyncStreamProviders(
@ -47,6 +48,9 @@ func NewSyncStreamProviders(
StreamProvider: StreamProvider{DB: d}, StreamProvider: StreamProvider{DB: d},
userAPI: userAPI, userAPI: userAPI,
}, },
NotificationDataStreamProvider: &NotificationDataStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
DeviceListStreamProvider: &DeviceListStreamProvider{ DeviceListStreamProvider: &DeviceListStreamProvider{
PartitionedStreamProvider: PartitionedStreamProvider{DB: d}, PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
rsAPI: rsAPI, rsAPI: rsAPI,
@ -60,6 +64,7 @@ func NewSyncStreamProviders(
streams.InviteStreamProvider.Setup() streams.InviteStreamProvider.Setup()
streams.SendToDeviceStreamProvider.Setup() streams.SendToDeviceStreamProvider.Setup()
streams.AccountDataStreamProvider.Setup() streams.AccountDataStreamProvider.Setup()
streams.NotificationDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup() streams.DeviceListStreamProvider.Setup()
return streams return streams
@ -67,12 +72,13 @@ func NewSyncStreamProviders(
func (s *Streams) Latest(ctx context.Context) types.StreamingToken { func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
return types.StreamingToken{ return types.StreamingToken{
PDUPosition: s.PDUStreamProvider.LatestPosition(ctx), PDUPosition: s.PDUStreamProvider.LatestPosition(ctx),
TypingPosition: s.TypingStreamProvider.LatestPosition(ctx), TypingPosition: s.TypingStreamProvider.LatestPosition(ctx),
ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx), ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx),
InvitePosition: s.InviteStreamProvider.LatestPosition(ctx), InvitePosition: s.InviteStreamProvider.LatestPosition(ctx),
SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx), SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx), AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx), NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
} }
} }

View file

@ -183,7 +183,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition()) currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
} }
} else { } else {
syncReq.Log.Debugln("Responding to sync immediately") syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
} }
if syncReq.Since.IsEmpty() { if syncReq.Since.IsEmpty() {
@ -207,6 +207,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync( AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
syncReq.Context, syncReq, syncReq.Context, syncReq,
), ),
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync( DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq, syncReq.Context, syncReq,
), ),
@ -238,6 +241,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
syncReq.Context, syncReq, syncReq.Context, syncReq,
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition, syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
), ),
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync( DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, syncReq, syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,

View file

@ -87,6 +87,13 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start client data consumer") logrus.WithError(err).Panicf("failed to start client data consumer")
} }
notificationConsumer := consumers.NewOutputNotificationDataConsumer(
process, cfg, consumer, syncDB, notifier, streams.NotificationDataStreamProvider,
)
if err = notificationConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start notification data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer( typingConsumer := consumers.NewOutputTypingEventConsumer(
process, cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider, process, cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
) )

View file

@ -118,13 +118,14 @@ const (
) )
type StreamingToken struct { type StreamingToken struct {
PDUPosition StreamPosition PDUPosition StreamPosition
TypingPosition StreamPosition TypingPosition StreamPosition
ReceiptPosition StreamPosition ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition SendToDevicePosition StreamPosition
InvitePosition StreamPosition InvitePosition StreamPosition
AccountDataPosition StreamPosition AccountDataPosition StreamPosition
DeviceListPosition LogPosition NotificationDataPosition StreamPosition
DeviceListPosition LogPosition
} }
// This will be used as a fallback by json.Marshal. // This will be used as a fallback by json.Marshal.
@ -140,10 +141,11 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string { func (t StreamingToken) String() string {
posStr := fmt.Sprintf( posStr := fmt.Sprintf(
"s%d_%d_%d_%d_%d_%d", "s%d_%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition, t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition, t.ReceiptPosition, t.SendToDevicePosition,
t.InvitePosition, t.AccountDataPosition, t.InvitePosition, t.AccountDataPosition,
t.NotificationDataPosition,
) )
if dl := t.DeviceListPosition; !dl.IsEmpty() { if dl := t.DeviceListPosition; !dl.IsEmpty() {
posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset) posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
@ -166,6 +168,8 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true return true
case t.AccountDataPosition > other.AccountDataPosition: case t.AccountDataPosition > other.AccountDataPosition:
return true return true
case t.NotificationDataPosition > other.NotificationDataPosition:
return true
case t.DeviceListPosition.IsAfter(&other.DeviceListPosition): case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
return true return true
} }
@ -173,7 +177,7 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
} }
func (t *StreamingToken) IsEmpty() bool { func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty() return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.NotificationDataPosition == 0 && t.DeviceListPosition.IsEmpty()
} }
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -208,6 +212,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.AccountDataPosition > t.AccountDataPosition { if other.AccountDataPosition > t.AccountDataPosition {
t.AccountDataPosition = other.AccountDataPosition t.AccountDataPosition = other.AccountDataPosition
} }
if other.NotificationDataPosition > t.NotificationDataPosition {
t.NotificationDataPosition = other.NotificationDataPosition
}
if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) { if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) {
t.DeviceListPosition = other.DeviceListPosition t.DeviceListPosition = other.DeviceListPosition
} }
@ -301,7 +308,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
} }
categories := strings.Split(tok[1:], ".") categories := strings.Split(tok[1:], ".")
parts := strings.Split(categories[0], "_") parts := strings.Split(categories[0], "_")
var positions [6]StreamPosition var positions [7]StreamPosition
for i, p := range parts { for i, p := range parts {
if i > len(positions) { if i > len(positions) {
break break
@ -314,12 +321,13 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
positions[i] = StreamPosition(pos) positions[i] = StreamPosition(pos)
} }
token = StreamingToken{ token = StreamingToken{
PDUPosition: positions[0], PDUPosition: positions[0],
TypingPosition: positions[1], TypingPosition: positions[1],
ReceiptPosition: positions[2], ReceiptPosition: positions[2],
SendToDevicePosition: positions[3], SendToDevicePosition: positions[3],
InvitePosition: positions[4], InvitePosition: positions[4],
AccountDataPosition: positions[5], AccountDataPosition: positions[5],
NotificationDataPosition: positions[6],
} }
// dl-0-1234 // dl-0-1234
// $log_name-$partition-$offset // $log_name-$partition-$offset
@ -430,6 +438,10 @@ type JoinResponse struct {
AccountData struct { AccountData struct {
Events []gomatrixserverlib.ClientEvent `json:"events"` Events []gomatrixserverlib.ClientEvent `json:"events"`
} `json:"account_data"` } `json:"account_data"`
UnreadNotifications struct {
HighlightCount int `json:"highlight_count"`
NotificationCount int `json:"notification_count"`
} `json:"unread_notifications"`
} }
// NewJoinResponse creates an empty response with initialised arrays. // NewJoinResponse creates an empty response with initialised arrays.

View file

@ -10,10 +10,10 @@ import (
func TestNewSyncTokenWithLogs(t *testing.T) { func TestNewSyncTokenWithLogs(t *testing.T) {
tests := map[string]*StreamingToken{ tests := map[string]*StreamingToken{
"s4_0_0_0_0_0": { "s4_0_0_0_0_0_0": {
PDUPosition: 4, PDUPosition: 4,
}, },
"s4_0_0_0_0_0.dl-0-123": { "s4_0_0_0_0_0_0.dl-0-123": {
PDUPosition: 4, PDUPosition: 4,
DeviceListPosition: LogPosition{ DeviceListPosition: LogPosition{
Partition: 0, Partition: 0,
@ -42,10 +42,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
func TestSyncTokens(t *testing.T) { func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{ shouldPass := map[string]string{
"s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, LogPosition{}}.String(), "s4_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, LogPosition{}}.String(),
"s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, LogPosition{1, 2}}.String(), "s3_1_0_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, 0, LogPosition{1, 2}}.String(),
"s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, LogPosition{}}.String(), "s1_2_3_4_5_6_7": StreamingToken{1, 2, 3, 4, 5, 6, 7, LogPosition{}}.String(),
"t3_1": TopologyToken{3, 1}.String(), "t3_1": TopologyToken{3, 1}.String(),
} }
for a, b := range shouldPass { for a, b := range shouldPass {