Add S3 blob storage with cashier billing to ic-gateway #193

shilingwang wants to merge 25 commits into main from
Conversation
```rust
) -> impl Stream<Item = Result<bytes::Bytes, std::io::Error>> + Send + 'static {
    stream::iter(parts)
        .map(move |part| fetch_chunk(state.clone(), owner, part))
        .buffered(CHUNK_DOWNLOAD_PARALLELISM)
```
I thought about it again and this may be exploited by malicious clients. A malicious client may request a file and just load a single byte from it (e.g. by terminating the connection early) but we'll load up to 8MB every time nevertheless. Also, if the clients are slow (or act slow because the client is intentionally dropping packets / delaying ACKs), memory usage can blow up (e.g. 10000 connections will require 80GB RAM).
It'd be nice to do some benchmarking, e.g. how fast the connection to AWS is - if it's as fast as the connection between the client and the gateway, buffering may not even be necessary.
```rust
.blob_tree
.root_hash()
.ok_or_else(|| StorageError::Forbidden("blob tree has no root hash".into()))?
.to_string();
```
Do we really need to stringify it here? Can't we check it directly? I guess because OwnerEgressSignature has it as a string?
```rust
/// Errors from S3 storage operations.
#[derive(Debug)]
pub enum StorageError {
    AwsS3(String),
```
Why do we have only AWS specific errors?
```rust
    Client::from_conf(s3_config)
}

/// Ensure the bucket exists, creating it if necessary. Probe intelligent tiering.
```
Does it really probe tiering?
```rust
        HeadBucketError::NotFound(_) => false,
        other => return Err(StorageError::AwsS3(other.to_string())),
    },
    Err(e) => return Err(StorageError::AwsS3(format!("{}", DisplayErrorContext(e)))),
```
Here and everywhere: just DisplayErrorContext(e).into() might be nicer?
```rust
.body
.collect()
.await
.map(|b| Some(b.to_vec()))
```
Maybe we should change the trait to work with Bytes everywhere? This way we won't have to convert from Bytes that S3 client returns and Vec, and vice-versa. That would save us allocations and CPU cycles.
```rust
    request: &GetBudgetRequestV1,
) -> Result<GetBudgetResult, Error> {
    let encoded_args =
        candid::encode_args((request,)).context("failed to encode budget_get_v1 args")?;
```
nit: I think we can drop the budget_get_v1 here in the context and everywhere else the same way. It's anyway inferred where the error happens.
```rust
client: Arc<CashierClient>,
gateway_id: GatewayId,
pricelist: Pricelist,
budgets: RwLock<HashMap<Principal, CachedBudget>>,
```
I think it's best to use DashMap here instead. Or even better - Moka cache with a TTL.
```rust
    CashierUnavailable(String),
}

impl std::fmt::Display for BillingError {
```
Use thiserror instead of manual impl
```rust
impl fmt::Debug for CashierConnector {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CashierConnector")
```
nit: maybe just use normal write! with some formatting?
```rust
    client: Arc<CashierClient>,
    gateway_name: Option<String>,
) -> Result<Self, Error> {
    let principal = client.principal()?;
```
add error context here and below?
I lost access, so I cannot easily push any changes. I'll create a new PR. @blind-oracle @frankdavid

@r-birkner could you help me change the permissions of this repo?
```rust
let mut budgets = self.budgets.write().await;
let cached = budgets
    .get_mut(owner)
    .expect("cache entry exists after refresh");
```
Avoid expect since it would panic
```rust
    .is_none_or(|c| c.fetched_at.elapsed() >= BUDGET_TTL)
};

if needs_refresh {
```
Hmm, I'm not sure, but don't we have a race condition here with flush_usage? Like, we run it periodically, and what if we haven't flushed the budget yet (e.g. we had some debits) - here we just overwrite it with a fresh copy from the cashier?
```rust
if divisor == 0 {
    return 0;
}
// Promote to `i128` so `quantity (u64) * cost (i64)` cannot overflow
```
Why we don't just error out when we get "absurd inputs"? Like negative cost etc.
```rust
fn int_to_i64(v: &Int) -> i64 {
    // Int is arbitrary precision; clamp to i64 range for local budget math.
    v.0.to_string().parse::<i64>().unwrap_or(i64::MAX)
```
Stringifying BigInt for every conversion operation is a big overhead. Why don't we just operate on BigInts directly w/o conversion to i64? That would probably make life easier (e.g. no need to worry about overflows etc below)
```rust
type S = Arc<StorageState>;

const BODY_READ_TIMEOUT: Duration = Duration::from_secs(60);
```
Make configurable or use some already present CLI option
```rust
/// download. `buffered(N)` preserves source order, so the response body stays
/// strictly sequential while we prefetch ahead. Bounds peak per-download
/// memory at `CHUNK_DOWNLOAD_PARALLELISM * 1 MiB` (~8 MiB).
const CHUNK_DOWNLOAD_PARALLELISM: usize = 8;
```
Also make configurable
#NODE-1941
Summary
New modules
HTTP endpoints (under /v1/)
Design decisions