Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,9 @@ void BP5Reader::PerformGets()
params["SelectSteps"] = m_Parameters.SelectSteps;
if (m_Parameters.IgnoreFlattenSteps)
params["IgnoreFlattenSteps"] = "true";
// Send our file id so the server can detect stale cached metadata (0 = none).
if (m_FileUUID != 0)
params["FileUUID"] = std::to_string(m_FileUUID);
m_Remote->Open(std::get<0>(tup), std::get<1>(tup), m_RemoteName, m_OpenMode,
RowMajorOrdering, params);
}
Expand Down
14 changes: 13 additions & 1 deletion source/adios2/toolkit/remote/XrootdHttpRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "XrootdHttpRemote.h"
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosSystem.h" // IsLittleEndian

#include <chrono>
#include <cstdlib>
Expand Down Expand Up @@ -343,13 +344,18 @@ void XrootdHttpRemote::Open(const std::string hostname, const int32_t port,
if (it != params.end())
m_RequestTimeout = std::stol(it->second);

// File id for the server's identity check.
it = params.find("FileUUID");
if (it != params.end())
m_FileUUID = static_cast<uint32_t>(std::stoul(it->second));

// Collect non-HTTP engine params (TarInfo, SelectSteps, IgnoreFlattenSteps)
// and encode them as a TAB-separated string for transmission
Params engineParams;
for (const auto &p : params)
{
if (p.first != "UseHttps" && p.first != "CAPath" && p.first != "VerifySSL" &&
p.first != "ConnectTimeout" && p.first != "RequestTimeout")
p.first != "ConnectTimeout" && p.first != "RequestTimeout" && p.first != "FileUUID")
{
engineParams[p.first] = p.second;
}
Expand Down Expand Up @@ -442,6 +448,12 @@ std::string XrootdHttpRemote::BuildFileConfigSegment()
std::ostringstream s;
// Always emit RMOrder so the server doesn't have to guess the default.
s << "r" << (m_RowMajorOrdering ? "1" : "0");
// File id for the identity check (omit when 0). Must precede the greedy `p`.
if (m_FileUUID != 0)
s << "u" << m_FileUUID;
// Client byte order (omit when little-endian). Must precede the greedy `p`.
if (!helper::IsLittleEndian())
s << "e1";
if (!m_EngineParams.empty())
s << "p" << Base64urlEncode(m_EngineParams);
std::string out = s.str();
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/toolkit/remote/XrootdHttpRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ class XrootdHttpRemote : public Remote
std::string m_BaseUrl;
std::string m_Filename;
std::string m_EngineParams;
/** File id from the reader's metadata, sent for the server's staleness check;
* 0 = none (legacy file). */
uint32_t m_FileUUID = 0;
/** Per-engine `<file-config>` path segment, computed once at Open()
* from RMOrder + EngineParams. Reused on every request. */
std::string m_FileConfigSegment;
Expand Down
8 changes: 7 additions & 1 deletion source/utils/xrootd-plugin/AccessLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,13 @@ void AccessLog::Log(const Record &r)
os << ",\"block\":" << r.blockID;
if (r.accuracyError != 0.0)
os << ",\"acc\":" << r.accuracyError;
os << ",\"bytes\":" << r.bytes << ",\"batch\":" << r.batch << "}";
os << ",\"bytes\":" << r.bytes << ",\"batch\":" << r.batch;
if (r.reject)
{
os << ",\"reject\":";
AppendJsonString(os, r.reject);
}
os << "}";

std::string line = os.str();
{
Expand Down
3 changes: 2 additions & 1 deletion source/utils/xrootd-plugin/AccessLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class AccessLog
uint64_t blockID = static_cast<uint64_t>(-1); // -1 => no block selection
double accuracyError = 0.0;
uint64_t bytes = 0;
uint32_t batch = 1; // number of variables in the batch this belonged to
uint32_t batch = 1; // number of variables in the batch this belonged to
const char *reject = nullptr; // non-null => request was rejected; the reason string
};

/* Record one request. No-op when disabled; may drop if the queue is full. */
Expand Down
39 changes: 35 additions & 4 deletions source/utils/xrootd-plugin/XrdHttpSsiHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ std::string UrlEncode(const std::string &s)
// c Count o Start a AccuracyError
// N AccuracyNorm R AccuracyRelative
//
// File-config codes:
// r RMOrder (single digit) p EngineParams (rest of segment)
// File-config codes (in this order):
// v WireVersion (digits) r RMOrder (single digit) u FileUUID (digits)
// e ClientBigEndian (digits) p EngineParams (rest of segment)
//
// The decoder builds a legacy "verb Filename=...&key=val&..." string for
// the inner SSI parser, which is unchanged.
Expand Down Expand Up @@ -192,14 +193,24 @@ bool DecodePerRequestParamString(const std::string &paramstring, std::ostringstr
return true;
}

// Decode the file-config segment. Strict order: optional `r<digit>`,
// then optional `p<rest>`. `_` placeholder is empty.
// Decode the file-config segment: optional v, r, u, e in that order, then p.
// `_` placeholder is empty.
bool DecodeFileConfigSegment(const std::string &fileConfig, std::ostringstream &out)
{
if (fileConfig == "_")
return true;
size_t i = 0;
const size_t n = fileConfig.size();
if (i < n && fileConfig[i] == 'v')
{
++i;
const size_t start = i;
while (i < n && fileConfig[i] >= '0' && fileConfig[i] <= '9')
++i;
if (i == start)
return false; // 'v' with no digits
out << "&WireVersion=" << fileConfig.substr(start, i - start);
}
if (i < n && fileConfig[i] == 'r')
{
++i;
Expand All @@ -210,6 +221,26 @@ bool DecodeFileConfigSegment(const std::string &fileConfig, std::ostringstream &
return false;
out << "&RMOrder=" << d;
}
if (i < n && fileConfig[i] == 'u')
{
++i;
const size_t start = i;
while (i < n && fileConfig[i] >= '0' && fileConfig[i] <= '9')
++i;
if (i == start)
return false; // 'u' with no digits
out << "&FileUUID=" << fileConfig.substr(start, i - start);
}
if (i < n && fileConfig[i] == 'e')
{
++i;
const size_t start = i;
while (i < n && fileConfig[i] >= '0' && fileConfig[i] <= '9')
++i;
if (i == start)
return false; // 'e' with no digits
out << "&ClientBigEndian=" << fileConfig.substr(start, i - start);
}
if (i < n && fileConfig[i] == 'p')
{
++i;
Expand Down
116 changes: 116 additions & 0 deletions source/utils/xrootd-plugin/XrdSsiSvService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ void *SvAdiosGet(void *svP)
sessP->doAdiosGet();
return 0;
}

// Highest request wire-format version we accept (clients emit none yet, absent ==
// 0). Policing it gives a future newer client a clear error, not a parse failure.
constexpr uint32_t kMaxWireVersion = 0;

// Record a rejected request in the access log (the normal entry is logged only
// after a successful read, which a rejection never reaches).
void LogReject(const std::string &file, const char *reason, uint32_t batch)
{
if (!AccessLog::Instance().Enabled())
return;
AccessLog::Record rec;
rec.file = file.c_str();
rec.reject = reason;
rec.batch = batch;
AccessLog::Instance().Log(rec);
}
}

/******************************************************************************/
Expand Down Expand Up @@ -499,6 +516,9 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
std::string Filename;
std::string EngineParams;
bool ArrayOrder = true;
uint32_t RequestUUID = 0; // file id from client metadata; 0 = no check
uint32_t WireVersion = 0; // request format version; absent/0 = original
bool ClientBigEndian = false; // client native byte order; absent = little
size_t NVars = 0;

struct VarRequest
Expand Down Expand Up @@ -619,23 +639,68 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
std::size_t pos = param.find("=") + 1;
EngineParams = UrlDecode(param.substr(pos));
}
else if (HasPrefix(param, "FileUUID="))
{
std::size_t pos = param.find("=") + 1;
std::stringstream sstream(param.substr(pos));
sstream >> RequestUUID;
}
else if (HasPrefix(param, "WireVersion="))
{
std::size_t pos = param.find("=") + 1;
std::stringstream sstream(param.substr(pos));
sstream >> WireVersion;
}
else if (HasPrefix(param, "ClientBigEndian="))
{
std::size_t pos = param.find("=") + 1;
std::stringstream sstream(param.substr(pos));
int be = 0;
sstream >> be;
ClientBigEndian = (be != 0);
}
}
}

NVars = varRequests.size(); // trust actual count over NVars param

if (Filename.empty() || NVars == 0)
{
LogReject(Filename, "badparams", static_cast<uint32_t>(NVars));
RespondErr("batchget: invalid parameters", EINVAL);
return;
}
if (WireVersion > kMaxWireVersion)
{
LogReject(Filename, "version", static_cast<uint32_t>(NVars));
std::string msg = "batchget: unsupported request protocol version " +
std::to_string(WireVersion) + "; this server supports up to " +
std::to_string(kMaxWireVersion);
RespondErr(msg.c_str(), EINVAL);
return;
}
if (ClientBigEndian != !adios2::helper::IsLittleEndian())
{
LogReject(Filename, "byteorder", static_cast<uint32_t>(NVars));
RespondErr("batchget: cross-endian remote reads not supported", EINVAL);
return;
}

try
{
auto poolEntry = m_FilePoolPtr->GetFree(Filename, ArrayOrder, EngineParams);
auto engine = poolEntry.file->m_engine;
auto io = poolEntry.file->m_io;

// Reject if the client's file id doesn't match the file we opened
// (stale cached metadata); 0 = client has no id, no check.
if (RequestUUID != 0 && engine.FileUUID() != RequestUUID)
{
LogReject(Filename, "identity", static_cast<uint32_t>(NVars));
RespondErr("batchget: file identity mismatch; remote metadata is stale", EINVAL);
return;
}

// Compute sizes for each variable and total response size
// Response format: [uint64_t NVars][uint64_t size_0]...[size_N-1][data_0]...[data_N-1]
size_t headerSize = sizeof(uint64_t) + NVars * sizeof(uint64_t);
Expand All @@ -648,6 +713,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
adios2::DataType TypeOfVar = io.InquireVariableType(vr.VarName);
if (TypeOfVar == adios2::DataType::None)
{
LogReject(Filename, "novar", static_cast<uint32_t>(NVars));
RespondErr("batchget: unknown variable", EINVAL);
return;
}
Expand Down Expand Up @@ -678,6 +744,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
m_responseBuffer = (char *)malloc(m_responseBufferSize);
if (!m_responseBuffer)
{
LogReject(Filename, "alloc", static_cast<uint32_t>(NVars));
RespondErr("batchget: allocation failed", ENOMEM);
return;
}
Expand Down Expand Up @@ -773,6 +840,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
}
catch (const std::exception &exc)
{
LogReject(Filename, "exception", static_cast<uint32_t>(NVars));
std::string msg = std::string("batchget: exception during processing: ") + exc.what();
RespondErr(msg.c_str(), EINVAL);
}
Expand All @@ -787,6 +855,9 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
size_t BlockID = (size_t)-1, DimCount = 0, StepStart = 0;
size_t StepCount = 1;
bool ArrayOrder = true;
uint32_t RequestUUID = 0; // file id from client metadata; 0 = no check
uint32_t WireVersion = 0; // request format version; absent/0 = original
bool ClientBigEndian = false; // client native byte order; absent = little
// Accuracy parameters
double AccuracyError = 0.0;
double AccuracyNorm = 0.0;
Expand Down Expand Up @@ -885,6 +956,41 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
std::size_t pos = param.find("=") + 1;
EngineParams = UrlDecode(param.substr(pos));
}
else if (HasPrefix(param, "FileUUID="))
{
std::size_t pos = param.find("=") + 1;
std::stringstream sstream(param.substr(pos));
sstream >> RequestUUID;
}
else if (HasPrefix(param, "WireVersion="))
{
std::size_t pos = param.find("=") + 1;
std::stringstream sstream(param.substr(pos));
sstream >> WireVersion;
}
else if (HasPrefix(param, "ClientBigEndian="))
{
std::size_t pos = param.find("=") + 1;
std::stringstream sstream(param.substr(pos));
int be = 0;
sstream >> be;
ClientBigEndian = (be != 0);
}
}
if (WireVersion > kMaxWireVersion)
{
LogReject(Filename, "version", 1);
std::string msg = "get: unsupported request protocol version " +
std::to_string(WireVersion) + "; this server supports up to " +
std::to_string(kMaxWireVersion);
RespondErr(msg.c_str(), EINVAL);
return;
}
if (ClientBigEndian != !adios2::helper::IsLittleEndian())
{
LogReject(Filename, "byteorder", 1);
RespondErr("get: cross-endian remote reads not supported", EINVAL);
return;
}
// Get a "anonymous" engine with this file open with this array order.
// (any other differentiating characteristics of an ADIOS Open should be included in these
Expand All @@ -901,10 +1007,19 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
auto poolEntry = m_FilePoolPtr->GetFree(Filename, ArrayOrder, EngineParams);
auto engine = poolEntry.file->m_engine;
auto io = poolEntry.file->m_io;

// Metadata-identity check (0 = client has no id, enforces nothing).
if (RequestUUID != 0 && engine.FileUUID() != RequestUUID)
{
LogReject(Filename, "identity", 1);
RespondErr("get: file identity mismatch; remote metadata is stale", EINVAL);
return;
}
adios2::Box<adios2::Dims> varSel(Start, Count);
adios2::DataType TypeOfVar = io.InquireVariableType(VarName);
if (TypeOfVar == adios2::DataType::None)
{
LogReject(Filename, "novar", 1);
RespondErr("get: unknown variable", EINVAL);
return;
}
Expand Down Expand Up @@ -975,6 +1090,7 @@ void XrdSsiSvService::ProcessRequest4Me(XrdSsiRequest *rqstP)
}
catch (const std::exception &exc)
{
LogReject(Filename, "exception", 1);
std::string errMsg = std::string("Exception in Get: ") + exc.what();
RespondErr(errMsg.c_str(), EINVAL);
}
Expand Down
Loading