Merge pull request #19778 from resouer/runtime

Auto commit by PR queue bot
k8s-merge-robot 2016-02-01 21:05:05 -08:00
commit 32ab64ce5b
61 changed files with 322 additions and 250 deletions
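
This PR moves the crash- and error-handling helpers ReallyCrash, HandleCrash, HandleError, and PanicHandlers out of the catch-all k8s.io/kubernetes/pkg/util package into the dedicated k8s.io/kubernetes/pkg/util/runtime package, updating every call site. Files that still use other pkg/util helpers import both packages, aliasing the new one as utilruntime to avoid colliding with the API-machinery package k8s.io/kubernetes/pkg/runtime. A rough sketch of the moved surface, inferred from the call sites in this diff rather than quoted from the source tree:

// Sketch of k8s.io/kubernetes/pkg/util/runtime as exercised by this diff;
// signatures are inferred from the call sites, not copied verbatim.
package runtime

// ReallyCrash, when true, lets panics propagate instead of being recovered
// (tests and some binaries opt in to fail loudly).
var ReallyCrash bool

// PanicHandlers are invoked with the panic value for every panic that
// HandleCrash recovers.
var PanicHandlers []func(interface{})

// HandleCrash is meant to be deferred; it recovers a panic, runs the
// PanicHandlers, then any additional handlers supplied by the caller.
func HandleCrash(additionalHandlers ...func(interface{})) { /* ... */ }

// HandleError reports an error the caller can survive; by default it logs.
func HandleError(err error) { /* ... */ }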


@@ -55,6 +55,7 @@ import (
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
 	utilnet "k8s.io/kubernetes/pkg/util/net"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/volume/empty_dir"
@@ -964,7 +965,7 @@ func main() {
 	addFlags(pflag.CommandLine)
 	util.InitFlags()
-	util.ReallyCrash = true
+	utilruntime.ReallyCrash = true
 	util.InitLogs()
 	defer util.FlushLogs()


@@ -61,6 +61,7 @@ import (
 	"k8s.io/kubernetes/pkg/util/mount"
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 	"k8s.io/kubernetes/pkg/util/oom"
+	"k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/volume"
 )
@@ -299,7 +300,7 @@ func Run(s *options.KubeletServer, kcfg *KubeletConfig) error {
 		}
 	}
-	util.ReallyCrash = s.ReallyCrashForTesting
+	runtime.ReallyCrash = s.ReallyCrashForTesting
 	rand.Seed(time.Now().UTC().UnixNano())
 	credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)


@@ -44,7 +44,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
 	kruntime "k8s.io/kubernetes/pkg/runtime"
-	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 )

 const (
@@ -608,7 +608,7 @@ func (k *Executor) doShutdown(driver bindings.ExecutorDriver) {
 	if k.shutdownAlert != nil {
 		func() {
-			util.HandleCrash()
+			utilruntime.HandleCrash()
 			k.shutdownAlert()
 		}()
 	}


@@ -21,6 +21,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 // executorKubelet decorates the kubelet with a Run function that notifies the
@@ -38,7 +39,7 @@ func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) {
 		// When this Run function is called, we close it here.
 		// Otherwise, KubeletExecutorServer.runKubelet will.
 		close(kl.kubeletDone)
-		util.HandleCrash()
+		runtime.HandleCrash()
 		log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity
 		// important: never return! this is in our contract
 		select {}


@@ -19,7 +19,7 @@ package proc
 import (
 	"sync"

-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 const (
@@ -84,7 +84,7 @@ func stateRun(ps *processState, a *scheduledAction) stateFn {
 	close(a.errCh) // signal that action was scheduled
 	func() {
 		// we don't trust clients of this package
-		defer util.HandleCrash()
+		defer runtime.HandleCrash()
 		a.action()
 	}()
 	return stateRun
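
The pattern in this hunk — deferring HandleCrash before running a callback the package does not trust — recurs throughout the diff. A self-contained sketch (ReallyCrash set explicitly so the recovery path is taken):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/runtime"
)

// runGuarded invokes an untrusted callback; a panic inside cb is recovered
// by HandleCrash and logged instead of killing the process.
func runGuarded(cb func()) {
	defer runtime.HandleCrash()
	cb()
}

func main() {
	runtime.ReallyCrash = false // recover panics rather than crashing
	runGuarded(func() { panic("untrusted callback misbehaved") })
	fmt.Println("still alive") // reached: the panic was recovered above
}

One subtlety: Go's recover only takes effect when HandleCrash is itself the deferred call, as in defer runtime.HandleCrash(). At the handful of sites in this diff where HandleCrash is invoked from inside another deferred closure, recover returns nil and a panic would keep unwinding.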


@@ -20,7 +20,7 @@ import (
 	"sync"

 	"github.com/prometheus/client_golang/prometheus"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 const (
@@ -42,6 +42,6 @@ var registerMetrics sync.Once
 func Register() {
 	registerMetrics.Do(func() {
 		prometheus.MustRegister(panicCounter)
-		util.PanicHandlers = append(util.PanicHandlers, func(interface{}) { panicCounter.Inc() })
+		runtime.PanicHandlers = append(runtime.PanicHandlers, func(interface{}) { panicCounter.Inc() })
 	})
 }
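
The hunk above chains a Prometheus counter onto the shared panic-handler list, so every panic recovered by HandleCrash anywhere in the process is counted. The same hook, sketched standalone with a plain integer in place of the Prometheus counter:

package main

import (
	"fmt"
	"sync/atomic"

	"k8s.io/kubernetes/pkg/util/runtime"
)

var panics int64

func main() {
	runtime.ReallyCrash = false
	// Every panic recovered by HandleCrash now bumps the counter in
	// addition to whatever the default handlers (logging) do.
	runtime.PanicHandlers = append(runtime.PanicHandlers, func(_ interface{}) {
		atomic.AddInt64(&panics, 1)
	})

	func() {
		defer runtime.HandleCrash()
		panic("boom")
	}()

	fmt.Println("panics observed:", atomic.LoadInt64(&panics)) // 1
}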


@@ -21,7 +21,7 @@ import (
 	"sync"
 	"time"

-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 type Signal <-chan struct{}
@@ -90,7 +90,7 @@ func After(f func()) Signal {
 	ch := make(chan struct{})
 	go func() {
 		defer close(ch)
-		defer util.HandleCrash()
+		defer runtime.HandleCrash()
 		if f != nil {
 			f()
 		}
@@ -111,7 +111,7 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
 	default:
 	}
 	func() {
-		defer util.HandleCrash()
+		defer runtime.HandleCrash()
 		f()
 	}()
 	select {
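
Note the defer ordering in After: defers run last-in-first-out, so HandleCrash recovers a panic from f first, and the earlier-registered close(ch) still executes afterwards — the returned Signal fires even when the callback panics.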


@@ -25,7 +25,7 @@ import (
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
 	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 type ErrorHandler interface {
@@ -57,7 +57,7 @@ func (k *errorHandler) Error(pod *api.Pod, schedulingErr error) {
 	}

 	log.Infof("Error scheduling %v: %v; retrying", pod.Name, schedulingErr)
-	defer util.HandleCrash()
+	defer runtime.HandleCrash()

 	// default upstream scheduler passes pod.Name as binding.PodID
 	ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)


@@ -34,6 +34,7 @@ import (
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/intstr"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
@@ -118,14 +119,14 @@ type endpointController struct {
 // Runs e; will not return until stopCh is closed. workers determines how many
 // endpoints will be handled in parallel.
 func (e *endpointController) Run(workers int, stopCh <-chan struct{}) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	go e.serviceController.Run(stopCh)
 	go e.podController.Run(stopCh)
 	for i := 0; i < workers; i++ {
 		go util.Until(e.worker, time.Second, stopCh)
 	}
 	go func() {
-		defer util.HandleCrash()
+		defer utilruntime.HandleCrash()
 		time.Sleep(5 * time.Minute) // give time for our cache to fill
 		e.checkLeftoverEndpoints()
 	}()
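
This Run method is the controller shape repeated across the diff: crash handling now comes from utilruntime, while the polling helper util.Until stays in pkg/util, which is why both imports coexist. A compressed sketch of the shape, with the informer goroutines stubbed out:

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

type controller struct{ work chan string }

// worker drains at most one item; util.Until re-invokes it every period.
func (c *controller) worker() {
	select {
	case item := <-c.work:
		fmt.Println("processed", item)
	default:
	}
}

// Run starts `workers` polling goroutines and blocks until stopCh closes.
func (c *controller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash() // a panic in Run itself is recovered and logged
	for i := 0; i < workers; i++ {
		go util.Until(c.worker, time.Second, stopCh)
	}
	<-stopCh
}

func main() {
	c := &controller{work: make(chan string, 1)}
	c.work <- "endpoint-update"
	stop := make(chan struct{})
	go c.Run(2, stop)
	time.Sleep(2 * time.Second)
	close(stop)
}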


@@ -38,10 +38,10 @@ import (
 	"k8s.io/kubernetes/pkg/apiserver/metrics"
 	"k8s.io/kubernetes/pkg/healthz"
 	"k8s.io/kubernetes/pkg/runtime"
-	"k8s.io/kubernetes/pkg/util"
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/flushwriter"
 	utilnet "k8s.io/kubernetes/pkg/util/net"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wsstream"
 	"k8s.io/kubernetes/pkg/version"
@@ -343,7 +343,7 @@ func write(statusCode int, gv unversioned.GroupVersion, s runtime.NegotiatedSeri
 	if wsstream.IsWebSocketRequest(req) {
 		r := wsstream.NewReader(out, true)
 		if err := r.Copy(w, req); err != nil {
-			util.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err))
+			utilruntime.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err))
 		}
 		return
 	}
@@ -392,7 +392,7 @@ func errorNegotiated(err error, s runtime.NegotiatedSerializer, gv unversioned.G
 // errorJSONFatal renders an error to the response, and if codec fails will render plaintext.
 // Returns the HTTP status code of the error.
 func errorJSONFatal(err error, codec runtime.Encoder, w http.ResponseWriter) int {
-	util.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
+	utilruntime.HandleError(fmt.Errorf("apiserver was unable to write a JSON response: %v", err))
 	status := errToAPIStatus(err)
 	code := int(status.Code)
 	output, err := runtime.Encode(codec, status)
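
HandleError is the non-fatal counterpart of HandleCrash: the caller reports an error it can survive and carries on, and a chain of handlers (a logger by default) decides what to do with it. A sketch of attaching a custom handler — the exported ErrorHandlers slice is an assumption here, mirroring the PanicHandlers slice seen earlier in this diff:

package main

import (
	"errors"
	"fmt"

	"k8s.io/kubernetes/pkg/util/runtime"
)

func main() {
	// Assumption: runtime.ErrorHandlers exists alongside PanicHandlers and
	// each reported error is passed through the chain.
	runtime.ErrorHandlers = append(runtime.ErrorHandlers, func(err error) {
		fmt.Println("non-fatal:", err)
	})

	// Report-and-continue, e.g. after a failed write on an HTTP response.
	runtime.HandleError(errors.New("apiserver was unable to write a JSON response"))
}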


@@ -23,7 +23,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/storage"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 // statusError is an object that can be converted into an unversioned.Status
@@ -60,7 +60,7 @@ func errToAPIStatus(err error) *unversioned.Status {
 		// by REST storage - these typically indicate programmer
 		// error by not using pkg/api/errors, or unexpected failure
 		// cases.
-		util.HandleError(fmt.Errorf("apiserver received an error that is not an unversioned.Status: %v", err))
+		runtime.HandleError(fmt.Errorf("apiserver received an error that is not an unversioned.Status: %v", err))
 		return &unversioned.Status{
 			Status: unversioned.StatusFailure,
 			Code:   int32(status),


@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/strategicpatch"

 	"github.com/emicklei/go-restful"
@@ -906,7 +907,7 @@ func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object,
 	panicCh := make(chan interface{}, 1)
 	go func() {
 		// panics don't cross goroutine boundaries, so we have to handle ourselves
-		defer util.HandleCrash(func(panicReason interface{}) {
+		defer utilruntime.HandleCrash(func(panicReason interface{}) {
 			// Propagate to parent goroutine
 			panicCh <- panicReason
 		})
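
finishRequest relies on HandleCrash's optional extra handlers: panics do not cross goroutine boundaries, so the worker goroutine recovers its own panic and forwards the reason to the parent, which re-raises it. A minimal sketch of the same pattern, with a hypothetical callWithTimeout helper standing in for finishRequest:

package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/runtime"
)

func callWithTimeout(timeout time.Duration, fn func() (string, error)) (string, error) {
	resultCh := make(chan string, 1)
	errCh := make(chan error, 1)
	panicCh := make(chan interface{}, 1)

	go func() {
		// panics don't cross goroutine boundaries, so recover here and
		// forward the reason to the parent via panicCh.
		defer runtime.HandleCrash(func(panicReason interface{}) {
			panicCh <- panicReason
		})
		if res, err := fn(); err != nil {
			errCh <- err
		} else {
			resultCh <- res
		}
	}()

	select {
	case res := <-resultCh:
		return res, nil
	case err := <-errCh:
		return "", err
	case reason := <-panicCh:
		panic(reason) // re-raise in the caller's goroutine
	case <-time.After(timeout):
		return "", errors.New("timed out")
	}
}

func main() {
	res, err := callWithTimeout(time.Second, func() (string, error) { return "ok", nil })
	fmt.Println(res, err)
}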


@@ -21,6 +21,7 @@ import (
 	"github.com/golang/glog"

 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 // ExpirationCache implements the store interface
@@ -90,7 +91,7 @@ func (c *ExpirationCache) getOrExpire(key string) (interface{}, bool) {
 	// fails; as long as we only return un-expired entries a
 	// reader doesn't need to wait for the result of the delete.
 	go func() {
-		defer util.HandleCrash()
+		defer runtime.HandleCrash()
 		c.cacheStorage.Delete(key)
 	}()
 	return nil, false


@@ -36,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/meta"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/watch"
 )
@@ -267,7 +268,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 			case io.ErrUnexpectedEOF:
 				glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
 			default:
-				util.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
+				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
 			}
 			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
 			// It doesn't make sense to re-list all objects because most likely we will be able to restart
@@ -329,12 +330,12 @@ loop:
 				return apierrs.FromObject(event.Object)
 			}
 			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
-				util.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
+				utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
 				continue
 			}
 			meta, err := meta.Accessor(event.Object)
 			if err != nil {
-				util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
+				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
 				continue
 			}
 			newResourceVersion := meta.GetResourceVersion()
@@ -349,7 +350,7 @@ loop:
 				// to change this.
 				r.store.Delete(event.Object)
 			default:
-				util.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
+				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
 			}
 			*resourceVersion = newResourceVersion
 			r.setLastSyncResourceVersion(newResourceVersion)
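
In the reflector, HandleError deliberately replaces returning an error: a malformed or unexpected watch event is reported and the loop moves on to the next event (note the continue statements), so one bad object cannot wedge the cache.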


@@ -61,6 +61,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/record"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"

 	"github.com/golang/glog"
@@ -169,7 +170,7 @@ type LeaderElectionRecord struct {
 // Run starts the leader election loop
 func (le *LeaderElector) Run() {
 	defer func() {
-		util.HandleCrash()
+		runtime.HandleCrash()
 		le.config.Callbacks.OnStoppedLeading()
 	}()
 	le.acquire()


@@ -27,6 +27,7 @@ import (
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/watch"

 	"github.com/golang/glog"
@@ -120,7 +121,7 @@ func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrel
 	event = &eventCopy
 	result, err := eventCorrelator.EventCorrelate(event)
 	if err != nil {
-		util.HandleError(err)
+		utilruntime.HandleError(err)
 	}
 	if result.Skip {
 		return
@@ -216,7 +217,7 @@ func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format stri
 func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*api.Event)) watch.Interface {
 	watcher := eventBroadcaster.Watch()
 	go func() {
-		defer util.HandleCrash()
+		defer utilruntime.HandleCrash()
 		for {
 			watchEvent, open := <-watcher.ResultChan()
 			if !open {
@@ -262,7 +263,7 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unv
 	go func() {
 		// NOTE: events should be a non-blocking operation
-		defer util.HandleCrash()
+		defer utilruntime.HandleCrash()
 		recorder.Action(watch.Added, event)
 	}()
 }


@@ -30,8 +30,8 @@ import (
 	"github.com/golang/glog"

 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/kubelet/server/portforward"
-	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/httpstream"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 // PortForwarder knows how to listen for local connections and forward them to
@@ -165,7 +165,7 @@ func (pf *PortForwarder) forward() error {
 	select {
 	case <-pf.stopChan:
 	case <-pf.streamConn.CloseChan():
-		util.HandleError(errors.New("lost connection to pod"))
+		runtime.HandleError(errors.New("lost connection to pod"))
 	}

 	return nil
@@ -199,7 +199,7 @@ func (pf *PortForwarder) listenOnPortAndAddress(port *ForwardedPort, protocol st
 func (pf *PortForwarder) getListener(protocol string, hostname string, port *ForwardedPort) (net.Listener, error) {
 	listener, err := net.Listen(protocol, fmt.Sprintf("%s:%d", hostname, port.Local))
 	if err != nil {
-		util.HandleError(fmt.Errorf("Unable to create listener: Error %s", err))
+		runtime.HandleError(fmt.Errorf("Unable to create listener: Error %s", err))
 		return nil, err
 	}
 	listenerAddress := listener.Addr().String()
@@ -223,7 +223,7 @@ func (pf *PortForwarder) waitForConnection(listener net.Listener, port Forwarded
 	if err != nil {
 		// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
 		if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
-			util.HandleError(fmt.Errorf("Error accepting connection on port %d: %v", port.Local, err))
+			runtime.HandleError(fmt.Errorf("Error accepting connection on port %d: %v", port.Local, err))
 		}
 		return
 	}
@@ -255,7 +255,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
 	headers.Set(api.PortForwardRequestIDHeader, strconv.Itoa(requestID))
 	errorStream, err := pf.streamConn.CreateStream(headers)
 	if err != nil {
-		util.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
+		runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
 		return
 	}
 	// we're not writing to this stream
@@ -277,7 +277,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
 	headers.Set(api.StreamType, api.StreamTypeData)
 	dataStream, err := pf.streamConn.CreateStream(headers)
 	if err != nil {
-		util.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
+		runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
 		return
 	}
@@ -287,7 +287,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
 	go func() {
 		// Copy from the remote side to the local port.
 		if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
-			util.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
+			runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
 		}

 		// inform the select below that the remote copy is done
@@ -300,7 +300,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
 		// Copy from the local port to the remote side.
 		if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
-			util.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
+			runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
 			// break out of the select below without waiting for the other copy to finish
 			close(localError)
 		}
@@ -315,7 +315,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
 	// always expect something on errorChan (it may be nil)
 	err = <-errorChan
 	if err != nil {
-		util.HandleError(err)
+		runtime.HandleError(err)
 	}
 }
@@ -323,7 +323,7 @@ func (pf *PortForwarder) Close() {
 	// stop all listeners
 	for _, l := range pf.listeners {
 		if err := l.Close(); err != nil {
-			util.HandleError(fmt.Errorf("error closing listener: %v", err))
+			runtime.HandleError(fmt.Errorf("error closing listener: %v", err))
 		}
 	}
 }


@@ -24,8 +24,8 @@ import (
 	"sync"

 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/httpstream"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 // streamProtocolV2 implements version 2 of the streaming protocol for attach
@@ -113,7 +113,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
 		defer once.Do(func() { remoteStdin.Close() })
 		if _, err := io.Copy(remoteStdin, e.stdin); err != nil {
-			util.HandleError(err)
+			runtime.HandleError(err)
 		}
 	}()
@@ -133,7 +133,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
 		// this "copy" doesn't actually read anything - it's just here to wait for
 		// the server to close remoteStdin.
 		if _, err := io.Copy(ioutil.Discard, remoteStdin); err != nil {
-			util.HandleError(err)
+			runtime.HandleError(err)
 		}
 	}()
 }
@@ -143,7 +143,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
 	go func() {
 		defer wg.Done()
 		if _, err := io.Copy(e.stdout, remoteStdout); err != nil {
-			util.HandleError(err)
+			runtime.HandleError(err)
 		}
 	}()
 }
@@ -153,7 +153,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
 	go func() {
 		defer wg.Done()
 		if _, err := io.Copy(e.stderr, remoteStderr); err != nil {
-			util.HandleError(err)
+			runtime.HandleError(err)
 		}
 	}()
 }


@@ -36,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/validation/field"
 	"k8s.io/kubernetes/pkg/util/workqueue"
@@ -183,7 +184,7 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	glog.Infof("Starting Daemon Sets controller manager")
 	controller.SyncAllPodsWithStore(dsc.kubeClient, dsc.podStore.Store)
 	go dsc.dsController.Run(stopCh)
@@ -444,7 +445,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 			if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &ds.Spec.Template, ds); err != nil {
 				glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 				dsc.expectations.CreationObserved(dsKey)
-				util.HandleError(err)
+				utilruntime.HandleError(err)
 			}
 		}(i)
 	}
@@ -459,7 +460,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
 				glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 				dsc.expectations.DeletionObserved(dsKey)
-				util.HandleError(err)
+				utilruntime.HandleError(err)
 			}
 		}(i)
 	}


@@ -40,6 +40,7 @@ import (
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	labelsutil "k8s.io/kubernetes/pkg/util/labels"
 	podutil "k8s.io/kubernetes/pkg/util/pod"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
 )
@@ -185,7 +186,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
 // Run begins watching and syncing.
 func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	go dc.dController.Run(stopCh)
 	go dc.rcController.Run(stopCh)
 	go dc.podController.Run(stopCh)


@@ -33,6 +33,7 @@ import (
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
@@ -123,14 +124,14 @@ type EndpointController struct {
 // Runs e; will not return until stopCh is closed. workers determines how many
 // endpoints will be handled in parallel.
 func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	go e.serviceController.Run(stopCh)
 	go e.podController.Run(stopCh)
 	for i := 0; i < workers; i++ {
 		go util.Until(e.worker, time.Second, stopCh)
 	}
 	go func() {
-		defer util.HandleCrash()
+		defer utilruntime.HandleCrash()
 		time.Sleep(5 * time.Minute) // give time for our cache to fill
 		e.checkLeftoverEndpoints()
 	}()


@@ -23,6 +23,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 )

 // Config contains all the settings for a Controller.
@@ -79,7 +80,7 @@ func New(c *Config) *Controller {
 // It's an error to call Run more than once.
 // Run blocks; call via go.
 func (c *Controller) Run(stopCh <-chan struct{}) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	r := cache.NewReflector(
 		c.config.ListerWatcher,
 		c.config.ObjectType,


@@ -30,6 +30,7 @@ import (
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/watch"

 	"github.com/golang/glog"
@@ -103,7 +104,7 @@ func (gcc *GCController) gc() {
 			defer wait.Done()
 			if err := gcc.deletePod(namespace, name); err != nil {
 				// ignore not founds
-				defer util.HandleError(err)
+				defer utilruntime.HandleError(err)
 			}
 		}(terminatedPods[i].Namespace, terminatedPods[i].Name)
 	}


@@ -35,6 +35,7 @@ import (
 	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
 )
@@ -135,7 +136,7 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.Re
 // Run the main goroutine responsible for watching and syncing jobs.
 func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	go jm.jobController.Run(stopCh)
 	go jm.podController.Run(stopCh)
 	for i := 0; i < workers; i++ {
@@ -349,7 +350,7 @@ func (jm *JobController) syncJob(key string) error {
 		go func(ix int) {
 			defer wait.Done()
 			if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil {
-				defer util.HandleError(err)
+				defer utilruntime.HandleError(err)
 			}
 		}(i)
 	}
@@ -469,7 +470,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *ex
 		go func(ix int) {
 			defer wait.Done()
 			if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
-				defer util.HandleError(err)
+				defer utilruntime.HandleError(err)
 				// Decrement the expected number of deletes because the informer won't observe this deletion
 				jm.expectations.DeletionObserved(jobKey)
 				activeLock.Lock()
@@ -514,7 +515,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *ex
 		go func() {
 			defer wait.Done()
 			if err := jm.podControl.CreatePods(job.Namespace, &job.Spec.Template, job); err != nil {
-				defer util.HandleError(err)
+				defer utilruntime.HandleError(err)
 				// Decrement the expected number of creates because the informer won't observe this pod
 				jm.expectations.CreationObserved(jobKey)
 				activeLock.Lock()


@@ -27,7 +27,7 @@ import (
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/runtime"
-	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/watch"
@@ -70,12 +70,12 @@ func NewNamespaceController(kubeClient client.Interface, versions *unversioned.A
 					glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
 					time.Sleep(time.Duration(t) * time.Second)
 					if err := controller.Requeue(namespace); err != nil {
-						util.HandleError(err)
+						utilruntime.HandleError(err)
 					}
 				}()
 				return
 			}
-			util.HandleError(err)
+			utilruntime.HandleError(err)
 		}
 	},
 	UpdateFunc: func(oldObj, newObj interface{}) {
@@ -87,12 +87,12 @@ func NewNamespaceController(kubeClient client.Interface, versions *unversioned.A
 					glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
 					time.Sleep(time.Duration(t) * time.Second)
 					if err := controller.Requeue(namespace); err != nil {
-						util.HandleError(err)
+						utilruntime.HandleError(err)
 					}
 				}()
 				return
 			}
-			util.HandleError(err)
+			utilruntime.HandleError(err)
 		}
 	},
 },


@@ -38,6 +38,7 @@ import (
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/version"
 	"k8s.io/kubernetes/pkg/watch"
@@ -241,7 +242,7 @@ func (nc *NodeController) Run(period time.Duration) {
 		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 			remaining, err := nc.deletePods(value.Value)
 			if err != nil {
-				util.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
+				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
 				return false, 0
 			}
@@ -260,7 +261,7 @@ func (nc *NodeController) Run(period time.Duration) {
 		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 			completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
 			if err != nil {
-				util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
+				utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
 				return false, 0
 			}
@@ -333,7 +334,7 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
 		// this can only happen if the Store.KeyFunc has a problem creating
 		// a key for the pod. If it happens once, it will happen again so
 		// don't bother requeuing the pod.
-		util.HandleError(err)
+		utilruntime.HandleError(err)
 		return
 	}
@@ -366,7 +367,7 @@ func forcefullyDeletePod(c client.Interface, pod *api.Pod) {
 	var zero int64
 	err := c.Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
 	if err != nil {
-		util.HandleError(err)
+		utilruntime.HandleError(err)
 	}
 }
@@ -456,7 +457,7 @@ func (nc *NodeController) monitorNodeStatus() error {
 	if readyCondition.Status != api.ConditionTrue && lastReadyCondition.Status == api.ConditionTrue {
 		nc.recordNodeStatusChange(node, "NodeNotReady")
 		if err = nc.markAllPodsNotReady(node.Name); err != nil {
-			util.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
+			utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
 		}
 	}


@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
 )
@@ -182,7 +183,7 @@ func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder)
 // Run begins watching and syncing.
 func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	go rsc.rsController.Run(stopCh)
 	go rsc.podController.Run(stopCh)
 	for i := 0; i < workers; i++ {
@@ -360,7 +361,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
 			// Decrement the expected number of creates because the informer won't observe this pod
 			glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
 			rsc.expectations.CreationObserved(rsKey)
-			util.HandleError(err)
+			utilruntime.HandleError(err)
 		}
 	}()
 }
@@ -388,7 +389,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
 			// Decrement the expected number of deletes because the informer won't observe this deletion
 			glog.V(2).Infof("Failed deletion, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
 			rsc.expectations.DeletionObserved(rsKey)
-			util.HandleError(err)
+			utilruntime.HandleError(err)
 		}
 	}(i)
 }


@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
 )
@@ -184,7 +185,7 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
 // Run begins watching and syncing.
 func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	glog.Infof("Starting RC Manager")
 	controller.SyncAllPodsWithStore(rm.kubeClient, rm.podStore.Store)
 	go rm.rcController.Run(stopCh)
@@ -364,7 +365,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
 			// Decrement the expected number of creates because the informer won't observe this pod
 			glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
 			rm.expectations.CreationObserved(rcKey)
-			util.HandleError(err)
+			utilruntime.HandleError(err)
 		}
 	}()
 }
@@ -392,7 +393,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
 			// Decrement the expected number of deletes because the informer won't observe this deletion
 			glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
 			rm.expectations.DeletionObserved(rcKey)
-			util.HandleError(err)
+			utilruntime.HandleError(err)
 		}
 	}(i)
 }


@@ -29,6 +29,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
 )
@@ -153,7 +154,7 @@ func (rq *ResourceQuotaController) worker() {
 		defer rq.queue.Done(key)
 		err := rq.syncHandler(key.(string))
 		if err != nil {
-			util.HandleError(err)
+			utilruntime.HandleError(err)
 		}
 	}()
 }
@@ -161,7 +162,7 @@ func (rq *ResourceQuotaController) worker() {
 // Run begins quota controller using the specified number of workers
 func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
-	defer util.HandleCrash()
+	defer utilruntime.HandleCrash()
 	go rq.rqController.Run(stopCh)
 	go rq.podController.Run(stopCh)
 	for i := 0; i < workers; i++ {


@@ -32,7 +32,7 @@ import (
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/types"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 const (
@@ -188,7 +188,7 @@ func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
 			time.Sleep(processingRetryInterval)
 			serviceQueue.AddIfNotPresent(deltas)
 		} else if err != nil {
-			util.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
+			runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
 		}
 	}
 }


@@ -31,7 +31,7 @@ import (
 	"k8s.io/kubernetes/pkg/registry/secret"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/serviceaccount"
-	"k8s.io/kubernetes/pkg/util"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/watch"
@@ -255,7 +255,7 @@ func (e *TokensController) secretDeleted(obj interface{}) {
 	if err := client.RetryOnConflict(RemoveTokenBackoff, func() error {
 		return e.removeSecretReferenceIfNeeded(serviceAccount, secret.Name)
 	}); err != nil {
-		util.HandleError(err)
+		utilruntime.HandleError(err)
 	}
 }


@@ -45,6 +45,7 @@ import (
 	"k8s.io/kubernetes/pkg/ui"
 	"k8s.io/kubernetes/pkg/util"
 	utilnet "k8s.io/kubernetes/pkg/util/net"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"

 	systemd "github.com/coreos/go-systemd/daemon"
@@ -617,7 +618,7 @@ func (s *GenericAPIServer) Run(options *ServerRunOptions) {
 	}

 	go func() {
-		defer util.HandleCrash()
+		defer utilruntime.HandleCrash()
 		for {
 			// err == systemd.SdNotifyNoSocket when not running on a systemd system
 			if err := systemd.SdNotify("READY=1\n"); err != nil && err != systemd.SdNotifyNoSocket {


@@ -34,7 +34,7 @@ import (
 	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
 	"github.com/google/cadvisor/manager"
 	"github.com/google/cadvisor/utils/sysfs"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 type cadvisorClient struct {
@@ -119,7 +119,7 @@ func (cc *cadvisorClient) exportHTTP(port uint) error {
 	// If export failed, retry in the background until we are able to bind.
 	// This allows an existing cAdvisor to be killed before this one registers.
 	go func() {
-		defer util.HandleCrash()
+		defer runtime.HandleCrash()
 		err := serv.ListenAndServe()
 		for err != nil {


@@ -54,6 +54,7 @@ import (
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/oom"
 	"k8s.io/kubernetes/pkg/util/procfs"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	utilstrings "k8s.io/kubernetes/pkg/util/strings"
 )
@@ -1297,7 +1298,7 @@ func (dm *DockerManager) killPodWithSyncResult(pod *api.Pod, runningPod kubecont
 	wg.Add(len(runningPod.Containers))
 	for _, container := range runningPod.Containers {
 		go func(container *kubecontainer.Container) {
-			defer util.HandleCrash()
+			defer utilruntime.HandleCrash()
 			defer wg.Done()

 			var containerSpec *api.Container
@@ -1418,7 +1419,7 @@ func (dm *DockerManager) killContainer(containerID kubecontainer.ContainerID, co
 	done := make(chan struct{})
 	go func() {
 		defer close(done)
-		defer util.HandleCrash()
+		defer utilruntime.HandleCrash()
 		if err := dm.runner.Run(containerID, pod, container, container.Lifecycle.PreStop); err != nil {
 			glog.Errorf("preStop hook for container %q failed: %v", name, err)
 		}


@@ -76,6 +76,7 @@ import (
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 	"k8s.io/kubernetes/pkg/util/oom"
 	"k8s.io/kubernetes/pkg/util/procfs"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/selinux"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/validation/field"
@@ -1632,7 +1633,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	// Kill pods we can't run.
 	if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
 		if err := kl.killPod(pod, runningPod); err != nil {
-			util.HandleError(err)
+			utilruntime.HandleError(err)
 		}
 		return err
 	}


@@ -57,6 +57,7 @@ import (
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/bandwidth"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/version"
 	"k8s.io/kubernetes/pkg/volume"
@@ -64,7 +65,7 @@ import (
 )

 func init() {
-	util.ReallyCrash = true
+	utilruntime.ReallyCrash = true
 }

 const testKubeletHostname = "127.0.0.1"


@@ -24,7 +24,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 type OOMWatcher interface {
@@ -60,7 +60,7 @@ func (ow *realOOMWatcher) Start(ref *api.ObjectReference) error {
 	}

 	go func() {
-		defer util.HandleCrash()
+		defer runtime.HandleCrash()
 		for event := range eventChannel.GetChannel() {
 			glog.V(2).Infof("Got sys oom event from cadvisor: %v", event)


@@ -27,7 +27,7 @@ import (
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/kubelet/util/queue"
 	"k8s.io/kubernetes/pkg/types"
-	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 )

 // PodWorkers is an abstract interface for testability.
@@ -171,7 +171,7 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kube
 		// the status of the pod for the first pod worker sync. See corresponding
 		// comment in syncPod.
 		go func() {
-			defer util.HandleCrash()
+			defer runtime.HandleCrash()
 			p.managePodLoop(podUpdates)
 		}()
 	}
} }


@@ -27,11 +27,12 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/prober/results"
 	"k8s.io/kubernetes/pkg/probe"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
 )

 func init() {
-	util.ReallyCrash = true
+	runtime.ReallyCrash = true
 }

 var defaultProbe *api.Probe = &api.Probe{


@ -24,7 +24,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime"
) )
// worker handles the periodic probing of its assigned container. Each worker has a go-routine // worker handles the periodic probing of its assigned container. Each worker has a go-routine
@ -120,7 +120,7 @@ probeLoop:
// doProbe probes the container once and records the result. // doProbe probes the container once and records the result.
// Returns whether the worker should continue. // Returns whether the worker should continue.
func (w *worker) doProbe() (keepGoing bool) { func (w *worker) doProbe() (keepGoing bool) {
defer util.HandleCrash(func(_ interface{}) { keepGoing = true }) defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID) status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
if !ok { if !ok {
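The doProbe hunk above also shows the one behavioral subtlety worth calling out: runtime.HandleCrash accepts additional context-specific handlers, so a single deferred call can both recover a panic and patch up a named return value. A minimal standalone sketch of that pattern (hypothetical doWork/mightPanic names, not part of this diff):

package main

import (
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

func mightPanic() { panic("probe exploded") }

// doWork mirrors doProbe: if the body panics, the extra handler flips the
// named return so the caller keeps polling instead of giving up.
func doWork() (keepGoing bool) {
	defer utilruntime.HandleCrash(func(_ interface{}) { keepGoing = true })
	mightPanic()
	return true
}

func main() { _ = doWork() }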

View File

@ -32,11 +32,12 @@ import (
"k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
func init() { func init() {
util.ReallyCrash = true runtime.ReallyCrash = true
} }
func TestDoProbe(t *testing.T) { func TestDoProbe(t *testing.T) {
@ -251,7 +252,7 @@ func TestCleanUp(t *testing.T) {
} }
func TestHandleCrash(t *testing.T) { func TestHandleCrash(t *testing.T) {
util.ReallyCrash = false // Test that we *don't* really crash. runtime.ReallyCrash = false // Test that we *don't* really crash.
m := newTestManager() m := newTestManager()
w := newTestWorker(m, readiness, api.Probe{}) w := newTestWorker(m, readiness, api.Probe{})

View File

@ -50,11 +50,11 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flushwriter" "k8s.io/kubernetes/pkg/util/flushwriter"
"k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/limitwriter" "k8s.io/kubernetes/pkg/util/limitwriter"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wsstream" "k8s.io/kubernetes/pkg/util/wsstream"
) )
@ -763,7 +763,7 @@ func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder Po
// negotiated protocol isn't currently used server side, but could be in the future // negotiated protocol isn't currently used server side, but could be in the future
if err != nil { if err != nil {
// Handshake writes the error to the client // Handshake writes the error to the client
util.HandleError(err) utilruntime.HandleError(err)
return return
} }
@ -865,7 +865,7 @@ func (h *portForwardStreamHandler) monitorStreamPair(p *portForwardStreamPair, t
select { select {
case <-timeout: case <-timeout:
err := fmt.Errorf("(conn=%p, request=%s) timed out waiting for streams", h.conn, p.requestID) err := fmt.Errorf("(conn=%p, request=%s) timed out waiting for streams", h.conn, p.requestID)
util.HandleError(err) utilruntime.HandleError(err)
p.printError(err.Error()) p.printError(err.Error())
case <-p.complete: case <-p.complete:
glog.V(5).Infof("(conn=%p, request=%s) successfully received error and data streams", h.conn, p.requestID) glog.V(5).Infof("(conn=%p, request=%s) successfully received error and data streams", h.conn, p.requestID)
@ -949,7 +949,7 @@ Loop:
} }
if complete, err := p.add(stream); err != nil { if complete, err := p.add(stream); err != nil {
msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err) msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
util.HandleError(errors.New(msg)) utilruntime.HandleError(errors.New(msg))
p.printError(msg) p.printError(msg)
} else if complete { } else if complete {
go h.portForward(p) go h.portForward(p)
@ -973,7 +973,7 @@ func (h *portForwardStreamHandler) portForward(p *portForwardStreamPair) {
if err != nil { if err != nil {
msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err) msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err)
util.HandleError(msg) utilruntime.HandleError(msg)
fmt.Fprint(p.errorStream, msg.Error()) fmt.Fprint(p.errorStream, msg.Error())
} }
} }

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/runtime"
) )
// Controller is the controller manager for the core bootstrap Kubernetes controller // Controller is the controller manager for the core bootstrap Kubernetes controller
@ -103,7 +104,7 @@ func (c *Controller) RunKubernetesService(ch chan struct{}) {
// run, ports and type will be corrected only during // run, ports and type will be corrected only during
// start. // start.
if err := c.UpdateKubernetesService(false); err != nil { if err := c.UpdateKubernetesService(false); err != nil {
util.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err)) runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
} }
}, c.EndpointInterval, ch) }, c.EndpointInterval, ch)
} }
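This hunk is the pattern most call sites in this commit converge on: util.Until keeps a control loop alive on a fixed period (and, per the pkg/util change later in this diff, itself defers runtime.HandleCrash), while runtime.HandleError reports a failed iteration without aborting the loop. A sketch under those assumptions (syncOnce is a hypothetical stand-in for UpdateKubernetesService):

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

func syncOnce() error { return fmt.Errorf("transient failure") }

func main() {
	stop := make(chan struct{})
	go util.Until(func() {
		if err := syncOnce(); err != nil {
			// Log (or fan out to custom ErrorHandlers) and keep looping.
			utilruntime.HandleError(fmt.Errorf("unable to sync: %v", err))
		}
	}, 10*time.Second, stop)
	time.Sleep(time.Second)
	close(stop)
}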

View File

@ -29,11 +29,11 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
utilerrors "k8s.io/kubernetes/pkg/util/errors" utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/iptables" "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/runtime"
) )
type portal struct { type portal struct {
@ -335,7 +335,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service proxy.ServicePortName, proxier *Proxier) { go func(service proxy.ServicePortName, proxier *Proxier) {
defer util.HandleCrash() defer runtime.HandleCrash()
atomic.AddInt32(&proxier.numProxyLoops, 1) atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier) sock.ProxyLoop(service, si, proxier)
atomic.AddInt32(&proxier.numProxyLoops, -1) atomic.AddInt32(&proxier.numProxyLoops, -1)

View File

@ -31,8 +31,8 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
ipttest "k8s.io/kubernetes/pkg/util/iptables/testing" ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/kubernetes/pkg/util/runtime"
) )
const ( const (
@ -87,7 +87,7 @@ var udpServerPort int
func init() { func init() {
// Don't handle panics // Don't handle panics
util.ReallyCrash = true runtime.ReallyCrash = true
// TCP setup. // TCP setup.
// TODO: Close() this when fix #19254 // TODO: Close() this when fix #19254

View File

@ -28,7 +28,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime"
) )
// Abstraction over TCP/UDP sockets which are proxied. // Abstraction over TCP/UDP sockets which are proxied.
@ -259,7 +259,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne
} }
activeClients.clients[cliAddr.String()] = svrConn activeClients.clients[cliAddr.String()] = svrConn
go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
defer util.HandleCrash() defer runtime.HandleCrash()
udp.proxyClient(cliAddr, svrConn, activeClients, timeout) udp.proxyClient(cliAddr, svrConn, activeClients, timeout)
}(cliAddr, svrConn, activeClients, timeout) }(cliAddr, svrConn, activeClients, timeout)
} }

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/registry/service" "k8s.io/kubernetes/pkg/registry/service"
"k8s.io/kubernetes/pkg/registry/service/ipallocator" "k8s.io/kubernetes/pkg/registry/service/ipallocator"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
@ -67,7 +68,7 @@ func NewRepair(interval time.Duration, registry service.Registry, network *net.I
func (c *Repair) RunUntil(ch chan struct{}) { func (c *Repair) RunUntil(ch chan struct{}) {
util.Until(func() { util.Until(func() {
if err := c.RunOnce(); err != nil { if err := c.RunOnce(); err != nil {
util.HandleError(err) runtime.HandleError(err)
} }
}, c.interval, ch) }, c.interval, ch)
} }
@ -113,7 +114,7 @@ func (c *Repair) runOnce() error {
ip := net.ParseIP(svc.Spec.ClusterIP) ip := net.ParseIP(svc.Spec.ClusterIP)
if ip == nil { if ip == nil {
// cluster IP is broken, reallocate // cluster IP is broken, reallocate
util.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace)) runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
continue continue
} }
switch err := r.Allocate(ip); err { switch err := r.Allocate(ip); err {
@ -121,11 +122,11 @@ func (c *Repair) runOnce() error {
case ipallocator.ErrAllocated: case ipallocator.ErrAllocated:
// TODO: send event // TODO: send event
// cluster IP is broken, reallocate // cluster IP is broken, reallocate
util.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace)) runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace))
case ipallocator.ErrNotInRange: case ipallocator.ErrNotInRange:
// TODO: send event // TODO: send event
// cluster IP is broken, reallocate // cluster IP is broken, reallocate
util.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network)) runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network))
case ipallocator.ErrFull: case ipallocator.ErrFull:
// TODO: send event // TODO: send event
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", r) return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", r)

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/registry/service/portallocator" "k8s.io/kubernetes/pkg/registry/service/portallocator"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/runtime"
) )
// See ipallocator/controller/repair.go; this is a copy for ports. // See ipallocator/controller/repair.go; this is a copy for ports.
@ -52,7 +53,7 @@ func NewRepair(interval time.Duration, registry service.Registry, portRange net.
func (c *Repair) RunUntil(ch chan struct{}) { func (c *Repair) RunUntil(ch chan struct{}) {
util.Until(func() { util.Until(func() {
if err := c.RunOnce(); err != nil { if err := c.RunOnce(); err != nil {
util.HandleError(err) runtime.HandleError(err)
} }
}, c.interval, ch) }, c.interval, ch)
} }
@ -107,11 +108,11 @@ func (c *Repair) runOnce() error {
case portallocator.ErrAllocated: case portallocator.ErrAllocated:
// TODO: send event // TODO: send event
// port is broken, reallocate // port is broken, reallocate
util.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace)) runtime.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
case portallocator.ErrNotInRange: case portallocator.ErrNotInRange:
// TODO: send event // TODO: send event
// port is broken, reallocate // port is broken, reallocate
util.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange)) runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
case portallocator.ErrFull: case portallocator.ErrFull:
// TODO: send event // TODO: send event
return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange) return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)

View File

@ -27,7 +27,7 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
etcd "github.com/coreos/etcd/client" etcd "github.com/coreos/etcd/client"
@ -145,7 +145,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine. // as a goroutine.
func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) { func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) {
defer util.HandleCrash() defer utilruntime.HandleCrash()
defer close(w.etcdError) defer close(w.etcdError)
defer close(w.etcdIncoming) defer close(w.etcdIncoming)
@ -211,7 +211,7 @@ func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key stri
resp, err := client.Get(ctx, key, &opts) resp, err := client.Get(ctx, key, &opts)
if err != nil { if err != nil {
if !etcdutil.IsEtcdNotFound(err) { if !etcdutil.IsEtcdNotFound(err) {
util.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err)) utilruntime.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err))
return resourceVersion, err return resourceVersion, err
} }
if etcdError, ok := err.(etcd.Error); ok { if etcdError, ok := err.(etcd.Error); ok {
@ -247,7 +247,7 @@ var (
// called as a goroutine. // called as a goroutine.
func (w *etcdWatcher) translate() { func (w *etcdWatcher) translate() {
defer close(w.outgoing) defer close(w.outgoing)
defer util.HandleCrash() defer utilruntime.HandleCrash()
for { for {
select { select {
@ -309,7 +309,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
// ensure resource version is set on the object we load from etcd // ensure resource version is set on the object we load from etcd
if w.versioner != nil { if w.versioner != nil {
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil { if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
util.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)) utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
} }
} }
@ -317,7 +317,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
if w.transform != nil { if w.transform != nil {
obj, err = w.transform(obj) obj, err = w.transform(obj)
if err != nil { if err != nil {
util.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err)) utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
return nil, err return nil, err
} }
} }
@ -330,7 +330,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
func (w *etcdWatcher) sendAdd(res *etcd.Response) { func (w *etcdWatcher) sendAdd(res *etcd.Response) {
if res.Node == nil { if res.Node == nil {
util.HandleError(fmt.Errorf("unexpected nil node: %#v", res)) utilruntime.HandleError(fmt.Errorf("unexpected nil node: %#v", res))
return return
} }
if w.include != nil && !w.include(res.Node.Key) { if w.include != nil && !w.include(res.Node.Key) {
@ -338,7 +338,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
} }
obj, err := w.decodeObject(res.Node) obj, err := w.decodeObject(res.Node)
if err != nil { if err != nil {
util.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node)) utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
// TODO: expose an error through watch.Interface? // TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses // Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value. // the resourceVersion to resume will never be able to get past a bad value.
@ -367,7 +367,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
} }
curObj, err := w.decodeObject(res.Node) curObj, err := w.decodeObject(res.Node)
if err != nil { if err != nil {
util.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node)) utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
// TODO: expose an error through watch.Interface? // TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses // Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value. // the resourceVersion to resume will never be able to get past a bad value.
@ -407,7 +407,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
func (w *etcdWatcher) sendDelete(res *etcd.Response) { func (w *etcdWatcher) sendDelete(res *etcd.Response) {
if res.PrevNode == nil { if res.PrevNode == nil {
util.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res)) utilruntime.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res))
return return
} }
if w.include != nil && !w.include(res.PrevNode.Key) { if w.include != nil && !w.include(res.PrevNode.Key) {
@ -422,7 +422,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
} }
obj, err := w.decodeObject(&node) obj, err := w.decodeObject(&node)
if err != nil { if err != nil {
util.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node)) utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node))
// TODO: expose an error through watch.Interface? // TODO: expose an error through watch.Interface?
// Ignore this value. If we stop the watch on a bad value, a client that uses // Ignore this value. If we stop the watch on a bad value, a client that uses
// the resourceVersion to resume will never be able to get past a bad value. // the resourceVersion to resume will never be able to get past a bad value.
@ -446,7 +446,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
case EtcdDelete, EtcdExpire: case EtcdDelete, EtcdExpire:
w.sendDelete(res) w.sendDelete(res)
default: default:
util.HandleError(fmt.Errorf("unknown action: %v", res.Action)) utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action))
} }
} }

View File

@ -21,8 +21,8 @@ import (
"net/http" "net/http"
"strings" "strings"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/runtime"
) )
const HeaderSpdy31 = "SPDY/3.1" const HeaderSpdy31 = "SPDY/3.1"
@ -64,13 +64,13 @@ func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Reque
conn, _, err := hijacker.Hijack() conn, _, err := hijacker.Hijack()
if err != nil { if err != nil {
util.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err)) runtime.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err))
return nil return nil
} }
spdyConn, err := NewServerConnection(conn, newStreamHandler) spdyConn, err := NewServerConnection(conn, newStreamHandler)
if err != nil { if err != nil {
util.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err)) runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err))
return nil return nil
} }

View File

@ -0,0 +1,78 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"fmt"
"github.com/golang/glog"
"runtime"
)
// For testing: when true, HandleCrash is bypassed so panics really crash.
var ReallyCrash bool
// PanicHandlers is a list of functions which will be invoked when a panic happens.
var PanicHandlers = []func(interface{}){logPanic}
//TODO search the public functions
// HandleCrash simply catches a crash and logs an error. Meant to be called via defer.
// Additional context-specific handlers can be provided, and will be called in case of panic.
func HandleCrash(additionalHandlers ...func(interface{})) {
if ReallyCrash {
return
}
if r := recover(); r != nil {
for _, fn := range PanicHandlers {
fn(r)
}
for _, fn := range additionalHandlers {
fn(r)
}
}
}
// logPanic logs the caller tree when a panic occurs.
func logPanic(r interface{}) {
callers := ""
for i := 0; true; i++ {
_, file, line, ok := runtime.Caller(i)
if !ok {
break
}
callers = callers + fmt.Sprintf("%v:%v\n", file, line)
}
glog.Errorf("Recovered from panic: %#v (%v)\n%v", r, r, callers)
}
// ErrorHandlers is a list of functions which will be invoked when an unreturnable
// error occurs.
var ErrorHandlers = []func(error){logError}
// HandleError is a function to call when a non-user-facing piece of code cannot
// return an error and needs to indicate it has been ignored. Calling this function
// is preferable to logging the error directly - the default behavior is to log, but
// errors may also be sent to a remote server for analysis.
func HandleError(err error) {
for _, fn := range ErrorHandlers {
fn(err)
}
}
// logError prints an error, attributing the log line to the call site that reported it
func logError(err error) {
glog.ErrorDepth(2, err)
}
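End to end, the new package is meant to be used like this (a usage sketch, not part of the commit): defer HandleCrash at the top of goroutines, route non-returnable errors through HandleError, and append extra handlers if panics or errors should also feed metrics or remote reporting.

package main

import (
	"errors"

	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

func main() {
	// Appended handlers run after the default logPanic handler.
	utilruntime.PanicHandlers = append(utilruntime.PanicHandlers, func(r interface{}) {
		// e.g. bump a crash-counter metric here (hypothetical).
	})

	done := make(chan struct{})
	go func() {
		defer close(done)
		defer utilruntime.HandleCrash() // recovers and logs unless ReallyCrash is true
		panic("worker blew up")
	}()
	<-done

	// Report an error that has no caller to return to.
	utilruntime.HandleError(errors.New("dropped update"))
}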

View File

@ -0,0 +1,74 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"fmt"
"testing"
)
func TestHandleCrash(t *testing.T) {
count := 0
expect := 10
for i := 0; i < expect; i = i + 1 {
// Wrap each iteration in a closure so the deferred HandleCrash recovers
// per iteration; otherwise the first panic unwinds the test function
// before the final count check ever runs.
func() {
defer HandleCrash()
if i%2 == 0 {
panic("Test Panic")
}
}()
count = count + 1
}
if count != expect {
t.Errorf("Expected %d iterations, found %d", expect, count)
}
}
func TestCustomHandleCrash(t *testing.T) {
old := PanicHandlers
defer func() { PanicHandlers = old }()
var result interface{}
PanicHandlers = []func(interface{}){
func(r interface{}) {
result = r
},
}
func() {
defer HandleCrash()
panic("test")
}()
if result != "test" {
t.Errorf("did not receive custom handler")
}
}
func TestCustomHandleError(t *testing.T) {
old := ErrorHandlers
defer func() { ErrorHandlers = old }()
var result error
ErrorHandlers = []func(error){
func(err error) {
result = err
},
}
err := fmt.Errorf("test")
HandleError(err)
if result != err {
t.Errorf("did not receive custom handler")
}
}

View File

@ -34,6 +34,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
"k8s.io/kubernetes/pkg/util/runtime"
) )
var ( var (
@ -289,7 +290,7 @@ func (l *SSHTunnelList) Close() {
for ix := range l.entries { for ix := range l.entries {
entry := l.entries[ix] entry := l.entries[ix]
go func() { go func() {
defer HandleCrash() defer runtime.HandleCrash()
time.Sleep(1 * time.Minute) time.Sleep(1 * time.Minute)
if err := entry.Tunnel.Close(); err != nil { if err := entry.Tunnel.Close(); err != nil {
glog.Errorf("Failed to close tunnel %v: %v", entry, err) glog.Errorf("Failed to close tunnel %v: %v", entry, err)

View File

@ -22,19 +22,14 @@ import (
"os" "os"
"reflect" "reflect"
"regexp" "regexp"
"runtime"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/runtime"
"github.com/golang/glog"
) )
// For testing, bypass HandleCrash.
var ReallyCrash bool
// For any test of the style: // For any test of the style:
// ... // ...
// <- time.After(timeout): // <- time.After(timeout):
@ -44,57 +39,6 @@ var ReallyCrash bool
// (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test. // (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test.
var ForeverTestTimeout = time.Second * 30 var ForeverTestTimeout = time.Second * 30
// PanicHandlers is a list of functions which will be invoked when a panic happens.
var PanicHandlers = []func(interface{}){logPanic}
// HandleCrash simply catches a crash and logs an error. Meant to be called via defer.
// Additional context-specific handlers can be provided, and will be called in case of panic
func HandleCrash(additionalHandlers ...func(interface{})) {
if ReallyCrash {
return
}
if r := recover(); r != nil {
for _, fn := range PanicHandlers {
fn(r)
}
for _, fn := range additionalHandlers {
fn(r)
}
}
}
// logPanic logs the caller tree when a panic occurs.
func logPanic(r interface{}) {
callers := ""
for i := 0; true; i++ {
_, file, line, ok := runtime.Caller(i)
if !ok {
break
}
callers = callers + fmt.Sprintf("%v:%v\n", file, line)
}
glog.Errorf("Recovered from panic: %#v (%v)\n%v", r, r, callers)
}
// ErrorHandlers is a list of functions which will be invoked when an unreturnable
// error occurs.
var ErrorHandlers = []func(error){logError}
// HandlerError is a method to invoke when a non-user facing piece of code cannot
// return an error and needs to indicate it has been ignored. Invoking this method
// is preferable to logging the error - the default behavior is to log but the
// errors may be sent to a remote server for analysis.
func HandleError(err error) {
for _, fn := range ErrorHandlers {
fn(err)
}
}
// logError prints an error with the call stack of the location it was reported
func logError(err error) {
glog.ErrorDepth(2, err)
}
// NeverStop may be passed to Until to make it never stop. // NeverStop may be passed to Until to make it never stop.
var NeverStop <-chan struct{} = make(chan struct{}) var NeverStop <-chan struct{} = make(chan struct{})
@ -116,7 +60,7 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
for { for {
func() { func() {
defer HandleCrash() defer runtime.HandleCrash()
f() f()
}() }()
select { select {

View File

@ -17,7 +17,6 @@ limitations under the License.
package util package util
import ( import (
"fmt"
"testing" "testing"
"time" "time"
) )
@ -53,55 +52,6 @@ func TestUntilReturnsImmediately(t *testing.T) {
} }
} }
func TestHandleCrash(t *testing.T) {
count := 0
expect := 10
for i := 0; i < expect; i = i + 1 {
defer HandleCrash()
if i%2 == 0 {
panic("Test Panic")
}
count = count + 1
}
if count != expect {
t.Errorf("Expected %d iterations, found %d", expect, count)
}
}
func TestCustomHandleCrash(t *testing.T) {
old := PanicHandlers
defer func() { PanicHandlers = old }()
var result interface{}
PanicHandlers = []func(interface{}){
func(r interface{}) {
result = r
},
}
func() {
defer HandleCrash()
panic("test")
}()
if result != "test" {
t.Errorf("did not receive custom handler")
}
}
func TestCustomHandleError(t *testing.T) {
old := ErrorHandlers
defer func() { ErrorHandlers = old }()
var result error
ErrorHandlers = []func(error){
func(err error) {
result = err
},
}
err := fmt.Errorf("test")
HandleError(err)
if result != err {
t.Errorf("did not receive custom handler")
}
}
func TestStringDiff(t *testing.T) { func TestStringDiff(t *testing.T) {
diff := StringDiff("aaabb", "aaacc") diff := StringDiff("aaabb", "aaacc")
expect := "aaa\n\nA: bb\n\nB: cc\n\n" expect := "aaa\n\nA: bb\n\nB: cc\n\n"

View File

@ -27,7 +27,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime"
) )
// The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating // The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating
@ -92,7 +92,7 @@ func IsWebSocketRequest(req *http.Request) bool {
// ignoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the // ignoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
// read and write deadlines are pushed every time a new message is received. // read and write deadlines are pushed every time a new message is received.
func ignoreReceives(ws *websocket.Conn, timeout time.Duration) { func ignoreReceives(ws *websocket.Conn, timeout time.Duration) {
defer util.HandleCrash() defer runtime.HandleCrash()
var data []byte var data []byte
for { for {
resetTimeout(ws, timeout) resetTimeout(ws, timeout)
@ -163,7 +163,7 @@ func (conn *Conn) SetIdleTimeout(duration time.Duration) {
// Open the connection and create channels for reading and writing. // Open the connection and create channels for reading and writing.
func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) ([]io.ReadWriteCloser, error) { func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) ([]io.ReadWriteCloser, error) {
go func() { go func() {
defer util.HandleCrash() defer runtime.HandleCrash()
defer conn.Close() defer conn.Close()
websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req) websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req)
}() }()

View File

@ -23,7 +23,7 @@ import (
"time" "time"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime"
) )
// The WebSocket subprotocol "binary.k8s.io" will only send messages to the // The WebSocket subprotocol "binary.k8s.io" will only send messages to the
@ -71,7 +71,7 @@ func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
// method completes. // method completes.
func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error { func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
go func() { go func() {
defer util.HandleCrash() defer runtime.HandleCrash()
websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req) websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
}() }()
return <-r.err return <-r.err

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/keymutex" "k8s.io/kubernetes/pkg/util/keymutex"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )
@ -223,7 +224,7 @@ func verifyDevicePath(devicePaths []string, sdBeforeSet sets.String) (string, er
// This function is intended to be called asynchronously as a go routine. // This function is intended to be called asynchronously as a go routine.
func detachDiskAndVerify(c *gcePersistentDiskCleaner) { func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.pdName) glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.pdName)
defer util.HandleCrash() defer runtime.HandleCrash()
// Block execution until any pending attach/detach operations for this PD have completed // Block execution until any pending attach/detach operations for this PD have completed
attachDetachMutex.LockKey(c.pdName) attachDetachMutex.LockKey(c.pdName)

View File

@ -22,8 +22,8 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/net"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
) )
// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written. // Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
@ -88,7 +88,7 @@ func (sw *StreamWatcher) stopping() bool {
func (sw *StreamWatcher) receive() { func (sw *StreamWatcher) receive() {
defer close(sw.result) defer close(sw.result)
defer sw.Stop() defer sw.Stop()
defer util.HandleCrash() defer utilruntime.HandleCrash()
for { for {
action, obj, err := sw.source.Decode() action, obj, err := sw.source.Decode()
if err != nil { if err != nil {

View File

@ -32,7 +32,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@ -343,7 +343,7 @@ func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue
// Retry asynchronously. // Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path. // Note that this is extremely rudimentary and we need a more real error handling path.
go func() { go func() {
defer util.HandleCrash() defer runtime.HandleCrash()
podID := types.NamespacedName{ podID := types.NamespacedName{
Namespace: pod.Namespace, Namespace: pod.Namespace,
Name: pod.Name, Name: pod.Name,

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
) )
const ( const (
@ -206,7 +207,7 @@ var _ = ginkgo.SynchronizedAfterSuite(func() {
// generated in this directory, and cluster logs will also be saved. // generated in this directory, and cluster logs will also be saved.
// This function is called on each Ginkgo node in parallel mode. // This function is called on each Ginkgo node in parallel mode.
func RunE2ETests(t *testing.T) { func RunE2ETests(t *testing.T) {
util.ReallyCrash = true runtime.ReallyCrash = true
util.InitLogs() util.InitLogs()
defer util.FlushLogs() defer util.FlushLogs()