mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-09 07:03:10 -06:00
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/fts
This commit is contained in:
commit
2c81466593
|
|
@ -102,8 +102,7 @@ type userInteractiveFlow struct {
|
|||
// the user already has a valid access token, but we want to double-check
|
||||
// that it isn't stolen by re-authenticating them.
|
||||
type UserInteractive struct {
|
||||
Completed []string
|
||||
Flows []userInteractiveFlow
|
||||
Flows []userInteractiveFlow
|
||||
// Map of login type to implementation
|
||||
Types map[string]Type
|
||||
// Map of session ID to completed login types, will need to be extended in future
|
||||
|
|
@ -116,7 +115,6 @@ func NewUserInteractive(userAccountAPI api.UserLoginAPI, cfg *config.ClientAPI)
|
|||
Config: cfg,
|
||||
}
|
||||
return &UserInteractive{
|
||||
Completed: []string{},
|
||||
Flows: []userInteractiveFlow{
|
||||
{
|
||||
Stages: []string{typePassword.Name()},
|
||||
|
|
@ -140,7 +138,6 @@ func (u *UserInteractive) IsSingleStageFlow(authType string) bool {
|
|||
|
||||
func (u *UserInteractive) AddCompletedStage(sessionID, authType string) {
|
||||
// TODO: Handle multi-stage flows
|
||||
u.Completed = append(u.Completed, authType)
|
||||
delete(u.Sessions, sessionID)
|
||||
}
|
||||
|
||||
|
|
@ -157,7 +154,7 @@ func (u *UserInteractive) Challenge(sessionID string) *util.JSONResponse {
|
|||
return &util.JSONResponse{
|
||||
Code: 401,
|
||||
JSON: Challenge{
|
||||
Completed: u.Completed,
|
||||
Completed: u.Sessions[sessionID],
|
||||
Flows: u.Flows,
|
||||
Session: sessionID,
|
||||
Params: make(map[string]interface{}),
|
||||
|
|
|
|||
|
|
@ -187,3 +187,38 @@ func TestUserInteractivePasswordBadLogin(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUserInteractive_AddCompletedStage(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
sessionID string
|
||||
}{
|
||||
{
|
||||
name: "first user",
|
||||
sessionID: util.RandomString(8),
|
||||
},
|
||||
{
|
||||
name: "second user",
|
||||
sessionID: util.RandomString(8),
|
||||
},
|
||||
{
|
||||
name: "third user",
|
||||
sessionID: util.RandomString(8),
|
||||
},
|
||||
}
|
||||
u := setup()
|
||||
ctx := context.Background()
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
_, resp := u.Verify(ctx, []byte("{}"), nil)
|
||||
challenge, ok := resp.JSON.(Challenge)
|
||||
if !ok {
|
||||
t.Fatalf("expected a Challenge, got %T", resp.JSON)
|
||||
}
|
||||
if len(challenge.Completed) > 0 {
|
||||
t.Fatalf("expected 0 completed stages, got %d", len(challenge.Completed))
|
||||
}
|
||||
u.AddCompletedStage(tt.sessionID, "")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -251,125 +251,151 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|||
waitingSyncRequests.Inc()
|
||||
defer waitingSyncRequests.Dec()
|
||||
|
||||
currentPos := rp.Notifier.CurrentPosition()
|
||||
// loop until we get some data
|
||||
for {
|
||||
startTime := time.Now()
|
||||
currentPos := rp.Notifier.CurrentPosition()
|
||||
|
||||
if !rp.shouldReturnImmediately(syncReq, currentPos) {
|
||||
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
|
||||
defer timer.Stop()
|
||||
// if the since token matches the current positions, wait via the notifier
|
||||
if !rp.shouldReturnImmediately(syncReq, currentPos) {
|
||||
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
|
||||
defer timer.Stop()
|
||||
|
||||
userStreamListener := rp.Notifier.GetListener(*syncReq)
|
||||
defer userStreamListener.Close()
|
||||
userStreamListener := rp.Notifier.GetListener(*syncReq)
|
||||
defer userStreamListener.Close()
|
||||
|
||||
giveup := func() util.JSONResponse {
|
||||
syncReq.Log.Debugln("Responding to sync since client gave up or timeout was reached")
|
||||
syncReq.Response.NextBatch = syncReq.Since
|
||||
// We should always try to include OTKs in sync responses, otherwise clients might upload keys
|
||||
// even if that's not required. See also:
|
||||
// https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281
|
||||
// Only try to get OTKs if the context isn't already done.
|
||||
if syncReq.Context.Err() == nil {
|
||||
err = internal.DeviceOTKCounts(syncReq.Context, rp.keyAPI, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Response)
|
||||
if err != nil && err != context.Canceled {
|
||||
syncReq.Log.WithError(err).Warn("failed to get OTK counts")
|
||||
giveup := func() util.JSONResponse {
|
||||
syncReq.Log.Debugln("Responding to sync since client gave up or timeout was reached")
|
||||
syncReq.Response.NextBatch = syncReq.Since
|
||||
// We should always try to include OTKs in sync responses, otherwise clients might upload keys
|
||||
// even if that's not required. See also:
|
||||
// https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281
|
||||
// Only try to get OTKs if the context isn't already done.
|
||||
if syncReq.Context.Err() == nil {
|
||||
err = internal.DeviceOTKCounts(syncReq.Context, rp.keyAPI, syncReq.Device.UserID, syncReq.Device.ID, syncReq.Response)
|
||||
if err != nil && err != context.Canceled {
|
||||
syncReq.Log.WithError(err).Warn("failed to get OTK counts")
|
||||
}
|
||||
}
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: syncReq.Response,
|
||||
}
|
||||
}
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: syncReq.Response,
|
||||
|
||||
select {
|
||||
case <-syncReq.Context.Done(): // Caller gave up
|
||||
return giveup()
|
||||
|
||||
case <-timer.C: // Timeout reached
|
||||
return giveup()
|
||||
|
||||
case <-userStreamListener.GetNotifyChannel(syncReq.Since):
|
||||
syncReq.Log.Debugln("Responding to sync after wake-up")
|
||||
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
|
||||
}
|
||||
} else {
|
||||
syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
|
||||
}
|
||||
|
||||
if syncReq.Since.IsEmpty() {
|
||||
// Complete sync
|
||||
syncReq.Response.NextBatch = types.StreamingToken{
|
||||
PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
}
|
||||
} else {
|
||||
// Incremental sync
|
||||
syncReq.Response.NextBatch = types.StreamingToken{
|
||||
PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.PDUPosition, currentPos.PDUPosition,
|
||||
),
|
||||
TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.TypingPosition, currentPos.TypingPosition,
|
||||
),
|
||||
ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
|
||||
),
|
||||
InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.InvitePosition, currentPos.InvitePosition,
|
||||
),
|
||||
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
|
||||
),
|
||||
AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
|
||||
),
|
||||
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
|
||||
),
|
||||
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
|
||||
),
|
||||
PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.PresencePosition, currentPos.PresencePosition,
|
||||
),
|
||||
}
|
||||
// it's possible for there to be no updates for this user even though since < current pos,
|
||||
// e.g busy servers with a quiet user. In this scenario, we don't want to return a no-op
|
||||
// response immediately, so let's try this again but pretend they bumped their since token.
|
||||
// If the incremental sync was processed very quickly then we expect the next loop to block
|
||||
// with a notifier, but if things are slow it's entirely possible that currentPos is no
|
||||
// longer the current position so we will hit this code path again. We need to do this and
|
||||
// not return a no-op response because:
|
||||
// - It's an inefficient use of bandwidth.
|
||||
// - Some sytests which test 'waking up' sync rely on some sync requests to block, which
|
||||
// they weren't always doing, resulting in flakey tests.
|
||||
if !syncReq.Response.HasUpdates() {
|
||||
syncReq.Since = currentPos
|
||||
// do not loop again if the ?timeout= is 0 as that means "return immediately"
|
||||
if syncReq.Timeout > 0 {
|
||||
syncReq.Timeout = syncReq.Timeout - time.Since(startTime)
|
||||
if syncReq.Timeout < 0 {
|
||||
syncReq.Timeout = 0
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-syncReq.Context.Done(): // Caller gave up
|
||||
return giveup()
|
||||
|
||||
case <-timer.C: // Timeout reached
|
||||
return giveup()
|
||||
|
||||
case <-userStreamListener.GetNotifyChannel(syncReq.Since):
|
||||
syncReq.Log.Debugln("Responding to sync after wake-up")
|
||||
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: syncReq.Response,
|
||||
}
|
||||
} else {
|
||||
syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
|
||||
}
|
||||
|
||||
if syncReq.Since.IsEmpty() {
|
||||
// Complete sync
|
||||
syncReq.Response.NextBatch = types.StreamingToken{
|
||||
PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
|
||||
syncReq.Context, syncReq,
|
||||
),
|
||||
}
|
||||
} else {
|
||||
// Incremental sync
|
||||
syncReq.Response.NextBatch = types.StreamingToken{
|
||||
PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.PDUPosition, currentPos.PDUPosition,
|
||||
),
|
||||
TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.TypingPosition, currentPos.TypingPosition,
|
||||
),
|
||||
ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
|
||||
),
|
||||
InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.InvitePosition, currentPos.InvitePosition,
|
||||
),
|
||||
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
|
||||
),
|
||||
AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
|
||||
),
|
||||
NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
|
||||
),
|
||||
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
|
||||
),
|
||||
PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
|
||||
syncReq.Context, syncReq,
|
||||
syncReq.Since.PresencePosition, currentPos.PresencePosition,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: syncReq.Response,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -350,6 +350,19 @@ type Response struct {
|
|||
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
|
||||
}
|
||||
|
||||
func (r *Response) HasUpdates() bool {
|
||||
// purposefully exclude DeviceListsOTKCount as we always include them
|
||||
return (len(r.AccountData.Events) > 0 ||
|
||||
len(r.Presence.Events) > 0 ||
|
||||
len(r.Rooms.Invite) > 0 ||
|
||||
len(r.Rooms.Join) > 0 ||
|
||||
len(r.Rooms.Leave) > 0 ||
|
||||
len(r.Rooms.Peek) > 0 ||
|
||||
len(r.ToDevice.Events) > 0 ||
|
||||
len(r.DeviceLists.Changed) > 0 ||
|
||||
len(r.DeviceLists.Left) > 0)
|
||||
}
|
||||
|
||||
// NewResponse creates an empty response with initialised maps.
|
||||
func NewResponse() *Response {
|
||||
res := Response{}
|
||||
|
|
|
|||
Loading…
Reference in a new issue