User API will consume events and read/fully read markers from the sync API with stream positions, instead of consuming directly

This commit is contained in:
Neil Alexander 2022-03-03 11:20:49 +00:00
parent 4d71df84ff
commit 67ef8e33fa
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
31 changed files with 691 additions and 827 deletions

View file

@ -30,7 +30,7 @@ type SyncAPIProducer struct {
} }
// SendData sends account data to the sync API server // SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error { func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error {
m := &nats.Msg{ m := &nats.Msg{
Subject: p.Topic, Subject: p.Topic,
Header: nats.Header{}, Header: nats.Header{},
@ -40,6 +40,7 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string
data := eventutil.AccountData{ data := eventutil.AccountData{
RoomID: roomID, RoomID: roomID,
Type: dataType, Type: dataType,
ReadMarker: readMarker,
} }
var err error var err error
m.Data, err = json.Marshal(data) m.Data, err = json.Marshal(data)

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
@ -127,7 +128,7 @@ func SaveAccountData(
} }
// TODO: user API should do this since it's account data // TODO: user API should do this since it's account data
if err := syncProducer.SendData(userID, roomID, dataType); err != nil { if err := syncProducer.SendData(userID, roomID, dataType, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed") util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
@ -138,11 +139,6 @@ func SaveAccountData(
} }
} }
type readMarkerJSON struct {
FullyRead string `json:"m.fully_read"`
Read string `json:"m.read"`
}
type fullyReadEvent struct { type fullyReadEvent struct {
EventID string `json:"event_id"` EventID string `json:"event_id"`
} }
@ -159,7 +155,7 @@ func SaveReadMarker(
return *resErr return *resErr
} }
var r readMarkerJSON var r eventutil.ReadMarkerJSON
resErr = httputil.UnmarshalJSONRequest(req, &r) resErr = httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil { if resErr != nil {
return *resErr return *resErr
@ -189,7 +185,7 @@ func SaveReadMarker(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read"); err != nil { if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed") util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -98,7 +98,7 @@ func PutTag(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
if err = syncProducer.SendData(userID, roomID, "m.tag"); err != nil { if err = syncProducer.SendData(userID, roomID, "m.tag", nil); err != nil {
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi") logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
} }
@ -151,7 +151,7 @@ func DeleteTag(
} }
// TODO: user API should do this since it's account data // TODO: user API should do this since it's account data
if err := syncProducer.SendData(userID, roomID, "m.tag"); err != nil { if err := syncProducer.SendData(userID, roomID, "m.tag", nil); err != nil {
logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi") logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi")
} }

View file

@ -28,6 +28,12 @@ var ErrProfileNoExists = errors.New("no known profile for given user ID")
type AccountData struct { type AccountData struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
Type string `json:"type"` Type string `json:"type"`
ReadMarker *ReadMarkerJSON `json:"read_marker,omitempty"` // optional
}
type ReadMarkerJSON struct {
FullyRead string `json:"m.fully_read"`
Read string `json:"m.read"`
} }
// NotificationData contains statistics about notifications, sent from // NotificationData contains statistics about notifications, sent from

View file

@ -20,6 +20,8 @@ var (
OutputClientData = "OutputClientData" OutputClientData = "OutputClientData"
OutputNotificationData = "OutputNotificationData" OutputNotificationData = "OutputNotificationData"
OutputReceiptEvent = "OutputReceiptEvent" OutputReceiptEvent = "OutputReceiptEvent"
OutputStreamEvent = "OutputStreamEvent"
OutputReadUpdate = "OutputReadUpdate"
) )
var streams = []*nats.StreamConfig{ var streams = []*nats.StreamConfig{
@ -64,4 +66,14 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy, Retention: nats.InterestPolicy,
Storage: nats.FileStorage, Storage: nats.FileStorage,
}, },
{
Name: OutputStreamEvent,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputReadUpdate,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
} }

View file

@ -17,6 +17,7 @@ package consumers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
@ -24,9 +25,12 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -39,6 +43,8 @@ type OutputClientDataConsumer struct {
db storage.Database db storage.Database
stream types.StreamProvider stream types.StreamProvider
notifier *notifier.Notifier notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
producer *producers.UserAPIReadProducer
} }
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@ -49,6 +55,7 @@ func NewOutputClientDataConsumer(
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, stream types.StreamProvider,
producer *producers.UserAPIReadProducer,
) *OutputClientDataConsumer { ) *OutputClientDataConsumer {
return &OutputClientDataConsumer{ return &OutputClientDataConsumer{
ctx: process.Context(), ctx: process.Context(),
@ -58,6 +65,8 @@ func NewOutputClientDataConsumer(
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,
serverName: cfg.Matrix.ServerName,
producer: producer,
} }
} }
@ -100,8 +109,48 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg)
}).Panicf("could not save account data") }).Panicf("could not save account data")
} }
if err = s.sendReadUpdate(ctx, userID, output); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"user_id": userID,
"room_id": output.RoomID,
}).Errorf("Failed to generate read update")
sentry.CaptureException(err)
return false
}
s.stream.Advance(streamPos) s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos}) s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
return true return true
} }
func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID string, output eventutil.AccountData) error {
if output.Type != "m.fully_read" || output.ReadMarker == nil {
return nil
}
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
if serverName != s.serverName {
return nil
}
var readPos types.StreamPosition
var fullyReadPos types.StreamPosition
if output.ReadMarker.Read != "" {
if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil {
return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
}
}
if output.ReadMarker.FullyRead != "" {
if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil {
return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err)
}
}
if readPos > 0 || fullyReadPos > 0 {
if err := s.producer.SendReadUpdate(userID, output.RoomID, readPos, fullyReadPos); err != nil {
return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
}
}
return nil
}

View file

@ -17,6 +17,7 @@ package consumers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
@ -24,9 +25,12 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -39,6 +43,8 @@ type OutputReceiptEventConsumer struct {
db storage.Database db storage.Database
stream types.StreamProvider stream types.StreamProvider
notifier *notifier.Notifier notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
producer *producers.UserAPIReadProducer
} }
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer. // NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@ -50,6 +56,7 @@ func NewOutputReceiptEventConsumer(
store storage.Database, store storage.Database,
notifier *notifier.Notifier, notifier *notifier.Notifier,
stream types.StreamProvider, stream types.StreamProvider,
producer *producers.UserAPIReadProducer,
) *OutputReceiptEventConsumer { ) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{ return &OutputReceiptEventConsumer{
ctx: process.Context(), ctx: process.Context(),
@ -59,6 +66,8 @@ func NewOutputReceiptEventConsumer(
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,
serverName: cfg.Matrix.ServerName,
producer: producer,
} }
} }
@ -92,8 +101,42 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms
return true return true
} }
if err = s.sendReadUpdate(ctx, output); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"user_id": output.UserID,
"room_id": output.RoomID,
}).Errorf("Failed to generate read update")
sentry.CaptureException(err)
return false
}
s.stream.Advance(streamPos) s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return true return true
} }
func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output api.OutputReceiptEvent) error {
if output.Type != "m.read" {
return nil
}
_, serverName, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
if serverName != s.serverName {
return nil
}
var readPos types.StreamPosition
if output.EventID != "" {
if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil {
return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
}
}
if readPos > 0 {
if err := s.producer.SendReadUpdate(output.UserID, output.RoomID, readPos, 0); err != nil {
return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
}
}
return nil
}

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -45,6 +46,7 @@ type OutputRoomEventConsumer struct {
pduStream types.StreamProvider pduStream types.StreamProvider
inviteStream types.StreamProvider inviteStream types.StreamProvider
notifier *notifier.Notifier notifier *notifier.Notifier
producer *producers.UserAPIStreamEventProducer
} }
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -57,6 +59,7 @@ func NewOutputRoomEventConsumer(
pduStream types.StreamProvider, pduStream types.StreamProvider,
inviteStream types.StreamProvider, inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
producer *producers.UserAPIStreamEventProducer,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{ return &OutputRoomEventConsumer{
ctx: process.Context(), ctx: process.Context(),
@ -69,6 +72,7 @@ func NewOutputRoomEventConsumer(
pduStream: pduStream, pduStream: pduStream,
inviteStream: inviteStream, inviteStream: inviteStream,
rsAPI: rsAPI, rsAPI: rsAPI,
producer: producer,
} }
} }
@ -194,6 +198,12 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return nil return nil
} }
if err = s.producer.SendStreamEvent(ev.RoomID(), ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to send stream output event for event %s", ev.EventID())
sentry.CaptureException(err)
return err
}
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
sentry.CaptureException(err) sentry.CaptureException(err)

View file

@ -83,7 +83,7 @@ func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msg *nat
var data eventutil.NotificationData var data eventutil.NotificationData
if err := json.Unmarshal(msg.Data, &data); err != nil { if err := json.Unmarshal(msg.Data, &data); err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
log.WithField("user_id", userID).WithError(err).Error("push server consumer: message parse failure") log.WithField("user_id", userID).WithError(err).Error("user API consumer: message parse failure")
return true return true
} }
@ -104,7 +104,7 @@ func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msg *nat
"user_id": userID, "user_id": userID,
"room_id": data.RoomID, "room_id": data.RoomID,
"streamPos": streamPos, "streamPos": streamPos,
}).Info("Received data from Push server") }).Trace("Received notification data from user API")
return true return true
} }

View file

@ -0,0 +1,62 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"encoding/json"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// UserAPIProducer produces events for the user API server to consume
type UserAPIReadProducer struct {
Topic string
JetStream nats.JetStreamContext
}
// SendData sends account data to the user API server
func (p *UserAPIReadProducer) SendReadUpdate(userID, roomID string, readPos, fullyReadPos types.StreamPosition) error {
m := &nats.Msg{
Subject: p.Topic,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)
m.Header.Set(jetstream.RoomID, roomID)
data := types.ReadUpdate{
UserID: userID,
RoomID: roomID,
Read: readPos,
FullyRead: fullyReadPos,
}
var err error
m.Data, err = json.Marshal(data)
if err != nil {
return err
}
log.WithFields(log.Fields{
"user_id": userID,
"room_id": roomID,
"read_pos": readPos,
"fully_read_pos": fullyReadPos,
}).Tracef("Producing to topic '%s'", p.Topic)
_, err = p.JetStream.PublishMsg(m)
return err
}

View file

@ -0,0 +1,60 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"encoding/json"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// UserAPIProducer produces events for the user API server to consume
type UserAPIStreamEventProducer struct {
Topic string
JetStream nats.JetStreamContext
}
// SendData sends account data to the user API server
func (p *UserAPIStreamEventProducer) SendStreamEvent(roomID string, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) error {
m := &nats.Msg{
Subject: p.Topic,
Header: nats.Header{},
}
m.Header.Set(jetstream.RoomID, roomID)
data := types.StreamedEvent{
Event: event,
StreamPosition: pos,
}
var err error
m.Data, err = json.Marshal(data)
if err != nil {
return err
}
log.WithFields(log.Fields{
"room_id": roomID,
"event_id": event.EventID(),
"event_type": event.Type(),
"stream_pos": pos,
}).Tracef("Producing to topic '%s'", p.Topic)
_, err = p.JetStream.PublishMsg(m)
return err
}

View file

@ -15,12 +15,35 @@
package postgres package postgres
import ( import (
"context"
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
) )
func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, error) {
_, err := db.Exec(notificationDataSchema)
if err != nil {
return nil, err
}
r := &notificationDataStatements{}
return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
{&r.selectMaxID, selectMaxNotificationIDSQL},
}.Prepare(db)
}
type notificationDataStatements struct {
upsertRoomUnreadCounts *sql.Stmt
selectUserUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt
}
const notificationDataSchema = ` const notificationDataSchema = `
CREATE TABLE IF NOT EXISTS syncapi_notification_data ( CREATE TABLE IF NOT EXISTS syncapi_notification_data (
id BIGSERIAL PRIMARY KEY, id BIGSERIAL PRIMARY KEY,
@ -31,10 +54,55 @@ CREATE TABLE IF NOT EXISTS syncapi_notification_data (
CONSTRAINT syncapi_notification_data_unique UNIQUE (user_id, room_id) CONSTRAINT syncapi_notification_data_unique UNIQUE (user_id, room_id)
);` );`
func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data
_, err := db.Exec(notificationDataSchema) (user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4
RETURNING id`
const selectUserUnreadNotificationCountsSQL = `SELECT
id, room_id, notification_count, highlight_count
FROM syncapi_notification_data
WHERE
user_id = $1 AND
id BETWEEN $2 + 1 AND $3`
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
return
}
func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return shared.NewNotificationDataTable(db) defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed")
roomCounts := map[string]*eventutil.NotificationData{}
for rows.Next() {
var id types.StreamPosition
var roomID string
var notificationCount, highlightCount int
if err = rows.Scan(&id, &roomID, &notificationCount, &highlightCount); err != nil {
return nil, err
}
roomCounts[roomID] = &eventutil.NotificationData{
RoomID: roomID,
UnreadNotificationCount: notificationCount,
UnreadHighlightCount: highlightCount,
}
}
return roomCounts, rows.Err()
}
func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
var id int64
err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
return id, err
} }

View file

@ -1,94 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package shared
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
)
type notificationDataStatements struct {
upsertRoomUnreadCounts *sql.Stmt
selectUserUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt
}
func NewNotificationDataTable(db *sql.DB) (tables.NotificationData, error) {
r := &notificationDataStatements{}
return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
{&r.selectMaxID, selectMaxNotificationIDSQL},
}.Prepare(db)
}
const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data
(user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4
RETURNING id`
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
return
}
const selectUserUnreadNotificationCountsSQL = `SELECT
id, room_id, notification_count, highlight_count
FROM syncapi_notification_data
WHERE
user_id = $1 AND
id BETWEEN $2 + 1 AND $3`
func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed")
roomCounts := map[string]*eventutil.NotificationData{}
for rows.Next() {
var id types.StreamPosition
var roomID string
var notificationCount, highlightCount int
if err = rows.Scan(&id, &roomID, &notificationCount, &highlightCount); err != nil {
return nil, err
}
roomCounts[roomID] = &eventutil.NotificationData{
RoomID: roomID,
UnreadNotificationCount: notificationCount,
UnreadHighlightCount: highlightCount,
}
}
return roomCounts, rows.Err()
}
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
var id int64
err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
return id, err
}

View file

@ -15,12 +15,35 @@
package sqlite3 package sqlite3
import ( import (
"context"
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
) )
func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) {
_, err := db.Exec(notificationDataSchema)
if err != nil {
return nil, err
}
r := &notificationDataStatements{}
return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
{&r.selectMaxID, selectMaxNotificationIDSQL},
}.Prepare(db)
}
type notificationDataStatements struct {
upsertRoomUnreadCounts *sql.Stmt
selectUserUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt
}
const notificationDataSchema = ` const notificationDataSchema = `
CREATE TABLE IF NOT EXISTS syncapi_notification_data ( CREATE TABLE IF NOT EXISTS syncapi_notification_data (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
@ -31,10 +54,55 @@ CREATE TABLE IF NOT EXISTS syncapi_notification_data (
CONSTRAINT syncapi_notifications_unique UNIQUE (user_id, room_id) CONSTRAINT syncapi_notifications_unique UNIQUE (user_id, room_id)
);` );`
func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data
_, err := db.Exec(notificationDataSchema) (user_id, room_id, notification_count, highlight_count)
VALUES ($1, $2, $3, $4)
ON CONFLICT (user_id, room_id)
DO UPDATE SET notification_count = $3, highlight_count = $4
RETURNING id`
const selectUserUnreadNotificationCountsSQL = `SELECT
id, room_id, notification_count, highlight_count
FROM syncapi_notification_data
WHERE
user_id = $1 AND
id BETWEEN $2 + 1 AND $3`
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
return
}
func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return shared.NewNotificationDataTable(db) defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed")
roomCounts := map[string]*eventutil.NotificationData{}
for rows.Next() {
var id types.StreamPosition
var roomID string
var notificationCount, highlightCount int
if err = rows.Scan(&id, &roomID, &notificationCount, &highlightCount); err != nil {
return nil, err
}
roomCounts[roomID] = &eventutil.NotificationData{
RoomID: roomID,
UnreadNotificationCount: notificationCount,
UnreadHighlightCount: highlightCount,
}
}
return roomCounts, rows.Err()
}
func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
var id int64
err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
return id, err
} }

View file

@ -62,16 +62,19 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" + const selectRecentEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" " WHERE room_id = $1 AND id > $2 AND id <= $3"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectRecentEventsForSyncSQL = "" + const selectRecentEventsForSyncSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectEarlyEventsSQL = "" + const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" " WHERE room_id = $1 AND id > $2 AND id <= $3"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
@ -85,6 +88,7 @@ const selectStateInRangeSQL = "" +
" FROM syncapi_output_room_events" + " FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2)" + " WHERE (id > $1 AND id <= $2)" +
" AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))" " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const deleteEventsForRoomSQL = "" + const deleteEventsForRoomSQL = "" +
@ -95,10 +99,12 @@ const selectContextEventSQL = "" +
const selectContextBeforeEventSQL = "" + const selectContextBeforeEventSQL = "" +
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectContextAfterEventSQL = "" + const selectContextAfterEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {

View file

@ -33,7 +33,7 @@ func (p *NotificationDataStreamProvider) IncrementalSync(
from, to types.StreamPosition, from, to types.StreamPosition,
) types.StreamPosition { ) types.StreamPosition {
// We want counts for all possible rooms, so always start from zero. // We want counts for all possible rooms, so always start from zero.
countsByRoom, err := p.DB.GetUserUnreadNotificationCounts(ctx, req.Device.UserID, 0, to) countsByRoom, err := p.DB.GetUserUnreadNotificationCounts(ctx, req.Device.UserID, from, to)
if err != nil { if err != nil {
req.Log.WithError(err).Error("GetUserUnreadNotificationCounts failed") req.Log.WithError(err).Error("GetUserUnreadNotificationCounts failed")
return from return from

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/streams"
@ -64,6 +65,18 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent),
}
userAPIReadUpdateProducer := &producers.UserAPIReadProducer{
JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate),
}
_ = userAPIReadUpdateProducer
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier, js, keyAPI, rsAPI, syncDB, notifier,
@ -75,7 +88,7 @@ func AddPublicRoutes(
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
process, cfg, js, syncDB, notifier, streams.PDUStreamProvider, process, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer,
) )
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer") logrus.WithError(err).Panicf("failed to start room server consumer")
@ -83,6 +96,7 @@ func AddPublicRoutes(
clientConsumer := consumers.NewOutputClientDataConsumer( clientConsumer := consumers.NewOutputClientDataConsumer(
process, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, process, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider,
userAPIReadUpdateProducer,
) )
if err = clientConsumer.Start(); err != nil { if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer") logrus.WithError(err).Panicf("failed to start client data consumer")
@ -111,6 +125,7 @@ func AddPublicRoutes(
receiptConsumer := consumers.NewOutputReceiptEventConsumer( receiptConsumer := consumers.NewOutputReceiptEventConsumer(
process, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, process, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider,
userAPIReadUpdateProducer,
) )
if err = receiptConsumer.Start(); err != nil { if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer") logrus.WithError(err).Panicf("failed to start receipts consumer")

View file

@ -474,3 +474,16 @@ type Peek struct {
New bool New bool
Deleted bool Deleted bool
} }
type ReadUpdate struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
Read StreamPosition `json:"read,omitempty"`
FullyRead StreamPosition `json:"fully_read,omitempty"`
}
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type StreamedEvent struct {
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
StreamPosition StreamPosition `json:"stream_position"`
}

View file

@ -30,3 +30,9 @@ Local device key changes appear in /keys/changes
Remove group category Remove group category
Remove group role Remove group role
# Flakey
AS-ghosted users can use rooms themselves
# Flakey, need additional investigation
Messages that notify from another user increment notification_count
Messages that highlight from another user increment unread highlight count

View file

@ -78,7 +78,7 @@ Room creation reports m.room.member to myself
Outbound federation rejects send_join responses with no m.room.create event Outbound federation rejects send_join responses with no m.room.create event
Outbound federation rejects m.room.create events with an unknown room version Outbound federation rejects m.room.create events with an unknown room version
Invited user can see room metadata Invited user can see room metadata
# Blacklisted because these tests call /r0/events which we don't implement # Blacklisted because these tests call /v3/events which we don't implement
# New room members see their own join event # New room members see their own join event
# Existing members see new members' join events # Existing members see new members' join events
setting 'm.room.power_levels' respects room powerlevel setting 'm.room.power_levels' respects room powerlevel
@ -257,7 +257,7 @@ Guest non-joined users cannot send messages to guest_access rooms if not joined
Real non-joined users cannot room initalSync for non-world_readable rooms Real non-joined users cannot room initalSync for non-world_readable rooms
Push rules come down in an initial /sync Push rules come down in an initial /sync
Regular users can add and delete aliases in the default room configuration Regular users can add and delete aliases in the default room configuration
GET /r0/capabilities is not public GET /v3/capabilities is not public
GET /joined_rooms lists newly-created room GET /joined_rooms lists newly-created room
/joined_rooms returns only joined rooms /joined_rooms returns only joined rooms
Message history can be paginated over federation Message history can be paginated over federation
@ -366,8 +366,8 @@ Outbound federation will ignore a missing event with bad JSON for room version 6
Server correctly handles transactions that break edu limits Server correctly handles transactions that break edu limits
Server rejects invalid JSON in a version 6 room Server rejects invalid JSON in a version 6 room
Can download without a file name over federation Can download without a file name over federation
POST /media/r0/upload can create an upload POST /media/v3/upload can create an upload
GET /media/r0/download can fetch the value again GET /media/v3/download can fetch the value again
Remote users can join room by alias Remote users can join room by alias
Alias creators can delete alias with no ops Alias creators can delete alias with no ops
Alias creators can delete canonical alias with no ops Alias creators can delete canonical alias with no ops
@ -603,7 +603,6 @@ Can add global push rule after an existing rule
Can delete a push rule Can delete a push rule
Can disable a push rule Can disable a push rule
Adding the same push rule twice is idempotent Adding the same push rule twice is idempotent
Messages that notify from another user increment notification_count
Can change the actions of default rules Can change the actions of default rules
Can change the actions of a user specified rule Can change the actions of a user specified rule
Adding a push rule wakes up an incremental /sync Adding a push rule wakes up an incremental /sync
@ -642,7 +641,6 @@ Test that rejected pushers are removed.
Notifications can be viewed with GET /notifications Notifications can be viewed with GET /notifications
Trying to add push rule with no scope fails with 400 Trying to add push rule with no scope fails with 400
Trying to add push rule with invalid scope fails with 400 Trying to add push rule with invalid scope fails with 400
Messages that highlight from another user increment unread highlight count
Forward extremities remain so even after the next events are populated as outliers Forward extremities remain so even after the next events are populated as outliers
If a device list update goes missing, the server resyncs on the next one If a device list update goes missing, the server resyncs on the next one
uploading self-signing key notifies over federation uploading self-signing key notifies over federation
@ -662,4 +660,4 @@ registration accepts non-ascii passwords
registration with inhibit_login inhibits login registration with inhibit_login inhibits login
The operation must be consistent through an interactive authentication session The operation must be consistent through an interactive authentication session
Multiple calls to /sync should not cause 500 errors Multiple calls to /sync should not cause 500 errors
/context/ with lazy_load_members filter works

View file

@ -1,191 +0,0 @@
package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
uapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/util"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
type OutputClientDataConsumer struct {
ctx context.Context
cfg *config.UserAPI
jetstream nats.JetStreamContext
durable string
db storage.Database
pgClient pushgateway.Client
ServerName gomatrixserverlib.ServerName
topic string
userAPI uapi.UserInternalAPI
syncProducer *producers.SyncAPI
}
func NewOutputClientDataConsumer(
process *process.ProcessContext,
cfg *config.UserAPI,
js nats.JetStreamContext,
store storage.Database,
pgClient pushgateway.Client,
userAPI uapi.UserInternalAPI,
syncProducer *producers.SyncAPI,
) *OutputClientDataConsumer {
return &OutputClientDataConsumer{
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("UserAPIClientAPIConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
pgClient: pgClient,
userAPI: userAPI,
syncProducer: syncProducer,
}
}
func (s *OutputClientDataConsumer) Start() error {
if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
}
func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var event eventutil.AccountData
if err := json.Unmarshal(msg.Data, &event); err != nil {
log.WithError(err).Error("pushserver clientapi consumer: message parse failure")
return true
}
if event.Type != mFullyRead {
return true
}
userID := string(msg.Header.Get("user_id"))
localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
log.WithFields(log.Fields{
"user_id": userID,
"room_id": event.RoomID,
"event_type": event.Type,
}).WithError(err).Error("pushserver clientapi consumer: SplitID failure")
return true
}
if domain != s.ServerName {
log.WithFields(log.Fields{
"user_id": userID,
"room_id": event.RoomID,
"event_type": event.Type,
}).Error("pushserver clientapi consumer: not a local user")
return true
}
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_type": event.Type,
}).Tracef("Received message from clientapi: %#v", event)
userReq := uapi.QueryAccountDataRequest{
UserID: userID,
RoomID: event.RoomID,
DataType: mFullyRead,
}
var userRes uapi.QueryAccountDataResponse
if err = s.userAPI.QueryAccountData(ctx, &userReq, &userRes); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_type": event.Type,
}).WithError(err).Error("pushserver clientapi consumer: failed to query account data")
return false
}
ad, ok := userRes.RoomAccountData[event.RoomID]
if !ok {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
}).Errorf("pushserver clientapi consumer: room not found in account data response: %#v", userRes.RoomAccountData)
return true
}
bs, ok := ad[mFullyRead]
if !ok {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
}).Errorf("pushserver clientapi consumer: m.fully_read not found in account data: %#v", ad)
return true
}
var data fullyReadAccountData
if err = json.Unmarshal([]byte(bs), &data); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
}).WithError(err).Error("pushserver clientapi consumer: json.Unmarshal of m.fully_read failed")
return true
}
// TODO: we cannot know if this EventID caused a notification, so
// we should first resolve it and find the closest earlier
// notification.
deleted, err := s.db.DeleteNotificationsUpTo(ctx, localpart, event.RoomID, data.EventID)
if err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": data.EventID,
}).WithError(err).Errorf("pushserver clientapi consumer: DeleteNotificationsUpTo failed")
return false
}
if deleted {
if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": data.EventID,
}).WithError(err).Error("pushserver clientapi consumer: NotifyUserCounts failed")
return false
}
if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, event.RoomID); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": data.EventID,
}).WithError(err).Errorf("pushserver clientapi consumer: GetAndSendNotificationData failed")
return false
}
}
return true
}
// mFullyRead is the account data type for the marker for the event up
// to which the user has read.
const mFullyRead = "m.fully_read"
// A fullyReadAccountData is what the m.fully_read account data value
// contains.
//
// TODO: this is duplicated with
// clientapi/routing/account_data.go. Should probably move to
// eventutil.
type fullyReadAccountData struct {
EventID string `json:"event_id"`
}

View file

@ -1,119 +0,0 @@
package consumers
import (
"context"
"encoding/json"
eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/util"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
type OutputReceiptEventConsumer struct {
ctx context.Context
cfg *config.UserAPI
jetstream nats.JetStreamContext
durable string
db storage.Database
pgClient pushgateway.Client
receiptTopic string
syncProducer *producers.SyncAPI
}
// NewOutputReceiptEventConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputReceiptEventConsumer(
process *process.ProcessContext,
cfg *config.UserAPI,
js nats.JetStreamContext,
store storage.Database,
pgClient pushgateway.Client,
syncProducer *producers.SyncAPI,
) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPIEDUServerConsumer"),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
pgClient: pgClient,
syncProducer: syncProducer,
}
}
func (s *OutputReceiptEventConsumer) Start() error {
if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.receiptTopic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
}
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var event eduapi.OutputReceiptEvent
if err := json.Unmarshal(msg.Data, &event); err != nil {
log.WithError(err).Errorf("pushserver EDU consumer: message parse failure")
return true
}
localpart, domain, err := gomatrixserverlib.SplitID('@', event.UserID)
if err != nil {
return true
}
if domain != s.cfg.Matrix.ServerName {
return true
}
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
"event_type": event.Type,
}).Tracef("Received message from EDU server: %#v", event)
// TODO: we cannot know if this EventID caused a notification, so
// we should first resolve it and find the closest earlier
// notification.
updated, err := s.db.SetNotificationsRead(ctx, localpart, event.RoomID, event.EventID, true)
if err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer")
return false
}
if updated {
if err := s.syncProducer.GetAndSendNotificationData(ctx, event.UserID, event.RoomID); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer: GetAndSendNotificationData failed")
return false
}
if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"room_id": event.RoomID,
"event_id": event.EventID,
}).WithError(err).Error("pushserver EDU consumer: NotifyUserCounts failed")
return false
}
}
return true
}

View file

@ -1,256 +0,0 @@
package consumers
import (
"context"
"encoding/json"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/pushrules"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
)
const serverName = gomatrixserverlib.ServerName("example.org")
func TestOutputRoomEventConsumer(t *testing.T) {
t.SkipNow() // TODO: Come back to this test!
ctx := context.Background()
dbopts := &config.DatabaseOptions{
ConnectionString: "file::memory:",
MaxOpenConnections: 1,
MaxIdleConnections: 1,
}
db, err := storage.NewDatabase(dbopts, serverName, 5, 0, 0)
if err != nil {
t.Fatalf("NewDatabase failed: %v", err)
}
err = db.UpsertPusher(ctx,
api.Pusher{
PushKey: "apushkey",
Kind: api.HTTPKind,
AppID: "anappid",
Data: map[string]interface{}{
"url": "http://example.org/pusher/notify",
"extra": "someextra",
},
},
"alice")
if err != nil {
t.Fatalf("UpsertPusher failed: %v", err)
}
var rsAPI fakeRoomServerInternalAPI
var userAPI fakeUserInternalAPI
var messageSender fakeMessageSender
var wg sync.WaitGroup
wg.Add(1)
pgClient := fakePushGatewayClient{
WG: &wg,
}
s := &OutputRoomEventConsumer{
cfg: &config.UserAPI{
Matrix: &config.Global{
ServerName: serverName,
},
},
db: db,
rsAPI: &rsAPI,
userAPI: &userAPI,
pgClient: &pgClient,
syncProducer: producers.NewSyncAPI(db, &messageSender, "clientDataTopic", "notificationDataTopic"),
}
event, err := gomatrixserverlib.NewEventFromTrustedJSONWithEventID("$143273582443PhrSn:example.org", []byte(`{
"content": {
"body": "This is an example text message",
"format": "org.matrix.custom.html",
"formatted_body": "\u003cb\u003eThis is an example text message\u003c/b\u003e",
"msgtype": "m.text"
},
"origin_server_ts": 1432735824653,
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"sender": "@example:example.org",
"type": "m.room.message",
"unsigned": {
"age": 1234
}
}`), false, gomatrixserverlib.RoomVersionV7)
if err != nil {
t.Fatalf("NewEventFromTrustedJSON failed: %v", err)
}
ev := &gomatrixserverlib.HeaderedEvent{
Event: event,
}
if err := s.processMessage(ctx, ev); err != nil {
t.Fatalf("processMessage failed: %v", err)
}
t.Log("Waiting for backend calls to finish.")
wg.Wait()
if diff := cmp.Diff([]*rsapi.QueryMembershipsForRoomRequest{{JoinedOnly: true, RoomID: "!jEsUZKDJdhlrceRyVU:example.org"}}, rsAPI.MembershipReqs); diff != "" {
t.Errorf("rsAPI.QueryMembershipsForRoom Reqs: +got -want:\n%s", diff)
}
if diff := cmp.Diff([]*pushgateway.NotifyRequest{{
Notification: pushgateway.Notification{
Type: "m.room.message",
Content: event.Content(),
Counts: &pushgateway.Counts{
Unread: 1,
},
Devices: []*pushgateway.Device{{
AppID: "anappid",
PushKey: "apushkey",
Data: map[string]interface{}{
"extra": "someextra",
},
}},
EventID: "$143273582443PhrSn:example.org",
ID: "$143273582443PhrSn:example.org",
RoomID: "!jEsUZKDJdhlrceRyVU:example.org",
RoomName: "aname",
Sender: "@example:example.org",
},
}}, pgClient.Reqs); diff != "" {
t.Errorf("pgClient.NotifyHTTP Reqs: +got -want:\n%s", diff)
}
msg := &nats.Msg{
Subject: "notificationDataTopic",
Header: nats.Header{},
Data: []byte(`{"room_id":"!jEsUZKDJdhlrceRyVU:example.org","unread_highlight_count":0,"unread_notification_count":1}`),
}
msg.Header.Set("user_id", "@alice:example.org")
if diff := cmp.Diff([]*nats.Msg{msg}, messageSender.Messages, cmpopts.IgnoreUnexported(nats.Msg{})); diff != "" {
t.Errorf("SendMessage Messages: +got -want:\n%s", diff)
}
}
type fakeRoomServerInternalAPI struct {
rsapi.RoomserverInternalAPI
MembershipReqs []*rsapi.QueryMembershipsForRoomRequest
}
func (s *fakeRoomServerInternalAPI) QueryCurrentState(
ctx context.Context,
req *rsapi.QueryCurrentStateRequest,
res *rsapi.QueryCurrentStateResponse,
) error {
*res = rsapi.QueryCurrentStateResponse{
StateEvents: map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent{
roomNameTuple: mustParseHeaderedEventJSON(`{
"_room_version": "7",
"content": {
"name": "aname"
},
"event_id": "$3957tyerfgewrf382:example.org",
"origin_server_ts": 1432735824652,
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"sender": "@example:example.org",
"state_key": "@alice:example.org",
"type": "m.room.name"
}`),
},
}
return nil
}
func (s *fakeRoomServerInternalAPI) QueryMembershipsForRoom(
ctx context.Context,
req *rsapi.QueryMembershipsForRoomRequest,
res *rsapi.QueryMembershipsForRoomResponse,
) error {
s.MembershipReqs = append(s.MembershipReqs, req)
*res = rsapi.QueryMembershipsForRoomResponse{
JoinEvents: []gomatrixserverlib.ClientEvent{
mustParseClientEventJSON(`{
"content": {
"avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
"displayname": "Alice Margatroid",
"membership": "join",
"reason": "Looking for support"
},
"event_id": "$3957tyerfgewrf384:example.org",
"origin_server_ts": 1432735824653,
"room_id": "!jEsUZKDJdhlrceRyVU:example.org",
"sender": "@example:example.org",
"state_key": "@alice:example.org",
"type": "m.room.member",
"unsigned": {
"age": 1234
}
}`),
},
}
return nil
}
type fakeUserInternalAPI struct {
api.UserInternalAPI
}
func (s *fakeUserInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error {
localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil {
return err
}
res.RuleSets = pushrules.DefaultAccountRuleSets(localpart, "example.org")
return nil
}
type fakePushGatewayClient struct {
pushgateway.Client
WG *sync.WaitGroup
Reqs []*pushgateway.NotifyRequest
}
func (c *fakePushGatewayClient) Notify(ctx context.Context, url string, req *pushgateway.NotifyRequest, res *pushgateway.NotifyResponse) error {
c.Reqs = append(c.Reqs, req)
if c.WG != nil {
c.WG.Done()
}
*res = pushgateway.NotifyResponse{
Rejected: []string{
"apushkey",
},
}
return nil
}
func mustParseClientEventJSON(s string) gomatrixserverlib.ClientEvent {
var ev gomatrixserverlib.ClientEvent
if err := json.Unmarshal([]byte(s), &ev); err != nil {
panic(err)
}
return ev
}
func mustParseHeaderedEventJSON(s string) *gomatrixserverlib.HeaderedEvent {
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal([]byte(s), &ev); err != nil {
panic(err)
}
return &ev
}
type fakeMessageSender struct {
Messages []*nats.Msg
}
func (s *fakeMessageSender) PublishMsg(msg *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) {
s.Messages = append(s.Messages, msg)
return nil, nil
}

View file

@ -0,0 +1,136 @@
package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/types"
uapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/util"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
type OutputReadUpdateConsumer struct {
ctx context.Context
cfg *config.UserAPI
jetstream nats.JetStreamContext
durable string
db storage.Database
pgClient pushgateway.Client
ServerName gomatrixserverlib.ServerName
topic string
userAPI uapi.UserInternalAPI
syncProducer *producers.SyncAPI
}
func NewOutputReadUpdateConsumer(
process *process.ProcessContext,
cfg *config.UserAPI,
js nats.JetStreamContext,
store storage.Database,
pgClient pushgateway.Client,
userAPI uapi.UserInternalAPI,
syncProducer *producers.SyncAPI,
) *OutputReadUpdateConsumer {
return &OutputReadUpdateConsumer{
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate),
pgClient: pgClient,
userAPI: userAPI,
syncProducer: syncProducer,
}
}
func (s *OutputReadUpdateConsumer) Start() error {
if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
}
func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var read types.ReadUpdate
if err := json.Unmarshal(msg.Data, &read); err != nil {
log.WithError(err).Error("userapi clientapi consumer: message parse failure")
return true
}
if read.FullyRead == 0 && read.Read == 0 {
return true
}
userID := string(msg.Header.Get(jetstream.UserID))
roomID := string(msg.Header.Get(jetstream.RoomID))
localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
log.WithError(err).Error("userapi clientapi consumer: SplitID failure")
return true
}
if domain != s.ServerName {
log.Error("userapi clientapi consumer: not a local user")
return true
}
log := log.WithFields(log.Fields{
"room_id": roomID,
"user_id": userID,
})
log.Tracef("Received read update from sync API: %#v", read)
if read.Read > 0 {
updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, int64(read.Read), true)
if err != nil {
log.WithError(err).Error("userapi EDU consumer")
return false
}
if updated {
if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil {
log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed")
return false
}
if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed")
return false
}
}
}
if read.FullyRead > 0 {
deleted, err := s.db.DeleteNotificationsUpTo(ctx, localpart, roomID, int64(read.FullyRead))
if err != nil {
log.WithError(err).Errorf("userapi clientapi consumer: DeleteNotificationsUpTo failed")
return false
}
if deleted {
if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
log.WithError(err).Error("userapi clientapi consumer: NotifyUserCounts failed")
return false
}
if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, read.RoomID); err != nil {
log.WithError(err).Errorf("userapi clientapi consumer: GetAndSendNotificationData failed")
return false
}
}
}
return true
}

View file

@ -14,6 +14,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage"
@ -24,7 +25,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type OutputRoomEventConsumer struct { type OutputStreamEventConsumer struct {
ctx context.Context ctx context.Context
cfg *config.UserAPI cfg *config.UserAPI
userAPI api.UserInternalAPI userAPI api.UserInternalAPI
@ -37,7 +38,7 @@ type OutputRoomEventConsumer struct {
syncProducer *producers.SyncAPI syncProducer *producers.SyncAPI
} }
func NewOutputRoomEventConsumer( func NewOutputStreamEventConsumer(
process *process.ProcessContext, process *process.ProcessContext,
cfg *config.UserAPI, cfg *config.UserAPI,
js nats.JetStreamContext, js nats.JetStreamContext,
@ -46,14 +47,14 @@ func NewOutputRoomEventConsumer(
userAPI api.UserInternalAPI, userAPI api.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, rsAPI rsapi.RoomserverInternalAPI,
syncProducer *producers.SyncAPI, syncProducer *producers.SyncAPI,
) *OutputRoomEventConsumer { ) *OutputStreamEventConsumer {
return &OutputRoomEventConsumer{ return &OutputStreamEventConsumer{
ctx: process.Context(), ctx: process.Context(),
cfg: cfg, cfg: cfg,
jetstream: js, jetstream: js,
db: store, db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"), durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent),
pgClient: pgClient, pgClient: pgClient,
userAPI: userAPI, userAPI: userAPI,
rsAPI: rsAPI, rsAPI: rsAPI,
@ -61,7 +62,7 @@ func NewOutputRoomEventConsumer(
} }
} }
func (s *OutputRoomEventConsumer) Start() error { func (s *OutputStreamEventConsumer) Start() error {
if err := jetstream.JetStreamConsumer( if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.DeliverAll(), nats.ManualAck(),
@ -71,48 +72,34 @@ func (s *OutputRoomEventConsumer) Start() error {
return nil return nil
} }
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var output rsapi.OutputEvent var output types.StreamedEvent
output.Event = &gomatrixserverlib.HeaderedEvent{}
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {
log.WithError(err).Errorf("pushserver consumer: message parse failure") log.WithError(err).Errorf("userapi consumer: message parse failure")
return true
}
if output.Event.Event == nil {
log.Errorf("userapi consumer: expected event")
return true return true
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_type": output.Type, "event_id": output.Event.EventID(),
}).Tracef("Received message from room server: %#v", output) "event_type": output.Event.Type(),
"stream_pos": output.StreamPosition,
}).Tracef("Received message from sync API: %#v", output)
switch output.Type { if err := s.processMessage(ctx, output.Event, int64(output.StreamPosition)); err != nil {
case rsapi.OutputTypeNewRoomEvent:
ev := output.NewRoomEvent.Event
if err := s.processMessage(ctx, output.NewRoomEvent.Event); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": output.Event.EventID(),
"event": string(ev.JSON()), }).WithError(err).Errorf("userapi consumer: process room event failure")
}).WithError(err).Errorf("pushserver consumer: process room event failure")
}
case rsapi.OutputTypeNewInviteEvent:
ev := output.NewInviteEvent.Event
if err := s.processMessage(ctx, output.NewInviteEvent.Event); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"event": string(ev.JSON()),
}).WithError(err).Errorf("pushserver consumer: process invite event failure")
}
default:
// Ignore old events, peeks, so on.
} }
return true return true
} }
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos int64) error {
log.WithFields(log.Fields{
"event_type": event.Type(),
}).Tracef("Received event from room server: %#v", event)
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID()) members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
if err != nil { if err != nil {
return fmt.Errorf("s.localRoomMembers: %w", err) return fmt.Errorf("s.localRoomMembers: %w", err)
@ -152,7 +139,7 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gom
// removing it means we can send all notifications to // removing it means we can send all notifications to
// e.g. Element's Push gateway in one go. // e.g. Element's Push gateway in one go.
for _, mem := range members { for _, mem := range members {
if err := s.notifyLocal(ctx, event, mem, roomSize, roomName); err != nil { if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"localpart": mem.Localpart, "localpart": mem.Localpart,
}).WithError(err).Debugf("Unable to push to local user") }).WithError(err).Debugf("Unable to push to local user")
@ -193,7 +180,7 @@ func newLocalMembership(event *gomatrixserverlib.ClientEvent) (*localMembership,
// localRoomMembers fetches the current local members of a room, and // localRoomMembers fetches the current local members of a room, and
// the total number of members. // the total number of members.
func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) { func (s *OutputStreamEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) {
req := &rsapi.QueryMembershipsForRoomRequest{ req := &rsapi.QueryMembershipsForRoomRequest{
RoomID: roomID, RoomID: roomID,
JoinedOnly: true, JoinedOnly: true,
@ -232,7 +219,7 @@ func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID s
// looks it up in roomserver. If there is no name, // looks it up in roomserver. If there is no name,
// m.room.canonical_alias is consulted. Returns an empty string if the // m.room.canonical_alias is consulted. Returns an empty string if the
// room has no name. // room has no name.
func (s *OutputRoomEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) { func (s *OutputStreamEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) {
if event.Type() == gomatrixserverlib.MRoomName { if event.Type() == gomatrixserverlib.MRoomName {
name, err := unmarshalRoomName(event) name, err := unmarshalRoomName(event)
if err != nil { if err != nil {
@ -300,7 +287,7 @@ func unmarshalCanonicalAlias(event *gomatrixserverlib.HeaderedEvent) (string, er
} }
// notifyLocal finds the right push actions for a local user, given an event. // notifyLocal finds the right push actions for a local user, given an event.
func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int, roomName string) error { func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos int64, mem *localMembership, roomSize int, roomName string) error {
actions, err := s.evaluatePushRules(ctx, event, mem, roomSize) actions, err := s.evaluatePushRules(ctx, event, mem, roomSize)
if err != nil { if err != nil {
return err return err
@ -338,7 +325,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatr
RoomID: event.RoomID(), RoomID: event.RoomID(),
TS: gomatrixserverlib.AsTimestamp(time.Now()), TS: gomatrixserverlib.AsTimestamp(time.Now()),
} }
if err = s.db.InsertNotification(ctx, mem.Localpart, event.EventID(), tweaks, n); err != nil { if err = s.db.InsertNotification(ctx, mem.Localpart, event.EventID(), pos, tweaks, n); err != nil {
return err return err
} }
@ -409,7 +396,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatr
// evaluatePushRules fetches and evaluates the push rules of a local // evaluatePushRules fetches and evaluates the push rules of a local
// user. Returns actions (including dont_notify). // user. Returns actions (including dont_notify).
func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) { func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) {
if event.Sender() == mem.UserID { if event.Sender() == mem.UserID {
// SPEC: Homeservers MUST NOT notify the Push Gateway for // SPEC: Homeservers MUST NOT notify the Push Gateway for
// events that the user has sent themselves. // events that the user has sent themselves.
@ -488,7 +475,7 @@ func (rse *ruleSetEvalContext) HasPowerLevel(userID, levelKey string) (bool, err
// localPushDevices pushes to the configured devices of a local // localPushDevices pushes to the configured devices of a local
// user. The map keys are [url][format]. // user. The map keys are [url][format].
func (s *OutputRoomEventConsumer) localPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) { func (s *OutputStreamEventConsumer) localPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) {
pusherDevices, err := util.GetPushDevices(ctx, localpart, tweaks, s.db) pusherDevices, err := util.GetPushDevices(ctx, localpart, tweaks, s.db)
if err != nil { if err != nil {
return nil, "", err return nil, "", err
@ -512,7 +499,7 @@ func (s *OutputRoomEventConsumer) localPushDevices(ctx context.Context, localpar
} }
// notifyHTTP performs a notificatation to a Push Gateway. // notifyHTTP performs a notificatation to a Push Gateway.
func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, url, format string, devices []*pushgateway.Device, localpart, roomName string, userNumUnreadNotifs int) ([]*pushgateway.Device, error) { func (s *OutputStreamEventConsumer) notifyHTTP(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, url, format string, devices []*pushgateway.Device, localpart, roomName string, userNumUnreadNotifs int) ([]*pushgateway.Device, error) {
logger := log.WithFields(log.Fields{ logger := log.WithFields(log.Fields{
"event_id": event.EventID(), "event_id": event.EventID(),
"url": url, "url": url,
@ -556,12 +543,13 @@ func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatri
} }
} }
logger.Debugf("Notifying HTTP push gateway") logger.Debugf("Notifying push gateway %s", url)
var res pushgateway.NotifyResponse var res pushgateway.NotifyResponse
if err := s.pgClient.Notify(ctx, url, &req, &res); err != nil { if err := s.pgClient.Notify(ctx, url, &req, &res); err != nil {
logger.WithError(err).Errorf("Failed to notify push gateway %s", url)
return nil, err return nil, err
} }
logger.WithField("num_rejected", len(res.Rejected)).Tracef("HTTP push gateway result") logger.WithField("num_rejected", len(res.Rejected)).Tracef("Push gateway result")
if len(res.Rejected) == 0 { if len(res.Rejected) == 0 {
return nil, nil return nil, nil
@ -583,12 +571,12 @@ func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatri
} }
// deleteRejectedPushers deletes the pushers associated with the given devices. // deleteRejectedPushers deletes the pushers associated with the given devices.
func (s *OutputRoomEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string) { func (s *OutputStreamEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"localpart": localpart, "localpart": localpart,
"app_id0": devices[0].AppID, "app_id0": devices[0].AppID,
"num_devices": len(devices), "num_devices": len(devices),
}).Infof("Deleting pushers rejected by the HTTP push gateway") }).Warnf("Deleting pushers rejected by the HTTP push gateway")
for _, d := range devices { for _, d := range devices {
if err := s.db.RemovePusher(ctx, d.AppID, d.PushKey, localpart); err != nil { if err := s.db.RemovePusher(ctx, d.AppID, d.PushKey, localpart); err != nil {

View file

@ -91,9 +91,9 @@ type Database interface {
// May return sql.ErrNoRows. // May return sql.ErrNoRows.
GetLoginTokenDataByToken(ctx context.Context, token string) (*api.LoginTokenData, error) GetLoginTokenDataByToken(ctx context.Context, token string) (*api.LoginTokenData, error)
InsertNotification(ctx context.Context, localpart, eventID string, tweaks map[string]interface{}, n *api.Notification) error InsertNotification(ctx context.Context, localpart, eventID string, pos int64, tweaks map[string]interface{}, n *api.Notification) error
DeleteNotificationsUpTo(ctx context.Context, localpart, roomID, upToEventID string) (affected bool, err error) DeleteNotificationsUpTo(ctx context.Context, localpart, roomID string, pos int64) (affected bool, err error)
SetNotificationsRead(ctx context.Context, localpart, roomID, upToEventID string, b bool) (affected bool, err error) SetNotificationsRead(ctx context.Context, localpart, roomID string, pos int64, b bool) (affected bool, err error)
GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error) GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error)
GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error) GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error)
GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error) GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error)

View file

@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS userapi_notifications (
localpart TEXT NOT NULL, localpart TEXT NOT NULL,
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
stream_pos BIGINT NOT NULL,
ts_ms BIGINT NOT NULL, ts_ms BIGINT NOT NULL,
highlight BOOLEAN NOT NULL, highlight BOOLEAN NOT NULL,
notification_json TEXT NOT NULL, notification_json TEXT NOT NULL,
@ -54,17 +55,13 @@ CREATE INDEX IF NOT EXISTS userapi_notification_localpart_id_idx ON userapi_noti
` `
const insertNotificationSQL = "" + const insertNotificationSQL = "" +
"INSERT INTO userapi_notifications (localpart, room_id, event_id, ts_ms, highlight, notification_json) VALUES ($1, $2, $3, $4, $5, $6)" "INSERT INTO userapi_notifications (localpart, room_id, event_id, stream_pos, ts_ms, highlight, notification_json) VALUES ($1, $2, $3, $4, $5, $6, $7)"
const deleteNotificationsUpToSQL = "" + const deleteNotificationsUpToSQL = "" +
"DELETE FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND id <= (" + "DELETE FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND stream_pos <= $3"
"SELECT MAX(id) FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND event_id = $3" +
")"
const updateNotificationReadSQL = "" + const updateNotificationReadSQL = "" +
"UPDATE userapi_notifications SET read = $1 WHERE localpart = $2 AND room_id = $3 AND id <= (" + "UPDATE userapi_notifications SET read = $1 WHERE localpart = $2 AND room_id = $3 AND stream_pos <= $4 AND read <> $1"
"SELECT MAX(id) FROM userapi_notifications WHERE localpart = $2 AND room_id = $3 AND event_id = $4" +
") AND read <> $1"
const selectNotificationSQL = "" + const selectNotificationSQL = "" +
"SELECT id, room_id, ts_ms, read, notification_json FROM userapi_notifications WHERE localpart = $1 AND id > $2 AND (" + "SELECT id, room_id, ts_ms, read, notification_json FROM userapi_notifications WHERE localpart = $1 AND id > $2 AND (" +
@ -97,7 +94,7 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error)
} }
// Insert inserts a notification into the database. // Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, highlight bool, n *api.Notification) error { func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS roomID, tsMS := n.RoomID, n.TS
nn := *n nn := *n
// Clears out fields that have their own columns to (1) shrink the // Clears out fields that have their own columns to (1) shrink the
@ -108,13 +105,13 @@ func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, local
if err != nil { if err != nil {
return err return err
} }
_, err = sqlutil.TxStmt(txn, s.insertStmt).ExecContext(ctx, localpart, roomID, eventID, tsMS, highlight, string(bs)) _, err = sqlutil.TxStmt(txn, s.insertStmt).ExecContext(ctx, localpart, roomID, eventID, pos, tsMS, highlight, string(bs))
return err return err
} }
// DeleteUpTo deletes all previous notifications, up to and including the event. // DeleteUpTo deletes all previous notifications, up to and including the event.
func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string) (affected bool, _ error) { func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) {
res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, eventID) res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, pos)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -122,13 +119,13 @@ func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, l
if err != nil { if err != nil {
return true, err return true, err
} }
log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "event_id": eventID}).Tracef("DeleteUpTo: %d rows affected", nrows) log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("DeleteUpTo: %d rows affected", nrows)
return nrows > 0, nil return nrows > 0, nil
} }
// UpdateRead updates the "read" value for an event. // UpdateRead updates the "read" value for an event.
func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, v bool) (affected bool, _ error) { func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) {
res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, eventID) res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, pos)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -136,7 +133,7 @@ func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, l
if err != nil { if err != nil {
return true, err return true, err
} }
log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "event_id": eventID}).Tracef("UpdateRead: %d rows affected", nrows) log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("UpdateRead: %d rows affected", nrows)
return nrows > 0, nil return nrows > 0, nil
} }

View file

@ -671,23 +671,23 @@ func (d *Database) GetLoginTokenDataByToken(ctx context.Context, token string) (
return d.LoginTokens.SelectLoginToken(ctx, token) return d.LoginTokens.SelectLoginToken(ctx, token)
} }
func (d *Database) InsertNotification(ctx context.Context, localpart, eventID string, tweaks map[string]interface{}, n *api.Notification) error { func (d *Database) InsertNotification(ctx context.Context, localpart, eventID string, pos int64, tweaks map[string]interface{}, n *api.Notification) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.Notifications.Insert(ctx, txn, localpart, eventID, pushrules.BoolTweakOr(tweaks, pushrules.HighlightTweak, false), n) return d.Notifications.Insert(ctx, txn, localpart, eventID, pos, pushrules.BoolTweakOr(tweaks, pushrules.HighlightTweak, false), n)
}) })
} }
func (d *Database) DeleteNotificationsUpTo(ctx context.Context, localpart, roomID, upToEventID string) (affected bool, err error) { func (d *Database) DeleteNotificationsUpTo(ctx context.Context, localpart, roomID string, pos int64) (affected bool, err error) {
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
affected, err = d.Notifications.DeleteUpTo(ctx, txn, localpart, roomID, upToEventID) affected, err = d.Notifications.DeleteUpTo(ctx, txn, localpart, roomID, pos)
return err return err
}) })
return return
} }
func (d *Database) SetNotificationsRead(ctx context.Context, localpart, roomID, upToEventID string, b bool) (affected bool, err error) { func (d *Database) SetNotificationsRead(ctx context.Context, localpart, roomID string, pos int64, b bool) (affected bool, err error) {
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
affected, err = d.Notifications.UpdateRead(ctx, txn, localpart, roomID, upToEventID, b) affected, err = d.Notifications.UpdateRead(ctx, txn, localpart, roomID, pos, b)
return err return err
}) })
return return

View file

@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS userapi_notifications (
localpart TEXT NOT NULL, localpart TEXT NOT NULL,
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
stream_pos BIGINT NOT NULL,
ts_ms BIGINT NOT NULL, ts_ms BIGINT NOT NULL,
highlight BOOLEAN NOT NULL, highlight BOOLEAN NOT NULL,
notification_json TEXT NOT NULL, notification_json TEXT NOT NULL,
@ -54,17 +55,13 @@ CREATE INDEX IF NOT EXISTS userapi_notification_localpart_id_idx ON userapi_noti
` `
const insertNotificationSQL = "" + const insertNotificationSQL = "" +
"INSERT INTO userapi_notifications (localpart, room_id, event_id, ts_ms, highlight, notification_json) VALUES ($1, $2, $3, $4, $5, $6)" "INSERT INTO userapi_notifications (localpart, room_id, event_id, stream_pos, ts_ms, highlight, notification_json) VALUES ($1, $2, $3, $4, $5, $6, $7)"
const deleteNotificationsUpToSQL = "" + const deleteNotificationsUpToSQL = "" +
"DELETE FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND id <= (" + "DELETE FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND stream_pos <= $3"
"SELECT MAX(id) FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND event_id = $3" +
")"
const updateNotificationReadSQL = "" + const updateNotificationReadSQL = "" +
"UPDATE userapi_notifications SET read = $1 WHERE localpart = $2 AND room_id = $3 AND id <= (" + "UPDATE userapi_notifications SET read = $1 WHERE localpart = $2 AND room_id = $3 AND stream_pos <= $4 AND read <> $1"
"SELECT MAX(id) FROM userapi_notifications WHERE localpart = $2 AND room_id = $3 AND event_id = $4" +
") AND read <> $1"
const selectNotificationSQL = "" + const selectNotificationSQL = "" +
"SELECT id, room_id, ts_ms, read, notification_json FROM userapi_notifications WHERE localpart = $1 AND id > $2 AND (" + "SELECT id, room_id, ts_ms, read, notification_json FROM userapi_notifications WHERE localpart = $1 AND id > $2 AND (" +
@ -97,7 +94,7 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
} }
// Insert inserts a notification into the database. // Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, highlight bool, n *api.Notification) error { func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS roomID, tsMS := n.RoomID, n.TS
nn := *n nn := *n
// Clears out fields that have their own columns to (1) shrink the // Clears out fields that have their own columns to (1) shrink the
@ -108,13 +105,13 @@ func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, local
if err != nil { if err != nil {
return err return err
} }
_, err = sqlutil.TxStmt(txn, s.insertStmt).ExecContext(ctx, localpart, roomID, eventID, tsMS, highlight, string(bs)) _, err = sqlutil.TxStmt(txn, s.insertStmt).ExecContext(ctx, localpart, roomID, eventID, pos, tsMS, highlight, string(bs))
return err return err
} }
// DeleteUpTo deletes all previous notifications, up to and including the event. // DeleteUpTo deletes all previous notifications, up to and including the event.
func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string) (affected bool, _ error) { func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) {
res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, eventID) res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, pos)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -122,13 +119,13 @@ func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, l
if err != nil { if err != nil {
return true, err return true, err
} }
log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "event_id": eventID}).Tracef("DeleteUpTo: %d rows affected", nrows) log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("DeleteUpTo: %d rows affected", nrows)
return nrows > 0, nil return nrows > 0, nil
} }
// UpdateRead updates the "read" value for an event. // UpdateRead updates the "read" value for an event.
func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, v bool) (affected bool, _ error) { func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) {
res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, eventID) res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, pos)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -136,7 +133,7 @@ func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, l
if err != nil { if err != nil {
return true, err return true, err
} }
log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "event_id": eventID}).Tracef("UpdateRead: %d rows affected", nrows) log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("UpdateRead: %d rows affected", nrows)
return nrows > 0, nil return nrows > 0, nil
} }

View file

@ -103,9 +103,9 @@ type PusherTable interface {
} }
type NotificationTable interface { type NotificationTable interface {
Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, highlight bool, n *api.Notification) error Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error
DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string) (affected bool, _ error) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error)
UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, v bool) (affected bool, _ error) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error)
Select(ctx context.Context, txn *sql.Tx, localpart string, fromID int64, limit int, filter NotificationFilter) ([]*api.Notification, int64, error) Select(ctx context.Context, txn *sql.Tx, localpart string, fromID int64, limit int, filter NotificationFilter) ([]*api.Notification, int64, error)
SelectCount(ctx context.Context, txn *sql.Tx, localpart string, filter NotificationFilter) (int64, error) SelectCount(ctx context.Context, txn *sql.Tx, localpart string, filter NotificationFilter) (int64, error)
SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, _ error) SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, _ error)

View file

@ -72,25 +72,18 @@ func NewInternalAPI(
DisableTLSValidation: cfg.PushGatewayDisableTLSValidation, DisableTLSValidation: cfg.PushGatewayDisableTLSValidation,
} }
caConsumer := consumers.NewOutputClientDataConsumer( readConsumer := consumers.NewOutputReadUpdateConsumer(
base.ProcessContext, cfg, js, db, pgClient, userAPI, syncProducer, base.ProcessContext, cfg, js, db, pgClient, userAPI, syncProducer,
) )
if err := caConsumer.Start(); err != nil { if err := readConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start user API clientapi consumer") logrus.WithError(err).Panic("failed to start user API read update consumer")
} }
eduConsumer := consumers.NewOutputReceiptEventConsumer( eventConsumer := consumers.NewOutputStreamEventConsumer(
base.ProcessContext, cfg, js, db, pgClient, syncProducer,
)
if err := eduConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start user API EDU consumer")
}
rsConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, db, pgClient, userAPI, rsAPI, syncProducer, base.ProcessContext, cfg, js, db, pgClient, userAPI, rsAPI, syncProducer,
) )
if err := rsConsumer.Start(); err != nil { if err := eventConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start user API room server consumer") logrus.WithError(err).Panic("failed to start user API streamed event consumer")
} }
return userAPI return userAPI