mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-21 04:53:14 -06:00
Don't drop presence for remote servers immediately
This commit is contained in:
parent
205a15621a
commit
170a96ebbe
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||||
|
|
@ -32,6 +33,9 @@ import (
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const maxRetries = 3
|
||||||
|
const retryDelay = time.Second
|
||||||
|
|
||||||
// OutputReceiptConsumer consumes events that originate in the clientapi.
|
// OutputReceiptConsumer consumes events that originate in the clientapi.
|
||||||
type OutputPresenceConsumer struct {
|
type OutputPresenceConsumer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
@ -93,16 +97,6 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
var queryRes roomserverAPI.QueryRoomsForUserResponse
|
|
||||||
err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
|
|
||||||
UserID: userID,
|
|
||||||
WantMembership: "join",
|
|
||||||
}, &queryRes)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("failed to calculate joined rooms for user")
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
presence := msg.Header.Get("presence")
|
presence := msg.Header.Get("presence")
|
||||||
|
|
||||||
ts, err := strconv.Atoi(msg.Header.Get("last_active_ts"))
|
ts, err := strconv.Atoi(msg.Header.Get("last_active_ts"))
|
||||||
|
|
@ -110,13 +104,33 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// send this presence to all servers who share rooms with this user.
|
var joined []gomatrixserverlib.ServerName
|
||||||
joined, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
// We're trying to get joined rooms for this user for 3 seconds (3 retries, 1s delay)
|
||||||
if err != nil {
|
// If we fail to get joined hosts, if we fail to, we discard the presence event.
|
||||||
log.WithError(err).Error("failed to get joined hosts")
|
for i := 0; i < maxRetries; i++ {
|
||||||
return true
|
var queryRes roomserverAPI.QueryRoomsForUserResponse
|
||||||
}
|
err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
|
||||||
|
UserID: userID,
|
||||||
|
WantMembership: "join",
|
||||||
|
}, &queryRes)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("failed to calculate joined rooms for user")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
joined, err = t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("failed to get joined hosts")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(joined) == 0 {
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// If we still have no joined hosts, discard the event.
|
||||||
if len(joined) == 0 {
|
if len(joined) == 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue