mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
1. ensure dns record when ingress ip is assigned after ready address creation
2. ensure dns record removal when service being removed
This commit is contained in:
parent
72a0806103
commit
dd78dd8e2b
@ -36,7 +36,7 @@ func (rrsets ResourceRecordSets) List() ([]dnsprovider.ResourceRecordSet, error)
|
|||||||
}
|
}
|
||||||
list := make([]dnsprovider.ResourceRecordSet, len(response.Rrsets()))
|
list := make([]dnsprovider.ResourceRecordSet, len(response.Rrsets()))
|
||||||
for i, rrset := range response.Rrsets() {
|
for i, rrset := range response.Rrsets() {
|
||||||
list[i] = &ResourceRecordSet{rrset, &rrsets}
|
list[i] = ResourceRecordSet{rrset, &rrsets}
|
||||||
}
|
}
|
||||||
return list, nil
|
return list, nil
|
||||||
}
|
}
|
||||||
@ -58,7 +58,7 @@ func (rrsets ResourceRecordSets) Add(rrset dnsprovider.ResourceRecordSet) (dnspr
|
|||||||
|
|
||||||
func (rrsets ResourceRecordSets) Remove(rrset dnsprovider.ResourceRecordSet) error {
|
func (rrsets ResourceRecordSets) Remove(rrset dnsprovider.ResourceRecordSet) error {
|
||||||
service := rrsets.zone.zones.interface_.service.Changes()
|
service := rrsets.zone.zones.interface_.service.Changes()
|
||||||
deletions := []interfaces.ResourceRecordSet{rrset.(*ResourceRecordSet).impl}
|
deletions := []interfaces.ResourceRecordSet{rrset.(ResourceRecordSet).impl}
|
||||||
change := service.NewChange([]interfaces.ResourceRecordSet{}, deletions)
|
change := service.NewChange([]interfaces.ResourceRecordSet{}, deletions)
|
||||||
newChange, err := service.Create(rrsets.project(), rrsets.zone.impl.Name(), change).Do()
|
newChange, err := service.Create(rrsets.project(), rrsets.zone.impl.Name(), change).Do()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -146,7 +146,7 @@ func getResolvedEndpoints(endpoints []string) ([]string, error) {
|
|||||||
/* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
|
/* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
|
||||||
if endpoints is nil or empty, a CNAME record to uplevelCname is ensured.
|
if endpoints is nil or empty, a CNAME record to uplevelCname is ensured.
|
||||||
*/
|
*/
|
||||||
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string) error {
|
func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoints []string, uplevelCname string, endpointReachable bool) error {
|
||||||
dnsZone, err := getDnsZone(dnsZoneName, s.dnsZones)
|
dnsZone, err := getDnsZone(dnsZoneName, s.dnsZones)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -160,30 +160,32 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if rrset == nil {
|
if rrset == nil {
|
||||||
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
|
if endpointReachable {
|
||||||
if len(endpoints) < 1 {
|
// It doesn't exist yet, so create it, if we indeed have healthy endpoints
|
||||||
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
|
if len(endpoints) < 1 {
|
||||||
if uplevelCname != "" {
|
// There are no endpoint addresses at this level, so CNAME to uplevel, if provided
|
||||||
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
|
if uplevelCname != "" {
|
||||||
|
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
|
||||||
|
rrset, err = rrsets.Add(newRrset)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// else we want no record, and we have no record, so we're all good.
|
||||||
|
} else {
|
||||||
|
// We have valid endpoint addresses, so just add them as A records.
|
||||||
|
// But first resolve DNS names, as some cloud providers (like AWS) expose
|
||||||
|
// load balancers behind DNS names, not IP addresses.
|
||||||
|
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
|
||||||
|
if err != nil {
|
||||||
|
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
|
||||||
|
}
|
||||||
|
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
|
||||||
rrset, err = rrsets.Add(newRrset)
|
rrset, err = rrsets.Add(newRrset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// else we want no record, and we have no record, so we're all good.
|
|
||||||
} else {
|
|
||||||
// We have valid endpoint addresses, so just add them as A records.
|
|
||||||
// But first resolve DNS names, as some cloud providers (like AWS) expose
|
|
||||||
// load balancers behind DNS names, not IP addresses.
|
|
||||||
resolvedEndpoints, err := getResolvedEndpoints(endpoints)
|
|
||||||
if err != nil {
|
|
||||||
return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
|
|
||||||
}
|
|
||||||
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
|
|
||||||
rrset, err = rrsets.Add(newRrset)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// the rrset already exists, so make it right.
|
// the rrset already exists, so make it right.
|
||||||
@ -191,6 +193,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
|
|||||||
// Need an appropriate CNAME record. Check that we have it.
|
// Need an appropriate CNAME record. Check that we have it.
|
||||||
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
|
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
|
||||||
if rrset == newRrset {
|
if rrset == newRrset {
|
||||||
|
if !endpointReachable {
|
||||||
|
if err = rrsets.Remove(rrset); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
// The existing rrset is equal to the required one - our work is done here
|
// The existing rrset is equal to the required one - our work is done here
|
||||||
return nil
|
return nil
|
||||||
} else {
|
} else {
|
||||||
@ -199,7 +206,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
|
|||||||
if err = rrsets.Remove(rrset); err != nil {
|
if err = rrsets.Remove(rrset); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if uplevelCname != "" {
|
if uplevelCname != "" && endpointReachable {
|
||||||
if _, err = rrsets.Add(newRrset); err != nil {
|
if _, err = rrsets.Add(newRrset); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -214,6 +221,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
|
|||||||
}
|
}
|
||||||
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
|
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
|
||||||
if rrset == newRrset {
|
if rrset == newRrset {
|
||||||
|
if !endpointReachable {
|
||||||
|
if err = rrsets.Remove(rrset); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
// The existing rrset is equal to the required one - our work is done here
|
// The existing rrset is equal to the required one - our work is done here
|
||||||
// TODO: We could be more thorough about checking for equivalence to avoid unnecessary updates, but in the
|
// TODO: We could be more thorough about checking for equivalence to avoid unnecessary updates, but in the
|
||||||
// worst case we'll just replace what's there with an equivalent, if not exactly identical record set.
|
// worst case we'll just replace what's there with an equivalent, if not exactly identical record set.
|
||||||
@ -224,8 +236,10 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
|
|||||||
if err = rrsets.Remove(rrset); err != nil {
|
if err = rrsets.Remove(rrset); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err = rrsets.Add(newRrset); err != nil {
|
if endpointReachable {
|
||||||
return err
|
if _, err = rrsets.Add(newRrset); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -279,7 +293,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
_, endpointReachable := cachedService.endpointMap[clusterName]
|
||||||
commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
|
commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
|
||||||
// dnsNames is the path up the DNS search tree, starting at the leaf
|
// dnsNames is the path up the DNS search tree, starting at the leaf
|
||||||
dnsNames := []string{
|
dnsNames := []string{
|
||||||
@ -292,7 +306,7 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, cachedService *
|
|||||||
endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}
|
endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}
|
||||||
|
|
||||||
for i, endpoint := range endpoints {
|
for i, endpoint := range endpoints {
|
||||||
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
|
if err = s.ensureDnsRrsets(dnsZoneName, dnsNames[i], endpoint, dnsNames[i+1], endpointReachable); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,7 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
|
func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedService, clusterName string, serviceController *ServiceController) error {
|
||||||
glog.V(4).Infof("Processing endpoint update for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
glog.V(4).Infof("Processing endpoint deletion for %s/%s, cluster %s", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
||||||
var err error
|
var err error
|
||||||
cachedService.rwlock.Lock()
|
cachedService.rwlock.Lock()
|
||||||
defer cachedService.rwlock.Unlock()
|
defer cachedService.rwlock.Unlock()
|
||||||
@ -102,10 +102,10 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
|
|||||||
// need to query dns info from dnsprovider and make sure of if deletion is needed
|
// need to query dns info from dnsprovider and make sure of if deletion is needed
|
||||||
if ok {
|
if ok {
|
||||||
// endpoints lost, clean dns record
|
// endpoints lost, clean dns record
|
||||||
glog.V(4).Infof("Cached endpoint was not found for %s/%s, cluster %s, building one", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
|
||||||
|
delete(cachedService.endpointMap, clusterName)
|
||||||
for i := 0; i < clientRetryCount; i++ {
|
for i := 0; i < clientRetryCount; i++ {
|
||||||
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
|
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
|
||||||
delete(cachedService.endpointMap, clusterName)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
|
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
|
||||||
|
@ -29,9 +29,11 @@ var fakeDns, _ = clouddns.NewFakeInterface() // No need to check for unsupported
|
|||||||
var fakeDnsZones, _ = fakeDns.Zones()
|
var fakeDnsZones, _ = fakeDns.Zones()
|
||||||
|
|
||||||
var fakeServiceController = ServiceController{
|
var fakeServiceController = ServiceController{
|
||||||
dns: fakeDns,
|
dns: fakeDns,
|
||||||
dnsZones: fakeDnsZones,
|
dnsZones: fakeDnsZones,
|
||||||
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
|
federationName: "fed1",
|
||||||
|
zoneName: "example.com",
|
||||||
|
serviceCache: &serviceCache{fedServiceMap: make(map[string]*cachedService)},
|
||||||
clusterCache: &clusterClientCache{
|
clusterCache: &clusterClientCache{
|
||||||
clientMap: make(map[string]*clusterCache),
|
clientMap: make(map[string]*clusterCache),
|
||||||
},
|
},
|
||||||
@ -52,8 +54,18 @@ func buildEndpoint(subsets [][]string) *v1.Endpoints {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestProcessEndpointUpdate(t *testing.T) {
|
func TestProcessEndpointUpdate(t *testing.T) {
|
||||||
|
clusterName := "foo"
|
||||||
cc := clusterClientCache{
|
cc := clusterClientCache{
|
||||||
clientMap: make(map[string]*clusterCache),
|
clientMap: map[string]*clusterCache{
|
||||||
|
clusterName: {
|
||||||
|
cluster: &v1alpha1.Cluster{
|
||||||
|
Status: v1alpha1.ClusterStatus{
|
||||||
|
Zones: []string{"foozone"},
|
||||||
|
Region: "fooregion",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -69,7 +81,7 @@ func TestProcessEndpointUpdate(t *testing.T) {
|
|||||||
endpointMap: make(map[string]int),
|
endpointMap: make(map[string]int),
|
||||||
},
|
},
|
||||||
buildEndpoint([][]string{{"ip1", ""}}),
|
buildEndpoint([][]string{{"ip1", ""}}),
|
||||||
"foo",
|
clusterName,
|
||||||
1,
|
1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -81,11 +93,11 @@ func TestProcessEndpointUpdate(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
buildEndpoint([][]string{{"ip1", ""}}),
|
buildEndpoint([][]string{{"ip1", ""}}),
|
||||||
"foo",
|
clusterName,
|
||||||
1,
|
1,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
fakeServiceController.clusterCache = &cc
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController)
|
cc.processEndpointUpdate(test.cachedService, test.endpoint, test.clusterName, &fakeServiceController)
|
||||||
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
|
if test.expectResult != test.cachedService.endpointMap[test.clusterName] {
|
||||||
|
@ -44,7 +44,7 @@ func (sc *ServiceController) clusterServiceWorker() {
|
|||||||
if quit {
|
if quit {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient)
|
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to sync service: %+v", err)
|
glog.Errorf("Failed to sync service: %+v", err)
|
||||||
}
|
}
|
||||||
@ -55,7 +55,7 @@ func (sc *ServiceController) clusterServiceWorker() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Whenever there is change on service, the federation service should be updated
|
// Whenever there is change on service, the federation service should be updated
|
||||||
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface) error {
|
func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache *clusterCache, serviceCache *serviceCache, fedClient federation_release_1_3.Interface, sc *ServiceController) error {
|
||||||
// obj holds the latest service info from apiserver, return if there is no federation cache for the service
|
// obj holds the latest service info from apiserver, return if there is no federation cache for the service
|
||||||
cachedService, ok := serviceCache.get(key)
|
cachedService, ok := serviceCache.get(key)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -88,6 +88,15 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
|
|||||||
}
|
}
|
||||||
|
|
||||||
if needUpdate {
|
if needUpdate {
|
||||||
|
for i := 0; i < clientRetryCount; i++ {
|
||||||
|
if err := sc.ensureDnsRecords(clusterName, cachedService); err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, clusterName, err)
|
||||||
|
time.Sleep(cachedService.nextDNSUpdateDelay())
|
||||||
|
clusterCache.serviceQueue.Add(key)
|
||||||
|
// did not retry here as we still want to persist federation apiserver even ensure dns records fails
|
||||||
|
}
|
||||||
err := cc.persistFedServiceUpdate(cachedService, fedClient)
|
err := cc.persistFedServiceUpdate(cachedService, fedClient)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
cachedService.appliedState = cachedService.lastState
|
cachedService.appliedState = cachedService.lastState
|
||||||
@ -219,7 +228,13 @@ func (cc *clusterClientCache) persistFedServiceUpdate(cachedService *cachedServi
|
|||||||
glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name)
|
glog.V(5).Infof("Persist federation service status %s/%s", service.Namespace, service.Name)
|
||||||
var err error
|
var err error
|
||||||
for i := 0; i < clientRetryCount; i++ {
|
for i := 0; i < clientRetryCount; i++ {
|
||||||
_, err := fedClient.Core().Services(service.Namespace).UpdateStatus(service)
|
_, err := fedClient.Core().Services(service.Namespace).Get(service.Name)
|
||||||
|
if errors.IsNotFound(err) {
|
||||||
|
glog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
|
||||||
|
service.Namespace, service.Name, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err = fedClient.Core().Services(service.Namespace).UpdateStatus(service)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name)
|
glog.V(2).Infof("Successfully update service %s/%s to federation apiserver", service.Namespace, service.Name)
|
||||||
return nil
|
return nil
|
||||||
|
@ -360,6 +360,7 @@ func (s *ServiceController) deleteClusterService(clusterName string, cachedServi
|
|||||||
err = clientset.Core().Services(service.Namespace).Delete(service.Name, &api.DeleteOptions{})
|
err = clientset.Core().Services(service.Namespace).Delete(service.Name, &api.DeleteOptions{})
|
||||||
if err == nil || errors.IsNotFound(err) {
|
if err == nil || errors.IsNotFound(err) {
|
||||||
glog.V(4).Infof("Service %s/%s deleted from cluster %s", service.Namespace, service.Name, clusterName)
|
glog.V(4).Infof("Service %s/%s deleted from cluster %s", service.Namespace, service.Name, clusterName)
|
||||||
|
delete(cachedService.endpointMap, clusterName)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
time.Sleep(cachedService.nextRetryDelay())
|
time.Sleep(cachedService.nextRetryDelay())
|
||||||
|
Loading…
Reference in New Issue
Block a user