From b404ed5c44e95d20bc2bd81a1c0afa684d6f6a09 Mon Sep 17 00:00:00 2001 From: S7evinK Date: Thu, 31 Mar 2022 08:10:16 +0200 Subject: [PATCH] Missing files --- clientapi/routing/presence.go | 128 +++++++++++++++ syncapi/consumers/presence.go | 136 ++++++++++++++++ syncapi/storage/postgres/presence_table.go | 165 +++++++++++++++++++ syncapi/storage/sqlite3/presence_table.go | 180 +++++++++++++++++++++ syncapi/streams/stream_presence.go | 155 ++++++++++++++++++ syncapi/sync/requestpool_test.go | 104 ++++++++++++ 6 files changed, 868 insertions(+) create mode 100644 clientapi/routing/presence.go create mode 100644 syncapi/consumers/presence.go create mode 100644 syncapi/storage/postgres/presence_table.go create mode 100644 syncapi/storage/sqlite3/presence_table.go create mode 100644 syncapi/streams/stream_presence.go create mode 100644 syncapi/sync/requestpool_test.go diff --git a/clientapi/routing/presence.go b/clientapi/routing/presence.go new file mode 100644 index 000000000..261797887 --- /dev/null +++ b/clientapi/routing/presence.go @@ -0,0 +1,128 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +type presenceReq struct { + Presence string `json:"presence"` + StatusMsg *string `json:"status_msg,omitempty"` +} + +func SetPresence( + req *http.Request, + device *api.Device, + producer *producers.SyncAPIProducer, + userID string, +) util.JSONResponse { + if device.UserID != userID { + return util.JSONResponse{ + Code: http.StatusForbidden, + JSON: jsonerror.Forbidden("Unable to set presence for other user."), + } + } + var presence presenceReq + parseErr := httputil.UnmarshalJSONRequest(req, &presence) + if parseErr != nil { + return *parseErr + } + p := strings.ToLower(presence.Presence) + if _, ok := types.PresenceToInt[p]; !ok { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", p)), + } + } + + err := producer.SendPresence(req.Context(), userID, presence.Presence, presence.StatusMsg) + if err != nil { + log.WithError(err).Errorf("failed to update presence") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: struct{}{}, + } +} + +func GetPresence( + req *http.Request, + device *api.Device, + natsClient *nats.Conn, + presenceTopic string, + userID string, +) util.JSONResponse { + msg := nats.NewMsg(presenceTopic) + msg.Header.Set(jetstream.UserID, userID) + + presence, err := natsClient.RequestMsg(msg, time.Second*10) + if err != nil { + log.WithError(err).Errorf("unable to get presence") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + + statusMsg := presence.Header.Get("status_msg") + e := presence.Header.Get("error") + if e != "" { + return util.JSONResponse{ + Code: http.StatusOK, + JSON: types.PresenceClientResponse{ + Presence: "unavailable", + }, + } + } + lastActive, err := strconv.Atoi(presence.Header.Get("last_active_ts")) + if err != nil { + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: jsonerror.InternalServerError(), + } + } + + lastActiveTS := gomatrixserverlib.Timestamp(lastActive) + return util.JSONResponse{ + Code: http.StatusOK, + JSON: types.PresenceClientResponse{ + CurrentlyActive: time.Since(lastActiveTS.Time()).Minutes() < 5, + LastActiveAgo: time.Since(lastActiveTS.Time()).Milliseconds(), + Presence: presence.Header.Get("presence"), + StatusMsg: &statusMsg, + }, + } +} diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go new file mode 100644 index 000000000..0f07076e6 --- /dev/null +++ b/syncapi/consumers/presence.go @@ -0,0 +1,136 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "strconv" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" +) + +// OutputTypingEventConsumer consumes events that originated in the EDU server. +type PresenceConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + nats *nats.Conn + durable string + requestTopic string + presenceTopic string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier + rsAPI api.RoomserverInternalAPI +} + +// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. +// Call Start() to begin consuming from the EDU server. +func NewPresenceConsumer( + process *process.ProcessContext, + cfg *config.SyncAPI, + js nats.JetStreamContext, + nats *nats.Conn, + db storage.Database, + notifier *notifier.Notifier, + stream types.StreamProvider, + rsAPI api.RoomserverInternalAPI, +) *PresenceConsumer { + return &PresenceConsumer{ + ctx: process.Context(), + nats: nats, + jetstream: js, + durable: cfg.Matrix.JetStream.Durable("SyncAPIPresenceConsumer"), + presenceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + requestTopic: cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence), + db: db, + notifier: notifier, + stream: stream, + rsAPI: rsAPI, + } +} + +// Start consuming typing events. +func (s *PresenceConsumer) Start() error { + // Normal NATS subscription, used by Request/Reply + _, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) { + presence, err := s.db.GetPresence(context.Background(), msg.Header.Get(jetstream.UserID)) + m := &nats.Msg{ + Header: nats.Header{}, + } + if err != nil { + m.Header.Set("error", err.Error()) + if err = msg.RespondMsg(m); err != nil { + return + } + return + } + + m.Header.Set(jetstream.UserID, presence.UserID) + m.Header.Set("presence", presence.ClientFields.Presence) + m.Header.Set("status_msg", *presence.ClientFields.StatusMsg) + m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS))) + + if err = msg.RespondMsg(m); err != nil { + return + } + }) + if err != nil { + return err + } + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), + ) +} + +func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + userID := msg.Header.Get(jetstream.UserID) + presence := msg.Header.Get("presence") + statusMsg := msg.Header.Get("status_msg") + timestamp := msg.Header.Get("last_active_ts") + fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync")) + nilStatusMsg, _ := strconv.ParseBool(msg.Header.Get("status_msg_nil")) + + logrus.Debugf("syncAPI received presence event: %+v", msg.Header) + + ts, err := strconv.Atoi(timestamp) + if err != nil { + return true + } + + newStatusMsg := &statusMsg + if nilStatusMsg { + newStatusMsg = nil + } + + pos, err := s.db.UpdatePresence(ctx, userID, presence, newStatusMsg, gomatrixserverlib.Timestamp(ts), fromSync) + if err != nil { + return true + } + + s.stream.Advance(pos) + s.notifier.OnNewPresence(types.StreamingToken{PresencePosition: pos}, userID) + + return true +} diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go new file mode 100644 index 000000000..135cba052 --- /dev/null +++ b/syncapi/storage/postgres/presence_table.go @@ -0,0 +1,165 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const presenceSchema = ` +CREATE SEQUENCE IF NOT EXISTS syncapi_presence_id; +-- Stores data about presence +CREATE TABLE IF NOT EXISTS syncapi_presence ( + -- The ID + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_presence_id'), + -- The Matrix user ID + user_id TEXT NOT NULL, + -- The actual presence + presence INT NOT NULL, + -- The status message + status_msg TEXT, + -- The last time an action was received by this user + last_active_ts BIGINT NOT NULL, + CONSTRAINT presence_presences_unique UNIQUE (user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id); +` + +const upsertPresenceSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (user_id, presence, status_msg, last_active_ts)" + + " VALUES ($1, $2, $3, $4)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = nextval('syncapi_presence_id')," + + " presence = $2, status_msg = COALESCE($3, p.status_msg), last_active_ts = $4" + + " RETURNING id" + +const upsertPresenceFromSyncSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (user_id, presence, last_active_ts)" + + " VALUES ($1, $2, $3)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = nextval('syncapi_presence_id')," + + " presence = $2, last_active_ts = $3" + + " RETURNING id" + +const selectPresenceForUserSQL = "" + + "SELECT presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE user_id = $1 LIMIT 1" + +const selectMaxPresenceSQL = "" + + "SELECT COALESCE(MAX(id), 0) FROM syncapi_presence" + +const selectPresenceAfter = "" + + " SELECT id, user_id, presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE id > $1" + +type presenceStatements struct { + upsertPresenceStmt *sql.Stmt + upsertPresenceFromSyncStmt *sql.Stmt + selectPresenceForUsersStmt *sql.Stmt + selectMaxPresenceStmt *sql.Stmt + selectPresenceAfterStmt *sql.Stmt +} + +func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) { + _, err := db.Exec(presenceSchema) + if err != nil { + return nil, err + } + s := &presenceStatements{} + return s, sqlutil.StatementList{ + {&s.upsertPresenceStmt, upsertPresenceSQL}, + {&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL}, + {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL}, + {&s.selectMaxPresenceStmt, selectMaxPresenceSQL}, + {&s.selectPresenceAfterStmt, selectPresenceAfter}, + }.Prepare(db) +} + +// UpsertPresence creates/updates a presence status. +func (p *presenceStatements) UpsertPresence( + ctx context.Context, + txn *sql.Tx, + userID string, + statusMsg *string, + presence string, + lastActiveTS gomatrixserverlib.Timestamp, + fromSync bool, +) (pos types.StreamPosition, err error) { + presenceStatusID := types.PresenceToInt[presence] + if fromSync { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt) + err = stmt.QueryRowContext(ctx, userID, presenceStatusID, lastActiveTS).Scan(&pos) + } else { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt) + err = stmt.QueryRowContext(ctx, userID, presenceStatusID, statusMsg, lastActiveTS).Scan(&pos) + } + return +} + +// GetPresenceForUser returns the current presence of a user. +func (p *presenceStatements) GetPresenceForUser( + ctx context.Context, txn *sql.Tx, + userID string, +) (*types.Presence, error) { + result := &types.Presence{ + UserID: userID, + } + stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) + var presenceStatusID int + err := stmt.QueryRowContext(ctx, userID).Scan(&presenceStatusID, &result.ClientFields.StatusMsg, &result.LastActiveTS) + result.ClientFields.Presence = types.PresenceToString[presenceStatusID] + return result, err +} + +func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { + stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt) + err = stmt.QueryRowContext(ctx).Scan(&pos) + return +} + +// GetPresenceAfter returns the changes presences after a given stream id +func (p *presenceStatements) GetPresenceAfter( + ctx context.Context, txn *sql.Tx, + after types.StreamPosition, +) (presences map[string]*types.Presence, err error) { + presences = make(map[string]*types.Presence) + stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt) + + rows, err := stmt.QueryContext(ctx, after) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows") + var presenceStatusID int + for rows.Next() { + presence := &types.Presence{} + if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presenceStatusID, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil { + return nil, err + } + presence.ClientFields.Presence = types.PresenceToString[presenceStatusID] + presences[presence.UserID] = presence + } + return presences, rows.Err() +} diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go new file mode 100644 index 000000000..7510aa903 --- /dev/null +++ b/syncapi/storage/sqlite3/presence_table.go @@ -0,0 +1,180 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const presenceSchema = ` +-- Stores data about presence +CREATE TABLE IF NOT EXISTS syncapi_presence ( + -- The ID + id BIGINT NOT NULL, + -- The Matrix user ID + user_id TEXT NOT NULL, + -- The actual presence + presence INT NOT NULL, + -- The status message + status_msg TEXT, + -- The last time an action was received by this user + last_active_ts BIGINT NOT NULL, + CONSTRAINT presence_presences_unique UNIQUE (user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id); +` + +const upsertPresenceSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (id, user_id, presence, status_msg, last_active_ts)" + + " VALUES ($1, $2, $3, $4, $5)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = $6, " + + " presence = $7, status_msg = COALESCE($8, p.status_msg), last_active_ts = $9" + + " RETURNING id" + +const upsertPresenceFromSyncSQL = "" + + "INSERT INTO syncapi_presence AS p" + + " (id, user_id, presence, last_active_ts)" + + " VALUES ($1, $2, $3, $4)" + + " ON CONFLICT (user_id)" + + " DO UPDATE SET id = $5, " + + " presence = $6, last_active_ts = $7" + + " RETURNING id" + +const selectPresenceForUserSQL = "" + + "SELECT presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE user_id = $1 LIMIT 1" + +const selectMaxPresenceSQL = "" + + "SELECT COALESCE(MAX(id), 0) FROM syncapi_presence" + +const selectPresenceAfter = "" + + " SELECT id, user_id, presence, status_msg, last_active_ts" + + " FROM syncapi_presence" + + " WHERE id > $1" + +type presenceStatements struct { + db *sql.DB + streamIDStatements *streamIDStatements + upsertPresenceStmt *sql.Stmt + upsertPresenceFromSyncStmt *sql.Stmt + selectPresenceForUsersStmt *sql.Stmt + selectMaxPresenceStmt *sql.Stmt + selectPresenceAfterStmt *sql.Stmt +} + +func NewSqlitePresenceTable(db *sql.DB, streamID *streamIDStatements) (*presenceStatements, error) { + _, err := db.Exec(presenceSchema) + if err != nil { + return nil, err + } + s := &presenceStatements{ + db: db, + streamIDStatements: streamID, + } + return s, sqlutil.StatementList{ + {&s.upsertPresenceStmt, upsertPresenceSQL}, + {&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL}, + {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL}, + {&s.selectMaxPresenceStmt, selectMaxPresenceSQL}, + {&s.selectPresenceAfterStmt, selectPresenceAfter}, + }.Prepare(db) +} + +// UpsertPresence creates/updates a presence status. +func (p *presenceStatements) UpsertPresence( + ctx context.Context, + txn *sql.Tx, + userID string, + statusMsg *string, + presence string, + lastActiveTS gomatrixserverlib.Timestamp, + fromSync bool, +) (pos types.StreamPosition, err error) { + pos, err = p.streamIDStatements.nextPresenceID(ctx, txn) + if err != nil { + return pos, err + } + + presenceStatusID := types.PresenceToInt[presence] + if fromSync { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt) + err = stmt.QueryRowContext(ctx, + pos, userID, presenceStatusID, + lastActiveTS, pos, + presenceStatusID, lastActiveTS).Scan(&pos) + } else { + stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt) + err = stmt.QueryRowContext(ctx, + pos, userID, presenceStatusID, + statusMsg, lastActiveTS, pos, + presenceStatusID, statusMsg, lastActiveTS).Scan(&pos) + } + return +} + +// GetPresenceForUser returns the current presence of a user. +func (p *presenceStatements) GetPresenceForUser( + ctx context.Context, txn *sql.Tx, + userID string, +) (*types.Presence, error) { + result := &types.Presence{ + UserID: userID, + } + stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) + var presenceStatusID int + err := stmt.QueryRowContext(ctx, userID).Scan(&presenceStatusID, &result.ClientFields.StatusMsg, &result.LastActiveTS) + result.ClientFields.Presence = types.PresenceToString[presenceStatusID] + return result, err +} + +func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) { + stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt) + err = stmt.QueryRowContext(ctx).Scan(&pos) + return +} + +// GetPresenceAfter returns the changes presences after a given stream id +func (p *presenceStatements) GetPresenceAfter( + ctx context.Context, txn *sql.Tx, + after types.StreamPosition, +) (presences map[string]*types.Presence, err error) { + presences = make(map[string]*types.Presence) + stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt) + + rows, err := stmt.QueryContext(ctx, after) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows") + var presenceStatusID int + for rows.Next() { + presence := &types.Presence{} + if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presenceStatusID, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil { + return nil, err + } + presence.ClientFields.Presence = types.PresenceToString[presenceStatusID] + presences[presence.UserID] = presence + } + return presences, rows.Err() +} diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go new file mode 100644 index 000000000..864492e5e --- /dev/null +++ b/syncapi/streams/stream_presence.go @@ -0,0 +1,155 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package streams + +import ( + "context" + "database/sql" + "encoding/json" + "time" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +type PresenceStreamProvider struct { + StreamProvider +} + +func (p *PresenceStreamProvider) Setup() { + p.StreamProvider.Setup() + + id, err := p.DB.MaxStreamPositionForPresence(context.Background()) + if err != nil { + panic(err) + } + p.latest = id +} + +func (p *PresenceStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) +} + +func (p *PresenceStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + presences, err := p.DB.PresenceAfter(ctx, from) + if err != nil { + req.Log.WithError(err).Error("p.DB.PresenceAfter failed") + return from + } + + if len(presences) == 0 { + return to + } + + // get all joined users + rooms, err := p.DB.AllJoinedUsersInRooms(ctx) + if err != nil { + req.Log.WithError(err).Error("unable to query joined users") + } + + sharedUsers := map[string]bool{ + req.Device.UserID: true, + } + for roomID := range req.Rooms { + roomUsers := rooms[roomID] + for i := range roomUsers { + sharedUsers[roomUsers[i]] = true + } + } + + // add newly joined rooms user presences + newlyJoined := joinedRooms(req.Response, req.Device.UserID) + for _, roomID := range newlyJoined { + roomUsers := rooms[roomID] + for i := range roomUsers { + sharedUsers[roomUsers[i]] = true + // we already got a presence from this user + if _, ok := presences[roomUsers[i]]; ok { + continue + } + presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i]) + if err != nil { + req.Log.WithError(err).Error("unable to query presence for user") + if err == sql.ErrNoRows { + continue + } + return to + } + } + } + + lastPos := to + for i := range presences { + presence := presences[i] + // Ignore users we don't share a room with + if !sharedUsers[presence.UserID] { + continue + } + presence.ClientFields.LastActiveAgo = time.Since(presence.LastActiveTS.Time()).Milliseconds() + presence.ClientFields.CurrentlyActive = time.Since(presence.LastActiveTS.Time()).Minutes() < 5 + + content, err := json.Marshal(presence.ClientFields) + if err != nil { + return from + } + + req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{ + Content: content, + Sender: presence.UserID, + Type: gomatrixserverlib.MPresence, + }) + if presence.StreamPos > lastPos { + lastPos = presence.StreamPos + } + } + + return lastPos +} + +func joinedRooms(res *types.Response, userID string) []string { + var roomIDs []string + for roomID, join := range res.Rooms.Join { + // we would expect to see our join event somewhere if we newly joined the room. + // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. + newlyJoined := membershipEventPresent(join.State.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + continue + } + newlyJoined = membershipEventPresent(join.Timeline.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + } + } + return roomIDs +} + +func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { + for _, ev := range events { + // it's enough to know that we have our member event here, don't need to check membership content + // as it's implied by being in the respective section of the sync response. + if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { + return true + } + } + return false +} diff --git a/syncapi/sync/requestpool_test.go b/syncapi/sync/requestpool_test.go new file mode 100644 index 000000000..862bb7fce --- /dev/null +++ b/syncapi/sync/requestpool_test.go @@ -0,0 +1,104 @@ +package sync + +import ( + "sync" + "testing" + "time" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/userapi/api" + "github.com/nats-io/nats.go" +) + +type dummyPublisher struct { + count int +} + +func (d *dummyPublisher) PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) { + d.count++ + return nil, nil +} + +func TestRequestPool_updatePresence(t *testing.T) { + type args struct { + presence string + device *api.Device + sleep time.Duration + } + publisher := &dummyPublisher{} + syncMap := sync.Map{} + + tests := []struct { + name string + args args + wantIncrease bool + }{ + { + name: "new presence is published", + wantIncrease: true, + args: args{ + device: &api.Device{UserID: "dummy"}, + }, + }, + { + name: "presence not published, no change", + args: args{ + device: &api.Device{UserID: "dummy"}, + }, + }, + { + name: "new presence is published dummy2", + wantIncrease: true, + args: args{ + device: &api.Device{UserID: "dummy2"}, + presence: "online", + }, + }, + { + name: "different presence is published dummy2", + wantIncrease: true, + args: args{ + device: &api.Device{UserID: "dummy2"}, + presence: "unavailable", + }, + }, + { + name: "same presence is not published dummy2", + args: args{ + device: &api.Device{UserID: "dummy2"}, + presence: "unavailable", + sleep: time.Millisecond * 150, + }, + }, + { + name: "same presence is published after being deleted", + wantIncrease: true, + args: args{ + device: &api.Device{UserID: "dummy2"}, + presence: "unavailable", + }, + }, + } + rp := &RequestPool{ + presence: syncMap, + jetstream: publisher, + cfg: &config.SyncAPI{ + Matrix: &config.Global{ + JetStream: config.JetStream{ + TopicPrefix: "Dendrite", + }, + }, + }, + } + go rp.cleanPresence(time.Millisecond * 50) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + beforeCount := publisher.count + rp.updatePresence(tt.args.presence, tt.args.device) + if tt.wantIncrease && publisher.count <= beforeCount { + t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount) + } + time.Sleep(tt.args.sleep) + }) + } +}