Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions python/arcticdb/version_store/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@

import pandas as pd
import numpy as np
import logging
from arcticdb.log import version as log
from arcticdb.version_store._normalization import normalize_metadata
from arcticdb.version_store.admin_tools import AdminTools
import arcticdb_ext as _ae

logger = logging.getLogger(__name__)


AsOf = Union[int, str, datetime.datetime, _PreloadedIndexQuery]
Expand Down Expand Up @@ -989,6 +988,7 @@ def stage(
or ``sort_and_finalize_staged_data`` to specify which data to finalize.

"""
log.debug("Staging data for symbol={}", symbol)

if not self._allowed_input_type(data):
raise ArcticUnsupportedDataTypeException(
Expand Down Expand Up @@ -1112,6 +1112,7 @@ def write(
>>> w = adb.WritePayload("symbol", df, metadata={'the': 'metadata'})
>>> lib.write(*w, staged=True)
"""
log.debug("Writing symbol={}, staged={}, prune_previous_versions={}", symbol, staged, prune_previous_versions)
is_recursive_normalizers_enabled = self._nvs._is_recursive_normalizers_enabled(
**{"recursive_normalizers": recursive_normalizers}
)
Expand Down Expand Up @@ -1201,6 +1202,7 @@ def write_pickle(
--------
write: For more detailed documentation.
"""
log.debug("Writing pickled data for symbol={}, staged={}", symbol, staged)
return self._nvs.write(
symbol=symbol,
data=data,
Expand Down Expand Up @@ -1295,6 +1297,7 @@ def write_batch(
>>> items[0].symbol, items[1].symbol
('symbol_1', 'symbol_2')
"""
log.debug("Writing batch of {} symbols", len(payloads))
self._raise_if_duplicate_symbols_in_batch(payloads)
self._raise_if_unsupported_type_in_write_batch(payloads)

Expand Down Expand Up @@ -1343,6 +1346,7 @@ def write_pickle_batch(
write: For more detailed documentation.
write_pickle: For information on the implications of providing data that needs to be pickled.
"""
log.debug("Writing pickled batch of {} symbols", len(payloads))
self._raise_if_duplicate_symbols_in_batch(payloads)

return self._nvs._batch_write_internal(
Expand Down Expand Up @@ -1439,6 +1443,7 @@ def append(
2018-01-05 5
2018-01-06 6
"""
log.debug("Appending data to symbol={}", symbol)

if not self._allowed_input_type(data):
raise ArcticUnsupportedDataTypeException(
Expand Down Expand Up @@ -1490,6 +1495,7 @@ def append_batch(
ArcticUnsupportedDataTypeException
If data that is not of NormalizableType appears in any of the payloads.
"""
log.debug("Appending batch of {} symbols", len(append_payloads))

self._raise_if_duplicate_symbols_in_batch(append_payloads)
self._raise_if_unsupported_type_in_write_batch(append_payloads)
Expand Down Expand Up @@ -1612,6 +1618,7 @@ def update(
2024-01-11 00:00:00.000000 2024-02-01 00:00:00.000000001 1 b'test' 1738599073268493107 5975110026983744452 84 2 1 2 200009 200031

"""
log.debug("Updating symbol={}, upsert={}, date_range={}", symbol, upsert, date_range)

if not self._allowed_input_type(data):
raise ArcticUnsupportedDataTypeException(
Expand Down Expand Up @@ -1693,6 +1700,7 @@ def update_batch(
2024-01-01 10
2024-01-02 11
"""
log.debug("Updating batch of {} symbols, upsert={}", len(update_payloads), upsert)

self._raise_if_duplicate_symbols_in_batch(update_payloads)
self._raise_if_unsupported_type_in_write_batch(update_payloads)
Expand Down Expand Up @@ -1722,6 +1730,7 @@ def delete_staged_data(self, symbol: str) -> None:
write
Documentation on the ``staged`` parameter explains the concept of staged data in more detail.
"""
log.debug("Deleting staged data for symbol={}", symbol)
self._nvs.remove_incomplete(symbol)

def finalize_staged_data(
Expand Down Expand Up @@ -1845,6 +1854,7 @@ def finalize_staged_data(
2000-01-03 3
2000-01-04 4
"""
log.debug("Finalizing staged data for symbol={}, mode={}", symbol, mode)
mode = Library._normalize_staged_data_mode(mode)

return self._nvs.compact_incomplete(
Expand Down Expand Up @@ -1965,6 +1975,7 @@ def sort_and_finalize_staged_data(
2024-01-03 3
2024-01-04 4
"""
log.debug("Sorting and finalizing staged data for symbol={}, mode={}", symbol, mode)
mode = Library._normalize_staged_data_mode(mode)
compaction_result = self._nvs.version_store.sort_merge(
symbol,
Expand Down Expand Up @@ -2005,6 +2016,7 @@ def get_staged_symbols(self) -> List[str]:
write
Documentation on the ``staged`` parameter explains the concept of staged data in more detail.
"""
log.debug("Getting staged symbols")
return self._nvs.list_symbols_with_incomplete_data()

def read(
Expand Down Expand Up @@ -2116,6 +2128,7 @@ def read(
----
column: [[5,6,7]]
"""
log.debug("Reading symbol={}, as_of={}, date_range={}, columns={}, lazy={}", symbol, as_of, date_range, columns, lazy)
if lazy:
return LazyDataFrame(
self,
Expand Down Expand Up @@ -2233,6 +2246,7 @@ def read_batch(
--------
read
"""
log.debug("Reading batch of {} symbols, lazy={}", len(symbols), lazy)
symbol_strings = []
as_ofs = []
date_ranges = []
Expand Down Expand Up @@ -2422,6 +2436,7 @@ def read_batch_and_join(
2025-01-01 00:00:00 1
2025-01-02 00:00:00 2
"""
log.debug("Reading and joining batch of {} symbols", len(symbols))
symbol_strings = []
as_ofs = []
date_ranges = []
Expand Down Expand Up @@ -2487,6 +2502,7 @@ def read_metadata(self, symbol: str, as_of: Optional[AsOf] = None) -> VersionedI
Structure containing metadata and version number of the affected symbol in the store. The data attribute
will be None.
"""
log.debug("Reading metadata for symbol={}, as_of={}", symbol, as_of)
return self._nvs.read_metadata(symbol, as_of, iterate_snapshots_if_tombstoned=False)

def read_metadata_batch(self, symbols: List[Union[str, ReadInfoRequest]]) -> List[Union[VersionedItem, DataError]]:
Expand All @@ -2512,6 +2528,7 @@ def read_metadata_batch(self, symbols: List[Union[str, ReadInfoRequest]]) -> Lis
--------
read_metadata
"""
log.debug("Reading metadata batch of {} symbols", len(symbols))
symbol_strings, as_ofs = self.parse_list_of_symbols(symbols)

include_errors_and_none_meta = True
Expand Down Expand Up @@ -2549,6 +2566,7 @@ def write_metadata(
VersionedItem
Structure containing metadata and version number of the affected symbol in the store.
"""
log.debug("Writing metadata for symbol={}", symbol)
return self._nvs.write_metadata(symbol, metadata, prune_previous_version=prune_previous_versions)

def write_metadata_batch(
Expand Down Expand Up @@ -2599,6 +2617,7 @@ def write_metadata_batch(
>>> lib.read_metadata("symbol_2")
{'the': 'metadata_2'}
"""
log.debug("Writing metadata batch of {} symbols", len(write_metadata_payloads))

self._raise_if_duplicate_symbols_in_batch(write_metadata_payloads)
throw_on_error = False
Expand Down Expand Up @@ -2651,6 +2670,7 @@ def snapshot(
If a symbol or the version of symbol specified in versions does not exist or has been deleted in the library,
or, the library has no symbol.
"""
log.debug("Creating snapshot={}", snapshot_name)
# We deliberately check the snapshot name only with the v2 API to avoid disruption to legacy users on the v1 API
self._nvs.version_store.verify_snapshot(snapshot_name)
self._nvs.snapshot(snap_name=snapshot_name, metadata=metadata, skip_symbols=skip_symbols, versions=versions)
Expand All @@ -2677,6 +2697,7 @@ def delete(self, symbol: str, versions: Optional[Union[int, Iterable[int]]] = No
versions
Version or versions of symbol to delete. If ``None`` then all versions will be deleted.
"""
log.debug("Deleting symbol={}, versions={}", symbol, versions)
if versions is None:
self._nvs.delete(symbol)
return
Expand All @@ -2703,6 +2724,7 @@ def delete_batch(self, delete_requests: List[Union[str, DeleteRequest]]) -> List
List of DataError objects, one for each symbol that was not deleted due to an error.
If the symbol was already deleted, there will be no error, just a warning.
"""
log.debug("Deleting batch of {} symbols", len(delete_requests))
symbols = []
versions = []

Expand Down Expand Up @@ -2731,6 +2753,7 @@ def prune_previous_versions(self, symbol) -> None:
symbol : `str`
Symbol name to prune.
"""
log.debug("Pruning previous versions for symbol={}", symbol)
self._nvs.prune_previous_versions(symbol)

def delete_data_in_range(
Expand Down Expand Up @@ -2765,6 +2788,7 @@ def delete_data_in_range(
2018-01-03 7
2018-01-04 8
"""
log.debug("Deleting data in range for symbol={}, date_range={}", symbol, date_range)
if date_range is None:
raise ArcticInvalidApiUsageException("date_range must be given but was None")
self._nvs.delete(symbol, date_range=date_range, prune_previous_version=prune_previous_versions)
Expand All @@ -2784,6 +2808,7 @@ def delete_snapshot(self, snapshot_name: str) -> None:
Exception
If the named snapshot does not exist.
"""
log.debug("Deleting snapshot={}", snapshot_name)
return self._nvs.delete_snapshot(snapshot_name)

def list_symbols(self, snapshot_name: Optional[str] = None, regex: Optional[str] = None) -> List[str]:
Expand All @@ -2804,6 +2829,7 @@ def list_symbols(self, snapshot_name: Optional[str] = None, regex: Optional[str]
List[str]
Symbols in the library.
"""
log.debug("Listing symbols, snapshot_name={}, regex={}", snapshot_name, regex)
return self._nvs.list_symbols(snapshot=snapshot_name, regex=regex)

def has_symbol(self, symbol: str, as_of: Optional[AsOf] = None) -> bool:
Expand Down Expand Up @@ -2838,6 +2864,7 @@ def has_symbol(self, symbol: str, as_of: Optional[AsOf] = None) -> bool:
>>> "another_symbol" in lib
False
"""
log.debug("Checking if symbol={} exists, as_of={}", symbol, as_of)
return self._nvs.has_symbol(symbol, as_of=as_of)

def list_snapshots(self, load_metadata: Optional[bool] = True) -> Union[List[str], Dict[str, Any]]:
Expand All @@ -2855,6 +2882,7 @@ def list_snapshots(self, load_metadata: Optional[bool] = True) -> Union[List[str
Snapshots in the library. Returns a list of snapshot names if load_metadata is False, otherwise returns a
dictionary where keys are snapshot names and values are metadata associated with that snapshot.
"""
log.debug("Listing snapshots, load_metadata={}", load_metadata)
result = self._nvs.list_snapshots(load_metadata)
return result if load_metadata else list(result.keys())

Expand Down Expand Up @@ -2903,6 +2931,7 @@ def list_versions(
>>> versions["symbol", 1].snapshots
["my_snap"]
"""
log.debug("Listing versions for symbol={}, latest_only={}", symbol, latest_only)
versions = self._nvs.list_versions(
symbol=symbol,
snapshot=snapshot,
Expand Down Expand Up @@ -2953,6 +2982,7 @@ def head(
If lazy is False, VersionedItem object that contains a .data and .metadata element.
If lazy is True, a LazyDataFrame object on which further querying can be performed prior to collect.
"""
log.debug("Reading head of symbol={}, n={}, as_of={}", symbol, n, as_of)
if lazy:
q = QueryBuilder().head(n)
return LazyDataFrame(
Expand Down Expand Up @@ -3019,6 +3049,7 @@ def tail(
If lazy is False, VersionedItem object that contains a .data and .metadata element.
If lazy is True, a LazyDataFrame object on which further querying can be performed prior to collect.
"""
log.debug("Reading tail of symbol={}, n={}, as_of={}", symbol, n, as_of)
if lazy:
q = QueryBuilder().tail(n)
return LazyDataFrame(
Expand Down Expand Up @@ -3087,6 +3118,7 @@ def get_description(self, symbol: str, as_of: Optional[AsOf] = None) -> SymbolDe
SymbolDescription
For documentation on each field.
"""
log.debug("Getting description for symbol={}, as_of={}", symbol, as_of)
info = self._nvs.get_info(
symbol,
as_of,
Expand Down Expand Up @@ -3145,6 +3177,7 @@ def get_description_batch(
SymbolDescription
For documentation on each field.
"""
log.debug("Getting description batch of {} symbols", len(symbols))
symbol_strings, as_ofs = self.parse_list_of_symbols(symbols)

throw_on_error = False
Expand All @@ -3165,6 +3198,7 @@ def reload_symbol_list(self) -> None:
This can take a long time on large libraries or certain S3 implementations, and once started, it cannot be
safely interrupted. If the call is interrupted somehow (exception/process killed), please call this again ASAP.
"""
log.debug("Reloading symbol list")
self._nvs.version_store.reload_symbol_list()

def compact_symbol_list(self) -> None:
Expand All @@ -3183,6 +3217,7 @@ def compact_symbol_list(self) -> None:
InternalException
Storage lock required to compact the symbol list could not be acquired
"""
log.debug("Compacting symbol list")
return self._nvs.compact_symbol_list()

def compact_data_experimental(
Expand Down Expand Up @@ -3249,6 +3284,7 @@ def compact_data_experimental(
>>> len(lib_tool.read_index("sym"))
1
"""
log.debug("Compacting data for symbol={}, rows_per_segment={}", symbol, rows_per_segment)
return self._nvs.compact_data_experimental(symbol, rows_per_segment, prune_previous_versions)

def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None) -> bool:
Expand All @@ -3273,6 +3309,7 @@ def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None)
-------
bool
"""
log.debug("Checking fragmentation for symbol={}", symbol)
return self._nvs.is_symbol_fragmented(symbol, segment_size)

def defragment_symbol_data(
Expand Down Expand Up @@ -3340,6 +3377,7 @@ def defragment_symbol_data(
Config map setting - SymbolDataCompact.SegmentCount will be replaced by a library setting
in the future. This API will allow overriding the setting as well.
"""
log.debug("Defragmenting symbol={}, segment_size={}", symbol, segment_size)
return self._nvs.defragment_symbol_data(symbol, segment_size, prune_previous_versions)

def merge_experimental(
Expand Down Expand Up @@ -3431,6 +3469,7 @@ def merge_experimental(
1970-01-01 00:00:00.000000002 100
1970-01-01 00:00:00.000000003 3
"""
log.debug("Merging symbol={} with strategy={}", symbol, strategy)
return self._nvs.merge_experimental(
symbol=symbol,
source=source,
Expand Down