mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
Merge pull request #13759 from upmc-enterprises/pod_dns
Allow pods to have dns
This commit is contained in:
commit
6eb7d8cf1b
@ -23,7 +23,9 @@ The following sections detail the supported record types and layout that is
|
||||
supported. Any other layout or names or queries that happen to work are
|
||||
considered implementation details and are subject to change without warning.
|
||||
|
||||
### A records
|
||||
### Services
|
||||
|
||||
#### A records
|
||||
"Normal" (not headless) Services are assigned a DNS A record for a name of the
|
||||
form `my-svc.my-namespace.svc.cluster.local`. This resolves to the cluster IP
|
||||
of the Service.
|
||||
@ -50,6 +52,13 @@ Previous versions of kube-dns made names of the for
|
||||
`my-svc.my-namespace.cluster.local` (the 'svc' level was added later). This
|
||||
is no longer supported.
|
||||
|
||||
### Pods
|
||||
|
||||
#### A Records
|
||||
When enabled, pods are assigned a DNS A record in the form of `pod-ip-address.my-namespace.pod.cluster.local`.
|
||||
|
||||
For example, a pod with ip `1.2.3.4` in the namespace `default` with a dns name of `cluster.local` would have an entry: `1-2-3-4.default.pod.cluster.local`.
|
||||
|
||||
## How do I find the DNS server?
|
||||
The DNS server itself runs as a Kubernetes Service. This gives it a stable IP
|
||||
address. When you run the SkyDNS service, you want to assign a static IP to use for
|
||||
@ -126,7 +135,7 @@ Then create a pod using this file:
|
||||
kubectl create -f busybox.yaml
|
||||
```
|
||||
|
||||
### 2 Wait for this pod to go into the running state.
|
||||
### 2 Wait for this pod to go into the running state.
|
||||
|
||||
You can get its status with:
|
||||
```
|
||||
|
@ -17,11 +17,11 @@ description of `-domain` below.
|
||||
`-domain`: Set the domain under which all DNS names will be hosted. For
|
||||
example, if this is set to `kubernetes.io`, then a service named "nifty" in the
|
||||
"default" namespace would be exposed through DNS as
|
||||
"nifty.default.kubernetes.io".
|
||||
"nifty.default.svc.kubernetes.io".
|
||||
|
||||
`-v`: Set logging level
|
||||
|
||||
`-etcd_mutation_timeout`: For how long the application will keep retrying etcd
|
||||
`-etcd_mutation_timeout`: For how long the application will keep retrying etcd
|
||||
mutation (insertion or removal of a dns entry) before giving up and crashing.
|
||||
|
||||
`-etcd-server`: The etcd server that is being used by skydns.
|
||||
|
@ -61,6 +61,8 @@ const (
|
||||
resyncPeriod = 30 * time.Minute
|
||||
// A subdomain added to the user specified domain for all services.
|
||||
serviceSubdomain = "svc"
|
||||
// A subdomain added to the user specified dmoain for all pods.
|
||||
podSubdomain = "pod"
|
||||
)
|
||||
|
||||
type etcdClient interface {
|
||||
@ -222,6 +224,59 @@ func (ks *kube2sky) handleEndpointAdd(obj interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (ks *kube2sky) handlePodCreate(obj interface{}) {
|
||||
if e, ok := obj.(*kapi.Pod); ok {
|
||||
// If the pod ip is not yet available, do not attempt to create.
|
||||
if e.Status.PodIP != "" {
|
||||
name := buildDNSNameString(ks.domain, podSubdomain, e.Namespace, santizeIP(e.Status.PodIP))
|
||||
ks.mutateEtcdOrDie(func() error { return ks.generateRecordsForPod(name, e) })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ks *kube2sky) handlePodUpdate(old interface{}, new interface{}) {
|
||||
oldPod, okOld := old.(*kapi.Pod)
|
||||
newPod, okNew := new.(*kapi.Pod)
|
||||
|
||||
// Validate that the objects are good
|
||||
if okOld && okNew {
|
||||
if oldPod.Status.PodIP != newPod.Status.PodIP {
|
||||
ks.handlePodDelete(oldPod)
|
||||
ks.handlePodCreate(newPod)
|
||||
}
|
||||
} else if okNew {
|
||||
ks.handlePodCreate(newPod)
|
||||
} else if okOld {
|
||||
ks.handlePodDelete(oldPod)
|
||||
}
|
||||
}
|
||||
|
||||
func (ks *kube2sky) handlePodDelete(obj interface{}) {
|
||||
if e, ok := obj.(*kapi.Pod); ok {
|
||||
if e.Status.PodIP != "" {
|
||||
name := buildDNSNameString(ks.domain, podSubdomain, e.Namespace, santizeIP(e.Status.PodIP))
|
||||
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ks *kube2sky) generateRecordsForPod(subdomain string, service *kapi.Pod) error {
|
||||
b, err := json.Marshal(getSkyMsg(service.Status.PodIP, 0))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
recordValue := string(b)
|
||||
recordLabel := getHash(recordValue)
|
||||
recordKey := buildDNSNameString(subdomain, recordLabel)
|
||||
|
||||
glog.V(2).Infof("Setting DNS record: %v -> %q, with recordKey: %v\n", subdomain, recordValue, recordKey)
|
||||
if err := ks.writeSkyRecord(recordKey, recordValue); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service) error {
|
||||
b, err := json.Marshal(getSkyMsg(service.Spec.ClusterIP, 0))
|
||||
if err != nil {
|
||||
@ -249,6 +304,10 @@ func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *k
|
||||
return nil
|
||||
}
|
||||
|
||||
func santizeIP(ip string) string {
|
||||
return strings.Replace(ip, ".", "-", -1)
|
||||
}
|
||||
|
||||
func buildPortSegmentString(portName string, portProtocol kapi.Protocol) string {
|
||||
if portName == "" {
|
||||
// we don't create a random name
|
||||
@ -328,6 +387,11 @@ func createEndpointsLW(kubeClient *kclient.Client) *kcache.ListWatch {
|
||||
return kcache.NewListWatchFromClient(kubeClient, "endpoints", kapi.NamespaceAll, kSelector.Everything())
|
||||
}
|
||||
|
||||
// Returns a cache.ListWatch that gets all changes to pods.
|
||||
func createEndpointsPodLW(kubeClient *kclient.Client) *kcache.ListWatch {
|
||||
return kcache.NewListWatchFromClient(kubeClient, "pods", kapi.NamespaceAll, kSelector.Everything())
|
||||
}
|
||||
|
||||
func (ks *kube2sky) newService(obj interface{}) {
|
||||
if s, ok := obj.(*kapi.Service); ok {
|
||||
name := buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
|
||||
@ -411,6 +475,7 @@ func newKubeClient() (*kclient.Client, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if masterURL != "" && *argKubecfgFile == "" {
|
||||
// Only --kube_master_url was provided.
|
||||
config = &kclient.Config{
|
||||
@ -470,6 +535,24 @@ func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
|
||||
return eStore
|
||||
}
|
||||
|
||||
func watchPods(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
|
||||
eStore, eController := kframework.NewInformer(
|
||||
createEndpointsPodLW(kubeClient),
|
||||
&kapi.Pod{},
|
||||
resyncPeriod,
|
||||
kframework.ResourceEventHandlerFuncs{
|
||||
AddFunc: ks.handlePodCreate,
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
ks.handlePodUpdate(oldObj, newObj)
|
||||
},
|
||||
DeleteFunc: ks.handlePodDelete,
|
||||
},
|
||||
)
|
||||
|
||||
go eController.Run(util.NeverStop)
|
||||
return eStore
|
||||
}
|
||||
|
||||
func getHash(text string) string {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(text))
|
||||
@ -499,6 +582,7 @@ func main() {
|
||||
|
||||
ks.endpointsStore = watchEndpoints(kubeClient, &ks)
|
||||
ks.servicesStore = watchForServices(kubeClient, &ks)
|
||||
ks.servicesStore = watchPods(kubeClient, &ks)
|
||||
|
||||
select {}
|
||||
}
|
||||
|
@ -82,6 +82,7 @@ const (
|
||||
testDomain = "cluster.local."
|
||||
basePath = "/skydns/local/cluster"
|
||||
serviceSubDomain = "svc"
|
||||
podSubDomain = "pod"
|
||||
)
|
||||
|
||||
func newKube2Sky(ec etcdClient) *kube2sky {
|
||||
@ -94,8 +95,8 @@ func newKube2Sky(ec etcdClient) *kube2sky {
|
||||
}
|
||||
}
|
||||
|
||||
func getEtcdPathForA(name, namespace string) string {
|
||||
return path.Join(basePath, serviceSubDomain, namespace, name)
|
||||
func getEtcdPathForA(name, namespace, subDomain string) string {
|
||||
return path.Join(basePath, subDomain, namespace, name)
|
||||
}
|
||||
|
||||
func getEtcdPathForSRV(portName, protocol, name, namespace string) string {
|
||||
@ -121,7 +122,7 @@ func getHostPortFromString(data string) (*hostPort, error) {
|
||||
}
|
||||
|
||||
func assertDnsServiceEntryInEtcd(t *testing.T, ec *fakeEtcdClient, serviceName, namespace string, expectedHostPort *hostPort) {
|
||||
key := getEtcdPathForA(serviceName, namespace)
|
||||
key := getEtcdPathForA(serviceName, namespace, serviceSubDomain)
|
||||
values := ec.Get(key)
|
||||
//require.True(t, exists)
|
||||
require.True(t, len(values) > 0, "entry not found.")
|
||||
@ -130,6 +131,20 @@ func assertDnsServiceEntryInEtcd(t *testing.T, ec *fakeEtcdClient, serviceName,
|
||||
assert.Equal(t, expectedHostPort.Host, actualHostPort.Host)
|
||||
}
|
||||
|
||||
func assertDnsPodEntryInEtcd(t *testing.T, ec *fakeEtcdClient, podIP, namespace string) {
|
||||
key := getEtcdPathForA(podIP, namespace, podSubDomain)
|
||||
values := ec.Get(key)
|
||||
//require.True(t, exists)
|
||||
require.True(t, len(values) > 0, "entry not found.")
|
||||
}
|
||||
|
||||
func assertDnsPodEntryNotInEtcd(t *testing.T, ec *fakeEtcdClient, podIP, namespace string) {
|
||||
key := getEtcdPathForA(podIP, namespace, podSubDomain)
|
||||
values := ec.Get(key)
|
||||
//require.True(t, exists)
|
||||
require.True(t, len(values) == 0, "entry found.")
|
||||
}
|
||||
|
||||
func assertSRVEntryInEtcd(t *testing.T, ec *fakeEtcdClient, portName, protocol, serviceName, namespace string, expectedPortNumber, expectedEntriesCount int) {
|
||||
srvKey := getEtcdPathForSRV(portName, protocol, serviceName, namespace)
|
||||
values := ec.Get(srvKey)
|
||||
@ -173,6 +188,20 @@ func newService(namespace, serviceName, clusterIP, portName string, portNumber i
|
||||
return service
|
||||
}
|
||||
|
||||
func newPod(namespace, podName, podIP string) kapi.Pod {
|
||||
pod := kapi.Pod{
|
||||
ObjectMeta: kapi.ObjectMeta{
|
||||
Name: podName,
|
||||
Namespace: namespace,
|
||||
},
|
||||
Status: kapi.PodStatus{
|
||||
PodIP: podIP,
|
||||
},
|
||||
}
|
||||
|
||||
return pod
|
||||
}
|
||||
|
||||
func newSubset() kapi.EndpointSubset {
|
||||
subset := kapi.EndpointSubset{
|
||||
Addresses: []kapi.EndpointAddress{},
|
||||
@ -393,3 +422,46 @@ func TestBuildDNSName(t *testing.T) {
|
||||
newExpectedDNSName := "00.name.ns.svc.cluster.local."
|
||||
assert.Equal(t, newExpectedDNSName, buildDNSNameString(expectedDNSName, "00"))
|
||||
}
|
||||
|
||||
func TestPodDns(t *testing.T) {
|
||||
const (
|
||||
testPodIP = "1.2.3.4"
|
||||
sanitizedPodIP = "1-2-3-4"
|
||||
testNamespace = "default"
|
||||
testPodName = "testPod"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
|
||||
// create pod without ip address yet
|
||||
pod := newPod(testNamespace, testPodName, "")
|
||||
k2s.handlePodCreate(&pod)
|
||||
assert.Empty(t, ec.writes)
|
||||
|
||||
// create pod
|
||||
pod = newPod(testNamespace, testPodName, testPodIP)
|
||||
k2s.handlePodCreate(&pod)
|
||||
assertDnsPodEntryInEtcd(t, ec, sanitizedPodIP, testNamespace)
|
||||
|
||||
// update pod with same ip
|
||||
newPod := pod
|
||||
newPod.Status.PodIP = testPodIP
|
||||
k2s.handlePodUpdate(&pod, &newPod)
|
||||
assertDnsPodEntryInEtcd(t, ec, sanitizedPodIP, testNamespace)
|
||||
|
||||
// update pod with different ip's
|
||||
newPod = pod
|
||||
newPod.Status.PodIP = "4.3.2.1"
|
||||
k2s.handlePodUpdate(&pod, &newPod)
|
||||
assertDnsPodEntryInEtcd(t, ec, "4-3-2-1", testNamespace)
|
||||
assertDnsPodEntryNotInEtcd(t, ec, "1-2-3-4", testNamespace)
|
||||
|
||||
// Delete the pod
|
||||
k2s.handlePodDelete(&newPod)
|
||||
assert.Empty(t, ec.writes)
|
||||
}
|
||||
|
||||
func TestSanitizeIP(t *testing.T) {
|
||||
expectedIP := "1-2-3-4"
|
||||
assert.Equal(t, expectedIP, santizeIP("1.2.3.4"))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user