mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-04 07:49:35 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			245 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			245 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2016 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
// Reads the pod configuration from file or a directory of files.
 | 
						|
package config
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"os"
 | 
						|
	"path/filepath"
 | 
						|
	"sort"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"k8s.io/klog"
 | 
						|
 | 
						|
	"k8s.io/api/core/v1"
 | 
						|
	"k8s.io/apimachinery/pkg/types"
 | 
						|
	"k8s.io/client-go/tools/cache"
 | 
						|
	api "k8s.io/kubernetes/pkg/apis/core"
 | 
						|
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 | 
						|
	utilio "k8s.io/utils/io"
 | 
						|
)
 | 
						|
 | 
						|
type podEventType int
 | 
						|
 | 
						|
const (
 | 
						|
	podAdd podEventType = iota
 | 
						|
	podModify
 | 
						|
	podDelete
 | 
						|
 | 
						|
	eventBufferLen = 10
 | 
						|
)
 | 
						|
 | 
						|
type watchEvent struct {
 | 
						|
	fileName  string
 | 
						|
	eventType podEventType
 | 
						|
}
 | 
						|
 | 
						|
type sourceFile struct {
 | 
						|
	path           string
 | 
						|
	nodeName       types.NodeName
 | 
						|
	period         time.Duration
 | 
						|
	store          cache.Store
 | 
						|
	fileKeyMapping map[string]string
 | 
						|
	updates        chan<- interface{}
 | 
						|
	watchEvents    chan *watchEvent
 | 
						|
}
 | 
						|
 | 
						|
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
 | 
						|
	// "github.com/sigma/go-inotify" requires a path without trailing "/"
 | 
						|
	path = strings.TrimRight(path, string(os.PathSeparator))
 | 
						|
 | 
						|
	config := newSourceFile(path, nodeName, period, updates)
 | 
						|
	klog.V(1).Infof("Watching path %q", path)
 | 
						|
	config.run()
 | 
						|
}
 | 
						|
 | 
						|
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
 | 
						|
	send := func(objs []interface{}) {
 | 
						|
		var pods []*v1.Pod
 | 
						|
		for _, o := range objs {
 | 
						|
			pods = append(pods, o.(*v1.Pod))
 | 
						|
		}
 | 
						|
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
 | 
						|
	}
 | 
						|
	store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
 | 
						|
	return &sourceFile{
 | 
						|
		path:           path,
 | 
						|
		nodeName:       nodeName,
 | 
						|
		period:         period,
 | 
						|
		store:          store,
 | 
						|
		fileKeyMapping: map[string]string{},
 | 
						|
		updates:        updates,
 | 
						|
		watchEvents:    make(chan *watchEvent, eventBufferLen),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *sourceFile) run() {
 | 
						|
	listTicker := time.NewTicker(s.period)
 | 
						|
 | 
						|
	go func() {
 | 
						|
		// Read path immediately to speed up startup.
 | 
						|
		if err := s.listConfig(); err != nil {
 | 
						|
			klog.Errorf("Unable to read config path %q: %v", s.path, err)
 | 
						|
		}
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-listTicker.C:
 | 
						|
				if err := s.listConfig(); err != nil {
 | 
						|
					klog.Errorf("Unable to read config path %q: %v", s.path, err)
 | 
						|
				}
 | 
						|
			case e := <-s.watchEvents:
 | 
						|
				if err := s.consumeWatchEvent(e); err != nil {
 | 
						|
					klog.Errorf("Unable to process watch event: %v", err)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	s.startWatch()
 | 
						|
}
 | 
						|
 | 
						|
func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
 | 
						|
	return applyDefaults(pod, source, true, s.nodeName)
 | 
						|
}
 | 
						|
 | 
						|
func (s *sourceFile) listConfig() error {
 | 
						|
	path := s.path
 | 
						|
	statInfo, err := os.Stat(path)
 | 
						|
	if err != nil {
 | 
						|
		if !os.IsNotExist(err) {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		// Emit an update with an empty PodList to allow FileSource to be marked as seen
 | 
						|
		s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
 | 
						|
		return fmt.Errorf("path does not exist, ignoring")
 | 
						|
	}
 | 
						|
 | 
						|
	switch {
 | 
						|
	case statInfo.Mode().IsDir():
 | 
						|
		pods, err := s.extractFromDir(path)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if len(pods) == 0 {
 | 
						|
			// Emit an update with an empty PodList to allow FileSource to be marked as seen
 | 
						|
			s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		return s.replaceStore(pods...)
 | 
						|
 | 
						|
	case statInfo.Mode().IsRegular():
 | 
						|
		pod, err := s.extractFromFile(path)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		return s.replaceStore(pod)
 | 
						|
 | 
						|
	default:
 | 
						|
		return fmt.Errorf("path is not a directory or file")
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Get as many pod manifests as we can from a directory. Return an error if and only if something
 | 
						|
// prevented us from reading anything at all. Do not return an error if only some files
 | 
						|
// were problematic.
 | 
						|
func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) {
 | 
						|
	dirents, err := filepath.Glob(filepath.Join(name, "[^.]*"))
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("glob failed: %v", err)
 | 
						|
	}
 | 
						|
 | 
						|
	pods := make([]*v1.Pod, 0)
 | 
						|
	if len(dirents) == 0 {
 | 
						|
		return pods, nil
 | 
						|
	}
 | 
						|
 | 
						|
	sort.Strings(dirents)
 | 
						|
	for _, path := range dirents {
 | 
						|
		statInfo, err := os.Stat(path)
 | 
						|
		if err != nil {
 | 
						|
			klog.Errorf("Can't get metadata for %q: %v", path, err)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		switch {
 | 
						|
		case statInfo.Mode().IsDir():
 | 
						|
			klog.Errorf("Not recursing into manifest path %q", path)
 | 
						|
		case statInfo.Mode().IsRegular():
 | 
						|
			pod, err := s.extractFromFile(path)
 | 
						|
			if err != nil {
 | 
						|
				if !os.IsNotExist(err) {
 | 
						|
					klog.Errorf("Can't process manifest file %q: %v", path, err)
 | 
						|
				}
 | 
						|
			} else {
 | 
						|
				pods = append(pods, pod)
 | 
						|
			}
 | 
						|
		default:
 | 
						|
			klog.Errorf("Manifest path %q is not a directory or file: %v", path, statInfo.Mode())
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return pods, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
 | 
						|
	klog.V(3).Infof("Reading config file %q", filename)
 | 
						|
	defer func() {
 | 
						|
		if err == nil && pod != nil {
 | 
						|
			objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
 | 
						|
			if keyErr != nil {
 | 
						|
				err = keyErr
 | 
						|
				return
 | 
						|
			}
 | 
						|
			s.fileKeyMapping[filename] = objKey
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	file, err := os.Open(filename)
 | 
						|
	if err != nil {
 | 
						|
		return pod, err
 | 
						|
	}
 | 
						|
	defer file.Close()
 | 
						|
 | 
						|
	data, err := utilio.ReadAtMost(file, maxConfigLength)
 | 
						|
	if err != nil {
 | 
						|
		return pod, err
 | 
						|
	}
 | 
						|
 | 
						|
	defaultFn := func(pod *api.Pod) error {
 | 
						|
		return s.applyDefaults(pod, filename)
 | 
						|
	}
 | 
						|
 | 
						|
	parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
 | 
						|
	if parsed {
 | 
						|
		if podErr != nil {
 | 
						|
			return pod, podErr
 | 
						|
		}
 | 
						|
		return pod, nil
 | 
						|
	}
 | 
						|
 | 
						|
	return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file.\n", filename, podErr)
 | 
						|
}
 | 
						|
 | 
						|
func (s *sourceFile) replaceStore(pods ...*v1.Pod) (err error) {
 | 
						|
	objs := []interface{}{}
 | 
						|
	for _, pod := range pods {
 | 
						|
		objs = append(objs, pod)
 | 
						|
	}
 | 
						|
	return s.store.Replace(objs, "")
 | 
						|
}
 |