Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,9 @@ jobs:
run: |
./fastp --version
./fastp -i testdata/R1.fq -o /dev/null
# Upload the file at path `fastp` as a workflow artifact via actions/upload-artifact@v4.
# The artifact name embeds runner.os and runner.arch, so each OS/architecture
# combination this job runs on gets a distinctly named artifact.
# NOTE(review): presumably `fastp` is the binary produced by an earlier build step
# (the preceding step invokes ./fastp) — confirm against the full workflow.
- name: upload binary
uses: actions/upload-artifact@v4
with:
name: fastp-${{ runner.os }}-${{ runner.arch }}
path: fastp
2 changes: 2 additions & 0 deletions src/singleproducersingleconsumerlist.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class SingleProducerSingleConsumerList {
if(head==NULL) {
head = item;
tail = item;
// Signal the first item is consumable (no predecessor to set this)
head->nextItemReady.store(true, std::memory_order_release);
} else {
tail->nextItem = item;
tail->nextItemReady = true;
Expand Down
29 changes: 15 additions & 14 deletions src/writerthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <cerrno>
#include <cstring>
#include <thread>
#include <chrono>

WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
mOptions = opt;
Expand All @@ -19,7 +20,7 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
mNextSeq = NULL;
mCompressors = NULL;
mCompBufs = NULL;
mCompBufSize = 0;
mCompBufSizes = NULL;
mBufferLists = NULL;

if (mPwriteMode) {
Expand All @@ -33,11 +34,13 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
mCompressors = new libdeflate_compressor*[mOptions->thread];
for (int t = 0; t < mOptions->thread; t++)
mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression);
// Pre-allocate per-worker compress buffers (avoids malloc/free per pack)
mCompBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case
size_t initBufSize = PACK_SIZE * 500;
mCompBufs = new char*[mOptions->thread];
for (int t = 0; t < mOptions->thread; t++)
mCompBufs[t] = new char[mCompBufSize];
mCompBufSizes = new size_t[mOptions->thread];
for (int t = 0; t < mOptions->thread; t++) {
mCompBufs[t] = new char[initBufSize];
mCompBufSizes[t] = initBufSize;
}
mWorkingBufferList = 0;
mBufferLength = 0;
} else {
Expand Down Expand Up @@ -114,11 +117,11 @@ void WriterThread::input(int tid, string* data) {

void WriterThread::inputPwrite(int tid, string* data) {
size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size());
// Grow pre-allocated buffer if needed
if (bound > mCompBufSize) {
// Grow per-worker buffer if needed
if (bound > mCompBufSizes[tid]) {
delete[] mCompBufs[tid];
mCompBufs[tid] = new char[bound];
// Note: mCompBufSize is shared but only grows, safe for other threads
mCompBufSizes[tid] = bound;
}
size_t outsize = libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(),
mCompBufs[tid], bound);
Expand All @@ -130,16 +133,13 @@ void WriterThread::inputPwrite(int tid, string* data) {

size_t seq = mNextSeq[tid];

// Wait for previous batch's cumulative offset
// Wait for previous batch's cumulative offset.
// Sleep yields CPU to prevent livelock under contention.
size_t offset = 0;
if (seq > 0) {
size_t prevSlot = (seq - 1) & (OFFSET_RING_SIZE - 1);
while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1) {
#if defined(__aarch64__)
__asm__ volatile("yield");
#elif defined(__x86_64__) || defined(__i386__)
__asm__ volatile("pause");
#endif
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
offset = mOffsetRing[prevSlot].cumulative_offset.load(std::memory_order_relaxed);
}
Expand Down Expand Up @@ -182,6 +182,7 @@ void WriterThread::cleanup() {
delete[] mCompBufs[t];
delete[] mCompBufs; mCompBufs = NULL;
}
delete[] mCompBufSizes; mCompBufSizes = NULL;
return;
}
deleteWriter();
Expand Down
2 changes: 1 addition & 1 deletion src/writerthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class WriterThread{
size_t* mNextSeq;
libdeflate_compressor** mCompressors;
char** mCompBufs; // per-worker pre-allocated compress output buffers
size_t mCompBufSize;
size_t* mCompBufSizes; // per-worker buffer sizes
};

#endif
Loading