mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
commit
0155d18fbc
@ -592,7 +592,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
}
|
||||
}
|
||||
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
|
||||
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
|
||||
mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
|
||||
klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager, checkpointManager)
|
||||
|
||||
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
|
||||
|
||||
|
@ -24,9 +24,9 @@ import (
|
||||
"path/filepath"
|
||||
|
||||
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
|
@ -45,6 +45,10 @@ go_test(
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||
"//vendor/k8s.io/utils/pointer:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -17,8 +17,10 @@ limitations under the License.
|
||||
package pod
|
||||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"fmt"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
@ -39,16 +41,28 @@ type MirrorClient interface {
|
||||
DeleteMirrorPod(podFullName string, uid *types.UID) (bool, error)
|
||||
}
|
||||
|
||||
// nodeGetter is a subset a NodeLister, simplified for testing.
|
||||
type nodeGetter interface {
|
||||
// Get retrieves the Node for a given name.
|
||||
Get(name string) (*v1.Node, error)
|
||||
}
|
||||
|
||||
// basicMirrorClient is a functional MirrorClient. Mirror pods are stored in
|
||||
// the kubelet directly because they need to be in sync with the internal
|
||||
// pods.
|
||||
type basicMirrorClient struct {
|
||||
apiserverClient clientset.Interface
|
||||
nodeGetter nodeGetter
|
||||
nodeName string
|
||||
}
|
||||
|
||||
// NewBasicMirrorClient returns a new MirrorClient.
|
||||
func NewBasicMirrorClient(apiserverClient clientset.Interface) MirrorClient {
|
||||
return &basicMirrorClient{apiserverClient: apiserverClient}
|
||||
func NewBasicMirrorClient(apiserverClient clientset.Interface, nodeName string, nodeGetter nodeGetter) MirrorClient {
|
||||
return &basicMirrorClient{
|
||||
apiserverClient: apiserverClient,
|
||||
nodeName: nodeName,
|
||||
nodeGetter: nodeGetter,
|
||||
}
|
||||
}
|
||||
|
||||
func (mc *basicMirrorClient) CreateMirrorPod(pod *v1.Pod) error {
|
||||
@ -64,8 +78,25 @@ func (mc *basicMirrorClient) CreateMirrorPod(pod *v1.Pod) error {
|
||||
}
|
||||
hash := getPodHash(pod)
|
||||
copyPod.Annotations[kubetypes.ConfigMirrorAnnotationKey] = hash
|
||||
|
||||
// With the MirrorPodNodeRestriction feature, mirror pods are required to have an owner reference
|
||||
// to the owning node.
|
||||
// See http://git.k8s.io/enhancements/keps/sig-auth/20190916-noderestriction-pods.md
|
||||
nodeUID, err := mc.getNodeUID()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get node UID: %v", err)
|
||||
}
|
||||
controller := true
|
||||
copyPod.OwnerReferences = []metav1.OwnerReference{{
|
||||
APIVersion: v1.SchemeGroupVersion.String(),
|
||||
Kind: "Node",
|
||||
Name: mc.nodeName,
|
||||
UID: nodeUID,
|
||||
Controller: &controller,
|
||||
}}
|
||||
|
||||
apiPod, err := mc.apiserverClient.CoreV1().Pods(copyPod.Namespace).Create(©Pod)
|
||||
if err != nil && errors.IsAlreadyExists(err) {
|
||||
if err != nil && apierrors.IsAlreadyExists(err) {
|
||||
// Check if the existing pod is the same as the pod we want to create.
|
||||
if h, ok := apiPod.Annotations[kubetypes.ConfigMirrorAnnotationKey]; ok && h == hash {
|
||||
return nil
|
||||
@ -94,7 +125,7 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string, uid *types.UID)
|
||||
var GracePeriodSeconds int64
|
||||
if err := mc.apiserverClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &GracePeriodSeconds, Preconditions: &metav1.Preconditions{UID: uid}}); err != nil {
|
||||
// Unfortunately, there's no generic error for failing a precondition
|
||||
if !(errors.IsNotFound(err) || errors.IsConflict(err)) {
|
||||
if !(apierrors.IsNotFound(err) || apierrors.IsConflict(err)) {
|
||||
// We should return the error here, but historically this routine does
|
||||
// not return an error unless it can't parse the pod name
|
||||
klog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
|
||||
@ -104,6 +135,17 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string, uid *types.UID)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (mc *basicMirrorClient) getNodeUID() (types.UID, error) {
|
||||
node, err := mc.nodeGetter.Get(mc.nodeName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if node.UID == "" {
|
||||
return "", fmt.Errorf("UID unset for node %s", mc.nodeName)
|
||||
}
|
||||
return node.UID, nil
|
||||
}
|
||||
|
||||
// IsStaticPod returns true if the passed Pod is static.
|
||||
func IsStaticPod(pod *v1.Pod) bool {
|
||||
source, err := kubetypes.GetPodSource(pod)
|
||||
|
@ -17,9 +17,18 @@ limitations under the License.
|
||||
package pod
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
func TestParsePodFullName(t *testing.T) {
|
||||
@ -52,3 +61,97 @@ func TestParsePodFullName(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateMirrorPod(t *testing.T) {
|
||||
const (
|
||||
testNodeName = "test-node-name"
|
||||
testNodeUID = types.UID("test-node-uid-1234")
|
||||
testPodName = "test-pod-name"
|
||||
testPodNS = "test-pod-ns"
|
||||
testPodHash = "123456789"
|
||||
)
|
||||
testcases := []struct {
|
||||
desc string
|
||||
node *v1.Node
|
||||
nodeErr error
|
||||
expectSuccess bool
|
||||
}{{
|
||||
desc: "cannot get node",
|
||||
nodeErr: errors.New("expected: cannot get node"),
|
||||
expectSuccess: false,
|
||||
}, {
|
||||
desc: "node missing UID",
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: testNodeName,
|
||||
},
|
||||
},
|
||||
expectSuccess: false,
|
||||
}, {
|
||||
desc: "successfully fetched node",
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: testNodeName,
|
||||
UID: testNodeUID,
|
||||
},
|
||||
},
|
||||
expectSuccess: true,
|
||||
}}
|
||||
|
||||
for _, test := range testcases {
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
clientset := fake.NewSimpleClientset()
|
||||
nodeGetter := &fakeNodeGetter{
|
||||
t: t,
|
||||
expectNodeName: testNodeName,
|
||||
node: test.node,
|
||||
err: test.nodeErr,
|
||||
}
|
||||
mc := NewBasicMirrorClient(clientset, testNodeName, nodeGetter)
|
||||
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: testPodName,
|
||||
Namespace: testPodNS,
|
||||
Annotations: map[string]string{
|
||||
kubetypes.ConfigHashAnnotationKey: testPodHash,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := mc.CreateMirrorPod(pod)
|
||||
if !test.expectSuccess {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
createdPod, err := clientset.CoreV1().Pods(testPodNS).Get(testPodName, metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Validate created pod
|
||||
assert.Equal(t, testPodHash, createdPod.Annotations[kubetypes.ConfigMirrorAnnotationKey])
|
||||
assert.Len(t, createdPod.OwnerReferences, 1)
|
||||
expectedOwnerRef := metav1.OwnerReference{
|
||||
APIVersion: "v1",
|
||||
Kind: "Node",
|
||||
Name: testNodeName,
|
||||
UID: testNodeUID,
|
||||
Controller: pointer.BoolPtr(true),
|
||||
}
|
||||
assert.Equal(t, expectedOwnerRef, createdPod.OwnerReferences[0])
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeNodeGetter struct {
|
||||
t *testing.T
|
||||
expectNodeName string
|
||||
|
||||
node *v1.Node
|
||||
err error
|
||||
}
|
||||
|
||||
func (f *fakeNodeGetter) Get(nodeName string) (*v1.Node, error) {
|
||||
require.Equal(f.t, f.expectNodeName, nodeName)
|
||||
return f.node, f.err
|
||||
}
|
||||
|
@ -201,6 +201,7 @@ go_test(
|
||||
"//vendor/github.com/blang/semver:go_default_library",
|
||||
"//vendor/github.com/coreos/go-systemd/util:go_default_library",
|
||||
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
|
||||
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
|
||||
"//vendor/github.com/onsi/ginkgo:go_default_library",
|
||||
"//vendor/github.com/onsi/gomega:go_default_library",
|
||||
"//vendor/github.com/onsi/gomega/gstruct:go_default_library",
|
||||
|
@ -23,17 +23,20 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/onsi/ginkgo"
|
||||
"github.com/onsi/gomega"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
)
|
||||
|
||||
var _ = framework.KubeDescribe("MirrorPod", func() {
|
||||
@ -188,7 +191,7 @@ func checkMirrorPodRunning(cl clientset.Interface, name, namespace string) error
|
||||
if pod.Status.Phase != v1.PodRunning {
|
||||
return fmt.Errorf("expected the mirror pod %q to be running, got %q", name, pod.Status.Phase)
|
||||
}
|
||||
return nil
|
||||
return validateMirrorPod(cl, pod)
|
||||
}
|
||||
|
||||
func checkMirrorPodRecreatedAndRunning(cl clientset.Interface, name, namespace string, oUID types.UID) error {
|
||||
@ -202,5 +205,49 @@ func checkMirrorPodRecreatedAndRunning(cl clientset.Interface, name, namespace s
|
||||
if pod.Status.Phase != v1.PodRunning {
|
||||
return fmt.Errorf("expected the mirror pod %q to be running, got %q", name, pod.Status.Phase)
|
||||
}
|
||||
return validateMirrorPod(cl, pod)
|
||||
}
|
||||
|
||||
func validateMirrorPod(cl clientset.Interface, mirrorPod *v1.Pod) error {
|
||||
hash, ok := mirrorPod.Annotations[kubetypes.ConfigHashAnnotationKey]
|
||||
if !ok || hash == "" {
|
||||
return fmt.Errorf("expected mirror pod %q to have a hash annotation", mirrorPod.Name)
|
||||
}
|
||||
mirrorHash, ok := mirrorPod.Annotations[kubetypes.ConfigMirrorAnnotationKey]
|
||||
if !ok || mirrorHash == "" {
|
||||
return fmt.Errorf("expected mirror pod %q to have a mirror pod annotation", mirrorPod.Name)
|
||||
}
|
||||
if hash != mirrorHash {
|
||||
return fmt.Errorf("expected mirror pod %q to have a matching mirror pod hash: got %q; expected %q", mirrorPod.Name, mirrorHash, hash)
|
||||
}
|
||||
source, ok := mirrorPod.Annotations[kubetypes.ConfigSourceAnnotationKey]
|
||||
if !ok {
|
||||
return fmt.Errorf("expected mirror pod %q to have a source annotation", mirrorPod.Name)
|
||||
}
|
||||
if source == kubetypes.ApiserverSource {
|
||||
return fmt.Errorf("expected mirror pod %q source to not be 'api'; got: %q", mirrorPod.Name, source)
|
||||
}
|
||||
|
||||
if len(mirrorPod.OwnerReferences) != 1 {
|
||||
return fmt.Errorf("expected mirror pod %q to have a single owner reference: got %d", mirrorPod.Name, len(mirrorPod.OwnerReferences))
|
||||
}
|
||||
node, err := cl.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch test node: %v", err)
|
||||
}
|
||||
|
||||
controller := true
|
||||
expectedOwnerRef := metav1.OwnerReference{
|
||||
APIVersion: "v1",
|
||||
Kind: "Node",
|
||||
Name: framework.TestContext.NodeName,
|
||||
UID: node.UID,
|
||||
Controller: &controller,
|
||||
}
|
||||
ref := mirrorPod.OwnerReferences[0]
|
||||
if !apiequality.Semantic.DeepEqual(ref, expectedOwnerRef) {
|
||||
return fmt.Errorf("unexpected mirror pod %q owner ref: %v", mirrorPod.Name, cmp.Diff(expectedOwnerRef, ref))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user