Merge pull request #129893 from simonfogliato/kubelet-log-permissions

Use uncompressed kubelet log file permissions when compressed.
This commit is contained in:
Kubernetes Prow Robot 2025-03-19 17:54:30 -07:00 committed by GitHub
commit 26b1d3424a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 36 additions and 24 deletions

View File

@ -79,7 +79,7 @@ func GetAllLogs(log string) ([]string, error) {
pattern := fmt.Sprintf("%s.*", log)
logs, err := filepath.Glob(pattern)
if err != nil {
return nil, fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
return nil, fmt.Errorf("failed to list all log files with pattern %q: %w", pattern, err)
}
inuse, _ := filterUnusedLogs(logs)
sort.Strings(inuse)
@ -113,7 +113,7 @@ func UncompressLog(log string) (_ io.ReadCloser, retErr error) {
}
f, err := os.Open(log)
if err != nil {
return nil, fmt.Errorf("failed to open log: %v", err)
return nil, fmt.Errorf("failed to open log: %w", err)
}
defer func() {
if retErr != nil {
@ -122,7 +122,7 @@ func UncompressLog(log string) (_ io.ReadCloser, retErr error) {
}()
r, err := gzip.NewReader(f)
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %v", err)
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
}
return &compressReadCloser{f: f, Reader: r}, nil
}
@ -158,7 +158,7 @@ func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterfa
}
parsedMaxSize, err := parseMaxSize(maxSize)
if err != nil {
return nil, fmt.Errorf("failed to parse container log max size %q: %v", maxSize, err)
return nil, fmt.Errorf("failed to parse container log max size %q: %w", maxSize, err)
}
// Negative number means to disable container log rotation
if parsedMaxSize < 0 {
@ -205,7 +205,7 @@ func (c *containerLogManager) Clean(ctx context.Context, containerID string) err
defer c.mutex.Unlock()
resp, err := c.runtimeService.ContainerStatus(ctx, containerID, false)
if err != nil {
return fmt.Errorf("failed to get container status %q: %v", containerID, err)
return fmt.Errorf("failed to get container status %q: %w", containerID, err)
}
if resp.GetStatus() == nil {
return fmt.Errorf("container status is nil for %q", containerID)
@ -213,12 +213,12 @@ func (c *containerLogManager) Clean(ctx context.Context, containerID string) err
pattern := fmt.Sprintf("%s*", resp.GetStatus().GetLogPath())
logs, err := c.osInterface.Glob(pattern)
if err != nil {
return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
return fmt.Errorf("failed to list all log files with pattern %q: %w", pattern, err)
}
for _, l := range logs {
if err := c.osInterface.Remove(l); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove container %q log %q: %v", containerID, l, err)
return fmt.Errorf("failed to remove container %q log %q: %w", containerID, l, err)
}
}
@ -239,7 +239,7 @@ func (c *containerLogManager) rotateLogs(ctx context.Context) error {
// TODO(#59998): Use kubelet pod cache.
containers, err := c.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{})
if err != nil {
return fmt.Errorf("failed to list containers: %v", err)
return fmt.Errorf("failed to list containers: %w", err)
}
for _, container := range containers {
// Only rotate logs for running containers. Non-running containers won't
@ -315,17 +315,17 @@ func (c *containerLogManager) rotateLog(ctx context.Context, id, log string) err
pattern := fmt.Sprintf("%s.*", log)
logs, err := filepath.Glob(pattern)
if err != nil {
return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
return fmt.Errorf("failed to list all log files with pattern %q: %w", pattern, err)
}
logs, err = c.cleanupUnusedLogs(logs)
if err != nil {
return fmt.Errorf("failed to cleanup logs: %v", err)
return fmt.Errorf("failed to cleanup logs: %w", err)
}
logs, err = c.removeExcessLogs(logs)
if err != nil {
return fmt.Errorf("failed to remove excess logs: %v", err)
return fmt.Errorf("failed to remove excess logs: %w", err)
}
// Compress uncompressed log files.
@ -334,12 +334,12 @@ func (c *containerLogManager) rotateLog(ctx context.Context, id, log string) err
continue
}
if err := c.compressLog(l); err != nil {
return fmt.Errorf("failed to compress log %q: %v", l, err)
return fmt.Errorf("failed to compress log %q: %w", l, err)
}
}
if err := c.rotateLatestLog(ctx, id, log); err != nil {
return fmt.Errorf("failed to rotate log %q: %v", log, err)
return fmt.Errorf("failed to rotate log %q: %w", log, err)
}
return nil
@ -351,7 +351,7 @@ func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error)
inuse, unused := filterUnusedLogs(logs)
for _, l := range unused {
if err := c.osInterface.Remove(l); err != nil {
return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err)
return nil, fmt.Errorf("failed to remove unused log %q: %w", l, err)
}
}
return inuse, nil
@ -404,7 +404,7 @@ func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error)
i := 0
for ; i < len(logs)-maxRotatedFiles; i++ {
if err := c.osInterface.Remove(logs[i]); err != nil {
return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err)
return nil, fmt.Errorf("failed to remove old log %q: %w", logs[i], err)
}
}
logs = logs[i:]
@ -413,15 +413,19 @@ func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error)
// compressLog compresses a log to log.gz with gzip.
func (c *containerLogManager) compressLog(log string) error {
logInfo, err := os.Stat(log)
if err != nil {
return fmt.Errorf("failed to stat log file: %w", err)
}
r, err := c.osInterface.Open(log)
if err != nil {
return fmt.Errorf("failed to open log %q: %v", log, err)
return fmt.Errorf("failed to open log %q: %w", log, err)
}
defer r.Close()
tmpLog := log + tmpSuffix
f, err := c.osInterface.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
f, err := c.osInterface.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, logInfo.Mode())
if err != nil {
return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
return fmt.Errorf("failed to create temporary log %q: %w", tmpLog, err)
}
defer func() {
// Best effort cleanup of tmpLog.
@ -431,19 +435,19 @@ func (c *containerLogManager) compressLog(log string) error {
w := gzip.NewWriter(f)
defer w.Close()
if _, err := io.Copy(w, r); err != nil {
return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
return fmt.Errorf("failed to compress %q to %q: %w", log, tmpLog, err)
}
// The archive needs to be closed before renaming, otherwise an error will occur on Windows.
w.Close()
f.Close()
compressedLog := log + compressSuffix
if err := c.osInterface.Rename(tmpLog, compressedLog); err != nil {
return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
return fmt.Errorf("failed to rename %q to %q: %w", tmpLog, compressedLog, err)
}
// Remove old log file.
r.Close()
if err := c.osInterface.Remove(log); err != nil {
return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
return fmt.Errorf("failed to remove log %q after compress: %w", log, err)
}
return nil
}
@ -454,7 +458,7 @@ func (c *containerLogManager) rotateLatestLog(ctx context.Context, id, log strin
timestamp := c.clock.Now().Format(timestampFormat)
rotated := fmt.Sprintf("%s.%s", log, timestamp)
if err := c.osInterface.Rename(log, rotated); err != nil {
return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
return fmt.Errorf("failed to rotate log %q to %q: %w", log, rotated, err)
}
if err := c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
// Rename the rotated log back, so that we can try rotating it again
@ -466,7 +470,7 @@ func (c *containerLogManager) rotateLatestLog(ctx context.Context, id, log strin
// log.
klog.ErrorS(renameErr, "Failed to rename rotated log", "rotatedLog", rotated, "newLog", log, "containerID", id)
}
return fmt.Errorf("failed to reopen container log %q: %v", id, err)
return fmt.Errorf("failed to reopen container log %q: %w", id, err)
}
return nil
}

View File

@ -370,10 +370,18 @@ func TestCompressLog(t *testing.T) {
testFile.Close()
testLog := testFile.Name()
testLogInfo, err := os.Stat(testLog)
assert.NoError(t, err)
c := &containerLogManager{osInterface: container.RealOS{}}
require.NoError(t, c.compressLog(testLog))
_, err = os.Stat(testLog + compressSuffix)
testLogCompressInfo, err := os.Stat(testLog + compressSuffix)
assert.NoError(t, err, "log should be compressed")
if testLogInfo.Mode() != testLogCompressInfo.Mode() {
t.Errorf("compressed and uncompressed test log file modes do not match")
}
if err := c.compressLog("test-unknown-log"); err == nil {
t.Errorf("compressing unknown log should return error")
}
_, err = os.Stat(testLog + tmpSuffix)
assert.Error(t, err, "temporary log should be renamed")
_, err = os.Stat(testLog)