Merge pull request #72 from lavalamp/localkube

Localkube
This commit is contained in:
brendandburns 2014-06-12 11:15:08 -07:00
commit d100006616
11 changed files with 281 additions and 37 deletions

View File

@ -93,6 +93,16 @@ cd kubernetes
cluster/kube-down.sh cluster/kube-down.sh
``` ```
## Running locally
In a separate tab of your terminal, run:
```
cd kubernetes
hack/local-up.sh
```
This will build and start a lightweight local cluster, consisting of a master and a single minion. Type Control-C to shut it down. While it's running, you can use `hack/localcfg.sh` in place of `cluster/cloudcfg.sh` to talk to it.
## Where to go next? ## Where to go next?
[Detailed example application](https://github.com/GoogleCloudPlatform/kubernetes/blob/master/examples/guestbook/guestbook.md) [Detailed example application](https://github.com/GoogleCloudPlatform/kubernetes/blob/master/examples/guestbook/guestbook.md)

View File

@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
// apiserver is the main api server and master for the cluster. // apiserver is the main api server and master for the cluster.
// it is responsible for serving the cluster management API. // it is responsible for serving the cluster management API.
package main package main

View File

@ -20,7 +20,9 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"net/url"
"os" "os"
"path"
"strconv" "strconv"
"time" "time"
@ -39,7 +41,7 @@ var (
updatePeriod = flag.Duration("u", 60*time.Second, "Update interarrival period") updatePeriod = flag.Duration("u", 60*time.Second, "Update interarrival period")
portSpec = flag.String("p", "", "The port spec, comma-separated list of <external>:<internal>,...") portSpec = flag.String("p", "", "The port spec, comma-separated list of <external>:<internal>,...")
servicePort = flag.Int("s", -1, "If positive, create and run a corresponding service on this port, only used with 'run'") servicePort = flag.Int("s", -1, "If positive, create and run a corresponding service on this port, only used with 'run'")
authConfig = flag.String("auth", os.Getenv("HOME")+"/.kubernetes_auth", "Path to the auth info file. If missing, prompt the user") authConfig = flag.String("auth", os.Getenv("HOME")+"/.kubernetes_auth", "Path to the auth info file. If missing, prompt the user. Only used if doing https.")
json = flag.Bool("json", false, "If true, print raw JSON for responses") json = flag.Bool("json", false, "If true, print raw JSON for responses")
yaml = flag.Bool("yaml", false, "If true, print raw YAML for responses") yaml = flag.Bool("yaml", false, "If true, print raw YAML for responses")
) )
@ -61,9 +63,16 @@ func main() {
usage() usage()
} }
method := flag.Arg(0) method := flag.Arg(0)
url := *httpServer + "/api/v1beta1" + flag.Arg(1) secure := true
parsedUrl, err := url.Parse(*httpServer)
if err != nil {
log.Fatalf("Unable to parse %v as a URL\n", err)
}
if parsedUrl.Scheme != "" && parsedUrl.Scheme != "https" {
secure = false
}
url := *httpServer + path.Join("/api/v1beta1", flag.Arg(1))
var request *http.Request var request *http.Request
var err error
var printer cloudcfg.ResourcePrinter var printer cloudcfg.ResourcePrinter
if *json { if *json {
@ -74,10 +83,13 @@ func main() {
printer = &cloudcfg.HumanReadablePrinter{} printer = &cloudcfg.HumanReadablePrinter{}
} }
auth, err := cloudcfg.LoadAuthInfo(*authConfig) var auth *kube_client.AuthInfo
if secure {
auth, err = cloudcfg.LoadAuthInfo(*authConfig)
if err != nil { if err != nil {
log.Fatalf("Error loading auth: %#v", err) log.Fatalf("Error loading auth: %#v", err)
} }
}
switch method { switch method {
case "get", "list": case "get", "list":
@ -94,7 +106,7 @@ func main() {
case "rollingupdate": case "rollingupdate":
client := &kube_client.Client{ client := &kube_client.Client{
Host: *httpServer, Host: *httpServer,
Auth: &auth, Auth: auth,
} }
cloudcfg.Update(flag.Arg(1), client, *updatePeriod) cloudcfg.Update(flag.Arg(1), client, *updatePeriod)
case "run": case "run":
@ -108,19 +120,19 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("Error parsing replicas: %#v", err) log.Fatalf("Error parsing replicas: %#v", err)
} }
err = cloudcfg.RunController(image, name, replicas, kube_client.Client{Host: *httpServer, Auth: &auth}, *portSpec, *servicePort) err = cloudcfg.RunController(image, name, replicas, kube_client.Client{Host: *httpServer, Auth: auth}, *portSpec, *servicePort)
if err != nil { if err != nil {
log.Fatalf("Error: %#v", err) log.Fatalf("Error: %#v", err)
} }
return return
case "stop": case "stop":
err = cloudcfg.StopController(flag.Arg(1), kube_client.Client{Host: *httpServer, Auth: &auth}) err = cloudcfg.StopController(flag.Arg(1), kube_client.Client{Host: *httpServer, Auth: auth})
if err != nil { if err != nil {
log.Fatalf("Error: %#v", err) log.Fatalf("Error: %#v", err)
} }
return return
case "rm": case "rm":
err = cloudcfg.DeleteController(flag.Arg(1), kube_client.Client{Host: *httpServer, Auth: &auth}) err = cloudcfg.DeleteController(flag.Arg(1), kube_client.Client{Host: *httpServer, Auth: auth})
if err != nil { if err != nil {
log.Fatalf("Error: %#v", err) log.Fatalf("Error: %#v", err)
} }
@ -131,8 +143,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("Error: %#v", err) log.Fatalf("Error: %#v", err)
} }
var body string body, err := cloudcfg.DoRequest(request, auth)
body, err = cloudcfg.DoRequest(request, auth.User, auth.Password)
if err != nil { if err != nil {
log.Fatalf("Error: %#v", err) log.Fatalf("Error: %#v", err)
} }

View File

@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
// The controller manager is responsible for monitoring replication controllers, and creating corresponding // The controller manager is responsible for monitoring replication controllers, and creating corresponding
// pods to achieve the desired state. It listens for new controllers in etcd, and it sends requests to the // pods to achieve the desired state. It listens for new controllers in etcd, and it sends requests to the
// master to create/delete pods. // master to create/delete pods.

View File

@ -24,6 +24,7 @@ import (
"log" "log"
"math/rand" "math/rand"
"os" "os"
"os/exec"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
@ -57,7 +58,13 @@ func main() {
log.Fatal("Couldn't connnect to docker.") log.Fatal("Couldn't connnect to docker.")
} }
hostname, err := exec.Command("hostname", "-f").Output()
if err != nil {
log.Fatalf("Couldn't determine hostname: %v", err)
}
my_kubelet := kubelet.Kubelet{ my_kubelet := kubelet.Kubelet{
Hostname: string(hostname),
DockerClient: dockerClient, DockerClient: dockerClient,
FileCheckFrequency: *fileCheckFrequency, FileCheckFrequency: *fileCheckFrequency,
SyncFrequency: *syncFrequency, SyncFrequency: *syncFrequency,

138
cmd/localkube/localkube.go Normal file
View File

@ -0,0 +1,138 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// An all-in-one binary for standing up a fake Kubernetes cluster on your
// local machine.
// Assumes that there is a pre-existing etcd server running on localhost.
package main
import (
"flag"
"fmt"
"log"
"net/http"
"os"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
)
// kubelet flags
var (
file = flag.String("config", "", "Path to the config file")
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking file for new data")
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")
manifest_url = flag.String("manifest_url", "", "URL for accessing the container manifest")
kubelet_address = flag.String("kubelet_address", "127.0.0.1", "The address for the kubelet info server to serve on")
kubelet_port = flag.Uint("kubelet_port", 10250, "The port for the kubelete info server to serve on")
)
// master flags
var (
master_port = flag.Uint("master_port", 8080, "The port for the master to listen on. Default 8080.")
master_address = flag.String("master_address", "127.0.0.1", "The address for the master to listen to. Default 127.0.0.1")
apiPrefix = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'")
)
// flags that affect both
var (
etcd_server = flag.String("etcd_server", "http://localhost:4001", "Url of local etcd server")
)
// Starts kubelet services. Never returns.
func fake_kubelet() {
endpoint := "unix:///var/run/docker.sock"
dockerClient, err := docker.NewClient(endpoint)
if err != nil {
log.Fatal("Couldn't connnect to docker.")
}
my_kubelet := kubelet.Kubelet{
Hostname: *kubelet_address,
DockerClient: dockerClient,
FileCheckFrequency: *fileCheckFrequency,
SyncFrequency: *syncFrequency,
HTTPCheckFrequency: *httpCheckFrequency,
}
my_kubelet.RunKubelet(*file, *manifest_url, *etcd_server, *kubelet_address, *kubelet_port)
}
// Starts api services (the master). Never returns.
func api_server() {
machineList := util.StringList{*kubelet_address}
etcdClient := etcd.NewClient([]string{*etcd_server})
podRegistry := registry.MakeEtcdRegistry(etcdClient, machineList)
controllerRegistry := registry.MakeEtcdRegistry(etcdClient, machineList)
serviceRegistry := registry.MakeEtcdRegistry(etcdClient, machineList)
containerInfo := &client.HTTPContainerInfo{
Client: http.DefaultClient,
Port: *kubelet_port,
}
storage := map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(podRegistry, containerInfo, registry.MakeFirstFitScheduler(machineList, podRegistry)),
"replicationControllers": registry.MakeControllerRegistryStorage(controllerRegistry),
"services": registry.MakeServiceRegistryStorage(serviceRegistry),
}
endpoints := registry.MakeEndpointController(serviceRegistry, podRegistry)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
s := &http.Server{
Addr: fmt.Sprintf("%s:%d", *master_address, *master_port),
Handler: apiserver.New(storage, *apiPrefix),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
log.Fatal(s.ListenAndServe())
}
// Starts up a controller manager. Never returns.
func controller_manager() {
controllerManager := registry.MakeReplicationManager(etcd.NewClient([]string{*etcd_server}),
client.Client{
Host: fmt.Sprintf("http://%s:%d", *master_address, *master_port),
})
go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second)
go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second)
select {}
}
func main() {
flag.Parse()
// Set up logger for etcd client
etcd.SetLogger(log.New(os.Stderr, "etcd ", log.LstdFlags))
go api_server()
go fake_kubelet()
go controller_manager()
log.Printf("All components started.\nMaster running at: http://%s:%d\nKubelet running at: http://%s:%d\n",
*master_address, *master_port,
*kubelet_address, *kubelet_port)
select {}
}

View File

@ -22,7 +22,7 @@ source $(dirname $0)/config-go.sh
cd "${KUBE_TARGET}" cd "${KUBE_TARGET}"
BINARIES="proxy integration apiserver controller-manager kubelet cloudcfg" BINARIES="proxy integration apiserver controller-manager kubelet cloudcfg localkube"
for b in $BINARIES; do for b in $BINARIES; do
echo "+++ Building ${b}" echo "+++ Building ${b}"

51
hack/local-up.sh Executable file
View File

@ -0,0 +1,51 @@
#!/bin/bash
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This command builds and runs a local kubernetes cluster.
if [ "$(which etcd)" == "" ]; then
echo "etcd must be in your PATH"
exit 1
fi
# Stop right away if the build fails
set -e
# Only build what we need
(
source $(dirname $0)/config-go.sh
cd "${KUBE_TARGET}"
BINARIES="cloudcfg localkube"
for b in $BINARIES; do
echo "+++ Building ${b}"
go build "${KUBE_GO_PACKAGE}"/cmd/${b}
done
)
echo "Starting etcd"
ETCD_DIR=$(mktemp -d -t kube-integration.XXXXXX)
trap "rm -rf ${ETCD_DIR}" EXIT
etcd -name test -data-dir ${ETCD_DIR} > /tmp/etcd.log &
ETCD_PID=$!
sleep 5
echo "Running localkube as root (so it can talk to docker's unix socket)"
sudo $(dirname $0)/../output/go/localkube
kill $ETCD_PID

27
hack/localcfg.sh Executable file
View File

@ -0,0 +1,27 @@
#!/bin/bash
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is exactly like cloudcfg.sh, but it talks to a local master
# (which you're assumed to be running with localkube.sh).
CLOUDCFG=$(dirname $0)/../output/go/cloudcfg
if [ ! -x $CLOUDCFG ]; then
echo "Could not find cloudcfg binary. Run hack/build-go.sh to build it."
exit 1
fi
# 8080 is the default port for the master
$CLOUDCFG -h http://localhost:8080 $@

View File

@ -42,25 +42,28 @@ func promptForString(field string) string {
return result return result
} }
// Parse an AuthInfo object from a file path // Parse an AuthInfo object from a file path. Prompt user and create file if it doesn't exist.
func LoadAuthInfo(path string) (client.AuthInfo, error) { func LoadAuthInfo(path string) (*client.AuthInfo, error) {
var auth client.AuthInfo var auth client.AuthInfo
if _, err := os.Stat(path); os.IsNotExist(err) { if _, err := os.Stat(path); os.IsNotExist(err) {
auth.User = promptForString("Username") auth.User = promptForString("Username")
auth.Password = promptForString("Password") auth.Password = promptForString("Password")
data, err := json.Marshal(auth) data, err := json.Marshal(auth)
if err != nil { if err != nil {
return auth, err return &auth, err
} }
err = ioutil.WriteFile(path, data, 0600) err = ioutil.WriteFile(path, data, 0600)
return auth, err return &auth, err
} }
data, err := ioutil.ReadFile(path) data, err := ioutil.ReadFile(path)
if err != nil { if err != nil {
return auth, err return nil, err
} }
err = json.Unmarshal(data, &auth) err = json.Unmarshal(data, &auth)
return auth, err if err != nil {
return nil, err
}
return &auth, err
} }
// Perform a rolling update of a collection of pods. // Perform a rolling update of a collection of pods.
@ -99,23 +102,22 @@ func RequestWithBody(configFile, url, method string) (*http.Request, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return RequestWithBodyData(data, url, method) return requestWithBodyData(data, url, method)
} }
// RequestWithBodyData is a helper method that creates an HTTP request with the specified url, method // requestWithBodyData is a helper method that creates an HTTP request with the specified url, method
// and body data // and body data
// FIXME: need to be public API? func requestWithBodyData(data []byte, url, method string) (*http.Request, error) {
func RequestWithBodyData(data []byte, url, method string) (*http.Request, error) {
request, err := http.NewRequest(method, url, bytes.NewBuffer(data)) request, err := http.NewRequest(method, url, bytes.NewBuffer(data))
request.ContentLength = int64(len(data)) request.ContentLength = int64(len(data))
return request, err return request, err
} }
// Execute a request, adds authentication, and HTTPS cert ignoring. // Execute a request, adds authentication (if auth != nil), and HTTPS cert ignoring.
// TODO: Make this stuff optional func DoRequest(request *http.Request, auth *client.AuthInfo) (string, error) {
// FIXME: need to be public API? if auth != nil {
func DoRequest(request *http.Request, user, password string) (string, error) { request.SetBasicAuth(auth.User, auth.Password)
request.SetBasicAuth(user, password) }
tr := &http.Transport{ tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
} }

View File

@ -60,6 +60,7 @@ type DockerInterface interface {
// The main kubelet implementation // The main kubelet implementation
type Kubelet struct { type Kubelet struct {
Hostname string
Client registry.EtcdClient Client registry.EtcdClient
DockerClient DockerInterface DockerClient DockerInterface
FileCheckFrequency time.Duration FileCheckFrequency time.Duration
@ -427,15 +428,10 @@ func (kl *Kubelet) getKubeletStateFromEtcd(key string, changeChannel chan<- []ap
// The channel to send new configurations across // The channel to send new configurations across
// This function loops forever and is intended to be run in a go routine. // This function loops forever and is intended to be run in a go routine.
func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) { func (kl *Kubelet) SyncAndSetupEtcdWatch(changeChannel chan<- []api.ContainerManifest) {
hostname, err := exec.Command("hostname", "-f").Output() key := "/registry/hosts/" + strings.TrimSpace(kl.Hostname)
if err != nil {
log.Printf("Couldn't determine hostname : %v", err)
return
}
key := "/registry/hosts/" + strings.TrimSpace(string(hostname))
// First fetch the initial configuration (watch only gives changes...) // First fetch the initial configuration (watch only gives changes...)
for { for {
err = kl.getKubeletStateFromEtcd(key, changeChannel) err := kl.getKubeletStateFromEtcd(key, changeChannel)
if err == nil { if err == nil {
// We got a successful response, etcd is up, set up the watch. // We got a successful response, etcd is up, set up the watch.
break break