47ec28f3b1
This is a general purpose tool that happens to contain some jar specific features. Change-Id: I05f4654d4517c245ad7a3c15492e0d2368bbf64f
115 lines
2.9 KiB
Go
115 lines
2.9 KiB
Go
// Copyright 2016 Google Inc. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package main
|
|
|
|
import (
|
|
"runtime"
|
|
)
|
|
|
|
type RateLimit struct {
|
|
requests chan struct{}
|
|
finished chan int
|
|
released chan int
|
|
stop chan struct{}
|
|
}
|
|
|
|
// NewRateLimit starts a new rate limiter with maxExecs number of executions
|
|
// allowed to happen at a time. If maxExecs is <= 0, it will default to the
|
|
// number of logical CPUs on the system.
|
|
//
|
|
// With Finish and Release, we'll keep track of outstanding buffer sizes to be
|
|
// written. If that size goes above maxMem, we'll prevent starting new
|
|
// executions.
|
|
//
|
|
// The total memory use may be higher due to current executions. This just
|
|
// prevents runaway memory use due to slower writes.
|
|
func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
|
|
if maxExecs <= 0 {
|
|
maxExecs = runtime.NumCPU()
|
|
}
|
|
if maxMem <= 0 {
|
|
// Default to 512MB
|
|
maxMem = 512 * 1024 * 1024
|
|
}
|
|
|
|
ret := &RateLimit{
|
|
requests: make(chan struct{}),
|
|
|
|
// Let all of the pending executions to mark themselves as finished,
|
|
// even if our goroutine isn't processing input.
|
|
finished: make(chan int, maxExecs),
|
|
|
|
released: make(chan int),
|
|
stop: make(chan struct{}),
|
|
}
|
|
|
|
go ret.goFunc(maxExecs, maxMem)
|
|
|
|
return ret
|
|
}
|
|
|
|
// RequestExecution blocks until another execution can be allowed to run.
|
|
func (r *RateLimit) RequestExecution() Execution {
|
|
<-r.requests
|
|
return r.finished
|
|
}
|
|
|
|
type Execution chan<- int
|
|
|
|
// Finish will mark your execution as finished, and allow another request to be
|
|
// approved.
|
|
//
|
|
// bufferSize may be specified to count memory buffer sizes, and must be
|
|
// matched with calls to RateLimit.Release to mark the buffers as released.
|
|
func (e Execution) Finish(bufferSize int) {
|
|
e <- bufferSize
|
|
}
|
|
|
|
// Call Release when finished with a buffer recorded with Finish.
|
|
func (r *RateLimit) Release(bufferSize int) {
|
|
r.released <- bufferSize
|
|
}
|
|
|
|
// Stop the background goroutine
|
|
func (r *RateLimit) Stop() {
|
|
close(r.stop)
|
|
}
|
|
|
|
func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
|
|
var curExecs int
|
|
var curMemory int64
|
|
|
|
for {
|
|
var requests chan struct{}
|
|
if curExecs < maxExecs && curMemory < maxMem {
|
|
requests = r.requests
|
|
}
|
|
|
|
select {
|
|
case requests <- struct{}{}:
|
|
curExecs++
|
|
case amount := <-r.finished:
|
|
curExecs--
|
|
curMemory += int64(amount)
|
|
if curExecs < 0 {
|
|
panic("curExecs < 0")
|
|
}
|
|
case amount := <-r.released:
|
|
curMemory -= int64(amount)
|
|
case <-r.stop:
|
|
return
|
|
}
|
|
}
|
|
}
|