diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 448fa5f62e..92e777c3c7 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -574,6 +574,9 @@ void BP5Reader::PerformGets() params["SelectSteps"] = m_Parameters.SelectSteps; if (m_Parameters.IgnoreFlattenSteps) params["IgnoreFlattenSteps"] = "true"; + // Send our file id so the server can detect stale cached metadata (0 = none). + if (m_FileUUID != 0) + params["FileUUID"] = std::to_string(m_FileUUID); m_Remote->Open(std::get<0>(tup), std::get<1>(tup), m_RemoteName, m_OpenMode, RowMajorOrdering, params); } diff --git a/source/adios2/toolkit/remote/XrootdHttpRemote.cpp b/source/adios2/toolkit/remote/XrootdHttpRemote.cpp index 6c6aab257c..9193052b51 100644 --- a/source/adios2/toolkit/remote/XrootdHttpRemote.cpp +++ b/source/adios2/toolkit/remote/XrootdHttpRemote.cpp @@ -6,6 +6,7 @@ #include "XrootdHttpRemote.h" #include "adios2/helper/adiosLog.h" +#include "adios2/helper/adiosSystem.h" // IsLittleEndian #include #include @@ -343,13 +344,18 @@ void XrootdHttpRemote::Open(const std::string hostname, const int32_t port, if (it != params.end()) m_RequestTimeout = std::stol(it->second); + // File id for the server's identity check. 
+ it = params.find("FileUUID"); + if (it != params.end()) + m_FileUUID = static_cast(std::stoul(it->second)); + // Collect non-HTTP engine params (TarInfo, SelectSteps, IgnoreFlattenSteps) // and encode them as a TAB-separated string for transmission Params engineParams; for (const auto &p : params) { if (p.first != "UseHttps" && p.first != "CAPath" && p.first != "VerifySSL" && - p.first != "ConnectTimeout" && p.first != "RequestTimeout") + p.first != "ConnectTimeout" && p.first != "RequestTimeout" && p.first != "FileUUID") { engineParams[p.first] = p.second; } @@ -442,6 +448,12 @@ std::string XrootdHttpRemote::BuildFileConfigSegment() std::ostringstream s; // Always emit RMOrder so the server doesn't have to guess the default. s << "r" << (m_RowMajorOrdering ? "1" : "0"); + // File id for the identity check (omit when 0). Must precede the greedy `p`. + if (m_FileUUID != 0) + s << "u" << m_FileUUID; + // Client byte order (omit when little-endian). Must precede the greedy `p`. + if (!helper::IsLittleEndian()) + s << "e1"; if (!m_EngineParams.empty()) s << "p" << Base64urlEncode(m_EngineParams); std::string out = s.str(); diff --git a/source/adios2/toolkit/remote/XrootdHttpRemote.h b/source/adios2/toolkit/remote/XrootdHttpRemote.h index 8530e13218..44368bae56 100644 --- a/source/adios2/toolkit/remote/XrootdHttpRemote.h +++ b/source/adios2/toolkit/remote/XrootdHttpRemote.h @@ -147,6 +147,9 @@ class XrootdHttpRemote : public Remote std::string m_BaseUrl; std::string m_Filename; std::string m_EngineParams; + /** File id from the reader's metadata, sent for the server's staleness check; + * 0 = none (legacy file). */ + uint32_t m_FileUUID = 0; /** Per-engine `` path segment, computed once at Open() * from RMOrder + EngineParams. Reused on every request. 
*/ std::string m_FileConfigSegment; diff --git a/source/utils/xrootd-plugin/AccessLog.cpp b/source/utils/xrootd-plugin/AccessLog.cpp index 4f2eee3aca..5c4829c26e 100644 --- a/source/utils/xrootd-plugin/AccessLog.cpp +++ b/source/utils/xrootd-plugin/AccessLog.cpp @@ -153,7 +153,13 @@ void AccessLog::Log(const Record &r) os << ",\"block\":" << r.blockID; if (r.accuracyError != 0.0) os << ",\"acc\":" << r.accuracyError; - os << ",\"bytes\":" << r.bytes << ",\"batch\":" << r.batch << "}"; + os << ",\"bytes\":" << r.bytes << ",\"batch\":" << r.batch; + if (r.reject) + { + os << ",\"reject\":"; + AppendJsonString(os, r.reject); + } + os << "}"; std::string line = os.str(); { diff --git a/source/utils/xrootd-plugin/AccessLog.h b/source/utils/xrootd-plugin/AccessLog.h index 558d3b3994..c63cfae3fb 100644 --- a/source/utils/xrootd-plugin/AccessLog.h +++ b/source/utils/xrootd-plugin/AccessLog.h @@ -48,7 +48,8 @@ class AccessLog uint64_t blockID = static_cast(-1); // -1 => no block selection double accuracyError = 0.0; uint64_t bytes = 0; - uint32_t batch = 1; // number of variables in the batch this belonged to + uint32_t batch = 1; // number of variables in the batch this belonged to + const char *reject = nullptr; // non-null => request was rejected; the reason string }; /* Record one request. No-op when disabled; may drop if the queue is full. 
*/ diff --git a/source/utils/xrootd-plugin/XrdHttpSsiHandler.cpp b/source/utils/xrootd-plugin/XrdHttpSsiHandler.cpp index 4dec9f8b9f..737bdaa60c 100644 --- a/source/utils/xrootd-plugin/XrdHttpSsiHandler.cpp +++ b/source/utils/xrootd-plugin/XrdHttpSsiHandler.cpp @@ -73,8 +73,9 @@ std::string UrlEncode(const std::string &s) // c Count o Start a AccuracyError // N AccuracyNorm R AccuracyRelative // -// File-config codes: -// r RMOrder (single digit) p EngineParams (rest of segment) +// File-config codes (in this order): +// v WireVersion (digits) r RMOrder (single digit) u FileUUID (digits) +// e ClientBigEndian (digits) p EngineParams (rest of segment) // // The decoder builds a legacy "verb Filename=...&key=val&..." string for // the inner SSI parser, which is unchanged. @@ -192,14 +193,24 @@ bool DecodePerRequestParamString(const std::string &paramstring, std::ostringstr return true; } -// Decode the file-config segment. Strict order: optional `r`, -// then optional `p`. `_` placeholder is empty. +// Decode the file-config segment: optional v, r, u, e in that order, then p. +// `_` placeholder is empty. 
bool DecodeFileConfigSegment(const std::string &fileConfig, std::ostringstream &out) { if (fileConfig == "_") return true; size_t i = 0; const size_t n = fileConfig.size(); + if (i < n && fileConfig[i] == 'v') + { + ++i; + const size_t start = i; + while (i < n && fileConfig[i] >= '0' && fileConfig[i] <= '9') + ++i; + if (i == start) + return false; // 'v' with no digits + out << "&WireVersion=" << fileConfig.substr(start, i - start); + } if (i < n && fileConfig[i] == 'r') { ++i; @@ -210,6 +221,26 @@ bool DecodeFileConfigSegment(const std::string &fileConfig, std::ostringstream & return false; out << "&RMOrder=" << d; } + if (i < n && fileConfig[i] == 'u') + { + ++i; + const size_t start = i; + while (i < n && fileConfig[i] >= '0' && fileConfig[i] <= '9') + ++i; + if (i == start) + return false; // 'u' with no digits + out << "&FileUUID=" << fileConfig.substr(start, i - start); + } + if (i < n && fileConfig[i] == 'e') + { + ++i; + const size_t start = i; + while (i < n && fileConfig[i] >= '0' && fileConfig[i] <= '9') + ++i; + if (i == start) + return false; // 'e' with no digits + out << "&ClientBigEndian=" << fileConfig.substr(start, i - start); + } if (i < n && fileConfig[i] == 'p') { ++i; diff --git a/source/utils/xrootd-plugin/XrdSsiSvService.cpp b/source/utils/xrootd-plugin/XrdSsiSvService.cpp index 506fa51466..7c0b5b0610 100644 --- a/source/utils/xrootd-plugin/XrdSsiSvService.cpp +++ b/source/utils/xrootd-plugin/XrdSsiSvService.cpp @@ -78,6 +78,23 @@ void *SvAdiosGet(void *svP) sessP->doAdiosGet(); return 0; } + +// Highest request wire-format version we accept (clients emit none yet, absent == +// 0). Policing it gives a future newer client a clear error, not a parse failure. +constexpr uint32_t kMaxWireVersion = 0; + +// Record a rejected request in the access log (the normal entry is logged only +// after a successful read, which a rejection never reaches). 
+void LogReject(const std::string &file, const char *reason, uint32_t batch) +{ + if (!AccessLog::Instance().Enabled()) + return; + AccessLog::Record rec; + rec.file = file.c_str(); + rec.reject = reason; + rec.batch = batch; + AccessLog::Instance().Log(rec); +} } /******************************************************************************/ @@ -499,6 +516,9 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) std::string Filename; std::string EngineParams; bool ArrayOrder = true; + uint32_t RequestUUID = 0; // file id from client metadata; 0 = no check + uint32_t WireVersion = 0; // request format version; absent/0 = original + bool ClientBigEndian = false; // client native byte order; absent = little size_t NVars = 0; struct VarRequest @@ -619,6 +639,26 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) std::size_t pos = param.find("=") + 1; EngineParams = UrlDecode(param.substr(pos)); } + else if (HasPrefix(param, "FileUUID=")) + { + std::size_t pos = param.find("=") + 1; + std::stringstream sstream(param.substr(pos)); + sstream >> RequestUUID; + } + else if (HasPrefix(param, "WireVersion=")) + { + std::size_t pos = param.find("=") + 1; + std::stringstream sstream(param.substr(pos)); + sstream >> WireVersion; + } + else if (HasPrefix(param, "ClientBigEndian=")) + { + std::size_t pos = param.find("=") + 1; + std::stringstream sstream(param.substr(pos)); + int be = 0; + sstream >> be; + ClientBigEndian = (be != 0); + } } } @@ -626,9 +666,25 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) if (Filename.empty() || NVars == 0) { + LogReject(Filename, "badparams", static_cast(NVars)); RespondErr("batchget: invalid parameters", EINVAL); return; } + if (WireVersion > kMaxWireVersion) + { + LogReject(Filename, "version", static_cast(NVars)); + std::string msg = "batchget: unsupported request protocol version " + + std::to_string(WireVersion) + "; this server supports up to " + + std::to_string(kMaxWireVersion); + 
RespondErr(msg.c_str(), EINVAL); + return; + } + if (ClientBigEndian != !adios2::helper::IsLittleEndian()) + { + LogReject(Filename, "byteorder", static_cast(NVars)); + RespondErr("batchget: cross-endian remote reads not supported", EINVAL); + return; + } try { @@ -636,6 +692,15 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) auto engine = poolEntry.file->m_engine; auto io = poolEntry.file->m_io; + // Reject if the client's file id doesn't match the file we opened + // (stale cached metadata); 0 = client has no id, no check. + if (RequestUUID != 0 && engine.FileUUID() != RequestUUID) + { + LogReject(Filename, "identity", static_cast(NVars)); + RespondErr("batchget: file identity mismatch; remote metadata is stale", EINVAL); + return; + } + // Compute sizes for each variable and total response size // Response format: [uint64_t NVars][uint64_t size_0]...[size_N-1][data_0]...[data_N-1] size_t headerSize = sizeof(uint64_t) + NVars * sizeof(uint64_t); @@ -648,6 +713,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) adios2::DataType TypeOfVar = io.InquireVariableType(vr.VarName); if (TypeOfVar == adios2::DataType::None) { + LogReject(Filename, "novar", static_cast(NVars)); RespondErr("batchget: unknown variable", EINVAL); return; } @@ -678,6 +744,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) m_responseBuffer = (char *)malloc(m_responseBufferSize); if (!m_responseBuffer) { + LogReject(Filename, "alloc", static_cast(NVars)); RespondErr("batchget: allocation failed", ENOMEM); return; } @@ -773,6 +840,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) } catch (const std::exception &exc) { + LogReject(Filename, "exception", static_cast(NVars)); std::string msg = std::string("batchget: exception during processing: ") + exc.what(); RespondErr(msg.c_str(), EINVAL); } @@ -787,6 +855,9 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) size_t BlockID = (size_t)-1, DimCount = 0, StepStart = 0; 
size_t StepCount = 1; bool ArrayOrder = true; + uint32_t RequestUUID = 0; // file id from client metadata; 0 = no check + uint32_t WireVersion = 0; // request format version; absent/0 = original + bool ClientBigEndian = false; // client native byte order; absent = little // Accuracy parameters double AccuracyError = 0.0; double AccuracyNorm = 0.0; @@ -885,6 +956,41 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) std::size_t pos = param.find("=") + 1; EngineParams = UrlDecode(param.substr(pos)); } + else if (HasPrefix(param, "FileUUID=")) + { + std::size_t pos = param.find("=") + 1; + std::stringstream sstream(param.substr(pos)); + sstream >> RequestUUID; + } + else if (HasPrefix(param, "WireVersion=")) + { + std::size_t pos = param.find("=") + 1; + std::stringstream sstream(param.substr(pos)); + sstream >> WireVersion; + } + else if (HasPrefix(param, "ClientBigEndian=")) + { + std::size_t pos = param.find("=") + 1; + std::stringstream sstream(param.substr(pos)); + int be = 0; + sstream >> be; + ClientBigEndian = (be != 0); + } + } + if (WireVersion > kMaxWireVersion) + { + LogReject(Filename, "version", 1); + std::string msg = "get: unsupported request protocol version " + + std::to_string(WireVersion) + "; this server supports up to " + + std::to_string(kMaxWireVersion); + RespondErr(msg.c_str(), EINVAL); + return; + } + if (ClientBigEndian != !adios2::helper::IsLittleEndian()) + { + LogReject(Filename, "byteorder", 1); + RespondErr("get: cross-endian remote reads not supported", EINVAL); + return; } // Get a "anonymous" engine with this file open with this array order. 
// (any other differentiating characteristics of an ADIOS Open should be included in these @@ -901,10 +1007,19 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) auto poolEntry = m_FilePoolPtr->GetFree(Filename, ArrayOrder, EngineParams); auto engine = poolEntry.file->m_engine; auto io = poolEntry.file->m_io; + + // Metadata-identity check (0 = client has no id, enforces nothing). + if (RequestUUID != 0 && engine.FileUUID() != RequestUUID) + { + LogReject(Filename, "identity", 1); + RespondErr("get: file identity mismatch; remote metadata is stale", EINVAL); + return; + } adios2::Box varSel(Start, Count); adios2::DataType TypeOfVar = io.InquireVariableType(VarName); if (TypeOfVar == adios2::DataType::None) { + LogReject(Filename, "novar", 1); RespondErr("get: unknown variable", EINVAL); return; } @@ -975,6 +1090,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP) } catch (const std::exception &exc) { + LogReject(Filename, "exception", 1); std::string errMsg = std::string("Exception in Get: ") + exc.what(); RespondErr(errMsg.c_str(), EINVAL); }