Merge pull request #11736 from thockin/cleanup-dns-legacy

Stop populating legacy DNS names
This commit is contained in:
Vish Kannan 2015-07-23 17:30:00 -07:00
commit 6306ad1bce
8 changed files with 48 additions and 86 deletions

View File

@ -111,13 +111,12 @@ func (ks *kube2sky) writeSkyRecord(subdomain string, data string) error {
}
// Generates skydns records for a headless service.
func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service, isNewStyleFormat bool) error {
func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service) error {
// Create an A record for every pod in the service.
// This record must be periodically updated.
// Format is as follows:
// For a service x, with pods a and b create DNS records,
// a.x.ns.domain. and, b.x.ns.domain.
// TODO: Handle multi-port services.
ks.mlock.Lock()
defer ks.mlock.Unlock()
key, err := kcache.MetaNamespaceKeyFunc(service)
@ -133,7 +132,7 @@ func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service,
return nil
}
if e, ok := e.(*kapi.Endpoints); ok {
return ks.generateRecordsForHeadlessService(subdomain, e, service, isNewStyleFormat)
return ks.generateRecordsForHeadlessService(subdomain, e, service)
}
return nil
}
@ -148,7 +147,7 @@ func getSkyMsg(ip string, port int) *skymsg.Service {
}
}
func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service, isNewStyleFormat bool) error {
func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error {
for idx := range e.Subsets {
for subIdx := range e.Subsets[idx].Addresses {
b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, 0))
@ -163,15 +162,13 @@ func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.
if err := ks.writeSkyRecord(recordKey, recordValue); err != nil {
return err
}
if isNewStyleFormat {
for portIdx := range e.Subsets[idx].Ports {
endpointPort := &e.Subsets[idx].Ports[portIdx]
portSegment := buildPortSegmentString(endpointPort.Name, endpointPort.Protocol)
if portSegment != "" {
err := ks.generateSRVRecord(subdomain, portSegment, recordLabel, recordKey, endpointPort.Port)
if err != nil {
return err
}
for portIdx := range e.Subsets[idx].Ports {
endpointPort := &e.Subsets[idx].Ports[portIdx]
portSegment := buildPortSegmentString(endpointPort.Name, endpointPort.Protocol)
if portSegment != "" {
err := ks.generateSRVRecord(subdomain, portSegment, recordLabel, recordKey, endpointPort.Port)
if err != nil {
return err
}
}
}
@ -200,7 +197,7 @@ func (ks *kube2sky) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, e
return nil, fmt.Errorf("got a non service object in services store %v", obj)
}
func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints, isNewStyleFormat bool) error {
func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) error {
ks.mlock.Lock()
defer ks.mlock.Unlock()
svc, err := ks.getServiceFromEndpoints(e)
@ -215,41 +212,29 @@ func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints, is
if err := ks.removeDNS(subdomain); err != nil {
return err
}
return ks.generateRecordsForHeadlessService(subdomain, e, svc, isNewStyleFormat)
return ks.generateRecordsForHeadlessService(subdomain, e, svc)
}
func (ks *kube2sky) handleEndpointAdd(obj interface{}) {
if e, ok := obj.(*kapi.Endpoints); ok {
name := buildDNSNameString(ks.domain, e.Namespace, e.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e, false) })
name = buildDNSNameString(ks.domain, serviceSubdomain, e.Namespace, e.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e, true) })
name := buildDNSNameString(ks.domain, serviceSubdomain, e.Namespace, e.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) })
}
}
func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service, isNewStyleFormat bool) error {
func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service) error {
b, err := json.Marshal(getSkyMsg(service.Spec.ClusterIP, 0))
if err != nil {
return err
}
recordValue := string(b)
recordKey := subdomain
recordLabel := ""
if isNewStyleFormat {
recordLabel = getHash(recordValue)
if err != nil {
return err
}
recordKey = buildDNSNameString(subdomain, recordLabel)
}
recordLabel := getHash(recordValue)
recordKey := buildDNSNameString(subdomain, recordLabel)
glog.V(2).Infof("Setting DNS record: %v -> %q, with recordKey: %v\n", subdomain, recordValue, recordKey)
if err := ks.writeSkyRecord(recordKey, recordValue); err != nil {
return err
}
if !isNewStyleFormat {
return nil
}
// Generate SRV Records
for i := range service.Spec.Ports {
port := &service.Spec.Ports[i]
@ -290,15 +275,15 @@ func (ks *kube2sky) generateSRVRecord(subdomain, portSegment, recordName, cName
return nil
}
func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service, isNewStyleFormat bool) error {
func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service) error {
if len(service.Spec.Ports) == 0 {
glog.Fatalf("unexpected service with no ports: %v", service)
}
// if ClusterIP is not set, a DNS entry should not be created
if !kapi.IsServiceIPSet(service) {
return ks.newHeadlessService(subdomain, service, isNewStyleFormat)
return ks.newHeadlessService(subdomain, service)
}
return ks.generateRecordsForPortalService(subdomain, service, isNewStyleFormat)
return ks.generateRecordsForPortalService(subdomain, service)
}
// Implements retry logic for arbitrary mutator. Crashes after retrying for
@ -345,19 +330,14 @@ func createEndpointsLW(kubeClient *kclient.Client) *kcache.ListWatch {
func (ks *kube2sky) newService(obj interface{}) {
if s, ok := obj.(*kapi.Service); ok {
//TODO(artfulcoder) stop adding and deleting old-format string for service
name := buildDNSNameString(ks.domain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s, false) })
name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s, true) })
name := buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
}
}
func (ks *kube2sky) removeService(obj interface{}) {
if s, ok := obj.(*kapi.Service); ok {
name := buildDNSNameString(ks.domain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
name := buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
}
}

View File

@ -94,11 +94,7 @@ func newKube2Sky(ec etcdClient) *kube2sky {
}
}
func getEtcdOldStylePath(name, namespace string) string {
return path.Join(basePath, namespace, name)
}
func getEtcdNewStylePath(name, namespace string) string {
func getEtcdPathForA(name, namespace string) string {
return path.Join(basePath, serviceSubDomain, namespace, name)
}
@ -125,18 +121,11 @@ func getHostPortFromString(data string) (*hostPort, error) {
}
func assertDnsServiceEntryInEtcd(t *testing.T, ec *fakeEtcdClient, serviceName, namespace string, expectedHostPort *hostPort) {
oldStyleKey := getEtcdOldStylePath(serviceName, namespace)
values := ec.Get(oldStyleKey)
require.True(t, len(values) > 0, fmt.Sprintf("oldStyleKey '%s' not found.", oldStyleKey))
actualHostPort, err := getHostPortFromString(values[0])
require.NoError(t, err)
assert.Equal(t, expectedHostPort.Host, actualHostPort.Host)
newStyleKey := getEtcdNewStylePath(serviceName, namespace)
values = ec.Get(newStyleKey)
key := getEtcdPathForA(serviceName, namespace)
values := ec.Get(key)
//require.True(t, exists)
require.True(t, len(values) > 0, "newStyleKey entry not found.")
actualHostPort, err = getHostPortFromString(values[0])
require.True(t, len(values) > 0, "entry not found.")
actualHostPort, err := getHostPortFromString(values[0])
require.NoError(t, err)
assert.Equal(t, expectedHostPort.Host, actualHostPort.Host)
}
@ -230,9 +219,8 @@ func TestHeadlessService(t *testing.T) {
assert.NoError(t, k2s.servicesStore.Add(&service))
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"))
// We expect 4 records with "svc" subdomain and 4 records without
// "svc" subdomain.
expectedDNSRecords := 8
// We expect 4 records.
expectedDNSRecords := 4
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
k2s.newService(&service)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
@ -251,9 +239,8 @@ func TestHeadlessServiceWithNamedPorts(t *testing.T) {
assert.NoError(t, k2s.servicesStore.Add(&service))
endpoints := newEndpoints(service, newSubsetWithTwoPorts("http1", 80, "http2", 81, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("https", 443, "10.0.0.3", "10.0.0.4"))
// We expect 14 records. 6 SRV records. 4 POD entries with old style, 4 POD entries with new style
// "svc" subdomain.
expectedDNSRecords := 14
// We expect 10 records. 6 SRV records. 4 POD records.
expectedDNSRecords := 10
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
k2s.newService(&service)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
@ -263,8 +250,8 @@ func TestHeadlessServiceWithNamedPorts(t *testing.T) {
endpoints.Subsets = endpoints.Subsets[:1]
k2s.handleEndpointAdd(&endpoints)
// We expect 8 records. 4 SRV records. 2 POD entries with old style, 2 POD entries with new style
expectedDNSRecords = 8
// We expect 6 records. 4 SRV records. 2 POD records.
expectedDNSRecords = 6
assert.Equal(t, expectedDNSRecords, len(ec.writes))
assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 2)
assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 81, 2)
@ -284,14 +271,14 @@ func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
assert.NoError(t, k2s.servicesStore.Add(&service))
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"))
expectedDNSRecords := 4
expectedDNSRecords := 2
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
k2s.newService(&service)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
endpoints.Subsets = append(endpoints.Subsets,
newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"),
)
expectedDNSRecords = 8
expectedDNSRecords = 4
k2s.handleEndpointAdd(&endpoints)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
@ -315,9 +302,8 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
// Add an endpoints object for the service.
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"))
// We expect 4 records with "svc" subdomain and 4 records without
// "svc" subdomain.
expectedDNSRecords := 8
// We expect 4 records.
expectedDNSRecords := 4
k2s.handleEndpointAdd(&endpoints)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
}
@ -347,7 +333,7 @@ func TestUpdateSinglePortService(t *testing.T) {
k2s := newKube2Sky(ec)
service := newService(testNamespace, testService, "1.2.3.4", "", 0)
k2s.newService(&service)
assert.Len(t, ec.writes, 2)
assert.Len(t, ec.writes, 1)
newService := service
newService.Spec.ClusterIP = "0.0.0.0"
k2s.updateService(&service, &newService)
@ -365,9 +351,7 @@ func TestDeleteSinglePortService(t *testing.T) {
service := newService(testNamespace, testService, "1.2.3.4", "", 80)
// Add the service
k2s.newService(&service)
// two entries should get created, one with the svc subdomain (new-style)
// , and one without the svc subdomain (old-style)
assert.Len(t, ec.writes, 2)
assert.Len(t, ec.writes, 1)
// Delete the service
k2s.removeService(&service)
assert.Empty(t, ec.writes)
@ -387,7 +371,7 @@ func TestServiceWithNamePort(t *testing.T) {
expectedValue := getHostPort(&service)
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 1)
assert.Len(t, ec.writes, 3)
assert.Len(t, ec.writes, 2)
// update service
newService := service
@ -396,7 +380,7 @@ func TestServiceWithNamePort(t *testing.T) {
expectedValue = getHostPort(&newService)
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 80, 1)
assert.Len(t, ec.writes, 3)
assert.Len(t, ec.writes, 2)
// Delete the service
k2s.removeService(&service)

View File

@ -28,7 +28,7 @@ kube_cert_group: kube-cert
# Internal DNS domain name.
# This domain must not be used in your network. Services will be discoverable
# under <service-name>.<namespace>.<domainname>, e.g.
# myservice.default.cluster.local
# myservice.default.svc.cluster.local
dns_domain: "{{ cluster_name }}"
# IP address of the DNS server.

View File

@ -100,7 +100,7 @@ public class KubernetesSeedProvider implements SeedProvider {
public List<InetAddress> getSeeds() {
List<InetAddress> list = new ArrayList<InetAddress>();
String host = "https://kubernetes.default.cluster.local";
String host = "https://kubernetes.default.svc.cluster.local";
String serviceName = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra");
String podNamespace = getEnvOrDefault("POD_NAMESPACE", "default");
String path = String.format("/api/v1/namespaces/%s/endpoints/", podNamespace);

View File

@ -171,7 +171,7 @@ $ kubectl logs dns-frontend
#### Note about default namespace
If you prefer not using namespace, then all your services can be addressed using `default` namespace, e.g. `http://dns-backend.default.cluster.local:8000`, or shorthand version `http://dns-backend:8000`
If you prefer not using namespace, then all your services can be addressed using `default` namespace, e.g. `http://dns-backend.default.svc.cluster.local:8000`, or shorthand version `http://dns-backend:8000`
### tl; dr;

View File

@ -139,8 +139,8 @@ Result: ([]string)<nil>
Error: &lt;*&gt;lookup elasticsearch-logging: no such host
LookupSRV(&#34;&#34;, &#34;&#34;, elasticsearch-logging):
cname: elasticsearch-logging.default.cluster.local.
Result: ([]*net.SRV)[&lt;*&gt;{Target:(string)elasticsearch-logging.default.cluster.local. Port:(uint16)9200 Priority:(uint16)10 Weight:(uint16)100}]
cname: elasticsearch-logging.default.svc.cluster.local.
Result: ([]*net.SRV)[&lt;*&gt;{Target:(string)elasticsearch-logging.default.svc.cluster.local. Port:(uint16)9200 Priority:(uint16)10 Weight:(uint16)100}]
Error: <nil>
LookupHost(elasticsearch-logging):

View File

@ -18,7 +18,7 @@
# should only send updates if something changes. We should be able to do
# this by comparing pod creation time with the last scan time.
while true; do
hostport="https://kubernetes.default.cluster.local"
hostport="https://kubernetes.default.svc.cluster.local"
token=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
path="api/v1/pods"
query="labels=$SELECTOR"

View File

@ -201,12 +201,10 @@ var _ = Describe("DNS", func() {
// All the names we need to be able to resolve.
// TODO: Spin up a separate test service and test that dns works for that service.
// TODO: Should these be changed to kubernetes.kube-system etc. ?
namesToResolve := []string{
"kubernetes.default",
"kubernetes.default.svc",
"kubernetes.default.svc.cluster.local",
"kubernetes.default.cluster.local",
"google.com",
}
// Added due to #8512. This is critical for GCE and GKE deployments.