Fix: resolve issue #4478 by using a temporary file for non-append writes

To address the issue where a failed write operation results in an empty file, we can use a temporary file for non-append writes. This ensures that the original file is only replaced once the new content is fully written and committed.

**Key Changes:**

1. **Temporary File Handling:**
   - For non-append writes, a temporary file is created in the same directory as the target file.
   - All write operations are performed on the temporary file first.

2. **Atomic Commit:**
   - The temporary file is only renamed to the target path during `Commit()`, ensuring atomic replacement.
   - If `Commit()` fails, the temporary file is cleaned up.

3. **Error Handling:**
   - `Cancel()` properly removes temporary files if the operation is aborted.
   - `Close()` is made idempotent to handle multiple calls safely.

4. **Data Integrity:**
   - Directory sync after rename ensures metadata persistence.
   - Proper file flushing and syncing before rename operations.

Signed-off-by: Oded Porat <onporat@gmail.com>
This commit is contained in:
Oded Porat
2025-04-16 10:30:20 +03:00
parent 523791cb5f
commit 78456caf46
2 changed files with 72 additions and 24 deletions

View File

@@ -180,29 +180,43 @@ func (d *driver) Writer(ctx context.Context, subPath string, append bool) (stora
return nil, err return nil, err
} }
fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0o666) var (
if err != nil { fp *os.File
return nil, err err error
} offset int64
tempFilePath string
var offset int64 )
if !append { if !append {
err := fp.Truncate(0) // Create temporary file in target directory
tempFile, err := os.CreateTemp(parentDir, ".tmp-")
if err != nil { if err != nil {
fp.Close()
return nil, err return nil, err
} }
tempFilePath = tempFile.Name()
tempFile.Close()
// Open temp file with truncation
fp, err = os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o666)
if err != nil {
os.Remove(tempFilePath)
return nil, err
}
offset = 0
} else { } else {
n, err := fp.Seek(0, io.SeekEnd) fp, err = os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0o666)
if err != nil { if err != nil {
fp.Close()
return nil, err return nil, err
} }
offset = n
}
return newFileWriter(fp, offset), nil offset, err = fp.Seek(0, io.SeekEnd)
if err != nil {
fp.Close()
return nil, err
}
}
return newFileWriter(fp, offset, tempFilePath, fullPath), nil
} }
// Stat retrieves the FileInfo for the given path, including the current size // Stat retrieves the FileInfo for the given path, including the current size
@@ -343,13 +357,17 @@ type fileWriter struct {
closed bool closed bool
committed bool committed bool
cancelled bool cancelled bool
tempFilePath string // Path to the temporary file (non-empty for non-append writes)
targetPath string // Target path for final file
} }
func newFileWriter(file *os.File, size int64) *fileWriter { func newFileWriter(file *os.File, size int64, tempFilePath, targetPath string) *fileWriter {
return &fileWriter{ return &fileWriter{
file: file, file: file,
size: size, size: size,
bw: bufio.NewWriter(file), bw: bufio.NewWriter(file),
tempFilePath: tempFilePath,
targetPath: targetPath,
} }
} }
@@ -372,7 +390,7 @@ func (fw *fileWriter) Size() int64 {
func (fw *fileWriter) Close() error { func (fw *fileWriter) Close() error {
if fw.closed { if fw.closed {
return fmt.Errorf("already closed") return nil // Allow multiple Close calls without error
} }
if err := fw.bw.Flush(); err != nil { if err := fw.bw.Flush(); err != nil {
@@ -397,7 +415,15 @@ func (fw *fileWriter) Cancel(ctx context.Context) error {
fw.cancelled = true fw.cancelled = true
fw.file.Close() fw.file.Close()
return os.Remove(fw.file.Name())
// Remove temporary file if it exists
if fw.tempFilePath != "" {
os.Remove(fw.tempFilePath)
} else {
os.Remove(fw.targetPath)
}
return nil
} }
func (fw *fileWriter) Commit(ctx context.Context) error { func (fw *fileWriter) Commit(ctx context.Context) error {
@@ -412,11 +438,30 @@ func (fw *fileWriter) Commit(ctx context.Context) error {
if err := fw.bw.Flush(); err != nil { if err := fw.bw.Flush(); err != nil {
return err return err
} }
if err := fw.file.Sync(); err != nil { if err := fw.file.Sync(); err != nil {
return err return err
} }
// Close the file before renaming (required on some systems)
if err := fw.Close(); err != nil {
return err
}
// Handle temporary file replacement
if fw.tempFilePath != "" {
// Atomically rename temp file to target path
if err := os.Rename(fw.tempFilePath, fw.targetPath); err != nil {
os.Remove(fw.tempFilePath)
return err
}
// Sync directory to ensure rename persistence
if dir, err := os.Open(path.Dir(fw.targetPath)); err == nil {
defer dir.Close()
dir.Sync()
}
}
fw.committed = true fw.committed = true
return nil return nil
} }

View File

@@ -410,6 +410,9 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) {
suite.Require().NoError(err) suite.Require().NoError(err)
suite.Require().Equal(chunkSize, nn) suite.Require().Equal(chunkSize, nn)
err = writer.Commit(suite.ctx)
suite.Require().NoError(err)
err = writer.Close() err = writer.Close()
suite.Require().NoError(err) suite.Require().NoError(err)