diff --git a/pkg/volume/util/recyclerclient/recycler_client.go b/pkg/volume/util/recyclerclient/recycler_client.go new file mode 100644 index 00000000000..138a35c53e1 --- /dev/null +++ b/pkg/volume/util/recyclerclient/recycler_client.go @@ -0,0 +1,252 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recyclerclient + +import ( + "fmt" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" + clientset "k8s.io/client-go/kubernetes" +) + +type RecycleEventRecorder func(eventtype, message string) + +// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume +// Recyclers. This function will save the given Pod to the API and watch it +// until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, +// whichever comes first. An attempt to delete a recycler pod is always +// attempted before returning. +// +// In case there is a pod with the same namespace+name already running, this +// function deletes it as it is not able to judge if it is an old recycler +// or user has forged a fake recycler to block Kubernetes from recycling. +// +// pod - the pod designed by a volume plugin to recycle the volume. pod.Name +// will be overwritten with unique name based on PV.Name. +// client - kube client for API operations. 
+func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, kubeClient clientset.Interface, recorder RecycleEventRecorder) error { + return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient, recorder)) +} + +// same as above func comments, except 'recyclerClient' is a narrower pod API +// interface to ease testing +func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, recyclerClient recyclerClient) error { + glog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name) + + // Generate unique name for the recycler pod - we need to get "already + // exists" error when a previous controller has already started recycling + // the volume. Here we assume that pv.Name is already unique. + pod.Name = "recycler-for-" + pvName + pod.GenerateName = "" + + stopChannel := make(chan struct{}) + defer close(stopChannel) + podCh, err := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel) + if err != nil { + glog.V(4).Infof("cannot start watcher for pod %s/%s: %v", pod.Namespace, pod.Name, err) + return err + } + + // Start the pod + _, err = recyclerClient.CreatePod(pod) + if err != nil { + if errors.IsAlreadyExists(err) { + deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace) + if deleteErr != nil { + return fmt.Errorf("failed to delete old recycler pod %s/%s: %s", pod.Namespace, pod.Name, deleteErr) + } + // Recycler will try again and the old pod will be hopefully deleted + // at that time. + return fmt.Errorf("old recycler pod found, will retry later") + } + return fmt.Errorf("unexpected error creating recycler pod: %+v", err) + } + err = waitForPod(pod, recyclerClient, podCh) + + // In all cases delete the recycler pod and log its result. 
+ glog.V(2).Infof("deleting recycler pod %s/%s", pod.Namespace, pod.Name) + deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace) + if deleteErr != nil { + glog.Errorf("failed to delete recycler pod %s/%s: %v", pod.Namespace, pod.Name, deleteErr) + } + + // Returning recycler error is preferred, the pod will be deleted again on + // the next retry. + if err != nil { + return fmt.Errorf("failed to recycle volume: %s", err) + } + + // Recycle succeeded but we failed to delete the recycler pod. Report it, + // the controller will re-try recycling the PV again shortly. + if deleteErr != nil { + return fmt.Errorf("failed to delete recycler pod: %s", deleteErr) + } + + return nil +} + +// waitForPod watches the pod until it finishes and sends all events on the +// pod to the PV. +func waitForPod(pod *v1.Pod, recyclerClient recyclerClient, podCh <-chan watch.Event) error { + for { + event, ok := <-podCh + if !ok { + return fmt.Errorf("recycler pod %q watch channel had been closed", pod.Name) + } + switch event.Object.(type) { + case *v1.Pod: + // POD changed + pod := event.Object.(*v1.Pod) + glog.V(4).Infof("recycler pod update received: %s %s/%s %s", event.Type, pod.Namespace, pod.Name, pod.Status.Phase) + switch event.Type { + case watch.Added, watch.Modified: + if pod.Status.Phase == v1.PodSucceeded { + // Recycle succeeded. 
+ return nil + } + if pod.Status.Phase == v1.PodFailed { + if pod.Status.Message != "" { + return fmt.Errorf("%s", pod.Status.Message) + } else { + return fmt.Errorf("pod failed, pod.Status.Message unknown") + } + } + + case watch.Deleted: + return fmt.Errorf("recycler pod was deleted") + + case watch.Error: + return fmt.Errorf("recycler pod watcher failed") + } + + case *v1.Event: + // Event received + podEvent := event.Object.(*v1.Event) + glog.V(4).Infof("recycler event received: %s %s/%s %s/%s %s", event.Type, podEvent.Namespace, podEvent.Name, podEvent.InvolvedObject.Namespace, podEvent.InvolvedObject.Name, podEvent.Message) + if event.Type == watch.Added { + recyclerClient.Event(podEvent.Type, podEvent.Message) + } + } + } +} + +// recyclerClient abstracts access to a Pod by providing a narrower interface. +// This makes it easier to mock a client for testing. +type recyclerClient interface { + CreatePod(pod *v1.Pod) (*v1.Pod, error) + GetPod(name, namespace string) (*v1.Pod, error) + DeletePod(name, namespace string) error + // WatchPod returns a ListWatch for watching a pod. The stopChannel is used + // to close the reflector backing the watch. The caller is responsible for + // deferring a close on the channel to stop the reflector. + WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) + // Event sends an event to the volume that is being recycled. 
+ Event(eventtype, message string) +} + +func newRecyclerClient(client clientset.Interface, recorder RecycleEventRecorder) recyclerClient { + return &realRecyclerClient{ + client, + recorder, + } +} + +type realRecyclerClient struct { + client clientset.Interface + recorder RecycleEventRecorder +} + +func (c *realRecyclerClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) { + return c.client.CoreV1().Pods(pod.Namespace).Create(pod) +} + +func (c *realRecyclerClient) GetPod(name, namespace string) (*v1.Pod, error) { + return c.client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) +} + +func (c *realRecyclerClient) DeletePod(name, namespace string) error { + return c.client.CoreV1().Pods(namespace).Delete(name, nil) +} + +func (c *realRecyclerClient) Event(eventtype, message string) { + c.recorder(eventtype, message) +} + +func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) { + podSelector, err := fields.ParseSelector("metadata.name=" + name) + if err != nil { + return nil, err + } + options := metav1.ListOptions{ + FieldSelector: podSelector.String(), + Watch: true, + } + + podWatch, err := c.client.CoreV1().Pods(namespace).Watch(options) + if err != nil { + return nil, err + } + + eventSelector, _ := fields.ParseSelector("involvedObject.name=" + name) + eventWatch, err := c.client.CoreV1().Events(namespace).Watch(metav1.ListOptions{ + FieldSelector: eventSelector.String(), + Watch: true, + }) + if err != nil { + podWatch.Stop() + return nil, err + } + + eventCh := make(chan watch.Event, 30) + + go func() { + defer eventWatch.Stop() + defer podWatch.Stop() + defer close(eventCh) + var podWatchChannelClosed bool + var eventWatchChannelClosed bool + for { + select { + case _ = <-stopChannel: + return + + case podEvent, ok := <-podWatch.ResultChan(): + if !ok { + podWatchChannelClosed = true + } else { + eventCh <- podEvent + } + case eventEvent, ok := <-eventWatch.ResultChan(): + if !ok { + 
eventWatchChannelClosed = true + } else { + eventCh <- eventEvent + } + } + if podWatchChannelClosed && eventWatchChannelClosed { + break + } + } + }() + + return eventCh, nil +} diff --git a/pkg/volume/util/recyclerclient/recycler_client_test.go b/pkg/volume/util/recyclerclient/recycler_client_test.go new file mode 100644 index 00000000000..0bde6a35545 --- /dev/null +++ b/pkg/volume/util/recyclerclient/recycler_client_test.go @@ -0,0 +1,235 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recyclerclient + +import ( + "fmt" + "testing" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + api "k8s.io/kubernetes/pkg/apis/core" +) + +type testcase struct { + // Input of the test + name string + existingPod *v1.Pod + createPod *v1.Pod + // eventSequence is list of events that are simulated during recycling. It + // can be either event generated by a recycler pod or a state change of + // the pod. (see newPodEvent and newEvent below). + eventSequence []watch.Event + + // Expected output. + // expectedEvents is list of events that were sent to the volume that was + // recycled. 
+ expectedEvents []mockEvent + expectedError string +} + +func newPodEvent(eventtype watch.EventType, name string, phase v1.PodPhase, message string) watch.Event { + return watch.Event{ + Type: eventtype, + Object: newPod(name, phase, message), + } +} + +func newEvent(eventtype, message string) watch.Event { + return watch.Event{ + Type: watch.Added, + Object: &v1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + }, + Reason: "MockEvent", + Message: message, + Type: eventtype, + }, + } +} + +func newPod(name string, phase v1.PodPhase, message string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: name, + }, + Status: v1.PodStatus{ + Phase: phase, + Message: message, + }, + } +} + +func TestRecyclerPod(t *testing.T) { + tests := []testcase{ + { + // Test recycler success with some events + name: "RecyclerSuccess", + createPod: newPod("podRecyclerSuccess", v1.PodPending, ""), + eventSequence: []watch.Event{ + // Pod gets Running and Succeeded + newPodEvent(watch.Added, "podRecyclerSuccess", v1.PodPending, ""), + newEvent(v1.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerSuccess to 127.0.0.1"), + newEvent(v1.EventTypeNormal, "pulling image \"gcr.io/google_containers/busybox\""), + newEvent(v1.EventTypeNormal, "Successfully pulled image \"gcr.io/google_containers/busybox\""), + newEvent(v1.EventTypeNormal, "Created container with docker id 83d929aeac82"), + newEvent(v1.EventTypeNormal, "Started container with docker id 83d929aeac82"), + newPodEvent(watch.Modified, "podRecyclerSuccess", v1.PodRunning, ""), + newPodEvent(watch.Modified, "podRecyclerSuccess", v1.PodSucceeded, ""), + }, + expectedEvents: []mockEvent{ + {v1.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerSuccess to 127.0.0.1"}, + {v1.EventTypeNormal, "pulling image \"gcr.io/google_containers/busybox\""}, + {v1.EventTypeNormal, "Successfully pulled image 
\"gcr.io/google_containers/busybox\""}, + {v1.EventTypeNormal, "Created container with docker id 83d929aeac82"}, + {v1.EventTypeNormal, "Started container with docker id 83d929aeac82"}, + }, + expectedError: "", + }, + { + // Test recycler failure with some events + name: "RecyclerFailure", + createPod: newPod("podRecyclerFailure", v1.PodPending, ""), + eventSequence: []watch.Event{ + // Pod gets Running and Succeeded + newPodEvent(watch.Added, "podRecyclerFailure", v1.PodPending, ""), + newEvent(v1.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerFailure to 127.0.0.1"), + newEvent(v1.EventTypeWarning, "Unable to mount volumes for pod \"recycler-for-podRecyclerFailure_default(3c9809e5-347c-11e6-a79b-3c970e965218)\": timeout expired waiting for volumes to attach/mount"), + newEvent(v1.EventTypeWarning, "Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod \"default\"/\"recycler-for-podRecyclerFailure\". list of unattached/unmounted"), + newPodEvent(watch.Modified, "podRecyclerFailure", v1.PodRunning, ""), + newPodEvent(watch.Modified, "podRecyclerFailure", v1.PodFailed, "Pod was active on the node longer than specified deadline"), + }, + expectedEvents: []mockEvent{ + {v1.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerFailure to 127.0.0.1"}, + {v1.EventTypeWarning, "Unable to mount volumes for pod \"recycler-for-podRecyclerFailure_default(3c9809e5-347c-11e6-a79b-3c970e965218)\": timeout expired waiting for volumes to attach/mount"}, + {v1.EventTypeWarning, "Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod \"default\"/\"recycler-for-podRecyclerFailure\". 
list of unattached/unmounted"}, + }, + expectedError: "failed to recycle volume: Pod was active on the node longer than specified deadline", + }, + { + // Recycler pod gets deleted + name: "RecyclerDeleted", + createPod: newPod("podRecyclerDeleted", v1.PodPending, ""), + eventSequence: []watch.Event{ + // Pod gets Running and Succeeded + newPodEvent(watch.Added, "podRecyclerDeleted", v1.PodPending, ""), + newEvent(v1.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerDeleted to 127.0.0.1"), + newPodEvent(watch.Deleted, "podRecyclerDeleted", v1.PodPending, ""), + }, + expectedEvents: []mockEvent{ + {v1.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerDeleted to 127.0.0.1"}, + }, + expectedError: "failed to recycle volume: recycler pod was deleted", + }, + { + // Another recycler pod is already running + name: "RecyclerRunning", + existingPod: newPod("podOldRecycler", v1.PodRunning, ""), + createPod: newPod("podNewRecycler", v1.PodFailed, "mock message"), + eventSequence: []watch.Event{}, + expectedError: "old recycler pod found, will retry later", + }, + } + + for _, test := range tests { + t.Logf("Test %q", test.name) + client := &mockRecyclerClient{ + events: test.eventSequence, + pod: test.existingPod, + } + err := internalRecycleVolumeByWatchingPodUntilCompletion(test.createPod.Name, test.createPod, client) + receivedError := "" + if err != nil { + receivedError = err.Error() + } + if receivedError != test.expectedError { + t.Errorf("Test %q failed, expected error %q, got %q", test.name, test.expectedError, receivedError) + continue + } + if !client.deletedCalled { + t.Errorf("Test %q failed, expected deferred client.Delete to be called on recycler pod", test.name) + continue + } + for i, expectedEvent := range test.expectedEvents { + if len(client.receivedEvents) <= i { + t.Errorf("Test %q failed, expected event %d: %q not received", test.name, i, expectedEvent.message) + continue + } + receivedEvent := client.receivedEvents[i] + if 
expectedEvent.eventtype != receivedEvent.eventtype { + t.Errorf("Test %q failed, event %d does not match: expected eventtype %q, got %q", test.name, i, expectedEvent.eventtype, receivedEvent.eventtype) + } + if expectedEvent.message != receivedEvent.message { + t.Errorf("Test %q failed, event %d does not match: expected message %q, got %q", test.name, i, expectedEvent.message, receivedEvent.message) + } + } + for i := len(test.expectedEvents); i < len(client.receivedEvents); i++ { + t.Errorf("Test %q failed, unexpected event received: %s, %q", test.name, client.receivedEvents[i].eventtype, client.receivedEvents[i].message) + } + } +} + +type mockRecyclerClient struct { + pod *v1.Pod + deletedCalled bool + receivedEvents []mockEvent + events []watch.Event +} + +type mockEvent struct { + eventtype, message string +} + +func (c *mockRecyclerClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) { + if c.pod == nil { + c.pod = pod + return c.pod, nil + } + // Simulate "already exists" error + return nil, errors.NewAlreadyExists(api.Resource("pods"), pod.Name) +} + +func (c *mockRecyclerClient) GetPod(name, namespace string) (*v1.Pod, error) { + if c.pod != nil { + return c.pod, nil + } else { + return nil, fmt.Errorf("pod does not exist") + } +} + +func (c *mockRecyclerClient) DeletePod(name, namespace string) error { + c.deletedCalled = true + return nil +} + +func (c *mockRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) { + eventCh := make(chan watch.Event, 0) + go func() { + for _, e := range c.events { + eventCh <- e + } + }() + return eventCh, nil +} + +func (c *mockRecyclerClient) Event(eventtype, message string) { + c.receivedEvents = append(c.receivedEvents, mockEvent{eventtype, message}) +}