diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 1a1094ad40d..56f11a47b97 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -41,8 +41,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" "github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" + "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/pflag" diff --git a/examples/persistent-volumes/simpletest/pod.yaml b/examples/persistent-volumes/simpletest/pod.yaml index f9bab5316ab..f7f686c0404 100644 --- a/examples/persistent-volumes/simpletest/pod.yaml +++ b/examples/persistent-volumes/simpletest/pod.yaml @@ -17,4 +17,4 @@ spec: volumes: - name: mypd persistentVolumeClaim: - claimName: myclaim-1 \ No newline at end of file + claimName: myclaim-1 diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index dfe8d6c4b6e..df82dc97a2f 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -461,6 +461,10 @@ func ValidatePersistentVolume(pv *api.PersistentVolume) errs.ValidationErrorList numVolumes++ allErrs = append(allErrs, validateAWSElasticBlockStoreVolumeSource(pv.Spec.AWSElasticBlockStore).Prefix("awsElasticBlockStore")...) } + if pv.Spec.Glusterfs != nil { + numVolumes++ + allErrs = append(allErrs, validateGlusterfs(pv.Spec.Glusterfs).Prefix("glusterfs")...) + } if numVolumes != 1 { allErrs = append(allErrs, errs.NewFieldInvalid("", pv.Spec.PersistentVolumeSource, "exactly 1 volume type is required")) } diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index da62b57cda8..90576f74913 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -39,6 +39,7 @@ type PersistentVolumeClaimBinder struct { volumeController *framework.Controller claimController *framework.Controller client binderClient + stopChannels map[string]chan struct{} } // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder @@ -234,8 +235,27 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli func (controller *PersistentVolumeClaimBinder) Run() { glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") - go controller.claimController.Run(make(chan struct{})) - go controller.volumeController.Run(make(chan struct{})) + if controller.stopChannels == nil { + controller.stopChannels = make(map[string]chan struct{}) + } + + if _, exists := controller.stopChannels["volumes"]; !exists { + controller.stopChannels["volumes"] = make(chan struct{}) + go controller.volumeController.Run(controller.stopChannels["volumes"]) + } + + if _, exists := controller.stopChannels["claims"]; !exists { + controller.stopChannels["claims"] = make(chan struct{}) + go controller.claimController.Run(controller.stopChannels["claims"]) + } +} + +func (controller *PersistentVolumeClaimBinder) Stop() { + glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n") + for name, stopChan := range controller.stopChannels { + close(stopChan) + delete(controller.stopChannels, name) + } } // binderClient abstracts access to PVs and PVCs diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go index c6bc047afaa..d1999a4a5dd 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" @@ -27,6 +28,28 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" ) +func TestRunStop(t *testing.T) { + o := testclient.NewObjects(api.Scheme) + client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)} + binder := NewPersistentVolumeClaimBinder(client, 1*time.Second) + + if len(binder.stopChannels) != 0 { + t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels)) + } + + binder.Run() + + if len(binder.stopChannels) != 2 { + t.Errorf("Running binder should have exactly 2 stopChannels. Got %v", len(binder.stopChannels)) + } + + binder.Stop() + + if len(binder.stopChannels) != 0 { + t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels)) + } +} + func TestExampleObjects(t *testing.T) { scenarios := map[string]struct { diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go new file mode 100644 index 00000000000..87be82de4e8 --- /dev/null +++ b/test/integration/persistent_volumes_test.go @@ -0,0 +1,190 @@ +// +build integration,!no-etcd + +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" +) + +func init() { + requireEtcd() +} + +func TestPersistentVolumeClaimBinder(t *testing.T) { + _, s := runAMaster(t) + defer s.Close() + + deleteAllEtcdKeys() + client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()}) + + binder := volumeclaimbinder.NewPersistentVolumeClaimBinder(client, 1*time.Second) + binder.Run() + defer binder.Stop() + + for _, volume := range createTestVolumes() { + _, err := client.PersistentVolumes().Create(volume) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + volumes, err := client.PersistentVolumes().List(labels.Everything(), fields.Everything()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(volumes.Items) != 2 { + t.Errorf("expected 2 PVs, got %#v", len(volumes.Items)) + } + + for _, claim := range createTestClaims() { + _, err := client.PersistentVolumeClaims(api.NamespaceDefault).Create(claim) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + claims, err := client.PersistentVolumeClaims(api.NamespaceDefault).List(labels.Everything(), fields.Everything()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(claims.Items) != 3 { + t.Errorf("expected 3 PVCs, got %#v", len(claims.Items)) + } + + // make sure the binder has caught up + time.Sleep(2 * time.Second) + + for _, claim := range createTestClaims() { + claim, err := client.PersistentVolumeClaims(api.NamespaceDefault).Get(claim.Name) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if (claim.Name == "claim01" || claim.Name == "claim02") && claim.Status.VolumeRef == nil { + t.Errorf("Expected claim to be bound: %v", claim) + } + if claim.Name == "claim03" && claim.Status.VolumeRef != nil { + t.Errorf("Expected claim03 to be unbound: %v", claim) + } + } +} + +func createTestClaims() []*api.PersistentVolumeClaim { + return []*api.PersistentVolumeClaim{ + { + ObjectMeta: api.ObjectMeta{ + Name: "claim03", + Namespace: api.NamespaceDefault, + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("500G"), + }, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "claim01", + Namespace: api.NamespaceDefault, + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("8G"), + }, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "claim02", + Namespace: api.NamespaceDefault, + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce, api.ReadWriteMany}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5G"), + }, + }, + }, + }, + } +} + +func createTestVolumes() []*api.PersistentVolume { + return []*api.PersistentVolume{ + { + ObjectMeta: api.ObjectMeta{ + UID: "gce-pd-10", + Name: "gce003", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("10G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ + PDName: "gce123123123", + FSType: "foo", + }, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "nfs-5", + Name: "nfs002", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + Glusterfs: &api.GlusterfsVolumeSource{ + EndpointsName: "andintheend", + Path: "theloveyoutakeisequaltotheloveyoumake", + }, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + api.ReadWriteMany, + }, + }, + }, + } +}