Add stopCh to apiserver & context to kublet commands

Remove SetupSignalContext call from the apiserver

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
This commit is contained in:
Darren Shepherd
2019-08-26 15:55:35 -07:00
committed by rafaelbreno[commit]
parent d01504fe57
commit 4405f44300
9 changed files with 35 additions and 32 deletions

View File

@@ -18,13 +18,13 @@ package main
import (
"bytes"
"context"
"fmt"
"io"
"os"
"github.com/spf13/cobra/doc"
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server"
"k8s.io/kubernetes/cmd/genutils"
apiservapp "k8s.io/kubernetes/cmd/kube-apiserver/app"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app"
@@ -46,7 +46,6 @@ func main() {
os.Exit(1)
}
ctx := context.Background()
outDir, err := genutils.OutDir(path)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get output directory: %v\n", err)
@@ -56,7 +55,7 @@ func main() {
switch module {
case "kube-apiserver":
// generate docs for kube-apiserver
apiserver := apiservapp.NewAPIServerCommand()
apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler())
doc.GenMarkdownTree(apiserver, outDir)
case "kube-controller-manager":
// generate docs for kube-controller-manager
@@ -68,11 +67,11 @@ func main() {
doc.GenMarkdownTree(proxy, outDir)
case "kube-scheduler":
// generate docs for kube-scheduler
scheduler := schapp.NewSchedulerCommand()
scheduler := schapp.NewSchedulerCommand(server.SetupSignalHandler())
doc.GenMarkdownTree(scheduler, outDir)
case "kubelet":
// generate docs for kubelet
kubelet := kubeletapp.NewKubeletCommand(ctx)
kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalContext())
doc.GenMarkdownTree(kubelet, outDir)
case "kubeadm":
// resets global flags created by kubelet or other commands e.g.

View File

@@ -18,7 +18,6 @@ package main
import (
"bytes"
"context"
"fmt"
"io"
"os"
@@ -27,6 +26,7 @@ import (
mangen "github.com/cpuguy83/go-md2man/v2/md2man"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server"
"k8s.io/cli-runtime/pkg/genericiooptions"
kubectlcmd "k8s.io/kubectl/pkg/cmd"
"k8s.io/kubernetes/cmd/genutils"
@@ -59,12 +59,11 @@ func main() {
// Set environment variables used by command so the output is consistent,
// regardless of where we run.
os.Setenv("HOME", "/home/username")
ctx := context.Background()
switch module {
case "kube-apiserver":
// generate manpage for kube-apiserver
apiserver := apiservapp.NewAPIServerCommand()
apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler())
genMarkdown(apiserver, "", outDir)
for _, c := range apiserver.Commands() {
genMarkdown(c, "kube-apiserver", outDir)
@@ -85,14 +84,14 @@ func main() {
}
case "kube-scheduler":
// generate manpage for kube-scheduler
scheduler := schapp.NewSchedulerCommand()
scheduler := schapp.NewSchedulerCommand(server.SetupSignalHandler())
genMarkdown(scheduler, "", outDir)
for _, c := range scheduler.Commands() {
genMarkdown(c, "kube-scheduler", outDir)
}
case "kubelet":
// generate manpage for kubelet
kubelet := kubeletapp.NewKubeletCommand(ctx)
kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalContext())
genMarkdown(kubelet, "", outDir)
for _, c := range kubelet.Commands() {
genMarkdown(c, "kubelet", outDir)

View File

@@ -22,6 +22,7 @@ import (
"os"
_ "time/tzdata" // for timeZone support in CronJob
"k8s.io/apiserver/pkg/server"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log format registration
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
@@ -30,7 +31,7 @@ import (
)
func main() {
command := app.NewAPIServerCommand()
command := app.NewAPIServerCommand(server.SetupSignalHandler())
code := cli.Run(command)
os.Exit(code)
}

View File

@@ -67,7 +67,7 @@ func init() {
}
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
func NewAPIServerCommand(stopCh <-chan struct{}) *cobra.Command {
s := options.NewServerRunOptions()
ctx := genericapiserver.SetupSignalContext()
featureGate := s.GenericServerRunOptions.ComponentGlobalsRegistry.FeatureGateFor(basecompatibility.DefaultKubeComponent)
@@ -114,7 +114,7 @@ cluster's shared state through which all other components interact.`,
featureGate.(featuregate.MutableFeatureGate).AddMetrics()
// add component version metrics
s.GenericServerRunOptions.ComponentGlobalsRegistry.AddMetrics()
return Run(ctx, completedOptions)
return Run(cmd.Context(), completedOptions, stopCh)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
@@ -125,7 +125,6 @@ cluster's shared state through which all other components interact.`,
return nil
},
}
cmd.SetContext(ctx)
fs := cmd.Flags()
namedFlagSets := s.Flags()
@@ -145,7 +144,7 @@ cluster's shared state through which all other components interact.`,
}
// Run runs the specified APIServer. This should never exit.
func Run(ctx context.Context, opts options.CompletedOptions) error {
func Run(ctx context.Context, opts options.CompletedOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", utilversion.Get())

View File

@@ -35,7 +35,6 @@ import (
"k8s.io/apiserver/pkg/authorization/authorizer"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/flagz"
"k8s.io/apiserver/pkg/server/healthz"
@@ -87,7 +86,11 @@ func init() {
type Option func(runtime.Registry) error
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
func NewSchedulerCommand(stopCh <-chan struct{}, registryOptions ...Option) *cobra.Command {
// explicitly register (if not already registered) the kube effective version and feature gate in DefaultComponentGlobalsRegistry,
// which will be used in NewOptions.
_, _ = s.De.DefaultComponentGlobalsRegistry.ComponentGlobalsOrRegister(
featuregate.DefaultKubeComponent, utilversion.DefaultBuildEffectiveVersion(), utilfeature.DefaultMutableFeatureGate)
opts := options.NewOptions()
cmd := &cobra.Command{
@@ -105,7 +108,7 @@ for more information about scheduling and the kube-scheduler component.`,
return opts.ComponentGlobalsRegistry.Set()
},
RunE: func(cmd *cobra.Command, args []string) error {
return runCommand(cmd, opts, registryOptions...)
return runCommand(cmd, opts, stopCh, registryOptions...)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
@@ -136,7 +139,7 @@ for more information about scheduling and the kube-scheduler component.`,
}
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
func runCommand(cmd *cobra.Command, opts *options.Options, stopCh <-chan struct{}, registryOptions ...Option) error {
verflag.PrintAndExitIfRequested()
fg := opts.ComponentGlobalsRegistry.FeatureGateFor(basecompatibility.DefaultKubeComponent)
// Activate logging as soon as possible, after that
@@ -150,7 +153,6 @@ func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Op
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
stopCh := server.SetupSignalHandler()
<-stopCh
cancel()
}()

View File

@@ -19,6 +19,7 @@ package main
import (
"os"
"k8s.io/apiserver/pkg/server"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log format registration
_ "k8s.io/component-base/metrics/prometheus/clientgo"
@@ -27,7 +28,7 @@ import (
)
func main() {
command := app.NewSchedulerCommand()
command := app.NewSchedulerCommand(server.SetupSignalHandler())
code := cli.Run(command)
os.Exit(code)
}

View File

@@ -64,7 +64,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/flagz"
"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -278,7 +277,6 @@ is checked every 20 seconds (also configurable with a flag).`,
if err := checkPermissions(ctx); err != nil {
logger.Error(err, "Kubelet running with insufficient permissions")
}
// make the kubelet's config safe for logging
config := kubeletServer.KubeletConfiguration.DeepCopy()
for k := range config.StaticPodURLHeader {
@@ -287,9 +285,6 @@ is checked every 20 seconds (also configurable with a flag).`,
// log the kubelet's config for inspection
logger.V(5).Info("KubeletConfiguration", "configuration", klog.Format(config))
// set up signal context for kubelet shutdown
ctx := genericapiserver.SetupSignalContext()
utilfeature.DefaultMutableFeatureGate.AddMetrics()
// run the kubelet
return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
@@ -512,7 +507,8 @@ func UnsecuredDependencies(ctx context.Context, s *options.KubeletServer, featur
OSInterface: kubecontainer.RealOS{},
VolumePlugins: plugins,
DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
TLSOptions: tlsOptions}, nil
TLSOptions: tlsOptions,
}, nil
}
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
@@ -878,7 +874,6 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
kubeDeps.Recorder,
kubeDeps.KubeClient,
)
if err != nil {
return err
}
@@ -1279,7 +1274,8 @@ func createAndInitKubelet(
kubeDeps *kubelet.Dependencies,
hostname string,
nodeName types.NodeName,
nodeIPs []net.IP) (k kubelet.Bootstrap, err error) {
nodeIPs []net.IP,
) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations

View File

@@ -22,9 +22,9 @@ limitations under the License.
package main
import (
"context"
"os"
"k8s.io/apiserver/pkg/server"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log format registration
_ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration
@@ -33,7 +33,7 @@ import (
)
func main() {
command := app.NewKubeletCommand(context.Background())
command := app.NewKubeletCommand(server.SetupSignalContext())
code := cli.Run(command)
os.Exit(code)
}

View File

@@ -47,6 +47,7 @@ AwEHoUQDQgAEH6cuzP8XuD5wal6wf9M6xDljTOPLX2i8uIp/C/ASqiIGUeeKQtX0
// APIServer is a server which manages apiserver.
type APIServer struct {
storageConfig storagebackend.Config
stopCh chan struct{}
cancel func(error)
}
@@ -54,6 +55,7 @@ type APIServer struct {
func NewAPIServer(storageConfig storagebackend.Config) *APIServer {
return &APIServer{
storageConfig: storageConfig,
stopCh: make(chan struct{}),
}
}
@@ -112,7 +114,7 @@ func (a *APIServer) Start(ctx context.Context) error {
return
}
err = apiserver.Run(ctx, completedOptions)
err = apiserver.Run(ctx, completedOptions, a.stopCh)
if err != nil {
errCh <- fmt.Errorf("run apiserver error: %w", err)
return
@@ -132,6 +134,10 @@ func (a *APIServer) Stop() error {
if a.cancel != nil {
a.cancel(errors.New("stopping API server"))
}
if a.stopCh != nil {
close(a.stopCh)
a.stopCh = nil
}
return nil
}