Categorical polars update #1134

Closed
24 changes: 16 additions & 8 deletions dataprofiler/profilers/categorical_column_profile.py
@@ -7,6 +7,7 @@
from typing import cast

import datasketches
import polars as pl
from pandas import DataFrame, Series

from .. import dp_logging
@@ -474,7 +475,7 @@ def _check_stop_condition_is_met(self, sample_size: int, unqiue_ratio: float):
return True
return False

def _update_stop_condition(self, data: DataFrame):
def _update_stop_condition(self, data: DataFrame | pl.DataFrame):
"""Return value stop_condition_is_met given stop conditions.

:param data: Dataframe currently being processed by categorical profiler
@@ -497,8 +498,8 @@ def _get_categories_cms(self, df_series, len_df):
"""Return count min sketch and heavy hitters for both the batch and stream case.

:param df_series: Series currently being processed by categorical profiler
:type df_series: Series
:param len_df: the total number of samples iin df_series
:type df_series: polars.Series
:param len_df: the total number of samples in df_series
:type len_df: int
:return: cms, heavy_hitter_dict, missing_heavy_hitter_dict
"""
@@ -601,13 +602,13 @@ def _get_categories_full(self, df_series) -> dict:
:return: dict of counts for each unique value
:rtype: dict
"""
category_count: dict = df_series.value_counts(dropna=False).to_dict()
category_count: dict = Series(df_series).value_counts(dropna=False).to_dict()
return category_count

@BaseColumnProfiler._timeit(name="categories")
def _update_categories(
self,
df_series: DataFrame,
df_series: DataFrame | pl.DataFrame,
prev_dependent_properties: dict = None,
subset_properties: dict = None,
) -> None:
@@ -657,7 +658,9 @@ def _update_categories(
if self._stop_condition_is_met:
self._categories = {}

def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
def _update_helper(
self, df_series_clean: Series | pl.Series, profile: dict
) -> None:
"""
Update col profile properties with clean dataset and its known profile.

@@ -669,7 +672,7 @@ def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
"""
self._update_column_base_properties(profile)

def update(self, df_series: Series) -> CategoricalColumn:
def update(self, df_series: pl.Series | Series) -> CategoricalColumn:
"""
Update the column profile.

@@ -682,12 +685,17 @@ def update(self, df_series: Series) -> CategoricalColumn:
if len(df_series) == 0 or self._stop_condition_is_met:
return self

if isinstance(df_series, pl.Series):
pandas_df = df_series.to_pandas()
else:
pandas_df = df_series

profile = dict(sample_size=len(df_series))
CategoricalColumn._update_categories(self, df_series)
BaseColumnProfiler._perform_property_calcs(
self,
self.__calculations,
df_series=df_series,
df_series=pandas_df,
prev_dependent_properties={},
subset_properties=profile,
)
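The pattern here is conversion at the boundary: accept either backend, call to_pandas() on polars input, then run the existing pandas-only calculations unchanged. A minimal sketch of just that pattern; summarize_categories below is a hypothetical stand-in for the downstream calculations, not DataProfiler API.

```python
# Minimal sketch of the normalization pattern in update() above;
# summarize_categories is a hypothetical stand-in for the pandas-only
# downstream logic, not part of DataProfiler.
import pandas as pd
import polars as pl


def summarize_categories(series: pd.Series) -> dict:
    # Downstream logic that still expects a pandas Series.
    return series.value_counts(dropna=False).to_dict()


def update(series: pd.Series | pl.Series) -> dict:
    # Accept either backend; convert polars to pandas at the boundary.
    if isinstance(series, pl.Series):
        series = series.to_pandas()
    return summarize_categories(series)


print(update(pl.Series(["a", "b", "a"])))  # {'a': 2, 'b': 1}
```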
25 changes: 13 additions & 12 deletions dataprofiler/profilers/datetime_column_profile.py
@@ -7,6 +7,7 @@

import numpy as np
import pandas as pd
import polars as pl

from . import profiler_utils
from .base_column_profilers import BaseColumnPrimitiveTypeProfiler, BaseColumnProfiler
@@ -256,8 +257,7 @@ def _get_datetime_profile(cls, df_series: pd.Series) -> dict:
profile: dict = dict()
activated_date_formats: list = list()
len_df = len(df_series)

is_row_datetime = pd.Series(np.full((len(df_series)), False))
is_row_datetime = pd.Series(np.full((len_df), False))

min_value = None
max_value = None
@@ -275,18 +275,19 @@ def _get_datetime_profile(cls, df_series: pd.Series) -> dict:
)
)

df_dates = valid_dates[~valid_dates.isnull()]
df_dates = pl.Series(valid_dates[~valid_dates.isnull()])

if "%b" in date_format and not df_dates.empty:
if "%b" in date_format and not df_dates.is_empty():
may_month = 5 # May can be %b or %B we want to force, so check
all_may = df_dates.apply(lambda x: x.month == may_month).all()
all_may = df_dates.map_elements(lambda x: x.month == may_month)
all_may = pl.Series(all_may).all()
if all_may:
valid_dates[:] = np.nan
df_dates = pd.Series([], dtype=object)
valid_dates[:] = None
df_dates = pl.Series([])

# Create mask to avoid null dates
null_date_mask = valid_dates.isnull()
np_date_array = df_dates.values
np_date_array = df_dates.to_numpy()

# check off any values which were found to be datetime
is_row_datetime[~is_row_datetime] = (~null_date_mask).values
@@ -298,18 +299,18 @@ def _get_datetime_profile(cls, df_series: pd.Series) -> dict:
max_idx = np.argmax(np_date_array)

# Selects the min, max value objects for comparison
tmp_min_value_obj = df_dates.iloc[min_idx]
tmp_max_value_obj = df_dates.iloc[max_idx]
tmp_min_value_obj = df_dates.item(int(min_idx))
tmp_max_value_obj = df_dates.item(int(max_idx))

# If minimum value, keep reference
if tmp_min_value_obj < min_value_obj:
min_value = df_series[~null_date_mask].iloc[min_idx]
min_value_obj = tmp_min_value_obj
min_value_obj = pd.Timestamp(tmp_min_value_obj)

# If maximum value, keep reference
if tmp_max_value_obj > max_value_obj:
max_value = df_series[~null_date_mask].iloc[max_idx]
max_value_obj = tmp_max_value_obj
max_value_obj = pd.Timestamp(tmp_max_value_obj)

df_series = df_series[null_date_mask]

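These hunks swap several pandas idioms for their polars equivalents: .empty becomes .is_empty(), .apply becomes .map_elements, .values becomes .to_numpy(), and .iloc[i] becomes .item(i). A hedged sketch of the correspondences, assuming recent pandas and polars versions:

```python
# Sketch of the pandas -> polars idioms swapped in above; assumes
# recent pandas/polars (where Series.map_elements supersedes apply).
import pandas as pd
import polars as pl

pd_dates = pd.Series(pd.to_datetime(["2021-05-01", "2021-05-09"]))
pl_dates = pl.Series(pd_dates)  # same construction the diff uses

assert pd_dates.empty == pl_dates.is_empty()           # emptiness check
assert (pd_dates.values == pl_dates.to_numpy()).all()  # ndarray view
assert pd_dates.iloc[0] == pl_dates.item(0)            # positional access

# Element-wise predicate: pandas .apply vs polars .map_elements
may_month = 5
all_may_pd = pd_dates.apply(lambda x: x.month == may_month).all()
all_may_pl = pl_dates.map_elements(lambda x: x.month == may_month).all()
assert all_may_pd == all_may_pl
```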
2 changes: 1 addition & 1 deletion dataprofiler/profilers/profiler_utils.py
@@ -752,7 +752,7 @@ def perform_chi_squared_test_for_homogeneity(
# If one or less categories, we have zero/negative degrees of freedom,
# which is not an appropriate value for this context
num_cats = len(cat_counts)
if len(cat_counts) <= 1:
if num_cats <= 1:
warnings.warn(
"Insufficient number of categories. "
"Chi-squared test cannot be performed.",
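The guard exists because a chi-squared homogeneity test over k categories contributes k - 1 degrees of freedom, so k <= 1 leaves nothing to test. A hypothetical distillation of just that check, not the full perform_chi_squared_test_for_homogeneity implementation:

```python
# Hypothetical distillation of the degrees-of-freedom guard above.
import warnings


def degrees_of_freedom(cat_counts: dict) -> int | None:
    num_cats = len(cat_counts)
    if num_cats <= 1:
        # Zero/negative degrees of freedom: test is not meaningful.
        warnings.warn(
            "Insufficient number of categories. "
            "Chi-squared test cannot be performed."
        )
        return None
    return num_cats - 1  # dof contributed by a k-category comparison


print(degrees_of_freedom({"a": 10}))          # warns, prints None
print(degrees_of_freedom({"a": 10, "b": 4}))  # prints 1
```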
27 changes: 15 additions & 12 deletions dataprofiler/profilers/unstructured_labeler_profile.py
@@ -3,7 +3,7 @@

from collections import defaultdict

from pandas import Series
import polars as pl

from ..labelers.base_data_labeler import BaseDataLabeler
from ..labelers.data_labelers import DataLabeler
@@ -155,25 +155,27 @@ def label_encoding(self) -> list[str]:
return self.data_labeler.labels

@BaseColumnProfiler._timeit(name="data_labeler_predict")
def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
def _update_helper(self, df_series_clean: pl.Series, profile: dict) -> None:
"""
Update col profile properties with clean dataset and its known profile.

:param df_series_clean: df series with nulls removed
:type df_series_clean: pandas.core.series.Series
:type df_series_clean: polars.Series
:param profile: profile dictionary
:type profile: dict
:return: None
"""
data_ndarray = df_series_clean.to_numpy()

# this will get char_level predictions as output
predictions = self.data_labeler.predict(df_series_clean)
predictions = self.data_labeler.predict(data_ndarray)

# also store spacy/NER format
postprocessor = CharPostprocessor(
use_word_level_argmax=True, output_format="NER"
)
format_predictions = postprocessor.process(
df_series_clean, predictions.copy(), self.data_labeler.label_mapping
data_ndarray, predictions.copy(), self.data_labeler.label_mapping
)

# Update counts and percent values
@@ -188,14 +190,15 @@ def _update_helper(self, df_series_clean: Series, profile: dict) -> None:
# CHARACTERS/WORDS PROCESSED
self._update_column_base_properties(profile)

def update(self, df_series: Series) -> None:
def update(self, df_series: pl.Series) -> None:
"""Update profile."""
if len(df_series) == 0:
return
profile = dict(
char_sample_size=self.char_sample_size,
word_sample_size=self.word_sample_size,
)

self._update_helper(df_series, profile)

@property
@@ -278,21 +281,21 @@ def _update_true_char_label_counts(self, predictions: list) -> None:
self.char_sample_size += len(sample)

def _update_postprocess_char_label_counts(
self, df_series_clean: Series, format_predictions: dict
self, df_series_clean: pl.Series, format_predictions: dict
) -> None:
"""
Update the postprocess character label counts.

:param df_series_clean: df series with nulls removed
:type df_series_clean: pandas.core.series.Series
:type df_series_clean: polars.Series
:param format_predictions: contains dict of samples with predictions on
the character level in congruence with the word level predictions
:type format_predictions: Dict
:return: None
"""
char_label_counts = self.entity_counts["postprocess_char_level"]

for index, result in enumerate(zip(df_series_clean, format_predictions)):
for result in zip(df_series_clean, format_predictions):
text, entities = result
index = 0
for entity in entities:
@@ -308,20 +311,20 @@
char_label_counts["UNKNOWN"] += len(text) - index

def _update_word_label_counts(
self, df_series_clean: Series, format_predictions: dict
self, df_series_clean: pl.Series, format_predictions: dict
) -> None:
"""
Update the sorted dictionary of each entity count.

:param df_series_clean: df series with nulls removed
:type df_series_clean: pandas.core.series.Series
:type df_series_clean: polars.Series
:param format_predictions: Dictionary of sample text and entities
:type format_predictions: dict
:return: None
"""
word_label_counts = self.entity_counts["word_level"]

for index, result in enumerate(zip(df_series_clean, format_predictions)):
for result in zip(df_series_clean, format_predictions):
text, entities = result
begin_word_idx = -1
index = 0
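The labeler change follows the same boundary principle: convert the polars Series to a numpy array once via to_numpy(), then reuse that array for both prediction and postprocessing. A minimal sketch, where predict() is a hypothetical stand-in for data_labeler.predict:

```python
# Minimal sketch of the single-conversion pattern in _update_helper();
# predict() is a hypothetical stand-in for data_labeler.predict.
import polars as pl


def predict(samples) -> list[list[int]]:
    # Dummy char-level predictions: one label per character.
    return [[0] * len(sample) for sample in samples]


def update_helper(df_series_clean: pl.Series) -> list[list[int]]:
    data_ndarray = df_series_clean.to_numpy()  # convert once, reuse below
    predictions = predict(data_ndarray)        # the same array would also
    return predictions                         # feed the postprocessor


print(update_helper(pl.Series(["abc", "de"])))  # [[0, 0, 0], [0, 0]]
```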
53 changes: 38 additions & 15 deletions dataprofiler/profilers/unstructured_text_profile.py
@@ -6,6 +6,7 @@
import warnings
from collections import Counter, defaultdict

import polars as pl
from numpy import ndarray
from pandas import DataFrame, Series

@@ -667,15 +668,15 @@ def profile(self) -> dict:
@BaseColumnProfiler._timeit(name="vocab")
def _update_vocab(
self,
data: list | ndarray | DataFrame,
data: list | ndarray | DataFrame | pl.DataFrame,
prev_dependent_properties: dict = None,
subset_properties: dict = None,
) -> None:
"""
Find the vocabulary counts used in the text samples.

:param data: list or array of data from which to extract vocab
:type data: Union[list, numpy.array, pandas.DataFrame]
:type data: Union[list, numpy.array, pandas.DataFrame, polars.DataFrame]
:param prev_dependent_properties: Contains all the previous properties
that the calculations depend on.
:type prev_dependent_properties: dict
@@ -690,15 +691,15 @@ def _update_words(
@BaseColumnProfiler._timeit(name="words")
def _update_words(
self,
data: list | ndarray | DataFrame,
data: list | ndarray | DataFrame | pl.DataFrame,
prev_dependent_properties: dict = None,
subset_properties: dict = None,
) -> None:
"""
Find unique words and word count used in the text samples.

:param data: list or array of data from which to extract vocab
:type data: Union[list, numpy.array, pandas.DataFrame]
:type data: Union[list, numpy.array, pandas.DataFrame, polars.DataFrame]
:param prev_dependent_properties: Contains all the previous properties
that the calculations depend on.
:type prev_dependent_properties: dict
@@ -708,37 +709,54 @@ def _update_words(
:return: None
"""
if not self._is_case_sensitive:
words = (
[w.strip(string.punctuation) for w in row.lower().split()]
for row in data
)
if isinstance(data, pl.DataFrame):
words = (
[
w.strip(string.punctuation)
for w in row.str.to_lowercase().str.split(by=" ")
]
for row in data
)
else:
words = (
[w.strip(string.punctuation) for w in row.lower().split()]
for row in data
)
else:
words = ([w.strip(string.punctuation) for w in row.split()] for row in data)
if isinstance(data, pl.DataFrame):
words = (
[w.strip(string.punctuation) for w in row.str.split(by=" ")]
for row in data
)
else:
words = (
[w.strip(string.punctuation) for w in row.split()] for row in data
)
word_count = Counter(itertools.chain.from_iterable(words))

for w, c in word_count.items():
if w and w.lower() not in self._stop_words:
self.word_count.update({w: c})

def _update_helper(self, data: Series, profile: dict) -> None:
def _update_helper(self, data: pl.Series, profile: dict) -> None:
"""
Update col profile properties with clean dataset and its known null parameters.

:param data: df series with nulls removed
:type data: pandas.core.series.Series
:type data: polars.Series
:param profile: text profile dictionary
:type profile: dict
:return: None
"""
self.sample_size += profile.pop("sample_size")
self.metadata = profile

def update(self, data: Series) -> TextProfiler:
def update(self, data: Series | pl.Series) -> TextProfiler:
"""
Update the column profile.

:param data: df series
:type data: pandas.core.series.Series
:type data: polars.Series
:return: updated TextProfiler
:rtype: TextProfiler
"""
@@ -748,14 +766,19 @@ def update(self, data: Series) -> TextProfiler:

profile = dict(sample_size=len_data)

if isinstance(data, pl.Series):
data_pandas = data.to_pandas()
else:
data_pandas = data

BaseColumnProfiler._perform_property_calcs(
self, # type: ignore
self.__calculations,
df_series=data,
df_series=data_pandas,
prev_dependent_properties={},
subset_properties=profile,
)

self._update_helper(data, profile)
self._update_helper(pl.Series(data), profile)

return self
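For reference, the case-insensitive pandas branch of _update_words() reduces to the self-contained sketch below (the polars branch mirrors it with str.to_lowercase() and str.split(by=" ")). The sample data is illustrative only.

```python
# Self-contained sketch of the case-insensitive word extraction in
# _update_words(): lowercase, split on whitespace, strip punctuation,
# then count all rows in one pass.
import itertools
import string
from collections import Counter

data = ["The quick brown fox!", "the DOG, the dog."]
words = (
    [w.strip(string.punctuation) for w in row.lower().split()]
    for row in data
)
word_count = Counter(itertools.chain.from_iterable(words))
print(word_count.most_common(2))  # [('the', 3), ('dog', 2)]
```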