From 0889ceeac2add90881f35d5fe8bb1b700fd62453 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Sat, 31 Jul 2021 16:10:46 +0200 Subject: [PATCH] Make /sync requests work --- clientapi/routing/presence.go | 2 +- eduserver/api/output.go | 4 +- syncapi/consumers/eduserver_presence.go | 3 +- syncapi/streams/streams.go | 5 ++ syncapi/streams/streams_presence.go | 66 +++++++++++++++++++ syncapi/sync/requestpool.go | 7 ++ syncapi/syncapi.go | 7 ++ syncapi/types/types.go | 16 +++-- sytest-whitelist | 3 +- userapi/api/api.go | 25 +++++-- userapi/internal/api.go | 29 +++++++- userapi/inthttp/client.go | 9 +++ userapi/inthttp/server.go | 13 ++++ userapi/storage/presence/interface.go | 4 ++ .../presence/postgres/presence_table.go | 32 ++++++++- userapi/storage/presence/postgres/storage.go | 4 ++ .../presence/sqlite3/presence_table.go | 30 +++++++++ userapi/storage/presence/sqlite3/storage.go | 4 ++ 18 files changed, 244 insertions(+), 19 deletions(-) create mode 100644 syncapi/streams/streams_presence.go diff --git a/clientapi/routing/presence.go b/clientapi/routing/presence.go index 1602d5ab2..1f4d01a08 100644 --- a/clientapi/routing/presence.go +++ b/clientapi/routing/presence.go @@ -171,7 +171,7 @@ func GetPresence(req *http.Request, resp := presenceResponse{} lastActive := time.Since(presence.LastActiveTS.Time()) resp.LastActiveAgo = lastActive.Milliseconds() - + resp.StatusMsg = presence.StatusMsg resp.CurrentlyActive = lastActive <= time.Minute*5 if !resp.CurrentlyActive { presence.PresenceStatus = types.Unavailable diff --git a/eduserver/api/output.go b/eduserver/api/output.go index e0ddde9a8..2fbc08b54 100644 --- a/eduserver/api/output.go +++ b/eduserver/api/output.go @@ -93,7 +93,7 @@ type OutputPresenceData struct { UserID string `json:"user_id"` Presence types.PresenceStatus `json:"presence"` StatusMsg string `json:"status_msg,omitempty"` - LastActiveTS gomatrixserverlib.Timestamp `json:"-"` + LastActiveTS gomatrixserverlib.Timestamp `json:"last_active_ts"` LastActiveAgo int64 `json:"last_active_ago,omitempty"` - StreamPos types2.StreamPosition + StreamPos types2.StreamPosition `json:"stream_pos"` } diff --git a/syncapi/consumers/eduserver_presence.go b/syncapi/consumers/eduserver_presence.go index d0742d4e9..c2305a90b 100644 --- a/syncapi/consumers/eduserver_presence.go +++ b/syncapi/consumers/eduserver_presence.go @@ -74,7 +74,6 @@ func (s *OutputPresenceDataConsumer) Start() error { } func (s *OutputPresenceDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { - log.Debug("presence received by sync api!") var output api.OutputPresenceData if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream @@ -85,7 +84,7 @@ func (s *OutputPresenceDataConsumer) onMessage(msg *sarama.ConsumerMessage) erro log.Debugf("presence received by sync api! %+v", output) s.stream.Advance(output.StreamPos) - s.notifier.OnNewPresence(types.StreamingToken{PresenceDataPostion: output.StreamPos}, output.UserID) + s.notifier.OnNewPresence(types.StreamingToken{PresenceDataPosition: output.StreamPos}, output.UserID) return nil } diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index ba4118df5..a181fcb31 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -18,6 +18,7 @@ type Streams struct { InviteStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider AccountDataStreamProvider types.StreamProvider + PresenceDataStreamProdiver types.StreamProvider DeviceListStreamProvider types.PartitionedStreamProvider } @@ -47,6 +48,10 @@ func NewSyncStreamProviders( StreamProvider: StreamProvider{DB: d}, userAPI: userAPI, }, + PresenceDataStreamProdiver: &PresenceStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + UserAPI: userAPI, + }, DeviceListStreamProvider: &DeviceListStreamProvider{ PartitionedStreamProvider: PartitionedStreamProvider{DB: d}, rsAPI: rsAPI, diff --git a/syncapi/streams/streams_presence.go b/syncapi/streams/streams_presence.go new file mode 100644 index 000000000..5989a0409 --- /dev/null +++ b/syncapi/streams/streams_presence.go @@ -0,0 +1,66 @@ +package streams + +import ( + "context" + "encoding/json" + "time" + + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" +) + +type PresenceStreamProvider struct { + StreamProvider + UserAPI userapi.UserInternalAPI +} + +func (p *PresenceStreamProvider) CompleteSync(ctx context.Context, req *types.SyncRequest) types.StreamPosition { + req.Log.Debug(" CompleteSyncrequested for presence!") + return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) +} + +type outputPresence struct { + AvatarUrl string `json:"avatar_url,omitempty"` + CurrentlyActive bool `json:"currently_active,omitempty"` + LastActiveAgo int64 `json:"last_active_ago,omitempty"` + Presence string `json:"presence,omitempty"` + StatusMsg string `json:"status_msg,omitempty"` +} + +func (p *PresenceStreamProvider) IncrementalSync(ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition { + res := userapi.QueryPresenceAfterResponse{} + if err := p.UserAPI.QueryPresenceAfter(ctx, &userapi.QueryPresenceAfterRequest{StreamPos: int64(from)}, &res); err != nil { + req.Log.WithError(err).Error("unable to fetch presence after") + return from + } + evs := []gomatrixserverlib.ClientEvent{} + var maxPos int64 + for _, presence := range res.Presences { + ev := gomatrixserverlib.ClientEvent{} + lastActive := time.Since(presence.LastActiveTS.Time()) + pres := outputPresence{ + CurrentlyActive: lastActive <= time.Minute*5, + LastActiveAgo: lastActive.Milliseconds(), + Presence: presence.PresenceStatus.String(), + StatusMsg: presence.StatusMsg, + } + + j, err := json.Marshal(pres) + if err != nil { + req.Log.WithError(err).Error("json.Marshal failed") + return from + } + ev.Type = "m.presence" + ev.Sender = presence.UserID + ev.Content = j + + evs = append(evs, ev) + if presence.StreamPos > maxPos { + maxPos = presence.StreamPos + } + } + req.Response.Presence.Events = append(req.Response.Presence.Events, evs...) + + return types.StreamPosition(maxPos) +} diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index a45736106..399cd2b32 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -210,6 +210,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync( syncReq.Context, syncReq, ), + PresenceDataPosition: rp.streams.PresenceDataStreamProdiver.CompleteSync( + syncReq.Context, syncReq, + ), } } else { // Incremental sync @@ -242,6 +245,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. syncReq.Context, syncReq, syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, ), + PresenceDataPosition: rp.streams.PresenceDataStreamProdiver.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.PresenceDataPosition, currentPos.PresenceDataPosition, + ), } } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 84c7140ca..3dfe0c334 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -108,5 +108,12 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start receipts consumer") } + presenceConsumer := consumers.NewOutputPresenceDataConsumer( + process, cfg, consumer, syncDB, notifier, streams.PresenceDataStreamProdiver, + ) + if err = presenceConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start presence consumer") + } + routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index d0d1e3399..7dd081a1f 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -124,7 +124,7 @@ type StreamingToken struct { SendToDevicePosition StreamPosition InvitePosition StreamPosition AccountDataPosition StreamPosition - PresenceDataPostion StreamPosition + PresenceDataPosition StreamPosition DeviceListPosition LogPosition } @@ -141,10 +141,11 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) { func (t StreamingToken) String() string { posStr := fmt.Sprintf( - "s%d_%d_%d_%d_%d_%d", + "s%d_%d_%d_%d_%d_%d_%d", t.PDUPosition, t.TypingPosition, t.ReceiptPosition, t.SendToDevicePosition, t.InvitePosition, t.AccountDataPosition, + t.PresenceDataPosition, ) if dl := t.DeviceListPosition; !dl.IsEmpty() { posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset) @@ -169,12 +170,15 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool { return true case t.DeviceListPosition.IsAfter(&other.DeviceListPosition): return true + case t.PresenceDataPosition > other.PresenceDataPosition: + return true } + return false } func (t *StreamingToken) IsEmpty() bool { - return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty() + return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.PresenceDataPosition == 0 && t.DeviceListPosition.IsEmpty() } // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. @@ -212,6 +216,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) { if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) { t.DeviceListPosition = other.DeviceListPosition } + if other.PresenceDataPosition > t.PresenceDataPosition { + t.PresenceDataPosition = other.PresenceDataPosition + } } type TopologyToken struct { @@ -302,7 +309,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { } categories := strings.Split(tok[1:], ".") parts := strings.Split(categories[0], "_") - var positions [6]StreamPosition + var positions [7]StreamPosition for i, p := range parts { if i > len(positions) { break @@ -321,6 +328,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { SendToDevicePosition: positions[3], InvitePosition: positions[4], AccountDataPosition: positions[5], + PresenceDataPosition: positions[6], } // dl-0-1234 // $log_name-$partition-$offset diff --git a/sytest-whitelist b/sytest-whitelist index 657d9c394..00561a1b0 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -551,4 +551,5 @@ Can create more than 10 backup versions Can delete backup Deleted & recreated backups are empty GET /presence/:user_id/status fetches initial status -PUT /presence/:user_id/status updates my presence \ No newline at end of file +PUT /presence/:user_id/status updates my presence +Presence change reports an event to myself \ No newline at end of file diff --git a/userapi/api/api.go b/userapi/api/api.go index ef17a7c48..c9f211b27 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -45,6 +45,7 @@ type UserInternalAPI interface { QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error QueryPresenceForUser(ctx context.Context, req *QueryPresenceForUserRequest, res *QueryPresenceForUserResponse) error + QueryPresenceAfter(ctx context.Context, req *QueryPresenceAfterRequest, res *QueryPresenceAfterResponse) error } type PerformKeyBackupRequest struct { @@ -354,12 +355,24 @@ type QueryPresenceForUserRequest struct { // QueryPresenceForUserResponse is the response for QueryPresenceForUserRequest type QueryPresenceForUserResponse struct { - PresenceStatus types.PresenceStatus - Presence string - StatusMsg string - LastActiveTS gomatrixserverlib.Timestamp - LastActiveAgo int64 - CurrentlyActive bool + UserID string `json:"user_id"` + PresenceStatus types.PresenceStatus `json:"presence_status"` + Presence string `json:"presence"` + StatusMsg string `json:"status_msg"` + LastActiveTS gomatrixserverlib.Timestamp `json:"last_active_ts"` + LastActiveAgo int64 `json:"last_active_ago"` + CurrentlyActive bool `json:"currently_active"` + StreamPos int64 `json:"stream_pos"` +} + +// QueryPresenceAfterRequest is the request for QueryPresenceAfterRequest +type QueryPresenceAfterRequest struct { + StreamPos int64 +} + +// QueryPresenceAfterResponse is the response for QueryPresenceAfterRequest +type QueryPresenceAfterResponse struct { + Presences []QueryPresenceForUserResponse } // Device represents a client's device (mobile, web, etc) diff --git a/userapi/internal/api.go b/userapi/internal/api.go index 88dbe5804..b2c39451c 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -60,8 +60,12 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc } func (a *UserInternalAPI) InputPresenceData(ctx context.Context, req *api.InputPresenceRequest, res *api.InputPresenceResponse) error { - _, err := a.PresenceDB.UpsertPresence(ctx, req.UserID, req.StatusMsg, req.Presence, req.LastActiveTS) - return err + pos, err := a.PresenceDB.UpsertPresence(ctx, req.UserID, req.StatusMsg, req.Presence, req.LastActiveTS) + if err != nil { + return err + } + res.StreamPos = pos + return nil } func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error { @@ -485,6 +489,27 @@ func (a *UserInternalAPI) QueryPresenceForUser(ctx context.Context, req *api.Que return nil } +func (a *UserInternalAPI) QueryPresenceAfter(ctx context.Context, req *api.QueryPresenceAfterRequest, res *api.QueryPresenceAfterResponse) error { + p, err := a.PresenceDB.GetPresenceAfter(ctx, req.StreamPos) + if err != nil { + return err + } + presences := []api.QueryPresenceForUserResponse{} + for _, x := range p { + var y api.QueryPresenceForUserResponse + y.UserID = x.UserID + y.Presence = x.Presence.String() + y.StreamPos = int64(x.StreamPos) + y.LastActiveTS = x.LastActiveTS + y.LastActiveAgo = x.LastActiveAgo + y.PresenceStatus = x.Presence + y.StatusMsg = x.StatusMsg + presences = append(presences, y) + } + res.Presences = append(res.Presences, presences...) + return nil +} + func (a *UserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.PerformKeyBackupRequest, res *api.PerformKeyBackupResponse) { // Delete metadata if req.DeleteBackup { diff --git a/userapi/inthttp/client.go b/userapi/inthttp/client.go index 9423160b1..869532b8f 100644 --- a/userapi/inthttp/client.go +++ b/userapi/inthttp/client.go @@ -47,6 +47,7 @@ const ( QuerySearchProfilesPath = "/userapi/querySearchProfiles" QueryOpenIDTokenPath = "/userapi/queryOpenIDToken" QueryPresenceForUserPath = "/userapi/queryPresenceForUser" + QueryPresenceAfterPath = "/userapi/queryPresenceAfter" QueryKeyBackupPath = "/userapi/queryKeyBackup" ) @@ -267,3 +268,11 @@ func (h *httpUserInternalAPI) QueryKeyBackup(ctx context.Context, req *api.Query res.Error = err.Error() } } + +func (h *httpUserInternalAPI) QueryPresenceAfter(ctx context.Context, req *api.QueryPresenceAfterRequest, res *api.QueryPresenceAfterResponse) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPresenceAfter") + defer span.Finish() + + apiURL := h.apiURL + QueryPresenceAfterPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) +} diff --git a/userapi/inthttp/server.go b/userapi/inthttp/server.go index 69a343dd5..15861b7d1 100644 --- a/userapi/inthttp/server.go +++ b/userapi/inthttp/server.go @@ -260,4 +260,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(QueryPresenceAfterPath, + httputil.MakeInternalAPI("queryPresenceAfter", func(req *http.Request) util.JSONResponse { + request := api.QueryPresenceAfterRequest{} + response := api.QueryPresenceAfterResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := s.QueryPresenceAfter(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/userapi/storage/presence/interface.go b/userapi/storage/presence/interface.go index 333771910..c6d7839d5 100644 --- a/userapi/storage/presence/interface.go +++ b/userapi/storage/presence/interface.go @@ -34,4 +34,8 @@ type Database interface { ctx context.Context, userID string, ) (presence api.OutputPresenceData, err error) + GetPresenceAfter( + ctx context.Context, + pos int64, + ) (presence []api.OutputPresenceData, err error) } diff --git a/userapi/storage/presence/postgres/presence_table.go b/userapi/storage/presence/postgres/presence_table.go index 45b7209ba..9da71b5ee 100644 --- a/userapi/storage/presence/postgres/presence_table.go +++ b/userapi/storage/presence/postgres/presence_table.go @@ -45,7 +45,7 @@ CREATE INDEX IF NOT EXISTS presence_presences_user_id ON presence_presences(user ` const upsertPresenceSQL = "" + - "INSERT INTO presence_presences" + + "INSERT INTO presence_presences AS p" + " (user_id, presence, status_msg, last_active_ts)" + " VALUES ($1, $2, $3, $4)" + " ON CONFLICT (user_id)" + @@ -61,10 +61,16 @@ const selectPresenceForUserSQL = "" + const selectMaxPresenceSQL = "" + "SELECT MAX(id) FROM presence_presences" +const selectPresenceAfter = "" + + " SELECT id, user_id, presence, status_msg, last_active_ts" + + " FROM presence_presences" + + " WHERE id > $1" + type presenceStatements struct { upsertPresenceStmt *sql.Stmt selectPresenceForUsersStmt *sql.Stmt selectMaxPresenceStmt *sql.Stmt + selectPresenceAfterStmt *sql.Stmt } func (p *presenceStatements) execSchema(db *sql.DB) error { @@ -82,6 +88,9 @@ func (p *presenceStatements) prepare(db *sql.DB) (err error) { if p.selectMaxPresenceStmt, err = db.Prepare(selectMaxPresenceSQL); err != nil { return } + if p.selectPresenceAfterStmt, err = db.Prepare(selectPresenceAfter); err != nil { + return + } return } @@ -121,3 +130,24 @@ func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) err = stmt.QueryRowContext(ctx).Scan(&pos) return } + +// GetPresenceAfter returns the changes presences after a given stream id +func (p *presenceStatements) GetPresenceAfter( + ctx context.Context, txn *sql.Tx, + after int64, +) (presences []api.OutputPresenceData, err error) { + stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt) + + rows, err := stmt.QueryContext(ctx, after) + if err != nil { + return nil, err + } + for rows.Next() { + presence := api.OutputPresenceData{} + if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presence.Presence, &presence.StatusMsg, &presence.LastActiveTS); err != nil { + return nil, err + } + presences = append(presences, presence) + } + return presences, rows.Err() +} diff --git a/userapi/storage/presence/postgres/storage.go b/userapi/storage/presence/postgres/storage.go index 92fa6dfae..8bb30c9fa 100644 --- a/userapi/storage/presence/postgres/storage.go +++ b/userapi/storage/presence/postgres/storage.go @@ -56,3 +56,7 @@ func (d *Database) UpsertPresence( func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresenceData, error) { return d.presence.GetPresenceForUser(ctx, nil, userID) } + +func (d *Database) GetPresenceAfter(ctx context.Context, pos int64) (presence []api.OutputPresenceData, err error) { + return d.presence.GetPresenceAfter(ctx, nil, pos) +} diff --git a/userapi/storage/presence/sqlite3/presence_table.go b/userapi/storage/presence/sqlite3/presence_table.go index abc730c3c..cbc1379ad 100644 --- a/userapi/storage/presence/sqlite3/presence_table.go +++ b/userapi/storage/presence/sqlite3/presence_table.go @@ -56,9 +56,15 @@ const selectPresenceForUserSQL = "" + " FROM presence_presences" + " WHERE user_id = $1 LIMIT 1" +const selectPresenceAfter = "" + + " SELECT id, user_id, presence, status_msg, last_active_ts" + + " FROM presence_presences" + + " WHERE id > $1" + type presenceStatements struct { upsertPresenceStmt *sql.Stmt selectPresenceForUsersStmt *sql.Stmt + selectPresenceAfterStmt *sql.Stmt } func (p *presenceStatements) execSchema(db *sql.DB) error { @@ -73,6 +79,9 @@ func (p *presenceStatements) prepare(db *sql.DB) (err error) { if p.selectPresenceForUsersStmt, err = db.Prepare(selectPresenceForUserSQL); err != nil { return } + if p.selectPresenceAfterStmt, err = db.Prepare(selectPresenceAfter); err != nil { + return + } return } @@ -100,3 +109,24 @@ func (p *presenceStatements) GetPresenceForUser( err = stmt.QueryRowContext(ctx, userID).Scan(&presence.Presence, &presence.StatusMsg, &presence.LastActiveTS) return } + +// GetPresenceAfter returns the changes presences after a given stream id +func (p *presenceStatements) GetPresenceAfter( + ctx context.Context, txn *sql.Tx, + after int64, +) (presences []api.OutputPresenceData, err error) { + stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt) + + rows, err := stmt.QueryContext(ctx, after) + if err != nil { + return nil, err + } + for rows.Next() { + presence := api.OutputPresenceData{} + if err := rows.Scan(&presence.StreamPos, &presence.UserID, &presence.Presence, &presence.StatusMsg, &presence.LastActiveTS); err != nil { + return nil, err + } + presences = append(presences, presence) + } + return presences, rows.Err() +} diff --git a/userapi/storage/presence/sqlite3/storage.go b/userapi/storage/presence/sqlite3/storage.go index 8ddba4b34..7ba8619d8 100644 --- a/userapi/storage/presence/sqlite3/storage.go +++ b/userapi/storage/presence/sqlite3/storage.go @@ -70,3 +70,7 @@ func (d *Database) UpsertPresence( func (d *Database) GetPresenceForUser(ctx context.Context, userID string) (api.OutputPresenceData, error) { return d.presence.GetPresenceForUser(ctx, nil, userID) } + +func (d *Database) GetPresenceAfter(ctx context.Context, pos int64) (presence []api.OutputPresenceData, err error) { + return d.presence.GetPresenceAfter(ctx, nil, pos) +}