Merge pull request #6729 from smarterclayton/chaosclient

Add a new Chaos transport that can simulate network failure and add it to the kubelet
This commit is contained in:
Fabio Yeon 2015-04-13 16:06:53 -07:00
commit e99141de0d
6 changed files with 269 additions and 4 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/chaosclient"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
@ -85,7 +86,6 @@ type KubeletServer struct {
ClusterDomain string
MasterServiceNamespace string
ClusterDNS util.IP
ReallyCrashForTesting bool
StreamingConnectionIdleTimeout time.Duration
ImageGCHighThresholdPercent int
ImageGCLowThresholdPercent int
@ -96,6 +96,13 @@ type KubeletServer struct {
TLSPrivateKeyFile string
CertDirectory string
NodeStatusUpdateFrequency time.Duration
// Flags intended for testing
// Crash immediately, rather than eating panics.
ReallyCrashForTesting bool
// Insert a probability of random errors during calls to the master.
ChaosChance float64
}
// bootstrapping interface for kubelet, targets the initialization protocol
@ -182,7 +189,6 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ClusterDomain, "cluster_domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
fs.StringVar(&s.MasterServiceNamespace, "master_service_namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.Var(&s.ClusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
fs.BoolVar(&s.ReallyCrashForTesting, "really_crash_for_testing", s.ReallyCrashForTesting, "If true, crash with panics more often.")
fs.DurationVar(&s.StreamingConnectionIdleTimeout, "streaming_connection_idle_timeout", 0, "Maximum time a streaming connection can be idle before the connection is automatically closed. Example: '5m'")
fs.DurationVar(&s.NodeStatusUpdateFrequency, "node_status_update_frequency", s.NodeStatusUpdateFrequency, "Specifies how often kubelet posts node status to master. Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod in nodecontroller. Default: 10s")
fs.IntVar(&s.ImageGCHighThresholdPercent, "image_gc_high_threshold", s.ImageGCHighThresholdPercent, "The percent of disk usage after which image garbage collection is always run. Default: 90%%")
@ -190,6 +196,10 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.NetworkPluginName, "network_plugin", s.NetworkPluginName, "<Warning: Alpha feature> The name of the network plugin to be invoked for various events in kubelet/pod lifecycle")
fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
// Flags intended for testing, not recommended used in production environments.
fs.BoolVar(&s.ReallyCrashForTesting, "really_crash_for_testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.")
fs.Float64Var(&s.ChaosChance, "chaos_chance", s.ChaosChance, "If > 0.0, introduce random client errors and latency. Intended for testing. [default=0.0]")
}
// Run runs the specified KubeletServer. This should never exit.
@ -338,6 +348,9 @@ func (s *KubeletServer) createAPIServerClient() (*client.Client, error) {
glog.Infof("Multiple api servers specified. Picking first one")
}
clientConfig.Host = s.APIServerList[0]
s.addChaosToClientConfig(&clientConfig)
c, err := client.New(&clientConfig)
if err != nil {
return nil, err
@ -345,6 +358,18 @@ func (s *KubeletServer) createAPIServerClient() (*client.Client, error) {
return c, nil
}
// addChaosToClientConfig injects random errors into client connections if configured.
func (s *KubeletServer) addChaosToClientConfig(config *client.Config) {
if s.ChaosChance != 0.0 {
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
seed := chaosclient.NewSeed(1)
// TODO: introduce a standard chaos package with more tunables - this is just a proof of concept
// TODO: introduce random latency and stalls
return chaosclient.NewChaosRoundTripper(rt, chaosclient.LogChaos, seed.P(s.ChaosChance, chaosclient.ErrSimulatedConnectionResetByPeer))
}
}
}
// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an API Client.
// Under the hood it calls RunKubelet (below)
func SimpleKubelet(client *client.Client,

View File

@ -45,6 +45,7 @@ API_HOST=${API_HOST:-127.0.0.1}
API_CORS_ALLOWED_ORIGINS=${API_CORS_ALLOWED_ORIGINS:-"/127.0.0.1(:[0-9]+)?$,/localhost(:[0-9]+)?$"}
KUBELET_PORT=${KUBELET_PORT:-10250}
LOG_LEVEL=${LOG_LEVEL:-3}
CHAOS_CHANCE=${CHAOS_CHANCE:-0.0}
# For the common local scenario, fail fast if server is already running.
# this can happen if you run local-up-cluster.sh twice and kill etcd in between.
@ -143,6 +144,7 @@ CTLRMGR_PID=$!
KUBELET_LOG=/tmp/kubelet.log
sudo -E "${GO_OUT}/kubelet" \
--v=${LOG_LEVEL} \
--chaos_chance="${CHAOS_CHANCE}" \
--hostname_override="127.0.0.1" \
--address="127.0.0.1" \
--api_servers="${API_HOST}:${API_PORT}" \

View File

@ -0,0 +1,148 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package chaosclient makes it easy to simulate network latency, misbehaving
// servers, and random errors from servers. It is intended to stress test components
// under failure conditions and expose weaknesses in the error handling logic
// of the codebase.
package chaosclient
import (
"errors"
"fmt"
"log"
"math/rand"
"net/http"
"reflect"
"runtime"
)
// chaosrt provides the ability to perform simulations of HTTP client failures
// under the Golang http.Transport interface.
type chaosrt struct {
rt http.RoundTripper
notify ChaosNotifier
c []Chaos
}
// Chaos intercepts requests to a remote HTTP endpoint and can inject arbitrary
// failures.
type Chaos interface {
// Intercept should return true if the normal flow should be skipped, and the
// return response and error used instead. Modifications to the request will
// be ignored, but may be used to make decisions about types of failures.
Intercept(req *http.Request) (bool, *http.Response, error)
}
// ChaosNotifier notifies another component that the ChaosRoundTripper has simulated
// a failure.
type ChaosNotifier interface {
// OnChaos is invoked when a chaotic outcome was triggered. fn is the
// source of Chaos and req was the outgoing request
OnChaos(req *http.Request, c Chaos)
}
// ChaosFunc takes an http.Request and decides whether to alter the response. It
// returns true if it wishes to mutate the response, with a http.Response or
// error.
type ChaosFunc func(req *http.Request) (bool, *http.Response, error)
func (fn ChaosFunc) Intercept(req *http.Request) (bool, *http.Response, error) {
return fn.Intercept(req)
}
func (fn ChaosFunc) String() string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}
// NewChaosRoundTripper creates an http.RoundTripper that will intercept requests
// based on the provided Chaos functions. The notifier is invoked when a Chaos
// Intercept is fired.
func NewChaosRoundTripper(rt http.RoundTripper, notify ChaosNotifier, c ...Chaos) http.RoundTripper {
return &chaosrt{rt, notify, c}
}
// RoundTrip gives each ChaosFunc an opportunity to intercept the request. The first
// interceptor wins.
func (rt *chaosrt) RoundTrip(req *http.Request) (*http.Response, error) {
for _, c := range rt.c {
if intercept, resp, err := c.Intercept(req); intercept {
rt.notify.OnChaos(req, c)
return resp, err
}
}
return rt.rt.RoundTrip(req)
}
// Seed represents a consistent stream of chaos.
type Seed struct {
*rand.Rand
}
// NewSeed creates an object that assists in generating random chaotic events
// based on a deterministic seed.
func NewSeed(seed int64) Seed {
return Seed{rand.New(rand.NewSource(seed))}
}
type pIntercept struct {
Chaos
s Seed
p float64
}
// P returns a ChaosFunc that fires with a probabilty of p (p between 0.0
// and 1.0 with 0.0 meaning never and 1.0 meaning always).
func (s Seed) P(p float64, c Chaos) Chaos {
return pIntercept{c, s, p}
}
// Intercept intercepts requests with the provided probability p.
func (c pIntercept) Intercept(req *http.Request) (bool, *http.Response, error) {
if c.s.Float64() < c.p {
return c.Chaos.Intercept(req)
}
return false, nil, nil
}
func (c pIntercept) String() string {
return fmt.Sprintf("P{%f %s}", c.p, c.Chaos)
}
// ErrSimulatedConnectionResetByPeer emulates the golang net error when a connection
// is reset by a peer.
// TODO: make this more accurate
// TODO: add other error types
// TODO: add a helper for returning multiple errors randomly.
var ErrSimulatedConnectionResetByPeer = Error{errors.New("connection reset by peer")}
// Error returns the nested error when C() is invoked.
type Error struct {
error
}
// C returns the nested error
func (e Error) Intercept(_ *http.Request) (bool, *http.Response, error) {
return true, nil, e.error
}
// LogChaos is the default ChaosNotifier and writes a message to the Golang log.
var LogChaos = ChaosNotifier(logChaos{})
type logChaos struct{}
func (logChaos) OnChaos(req *http.Request, c Chaos) {
log.Printf("Triggered chaotic behavior for %s %s: %v", req.Method, req.URL.String(), c)
}

View File

@ -0,0 +1,82 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package chaosclient
import (
"net/http"
"net/http/httptest"
"net/url"
"testing"
)
type TestLogChaos struct {
*testing.T
}
func (t TestLogChaos) OnChaos(req *http.Request, c Chaos) {
t.Logf("CHAOS: chaotic behavior for %s %s: %v", req.Method, req.URL.String(), c)
}
func unwrapURLError(err error) error {
if urlErr, ok := err.(*url.Error); ok && urlErr != nil {
return urlErr.Err
}
return err
}
func TestChaos(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := http.Client{
Transport: NewChaosRoundTripper(http.DefaultTransport, TestLogChaos{t}, ErrSimulatedConnectionResetByPeer),
}
resp, err := client.Get(server.URL)
if unwrapURLError(err) != ErrSimulatedConnectionResetByPeer.error {
t.Fatalf("expected reset by peer: %v", err)
}
if resp != nil {
t.Fatalf("expected no response object: %#v", resp)
}
}
func TestPartialChaos(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
seed := NewSeed(1)
client := http.Client{
Transport: NewChaosRoundTripper(
http.DefaultTransport, TestLogChaos{t},
seed.P(0.5, ErrSimulatedConnectionResetByPeer),
),
}
success, fail := 0, 0
for {
_, err := client.Get(server.URL)
if err != nil {
fail++
} else {
success++
}
if success > 1 && fail > 1 {
break
}
}
}

View File

@ -75,8 +75,14 @@ type Config struct {
UserAgent string
// Transport may be used for custom HTTP behavior. This attribute may not
// be specified with the TLS client certificate options.
// be specified with the TLS client certificate options. Use WrapTransport
// for most client level operations.
Transport http.RoundTripper
// WrapTransport will be invoked for custom HTTP behavior after the underlying
// transport is initialized (either the transport created from TLSClientConfig,
// Transport, or http.DefaultTransport). The config may layer other RoundTrippers
// on top of the returned RoundTripper.
WrapTransport func(rt http.RoundTripper) http.RoundTripper
// QPS indicates the maximum QPS to the master from this client. If zero, QPS is unlimited.
QPS float32
@ -255,6 +261,9 @@ func TransportFor(config *Config) (http.RoundTripper, error) {
transport = http.DefaultTransport
}
}
if config.WrapTransport != nil {
transport = config.WrapTransport(transport)
}
transport, err = HTTPWrappersForConfig(config, transport)
if err != nil {

View File

@ -115,7 +115,6 @@ func RunUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []str
fmt.Fprintf(out, "%s/%s\n", info.Mapping.Resource, info.Name)
return nil
})
}
func updateWithPatch(cmd *cobra.Command, args []string, f *cmdutil.Factory, patch string) (string, error) {