Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
243 lines
6.6 KiB
243 lines
6.6 KiB
// Copyright 2015 Matthew Holt |
|
// |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package certmagic |
|
|
|
import ( |
|
"context" |
|
"log" |
|
"runtime" |
|
"sync" |
|
"time" |
|
) |
|
|
|
// NewRateLimiter returns a rate limiter that allows up to maxEvents |
|
// in a sliding window of size window. If maxEvents and window are |
|
// both 0, or if maxEvents is non-zero and window is 0, rate limiting |
|
// is disabled. This function panics if maxEvents is less than 0 or |
|
// if maxEvents is 0 and window is non-zero, which is considered to be |
|
// an invalid configuration, as it would never allow events. |
|
func NewRateLimiter(maxEvents int, window time.Duration) *RingBufferRateLimiter { |
|
if maxEvents < 0 { |
|
panic("maxEvents cannot be less than zero") |
|
} |
|
if maxEvents == 0 && window != 0 { |
|
panic("invalid configuration: maxEvents = 0 and window != 0 would not allow any events") |
|
} |
|
rbrl := &RingBufferRateLimiter{ |
|
window: window, |
|
ring: make([]time.Time, maxEvents), |
|
started: make(chan struct{}), |
|
stopped: make(chan struct{}), |
|
ticket: make(chan struct{}), |
|
} |
|
go rbrl.loop() |
|
<-rbrl.started // make sure loop is ready to receive before we return |
|
return rbrl |
|
} |
|
|
|
// RingBufferRateLimiter enforces a rate limit expressed as a maximum
// number of events inside one sliding window of fixed duration. It
// remembers event timestamps in a ring whose length equals the event
// limit. The zero value is not usable; obtain an instance from
// NewRateLimiter.
type RingBufferRateLimiter struct {
	// window is the duration of the sliding window.
	window time.Duration

	// ring holds event timestamps; len(ring) is the event limit.
	ring []time.Time

	// cursor is the index in ring of the oldest timestamp.
	cursor int

	// mu guards ring, cursor, and window.
	mu sync.Mutex

	// started receives one signal from the scheduling goroutine when
	// it is ready; stopped is closed by Stop to shut that goroutine
	// down; ticket hands one permission per send to Allow or Wait.
	started, stopped, ticket chan struct{}
}
|
|
|
// Stop cleans up r's scheduling goroutine. |
|
func (r *RingBufferRateLimiter) Stop() { |
|
close(r.stopped) |
|
} |
|
|
|
func (r *RingBufferRateLimiter) loop() { |
|
defer func() { |
|
if err := recover(); err != nil { |
|
buf := make([]byte, stackTraceBufferSize) |
|
buf = buf[:runtime.Stack(buf, false)] |
|
log.Printf("panic: ring buffer rate limiter: %v\n%s", err, buf) |
|
} |
|
}() |
|
|
|
for { |
|
// if we've been stopped, return |
|
select { |
|
case <-r.stopped: |
|
return |
|
default: |
|
} |
|
|
|
if len(r.ring) == 0 { |
|
if r.window == 0 { |
|
// rate limiting is disabled; always allow immediately |
|
r.permit() |
|
continue |
|
} |
|
panic("invalid configuration: maxEvents = 0 and window != 0 does not allow any events") |
|
} |
|
|
|
// wait until next slot is available or until we've been stopped |
|
r.mu.Lock() |
|
then := r.ring[r.cursor].Add(r.window) |
|
r.mu.Unlock() |
|
waitDuration := time.Until(then) |
|
waitTimer := time.NewTimer(waitDuration) |
|
select { |
|
case <-waitTimer.C: |
|
r.permit() |
|
case <-r.stopped: |
|
waitTimer.Stop() |
|
return |
|
} |
|
} |
|
} |
|
|
|
// Allow returns true if the event is allowed to |
|
// happen right now. It does not wait. If the event |
|
// is allowed, a ticket is claimed. |
|
func (r *RingBufferRateLimiter) Allow() bool { |
|
select { |
|
case <-r.ticket: |
|
return true |
|
default: |
|
return false |
|
} |
|
} |
|
|
|
// Wait blocks until the event is allowed to occur. It returns an |
|
// error if the context is cancelled. |
|
func (r *RingBufferRateLimiter) Wait(ctx context.Context) error { |
|
select { |
|
case <-ctx.Done(): |
|
return context.Canceled |
|
case <-r.ticket: |
|
return nil |
|
} |
|
} |
|
|
|
// MaxEvents returns the maximum number of events that |
|
// are allowed within the sliding window. |
|
func (r *RingBufferRateLimiter) MaxEvents() int { |
|
r.mu.Lock() |
|
defer r.mu.Unlock() |
|
return len(r.ring) |
|
} |
|
|
|
// SetMaxEvents changes the maximum number of events that are |
|
// allowed in the sliding window. If the new limit is lower, |
|
// the oldest events will be forgotten. If the new limit is |
|
// higher, the window will suddenly have capacity for new |
|
// reservations. It panics if maxEvents is 0 and window size |
|
// is not zero. |
|
func (r *RingBufferRateLimiter) SetMaxEvents(maxEvents int) { |
|
newRing := make([]time.Time, maxEvents) |
|
r.mu.Lock() |
|
defer r.mu.Unlock() |
|
|
|
if r.window != 0 && maxEvents == 0 { |
|
panic("invalid configuration: maxEvents = 0 and window != 0 would not allow any events") |
|
} |
|
|
|
// only make the change if the new limit is different |
|
if maxEvents == len(r.ring) { |
|
return |
|
} |
|
|
|
// the new ring may be smaller; fast-forward to the |
|
// oldest timestamp that will be kept in the new |
|
// ring so the oldest ones are forgotten and the |
|
// newest ones will be remembered |
|
sizeDiff := len(r.ring) - maxEvents |
|
for i := 0; i < sizeDiff; i++ { |
|
r.advance() |
|
} |
|
|
|
if len(r.ring) > 0 { |
|
// copy timestamps into the new ring until we |
|
// have either copied all of them or have reached |
|
// the capacity of the new ring |
|
startCursor := r.cursor |
|
for i := 0; i < len(newRing); i++ { |
|
newRing[i] = r.ring[r.cursor] |
|
r.advance() |
|
if r.cursor == startCursor { |
|
// new ring is larger than old one; |
|
// "we've come full circle" |
|
break |
|
} |
|
} |
|
} |
|
|
|
r.ring = newRing |
|
r.cursor = 0 |
|
} |
|
|
|
// Window returns the size of the sliding window. |
|
func (r *RingBufferRateLimiter) Window() time.Duration { |
|
r.mu.Lock() |
|
defer r.mu.Unlock() |
|
return r.window |
|
} |
|
|
|
// SetWindow changes r's sliding window duration to window. |
|
// Goroutines that are already blocked on a call to Wait() |
|
// will not be affected. It panics if window is non-zero |
|
// but the max event limit is 0. |
|
func (r *RingBufferRateLimiter) SetWindow(window time.Duration) { |
|
r.mu.Lock() |
|
defer r.mu.Unlock() |
|
if window != 0 && len(r.ring) == 0 { |
|
panic("invalid configuration: maxEvents = 0 and window != 0 would not allow any events") |
|
} |
|
r.window = window |
|
} |
|
|
|
// permit allows one event through the throttle. This method |
|
// blocks until a goroutine is waiting for a ticket or until |
|
// the rate limiter is stopped. |
|
func (r *RingBufferRateLimiter) permit() { |
|
for { |
|
select { |
|
case r.started <- struct{}{}: |
|
// notify parent goroutine that we've started; should |
|
// only happen once, before constructor returns |
|
continue |
|
case <-r.stopped: |
|
return |
|
case r.ticket <- struct{}{}: |
|
r.mu.Lock() |
|
defer r.mu.Unlock() |
|
if len(r.ring) > 0 { |
|
r.ring[r.cursor] = time.Now() |
|
r.advance() |
|
} |
|
return |
|
} |
|
} |
|
} |
|
|
|
// advance moves the cursor to the next position. |
|
// It is NOT safe for concurrent use, so it must |
|
// be called inside a lock on r.mu. |
|
func (r *RingBufferRateLimiter) advance() { |
|
r.cursor++ |
|
if r.cursor >= len(r.ring) { |
|
r.cursor = 0 |
|
} |
|
}
|
|
|