diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 9b1d6b1a2..9ab90391d 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -30,7 +30,7 @@ type SyncAPIProducer struct { } // SendData sends account data to the sync API server -func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error { +func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error { m := &nats.Msg{ Subject: p.Topic, Header: nats.Header{}, @@ -38,8 +38,9 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string m.Header.Set(jetstream.UserID, userID) data := eventutil.AccountData{ - RoomID: roomID, - Type: dataType, + RoomID: roomID, + Type: dataType, + ReadMarker: readMarker, } var err error m.Data, err = json.Marshal(data) diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go index 03025f1da..d8e982690 100644 --- a/clientapi/routing/account_data.go +++ b/clientapi/routing/account_data.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/internal/eventutil" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/userapi/api" @@ -127,7 +128,7 @@ func SaveAccountData( } // TODO: user API should do this since it's account data - if err := syncProducer.SendData(userID, roomID, dataType); err != nil { + if err := syncProducer.SendData(userID, roomID, dataType, nil); err != nil { util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed") return jsonerror.InternalServerError() } @@ -138,11 +139,6 @@ func SaveAccountData( } } -type readMarkerJSON struct { - FullyRead string `json:"m.fully_read"` - Read string `json:"m.read"` -} - type fullyReadEvent struct { EventID string `json:"event_id"` } @@ -159,7 +155,7 @@ func SaveReadMarker( return *resErr } - var r readMarkerJSON + var r eventutil.ReadMarkerJSON resErr = httputil.UnmarshalJSONRequest(req, &r) if resErr != nil { return *resErr @@ -189,7 +185,7 @@ func SaveReadMarker( return util.ErrorResponse(err) } - if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read"); err != nil { + if err := syncProducer.SendData(device.UserID, roomID, "m.fully_read", &r); err != nil { util.GetLogger(req.Context()).WithError(err).Error("syncProducer.SendData failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/room_tagging.go b/clientapi/routing/room_tagging.go index c683cc949..83294b180 100644 --- a/clientapi/routing/room_tagging.go +++ b/clientapi/routing/room_tagging.go @@ -98,7 +98,7 @@ func PutTag( return jsonerror.InternalServerError() } - if err = syncProducer.SendData(userID, roomID, "m.tag"); err != nil { + if err = syncProducer.SendData(userID, roomID, "m.tag", nil); err != nil { logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi") } @@ -151,7 +151,7 @@ func DeleteTag( } // TODO: user API should do this since it's account data - if err := syncProducer.SendData(userID, roomID, "m.tag"); err != nil { + if err := syncProducer.SendData(userID, roomID, "m.tag", nil); err != nil { logrus.WithError(err).Error("Failed to send m.tag account data update to syncapi") } diff --git a/internal/eventutil/types.go b/internal/eventutil/types.go index d0405a515..17861d6c5 100644 --- a/internal/eventutil/types.go +++ b/internal/eventutil/types.go @@ -26,8 +26,14 @@ var ErrProfileNoExists = errors.New("no known profile for given user ID") // AccountData represents account data sent from the client API server to the // sync API server type AccountData struct { - RoomID string `json:"room_id"` - Type string `json:"type"` + RoomID string `json:"room_id"` + Type string `json:"type"` + ReadMarker *ReadMarkerJSON `json:"read_marker,omitempty"` // optional +} + +type ReadMarkerJSON struct { + FullyRead string `json:"m.fully_read"` + Read string `json:"m.read"` } // NotificationData contains statistics about notifications, sent from diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 418e019f4..3f07488f9 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -20,6 +20,8 @@ var ( OutputClientData = "OutputClientData" OutputNotificationData = "OutputNotificationData" OutputReceiptEvent = "OutputReceiptEvent" + OutputStreamEvent = "OutputStreamEvent" + OutputReadUpdate = "OutputReadUpdate" ) var streams = []*nats.StreamConfig{ @@ -64,4 +66,14 @@ var streams = []*nats.StreamConfig{ Retention: nats.InterestPolicy, Storage: nats.FileStorage, }, + { + Name: OutputStreamEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputReadUpdate, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, } diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index c3650085f..f01afce6d 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -17,6 +17,7 @@ package consumers import ( "context" "encoding/json" + "fmt" "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/internal/eventutil" @@ -24,21 +25,26 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) // OutputClientDataConsumer consumes events that originated in the client API server. type OutputClientDataConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - topic string - db storage.Database - stream types.StreamProvider - notifier *notifier.Notifier + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier + serverName gomatrixserverlib.ServerName + producer *producers.UserAPIReadProducer } // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. @@ -49,15 +55,18 @@ func NewOutputClientDataConsumer( store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, + producer *producers.UserAPIReadProducer, ) *OutputClientDataConsumer { return &OutputClientDataConsumer{ - ctx: process.Context(), - jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), - durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), - db: store, - notifier: notifier, - stream: stream, + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), + db: store, + notifier: notifier, + stream: stream, + serverName: cfg.Matrix.ServerName, + producer: producer, } } @@ -100,8 +109,48 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) }).Panicf("could not save account data") } + if err = s.sendReadUpdate(ctx, userID, output); err != nil { + log.WithError(err).WithFields(logrus.Fields{ + "user_id": userID, + "room_id": output.RoomID, + }).Errorf("Failed to generate read update") + sentry.CaptureException(err) + return false + } + s.stream.Advance(streamPos) s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos}) return true } + +func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID string, output eventutil.AccountData) error { + if output.Type != "m.fully_read" || output.ReadMarker == nil { + return nil + } + _, serverName, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return fmt.Errorf("gomatrixserverlib.SplitID: %w", err) + } + if serverName != s.serverName { + return nil + } + var readPos types.StreamPosition + var fullyReadPos types.StreamPosition + if output.ReadMarker.Read != "" { + if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil { + return fmt.Errorf("s.db.PositionInTopology (Read): %w", err) + } + } + if output.ReadMarker.FullyRead != "" { + if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil { + return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err) + } + } + if readPos > 0 || fullyReadPos > 0 { + if err := s.producer.SendReadUpdate(userID, output.RoomID, readPos, fullyReadPos); err != nil { + return fmt.Errorf("s.producer.SendReadUpdate: %w", err) + } + } + return nil +} diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 392840ece..881583449 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -17,6 +17,7 @@ package consumers import ( "context" "encoding/json" + "fmt" "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/eduserver/api" @@ -24,21 +25,26 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) // OutputReceiptEventConsumer consumes events that originated in the EDU server. type OutputReceiptEventConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - topic string - db storage.Database - stream types.StreamProvider - notifier *notifier.Notifier + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier + serverName gomatrixserverlib.ServerName + producer *producers.UserAPIReadProducer } // NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer. @@ -50,15 +56,18 @@ func NewOutputReceiptEventConsumer( store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, + producer *producers.UserAPIReadProducer, ) *OutputReceiptEventConsumer { return &OutputReceiptEventConsumer{ - ctx: process.Context(), - jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), - db: store, - notifier: notifier, - stream: stream, + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), + db: store, + notifier: notifier, + stream: stream, + serverName: cfg.Matrix.ServerName, + producer: producer, } } @@ -92,8 +101,42 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms return true } + if err = s.sendReadUpdate(ctx, output); err != nil { + log.WithError(err).WithFields(logrus.Fields{ + "user_id": output.UserID, + "room_id": output.RoomID, + }).Errorf("Failed to generate read update") + sentry.CaptureException(err) + return false + } + s.stream.Advance(streamPos) s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) return true } + +func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output api.OutputReceiptEvent) error { + if output.Type != "m.read" { + return nil + } + _, serverName, err := gomatrixserverlib.SplitID('@', output.UserID) + if err != nil { + return fmt.Errorf("gomatrixserverlib.SplitID: %w", err) + } + if serverName != s.serverName { + return nil + } + var readPos types.StreamPosition + if output.EventID != "" { + if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil { + return fmt.Errorf("s.db.PositionInTopology (Read): %w", err) + } + } + if readPos > 0 { + if err := s.producer.SendReadUpdate(output.UserID, output.RoomID, readPos, 0); err != nil { + return fmt.Errorf("s.producer.SendReadUpdate: %w", err) + } + } + return nil +} diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 15485bb35..159657f9f 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -45,6 +46,7 @@ type OutputRoomEventConsumer struct { pduStream types.StreamProvider inviteStream types.StreamProvider notifier *notifier.Notifier + producer *producers.UserAPIStreamEventProducer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. @@ -57,6 +59,7 @@ func NewOutputRoomEventConsumer( pduStream types.StreamProvider, inviteStream types.StreamProvider, rsAPI api.RoomserverInternalAPI, + producer *producers.UserAPIStreamEventProducer, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ ctx: process.Context(), @@ -69,6 +72,7 @@ func NewOutputRoomEventConsumer( pduStream: pduStream, inviteStream: inviteStream, rsAPI: rsAPI, + producer: producer, } } @@ -194,6 +198,12 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( return nil } + if err = s.producer.SendStreamEvent(ev.RoomID(), ev, pduPos); err != nil { + log.WithError(err).Errorf("Failed to send stream output event for event %s", ev.EventID()) + sentry.CaptureException(err) + return err + } + if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) sentry.CaptureException(err) diff --git a/syncapi/consumers/pushserver.go b/syncapi/consumers/userapi.go similarity index 95% rename from syncapi/consumers/pushserver.go rename to syncapi/consumers/userapi.go index 76da03274..a3b2dd53d 100644 --- a/syncapi/consumers/pushserver.go +++ b/syncapi/consumers/userapi.go @@ -83,7 +83,7 @@ func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msg *nat var data eventutil.NotificationData if err := json.Unmarshal(msg.Data, &data); err != nil { sentry.CaptureException(err) - log.WithField("user_id", userID).WithError(err).Error("push server consumer: message parse failure") + log.WithField("user_id", userID).WithError(err).Error("user API consumer: message parse failure") return true } @@ -104,7 +104,7 @@ func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msg *nat "user_id": userID, "room_id": data.RoomID, "streamPos": streamPos, - }).Info("Received data from Push server") + }).Trace("Received notification data from user API") return true } diff --git a/syncapi/producers/userapi_readupdate.go b/syncapi/producers/userapi_readupdate.go new file mode 100644 index 000000000..d56cab776 --- /dev/null +++ b/syncapi/producers/userapi_readupdate.go @@ -0,0 +1,62 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +// UserAPIProducer produces events for the user API server to consume +type UserAPIReadProducer struct { + Topic string + JetStream nats.JetStreamContext +} + +// SendData sends account data to the user API server +func (p *UserAPIReadProducer) SendReadUpdate(userID, roomID string, readPos, fullyReadPos types.StreamPosition) error { + m := &nats.Msg{ + Subject: p.Topic, + Header: nats.Header{}, + } + m.Header.Set(jetstream.UserID, userID) + m.Header.Set(jetstream.RoomID, roomID) + + data := types.ReadUpdate{ + UserID: userID, + RoomID: roomID, + Read: readPos, + FullyRead: fullyReadPos, + } + var err error + m.Data, err = json.Marshal(data) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "user_id": userID, + "room_id": roomID, + "read_pos": readPos, + "fully_read_pos": fullyReadPos, + }).Tracef("Producing to topic '%s'", p.Topic) + + _, err = p.JetStream.PublishMsg(m) + return err +} diff --git a/syncapi/producers/userapi_streamevent.go b/syncapi/producers/userapi_streamevent.go new file mode 100644 index 000000000..2bbd19c0b --- /dev/null +++ b/syncapi/producers/userapi_streamevent.go @@ -0,0 +1,60 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +// UserAPIProducer produces events for the user API server to consume +type UserAPIStreamEventProducer struct { + Topic string + JetStream nats.JetStreamContext +} + +// SendData sends account data to the user API server +func (p *UserAPIStreamEventProducer) SendStreamEvent(roomID string, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) error { + m := &nats.Msg{ + Subject: p.Topic, + Header: nats.Header{}, + } + m.Header.Set(jetstream.RoomID, roomID) + + data := types.StreamedEvent{ + Event: event, + StreamPosition: pos, + } + var err error + m.Data, err = json.Marshal(data) + if err != nil { + return err + } + + log.WithFields(log.Fields{ + "room_id": roomID, + "event_id": event.EventID(), + "event_type": event.Type(), + "stream_pos": pos, + }).Tracef("Producing to topic '%s'", p.Topic) + + _, err = p.JetStream.PublishMsg(m) + return err +} diff --git a/syncapi/storage/postgres/notification_data_table.go b/syncapi/storage/postgres/notification_data_table.go index 4f2f0f3a5..f3fc4451f 100644 --- a/syncapi/storage/postgres/notification_data_table.go +++ b/syncapi/storage/postgres/notification_data_table.go @@ -15,12 +15,35 @@ package postgres import ( + "context" "database/sql" - "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/dendrite/syncapi/types" ) +func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { + _, err := db.Exec(notificationDataSchema) + if err != nil { + return nil, err + } + r := ¬ificationDataStatements{} + return r, sqlutil.StatementList{ + {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL}, + {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL}, + {&r.selectMaxID, selectMaxNotificationIDSQL}, + }.Prepare(db) +} + +type notificationDataStatements struct { + upsertRoomUnreadCounts *sql.Stmt + selectUserUnreadCounts *sql.Stmt + selectMaxID *sql.Stmt +} + const notificationDataSchema = ` CREATE TABLE IF NOT EXISTS syncapi_notification_data ( id BIGSERIAL PRIMARY KEY, @@ -31,10 +54,55 @@ CREATE TABLE IF NOT EXISTS syncapi_notification_data ( CONSTRAINT syncapi_notification_data_unique UNIQUE (user_id, room_id) );` -func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { - _, err := db.Exec(notificationDataSchema) +const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data + (user_id, room_id, notification_count, highlight_count) + VALUES ($1, $2, $3, $4) + ON CONFLICT (user_id, room_id) + DO UPDATE SET notification_count = $3, highlight_count = $4 + RETURNING id` + +const selectUserUnreadNotificationCountsSQL = `SELECT + id, room_id, notification_count, highlight_count + FROM syncapi_notification_data + WHERE + user_id = $1 AND + id BETWEEN $2 + 1 AND $3` + +const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data` + +func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) { + err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos) + return +} + +func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) { + rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl) if err != nil { return nil, err } - return shared.NewNotificationDataTable(db) + defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed") + + roomCounts := map[string]*eventutil.NotificationData{} + for rows.Next() { + var id types.StreamPosition + var roomID string + var notificationCount, highlightCount int + + if err = rows.Scan(&id, &roomID, ¬ificationCount, &highlightCount); err != nil { + return nil, err + } + + roomCounts[roomID] = &eventutil.NotificationData{ + RoomID: roomID, + UnreadNotificationCount: notificationCount, + UnreadHighlightCount: highlightCount, + } + } + return roomCounts, rows.Err() +} + +func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) { + var id int64 + err := r.selectMaxID.QueryRowContext(ctx).Scan(&id) + return id, err } diff --git a/syncapi/storage/shared/notification_data_table.go b/syncapi/storage/shared/notification_data_table.go deleted file mode 100644 index 8b9d341b3..000000000 --- a/syncapi/storage/shared/notification_data_table.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package shared - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/eventutil" - "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/syncapi/storage/tables" - "github.com/matrix-org/dendrite/syncapi/types" -) - -type notificationDataStatements struct { - upsertRoomUnreadCounts *sql.Stmt - selectUserUnreadCounts *sql.Stmt - selectMaxID *sql.Stmt -} - -func NewNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { - r := ¬ificationDataStatements{} - return r, sqlutil.StatementList{ - {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL}, - {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL}, - {&r.selectMaxID, selectMaxNotificationIDSQL}, - }.Prepare(db) -} - -const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data - (user_id, room_id, notification_count, highlight_count) - VALUES ($1, $2, $3, $4) - ON CONFLICT (user_id, room_id) - DO UPDATE SET notification_count = $3, highlight_count = $4 - RETURNING id` - -func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) { - err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos) - return -} - -const selectUserUnreadNotificationCountsSQL = `SELECT - id, room_id, notification_count, highlight_count - FROM syncapi_notification_data - WHERE - user_id = $1 AND - id BETWEEN $2 + 1 AND $3` - -func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) { - rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl) - if err != nil { - return nil, err - } - defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed") - - roomCounts := map[string]*eventutil.NotificationData{} - for rows.Next() { - var id types.StreamPosition - var roomID string - var notificationCount, highlightCount int - - if err = rows.Scan(&id, &roomID, ¬ificationCount, &highlightCount); err != nil { - return nil, err - } - - roomCounts[roomID] = &eventutil.NotificationData{ - RoomID: roomID, - UnreadNotificationCount: notificationCount, - UnreadHighlightCount: highlightCount, - } - } - return roomCounts, rows.Err() -} - -const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data` - -func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) { - var id int64 - err := r.selectMaxID.QueryRowContext(ctx).Scan(&id) - return id, err -} diff --git a/syncapi/storage/sqlite3/notification_data_table.go b/syncapi/storage/sqlite3/notification_data_table.go index 36292e23e..4b3f074db 100644 --- a/syncapi/storage/sqlite3/notification_data_table.go +++ b/syncapi/storage/sqlite3/notification_data_table.go @@ -15,12 +15,35 @@ package sqlite3 import ( + "context" "database/sql" - "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/dendrite/syncapi/types" ) +func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { + _, err := db.Exec(notificationDataSchema) + if err != nil { + return nil, err + } + r := ¬ificationDataStatements{} + return r, sqlutil.StatementList{ + {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL}, + {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL}, + {&r.selectMaxID, selectMaxNotificationIDSQL}, + }.Prepare(db) +} + +type notificationDataStatements struct { + upsertRoomUnreadCounts *sql.Stmt + selectUserUnreadCounts *sql.Stmt + selectMaxID *sql.Stmt +} + const notificationDataSchema = ` CREATE TABLE IF NOT EXISTS syncapi_notification_data ( id INTEGER PRIMARY KEY, @@ -31,10 +54,55 @@ CREATE TABLE IF NOT EXISTS syncapi_notification_data ( CONSTRAINT syncapi_notifications_unique UNIQUE (user_id, room_id) );` -func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) { - _, err := db.Exec(notificationDataSchema) +const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data + (user_id, room_id, notification_count, highlight_count) + VALUES ($1, $2, $3, $4) + ON CONFLICT (user_id, room_id) + DO UPDATE SET notification_count = $3, highlight_count = $4 + RETURNING id` + +const selectUserUnreadNotificationCountsSQL = `SELECT + id, room_id, notification_count, highlight_count + FROM syncapi_notification_data + WHERE + user_id = $1 AND + id BETWEEN $2 + 1 AND $3` + +const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data` + +func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) { + err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos) + return +} + +func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) { + rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl) if err != nil { return nil, err } - return shared.NewNotificationDataTable(db) + defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed") + + roomCounts := map[string]*eventutil.NotificationData{} + for rows.Next() { + var id types.StreamPosition + var roomID string + var notificationCount, highlightCount int + + if err = rows.Scan(&id, &roomID, ¬ificationCount, &highlightCount); err != nil { + return nil, err + } + + roomCounts[roomID] = &eventutil.NotificationData{ + RoomID: roomID, + UnreadNotificationCount: notificationCount, + UnreadHighlightCount: highlightCount, + } + } + return roomCounts, rows.Err() +} + +func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) { + var id int64 + err := r.selectMaxID.QueryRowContext(ctx).Scan(&id) + return id, err } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 581ee6928..1b256f91a 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -62,16 +62,19 @@ const selectEventsSQL = "" + const selectRecentEventsSQL = "" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectRecentEventsForSyncSQL = "" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectEarlyEventsSQL = "" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectMaxEventIDSQL = "" + @@ -85,6 +88,7 @@ const selectStateInRangeSQL = "" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2)" + " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const deleteEventsForRoomSQL = "" + @@ -95,10 +99,12 @@ const selectContextEventSQL = "" + const selectContextBeforeEventSQL = "" + "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters const selectContextAfterEventSQL = "" + "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters type outputRoomEventsStatements struct { diff --git a/syncapi/streams/stream_notificationdata.go b/syncapi/streams/stream_notificationdata.go index c2ad0823d..8ba9e07ca 100644 --- a/syncapi/streams/stream_notificationdata.go +++ b/syncapi/streams/stream_notificationdata.go @@ -33,7 +33,7 @@ func (p *NotificationDataStreamProvider) IncrementalSync( from, to types.StreamPosition, ) types.StreamPosition { // We want counts for all possible rooms, so always start from zero. - countsByRoom, err := p.DB.GetUserUnreadNotificationCounts(ctx, req.Device.UserID, 0, to) + countsByRoom, err := p.DB.GetUserUnreadNotificationCounts(ctx, req.Device.UserID, from, to) if err != nil { req.Log.WithError(err).Error("GetUserUnreadNotificationCounts failed") return from diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 3462fed04..cb9890ff7 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" @@ -64,6 +65,18 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) + userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ + JetStream: js, + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), + } + + userAPIReadUpdateProducer := &producers.UserAPIReadProducer{ + JetStream: js, + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), + } + + _ = userAPIReadUpdateProducer + keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), js, keyAPI, rsAPI, syncDB, notifier, @@ -75,7 +88,7 @@ func AddPublicRoutes( roomConsumer := consumers.NewOutputRoomEventConsumer( process, cfg, js, syncDB, notifier, streams.PDUStreamProvider, - streams.InviteStreamProvider, rsAPI, + streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") @@ -83,6 +96,7 @@ func AddPublicRoutes( clientConsumer := consumers.NewOutputClientDataConsumer( process, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, + userAPIReadUpdateProducer, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") @@ -111,6 +125,7 @@ func AddPublicRoutes( receiptConsumer := consumers.NewOutputReceiptEventConsumer( process, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, + userAPIReadUpdateProducer, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start receipts consumer") diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 540f3f0f9..4150e6c98 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -474,3 +474,16 @@ type Peek struct { New bool Deleted bool } + +type ReadUpdate struct { + UserID string `json:"user_id"` + RoomID string `json:"room_id"` + Read StreamPosition `json:"read,omitempty"` + FullyRead StreamPosition `json:"fully_read,omitempty"` +} + +// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event. +type StreamedEvent struct { + Event *gomatrixserverlib.HeaderedEvent `json:"event"` + StreamPosition StreamPosition `json:"stream_position"` +} diff --git a/sytest-blacklist b/sytest-blacklist index e8617dcdf..7f518b21a 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -30,3 +30,9 @@ Local device key changes appear in /keys/changes Remove group category Remove group role +# Flakey +AS-ghosted users can use rooms themselves + +# Flakey, need additional investigation +Messages that notify from another user increment notification_count +Messages that highlight from another user increment unread highlight count diff --git a/sytest-whitelist b/sytest-whitelist index 42260b33c..602f86465 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -78,7 +78,7 @@ Room creation reports m.room.member to myself Outbound federation rejects send_join responses with no m.room.create event Outbound federation rejects m.room.create events with an unknown room version Invited user can see room metadata -# Blacklisted because these tests call /r0/events which we don't implement +# Blacklisted because these tests call /v3/events which we don't implement # New room members see their own join event # Existing members see new members' join events setting 'm.room.power_levels' respects room powerlevel @@ -257,7 +257,7 @@ Guest non-joined users cannot send messages to guest_access rooms if not joined Real non-joined users cannot room initalSync for non-world_readable rooms Push rules come down in an initial /sync Regular users can add and delete aliases in the default room configuration -GET /r0/capabilities is not public +GET /v3/capabilities is not public GET /joined_rooms lists newly-created room /joined_rooms returns only joined rooms Message history can be paginated over federation @@ -366,8 +366,8 @@ Outbound federation will ignore a missing event with bad JSON for room version 6 Server correctly handles transactions that break edu limits Server rejects invalid JSON in a version 6 room Can download without a file name over federation -POST /media/r0/upload can create an upload -GET /media/r0/download can fetch the value again +POST /media/v3/upload can create an upload +GET /media/v3/download can fetch the value again Remote users can join room by alias Alias creators can delete alias with no ops Alias creators can delete canonical alias with no ops @@ -603,7 +603,6 @@ Can add global push rule after an existing rule Can delete a push rule Can disable a push rule Adding the same push rule twice is idempotent -Messages that notify from another user increment notification_count Can change the actions of default rules Can change the actions of a user specified rule Adding a push rule wakes up an incremental /sync @@ -642,7 +641,6 @@ Test that rejected pushers are removed. Notifications can be viewed with GET /notifications Trying to add push rule with no scope fails with 400 Trying to add push rule with invalid scope fails with 400 -Messages that highlight from another user increment unread highlight count Forward extremities remain so even after the next events are populated as outliers If a device list update goes missing, the server resyncs on the next one uploading self-signing key notifies over federation @@ -662,4 +660,4 @@ registration accepts non-ascii passwords registration with inhibit_login inhibits login The operation must be consistent through an interactive authentication session Multiple calls to /sync should not cause 500 errors - +/context/ with lazy_load_members filter works diff --git a/userapi/consumers/clientapi.go b/userapi/consumers/clientapi.go deleted file mode 100644 index 60936502e..000000000 --- a/userapi/consumers/clientapi.go +++ /dev/null @@ -1,191 +0,0 @@ -package consumers - -import ( - "context" - "encoding/json" - - "github.com/matrix-org/dendrite/internal/eventutil" - "github.com/matrix-org/dendrite/internal/pushgateway" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" - uapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/dendrite/userapi/producers" - "github.com/matrix-org/dendrite/userapi/storage" - "github.com/matrix-org/dendrite/userapi/util" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - log "github.com/sirupsen/logrus" -) - -type OutputClientDataConsumer struct { - ctx context.Context - cfg *config.UserAPI - jetstream nats.JetStreamContext - durable string - db storage.Database - pgClient pushgateway.Client - ServerName gomatrixserverlib.ServerName - topic string - userAPI uapi.UserInternalAPI - syncProducer *producers.SyncAPI -} - -func NewOutputClientDataConsumer( - process *process.ProcessContext, - cfg *config.UserAPI, - js nats.JetStreamContext, - store storage.Database, - pgClient pushgateway.Client, - userAPI uapi.UserInternalAPI, - syncProducer *producers.SyncAPI, -) *OutputClientDataConsumer { - return &OutputClientDataConsumer{ - ctx: process.Context(), - cfg: cfg, - jetstream: js, - db: store, - ServerName: cfg.Matrix.ServerName, - durable: cfg.Matrix.JetStream.Durable("UserAPIClientAPIConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), - pgClient: pgClient, - userAPI: userAPI, - syncProducer: syncProducer, - } -} - -func (s *OutputClientDataConsumer) Start() error { - if err := jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, - nats.DeliverAll(), nats.ManualAck(), - ); err != nil { - return err - } - return nil -} - -func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var event eventutil.AccountData - if err := json.Unmarshal(msg.Data, &event); err != nil { - log.WithError(err).Error("pushserver clientapi consumer: message parse failure") - return true - } - - if event.Type != mFullyRead { - return true - } - - userID := string(msg.Header.Get("user_id")) - localpart, domain, err := gomatrixserverlib.SplitID('@', userID) - if err != nil { - log.WithFields(log.Fields{ - "user_id": userID, - "room_id": event.RoomID, - "event_type": event.Type, - }).WithError(err).Error("pushserver clientapi consumer: SplitID failure") - return true - } - - if domain != s.ServerName { - log.WithFields(log.Fields{ - "user_id": userID, - "room_id": event.RoomID, - "event_type": event.Type, - }).Error("pushserver clientapi consumer: not a local user") - return true - } - - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - "event_type": event.Type, - }).Tracef("Received message from clientapi: %#v", event) - - userReq := uapi.QueryAccountDataRequest{ - UserID: userID, - RoomID: event.RoomID, - DataType: mFullyRead, - } - var userRes uapi.QueryAccountDataResponse - if err = s.userAPI.QueryAccountData(ctx, &userReq, &userRes); err != nil { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - "event_type": event.Type, - }).WithError(err).Error("pushserver clientapi consumer: failed to query account data") - return false - } - ad, ok := userRes.RoomAccountData[event.RoomID] - if !ok { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - }).Errorf("pushserver clientapi consumer: room not found in account data response: %#v", userRes.RoomAccountData) - return true - } - bs, ok := ad[mFullyRead] - if !ok { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - }).Errorf("pushserver clientapi consumer: m.fully_read not found in account data: %#v", ad) - return true - } - var data fullyReadAccountData - if err = json.Unmarshal([]byte(bs), &data); err != nil { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - }).WithError(err).Error("pushserver clientapi consumer: json.Unmarshal of m.fully_read failed") - return true - } - - // TODO: we cannot know if this EventID caused a notification, so - // we should first resolve it and find the closest earlier - // notification. - deleted, err := s.db.DeleteNotificationsUpTo(ctx, localpart, event.RoomID, data.EventID) - if err != nil { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - "event_id": data.EventID, - }).WithError(err).Errorf("pushserver clientapi consumer: DeleteNotificationsUpTo failed") - return false - } - - if deleted { - if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - "event_id": data.EventID, - }).WithError(err).Error("pushserver clientapi consumer: NotifyUserCounts failed") - return false - } - - if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, event.RoomID); err != nil { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - "event_id": data.EventID, - }).WithError(err).Errorf("pushserver clientapi consumer: GetAndSendNotificationData failed") - return false - } - } - - return true -} - -// mFullyRead is the account data type for the marker for the event up -// to which the user has read. -const mFullyRead = "m.fully_read" - -// A fullyReadAccountData is what the m.fully_read account data value -// contains. -// -// TODO: this is duplicated with -// clientapi/routing/account_data.go. Should probably move to -// eventutil. -type fullyReadAccountData struct { - EventID string `json:"event_id"` -} diff --git a/userapi/consumers/eduserver.go b/userapi/consumers/eduserver.go deleted file mode 100644 index 941c36b11..000000000 --- a/userapi/consumers/eduserver.go +++ /dev/null @@ -1,119 +0,0 @@ -package consumers - -import ( - "context" - "encoding/json" - - eduapi "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/internal/pushgateway" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/dendrite/userapi/producers" - "github.com/matrix-org/dendrite/userapi/storage" - "github.com/matrix-org/dendrite/userapi/util" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - log "github.com/sirupsen/logrus" -) - -type OutputReceiptEventConsumer struct { - ctx context.Context - cfg *config.UserAPI - jetstream nats.JetStreamContext - durable string - db storage.Database - pgClient pushgateway.Client - receiptTopic string - syncProducer *producers.SyncAPI -} - -// NewOutputReceiptEventConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers. -func NewOutputReceiptEventConsumer( - process *process.ProcessContext, - cfg *config.UserAPI, - js nats.JetStreamContext, - store storage.Database, - pgClient pushgateway.Client, - syncProducer *producers.SyncAPI, -) *OutputReceiptEventConsumer { - return &OutputReceiptEventConsumer{ - ctx: process.Context(), - cfg: cfg, - jetstream: js, - db: store, - durable: cfg.Matrix.JetStream.Durable("UserAPIEDUServerConsumer"), - receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), - pgClient: pgClient, - syncProducer: syncProducer, - } -} - -func (s *OutputReceiptEventConsumer) Start() error { - if err := jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.receiptTopic, s.durable, s.onMessage, - nats.DeliverAll(), nats.ManualAck(), - ); err != nil { - return err - } - return nil -} - -func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var event eduapi.OutputReceiptEvent - if err := json.Unmarshal(msg.Data, &event); err != nil { - log.WithError(err).Errorf("pushserver EDU consumer: message parse failure") - return true - } - - localpart, domain, err := gomatrixserverlib.SplitID('@', event.UserID) - if err != nil { - return true - } - if domain != s.cfg.Matrix.ServerName { - return true - } - - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - "event_id": event.EventID, - "event_type": event.Type, - }).Tracef("Received message from EDU server: %#v", event) - - // TODO: we cannot know if this EventID caused a notification, so - // we should first resolve it and find the closest earlier - // notification. - updated, err := s.db.SetNotificationsRead(ctx, localpart, event.RoomID, event.EventID, true) - if err != nil { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - "event_id": event.EventID, - }).WithError(err).Error("pushserver EDU consumer") - return false - } - - if updated { - if err := s.syncProducer.GetAndSendNotificationData(ctx, event.UserID, event.RoomID); err != nil { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - "event_id": event.EventID, - }).WithError(err).Error("pushserver EDU consumer: GetAndSendNotificationData failed") - return false - } - - if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { - log.WithFields(log.Fields{ - "localpart": localpart, - "room_id": event.RoomID, - "event_id": event.EventID, - }).WithError(err).Error("pushserver EDU consumer: NotifyUserCounts failed") - return false - } - - } - - return true -} diff --git a/userapi/consumers/roomserver_test.go b/userapi/consumers/roomserver_test.go deleted file mode 100644 index 4c3920024..000000000 --- a/userapi/consumers/roomserver_test.go +++ /dev/null @@ -1,256 +0,0 @@ -package consumers - -import ( - "context" - "encoding/json" - "sync" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/matrix-org/dendrite/internal/pushgateway" - "github.com/matrix-org/dendrite/internal/pushrules" - rsapi "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/dendrite/userapi/producers" - "github.com/matrix-org/dendrite/userapi/storage" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" -) - -const serverName = gomatrixserverlib.ServerName("example.org") - -func TestOutputRoomEventConsumer(t *testing.T) { - t.SkipNow() // TODO: Come back to this test! - - ctx := context.Background() - - dbopts := &config.DatabaseOptions{ - ConnectionString: "file::memory:", - MaxOpenConnections: 1, - MaxIdleConnections: 1, - } - db, err := storage.NewDatabase(dbopts, serverName, 5, 0, 0) - if err != nil { - t.Fatalf("NewDatabase failed: %v", err) - } - err = db.UpsertPusher(ctx, - api.Pusher{ - PushKey: "apushkey", - Kind: api.HTTPKind, - AppID: "anappid", - Data: map[string]interface{}{ - "url": "http://example.org/pusher/notify", - "extra": "someextra", - }, - }, - "alice") - if err != nil { - t.Fatalf("UpsertPusher failed: %v", err) - } - - var rsAPI fakeRoomServerInternalAPI - var userAPI fakeUserInternalAPI - var messageSender fakeMessageSender - var wg sync.WaitGroup - wg.Add(1) - pgClient := fakePushGatewayClient{ - WG: &wg, - } - s := &OutputRoomEventConsumer{ - cfg: &config.UserAPI{ - Matrix: &config.Global{ - ServerName: serverName, - }, - }, - db: db, - rsAPI: &rsAPI, - userAPI: &userAPI, - pgClient: &pgClient, - syncProducer: producers.NewSyncAPI(db, &messageSender, "clientDataTopic", "notificationDataTopic"), - } - - event, err := gomatrixserverlib.NewEventFromTrustedJSONWithEventID("$143273582443PhrSn:example.org", []byte(`{ - "content": { - "body": "This is an example text message", - "format": "org.matrix.custom.html", - "formatted_body": "\u003cb\u003eThis is an example text message\u003c/b\u003e", - "msgtype": "m.text" - }, - "origin_server_ts": 1432735824653, - "room_id": "!jEsUZKDJdhlrceRyVU:example.org", - "sender": "@example:example.org", - "type": "m.room.message", - "unsigned": { - "age": 1234 - } -}`), false, gomatrixserverlib.RoomVersionV7) - if err != nil { - t.Fatalf("NewEventFromTrustedJSON failed: %v", err) - } - - ev := &gomatrixserverlib.HeaderedEvent{ - Event: event, - } - if err := s.processMessage(ctx, ev); err != nil { - t.Fatalf("processMessage failed: %v", err) - } - - t.Log("Waiting for backend calls to finish.") - wg.Wait() - - if diff := cmp.Diff([]*rsapi.QueryMembershipsForRoomRequest{{JoinedOnly: true, RoomID: "!jEsUZKDJdhlrceRyVU:example.org"}}, rsAPI.MembershipReqs); diff != "" { - t.Errorf("rsAPI.QueryMembershipsForRoom Reqs: +got -want:\n%s", diff) - } - if diff := cmp.Diff([]*pushgateway.NotifyRequest{{ - Notification: pushgateway.Notification{ - Type: "m.room.message", - Content: event.Content(), - Counts: &pushgateway.Counts{ - Unread: 1, - }, - Devices: []*pushgateway.Device{{ - AppID: "anappid", - PushKey: "apushkey", - Data: map[string]interface{}{ - "extra": "someextra", - }, - }}, - EventID: "$143273582443PhrSn:example.org", - ID: "$143273582443PhrSn:example.org", - RoomID: "!jEsUZKDJdhlrceRyVU:example.org", - RoomName: "aname", - Sender: "@example:example.org", - }, - }}, pgClient.Reqs); diff != "" { - t.Errorf("pgClient.NotifyHTTP Reqs: +got -want:\n%s", diff) - } - msg := &nats.Msg{ - Subject: "notificationDataTopic", - Header: nats.Header{}, - Data: []byte(`{"room_id":"!jEsUZKDJdhlrceRyVU:example.org","unread_highlight_count":0,"unread_notification_count":1}`), - } - msg.Header.Set("user_id", "@alice:example.org") - if diff := cmp.Diff([]*nats.Msg{msg}, messageSender.Messages, cmpopts.IgnoreUnexported(nats.Msg{})); diff != "" { - t.Errorf("SendMessage Messages: +got -want:\n%s", diff) - } -} - -type fakeRoomServerInternalAPI struct { - rsapi.RoomserverInternalAPI - - MembershipReqs []*rsapi.QueryMembershipsForRoomRequest -} - -func (s *fakeRoomServerInternalAPI) QueryCurrentState( - ctx context.Context, - req *rsapi.QueryCurrentStateRequest, - res *rsapi.QueryCurrentStateResponse, -) error { - *res = rsapi.QueryCurrentStateResponse{ - StateEvents: map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent{ - roomNameTuple: mustParseHeaderedEventJSON(`{ - "_room_version": "7", - "content": { - "name": "aname" - }, - "event_id": "$3957tyerfgewrf382:example.org", - "origin_server_ts": 1432735824652, - "room_id": "!jEsUZKDJdhlrceRyVU:example.org", - "sender": "@example:example.org", - "state_key": "@alice:example.org", - "type": "m.room.name" -}`), - }, - } - return nil -} - -func (s *fakeRoomServerInternalAPI) QueryMembershipsForRoom( - ctx context.Context, - req *rsapi.QueryMembershipsForRoomRequest, - res *rsapi.QueryMembershipsForRoomResponse, -) error { - s.MembershipReqs = append(s.MembershipReqs, req) - *res = rsapi.QueryMembershipsForRoomResponse{ - JoinEvents: []gomatrixserverlib.ClientEvent{ - mustParseClientEventJSON(`{ - "content": { - "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF", - "displayname": "Alice Margatroid", - "membership": "join", - "reason": "Looking for support" - }, - "event_id": "$3957tyerfgewrf384:example.org", - "origin_server_ts": 1432735824653, - "room_id": "!jEsUZKDJdhlrceRyVU:example.org", - "sender": "@example:example.org", - "state_key": "@alice:example.org", - "type": "m.room.member", - "unsigned": { - "age": 1234 - } -}`), - }, - } - return nil -} - -type fakeUserInternalAPI struct { - api.UserInternalAPI -} - -func (s *fakeUserInternalAPI) QueryPushRules(ctx context.Context, req *api.QueryPushRulesRequest, res *api.QueryPushRulesResponse) error { - localpart, _, err := gomatrixserverlib.SplitID('@', req.UserID) - if err != nil { - return err - } - res.RuleSets = pushrules.DefaultAccountRuleSets(localpart, "example.org") - return nil -} - -type fakePushGatewayClient struct { - pushgateway.Client - - WG *sync.WaitGroup - Reqs []*pushgateway.NotifyRequest -} - -func (c *fakePushGatewayClient) Notify(ctx context.Context, url string, req *pushgateway.NotifyRequest, res *pushgateway.NotifyResponse) error { - c.Reqs = append(c.Reqs, req) - if c.WG != nil { - c.WG.Done() - } - *res = pushgateway.NotifyResponse{ - Rejected: []string{ - "apushkey", - }, - } - return nil -} - -func mustParseClientEventJSON(s string) gomatrixserverlib.ClientEvent { - var ev gomatrixserverlib.ClientEvent - if err := json.Unmarshal([]byte(s), &ev); err != nil { - panic(err) - } - return ev -} - -func mustParseHeaderedEventJSON(s string) *gomatrixserverlib.HeaderedEvent { - var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal([]byte(s), &ev); err != nil { - panic(err) - } - return &ev -} - -type fakeMessageSender struct { - Messages []*nats.Msg -} - -func (s *fakeMessageSender) PublishMsg(msg *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) { - s.Messages = append(s.Messages, msg) - return nil, nil -} diff --git a/userapi/consumers/syncapi_readupdate.go b/userapi/consumers/syncapi_readupdate.go new file mode 100644 index 000000000..2e58020b4 --- /dev/null +++ b/userapi/consumers/syncapi_readupdate.go @@ -0,0 +1,136 @@ +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/internal/pushgateway" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/types" + uapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/dendrite/userapi/producers" + "github.com/matrix-org/dendrite/userapi/storage" + "github.com/matrix-org/dendrite/userapi/util" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +type OutputReadUpdateConsumer struct { + ctx context.Context + cfg *config.UserAPI + jetstream nats.JetStreamContext + durable string + db storage.Database + pgClient pushgateway.Client + ServerName gomatrixserverlib.ServerName + topic string + userAPI uapi.UserInternalAPI + syncProducer *producers.SyncAPI +} + +func NewOutputReadUpdateConsumer( + process *process.ProcessContext, + cfg *config.UserAPI, + js nats.JetStreamContext, + store storage.Database, + pgClient pushgateway.Client, + userAPI uapi.UserInternalAPI, + syncProducer *producers.SyncAPI, +) *OutputReadUpdateConsumer { + return &OutputReadUpdateConsumer{ + ctx: process.Context(), + cfg: cfg, + jetstream: js, + db: store, + ServerName: cfg.Matrix.ServerName, + durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate), + pgClient: pgClient, + userAPI: userAPI, + syncProducer: syncProducer, + } +} + +func (s *OutputReadUpdateConsumer) Start() error { + if err := jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ); err != nil { + return err + } + return nil +} + +func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + var read types.ReadUpdate + if err := json.Unmarshal(msg.Data, &read); err != nil { + log.WithError(err).Error("userapi clientapi consumer: message parse failure") + return true + } + if read.FullyRead == 0 && read.Read == 0 { + return true + } + + userID := string(msg.Header.Get(jetstream.UserID)) + roomID := string(msg.Header.Get(jetstream.RoomID)) + + localpart, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + log.WithError(err).Error("userapi clientapi consumer: SplitID failure") + return true + } + if domain != s.ServerName { + log.Error("userapi clientapi consumer: not a local user") + return true + } + + log := log.WithFields(log.Fields{ + "room_id": roomID, + "user_id": userID, + }) + log.Tracef("Received read update from sync API: %#v", read) + + if read.Read > 0 { + updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, int64(read.Read), true) + if err != nil { + log.WithError(err).Error("userapi EDU consumer") + return false + } + + if updated { + if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil { + log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed") + return false + } + if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { + log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed") + return false + } + } + } + + if read.FullyRead > 0 { + deleted, err := s.db.DeleteNotificationsUpTo(ctx, localpart, roomID, int64(read.FullyRead)) + if err != nil { + log.WithError(err).Errorf("userapi clientapi consumer: DeleteNotificationsUpTo failed") + return false + } + + if deleted { + if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { + log.WithError(err).Error("userapi clientapi consumer: NotifyUserCounts failed") + return false + } + + if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, read.RoomID); err != nil { + log.WithError(err).Errorf("userapi clientapi consumer: GetAndSendNotificationData failed") + return false + } + } + } + + return true +} diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/syncapi_streamevent.go similarity index 83% rename from userapi/consumers/roomserver.go rename to userapi/consumers/syncapi_streamevent.go index 8617c6282..110813274 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/syncapi_streamevent.go @@ -14,6 +14,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/dendrite/userapi/storage" @@ -24,7 +25,7 @@ import ( log "github.com/sirupsen/logrus" ) -type OutputRoomEventConsumer struct { +type OutputStreamEventConsumer struct { ctx context.Context cfg *config.UserAPI userAPI api.UserInternalAPI @@ -37,7 +38,7 @@ type OutputRoomEventConsumer struct { syncProducer *producers.SyncAPI } -func NewOutputRoomEventConsumer( +func NewOutputStreamEventConsumer( process *process.ProcessContext, cfg *config.UserAPI, js nats.JetStreamContext, @@ -46,14 +47,14 @@ func NewOutputRoomEventConsumer( userAPI api.UserInternalAPI, rsAPI rsapi.RoomserverInternalAPI, syncProducer *producers.SyncAPI, -) *OutputRoomEventConsumer { - return &OutputRoomEventConsumer{ +) *OutputStreamEventConsumer { + return &OutputStreamEventConsumer{ ctx: process.Context(), cfg: cfg, jetstream: js, db: store, - durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"), - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent), pgClient: pgClient, userAPI: userAPI, rsAPI: rsAPI, @@ -61,7 +62,7 @@ func NewOutputRoomEventConsumer( } } -func (s *OutputRoomEventConsumer) Start() error { +func (s *OutputStreamEventConsumer) Start() error { if err := jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, nats.DeliverAll(), nats.ManualAck(), @@ -71,48 +72,34 @@ func (s *OutputRoomEventConsumer) Start() error { return nil } -func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output rsapi.OutputEvent +func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + var output types.StreamedEvent + output.Event = &gomatrixserverlib.HeaderedEvent{} if err := json.Unmarshal(msg.Data, &output); err != nil { - log.WithError(err).Errorf("pushserver consumer: message parse failure") + log.WithError(err).Errorf("userapi consumer: message parse failure") + return true + } + if output.Event.Event == nil { + log.Errorf("userapi consumer: expected event") return true } log.WithFields(log.Fields{ - "event_type": output.Type, - }).Tracef("Received message from room server: %#v", output) + "event_id": output.Event.EventID(), + "event_type": output.Event.Type(), + "stream_pos": output.StreamPosition, + }).Tracef("Received message from sync API: %#v", output) - switch output.Type { - case rsapi.OutputTypeNewRoomEvent: - ev := output.NewRoomEvent.Event - if err := s.processMessage(ctx, output.NewRoomEvent.Event); err != nil { - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "event": string(ev.JSON()), - }).WithError(err).Errorf("pushserver consumer: process room event failure") - } - - case rsapi.OutputTypeNewInviteEvent: - ev := output.NewInviteEvent.Event - if err := s.processMessage(ctx, output.NewInviteEvent.Event); err != nil { - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "event": string(ev.JSON()), - }).WithError(err).Errorf("pushserver consumer: process invite event failure") - } - - default: - // Ignore old events, peeks, so on. + if err := s.processMessage(ctx, output.Event, int64(output.StreamPosition)); err != nil { + log.WithFields(log.Fields{ + "event_id": output.Event.EventID(), + }).WithError(err).Errorf("userapi consumer: process room event failure") } return true } -func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { - log.WithFields(log.Fields{ - "event_type": event.Type(), - }).Tracef("Received event from room server: %#v", event) - +func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos int64) error { members, roomSize, err := s.localRoomMembers(ctx, event.RoomID()) if err != nil { return fmt.Errorf("s.localRoomMembers: %w", err) @@ -152,7 +139,7 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gom // removing it means we can send all notifications to // e.g. Element's Push gateway in one go. for _, mem := range members { - if err := s.notifyLocal(ctx, event, mem, roomSize, roomName); err != nil { + if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil { log.WithFields(log.Fields{ "localpart": mem.Localpart, }).WithError(err).Debugf("Unable to push to local user") @@ -193,7 +180,7 @@ func newLocalMembership(event *gomatrixserverlib.ClientEvent) (*localMembership, // localRoomMembers fetches the current local members of a room, and // the total number of members. -func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) { +func (s *OutputStreamEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) { req := &rsapi.QueryMembershipsForRoomRequest{ RoomID: roomID, JoinedOnly: true, @@ -232,7 +219,7 @@ func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID s // looks it up in roomserver. If there is no name, // m.room.canonical_alias is consulted. Returns an empty string if the // room has no name. -func (s *OutputRoomEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) { +func (s *OutputStreamEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) { if event.Type() == gomatrixserverlib.MRoomName { name, err := unmarshalRoomName(event) if err != nil { @@ -300,7 +287,7 @@ func unmarshalCanonicalAlias(event *gomatrixserverlib.HeaderedEvent) (string, er } // notifyLocal finds the right push actions for a local user, given an event. -func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int, roomName string) error { +func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos int64, mem *localMembership, roomSize int, roomName string) error { actions, err := s.evaluatePushRules(ctx, event, mem, roomSize) if err != nil { return err @@ -338,7 +325,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatr RoomID: event.RoomID(), TS: gomatrixserverlib.AsTimestamp(time.Now()), } - if err = s.db.InsertNotification(ctx, mem.Localpart, event.EventID(), tweaks, n); err != nil { + if err = s.db.InsertNotification(ctx, mem.Localpart, event.EventID(), pos, tweaks, n); err != nil { return err } @@ -409,7 +396,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatr // evaluatePushRules fetches and evaluates the push rules of a local // user. Returns actions (including dont_notify). -func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) { +func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) { if event.Sender() == mem.UserID { // SPEC: Homeservers MUST NOT notify the Push Gateway for // events that the user has sent themselves. @@ -488,7 +475,7 @@ func (rse *ruleSetEvalContext) HasPowerLevel(userID, levelKey string) (bool, err // localPushDevices pushes to the configured devices of a local // user. The map keys are [url][format]. -func (s *OutputRoomEventConsumer) localPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) { +func (s *OutputStreamEventConsumer) localPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) { pusherDevices, err := util.GetPushDevices(ctx, localpart, tweaks, s.db) if err != nil { return nil, "", err @@ -512,7 +499,7 @@ func (s *OutputRoomEventConsumer) localPushDevices(ctx context.Context, localpar } // notifyHTTP performs a notificatation to a Push Gateway. -func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, url, format string, devices []*pushgateway.Device, localpart, roomName string, userNumUnreadNotifs int) ([]*pushgateway.Device, error) { +func (s *OutputStreamEventConsumer) notifyHTTP(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, url, format string, devices []*pushgateway.Device, localpart, roomName string, userNumUnreadNotifs int) ([]*pushgateway.Device, error) { logger := log.WithFields(log.Fields{ "event_id": event.EventID(), "url": url, @@ -556,12 +543,13 @@ func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatri } } - logger.Debugf("Notifying HTTP push gateway") + logger.Debugf("Notifying push gateway %s", url) var res pushgateway.NotifyResponse if err := s.pgClient.Notify(ctx, url, &req, &res); err != nil { + logger.WithError(err).Errorf("Failed to notify push gateway %s", url) return nil, err } - logger.WithField("num_rejected", len(res.Rejected)).Tracef("HTTP push gateway result") + logger.WithField("num_rejected", len(res.Rejected)).Tracef("Push gateway result") if len(res.Rejected) == 0 { return nil, nil @@ -583,12 +571,12 @@ func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatri } // deleteRejectedPushers deletes the pushers associated with the given devices. -func (s *OutputRoomEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string) { +func (s *OutputStreamEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string) { log.WithFields(log.Fields{ "localpart": localpart, "app_id0": devices[0].AppID, "num_devices": len(devices), - }).Infof("Deleting pushers rejected by the HTTP push gateway") + }).Warnf("Deleting pushers rejected by the HTTP push gateway") for _, d := range devices { if err := s.db.RemovePusher(ctx, d.AppID, d.PushKey, localpart); err != nil { diff --git a/userapi/storage/interface.go b/userapi/storage/interface.go index 75b286c42..6d22fea9d 100644 --- a/userapi/storage/interface.go +++ b/userapi/storage/interface.go @@ -91,9 +91,9 @@ type Database interface { // May return sql.ErrNoRows. GetLoginTokenDataByToken(ctx context.Context, token string) (*api.LoginTokenData, error) - InsertNotification(ctx context.Context, localpart, eventID string, tweaks map[string]interface{}, n *api.Notification) error - DeleteNotificationsUpTo(ctx context.Context, localpart, roomID, upToEventID string) (affected bool, err error) - SetNotificationsRead(ctx context.Context, localpart, roomID, upToEventID string, b bool) (affected bool, err error) + InsertNotification(ctx context.Context, localpart, eventID string, pos int64, tweaks map[string]interface{}, n *api.Notification) error + DeleteNotificationsUpTo(ctx context.Context, localpart, roomID string, pos int64) (affected bool, err error) + SetNotificationsRead(ctx context.Context, localpart, roomID string, pos int64, b bool) (affected bool, err error) GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error) GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error) GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error) diff --git a/userapi/storage/postgres/notifications_table.go b/userapi/storage/postgres/notifications_table.go index 678f9191b..7bcc0f9cd 100644 --- a/userapi/storage/postgres/notifications_table.go +++ b/userapi/storage/postgres/notifications_table.go @@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS userapi_notifications ( localpart TEXT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL, + stream_pos BIGINT NOT NULL, ts_ms BIGINT NOT NULL, highlight BOOLEAN NOT NULL, notification_json TEXT NOT NULL, @@ -54,17 +55,13 @@ CREATE INDEX IF NOT EXISTS userapi_notification_localpart_id_idx ON userapi_noti ` const insertNotificationSQL = "" + - "INSERT INTO userapi_notifications (localpart, room_id, event_id, ts_ms, highlight, notification_json) VALUES ($1, $2, $3, $4, $5, $6)" + "INSERT INTO userapi_notifications (localpart, room_id, event_id, stream_pos, ts_ms, highlight, notification_json) VALUES ($1, $2, $3, $4, $5, $6, $7)" const deleteNotificationsUpToSQL = "" + - "DELETE FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND id <= (" + - "SELECT MAX(id) FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND event_id = $3" + - ")" + "DELETE FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND stream_pos <= $3" const updateNotificationReadSQL = "" + - "UPDATE userapi_notifications SET read = $1 WHERE localpart = $2 AND room_id = $3 AND id <= (" + - "SELECT MAX(id) FROM userapi_notifications WHERE localpart = $2 AND room_id = $3 AND event_id = $4" + - ") AND read <> $1" + "UPDATE userapi_notifications SET read = $1 WHERE localpart = $2 AND room_id = $3 AND stream_pos <= $4 AND read <> $1" const selectNotificationSQL = "" + "SELECT id, room_id, ts_ms, read, notification_json FROM userapi_notifications WHERE localpart = $1 AND id > $2 AND (" + @@ -97,7 +94,7 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) } // Insert inserts a notification into the database. -func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, highlight bool, n *api.Notification) error { +func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS nn := *n // Clears out fields that have their own columns to (1) shrink the @@ -108,13 +105,13 @@ func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, local if err != nil { return err } - _, err = sqlutil.TxStmt(txn, s.insertStmt).ExecContext(ctx, localpart, roomID, eventID, tsMS, highlight, string(bs)) + _, err = sqlutil.TxStmt(txn, s.insertStmt).ExecContext(ctx, localpart, roomID, eventID, pos, tsMS, highlight, string(bs)) return err } // DeleteUpTo deletes all previous notifications, up to and including the event. -func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string) (affected bool, _ error) { - res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, eventID) +func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) { + res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, pos) if err != nil { return false, err } @@ -122,13 +119,13 @@ func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, l if err != nil { return true, err } - log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "event_id": eventID}).Tracef("DeleteUpTo: %d rows affected", nrows) + log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("DeleteUpTo: %d rows affected", nrows) return nrows > 0, nil } // UpdateRead updates the "read" value for an event. -func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, v bool) (affected bool, _ error) { - res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, eventID) +func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) { + res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, pos) if err != nil { return false, err } @@ -136,7 +133,7 @@ func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, l if err != nil { return true, err } - log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "event_id": eventID}).Tracef("UpdateRead: %d rows affected", nrows) + log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("UpdateRead: %d rows affected", nrows) return nrows > 0, nil } diff --git a/userapi/storage/shared/storage.go b/userapi/storage/shared/storage.go index 9a899d5e0..a58974b41 100644 --- a/userapi/storage/shared/storage.go +++ b/userapi/storage/shared/storage.go @@ -671,23 +671,23 @@ func (d *Database) GetLoginTokenDataByToken(ctx context.Context, token string) ( return d.LoginTokens.SelectLoginToken(ctx, token) } -func (d *Database) InsertNotification(ctx context.Context, localpart, eventID string, tweaks map[string]interface{}, n *api.Notification) error { +func (d *Database) InsertNotification(ctx context.Context, localpart, eventID string, pos int64, tweaks map[string]interface{}, n *api.Notification) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.Notifications.Insert(ctx, txn, localpart, eventID, pushrules.BoolTweakOr(tweaks, pushrules.HighlightTweak, false), n) + return d.Notifications.Insert(ctx, txn, localpart, eventID, pos, pushrules.BoolTweakOr(tweaks, pushrules.HighlightTweak, false), n) }) } -func (d *Database) DeleteNotificationsUpTo(ctx context.Context, localpart, roomID, upToEventID string) (affected bool, err error) { +func (d *Database) DeleteNotificationsUpTo(ctx context.Context, localpart, roomID string, pos int64) (affected bool, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - affected, err = d.Notifications.DeleteUpTo(ctx, txn, localpart, roomID, upToEventID) + affected, err = d.Notifications.DeleteUpTo(ctx, txn, localpart, roomID, pos) return err }) return } -func (d *Database) SetNotificationsRead(ctx context.Context, localpart, roomID, upToEventID string, b bool) (affected bool, err error) { +func (d *Database) SetNotificationsRead(ctx context.Context, localpart, roomID string, pos int64, b bool) (affected bool, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - affected, err = d.Notifications.UpdateRead(ctx, txn, localpart, roomID, upToEventID, b) + affected, err = d.Notifications.UpdateRead(ctx, txn, localpart, roomID, pos, b) return err }) return diff --git a/userapi/storage/sqlite3/notifications_table.go b/userapi/storage/sqlite3/notifications_table.go index 572daa68c..fcfb1aadc 100644 --- a/userapi/storage/sqlite3/notifications_table.go +++ b/userapi/storage/sqlite3/notifications_table.go @@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS userapi_notifications ( localpart TEXT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL, + stream_pos BIGINT NOT NULL, ts_ms BIGINT NOT NULL, highlight BOOLEAN NOT NULL, notification_json TEXT NOT NULL, @@ -54,17 +55,13 @@ CREATE INDEX IF NOT EXISTS userapi_notification_localpart_id_idx ON userapi_noti ` const insertNotificationSQL = "" + - "INSERT INTO userapi_notifications (localpart, room_id, event_id, ts_ms, highlight, notification_json) VALUES ($1, $2, $3, $4, $5, $6)" + "INSERT INTO userapi_notifications (localpart, room_id, event_id, stream_pos, ts_ms, highlight, notification_json) VALUES ($1, $2, $3, $4, $5, $6, $7)" const deleteNotificationsUpToSQL = "" + - "DELETE FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND id <= (" + - "SELECT MAX(id) FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND event_id = $3" + - ")" + "DELETE FROM userapi_notifications WHERE localpart = $1 AND room_id = $2 AND stream_pos <= $3" const updateNotificationReadSQL = "" + - "UPDATE userapi_notifications SET read = $1 WHERE localpart = $2 AND room_id = $3 AND id <= (" + - "SELECT MAX(id) FROM userapi_notifications WHERE localpart = $2 AND room_id = $3 AND event_id = $4" + - ") AND read <> $1" + "UPDATE userapi_notifications SET read = $1 WHERE localpart = $2 AND room_id = $3 AND stream_pos <= $4 AND read <> $1" const selectNotificationSQL = "" + "SELECT id, room_id, ts_ms, read, notification_json FROM userapi_notifications WHERE localpart = $1 AND id > $2 AND (" + @@ -97,7 +94,7 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { } // Insert inserts a notification into the database. -func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, highlight bool, n *api.Notification) error { +func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { roomID, tsMS := n.RoomID, n.TS nn := *n // Clears out fields that have their own columns to (1) shrink the @@ -108,13 +105,13 @@ func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, local if err != nil { return err } - _, err = sqlutil.TxStmt(txn, s.insertStmt).ExecContext(ctx, localpart, roomID, eventID, tsMS, highlight, string(bs)) + _, err = sqlutil.TxStmt(txn, s.insertStmt).ExecContext(ctx, localpart, roomID, eventID, pos, tsMS, highlight, string(bs)) return err } // DeleteUpTo deletes all previous notifications, up to and including the event. -func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string) (affected bool, _ error) { - res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, eventID) +func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) { + res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, pos) if err != nil { return false, err } @@ -122,13 +119,13 @@ func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, l if err != nil { return true, err } - log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "event_id": eventID}).Tracef("DeleteUpTo: %d rows affected", nrows) + log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("DeleteUpTo: %d rows affected", nrows) return nrows > 0, nil } // UpdateRead updates the "read" value for an event. -func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, v bool) (affected bool, _ error) { - res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, eventID) +func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) { + res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, pos) if err != nil { return false, err } @@ -136,7 +133,7 @@ func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, l if err != nil { return true, err } - log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "event_id": eventID}).Tracef("UpdateRead: %d rows affected", nrows) + log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("UpdateRead: %d rows affected", nrows) return nrows > 0, nil } diff --git a/userapi/storage/tables/interface.go b/userapi/storage/tables/interface.go index 994e8974c..815e51193 100644 --- a/userapi/storage/tables/interface.go +++ b/userapi/storage/tables/interface.go @@ -103,9 +103,9 @@ type PusherTable interface { } type NotificationTable interface { - Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, highlight bool, n *api.Notification) error - DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string) (affected bool, _ error) - UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, v bool) (affected bool, _ error) + Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error + DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) + UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) Select(ctx context.Context, txn *sql.Tx, localpart string, fromID int64, limit int, filter NotificationFilter) ([]*api.Notification, int64, error) SelectCount(ctx context.Context, txn *sql.Tx, localpart string, filter NotificationFilter) (int64, error) SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, _ error) diff --git a/userapi/userapi.go b/userapi/userapi.go index 326cd5eac..2382e9512 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -72,25 +72,18 @@ func NewInternalAPI( DisableTLSValidation: cfg.PushGatewayDisableTLSValidation, } - caConsumer := consumers.NewOutputClientDataConsumer( + readConsumer := consumers.NewOutputReadUpdateConsumer( base.ProcessContext, cfg, js, db, pgClient, userAPI, syncProducer, ) - if err := caConsumer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start user API clientapi consumer") + if err := readConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start user API read update consumer") } - eduConsumer := consumers.NewOutputReceiptEventConsumer( - base.ProcessContext, cfg, js, db, pgClient, syncProducer, - ) - if err := eduConsumer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start user API EDU consumer") - } - - rsConsumer := consumers.NewOutputRoomEventConsumer( + eventConsumer := consumers.NewOutputStreamEventConsumer( base.ProcessContext, cfg, js, db, pgClient, userAPI, rsAPI, syncProducer, ) - if err := rsConsumer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start user API room server consumer") + if err := eventConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start user API streamed event consumer") } return userAPI