Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ private void scheduleTimeBasedUploadCheck() {
* This method handles the upload cycle:
* 1. Closes the current bufferedOutputStream to ensure data is flushed to disk
* 2. Uploads the buffer file to S3
* 3. Cleans up the old buffer file and creates a new one
* 4. Reopens the bufferedOutputStream for continued writing
* 3. Cleans up the old buffer file and creates a new one (unless triggerType is CLOSE)
* 4. Reopens the bufferedOutputStream for continued writing if not during close
*
* If upload fails, the buffer file remains intact for crash recovery.
*
Expand Down Expand Up @@ -337,7 +337,6 @@ private void uploadDiskBufferedFileToS3(TriggerType triggerType) throws LogStrea
"bucket=" + bucketName, "host=" + HOSTNAME, "logName=" + logName, "trigger_type=" + triggerType.name());
LOG.info("Successfully uploaded buffer file {}", getBufferFileName());

// Always clean up buffer file and reopen stream after successful upload
boolean deleted = bufferFile.delete();
if (deleted) {
OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "buffer_file_delete", 1,
Expand Down Expand Up @@ -633,10 +632,25 @@ public void close() throws IOException {
LOG.debug("Cancelled time-based upload task for log {}", logName);
}

// Close buffered output stream to ensure data is flushed to disk
if (bufferedOutputStream != null) {
bufferedOutputStream.close();
bufferedOutputStream = null;

synchronized (objLock) {
try {
if (bufferedOutputStream != null) {
bufferedOutputStream.flush();
}
if (bufferedOutputStream != null && bufferFile.length() > 0) {
try {
uploadDiskBufferedFileToS3(TriggerType.CLOSE);
} catch (LogStreamWriterException e) {
LOG.error("Failed to upload remaining buffer on close for log {}", logName, e);
}
}
} finally {
if (bufferedOutputStream != null) {
bufferedOutputStream.close();
bufferedOutputStream = null;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,33 @@ public void testSuccessfulUploadCleansBufferAndResetsStream() throws Exception {
assertTrue("Buffer should contain new message", content.contains("new data after reset"));
}

@Test
public void testCloseUploadsRemainingBufferData() throws Exception {
when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(true);

s3WriterConfig.setMaxFileSizeMB(10);
s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);

ByteBuffer messageBuffer = ByteBuffer.wrap("data that should be uploaded on close".getBytes());
LogMessage logMessage = new LogMessage(messageBuffer);
LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);

s3Writer.startCommit(false);
s3Writer.writeLogMessageToCommit(logMessageAndPosition, false);
s3Writer.endCommit(1, false);

File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName());
assertTrue("Buffer should contain data before close", bufferFile.exists() && bufferFile.length() > 0);

verify(mockS3Uploader, never()).upload(any(S3ObjectUpload.class));

s3Writer.close();

verify(mockS3Uploader).upload(any(S3ObjectUpload.class));

assertFalse("Buffer file should be deleted after successful close upload", bufferFile.exists());
}

@Test
public void testBufferFileRecoveryAfterRestartWithMultipleLogStreams() throws Exception {
// Create two different log streams (simulating different log files)
Expand Down