mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 05:57:25 +00:00
commit
8b44f34e0f
@ -22,43 +22,64 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
|
||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
manifestUrl := ServeCachedManifestFile()
|
||||||
// Setup
|
// Setup
|
||||||
servers := []string{"http://localhost:4001"}
|
servers := []string{"http://localhost:4001"}
|
||||||
log.Printf("Creating etcd client pointing to %v", servers)
|
log.Printf("Creating etcd client pointing to %v", servers)
|
||||||
etcdClient := etcd.NewClient(servers)
|
machineList := []string{"localhost", "machine"}
|
||||||
machineList := registry.MakeMinionRegistry([]string{"machine"})
|
|
||||||
|
|
||||||
reg := registry.MakeEtcdRegistry(etcdClient, machineList)
|
// Master
|
||||||
|
m := master.New(servers, machineList, nil)
|
||||||
|
apiserver := httptest.NewServer(m.ConstructHandler("/api/v1beta1"))
|
||||||
|
|
||||||
apiserver := apiserver.New(map[string]apiserver.RESTStorage{
|
controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), client.New(apiserver.URL, nil))
|
||||||
"pods": registry.MakePodRegistryStorage(reg, &client.FakeContainerInfo{}, registry.MakeRoundRobinScheduler(machineList), nil, nil),
|
|
||||||
"replicationControllers": registry.MakeControllerRegistryStorage(reg),
|
|
||||||
}, "/api/v1beta1")
|
|
||||||
server := httptest.NewServer(apiserver)
|
|
||||||
|
|
||||||
controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), client.New(server.URL, nil))
|
|
||||||
|
|
||||||
controllerManager.Run(10 * time.Second)
|
controllerManager.Run(10 * time.Second)
|
||||||
|
|
||||||
|
// Kublet
|
||||||
|
fakeDocker1 := &kubelet.FakeDockerClient{}
|
||||||
|
myKubelet := kubelet.Kubelet{
|
||||||
|
Hostname: machineList[0],
|
||||||
|
DockerClient: fakeDocker1,
|
||||||
|
DockerPuller: &kubelet.FakeDockerPuller{},
|
||||||
|
FileCheckFrequency: 5 * time.Second,
|
||||||
|
SyncFrequency: 5 * time.Second,
|
||||||
|
HTTPCheckFrequency: 5 * time.Second,
|
||||||
|
}
|
||||||
|
go myKubelet.RunKubelet("", manifestUrl, servers[0], "localhost", 0)
|
||||||
|
|
||||||
|
// Create a second kublet so that the guestbook example's two redis slaves both
|
||||||
|
// have a place they can schedule.
|
||||||
|
fakeDocker2 := &kubelet.FakeDockerClient{}
|
||||||
|
otherKubelet := kubelet.Kubelet{
|
||||||
|
Hostname: machineList[1],
|
||||||
|
DockerClient: fakeDocker2,
|
||||||
|
DockerPuller: &kubelet.FakeDockerPuller{},
|
||||||
|
FileCheckFrequency: 5 * time.Second,
|
||||||
|
SyncFrequency: 5 * time.Second,
|
||||||
|
HTTPCheckFrequency: 5 * time.Second,
|
||||||
|
}
|
||||||
|
go otherKubelet.RunKubelet("", "", servers[0], "localhost", 0)
|
||||||
|
|
||||||
// Ok. we're good to go.
|
// Ok. we're good to go.
|
||||||
log.Printf("API Server started on %s", server.URL)
|
log.Printf("API Server started on %s", apiserver.URL)
|
||||||
// Wait for the synchronization threads to come up.
|
// Wait for the synchronization threads to come up.
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
|
|
||||||
kubeClient := client.New(server.URL, nil)
|
kubeClient := client.New(apiserver.URL, nil)
|
||||||
data, err := ioutil.ReadFile("api/examples/controller.json")
|
data, err := ioutil.ReadFile("api/examples/controller.json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Unexpected error: %#v", err)
|
log.Fatalf("Unexpected error: %#v", err)
|
||||||
@ -79,5 +100,62 @@ func main() {
|
|||||||
if err != nil || len(pods.Items) != 2 {
|
if err != nil || len(pods.Items) != 2 {
|
||||||
log.Fatal("FAILED")
|
log.Fatal("FAILED")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check that kubelet tried to make the pods.
|
||||||
|
// Using a set to list unique creation attempts. Our fake is
|
||||||
|
// really stupid, so kubelet tries to create these multiple times.
|
||||||
|
createdPods := map[string]struct{}{}
|
||||||
|
for _, p := range fakeDocker1.Created {
|
||||||
|
// The last 8 characters are random, so slice them off.
|
||||||
|
if n := len(p); n > 8 {
|
||||||
|
createdPods[p[:n-8]] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, p := range fakeDocker2.Created {
|
||||||
|
// The last 8 characters are random, so slice them off.
|
||||||
|
if n := len(p); n > 8 {
|
||||||
|
createdPods[p[:n-8]] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We expect 5: 2 net containers + 2 pods from the replication controller +
|
||||||
|
// 1 net container + 2 pods from the URL.
|
||||||
|
if len(createdPods) != 7 {
|
||||||
|
log.Fatalf("Unexpected list of created pods: %#v\n", createdPods)
|
||||||
|
}
|
||||||
log.Printf("OK")
|
log.Printf("OK")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Serve a file for kubelet to read.
|
||||||
|
func ServeCachedManifestFile() (servingAddress string) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.URL.Path == "/manifest" {
|
||||||
|
w.Write([]byte(testManifestFile))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Fatalf("Got request: %#v\n", r)
|
||||||
|
http.NotFound(w, r)
|
||||||
|
}))
|
||||||
|
return server.URL + "/manifest"
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// This is copied from, and should be kept in sync with:
|
||||||
|
// https://raw.githubusercontent.com/GoogleCloudPlatform/container-vm-guestbook-redis-python/master/manifest.yaml
|
||||||
|
testManifestFile = `version: v1beta1
|
||||||
|
containers:
|
||||||
|
- name: redis
|
||||||
|
image: dockerfile/redis
|
||||||
|
volumeMounts:
|
||||||
|
- name: redis-data
|
||||||
|
path: /data
|
||||||
|
|
||||||
|
- name: guestbook
|
||||||
|
image: google/guestbook-python-redis
|
||||||
|
ports:
|
||||||
|
- name: www
|
||||||
|
hostPort: 80
|
||||||
|
containerPort: 80
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
- name: redis-data`
|
||||||
|
)
|
||||||
|
@ -43,52 +43,52 @@ var (
|
|||||||
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
|
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
|
||||||
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data")
|
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data")
|
||||||
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")
|
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")
|
||||||
manifest_url = flag.String("manifest_url", "", "URL for accessing the container manifest")
|
manifestUrl = flag.String("manifest_url", "", "URL for accessing the container manifest")
|
||||||
kubelet_address = flag.String("kubelet_address", "127.0.0.1", "The address for the kubelet info server to serve on")
|
kubeletAddress = flag.String("kubelet_address", "127.0.0.1", "The address for the kubelet info server to serve on")
|
||||||
kubelet_port = flag.Uint("kubelet_port", 10250, "The port for the kubelete info server to serve on")
|
kubeletPort = flag.Uint("kubelet_port", 10250, "The port for the kubelete info server to serve on")
|
||||||
)
|
)
|
||||||
|
|
||||||
// master flags
|
// master flags
|
||||||
var (
|
var (
|
||||||
master_port = flag.Uint("master_port", 8080, "The port for the master to listen on. Default 8080.")
|
masterPort = flag.Uint("master_port", 8080, "The port for the master to listen on. Default 8080.")
|
||||||
master_address = flag.String("master_address", "127.0.0.1", "The address for the master to listen to. Default 127.0.0.1")
|
masterAddress = flag.String("master_address", "127.0.0.1", "The address for the master to listen to. Default 127.0.0.1")
|
||||||
apiPrefix = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'")
|
apiPrefix = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'")
|
||||||
)
|
)
|
||||||
|
|
||||||
// flags that affect both
|
// flags that affect both
|
||||||
var (
|
var (
|
||||||
etcd_server = flag.String("etcd_server", "http://localhost:4001", "Url of local etcd server")
|
etcdServer = flag.String("etcd_server", "http://localhost:4001", "Url of local etcd server")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Starts kubelet services. Never returns.
|
// Starts kubelet services. Never returns.
|
||||||
func fake_kubelet() {
|
func fakeKubelet() {
|
||||||
endpoint := "unix:///var/run/docker.sock"
|
endpoint := "unix:///var/run/docker.sock"
|
||||||
dockerClient, err := docker.NewClient(endpoint)
|
dockerClient, err := docker.NewClient(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Couldn't connnect to docker.")
|
log.Fatal("Couldn't connnect to docker.")
|
||||||
}
|
}
|
||||||
|
|
||||||
my_kubelet := kubelet.Kubelet{
|
myKubelet := kubelet.Kubelet{
|
||||||
Hostname: *kubelet_address,
|
Hostname: *kubeletAddress,
|
||||||
DockerClient: dockerClient,
|
DockerClient: dockerClient,
|
||||||
FileCheckFrequency: *fileCheckFrequency,
|
FileCheckFrequency: *fileCheckFrequency,
|
||||||
SyncFrequency: *syncFrequency,
|
SyncFrequency: *syncFrequency,
|
||||||
HTTPCheckFrequency: *httpCheckFrequency,
|
HTTPCheckFrequency: *httpCheckFrequency,
|
||||||
}
|
}
|
||||||
my_kubelet.RunKubelet(*file, *manifest_url, *etcd_server, *kubelet_address, *kubelet_port)
|
myKubelet.RunKubelet(*file, *manifestUrl, *etcdServer, *kubeletAddress, *kubeletPort)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Starts api services (the master). Never returns.
|
// Starts api services (the master). Never returns.
|
||||||
func api_server() {
|
func apiServer() {
|
||||||
m := master.New([]string{*etcd_server}, []string{*kubelet_address}, nil)
|
m := master.New([]string{*etcdServer}, []string{*kubeletAddress}, nil)
|
||||||
log.Fatal(m.Run(net.JoinHostPort(*master_address, strconv.Itoa(int(*master_port))), *apiPrefix))
|
log.Fatal(m.Run(net.JoinHostPort(*masterAddress, strconv.Itoa(int(*masterPort))), *apiPrefix))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Starts up a controller manager. Never returns.
|
// Starts up a controller manager. Never returns.
|
||||||
func controller_manager() {
|
func controllerManager() {
|
||||||
controllerManager := controller.MakeReplicationManager(
|
controllerManager := controller.MakeReplicationManager(
|
||||||
etcd.NewClient([]string{*etcd_server}),
|
etcd.NewClient([]string{*etcdServer}),
|
||||||
client.New(fmt.Sprintf("http://%s:%d", *master_address, *master_port), nil))
|
client.New(fmt.Sprintf("http://%s:%d", *masterAddress, *masterPort), nil))
|
||||||
|
|
||||||
controllerManager.Run(20 * time.Second)
|
controllerManager.Run(20 * time.Second)
|
||||||
select {}
|
select {}
|
||||||
@ -101,12 +101,12 @@ func main() {
|
|||||||
// Set up logger for etcd client
|
// Set up logger for etcd client
|
||||||
etcd.SetLogger(log.New(os.Stderr, "etcd ", log.LstdFlags))
|
etcd.SetLogger(log.New(os.Stderr, "etcd ", log.LstdFlags))
|
||||||
|
|
||||||
go api_server()
|
go apiServer()
|
||||||
go fake_kubelet()
|
go fakeKubelet()
|
||||||
go controller_manager()
|
go controllerManager()
|
||||||
|
|
||||||
log.Printf("All components started.\nMaster running at: http://%s:%d\nKubelet running at: http://%s:%d\n",
|
log.Printf("All components started.\nMaster running at: http://%s:%d\nKubelet running at: http://%s:%d\n",
|
||||||
*master_address, *master_port,
|
*masterAddress, *masterPort,
|
||||||
*kubelet_address, *kubelet_port)
|
*kubeletAddress, *kubeletPort)
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,7 @@ fi
|
|||||||
# Stop right away if the build fails
|
# Stop right away if the build fails
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
$(dirname $0)/build-go.sh
|
$(dirname $0)/build-go.sh integration
|
||||||
|
|
||||||
ETCD_DIR=$(mktemp -d -t kube-integration.XXXXXX)
|
ETCD_DIR=$(mktemp -d -t kube-integration.XXXXXX)
|
||||||
trap "rm -rf ${ETCD_DIR}" EXIT
|
trap "rm -rf ${ETCD_DIR}" EXIT
|
||||||
|
89
pkg/kubelet/fake_docker_client.go
Normal file
89
pkg/kubelet/fake_docker_client.go
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kubelet
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/fsouza/go-dockerclient"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A simple fake docker client, so that kublet can be run for testing without requiring a real docker setup.
|
||||||
|
type FakeDockerClient struct {
|
||||||
|
containerList []docker.APIContainers
|
||||||
|
container *docker.Container
|
||||||
|
err error
|
||||||
|
called []string
|
||||||
|
stopped []string
|
||||||
|
Created []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FakeDockerClient) clearCalls() {
|
||||||
|
f.called = []string{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FakeDockerClient) appendCall(call string) {
|
||||||
|
f.called = append(f.called, call)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FakeDockerClient) ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) {
|
||||||
|
f.appendCall("list")
|
||||||
|
return f.containerList, f.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FakeDockerClient) InspectContainer(id string) (*docker.Container, error) {
|
||||||
|
f.appendCall("inspect")
|
||||||
|
return f.container, f.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FakeDockerClient) CreateContainer(c docker.CreateContainerOptions) (*docker.Container, error) {
|
||||||
|
f.appendCall("create")
|
||||||
|
f.Created = append(f.Created, c.Name)
|
||||||
|
// This is not a very good fake. We'll just add this container's name to the list.
|
||||||
|
// Docker likes to add a '/', so copy that behavior.
|
||||||
|
f.containerList = append(f.containerList, docker.APIContainers{ID: c.Name, Names: []string{"/" + c.Name}})
|
||||||
|
return &docker.Container{ID: "/" + c.Name}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConfig) error {
|
||||||
|
f.appendCall("start")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
|
||||||
|
f.appendCall("stop")
|
||||||
|
f.stopped = append(f.stopped, id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type FakeDockerPuller struct {
|
||||||
|
ImagesPulled []string
|
||||||
|
|
||||||
|
// Every pull will return the first error here, and then reslice
|
||||||
|
// to remove it. Will give nil errors if this slice is empty.
|
||||||
|
ErrorsToInject []error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Records the image pull attempt, and optionally injects an error.
|
||||||
|
func (f *FakeDockerPuller) Pull(image string) error {
|
||||||
|
f.ImagesPulled = append(f.ImagesPulled, image)
|
||||||
|
|
||||||
|
if n := len(f.ErrorsToInject); n > 0 {
|
||||||
|
err := f.ErrorsToInject[0]
|
||||||
|
f.ErrorsToInject = f.ErrorsToInject[:n-1]
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -60,6 +60,11 @@ type DockerInterface interface {
|
|||||||
StopContainer(id string, timeout uint) error
|
StopContainer(id string, timeout uint) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Interface for testability
|
||||||
|
type DockerPuller interface {
|
||||||
|
Pull(image string) error
|
||||||
|
}
|
||||||
|
|
||||||
type CadvisorInterface interface {
|
type CadvisorInterface interface {
|
||||||
ContainerInfo(name string) (*info.ContainerInfo, error)
|
ContainerInfo(name string) (*info.ContainerInfo, error)
|
||||||
MachineInfo() (*info.MachineInfo, error)
|
MachineInfo() (*info.MachineInfo, error)
|
||||||
@ -70,6 +75,7 @@ type Kubelet struct {
|
|||||||
Hostname string
|
Hostname string
|
||||||
EtcdClient util.EtcdClient
|
EtcdClient util.EtcdClient
|
||||||
DockerClient DockerInterface
|
DockerClient DockerInterface
|
||||||
|
DockerPuller DockerPuller
|
||||||
CadvisorClient CadvisorInterface
|
CadvisorClient CadvisorInterface
|
||||||
FileCheckFrequency time.Duration
|
FileCheckFrequency time.Duration
|
||||||
SyncFrequency time.Duration
|
SyncFrequency time.Duration
|
||||||
@ -92,6 +98,9 @@ const (
|
|||||||
// Starts background goroutines. If config_path, manifest_url, or address are empty,
|
// Starts background goroutines. If config_path, manifest_url, or address are empty,
|
||||||
// they are not watched. Never returns.
|
// they are not watched. Never returns.
|
||||||
func (kl *Kubelet) RunKubelet(config_path, manifest_url, etcd_servers, address string, port uint) {
|
func (kl *Kubelet) RunKubelet(config_path, manifest_url, etcd_servers, address string, port uint) {
|
||||||
|
if kl.DockerPuller == nil {
|
||||||
|
kl.DockerPuller = MakeDockerPuller()
|
||||||
|
}
|
||||||
updateChannel := make(chan manifestUpdate)
|
updateChannel := make(chan manifestUpdate)
|
||||||
if config_path != "" {
|
if config_path != "" {
|
||||||
log.Printf("Watching for file configs at %s", config_path)
|
log.Printf("Watching for file configs at %s", config_path)
|
||||||
@ -220,9 +229,13 @@ func (kl *Kubelet) ListContainers() ([]string, error) {
|
|||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kl *Kubelet) pullImage(image string) error {
|
type dockerPuller struct{}
|
||||||
kl.pullLock.Lock()
|
|
||||||
defer kl.pullLock.Unlock()
|
func MakeDockerPuller() DockerPuller {
|
||||||
|
return dockerPuller{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dockerPuller) Pull(image string) error {
|
||||||
cmd := exec.Command("docker", "pull", image)
|
cmd := exec.Command("docker", "pull", image)
|
||||||
err := cmd.Start()
|
err := cmd.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -497,12 +510,48 @@ func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpda
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer response.Body.Close()
|
defer response.Body.Close()
|
||||||
manifest, err := kl.extractSingleFromReader(response.Body)
|
data, err := ioutil.ReadAll(response.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
updateChannel <- manifestUpdate{httpClientSource, []api.ContainerManifest{manifest}}
|
if len(data) == 0 {
|
||||||
return nil
|
return fmt.Errorf("zero-length data received from %v", url)
|
||||||
|
}
|
||||||
|
|
||||||
|
// First try as if it's a single manifest
|
||||||
|
var manifest api.ContainerManifest
|
||||||
|
singleErr := yaml.Unmarshal(data, &manifest)
|
||||||
|
if singleErr == nil && manifest.Version == "" {
|
||||||
|
// If data is a []ContainerManifest, trying to put it into a ContainerManifest
|
||||||
|
// will not give an error but also won't set any of the fields.
|
||||||
|
// Our docs say that the version field is mandatory, so using that to judge wether
|
||||||
|
// this was actually successful.
|
||||||
|
singleErr = fmt.Errorf("got blank version field")
|
||||||
|
}
|
||||||
|
if singleErr == nil {
|
||||||
|
updateChannel <- manifestUpdate{httpClientSource, []api.ContainerManifest{manifest}}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// That didn't work, so try an array of manifests.
|
||||||
|
var manifests []api.ContainerManifest
|
||||||
|
multiErr := yaml.Unmarshal(data, &manifests)
|
||||||
|
// We're not sure if the person reading the logs is going to care about the single or
|
||||||
|
// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
|
||||||
|
// done at the end. Hence not returning early here.
|
||||||
|
if multiErr == nil && len(manifests) == 0 {
|
||||||
|
multiErr = fmt.Errorf("no elements in ContainerManifest array")
|
||||||
|
}
|
||||||
|
if multiErr == nil && manifests[0].Version == "" {
|
||||||
|
multiErr = fmt.Errorf("got blank version field")
|
||||||
|
}
|
||||||
|
if multiErr == nil {
|
||||||
|
updateChannel <- manifestUpdate{httpClientSource, manifests}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("%v: received '%v', but couldn't parse as a "+
|
||||||
|
"single manifest (%v: %#v) or as multiple manifests (%v: %#v).\n",
|
||||||
|
url, string(data), singleErr, manifest, multiErr, manifests)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Take an etcd Response object, and turn it into a structured list of containers
|
// Take an etcd Response object, and turn it into a structured list of containers
|
||||||
@ -644,7 +693,7 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (stri
|
|||||||
Command: []string{"sh", "-c", "rm -f nap && mkfifo nap && exec cat nap"},
|
Command: []string{"sh", "-c", "rm -f nap && mkfifo nap && exec cat nap"},
|
||||||
Ports: ports,
|
Ports: ports,
|
||||||
}
|
}
|
||||||
kl.pullImage("busybox")
|
kl.DockerPuller.Pull("busybox")
|
||||||
return kl.RunContainer(manifest, container, "")
|
return kl.RunContainer(manifest, container, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -678,7 +727,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
|||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
log.Printf("%#v doesn't exist, creating", element)
|
log.Printf("%#v doesn't exist, creating", element)
|
||||||
err = kl.pullImage(element.Image)
|
kl.DockerPuller.Pull(element.Image)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error pulling container: %#v", err)
|
log.Printf("Error pulling container: %#v", err)
|
||||||
continue
|
continue
|
||||||
|
@ -75,9 +75,15 @@ func verifyError(t *testing.T, e error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeTestKubelet() *Kubelet {
|
||||||
|
return &Kubelet{
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestExtractJSON(t *testing.T) {
|
func TestExtractJSON(t *testing.T) {
|
||||||
obj := TestObject{}
|
obj := TestObject{}
|
||||||
kubelet := Kubelet{}
|
kubelet := makeTestKubelet()
|
||||||
data := `{ "name": "foo", "data": { "value": "bar", "number": 10 } }`
|
data := `{ "name": "foo", "data": { "value": "bar", "number": 10 } }`
|
||||||
kubelet.ExtractYAMLData([]byte(data), &obj)
|
kubelet.ExtractYAMLData([]byte(data), &obj)
|
||||||
|
|
||||||
@ -86,48 +92,6 @@ func TestExtractJSON(t *testing.T) {
|
|||||||
verifyIntEquals(t, obj.Data.Number, 10)
|
verifyIntEquals(t, obj.Data.Number, 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
type FakeDockerClient struct {
|
|
||||||
containerList []docker.APIContainers
|
|
||||||
container *docker.Container
|
|
||||||
err error
|
|
||||||
called []string
|
|
||||||
stopped []string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FakeDockerClient) clearCalls() {
|
|
||||||
f.called = []string{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FakeDockerClient) appendCall(call string) {
|
|
||||||
f.called = append(f.called, call)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FakeDockerClient) ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) {
|
|
||||||
f.appendCall("list")
|
|
||||||
return f.containerList, f.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FakeDockerClient) InspectContainer(id string) (*docker.Container, error) {
|
|
||||||
f.appendCall("inspect")
|
|
||||||
return f.container, f.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FakeDockerClient) CreateContainer(docker.CreateContainerOptions) (*docker.Container, error) {
|
|
||||||
f.appendCall("create")
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConfig) error {
|
|
||||||
f.appendCall("start")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FakeDockerClient) StopContainer(id string, timeout uint) error {
|
|
||||||
f.appendCall("stop")
|
|
||||||
f.stopped = append(f.stopped, id)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func verifyCalls(t *testing.T, fakeDocker FakeDockerClient, calls []string) {
|
func verifyCalls(t *testing.T, fakeDocker FakeDockerClient, calls []string) {
|
||||||
verifyStringArrayEquals(t, fakeDocker.called, calls)
|
verifyStringArrayEquals(t, fakeDocker.called, calls)
|
||||||
}
|
}
|
||||||
@ -175,6 +139,7 @@ func TestContainerExists(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
}
|
}
|
||||||
manifest := api.ContainerManifest{
|
manifest := api.ContainerManifest{
|
||||||
Id: "qux",
|
Id: "qux",
|
||||||
@ -218,6 +183,7 @@ func TestGetContainerID(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
}
|
}
|
||||||
fakeDocker.containerList = []docker.APIContainers{
|
fakeDocker.containerList = []docker.APIContainers{
|
||||||
{
|
{
|
||||||
@ -256,6 +222,7 @@ func TestGetContainerByName(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
}
|
}
|
||||||
fakeDocker.containerList = []docker.APIContainers{
|
fakeDocker.containerList = []docker.APIContainers{
|
||||||
{
|
{
|
||||||
@ -284,6 +251,7 @@ func TestListContainers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
}
|
}
|
||||||
fakeDocker.containerList = []docker.APIContainers{
|
fakeDocker.containerList = []docker.APIContainers{
|
||||||
{
|
{
|
||||||
@ -314,6 +282,7 @@ func TestKillContainerWithError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
}
|
}
|
||||||
err := kubelet.KillContainer("foo")
|
err := kubelet.KillContainer("foo")
|
||||||
verifyError(t, err)
|
verifyError(t, err)
|
||||||
@ -326,6 +295,7 @@ func TestKillContainer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
}
|
}
|
||||||
fakeDocker.containerList = []docker.APIContainers{
|
fakeDocker.containerList = []docker.APIContainers{
|
||||||
{
|
{
|
||||||
@ -345,7 +315,7 @@ func TestKillContainer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestResponseToContainersNil(t *testing.T) {
|
func TestResponseToContainersNil(t *testing.T) {
|
||||||
kubelet := Kubelet{}
|
kubelet := makeTestKubelet()
|
||||||
list, err := kubelet.ResponseToManifests(&etcd.Response{Node: nil})
|
list, err := kubelet.ResponseToManifests(&etcd.Response{Node: nil})
|
||||||
if len(list) != 0 {
|
if len(list) != 0 {
|
||||||
t.Errorf("Unexpected non-zero list: %#v", list)
|
t.Errorf("Unexpected non-zero list: %#v", list)
|
||||||
@ -356,7 +326,7 @@ func TestResponseToContainersNil(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestResponseToManifests(t *testing.T) {
|
func TestResponseToManifests(t *testing.T) {
|
||||||
kubelet := Kubelet{}
|
kubelet := makeTestKubelet()
|
||||||
list, err := kubelet.ResponseToManifests(&etcd.Response{
|
list, err := kubelet.ResponseToManifests(&etcd.Response{
|
||||||
Node: &etcd.Node{
|
Node: &etcd.Node{
|
||||||
Value: util.MakeJSONString([]api.ContainerManifest{
|
Value: util.MakeJSONString([]api.ContainerManifest{
|
||||||
@ -510,6 +480,7 @@ func TestSyncManifestsDoesNothing(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
}
|
}
|
||||||
err := kubelet.SyncManifests([]api.ContainerManifest{
|
err := kubelet.SyncManifests([]api.ContainerManifest{
|
||||||
{
|
{
|
||||||
@ -552,6 +523,7 @@ func TestSyncManifestsDeletes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
}
|
}
|
||||||
err := kubelet.SyncManifests([]api.ContainerManifest{})
|
err := kubelet.SyncManifests([]api.ContainerManifest{})
|
||||||
expectNoError(t, err)
|
expectNoError(t, err)
|
||||||
@ -827,16 +799,16 @@ func TestExtractFromHttpBadness(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestExtractFromHttp(t *testing.T) {
|
func TestExtractFromHttpSingle(t *testing.T) {
|
||||||
kubelet := Kubelet{}
|
kubelet := Kubelet{}
|
||||||
updateChannel := make(chan manifestUpdate)
|
updateChannel := make(chan manifestUpdate)
|
||||||
reader := startReading(updateChannel)
|
reader := startReading(updateChannel)
|
||||||
|
|
||||||
manifests := []api.ContainerManifest{
|
manifests := []api.ContainerManifest{
|
||||||
{Id: "foo"},
|
{Version: "v1beta1", Id: "foo"},
|
||||||
}
|
}
|
||||||
// TODO: provide a mechanism for taking arrays of
|
// Taking a single-manifest from a URL allows kubelet to be used
|
||||||
// manifests or a single manifest.
|
// in the implementation of google's container VM image.
|
||||||
data, err := json.Marshal(manifests[0])
|
data, err := json.Marshal(manifests[0])
|
||||||
|
|
||||||
fakeHandler := util.FakeHandler{
|
fakeHandler := util.FakeHandler{
|
||||||
@ -855,6 +827,46 @@ func TestExtractFromHttp(t *testing.T) {
|
|||||||
|
|
||||||
if len(read) != 1 {
|
if len(read) != 1 {
|
||||||
t.Errorf("Unexpected list: %#v", read)
|
t.Errorf("Unexpected list: %#v", read)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(manifests, read[0]) {
|
||||||
|
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractFromHttpMultiple(t *testing.T) {
|
||||||
|
kubelet := Kubelet{}
|
||||||
|
updateChannel := make(chan manifestUpdate)
|
||||||
|
reader := startReading(updateChannel)
|
||||||
|
|
||||||
|
manifests := []api.ContainerManifest{
|
||||||
|
{Version: "v1beta1", Id: "foo"},
|
||||||
|
{Version: "v1beta1", Id: "bar"},
|
||||||
|
}
|
||||||
|
data, err := json.Marshal(manifests)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Some weird json problem: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Serving: %v", string(data))
|
||||||
|
|
||||||
|
fakeHandler := util.FakeHandler{
|
||||||
|
StatusCode: 200,
|
||||||
|
ResponseBody: string(data),
|
||||||
|
}
|
||||||
|
testServer := httptest.NewServer(&fakeHandler)
|
||||||
|
|
||||||
|
err = kubelet.extractFromHTTP(testServer.URL, updateChannel)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %#v", err)
|
||||||
|
}
|
||||||
|
close(updateChannel)
|
||||||
|
|
||||||
|
read := reader.GetList()
|
||||||
|
|
||||||
|
if len(read) != 1 {
|
||||||
|
t.Errorf("Unexpected list: %#v", read)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(manifests, read[0]) {
|
if !reflect.DeepEqual(manifests, read[0]) {
|
||||||
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0])
|
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0])
|
||||||
@ -964,6 +976,7 @@ func TestGetContainerStats(t *testing.T) {
|
|||||||
|
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
CadvisorClient: mockCadvisor,
|
CadvisorClient: mockCadvisor,
|
||||||
}
|
}
|
||||||
fakeDocker.containerList = []docker.APIContainers{
|
fakeDocker.containerList = []docker.APIContainers{
|
||||||
@ -992,6 +1005,7 @@ func TestGetContainerStatsWithoutCadvisor(t *testing.T) {
|
|||||||
|
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
}
|
}
|
||||||
fakeDocker.containerList = []docker.APIContainers{
|
fakeDocker.containerList = []docker.APIContainers{
|
||||||
{
|
{
|
||||||
@ -1029,6 +1043,7 @@ func TestGetContainerStatsWhenCadvisorFailed(t *testing.T) {
|
|||||||
|
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
CadvisorClient: mockCadvisor,
|
CadvisorClient: mockCadvisor,
|
||||||
}
|
}
|
||||||
fakeDocker.containerList = []docker.APIContainers{
|
fakeDocker.containerList = []docker.APIContainers{
|
||||||
@ -1061,6 +1076,7 @@ func TestGetContainerStatsOnNonExistContainer(t *testing.T) {
|
|||||||
|
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
DockerClient: &fakeDocker,
|
DockerClient: &fakeDocker,
|
||||||
|
DockerPuller: &FakeDockerPuller{},
|
||||||
CadvisorClient: mockCadvisor,
|
CadvisorClient: mockCadvisor,
|
||||||
}
|
}
|
||||||
fakeDocker.containerList = []docker.APIContainers{}
|
fakeDocker.containerList = []docker.APIContainers{}
|
||||||
|
@ -98,3 +98,12 @@ func (m *Master) Run(myAddress, apiPrefix string) error {
|
|||||||
}
|
}
|
||||||
return s.ListenAndServe()
|
return s.ListenAndServe()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Instead of calling Run, call ConstructHandler to get a handler for your own
|
||||||
|
// server. Intended for testing. Only call once.
|
||||||
|
func (m *Master) ConstructHandler(apiPrefix string) http.Handler {
|
||||||
|
endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry)
|
||||||
|
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
|
||||||
|
|
||||||
|
return apiserver.New(m.storage, apiPrefix)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user