Merge pull request #28668 from justinsb/update_dns

Automatic merge from submit-queue

Expose Changesets (transactions) in dns provider

This makes the dnsprovider usable in more scenarios, and it also solves
some TODOs in the federation code.

WIP  - I'm going to test this to make sure this both works and is sufficient for use in my dns controller, but I wanted to put some code behind the talk in #28477.  cc @quinton-hoole 

Issue #28477
This commit is contained in:
k8s-merge-robot 2016-07-22 16:25:22 -07:00 committed by GitHub
commit eaf3d3f708
14 changed files with 521 additions and 207 deletions

View File

@ -50,14 +50,22 @@ type Zone interface {
type ResourceRecordSets interface {
// List returns the ResourceRecordSets of the Zone, or an error if the list operation failed.
List() ([]ResourceRecordSet, error)
// Add adds and returns a ResourceRecordSet of the Zone, or an error if the add operation failed.
Add(ResourceRecordSet) (ResourceRecordSet, error)
// Remove removes a ResourceRecordSet from the Zone, or an error if the remove operation failed.
// The supplied ResourceRecordSet must match one of the existing zones (obtained via List()) exactly.
Remove(ResourceRecordSet) error
// New allocates a new ResourceRecordSet, which can then be passed to Add() or Remove()
// New allocates a new ResourceRecordSet, which can then be passed to ResourceRecordChangeset Add() or Remove()
// Arguments are as per the ResourceRecordSet interface below.
New(name string, rrdatas []string, ttl int64, rrstype rrstype.RrsType) ResourceRecordSet
// StartChangeset begins a new batch operation of changes against the Zone
StartChangeset() ResourceRecordChangeset
}
// ResourceRecordChangeset accumulates a set of changes, that can then be applied with Apply
type ResourceRecordChangeset interface {
// Add adds the creation of a ResourceRecordSet in the Zone to the changeset
Add(ResourceRecordSet) ResourceRecordChangeset
// Remove adds the removal of a ResourceRecordSet in the Zone to the changeset
// The supplied ResourceRecordSet must match one of the existing recordsets (obtained via List()) exactly.
Remove(ResourceRecordSet) ResourceRecordChangeset
// Apply applies the accumulated operations to the Zone.
Apply() error
}
type ResourceRecordSet interface {

View File

@ -28,6 +28,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/route53"
"k8s.io/kubernetes/federation/pkg/dnsprovider/tests"
)
func newTestInterface() (dnsprovider.Interface, error) {
@ -133,12 +134,11 @@ func getInvalidRrs(zone dnsprovider.Zone) dnsprovider.ResourceRecordSet {
return rrsets.New("www12."+zone.Name(), []string{"rubbish", "rubbish"}, 180, rrstype.A)
}
func addRrsetOrFail(t *testing.T, rrsets dnsprovider.ResourceRecordSets, rrset dnsprovider.ResourceRecordSet) dnsprovider.ResourceRecordSet {
result, err := rrsets.Add(rrset)
func addRrsetOrFail(t *testing.T, rrsets dnsprovider.ResourceRecordSets, rrset dnsprovider.ResourceRecordSet) {
err := rrsets.StartChangeset().Add(rrset).Apply()
if err != nil {
t.Fatalf("Failed to add recordsets: %v", err)
}
return result
}
/* TestResourceRecordSetsList verifies that listing of zones succeeds */
@ -177,8 +177,9 @@ func TestResourceRecordSetsList(t *testing.T) {
func TestResourceRecordSetsAddSuccess(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
set := addRrsetOrFail(t, sets, getExampleRrs(zone))
defer sets.Remove(set)
set := getExampleRrs(zone)
addRrsetOrFail(t, sets, set)
defer sets.StartChangeset().Remove(set).Apply()
t.Logf("Successfully added resource record set: %v", set)
}
@ -187,9 +188,9 @@ func TestResourceRecordSetsAdditionVisible(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
rrset := getExampleRrs(zone)
set := addRrsetOrFail(t, sets, rrset)
defer sets.Remove(set)
t.Logf("Successfully added resource record set: %v", set)
addRrsetOrFail(t, sets, rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Logf("Successfully added resource record set: %v", rrset)
found := false
for _, record := range listRrsOrFail(t, sets) {
if record.Name() == rrset.Name() {
@ -207,16 +208,16 @@ func TestResourceRecordSetsAddDuplicateFail(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
rrset := getExampleRrs(zone)
set := addRrsetOrFail(t, sets, rrset)
defer sets.Remove(set)
t.Logf("Successfully added resource record set: %v", set)
addRrsetOrFail(t, sets, rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Logf("Successfully added resource record set: %v", rrset)
// Try to add it again, and verify that the call fails.
rrs, err := sets.Add(rrset)
err := sets.StartChangeset().Add(rrset).Apply()
if err == nil {
defer sets.Remove(rrs)
t.Errorf("Should have failed to add duplicate resource record %v, but succeeded instead.", set)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Errorf("Should have failed to add duplicate resource record %v, but succeeded instead.", rrset)
} else {
t.Logf("Correctly failed to add duplicate resource record %v: %v", set, err)
t.Logf("Correctly failed to add duplicate resource record %v: %v", rrset, err)
}
}
@ -225,14 +226,14 @@ func TestResourceRecordSetsRemove(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
rrset := getExampleRrs(zone)
set := addRrsetOrFail(t, sets, rrset)
err := sets.Remove(set)
addRrsetOrFail(t, sets, rrset)
err := sets.StartChangeset().Remove(rrset).Apply()
if err != nil {
// Try again to clean up.
defer sets.Remove(rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Errorf("Failed to remove resource record set %v after adding", rrset)
} else {
t.Logf("Successfully removed resource set %v after adding", set)
t.Logf("Successfully removed resource set %v after adding", rrset)
}
}
@ -241,14 +242,14 @@ func TestResourceRecordSetsRemoveGone(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
rrset := getExampleRrs(zone)
set := addRrsetOrFail(t, sets, rrset)
err := sets.Remove(set)
addRrsetOrFail(t, sets, rrset)
err := sets.StartChangeset().Remove(rrset).Apply()
if err != nil {
// Try again to clean up.
defer sets.Remove(rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Errorf("Failed to remove resource record set %v after adding", rrset)
} else {
t.Logf("Successfully removed resource set %v after adding", set)
t.Logf("Successfully removed resource set %v after adding", rrset)
}
// Check that it's gone
list := listRrsOrFail(t, sets)
@ -263,3 +264,21 @@ func TestResourceRecordSetsRemoveGone(t *testing.T) {
t.Errorf("Deleted resource record set %v is still present", rrset)
}
}
/* TestResourceRecordSetsReplace verifies that replacing an RRS works */
func TestResourceRecordSetsReplace(t *testing.T) {
zone := firstZone(t)
tests.CommonTestResourceRecordSetsReplace(t, zone)
}
/* TestResourceRecordSetsReplaceAll verifies that we can remove an RRS and create one with a different name*/
func TestResourceRecordSetsReplaceAll(t *testing.T) {
zone := firstZone(t)
tests.CommonTestResourceRecordSetsReplaceAll(t, zone)
}
/* TestResourceRecordSetsHonorsType verifies that we can add records of the same name but different types */
func TestResourceRecordSetsDifferentTypes(t *testing.T) {
zone := firstZone(t)
tests.CommonTestResourceRecordSetsDifferentTypes(t, zone)
}

View File

@ -0,0 +1,101 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package route53
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/route53"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
)
// Compile time check for interface adherence
var _ dnsprovider.ResourceRecordChangeset = &ResourceRecordChangeset{}
type ResourceRecordChangeset struct {
zone *Zone
rrsets *ResourceRecordSets
additions []dnsprovider.ResourceRecordSet
removals []dnsprovider.ResourceRecordSet
}
func (c *ResourceRecordChangeset) Add(rrset dnsprovider.ResourceRecordSet) dnsprovider.ResourceRecordChangeset {
c.additions = append(c.additions, rrset)
return c
}
func (c *ResourceRecordChangeset) Remove(rrset dnsprovider.ResourceRecordSet) dnsprovider.ResourceRecordChangeset {
c.removals = append(c.removals, rrset)
return c
}
// buildChange converts a dnsprovider.ResourceRecordSet to a route53.Change request
func buildChange(action string, rrs dnsprovider.ResourceRecordSet) *route53.Change {
change := &route53.Change{
Action: aws.String(action),
ResourceRecordSet: &route53.ResourceRecordSet{
Name: aws.String(rrs.Name()),
Type: aws.String(string(rrs.Type())),
TTL: aws.Int64(rrs.Ttl()),
},
}
for _, rrdata := range rrs.Rrdatas() {
rr := &route53.ResourceRecord{
Value: aws.String(rrdata),
}
change.ResourceRecordSet.ResourceRecords = append(change.ResourceRecordSet.ResourceRecords, rr)
}
return change
}
func (c *ResourceRecordChangeset) Apply() error {
hostedZoneID := c.zone.impl.Id
var changes []*route53.Change
for _, removal := range c.removals {
change := buildChange(route53.ChangeActionDelete, removal)
changes = append(changes, change)
}
for _, addition := range c.additions {
change := buildChange(route53.ChangeActionCreate, addition)
changes = append(changes, change)
}
if len(changes) == 0 {
return nil
}
service := c.zone.zones.interface_.service
request := &route53.ChangeResourceRecordSetsInput{
ChangeBatch: &route53.ChangeBatch{
Changes: changes,
},
HostedZoneId: hostedZoneID,
}
_, err := service.ChangeResourceRecordSets(request)
if err != nil {
// Cast err to awserr.Error to get the Code and
// Message from an error.
return err
}
return nil
}

View File

@ -23,7 +23,7 @@ import (
"github.com/aws/aws-sdk-go/service/route53"
)
// Compile time check for interface adeherence
// Compile time check for interface adherence
var _ dnsprovider.ResourceRecordSet = ResourceRecordSet{}
type ResourceRecordSet struct {

View File

@ -23,7 +23,7 @@ import (
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
)
// Compile time check for interface adeherence
// Compile time check for interface adherence
var _ dnsprovider.ResourceRecordSets = ResourceRecordSets{}
type ResourceRecordSets struct {
@ -48,91 +48,28 @@ func (rrsets ResourceRecordSets) List() ([]dnsprovider.ResourceRecordSet, error)
return list, nil
}
func (rrsets ResourceRecordSets) Add(rrset dnsprovider.ResourceRecordSet) (dnsprovider.ResourceRecordSet, error) {
service := rrsets.zone.zones.interface_.service
input := getChangeResourceRecordSetsInput("CREATE", rrset.Name(), string(rrset.Type()), *rrset.(ResourceRecordSet).rrsets.zone.impl.Id, rrset.Rrdatas(), rrset.Ttl())
_, err := service.ChangeResourceRecordSets(input)
if err != nil {
// Cast err to awserr.Error to get the Code and
// Message from an error.
return nil, err
func (r ResourceRecordSets) StartChangeset() dnsprovider.ResourceRecordChangeset {
return &ResourceRecordChangeset{
zone: r.zone,
rrsets: &r,
}
return ResourceRecordSet{input.ChangeBatch.Changes[0].ResourceRecordSet, &rrsets}, nil
}
func (rrsets ResourceRecordSets) Remove(rrset dnsprovider.ResourceRecordSet) error {
input := getChangeResourceRecordSetsInput("DELETE", rrset.Name(), string(rrset.Type()), *rrset.(ResourceRecordSet).rrsets.zone.impl.Id, rrset.Rrdatas(), rrset.Ttl())
_, err := rrsets.zone.zones.interface_.service.ChangeResourceRecordSets(input)
if err != nil {
// Cast err to awserr.Error to get the Code and
// Message from an error.
return err
}
return nil
}
func getChangeResourceRecordSetsInput(action, name, type_, hostedZoneId string, rrdatas []string, ttl int64) *route53.ChangeResourceRecordSetsInput {
input := &route53.ChangeResourceRecordSetsInput{
ChangeBatch: &route53.ChangeBatch{ // Required
Changes: []*route53.Change{ // Required
{ // Required
Action: aws.String(action), // Required
ResourceRecordSet: &route53.ResourceRecordSet{ // Required
Name: aws.String(name), // Required
Type: aws.String(type_), // Required
/*
AliasTarget: &route53.AliasTarget{
DNSName: aws.String("DNSName"), // Required
EvaluateTargetHealth: aws.Bool(true), // Required
HostedZoneId: aws.String("ResourceId"), // Required
},
Failover: aws.String("ResourceRecordSetFailover"),
GeoLocation: &route53.GeoLocation{
ContinentCode: aws.String("GeoLocationContinentCode"),
CountryCode: aws.String("GeoLocationCountryCode"),
SubdivisionCode: aws.String("GeoLocationSubdivisionCode"),
},
HealthCheckId: aws.String("HealthCheckId"),
Region: aws.String("ResourceRecordSetRegion"),
*/
ResourceRecords: []*route53.ResourceRecord{
{ // Required
Value: aws.String(rrdatas[0]), // Required
},
// More values...
},
/*
SetIdentifier: aws.String("ResourceRecordSetIdentifier"),
*/
TTL: aws.Int64(ttl),
/*
TrafficPolicyInstanceId: aws.String("TrafficPolicyInstanceId"),
Weight: aws.Int64(1),
*/
},
},
// More values...
},
},
HostedZoneId: aws.String(hostedZoneId), // Required
}
return input
}
func (rrsets ResourceRecordSets) New(name string, rrdatas []string, ttl int64, rrstype rrstype.RrsType) dnsprovider.ResourceRecordSet {
func (r ResourceRecordSets) New(name string, rrdatas []string, ttl int64, rrstype rrstype.RrsType) dnsprovider.ResourceRecordSet {
rrstypeStr := string(rrstype)
rrs := &route53.ResourceRecordSet{
Name: &name,
Type: &rrstypeStr,
TTL: &ttl,
}
for _, rrdata := range rrdatas {
rrs.ResourceRecords = append(rrs.ResourceRecords, &route53.ResourceRecord{
Value: aws.String(rrdata),
})
}
return ResourceRecordSet{
&route53.ResourceRecordSet{
Name: &name,
Type: &rrstypeStr,
TTL: &ttl,
ResourceRecords: []*route53.ResourceRecord{
{
Value: &rrdatas[0],
},
},
}, // TODO: Add remaining rrdatas
&rrsets,
rrs,
&r,
}
}

View File

@ -76,17 +76,18 @@ func (r *Route53APIStub) ChangeResourceRecordSets(input *route53.ChangeResourceR
}
for _, change := range input.ChangeBatch.Changes {
key := *change.ResourceRecordSet.Name + "::" + *change.ResourceRecordSet.Type
switch *change.Action {
case route53.ChangeActionCreate:
if _, found := recordSets[*change.ResourceRecordSet.Name]; found {
return nil, fmt.Errorf("Attempt to create duplicate rrset %s", *change.ResourceRecordSet.Name) // TODO: Return AWS errors with codes etc
if _, found := recordSets[key]; found {
return nil, fmt.Errorf("Attempt to create duplicate rrset %s", key) // TODO: Return AWS errors with codes etc
}
recordSets[*change.ResourceRecordSet.Name] = append(recordSets[*change.ResourceRecordSet.Name], change.ResourceRecordSet)
recordSets[key] = append(recordSets[key], change.ResourceRecordSet)
case route53.ChangeActionDelete:
if _, found := recordSets[*change.ResourceRecordSet.Name]; !found {
return nil, fmt.Errorf("Attempt to delete non-existant rrset %s", *change.ResourceRecordSet.Name) // TODO: Check other fields too
if _, found := recordSets[key]; !found {
return nil, fmt.Errorf("Attempt to delete non-existant rrset %s", key) // TODO: Check other fields too
}
delete(recordSets, *change.ResourceRecordSet.Name)
delete(recordSets, key)
case route53.ChangeActionUpsert:
// TODO - not used yet
}

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
"k8s.io/kubernetes/federation/pkg/dnsprovider/tests"
)
func newTestInterface() (dnsprovider.Interface, error) {
@ -110,12 +111,11 @@ func getInvalidRrs(zone dnsprovider.Zone) dnsprovider.ResourceRecordSet {
return rrsets.New("www12."+zone.Name(), []string{"rubbish", "rubbish"}, 180, rrstype.A)
}
func addRrsetOrFail(t *testing.T, rrsets dnsprovider.ResourceRecordSets, rrset dnsprovider.ResourceRecordSet) dnsprovider.ResourceRecordSet {
result, err := rrsets.Add(rrset)
func addRrsetOrFail(t *testing.T, rrsets dnsprovider.ResourceRecordSets, rrset dnsprovider.ResourceRecordSet) {
err := rrsets.StartChangeset().Add(rrset).Apply()
if err != nil {
t.Fatalf("Failed to add recordsets: %v", err)
}
return result
}
/* TestResourceRecordSetsList verifies that listing of zones succeeds */
@ -156,8 +156,9 @@ func TestResourceRecordSetsList(t *testing.T) {
func TestResourceRecordSetsAddSuccess(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
set := addRrsetOrFail(t, sets, getExampleRrs(zone))
defer sets.Remove(set)
set := getExampleRrs(zone)
addRrsetOrFail(t, sets, set)
defer sets.StartChangeset().Remove(set).Apply()
t.Logf("Successfully added resource record set: %v", set)
}
@ -166,9 +167,9 @@ func TestResourceRecordSetsAdditionVisible(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
rrset := getExampleRrs(zone)
set := addRrsetOrFail(t, sets, rrset)
defer sets.Remove(set)
t.Logf("Successfully added resource record set: %v", set)
addRrsetOrFail(t, sets, rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Logf("Successfully added resource record set: %v", rrset)
found := false
for _, record := range listRrsOrFail(t, sets) {
if record.Name() == rrset.Name() {
@ -186,16 +187,16 @@ func TestResourceRecordSetsAddDuplicateFail(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
rrset := getExampleRrs(zone)
set := addRrsetOrFail(t, sets, rrset)
defer sets.Remove(set)
t.Logf("Successfully added resource record set: %v", set)
addRrsetOrFail(t, sets, rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Logf("Successfully added resource record set: %v", rrset)
// Try to add it again, and verify that the call fails.
rrs, err := sets.Add(rrset)
err := sets.StartChangeset().Add(rrset).Apply()
if err == nil {
defer sets.Remove(rrs)
t.Errorf("Should have failed to add duplicate resource record %v, but succeeded instead.", set)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Errorf("Should have failed to add duplicate resource record %v, but succeeded instead.", rrset)
} else {
t.Logf("Correctly failed to add duplicate resource record %v: %v", set, err)
t.Logf("Correctly failed to add duplicate resource record %v: %v", rrset, err)
}
}
@ -204,14 +205,14 @@ func TestResourceRecordSetsRemove(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
rrset := getExampleRrs(zone)
set := addRrsetOrFail(t, sets, rrset)
err := sets.Remove(set)
addRrsetOrFail(t, sets, rrset)
err := sets.StartChangeset().Remove(rrset).Apply()
if err != nil {
// Try again to clean up.
defer sets.Remove(rrset)
t.Errorf("Failed to remove resource record set %v after adding", rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Errorf("Failed to remove resource record set %v after adding: %v", rrset, err)
} else {
t.Logf("Successfully removed resource set %v after adding", set)
t.Logf("Successfully removed resource set %v after adding", rrset)
}
}
@ -220,14 +221,14 @@ func TestResourceRecordSetsRemoveGone(t *testing.T) {
zone := firstZone(t)
sets := rrs(t, zone)
rrset := getExampleRrs(zone)
set := addRrsetOrFail(t, sets, rrset)
err := sets.Remove(set)
addRrsetOrFail(t, sets, rrset)
err := sets.StartChangeset().Remove(rrset).Apply()
if err != nil {
// Try again to clean up.
defer sets.Remove(rrset)
t.Errorf("Failed to remove resource record set %v after adding", rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Errorf("Failed to remove resource record set %v after adding: %v", rrset, err)
} else {
t.Logf("Successfully removed resource set %v after adding", set)
t.Logf("Successfully removed resource set %v after adding", rrset)
}
// Check that it's gone
list := listRrsOrFail(t, sets)
@ -242,3 +243,21 @@ func TestResourceRecordSetsRemoveGone(t *testing.T) {
t.Errorf("Deleted resource record set %v is still present", rrset)
}
}
/* TestResourceRecordSetsReplace verifies that replacing an RRS works */
func TestResourceRecordSetsReplace(t *testing.T) {
zone := firstZone(t)
tests.CommonTestResourceRecordSetsReplace(t, zone)
}
/* TestResourceRecordSetsReplaceAll verifies that we can remove an RRS and create one with a different name*/
func TestResourceRecordSetsReplaceAll(t *testing.T) {
zone := firstZone(t)
tests.CommonTestResourceRecordSetsReplaceAll(t, zone)
}
/* TestResourceRecordSetsHonorsType verifies that we can add records of the same name but different types */
func TestResourceRecordSetsDifferentTypes(t *testing.T) {
zone := firstZone(t)
tests.CommonTestResourceRecordSetsDifferentTypes(t, zone)
}

View File

@ -43,27 +43,25 @@ func (c ChangesCreateCall) Do(opts ...googleapi.CallOption) (interfaces.Change,
return nil, c.Error
}
zone := (c.Service.Service.ManagedZones_.Impl[c.Project][c.Zone]).(*ManagedZone)
rrsets := map[string]int{} // Simple mechanism to detect dupes and missing rrsets before committing - stores index+1
for i, set := range zone.Rrsets {
rrsets[hashKey(set)] = i + 1
}
for _, add := range c.Change.Additions() {
if rrsets[hashKey(add)] > 0 {
return nil, fmt.Errorf("Attempt to insert duplicate rrset %v", add)
}
rrsets := map[string]ResourceRecordSet{} // compute the new state
for _, set := range zone.Rrsets {
rrsets[hashKey(set)] = set
}
for _, del := range c.Change.Deletions() {
if !(rrsets[hashKey(del)] > 0) {
if _, found := rrsets[hashKey(del)]; !found {
return nil, fmt.Errorf("Attempt to delete non-existent rrset %v", del)
}
delete(rrsets, hashKey(del))
}
for _, add := range c.Change.Additions() {
zone.Rrsets = append(zone.Rrsets, *(add.(*ResourceRecordSet)))
if _, found := rrsets[hashKey(add)]; found {
return nil, fmt.Errorf("Attempt to insert duplicate rrset %v", add)
}
rrsets[hashKey(add)] = add.(ResourceRecordSet)
}
for _, del := range c.Change.Deletions() {
zone.Rrsets = append(
zone.Rrsets[:rrsets[hashKey(del)]-1],
zone.Rrsets[rrsets[hashKey(del)]:]...)
zone.Rrsets = []ResourceRecordSet{}
for _, rrset := range rrsets {
zone.Rrsets = append(zone.Rrsets, rrset)
}
return c.Change, nil
}

View File

@ -55,5 +55,5 @@ func (s ResourceRecordSetsService) List(project string, managedZone string) inte
func (service ResourceRecordSetsService) NewResourceRecordSet(name string, rrdatas []string, ttl int64, type_ rrstype.RrsType) interfaces.ResourceRecordSet {
rrset := ResourceRecordSet{Name_: name, Rrdatas_: rrdatas, Ttl_: ttl, Type_: string(type_)}
return &rrset
return rrset
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clouddns
import (
"fmt"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns/internal/interfaces"
)
// Compile time check for interface adherence
var _ dnsprovider.ResourceRecordChangeset = &ResourceRecordChangeset{}
type ResourceRecordChangeset struct {
rrsets *ResourceRecordSets
additions []dnsprovider.ResourceRecordSet
removals []dnsprovider.ResourceRecordSet
}
func (c *ResourceRecordChangeset) Add(rrset dnsprovider.ResourceRecordSet) dnsprovider.ResourceRecordChangeset {
c.additions = append(c.additions, rrset)
return c
}
func (c *ResourceRecordChangeset) Remove(rrset dnsprovider.ResourceRecordSet) dnsprovider.ResourceRecordChangeset {
c.removals = append(c.removals, rrset)
return c
}
func (c *ResourceRecordChangeset) Apply() error {
rrsets := c.rrsets
service := rrsets.zone.zones.interface_.service.Changes()
var additions []interfaces.ResourceRecordSet
for _, r := range c.additions {
additions = append(additions, r.(ResourceRecordSet).impl)
}
var deletions []interfaces.ResourceRecordSet
for _, r := range c.removals {
deletions = append(deletions, r.(ResourceRecordSet).impl)
}
change := service.NewChange(additions, deletions)
newChange, err := service.Create(rrsets.project(), rrsets.zone.impl.Name(), change).Do()
if err != nil {
return err
}
newAdditions := newChange.Additions()
if len(newAdditions) != len(additions) {
return fmt.Errorf("Internal error when adding resource record set. Call succeeded but number of records returned is incorrect. Records sent=%d, records returned=%d, additions:%v", len(additions), len(newAdditions), c.additions)
}
newDeletions := newChange.Deletions()
if len(newDeletions) != len(deletions) {
return fmt.Errorf("Internal error when deleting resource record set. Call succeeded but number of records returned is incorrect. Records sent=%d, records returned=%d, deletions:%v", len(deletions), len(newDeletions), c.removals)
}
return nil
}

View File

@ -17,14 +17,12 @@ limitations under the License.
package clouddns
import (
"fmt"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns/internal/interfaces"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
)
// Compile time check for interface adeherence
// Compile time check for interface adherence
var _ dnsprovider.ResourceRecordSets = ResourceRecordSets{}
type ResourceRecordSets struct {
@ -44,38 +42,14 @@ func (rrsets ResourceRecordSets) List() ([]dnsprovider.ResourceRecordSet, error)
return list, nil
}
func (rrsets ResourceRecordSets) Add(rrset dnsprovider.ResourceRecordSet) (dnsprovider.ResourceRecordSet, error) {
service := rrsets.zone.zones.interface_.service.Changes()
additions := []interfaces.ResourceRecordSet{rrset.(*ResourceRecordSet).impl}
change := service.NewChange(additions, []interfaces.ResourceRecordSet{})
newChange, err := service.Create(rrsets.project(), rrsets.zone.impl.Name(), change).Do()
if err != nil {
return nil, err
func (r ResourceRecordSets) StartChangeset() dnsprovider.ResourceRecordChangeset {
return &ResourceRecordChangeset{
rrsets: &r,
}
newAdditions := newChange.Additions()
if len(newAdditions) != len(additions) {
return nil, fmt.Errorf("Internal error when adding resource record set. Call succeeded but number of records returned is incorrect. Records sent=%d, records returned=%d, record set:%v", len(additions), len(newAdditions), rrset)
}
return ResourceRecordSet{newChange.Additions()[0], &rrsets}, nil
}
func (rrsets ResourceRecordSets) Remove(rrset dnsprovider.ResourceRecordSet) error {
service := rrsets.zone.zones.interface_.service.Changes()
deletions := []interfaces.ResourceRecordSet{rrset.(ResourceRecordSet).impl}
change := service.NewChange([]interfaces.ResourceRecordSet{}, deletions)
newChange, err := service.Create(rrsets.project(), rrsets.zone.impl.Name(), change).Do()
if err != nil {
return err
}
newDeletions := newChange.Deletions()
if len(newDeletions) != len(deletions) {
return fmt.Errorf("Internal error when deleting resource record set. Call succeeded but number of records returned is incorrect. Records sent=%d, records returned=%d, record set:%v", len(deletions), len(newDeletions), rrset)
}
return nil
}
func (rrsets ResourceRecordSets) New(name string, rrdatas []string, ttl int64, rrstype rrstype.RrsType) dnsprovider.ResourceRecordSet {
return &ResourceRecordSet{rrsets.impl.NewResourceRecordSet(name, rrdatas, ttl, rrstype), &rrsets}
func (r ResourceRecordSets) New(name string, rrdatas []string, ttl int64, rrstype rrstype.RrsType) dnsprovider.ResourceRecordSet {
return ResourceRecordSet{r.impl.NewResourceRecordSet(name, rrdatas, ttl, rrstype), &r}
}
func (rrsets ResourceRecordSets) project() string {

View File

@ -0,0 +1,184 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tests
import (
"reflect"
"testing"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
)
/* CommonTestResourceRecordSetsReplace verifies that replacing an RRS works */
func CommonTestResourceRecordSetsReplace(t *testing.T, zone dnsprovider.Zone) {
rrsets, _ := zone.ResourceRecordSets()
sets := rrs(t, zone)
rrset := rrsets.New("alpha.test.com", []string{"8.8.4.4"}, 40, rrstype.A)
addRrsetOrFail(t, sets, rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Logf("Successfully added resource record set: %v", rrset)
// Replace the record (change ttl and rrdatas)
newRrset := rrsets.New("alpha.test.com", []string{"8.8.8.8"}, 80, rrstype.A)
err := sets.StartChangeset().Add(newRrset).Remove(rrset).Apply()
if err != nil {
t.Errorf("Failed to replace resource record set %v -> %v: %v", rrset, newRrset, err)
} else {
t.Logf("Correctly replaced resource record %v -> %v", rrset, newRrset)
}
defer sets.StartChangeset().Remove(newRrset).Apply()
// Check that the record was updated
assertHasRecord(t, sets, newRrset)
}
/* CommonTestResourceRecordSetsReplaceAll verifies that we can remove an RRS and create one with a different name*/
func CommonTestResourceRecordSetsReplaceAll(t *testing.T, zone dnsprovider.Zone) {
rrsets, _ := zone.ResourceRecordSets()
sets := rrs(t, zone)
rrset := rrsets.New("alpha.test.com", []string{"8.8.4.4"}, 40, rrstype.A)
addRrsetOrFail(t, sets, rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Logf("Successfully added resource record set: %v", rrset)
newRrset := rrsets.New("beta.test.com", []string{"8.8.8.8"}, 80, rrstype.A)
// Try to add it again, and verify that the call fails.
err := sets.StartChangeset().Add(newRrset).Remove(rrset).Apply()
if err != nil {
t.Errorf("Failed to replace resource record set %v -> %v: %v", rrset, newRrset, err)
} else {
defer sets.StartChangeset().Remove(newRrset).Apply()
t.Logf("Correctly replaced resource record %v -> %v", rrset, newRrset)
}
// Check that it was updated
assertHasRecord(t, sets, newRrset)
assertNotHasRecord(t, sets, rrset.Name(), rrset.Type())
}
/* CommonTestResourceRecordSetsHonorsType verifies that we can add records of the same name but different types */
func CommonTestResourceRecordSetsDifferentTypes(t *testing.T, zone dnsprovider.Zone) {
rrsets, _ := zone.ResourceRecordSets()
sets := rrs(t, zone)
rrset := rrsets.New("alpha.test.com", []string{"8.8.4.4"}, 40, rrstype.A)
addRrsetOrFail(t, sets, rrset)
defer sets.StartChangeset().Remove(rrset).Apply()
t.Logf("Successfully added resource record set: %v", rrset)
cnameRrset := rrsets.New("alpha.test.com", []string{"cname.test.com"}, 80, rrstype.CNAME)
// Add the resource with the same name but different type
err := sets.StartChangeset().Add(cnameRrset).Apply()
if err != nil {
t.Errorf("Failed to add resource record set %v: %v", cnameRrset, err)
}
defer sets.StartChangeset().Remove(cnameRrset).Apply()
// Check that both records exist
assertHasRecord(t, sets, cnameRrset)
assertHasRecord(t, sets, rrset)
}
/* rrs returns the ResourceRecordSets interface for a given zone */
func rrs(t *testing.T, zone dnsprovider.Zone) (r dnsprovider.ResourceRecordSets) {
rrsets, supported := zone.ResourceRecordSets()
if !supported {
t.Fatalf("ResourceRecordSets interface not supported by zone %v", zone)
return r
}
return rrsets
}
func listRrsOrFail(t *testing.T, rrsets dnsprovider.ResourceRecordSets) []dnsprovider.ResourceRecordSet {
rrset, err := rrsets.List()
if err != nil {
t.Fatalf("Failed to list recordsets: %v", err)
} else {
if len(rrset) < 0 {
t.Fatalf("Record set length=%d, expected >=0", len(rrset))
} else {
t.Logf("Got %d recordsets: %v", len(rrset), rrset)
}
}
return rrset
}
// assertHasRecord tests that rrsets has a record equivalent to rrset
func assertHasRecord(t *testing.T, rrsets dnsprovider.ResourceRecordSets, rrset dnsprovider.ResourceRecordSet) {
var found dnsprovider.ResourceRecordSet
for _, r := range listRrsOrFail(t, rrsets) {
if r.Name() != rrset.Name() || r.Type() != rrset.Type() {
continue
}
if found != nil {
t.Errorf("found duplicate resource record set: %q and %q", r, found)
}
found = r
}
if found == nil {
t.Errorf("resource record set %v not found", rrset)
} else {
assertEquivalent(t, found, rrset)
}
}
// assertNotHasRecord tests that rrsets does not have a record matching name and type
func assertNotHasRecord(t *testing.T, rrsets dnsprovider.ResourceRecordSets, name string, rrstype rrstype.RrsType) {
var found dnsprovider.ResourceRecordSet
for _, r := range listRrsOrFail(t, rrsets) {
if r.Name() != name || r.Type() != rrstype {
continue
}
if found != nil {
t.Errorf("found duplicate resource record set: %q and %q", r, found)
}
found = r
}
if found != nil {
t.Errorf("resource record set found unexpectedly: %v", found)
}
}
// assertEquivalent tests that l is equal to r, for the methods in ResourceRecordSet
func assertEquivalent(t *testing.T, l, r dnsprovider.ResourceRecordSet) {
if l.Name() != r.Name() {
t.Errorf("resource record sets not equal %v vs %v", l, r)
}
if l.Type() != r.Type() {
t.Errorf("resource record sets not equal %v vs %v", l, r)
}
if l.Ttl() != r.Ttl() {
t.Errorf("resource record sets not equal %v vs %v", l, r)
}
if !reflect.DeepEqual(l.Rrdatas(), r.Rrdatas()) {
t.Errorf("resource record sets not equal %v vs %v", l, r)
}
}
func addRrsetOrFail(t *testing.T, rrsets dnsprovider.ResourceRecordSets, rrset dnsprovider.ResourceRecordSet) {
err := rrsets.StartChangeset().Add(rrset).Apply()
if err != nil {
t.Fatalf("Failed to add recordsets: %v", err)
}
}

View File

@ -173,7 +173,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
glog.V(4).Infof("Creating CNAME to %q for %q", uplevelCname, dnsName)
newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
glog.V(4).Infof("Adding recordset %v", newRrset)
rrset, err = rrsets.Add(newRrset)
err = rrsets.StartChangeset().Add(newRrset).Apply()
if err != nil {
return err
}
@ -192,7 +192,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
}
newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
glog.V(4).Infof("Adding recordset %v", newRrset)
rrset, err = rrsets.Add(newRrset)
err = rrsets.StartChangeset().Add(newRrset).Apply()
if err != nil {
return err
}
@ -211,18 +211,20 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
return nil
} else {
// Need to replace the existing one with a better one (or just remove it if we have no healthy endpoints).
// TODO: Ideally do these inside a transaction, or do an atomic update, but dnsprovider interface doesn't support that yet.
glog.V(4).Infof("Existing recordset %v not equivalent to needed recordset %v removing existing and adding needed.", rrset, newRrset)
if err = rrsets.Remove(rrset); err != nil {
return err
}
glog.V(4).Infof("Successfully removed existing recordset %v", rrset)
changeSet := rrsets.StartChangeset()
changeSet.Remove(rrset)
if uplevelCname != "" {
if _, err = rrsets.Add(newRrset); err != nil {
changeSet.Add(newRrset)
if err := changeSet.Apply(); err != nil {
return err
}
glog.V(4).Infof("Successfully added needed recordset %v", newRrset)
glog.V(4).Infof("Successfully replaced needed recordset %v -> %v", rrset, newRrset)
} else {
if err := changeSet.Apply(); err != nil {
return err
}
glog.V(4).Infof("Successfully removed existing recordset %v", rrset)
glog.V(4).Infof("Uplevel CNAME is empty string. Not adding recordset %v", newRrset)
}
}
@ -243,15 +245,11 @@ func (s *ServiceController) ensureDnsRrsets(dnsZoneName, dnsName string, endpoin
return nil
} else {
// Need to replace the existing one with a better one
// TODO: Ideally do these inside a transaction, or do an atomic update, but dnsprovider interface doesn't support that yet.
glog.V(4).Infof("Existing recordset %v is not equivalent to needed recordset %v, removing existing and adding needed.", rrset, newRrset)
if err = rrsets.Remove(rrset); err != nil {
return err
}
glog.V(4).Infof("Successfully removed existing recordset %v", rrset)
if _, err = rrsets.Add(newRrset); err != nil {
if err = rrsets.StartChangeset().Remove(rrset).Add(newRrset).Apply(); err != nil {
return err
}
glog.V(4).Infof("Successfully replaced recordset %v -> %v", rrset, newRrset)
}
}
}

View File

@ -105,10 +105,11 @@ func (cc *clusterClientCache) processEndpointDeletion(cachedService *cachedServi
glog.V(4).Infof("Cached endpoint was found for %s/%s, cluster %s, removing", cachedService.lastState.Namespace, cachedService.lastState.Name, clusterName)
delete(cachedService.endpointMap, clusterName)
for i := 0; i < clientRetryCount; i++ {
if err := serviceController.ensureDnsRecords(clusterName, cachedService); err == nil {
err := serviceController.ensureDnsRecords(clusterName, cachedService)
if err == nil {
return nil
}
glog.V(4).Infof("Error ensuring DNS Records: %v", err)
glog.Infof("Error ensuring DNS Records: %v", err)
time.Sleep(cachedService.nextDNSUpdateDelay())
}
}