mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Merge pull request #24118 from smarterclayton/proxy_args
Automatic merge from submit-queue Allow Proxy to be initialized with store
This commit is contained in:
commit
767fa6913d
@ -21,41 +21,54 @@ import (
|
|||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/client/cache"
|
"k8s.io/kubernetes/pkg/client/cache"
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
|
||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewSourceAPI creates config source that watches for changes to the services and endpoints.
|
// NewSourceAPI creates config source that watches for changes to the services and endpoints.
|
||||||
func NewSourceAPI(c *client.Client, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
|
func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
|
||||||
servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())
|
servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())
|
||||||
endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything())
|
cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run()
|
||||||
|
|
||||||
newServicesSourceApiFromLW(servicesLW, period, servicesChan)
|
endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything())
|
||||||
newEndpointsSourceApiFromLW(endpointsLW, period, endpointsChan)
|
cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
func newServicesSourceApiFromLW(servicesLW cache.ListerWatcher, period time.Duration, servicesChan chan<- ServiceUpdate) {
|
// NewServiceStore creates an undelta store that expands updates to the store into
|
||||||
servicesPush := func(objs []interface{}) {
|
// ServiceUpdate events on the channel. If no store is passed, a default store will
|
||||||
|
// be initialized. Allows reuse of a cache store across multiple components.
|
||||||
|
func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store {
|
||||||
|
fn := func(objs []interface{}) {
|
||||||
var services []api.Service
|
var services []api.Service
|
||||||
for _, o := range objs {
|
for _, o := range objs {
|
||||||
services = append(services, *(o.(*api.Service)))
|
services = append(services, *(o.(*api.Service)))
|
||||||
}
|
}
|
||||||
servicesChan <- ServiceUpdate{Op: SET, Services: services}
|
ch <- ServiceUpdate{Op: SET, Services: services}
|
||||||
|
}
|
||||||
|
if store == nil {
|
||||||
|
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||||
|
}
|
||||||
|
return &cache.UndeltaStore{
|
||||||
|
Store: store,
|
||||||
|
PushFunc: fn,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceQueue := cache.NewUndeltaStore(servicesPush, cache.MetaNamespaceKeyFunc)
|
// NewEndpointsStore creates an undelta store that expands updates to the store into
|
||||||
cache.NewReflector(servicesLW, &api.Service{}, serviceQueue, period).Run()
|
// EndpointsUpdate events on the channel. If no store is passed, a default store will
|
||||||
}
|
// be initialized. Allows reuse of a cache store across multiple components.
|
||||||
|
func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store {
|
||||||
func newEndpointsSourceApiFromLW(endpointsLW cache.ListerWatcher, period time.Duration, endpointsChan chan<- EndpointsUpdate) {
|
fn := func(objs []interface{}) {
|
||||||
endpointsPush := func(objs []interface{}) {
|
|
||||||
var endpoints []api.Endpoints
|
var endpoints []api.Endpoints
|
||||||
for _, o := range objs {
|
for _, o := range objs {
|
||||||
endpoints = append(endpoints, *(o.(*api.Endpoints)))
|
endpoints = append(endpoints, *(o.(*api.Endpoints)))
|
||||||
}
|
}
|
||||||
endpointsChan <- EndpointsUpdate{Op: SET, Endpoints: endpoints}
|
ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints}
|
||||||
|
}
|
||||||
|
if store == nil {
|
||||||
|
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||||
|
}
|
||||||
|
return &cache.UndeltaStore{
|
||||||
|
Store: store,
|
||||||
|
PushFunc: fn,
|
||||||
}
|
}
|
||||||
|
|
||||||
endpointQueue := cache.NewUndeltaStore(endpointsPush, cache.MetaNamespaceKeyFunc)
|
|
||||||
cache.NewReflector(endpointsLW, &api.Endpoints{}, endpointQueue, period).Run()
|
|
||||||
}
|
}
|
||||||
|
@ -61,7 +61,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
|
|||||||
|
|
||||||
ch := make(chan ServiceUpdate)
|
ch := make(chan ServiceUpdate)
|
||||||
|
|
||||||
newServicesSourceApiFromLW(lw, 30*time.Second, ch)
|
cache.NewReflector(lw, &api.Service{}, NewServiceStore(nil, ch), 30*time.Second).Run()
|
||||||
|
|
||||||
got, ok := <-ch
|
got, ok := <-ch
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -172,7 +172,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
|
|||||||
|
|
||||||
ch := make(chan EndpointsUpdate)
|
ch := make(chan EndpointsUpdate)
|
||||||
|
|
||||||
newEndpointsSourceApiFromLW(lw, 30*time.Second, ch)
|
cache.NewReflector(lw, &api.Endpoints{}, NewEndpointsStore(nil, ch), 30*time.Second).Run()
|
||||||
|
|
||||||
got, ok := <-ch
|
got, ok := <-ch
|
||||||
if !ok {
|
if !ok {
|
||||||
|
Loading…
Reference in New Issue
Block a user