From 66e746badeb4598cc179fdd91d3ab93e555e0fe7 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 11 Apr 2015 12:23:01 -0400 Subject: [PATCH 1/3] Add a transport that can simulate random network errors --- pkg/client/chaosclient/chaosclient.go | 148 +++++++++++++++++++++ pkg/client/chaosclient/chaosclient_test.go | 82 ++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 pkg/client/chaosclient/chaosclient.go create mode 100644 pkg/client/chaosclient/chaosclient_test.go diff --git a/pkg/client/chaosclient/chaosclient.go b/pkg/client/chaosclient/chaosclient.go new file mode 100644 index 00000000000..8f34015dc78 --- /dev/null +++ b/pkg/client/chaosclient/chaosclient.go @@ -0,0 +1,148 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package chaosclient makes it easy to simulate network latency, misbehaving +// servers, and random errors from servers. It is intended to stress test components +// under failure conditions and expose weaknesses in the error handling logic +// of the codebase. +package chaosclient + +import ( + "errors" + "fmt" + "log" + "math/rand" + "net/http" + "reflect" + "runtime" +) + +// chaosrt provides the ability to perform simulations of HTTP client failures +// under the Golang http.Transport interface. +type chaosrt struct { + rt http.RoundTripper + notify ChaosNotifier + c []Chaos +} + +// Chaos intercepts requests to a remote HTTP endpoint and can inject arbitrary +// failures. +type Chaos interface { + // Intercept should return true if the normal flow should be skipped, and the + // return response and error used instead. Modifications to the request will + // be ignored, but may be used to make decisions about types of failures. + Intercept(req *http.Request) (bool, *http.Response, error) +} + +// ChaosNotifier notifies another component that the ChaosRoundTripper has simulated +// a failure. +type ChaosNotifier interface { + // OnChaos is invoked when a chaotic outcome was triggered. fn is the + // source of Chaos and req was the outgoing request + OnChaos(req *http.Request, c Chaos) +} + +// ChaosFunc takes an http.Request and decides whether to alter the response. It +// returns true if it wishes to mutate the response, with a http.Response or +// error. +type ChaosFunc func(req *http.Request) (bool, *http.Response, error) + +func (fn ChaosFunc) Intercept(req *http.Request) (bool, *http.Response, error) { + return fn.Intercept(req) +} +func (fn ChaosFunc) String() string { + return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() +} + +// NewChaosRoundTripper creates an http.RoundTripper that will intercept requests +// based on the provided Chaos functions. The notifier is invoked when a Chaos +// Intercept is fired. +func NewChaosRoundTripper(rt http.RoundTripper, notify ChaosNotifier, c ...Chaos) http.RoundTripper { + return &chaosrt{rt, notify, c} +} + +// RoundTrip gives each ChaosFunc an opportunity to intercept the request. The first +// interceptor wins. +func (rt *chaosrt) RoundTrip(req *http.Request) (*http.Response, error) { + for _, c := range rt.c { + if intercept, resp, err := c.Intercept(req); intercept { + rt.notify.OnChaos(req, c) + return resp, err + } + } + return rt.rt.RoundTrip(req) +} + +// Seed represents a consistent stream of chaos. +type Seed struct { + *rand.Rand +} + +// NewSeed creates an object that assists in generating random chaotic events +// based on a deterministic seed. +func NewSeed(seed int64) Seed { + return Seed{rand.New(rand.NewSource(seed))} +} + +type pIntercept struct { + Chaos + s Seed + p float64 +} + +// P returns a ChaosFunc that fires with a probabilty of p (p between 0.0 +// and 1.0 with 0.0 meaning never and 1.0 meaning always). +func (s Seed) P(p float64, c Chaos) Chaos { + return pIntercept{c, s, p} +} + +// Intercept intercepts requests with the provided probability p. +func (c pIntercept) Intercept(req *http.Request) (bool, *http.Response, error) { + if c.s.Float64() < c.p { + return c.Chaos.Intercept(req) + } + return false, nil, nil +} + +func (c pIntercept) String() string { + return fmt.Sprintf("P{%f %s}", c.p, c.Chaos) +} + +// ErrSimulatedConnectionResetByPeer emulates the golang net error when a connection +// is reset by a peer. +// TODO: make this more accurate +// TODO: add other error types +// TODO: add a helper for returning multiple errors randomly. +var ErrSimulatedConnectionResetByPeer = Error{errors.New("connection reset by peer")} + +// Error returns the nested error when C() is invoked. +type Error struct { + error +} + +// C returns the nested error +func (e Error) Intercept(_ *http.Request) (bool, *http.Response, error) { + return true, nil, e.error +} + +// LogChaos is the default ChaosNotifier and writes a message to the Golang log. +var LogChaos = ChaosNotifier(logChaos{}) + +type logChaos struct{} + +func (logChaos) OnChaos(req *http.Request, c Chaos) { + log.Printf("Triggered chaotic behavior for %s %s: %v", req.Method, req.URL.String(), c) +} diff --git a/pkg/client/chaosclient/chaosclient_test.go b/pkg/client/chaosclient/chaosclient_test.go new file mode 100644 index 00000000000..f33b1029400 --- /dev/null +++ b/pkg/client/chaosclient/chaosclient_test.go @@ -0,0 +1,82 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package chaosclient + +import ( + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +type TestLogChaos struct { + *testing.T +} + +func (t TestLogChaos) OnChaos(req *http.Request, c Chaos) { + t.Logf("CHAOS: chaotic behavior for %s %s: %v", req.Method, req.URL.String(), c) +} + +func unwrapURLError(err error) error { + if urlErr, ok := err.(*url.Error); ok && urlErr != nil { + return urlErr.Err + } + return err +} + +func TestChaos(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + client := http.Client{ + Transport: NewChaosRoundTripper(http.DefaultTransport, TestLogChaos{t}, ErrSimulatedConnectionResetByPeer), + } + resp, err := client.Get(server.URL) + if unwrapURLError(err) != ErrSimulatedConnectionResetByPeer.error { + t.Fatalf("expected reset by peer: %v", err) + } + if resp != nil { + t.Fatalf("expected no response object: %#v", resp) + } +} + +func TestPartialChaos(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + seed := NewSeed(1) + client := http.Client{ + Transport: NewChaosRoundTripper( + http.DefaultTransport, TestLogChaos{t}, + seed.P(0.5, ErrSimulatedConnectionResetByPeer), + ), + } + success, fail := 0, 0 + for { + _, err := client.Get(server.URL) + if err != nil { + fail++ + } else { + success++ + } + if success > 1 && fail > 1 { + break + } + } +} From 0146e318f56cf0ce1e286d2b5be7ccc8ad4ab18c Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 11 Apr 2015 12:45:30 -0400 Subject: [PATCH 2/3] Allow client.Config to wrap the underyling Transport --- pkg/client/helper.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/client/helper.go b/pkg/client/helper.go index aa607dfa07e..be42dd3ac0e 100644 --- a/pkg/client/helper.go +++ b/pkg/client/helper.go @@ -75,8 +75,14 @@ type Config struct { UserAgent string // Transport may be used for custom HTTP behavior. This attribute may not - // be specified with the TLS client certificate options. + // be specified with the TLS client certificate options. Use WrapTransport + // for most client level operations. Transport http.RoundTripper + // WrapTransport will be invoked for custom HTTP behavior after the underlying + // transport is initialized (either the transport created from TLSClientConfig, + // Transport, or http.DefaultTransport). The config may layer other RoundTrippers + // on top of the returned RoundTripper. + WrapTransport func(rt http.RoundTripper) http.RoundTripper // QPS indicates the maximum QPS to the master from this client. If zero, QPS is unlimited. QPS float32 @@ -255,6 +261,9 @@ func TransportFor(config *Config) (http.RoundTripper, error) { transport = http.DefaultTransport } } + if config.WrapTransport != nil { + transport = config.WrapTransport(transport) + } transport, err = HTTPWrappersForConfig(config, transport) if err != nil { From ca335d7be9f4507ff509b4f838e1d337b776ec55 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sat, 11 Apr 2015 12:45:45 -0400 Subject: [PATCH 3/3] Add support for chaos to Kubelet and hack/local-up-cluster.sh --- cmd/kubelet/app/server.go | 29 +++++++++++++++++++++++++++-- hack/local-up-cluster.sh | 2 ++ pkg/kubectl/cmd/update.go | 1 - 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index d5528adece3..79dd3c8ab57 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -30,6 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/chaosclient" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" @@ -84,7 +85,6 @@ type KubeletServer struct { ClusterDomain string MasterServiceNamespace string ClusterDNS util.IP - ReallyCrashForTesting bool StreamingConnectionIdleTimeout time.Duration ImageGCHighThresholdPercent int ImageGCLowThresholdPercent int @@ -95,6 +95,13 @@ type KubeletServer struct { TLSPrivateKeyFile string CertDirectory string NodeStatusUpdateFrequency time.Duration + + // Flags intended for testing + + // Crash immediately, rather than eating panics. + ReallyCrashForTesting bool + // Insert a probability of random errors during calls to the master. + ChaosChance float64 } // bootstrapping interface for kubelet, targets the initialization protocol @@ -181,7 +188,6 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.ClusterDomain, "cluster_domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") fs.StringVar(&s.MasterServiceNamespace, "master_service_namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods") fs.Var(&s.ClusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers") - fs.BoolVar(&s.ReallyCrashForTesting, "really_crash_for_testing", s.ReallyCrashForTesting, "If true, crash with panics more often.") fs.DurationVar(&s.StreamingConnectionIdleTimeout, "streaming_connection_idle_timeout", 0, "Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'") fs.DurationVar(&s.NodeStatusUpdateFrequency, "node_status_update_frequency", s.NodeStatusUpdateFrequency, "Specifies how often kubelet posts node status to master. Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. Default: 10s") fs.IntVar(&s.ImageGCHighThresholdPercent, "image_gc_high_threshold", s.ImageGCHighThresholdPercent, "The percent of disk usage after which image garbage collection is always run. Default: 90%%") @@ -189,6 +195,10 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.NetworkPluginName, "network_plugin", s.NetworkPluginName, " The name of the network plugin to be invoked for various events in kubelet/pod lifecycle") fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") + + // Flags intended for testing, not recommended used in production environments. + fs.BoolVar(&s.ReallyCrashForTesting, "really_crash_for_testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.") + fs.Float64Var(&s.ChaosChance, "chaos_chance", s.ChaosChance, "If > 0.0, introduce random client errors and latency. Intended for testing. [default=0.0]") } // Run runs the specified KubeletServer. This should never exit. @@ -337,6 +347,9 @@ func (s *KubeletServer) createAPIServerClient() (*client.Client, error) { glog.Infof("Multiple api servers specified. Picking first one") } clientConfig.Host = s.APIServerList[0] + + s.addChaosToClientConfig(&clientConfig) + c, err := client.New(&clientConfig) if err != nil { return nil, err @@ -344,6 +357,18 @@ func (s *KubeletServer) createAPIServerClient() (*client.Client, error) { return c, nil } +// addChaosToClientConfig injects random errors into client connections if configured. +func (s *KubeletServer) addChaosToClientConfig(config *client.Config) { + if s.ChaosChance != 0.0 { + config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { + seed := chaosclient.NewSeed(1) + // TODO: introduce a standard chaos package with more tunables - this is just a proof of concept + // TODO: introduce random latency and stalls + return chaosclient.NewChaosRoundTripper(rt, chaosclient.LogChaos, seed.P(s.ChaosChance, chaosclient.ErrSimulatedConnectionResetByPeer)) + } + } +} + // SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an API Client. // Under the hood it calls RunKubelet (below) func SimpleKubelet(client *client.Client, diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index 08c8a0d06d1..531f257c516 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -45,6 +45,7 @@ API_HOST=${API_HOST:-127.0.0.1} API_CORS_ALLOWED_ORIGINS=${API_CORS_ALLOWED_ORIGINS:-"/127.0.0.1(:[0-9]+)?$,/localhost(:[0-9]+)?$"} KUBELET_PORT=${KUBELET_PORT:-10250} LOG_LEVEL=${LOG_LEVEL:-3} +CHAOS_CHANCE=${CHAOS_CHANCE:-0.0} # For the common local scenario, fail fast if server is already running. # this can happen if you run local-up-cluster.sh twice and kill etcd in between. @@ -139,6 +140,7 @@ CTLRMGR_PID=$! KUBELET_LOG=/tmp/kubelet.log sudo -E "${GO_OUT}/kubelet" \ --v=${LOG_LEVEL} \ + --chaos_chance="${CHAOS_CHANCE}" \ --hostname_override="127.0.0.1" \ --address="127.0.0.1" \ --api_servers="${API_HOST}:${API_PORT}" \ diff --git a/pkg/kubectl/cmd/update.go b/pkg/kubectl/cmd/update.go index cbca1e6f3c0..d8d3323d03a 100644 --- a/pkg/kubectl/cmd/update.go +++ b/pkg/kubectl/cmd/update.go @@ -114,7 +114,6 @@ func RunUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []str fmt.Fprintf(out, "%s/%s\n", info.Mapping.Resource, info.Name) return nil }) - } func updateWithPatch(cmd *cobra.Command, args []string, f *cmdutil.Factory, patch string) (string, error) {