Merge pull request #22735 from resouer/throttle-dev

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-03-26 06:44:48 -07:00
commit e44ad7a083
15 changed files with 51 additions and 49 deletions

View File

@@ -56,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flag" "k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/util/flowcontrol"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
@@ -201,7 +202,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096). go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
Run(3, wait.NeverStop) Run(3, wait.NeverStop)
nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false) 40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second) nodeController.Run(5 * time.Second)
cadvisorInterface := new(cadvisortest.Fake) cadvisorInterface := new(cadvisortest.Fake)

View File

@@ -64,9 +64,9 @@ import (
"k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install" quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/crypto" "k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog" "github.com/golang/glog"
@@ -207,8 +207,8 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
// this cidr has been validated already // this cidr has been validated already
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR) _, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs) s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration) nodeController.Run(s.NodeSyncPeriod.Duration)

View File

@@ -58,8 +58,8 @@ import (
"k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install" quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/crypto" "k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/contrib/mesos/pkg/profile" "k8s.io/kubernetes/contrib/mesos/pkg/profile"
@@ -154,8 +154,8 @@ func (s *CMServer) Run(_ []string) error { func (s *CMServer) Run(_ []string) error {
} }
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR) _, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs) s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration) nodeController.Run(s.NodeSyncPeriod.Duration)

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
) )
const ( const (
@@ -53,7 +54,7 @@ type RESTClient struct {
contentConfig ContentConfig contentConfig ContentConfig
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl. // TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle util.RateLimiter Throttle flowcontrol.RateLimiter
// Set specific behavior of the client. If not set http.DefaultClient will be used. // Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client Client *http.Client
@@ -77,9 +78,9 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
config.ContentType = "application/json" config.ContentType = "application/json"
} }
var throttle util.RateLimiter var throttle flowcontrol.RateLimiter
if maxQPS > 0 { if maxQPS > 0 {
throttle = util.NewTokenBucketRateLimiter(maxQPS, maxBurst) throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
} }
return &RESTClient{ return &RESTClient{
base: &base, base: &base,

View File

@@ -39,7 +39,7 @@ import (
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@@ -117,11 +117,11 @@ type Request struct {
resp *http.Response resp *http.Response
backoffMgr BackoffManager backoffMgr BackoffManager
throttle util.RateLimiter throttle flowcontrol.RateLimiter
} }
// NewRequest creates a new request helper object for accessing runtime.Objects on a server. // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle util.RateLimiter) *Request { func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
if backoff == nil { if backoff == nil {
glog.V(2).Infof("Not implementing request backoff strategy.") glog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{} backoff = &NoBackoff{}

View File

@@ -33,8 +33,8 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors" utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
netsets "k8s.io/kubernetes/pkg/util/net/sets" netsets "k8s.io/kubernetes/pkg/util/net/sets"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
@@ -80,7 +80,7 @@ type GCECloud struct {
managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master) managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master)
networkURL string networkURL string
useMetadataServer bool useMetadataServer bool
operationPollRateLimiter util.RateLimiter operationPollRateLimiter flowcontrol.RateLimiter
} }
type Config struct { type Config struct {
@@ -297,7 +297,7 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
glog.Infof("managing multiple zones: %v", managedZones) glog.Infof("managing multiple zones: %v", managedZones)
} }
operationPollRateLimiter := util.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size. operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
return &GCECloud{ return &GCECloud{
service: svc, service: svc,

View File

@@ -22,7 +22,7 @@ import (
"strings" "strings"
"time" "time"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/oauth2" "golang.org/x/oauth2"
@@ -61,7 +61,7 @@ type altTokenSource struct {
oauthClient *http.Client oauthClient *http.Client
tokenURL string tokenURL string
tokenBody string tokenBody string
throttle util.RateLimiter throttle flowcontrol.RateLimiter
} }
func (a *altTokenSource) Token() (*oauth2.Token, error) { func (a *altTokenSource) Token() (*oauth2.Token, error) {
@@ -106,7 +106,7 @@ func newAltTokenSource(tokenURL, tokenBody string) oauth2.TokenSource {
oauthClient: client, oauthClient: client,
tokenURL: tokenURL, tokenURL: tokenURL,
tokenBody: tokenBody, tokenBody: tokenBody,
throttle: util.NewTokenBucketRateLimiter(tokenURLQPS, tokenURLBurst), throttle: flowcontrol.NewTokenBucketRateLimiter(tokenURLQPS, tokenURLBurst),
} }
return oauth2.ReuseTokenSource(nil, a) return oauth2.ReuseTokenSource(nil, a)
} }

View File

@@ -40,7 +40,7 @@ import (
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
@@ -69,7 +69,7 @@ type NodeController struct {
allocateNodeCIDRs bool allocateNodeCIDRs bool
cloud cloudprovider.Interface cloud cloudprovider.Interface
clusterCIDR *net.IPNet clusterCIDR *net.IPNet
deletingPodsRateLimiter util.RateLimiter deletingPodsRateLimiter flowcontrol.RateLimiter
knownNodeSet sets.String knownNodeSet sets.String
kubeClient clientset.Interface kubeClient clientset.Interface
// Method for easy mocking in unittest. // Method for easy mocking in unittest.
@@ -129,8 +129,8 @@ func NewNodeController(
cloud cloudprovider.Interface, cloud cloudprovider.Interface,
kubeClient clientset.Interface, kubeClient clientset.Interface,
podEvictionTimeout time.Duration, podEvictionTimeout time.Duration,
deletionEvictionLimiter util.RateLimiter, deletionEvictionLimiter flowcontrol.RateLimiter,
terminationEvictionLimiter util.RateLimiter, terminationEvictionLimiter flowcontrol.RateLimiter,
nodeMonitorGracePeriod time.Duration, nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration, nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration, nodeMonitorPeriod time.Duration,

View File

@@ -31,8 +31,8 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
@@ -418,7 +418,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, nodeController := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, evictionTimeout, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
for _, ds := range item.daemonSets { for _, ds := range item.daemonSets {
@@ -487,7 +487,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
deleteWaitChan: make(chan struct{}), deleteWaitChan: make(chan struct{}),
} }
nodeController := NewNodeController(nil, fnh, 10*time.Minute, nodeController := NewNodeController(nil, fnh, 10*time.Minute,
util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, false) testNodeMonitorPeriod, nil, false)
nodeController.cloud = &fakecloud.FakeCloud{} nodeController.cloud = &fakecloud.FakeCloud{}
@@ -720,8 +720,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
} }
for i, item := range table { for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil { if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@@ -870,8 +870,8 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
} }
for i, item := range table { for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil { if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err) t.Errorf("Case[%d] unexpected error: %v", i, err)
@@ -952,7 +952,7 @@ func TestNodeDeletion(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
} }
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow } nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil { if err := nodeController.monitorNodeStatus(); err != nil {

View File

@@ -22,7 +22,7 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@@ -137,11 +137,11 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
// of execution. It is also rate limited. // of execution. It is also rate limited.
type RateLimitedTimedQueue struct { type RateLimitedTimedQueue struct {
queue UniqueQueue queue UniqueQueue
limiter util.RateLimiter limiter flowcontrol.RateLimiter
} }
// Creates new queue which will use given RateLimiter to oversee execution. // Creates new queue which will use given RateLimiter to oversee execution.
func NewRateLimitedTimedQueue(limiter util.RateLimiter) *RateLimitedTimedQueue { func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
return &RateLimitedTimedQueue{ return &RateLimitedTimedQueue{
queue: UniqueQueue{ queue: UniqueQueue{
queue: TimedQueue{}, queue: TimedQueue{},

View File

@@ -21,7 +21,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@@ -39,7 +39,7 @@ func CheckSetEq(lhs, rhs sets.String) bool {
} }
func TestAddNode(t *testing.T) { func TestAddNode(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first") evictor.Add("first")
evictor.Add("second") evictor.Add("second")
evictor.Add("third") evictor.Add("third")
@@ -62,7 +62,7 @@ func TestAddNode(t *testing.T) {
} }
func TestDelNode(t *testing.T) { func TestDelNode(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first") evictor.Add("first")
evictor.Add("second") evictor.Add("second")
evictor.Add("third") evictor.Add("third")
@@ -84,7 +84,7 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern) t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
} }
evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first") evictor.Add("first")
evictor.Add("second") evictor.Add("second")
evictor.Add("third") evictor.Add("third")
@@ -106,7 +106,7 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern) t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
} }
evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first") evictor.Add("first")
evictor.Add("second") evictor.Add("second")
evictor.Add("third") evictor.Add("third")
@@ -130,7 +130,7 @@ func TestDelNode(t *testing.T) {
} }
func TestTry(t *testing.T) { func TestTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first") evictor.Add("first")
evictor.Add("second") evictor.Add("second")
evictor.Add("third") evictor.Add("third")
@@ -152,7 +152,7 @@ func TestTry(t *testing.T) {
} }
func TestTryOrdering(t *testing.T) { func TestTryOrdering(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first") evictor.Add("first")
evictor.Add("second") evictor.Add("second")
evictor.Add("third") evictor.Add("third")
@@ -184,7 +184,7 @@ func TestTryOrdering(t *testing.T) {
} }
func TestTryRemovingWhileTry(t *testing.T) { func TestTryRemovingWhileTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first") evictor.Add("first")
evictor.Add("second") evictor.Add("second")
evictor.Add("third") evictor.Add("third")

View File

@@ -32,8 +32,8 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/leaky" "k8s.io/kubernetes/pkg/kubelet/leaky"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors" utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/parsers" "k8s.io/kubernetes/pkg/util/parsers"
) )
@@ -107,7 +107,7 @@ type dockerPuller struct {
type throttledDockerPuller struct { type throttledDockerPuller struct {
puller dockerPuller puller dockerPuller
limiter util.RateLimiter limiter flowcontrol.RateLimiter
} }
// newDockerPuller creates a new instance of the default implementation of DockerPuller. // newDockerPuller creates a new instance of the default implementation of DockerPuller.
@@ -122,7 +122,7 @@ func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPulle
} }
return &throttledDockerPuller{ return &throttledDockerPuller{
puller: dp, puller: dp,
limiter: util.NewTokenBucketRateLimiter(qps, burst), limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst),
} }
} }

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package util package flowcontrol
import ( import (
"sync" "sync"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package util package flowcontrol
import ( import (
"math" "math"

View File

@@ -26,7 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@@ -66,7 +66,7 @@ var _ = KubeDescribe("Service endpoints latency", func() {
// Turn off rate limiting--it interferes with our measurements. // Turn off rate limiting--it interferes with our measurements.
oldThrottle := f.Client.RESTClient.Throttle oldThrottle := f.Client.RESTClient.Throttle
f.Client.RESTClient.Throttle = util.NewFakeAlwaysRateLimiter() f.Client.RESTClient.Throttle = flowcontrol.NewFakeAlwaysRateLimiter()
defer func() { f.Client.RESTClient.Throttle = oldThrottle }() defer func() { f.Client.RESTClient.Throttle = oldThrottle }()
failing := sets.NewString() failing := sets.NewString()