Compare commits
8 commits
main
...
neilalexan
Author | SHA1 | Date | |
---|---|---|---|
c1ad1b0efc | |||
6fe58ffec3 | |||
a94fb71d1b | |||
9a7c567037 | |||
0bc93ad82a | |||
6ad64767a1 | |||
3571892fcd | |||
d0ca183f49 |
|
@ -2,9 +2,11 @@ package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Functions here are "proxying" calls to the gomatrixserverlib federation
|
// Functions here are "proxying" calls to the gomatrixserverlib federation
|
||||||
|
@ -44,13 +46,23 @@ func (a *FederationInternalAPI) ClaimKeys(
|
||||||
) (gomatrixserverlib.RespClaimKeys, error) {
|
) (gomatrixserverlib.RespClaimKeys, error) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
ires, err := a.doRequestIfNotBackingOffOrBlacklisted(s, func() (interface{}, error) {
|
|
||||||
|
logrus.Infof("XXX: ClaimKeys request: %s %+v", s, oneTimeKeys)
|
||||||
|
|
||||||
|
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
|
||||||
return a.federation.ClaimKeys(ctx, s, oneTimeKeys)
|
return a.federation.ClaimKeys(ctx, s, oneTimeKeys)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logrus.Infof("XXX: ClaimKeys error: %s %+v", s, err)
|
||||||
return gomatrixserverlib.RespClaimKeys{}, err
|
return gomatrixserverlib.RespClaimKeys{}, err
|
||||||
}
|
}
|
||||||
return ires.(gomatrixserverlib.RespClaimKeys), nil
|
res, ok := ires.(gomatrixserverlib.RespClaimKeys)
|
||||||
|
if !ok {
|
||||||
|
logrus.Infof("XXX: ClaimKeys type-cast error: %s %+v", s, res)
|
||||||
|
return gomatrixserverlib.RespClaimKeys{}, fmt.Errorf("typecast error")
|
||||||
|
}
|
||||||
|
logrus.Infof("XXX: ClaimKeys response: %s %+v (%+v)", s, ires, res)
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *FederationInternalAPI) QueryKeys(
|
func (a *FederationInternalAPI) QueryKeys(
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -22,7 +22,7 @@ require (
|
||||||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530
|
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20221027132025-853797ba5086
|
||||||
github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6
|
github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||||
github.com/mattn/go-sqlite3 v1.14.15
|
github.com/mattn/go-sqlite3 v1.14.15
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -387,8 +387,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw
|
||||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
|
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U=
|
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U=
|
||||||
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa h1:S98DShDv3sn7O4n4HjtJOejypseYVpv1R/XPg+cDnfI=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20221027132025-853797ba5086 h1:+ge81dGIZ2Uj1plc3mKFKlnJM+W7hxjzH55TpuenLPg=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20221025142407-17b0be811afa/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20221027132025-853797ba5086/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6 h1:nAT5w41Q9uWTSnpKW55/hBwP91j2IFYPDRs0jJ8TyFI=
|
github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6 h1:nAT5w41Q9uWTSnpKW55/hBwP91j2IFYPDRs0jJ8TyFI=
|
||||||
github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
|
||||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
|
||||||
|
|
|
@ -128,58 +128,55 @@ func (a *KeyInternalAPI) PerformClaimKeys(ctx context.Context, req *api.PerformC
|
||||||
func (a *KeyInternalAPI) claimRemoteKeys(
|
func (a *KeyInternalAPI) claimRemoteKeys(
|
||||||
ctx context.Context, timeout time.Duration, res *api.PerformClaimKeysResponse, domainToDeviceKeys map[string]map[string]map[string]string,
|
ctx context.Context, timeout time.Duration, res *api.PerformClaimKeysResponse, domainToDeviceKeys map[string]map[string]map[string]string,
|
||||||
) {
|
) {
|
||||||
resultCh := make(chan *gomatrixserverlib.RespClaimKeys, len(domainToDeviceKeys))
|
var wg sync.WaitGroup // Wait for fan-out goroutines to finish
|
||||||
// allows us to wait until all federation servers have been poked
|
var mu sync.Mutex // Protects the response struct
|
||||||
var wg sync.WaitGroup
|
var claimed int // Number of keys claimed in total
|
||||||
wg.Add(len(domainToDeviceKeys))
|
var failures int // Number of servers we failed to ask
|
||||||
// mutex for failures
|
|
||||||
var failMu sync.Mutex
|
util.GetLogger(ctx).WithField("num_servers", len(domainToDeviceKeys)).Info("Claiming remote keys from servers")
|
||||||
util.GetLogger(ctx).WithField("num_servers", len(domainToDeviceKeys)).Info("Claiming remote keys from servers")
|
wg.Add(len(domainToDeviceKeys))
|
||||||
|
|
||||||
|
util.GetLogger(ctx).Infof("Domain to device keys: %+v", domainToDeviceKeys)
|
||||||
|
|
||||||
// fan out
|
|
||||||
for d, k := range domainToDeviceKeys {
|
for d, k := range domainToDeviceKeys {
|
||||||
go func(domain string, keysToClaim map[string]map[string]string) {
|
go func(domain string, keysToClaim map[string]map[string]string) {
|
||||||
defer wg.Done()
|
util.GetLogger(ctx).Infof("Keys to claim from %s: %+v", domain, keysToClaim)
|
||||||
|
|
||||||
fedCtx, cancel := context.WithTimeout(ctx, timeout)
|
fedCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
claimKeyRes, err := a.FedClient.ClaimKeys(fedCtx, gomatrixserverlib.ServerName(domain), keysToClaim)
|
claimKeyRes, err := a.FedClient.ClaimKeys(fedCtx, gomatrixserverlib.ServerName(domain), keysToClaim)
|
||||||
|
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("Server %s response: %+v", domain, claimKeyRes)
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(ctx).WithError(err).WithField("server", domain).Error("ClaimKeys failed")
|
util.GetLogger(ctx).WithError(err).WithField("server", domain).Error("ClaimKeys failed")
|
||||||
failMu.Lock()
|
|
||||||
res.Failures[domain] = map[string]interface{}{
|
res.Failures[domain] = map[string]interface{}{
|
||||||
"message": err.Error(),
|
"message": err.Error(),
|
||||||
}
|
}
|
||||||
failMu.Unlock()
|
failures++
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resultCh <- &claimKeyRes
|
|
||||||
|
for userID, deviceIDToKeys := range claimKeyRes.OneTimeKeys {
|
||||||
|
res.OneTimeKeys[userID] = make(map[string]map[string]json.RawMessage)
|
||||||
|
for deviceID, keys := range deviceIDToKeys {
|
||||||
|
res.OneTimeKeys[userID][deviceID] = keys
|
||||||
|
claimed += len(keys)
|
||||||
|
}
|
||||||
|
}
|
||||||
}(d, k)
|
}(d, k)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the result channel when the goroutines have quit so the for .. range exits
|
wg.Wait()
|
||||||
go func() {
|
util.GetLogger(ctx).WithFields(logrus.Fields{
|
||||||
wg.Wait()
|
"num_keys": claimed,
|
||||||
close(resultCh)
|
"num_failures": failures,
|
||||||
}()
|
}).Info("Claimed remote keys")
|
||||||
|
|
||||||
keysClaimed := 0
|
|
||||||
for result := range resultCh {
|
|
||||||
for userID, nest := range result.OneTimeKeys {
|
|
||||||
res.OneTimeKeys[userID] = make(map[string]map[string]json.RawMessage)
|
|
||||||
for deviceID, nest2 := range nest {
|
|
||||||
res.OneTimeKeys[userID][deviceID] = make(map[string]json.RawMessage)
|
|
||||||
for keyIDWithAlgo, otk := range nest2 {
|
|
||||||
keyJSON, err := json.Marshal(otk)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
res.OneTimeKeys[userID][deviceID][keyIDWithAlgo] = keyJSON
|
|
||||||
keysClaimed++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
util.GetLogger(ctx).WithField("num_keys", keysClaimed).Info("Claimed remote keys")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *KeyInternalAPI) PerformDeleteKeys(ctx context.Context, req *api.PerformDeleteKeysRequest, res *api.PerformDeleteKeysResponse) error {
|
func (a *KeyInternalAPI) PerformDeleteKeys(ctx context.Context, req *api.PerformDeleteKeysRequest, res *api.PerformDeleteKeysResponse) error {
|
||||||
|
|
Loading…
Reference in a new issue