mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Kubelet should have a max think time before auto resync
The sync frequency should be part of the syncLoop and resync no less often than every X seconds. The current implementation runs even if a config update was delivered less than X seconds ago.
This commit is contained in:
parent
b43e3865b4
commit
d7f46718a8
@ -115,7 +115,6 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
||||
config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
|
||||
myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1)
|
||||
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
|
||||
go util.Forever(cfg1.Sync, 3*time.Second)
|
||||
go util.Forever(func() {
|
||||
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), http.DefaultServeMux, "localhost", 10250)
|
||||
}, 0)
|
||||
@ -127,7 +126,6 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
||||
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd"))
|
||||
otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2)
|
||||
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
|
||||
go util.Forever(cfg2.Sync, 3*time.Second)
|
||||
go util.Forever(func() {
|
||||
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251)
|
||||
}, 0)
|
||||
|
@ -148,16 +148,12 @@ func main() {
|
||||
dockerClient,
|
||||
cadvisorClient,
|
||||
etcdClient,
|
||||
*rootDirectory)
|
||||
*rootDirectory,
|
||||
*syncFrequency)
|
||||
|
||||
// start the kubelet
|
||||
go util.Forever(func() { k.Run(cfg.Updates()) }, 0)
|
||||
|
||||
// resynchronize periodically
|
||||
// TODO: make this part of PodConfig so that it is only delivered after syncFrequency has elapsed without
|
||||
// an update
|
||||
go util.Forever(cfg.Sync, *syncFrequency)
|
||||
|
||||
// start the kubelet server
|
||||
if *enableServer {
|
||||
go util.Forever(func() {
|
||||
|
@ -64,13 +64,15 @@ func NewMainKubelet(
|
||||
dc DockerInterface,
|
||||
cc CadvisorInterface,
|
||||
ec tools.EtcdClient,
|
||||
rd string) *Kubelet {
|
||||
rd string,
|
||||
ri time.Duration) *Kubelet {
|
||||
return &Kubelet{
|
||||
hostname: hn,
|
||||
dockerClient: dc,
|
||||
cadvisorClient: cc,
|
||||
etcdClient: ec,
|
||||
rootDirectory: rd,
|
||||
resyncInterval: ri,
|
||||
podWorkers: newPodWorkers(),
|
||||
}
|
||||
}
|
||||
@ -79,19 +81,21 @@ func NewMainKubelet(
|
||||
// TODO: add more integration tests, and expand parameter list as needed.
|
||||
func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet {
|
||||
return &Kubelet{
|
||||
hostname: hn,
|
||||
dockerClient: dc,
|
||||
dockerPuller: &FakeDockerPuller{},
|
||||
podWorkers: newPodWorkers(),
|
||||
hostname: hn,
|
||||
dockerClient: dc,
|
||||
dockerPuller: &FakeDockerPuller{},
|
||||
resyncInterval: 3 * time.Second,
|
||||
podWorkers: newPodWorkers(),
|
||||
}
|
||||
}
|
||||
|
||||
// Kubelet is the main kubelet implementation.
|
||||
type Kubelet struct {
|
||||
hostname string
|
||||
dockerClient DockerInterface
|
||||
rootDirectory string
|
||||
podWorkers podWorkers
|
||||
hostname string
|
||||
dockerClient DockerInterface
|
||||
rootDirectory string
|
||||
podWorkers podWorkers
|
||||
resyncInterval time.Duration
|
||||
|
||||
// Optional, no events will be sent without it
|
||||
etcdClient tools.EtcdClient
|
||||
@ -561,14 +565,15 @@ func filterHostPortConflicts(pods []Pod) []Pod {
|
||||
// no changes are seen to the configuration, will synchronize the last known desired
|
||||
// state every sync_frequency seconds. Never returns.
|
||||
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
||||
var pods []Pod
|
||||
for {
|
||||
var pods []Pod
|
||||
select {
|
||||
case u := <-updates:
|
||||
switch u.Op {
|
||||
case SET:
|
||||
glog.Infof("Containers changed [%s]", kl.hostname)
|
||||
pods = u.Pods
|
||||
pods = filterHostPortConflicts(pods)
|
||||
|
||||
case UPDATE:
|
||||
//TODO: implement updates of containers
|
||||
@ -578,10 +583,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
||||
default:
|
||||
panic("syncLoop does not support incremental changes")
|
||||
}
|
||||
case <-time.After(kl.resyncInterval):
|
||||
if pods == nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
pods = filterHostPortConflicts(pods)
|
||||
|
||||
err := handler.SyncPods(pods)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't sync containers : %v", err)
|
||||
|
Loading…
Reference in New Issue
Block a user