diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index a3a106ca18a..0d9d4759664 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -210,6 +210,30 @@ namespace aft std::unique_ptr ledger; std::shared_ptr channels; + enum class StartupRole : std::uint8_t + { + Primary, + Backup, + }; + + // Describes the initial role and state for this node at construction + // time, before any other thread can observe it (so no lock is needed). + struct StartupState + { + StartupRole role; + + // State to apply before becoming primary/backup. When nullopt for + // a primary, the node starts from scratch (genesis). + struct StateInfo + { + Index index = 0; + Term term = 0; + std::vector view_history; + Index recovery_start_index = 0; + }; + std::optional info = std::nullopt; + }; + Aft( const ccf::consensus::Configuration& settings_, std::unique_ptr store_, @@ -219,7 +243,8 @@ namespace aft std::shared_ptr rpc_request_context_, std::shared_ptr commit_callbacks_subsystem_ = nullptr, - bool public_only_ = false) : + bool public_only_ = false, + std::optional startup = std::nullopt) : store(std::move(store_)), timeout_elapsed(0), @@ -247,6 +272,37 @@ namespace aft { commit_callbacks->set_consensus(this); } + + if (startup.has_value()) + { + const auto& s = startup.value(); + if (s.info.has_value()) + { + const auto& si = s.info.value(); + if (s.role == StartupRole::Primary) + { + state->current_view = si.term; + state->last_idx = si.index; + state->commit_idx = si.index; + state->view_history.initialise(si.view_history); + state->view_history.update(si.index, si.term); + } + else + { + state->last_idx = si.index; + state->commit_idx = si.index; + state->view_history.initialise(si.view_history); + ledger->init(si.index, si.recovery_start_index); + become_aware_of_new_term(si.term); + } + } + + if (s.role == StartupRole::Primary) + { + state->current_view += starting_view_change; + become_leader(true); + } + } } ~Aft() override = default; diff --git 
a/src/enclave/rpc_handler.h b/src/enclave/rpc_handler.h index 2005d3bd1b4..92cd49e60ef 100644 --- a/src/enclave/rpc_handler.h +++ b/src/enclave/rpc_handler.h @@ -14,6 +14,8 @@ namespace ccf::kv { class CommittableTx; + class Consensus; + class TxHistory; } namespace ccf @@ -33,6 +35,8 @@ namespace ccf virtual void tick(std::chrono::milliseconds /*elapsed*/) {} virtual void open() = 0; virtual bool is_open() = 0; + virtual void set_consensus_and_history( + ccf::kv::Consensus* consensus, ccf::kv::TxHistory* history) = 0; // Used by rpcendpoint to process incoming client RPCs virtual void process(std::shared_ptr ctx) = 0; diff --git a/src/node/ledger_secrets.h b/src/node/ledger_secrets.h index c7f0c13f8dd..a2f31df5334 100644 --- a/src/node/ledger_secrets.h +++ b/src/node/ledger_secrets.h @@ -170,10 +170,10 @@ namespace ccf VersionedLedgerSecret get_latest(ccf::kv::ReadOnlyTx& tx) { - std::lock_guard guard(lock); - take_dependency_on_secrets(tx); + std::lock_guard guard(lock); + if (ledger_secrets.empty()) { throw std::logic_error( @@ -186,10 +186,10 @@ namespace ccf std::pair> get_latest_and_penultimate(ccf::kv::ReadOnlyTx& tx) { - std::lock_guard guard(lock); - take_dependency_on_secrets(tx); + std::lock_guard guard(lock); + if (ledger_secrets.empty()) { throw std::logic_error( @@ -209,10 +209,10 @@ namespace ccf ccf::kv::ReadOnlyTx& tx, std::optional up_to = std::nullopt) { - std::lock_guard guard(lock); - take_dependency_on_secrets(tx); + std::lock_guard guard(lock); + if (!up_to.has_value()) { return ledger_secrets; diff --git a/src/node/node_client.h b/src/node/node_client.h index cf74b8df37a..31bfa243b0c 100644 --- a/src/node/node_client.h +++ b/src/node/node_client.h @@ -14,19 +14,19 @@ namespace ccf protected: std::shared_ptr rpc_map; ccf::crypto::ECKeyPairPtr node_sign_kp; - const ccf::crypto::Pem& self_signed_node_cert; - const std::optional& endorsed_node_cert = std::nullopt; + ccf::crypto::Pem self_signed_node_cert; + std::optional endorsed_node_cert; 
public: NodeClient( std::shared_ptr rpc_map_, ccf::crypto::ECKeyPairPtr node_sign_kp_, - const ccf::crypto::Pem& self_signed_node_cert_, - const std::optional& endorsed_node_cert_) : + ccf::crypto::Pem self_signed_node_cert_, + std::optional endorsed_node_cert_) : rpc_map(std::move(rpc_map_)), node_sign_kp(std::move(node_sign_kp_)), - self_signed_node_cert(self_signed_node_cert_), - endorsed_node_cert(endorsed_node_cert_) + self_signed_node_cert(std::move(self_signed_node_cert_)), + endorsed_node_cert(std::move(endorsed_node_cert_)) {} virtual ~NodeClient() = default; diff --git a/src/node/node_state.h b/src/node/node_state.h index fcaf689dbd9..da952eeb4d6 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -409,6 +409,11 @@ namespace ccf NodeId self; std::shared_ptr node_encrypt_kp; ccf::crypto::Pem self_signed_node_cert; + + // Protects endorsed_node_cert and self_signed_node_cert. This lock is + // used instead of the main NodeState lock in map/global hooks to avoid + // lock-order-inversion with KV maps_lock and consensus state->lock. + pal::Mutex endorsed_cert_lock; std::optional endorsed_node_cert = std::nullopt; QuoteInfo quote_info; pal::PlatformAttestationMeasurement node_measurement; @@ -952,12 +957,23 @@ namespace ccf subject_alt_names = get_subject_alternative_names(); js::register_class_ids(); - self_signed_node_cert = create_self_signed_cert( - node_sign_kp, - config.node_certificate.subject_name, - subject_alt_names, - config.startup_host_time, - config.node_certificate.initial_validity_days); + + // Hold endorsed_cert_lock only while mutating self_signed_node_cert. + // Must be released before initiate_quote_generation(), which may call + // Store::deserialise_snapshot and acquire maps_lock — holding both + // would invert the lock order with KV hooks that acquire + // endorsed_cert_lock under maps_lock. 
+ ccf::crypto::Pem self_signed_cert_snapshot; + { + std::lock_guard cert_guard(endorsed_cert_lock); + self_signed_node_cert = create_self_signed_cert( + node_sign_kp, + config.node_certificate.subject_name, + subject_alt_names, + config.startup_host_time, + config.node_certificate.initial_validity_days); + self_signed_cert_snapshot = self_signed_node_cert; + } accept_node_tls_connections(); open_frontend(ActorsType::nodes); @@ -966,6 +982,7 @@ namespace ccf // recovered setup_history(); setup_snapshotter(); + setup_basic_hooks(); setup_encryptor(); initiate_quote_generation(); @@ -985,18 +1002,21 @@ namespace ccf history->set_service_signing_identity( network.identity->get_key_pair(), config.cose_signatures); - setup_consensus(false, endorsed_node_cert); - - // Become the primary and force replication - consensus->force_become_primary(); + // No endorsed certificate exists yet on the Start path — it is + // created later by the boot request and applied via the + // node_endorsed_certificates hook. 
+ setup_consensus( + false, + std::nullopt, + RaftType::StartupState{RaftType::StartupRole::Primary}); LOG_INFO_FMT("Created new node {}", self); - return {self_signed_node_cert, network.identity->cert}; + return {self_signed_cert_snapshot, network.identity->cert}; } case StartType::Join: { LOG_INFO_FMT("Created join node {}", self); - return {self_signed_node_cert, {}}; + return {self_signed_cert_snapshot, {}}; } case StartType::Recover: { @@ -1017,7 +1037,7 @@ namespace ccf config.initial_service_certificate_validity_days); LOG_INFO_FMT("Created recovery node {}", self); - return {self_signed_node_cert, network.identity->cert}; + return {self_signed_cert_snapshot, network.identity->cert}; } default: { @@ -1043,7 +1063,7 @@ namespace ccf auto join_client_cert = std::make_unique<::tls::Cert>( network_ca, - self_signed_node_cert, + get_self_signed_certificate_unsafe(), node_sign_kp->private_key_pem(), target_host); @@ -1205,9 +1225,6 @@ namespace ccf } n2n_channels_cert = resp.network_info->endorsed_certificate.value(); - setup_consensus(resp.network_info->public_only, n2n_channels_cert); - auto_refresh_jwt_keys(); - if (resp.network_info->public_only) { last_recovered_signed_idx = @@ -1218,23 +1235,18 @@ namespace ccf View view = VIEW_UNKNOWN; std::vector view_history_ = {}; + ccf::kv::ConsensusHookPtrs snapshot_hooks; if (startup_snapshot_info) { // It is only possible to deserialise the entire snapshot now, // once the ledger secrets have been passed in by the network - ccf::kv::ConsensusHookPtrs hooks; deserialise_snapshot( network.tables, startup_snapshot_info->raw, - hooks, + snapshot_hooks, &view_history_, resp.network_info->public_only); - for (auto& hook : hooks) - { - hook->call(consensus.get()); - } - auto tx = network.tables->create_read_only_tx(); view = resolve_latest_sig_view(tx); @@ -1253,11 +1265,28 @@ namespace ccf view); } - consensus->init_as_backup( - network.tables->current_version(), - view, - view_history_, - last_recovered_signed_idx); + // 
Create consensus with backup init info baked in, so + // init_as_backup runs in the constructor before any other + // thread can see the consensus object. + setup_consensus( + resp.network_info->public_only, + n2n_channels_cert, + RaftType::StartupState{ + RaftType::StartupRole::Backup, + RaftType::StartupState::StateInfo{ + network.tables->current_version(), + view, + view_history_, + last_recovered_signed_idx}}); + + // Now that consensus exists, execute any hooks from the + // snapshot (e.g. ConfigurationChangeHook) + for (auto& hook : snapshot_hooks) + { + hook->call(consensus.get()); + } + + auto_refresh_jwt_keys(); { auto snap_tx = network.tables->create_read_only_tx(); @@ -1394,7 +1423,7 @@ namespace ccf rpcsessions, rpc_map, node_sign_kp, - self_signed_node_cert); + get_self_signed_certificate_unsafe()); jwt_key_auto_refresh->start(); network.tables->set_map_hook( @@ -1469,10 +1498,15 @@ namespace ccf } ++last_recovered_idx; - // Not synchronised because consensus isn't effectively running then - for (auto& hook : r->get_hooks()) + // Consensus may not exist yet, in which case there's nothing for + // these hooks to do + if (consensus != nullptr) { - hook->call(consensus.get()); + // Not synchronised because consensus isn't effectively running then + for (auto& hook : r->get_hooks()) + { + hook->call(consensus.get()); + } } } catch (const std::exception& e) @@ -1700,13 +1734,17 @@ namespace ccf } } - setup_consensus(true); + setup_consensus( + true, + std::nullopt, + RaftType::StartupState{ + RaftType::StartupRole::Primary, + RaftType::StartupState::StateInfo{index, view, view_history}}); + auto_refresh_jwt_keys(); LOG_DEBUG_FMT("Restarting consensus at view: {} seqno: {}", view, index); - consensus->force_become_primary(index, view, view_history, index); - create_and_send_boot_request( new_term, false /* Restore consortium from ledger */); } @@ -2018,7 +2056,14 @@ namespace ccf ccf::kv::Tx& tx, AbstractGovernanceEffects::ServiceIdentities identities) 
override { - std::lock_guard guard(lock); + // NB: NodeState::lock is deliberately not held here. All member + // accesses in this function are either via the passed-in tx (which has + // its own KV-level protection), read-only on effectively-immutable + // fields (config, network.identity, sm), or on fields with their own + // locks (share_manager, LedgerSecrets). Holding NodeState::lock would + // create a lock-order-inversion with KV maps_lock, since this function + // is called from governance endpoints that may hold maps_lock (via + // apply_changes) or be called under it indirectly. auto* service = tx.rw(Tables::SERVICE); auto service_info = service->get(); @@ -2404,7 +2449,12 @@ namespace ccf ccf::crypto::Pem get_self_signed_certificate() override { - std::lock_guard guard(lock); + std::lock_guard guard(endorsed_cert_lock); + return self_signed_node_cert; + } + + ccf::crypto::Pem get_self_signed_certificate_unsafe() + { return self_signed_node_cert; } @@ -2507,9 +2557,14 @@ namespace ccf find_frontend(actor)->open(); } - void open_user_frontend() + void open_frontend_async(ActorsType actor) { - open_frontend(ActorsType::users); + // Schedule frontend opening on a task to avoid calling open() (which + // may take locks to set up frontend-specific systems) while KV or + // consensus locks are held — e.g. from global hooks during + // post_compact(). 
+ ccf::tasks::add_task( + ccf::tasks::make_basic_task([this, actor]() { open_frontend(actor); })); } bool is_member_frontend_open_unsafe() @@ -2625,8 +2680,9 @@ namespace ccf bool send_create_request(const std::vector& packed) { + auto self_signed_cert = get_self_signed_certificate(); auto node_session = std::make_shared( - InvalidSessionId, self_signed_node_cert.raw()); + InvalidSessionId, self_signed_cert.raw()); auto ctx = make_rpc_context(node_session, packed); std::shared_ptr search = @@ -2812,8 +2868,11 @@ namespace ccf retired_committed_nodes.push_back(node_id); } } - consensus->set_retired_committed( - hook_version, retired_committed_nodes); + if (consensus != nullptr) + { + consensus->set_retired_committed( + hook_version, retired_committed_nodes); + } })); // Service-endorsed certificate is passed to history as early as _local_ @@ -2852,7 +2911,7 @@ namespace ccf "Could not find endorsed node certificate for {}", self)); } - std::lock_guard guard(lock); + std::lock_guard guard(endorsed_cert_lock); if (endorsed_node_cert.has_value()) { @@ -2902,7 +2961,7 @@ namespace ccf "Could not find endorsed node certificate for {}", self)); } - std::lock_guard guard(lock); + std::lock_guard guard(endorsed_cert_lock); LOG_INFO_FMT("[global] Accepting network connections"); accept_network_tls_connections(); @@ -2943,7 +3002,7 @@ namespace ccf } LOG_INFO_FMT("[global] Opening members frontend"); - open_frontend(ActorsType::members); + open_frontend_async(ActorsType::members); } })); @@ -2964,7 +3023,7 @@ namespace ccf ->public_key_pem(); if (hook_pubk_pem != current_pubk_pem) { - LOG_TRACE_FMT( + LOG_INFO_FMT( "Ignoring historical service open at seqno {} for {}", hook_version, w->cert.str()); @@ -2981,12 +3040,106 @@ namespace ccf network.identity->set_certificate(w->cert); if (w->status == ServiceStatus::OPEN) { - open_user_frontend(); + open_frontend_async(ActorsType::users); RINGBUFFER_WRITE_MESSAGE(::consensus::ledger_open, to_host); LOG_INFO_FMT("Service open at 
seqno {}", hook_version); } })); + + // When a node is added, even locally, inform consensus so that it + // can add a new active configuration. + network.tables->set_map_hook( + network.nodes.get_name(), + Nodes::wrap_map_hook( + [](ccf::kv::Version version, const Nodes::Write& w) + -> ccf::kv::ConsensusHookPtr { + return std::make_unique(version, w); + })); + + // Note: The Signatures hook and SerialisedMerkleTree hook are separate + // because the signature and the Merkle tree are recorded in distinct + // tables (for serialisation performance reasons). However here, they are + // expected to always be called together and for the same version as they + // are always written by each signature transaction. + + network.tables->set_map_hook( + network.cose_signatures.get_name(), + CoseSignatures::wrap_map_hook( + [s = this->snapshotter]( + ccf::kv::Version version, + const CoseSignatures::Write& w) -> ccf::kv::ConsensusHookPtr { + assert(w.has_value()); + s->record_cose_signature(version, w.value()); + return {nullptr}; + })); + + network.tables->set_map_hook( + network.serialise_tree.get_name(), + SerialisedMerkleTree::wrap_map_hook( + [s = this->snapshotter]( + ccf::kv::Version version, + const SerialisedMerkleTree::Write& w) -> ccf::kv::ConsensusHookPtr { + assert(w.has_value()); + const auto& tree = w.value(); + s->record_serialised_tree(version, tree); + return {nullptr}; + })); + + network.tables->set_map_hook( + network.snapshot_evidence.get_name(), + SnapshotEvidence::wrap_map_hook( + [s = this->snapshotter]( + ccf::kv::Version version, + const SnapshotEvidence::Write& w) -> ccf::kv::ConsensusHookPtr { + assert(w.has_value()); + auto snapshot_evidence = w.value(); + s->record_snapshot_evidence_idx(version, snapshot_evidence); + return {nullptr}; + })); + + network.tables->set_global_hook( + network.snapshot_evidence.get_name(), + SnapshotEvidence::wrap_commit_hook( + [this]( + [[maybe_unused]] ccf::kv::Version version, + const SnapshotEvidence::Write& w) { + 
if (!w.has_value()) + { + return; + } + + auto snapshot_evidence = w.value(); + + // If backup snapshot fetching is enabled and this node is a + // backup, schedule a fetch task + if ( + config.snapshots.backup_fetch.enabled && consensus != nullptr && + !consensus->is_primary()) + { + std::lock_guard guard(lock); + if ( + backup_snapshot_fetch_task != nullptr && + !backup_snapshot_fetch_task->is_cancelled()) + { + LOG_DEBUG_FMT( + "Backup snapshot fetch already in progress, skipping"); + } + else + { + LOG_INFO_FMT( + "Snapshot evidence detected on backup - scheduling " + "snapshot fetch from primary (since seqno: {})", + snapshot_evidence.version); + backup_snapshot_fetch_task = + std::make_shared( + config.snapshots, + snapshot_evidence.version /* same seqno as logged above */, + this); + ccf::tasks::add_task(backup_snapshot_fetch_task); + } + } + })); } ccf::kv::Version get_last_recovered_signed_idx() override @@ -3093,15 +3246,24 @@ namespace ccf void setup_consensus( bool public_only = false, const std::optional& endorsed_node_certificate_ = - std::nullopt) + std::nullopt, + std::optional startup = std::nullopt) { setup_n2n_channels(endorsed_node_certificate_); setup_cmd_forwarder(); auto shared_state = std::make_shared(self); + // endorsed_node_certificate_ is the endorsed cert available at this + // point (from the join response, or nullopt on Start/Recover where it + // arrives later via the node_endorsed_certificates hook). + // self_signed_node_cert is stable here: all callers are either during + // single-threaded startup or holding NodeState::lock.
auto node_client = std::make_shared( - rpc_map, node_sign_kp, self_signed_node_cert, endorsed_node_cert); + rpc_map, + node_sign_kp, + self_signed_node_cert, + endorsed_node_certificate_); consensus = std::make_shared( consensus_config, @@ -3111,102 +3273,16 @@ namespace ccf shared_state, node_client, commit_callbacks, - public_only); + public_only, + startup); network.tables->set_consensus(consensus); network.tables->set_snapshotter(snapshotter); - // When a node is added, even locally, inform consensus so that it - // can add a new active configuration. - network.tables->set_map_hook( - network.nodes.get_name(), - Nodes::wrap_map_hook( - [](ccf::kv::Version version, const Nodes::Write& w) - -> ccf::kv::ConsensusHookPtr { - return std::make_unique(version, w); - })); - - // Note: The Signatures hook and SerialisedMerkleTree hook are separate - // because the signature and the Merkle tree are recorded in distinct - // tables (for serialisation performance reasons). However here, they are - // expected to always be called together and for the same version as they - // are always written by each signature transaction. 
- - network.tables->set_map_hook( - network.cose_signatures.get_name(), - CoseSignatures::wrap_map_hook( - [s = this->snapshotter]( - ccf::kv::Version version, - const CoseSignatures::Write& w) -> ccf::kv::ConsensusHookPtr { - assert(w.has_value()); - s->record_cose_signature(version, w.value()); - return {nullptr}; - })); - - network.tables->set_map_hook( - network.serialise_tree.get_name(), - SerialisedMerkleTree::wrap_map_hook( - [s = this->snapshotter]( - ccf::kv::Version version, - const SerialisedMerkleTree::Write& w) -> ccf::kv::ConsensusHookPtr { - assert(w.has_value()); - const auto& tree = w.value(); - s->record_serialised_tree(version, tree); - return {nullptr}; - })); - - network.tables->set_map_hook( - network.snapshot_evidence.get_name(), - SnapshotEvidence::wrap_map_hook( - [s = this->snapshotter]( - ccf::kv::Version version, - const SnapshotEvidence::Write& w) -> ccf::kv::ConsensusHookPtr { - assert(w.has_value()); - auto snapshot_evidence = w.value(); - s->record_snapshot_evidence_idx(version, snapshot_evidence); - return {nullptr}; - })); - - network.tables->set_global_hook( - network.snapshot_evidence.get_name(), - SnapshotEvidence::wrap_commit_hook( - [this]( - [[maybe_unused]] ccf::kv::Version version, - const SnapshotEvidence::Write& w) { - if (!w.has_value()) - { - return; - } - - auto snapshot_evidence = w.value(); - - // If backup snapshot fetching is enabled and this node is a - // backup, schedule a fetch task - if ( - config.snapshots.backup_fetch.enabled && consensus != nullptr && - !consensus->is_primary()) - { - std::lock_guard guard(lock); - if ( - backup_snapshot_fetch_task != nullptr && - !backup_snapshot_fetch_task->is_cancelled()) - { - LOG_DEBUG_FMT( - "Backup snapshot fetch already in progress, skipping"); - } - else - { - LOG_INFO_FMT( - "Snapshot evidence detected on backup - scheduling " - "snapshot fetch from primary (since seqno: {})", - snapshot_evidence.version); - backup_snapshot_fetch_task = - std::make_shared( - 
config.snapshots, snapshot_evidence.version, this); - ccf::tasks::add_task(backup_snapshot_fetch_task); - } - } - })); + for (auto& [actor, fe] : rpc_map->frontends()) + { + fe->set_consensus_and_history(consensus.get(), history.get()); + } // Keep the globally committed snapshot baseline in sync between primary // and backups for bounded snapshotting. @@ -3223,8 +3299,6 @@ namespace ccf { signature_cache->register_hooks(*network.tables); } - - setup_basic_hooks(); } void setup_snapshotter() @@ -3306,6 +3380,7 @@ namespace ccf std::optional client_cert_key = std::nullopt; if (authenticate_as_node_client_certificate) { + std::lock_guard cert_guard(endorsed_cert_lock); client_cert = endorsed_node_cert ? *endorsed_node_cert : self_signed_node_cert; client_cert_key = node_sign_kp->private_key_pem(); diff --git a/src/node/rpc/frontend.h b/src/node/rpc/frontend.h index 96891f09e0a..b67e4b75953 100644 --- a/src/node/rpc/frontend.h +++ b/src/node/rpc/frontend.h @@ -25,6 +25,7 @@ #define FMT_HEADER_ONLY +#include #include #include #include @@ -42,9 +43,9 @@ namespace ccf ccf::pal::Mutex open_lock; bool is_open_ = false; - ccf::kv::Consensus* consensus{nullptr}; + std::atomic consensus{nullptr}; std::shared_ptr cmd_forwarder; - ccf::kv::TxHistory* history{nullptr}; + std::atomic history{nullptr}; size_t sig_tx_interval = 5000; std::chrono::milliseconds sig_ms_interval = std::chrono::milliseconds(1000); @@ -53,23 +54,6 @@ namespace ccf std::shared_ptr node_configuration_subsystem = nullptr; - void update_consensus() - { - auto* c = tables.get_consensus().get(); - - if (consensus != c) - { - consensus = c; - endpoints.set_consensus(consensus); - } - } - - void update_history() - { - history = tables.get_history().get(); - endpoints.set_history(history); - } - endpoints::EndpointDefinitionPtr find_endpoint( std::shared_ptr ctx, ccf::kv::CommittableTx& tx) { @@ -125,7 +109,7 @@ namespace ccf const endpoints::EndpointDefinitionPtr& endpoint) { auto interface_id = 
ctx->get_session_context()->interface_id; - if ((consensus != nullptr) && interface_id) + if ((consensus.load() != nullptr) && interface_id) { if (!node_configuration_subsystem) { @@ -249,7 +233,7 @@ namespace ccf target_node_its; const auto nodes = InternalTablesAccess::get_trusted_nodes(tx); { - const auto primary_id = consensus->primary(); + const auto primary_id = consensus.load()->primary(); if (seeking_primary && primary_id.has_value()) { target_node_its.push_back(nodes.find(primary_id.value())); @@ -317,7 +301,7 @@ namespace ccf case (ccf::endpoints::RedirectionStrategy::ToPrimary): { const bool is_primary = - (consensus != nullptr) && consensus->can_replicate(); + (consensus.load() != nullptr) && consensus.load()->can_replicate(); if (!is_primary) { @@ -352,7 +336,7 @@ namespace ccf case (ccf::endpoints::RedirectionStrategy::ToBackup): { const bool is_backup = - (consensus != nullptr) && !consensus->can_replicate(); + (consensus.load() != nullptr) && !consensus.load()->can_replicate(); if (!is_backup) { @@ -422,9 +406,10 @@ namespace ccf bool check_session_consistency(std::shared_ptr ctx) { - if (consensus != nullptr) + auto* c = consensus.load(); + if (c != nullptr) { - auto current_view = consensus->get_view(); + auto current_view = c->get_view(); auto session_ctx = ctx->get_session_context(); if (!session_ctx->active_view.has_value()) { @@ -545,7 +530,7 @@ namespace ccf return; } - if (!cmd_forwarder || (consensus == nullptr)) + if (!cmd_forwarder || (consensus.load() == nullptr)) { ctx->set_error( HTTP_STATUS_INTERNAL_SERVER_ERROR, @@ -574,7 +559,7 @@ namespace ccf return; } - auto primary_id = consensus->primary(); + auto primary_id = consensus.load()->primary(); if (!primary_id.has_value()) { ctx->set_error( @@ -654,11 +639,12 @@ namespace ccf constexpr auto max_attempts = 30; while (attempts < max_attempts) { - if (consensus != nullptr) + auto* c = consensus.load(); + if (c != nullptr) { if ( endpoints.apply_uncommitted_tx_backpressure() && - 
consensus->is_at_max_capacity()) + c->is_at_max_capacity()) { ctx->set_error( HTTP_STATUS_SERVICE_UNAVAILABLE, @@ -688,8 +674,6 @@ namespace ccf } ++attempts; - update_history(); - endpoint = find_endpoint(ctx, *tx_p); if (endpoint == nullptr) { @@ -726,9 +710,8 @@ namespace ccf } else { - bool is_primary = - (consensus == nullptr) || consensus->can_replicate(); - const bool forwardable = (consensus != nullptr); + bool is_primary = (c == nullptr) || c->can_replicate(); + const bool forwardable = (c != nullptr); if (!is_primary && forwardable) { @@ -841,7 +824,7 @@ namespace ccf case ccf::kv::CommitResult::SUCCESS: { auto tx_id_opt = tx.get_txid(); - if (tx_id_opt.has_value() && consensus != nullptr) + if (tx_id_opt.has_value() && consensus.load() != nullptr) { ccf::TxID tx_id = tx_id_opt.value(); @@ -891,11 +874,12 @@ namespace ccf } } - if ( - consensus != nullptr && consensus->can_replicate() && - history != nullptr) { - history->try_emit_signature(); + auto* h = history.load(); + if (c != nullptr && c->can_replicate() && h != nullptr) + { + h->try_emit_signature(); + } } return; @@ -1018,6 +1002,16 @@ namespace ccf } } + void set_consensus_and_history( + ccf::kv::Consensus* consensus_, ccf::kv::TxHistory* history_) override + { + consensus.store(consensus_); + endpoints.set_consensus(consensus_); + + history.store(history_); + endpoints.set_history(history_); + } + bool is_open() override { std::lock_guard mguard(open_lock); @@ -1029,15 +1023,15 @@ namespace ccf { if (endpoints.request_needs_root(ctx)) { - update_history(); - if (history != nullptr) + auto* h = history.load(); + if (h != nullptr) { // Warning: Retrieving the current TxID and root from the history // should only ever be used for the proposal creation endpoint and // nothing else. Many bad things could happen otherwise (e.g. breaking // session consistency). 
const auto& [txid, root, term_of_next_version] = - history->get_replicated_state_txid_and_root(); + h->get_replicated_state_txid_and_root(); tx.set_read_txid(txid, term_of_next_version); tx.set_root_at_read_version(root); } @@ -1054,8 +1048,6 @@ namespace ccf */ void process(std::shared_ptr ctx) override { - update_consensus(); - // NB: If we want to re-execute on backups, the original command could // be propagated from here process_command(ctx); @@ -1073,7 +1065,6 @@ namespace ccf "Processing forwarded command with unitialised forwarded context"); } - update_consensus(); process_command(ctx); if (ctx->response_is_pending) { @@ -1085,8 +1076,6 @@ namespace ccf void tick(std::chrono::milliseconds elapsed) override { - update_consensus(); - endpoints.tick(elapsed); } }; diff --git a/src/node/rpc/test/frontend_test.cpp b/src/node/rpc/test/frontend_test.cpp index 82f0ce9a328..1c878b724b8 100644 --- a/src/node/rpc/test/frontend_test.cpp +++ b/src/node/rpc/test/frontend_test.cpp @@ -1055,6 +1055,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) auto primary_consensus = std::make_shared(); network_primary.tables->set_consensus(primary_consensus); + user_frontend_primary.set_consensus_and_history( + primary_consensus.get(), nullptr); auto channel_stub = std::make_shared(); auto rpc_responder = std::weak_ptr(); @@ -1064,6 +1066,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) auto backup_consensus = std::make_shared(); network_backup.tables->set_consensus(backup_consensus); + user_frontend_backup.set_consensus_and_history( + backup_consensus.get(), nullptr); auto simple_call = create_simple_request(); auto serialized_call = simple_call.build_request(); @@ -1089,6 +1093,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) { INFO("Read command is not forwarded to primary"); TestUserFrontend user_frontend_backup_read(*network_backup.tables); + user_frontend_backup_read.set_consensus_and_history( + backup_consensus.get(), nullptr); 
REQUIRE(channel_stub->is_empty()); user_frontend_backup_read.process(backup_ctx); @@ -1161,6 +1167,8 @@ TEST_CASE("Forwarding" * doctest::test_suite("forwarding")) INFO("Read command is now forwarded to primary on this session"); TestUserFrontend user_frontend_backup_read(*network_backup.tables); + user_frontend_backup_read.set_consensus_and_history( + backup_consensus.get(), nullptr); user_frontend_backup_read.set_cmd_forwarder(backup_forwarder); REQUIRE(channel_stub->is_empty()); @@ -1203,6 +1211,8 @@ TEST_CASE("Nodefrontend forwarding" * doctest::test_suite("forwarding")) auto primary_consensus = std::make_shared(); network_primary.tables->set_consensus(primary_consensus); + node_frontend_primary.set_consensus_and_history( + primary_consensus.get(), nullptr); auto rpc_responder = std::weak_ptr(); auto rpc_map = std::weak_ptr(); @@ -1212,6 +1222,8 @@ TEST_CASE("Nodefrontend forwarding" * doctest::test_suite("forwarding")) auto backup_consensus = std::make_shared(); network_backup.tables->set_consensus(backup_consensus); + node_frontend_backup.set_consensus_and_history( + backup_consensus.get(), nullptr); auto write_req = create_simple_request(); auto serialized_call = write_req.build_request(); @@ -1254,6 +1266,8 @@ TEST_CASE("Userfrontend forwarding" * doctest::test_suite("forwarding")) auto primary_consensus = std::make_shared(); network_primary.tables->set_consensus(primary_consensus); + user_frontend_primary.set_consensus_and_history( + primary_consensus.get(), nullptr); auto rpc_responder = std::weak_ptr(); auto rpc_map = std::weak_ptr(); @@ -1263,6 +1277,8 @@ TEST_CASE("Userfrontend forwarding" * doctest::test_suite("forwarding")) auto backup_consensus = std::make_shared(); network_backup.tables->set_consensus(backup_consensus); + user_frontend_backup.set_consensus_and_history( + backup_consensus.get(), nullptr); auto write_req = create_simple_request(); auto serialized_call = write_req.build_request(); @@ -1305,6 +1321,8 @@ TEST_CASE("Memberfrontend 
forwarding" * doctest::test_suite("forwarding")) auto primary_consensus = std::make_shared(); network_primary.tables->set_consensus(primary_consensus); + member_frontend_primary.set_consensus_and_history( + primary_consensus.get(), nullptr); auto rpc_responder = std::weak_ptr(); auto rpc_map = std::weak_ptr(); @@ -1314,6 +1332,8 @@ TEST_CASE("Memberfrontend forwarding" * doctest::test_suite("forwarding")) auto backup_consensus = std::make_shared(); network_backup.tables->set_consensus(backup_consensus); + member_frontend_backup.set_consensus_and_history( + backup_consensus.get(), nullptr); auto write_req = create_simple_request(); auto serialized_call = write_req.build_request(); diff --git a/tsan_env_suppressions b/tsan_env_suppressions index 3ee6d5ecdee..c2535499f0c 100644 --- a/tsan_env_suppressions +++ b/tsan_env_suppressions @@ -4,13 +4,6 @@ # Awkward usages of '*' in this file like '/ds/*ring_buffer.h' are necessary to handle the cases where tsan thinks # src/ds/ring_buffer.h as src/ds/test/../ring_buffer.h for example -# For partitions_test -deadlock:*/store.h -deadlock:*/untyped_map.h - -# For governance_test -race:*/node/*rpc/*frontend.h - # Race between closedir and epoll_ctl. race:closedir race:epoll_ctl