mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 06:54:01 +00:00
Make data dirs for all pods when syncing
This commit is contained in:
parent
c93fb4e309
commit
ca89aa6528
@ -39,6 +39,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
|
||||||
"github.com/fsouza/go-dockerclient"
|
"github.com/fsouza/go-dockerclient"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -76,13 +77,16 @@ func NewMainKubelet(
|
|||||||
sourceReady SourceReadyFn,
|
sourceReady SourceReadyFn,
|
||||||
clusterDomain string,
|
clusterDomain string,
|
||||||
clusterDNS net.IP) (*Kubelet, error) {
|
clusterDNS net.IP) (*Kubelet, error) {
|
||||||
|
if rootDirectory == "" {
|
||||||
|
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
|
||||||
|
}
|
||||||
if resyncInterval <= 0 {
|
if resyncInterval <= 0 {
|
||||||
return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
|
return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
|
||||||
}
|
}
|
||||||
if minimumGCAge <= 0 {
|
if minimumGCAge <= 0 {
|
||||||
return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge)
|
return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge)
|
||||||
}
|
}
|
||||||
return &Kubelet{
|
klet := &Kubelet{
|
||||||
hostname: hostname,
|
hostname: hostname,
|
||||||
dockerClient: dockerClient,
|
dockerClient: dockerClient,
|
||||||
etcdClient: etcdClient,
|
etcdClient: etcdClient,
|
||||||
@ -100,7 +104,13 @@ func NewMainKubelet(
|
|||||||
sourceReady: sourceReady,
|
sourceReady: sourceReady,
|
||||||
clusterDomain: clusterDomain,
|
clusterDomain: clusterDomain,
|
||||||
clusterDNS: clusterDNS,
|
clusterDNS: clusterDNS,
|
||||||
}, nil
|
}
|
||||||
|
|
||||||
|
if err := klet.setupDataDirs(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return klet, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpGetter interface {
|
type httpGetter interface {
|
||||||
@ -234,6 +244,32 @@ func dirExists(path string) bool {
|
|||||||
return s.IsDir()
|
return s.IsDir()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) setupDataDirs() error {
|
||||||
|
kl.rootDirectory = path.Clean(kl.rootDirectory)
|
||||||
|
if err := os.MkdirAll(kl.GetRootDir(), 0750); err != nil {
|
||||||
|
return fmt.Errorf("error creating root directory: %v", err)
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(kl.GetPodsDir(), 0750); err != nil {
|
||||||
|
return fmt.Errorf("error creating pods directory: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get a list of pods that have data directories.
|
||||||
|
func (kl *Kubelet) listPodsFromDisk() ([]string, error) {
|
||||||
|
podInfos, err := ioutil.ReadDir(kl.GetPodsDir())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pods := []string{}
|
||||||
|
for i := range podInfos {
|
||||||
|
if podInfos[i].IsDir() {
|
||||||
|
pods = append(pods, podInfos[i].Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return pods, nil
|
||||||
|
}
|
||||||
|
|
||||||
type ByCreated []*docker.Container
|
type ByCreated []*docker.Container
|
||||||
|
|
||||||
func (a ByCreated) Len() int { return len(a) }
|
func (a ByCreated) Len() int { return len(a) }
|
||||||
@ -834,6 +870,14 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
|
|||||||
killedContainers := make(map[dockertools.DockerID]empty)
|
killedContainers := make(map[dockertools.DockerID]empty)
|
||||||
glog.V(4).Infof("Syncing Pod, podFullName: %q, uuid: %q", podFullName, uuid)
|
glog.V(4).Infof("Syncing Pod, podFullName: %q, uuid: %q", podFullName, uuid)
|
||||||
|
|
||||||
|
// Make data dirs.
|
||||||
|
if err := os.Mkdir(kl.GetPodDir(uuid), 0750); err != nil && !os.IsExist(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := os.Mkdir(kl.GetPodVolumesDir(uuid), 0750); err != nil && !os.IsExist(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Make sure we have a network container
|
// Make sure we have a network container
|
||||||
var netID dockertools.DockerID
|
var netID dockertools.DockerID
|
||||||
if netDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found {
|
if netDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found {
|
||||||
@ -1008,9 +1052,30 @@ func getDesiredVolumes(pods []api.BoundPod) map[string]api.Volume {
|
|||||||
return desiredVolumes
|
return desiredVolumes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) cleanupOrphanedPods(pods []api.BoundPod) error {
|
||||||
|
desired := util.NewStringSet()
|
||||||
|
for i := range pods {
|
||||||
|
desired.Insert(pods[i].UID)
|
||||||
|
}
|
||||||
|
found, err := kl.listPodsFromDisk()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
errlist := []error{}
|
||||||
|
for i := range found {
|
||||||
|
if !desired.Has(found[i]) {
|
||||||
|
glog.V(3).Infof("Orphaned pod %q found, removing", found[i])
|
||||||
|
if err := os.RemoveAll(kl.GetPodDir(found[i])); err != nil {
|
||||||
|
errlist = append(errlist, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors.NewAggregate(errlist)
|
||||||
|
}
|
||||||
|
|
||||||
// Compares the map of current volumes to the map of desired volumes.
|
// Compares the map of current volumes to the map of desired volumes.
|
||||||
// If an active volume does not have a respective desired volume, clean it up.
|
// If an active volume does not have a respective desired volume, clean it up.
|
||||||
func (kl *Kubelet) reconcileVolumes(pods []api.BoundPod) error {
|
func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod) error {
|
||||||
desiredVolumes := getDesiredVolumes(pods)
|
desiredVolumes := getDesiredVolumes(pods)
|
||||||
currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory)
|
currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory)
|
||||||
for name, vol := range currentVolumes {
|
for name, vol := range currentVolumes {
|
||||||
@ -1087,8 +1152,17 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove any orphaned pods.
|
||||||
|
err = kl.cleanupOrphanedPods(pods)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Remove any orphaned volumes.
|
// Remove any orphaned volumes.
|
||||||
kl.reconcileVolumes(pods)
|
err = kl.cleanupOrphanedVolumes(pods)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -66,7 +66,12 @@ func (d *testDocker) InspectContainer(id string) (*docker.Container, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestRunOnce(t *testing.T) {
|
func TestRunOnce(t *testing.T) {
|
||||||
kb := &Kubelet{}
|
kb := &Kubelet{
|
||||||
|
rootDirectory: "/tmp/kubelet",
|
||||||
|
}
|
||||||
|
if err := kb.setupDataDirs(); err != nil {
|
||||||
|
t.Errorf("Failed to init data dirs: %v", err)
|
||||||
|
}
|
||||||
podContainers := []docker.APIContainers{
|
podContainers := []docker.APIContainers{
|
||||||
{
|
{
|
||||||
Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&api.Container{Name: "bar"}), 16) + "_foo.new.test_12345678_42"},
|
Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&api.Container{Name: "bar"}), 16) + "_foo.new.test_12345678_42"},
|
||||||
|
@ -18,8 +18,6 @@ package kubelet
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
@ -70,17 +68,6 @@ func EtcdClientOrDie(etcdServerList util.StringList, etcdConfigFile string) *etc
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: move this into pkg/util
|
|
||||||
func SetupRootDirectoryOrDie(rootDirectory string) {
|
|
||||||
if rootDirectory == "" {
|
|
||||||
glog.Fatal("Invalid root directory path.")
|
|
||||||
}
|
|
||||||
rootDirectory = path.Clean(rootDirectory)
|
|
||||||
if err := os.MkdirAll(rootDirectory, 0750); err != nil {
|
|
||||||
glog.Fatalf("Error creating root directory: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: move this into pkg/capabilities
|
// TODO: move this into pkg/capabilities
|
||||||
func SetupCapabilities(allowPrivileged bool) {
|
func SetupCapabilities(allowPrivileged bool) {
|
||||||
capabilities.Initialize(capabilities.Capabilities{
|
capabilities.Initialize(capabilities.Capabilities{
|
||||||
|
@ -179,9 +179,6 @@ func RunKubelet(kcfg *KubeletConfig) {
|
|||||||
kubelet.SetupCapabilities(kcfg.AllowPrivileged)
|
kubelet.SetupCapabilities(kcfg.AllowPrivileged)
|
||||||
|
|
||||||
kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride)
|
kcfg.Hostname = util.GetHostname(kcfg.HostnameOverride)
|
||||||
if len(kcfg.RootDirectory) > 0 {
|
|
||||||
kubelet.SetupRootDirectoryOrDie(kcfg.RootDirectory)
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg := makePodSourceConfig(kcfg)
|
cfg := makePodSourceConfig(kcfg)
|
||||||
k, err := createAndInitKubelet(kcfg, cfg)
|
k, err := createAndInitKubelet(kcfg, cfg)
|
||||||
|
Loading…
Reference in New Issue
Block a user