Merge pull request #104792 from matthyx/60140

make kubectl cp resume on transfer error
This commit is contained in:
Kubernetes Prow Robot 2021-11-10 14:56:34 -08:00 committed by GitHub
commit 82379431df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -71,6 +71,7 @@ type CopyOptions struct {
Container string
Namespace string
NoPreserve bool
MaxTries int
ClientConfig *restclient.Config
Clientset kubernetes.Interface
@ -155,6 +156,7 @@ func NewCmdCp(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.C
}
cmdutil.AddContainerVarFlags(cmd, &o.Container, o.Container)
cmd.Flags().BoolVarP(&o.NoPreserve, "no-preserve", "", false, "The copied file/directory's ownership and permissions will not be preserved in the container")
cmd.Flags().IntVarP(&o.MaxTries, "retries", "", 0, "Set number of retries to complete a copy operation from a container. Specify 0 to disable or any negative value for infinite retrying. The default is 0 (no retry).")
return cmd
}
@ -327,38 +329,77 @@ func (o *CopyOptions) copyToPod(src, dest fileSpec, options *exec.ExecOptions) e
}
func (o *CopyOptions) copyFromPod(src, dest fileSpec) error {
reader, outStream := io.Pipe()
options := &exec.ExecOptions{
StreamOptions: exec.StreamOptions{
IOStreams: genericclioptions.IOStreams{
In: nil,
Out: outStream,
ErrOut: o.Out,
},
Namespace: src.PodNamespace,
PodName: src.PodName,
},
// TODO: Improve error messages by first testing if 'tar' is present in the container?
Command: []string{"tar", "cf", "-", src.File.String()},
Executor: &exec.DefaultRemoteExecutor{},
}
go func() {
defer outStream.Close()
cmdutil.CheckErr(o.execute(options))
}()
reader := newTarPipe(src, o)
srcFile := src.File.(remotePath)
destFile := dest.File.(localPath)
// remove extraneous path shortcuts - these could occur if a path contained extra "../"
// and attempted to navigate beyond "/" in a remote filesystem
prefix := stripPathShortcuts(srcFile.StripSlashes().Clean().String())
return o.untarAll(src.PodNamespace, src.PodName, prefix, srcFile, destFile, reader)
}
type TarPipe struct {
src fileSpec
o *CopyOptions
reader *io.PipeReader
outStream *io.PipeWriter
bytesRead uint64
retries int
}
func newTarPipe(src fileSpec, o *CopyOptions) *TarPipe {
t := new(TarPipe)
t.src = src
t.o = o
t.initReadFrom(0)
return t
}
func (t *TarPipe) initReadFrom(n uint64) {
t.reader, t.outStream = io.Pipe()
options := &exec.ExecOptions{
StreamOptions: exec.StreamOptions{
IOStreams: genericclioptions.IOStreams{
In: nil,
Out: t.outStream,
ErrOut: t.o.Out,
},
Namespace: t.src.PodNamespace,
PodName: t.src.PodName,
},
// TODO: Improve error messages by first testing if 'tar' is present in the container?
Command: []string{"tar", "cf", "-", t.src.File.String()},
Executor: &exec.DefaultRemoteExecutor{},
}
if t.o.MaxTries > 0 {
options.Command = []string{"sh", "-c", fmt.Sprintf("tar cf - %s | tail -c+%d", t.src.File, n)}
}
go func() {
defer t.outStream.Close()
cmdutil.CheckErr(t.o.execute(options))
}()
}
func (t *TarPipe) Read(p []byte) (n int, err error) {
n, err = t.reader.Read(p)
if err != nil {
if t.o.MaxTries < 0 || t.retries < t.o.MaxTries {
fmt.Printf("Resuming copy at %d bytes, retry %d/%d\n", t.bytesRead, t.retries, t.o.MaxTries)
t.initReadFrom(t.bytesRead + 1)
err = nil
t.retries++
} else {
fmt.Printf("Dropping out copy after %d retries\n", t.retries)
}
} else {
t.bytesRead += uint64(n)
}
return
}
func makeTar(src localPath, dest remotePath, writer io.Writer) error {
// TODO: use compression here?
tarWriter := tar.NewWriter(writer)