From 3ad2a047f27158d01b49eb9cf8181d08b33c0245 Mon Sep 17 00:00:00 2001 From: Junmin Gu Date: Wed, 27 Aug 2025 17:13:22 -0400 Subject: [PATCH 01/16] use span to allocate mesh data by default this makes it possible to aggregate data for later flush --- .../FlushFormats/FlushFormatOpenPMD.H | 1 + .../FlushFormats/FlushFormatOpenPMD.cpp | 6 ++ Source/Diagnostics/WarpXOpenPMD.H | 1 + Source/Diagnostics/WarpXOpenPMD.cpp | 78 +++++++++++++++---- 4 files changed, 69 insertions(+), 17 deletions(-) diff --git a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H index 5666d85bf3a..a801854db03 100644 --- a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H +++ b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H @@ -53,6 +53,7 @@ public: private: /** This is responsible for dumping to file */ std::unique_ptr< WarpXOpenPMDPlot > m_OpenPMDPlotWriter; + int m_NumAggBTDBufferToFlush=5; }; #endif // WARPX_FLUSHFORMATOPENPMD_H_ diff --git a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp index ae54fd13e5a..af692cbbbdc 100644 --- a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp +++ b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp @@ -64,6 +64,9 @@ FlushFormatOpenPMD::FlushFormatOpenPMD (const std::string& diag_name) ablastr::warn_manager::WMRecordWarning("Diagnostics", warnMsg); encoding = openPMD::IterationEncoding::groupBased; } + + pp_diag_name.query("buffer_flush_limit_btd", m_NumAggBTDBufferToFlush); + amrex::Print()<<" BTD: ForceFlushEvery: "<WriteOpenPMDParticles( particle_diags, static_cast(time), use_pinned_pc, isBTD, isLastBTDFlush); + if (bufferID % m_NumAggBTDBufferToFlush == 0) + m_OpenPMDPlotWriter->ForceFlush(isBTD); + // signal that no further updates will be written to this iteration m_OpenPMDPlotWriter->CloseStep(isBTD, isLastBTDFlush); } diff --git a/Source/Diagnostics/WarpXOpenPMD.H b/Source/Diagnostics/WarpXOpenPMD.H index ed44fd8de51..5476d1f9efb 100644 --- a/Source/Diagnostics/WarpXOpenPMD.H +++ b/Source/Diagnostics/WarpXOpenPMD.H @@ -152,6 +152,7 @@ public: /** Return OpenPMD File type ("bp5", "bp4", "h5" or "json")*/ std::string OpenPMDFileType () { return m_OpenPMDFileType; } + void ForceFlush(bool isBTD); private: void Init (openPMD::Access access, bool isBTD); diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 17054cd1290..948e125f444 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -407,13 +407,27 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot () } } +/* + * If I/O is through ADIOS: + * isBTD=true => PerformPut + * this way we do not flush out every buffer in a snapshot, + * (BTD uses few data ranks so this is costly for ADIOS collective functions) + * Instead we aggregate a few buffers before calling ForceFlush(isBTD) to write out. + * Note that SPAN is used to allocate CPU data in ADIOS. + * The advantage is when SPAN is successful, PerformPut takes no action. + * + * isBTD=false => PDW + */ void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const { - WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent"); - - openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); - - currIteration.seriesFlush(); + openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); + if (isBTD) { + WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent-PP()"); + currIteration.seriesFlush( "adios2.engine.preferred_flush_target = \"buffer\"" ); + } else { + WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()"); + currIteration.seriesFlush(); + } } std::string @@ -463,6 +477,7 @@ void WarpXOpenPMDPlot::SetStep (int ts, const std::string& dirPrefix, int file_m void WarpXOpenPMDPlot::CloseStep (bool isBTD, bool isLastBTDFlush) { + WARPX_PROFILE("WarpXOpenPMDPlot::CloseStep()"); // default close is true bool callClose = true; // close BTD file only when isLastBTDFlush is true @@ -666,19 +681,37 @@ for (const auto & particle_diag : particle_diags) { pc->getCharge(), pc->getMass(), isBTD, isLastBTDFlush); } +} + +/* + * Flush a few BTD buffers in a snapshot + * controlled by FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (default to 5) + * can be adjusted in the input file: .buffer_flush_limit_btd + */ +void +WarpXOpenPMDPlot::ForceFlush(bool isBTD) +{ + if (!isBTD) + return; auto hasOption = m_OpenPMDoptions.find("FlattenSteps"); - const bool flattenSteps = isBTD && (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos); + const bool result = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos); - if (flattenSteps) + if (result) { - // forcing new step so data from each btd batch in - // preferred_flush_target="buffer" can be flushed out - openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); - currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")"); + WARPX_PROFILE("WarpXOpenPMDPlot::FlattenSteps()"); + openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); + currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")"); } + else + { + WARPX_PROFILE("WarpXOpenPMDPlot::PDW()"); + openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); + currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "disk")"); + } } + void WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc, const std::string& name, @@ -1473,6 +1506,7 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, } } // icomp setup loop + bool spanWorks = true; for ( int icomp=0; icompisManaged() || fab.arena()->isDevice()) { - amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); - std::shared_ptr data_pinned(foo.release()); - amrex::Gpu::dtoh_memcpy_async(data_pinned.get(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); - // intentionally delayed until before we .flush(): amrex::Gpu::streamSynchronize(); - mesh_comp.storeChunk(data_pinned, chunk_offset, chunk_size); - } else + { + WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()"); + auto dynamicMemoryView = mesh_comp.storeChunk( + chunk_offset, chunk_size, + [&local_box, &spanWorks](size_t size) { + (void) size; + spanWorks = false; + amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); + std::shared_ptr data_pinned(foo.release()); + return data_pinned; + }); + + auto span = dynamicMemoryView.currentBuffer(); + amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); + } + } else #endif { amrex::Real const *local_data = fab.dataPtr(icomp); From 3bcdfae893d092b2abb0ab303d3a1259bce6a029 Mon Sep 17 00:00:00 2001 From: Junmin Gu Date: Wed, 27 Aug 2025 19:42:54 -0400 Subject: [PATCH 02/16] touched unused var --- Source/Diagnostics/WarpXOpenPMD.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 948e125f444..6a13b89e30a 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -1557,6 +1557,8 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, auto span = dynamicMemoryView.currentBuffer(); amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); + if (!spanWorks) + amrex::Print()<<" span failed \n"; } } else #endif From fb2bd4803d6e7adb28cb87feb96193518bbd1a3d Mon Sep 17 00:00:00 2001 From: Junmin Gu Date: Wed, 27 Aug 2025 20:13:24 -0400 Subject: [PATCH 03/16] removed unused var --- Source/Diagnostics/WarpXOpenPMD.cpp | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 6a13b89e30a..12ffeb00e02 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -1506,7 +1506,6 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, } } // icomp setup loop - bool spanWorks = true; for ( int icomp=0; icomp( chunk_offset, chunk_size, - [&local_box, &spanWorks](size_t size) { + [&local_box](size_t size) { (void) size; - spanWorks = false; + amrex::Print()<<" span failed \n"; amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); std::shared_ptr data_pinned(foo.release()); return data_pinned; @@ -1557,8 +1556,6 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, auto span = dynamicMemoryView.currentBuffer(); amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); - if (!spanWorks) - amrex::Print()<<" span failed \n"; } } else #endif From 2e384ba03bbfca89ccc912287239e567ea87207f Mon Sep 17 00:00:00 2001 From: guj Date: Wed, 3 Sep 2025 12:27:39 -0700 Subject: [PATCH 04/16] fixed style, moved comments, etc --- .../FlushFormats/FlushFormatOpenPMD.H | 6 ++++- .../FlushFormats/FlushFormatOpenPMD.cpp | 1 - Source/Diagnostics/WarpXOpenPMD.H | 14 +++++++++-- Source/Diagnostics/WarpXOpenPMD.cpp | 24 ++++--------------- 4 files changed, 21 insertions(+), 24 deletions(-) diff --git a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H index a801854db03..22968ae3a34 100644 --- a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H +++ b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.H @@ -53,7 +53,11 @@ public: private: /** This is responsible for dumping to file */ std::unique_ptr< WarpXOpenPMDPlot > m_OpenPMDPlotWriter; - int m_NumAggBTDBufferToFlush=5; + + /** This parameter is corresponding to the input option + "buffer_flush_limit_btd" at the diagnostic level. + By default we set to flush every 5 buffers per snapshot */ + int m_NumAggBTDBufferToFlush = 5; }; #endif // WARPX_FLUSHFORMATOPENPMD_H_ diff --git a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp index af692cbbbdc..903ebd60cb2 100644 --- a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp +++ b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp @@ -66,7 +66,6 @@ FlushFormatOpenPMD::FlushFormatOpenPMD (const std::string& diag_name) } pp_diag_name.query("buffer_flush_limit_btd", m_NumAggBTDBufferToFlush); - amrex::Print()<<" BTD: ForceFlushEvery: "<.buffer_flush_limit_btd + */ + void ForceFlush (bool isBTD); private: void Init (openPMD::Access access, bool isBTD); @@ -182,7 +188,11 @@ private: * @param[in] isBTD if the current diagnostic is BTD * * if isBTD=false, apply the default flush behaviour - * if isBTD=true, advice to use ADIOS Put() instead of PDW for better performance. + * in ADIOS, the action will be PerformDataWrite + * if isBTD=true, in ADIOS, the action will be PerformPut + * because no action is taken for the span tasks. + * This way we can aggregate buffers before + * calling ForceFlush(isBTD) to write out. * * iteration.seriesFlush() is used instead of series.flush() * because the latter flushes only if data is dirty diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 12ffeb00e02..bcd01b1a248 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -407,23 +407,12 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot () } } -/* - * If I/O is through ADIOS: - * isBTD=true => PerformPut - * this way we do not flush out every buffer in a snapshot, - * (BTD uses few data ranks so this is costly for ADIOS collective functions) - * Instead we aggregate a few buffers before calling ForceFlush(isBTD) to write out. - * Note that SPAN is used to allocate CPU data in ADIOS. - * The advantage is when SPAN is successful, PerformPut takes no action. - * - * isBTD=false => PDW - */ void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const { openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); if (isBTD) { - WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent-PP()"); - currIteration.seriesFlush( "adios2.engine.preferred_flush_target = \"buffer\"" ); + WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()::BTD"); + currIteration.seriesFlush("adios2.engine.preferred_flush_target = \"buffer\""); } else { WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()"); currIteration.seriesFlush(); @@ -683,12 +672,7 @@ for (const auto & particle_diag : particle_diags) { } } -/* - * Flush a few BTD buffers in a snapshot - * controlled by FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (default to 5) - * can be adjusted in the input file: .buffer_flush_limit_btd - */ -void +Void WarpXOpenPMDPlot::ForceFlush(bool isBTD) { if (!isBTD) @@ -705,7 +689,7 @@ WarpXOpenPMDPlot::ForceFlush(bool isBTD) } else { - WARPX_PROFILE("WarpXOpenPMDPlot::PDW()"); + WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()::Disk()"); openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "disk")"); } From ff200122a9a9871cd8ddb34688818528fd92878b Mon Sep 17 00:00:00 2001 From: guj Date: Wed, 3 Sep 2025 12:30:40 -0700 Subject: [PATCH 05/16] fixed typo --- Source/Diagnostics/WarpXOpenPMD.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index bcd01b1a248..8867fc4c15c 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -672,7 +672,7 @@ for (const auto & particle_diag : particle_diags) { } } -Void +void WarpXOpenPMDPlot::ForceFlush(bool isBTD) { if (!isBTD) From 8b926ede223a1b983b7c7d65f6fee270a7279459 Mon Sep 17 00:00:00 2001 From: guj Date: Wed, 3 Sep 2025 12:40:03 -0700 Subject: [PATCH 06/16] fixed doc style error --- Source/Diagnostics/WarpXOpenPMD.H | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.H b/Source/Diagnostics/WarpXOpenPMD.H index 91156ac724a..8bcd6502387 100644 --- a/Source/Diagnostics/WarpXOpenPMD.H +++ b/Source/Diagnostics/WarpXOpenPMD.H @@ -156,7 +156,7 @@ public: * @param[in] isBTD if the current diagnostic is BTD * This function is controlled by the paramter * FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (default to 5), - * it can be adjusted in the input file: .buffer_flush_limit_btd + * it can be adjusted in the input file: diag_name.buffer_flush_limit_btd */ void ForceFlush (bool isBTD); private: From c832012cbc445c55a9592836a61d4bce8eaf5ae8 Mon Sep 17 00:00:00 2001 From: Axel Huebl Date: Wed, 3 Sep 2025 16:17:11 -0700 Subject: [PATCH 07/16] Fix Doxygen --- Source/Diagnostics/WarpXOpenPMD.H | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.H b/Source/Diagnostics/WarpXOpenPMD.H index 8bcd6502387..6394b0a88fa 100644 --- a/Source/Diagnostics/WarpXOpenPMD.H +++ b/Source/Diagnostics/WarpXOpenPMD.H @@ -153,10 +153,12 @@ public: std::string OpenPMDFileType () { return m_OpenPMDFileType; } /** Flush a few BTD buffers in a snapshot - * @param[in] isBTD if the current diagnostic is BTD - * This function is controlled by the paramter + * + * This function is controlled by the parameter * FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (default to 5), * it can be adjusted in the input file: diag_name.buffer_flush_limit_btd + * + * @param[in] isBTD if the current diagnostic is BTD */ void ForceFlush (bool isBTD); private: From ff047b1615e7e58267fbf808439e9ff9a08ff72d Mon Sep 17 00:00:00 2001 From: guj Date: Wed, 3 Sep 2025 18:08:01 -0700 Subject: [PATCH 08/16] more documentation, function name change for ForceFlush() --- .../FlushFormats/FlushFormatOpenPMD.cpp | 4 +-- Source/Diagnostics/WarpXOpenPMD.H | 20 ++++++++----- Source/Diagnostics/WarpXOpenPMD.cpp | 29 +++++++++---------- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp index 903ebd60cb2..85b28d9aba3 100644 --- a/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp +++ b/Source/Diagnostics/FlushFormats/FlushFormatOpenPMD.cpp @@ -178,8 +178,8 @@ FlushFormatOpenPMD::WriteToFile ( m_OpenPMDPlotWriter->WriteOpenPMDParticles( particle_diags, static_cast(time), use_pinned_pc, isBTD, isLastBTDFlush); - if (bufferID % m_NumAggBTDBufferToFlush == 0) - m_OpenPMDPlotWriter->ForceFlush(isBTD); + if (isBTD && (bufferID % m_NumAggBTDBufferToFlush == 0) ) + m_OpenPMDPlotWriter->FlushBTDToDisk(); // signal that no further updates will be written to this iteration m_OpenPMDPlotWriter->CloseStep(isBTD, isLastBTDFlush); diff --git a/Source/Diagnostics/WarpXOpenPMD.H b/Source/Diagnostics/WarpXOpenPMD.H index 6394b0a88fa..5fa15a38bb3 100644 --- a/Source/Diagnostics/WarpXOpenPMD.H +++ b/Source/Diagnostics/WarpXOpenPMD.H @@ -152,15 +152,21 @@ public: /** Return OpenPMD File type ("bp5", "bp4", "h5" or "json")*/ std::string OpenPMDFileType () { return m_OpenPMDFileType; } - /** Flush a few BTD buffers in a snapshot + /** Ensure BTD buffers are written to disk * - * This function is controlled by the parameter - * FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (default to 5), - * it can be adjusted in the input file: diag_name.buffer_flush_limit_btd + * This function can be called to intermediately ensure ADIOS buffered "steps" + * are written to disk, and the valid metadata if checkpointing is required. * - * @param[in] isBTD if the current diagnostic is BTD + * This is needed to read partial data while a simulation is running or + * to support restarting (the BTD diagnostics) in WarpX, so it + * can continue to append to a partially written labframe station + * after restart. + * + * The frequency is controlled by + * FlushFormatOpenPMD::m_NumAggBTDBufferToFlush (default to 5). + * It can be adjusted in the input file: diag_name.buffer_flush_limit_btd */ - void ForceFlush (bool isBTD); + void FlushBTDToDisk (); private: void Init (openPMD::Access access, bool isBTD); @@ -194,7 +200,7 @@ private: * if isBTD=true, in ADIOS, the action will be PerformPut * because no action is taken for the span tasks. * This way we can aggregate buffers before - * calling ForceFlush(isBTD) to write out. + * calling FlushBTDToDisk() to write out. * * iteration.seriesFlush() is used instead of series.flush() * because the latter flushes only if data is dirty diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 8867fc4c15c..a438a184543 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -673,17 +673,17 @@ for (const auto & particle_diag : particle_diags) { } void -WarpXOpenPMDPlot::ForceFlush(bool isBTD) +WarpXOpenPMDPlot::FlushBTDToDisk() { - if (!isBTD) - return; - + bool isBTD = true; auto hasOption = m_OpenPMDoptions.find("FlattenSteps"); - const bool result = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos); + const bool doFlattenSteps = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos); - if (result) + if (doFlattenSteps) { - WARPX_PROFILE("WarpXOpenPMDPlot::FlattenSteps()"); + WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()"); + // Here for checkpointing purpose, we ask ADIOS to create to a new step, which + // triggers writting both data and metadata. openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); currIteration.seriesFlush(R"(adios2.engine.preferred_flush_target = "new_step")"); } @@ -1529,14 +1529,13 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, { WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()"); auto dynamicMemoryView = mesh_comp.storeChunk( - chunk_offset, chunk_size, - [&local_box](size_t size) { - (void) size; - amrex::Print()<<" span failed \n"; - amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); - std::shared_ptr data_pinned(foo.release()); - return data_pinned; - }); + chunk_offset, chunk_size, + [&local_box](size_t size) { + (void) size; + amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); + std::shared_ptr data_pinned(foo.release()); + return data_pinned; + }); auto span = dynamicMemoryView.currentBuffer(); amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); From 2a5da56fd86c9b2d2576163cbf50daf920815f2c Mon Sep 17 00:00:00 2001 From: guj Date: Mon, 8 Sep 2025 11:59:11 -0700 Subject: [PATCH 09/16] added documentation for .buffer_flush_limit_btd --- Docs/source/usage/parameters.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Docs/source/usage/parameters.rst b/Docs/source/usage/parameters.rst index 96ff8b26834..6b732a702c8 100644 --- a/Docs/source/usage/parameters.rst +++ b/Docs/source/usage/parameters.rst @@ -2914,6 +2914,9 @@ In-situ capabilities can be used by turning on Sensei or Ascent (provided they a ``variable based`` is an `experimental feature with ADIOS2 BP5 `__ that will replace ``g``. Default: ``f`` (full diagnostics) +* ``.buffer_flush_limit_btd`` (`integer`; defaults to 5) optional, only read if ``.diag_type = BackTransformed`` + This parameter is intended for ADIOS backend to group every N buffers (N is the value of this parameter) and then flush to disk. + * ``.adios2_operator.type`` (``zfp``, ``blosc``) optional, `ADIOS2 I/O operator type `__ for `openPMD `_ data dumps. From e79da499f7fb0db45a12a381f3abc0aa139ae3ef Mon Sep 17 00:00:00 2001 From: guj Date: Mon, 8 Sep 2025 17:48:38 -0700 Subject: [PATCH 10/16] revered doFlattenSteps to flattenSteps --- Source/Diagnostics/WarpXOpenPMD.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index a438a184543..396fb67f298 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -677,9 +677,9 @@ WarpXOpenPMDPlot::FlushBTDToDisk() { bool isBTD = true; auto hasOption = m_OpenPMDoptions.find("FlattenSteps"); - const bool doFlattenSteps = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos); + const bool flattenSteps = (m_Series->backend() == "ADIOS2") && (hasOption != std::string::npos); - if (doFlattenSteps) + if (flattenSteps) { WARPX_PROFILE("WarpXOpenPMDPlot::ForceFlush()"); // Here for checkpointing purpose, we ask ADIOS to create to a new step, which From 5b84af22ac5370ce688185bc63ac98843a4c69d3 Mon Sep 17 00:00:00 2001 From: guj Date: Mon, 8 Sep 2025 18:03:33 -0700 Subject: [PATCH 11/16] simpler syntax from Axel's suggestion --- Source/Diagnostics/WarpXOpenPMD.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 396fb67f298..00e0702da12 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -1530,8 +1530,7 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()"); auto dynamicMemoryView = mesh_comp.storeChunk( chunk_offset, chunk_size, - [&local_box](size_t size) { - (void) size; + [&local_box](size_t /* size */) { amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); std::shared_ptr data_pinned(foo.release()); return data_pinned; From d408dbccf415ffe694bb34964b6f696e26e896ef Mon Sep 17 00:00:00 2001 From: guj Date: Mon, 8 Sep 2025 20:02:25 -0700 Subject: [PATCH 12/16] added CPU span --- Source/Diagnostics/WarpXOpenPMD.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 00e0702da12..3a8fedc3770 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -1542,9 +1542,17 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, } else #endif { - amrex::Real const *local_data = fab.dataPtr(icomp); - mesh_comp.storeChunkRaw( - local_data, chunk_offset, chunk_size); + WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_span()"); + auto dynamicMemoryView = mesh_comp.storeChunk( + chunk_offset, chunk_size, + [&local_box](size_t /* size */) { + amrex::BaseFab foo(local_box, 1); + std::shared_ptr data_pinned(foo.release()); + return data_pinned; + }); + + auto span = dynamicMemoryView.currentBuffer(); + std::memcpy(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); } } } // icomp store loop From 6ad9fb2ce9b14a76077829d5884b92bd0f256fd0 Mon Sep 17 00:00:00 2001 From: guj Date: Tue, 9 Sep 2025 10:49:38 -0700 Subject: [PATCH 13/16] if not using ADIOS, flush everytime we got data. --- Source/Diagnostics/WarpXOpenPMD.cpp | 32 +++++++++++++++++++---------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 3a8fedc3770..92374b29f2f 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -1429,6 +1429,8 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, // collective open series_iteration.open(); + bool hasADIOS = (m_Series->backend() == "ADIOS2"); + auto meshes = series_iteration.meshes; if (first_write_to_iteration) { // lets see whether full_geom varies from geom[0] xgeom[1] @@ -1541,22 +1543,30 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, } } else #endif - { - WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_span()"); - auto dynamicMemoryView = mesh_comp.storeChunk( - chunk_offset, chunk_size, - [&local_box](size_t /* size */) { + { // CPU + if (hasADIOS) + { + WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_span()"); + auto dynamicMemoryView = mesh_comp.storeChunk( + chunk_offset, chunk_size, + [&local_box](size_t /* size */) { amrex::BaseFab foo(local_box, 1); std::shared_ptr data_pinned(foo.release()); return data_pinned; - }); + }); - auto span = dynamicMemoryView.currentBuffer(); - std::memcpy(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); - } - } + auto span = dynamicMemoryView.currentBuffer(); + std::memcpy(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); + } + else + { + WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_mesh()"); + amrex::Real const *local_data = fab.dataPtr(icomp); + mesh_comp.storeChunkRaw( local_data, chunk_offset, chunk_size); + } + } // CPU + } } // icomp store loop - #ifdef AMREX_USE_GPU amrex::Gpu::streamSynchronize(); #endif From 30fe6d32679aa8dc4d2d11c3ab20e367028a4f14 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 9 Sep 2025 17:51:27 +0000 Subject: [PATCH 14/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- Source/Diagnostics/WarpXOpenPMD.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 92374b29f2f..cac9ce82c2c 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -1544,11 +1544,11 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, } else #endif { // CPU - if (hasADIOS) - { + if (hasADIOS) + { WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_span()"); auto dynamicMemoryView = mesh_comp.storeChunk( - chunk_offset, chunk_size, + chunk_offset, chunk_size, [&local_box](size_t /* size */) { amrex::BaseFab foo(local_box, 1); std::shared_ptr data_pinned(foo.release()); @@ -1558,7 +1558,7 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, auto span = dynamicMemoryView.currentBuffer(); std::memcpy(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); } - else + else { WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_mesh()"); amrex::Real const *local_data = fab.dataPtr(icomp); From e5370a189bc18e4ffcf96bf88f648b5f01538511 Mon Sep 17 00:00:00 2001 From: guj Date: Tue, 9 Sep 2025 16:28:49 -0700 Subject: [PATCH 15/16] Span restricted to ADIOS --- Source/Diagnostics/WarpXOpenPMD.cpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index cac9ce82c2c..193bbd3c6b2 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -1528,6 +1528,7 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, // GPU pointers to the I/O library #ifdef AMREX_USE_GPU if (fab.arena()->isManaged() || fab.arena()->isDevice()) { + if (hasADIOS) { WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H_Span()"); auto dynamicMemoryView = mesh_comp.storeChunk( @@ -1537,18 +1538,25 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, std::shared_ptr data_pinned(foo.release()); return data_pinned; }); - auto span = dynamicMemoryView.currentBuffer(); amrex::Gpu::dtoh_memcpy_async(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); + } else + { + WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::D2H()"); + amrex::BaseFab foo(local_box, 1, amrex::The_Pinned_Arena()); + std::shared_ptr data_pinned(foo.release()); + amrex::Gpu::dtoh_memcpy_async(data_pinned.get(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); + // intentionally delayed until before we .flush(): amrex::Gpu::streamSynchronize(); + mesh_comp.storeChunk(data_pinned, chunk_offset, chunk_size); } } else #endif { // CPU - if (hasADIOS) - { + if (hasADIOS) + { WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_span()"); auto dynamicMemoryView = mesh_comp.storeChunk( - chunk_offset, chunk_size, + chunk_offset, chunk_size, [&local_box](size_t /* size */) { amrex::BaseFab foo(local_box, 1); std::shared_ptr data_pinned(foo.release()); @@ -1558,7 +1566,7 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, auto span = dynamicMemoryView.currentBuffer(); std::memcpy(span.data(), fab.dataPtr(icomp), local_box.numPts()*sizeof(amrex::Real)); } - else + else { WARPX_PROFILE("WarpXOpenPMDPlot::WriteOpenPMDFields::CPU_mesh()"); amrex::Real const *local_data = fab.dataPtr(icomp); From e51ca1a2fe8127ad466e3f3275ddabf87f0cd82e Mon Sep 17 00:00:00 2001 From: Junmin Gu Date: Wed, 10 Sep 2025 16:49:44 -0400 Subject: [PATCH 16/16] changed function name --- Source/Diagnostics/WarpXOpenPMD.H | 2 +- Source/Diagnostics/WarpXOpenPMD.cpp | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Source/Diagnostics/WarpXOpenPMD.H b/Source/Diagnostics/WarpXOpenPMD.H index 5fa15a38bb3..128f50e287d 100644 --- a/Source/Diagnostics/WarpXOpenPMD.H +++ b/Source/Diagnostics/WarpXOpenPMD.H @@ -207,7 +207,7 @@ private: * this causes trouble when the underlying writing function is collective (like PDW) * */ - void flushCurrent (bool isBTD) const; + void seriesFlush (bool isBTD) const; /** This function does initial setup for the fields when interation is newly created * @param[in] meshes The meshes in a series diff --git a/Source/Diagnostics/WarpXOpenPMD.cpp b/Source/Diagnostics/WarpXOpenPMD.cpp index 193bbd3c6b2..8bb32c41e7e 100644 --- a/Source/Diagnostics/WarpXOpenPMD.cpp +++ b/Source/Diagnostics/WarpXOpenPMD.cpp @@ -407,14 +407,14 @@ WarpXOpenPMDPlot::~WarpXOpenPMDPlot () } } -void WarpXOpenPMDPlot::flushCurrent (bool isBTD) const +void WarpXOpenPMDPlot::seriesFlush (bool isBTD) const { openPMD::Iteration currIteration = GetIteration(m_CurrentStep, isBTD); if (isBTD) { - WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()::BTD"); + WARPX_PROFILE("WarpXOpenPMDPlot::SeriesFlush()::BTD"); currIteration.seriesFlush("adios2.engine.preferred_flush_target = \"buffer\""); } else { - WARPX_PROFILE("WarpXOpenPMDPlot::flushCurrent()"); + WARPX_PROFILE("WarpXOpenPMDPlot::SeriesFlush()()"); currIteration.seriesFlush(); } } @@ -769,7 +769,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc, SetConstParticleRecordsEDPIC(currSpecies, positionComponents, NewParticleVectorSize, charge, mass); } - flushCurrent(isBTD); + this->seriesFlush(isBTD); // dump individual particles bool contributed_particles = false; // did the local MPI rank contribute particles? @@ -850,7 +850,7 @@ WarpXOpenPMDPlot::DumpToFile (ParticleContainer* pc, } } - flushCurrent(isBTD); + this->seriesFlush(isBTD); } void @@ -1579,7 +1579,7 @@ WarpXOpenPMDPlot::WriteOpenPMDFieldsAll ( //const std::string& filename, amrex::Gpu::streamSynchronize(); #endif // Flush data to disk after looping over all components - flushCurrent(isBTD); + this->seriesFlush(isBTD); } // levels loop (i) } #endif // WARPX_USE_OPENPMD