diff --git a/clientapi/routing/presence.go b/clientapi/routing/presence.go index 9176e542a..1602d5ab2 100644 --- a/clientapi/routing/presence.go +++ b/clientapi/routing/presence.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/eduserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + types2 "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -81,12 +82,14 @@ func SetPresence(req *http.Request, } } + lastActive := gomatrixserverlib.AsTimestamp(time.Now()) + pReq := userapi.InputPresenceRequest{ UserID: userID, DisplayName: device.DisplayName, Presence: p, StatusMsg: r.StatusMsg, - LastActiveTS: int64(gomatrixserverlib.AsTimestamp(time.Now())), + LastActiveTS: int64(lastActive), } pRes := userapi.InputPresenceResponse{} @@ -96,7 +99,19 @@ func SetPresence(req *http.Request, return util.ErrorResponse(err) } + eduReq := api.InputPresenceRequest{ + UserID: userID, + Presence: p, + StatusMsg: r.StatusMsg, + LastActiveTS: lastActive, + StreamPos: types2.StreamPosition(pRes.StreamPos), + } + eduRes := api.InputPresenceResponse{} // TODO: Inform EDU Server to send new presence to the federationsender/syncapi + if err := eduAPI.InputPresence(req.Context(), &eduReq, &eduRes); err != nil { + logrus.WithError(err).Error("failed to send presence to eduserver") + return util.ErrorResponse(err) + } return util.JSONResponse{ Code: http.StatusOK, diff --git a/eduserver/api/input.go b/eduserver/api/input.go index c14ee1a86..c217ab51c 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -20,6 +20,7 @@ package api import ( "context" + types2 "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -81,7 +82,8 @@ type InputPresenceRequest struct { UserID string `json:"user_id"` Presence types.PresenceStatus `json:"status"` StatusMsg string `json:"status_msg"` - LastActiveTS gomatrixserverlib.Timestamp `json:"timestamp"` + LastActiveTS gomatrixserverlib.Timestamp `json:"last_active_ts"` + StreamPos types2.StreamPosition `json:"stream_pos"` } // InputPresenceResponse is a response to InputPresenceRequest diff --git a/eduserver/api/output.go b/eduserver/api/output.go index f7c08e24c..e0ddde9a8 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -19,6 +19,7 @@ package api import ( "time" + types2 "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -87,11 +88,12 @@ type FederationReceiptData struct { EventIDs []string `json:"event_ids"` } -// OutputPresence is an entry in the presence output kafka log -type OutputPresence struct { +// OutputPresenceData is an entry in the presence output kafka log +type OutputPresenceData struct { UserID string `json:"user_id"` Presence types.PresenceStatus `json:"presence"` StatusMsg string `json:"status_msg,omitempty"` LastActiveTS gomatrixserverlib.Timestamp `json:"-"` LastActiveAgo int64 `json:"last_active_ago,omitempty"` + StreamPos types2.StreamPosition } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index 81feef078..b586a068a 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -221,11 +221,12 @@ func (t *EDUServerInputAPI) InputPresence( "status": request.Presence, "status_msg": request.StatusMsg, }).Infof("Producing to topic '%s'", t.OutputPresenceTopic) - output := &api.OutputPresence{ + output := &api.OutputPresenceData{ UserID: request.UserID, Presence: request.Presence, StatusMsg: request.StatusMsg, LastActiveTS: request.LastActiveTS, + StreamPos: request.StreamPos, } js, err := json.Marshal(output) if err != nil { diff --git a/syncapi/consumers/eduserver_presence.go b/syncapi/consumers/eduserver_presence.go new file mode 100644 index 000000000..d0742d4e9 --- /dev/null +++ b/syncapi/consumers/eduserver_presence.go @@ -0,0 +1,91 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "encoding/json" + + "github.com/Shopify/sarama" + "github.com/getsentry/sentry-go" + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + log "github.com/sirupsen/logrus" +) + +// OutputPresenceDataConsumer consumes events that originated in the EDU server. +type OutputPresenceDataConsumer struct { + presenceConsumer *internal.ContinualConsumer + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier +} + +// NewOutputPresenceDataConsumer creates a new OutputPresenceDataConsumer. +// Call Start() to begin consuming from the EDU server. +func NewOutputPresenceDataConsumer( + process *process.ProcessContext, + cfg *config.SyncAPI, + kafkaConsumer sarama.Consumer, + store storage.Database, + notifier *notifier.Notifier, + stream types.StreamProvider, +) *OutputPresenceDataConsumer { + + consumer := internal.ContinualConsumer{ + Process: process, + ComponentName: "syncapi/eduserver/presence", + Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputPresenceData), + Consumer: kafkaConsumer, + PartitionStore: store, + } + + s := &OutputPresenceDataConsumer{ + presenceConsumer: &consumer, + db: store, + notifier: notifier, + stream: stream, + } + + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start consuming from EDU api +func (s *OutputPresenceDataConsumer) Start() error { + return s.presenceConsumer.Start() +} + +func (s *OutputPresenceDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { + log.Debug("presence received by sync api!") + var output api.OutputPresenceData + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("EDU server output log: message parse failure") + sentry.CaptureException(err) + return nil + } + log.Debugf("presence received by sync api! %+v", output) + + s.stream.Advance(output.StreamPos) + s.notifier.OnNewPresence(types.StreamingToken{PresenceDataPostion: output.StreamPos}, output.UserID) + + return nil +} diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index d853cc0e4..c78347827 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -217,6 +217,39 @@ func (n *Notifier) OnNewInvite( n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) } +func (n *Notifier) OnNewPresence( + posUpdate types.StreamingToken, userID string, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + var wakeUserIDs []string + _, wantDomain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return + } + // determine if users share a room and needs to be woken up + for _, users := range n.roomIDToJoinedUsers { + // user sending presence is not in room + if !users[userID] { + continue + } + // check all users in room + for user := range users { + _, domain, err := gomatrixserverlib.SplitID('@', user) + if err != nil { + return + } + if domain == wantDomain { + wakeUserIDs = append(wakeUserIDs, user) + } + } + } + + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUsers(wakeUserIDs, nil, n.currPos) +} + // GetListener returns a UserStreamListener that can be used to wait for // updates for a user. Must be closed. // notify for anything before sincePos diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 49fa1a166..d0d1e3399 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -124,6 +124,7 @@ type StreamingToken struct { SendToDevicePosition StreamPosition InvitePosition StreamPosition AccountDataPosition StreamPosition + PresenceDataPostion StreamPosition DeviceListPosition LogPosition } diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index 3e5777888..c4ef79547 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -42,9 +42,9 @@ func TestNewSyncTokenWithLogs(t *testing.T) { func TestSyncTokens(t *testing.T) { shouldPass := map[string]string{ - "s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, LogPosition{}}.String(), - "s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, LogPosition{1, 2}}.String(), - "s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, LogPosition{}}.String(), + "s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, LogPosition{}}.String(), + "s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, 0, LogPosition{1, 2}}.String(), + "s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, 0, LogPosition{}}.String(), "t3_1": TopologyToken{3, 1}.String(), } diff --git a/userapi/api/api.go b/userapi/api/api.go index 34d7a4508..ef17a7c48 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -343,7 +343,9 @@ type InputPresenceRequest struct { } // InputPresenceResponse is the response for InputPresenceRequest -type InputPresenceResponse struct{} +type InputPresenceResponse struct { + StreamPos int64 +} // QueryPresenceForUserRequest is the request for QueryPresenceForUserRequest type QueryPresenceForUserRequest struct { diff --git a/userapi/storage/presence/interface.go b/userapi/storage/presence/interface.go index 75a5d1ad1..333771910 100644 --- a/userapi/storage/presence/interface.go +++ b/userapi/storage/presence/interface.go @@ -33,5 +33,5 @@ type Database interface { GetPresenceForUser( ctx context.Context, userID string, - ) (presence api.OutputPresence, err error) + ) (presence api.OutputPresenceData, err error) } diff --git a/userapi/storage/presence/postgres/presence_table.go b/userapi/storage/presence/postgres/presence_table.go index 33976d9e6..45b7209ba 100644 --- a/userapi/storage/presence/postgres/presence_table.go +++ b/userapi/storage/presence/postgres/presence_table.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/sqlutil" + types2 "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/userapi/types" ) @@ -57,9 +58,13 @@ const selectPresenceForUserSQL = "" + " FROM presence_presences" + " WHERE user_id = $1 LIMIT 1" +const selectMaxPresenceSQL = "" + + "SELECT MAX(id) FROM presence_presences" + type presenceStatements struct { upsertPresenceStmt *sql.Stmt selectPresenceForUsersStmt *sql.Stmt + selectMaxPresenceStmt *sql.Stmt } func (p *presenceStatements) execSchema(db *sql.DB) error { @@ -74,6 +79,9 @@ func (p *presenceStatements) prepare(db *sql.DB) (err error) { if p.selectPresenceForUsersStmt, err = db.Prepare(selectPresenceForUserSQL); err != nil { return } + if p.selectMaxPresenceStmt, err = db.Prepare(selectMaxPresenceSQL); err != nil { + return + } return } @@ -100,10 +108,16 @@ func (p *presenceStatements) UpsertPresence( func (p *presenceStatements) GetPresenceForUser( ctx context.Context, txn *sql.Tx, userID string, -) (presence api.OutputPresence, err error) { +) (presence api.OutputPresenceData, err error) { presence.UserID = userID stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) err = stmt.QueryRowContext(ctx, userID).Scan(&presence.Presence, &presence.StatusMsg, &presence.LastActiveTS) return } + +func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types2.StreamingToken, err error) { + stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt) + err = stmt.QueryRowContext(ctx).Scan(&pos) + return +} diff --git a/userapi/storage/presence/postgres/storage.go b/userapi/storage/presence/postgres/storage.go index bb56d263c..92fa6dfae 100644 --- a/userapi/storage/presence/postgres/storage.go +++ b/userapi/storage/presence/postgres/storage.go @@ -53,6 +53,6 @@ func (d *Database) UpsertPresence( return } -func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresence, error) { +func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresenceData, error) { return d.presence.GetPresenceForUser(ctx, nil, userID) } diff --git a/userapi/storage/presence/sqlite3/presence_table.go b/userapi/storage/presence/sqlite3/presence_table.go index 253eada9f..abc730c3c 100644 --- a/userapi/storage/presence/sqlite3/presence_table.go +++ b/userapi/storage/presence/sqlite3/presence_table.go @@ -93,7 +93,7 @@ func (p *presenceStatements) UpsertPresence( func (p *presenceStatements) GetPresenceForUser( ctx context.Context, txn *sql.Tx, userID string, -) (presence api.OutputPresence, err error) { +) (presence api.OutputPresenceData, err error) { presence.UserID = userID stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt) diff --git a/userapi/storage/presence/sqlite3/storage.go b/userapi/storage/presence/sqlite3/storage.go index 45bbded71..8ddba4b34 100644 --- a/userapi/storage/presence/sqlite3/storage.go +++ b/userapi/storage/presence/sqlite3/storage.go @@ -67,6 +67,6 @@ func (d *Database) UpsertPresence( return } -func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresence, error) { +func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresenceData, error) { return d.presence.GetPresenceForUser(ctx, nil, userID) }