From b581e23c936306eff827384ab7d0d9348c819f62 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Fri, 29 Jul 2016 10:17:37 +0800 Subject: [PATCH 1/2] Kubelet: add gRPC implementation of new runtime interface --- pkg/kubelet/remote/doc.go | 19 ++ pkg/kubelet/remote/remote_image.go | 112 +++++++++ pkg/kubelet/remote/remote_runtime.go | 360 +++++++++++++++++++++++++++ pkg/kubelet/remote/utils.go | 34 +++ 4 files changed, 525 insertions(+) create mode 100644 pkg/kubelet/remote/doc.go create mode 100644 pkg/kubelet/remote/remote_image.go create mode 100644 pkg/kubelet/remote/remote_runtime.go create mode 100644 pkg/kubelet/remote/utils.go diff --git a/pkg/kubelet/remote/doc.go b/pkg/kubelet/remote/doc.go new file mode 100644 index 00000000000..7124b75d6a2 --- /dev/null +++ b/pkg/kubelet/remote/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package remote containers gRPC implementation of kubecontainer.RuntimeService +// and kubecontainer.ImageManagerService. +package remote diff --git a/pkg/kubelet/remote/remote_image.go b/pkg/kubelet/remote/remote_image.go new file mode 100644 index 00000000000..b603e8e1618 --- /dev/null +++ b/pkg/kubelet/remote/remote_image.go @@ -0,0 +1,112 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "time" + + "github.com/golang/glog" + "google.golang.org/grpc" + internalApi "k8s.io/kubernetes/pkg/kubelet/api" + runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" +) + +// RemoteImageService is a gRPC implementation of internalApi.ImageManagerService. +type RemoteImageService struct { + timeout time.Duration + imageClient runtimeApi.ImageServiceClient +} + +// NewRemoteImageService creates a new internalApi.ImageManagerService. +func NewRemoteImageService(addr string, connectionTimout time.Duration) (internalApi.ImageManagerService, error) { + glog.V(3).Infof("Connecting to image service %s", addr) + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithDialer(dial)) + if err != nil { + glog.Errorf("Connect remote image service %s failed: %v", addr, err) + return nil, err + } + + return &RemoteImageService{ + timeout: connectionTimout, + imageClient: runtimeApi.NewImageServiceClient(conn), + }, nil +} + +// ListImages lists pulled images. +func (r *RemoteImageService) ListImages(filter *runtimeApi.ImageFilter) ([]*runtimeApi.Image, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.imageClient.ListImages(ctx, &runtimeApi.ListImagesRequest{ + Filter: filter, + }) + if err != nil { + glog.Errorf("ListImages from image service failed: %v", err) + return nil, err + } + + return resp.Images, nil +} + +// ImageStatus returns the status of the image. +func (r *RemoteImageService) ImageStatus(image *runtimeApi.ImageSpec) (*runtimeApi.Image, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.imageClient.ImageStatus(ctx, &runtimeApi.ImageStatusRequest{ + Image: image, + }) + if err != nil { + glog.Errorf("ImageStatus from image service failed: %v", err) + return nil, err + } + + return resp.Image, nil +} + +// PullImage pulls a image with authentication config. +func (r *RemoteImageService) PullImage(image *runtimeApi.ImageSpec, auth *runtimeApi.AuthConfig) error { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + _, err := r.imageClient.PullImage(ctx, &runtimeApi.PullImageRequest{ + Image: image, + Auth: auth, + }) + if err != nil { + glog.Errorf("PullImage from image service failed: %v", err) + return err + } + + return nil +} + +// RemoveImage removes the image. +func (r *RemoteImageService) RemoveImage(image *runtimeApi.ImageSpec) error { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + _, err := r.imageClient.RemoveImage(ctx, &runtimeApi.RemoveImageRequest{ + Image: image, + }) + if err != nil { + glog.Errorf("RemoveImage from image service failed: %v", err) + return err + } + + return nil +} diff --git a/pkg/kubelet/remote/remote_runtime.go b/pkg/kubelet/remote/remote_runtime.go new file mode 100644 index 00000000000..02e1a0e1206 --- /dev/null +++ b/pkg/kubelet/remote/remote_runtime.go @@ -0,0 +1,360 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "io" + "time" + + "github.com/golang/glog" + "google.golang.org/grpc" + internalApi "k8s.io/kubernetes/pkg/kubelet/api" + runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" +) + +// RemoteRuntimeService is a gRPC implementation of internalApi.RuntimeService. +type RemoteRuntimeService struct { + timeout time.Duration + runtimeClient runtimeApi.RuntimeServiceClient +} + +// NewRemoteRuntimeService creates a new internalApi.RuntimeService. +func NewRemoteRuntimeService(addr string, connectionTimout time.Duration) (internalApi.RuntimeService, error) { + glog.V(3).Infof("Connecting to runtime service %s", addr) + conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithDialer(dial)) + if err != nil { + glog.Errorf("Connect remote runtime %s failed: %v", addr, err) + return nil, err + } + + return &RemoteRuntimeService{ + timeout: connectionTimout, + runtimeClient: runtimeApi.NewRuntimeServiceClient(conn), + }, nil +} + +// Version returns the runtime name, runtime version and runtime API version. +func (r *RemoteRuntimeService) Version(apiVersion string) (*runtimeApi.VersionResponse, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + typedVersion, err := r.runtimeClient.Version(ctx, &runtimeApi.VersionRequest{ + Version: &apiVersion, + }, + ) + if err != nil { + glog.Errorf("Version from runtime service failed: %v", err) + return nil, err + } + + return typedVersion, err +} + +// CreatePodSandbox creates a pod-level sandbox. +func (r *RemoteRuntimeService) CreatePodSandbox(config *runtimeApi.PodSandboxConfig) (string, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.CreatePodSandbox(ctx, &runtimeApi.CreatePodSandboxRequest{ + Config: config, + }) + if err != nil { + glog.Errorf("CreatePodSandbox from runtime service failed: %v", err) + return "", err + } + + return resp.GetPodSandboxId(), nil +} + +// StopPodSandbox stops the sandbox. If there are any running containers in the +// sandbox, they should be forced to termination. +func (r *RemoteRuntimeService) StopPodSandbox(podSandBoxID string) error { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + _, err := r.runtimeClient.StopPodSandbox(ctx, &runtimeApi.StopPodSandboxRequest{ + PodSandboxId: &podSandBoxID, + }) + if err != nil { + glog.Errorf("StopPodSandbox from runtime service failed: %v", err) + return err + } + + return nil +} + +// DeletePodSandbox deletes the sandbox. If there are any containers in the +// sandbox, they should be forced to deletion. +func (r *RemoteRuntimeService) DeletePodSandbox(podSandBoxID string) error { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + _, err := r.runtimeClient.DeletePodSandbox(ctx, &runtimeApi.DeletePodSandboxRequest{ + PodSandboxId: &podSandBoxID, + }) + if err != nil { + glog.Errorf("DeletePodSandbox from runtime service failed: %v", err) + return err + } + + return nil +} + +// PodSandboxStatus returns the status of the PodSandbox. +func (r *RemoteRuntimeService) PodSandboxStatus(podSandBoxID string) (*runtimeApi.PodSandboxStatus, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.PodSandboxStatus(ctx, &runtimeApi.PodSandboxStatusRequest{ + PodSandboxId: &podSandBoxID, + }) + if err != nil { + glog.Errorf("PodSandboxStatus from runtime service failed: %v", err) + return nil, err + } + + return resp.Status, nil +} + +// ListPodSandbox returns a list of PodSandboxes. +func (r *RemoteRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.ListPodSandbox(ctx, &runtimeApi.ListPodSandboxRequest{ + Filter: filter, + }) + if err != nil { + glog.Errorf("ListPodSandbox from runtime service failed: %v", err) + return nil, err + } + + return resp.Items, nil +} + +// CreateContainer creates a new container in the specified PodSandbox. +func (r *RemoteRuntimeService) CreateContainer(podSandBoxID string, config *runtimeApi.ContainerConfig, sandboxConfig *runtimeApi.PodSandboxConfig) (string, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.CreateContainer(ctx, &runtimeApi.CreateContainerRequest{ + PodSandboxId: &podSandBoxID, + Config: config, + SandboxConfig: sandboxConfig, + }) + if err != nil { + glog.Errorf("CreateContainer from runtime service failed: %v", err) + return "", err + } + + return resp.GetContainerId(), nil +} + +// StartContainer starts the container. +func (r *RemoteRuntimeService) StartContainer(rawContainerID string) error { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + _, err := r.runtimeClient.StartContainer(ctx, &runtimeApi.StartContainerRequest{ + ContainerId: &rawContainerID, + }) + if err != nil { + glog.Errorf("StartContainer from runtime service failed: %v", err) + return err + } + + return nil +} + +// StopContainer stops a running container with a grace period (i.e., timeout). +func (r *RemoteRuntimeService) StopContainer(rawContainerID string, timeout int64) error { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + _, err := r.runtimeClient.StopContainer(ctx, &runtimeApi.StopContainerRequest{ + ContainerId: &rawContainerID, + Timeout: &timeout, + }) + if err != nil { + glog.Errorf("StopContainer from runtime service failed: %v", err) + return err + } + + return nil +} + +// RemoveContainer removes the container. If the container is running, the container +// should be forced to removal. +func (r *RemoteRuntimeService) RemoveContainer(rawContainerID string) error { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + _, err := r.runtimeClient.RemoveContainer(ctx, &runtimeApi.RemoveContainerRequest{ + ContainerId: &rawContainerID, + }) + if err != nil { + glog.Errorf("RemoveContainer from runtime service failed: %v", err) + return err + } + + return nil +} + +// ListContainers lists containers by filters. +func (r *RemoteRuntimeService) ListContainers(filter *runtimeApi.ContainerFilter) ([]*runtimeApi.Container, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.ListContainers(ctx, &runtimeApi.ListContainersRequest{ + Filter: filter, + }) + if err != nil { + glog.Errorf("ListContainers from runtime service failed: %v", err) + return nil, err + } + + return resp.Containers, nil +} + +// ContainerStatus returns the container status. +func (r *RemoteRuntimeService) ContainerStatus(rawContainerID string) (*runtimeApi.ContainerStatus, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeApi.ContainerStatusRequest{ + ContainerId: &rawContainerID, + }) + if err != nil { + glog.Errorf("ContainerStatus from runtime service failed: %v", err) + return nil, err + } + + return resp.Status, nil +} + +// Exec executes a command in the container. +// TODO: support terminal resizing for exec, refer https://github.com/kubernetes/kubernetes/issues/29579. +func (r *RemoteRuntimeService) Exec(rawContainerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + stream, err := r.runtimeClient.Exec(ctx) + if err != nil { + glog.Errorf("Get remote runtime client stream failed: %v", err) + return err + } + + request := &runtimeApi.ExecRequest{ + ContainerId: &rawContainerID, + Cmd: cmd, + Tty: &tty, + } + err = stream.Send(request) + if err != nil { + glog.Errorf("Send exec request to remote runtime failed: %v", err) + return err + } + + errChanOut := make(chan error, 1) + errChanIn := make(chan error, 1) + exit := make(chan bool) + + go func(stdout, stderr io.WriteCloser) { + defer close(errChanOut) + defer close(exit) + + for { + resp, err := stream.Recv() + if err != nil && err != io.EOF { + errChanOut <- err + return + } + + if resp != nil && len(resp.Stdout) > 0 && stdout != nil { + nw, err := stdout.Write(resp.Stdout) + if err != nil { + errChanOut <- err + return + } + if nw != len(resp.Stdout) { + errChanOut <- io.ErrShortWrite + return + } + if err == io.EOF { + break + } + } + + if resp != nil && len(resp.Stderr) > 0 && stderr != nil { + nw, err := stderr.Write(resp.Stderr) + if err != nil { + errChanOut <- err + return + } + if nw != len(resp.Stderr) { + errChanOut <- io.ErrShortWrite + return + } + if err == io.EOF { + break + } + } + } + }(stdout, stderr) + + if stdin != nil { + go func(stdin io.Reader) { + defer close(errChanIn) + buffer := make([]byte, 256) + + for { + nr, err := stdin.Read(buffer) + if nr > 0 { + request.Stdin = buffer[:nr] + err := stream.Send(request) + if err != nil { + errChanIn <- err + return + } + } + + if err == io.EOF { + break + } + + if err != nil { + errChanIn <- err + return + } + } + }(stdin) + } + + <-exit + select { + case err = <-errChanIn: + if err != nil { + glog.Errorf("Exec send stream error: %v", err) + } + return err + case err = <-errChanOut: + if err != nil { + glog.Errorf("Exec receive stream error: %v", err) + } + return err + } +} diff --git a/pkg/kubelet/remote/utils.go b/pkg/kubelet/remote/utils.go new file mode 100644 index 00000000000..3b6357a7e67 --- /dev/null +++ b/pkg/kubelet/remote/utils.go @@ -0,0 +1,34 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "net" + "time" + + "golang.org/x/net/context" +) + +// dial creates a net.Conn by unix socket addr. +func dial(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) +} + +// getContextWithTimeout returns a context with timeout. +func getContextWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), timeout) +} From f8c51adbe13fe739c60f1303b40245ed62218042 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Wed, 3 Aug 2016 07:22:06 +0800 Subject: [PATCH 2/2] Log id in error message --- pkg/kubelet/remote/doc.go | 6 +- pkg/kubelet/remote/remote_image.go | 10 +- pkg/kubelet/remote/remote_runtime.go | 133 +++------------------------ 3 files changed, 21 insertions(+), 128 deletions(-) diff --git a/pkg/kubelet/remote/doc.go b/pkg/kubelet/remote/doc.go index 7124b75d6a2..c53f3e166a7 100644 --- a/pkg/kubelet/remote/doc.go +++ b/pkg/kubelet/remote/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,6 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package remote containers gRPC implementation of kubecontainer.RuntimeService -// and kubecontainer.ImageManagerService. +// Package remote containers gRPC implementation of internalApi.RuntimeService +// and internalApi.ImageManagerService. package remote diff --git a/pkg/kubelet/remote/remote_image.go b/pkg/kubelet/remote/remote_image.go index b603e8e1618..b6fa7d88b1e 100644 --- a/pkg/kubelet/remote/remote_image.go +++ b/pkg/kubelet/remote/remote_image.go @@ -46,7 +46,7 @@ func NewRemoteImageService(addr string, connectionTimout time.Duration) (interna }, nil } -// ListImages lists pulled images. +// ListImages lists available images. func (r *RemoteImageService) ListImages(filter *runtimeApi.ImageFilter) ([]*runtimeApi.Image, error) { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() @@ -55,7 +55,7 @@ func (r *RemoteImageService) ListImages(filter *runtimeApi.ImageFilter) ([]*runt Filter: filter, }) if err != nil { - glog.Errorf("ListImages from image service failed: %v", err) + glog.Errorf("ListImages with filter %q from image service failed: %v", filter, err) return nil, err } @@ -71,7 +71,7 @@ func (r *RemoteImageService) ImageStatus(image *runtimeApi.ImageSpec) (*runtimeA Image: image, }) if err != nil { - glog.Errorf("ImageStatus from image service failed: %v", err) + glog.Errorf("ImageStatus %q from image service failed: %v", image.GetImage(), err) return nil, err } @@ -88,7 +88,7 @@ func (r *RemoteImageService) PullImage(image *runtimeApi.ImageSpec, auth *runtim Auth: auth, }) if err != nil { - glog.Errorf("PullImage from image service failed: %v", err) + glog.Errorf("PullImage %q from image service failed: %v", image.GetImage(), err) return err } @@ -104,7 +104,7 @@ func (r *RemoteImageService) RemoveImage(image *runtimeApi.ImageSpec) error { Image: image, }) if err != nil { - glog.Errorf("RemoveImage from image service failed: %v", err) + glog.Errorf("RemoveImage %q from image service failed: %v", image.GetImage(), err) return err } diff --git a/pkg/kubelet/remote/remote_runtime.go b/pkg/kubelet/remote/remote_runtime.go index 02e1a0e1206..9e2ae073649 100644 --- a/pkg/kubelet/remote/remote_runtime.go +++ b/pkg/kubelet/remote/remote_runtime.go @@ -20,6 +20,7 @@ import ( "io" "time" + "fmt" "github.com/golang/glog" "google.golang.org/grpc" internalApi "k8s.io/kubernetes/pkg/kubelet/api" @@ -54,8 +55,7 @@ func (r *RemoteRuntimeService) Version(apiVersion string) (*runtimeApi.VersionRe typedVersion, err := r.runtimeClient.Version(ctx, &runtimeApi.VersionRequest{ Version: &apiVersion, - }, - ) + }) if err != nil { glog.Errorf("Version from runtime service failed: %v", err) return nil, err @@ -90,7 +90,7 @@ func (r *RemoteRuntimeService) StopPodSandbox(podSandBoxID string) error { PodSandboxId: &podSandBoxID, }) if err != nil { - glog.Errorf("StopPodSandbox from runtime service failed: %v", err) + glog.Errorf("StopPodSandbox %q from runtime service failed: %v", podSandBoxID, err) return err } @@ -107,7 +107,7 @@ func (r *RemoteRuntimeService) DeletePodSandbox(podSandBoxID string) error { PodSandboxId: &podSandBoxID, }) if err != nil { - glog.Errorf("DeletePodSandbox from runtime service failed: %v", err) + glog.Errorf("DeletePodSandbox %q from runtime service failed: %v", podSandBoxID, err) return err } @@ -123,7 +123,7 @@ func (r *RemoteRuntimeService) PodSandboxStatus(podSandBoxID string) (*runtimeAp PodSandboxId: &podSandBoxID, }) if err != nil { - glog.Errorf("PodSandboxStatus from runtime service failed: %v", err) + glog.Errorf("PodSandboxStatus %q from runtime service failed: %v", podSandBoxID, err) return nil, err } @@ -139,7 +139,7 @@ func (r *RemoteRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandboxFilte Filter: filter, }) if err != nil { - glog.Errorf("ListPodSandbox from runtime service failed: %v", err) + glog.Errorf("ListPodSandbox with filter %q from runtime service failed: %v", filter, err) return nil, err } @@ -157,7 +157,7 @@ func (r *RemoteRuntimeService) CreateContainer(podSandBoxID string, config *runt SandboxConfig: sandboxConfig, }) if err != nil { - glog.Errorf("CreateContainer from runtime service failed: %v", err) + glog.Errorf("CreateContainer in sandbox %q from runtime service failed: %v", podSandBoxID, err) return "", err } @@ -173,7 +173,7 @@ func (r *RemoteRuntimeService) StartContainer(rawContainerID string) error { ContainerId: &rawContainerID, }) if err != nil { - glog.Errorf("StartContainer from runtime service failed: %v", err) + glog.Errorf("StartContainer %q from runtime service failed: %v", rawContainerID, err) return err } @@ -190,7 +190,7 @@ func (r *RemoteRuntimeService) StopContainer(rawContainerID string, timeout int6 Timeout: &timeout, }) if err != nil { - glog.Errorf("StopContainer from runtime service failed: %v", err) + glog.Errorf("StopContainer %q from runtime service failed: %v", rawContainerID, err) return err } @@ -207,7 +207,7 @@ func (r *RemoteRuntimeService) RemoveContainer(rawContainerID string) error { ContainerId: &rawContainerID, }) if err != nil { - glog.Errorf("RemoveContainer from runtime service failed: %v", err) + glog.Errorf("RemoveContainer %q from runtime service failed: %v", rawContainerID, err) return err } @@ -223,7 +223,7 @@ func (r *RemoteRuntimeService) ListContainers(filter *runtimeApi.ContainerFilter Filter: filter, }) if err != nil { - glog.Errorf("ListContainers from runtime service failed: %v", err) + glog.Errorf("ListContainers with filter %q from runtime service failed: %v", filter, err) return nil, err } @@ -239,7 +239,7 @@ func (r *RemoteRuntimeService) ContainerStatus(rawContainerID string) (*runtimeA ContainerId: &rawContainerID, }) if err != nil { - glog.Errorf("ContainerStatus from runtime service failed: %v", err) + glog.Errorf("ContainerStatus %q from runtime service failed: %v", rawContainerID, err) return nil, err } @@ -249,112 +249,5 @@ func (r *RemoteRuntimeService) ContainerStatus(rawContainerID string) (*runtimeA // Exec executes a command in the container. // TODO: support terminal resizing for exec, refer https://github.com/kubernetes/kubernetes/issues/29579. func (r *RemoteRuntimeService) Exec(rawContainerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error { - ctx, cancel := getContextWithTimeout(r.timeout) - defer cancel() - - stream, err := r.runtimeClient.Exec(ctx) - if err != nil { - glog.Errorf("Get remote runtime client stream failed: %v", err) - return err - } - - request := &runtimeApi.ExecRequest{ - ContainerId: &rawContainerID, - Cmd: cmd, - Tty: &tty, - } - err = stream.Send(request) - if err != nil { - glog.Errorf("Send exec request to remote runtime failed: %v", err) - return err - } - - errChanOut := make(chan error, 1) - errChanIn := make(chan error, 1) - exit := make(chan bool) - - go func(stdout, stderr io.WriteCloser) { - defer close(errChanOut) - defer close(exit) - - for { - resp, err := stream.Recv() - if err != nil && err != io.EOF { - errChanOut <- err - return - } - - if resp != nil && len(resp.Stdout) > 0 && stdout != nil { - nw, err := stdout.Write(resp.Stdout) - if err != nil { - errChanOut <- err - return - } - if nw != len(resp.Stdout) { - errChanOut <- io.ErrShortWrite - return - } - if err == io.EOF { - break - } - } - - if resp != nil && len(resp.Stderr) > 0 && stderr != nil { - nw, err := stderr.Write(resp.Stderr) - if err != nil { - errChanOut <- err - return - } - if nw != len(resp.Stderr) { - errChanOut <- io.ErrShortWrite - return - } - if err == io.EOF { - break - } - } - } - }(stdout, stderr) - - if stdin != nil { - go func(stdin io.Reader) { - defer close(errChanIn) - buffer := make([]byte, 256) - - for { - nr, err := stdin.Read(buffer) - if nr > 0 { - request.Stdin = buffer[:nr] - err := stream.Send(request) - if err != nil { - errChanIn <- err - return - } - } - - if err == io.EOF { - break - } - - if err != nil { - errChanIn <- err - return - } - } - }(stdin) - } - - <-exit - select { - case err = <-errChanIn: - if err != nil { - glog.Errorf("Exec send stream error: %v", err) - } - return err - case err = <-errChanOut: - if err != nil { - glog.Errorf("Exec receive stream error: %v", err) - } - return err - } + return fmt.Errorf("Not implemented") }