mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Implement hash collision avoidance mechanism
Signed-off-by: Michail Kargakis <mkargaki@redhat.com>
This commit is contained in:
parent
aeb2d9b9b4
commit
4a2c5eae92
@ -574,7 +574,6 @@ func (dc *DeploymentController) syncDeployment(key string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("Unable to retrieve deployment %v from store: %v", key, err))
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,7 +140,7 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(d *extensions.Deploym
|
|||||||
// Add pod-template-hash information if it's not in the RS.
|
// Add pod-template-hash information if it's not in the RS.
|
||||||
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
|
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
|
||||||
// that aren't constrained by the pod-template-hash.
|
// that aren't constrained by the pod-template-hash.
|
||||||
syncedRS, err := dc.addHashKeyToRSAndPods(rs, podMap[rs.UID])
|
syncedRS, err := dc.addHashKeyToRSAndPods(rs, podMap[rs.UID], d.Status.CollisionCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -153,12 +153,15 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(d *extensions.Deploym
|
|||||||
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
|
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
|
||||||
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
|
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
|
||||||
// 3. Add hash label to the rs's label and selector
|
// 3. Add hash label to the rs's label and selector
|
||||||
func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet, podList *v1.PodList) (*extensions.ReplicaSet, error) {
|
func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet, podList *v1.PodList, collisionCount *int64) (*extensions.ReplicaSet, error) {
|
||||||
// If the rs already has the new hash label in its selector, it's done syncing
|
// If the rs already has the new hash label in its selector, it's done syncing
|
||||||
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
|
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
|
||||||
return rs, nil
|
return rs, nil
|
||||||
}
|
}
|
||||||
hash := deploymentutil.GetReplicaSetHash(rs)
|
hash, err := deploymentutil.GetReplicaSetHash(rs, collisionCount)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
|
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
|
||||||
updatedRS, err := deploymentutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(rs.Namespace), dc.rsLister, rs.Namespace, rs.Name,
|
updatedRS, err := deploymentutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(rs.Namespace), dc.rsLister, rs.Namespace, rs.Name,
|
||||||
func(updated *extensions.ReplicaSet) error {
|
func(updated *extensions.ReplicaSet) error {
|
||||||
@ -224,8 +227,8 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet,
|
|||||||
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
|
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
|
||||||
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
|
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
|
||||||
// Note that the pod-template-hash will be added to adopted RSes and pods.
|
// Note that the pod-template-hash will be added to adopted RSes and pods.
|
||||||
func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
|
func (dc *DeploymentController) getNewReplicaSet(d *extensions.Deployment, rsList, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
|
||||||
existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList)
|
existingNewRS, err := deploymentutil.FindNewReplicaSet(d, rsList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -247,28 +250,28 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
|
|||||||
rsCopy := objCopy.(*extensions.ReplicaSet)
|
rsCopy := objCopy.(*extensions.ReplicaSet)
|
||||||
|
|
||||||
// Set existing new replica set's annotation
|
// Set existing new replica set's annotation
|
||||||
annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(deployment, rsCopy, newRevision, true)
|
annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, newRevision, true)
|
||||||
minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != deployment.Spec.MinReadySeconds
|
minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
|
||||||
if annotationsUpdated || minReadySecondsNeedsUpdate {
|
if annotationsUpdated || minReadySecondsNeedsUpdate {
|
||||||
rsCopy.Spec.MinReadySeconds = deployment.Spec.MinReadySeconds
|
rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
|
||||||
return dc.client.Extensions().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(rsCopy)
|
return dc.client.Extensions().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(rsCopy)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Should use the revision in existingNewRS's annotation, since it set by before
|
// Should use the revision in existingNewRS's annotation, since it set by before
|
||||||
updateConditions := deploymentutil.SetDeploymentRevision(deployment, rsCopy.Annotations[deploymentutil.RevisionAnnotation])
|
needsUpdate := deploymentutil.SetDeploymentRevision(d, rsCopy.Annotations[deploymentutil.RevisionAnnotation])
|
||||||
// If no other Progressing condition has been recorded and we need to estimate the progress
|
// If no other Progressing condition has been recorded and we need to estimate the progress
|
||||||
// of this deployment then it is likely that old users started caring about progress. In that
|
// of this deployment then it is likely that old users started caring about progress. In that
|
||||||
// case we need to take into account the first time we noticed their new replica set.
|
// case we need to take into account the first time we noticed their new replica set.
|
||||||
cond := deploymentutil.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing)
|
cond := deploymentutil.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing)
|
||||||
if deployment.Spec.ProgressDeadlineSeconds != nil && cond == nil {
|
if d.Spec.ProgressDeadlineSeconds != nil && cond == nil {
|
||||||
msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name)
|
msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name)
|
||||||
condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionTrue, deploymentutil.FoundNewRSReason, msg)
|
condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionTrue, deploymentutil.FoundNewRSReason, msg)
|
||||||
deploymentutil.SetDeploymentCondition(&deployment.Status, *condition)
|
deploymentutil.SetDeploymentCondition(&d.Status, *condition)
|
||||||
updateConditions = true
|
needsUpdate = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if updateConditions {
|
if needsUpdate {
|
||||||
if deployment, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment); err != nil {
|
if d, err = dc.client.Extensions().Deployments(d.Namespace).UpdateStatus(d); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -280,72 +283,107 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
|
|||||||
}
|
}
|
||||||
|
|
||||||
// new ReplicaSet does not exist, create one.
|
// new ReplicaSet does not exist, create one.
|
||||||
namespace := deployment.Namespace
|
templateCopy, err := api.Scheme.DeepCopy(d.Spec.Template)
|
||||||
podTemplateSpecHash := fmt.Sprintf("%d", deploymentutil.GetPodTemplateSpecHash(deployment.Spec.Template))
|
if err != nil {
|
||||||
newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment)
|
return nil, err
|
||||||
newRSTemplate.Labels = labelsutil.CloneAndAddLabel(deployment.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
|
}
|
||||||
|
newRSTemplate := templateCopy.(v1.PodTemplateSpec)
|
||||||
|
podTemplateSpecHash := fmt.Sprintf("%d", deploymentutil.GetPodTemplateSpecHash(&newRSTemplate, d.Status.CollisionCount))
|
||||||
|
newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
|
||||||
// Add podTemplateHash label to selector.
|
// Add podTemplateHash label to selector.
|
||||||
newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
|
newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
|
||||||
|
|
||||||
// Create new ReplicaSet
|
// Create new ReplicaSet
|
||||||
newRS := extensions.ReplicaSet{
|
newRS := extensions.ReplicaSet{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
// Make the name deterministic, to ensure idempotence
|
// Make the name deterministic, to ensure idempotence
|
||||||
Name: deployment.Name + "-" + podTemplateSpecHash,
|
Name: d.Name + "-" + podTemplateSpecHash,
|
||||||
Namespace: namespace,
|
Namespace: d.Namespace,
|
||||||
OwnerReferences: []metav1.OwnerReference{*newControllerRef(deployment)},
|
OwnerReferences: []metav1.OwnerReference{*newControllerRef(d)},
|
||||||
},
|
},
|
||||||
Spec: extensions.ReplicaSetSpec{
|
Spec: extensions.ReplicaSetSpec{
|
||||||
Replicas: new(int32),
|
Replicas: new(int32),
|
||||||
MinReadySeconds: deployment.Spec.MinReadySeconds,
|
MinReadySeconds: d.Spec.MinReadySeconds,
|
||||||
Selector: newRSSelector,
|
Selector: newRSSelector,
|
||||||
Template: newRSTemplate,
|
Template: newRSTemplate,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
allRSs := append(oldRSs, &newRS)
|
allRSs := append(oldRSs, &newRS)
|
||||||
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
|
newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
*(newRS.Spec.Replicas) = newReplicasCount
|
*(newRS.Spec.Replicas) = newReplicasCount
|
||||||
// Set new replica set's annotation
|
// Set new replica set's annotation
|
||||||
deploymentutil.SetNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
|
deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, newRevision, false)
|
||||||
createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
|
// Create the new ReplicaSet. If it already exists, then we need to check for possible
|
||||||
|
// hash collisions. If there is any other error, we need to report it in the status of
|
||||||
|
// the Deployment.
|
||||||
|
alreadyExists := false
|
||||||
|
createdRS, err := dc.client.Extensions().ReplicaSets(d.Namespace).Create(&newRS)
|
||||||
switch {
|
switch {
|
||||||
// We may end up hitting this due to a slow cache or a fast resync of the deployment.
|
// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
|
||||||
// TODO: Restore once https://github.com/kubernetes/kubernetes/issues/29735 is fixed
|
// Fetch a copy of the ReplicaSet. If its PodTemplateSpec is semantically deep equal
|
||||||
// ie. we start using a new hashing algorithm.
|
// with the PodTemplateSpec of the Deployment, then that is our new ReplicaSet. Otherwise,
|
||||||
|
// this is a hash collision and we need to increment the collisionCount field in the
|
||||||
|
// status of the Deployment and try the creation again.
|
||||||
case errors.IsAlreadyExists(err):
|
case errors.IsAlreadyExists(err):
|
||||||
return nil, err
|
alreadyExists = true
|
||||||
// return dc.rsLister.ReplicaSets(namespace).Get(newRS.Name)
|
rs, rsErr := dc.rsLister.ReplicaSets(newRS.Namespace).Get(newRS.Name)
|
||||||
|
if rsErr != nil {
|
||||||
|
return nil, rsErr
|
||||||
|
}
|
||||||
|
isEqual, equalErr := deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template)
|
||||||
|
if equalErr != nil {
|
||||||
|
return nil, equalErr
|
||||||
|
}
|
||||||
|
// Matching ReplicaSet is not equal - increment the collisionCount in the DeploymentStatus
|
||||||
|
// and requeue the Deployment.
|
||||||
|
if !isEqual {
|
||||||
|
if d.Status.CollisionCount == nil {
|
||||||
|
d.Status.CollisionCount = new(int64)
|
||||||
|
}
|
||||||
|
preCollisionCount := *d.Status.CollisionCount
|
||||||
|
*d.Status.CollisionCount++
|
||||||
|
// Update the collisionCount for the Deployment and let it requeue by returning the original
|
||||||
|
// error.
|
||||||
|
_, dErr := dc.client.Extensions().Deployments(d.Namespace).UpdateStatus(d)
|
||||||
|
if dErr == nil {
|
||||||
|
glog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Pass through the matching ReplicaSet as the new ReplicaSet.
|
||||||
|
createdRS = rs
|
||||||
|
err = nil
|
||||||
case err != nil:
|
case err != nil:
|
||||||
msg := fmt.Sprintf("Failed to create new replica set %q: %v", newRS.Name, err)
|
msg := fmt.Sprintf("Failed to create new replica set %q: %v", newRS.Name, err)
|
||||||
if deployment.Spec.ProgressDeadlineSeconds != nil {
|
if d.Spec.ProgressDeadlineSeconds != nil {
|
||||||
cond := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionFalse, deploymentutil.FailedRSCreateReason, msg)
|
cond := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionFalse, deploymentutil.FailedRSCreateReason, msg)
|
||||||
deploymentutil.SetDeploymentCondition(&deployment.Status, *cond)
|
deploymentutil.SetDeploymentCondition(&d.Status, *cond)
|
||||||
// We don't really care about this error at this point, since we have a bigger issue to report.
|
// We don't really care about this error at this point, since we have a bigger issue to report.
|
||||||
// TODO: Update the rest of the Deployment status, too. We may need to do this every time we
|
|
||||||
// error out in all other places in the controller so that we let users know that their deployments
|
|
||||||
// have been noticed by the controller, albeit with errors.
|
|
||||||
// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
|
// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
|
||||||
// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
|
// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
|
||||||
_, _ = dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(deployment)
|
_, _ = dc.client.Extensions().Deployments(d.Namespace).UpdateStatus(d)
|
||||||
}
|
}
|
||||||
dc.eventRecorder.Eventf(deployment, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
|
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if newReplicasCount > 0 {
|
if !alreadyExists && newReplicasCount > 0 {
|
||||||
dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s to %d", createdRS.Name, newReplicasCount)
|
dc.eventRecorder.Eventf(d, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s to %d", createdRS.Name, newReplicasCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
deploymentutil.SetDeploymentRevision(deployment, newRevision)
|
needsUpdate := deploymentutil.SetDeploymentRevision(d, newRevision)
|
||||||
if deployment.Spec.ProgressDeadlineSeconds != nil {
|
if !alreadyExists && d.Spec.ProgressDeadlineSeconds != nil {
|
||||||
msg := fmt.Sprintf("Created new replica set %q", createdRS.Name)
|
msg := fmt.Sprintf("Created new replica set %q", createdRS.Name)
|
||||||
condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionTrue, deploymentutil.NewReplicaSetReason, msg)
|
condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionTrue, deploymentutil.NewReplicaSetReason, msg)
|
||||||
deploymentutil.SetDeploymentCondition(&deployment.Status, *condition)
|
deploymentutil.SetDeploymentCondition(&d.Status, *condition)
|
||||||
|
needsUpdate = true
|
||||||
|
}
|
||||||
|
if needsUpdate {
|
||||||
|
_, err = dc.client.Extensions().Deployments(d.Namespace).UpdateStatus(d)
|
||||||
}
|
}
|
||||||
_, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
|
|
||||||
return createdRS, err
|
return createdRS, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -564,6 +602,7 @@ func calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaS
|
|||||||
ReadyReplicas: deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
|
ReadyReplicas: deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
|
||||||
AvailableReplicas: availableReplicas,
|
AvailableReplicas: availableReplicas,
|
||||||
UnavailableReplicas: unavailableReplicas,
|
UnavailableReplicas: unavailableReplicas,
|
||||||
|
CollisionCount: deployment.Status.CollisionCount,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy conditions one by one so we won't mutate the original object.
|
// Copy conditions one by one so we won't mutate the original object.
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||||
@ -183,7 +184,8 @@ func newDControllerRef(d *extensions.Deployment) *metav1.OwnerReference {
|
|||||||
|
|
||||||
// generateRS creates a replica set, with the input deployment's template as its template
|
// generateRS creates a replica set, with the input deployment's template as its template
|
||||||
func generateRS(deployment extensions.Deployment) extensions.ReplicaSet {
|
func generateRS(deployment extensions.Deployment) extensions.ReplicaSet {
|
||||||
template := GetNewReplicaSetTemplate(&deployment)
|
cp, _ := api.Scheme.DeepCopy(deployment.Spec.Template)
|
||||||
|
template := cp.(v1.PodTemplateSpec)
|
||||||
return extensions.ReplicaSet{
|
return extensions.ReplicaSet{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
UID: randomUID(),
|
UID: randomUID(),
|
||||||
@ -192,7 +194,7 @@ func generateRS(deployment extensions.Deployment) extensions.ReplicaSet {
|
|||||||
OwnerReferences: []metav1.OwnerReference{*newDControllerRef(&deployment)},
|
OwnerReferences: []metav1.OwnerReference{*newDControllerRef(&deployment)},
|
||||||
},
|
},
|
||||||
Spec: extensions.ReplicaSetSpec{
|
Spec: extensions.ReplicaSetSpec{
|
||||||
Replicas: func() *int32 { i := int32(0); return &i }(),
|
Replicas: new(int32),
|
||||||
Template: template,
|
Template: template,
|
||||||
Selector: &metav1.LabelSelector{MatchLabels: template.Labels},
|
Selector: &metav1.LabelSelector{MatchLabels: template.Labels},
|
||||||
},
|
},
|
||||||
|
@ -110,7 +110,7 @@ func TestPodTemplateSpecHash(t *testing.T) {
|
|||||||
specJson := strings.Replace(podSpec, "@@VERSION@@", strconv.Itoa(i), 1)
|
specJson := strings.Replace(podSpec, "@@VERSION@@", strconv.Itoa(i), 1)
|
||||||
spec := v1.PodTemplateSpec{}
|
spec := v1.PodTemplateSpec{}
|
||||||
json.Unmarshal([]byte(specJson), &spec)
|
json.Unmarshal([]byte(specJson), &spec)
|
||||||
hash := GetPodTemplateSpecHash(spec)
|
hash := GetPodTemplateSpecHash(&spec, nil)
|
||||||
if v, ok := seenHashes[hash]; ok {
|
if v, ok := seenHashes[hash]; ok {
|
||||||
t.Errorf("Hash collision, old: %d new: %d", v, i)
|
t.Errorf("Hash collision, old: %d new: %d", v, i)
|
||||||
break
|
break
|
||||||
@ -139,6 +139,6 @@ func BenchmarkFnv(b *testing.B) {
|
|||||||
json.Unmarshal([]byte(podSpec), &spec)
|
json.Unmarshal([]byte(podSpec), &spec)
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
GetPodTemplateSpecHash(spec)
|
GetPodTemplateSpecHash(&spec, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -30,9 +31,17 @@ import (
|
|||||||
hashutil "k8s.io/kubernetes/pkg/util/hash"
|
hashutil "k8s.io/kubernetes/pkg/util/hash"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetPodTemplateSpecHash(template v1.PodTemplateSpec) uint32 {
|
func GetPodTemplateSpecHash(template *v1.PodTemplateSpec, uniquifier *int64) uint32 {
|
||||||
podTemplateSpecHasher := fnv.New32a()
|
podTemplateSpecHasher := fnv.New32a()
|
||||||
hashutil.DeepHashObject(podTemplateSpecHasher, template)
|
hashutil.DeepHashObject(podTemplateSpecHasher, *template)
|
||||||
|
|
||||||
|
// Add uniquifier in the hash if it exists.
|
||||||
|
if uniquifier != nil {
|
||||||
|
uniquifierBytes := make([]byte, 8)
|
||||||
|
binary.LittleEndian.PutUint64(uniquifierBytes, uint64(*uniquifier))
|
||||||
|
podTemplateSpecHasher.Write(uniquifierBytes)
|
||||||
|
}
|
||||||
|
|
||||||
return podTemplateSpecHasher.Sum32()
|
return podTemplateSpecHasher.Sum32()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
59
pkg/controller/deployment/util/pod_util_test.go
Normal file
59
pkg/controller/deployment/util/pod_util_test.go
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
func int64P(num int64) *int64 {
|
||||||
|
return &num
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetPodTemplateSpecHash(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
template *v1.PodTemplateSpec
|
||||||
|
collisionCount *int64
|
||||||
|
otherCollisionCount *int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple",
|
||||||
|
template: &v1.PodTemplateSpec{},
|
||||||
|
collisionCount: int64P(1),
|
||||||
|
otherCollisionCount: int64P(2),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "using math.MaxInt64",
|
||||||
|
template: &v1.PodTemplateSpec{},
|
||||||
|
collisionCount: nil,
|
||||||
|
otherCollisionCount: int64P(int64(math.MaxInt64)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
hash := GetPodTemplateSpecHash(test.template, test.collisionCount)
|
||||||
|
otherHash := GetPodTemplateSpecHash(test.template, test.otherCollisionCount)
|
||||||
|
|
||||||
|
if hash == otherHash {
|
||||||
|
t.Errorf("expected different hashes but got the same: %d", hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -69,11 +69,12 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rsL
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetReplicaSetHash returns the pod template hash of a ReplicaSet's pod template space
|
// GetReplicaSetHash returns the pod template hash of a ReplicaSet's pod template space
|
||||||
func GetReplicaSetHash(rs *extensions.ReplicaSet) string {
|
func GetReplicaSetHash(rs *extensions.ReplicaSet, uniquifier *int64) (string, error) {
|
||||||
meta := rs.Spec.Template.ObjectMeta
|
template, err := api.Scheme.DeepCopy(rs.Spec.Template)
|
||||||
meta.Labels = labelsutil.CloneAndRemoveLabel(meta.Labels, extensions.DefaultDeploymentUniqueLabelKey)
|
if err != nil {
|
||||||
return fmt.Sprintf("%d", GetPodTemplateSpecHash(v1.PodTemplateSpec{
|
return "", err
|
||||||
ObjectMeta: meta,
|
}
|
||||||
Spec: rs.Spec.Template.Spec,
|
rsTemplate := template.(v1.PodTemplateSpec)
|
||||||
}))
|
rsTemplate.Labels = labelsutil.CloneAndRemoveLabel(rsTemplate.Labels, extensions.DefaultDeploymentUniqueLabelKey)
|
||||||
|
return fmt.Sprintf("%d", GetPodTemplateSpecHash(&rsTemplate, uniquifier)), nil
|
||||||
}
|
}
|
||||||
|
@ -21,8 +21,10 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
@ -108,6 +110,9 @@ var _ = framework.KubeDescribe("Deployment", func() {
|
|||||||
It("test Deployment ReplicaSet orphaning and adoption regarding controllerRef", func() {
|
It("test Deployment ReplicaSet orphaning and adoption regarding controllerRef", func() {
|
||||||
testDeploymentsControllerRef(f)
|
testDeploymentsControllerRef(f)
|
||||||
})
|
})
|
||||||
|
It("deployment can avoid hash collisions", func() {
|
||||||
|
testDeploymentHashCollisionAvoidance(f)
|
||||||
|
})
|
||||||
// TODO: add tests that cover deployment.Spec.MinReadySeconds once we solved clock-skew issues
|
// TODO: add tests that cover deployment.Spec.MinReadySeconds once we solved clock-skew issues
|
||||||
// See https://github.com/kubernetes/kubernetes/issues/29229
|
// See https://github.com/kubernetes/kubernetes/issues/29229
|
||||||
})
|
})
|
||||||
@ -1359,3 +1364,47 @@ func orphanDeploymentReplicaSets(c clientset.Interface, d *extensions.Deployment
|
|||||||
deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(d.UID))
|
deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(d.UID))
|
||||||
return c.Extensions().Deployments(d.Namespace).Delete(d.Name, deleteOptions)
|
return c.Extensions().Deployments(d.Namespace).Delete(d.Name, deleteOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testDeploymentHashCollisionAvoidance(f *framework.Framework) {
|
||||||
|
ns := f.Namespace.Name
|
||||||
|
c := f.ClientSet
|
||||||
|
|
||||||
|
deploymentName := "test-hash-collision"
|
||||||
|
framework.Logf("Creating Deployment %q", deploymentName)
|
||||||
|
podLabels := map[string]string{"name": nginxImageName}
|
||||||
|
d := framework.NewDeployment(deploymentName, int32(0), podLabels, nginxImageName, nginxImage, extensions.RollingUpdateDeploymentStrategyType)
|
||||||
|
deployment, err := c.Extensions().Deployments(ns).Create(d)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "1", nginxImage)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
// TODO: Switch this to do a non-cascading deletion of the Deployment, mutate the ReplicaSet
|
||||||
|
// once it has no owner reference, then recreate the Deployment if we ever proceed with
|
||||||
|
// https://github.com/kubernetes/kubernetes/issues/44237
|
||||||
|
framework.Logf("Mock a hash collision")
|
||||||
|
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
var nilRs *extensions.ReplicaSet
|
||||||
|
Expect(newRS).NotTo(Equal(nilRs))
|
||||||
|
_, err = framework.UpdateReplicaSetWithRetries(c, ns, newRS.Name, func(update *extensions.ReplicaSet) {
|
||||||
|
*update.Spec.Template.Spec.TerminationGracePeriodSeconds = int64(5)
|
||||||
|
})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
framework.Logf("Expect deployment collision counter to increment")
|
||||||
|
if err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
|
||||||
|
d, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
framework.Logf("cannot get deployment %q: %v", deploymentName, err)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
framework.Logf(spew.Sprintf("deployment status: %#v", d.Status))
|
||||||
|
return d.Status.CollisionCount != nil && *d.Status.CollisionCount == int64(1), nil
|
||||||
|
}); err != nil {
|
||||||
|
framework.Failf("Failed to increment collision counter for deployment %q: %v", deploymentName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
framework.Logf("Expect a new ReplicaSet to be created")
|
||||||
|
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "2", nginxImage)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
@ -3348,9 +3348,10 @@ func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
|
// WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
|
||||||
// Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
|
// Note that deployment revision and its new RS revision should be updated shortly most of the time, but an overwhelmed RS controller
|
||||||
|
// may result in taking longer to relabel a RS.
|
||||||
func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string) error {
|
func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string) error {
|
||||||
return testutil.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, revision, image, Logf, Poll, pollShortTimeout)
|
return testutil.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, revision, image, Logf, Poll, pollLongTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckNewRSAnnotations check if the new RS's annotation is as expected
|
// CheckNewRSAnnotations check if the new RS's annotation is as expected
|
||||||
@ -3486,16 +3487,17 @@ func WaitForPartialEvents(c clientset.Interface, ns string, objOrRef runtime.Obj
|
|||||||
|
|
||||||
type updateDeploymentFunc func(d *extensions.Deployment)
|
type updateDeploymentFunc func(d *extensions.Deployment)
|
||||||
|
|
||||||
func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateDeploymentFunc) (deployment *extensions.Deployment, err error) {
|
func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateDeploymentFunc) (*extensions.Deployment, error) {
|
||||||
deployments := c.Extensions().Deployments(namespace)
|
var deployment *extensions.Deployment
|
||||||
var updateErr error
|
var updateErr error
|
||||||
pollErr := wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
|
pollErr := wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||||
if deployment, err = deployments.Get(name, metav1.GetOptions{}); err != nil {
|
var err error
|
||||||
|
if deployment, err = c.Extensions().Deployments(namespace).Get(name, metav1.GetOptions{}); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
// Apply the update, then attempt to push it to the apiserver.
|
// Apply the update, then attempt to push it to the apiserver.
|
||||||
applyUpdate(deployment)
|
applyUpdate(deployment)
|
||||||
if deployment, err = deployments.Update(deployment); err == nil {
|
if deployment, err = c.Extensions().Deployments(namespace).Update(deployment); err == nil {
|
||||||
Logf("Updating deployment %s", name)
|
Logf("Updating deployment %s", name)
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
@ -3513,7 +3515,7 @@ type updateRsFunc func(d *extensions.ReplicaSet)
|
|||||||
func UpdateReplicaSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateRsFunc) (*extensions.ReplicaSet, error) {
|
func UpdateReplicaSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateRsFunc) (*extensions.ReplicaSet, error) {
|
||||||
var rs *extensions.ReplicaSet
|
var rs *extensions.ReplicaSet
|
||||||
var updateErr error
|
var updateErr error
|
||||||
pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
|
pollErr := wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||||
var err error
|
var err error
|
||||||
if rs, err = c.Extensions().ReplicaSets(namespace).Get(name, metav1.GetOptions{}); err != nil {
|
if rs, err = c.Extensions().ReplicaSets(namespace).Get(name, metav1.GetOptions{}); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
Loading…
Reference in New Issue
Block a user