Skip to content
This repository was archived by the owner on Mar 3, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ sp_int32 HeronInternalsConfigReader::GetHeronStreammgrMempoolMaxMessageNumber()
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER].as<int>();
}

sp_int32 HeronInternalsConfigReader::GetHeronStreammgrHeronTupleSetMessageMaxBytes() {
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES]
.as<int>();
}

sp_int32 HeronInternalsConfigReader::GetHeronStreammgrXormgrRotatingmapNbuckets() {
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS].as<int>();
}
Expand Down
3 changes: 3 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class HeronInternalsConfigReader : public YamlFileReader {
// The max number of messages in the memory pool for each message type
sp_int32 GetHeronStreammgrMempoolMaxMessageNumber();

// The max byte size of HeronTupleSet message in stream manager
sp_int32 GetHeronStreammgrHeronTupleSetMessageMaxBytes();

// Get the Nbucket value, for efficient acknowledgement
sp_int32 GetHeronStreammgrXormgrRotatingmapNbuckets();

Expand Down
2 changes: 2 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-vars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CACHE_DRAIN_SIZE_MB =
"heron.streammgr.cache.drain.size.mb";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER =
"heron.streammgr.mempool.max.message.number";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES =
"heron.streammgr.herontupleset.message.max.bytes";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS =
"heron.streammgr.xormgr.rotatingmap.nbuckets";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_CLIENT_RECONNECT_MAX_ATTEMPTS =
Expand Down
3 changes: 3 additions & 0 deletions heron/common/src/cpp/config/heron-internals-config-vars.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ class HeronInternalsConfigVars {
// The max number of messages in the memory pool for each message type
static const sp_string HERON_STREAMMGR_MEMPOOL_MAX_MESSAGE_NUMBER;

// The max byte size of HeronTupleSet message in stream manager
static const sp_string HERON_STREAMMGR_HERONTUPLESET_MESSAGE_MAX_BYTES;

// For efficient acknowledgement
static const sp_string HERON_STREAMMGR_XORMGR_ROTATINGMAP_NBUCKETS;

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/aurora/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/examples/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/kubernetes/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The reconnect interval to other stream managers in secs for stream manager client
heron.streammgr.client.reconnect.interval.sec: 1

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/local/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/localzk/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/marathon/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/mesos/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/slurm/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/test/test_heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ heron.streammgr.cache.drain.size.mb: 100
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# For efficient acknowledgement
heron.streammgr.xormgr.rotatingmap.nbuckets: 3

Expand Down
3 changes: 3 additions & 0 deletions heron/config/src/yaml/conf/yarn/heron_internals.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ heron.streammgr.xormgr.rotatingmap.nbuckets: 3
# The max number of messages in the memory pool for each message type
heron.streammgr.mempool.max.message.number: 512

# The max byte size of a HeronTupleSet message allowed in memory pool
heron.streammgr.herontupleset.message.max.bytes: 83886080

# The max reconnect attempts to other stream managers for stream manager client
heron.streammgr.client.reconnect.max.attempts: 300

Expand Down
12 changes: 11 additions & 1 deletion heron/stmgr/src/cpp/manager/stmgr-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ StMgrServer::StMgrServer(EventLoop* eventLoop, const NetworkOptions& _options,

sp_uint64 drain_threshold_bytes =
config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrStatefulBufferSizeMb() * 1_MB;
sp_uint32 max_herontupleset_size_in_bytes =
config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrHeronTupleSetMessageMaxBytes();
stateful_gateway_ = new CheckpointGateway(drain_threshold_bytes, neighbour_calculator_,
metrics_manager_client_,
std::bind(&StMgrServer::DrainTupleSet, this, std::placeholders::_1, std::placeholders::_2),
Expand Down Expand Up @@ -431,7 +433,15 @@ void StMgrServer::HandleTupleSetMessage(Connection* _conn,
->incr_by(_message->control().fails_size());
}
stmgr_->HandleInstanceData(iter->second, instance_info_[iter->second]->local_spout_, _message);
__global_protobuf_pool_release__(_message);
auto message_size = _message->ByteSize();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ByteSizeLong?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not find ByteSizeLong API in generated code. Let me try again...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems this is introduced in 3.1.0 and then deprecated in 3.4.0

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, ByteSize calculates size of serialized message, which seems will be slightly larger than actual size in memory.

@objmagic objmagic Aug 28, 2017

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if the message is 60MB but only little part of it is used to hold the incoming message (because incoming message is small), we will not delete it because I believe serialized size is much smaller than 60MB. i'm not sure about

if (message_size >= max_herontupleset_size_in_bytes) {
LOG(WARNING) << "HeronTupleSet message has size " << message_size <<
" bytes, exceeding limit " << max_herontupleset_size_in_bytes << " bytes." <<
" Release to memory allocator rather than memory pool.";
delete _message;
} else {
__global_protobuf_pool_release__(_message);
}
}

void StMgrServer::SendToInstance2(proto::stmgr::TupleStreamMessage* _message) {
Expand Down
1 change: 1 addition & 0 deletions heron/stmgr/src/cpp/manager/stmgr-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class StMgrServer : public Server {
heron::common::TimeSpentMetric* back_pressure_metric_initiated_;

bool spouts_under_back_pressure_;
sp_uint32 max_herontupleset_size_in_bytes;

// Stateful processing related member variables
NeighbourCalculator* neighbour_calculator_;
Expand Down