diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b847f56..798f74b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -68,3 +68,9 @@ jobs: run: | ./fastp --version ./fastp -i testdata/R1.fq -o /dev/null + + - name: upload binary + uses: actions/upload-artifact@v4 + with: + name: fastp-${{ runner.os }}-${{ runner.arch }} + path: fastp diff --git a/src/singleproducersingleconsumerlist.h b/src/singleproducersingleconsumerlist.h index ee80647..66304dc 100644 --- a/src/singleproducersingleconsumerlist.h +++ b/src/singleproducersingleconsumerlist.h @@ -99,6 +99,8 @@ class SingleProducerSingleConsumerList { if(head==NULL) { head = item; tail = item; + // Signal the first item is consumable (no predecessor to set this) + head->nextItemReady.store(true, std::memory_order_release); } else { tail->nextItem = item; tail->nextItemReady = true; diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 3424e3e..5d21091 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -6,6 +6,7 @@ #include #include #include +#include WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mOptions = opt; @@ -19,7 +20,7 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mNextSeq = NULL; mCompressors = NULL; mCompBufs = NULL; - mCompBufSize = 0; + mCompBufSizes = NULL; mBufferLists = NULL; if (mPwriteMode) { @@ -33,11 +34,13 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mCompressors = new libdeflate_compressor*[mOptions->thread]; for (int t = 0; t < mOptions->thread; t++) mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression); - // Pre-allocate per-worker compress buffers (avoids malloc/free per pack) - mCompBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case + size_t initBufSize = PACK_SIZE * 500; mCompBufs = new char*[mOptions->thread]; - for (int t = 0; t < mOptions->thread; t++) - mCompBufs[t] = new char[mCompBufSize]; + mCompBufSizes = new size_t[mOptions->thread]; + for (int t = 0; t < mOptions->thread; t++) { + mCompBufs[t] = new char[initBufSize]; + mCompBufSizes[t] = initBufSize; + } mWorkingBufferList = 0; mBufferLength = 0; } else { @@ -114,11 +117,11 @@ void WriterThread::input(int tid, string* data) { void WriterThread::inputPwrite(int tid, string* data) { size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size()); - // Grow pre-allocated buffer if needed - if (bound > mCompBufSize) { + // Grow per-worker buffer if needed + if (bound > mCompBufSizes[tid]) { delete[] mCompBufs[tid]; mCompBufs[tid] = new char[bound]; - // Note: mCompBufSize is shared but only grows, safe for other threads + mCompBufSizes[tid] = bound; } size_t outsize = libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(), mCompBufs[tid], bound); @@ -130,16 +133,13 @@ void WriterThread::inputPwrite(int tid, string* data) { size_t seq = mNextSeq[tid]; - // Wait for previous batch's cumulative offset + // Wait for previous batch's cumulative offset. + // Sleep yields CPU to prevent livelock under contention. size_t offset = 0; if (seq > 0) { size_t prevSlot = (seq - 1) & (OFFSET_RING_SIZE - 1); while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1) { -#if defined(__aarch64__) - __asm__ volatile("yield"); -#elif defined(__x86_64__) || defined(__i386__) - __asm__ volatile("pause"); -#endif + std::this_thread::sleep_for(std::chrono::microseconds(1)); } offset = mOffsetRing[prevSlot].cumulative_offset.load(std::memory_order_relaxed); } @@ -182,6 +182,7 @@ void WriterThread::cleanup() { delete[] mCompBufs[t]; delete[] mCompBufs; mCompBufs = NULL; } + delete[] mCompBufSizes; mCompBufSizes = NULL; return; } deleteWriter(); diff --git a/src/writerthread.h b/src/writerthread.h index a2b7dc8..053d27f 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -62,7 +62,7 @@ class WriterThread{ size_t* mNextSeq; libdeflate_compressor** mCompressors; char** mCompBufs; // per-worker pre-allocated compress output buffers - size_t mCompBufSize; + size_t* mCompBufSizes; // per-worker buffer sizes }; #endif