mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-19 20:13:09 -06:00
Refactor presence queries
This commit is contained in:
parent
5d408525d0
commit
abdb1cfbfe
|
|
@ -108,6 +108,13 @@ func GetPresence(
|
||||||
e := presence.Header.Get("error")
|
e := presence.Header.Get("error")
|
||||||
if e != "" {
|
if e != "" {
|
||||||
log.Errorf("received error msg from nats: %s", e)
|
log.Errorf("received error msg from nats: %s", e)
|
||||||
|
if code := presence.Header.Get("error_code"); code == "404" {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusNotFound,
|
||||||
|
JSON: jsonerror.NotFound("No status for user found."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: types.PresenceClientResponse{
|
JSON: types.PresenceClientResponse{
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ func (s *PresenceConsumer) Start() error {
|
||||||
// Normal NATS subscription, used by Request/Reply
|
// Normal NATS subscription, used by Request/Reply
|
||||||
_, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) {
|
_, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) {
|
||||||
userID := msg.Header.Get(jetstream.UserID)
|
userID := msg.Header.Get(jetstream.UserID)
|
||||||
presence, err := s.db.GetPresence(context.Background(), userID)
|
presences, err := s.db.GetPresences(context.Background(), []string{userID})
|
||||||
m := &nats.Msg{
|
m := &nats.Msg{
|
||||||
Header: nats.Header{},
|
Header: nats.Header{},
|
||||||
}
|
}
|
||||||
|
|
@ -89,11 +89,15 @@ func (s *PresenceConsumer) Start() error {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if presence == nil {
|
if len(presences) == 0 {
|
||||||
presence = &types.PresenceInternal{
|
m.Header.Set("error", "presence does not exist")
|
||||||
UserID: userID,
|
m.Header.Set("error_code", "404")
|
||||||
|
if err = msg.RespondMsg(msg); err != nil {
|
||||||
|
logrus.WithError(err).Error("Unable to respond to messages")
|
||||||
}
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
presence := presences[0]
|
||||||
|
|
||||||
deviceRes := api.QueryDevicesResponse{}
|
deviceRes := api.QueryDevicesResponse{}
|
||||||
if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil {
|
if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ type DatabaseTransaction interface {
|
||||||
SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
|
SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
|
||||||
// getUserUnreadNotificationCountsForRooms returns the unread notifications for the given rooms
|
// getUserUnreadNotificationCountsForRooms returns the unread notifications for the given rooms
|
||||||
GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, roomIDs map[string]string) (map[string]*eventutil.NotificationData, error)
|
GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, roomIDs map[string]string) (map[string]*eventutil.NotificationData, error)
|
||||||
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
|
GetPresences(ctx context.Context, userID []string) ([]*types.PresenceInternal, error)
|
||||||
PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error)
|
PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error)
|
||||||
RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (events []types.StreamEvent, prevBatch, nextBatch string, err error)
|
RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (events []types.StreamEvent, prevBatch, nextBatch string, err error)
|
||||||
}
|
}
|
||||||
|
|
@ -186,7 +186,7 @@ type Database interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Presence interface {
|
type Presence interface {
|
||||||
GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
|
GetPresences(ctx context.Context, userID []string) ([]*types.PresenceInternal, error)
|
||||||
UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
|
UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
@ -63,9 +64,9 @@ const upsertPresenceFromSyncSQL = "" +
|
||||||
" RETURNING id"
|
" RETURNING id"
|
||||||
|
|
||||||
const selectPresenceForUserSQL = "" +
|
const selectPresenceForUserSQL = "" +
|
||||||
"SELECT presence, status_msg, last_active_ts" +
|
"SELECT user_id, presence, status_msg, last_active_ts" +
|
||||||
" FROM syncapi_presence" +
|
" FROM syncapi_presence" +
|
||||||
" WHERE user_id = $1 LIMIT 1"
|
" WHERE user_id = ANY($1)"
|
||||||
|
|
||||||
const selectMaxPresenceSQL = "" +
|
const selectMaxPresenceSQL = "" +
|
||||||
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
||||||
|
|
@ -119,20 +120,26 @@ func (p *presenceStatements) UpsertPresence(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPresenceForUser returns the current presence of a user.
|
// GetPresenceForUsers returns the current presence of a user.
|
||||||
func (p *presenceStatements) GetPresenceForUser(
|
func (p *presenceStatements) GetPresenceForUsers(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
userID string,
|
userIDs []string,
|
||||||
) (*types.PresenceInternal, error) {
|
) ([]*types.PresenceInternal, error) {
|
||||||
result := &types.PresenceInternal{
|
result := make([]*types.PresenceInternal, 0, len(userIDs))
|
||||||
UserID: userID,
|
|
||||||
}
|
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
||||||
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
rows, err := stmt.QueryContext(ctx, pq.Array(userIDs))
|
||||||
if err == sql.ErrNoRows {
|
if err != nil {
|
||||||
return nil, nil
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
presence := &types.PresenceInternal{}
|
||||||
|
if err = rows.Scan(&presence.UserID, &presence.Presence, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
presence.ClientFields.Presence = presence.Presence.String()
|
||||||
|
result = append(result, presence)
|
||||||
}
|
}
|
||||||
result.ClientFields.Presence = result.Presence.String()
|
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -572,8 +572,8 @@ func (d *Database) UpdatePresence(ctx context.Context, userID string, presence t
|
||||||
return pos, err
|
return pos, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
func (d *Database) GetPresences(ctx context.Context, userIDs []string) ([]*types.PresenceInternal, error) {
|
||||||
return d.Presence.GetPresenceForUser(ctx, nil, userID)
|
return d.Presence.GetPresenceForUsers(ctx, nil, userIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
|
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
|
||||||
|
|
|
||||||
|
|
@ -596,8 +596,8 @@ func (d *DatabaseTransaction) GetUserUnreadNotificationCountsForRooms(ctx contex
|
||||||
return d.NotificationData.SelectUserUnreadCountsForRooms(ctx, d.txn, userID, roomIDs)
|
return d.NotificationData.SelectUserUnreadCountsForRooms(ctx, d.txn, userID, roomIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DatabaseTransaction) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
func (d *DatabaseTransaction) GetPresences(ctx context.Context, userIDs []string) ([]*types.PresenceInternal, error) {
|
||||||
return d.Presence.GetPresenceForUser(ctx, d.txn, userID)
|
return d.Presence.GetPresenceForUsers(ctx, d.txn, userIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DatabaseTransaction) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
func (d *DatabaseTransaction) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
||||||
|
|
|
||||||
|
|
@ -17,8 +17,11 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
@ -62,9 +65,9 @@ const upsertPresenceFromSyncSQL = "" +
|
||||||
" RETURNING id"
|
" RETURNING id"
|
||||||
|
|
||||||
const selectPresenceForUserSQL = "" +
|
const selectPresenceForUserSQL = "" +
|
||||||
"SELECT presence, status_msg, last_active_ts" +
|
"SELECT user_id, presence, status_msg, last_active_ts" +
|
||||||
" FROM syncapi_presence" +
|
" FROM syncapi_presence" +
|
||||||
" WHERE user_id = $1 LIMIT 1"
|
" WHERE user_id IN ($1)"
|
||||||
|
|
||||||
const selectMaxPresenceSQL = "" +
|
const selectMaxPresenceSQL = "" +
|
||||||
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
"SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
|
||||||
|
|
@ -134,20 +137,37 @@ func (p *presenceStatements) UpsertPresence(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPresenceForUser returns the current presence of a user.
|
// GetPresenceForUsers returns the current presence of a user.
|
||||||
func (p *presenceStatements) GetPresenceForUser(
|
func (p *presenceStatements) GetPresenceForUsers(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
userID string,
|
userIDs []string,
|
||||||
) (*types.PresenceInternal, error) {
|
) ([]*types.PresenceInternal, error) {
|
||||||
result := &types.PresenceInternal{
|
qry := strings.Replace(selectPresenceForUserSQL, "($1)", sqlutil.QueryVariadic(len(userIDs)), 1)
|
||||||
UserID: userID,
|
prepStmt, err := p.db.Prepare(qry)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
|
defer internal.CloseAndLogIfError(ctx, prepStmt, "GetPresenceForUsers: stmt.close() failed")
|
||||||
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
|
logrus.Debugf("XXX: getting users: %v", userIDs)
|
||||||
if err == sql.ErrNoRows {
|
params := make([]interface{}, len(userIDs))
|
||||||
return nil, nil
|
for i := range userIDs {
|
||||||
|
params[i] = userIDs[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := sqlutil.TxStmt(txn, prepStmt).QueryContext(ctx, params...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceForUsers: rows.close() failed")
|
||||||
|
result := make([]*types.PresenceInternal, 0, len(userIDs))
|
||||||
|
for rows.Next() {
|
||||||
|
presence := &types.PresenceInternal{}
|
||||||
|
if err = rows.Scan(&presence.UserID, &presence.Presence, &presence.ClientFields.StatusMsg, &presence.LastActiveTS); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
presence.ClientFields.Presence = presence.Presence.String()
|
||||||
|
result = append(result, presence)
|
||||||
}
|
}
|
||||||
result.ClientFields.Presence = result.Presence.String()
|
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -207,7 +207,7 @@ type Ignores interface {
|
||||||
|
|
||||||
type Presence interface {
|
type Presence interface {
|
||||||
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
||||||
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
|
GetPresenceForUsers(ctx context.Context, txn *sql.Tx, userIDs []string) (presence []*types.PresenceInternal, err error)
|
||||||
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
|
GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
|
||||||
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (presences map[string]*types.PresenceInternal, err error)
|
GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (presences map[string]*types.PresenceInternal, err error)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -70,54 +70,24 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
return from
|
return from
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add presence for users which newly joined a room
|
getPresenceForUsers, err := p.getNeededUsersFromRequest(ctx, req, presences)
|
||||||
for userID := range req.MembershipChanges {
|
if err != nil {
|
||||||
if _, ok := presences[userID]; ok {
|
return from
|
||||||
continue
|
|
||||||
}
|
|
||||||
presences[userID], err = snapshot.GetPresence(ctx, userID)
|
|
||||||
if err != nil {
|
|
||||||
_ = snapshot.Rollback()
|
|
||||||
return from
|
|
||||||
}
|
|
||||||
if len(presences) > req.Filter.Presence.Limit {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(presences) == 0 {
|
// Got no presence between range and no presence to get from the database
|
||||||
|
if len(getPresenceForUsers) == 0 && len(presences) == 0 {
|
||||||
return to
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
// add newly joined rooms user presences
|
dbPresences, err := snapshot.GetPresences(ctx, getPresenceForUsers)
|
||||||
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
|
if err != nil {
|
||||||
if len(newlyJoined) > 0 {
|
req.Log.WithError(err).Error("unable to query presence for user")
|
||||||
// TODO: Check if this is working better than before.
|
_ = snapshot.Rollback()
|
||||||
if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil {
|
return from
|
||||||
req.Log.WithError(err).Error("unable to refresh notifier lists")
|
}
|
||||||
return from
|
for _, presence := range dbPresences {
|
||||||
}
|
presences[presence.UserID] = presence
|
||||||
NewlyJoinedLoop:
|
|
||||||
for _, roomID := range newlyJoined {
|
|
||||||
roomUsers := p.notifier.JoinedUsers(roomID)
|
|
||||||
for i := range roomUsers {
|
|
||||||
// we already got a presence from this user
|
|
||||||
if _, ok := presences[roomUsers[i]]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Bear in mind that this might return nil, but at least populating
|
|
||||||
// a nil means that there's a map entry so we won't repeat this call.
|
|
||||||
presences[roomUsers[i]], err = snapshot.GetPresence(ctx, roomUsers[i])
|
|
||||||
if err != nil {
|
|
||||||
req.Log.WithError(err).Error("unable to query presence for user")
|
|
||||||
_ = snapshot.Rollback()
|
|
||||||
return from
|
|
||||||
}
|
|
||||||
if len(presences) > req.Filter.Presence.Limit {
|
|
||||||
break NewlyJoinedLoop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lastPos := from
|
lastPos := from
|
||||||
|
|
@ -136,8 +106,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
prevPresence := pres.(*types.PresenceInternal)
|
prevPresence := pres.(*types.PresenceInternal)
|
||||||
currentlyActive := prevPresence.CurrentlyActive()
|
currentlyActive := prevPresence.CurrentlyActive()
|
||||||
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
|
skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
|
||||||
_, membershipChange := req.MembershipChanges[presence.UserID]
|
if skip {
|
||||||
if skip && !membershipChange {
|
|
||||||
req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID)
|
req.Log.Tracef("Skipping presence, no change (%s)", presence.UserID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -179,6 +148,38 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
return lastPos
|
return lastPos
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PresenceStreamProvider) getNeededUsersFromRequest(ctx context.Context, req *types.SyncRequest, presences map[string]*types.PresenceInternal) ([]string, error) {
|
||||||
|
getPresenceForUsers := []string{}
|
||||||
|
// Add presence for users which newly joined a room
|
||||||
|
for userID := range req.MembershipChanges {
|
||||||
|
if _, ok := presences[userID]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
getPresenceForUsers = append(getPresenceForUsers, userID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// add newly joined rooms user presences
|
||||||
|
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
|
||||||
|
if len(newlyJoined) > 0 {
|
||||||
|
// TODO: Check if this is working better than before.
|
||||||
|
if err := p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil {
|
||||||
|
req.Log.WithError(err).Error("unable to refresh notifier lists")
|
||||||
|
return getPresenceForUsers, err
|
||||||
|
}
|
||||||
|
for _, roomID := range newlyJoined {
|
||||||
|
roomUsers := p.notifier.JoinedUsers(roomID)
|
||||||
|
for i := range roomUsers {
|
||||||
|
// we already got a presence from this user
|
||||||
|
if _, ok := presences[roomUsers[i]]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
getPresenceForUsers = append(getPresenceForUsers, roomUsers[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return getPresenceForUsers, nil
|
||||||
|
}
|
||||||
|
|
||||||
func joinedRooms(res *types.Response, userID string) []string {
|
func joinedRooms(res *types.Response, userID string) []string {
|
||||||
var roomIDs []string
|
var roomIDs []string
|
||||||
for roomID, join := range res.Rooms.Join {
|
for roomID, join := range res.Rooms.Join {
|
||||||
|
|
|
||||||
|
|
@ -145,12 +145,12 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure we also send the current status_msg to federated servers and not nil
|
// ensure we also send the current status_msg to federated servers and not nil
|
||||||
dbPresence, err := db.GetPresence(context.Background(), userID)
|
dbPresence, err := db.GetPresences(context.Background(), []string{userID})
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if dbPresence != nil {
|
if len(dbPresence) > 0 && dbPresence[0] != nil {
|
||||||
newPresence.ClientFields = dbPresence.ClientFields
|
newPresence.ClientFields = dbPresence[0].ClientFields
|
||||||
}
|
}
|
||||||
newPresence.ClientFields.Presence = presenceID.String()
|
newPresence.ClientFields.Presence = presenceID.String()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,8 +29,8 @@ func (d dummyDB) UpdatePresence(ctx context.Context, userID string, presence typ
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d dummyDB) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
func (d dummyDB) GetPresences(ctx context.Context, userID []string) ([]*types.PresenceInternal, error) {
|
||||||
return &types.PresenceInternal{}, nil
|
return []*types.PresenceInternal{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue