Make "Existing members see new member's presence" pass

This commit is contained in:
Till Faelligen 2021-11-15 21:30:09 +01:00
parent a9c6b405f7
commit de238538e4
13 changed files with 94 additions and 10 deletions

View file

@ -295,7 +295,7 @@ func (m *DendriteMonolith) Start() {
)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, true,
base, federation, rsAPI, m.userAPI, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)

View file

@ -115,14 +115,14 @@ func (m *DendriteMonolith) Start() {
base, keyRing,
)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, userAPI, keyRing, true,
)
eduInputAPI := eduserver.NewInternalAPI(
base, cache.New(), userAPI,
)

View file

@ -168,7 +168,7 @@ func main() {
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationsender.NewInternalAPI(
&base.Base, federation, rsAPI, keyRing, true,
&base.Base, federation, rsAPI, userAPI, keyRing, true,
)
rsAPI.SetFederationSenderAPI(fsAPI)
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)

View file

@ -175,12 +175,13 @@ func main() {
)
rsAPI := rsComponent
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, true,
base, federation, rsAPI, nil, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
fsAPI.SetUserAPI(userAPI)
eduInputAPI := eduserver.NewInternalAPI(
base, cache.New(), userAPI,

View file

@ -118,7 +118,7 @@ func main() {
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, true,
base, federation, rsAPI, userAPI, keyRing, true,
)
rsComponent.SetFederationSenderAPI(fsAPI)

View file

@ -102,7 +102,7 @@ func main() {
}
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, false,
base, federation, rsAPI, nil, keyRing, false,
)
if base.UseHTTPAPIs {
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
@ -115,6 +115,8 @@ func main() {
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI)
fsAPI.SetUserAPI(userAPI)
if traceInternal {
userAPI = &uapi.UserInternalAPITrace{
Impl: userAPI,

View file

@ -6,6 +6,7 @@ import (
"time"
"github.com/matrix-org/dendrite/federationsender/types"
userAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
)
@ -40,6 +41,8 @@ func (e *FederationClientError) Error() string {
type FederationSenderInternalAPI interface {
FederationClient
SetUserAPI(api userAPI.UserInternalAPI)
QueryServerKeys(ctx context.Context, request *QueryServerKeysRequest, response *QueryServerKeysResponse) error
// PerformDirectoryLookup looks up a remote room ID from a room alias.

View file

@ -26,6 +26,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/kafka"
userAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
@ -42,6 +43,7 @@ func NewInternalAPI(
base *setup.BaseDendrite,
federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI,
userAPI userAPI.UserInternalAPI,
keyRing *gomatrixserverlib.KeyRing,
resetBlacklist bool,
) api.FederationSenderInternalAPI {
@ -95,5 +97,5 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start key server consumer")
}
return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, federation, keyRing, stats, queues)
return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, userAPI, federation, keyRing, stats, queues)
}

View file

@ -11,6 +11,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
userAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
)
@ -21,6 +22,7 @@ type FederationSenderInternalAPI struct {
cfg *config.FederationSender
statistics *statistics.Statistics
rsAPI roomserverAPI.RoomserverInternalAPI
userAPI userAPI.UserInternalAPI
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
queues *queue.OutgoingQueues
@ -30,6 +32,7 @@ type FederationSenderInternalAPI struct {
func NewFederationSenderInternalAPI(
db storage.Database, cfg *config.FederationSender,
rsAPI roomserverAPI.RoomserverInternalAPI,
userAPI userAPI.UserInternalAPI,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
statistics *statistics.Statistics,
@ -39,6 +42,7 @@ func NewFederationSenderInternalAPI(
db: db,
cfg: cfg,
rsAPI: rsAPI,
userAPI: userAPI,
federation: federation,
keyRing: keyRing,
statistics: statistics,
@ -46,6 +50,10 @@ func NewFederationSenderInternalAPI(
}
}
func (f *FederationSenderInternalAPI) SetUserAPI(userAPI userAPI.UserInternalAPI) {
f.userAPI = userAPI
}
func (a *FederationSenderInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) {
stats := a.statistics.ForServer(s)
until, blacklisted := stats.BackoffInfo()

View file

@ -7,9 +7,11 @@ import (
"fmt"
"time"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
userAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -103,6 +105,11 @@ func (r *FederationSenderInternalAPI) PerformJoin(
continue
}
// once we successfully joined a remote room, send available presence data to it
if err := r.sendPresenceData(ctx, request.RoomID, serverName); err != nil {
lastErr = err
}
// We're all good.
response.JoinedVia = serverName
return
@ -132,6 +139,62 @@ func (r *FederationSenderInternalAPI) PerformJoin(
)
}
// sendPresenceData sends presence data for a given roomID
func (r *FederationSenderInternalAPI) sendPresenceData(
ctx context.Context, roomID string, serverName gomatrixserverlib.ServerName,
) (err error) {
// query current presence for users
memberShip := roomserverAPI.QueryMembershipsForRoomResponse{}
if err := r.rsAPI.QueryMembershipsForRoom(ctx, &roomserverAPI.QueryMembershipsForRoomRequest{RoomID: roomID, JoinedOnly: true}, &memberShip); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": roomID,
}).Warnf("Failed to query membership for room")
return err
}
content := eduserverAPI.FederationPresenceData{}
for _, event := range memberShip.JoinEvents {
// only send presence events which originated from us
_, senderServerName, err := gomatrixserverlib.SplitID('@', event.Sender)
if err != nil {
continue
}
if senderServerName != r.cfg.Matrix.ServerName {
continue
}
var presence userAPI.QueryPresenceForUserResponse
if err := r.userAPI.QueryPresenceForUser(ctx, &userAPI.QueryPresenceForUserRequest{UserID: event.Sender}, &presence); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": roomID,
}).Warnf("Failed query presence for user")
continue
}
lastActiveTS := time.Since(presence.LastActiveTS.Time())
ev := eduserverAPI.FederationPresenceSingle{
CurrentlyActive: lastActiveTS < time.Minute*5,
LastActiveAgo: int(lastActiveTS.Milliseconds()),
Presence: presence.Presence,
UserID: presence.UserID,
StatusMsg: presence.StatusMsg,
}
content.Push = append(content.Push, ev)
}
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MPresence,
Origin: string(r.cfg.Matrix.ServerName),
}
if edu.Content, err = json.Marshal(content); err != nil {
return err
}
return r.queues.SendEDU(edu, r.cfg.Matrix.ServerName, []gomatrixserverlib.ServerName{serverName})
}
func (r *FederationSenderInternalAPI) performJoinUsingServer(
ctx context.Context,
roomID, userID string,

View file

@ -7,6 +7,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/httputil"
userAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
@ -51,6 +52,8 @@ type httpFederationSenderInternalAPI struct {
httpClient *http.Client
}
func (f *httpFederationSenderInternalAPI) SetUserAPI(userAPI userAPI.UserInternalAPI) {}
// Handle an instruction to make_leave & send_leave with a remote server.
func (h *httpFederationSenderInternalAPI) PerformLeave(
ctx context.Context,

View file

@ -595,3 +595,4 @@ User can invite remote user to room with version 9
Remote user can backfill in a room with version 9
Can reject invites over federation for rooms with version 9
Can receive redactions from regular users over federation in room version 9
Existing members see new member's presence

View file

@ -495,6 +495,7 @@ func (a *UserInternalAPI) QueryPresenceForUser(ctx context.Context, req *api.Que
res.PresenceStatus = p.Presence
res.StatusMsg = p.StatusMsg
res.LastActiveTS = p.LastActiveTS
res.UserID = p.UserID
maxLastSeen := gomatrixserverlib.Timestamp(maxLastSeenTS)
if maxLastSeen.Time().After(p.LastActiveTS.Time()) {