Mirror of https://github.com/matrix-org/dendrite.git, synced 2026-01-10 15:43:09 -06:00
Revert "Use rate.Limiter for rate limiting"
This reverts commit 91bffae350.
parent 0e0ab48c92
commit 5f71c84188
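For context, the approach being reverted kept one golang.org/x/time/rate token-bucket limiter per caller, created with rate.NewLimiter(rate.Every(cooloffDuration), 20) and entered through Limiter.Wait, as the removed lines in the diff below show. A minimal standalone sketch of that pattern follows; the callerLimits type, its field names and the values in main are illustrative assumptions, not Dendrite's code or configuration.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// callerLimits keeps one token-bucket limiter per caller, mirroring the
// reverted approach: each limiter refills one token per cooloff interval
// and allows a burst of 20. (Sketch only; not the Dendrite implementation.)
type callerLimits struct {
	mu       sync.Mutex
	limiters map[string]*rate.Limiter
	cooloff  time.Duration
}

func newCallerLimits(cooloff time.Duration) *callerLimits {
	return &callerLimits{
		limiters: make(map[string]*rate.Limiter),
		cooloff:  cooloff,
	}
}

// wait blocks until the caller may proceed, or returns an error if the
// context is cancelled first, which is the behaviour rate.Limiter.Wait
// gave the reverted code.
func (c *callerLimits) wait(ctx context.Context, caller string) error {
	c.mu.Lock()
	lim, ok := c.limiters[caller]
	if !ok {
		lim = rate.NewLimiter(rate.Every(c.cooloff), 20)
		c.limiters[caller] = lim
	}
	c.mu.Unlock()
	return lim.Wait(ctx)
}

func main() {
	limits := newCallerLimits(100 * time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// The first 20 calls ride the burst; later ones wait for tokens.
	for i := 0; i < 25; i++ {
		if err := limits.wait(ctx, "@alice:example.com"); err != nil {
			fmt.Println("rate limited:", err)
			return
		}
	}
	fmt.Println("all requests admitted")
}

Because Wait blocks until a token is available or the request context is cancelled, the removed code answered an over-limit caller with 408 Request Timeout and a "Request timed out" error; that is one of the behaviours this revert changes back to an immediate rejection.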
@@ -1,7 +1,6 @@
 package httputil
 
 import (
-	"fmt"
 	"net/http"
 	"sync"
 	"time"
@@ -10,11 +9,10 @@ import (
 	"github.com/matrix-org/dendrite/setup/config"
 	userapi "github.com/matrix-org/dendrite/userapi/api"
 	"github.com/matrix-org/util"
-	"golang.org/x/time/rate"
 )
 
 type RateLimits struct {
-	limits           map[string]deviceRatelimit
+	limits           map[string]chan struct{}
 	limitsMutex      sync.RWMutex
 	cleanMutex       sync.RWMutex
 	enabled          bool
@@ -23,14 +21,9 @@ type RateLimits struct {
 	exemptUserIDs    map[string]struct{}
 }
 
-type deviceRatelimit struct {
-	*rate.Limiter
-	lastUsed time.Time
-}
-
 func NewRateLimits(cfg *config.RateLimiting) *RateLimits {
 	l := &RateLimits{
-		limits:           make(map[string]deviceRatelimit),
+		limits:           make(map[string]chan struct{}),
 		enabled:          cfg.Enabled,
 		requestThreshold: cfg.Threshold,
 		cooloffDuration:  time.Duration(cfg.CooloffMS) * time.Millisecond,
@@ -48,13 +41,15 @@ func NewRateLimits(cfg *config.RateLimiting) *RateLimits {
 func (l *RateLimits) clean() {
 	for {
 		// On a 30 second interval, we'll take an exclusive write
-		// lock of the entire map and see if any of the limiters were used
-		// more than one minute ago. If they are then we delete them, freeing up memory.
+		// lock of the entire map and see if any of the channels are
+		// empty. If they are then we will close and delete them,
+		// freeing up memory.
		time.Sleep(time.Second * 30)
 		l.cleanMutex.Lock()
 		l.limitsMutex.Lock()
 		for k, c := range l.limits {
-			if s := time.Since(c.lastUsed); s > time.Minute {
+			if len(c) == 0 {
+				close(c)
 				delete(l.limits, k)
 			}
 		}
@@ -105,33 +100,33 @@ func (l *RateLimits) Limit(req *http.Request, device *userapi.Device) *util.JSONResponse {
 	rateLimit, ok := l.limits[caller]
 	l.limitsMutex.RUnlock()
 
-	// If the caller doesn't have a rate limit yet, create one and write it
+	// If the caller doesn't have a channel, create one and write it
 	// back to the map.
 	if !ok {
-		// Create a new limiter allowing 20 burst events, recovering 1 token every l.cooloffDuration
-		lim := rate.NewLimiter(rate.Every(l.cooloffDuration), 20)
-		rateLimit = deviceRatelimit{Limiter: lim, lastUsed: time.Now()}
+		rateLimit = make(chan struct{}, l.requestThreshold)
 
 		l.limitsMutex.Lock()
 		l.limits[caller] = rateLimit
 		l.limitsMutex.Unlock()
 	}
 
-	l.limitsMutex.Lock()
-	rateLimit.lastUsed = time.Now()
-	l.limits[caller] = rateLimit
-	l.limitsMutex.Unlock()
-
 	// Check if the user has got free resource slots for this request.
-	// If they don't then we'll try to wait until one is. If the
-	// context is canceled/done, return an error.
-	if err := rateLimit.Wait(req.Context()); err != nil {
+	// If they don't then we'll return an error.
+	select {
+	case rateLimit <- struct{}{}:
+	default:
 		// We hit the rate limit. Tell the client to back off.
 		return &util.JSONResponse{
-			Code: http.StatusRequestTimeout,
-			JSON: jsonerror.Unknown(fmt.Sprintf("Request timed out: %s", err)),
+			Code: http.StatusTooManyRequests,
+			JSON: jsonerror.LimitExceeded("You are sending too many requests too quickly!", l.cooloffDuration.Milliseconds()),
 		}
 	}
 
+	// After the time interval, drain a resource from the rate limiting
+	// channel. This will free up space in the channel for new requests.
+	go func() {
+		<-time.After(l.cooloffDuration)
+		<-rateLimit
+	}()
 	return nil
 }
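The restored code rate-limits each caller with a buffered channel: the buffer size is the request threshold, each request performs a non-blocking send, a full buffer yields an immediate 429 Too Many Requests response, and a goroutine drains one slot once the cooloff interval has elapsed. Below is a minimal standalone sketch of that pattern; the slotLimiter type, its names and the values in main are illustrative assumptions rather than Dendrite's configuration.

package main

import (
	"fmt"
	"sync"
	"time"
)

// slotLimiter mirrors the restored channel-based pattern: a buffered
// channel of empty structs per caller, where the buffer size is the
// request threshold and one slot is freed after the cooloff interval.
// (Sketch only; not the Dendrite implementation.)
type slotLimiter struct {
	mu        sync.Mutex
	slots     map[string]chan struct{}
	threshold int
	cooloff   time.Duration
}

func newSlotLimiter(threshold int, cooloff time.Duration) *slotLimiter {
	return &slotLimiter{
		slots:     make(map[string]chan struct{}),
		threshold: threshold,
		cooloff:   cooloff,
	}
}

// allow reports whether the caller still has a free slot. A full channel
// means the caller has hit the limit, matching the select/default in the
// restored code.
func (s *slotLimiter) allow(caller string) bool {
	s.mu.Lock()
	ch, ok := s.slots[caller]
	if !ok {
		ch = make(chan struct{}, s.threshold)
		s.slots[caller] = ch
	}
	s.mu.Unlock()

	select {
	case ch <- struct{}{}:
	default:
		return false // buffer full: too many requests in the window
	}

	// Free the slot again once the cooloff interval has passed.
	go func() {
		<-time.After(s.cooloff)
		<-ch
	}()
	return true
}

func main() {
	limits := newSlotLimiter(5, 500*time.Millisecond)
	for i := 1; i <= 7; i++ {
		fmt.Printf("request %d allowed: %v\n", i, limits.allow("@alice:example.com"))
	}
}

Compared with the reverted Limiter.Wait approach, this never blocks the request goroutine: callers over the threshold are told to back off immediately, and the cleaner can close and drop a caller's channel once it has drained empty.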