go-throttle/throttle.go

98 lines
1.7 KiB
Go
Raw Normal View History

2021-01-18 21:13:16 +00:00
package throttle
import (
"context"
"github.com/reactivex/rxgo/v2"
"math"
"sync"
"time"
)
type Throttle struct {
lastTimestamp int64
queueChan chan *req
config Config
cfgMut sync.Mutex
running bool
ctx context.Context
}
func NewThrottle(ctx context.Context, cfg *Config) *Throttle {
if cfg == nil {
cfg = &Config{
NumTokens: 0,
Delay: 0.001,
RefillRate: 0.001,
DefaultCost: 1.000,
Capacity: 1.000,
}
}
return &Throttle{
lastTimestamp: -1,
queueChan: make(chan *req, cfg.MaxCapacity),
config: *cfg,
running: false,
ctx: ctx,
}
}
func (t *Throttle) run() {
go func() {
for {
select {
case <-t.ctx.Done(): {
return
}
case r:=<-t.queueChan: {
for {
if t.config.NumTokens > 0 || t.config.RefillRate == 0 {
r.resolveChan <- rxgo.Of(true)
close(r.resolveChan)
var cost float64 = 0
if r.cost != -1 {
cost = r.cost
} else {
cost = t.config.DefaultCost
}
t.config.NumTokens -= cost
break
}
if t.lastTimestamp == -1 {
t.lastTimestamp = time.Now().Unix()
}
now := time.Now().Unix()
elapsed := now - t.lastTimestamp
t.lastTimestamp = now
t.config.NumTokens = math.Min(t.config.Capacity, t.config.NumTokens+float64(elapsed)*t.config.RefillRate)
time.Sleep(t.config.Delay)
}
}
}
}
}()
}
func (t *Throttle) Take(rateLimit float64, cost float64) rxgo.Observable {
t.cfgMut.Lock()
defer t.cfgMut.Unlock()
ch := make(chan rxgo.Item)
t.queueChan <- &req{
cost: cost,
resolveChan: ch,
}
if rateLimit == 0 {
t.config.RefillRate = 0
} else {
t.config.RefillRate = 1/rateLimit
}
return rxgo.FromChannel(ch)
}
type req struct {
cost float64
resolveChan chan rxgo.Item
}