2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/query.cpp
@@ -102,7 +102,7 @@ std::unique_ptr<util::BitSet> build_bitset_for_index(
const auto range_start = std::get<timestamp>(rg.start_);
const auto range_end = std::get<timestamp>(rg.end_);
for(auto i = 0u; i < container.size(); ++i) {
- const auto intersects = range_intersects<RawType>(range_start, range_end, *start_idx_pos, *end_idx_pos - 1);
+ const auto intersects = range_intersects<RawType>(range_start, range_end, *start_idx_pos, *end_idx_pos);
(*res)[i] = intersects;
if(intersects)
ARCTICDB_DEBUG(log::version(), "range intersects at {}", i);
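The behavioural change in this hunk is the end bound passed to `range_intersects`: the resolved end index position is no longer decremented. A minimal Python sketch of how an off-by-one on the end bound flips an intersection test, assuming closed (inclusive) intervals on both sides — the real semantics live in the C++ `range_intersects` helper, so the values below are purely illustrative:

```python
def closed_range_intersects(query_start, query_end, idx_start, idx_end):
    # True if [query_start, query_end] overlaps [idx_start, idx_end], both inclusive.
    return query_start <= idx_end and idx_start <= query_end

# Hypothetical index segment covering timestamps 0..10, queried for [10, 20].
print(closed_range_intersects(10, 20, 0, 10))      # True  - the boundary timestamp is shared
print(closed_range_intersects(10, 20, 0, 10 - 1))  # False - decrementing the end bound drops it
```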
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/version_core.cpp
@@ -1148,7 +1148,7 @@ static void read_indexed_keys_to_pipeline(
pipeline_context->norm_meta_ = std::make_unique<arcticdb::proto::descriptors::NormalizationMetadata>(std::move(*index_segment_reader.mutable_tsd().mutable_proto().mutable_normalization()));
pipeline_context->user_meta_ = std::make_unique<arcticdb::proto::descriptors::UserDefinedMetadata>(std::move(*index_segment_reader.mutable_tsd().mutable_proto().mutable_user_meta()));
pipeline_context->bucketize_dynamic_ = bucketize_dynamic;
- ARCTICDB_DEBUG(log::version(), "read_indexed_keys_to_pipeline: Symbol {} Found {} keys with {} total rows", pipeline_context->slice_and_keys_.size(), pipeline_context->total_rows_, version_info.symbol());
+ ARCTICDB_DEBUG(log::version(), "read_indexed_keys_to_pipeline: Symbol {} Found {} keys with {} total rows", version_info.symbol(), pipeline_context->slice_and_keys_.size(), pipeline_context->total_rows_);
}

// Returns true if there are staged segments
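The only change in this hunk is the order of the format arguments, so they line up with the placeholders in the debug message. The same mismatch reproduced in Python with `str.format` and made-up values:

```python
msg = "read_indexed_keys_to_pipeline: Symbol {} Found {} keys with {} total rows"
symbol, num_keys, total_rows = "sym", 4, 1000  # illustrative values only

print(msg.format(num_keys, total_rows, symbol))  # old order: "Symbol 4 Found 1000 keys with sym total rows"
print(msg.format(symbol, num_keys, total_rows))  # fixed order: "Symbol sym Found 4 keys with 1000 total rows"
```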
109 changes: 76 additions & 33 deletions python/arcticdb/version_store/_store.py
@@ -94,7 +94,10 @@

FlattenResult = namedtuple("FlattenResult", ["is_recursive_normalize_preferred", "metastruct", "to_write"])

- def resolve_defaults(param_name, proto_cfg, global_default, existing_value=None, uppercase=True, runtime_options=None, **kwargs):
+
+ def resolve_defaults(
+     param_name, proto_cfg, global_default, existing_value=None, uppercase=True, runtime_options=None, **kwargs
+ ):
"""
Precedence: existing_value > kwargs > runtime_defaults > env > proto_cfg > global_default

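The docstring above states the resolution order for `resolve_defaults`. A simplified sketch of that precedence chain, where the first source that supplies a value wins — an illustration of the documented order, not the actual implementation:

```python
import os


def resolve_defaults_sketch(param_name, proto_cfg, global_default, existing_value=None, runtime_options=None, **kwargs):
    # existing_value > kwargs > runtime_options > environment > proto_cfg > global_default
    if existing_value is not None:
        return existing_value
    if kwargs.get(param_name) is not None:
        return kwargs[param_name]
    if runtime_options is not None and getattr(runtime_options, param_name, None) is not None:
        return getattr(runtime_options, param_name)
    env_value = os.getenv(param_name.upper())
    if env_value is not None:
        return env_value
    if proto_cfg is not None and getattr(proto_cfg, param_name, None) is not None:
        return getattr(proto_cfg, param_name)
    return global_default


# A kwarg beats the library config and the global default:
print(resolve_defaults_sketch("dynamic_strings", proto_cfg=None, global_default=False, dynamic_strings=True))  # True
```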
@@ -350,7 +353,7 @@ def _initialize(self, library, env, lib_cfg, custom_normalizer, open_mode, nativ
self._init_norm_failure_handler()
self._open_mode = open_mode
self._native_cfg = native_cfg
- self._runtime_options=runtime_options
+ self._runtime_options = runtime_options

def set_output_format(self, output_format: Union[OutputFormat, str]):
if self._runtime_options is None:
@@ -542,8 +545,18 @@ def _try_flatten_and_write_composite_object(
def resolve_defaults(param_name, proto_cfg, global_default, existing_value=None, uppercase=True, **kwargs):
return resolve_defaults(param_name, proto_cfg, global_default, existing_value, uppercase, **kwargs)

- def resolve_runtime_defaults(self, param_name, proto_cfg, global_default, existing_value=None, uppercase=True, **kwargs):
-     return resolve_defaults(param_name, proto_cfg, global_default, existing_value, uppercase, runtime_options=self._runtime_options, **kwargs)
+ def resolve_runtime_defaults(
+     self, param_name, proto_cfg, global_default, existing_value=None, uppercase=True, **kwargs
+ ):
+     return resolve_defaults(
+         param_name,
+         proto_cfg,
+         global_default,
+         existing_value,
+         uppercase,
+         runtime_options=self._runtime_options,
+         **kwargs,
+     )

def _write_options(self):
return self._lib_cfg.lib_desc.version.write_options
@@ -2012,7 +2025,6 @@ def _resolve_empty_columns(self, columns, implement_read_index):
columns = None
return columns


def read(
self,
symbol: str,
@@ -2174,9 +2186,15 @@ def _post_process_dataframe(self, read_result, read_query, implement_read_index=
start_idx, end_idx = self._compute_filter_start_end_row(read_result, read_query)
data = []
for c in read_result.frame_data.data:
- data.append(c[start_idx:end_idx])
+ data.append(c[start_idx - 1 : end_idx])
row_count = len(data[0]) if len(data) else 0
- read_result.frame_data = FrameData(data, read_result.frame_data.names, read_result.frame_data.index_columns, row_count, read_result.frame_data.offset)
+ read_result.frame_data = FrameData(
+     data,
+     read_result.frame_data.names,
+     read_result.frame_data.index_columns,
+     row_count,
+     read_result.frame_data.offset,
+ )

vitem = self._adapt_read_res(read_result)

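Apart from the line wrapping, the change in this hunk is the start offset of the row slice in `_post_process_dataframe`. A toy illustration of what shifting the start bound by one does to a column, using a plain list and made-up bounds rather than the values `_compute_filter_start_end_row` actually returns:

```python
column = list(range(10))    # stand-in for a single column of read results
start_idx, end_idx = 3, 7   # illustrative filter bounds

print(column[start_idx:end_idx])       # [3, 4, 5, 6]    - previous slice
print(column[start_idx - 1:end_idx])   # [2, 3, 4, 5, 6] - new slice keeps one earlier row
```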
@@ -2368,19 +2386,25 @@ def compact_incomplete(
prune_previous_version,
validate_index,
delete_staged_data_on_failure,
- stage_results=_stage_results
+ stage_results=_stage_results,
)

if isinstance(compaction_result, ae.version_store.VersionedItem):
return self._convert_thin_cxx_item_to_python(compaction_result, metadata)
elif isinstance(compaction_result, List):
# We expect this to be a list of errors
check(compaction_result, "List of errors in compaction result should never be empty")
- check(all(isinstance(c, KeyNotFoundInStageResultInfo) for c in compaction_result), "Compaction errors should always be KeyNotFoundInStageResultInfo")
- raise MissingKeysInStageResultsError("Missing keys during compaction", tokens_with_missing_keys=compaction_result)
+ check(
+     all(isinstance(c, KeyNotFoundInStageResultInfo) for c in compaction_result),
+     "Compaction errors should always be KeyNotFoundInStageResultInfo",
+ )
+ raise MissingKeysInStageResultsError(
+     "Missing keys during compaction", tokens_with_missing_keys=compaction_result
+ )
else:
- raise RuntimeError(f"Unexpected type for compaction_result {type(compaction_result)}. This indicates a bug in ArcticDB.")
+ raise RuntimeError(
+     f"Unexpected type for compaction_result {type(compaction_result)}. This indicates a bug in ArcticDB."
+ )

@staticmethod
def _get_index_columns_from_descriptor(descriptor):
@@ -2401,7 +2425,6 @@ def _get_index_columns_from_descriptor(descriptor):

return index_columns


def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem:
if isinstance(read_result.frame_data, ArrowOutputFrame):
frame_data = read_result.frame_data
Expand Down Expand Up @@ -2444,7 +2467,6 @@ def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem:
timestamp=read_result.version.timestamp,
)


def list_versions(
self,
symbol: Optional[str] = None,
@@ -3020,29 +3042,39 @@ def will_item_be_pickled(self, item, recursive_normalizers: Optional[bool] = Non
result = True

result |= norm_meta.WhichOneof("input_type") == "msg_pack_frame"
- log_warning_message = get_config_int("VersionStore.WillItemBePickledWarningMsg") != 0 and log.is_active(_LogLevel.WARN)
+ log_warning_message = get_config_int("VersionStore.WillItemBePickledWarningMsg") != 0 and log.is_active(
+     _LogLevel.WARN
+ )
if result and log_warning_message:
proto_cfg = self._lib_cfg.lib_desc.version.write_options
resolved_recursive_normalizers = resolve_defaults(
-     "recursive_normalizers", proto_cfg, global_default=False, uppercase=False, **{"recursive_normalizers": recursive_normalizers}
+     "recursive_normalizers",
+     proto_cfg,
+     global_default=False,
+     uppercase=False,
+     **{"recursive_normalizers": recursive_normalizers},
)
warning_msg = ""
is_recursive_normalize_preferred, _, _ = self._try_flatten(item, "")
if resolved_recursive_normalizers and is_recursive_normalize_preferred:
- warning_msg = ("As recursive_normalizers is enabled, the item will be "
-     "recursively normalized in `write`. However, this API will "
-     "still return True for historical reason, such as recursively "
-     "normalized data not being data_range searchable like "
-     "pickled data. ")
+ warning_msg = (
+     "As recursive_normalizers is enabled, the item will be "
+     "recursively normalized in `write`. However, this API will "
+     "still return True for historical reason, such as recursively "
+     "normalized data not being data_range searchable like "
+     "pickled data. "
+ )
fl = Flattener()
if fl.will_obj_be_partially_pickled(item):
warning_msg += "Please note the item will still be partially pickled."
elif not is_recursive_normalize_preferred:
- warning_msg = ("The item will be msgpack normalized in `write`. "
-     "Msgpack normalization is considered `pickled` in ArcticDB, "
-     "therefore this API will return True. ")
+ warning_msg = (
+     "The item will be msgpack normalized in `write`. "
+     "Msgpack normalization is considered `pickled` in ArcticDB, "
+     "therefore this API will return True. "
+ )
log.warning(warning_msg)

return result

@staticmethod
@@ -3529,18 +3561,29 @@ def resolve_dynamic_strings(kwargs):

return dynamic_strings


def _log_warning_on_writing_empty_dataframe(dataframe, symbol):
# We allow passing other things to write such as integers and strings and python arrays but we care only about
# dataframes and series
is_dataframe = isinstance(dataframe, pd.DataFrame)
is_series = isinstance(dataframe, pd.Series)
- if (is_series or is_dataframe) and dataframe.empty and os.getenv("ARCTICDB_WARN_ON_WRITING_EMPTY_DATAFRAME", "1") == "1":
+ if (
+     (is_series or is_dataframe)
+     and dataframe.empty
+     and os.getenv("ARCTICDB_WARN_ON_WRITING_EMPTY_DATAFRAME", "1") == "1"
+ ):
empty_column_type = pd.DataFrame({"a": []}).dtypes["a"] if is_dataframe else pd.Series([]).dtype
current_dtypes = list(dataframe.dtypes.items()) if is_dataframe else [(dataframe.name, dataframe.dtype)]
- log.warning("Writing empty dataframe to ArcticDB for symbol \"{}\". The dtypes of empty columns depend on the"
-     "Pandas version being used. This can lead to unexpected behavior in the processing pipeline. For"
-     " example if the empty columns are of object dtype they cannot be part of numeric computations in"
-     "the processing pipeline such as filtering (qb = qb[qb['empty_column'] < 5]) or projection"
-     "(qb = qb.apply('new', qb['empty_column'] + 5)). Pandas version is: {}, the default dtype for empty"
-     " column is: {}. Column types in the original input: {}. Parameter \"coerce_columns\" can be used"
-     " to explicitly set the types of dataframe columns", symbol, PANDAS_VERSION, empty_column_type, current_dtypes)
+ log.warning(
+     'Writing empty dataframe to ArcticDB for symbol "{}". The dtypes of empty columns depend on the'
+     "Pandas version being used. This can lead to unexpected behavior in the processing pipeline. For"
+     " example if the empty columns are of object dtype they cannot be part of numeric computations in"
+     "the processing pipeline such as filtering (qb = qb[qb['empty_column'] < 5]) or projection"
+     "(qb = qb.apply('new', qb['empty_column'] + 5)). Pandas version is: {}, the default dtype for empty"
+     ' column is: {}. Column types in the original input: {}. Parameter "coerce_columns" can be used'
+     " to explicitly set the types of dataframe columns",
+     symbol,
+     PANDAS_VERSION,
+     empty_column_type,
+     current_dtypes,
+ )
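The new warning notes that the dtype of an empty column depends on the pandas version in use. A standalone check (not part of this diff) that mirrors the `empty_column_type` computation in the hunk and shows what the installed pandas does:

```python
import pandas as pd

print(pd.__version__)
print(pd.DataFrame({"a": []}).dtypes["a"])  # default dtype of an empty DataFrame column
print(pd.Series([]).dtype)                  # default dtype of an empty Series (may emit a warning on older pandas)
```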