mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-17 15:13:08 +00:00
Support recovery from in the middle of a rename.
This commit is contained in:
@@ -18,23 +18,15 @@ package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
|
||||
cmdutil "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/cmd/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/resource"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
@@ -131,16 +123,26 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
||||
return err
|
||||
}
|
||||
|
||||
updaterClient := kubectl.NewRollingUpdaterClient(client)
|
||||
|
||||
var newRc *api.ReplicationController
|
||||
// fetch rc
|
||||
oldRc, err := client.ReplicationControllers(cmdNamespace).Get(oldName)
|
||||
if err != nil {
|
||||
return err
|
||||
if !errors.IsNotFound(err) || len(image) == 0 || len(args) > 1 {
|
||||
return err
|
||||
}
|
||||
// We're in the middle of a rename, look for an RC with a source annotation of oldName
|
||||
newRc, err := kubectl.FindSourceController(updaterClient, cmdNamespace, oldName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return kubectl.Rename(kubectl.NewRollingUpdaterClient(client), newRc, oldName)
|
||||
}
|
||||
|
||||
keepOldName := false
|
||||
var keepOldName bool
|
||||
|
||||
mapper, typer := f.Object()
|
||||
var newRc *api.ReplicationController
|
||||
|
||||
if len(filename) != 0 {
|
||||
obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()).
|
||||
@@ -161,73 +163,36 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
||||
// than the old rc. This selector is the hash of the rc, which will differ because the new rc has a
|
||||
// different image.
|
||||
if len(image) != 0 {
|
||||
var newName string
|
||||
var err error
|
||||
|
||||
if len(args) >= 2 {
|
||||
newName = args[1]
|
||||
keepOldName = len(args) == 1
|
||||
newName := findNewName(args, oldRc)
|
||||
if newRc, err = kubectl.LoadExistingNextReplicationController(client, cmdNamespace, newName); err != nil {
|
||||
return err
|
||||
}
|
||||
if newRc != nil {
|
||||
fmt.Fprintf(out, "Found existing update in progress (%s), resuming.\n", newRc.Name)
|
||||
} else {
|
||||
newName, _ = kubectl.GetNextControllerAnnotation(oldRc)
|
||||
}
|
||||
|
||||
if len(newName) > 0 {
|
||||
newRc, err = client.ReplicationControllers(cmdNamespace).Get(newName)
|
||||
if err != nil {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
return err
|
||||
} else {
|
||||
newRc = nil
|
||||
}
|
||||
} else {
|
||||
fmt.Fprint(out, "Found existing update in progress (%s), resuming.\n", newName)
|
||||
}
|
||||
}
|
||||
if newRc == nil {
|
||||
// load the old RC into the "new" RC
|
||||
if newRc, err = client.ReplicationControllers(cmdNamespace).Get(oldName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(newRc.Spec.Template.Spec.Containers) > 1 {
|
||||
// TODO: support multi-container image update.
|
||||
return errors.New("Image update is not supported for multi-container pods")
|
||||
}
|
||||
if len(newRc.Spec.Template.Spec.Containers) == 0 {
|
||||
return cmdutil.UsageError(cmd, "Pod has no containers! (%v)", newRc)
|
||||
}
|
||||
newRc.Spec.Template.Spec.Containers[0].Image = image
|
||||
|
||||
newHash, err := hashObject(newRc, client.Codec)
|
||||
newRc, err = kubectl.CreateNewControllerFromCurrentController(client, cmdNamespace, oldName, newName, image, deploymentKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(newName) == 0 {
|
||||
keepOldName = true
|
||||
newName = fmt.Sprintf("%s-%s", newRc.Name, newHash)
|
||||
}
|
||||
newRc.Name = newName
|
||||
|
||||
newRc.Spec.Selector[deploymentKey] = newHash
|
||||
newRc.Spec.Template.Labels[deploymentKey] = newHash
|
||||
// Clear resource version after hashing so that identical updates get different hashes.
|
||||
newRc.ResourceVersion = ""
|
||||
|
||||
kubectl.SetNextControllerAnnotation(oldRc, newName)
|
||||
if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
|
||||
if oldRc, err = addDeploymentKeyToReplicationController(oldRc, client, deploymentKey, cmdNamespace, out); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
// Update the existing replication controller with pointers to the 'next' controller
|
||||
// and adding the <deploymentKey> label if necessary to distinguish it from the 'next' controller.
|
||||
oldHash, err := api.HashObject(oldRc, client.Codec)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
oldRc, err = kubectl.UpdateExistingReplicationController(client, oldRc, cmdNamespace, newRc.Name, deploymentKey, oldHash, out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
newName := newRc.Name
|
||||
if oldName == newName {
|
||||
if oldName == newRc.Name {
|
||||
return cmdutil.UsageError(cmd, "%s cannot have the same name as the existing ReplicationController %s",
|
||||
filename, oldName)
|
||||
}
|
||||
|
||||
updater := kubectl.NewRollingUpdater(newRc.Namespace, kubectl.NewRollingUpdaterClient(client))
|
||||
updater := kubectl.NewRollingUpdater(newRc.Namespace, updaterClient)
|
||||
|
||||
// To successfully pull off a rolling update the new and old rc have to differ
|
||||
// by at least one selector. Every new pod should have the selector and every
|
||||
@@ -284,124 +249,18 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
|
||||
if keepOldName {
|
||||
fmt.Fprintf(out, "%s\n", oldName)
|
||||
} else {
|
||||
fmt.Fprintf(out, "%s\n", newName)
|
||||
fmt.Fprintf(out, "%s\n", newRc.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func hashObject(obj runtime.Object, codec runtime.Codec) (string, error) {
|
||||
data, err := codec.Encode(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
func findNewName(args []string, oldRc *api.ReplicationController) string {
|
||||
if len(args) >= 2 {
|
||||
return args[1]
|
||||
}
|
||||
return fmt.Sprintf("%x", md5.Sum(data)), nil
|
||||
}
|
||||
|
||||
const MaxRetries = 3
|
||||
|
||||
type updateFunc func(controller *api.ReplicationController)
|
||||
|
||||
// updateWithRetries updates applies the given rc as an update.
|
||||
func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
|
||||
// Each update could take ~100ms, so give it 0.5 second
|
||||
var err error
|
||||
oldRc := rc
|
||||
err = wait.Poll(10*time.Millisecond, 500*time.Millisecond, func() (bool, error) {
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(rc)
|
||||
if rc, err = rcClient.Update(rc); err == nil {
|
||||
// rc contains the latest controller post update
|
||||
return true, nil
|
||||
}
|
||||
// Update the controller with the latest resource version, if the update failed we
|
||||
// can't trust rc so use oldRc.Name.
|
||||
if rc, err = rcClient.Get(oldRc.Name); err != nil {
|
||||
// The Get failed: Value in rc cannot be trusted.
|
||||
rc = oldRc
|
||||
}
|
||||
// The Get passed: rc contains the latest controller, expect a poll for the update.
|
||||
return false, nil
|
||||
})
|
||||
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
|
||||
// controller contains the applied update.
|
||||
return rc, err
|
||||
}
|
||||
|
||||
func addDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client *client.Client, deploymentKey, namespace string, out io.Writer) (*api.ReplicationController, error) {
|
||||
oldHash, err := hashObject(oldRc, client.Codec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// First, update the template label. This ensures that any newly created pods will have the new label
|
||||
if oldRc.Spec.Template.Labels == nil {
|
||||
oldRc.Spec.Template.Labels = map[string]string{}
|
||||
}
|
||||
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
|
||||
rc.Spec.Template.Labels[deploymentKey] = oldHash
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Update all pods managed by the rc to have the new hash label, so they are correctly adopted
|
||||
// TODO: extract the code from the label command and re-use it here.
|
||||
podList, err := client.Pods(namespace).List(labels.SelectorFromSet(oldRc.Spec.Selector), fields.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for ix := range podList.Items {
|
||||
pod := &podList.Items[ix]
|
||||
if pod.Labels == nil {
|
||||
pod.Labels = map[string]string{
|
||||
deploymentKey: oldHash,
|
||||
}
|
||||
} else {
|
||||
pod.Labels[deploymentKey] = oldHash
|
||||
}
|
||||
err = nil
|
||||
delay := 3
|
||||
for i := 0; i < MaxRetries; i++ {
|
||||
_, err = client.Pods(namespace).Update(pod)
|
||||
if err != nil {
|
||||
fmt.Fprint(out, "Error updating pod (%v), retrying after %d seconds", err, delay)
|
||||
time.Sleep(time.Second * time.Duration(delay))
|
||||
delay *= delay
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if oldRc.Spec.Selector == nil {
|
||||
oldRc.Spec.Selector = map[string]string{}
|
||||
}
|
||||
// Copy the old selector, so that we can scrub out any orphaned pods
|
||||
selectorCopy := map[string]string{}
|
||||
for k, v := range oldRc.Spec.Selector {
|
||||
selectorCopy[k] = v
|
||||
}
|
||||
|
||||
// Update the selector of the rc so it manages all the pods we updated above
|
||||
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
|
||||
rc.Spec.Selector[deploymentKey] = oldHash
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Clean up any orphaned pods that don't have the new label, this can happen if the rc manager
|
||||
// doesn't see the update to its pod template and creates a new pod with the old labels after
|
||||
// we've finished re-adopting existing pods to the rc.
|
||||
podList, err = client.Pods(namespace).List(labels.SelectorFromSet(selectorCopy), fields.Everything())
|
||||
for ix := range podList.Items {
|
||||
pod := &podList.Items[ix]
|
||||
if value, found := pod.Labels[deploymentKey]; !found || value != oldHash {
|
||||
if err := client.Pods(namespace).Delete(pod.Name, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return oldRc, nil
|
||||
if oldRc != nil {
|
||||
newName, _ := kubectl.GetNextControllerAnnotation(oldRc)
|
||||
return newName
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
@@ -18,16 +18,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
func TestValidateArgs(t *testing.T) {
|
||||
@@ -98,192 +89,3 @@ func TestValidateArgs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddDeploymentHash(t *testing.T) {
|
||||
buf := &bytes.Buffer{}
|
||||
f, tf, codec := NewAPIFactory()
|
||||
rc := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{Name: "rc"},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Selector: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
podList := &api.PodList{
|
||||
Items: []api.Pod{
|
||||
{ObjectMeta: api.ObjectMeta{Name: "foo"}},
|
||||
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
|
||||
{ObjectMeta: api.ObjectMeta{Name: "baz"}},
|
||||
},
|
||||
}
|
||||
|
||||
seen := util.StringSet{}
|
||||
updatedRc := false
|
||||
tf.Client = &client.FakeRESTClient{
|
||||
Codec: codec,
|
||||
Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
switch p, m := req.URL.Path, req.Method; {
|
||||
case p == "/api/v1beta3/namespaces/default/pods" && m == "GET":
|
||||
if req.URL.RawQuery != "labelSelector=foo%3Dbar" {
|
||||
t.Errorf("Unexpected query string: %s", req.URL.RawQuery)
|
||||
}
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, podList)}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/pods/foo" && m == "PUT":
|
||||
seen.Insert("foo")
|
||||
obj := readOrDie(t, req, codec)
|
||||
podList.Items[0] = *(obj.(*api.Pod))
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[0])}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/pods/bar" && m == "PUT":
|
||||
seen.Insert("bar")
|
||||
obj := readOrDie(t, req, codec)
|
||||
podList.Items[1] = *(obj.(*api.Pod))
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[1])}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/pods/baz" && m == "PUT":
|
||||
seen.Insert("baz")
|
||||
obj := readOrDie(t, req, codec)
|
||||
podList.Items[2] = *(obj.(*api.Pod))
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, &podList.Items[2])}, nil
|
||||
case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT":
|
||||
updatedRc = true
|
||||
return &http.Response{StatusCode: 200, Body: objBody(codec, rc)}, nil
|
||||
default:
|
||||
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
}
|
||||
tf.ClientConfig = &client.Config{Version: latest.Version}
|
||||
tf.Namespace = "test"
|
||||
|
||||
client, err := f.Client()
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
t.Fail()
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := addDeploymentKeyToReplicationController(rc, client, "hash", api.NamespaceDefault, buf); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
for _, pod := range podList.Items {
|
||||
if !seen.Has(pod.Name) {
|
||||
t.Errorf("Missing update for pod: %s", pod.Name)
|
||||
}
|
||||
}
|
||||
if !updatedRc {
|
||||
t.Errorf("Failed to update replication controller with new labels")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateWithRetries(t *testing.T) {
|
||||
f, tf, codec := NewAPIFactory()
|
||||
rc := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{Name: "rc",
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Selector: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
RestartPolicy: api.RestartPolicyAlways,
|
||||
DNSPolicy: api.DNSClusterFirst,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Test end to end updating of the rc with retries. Essentially make sure the update handler
|
||||
// sees the right updates, failures in update/get are handled properly, and that the updated
|
||||
// rc with new resource version is returned to the caller. Without any of these rollingupdate
|
||||
// will fail cryptically.
|
||||
newRc := *rc
|
||||
newRc.ResourceVersion = "2"
|
||||
newRc.Spec.Selector["baz"] = "foobar"
|
||||
updates := []*http.Response{
|
||||
{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 200, Body: objBody(codec, &newRc)},
|
||||
}
|
||||
gets := []*http.Response{
|
||||
{StatusCode: 500, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 200, Body: objBody(codec, rc)},
|
||||
}
|
||||
tf.Client = &client.FakeRESTClient{
|
||||
Codec: codec,
|
||||
Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
|
||||
switch p, m := req.URL.Path, req.Method; {
|
||||
case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "PUT":
|
||||
update := updates[0]
|
||||
updates = updates[1:]
|
||||
// We should always get an update with a valid rc even when the get fails. The rc should always
|
||||
// contain the update.
|
||||
if c, ok := readOrDie(t, req, codec).(*api.ReplicationController); !ok || !reflect.DeepEqual(rc, c) {
|
||||
t.Errorf("Unexpected update body, got %+v expected %+v", c, rc)
|
||||
} else if sel, ok := c.Spec.Selector["baz"]; !ok || sel != "foobar" {
|
||||
t.Errorf("Expected selector label update, got %+v", c.Spec.Selector)
|
||||
} else {
|
||||
delete(c.Spec.Selector, "baz")
|
||||
}
|
||||
return update, nil
|
||||
case p == "/api/v1beta3/namespaces/default/replicationcontrollers/rc" && m == "GET":
|
||||
get := gets[0]
|
||||
gets = gets[1:]
|
||||
return get, nil
|
||||
default:
|
||||
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
}
|
||||
tf.ClientConfig = &client.Config{Version: latest.Version}
|
||||
client, err := f.Client()
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
t.Fail()
|
||||
return
|
||||
}
|
||||
|
||||
if rc, err := updateWithRetries(
|
||||
client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) {
|
||||
c.Spec.Selector["baz"] = "foobar"
|
||||
}); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
} else if sel, ok := rc.Spec.Selector["baz"]; !ok || sel != "foobar" || rc.ResourceVersion != "2" {
|
||||
t.Errorf("Expected updated rc, got %+v", rc)
|
||||
}
|
||||
if len(updates) != 0 || len(gets) != 0 {
|
||||
t.Errorf("Remaining updates %+v gets %+v", updates, gets)
|
||||
}
|
||||
}
|
||||
|
||||
func readOrDie(t *testing.T, req *http.Request, codec runtime.Codec) runtime.Object {
|
||||
data, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
t.Errorf("Error reading: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
obj, err := codec.Decode(data)
|
||||
if err != nil {
|
||||
t.Errorf("error decoding: %v", err)
|
||||
t.FailNow()
|
||||
}
|
||||
return obj
|
||||
}
|
||||
|
Reference in New Issue
Block a user