diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index f7f8c89ab2783..d295d366b869e 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -21,6 +21,7 @@
 import datetime
 import re
 import inspect
+import math
 import warnings
 from collections.abc import Mapping
 from functools import partial, reduce
@@ -116,6 +117,7 @@
     verify_temp_column_name,
     SPARK_CONF_ARROW_ENABLED,
     log_advice,
+    is_ansi_mode_enabled,
 )
 from pyspark.pandas.datetimes import DatetimeMethods
 from pyspark.pandas.spark.accessors import SparkSeriesMethods
@@ -3367,15 +3369,39 @@ def autocorr(self, lag: int = 1) -> float:
         scol = self.spark.column
         if lag == 0:
             corr = sdf.select(F.corr(scol, scol)).head()[0]
+            return np.nan if corr is None else corr
         else:
             lag_scol = F.lag(scol, lag).over(Window.orderBy(NATURAL_ORDER_COLUMN_NAME))
             lag_col_name = verify_temp_column_name(sdf, "__autocorr_lag_tmp_col__")
-            corr = (
-                sdf.withColumn(lag_col_name, lag_scol)
-                .select(F.corr(scol, F.col(lag_col_name)))
-                .head()[0]
-            )
-        return np.nan if corr is None else corr
+
+            spark_session = self._internal.spark_frame.sparkSession
+            if is_ansi_mode_enabled(spark_session):
+                sdf = sdf.withColumn(lag_col_name, lag_scol)
+                sdf_mean = sdf.select(
+                    F.avg(scol).alias("mean_x"),
+                    F.avg(F.col(lag_col_name)).alias("mean_y"),
+                ).collect()[0]
+
+                mean_x = sdf_mean["mean_x"]
+                mean_y = sdf_mean["mean_y"]
+
+                corr_expr = F.try_divide(
+                    F.avg((scol - mean_x) * (F.col(lag_col_name) - mean_y)),
+                    F.stddev_samp(scol) * F.stddev_samp(F.col(lag_col_name)),
+                )
+
+                corr = sdf.select(corr_expr.alias("corr")).head()[0]
+                if corr is None or math.isnan(corr) or abs(corr) < 1e-12:
+                    return np.nan
+                return corr
+            else:
+                corr = (
+                    sdf.withColumn(lag_col_name, lag_scol)
+                    .select(F.corr(scol, F.col(lag_col_name)))
+                    .head()[0]
+                )
+
+                return np.nan if corr is None else corr
 
     def corr(
         self, other: "Series", method: str = "pearson", min_periods: Optional[int] = None
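
A note on the new ANSI branch (illustrative, not part of the patch): for a zero-variance input, F.stddev_samp evaluates to 0.0, and under ANSI mode a plain division by that value raises a divide-by-zero error; F.try_divide (available since PySpark 3.5) returns NULL instead, which the None/NaN checks above then surface as np.nan. A minimal standalone sketch of that semantics, assuming a local Spark session; the literals and the alias are illustrative only:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    # ANSI mode turns divide-by-zero into an error for the plain `/` operator.
    spark.conf.set("spark.sql.ansi.enabled", "true")

    # try_divide yields NULL on a zero divisor instead of raising, which is
    # what the patched autocorr then maps to np.nan.
    spark.range(1).select(
        F.try_divide(F.lit(1.0), F.lit(0.0)).alias("q")  # -> NULL, not an error
    ).show()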