From 03d2fcc82352a81370c556fadf0563258f05ba32 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Tue, 19 Jan 2021 00:13:16 +0300 Subject: [PATCH] Implement basic throttle library --- .gitignore | 1 + LICENSE | 21 ++++++++++++ README.md | 20 +++++++++++ config.go | 12 +++++++ go.mod | 5 +++ go.sum | 40 ++++++++++++++++++++++ throttle.go | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 196 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 config.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 throttle.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..757fee3 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/.idea \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a0ed704 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 ChronosX88 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..2ce1302 --- /dev/null +++ b/README.md @@ -0,0 +1,20 @@ +# go-throttle + +The Go library for throttling some actions, for example, API requests to some network service. + +## Install + +Using Go Modules: + +``` +go get github.com/ChronosX88/go-throttle +``` + +## Usage + +TBD + +## License + +This project is licensed under the MIT license, Copyright (c) 2021 ChronosX88. For more information see [LICENSE](LICENSE) file. + diff --git a/config.go b/config.go new file mode 100644 index 0000000..599bec9 --- /dev/null +++ b/config.go @@ -0,0 +1,12 @@ +package throttle + +import "time" + +type Config struct { + NumTokens float64 + Delay time.Duration + RefillRate float64 + DefaultCost float64 + Capacity float64 + MaxCapacity int +} \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3f08699 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/ChronosX88/go-throttle + +go 1.15 + +require github.com/reactivex/rxgo/v2 v2.4.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d2b3d1b --- /dev/null +++ b/go.sum @@ -0,0 +1,40 @@ +github.com/cenkalti/backoff/v4 v4.0.0 h1:6VeaLF9aI+MAUQ95106HwWzYZgJJpZ4stumjj6RFYAU= +github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/reactivex/rxgo/v2 v2.4.0 h1:0GNRR1dnVjEWEXWTcQjIbCuJUi2XAn2u0qTKwSHkTZk= +github.com/reactivex/rxgo/v2 v2.4.0/go.mod h1:NtZMAof7BaWwEiC4jNj7tUF+oCYOs7FnQBnzsyWwmgw= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/throttle.go b/throttle.go new file mode 100644 index 0000000..f0c5cbb --- /dev/null +++ b/throttle.go @@ -0,0 +1,97 @@ +package throttle + +import ( + "context" + "github.com/reactivex/rxgo/v2" + "math" + "sync" + "time" +) + +type Throttle struct { + lastTimestamp int64 + queueChan chan *req + config Config + cfgMut sync.Mutex + running bool + ctx context.Context +} + +func NewThrottle(ctx context.Context, cfg *Config) *Throttle { + if cfg == nil { + cfg = &Config{ + NumTokens: 0, + Delay: 0.001, + RefillRate: 0.001, + DefaultCost: 1.000, + Capacity: 1.000, + } + } + + return &Throttle{ + lastTimestamp: -1, + queueChan: make(chan *req, cfg.MaxCapacity), + config: *cfg, + running: false, + ctx: ctx, + } +} + +func (t *Throttle) run() { + go func() { + for { + select { + case <-t.ctx.Done(): { + return + } + case r:=<-t.queueChan: { + for { + if t.config.NumTokens > 0 || t.config.RefillRate == 0 { + r.resolveChan <- rxgo.Of(true) + close(r.resolveChan) + var cost float64 = 0 + if r.cost != -1 { + cost = r.cost + } else { + cost = t.config.DefaultCost + } + t.config.NumTokens -= cost + break + } + if t.lastTimestamp == -1 { + t.lastTimestamp = time.Now().Unix() + } + now := time.Now().Unix() + elapsed := now - t.lastTimestamp + t.lastTimestamp = now + t.config.NumTokens = math.Min(t.config.Capacity, t.config.NumTokens+float64(elapsed)*t.config.RefillRate) + time.Sleep(t.config.Delay) + } + } + } + } + }() + +} + +func (t *Throttle) Take(rateLimit float64, cost float64) rxgo.Observable { + t.cfgMut.Lock() + defer t.cfgMut.Unlock() + ch := make(chan rxgo.Item) + t.queueChan <- &req{ + cost: cost, + resolveChan: ch, + } + if rateLimit == 0 { + t.config.RefillRate = 0 + } else { + t.config.RefillRate = 1/rateLimit + } + return rxgo.FromChannel(ch) +} + +type req struct { + cost float64 + resolveChan chan rxgo.Item +} +