diff --git a/ethexe/consensus/src/validator/tx_pool.rs b/ethexe/consensus/src/validator/tx_pool.rs index 6859361c000..57aec0f257d 100644 --- a/ethexe/consensus/src/validator/tx_pool.rs +++ b/ethexe/consensus/src/validator/tx_pool.rs @@ -20,13 +20,10 @@ use crate::tx_validation::{TxValidity, TxValidityChecker}; use anyhow::Result; use ethexe_common::{ Announce, HashOf, MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE, SimpleBlockData, - db::{ - AnnounceStorageRO, CodesStorageRO, GlobalsStorageRO, InjectedStorageRW, OnChainStorageRO, - }, + db::{InjectedStorageRO, InjectedStorageRW}, injected::{InjectedTransaction, SignedInjectedTransaction}, }; -use ethexe_db::Database; -use ethexe_runtime_common::state::Storage; +use ethexe_db::{Database, Key as DBKey}; use gprimitives::H256; use parity_scale_codec::Encode; use std::collections::HashSet; @@ -37,23 +34,14 @@ pub const MAX_INJECTED_TRANSACTIONS_SIZE_PER_ANNOUNCE: usize = 127 * 1024; /// [`InjectedTxPool`] is a local pool of injected transactions, which validator can include in announces. #[derive(Clone)] -pub(crate) struct InjectedTxPool { +pub(crate) struct InjectedTxPool { /// HashSet of (reference_block, injected_tx_hash). inner: HashSet<(H256, HashOf)>, - db: DB, + db: Database, } -impl InjectedTxPool -where - DB: InjectedStorageRW - + GlobalsStorageRO - + OnChainStorageRO - + AnnounceStorageRO - + CodesStorageRO - + Storage - + Clone, -{ - pub fn new(db: DB) -> Self { +impl InjectedTxPool { + pub fn new(db: Database) -> Self { Self { inner: HashSet::new(), db, @@ -180,7 +168,14 @@ where } remove_txs.into_iter().for_each(|key| { + // 1. Remove from local set self.inner.remove(&key); + + // 2. Remove from database, because transaction no more need to be store + let tx_key = DBKey::InjectedTransaction(key.1).to_bytes(); + if let Some(removed_tx) = unsafe { self.db.kv().take(&tx_key) } { + tracing::trace!(tx_hash=?key.1, removed_bytes=?removed_tx, "remove tx from database"); + } }); Ok(selected_txs) @@ -194,7 +189,6 @@ mod tests { use super::*; use ethexe_common::{ StateHashWithQueueSize, - db::*, events::{BlockEvent, MirrorEvent, mirror::MessageQueueingRequestedEvent}, mock::*, }; diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index a7ca02e5af9..0a9da72af24 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -56,7 +56,7 @@ use std::{ }; #[repr(u64)] -enum Key { +pub enum Key { // TODO (kuzmindev): use `HashOf` here BlockSmallData(H256) = 0, BlockEvents(H256) = 1, @@ -94,7 +94,7 @@ impl Key { H256::from_low_u64_be(discriminant).into() } - fn to_bytes(&self) -> Vec { + pub fn to_bytes(&self) -> Vec { // Pre-allocate enough space for the largest possible key. let mut bytes = Vec::with_capacity(2 * size_of::() + size_of::()); bytes.extend(self.prefix()); @@ -860,6 +860,10 @@ impl Database { pub fn cas(&self) -> &dyn CASDatabase { self.raw.cas.as_ref() } + + pub fn kv(&self) -> &dyn KVDatabase { + self.raw.kv.as_ref() + } } impl HashStorageRO for Database { diff --git a/ethexe/db/src/lib.rs b/ethexe/db/src/lib.rs index 87d54ce380b..9cb2d20cf44 100644 --- a/ethexe/db/src/lib.rs +++ b/ethexe/db/src/lib.rs @@ -30,7 +30,7 @@ mod rocks; pub mod verifier; pub mod visitor; -pub use database::{Database, RawDatabase}; +pub use database::{Database, Key, RawDatabase}; pub use mem::MemDb; pub use migrations::{ CodeProcessingFuture, GenesisInitializer, InitConfig, LATEST_VERSION as VERSION, initialize_db,