Merge pull request #2224 from thockin/dns

Enable DNS for services
Committed by Joe Beda on 2014-12-29 10:27:57 -08:00
commit a2e58d490e
44 changed files with 1040 additions and 98 deletions

View File

@ -44,3 +44,9 @@ ENABLE_NODE_LOGGING=true
LOGGING_DESTINATION=elasticsearch # options: elasticsearch, gcp
IAM_PROFILE="kubernetes"
LOG="/dev/null"
# Optional: Install cluster DNS.
ENABLE_CLUSTER_DNS=true
DNS_SERVER_IP="10.0.0.10"
DNS_DOMAIN="kubernetes.local"
DNS_REPLICAS=1

View File

@ -25,6 +25,9 @@ portal_net: $PORTAL_NET
enable_node_monitoring: $ENABLE_NODE_MONITORING
enable_node_logging: $ENABLE_NODE_LOGGING
logging_destination: $LOGGING_DESTINATION
enable_cluster_dns: $ENABLE_CLUSTER_DNS
dns_server: $DNS_SERVER_IP
dns_domain: $DNS_DOMAIN
EOF
mkdir -p /srv/salt-overlay/salt/nginx

View File

@ -299,6 +299,9 @@ function kube-up {
echo "readonly ENABLE_NODE_MONITORING='${ENABLE_NODE_MONITORING:-false}'"
echo "readonly ENABLE_NODE_LOGGING='${ENABLE_NODE_LOGGING:-false}'"
echo "readonly LOGGING_DESTINATION='${LOGGING_DESTINATION:-}'"
echo "readonly ENABLE_CLUSTER_DNS='${ENABLE_CLUSTER_DNS:-false}'"
echo "readonly DNS_SERVER_IP='${DNS_SERVER_IP:-}'"
echo "readonly DNS_DOMAIN='${DNS_DOMAIN:-}'"
grep -v "^#" "${KUBE_ROOT}/cluster/aws/templates/create-dynamic-salt-files.sh"
grep -v "^#" "${KUBE_ROOT}/cluster/aws/templates/download-release.sh"
grep -v "^#" "${KUBE_ROOT}/cluster/aws/templates/salt-master.sh"

View File

@ -54,3 +54,9 @@ LOGGING_DESTINATION=elasticsearch # options: elasticsearch, gcp
# Don't require https for registries in our local RFC1918 network
EXTRA_DOCKER_OPTS="--insecure-registry 10.0.0.0/8"
# Optional: Install cluster DNS.
ENABLE_CLUSTER_DNS=true
DNS_SERVER_IP="10.0.0.10"
DNS_DOMAIN="kubernetes.local"
DNS_REPLICAS=1

View File

@ -51,3 +51,9 @@ ENABLE_CLUSTER_MONITORING=false
# Don't require https for registries in our local RFC1918 network
EXTRA_DOCKER_OPTS="--insecure-registry 10.0.0.0/8"
# Optional: Install cluster DNS.
ENABLE_CLUSTER_DNS=true
DNS_SERVER_IP="10.0.0.10"
DNS_DOMAIN="kubernetes.local"
DNS_REPLICAS=1

View File

@ -25,6 +25,9 @@ portal_net: '$(echo "$PORTAL_NET" | sed -e "s/'/''/g")'
enable_node_monitoring: '$(echo "$ENABLE_NODE_MONITORING" | sed -e "s/'/''/g")'
enable_node_logging: '$(echo "$ENABLE_NODE_LOGGING" | sed -e "s/'/''/g")'
logging_destination: '$(echo "$LOGGING_DESTINATION" | sed -e "s/'/''/g")'
enable_cluster_dns: '$(echo "$ENABLE_CLUSTER_DNS" | sed -e "s/'/''/g")'
dns_server: '$(echo "$DNS_SERVER_IP" | sed -e "s/'/''/g")'
dns_domain: '$(echo "$DNS_DOMAIN" | sed -e "s/'/''/g")'
EOF
mkdir -p /srv/salt-overlay/salt/nginx

View File

@ -389,6 +389,9 @@ function kube-up {
echo "readonly ENABLE_NODE_MONITORING='${ENABLE_NODE_MONITORING:-false}'"
echo "readonly ENABLE_NODE_LOGGING='${ENABLE_NODE_LOGGING:-false}'"
echo "readonly LOGGING_DESTINATION='${LOGGING_DESTINATION:-}'"
echo "readonly ENABLE_CLUSTER_DNS='${ENABLE_CLUSTER_DNS:-false}'"
echo "readonly DNS_SERVER_IP='${DNS_SERVER_IP:-}'"
echo "readonly DNS_DOMAIN='${DNS_DOMAIN:-}'"
grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/common.sh"
grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/format-and-mount-pd.sh"
grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/create-dynamic-salt-files.sh"

View File

@ -42,4 +42,16 @@ echo "... calling validate-cluster" >&2
echo "... calling setup-monitoring" >&2
setup-monitoring
if [[ "${ENABLE_CLUSTER_DNS}" == "true" ]]; then
echo "Installing cluster DNS"
sed -e "s/{DNS_DOMAIN}/$DNS_DOMAIN/g" \
-e "s/{DNS_REPLICAS}/$DNS_REPLICAS/g" \
"${KUBE_ROOT}/contrib/dns/skydns-rc.yaml.in" \
| "${KUBE_ROOT}/cluster/kubectl.sh" create -f -
sed -e "s/{DNS_SERVER_IP}/$DNS_SERVER_IP/g" \
"${KUBE_ROOT}/contrib/dns/skydns-svc.yaml.in" \
| "${KUBE_ROOT}/cluster/kubectl.sh" create -f -
fi
echo "Done" >&2

View File

@ -103,7 +103,7 @@ coreos:
Requires=download-release.service
[Service]
ExecStartPre=/usr/bin/ln -sf /opt/kubernetes/server/bin/kubelet /opt/bin/kubelet
ExecStart=/opt/bin/kubelet --address=$private_ipv4 --hostname_override=$private_ipv4 --etcd_servers=http://127.0.0.1:4001 --logtostderr=true --config=/opt/kubernetes-manifests
ExecStart=/opt/bin/kubelet --address=$private_ipv4 --hostname_override=$private_ipv4 --etcd_servers=http://127.0.0.1:4001 --logtostderr=true --config=/opt/kubernetes-manifests --cluster_dns=DNS_SERVER_IP --cluster_domain=DNS_DOMAIN
Restart=always
RestartSec=2
- name: minion-proxy.service

View File

@ -44,3 +44,9 @@ ENABLE_NODE_MONITORING=true
# Optional: Enable node logging.
ENABLE_NODE_LOGGING=true
LOGGING_DESTINATION=elasticsearch
# Optional: Install cluster DNS.
ENABLE_CLUSTER_DNS=true
DNS_SERVER_IP="10.0.0.10"
DNS_DOMAIN="kubernetes.local"
DNS_REPLICAS=1

View File

@ -187,6 +187,9 @@ rax-boot-minions() {
-e "s|ENABLE_NODE_MONITORING|${ENABLE_NODE_MONITORING:-false}|" \
-e "s|ENABLE_NODE_LOGGING|${ENABLE_NODE_LOGGING:-false}|" \
-e "s|LOGGING_DESTINATION|${LOGGING_DESTINATION:-}|" \
-e "s|ENABLE_CLUSTER_DNS|${ENABLE_CLUSTER_DNS:-false}|" \
-e "s|DNS_SERVER_IP|${DNS_SERVER_IP:-}|" \
-e "s|DNS_DOMAIN|${DNS_DOMAIN:-}|" \
$(dirname $0)/rackspace/cloud-config/minion-cloud-config.yaml > $KUBE_TEMP/minion-cloud-config-$(($i + 1)).yaml

View File

@ -27,4 +27,11 @@
{% set registry_qps = "-registry_qps=0.1" %}
DAEMON_ARGS="{{daemon_args}} {{etcd_servers}} {{apiservers}} {{auth_path}} {{hostname_override}} {{address}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}}"
{% set cluster_dns = "" %}
{% set cluster_domain = "" %}
{% if pillar['enable_cluster_dns'] is defined and pillar['enable_cluster_dns'] %}
{% set cluster_dns = "-cluster_dns=" + pillar['dns_server'] %}
{% set cluster_domain = "-cluster_domain=" + pillar['dns_domain'] %}
{% endif %}
DAEMON_ARGS="{{daemon_args}} {{etcd_servers}} {{apiservers}} {{auth_path}} {{hostname_override}} {{address}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}}"

View File

@ -56,3 +56,9 @@ LOGGING_DESTINATION=elasticsearch
# Extra options to set on the Docker command line. This is useful for setting
# --insecure-registry for local registries.
DOCKER_OPTS=""
# Optional: Install cluster DNS.
ENABLE_CLUSTER_DNS=true
DNS_SERVER_IP="10.0.0.10"
DNS_DOMAIN="kubernetes.local"
DNS_REPLICAS=1

View File

@ -84,6 +84,9 @@ cat <<EOF >/srv/salt-overlay/pillar/cluster-params.sls
enable_node_monitoring: '$(echo "$ENABLE_NODE_MONITORING" | sed -e "s/'/''/g")'
enable_node_logging: '$(echo "$ENABLE_NODE_LOGGING" | sed -e "s/'/''/g")'
logging_destination: '$(echo "$LOGGING_DESTINATION" | sed -e "s/'/''/g")'
enable_cluster_dns: '$(echo "$ENABLE_CLUSTER_DNS" | sed -e "s/'/''/g")'
dns_server: '$(echo "$DNS_SERVER_IP" | sed -e "s/'/''/g")'
dns_domain: '$(echo "$DNS_DOMAIN" | sed -e "s/'/''/g")'
EOF
# Configure the salt-master

View File

@ -39,3 +39,9 @@ ENABLE_NODE_MONITORING=true
# Optional: Enable node logging.
ENABLE_NODE_LOGGING=true
LOGGING_DESTINATION=elasticsearch
# Optional: Install cluster DNS.
ENABLE_CLUSTER_DNS=true
DNS_SERVER_IP="10.244.240.240"
DNS_DOMAIN="kubernetes.local"
DNS_REPLICAS=1

View File

@ -25,6 +25,9 @@ portal_net: $PORTAL_NET
enable_node_monitoring: $ENABLE_NODE_MONITORING
enable_node_logging: $ENABLE_NODE_LOGGING
logging_destination: $LOGGING_DESTINATION
enable_cluster_dns: $ENABLE_CLUSTER_DNS
dns_server: $DNS_SERVER_IP
dns_domain: $DNS_DOMAIN
EOF
mkdir -p /srv/salt-overlay/salt/nginx

View File

@ -294,6 +294,9 @@ function kube-up {
echo "readonly ENABLE_NODE_MONITORING='${ENABLE_NODE_MONITORING:-false}'"
echo "readonly ENABLE_NODE_LOGGING='${ENABLE_NODE_LOGGING:-false}'"
echo "readonly LOGGING_DESTINATION='${LOGGING_DESTINATION:-}'"
echo "readonly ENABLE_CLUSTER_DNS='${ENABLE_CLUSTER_DNS:-false}'"
echo "readonly DNS_SERVER_IP='${DNS_SERVER_IP:-}'"
echo "readonly DNS_DOMAIN='${DNS_DOMAIN:-}'"
echo "readonly SERVER_BINARY_TAR='${SERVER_BINARY_TAR##*/}'"
echo "readonly SALT_TAR='${SALT_TAR##*/}'"
echo "readonly MASTER_HTPASSWD='${htpasswd}'"

View File

@ -18,6 +18,7 @@ package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@ -53,7 +54,7 @@ func waitForPodRunning(c *client.Client, id string) {
if pod.Status.Phase == api.PodRunning {
break
}
glog.Infof("Waiting for pod status to be running (%s)", pod.Status.Phase)
glog.Infof("Waiting for pod status to be %q (found %q)", api.PodRunning, pod.Status.Phase)
}
}
@ -388,6 +389,116 @@ func outputTAPSummary(infoList []TestInfo) {
}
}
// TestClusterDNS checks that cluster DNS works.
func TestClusterDNS(c *client.Client) bool {
podClient := c.Pods(api.NamespaceDefault)
//TODO: Wait for skyDNS
// All the names we need to be able to resolve.
namesToResolve := []string{
"kubernetes-ro",
"kubernetes-ro.default",
"kubernetes-ro.default.kubernetes.local",
"google.com",
}
probeCmd := "for i in `seq 1 600`; do "
for _, name := range namesToResolve {
probeCmd += fmt.Sprintf("wget -O /dev/null %s && echo OK > /results/%s;", name, name)
}
probeCmd += "sleep 1; done"
// Run a pod which probes DNS and exposes the results by HTTP.
pod := &api.Pod{
TypeMeta: api.TypeMeta{
Kind: "Pod",
APIVersion: "v1beta1",
},
ObjectMeta: api.ObjectMeta{
Name: "dns-test",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "results",
Source: &api.VolumeSource{
EmptyDir: &api.EmptyDir{},
},
},
},
Containers: []api.Container{
{
Name: "webserver",
Image: "kubernetes/test-webserver",
VolumeMounts: []api.VolumeMount{
{
Name: "results",
MountPath: "/results",
},
},
},
{
Name: "pinger",
Image: "busybox",
Command: []string{"sh", "-c", probeCmd},
VolumeMounts: []api.VolumeMount{
{
Name: "results",
MountPath: "/results",
},
},
},
},
},
}
_, err := podClient.Create(pod)
if err != nil {
glog.Errorf("Failed to create dns-test pod: %v", err)
return false
}
defer podClient.Delete(pod.Name)
waitForPodRunning(c, pod.Name)
pod, err = podClient.Get(pod.Name)
if err != nil {
glog.Errorf("Failed to get pod: %v", err)
return false
}
// Try to find results for each expected name.
var failed []string
for try := 1; try < 100; try++ {
failed = []string{}
for _, name := range namesToResolve {
_, err := c.Get().
Path("proxy").
Namespace("default").
Path("pods").
Path(pod.Name).
Path("results").
Path(name).
Do().Raw()
if err != nil {
failed = append(failed, name)
}
}
if len(failed) == 0 {
break
}
time.Sleep(3 * time.Second)
}
if len(failed) != 0 {
glog.Errorf("DNS failed for: %v", failed)
return false
}
// TODO: probe from the host, too.
glog.Info("DNS probes succeeded")
return true
}
func main() {
flag.Parse()
goruntime.GOMAXPROCS(goruntime.NumCPU())
@ -410,6 +521,7 @@ func main() {
{TestImportantURLs, "TestImportantURLs", 3},
{TestPodUpdate, "TestPodUpdate", 4},
{TestNetwork, "TestNetwork", 5},
{TestClusterDNS, "TestClusterDNS", 6},
}
info := []TestInfo{}

View File

@ -63,12 +63,15 @@ var (
cAdvisorPort = flag.Uint("cadvisor_port", 4194, "The port of the localhost cAdvisor endpoint")
oomScoreAdj = flag.Int("oom_score_adj", -900, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]")
apiServerList util.StringList
clusterDomain = flag.String("cluster_domain", "", "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
clusterDNS = util.IP(nil)
)
func init() {
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config")
flag.Var(&address, "address", "The IP address for the info server to serve on (set to 0.0.0.0 for all interfaces)")
flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers to publish events to. (ip:port), comma separated.")
flag.Var(&clusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
}
func setupRunOnce() {
@ -114,6 +117,8 @@ func main() {
RegistryBurst: *registryBurst,
MinimumGCAge: *minimumGCAge,
MaxContainerCount: *maxContainerCount,
ClusterDomain: *clusterDomain,
ClusterDNS: clusterDNS,
Runonce: *runonce,
Port: *port,
CAdvisorPort: *cAdvisorPort,

View File

@ -0,0 +1,3 @@
# Maintainers
Tim Hockin <thockin@google.com>

View File

@ -38,11 +38,13 @@ sed -e "s/{DNS_SERVER_IP}/$DNS_SERVER_IP/g" \
```
## How does it work?
SkyDNS depends on etcd, but it doesn't really need what etcd offers when in
Kubernetes mode. SkyDNS finds the Kubernetes master through the
`kubernetes-ro` service, and pulls service info from it, essentially using
etcd as a cache. For simplicity, we run etcd and SkyDNS together in a pod,
without linking the etcd instances into a cluster.
SkyDNS depends on etcd for what to serve, but it doesn't really need all of
what etcd offers in the way we use it. For simplicity, we run etcd and SkyDNS
together in a pod, and we do not try to link etcd instances across replicas. A
helper container called `kube2sky` also runs in the pod and acts as a bridge
between Kubernetes and SkyDNS. It finds the Kubernetes master through the
`kubernetes-ro` service, it pulls service info from the master, and it writes
that to etcd for SkyDNS to find.
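To make the bridge concrete, here is a minimal, hypothetical sketch (not part of this change) of the record `kube2sky` ends up publishing for a single service, using the same `skydns/msg` package as the source below. The service name "nifty", the namespace "default", the portal IP, and the port are made-up example values.

```go
// Illustrative only: the etcd record kube2sky would publish for a service
// "nifty" in namespace "default" under DNS_DOMAIN=kubernetes.local.
package main

import (
	"encoding/json"
	"fmt"

	skymsg "github.com/skynetservices/skydns/msg"
)

func main() {
	name := "nifty.default.kubernetes.local." // <service>.<namespace>.<domain>.
	rec := skymsg.Service{
		Host:     "10.0.0.42", // the service's portal IP (example value)
		Port:     80,          // the service's port (example value)
		Priority: 10,
		Weight:   10,
		Ttl:      30,
	}
	b, _ := json.Marshal(rec)
	fmt.Println("etcd key:  ", skymsg.Path(name)) // where SkyDNS reads the record
	fmt.Println("etcd value:", string(b))
}
```

SkyDNS then serves an A record for that name pointing at the service's portal IP, which is what the kubelet's cluster DNS settings let pods resolve.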
## Known issues
DNS resolution does not work from nodes directly, but it DOES work for

View File

@ -0,0 +1,5 @@
FROM scratch
MAINTAINER Tim Hockin <thockin@google.com>
ADD kube2sky kube2sky
ADD kube2sky.go kube2sky.go
ENTRYPOINT ["/kube2sky"]

View File

@ -0,0 +1,13 @@
all: kube2sky
kube2sky: kube2sky.go
CGO_ENABLED=0 go build -a --ldflags '-w' ./kube2sky.go
container: kube2sky
sudo docker build -t kubernetes/kube2sky .
push:
sudo docker push kubernetes/kube2sky
clean:
rm -f kube2sky

View File

@ -0,0 +1,22 @@
# kube2sky
A bridge between Kubernetes and SkyDNS. This will watch the kubernetes API for
changes in Services and then publish those changes to SkyDNS through etcd.
For now, this is expected to be run in a pod alongside the etcd and SkyDNS
containers.
## Namespaces
Kubernetes namespaces become another level of the DNS hierarchy. See the
description of `-domain` below.
## Flags
`-domain`: Set the domain under which all DNS names will be hosted. For
example, if this is set to `kubernetes.io`, then a service named "nifty" in the
"default" namespace would be exposed through DNS as
"nifty.default.kubernetes.io".
`-verbose`: Log additional information.

View File

@ -0,0 +1,252 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// kube2sky is a bridge between Kubernetes and SkyDNS. It watches the
// Kubernetes master for changes in Services and manifests them into etcd for
// SkyDNS to serve as DNS records.
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"os"
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
klabels "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
kwatch "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
etcd "github.com/coreos/go-etcd/etcd"
skymsg "github.com/skynetservices/skydns/msg"
)
var (
domain = flag.String("domain", "kubernetes.local", "domain under which to create names")
verbose = flag.Bool("verbose", false, "log extra information")
)
func removeDNS(record string, etcdClient *etcd.Client) error {
log.Printf("Removing %s from DNS", record)
_, err := etcdClient.Delete(skymsg.Path(record), true)
return err
}
func addDNS(record string, service *kapi.Service, etcdClient *etcd.Client) error {
svc := skymsg.Service{
Host: service.Spec.PortalIP,
Port: service.Spec.Port,
Priority: 10,
Weight: 10,
Ttl: 30,
}
b, err := json.Marshal(svc)
if err != nil {
return err
}
// Set with no TTL, and hope that kubernetes events are accurate.
log.Printf("Setting dns record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Port)
_, err = etcdClient.Set(skymsg.Path(record), string(b), uint64(0))
return err
}
func newEtcdClient() (client *etcd.Client) {
// TODO: take a flag for etcd server(s).
client = etcd.NewClient([]string{"http://127.0.0.1:4001"})
if client == nil {
return nil
}
client.SyncCluster()
return client
}
// TODO: evaluate using pkg/client/clientcmd
func newKubeClient() (*kclient.Client, error) {
config := &kclient.Config{}
masterHost := os.Getenv("KUBERNETES_RO_SERVICE_HOST")
if masterHost == "" {
log.Fatalf("KUBERNETES_RO_SERVICE_HOST is not defined")
}
masterPort := os.Getenv("KUBERNETES_RO_SERVICE_PORT")
if masterPort == "" {
log.Fatalf("KUBERNETES_RO_SERVICE_PORT is not defined")
}
config.Host = fmt.Sprintf("http://%s:%s", masterHost, masterPort)
log.Printf("Using %s for kubernetes master", config.Host)
config.Version = "v1beta1"
log.Printf("Using kubernetes API %s", config.Version)
return kclient.New(config)
}
func buildNameString(service, namespace, domain string) string {
return fmt.Sprintf("%s.%s.%s.", service, namespace, domain)
}
func watchOnce(etcdClient *etcd.Client, kubeClient *kclient.Client) {
// Start the goroutine to produce update events.
updates := make(chan serviceUpdate)
startWatching(kubeClient.Services(kapi.NamespaceAll), updates)
// This loop will break if the channel closes, which is how the
// goroutine signals an error.
for ev := range updates {
if *verbose {
log.Printf("Received update event: %#v", ev)
}
switch ev.Op {
case SetServices, AddService:
for i := range ev.Services {
s := &ev.Services[i]
name := buildNameString(s.Name, s.Namespace, *domain)
err := addDNS(name, s, etcdClient)
if err != nil {
log.Printf("Failed to add DNS for %s: %v", name, err)
}
}
case RemoveService:
for i := range ev.Services {
s := &ev.Services[i]
name := buildNameString(s.Name, s.Namespace, *domain)
err := removeDNS(name, etcdClient)
if err != nil {
log.Printf("Failed to remove DNS for %s: %v", name, err)
}
}
}
}
//TODO: fully resync periodically.
}
func main() {
flag.Parse()
etcdClient := newEtcdClient()
if etcdClient == nil {
log.Fatal("Failed to create etcd client")
}
kubeClient, err := newKubeClient()
if err != nil {
log.Fatalf("Failed to create a kubernetes client: %v", err)
}
// In case of error, the watch will be aborted. At that point we just
// retry.
for {
watchOnce(etcdClient, kubeClient)
}
}
//FIXME: make the below part of the k8s client lib?
// servicesWatcher is capable of listing and watching for changes to services
// across ALL namespaces
type servicesWatcher interface {
List(label klabels.Selector) (*kapi.ServiceList, error)
Watch(label, field klabels.Selector, resourceVersion string) (kwatch.Interface, error)
}
type operation int
// These are the available operation types.
const (
SetServices operation = iota
AddService
RemoveService
)
// serviceUpdate describes an operation of services, sent on the channel.
//
// You can add or remove a single service by sending an array of size one with
// Op == AddService|RemoveService. To set the system to a given state, just
// set Services as desired and Op to SetServices, which will reset the system
// state to that specified in this operation for this source channel. To remove
// all services, set Services to an empty array and Op to SetServices.
type serviceUpdate struct {
Services []kapi.Service
Op operation
}
// startWatching launches a goroutine that watches for changes to services.
func startWatching(watcher servicesWatcher, updates chan<- serviceUpdate) {
serviceVersion := ""
go watchLoop(watcher, updates, &serviceVersion)
}
// watchLoop loops forever looking for changes to services. If an error occurs
// it will close the channel and return.
func watchLoop(svcWatcher servicesWatcher, updates chan<- serviceUpdate, resourceVersion *string) {
defer close(updates)
if len(*resourceVersion) == 0 {
services, err := svcWatcher.List(klabels.Everything())
if err != nil {
log.Printf("Failed to load services: %v", err)
return
}
*resourceVersion = services.ResourceVersion
updates <- serviceUpdate{Op: SetServices, Services: services.Items}
}
watcher, err := svcWatcher.Watch(klabels.Everything(), klabels.Everything(), *resourceVersion)
if err != nil {
log.Printf("Failed to watch for service changes: %v", err)
return
}
defer watcher.Stop()
ch := watcher.ResultChan()
for {
select {
case event, ok := <-ch:
if !ok {
log.Printf("watchLoop channel closed")
return
}
if event.Type == kwatch.Error {
if status, ok := event.Object.(*kapi.Status); ok {
log.Printf("Error during watch: %#v", status)
return
}
log.Fatalf("Received unexpected error: %#v", event.Object)
}
if service, ok := event.Object.(*kapi.Service); ok {
sendUpdate(updates, event, service, resourceVersion)
continue
}
}
}
}
func sendUpdate(updates chan<- serviceUpdate, event kwatch.Event, service *kapi.Service, resourceVersion *string) {
*resourceVersion = service.ResourceVersion
switch event.Type {
case kwatch.Added, kwatch.Modified:
updates <- serviceUpdate{Op: AddService, Services: []kapi.Service{*service}}
case kwatch.Deleted:
updates <- serviceUpdate{Op: RemoveService, Services: []kapi.Service{*service}}
default:
log.Fatalf("Unknown event.Type: %v", event.Type)
}
}

View File

@ -15,17 +15,25 @@ desiredState:
manifest:
version: v1beta2
id: skydns
dnsPolicy: "Default" # Don't use cluster DNS.
containers:
- name: etcd
image: quay.io/coreos/etcd:latest
command: [ "/etcd", "-bind-addr=127.0.0.1" ]
ports:
- name: server
containerPort: 7001
- name: skydns
image: skynetservices/skydns:k8sfix
command: [
"-kubernetes=true",
"/etcd",
"-bind-addr=127.0.0.1",
"-peer-bind-addr=127.0.0.1",
]
- name: kube2sky
image: kubernetes/kube2sky:1.0
command: [
# entrypoint = "/kube2sky",
"-domain={DNS_DOMAIN}",
]
- name: skydns
image: kubernetes/skydns:2014-12-23-001
command: [
# entrypoint = "/skydns",
"-machines=http://localhost:4001",
"-addr=0.0.0.0:53",
"-domain={DNS_DOMAIN}.",

View File

@ -0,0 +1,4 @@
FROM busybox
MAINTAINER Tim Hockin <thockin@google.com>
ADD skydns skydns
ENTRYPOINT ["/skydns"]

View File

@ -0,0 +1,13 @@
all: skydns
skydns:
CGO_ENABLED=0 go build -a --ldflags '-w' github.com/skynetservices/skydns
container: skydns
sudo docker build -t kubernetes/skydns .
push:
sudo docker push kubernetes/skydns
clean:
rm -f skydns

View File

@ -0,0 +1,5 @@
# skydns for kubernetes
This container only exists until skydns itself is reduced in some way. At the
time of this writing, it is over 600 MB in size.

View File

@ -378,7 +378,8 @@ verify_from_container "${svc3_name}" "${svc3_ip}" "${svc3_port}" \
#
echo "Test 5: Remove the iptables rules, make sure they come back."
echo "Manually removing iptables rules"
ssh-to-node "${test_node}" "sudo iptables -t nat -F KUBE-PROXY"
ssh-to-node "${test_node}" "sudo iptables -t nat -F KUBE-PORTALS-HOST"
ssh-to-node "${test_node}" "sudo iptables -t nat -F KUBE-PORTALS-CONTAINER"
echo "Verifying the portals from the host"
wait_for_service_up "${svc3_name}" "${svc3_ip}" "${svc3_port}" \
"${svc3_count}" "${svc3_pods}"

View File

@ -448,11 +448,27 @@ type PodList struct {
Items []Pod `json:"items"`
}
// DNSPolicy defines how a pod's DNS will be configured.
type DNSPolicy string
const (
// DNSClusterFirst indicates that the pod should use cluster DNS
// first, if it is available, then fall back on the default (as
// determined by kubelet) DNS settings.
DNSClusterFirst DNSPolicy = "ClusterFirst"
// DNSDefault indicates that the pod should use the default (as
// determined by kubelet) DNS settings.
DNSDefault DNSPolicy = "Default"
)
// PodSpec is a description of a pod
type PodSpec struct {
Volumes []Volume `json:"volumes"`
Containers []Container `json:"containers"`
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"`
// Optional: Set DNS policy. Defaults to "ClusterFirst"
DNSPolicy DNSPolicy `json:"dnsPolicy,omitempty"`
// NodeSelector is a selector which must be true for the pod to fit on a node
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
@ -1029,6 +1045,8 @@ type ContainerManifest struct {
Volumes []Volume `json:"volumes"`
Containers []Container `json:"containers"`
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"`
// Optional: Set DNS policy. Defaults to "ClusterFirst"
DNSPolicy DNSPolicy `json:"dnsPolicy"`
}
// ContainerManifestList is used to communicate container manifests to kubelet.
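As a usage illustration (not taken from this diff), a pod that should bypass cluster DNS, such as the SkyDNS pod itself, sets the new field to `Default`; leaving it empty lets validation default it to `ClusterFirst`. Everything below other than `DNSPolicy` mirrors API usage already present elsewhere in this PR; the pod name and image are arbitrary.

```go
// Hypothetical example of opting a pod out of cluster DNS.
package main

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)

func main() {
	pod := &api.Pod{
		TypeMeta:   api.TypeMeta{Kind: "Pod", APIVersion: "v1beta1"},
		ObjectMeta: api.ObjectMeta{Name: "dns-example"},
		Spec: api.PodSpec{
			Containers: []api.Container{
				{Name: "app", Image: "busybox"},
			},
			// Use the kubelet host's DNS settings instead of cluster DNS.
			DNSPolicy: api.DNSDefault,
		},
	}
	fmt.Printf("pod %s dnsPolicy=%s\n", pod.ObjectMeta.Name, pod.Spec.DNSPolicy)
}
```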

View File

@ -411,6 +411,7 @@ func init() {
if err := s.Convert(&in.RestartPolicy, &out.RestartPolicy, 0); err != nil {
return err
}
out.DNSPolicy = DNSPolicy(in.DNSPolicy)
out.Version = "v1beta2"
return nil
},
@ -424,6 +425,7 @@ func init() {
if err := s.Convert(&in.RestartPolicy, &out.RestartPolicy, 0); err != nil {
return err
}
out.DNSPolicy = newer.DNSPolicy(in.DNSPolicy)
return nil
},

View File

@ -60,6 +60,8 @@ type ContainerManifest struct {
Volumes []Volume `json:"volumes" description:"list of volumes that can be mounted by containers belonging to the pod"`
Containers []Container `json:"containers" description:"list of containers belonging to the pod"`
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty" description:"restart policy for all containers within the pod; one of RestartPolicyAlways, RestartPolicyOnFailure, RestartPolicyNever"`
// Optional: Set DNS policy. Defaults to "ClusterFirst"
DNSPolicy DNSPolicy `json:"dnsPolicy,omitempty" description:"DNS policy for containers within the pod; one of 'ClusterFirst' or 'Default'"`
}
// ContainerManifestList is used to communicate container manifests to kubelet.
@ -827,11 +829,27 @@ type EventList struct {
// Backported from v1beta3 to replace ContainerManifest
// DNSPolicy defines how a pod's DNS will be configured.
type DNSPolicy string
const (
// DNSClusterFirst indicates that the pod should use cluster DNS
// first, if it is available, then fall back on the default (as
// determined by kubelet) DNS settings.
DNSClusterFirst DNSPolicy = "ClusterFirst"
// DNSDefault indicates that the pod should use the default (as
// determined by kubelet) DNS settings.
DNSDefault DNSPolicy = "Default"
)
// PodSpec is a description of a pod
type PodSpec struct {
Volumes []Volume `json:"volumes" description:"list of volumes that can be mounted by containers belonging to the pod"`
Containers []Container `json:"containers" description:"list of containers belonging to the pod"`
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty" description:"restart policy for all containers within the pod; one of RestartPolicyAlways, RestartPolicyOnFailure, RestartPolicyNever"`
// Optional: Set DNS policy. Defaults to "ClusterFirst"
DNSPolicy DNSPolicy `json:"dnsPolicy,omitempty" description:"DNS policy for containers within the pod; one of 'ClusterFirst' or 'Default'"`
// NodeSelector is a selector which must be true for the pod to fit on a node
NodeSelector map[string]string `json:"nodeSelector,omitempty" description:"selector which must match a node's labels for the pod to be scheduled on that node"`

View File

@ -286,6 +286,7 @@ func init() {
if err := s.Convert(&in.RestartPolicy, &out.RestartPolicy, 0); err != nil {
return err
}
out.DNSPolicy = DNSPolicy(in.DNSPolicy)
out.Version = "v1beta2"
return nil
},
@ -299,6 +300,7 @@ func init() {
if err := s.Convert(&in.RestartPolicy, &out.RestartPolicy, 0); err != nil {
return err
}
out.DNSPolicy = newer.DNSPolicy(in.DNSPolicy)
return nil
},

View File

@ -817,6 +817,8 @@ type ContainerManifest struct {
Volumes []Volume `json:"volumes" description:"list of volumes that can be mounted by containers belonging to the pod"`
Containers []Container `json:"containers" description:"list of containers belonging to the pod"`
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty" description:"restart policy for all containers within the pod; one of RestartPolicyAlways, RestartPolicyOnFailure, RestartPolicyNever"`
// Optional: Set DNS policy. Defaults to "ClusterFirst"
DNSPolicy DNSPolicy `json:"dnsPolicy,omitempty" description:"DNS policy for containers within the pod; one of 'ClusterFirst' or 'Default'"`
}
// ContainerManifestList is used to communicate container manifests to kubelet.
@ -828,11 +830,27 @@ type ContainerManifestList struct {
// Backported from v1beta3 to replace ContainerManifest
// DNSPolicy defines how a pod's DNS will be configured.
type DNSPolicy string
const (
// DNSClusterFirst indicates that the pod should use cluster DNS
// first, if it is available, then fall back on the default (as
// determined by kubelet) DNS settings.
DNSClusterFirst DNSPolicy = "ClusterFirst"
// DNSDefault indicates that the pod should use the default (as
// determined by kubelet) DNS settings.
DNSDefault DNSPolicy = "Default"
)
// PodSpec is a description of a pod
type PodSpec struct {
Volumes []Volume `json:"volumes" description:"list of volumes that can be mounted by containers belonging to the pod"`
Containers []Container `json:"containers" description:"list of containers belonging to the pod"`
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty" description:"restart policy for all containers within the pod; one of RestartPolicyAlways, RestartPolicyOnFailure, RestartPolicyNever"`
// Optional: Set DNS policy. Defaults to "ClusterFirst"
DNSPolicy DNSPolicy `json:"dnsPolicy,omitempty" description:"DNS policy for containers within the pod; one of 'ClusterFirst' or 'Default'"`
// NodeSelector is a selector which must be true for the pod to fit on a node
NodeSelector map[string]string `json:"nodeSelector,omitempty" description:"selector which must match a node's labels for the pod to be scheduled on that node"`

View File

@ -457,11 +457,27 @@ type RestartPolicy struct {
Never *RestartPolicyNever `json:"never,omitempty"`
}
// DNSPolicy defines how a pod's DNS will be configured.
type DNSPolicy string
const (
// DNSClusterFirst indicates that the pod should use cluster DNS
// first, if it is available, then fall back on the default (as
// determined by kubelet) DNS settings.
DNSClusterFirst DNSPolicy = "ClusterFirst"
// DNSDefault indicates that the pod should use the default (as
// determined by kubelet) DNS settings.
DNSDefault DNSPolicy = "Default"
)
// PodSpec is a description of a pod
type PodSpec struct {
Volumes []Volume `json:"volumes"`
Containers []Container `json:"containers"`
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"`
// Optional: Set DNS policy. Defaults to "ClusterFirst"
DNSPolicy DNSPolicy `json:"dnsPolicy,omitempty"`
// NodeSelector is a selector which must be true for the pod to fit on a node
NodeSelector map[string]string `json:"nodeSelector,omitempty"`

View File

@ -326,6 +326,7 @@ func ValidateManifest(manifest *api.ContainerManifest) errs.ValidationErrorList
allErrs = append(allErrs, vErrs.Prefix("volumes")...)
allErrs = append(allErrs, validateContainers(manifest.Containers, allVolumes).Prefix("containers")...)
allErrs = append(allErrs, validateRestartPolicy(&manifest.RestartPolicy).Prefix("restartPolicy")...)
allErrs = append(allErrs, validateDNSPolicy(&manifest.DNSPolicy).Prefix("dnsPolicy")...)
return allErrs
}
@ -350,6 +351,20 @@ func validateRestartPolicy(restartPolicy *api.RestartPolicy) errs.ValidationErro
return allErrors
}
func validateDNSPolicy(dnsPolicy *api.DNSPolicy) errs.ValidationErrorList {
allErrors := errs.ValidationErrorList{}
switch *dnsPolicy {
case "":
// TODO: move this out to standard defaulting logic, when that is ready.
*dnsPolicy = api.DNSClusterFirst // Default value.
case api.DNSClusterFirst, api.DNSDefault:
break
default:
allErrors = append(allErrors, errs.NewFieldNotSupported("", dnsPolicy))
}
return allErrors
}
// ValidatePod tests if required fields in the pod are set.
func ValidatePod(pod *api.Pod) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
@ -375,6 +390,7 @@ func ValidatePodSpec(spec *api.PodSpec) errs.ValidationErrorList {
allErrs = append(allErrs, vErrs.Prefix("volumes")...)
allErrs = append(allErrs, validateContainers(spec.Containers, allVolumes).Prefix("containers")...)
allErrs = append(allErrs, validateRestartPolicy(&spec.RestartPolicy).Prefix("restartPolicy")...)
allErrs = append(allErrs, validateDNSPolicy(&spec.DNSPolicy).Prefix("dnsPolicy")...)
allErrs = append(allErrs, validateLabels(spec.NodeSelector, "nodeSelector")...)
return allErrs
}

View File

@ -19,6 +19,8 @@ package kubelet
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"path"
@ -69,7 +71,9 @@ func NewMainKubelet(
pullBurst int,
minimumGCAge time.Duration,
maxContainerCount int,
sourcesReady SourcesReadyFn) *Kubelet {
sourcesReady SourcesReadyFn,
clusterDomain string,
clusterDNS net.IP) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
@ -86,6 +90,8 @@ func NewMainKubelet(
minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount,
sourcesReady: sourcesReady,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
}
}
@ -138,6 +144,12 @@ type Kubelet struct {
// Optional, minimum age required for garbage collection. If zero, no limit.
minimumGCAge time.Duration
maxContainerCount int
// If non-empty, use this for container DNS search.
clusterDomain string
// If non-nil, use this for container DNS server.
clusterDNS net.IP
}
// GetRootDir returns the full path to the directory under which kubelet can
@ -561,12 +573,18 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
} else if container.Privileged {
return "", fmt.Errorf("container requested privileged mode, but it is disallowed globally.")
}
err = kl.dockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{
hc := &docker.HostConfig{
PortBindings: portBindings,
Binds: binds,
NetworkMode: netMode,
Privileged: privileged,
})
}
if pod.Spec.DNSPolicy == api.DNSClusterFirst {
if err := kl.applyClusterDNS(hc, pod); err != nil {
return "", err
}
}
err = kl.dockerClient.StartContainer(dockerContainer.ID, hc)
if err != nil {
if ref != nil {
record.Eventf(ref, "failed", "failed",
@ -588,6 +606,62 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
return dockertools.DockerID(dockerContainer.ID), err
}
func (kl *Kubelet) applyClusterDNS(hc *docker.HostConfig, pod *api.BoundPod) error {
// Get host DNS settings and append them to cluster DNS settings.
f, err := os.Open("/etc/resolv.conf")
if err != nil {
return err
}
defer f.Close()
hostDNS, hostSearch, err := parseResolvConf(f)
if err != nil {
return err
}
if kl.clusterDNS != nil {
hc.DNS = append([]string{kl.clusterDNS.String()}, hostDNS...)
}
if kl.clusterDomain != "" {
nsDomain := fmt.Sprintf("%s.%s", pod.Namespace, kl.clusterDomain)
hc.DNSSearch = append([]string{nsDomain, kl.clusterDomain}, hostSearch...)
}
return nil
}
// Returns the list of DNS servers and DNS search domains.
func parseResolvConf(reader io.Reader) (nameservers []string, searches []string, err error) {
file, err := ioutil.ReadAll(reader)
if err != nil {
return nil, nil, err
}
// Lines of the form "nameserver 1.2.3.4" accumulate.
nameservers = []string{}
// Lines of the form "search example.com" overrule - last one wins.
searches = []string{}
lines := strings.Split(string(file), "\n")
for l := range lines {
trimmed := strings.TrimSpace(lines[l])
if strings.HasPrefix(trimmed, "#") {
continue
}
fields := strings.Fields(trimmed)
if len(fields) == 0 {
continue
}
if fields[0] == "nameserver" {
nameservers = append(nameservers, fields[1:]...)
}
if fields[0] == "search" {
searches = fields[1:]
}
}
return nameservers, searches, nil
}
// Kill a docker container
func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
return kl.killContainerByID(dockerContainer.ID, dockerContainer.Names[0])
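For a concrete sense of what `applyClusterDNS` produces, here is a small, hypothetical walk-through (not part of this diff) that mirrors its logic for a `ClusterFirst` pod. The cluster DNS IP and domain match the defaults used in the config files above; the host nameserver and search domain are invented stand-ins for whatever `parseResolvConf` would find in the host's `/etc/resolv.conf`.

```go
// Illustrative only: how a ClusterFirst pod's DNS settings are assembled.
package main

import "fmt"

func main() {
	clusterDNS := "10.0.0.10"           // kubelet's -cluster_dns (example)
	clusterDomain := "kubernetes.local" // kubelet's -cluster_domain (example)
	podNamespace := "default"

	// Pretend parseResolvConf returned these from the host's /etc/resolv.conf.
	hostDNS := []string{"8.8.8.8"}
	hostSearch := []string{"corp.example.com"}

	// Cluster DNS server is consulted first, then the host's servers.
	dns := append([]string{clusterDNS}, hostDNS...)

	// Search order: <namespace>.<domain>, <domain>, then the host's search list.
	nsDomain := fmt.Sprintf("%s.%s", podNamespace, clusterDomain)
	search := append([]string{nsDomain, clusterDomain}, hostSearch...)

	fmt.Println("nameservers:", dns) // [10.0.0.10 8.8.8.8]
	fmt.Println("search:", search)   // [default.kubernetes.local kubernetes.local corp.example.com]
}
```

With these settings a short name like `kubernetes-ro` (as probed by TestClusterDNS above) expands through the search list to `kubernetes-ro.default.kubernetes.local` and is answered by SkyDNS.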

View File

@ -1723,3 +1723,47 @@ func TestGarbageCollectImages(t *testing.T) {
t.Errorf("unexpected images removed: %v", fakeDocker.RemovedImages)
}
}
func TestParseResolvConf(t *testing.T) {
testCases := []struct {
data string
nameservers []string
searches []string
}{
{"", []string{}, []string{}},
{" ", []string{}, []string{}},
{"\n", []string{}, []string{}},
{"\t\n\t", []string{}, []string{}},
{"#comment\n", []string{}, []string{}},
{" #comment\n", []string{}, []string{}},
{"#comment\n#comment", []string{}, []string{}},
{"#comment\nnameserver", []string{}, []string{}},
{"#comment\nnameserver\nsearch", []string{}, []string{}},
{"nameserver 1.2.3.4", []string{"1.2.3.4"}, []string{}},
{" nameserver 1.2.3.4", []string{"1.2.3.4"}, []string{}},
{"\tnameserver 1.2.3.4", []string{"1.2.3.4"}, []string{}},
{"nameserver\t1.2.3.4", []string{"1.2.3.4"}, []string{}},
{"nameserver \t 1.2.3.4", []string{"1.2.3.4"}, []string{}},
{"nameserver 1.2.3.4\nnameserver 5.6.7.8", []string{"1.2.3.4", "5.6.7.8"}, []string{}},
{"search foo", []string{}, []string{"foo"}},
{"search foo bar", []string{}, []string{"foo", "bar"}},
{"search foo bar bat\n", []string{}, []string{"foo", "bar", "bat"}},
{"search foo\nsearch bar", []string{}, []string{"bar"}},
{"nameserver 1.2.3.4\nsearch foo bar", []string{"1.2.3.4"}, []string{"foo", "bar"}},
{"nameserver 1.2.3.4\nsearch foo\nnameserver 5.6.7.8\nsearch bar", []string{"1.2.3.4", "5.6.7.8"}, []string{"bar"}},
{"#comment\nnameserver 1.2.3.4\n#comment\nsearch foo\ncomment", []string{"1.2.3.4"}, []string{"foo"}},
}
for i, tc := range testCases {
ns, srch, err := parseResolvConf(strings.NewReader(tc.data))
if err != nil {
t.Errorf("expected success, got %v", err)
continue
}
if !reflect.DeepEqual(ns, tc.nameservers) {
t.Errorf("[%d] expected nameservers %#v, got %#v", i, tc.nameservers, ns)
}
if !reflect.DeepEqual(srch, tc.searches) {
t.Errorf("[%d] expected searches %#v, got %#v", i, tc.searches, srch)
}
}
}

View File

@ -299,18 +299,32 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
listenAddress net.IP
iptables iptables.Interface
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
}
// NewProxier returns a new Proxier given a LoadBalancer and an address on
// which to listen. Because of the iptables logic, it is assumed that there
// is only a single Proxier active on a machine.
func NewProxier(loadBalancer LoadBalancer, listenAddress net.IP, iptables iptables.Interface) *Proxier {
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface) *Proxier {
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
glog.Errorf("Can't proxy only on localhost - iptables can't do it")
return nil
}
hostIP, err := chooseHostInterface()
if err != nil {
glog.Errorf("Failed to select a host interface: %v", err)
return nil
}
glog.Infof("Initializing iptables")
// Clean up old messes. Ignore errors.
iptablesDeleteOld(iptables)
// Set up the iptables foundations we need.
if err := iptablesInit(iptables); err != nil {
glog.Errorf("Failed to initialize iptables: %v", err)
@ -323,10 +337,11 @@ func NewProxier(loadBalancer LoadBalancer, listenAddress net.IP, iptables iptabl
return nil
}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[string]*serviceInfo),
listenAddress: listenAddress,
iptables: iptables,
loadBalancer: loadBalancer,
serviceMap: make(map[string]*serviceInfo),
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
}
}
@ -400,7 +415,7 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
sock, err := newProxySocket(protocol, proxier.listenAddress, proxyPort)
sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
if err != nil {
return nil, err
}
@ -514,83 +529,127 @@ func ipsEqual(lhs, rhs []string) bool {
}
func (proxier *Proxier) openPortal(service string, info *serviceInfo) error {
args := iptablesPortalArgs(info.portalIP, info.portalPort, info.protocol, proxier.listenAddress, info.proxyPort, service)
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesProxyChain, args...)
err := proxier.openOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesProxyChain, service)
return err
}
if !existed {
glog.Infof("Opened iptables portal for service %q on %s:%d", service, info.portalIP, info.portalPort)
}
if len(info.publicIP) > 0 {
return proxier.openExternalPortal(service, info)
for _, publicIP := range info.publicIP {
err = proxier.openOnePortal(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
}
}
return nil
}
func (proxier *Proxier) openExternalPortal(service string, info *serviceInfo) error {
for _, publicIP := range info.publicIP {
args := iptablesPortalArgs(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenAddress, info.proxyPort, service)
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesProxyChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesProxyChain, service)
return err
}
if !existed {
glog.Infof("Opened iptables external portal for service %q on %s:%d", service, publicIP, info.proxyPort)
}
func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name string) error {
// Handle traffic from containers.
args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesContainerPortalChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesContainerPortalChain, name)
return err
}
if !existed {
glog.Infof("Opened iptables from-containers portal for service %q on %s %s:%d", name, protocol, portalIP, portalPort)
}
// Handle traffic from the host.
args = proxier.iptablesHostPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
existed, err = proxier.iptables.EnsureRule(iptables.TableNAT, iptablesHostPortalChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesHostPortalChain, name)
return err
}
if !existed {
glog.Infof("Opened iptables from-host portal for service %q on %s %s:%d", name, protocol, portalIP, portalPort)
}
return nil
}
func (proxier *Proxier) closePortal(service string, info *serviceInfo) error {
args := iptablesPortalArgs(info.portalIP, info.portalPort, info.protocol, proxier.listenAddress, info.proxyPort, service)
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesProxyChain, args...); err != nil {
glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesProxyChain, service)
return err
}
if len(info.publicIP) > 0 {
return proxier.closeExternalPortal(service, info)
}
glog.Infof("Closed iptables portal for service %q", service)
return nil
}
func (proxier *Proxier) closeExternalPortal(service string, info *serviceInfo) error {
// Collect errors and report them all at the end.
el := proxier.closeOnePortal(info.portalIP, info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)
for _, publicIP := range info.publicIP {
args := iptablesPortalArgs(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenAddress, info.proxyPort, service)
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesProxyChain, args...); err != nil {
glog.Errorf("Failed to delete external iptables %s rule for service %q", iptablesProxyChain, service)
return err
}
el = append(el, proxier.closeOnePortal(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
}
glog.Infof("Closed external iptables portal for service %q", service)
return nil
if len(el) == 0 {
glog.Infof("Closed iptables portals for service %q", service)
} else {
glog.Errorf("Some errors closing iptables portals for service %q", service)
}
return util.SliceToError(el)
}
var iptablesProxyChain iptables.Chain = "KUBE-PROXY"
func (proxier *Proxier) closeOnePortal(portalIP net.IP, portalPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name string) []error {
el := []error{}
// Handle traffic from containers.
args := proxier.iptablesContainerPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerPortalChain, args...); err != nil {
glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesContainerPortalChain, name)
el = append(el, err)
}
// Handle traffic from the host.
args = proxier.iptablesHostPortalArgs(portalIP, portalPort, protocol, proxyIP, proxyPort, name)
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesHostPortalChain, args...); err != nil {
glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesHostPortalChain, name)
el = append(el, err)
}
return el
}
// See comments in the *PortalArgs() functions for some details about why we
// use two chains.
var iptablesContainerPortalChain iptables.Chain = "KUBE-PORTALS-CONTAINER"
var iptablesHostPortalChain iptables.Chain = "KUBE-PORTALS-HOST"
var iptablesOldPortalChain iptables.Chain = "KUBE-PROXY"
// Ensure that the iptables infrastructure we use is set up. This can safely be called periodically.
func iptablesInit(ipt iptables.Interface) error {
// TODO: There is almost certainly room for optimization here. E.g. If
// we knew the portal_net CIDR we could fast-track outbound packets not
// destined for a service. There's probably more, help wanted.
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesProxyChain); err != nil {
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesContainerPortalChain); err != nil {
return err
}
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainPrerouting, "-j", string(iptablesProxyChain)); err != nil {
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainPrerouting, "-j", string(iptablesContainerPortalChain)); err != nil {
return err
}
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainOutput, "-j", string(iptablesProxyChain)); err != nil {
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
return err
}
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainOutput, "-j", string(iptablesHostPortalChain)); err != nil {
return err
}
return nil
}
func iptablesDeleteOld(ipt iptables.Interface) {
// DEPRECATED: The iptablesOldPortalChain is from when we had a single chain
// for all rules. We'll unilaterally delete it here. We will remove this
// code at some future date (before 1.0).
ipt.DeleteRule(iptables.TableNAT, iptables.ChainPrerouting, "-j", string(iptablesOldPortalChain))
ipt.DeleteRule(iptables.TableNAT, iptables.ChainOutput, "-j", string(iptablesOldPortalChain))
ipt.FlushChain(iptables.TableNAT, iptablesOldPortalChain)
ipt.DeleteChain(iptables.TableNAT, iptablesOldPortalChain)
}
// Flush all of our custom iptables rules.
func iptablesFlush(ipt iptables.Interface) error {
return ipt.FlushChain(iptables.TableNAT, iptablesProxyChain)
el := []error{}
if err := ipt.FlushChain(iptables.TableNAT, iptablesContainerPortalChain); err != nil {
el = append(el, err)
}
if err := ipt.FlushChain(iptables.TableNAT, iptablesHostPortalChain); err != nil {
el = append(el, err)
}
if len(el) != 0 {
glog.Errorf("Some errors flushing old iptables portals: %v", el)
}
return util.SliceToError(el)
}
// Used below.
@ -600,8 +659,8 @@ var localhostIPv4 = net.ParseIP("127.0.0.1")
var zeroIPv6 = net.ParseIP("::0")
var localhostIPv6 = net.ParseIP("::1")
// Build a slice of iptables args for a portal rule.
func iptablesPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service string) []string {
// Build a slice of iptables args that are common to from-container and from-host portal rules.
func iptablesCommonPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, service string) []string {
// This list needs to include all fields as they are eventually spit out
// by iptables-save. This is because some systems do not support the
// 'iptables -C' arg, and so fall back on parsing iptables-save output.
@ -618,14 +677,34 @@ func iptablesPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, prox
"-d", fmt.Sprintf("%s/32", destIP.String()),
"--dport", fmt.Sprintf("%d", destPort),
}
// This is tricky. If the proxy is bound (see Proxier.listenAddress)
// to 0.0.0.0 ("any interface") or 127.0.0.1, we can use REDIRECT,
// which will bring packets back to the host's loopback interface. If
// the proxy is bound to any other interface, then it is not listening
// on the hosts's loopback, so we have to use DNAT to that specific
// IP. We can not simply use DNAT to 127.0.0.1 in the first case
// because from within a container, 127.0.0.1 is the container's
// loopback interface, not the host's.
return args
}
// Build a slice of iptables args for a from-container portal rule.
func (proxier *Proxier) iptablesContainerPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service string) []string {
args := iptablesCommonPortalArgs(destIP, destPort, protocol, service)
// This is tricky.
//
// If the proxy is bound (see Proxier.listenIP) to 0.0.0.0 ("any
// interface") we want to use REDIRECT, which sends traffic to the
// "primary address of the incoming interface" which means the container
// bridge, if there is one. When the response comes, it comes from that
// same interface, so the NAT matches and the response packet is
// correct. This matters for UDP, since there is no per-connection port
// number.
//
// The alternative would be to use DNAT, except that it doesn't work
// (empirically):
// * DNAT to 127.0.0.1 = Packets just disappear - this seems to be a
// well-known limitation of iptables.
// * DNAT to eth0's IP = Response packets come from the bridge, which
// breaks the NAT, and makes things like DNS not accept them. If
// this could be resolved, it would simplify all of this code.
//
// If the proxy is bound to a specific IP, then we have to use DNAT to
// that IP. Unlike the previous case, this works because the proxy is
// ONLY listening on that IP, not the bridge.
//
// Why would anyone bind to an address that is not inclusive of
// localhost? Apparently some cloud environments have their public IP
@ -635,8 +714,10 @@ func iptablesPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, prox
// Unfortunately, I don't know of any way to listen on some (N > 1)
// interfaces but not ALL interfaces, short of doing it manually, and
// this is simpler than that.
if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) ||
proxyIP.Equal(localhostIPv4) || proxyIP.Equal(localhostIPv6) {
//
// If the proxy is bound to localhost only, all of this is broken. Not
// allowed.
if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
// TODO: Can we REDIRECT with IPv6?
args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort))
} else {
@ -645,3 +726,72 @@ func iptablesPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, prox
}
return args
}
// Build a slice of iptables args for a from-host portal rule.
func (proxier *Proxier) iptablesHostPortalArgs(destIP net.IP, destPort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, service string) []string {
args := iptablesCommonPortalArgs(destIP, destPort, protocol, service)
// This is tricky.
//
// If the proxy is bound (see Proxier.listenIP) to 0.0.0.0 ("any
// interface") we want to do the same as from-container traffic and use
// REDIRECT. Except that it doesn't work (empirically). REDIRECT on
// local packets sends the traffic to localhost (special case, but it is
// documented) but the response comes from the eth0 IP (not sure why,
// truthfully), which makes DNS unhappy.
//
// So we have to use DNAT. DNAT to 127.0.0.1 can't work for the same
// reason.
//
// So we do our best to find an interface that is not a loopback and
// DNAT to that. This works (again, empirically).
//
// If the proxy is bound to a specific IP, then we have to use DNAT to
// that IP. Unlike the previous case, this works because the proxy is
// ONLY listening on that IP, not the bridge.
//
// If the proxy is bound to localhost only, this should work, but we
// don't allow it for now.
if proxyIP.Equal(zeroIPv4) || proxyIP.Equal(zeroIPv6) {
proxyIP = proxier.hostIP
}
// TODO: Can we DNAT with IPv6?
args = append(args, "-j", "DNAT", "--to-destination", net.JoinHostPort(proxyIP.String(), strconv.Itoa(proxyPort)))
return args
}
func chooseHostInterface() (net.IP, error) {
intfs, err := net.Interfaces()
if err != nil {
return nil, err
}
i := 0
for i = range intfs {
if flagsSet(intfs[i].Flags, net.FlagUp) && flagsClear(intfs[i].Flags, net.FlagLoopback|net.FlagPointToPoint) {
// This interface should suffice.
break
}
}
if i == len(intfs) {
return nil, err
}
glog.V(2).Infof("Choosing interface %s for from-host portals", intfs[i].Name)
addrs, err := intfs[i].Addrs()
if err != nil {
return nil, err
}
glog.V(2).Infof("Interface %s = %s", intfs[i].Name, addrs[0].String())
ip, _, err := net.ParseCIDR(addrs[0].String())
if err != nil {
return nil, err
}
return ip, nil
}
func flagsSet(flags net.Flags, test net.Flags) bool {
return flags&test != 0
}
func flagsClear(flags net.Flags, test net.Flags) bool {
return flags&test == 0
}

View File

@ -82,6 +82,10 @@ func (fake *fakeIptables) EnsureChain(table iptables.Table, chain iptables.Chain
return false, nil
}
func (fake *fakeIptables) DeleteChain(table iptables.Table, chain iptables.Chain) error {
return nil
}
func (fake *fakeIptables) FlushChain(table iptables.Table, chain iptables.Chain) error {
return nil
}
@ -176,7 +180,7 @@ func TestTCPProxy(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
@ -194,7 +198,7 @@ func TestUDPProxy(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
@ -221,7 +225,7 @@ func TestTCPProxyStop(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
@ -249,7 +253,7 @@ func TestUDPProxyStop(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
@ -277,7 +281,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
@ -304,7 +308,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
@ -331,7 +335,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
@ -362,7 +366,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
@ -393,7 +397,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
@ -438,7 +442,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
p := NewProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{})
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {

View File

@ -244,6 +244,8 @@ type KubeletConfig struct {
RegistryBurst int
MinimumGCAge time.Duration
MaxContainerCount int
ClusterDomain string
ClusterDNS util.IP
EnableServer bool
EnableDebuggingHandlers bool
Port uint
@ -265,7 +267,9 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) *kubelet.Kube
kc.RegistryBurst,
kc.MinimumGCAge,
kc.MaxContainerCount,
pc.SeenAllSources)
pc.SeenAllSources,
kc.ClusterDomain,
net.IP(kc.ClusterDNS))
k.BirthCry()

View File

@ -32,8 +32,10 @@ import (
type Interface interface {
// EnsureChain checks if the specified chain exists and, if not, creates it. If the chain existed, return true.
EnsureChain(table Table, chain Chain) (bool, error)
// FlushChain clears the specified chain.
// FlushChain clears the specified chain. If the chain did not exist, return error.
FlushChain(table Table, chain Chain) error
// DeleteChain deletes the specified chain. If the chain did not exist, return error.
DeleteChain(table Table, chain Chain) error
// EnsureRule checks if the specified rule is present and, if not, creates it. If the rule existed, return true.
EnsureRule(table Table, chain Chain, args ...string) (bool, error)
// DeleteRule checks if the specified rule is present and, if so, deletes it.
@ -108,6 +110,21 @@ func (runner *runner) FlushChain(table Table, chain Chain) error {
return nil
}
// DeleteChain is part of Interface.
func (runner *runner) DeleteChain(table Table, chain Chain) error {
fullArgs := makeFullArgs(table, chain)
runner.mu.Lock()
defer runner.mu.Unlock()
// TODO: we could call iptables -S first, ignore the output and check for non-zero return (more like DeleteRule)
out, err := runner.run(opDeleteChain, fullArgs)
if err != nil {
return fmt.Errorf("error deleting chain %q: %v: %s", chain, err, out)
}
return nil
}
// EnsureRule is part of Interface.
func (runner *runner) EnsureRule(table Table, chain Chain, args ...string) (bool, error) {
fullArgs := makeFullArgs(table, chain, args...)
@ -257,6 +274,7 @@ type operation string
const (
opCreateChain operation = "-N"
opFlushChain operation = "-F"
opDeleteChain operation = "-X"
opAppendRule operation = "-A"
opCheckRule operation = "-C"
opDeleteRule operation = "-D"

View File

@ -124,6 +124,40 @@ func TestFlushChain(t *testing.T) {
}
}
func TestDeleteChain(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
// Success.
func() ([]byte, error) { return []byte{}, nil },
// Failure.
func() ([]byte, error) { return nil, &exec.FakeExitError{1} },
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, ProtocolIpv4)
// Success.
err := runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err != nil {
t.Errorf("expected success, got %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("expected 1 CombinedOutput() call, got %d", fcmd.CombinedOutputCalls)
}
if !util.NewStringSet(fcmd.CombinedOutputLog[0]...).HasAll("iptables", "-t", "nat", "-X", "FOOBAR") {
t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0])
}
// Failure.
err = runner.DeleteChain(TableNAT, Chain("FOOBAR"))
if err == nil {
t.Errorf("expected failure")
}
}
func TestEnsureRuleAlreadyExists(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{