mirror of
https://github.com/kairos-io/kairos-agent.git
synced 2025-09-02 09:36:19 +00:00
Simplify rsync implementation (#61)
This commit is contained in:
1
go.mod
1
go.mod
@@ -29,7 +29,6 @@ require (
|
|||||||
github.com/spf13/viper v1.8.1
|
github.com/spf13/viper v1.8.1
|
||||||
github.com/twpayne/go-vfs v1.7.2
|
github.com/twpayne/go-vfs v1.7.2
|
||||||
github.com/urfave/cli/v2 v2.25.1
|
github.com/urfave/cli/v2 v2.25.1
|
||||||
github.com/zloylos/grsync v1.7.0
|
|
||||||
golang.org/x/net v0.10.0
|
golang.org/x/net v0.10.0
|
||||||
golang.org/x/oauth2 v0.7.0
|
golang.org/x/oauth2 v0.7.0
|
||||||
gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0
|
gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0
|
||||||
|
@@ -115,6 +115,8 @@ const (
|
|||||||
Archx86 = "x86_64"
|
Archx86 = "x86_64"
|
||||||
ArchArm64 = "arm64"
|
ArchArm64 = "arm64"
|
||||||
SignedShim = "shim.efi"
|
SignedShim = "shim.efi"
|
||||||
|
|
||||||
|
Rsync = "rsync"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetCloudInitPaths() []string {
|
func GetCloudInitPaths() []string {
|
||||||
|
@@ -369,7 +369,7 @@ func (e *Elemental) DumpSource(target string, imgSrc *v1.ImageSource) (info inte
|
|||||||
}
|
}
|
||||||
} else if imgSrc.IsDir() {
|
} else if imgSrc.IsDir() {
|
||||||
excludes := []string{"/mnt", "/proc", "/sys", "/dev", "/tmp", "/host", "/run"}
|
excludes := []string{"/mnt", "/proc", "/sys", "/dev", "/tmp", "/host", "/run"}
|
||||||
err = utils.SyncData(e.config.Logger, e.config.Fs, imgSrc.Value(), target, excludes...)
|
err = utils.SyncData(e.config.Logger, e.config.Runner, e.config.Fs, imgSrc.Value(), target, excludes...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -584,11 +584,6 @@ var _ = Describe("Elemental", Label("elemental"), func() {
|
|||||||
_, err := el.DeployImage(img, true)
|
_, err := el.DeployImage(img, true)
|
||||||
Expect(err).NotTo(BeNil())
|
Expect(err).NotTo(BeNil())
|
||||||
})
|
})
|
||||||
It("Fails copying the image if source does not exist", func() {
|
|
||||||
img.Source = v1.NewDirSrc("/welp")
|
|
||||||
_, err := el.DeployImage(img, true)
|
|
||||||
Expect(err).NotTo(BeNil())
|
|
||||||
})
|
|
||||||
It("Fails unmounting the image after copying", func() {
|
It("Fails unmounting the image after copying", func() {
|
||||||
mounter.ErrorOnUnmount = true
|
mounter.ErrorOnUnmount = true
|
||||||
_, err := el.DeployImage(img, false)
|
_, err := el.DeployImage(img, false)
|
||||||
@@ -605,14 +600,23 @@ var _ = Describe("Elemental", Label("elemental"), func() {
|
|||||||
Expect(err).ShouldNot(HaveOccurred())
|
Expect(err).ShouldNot(HaveOccurred())
|
||||||
})
|
})
|
||||||
It("Copies files from a directory source", func() {
|
It("Copies files from a directory source", func() {
|
||||||
sourceDir, err := utils.TempDir(fs, "", "elemental")
|
rsyncCount := 0
|
||||||
|
src := ""
|
||||||
|
dest := ""
|
||||||
|
runner.SideEffect = func(cmd string, args ...string) ([]byte, error) {
|
||||||
|
if cmd == constants.Rsync {
|
||||||
|
rsyncCount += 1
|
||||||
|
src = args[len(args)-2]
|
||||||
|
dest = args[len(args)-1]
|
||||||
|
|
||||||
|
}
|
||||||
|
return []byte{}, nil
|
||||||
|
}
|
||||||
|
_, err := e.DumpSource("/dest", v1.NewDirSrc("/source"))
|
||||||
Expect(err).ShouldNot(HaveOccurred())
|
Expect(err).ShouldNot(HaveOccurred())
|
||||||
_, err = e.DumpSource(destDir, v1.NewDirSrc(sourceDir))
|
Expect(rsyncCount).To(Equal(1))
|
||||||
Expect(err).To(BeNil())
|
Expect(src).To(HaveSuffix("/source/"))
|
||||||
})
|
Expect(dest).To(HaveSuffix("/dest/"))
|
||||||
It("Fails if source directory does not exist", func() {
|
|
||||||
_, err := e.DumpSource(destDir, v1.NewDirSrc("/welp"))
|
|
||||||
Expect(err).ToNot(BeNil())
|
|
||||||
})
|
})
|
||||||
It("Unpacks a docker image to target", Label("docker"), func() {
|
It("Unpacks a docker image to target", Label("docker"), func() {
|
||||||
_, err := e.DumpSource(destDir, v1.NewDockerSrc("docker/image:latest"))
|
_, err := e.DumpSource(destDir, v1.NewDockerSrc("docker/image:latest"))
|
||||||
@@ -633,15 +637,12 @@ var _ = Describe("Elemental", Label("elemental"), func() {
|
|||||||
})
|
})
|
||||||
It("Copies image file to target", func() {
|
It("Copies image file to target", func() {
|
||||||
sourceImg := "/source.img"
|
sourceImg := "/source.img"
|
||||||
|
destFile := filepath.Join(destDir, "active.img")
|
||||||
_, err := fs.Create(sourceImg)
|
_, err := fs.Create(sourceImg)
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
destFile := filepath.Join(destDir, "active.img")
|
|
||||||
_, err = fs.Stat(destFile)
|
|
||||||
Expect(err).NotTo(BeNil())
|
|
||||||
_, err = e.DumpSource(destFile, v1.NewFileSrc(sourceImg))
|
_, err = e.DumpSource(destFile, v1.NewFileSrc(sourceImg))
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
_, err = fs.Stat(destFile)
|
Expect(runner.IncludesCmds([][]string{{constants.Rsync}}))
|
||||||
Expect(err).To(BeNil())
|
|
||||||
})
|
})
|
||||||
It("Fails to copy, source file is not present", func() {
|
It("Fails to copy, source file is not present", func() {
|
||||||
_, err := e.DumpSource("whatever", v1.NewFileSrc("/source.img"))
|
_, err := e.DumpSource("whatever", v1.NewFileSrc("/source.img"))
|
||||||
|
@@ -32,11 +32,9 @@ import (
|
|||||||
|
|
||||||
"github.com/distribution/distribution/reference"
|
"github.com/distribution/distribution/reference"
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"github.com/twpayne/go-vfs"
|
|
||||||
"github.com/zloylos/grsync"
|
|
||||||
|
|
||||||
cnst "github.com/kairos-io/kairos/v2/pkg/constants"
|
cnst "github.com/kairos-io/kairos/v2/pkg/constants"
|
||||||
v1 "github.com/kairos-io/kairos/v2/pkg/types/v1"
|
v1 "github.com/kairos-io/kairos/v2/pkg/types/v1"
|
||||||
|
"github.com/twpayne/go-vfs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CommandExists(command string) bool {
|
func CommandExists(command string) bool {
|
||||||
@@ -156,7 +154,7 @@ func CreateDirStructure(fs v1.FS, target string) error {
|
|||||||
|
|
||||||
// SyncData rsync's source folder contents to a target folder content,
|
// SyncData rsync's source folder contents to a target folder content,
|
||||||
// both are expected to exist beforehand.
|
// both are expected to exist beforehand.
|
||||||
func SyncData(log v1.Logger, fs v1.FS, source string, target string, excludes ...string) error {
|
func SyncData(log v1.Logger, runner v1.Runner, fs v1.FS, source string, target string, excludes ...string) error {
|
||||||
if fs != nil {
|
if fs != nil {
|
||||||
if s, err := fs.RawPath(source); err == nil {
|
if s, err := fs.RawPath(source); err == nil {
|
||||||
source = s
|
source = s
|
||||||
@@ -174,46 +172,46 @@ func SyncData(log v1.Logger, fs v1.FS, source string, target string, excludes ..
|
|||||||
target = fmt.Sprintf("%s/", target)
|
target = fmt.Sprintf("%s/", target)
|
||||||
}
|
}
|
||||||
|
|
||||||
task := grsync.NewTask(
|
log.Infof("Starting rsync...")
|
||||||
source,
|
args := []string{"--progress", "--partial", "--human-readable", "--archive", "--xattrs", "--acls"}
|
||||||
target,
|
|
||||||
grsync.RsyncOptions{
|
for _, e := range excludes {
|
||||||
Quiet: false,
|
args = append(args, fmt.Sprintf("--exclude=%s", e))
|
||||||
Archive: true,
|
}
|
||||||
XAttrs: true,
|
|
||||||
ACLs: true,
|
args = append(args, source, target)
|
||||||
Exclude: excludes,
|
|
||||||
},
|
done := displayProgress(log, 5*time.Second, "Syncing data...")
|
||||||
)
|
|
||||||
|
_, err := runner.Run(cnst.Rsync, args...)
|
||||||
|
|
||||||
|
close(done)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("rsync finished with errors: %s", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Info("Finished syncing")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func displayProgress(log v1.Logger, tick time.Duration, message string) chan bool {
|
||||||
|
ticker := time.NewTicker(tick)
|
||||||
|
done := make(chan bool)
|
||||||
|
|
||||||
quit := make(chan bool)
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-quit:
|
case <-done:
|
||||||
|
ticker.Stop()
|
||||||
return
|
return
|
||||||
case <-time.After(5 * time.Second):
|
case <-ticker.C:
|
||||||
state := task.State()
|
log.Debug(message)
|
||||||
log.Debugf(
|
|
||||||
"progress rsync %s to %s: %.2f / rem. %d / tot. %d / sp. %s",
|
|
||||||
source,
|
|
||||||
target,
|
|
||||||
state.Progress,
|
|
||||||
state.Remain,
|
|
||||||
state.Total,
|
|
||||||
state.Speed,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err := task.Run()
|
return done
|
||||||
quit <- true
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("%w: %s", err, strings.Join([]string{task.Log().Stderr, task.Log().Stdout}, "\n"))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reboot reboots the system afater the given delay (in seconds) time passed.
|
// Reboot reboots the system afater the given delay (in seconds) time passed.
|
||||||
|
@@ -53,6 +53,7 @@ var _ = Describe("Utils", Label("utils"), func() {
|
|||||||
var syscall *v1mock.FakeSyscall
|
var syscall *v1mock.FakeSyscall
|
||||||
var client *v1mock.FakeHTTPClient
|
var client *v1mock.FakeHTTPClient
|
||||||
var mounter *v1mock.ErrorMounter
|
var mounter *v1mock.ErrorMounter
|
||||||
|
var realRunner *v1.RealRunner
|
||||||
var fs vfs.FS
|
var fs vfs.FS
|
||||||
var cleanup func()
|
var cleanup func()
|
||||||
|
|
||||||
@@ -62,6 +63,7 @@ var _ = Describe("Utils", Label("utils"), func() {
|
|||||||
mounter = v1mock.NewErrorMounter()
|
mounter = v1mock.NewErrorMounter()
|
||||||
client = &v1mock.FakeHTTPClient{}
|
client = &v1mock.FakeHTTPClient{}
|
||||||
logger = v1.NewNullLogger()
|
logger = v1.NewNullLogger()
|
||||||
|
realRunner = &v1.RealRunner{Logger: logger}
|
||||||
// Ensure /tmp exists in the VFS
|
// Ensure /tmp exists in the VFS
|
||||||
fs, cleanup, _ = vfst.NewTestFS(nil)
|
fs, cleanup, _ = vfst.NewTestFS(nil)
|
||||||
fs.Mkdir("/tmp", constants.DirPerm)
|
fs.Mkdir("/tmp", constants.DirPerm)
|
||||||
@@ -455,7 +457,7 @@ var _ = Describe("Utils", Label("utils"), func() {
|
|||||||
_, _ = utils.TempFile(fs, sourceDir, "file*")
|
_, _ = utils.TempFile(fs, sourceDir, "file*")
|
||||||
}
|
}
|
||||||
|
|
||||||
Expect(utils.SyncData(logger, fs, sourceDir, destDir)).To(BeNil())
|
Expect(utils.SyncData(logger, realRunner, fs, sourceDir, destDir)).To(BeNil())
|
||||||
|
|
||||||
filesDest, err := fs.ReadDir(destDir)
|
filesDest, err := fs.ReadDir(destDir)
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
@@ -486,7 +488,7 @@ var _ = Describe("Utils", Label("utils"), func() {
|
|||||||
_, _ = utils.TempFile(fs, sourceDir, "file*")
|
_, _ = utils.TempFile(fs, sourceDir, "file*")
|
||||||
}
|
}
|
||||||
|
|
||||||
Expect(utils.SyncData(logger, fs, sourceDir, destDir, "host", "run")).To(BeNil())
|
Expect(utils.SyncData(logger, realRunner, fs, sourceDir, destDir, "host", "run")).To(BeNil())
|
||||||
|
|
||||||
filesDest, err := fs.ReadDir(destDir)
|
filesDest, err := fs.ReadDir(destDir)
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
@@ -524,7 +526,7 @@ var _ = Describe("Utils", Label("utils"), func() {
|
|||||||
utils.MkdirAll(fs, filepath.Join(sourceDir, "var", "run"), constants.DirPerm)
|
utils.MkdirAll(fs, filepath.Join(sourceDir, "var", "run"), constants.DirPerm)
|
||||||
utils.MkdirAll(fs, filepath.Join(sourceDir, "tmp", "host"), constants.DirPerm)
|
utils.MkdirAll(fs, filepath.Join(sourceDir, "tmp", "host"), constants.DirPerm)
|
||||||
|
|
||||||
Expect(utils.SyncData(logger, fs, sourceDir, destDir, "/host", "/run")).To(BeNil())
|
Expect(utils.SyncData(logger, realRunner, fs, sourceDir, destDir, "/host", "/run")).To(BeNil())
|
||||||
|
|
||||||
filesDest, err := fs.ReadDir(destDir)
|
filesDest, err := fs.ReadDir(destDir)
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
@@ -550,17 +552,17 @@ var _ = Describe("Utils", Label("utils"), func() {
|
|||||||
Expect(err).ShouldNot(HaveOccurred())
|
Expect(err).ShouldNot(HaveOccurred())
|
||||||
destDir, err := utils.TempDir(fs, "", "elementaltarget")
|
destDir, err := utils.TempDir(fs, "", "elementaltarget")
|
||||||
Expect(err).ShouldNot(HaveOccurred())
|
Expect(err).ShouldNot(HaveOccurred())
|
||||||
Expect(utils.SyncData(logger, fs, sourceDir, destDir)).To(BeNil())
|
Expect(utils.SyncData(logger, realRunner, fs, sourceDir, destDir)).To(BeNil())
|
||||||
})
|
})
|
||||||
It("should NOT fail if destination does not exist", func() {
|
It("should NOT fail if destination does not exist", func() {
|
||||||
sourceDir, err := os.MkdirTemp("", "elemental")
|
sourceDir, err := os.MkdirTemp("", "elemental")
|
||||||
err = os.WriteFile(filepath.Join(sourceDir, "testfile"), []byte("sdjfnsdjkfjkdsanfkjsnda"), os.ModePerm)
|
err = os.WriteFile(filepath.Join(sourceDir, "testfile"), []byte("sdjfnsdjkfjkdsanfkjsnda"), os.ModePerm)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = utils.SyncData(logger, nil, sourceDir, "/welp")
|
err = utils.SyncData(logger, realRunner, nil, sourceDir, "/welp")
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
})
|
})
|
||||||
It("should fail if source does not exist", func() {
|
It("should fail if source does not exist", func() {
|
||||||
Expect(utils.SyncData(logger, fs, "/welp", "/walp")).NotTo(BeNil())
|
Expect(utils.SyncData(logger, realRunner, fs, "/welp", "/walp")).NotTo(BeNil())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Describe("IsLocalURI", Label("uri"), func() {
|
Describe("IsLocalURI", Label("uri"), func() {
|
||||||
|
Reference in New Issue
Block a user