From 109cfa9e2bdff343b4044257c0790f2642b11f80 Mon Sep 17 00:00:00 2001 From: Jesus Zuniga Date: Thu, 5 Mar 2026 16:05:42 -0700 Subject: [PATCH] upload buffer file when writer is closed --- .../pinterest/singer/writer/s3/S3Writer.java | 28 ++++++++++++++----- .../singer/writer/s3/S3WriterTest.java | 27 ++++++++++++++++++ 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java index 0ddf2d67..8f1b32af 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java @@ -306,8 +306,8 @@ private void scheduleTimeBasedUploadCheck() { * This method handles the upload cycle: * 1. Closes the current bufferedOutputStream to ensure data is flushed to disk * 2. Uploads the buffer file to S3 - * 3. Cleans up the old buffer file and creates a new one - * 4. Reopens the bufferedOutputStream for continued writing + * 3. Cleans up the old buffer file and creates a new one (unless triggerType is CLOSE) + * 4. Reopens the bufferedOutputStream for continued writing if not during close * * If upload fails, the buffer file remains intact for crash recovery. * @@ -337,7 +337,6 @@ private void uploadDiskBufferedFileToS3(TriggerType triggerType) throws LogStrea "bucket=" + bucketName, "host=" + HOSTNAME, "logName=" + logName, "trigger_type=" + triggerType.name()); LOG.info("Successfully uploaded buffer file {}", getBufferFileName()); - // Always clean up buffer file and reopen stream after successful upload boolean deleted = bufferFile.delete(); if (deleted) { OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "buffer_file_delete", 1, @@ -633,10 +632,25 @@ public void close() throws IOException { LOG.debug("Cancelled time-based upload task for log {}", logName); } - // Close buffered output stream to ensure data is flushed to disk - if (bufferedOutputStream != null) { - bufferedOutputStream.close(); - bufferedOutputStream = null; + + synchronized (objLock) { + try { + if (bufferedOutputStream != null) { + bufferedOutputStream.flush(); + } + if (bufferedOutputStream != null && bufferFile.length() > 0) { + try { + uploadDiskBufferedFileToS3(TriggerType.CLOSE); + } catch (LogStreamWriterException e) { + LOG.error("Failed to upload remaining buffer on close for log {}", logName, e); + } + } + } finally { + if (bufferedOutputStream != null) { + bufferedOutputStream.close(); + bufferedOutputStream = null; + } + } } } } \ No newline at end of file diff --git a/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java index e24da4a7..8b057756 100644 --- a/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java +++ b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java @@ -431,6 +431,33 @@ public void testSuccessfulUploadCleansBufferAndResetsStream() throws Exception { assertTrue("Buffer should contain new message", content.contains("new data after reset")); } + @Test + public void testCloseUploadsRemainingBufferData() throws Exception { + when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(true); + + s3WriterConfig.setMaxFileSizeMB(10); + s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); + + ByteBuffer messageBuffer = ByteBuffer.wrap("data that should be uploaded on close".getBytes()); + LogMessage logMessage = new LogMessage(messageBuffer); + LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null); + + s3Writer.startCommit(false); + s3Writer.writeLogMessageToCommit(logMessageAndPosition, false); + s3Writer.endCommit(1, false); + + File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName()); + assertTrue("Buffer should contain data before close", bufferFile.exists() && bufferFile.length() > 0); + + verify(mockS3Uploader, never()).upload(any(S3ObjectUpload.class)); + + s3Writer.close(); + + verify(mockS3Uploader).upload(any(S3ObjectUpload.class)); + + assertFalse("Buffer file should be deleted after successful close upload", bufferFile.exists()); + } + @Test public void testBufferFileRecoveryAfterRestartWithMultipleLogStreams() throws Exception { // Create two different log streams (simulating different log files)