Merge pull request #43996 from ncdc/proxy-shared-informers
Automatic merge from submit-queue

Use shared informers for proxy endpoints and service configs

Use shared informers instead of creating local controllers/reflectors for the proxy's endpoints and service configs. This allows downstream integrators to pass in preexisting shared informers to save on memory and CPU usage. This also enables the cache mutation detector for kube-proxy in those presubmit jobs that already turn it on.

Follow-up to #43295. Will race with #43937 for conflicting changes 😄

cc @wojtek-t @thockin @smarterclayton @sttts @liggitt @deads2k @derekwaynecarr @eparis @kubernetes/rh-cluster-infra
Commit 0f10d6ccf2
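The server.go hunks below show the shape of the change: the proxy configs now consume informers from a shared factory instead of building their own reflectors. As a reading aid (not part of the commit), here is a minimal sketch of the new wiring for a downstream integrator, using only package paths and signatures that appear in this diff; the package and function names are invented for illustration:

package proxywiring // hypothetical package, for illustration only

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
)

// wireProxyConfigs wires service/endpoints configs to a shared informer factory.
func wireProxyConfigs(
	client clientset.Interface,
	servicesHandler proxyconfig.ServiceConfigHandler,
	endpointsHandler proxyconfig.EndpointsConfigHandler,
	resync time.Duration,
) {
	// One factory can be shared with other components to save memory and CPU.
	informerFactory := informers.NewSharedInformerFactory(client, resync)

	serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), resync)
	serviceConfig.RegisterHandler(servicesHandler)
	go serviceConfig.Run(wait.NeverStop)

	endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), resync)
	endpointsConfig.RegisterHandler(endpointsHandler)
	go endpointsConfig.Run(wait.NeverStop)

	// Start the factory only after NewServiceConfig/NewEndpointsConfig have
	// added their event handlers: informers only notify on changes, so a
	// handler registered after startup can miss the initial sync.
	go informerFactory.Start(wait.NeverStop)
}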
@@ -619,10 +619,15 @@ function start-kube-proxy {
   if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then
     params+=" ${KUBEPROXY_TEST_ARGS}"
   fi
+  local container_env=""
+  if [[ -n "${ENABLE_CACHE_MUTATION_DETECTOR:-}" ]]; then
+    container_env="env:\n    - name: KUBE_CACHE_MUTATION_DETECTOR\n      value: \"${ENABLE_CACHE_MUTATION_DETECTOR}\""
+  fi
   sed -i -e "s@{{kubeconfig}}@${kubeconfig}@g" ${src_file}
   sed -i -e "s@{{pillar\['kube_docker_registry'\]}}@${kube_docker_registry}@g" ${src_file}
   sed -i -e "s@{{pillar\['kube-proxy_docker_tag'\]}}@${kube_proxy_docker_tag}@g" ${src_file}
   sed -i -e "s@{{params}}@${params}@g" ${src_file}
+  sed -i -e "s@{{container_env}}@${container_env}@g" ${src_file}
   sed -i -e "s@{{ cpurequest }}@100m@g" ${src_file}
   sed -i -e "s@{{api_servers_with_port}}@${api_servers}@g" ${src_file}
   if [[ -n "${CLUSTER_IP_RANGE:-}" ]]; then

@@ -814,10 +814,15 @@ function start-kube-proxy {
   if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then
     params+=" ${KUBEPROXY_TEST_ARGS}"
   fi
+  local container_env=""
+  if [[ -n "${ENABLE_CACHE_MUTATION_DETECTOR:-}" ]]; then
+    container_env="env:\n    - name: KUBE_CACHE_MUTATION_DETECTOR\n      value: \"${ENABLE_CACHE_MUTATION_DETECTOR}\""
+  fi
   sed -i -e "s@{{kubeconfig}}@${kubeconfig}@g" ${src_file}
   sed -i -e "s@{{pillar\['kube_docker_registry'\]}}@${kube_docker_registry}@g" ${src_file}
   sed -i -e "s@{{pillar\['kube-proxy_docker_tag'\]}}@${kube_proxy_docker_tag}@g" ${src_file}
   sed -i -e "s@{{params}}@${params}@g" ${src_file}
+  sed -i -e "s@{{container_env}}@${container_env}@g" ${src_file}
   sed -i -e "s@{{ cpurequest }}@100m@g" ${src_file}
   sed -i -e "s@{{api_servers_with_port}}@${api_servers}@g" ${src_file}
   if [[ -n "${CLUSTER_IP_RANGE:-}" ]]; then
@@ -32,6 +32,8 @@
 # test_args should always go last to overwrite prior configuration
 {% set params = log_level + " " + feature_gates + " " + test_args -%}
 
+{% set container_env = "" -%}
+
 # kube-proxy podspec
 apiVersion: v1
 kind: Pod

@@ -60,6 +62,7 @@ spec:
     - /bin/sh
     - -c
     - echo -998 > /proc/$$$/oom_score_adj && kube-proxy {{api_servers_with_port}} {{kubeconfig}} {{cluster_cidr}} --resource-container="" {{params}} 1>>/var/log/kube-proxy.log 2>&1
+    {{container_env}}
     securityContext:
       privileged: true
     volumeMounts:
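For context on what these template hunks produce: when ENABLE_CACHE_MUTATION_DETECTOR is set on the node, the start-kube-proxy changes above fill {{container_env}} with an env block, so the rendered kube-proxy container runs with the detector enabled. Roughly, assuming the variable is set to true (the exact indentation comes from the escaped string in the shell hunks):

env:
- name: KUBE_CACHE_MUTATION_DETECTOR
  value: "true"

When the variable is unset, container_env stays empty and the placeholder is simply removed from the manifest.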
@@ -19,6 +19,7 @@ go_library(
         "//cmd/kube-proxy/app/options:go_default_library",
         "//pkg/api:go_default_library",
         "//pkg/client/clientset_generated/internalclientset:go_default_library",
+        "//pkg/client/informers/informers_generated/internalversion:go_default_library",
         "//pkg/proxy:go_default_library",
         "//pkg/proxy/config:go_default_library",
         "//pkg/proxy/iptables:go_default_library",

@@ -41,6 +41,7 @@ import (
 	"k8s.io/kubernetes/cmd/kube-proxy/app/options"
 	"k8s.io/kubernetes/pkg/api"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
 	"k8s.io/kubernetes/pkg/proxy"
 	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
 	"k8s.io/kubernetes/pkg/proxy/iptables"
@@ -306,18 +307,24 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 		iptInterface.AddReloadFunc(proxier.Sync)
 	}
 
+	informerFactory := informers.NewSharedInformerFactory(client, config.ConfigSyncPeriod)
+
 	// Create configs (i.e. Watches for Services and Endpoints)
 	// Note: RegisterHandler() calls need to happen before creation of Sources because sources
 	// only notify on changes, and the initial update (on process start) may be lost if no handlers
 	// are registered yet.
-	serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
+	serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), config.ConfigSyncPeriod)
 	serviceConfig.RegisterHandler(servicesHandler)
 	go serviceConfig.Run(wait.NeverStop)
 
-	endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
+	endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)
 	endpointsConfig.RegisterHandler(endpointsHandler)
 	go endpointsConfig.Run(wait.NeverStop)
 
+	// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
+	// functions must configure their shared informer event handlers first.
+	go informerFactory.Start(wait.NeverStop)
+
 	config.NodeRef = &clientv1.ObjectReference{
 		Kind: "Node",
 		Name: hostname,
@@ -22,6 +22,7 @@ go_library(
         "//pkg/api:go_default_library",
         "//pkg/client/clientset_generated/clientset:go_default_library",
         "//pkg/client/clientset_generated/internalclientset:go_default_library",
+        "//pkg/client/informers/informers_generated/internalversion:go_default_library",
         "//pkg/client/metrics/prometheus:go_default_library",
         "//pkg/kubelet/cadvisor/testing:go_default_library",
         "//pkg/kubelet/cm:go_default_library",

@@ -30,6 +30,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
 	_ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
 	cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
 	"k8s.io/kubernetes/pkg/kubelet/cm"
@@ -54,8 +55,9 @@ type HollowNodeConfig struct {
 }
 
 const (
 	maxPods     = 110
 	podsPerCore = 0
+	configResyncPeriod = 15 * time.Minute
 )
 
 var knownMorphs = sets.NewString("kubelet", "proxy")

@@ -136,10 +138,11 @@ func main() {
 
 		iptInterface := fakeiptables.NewFake()
 
-		serviceConfig := proxyconfig.NewServiceConfig(internalClientset.Core().RESTClient(), 15*time.Minute)
+		informerFactory := informers.NewSharedInformerFactory(internalClientset, configResyncPeriod)
+		serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), configResyncPeriod)
 		serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
 
-		endpointsConfig := proxyconfig.NewEndpointsConfig(internalClientset.Core().RESTClient(), 15*time.Minute)
+		endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), configResyncPeriod)
 		endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
 
 		eventClient, err := clientgoclientset.NewForConfig(clientConfig)
@@ -153,6 +156,7 @@ func main() {
 			eventClient,
 			endpointsConfig,
 			serviceConfig,
+			informerFactory,
 			iptInterface,
 			eventBroadcaster,
 			recorder,

@@ -24,6 +24,7 @@ go_library(
         "//pkg/apis/componentconfig/v1alpha1:go_default_library",
         "//pkg/client/clientset_generated/clientset:go_default_library",
         "//pkg/client/clientset_generated/internalclientset:go_default_library",
+        "//pkg/client/informers/informers_generated/internalversion:go_default_library",
         "//pkg/kubelet:go_default_library",
         "//pkg/kubelet/cadvisor:go_default_library",
         "//pkg/kubelet/cm:go_default_library",
@@ -26,6 +26,7 @@ import (
 	"k8s.io/kubernetes/cmd/kube-proxy/app/options"
 	"k8s.io/kubernetes/pkg/api"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
 	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
 	"k8s.io/kubernetes/pkg/util"
 	utiliptables "k8s.io/kubernetes/pkg/util/iptables"

@@ -56,6 +57,7 @@ func NewHollowProxyOrDie(
 	eventClient v1core.EventsGetter,
 	endpointsConfig *proxyconfig.EndpointsConfig,
 	serviceConfig *proxyconfig.ServiceConfig,
+	informerFactory informers.SharedInformerFactory,
 	iptInterface utiliptables.Interface,
 	broadcaster record.EventBroadcaster,
 	recorder record.EventRecorder,
@@ -73,6 +75,7 @@ func NewHollowProxyOrDie(
 
 	go endpointsConfig.Run(wait.NeverStop)
 	go serviceConfig.Run(wait.NeverStop)
+	go informerFactory.Start(wait.NeverStop)
 
 	hollowProxy, err := proxyapp.NewProxyServer(client, eventClient, config, iptInterface, &FakeProxier{}, broadcaster, recorder, nil, "fake")
 	if err != nil {
@@ -17,11 +17,10 @@ go_library(
     tags = ["automanaged"],
     deps = [
         "//pkg/api:go_default_library",
+        "//pkg/client/informers/informers_generated/internalversion/core/internalversion:go_default_library",
         "//pkg/client/listers/core/internalversion:go_default_library",
         "//pkg/util/config:go_default_library",
         "//vendor:github.com/golang/glog",
-        "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
-        "//vendor:k8s.io/apimachinery/pkg/fields",
         "//vendor:k8s.io/apimachinery/pkg/labels",
         "//vendor:k8s.io/apimachinery/pkg/util/runtime",
         "//vendor:k8s.io/client-go/tools/cache",

@@ -38,11 +37,12 @@ go_test(
     tags = ["automanaged"],
    deps = [
         "//pkg/api:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
+        "//pkg/client/informers/informers_generated/internalversion:go_default_library",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
-        "//vendor:k8s.io/apimachinery/pkg/runtime",
         "//vendor:k8s.io/apimachinery/pkg/util/wait",
         "//vendor:k8s.io/apimachinery/pkg/watch",
-        "//vendor:k8s.io/client-go/tools/cache",
+        "//vendor:k8s.io/client-go/testing",
     ],
 )
@@ -24,27 +24,13 @@ import (
 	"time"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
-	"k8s.io/client-go/tools/cache"
+	ktesting "k8s.io/client-go/testing"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
 )
 
-type fakeLW struct {
-	listResp  runtime.Object
-	watchResp watch.Interface
-}
-
-func (lw fakeLW) List(options metav1.ListOptions) (runtime.Object, error) {
-	return lw.listResp, nil
-}
-
-func (lw fakeLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
-	return lw.watchResp, nil
-}
-
-var _ cache.ListerWatcher = fakeLW{}
-
 func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
 	service1v1 := &api.Service{
 		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
@@ -57,11 +43,9 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
 		Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 30}}}}
 
 	// Setup fake api client.
+	client := fake.NewSimpleClientset()
 	fakeWatch := watch.NewFake()
-	lw := fakeLW{
-		listResp:  &api.ServiceList{Items: []api.Service{}},
-		watchResp: fakeWatch,
-	}
+	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
 
 	stopCh := make(chan struct{})
 	defer close(stopCh)

@@ -69,8 +53,11 @@
 	ch := make(chan struct{})
 	handler := newSvcHandler(t, nil, func() { ch <- struct{}{} })
 
-	serviceConfig := newServiceConfig(lw, time.Minute)
+	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
+
+	serviceConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
 	serviceConfig.RegisterHandler(handler)
+	go sharedInformers.Start(stopCh)
 	go serviceConfig.Run(stopCh)
 
 	// Add the first service

@@ -130,11 +117,9 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
 	}
 
 	// Setup fake api client.
+	client := fake.NewSimpleClientset()
 	fakeWatch := watch.NewFake()
-	lw := fakeLW{
-		listResp:  &api.EndpointsList{Items: []api.Endpoints{}},
-		watchResp: fakeWatch,
-	}
+	client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
 
 	stopCh := make(chan struct{})
 	defer close(stopCh)

@@ -142,8 +127,11 @@
 	ch := make(chan struct{})
 	handler := newEpsHandler(t, nil, func() { ch <- struct{}{} })
 
-	endpointsConfig := newEndpointsConfig(lw, time.Minute)
+	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
+
+	endpointsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
 	endpointsConfig.RegisterHandler(handler)
+	go sharedInformers.Start(stopCh)
 	go endpointsConfig.Run(stopCh)
 
 	// Add the first endpoints
@@ -229,19 +217,11 @@ func TestInitialSync(t *testing.T) {
 	wg.Add(2)
 
 	// Setup fake api client.
-	fakeSvcWatch := watch.NewFake()
-	svcLW := fakeLW{
-		listResp:  &api.ServiceList{Items: []api.Service{*svc1, *svc2}},
-		watchResp: fakeSvcWatch,
-	}
-	fakeEpsWatch := watch.NewFake()
-	epsLW := fakeLW{
-		listResp:  &api.EndpointsList{Items: []api.Endpoints{*eps2, *eps1}},
-		watchResp: fakeEpsWatch,
-	}
+	client := fake.NewSimpleClientset(svc1, svc2, eps2, eps1)
+	sharedInformers := informers.NewSharedInformerFactory(client, 0)
 
-	svcConfig := newServiceConfig(svcLW, time.Minute)
-	epsConfig := newEndpointsConfig(epsLW, time.Minute)
+	svcConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), 0)
+	epsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), 0)
 	svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
 	svcConfig.RegisterHandler(svcHandler)
 	epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)

@@ -249,6 +229,7 @@ func TestInitialSync(t *testing.T) {
 
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+	go sharedInformers.Start(stopCh)
 	go svcConfig.Run(stopCh)
 	go epsConfig.Run(stopCh)
 	wg.Wait()
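A note on the test refactor above: the hand-rolled fakeLW ListerWatcher is gone because the configs now take informers, and informers are fed from a full (fake) clientset. Here is that pattern as a self-contained sketch; the test name and notifyHandler type are invented for illustration, while the constructor and fake-client calls are the ones used in the diff:

package config_test // hypothetical external test package

import (
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	ktesting "k8s.io/client-go/testing"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
)

// notifyHandler signals each time the proxy config delivers an update.
type notifyHandler struct{ ch chan struct{} }

func (h *notifyHandler) OnServiceUpdate(services []*api.Service) {
	select {
	case h.ch <- struct{}{}:
	default: // already signalled; drop
	}
}

func TestServiceConfigViaSharedInformer(t *testing.T) {
	// The fake clientset answers the informer's initial LIST...
	client := fake.NewSimpleClientset()
	// ...and the watch reactor hands it a WATCH stream the test controls.
	fakeWatch := watch.NewFake()
	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))

	stopCh := make(chan struct{})
	defer close(stopCh)

	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
	config := proxyconfig.NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
	h := &notifyHandler{ch: make(chan struct{}, 1)}
	config.RegisterHandler(h)
	go sharedInformers.Start(stopCh) // start only after handlers are registered
	go config.Run(stopCh)

	// An injected event flows: fake watch -> shared informer -> registered handler.
	fakeWatch.Add(&api.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "s1"}})
	<-h.ch
}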
@@ -21,12 +21,11 @@ import (
 	"time"
 
 	"github.com/golang/glog"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/kubernetes/pkg/api"
+	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion"
 	listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
 	"k8s.io/kubernetes/pkg/util/config"
 )
@@ -64,35 +63,36 @@ type EndpointsConfigHandler interface {
 // EndpointsConfig tracks a set of endpoints configurations.
 // It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
 type EndpointsConfig struct {
-	informer cache.Controller
-	lister   listers.EndpointsLister
+	lister       listers.EndpointsLister
+	listerSynced cache.InformerSynced
 	handlers []EndpointsConfigHandler
 	// updates channel is used to trigger registered handlers.
 	updates chan struct{}
+	stop    chan struct{}
 }
 
 // NewEndpointsConfig creates a new EndpointsConfig.
-func NewEndpointsConfig(c cache.Getter, period time.Duration) *EndpointsConfig {
-	endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything())
-	return newEndpointsConfig(endpointsLW, period)
-}
-
-func newEndpointsConfig(lw cache.ListerWatcher, period time.Duration) *EndpointsConfig {
-	result := &EndpointsConfig{}
-
-	store, informer := cache.NewIndexerInformer(
-		lw,
-		&api.Endpoints{},
-		period,
+func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
+	result := &EndpointsConfig{
+		lister:       endpointsInformer.Lister(),
+		listerSynced: endpointsInformer.Informer().HasSynced,
+		// The updates channel is used to send interrupts to the Endpoints handler.
+		// It's buffered because we never want to block for as long as there is a
+		// pending interrupt, but don't want to drop them if the handler is doing
+		// work.
+		updates: make(chan struct{}, 1),
+		stop:    make(chan struct{}),
+	}
+
+	endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc:    result.handleAddEndpoints,
 			UpdateFunc: result.handleUpdateEndpoints,
 			DeleteFunc: result.handleDeleteEndpoints,
 		},
-		cache.Indexers{},
+		resyncPeriod,
 	)
-	result.informer = informer
-	result.lister = listers.NewEndpointsLister(store)
+
 	return result
 }

@@ -101,16 +101,9 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
 	c.handlers = append(c.handlers, handler)
 }
 
-// Run starts the underlying informer and goroutine responsible for calling
-// registered handlers.
+// Run starts the goroutine responsible for calling registered handlers.
 func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
-	// The updates channel is used to send interrupts to the Endpoints handler.
-	// It's buffered because we never want to block for as long as there is a
-	// pending interrupt, but don't want to drop them if the handler is doing
-	// work.
-	c.updates = make(chan struct{}, 1)
-	go c.informer.Run(stopCh)
-	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
+	if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
 		utilruntime.HandleError(fmt.Errorf("endpoint controller not synced"))
 		return
 	}

@@ -118,27 +111,32 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
 	// We have synced informers. Now we can start delivering updates
 	// to the registered handler.
 	go func() {
-		for range c.updates {
-			endpoints, err := c.lister.List(labels.Everything())
-			if err != nil {
-				glog.Errorf("Error while listing endpoints from cache: %v", err)
-				// This will cause a retry (if there isn't any other trigger in-flight).
-				c.dispatchUpdate()
-				continue
-			}
-			if endpoints == nil {
-				endpoints = []*api.Endpoints{}
-			}
-			for i := range c.handlers {
-				glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
-				c.handlers[i].OnEndpointsUpdate(endpoints)
+		for {
+			select {
+			case <-c.updates:
+				endpoints, err := c.lister.List(labels.Everything())
+				if err != nil {
+					glog.Errorf("Error while listing endpoints from cache: %v", err)
+					// This will cause a retry (if there isn't any other trigger in-flight).
+					c.dispatchUpdate()
+					continue
+				}
+				if endpoints == nil {
+					endpoints = []*api.Endpoints{}
+				}
+				for i := range c.handlers {
+					glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
+					c.handlers[i].OnEndpointsUpdate(endpoints)
+				}
+			case <-c.stop:
+				return
 			}
 		}
 	}()
 	// Close updates channel when stopCh is closed.
 	go func() {
 		<-stopCh
-		close(c.updates)
+		close(c.stop)
 	}()
 }

@@ -157,6 +155,9 @@ func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) {
 func (c *EndpointsConfig) dispatchUpdate() {
 	select {
 	case c.updates <- struct{}{}:
+		// Work enqueued successfully
+	case <-c.stop:
+		// We're shut down / avoid logging the message below
 	default:
 		glog.V(4).Infof("Endpoints handler already has a pending interrupt.")
 	}
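The Run and dispatchUpdate changes above amount to one concurrency pattern: a one-slot updates channel coalesces bursts of informer events into single handler passes, and a dedicated stop channel replaces closing updates. A stripped-down, runnable sketch of that pattern (the names here are invented; only the channel discipline mirrors the diff):

package main

import "fmt"

// coalescer mirrors the updates/stop pair used by EndpointsConfig and ServiceConfig.
type coalescer struct {
	updates chan struct{}
	stop    chan struct{}
}

func newCoalescer() *coalescer {
	return &coalescer{
		updates: make(chan struct{}, 1), // one slot: at most one pending interrupt
		stop:    make(chan struct{}),
	}
}

// dispatch mirrors dispatchUpdate: enqueue unless an interrupt is already
// pending or we are shutting down.
func (c *coalescer) dispatch() {
	select {
	case c.updates <- struct{}{}:
		// work enqueued successfully
	case <-c.stop:
		// shutting down; don't enqueue (and don't log)
	default:
		// an interrupt is already pending; this event coalesces into it
	}
}

// run mirrors the worker loop in Run: handle interrupts until stopped.
func (c *coalescer) run(onUpdate func()) {
	for {
		select {
		case <-c.updates:
			onUpdate()
		case <-c.stop:
			return
		}
	}
}

func main() {
	c := newCoalescer()
	done := make(chan struct{})
	go func() {
		c.run(func() { fmt.Println("handlers notified") })
		close(done)
	}()
	for i := 0; i < 5; i++ {
		c.dispatch() // a burst of five events yields only one or two notifications
	}
	close(c.stop) // replaces close(c.updates); safe even with an interrupt pending
	<-done
}

Closing a separate stop channel also removes a hazard of the old shape: with close(c.updates), a dispatchUpdate racing with shutdown could send on a closed channel and panic.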
@@ -165,35 +166,36 @@ func (c *EndpointsConfig) dispatchUpdate() {
 // ServiceConfig tracks a set of service configurations.
 // It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
 type ServiceConfig struct {
-	informer cache.Controller
-	lister   listers.ServiceLister
+	lister       listers.ServiceLister
+	listerSynced cache.InformerSynced
 	handlers []ServiceConfigHandler
 	// updates channel is used to trigger registered handlers
 	updates chan struct{}
+	stop    chan struct{}
 }
 
 // NewServiceConfig creates a new ServiceConfig.
-func NewServiceConfig(c cache.Getter, period time.Duration) *ServiceConfig {
-	servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
-	return newServiceConfig(servicesLW, period)
-}
-
-func newServiceConfig(lw cache.ListerWatcher, period time.Duration) *ServiceConfig {
-	result := &ServiceConfig{}
-
-	store, informer := cache.NewIndexerInformer(
-		lw,
-		&api.Service{},
-		period,
+func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
+	result := &ServiceConfig{
+		lister:       serviceInformer.Lister(),
+		listerSynced: serviceInformer.Informer().HasSynced,
+		// The updates channel is used to send interrupts to the Services handler.
+		// It's buffered because we never want to block for as long as there is a
+		// pending interrupt, but don't want to drop them if the handler is doing
+		// work.
+		updates: make(chan struct{}, 1),
+		stop:    make(chan struct{}),
+	}
+
+	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc:    result.handleAddService,
 			UpdateFunc: result.handleUpdateService,
 			DeleteFunc: result.handleDeleteService,
 		},
-		cache.Indexers{},
+		resyncPeriod,
 	)
-	result.informer = informer
-	result.lister = listers.NewServiceLister(store)
+
 	return result
 }

@@ -202,16 +204,10 @@ func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
 	c.handlers = append(c.handlers, handler)
 }
 
-// Run starts the underlying informer and goroutine responsible for calling
+// Run starts the goroutine responsible for calling
 // registered handlers.
 func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
-	// The updates channel is used to send interrupts to the Services handler.
-	// It's buffered because we never want to block for as long as there is a
-	// pending interrupt, but don't want to drop them if the handler is doing
-	// work.
-	c.updates = make(chan struct{}, 1)
-	go c.informer.Run(stopCh)
-	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
+	if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
 		utilruntime.HandleError(fmt.Errorf("service controller not synced"))
 		return
 	}

@@ -219,27 +215,32 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
 	// We have synced informers. Now we can start delivering updates
 	// to the registered handler.
 	go func() {
-		for range c.updates {
-			services, err := c.lister.List(labels.Everything())
-			if err != nil {
-				glog.Errorf("Error while listing services from cache: %v", err)
-				// This will cause a retry (if there isn't any other trigger in-flight).
-				c.dispatchUpdate()
-				continue
-			}
-			if services == nil {
-				services = []*api.Service{}
-			}
-			for i := range c.handlers {
-				glog.V(3).Infof("Calling handler.OnServiceUpdate()")
-				c.handlers[i].OnServiceUpdate(services)
+		for {
+			select {
+			case <-c.updates:
+				services, err := c.lister.List(labels.Everything())
+				if err != nil {
+					glog.Errorf("Error while listing services from cache: %v", err)
+					// This will cause a retry (if there isnt' any other trigger in-flight).
+					c.dispatchUpdate()
+					continue
+				}
+				if services == nil {
+					services = []*api.Service{}
+				}
+				for i := range c.handlers {
+					glog.V(3).Infof("Calling handler.OnServiceUpdate()")
+					c.handlers[i].OnServiceUpdate(services)
+				}
+			case <-c.stop:
+				return
 			}
 		}
 	}()
 	// Close updates channel when stopCh is closed.
 	go func() {
 		<-stopCh
-		close(c.updates)
+		close(c.stop)
 	}()
 }

@@ -258,8 +259,11 @@ func (c *ServiceConfig) handleDeleteService(_ interface{}) {
 func (c *ServiceConfig) dispatchUpdate() {
 	select {
 	case c.updates <- struct{}{}:
+		// Work enqueued successfully
+	case <-c.stop:
+		// We're shut down / avoid logging the message below
 	default:
-		glog.V(4).Infof("Service handler alread has a pending interrupt.")
+		glog.V(4).Infof("Service handler already has a pending interrupt.")
 	}
 }
@@ -25,7 +25,10 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
+	ktesting "k8s.io/client-go/testing"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
 )
 
 type sortedServices []*api.Service

@@ -121,17 +124,19 @@ func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints
 }
 
 func TestNewServiceAddedAndNotified(t *testing.T) {
+	client := fake.NewSimpleClientset()
 	fakeWatch := watch.NewFake()
-	lw := fakeLW{
-		listResp:  &api.ServiceList{Items: []api.Service{}},
-		watchResp: fakeWatch,
-	}
+	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 
-	config := newServiceConfig(lw, time.Minute)
+	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
+
+	config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
 	handler := NewServiceHandlerMock()
 	config.RegisterHandler(handler)
+	go sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	service := &api.Service{

@@ -143,17 +148,19 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
 }
 
 func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
+	client := fake.NewSimpleClientset()
 	fakeWatch := watch.NewFake()
-	lw := fakeLW{
-		listResp:  &api.ServiceList{Items: []api.Service{}},
-		watchResp: fakeWatch,
-	}
+	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 
-	config := newServiceConfig(lw, time.Minute)
+	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
+
+	config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
 	handler := NewServiceHandlerMock()
 	config.RegisterHandler(handler)
+	go sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	service1 := &api.Service{

@@ -177,19 +184,21 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
 }
 
 func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
+	client := fake.NewSimpleClientset()
 	fakeWatch := watch.NewFake()
-	lw := fakeLW{
-		listResp:  &api.ServiceList{Items: []api.Service{}},
-		watchResp: fakeWatch,
-	}
+	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 
-	config := newServiceConfig(lw, time.Minute)
+	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
+
+	config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
 	handler := NewServiceHandlerMock()
 	handler2 := NewServiceHandlerMock()
 	config.RegisterHandler(handler)
 	config.RegisterHandler(handler2)
+	go sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	service1 := &api.Service{

@@ -209,19 +218,21 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
 }
 
 func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
+	client := fake.NewSimpleClientset()
 	fakeWatch := watch.NewFake()
-	lw := fakeLW{
-		listResp:  &api.EndpointsList{Items: []api.Endpoints{}},
-		watchResp: fakeWatch,
-	}
+	client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 
-	config := newEndpointsConfig(lw, time.Minute)
+	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
+
+	config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
 	handler := NewEndpointsHandlerMock()
 	handler2 := NewEndpointsHandlerMock()
 	config.RegisterHandler(handler)
 	config.RegisterHandler(handler2)
+	go sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	endpoints1 := &api.Endpoints{

@@ -247,19 +258,21 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
 }
 
 func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
+	client := fake.NewSimpleClientset()
 	fakeWatch := watch.NewFake()
-	lw := fakeLW{
-		listResp:  &api.EndpointsList{Items: []api.Endpoints{}},
-		watchResp: fakeWatch,
-	}
+	client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil))
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 
-	config := newEndpointsConfig(lw, time.Minute)
+	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
+
+	config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
 	handler := NewEndpointsHandlerMock()
 	handler2 := NewEndpointsHandlerMock()
 	config.RegisterHandler(handler)
 	config.RegisterHandler(handler2)
+	go sharedInformers.Start(stopCh)
 	go config.Run(stopCh)
 
 	endpoints1 := &api.Endpoints{