Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified docs/.DS_Store
Binary file not shown.
65 changes: 56 additions & 9 deletions src/quends/base/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def deduplicate_history(history):
return list(reversed(out))



def to_native_types(obj):
"""
Recursively convert NumPy scalar and array types in nested structures to native Python types.
Expand Down Expand Up @@ -116,12 +115,11 @@ def _add_history(self, operation, options):

Private helper; not intended for external use.
"""
options = {k: v for k, v in options.items() if k not in ('self', 'cls', '__class__')}
self._history.append({
"operation": operation,
"options": options
})

options = {
k: v for k, v in options.items() if k not in ("self", "cls", "__class__")
}
self._history.append({"operation": operation, "options": options})

def get_metadata(self):
"""
Return the deduplicated operation history for this DataStream.
Expand All @@ -131,8 +129,7 @@ def get_metadata(self):
The deduplicated operation history, with options for each operation.
"""
return deduplicate_history(self._history)



def head(self, n=5):
"""
Return the first `n` rows of the underlying DataFrame.
Expand Down Expand Up @@ -906,6 +903,56 @@ def is_stationary(self, columns):
results[column] = f"Error: {e}"
return results

def make_stationary(self, col, n_pts_orig, workflow):
"""
Attempt to make the data stream into being stationary by removing an initial
fraction of data.

Parameters
----------
col : str
n_pts_orig : int
workflow : RobustWorkflow

Returns
-------
self : DataStream
stationary : bool
"""
stationary = self.is_stationary([col])[
col
] # is_stationary() returns dictionary. The value for key qoi tells us if it is stationary
n_pts = len(self.df)

ds = self

n_dropped = 0
while (
not stationary
and not workflow._operate_safe
and n_pts > workflow._n_pts_min
and n_pts > workflow._n_pts_frac_min * n_pts_orig
):
# See if we get a stationary stream if we drop some initial fraction of the data
n_drop = int(n_pts * workflow._drop_fraction)
df_shortened = ds.df.iloc[n_drop:]
ds = DataStream(df_shortened)
n_pts = len(ds.df)
n_dropped = n_pts_orig - n_pts
stationary = ds.is_stationary([col])[col]

if workflow._verbosity > 0:
if stationary:
print(
f"Data stream was not stationary, but is stationary after dropping first {n_dropped} points."
)
else:
print(
f"Data stream is not stationary, even after dropping first {n_dropped} points."
)

return ds, stationary

# === Compatibility wrappers for legacy tests ===

def mean(self, column_name=None, method="non-overlapping", window_size=None):
Expand Down
Loading