diff --git a/doubleml/irm/irm.py b/doubleml/irm/irm.py index cb8e093f..500e41b8 100644 --- a/doubleml/irm/irm.py +++ b/doubleml/irm/irm.py @@ -674,3 +674,98 @@ def policy_tree(self, features, depth=2, **tree_params): model = DoubleMLPolicyTree(orth_signal, depth=depth, features=features, **tree_params).fit() return model + + def plot_overlap_common_support( + self, + idx_treatment: int = 0, + i_rep: int = 0, + bins=10, + density: bool = False, + palette: str = "colorblind", + ): + """Plot propensity score distributions and binned calibration curves. + + Visualizes the distribution of estimated propensity scores :math:`\\hat{m}_0(X) = \\hat{E}[D|X]` + split by treatment status using histograms, together with calibration curves comparing + predicted propensity scores against observed treatment fractions. + + Parameters + ---------- + idx_treatment : int + Index of the treatment variable (for multi-treatment settings). + Default is ``0``. + + i_rep : int + Index of the repetition to use for the propensity score predictions. + Default is ``0``. + + bins : int or array-like + Number of bins or explicit bin edges for the histograms and calibration curves. + Default is ``10``. + + density : bool + If ``True``, histogram heights are normalized to density. + Default is ``False``. + + palette : str or sequence + Seaborn palette name or explicit colors. + Default is ``"colorblind"``. + + Returns + ------- + fig : :class:`matplotlib.figure.Figure` + Matplotlib figure. + axes : :class:`numpy.ndarray` + 2x2 axes array. + + Raises + ------ + ValueError + If ``fit()`` has not been called or predictions are not stored. + """ + from doubleml.utils.plots import plot_propensity_score_calibration + + # Input validation + if self._framework is None: + raise ValueError("Apply fit() before plot_overlap_common_support().") + + if self.predictions is None: + raise ValueError( + "Predictions are not stored. Call fit() with store_predictions=True " + "before plot_overlap_common_support()." 
+ ) + + if "ml_m" not in self.predictions: + raise ValueError( + "Propensity score predictions ('ml_m') are not available. " + "Ensure fit() was called with store_predictions=True." + ) + + if not isinstance(idx_treatment, int): + raise TypeError(f"idx_treatment must be an integer. Got {type(idx_treatment)}.") + if idx_treatment < 0 or idx_treatment >= self._dml_data.n_treat: + raise ValueError( + f"idx_treatment must be in [0, {self._dml_data.n_treat - 1}]. Got {idx_treatment}." + ) + + if not isinstance(i_rep, int): + raise TypeError(f"i_rep must be an integer. Got {type(i_rep)}.") + if i_rep < 0 or i_rep >= self.n_rep: + raise ValueError(f"i_rep must be in [0, {self.n_rep - 1}]. Got {i_rep}.") + + # Extract propensity scores and treatment indicator + ps_scores = self.predictions["ml_m"][:, i_rep, idx_treatment] + treatment = self._dml_data.d + + # Generate plot + fig, axes = plot_propensity_score_calibration( + propensity_score=ps_scores, + treatment=treatment, + bins=bins, + density=density, + palette=palette, + ) + + return fig, axes + + diff --git a/doubleml/utils/__init__.py b/doubleml/utils/__init__.py index 868429da..96098826 100644 --- a/doubleml/utils/__init__.py +++ b/doubleml/utils/__init__.py @@ -7,6 +7,7 @@ from .dummy_learners import DMLDummyClassifier, DMLDummyRegressor from .gain_statistics import gain_statistics from .global_learner import GlobalClassifier, GlobalRegressor +from .plots import plot_propensity_score_calibration from .policytree import DoubleMLPolicyTree from .propensity_score_processing import PSProcessor, PSProcessorConfig from .resampling import DoubleMLClusterResampling, DoubleMLResampling @@ -22,6 +23,7 @@ "gain_statistics", "GlobalClassifier", "GlobalRegressor", + "plot_propensity_score_calibration", "PSProcessor", "PSProcessorConfig", ] diff --git a/doubleml/utils/_plots.py b/doubleml/utils/_plots.py index 67b449b3..f21121f5 100644 --- a/doubleml/utils/_plots.py +++ b/doubleml/utils/_plots.py @@ -92,3 +92,6 @@ def 
_sensitivity_contour_plot( fig.update_yaxes(range=[0, np.max(y)]) return fig + + + diff --git a/doubleml/utils/plots.py b/doubleml/utils/plots.py new file mode 100644 index 00000000..75b31d85 --- /dev/null +++ b/doubleml/utils/plots.py @@ -0,0 +1,132 @@ +import matplotlib.pyplot as plt +import numpy as np +import seaborn as sns + + +def plot_propensity_score_calibration( + propensity_score, + treatment, + bins=10, + density=False, + palette="colorblind", +): + """ + Plot propensity score distributions and binned calibration curves. + + Parameters + ---------- + propensity_score : array-like + Predicted propensity scores of shape (n_samples,). + treatment : array-like + Binary treatment indicator of shape (n_samples,). + bins : int or array-like + Number of bins or explicit bin edges. + density : bool + If True, histogram heights are normalized. + palette : str or sequence + Seaborn palette name or explicit colors. + + Returns + ------- + fig : :class:`matplotlib.figure.Figure` + Matplotlib figure. + axes : :class:`numpy.ndarray` + 2x2 axes array. 
+ """ + ps = np.asarray(propensity_score, dtype=float).reshape(-1) + tr = np.asarray(treatment).reshape(-1) + + if ps.shape != tr.shape: + raise ValueError("propensity_score and treatment must have the same shape.") + if ps.ndim != 1: + raise ValueError("propensity_score and treatment must be one-dimensional.") + if not np.isin(tr, [0, 1]).all(): + raise ValueError("treatment must be binary with values 0 and 1.") + if np.any((ps < 0) | (ps > 1)): + raise ValueError("propensity_score must lie in [0, 1].") + + tr = tr.astype(int) + + if isinstance(bins, int): + if bins < 2: + raise ValueError("bins must be at least 2.") + bins = np.linspace(0.0, 1.0, bins + 1) + else: + bins = np.asarray(bins, dtype=float) + if bins.ndim != 1 or len(bins) < 2: + raise ValueError("bins must contain at least two edges.") + if np.any(np.diff(bins) <= 0): + raise ValueError("bins must be strictly increasing.") + + x_min, x_max = float(bins[0]), float(bins[-1]) + centers = 0.5 * (bins[:-1] + bins[1:]) + widths = np.diff(bins) + + treated_frac = [] + control_frac = [] + for i in range(len(bins) - 1): + if i < len(bins) - 2: + mask = (ps >= bins[i]) & (ps < bins[i + 1]) + else: + mask = (ps >= bins[i]) & (ps <= bins[i + 1]) + if np.sum(mask) == 0: + treated_frac.append(np.nan) + control_frac.append(np.nan) + else: + p_treated = np.mean(tr[mask] == 1) + treated_frac.append(p_treated) + control_frac.append(1.0 - p_treated) + + colors = sns.color_palette(palette, n_colors=2) + fig, axes = plt.subplots(2, 2, figsize=(12, 10), gridspec_kw={"height_ratios": [2, 1]}) + + sns.histplot( + ps[tr == 1], + bins=bins, + stat="density" if density else "count", + kde=False, + color=colors[0], + ax=axes[0, 0], + label="Treated", + ) + axes[0, 0].set_title("Treated: Propensity Score Distribution") + axes[0, 0].set_xlim(x_min, x_max) + axes[0, 0].set_ylabel("Density" if density else "Count") + axes[0, 0].legend() + + sns.histplot( + ps[tr == 0], + bins=bins, + stat="density" if density else "count", + 
kde=False, + color=colors[1], + ax=axes[0, 1], + label="Control", + ) + axes[0, 1].set_title("Control: Propensity Score Distribution") + axes[0, 1].set_xlim(x_min, x_max) + axes[0, 1].set_ylabel("Density" if density else "Count") + axes[0, 1].legend() + + axes[1, 0].bar(centers, treated_frac, width=widths, color=colors[0], alpha=0.7) + axes[1, 0].plot([x_min, x_max], [x_min, x_max], "k--", label="Ideal calibration") + axes[1, 0].set_title("Treated: Calibration") + axes[1, 0].set_xlabel("Predicted propensity score") + axes[1, 0].set_ylabel("Observed treatment fraction") + axes[1, 0].set_xlim(x_min, x_max) + axes[1, 0].set_ylim(0, 1) + axes[1, 0].legend() + + axes[1, 1].bar(centers, control_frac, width=widths, color=colors[1], alpha=0.7) + axes[1, 1].plot([x_min, x_max], [1 - x_min, 1 - x_max], "k--", label="Ideal calibration") + axes[1, 1].set_title("Control: Calibration") + axes[1, 1].set_xlabel("Predicted propensity score") + axes[1, 1].set_ylabel("Observed control fraction") + axes[1, 1].set_xlim(x_min, x_max) + axes[1, 1].set_ylim(0, 1) + axes[1, 1].legend() + + fig.suptitle("Propensity Score Calibration") + fig.tight_layout() + + return fig, axes diff --git a/doubleml/utils/tests/test_propensity_score_calibration.py b/doubleml/utils/tests/test_propensity_score_calibration.py new file mode 100644 index 00000000..7680cc2d --- /dev/null +++ b/doubleml/utils/tests/test_propensity_score_calibration.py @@ -0,0 +1,165 @@ +import matplotlib +import matplotlib.pyplot as plt +import numpy as np +import pytest + +from doubleml.utils.plots import plot_propensity_score_calibration + + +@pytest.mark.ci +class TestInputValidation: + """Test input validation for plot_propensity_score_calibration.""" + + def test_shape_mismatch(self): + with pytest.raises(ValueError, match="same shape"): + plot_propensity_score_calibration(np.array([0.5, 0.3]), np.array([0, 1, 0])) + + def test_non_binary_treatment(self): + with pytest.raises(ValueError, match="binary with values 0 and 1"): + 
plot_propensity_score_calibration(np.array([0.5, 0.3, 0.7]), np.array([0, 1, 2])) + + def test_scores_below_zero(self): + with pytest.raises(ValueError, match="must lie in"): + plot_propensity_score_calibration(np.array([-0.1, 0.5]), np.array([0, 1])) + + def test_scores_above_one(self): + with pytest.raises(ValueError, match="must lie in"): + plot_propensity_score_calibration(np.array([0.5, 1.1]), np.array([0, 1])) + + def test_bins_too_few(self): + with pytest.raises(ValueError, match="bins must be at least 2"): + plot_propensity_score_calibration(np.array([0.5, 0.3]), np.array([0, 1]), bins=1) + + def test_bins_array_too_short(self): + with pytest.raises(ValueError, match="at least two edges"): + plot_propensity_score_calibration(np.array([0.5, 0.3]), np.array([0, 1]), bins=np.array([0.5])) + + def test_bins_not_increasing(self): + with pytest.raises(ValueError, match="strictly increasing"): + plot_propensity_score_calibration(np.array([0.5, 0.3]), np.array([0, 1]), bins=np.array([0.5, 0.3, 0.8])) + + +@pytest.mark.ci +class TestReturnType: + """Test return type and basic plot structure.""" + + def test_returns_figure_and_axes(self): + ps = np.array([0.1, 0.3, 0.5, 0.7, 0.9, 0.2, 0.4, 0.6, 0.8, 0.95]) + tr = np.array([0, 0, 0, 1, 1, 0, 0, 1, 1, 1]) + fig, axes = plot_propensity_score_calibration(ps, tr) + assert isinstance(fig, matplotlib.figure.Figure) + assert isinstance(axes, np.ndarray) + assert axes.shape == (2, 2) + plt.close(fig) + + def test_density_mode(self): + ps = np.array([0.1, 0.3, 0.5, 0.7, 0.9, 0.2, 0.4, 0.6, 0.8, 0.95]) + tr = np.array([0, 0, 0, 1, 1, 0, 0, 1, 1, 1]) + fig, axes = plot_propensity_score_calibration(ps, tr, density=True) + assert axes[0, 0].get_ylabel() == "Density" + plt.close(fig) + + def test_count_mode(self): + ps = np.array([0.1, 0.3, 0.5, 0.7, 0.9, 0.2, 0.4, 0.6, 0.8, 0.95]) + tr = np.array([0, 0, 0, 1, 1, 0, 0, 1, 1, 1]) + fig, axes = plot_propensity_score_calibration(ps, tr, density=False) + assert axes[0, 0].get_ylabel() 
== "Count" + plt.close(fig) + + +@pytest.mark.ci +class TestBinHandling: + """Test bin handling with int and array bins.""" + + def test_int_bins(self): + np.random.seed(42) + ps = np.random.uniform(0, 1, 100) + tr = (ps > 0.5).astype(int) + fig, axes = plot_propensity_score_calibration(ps, tr, bins=5) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close(fig) + + def test_explicit_bins(self): + np.random.seed(42) + ps = np.random.uniform(0, 1, 100) + tr = (ps > 0.5).astype(int) + custom_bins = np.array([0.0, 0.25, 0.5, 0.75, 1.0]) + fig, axes = plot_propensity_score_calibration(ps, tr, bins=custom_bins) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close(fig) + + +@pytest.mark.ci +class TestBoundaryValues: + """Test boundary values at 0 and 1.""" + + def test_all_scores_at_zero(self): + ps = np.zeros(10) + tr = np.array([0, 1, 0, 1, 0, 1, 0, 1, 0, 1]) + fig, axes = plot_propensity_score_calibration(ps, tr) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close(fig) + + def test_all_scores_at_one(self): + ps = np.ones(10) + tr = np.array([0, 1, 0, 1, 0, 1, 0, 1, 0, 1]) + fig, axes = plot_propensity_score_calibration(ps, tr) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close(fig) + + def test_scores_at_bin_edges(self): + ps = np.array([0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]) + tr = np.array([0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1]) + fig, axes = plot_propensity_score_calibration(ps, tr, bins=10) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close(fig) + + +@pytest.mark.ci +class TestEmptyBinBehavior: + """Test empty-bin behavior (NaN in calibration).""" + + def test_empty_bins_do_not_crash(self): + # Only scores in [0.4, 0.6], so bins outside that range are empty + ps = np.array([0.45, 0.5, 0.55, 0.5, 0.45, 0.55]) + tr = np.array([0, 1, 1, 0, 0, 1]) + fig, axes = plot_propensity_score_calibration(ps, tr, bins=10) + assert isinstance(fig, matplotlib.figure.Figure) + plt.close(fig) + + +@pytest.mark.ci +class 
TestCalibrationContent: + """Test that calibration subplots have expected properties.""" + + def test_calibration_axes_labels(self): + np.random.seed(42) + ps = np.random.uniform(0, 1, 200) + tr = (np.random.uniform(0, 1, 200) < ps).astype(int) + fig, axes = plot_propensity_score_calibration(ps, tr) + + assert axes[1, 0].get_xlabel() == "Predicted propensity score" + assert axes[1, 0].get_ylabel() == "Observed treatment fraction" + assert axes[1, 1].get_xlabel() == "Predicted propensity score" + assert axes[1, 1].get_ylabel() == "Observed control fraction" + plt.close(fig) + + def test_titles(self): + np.random.seed(42) + ps = np.random.uniform(0, 1, 200) + tr = (np.random.uniform(0, 1, 200) < ps).astype(int) + fig, axes = plot_propensity_score_calibration(ps, tr) + + assert axes[0, 0].get_title() == "Treated: Propensity Score Distribution" + assert axes[0, 1].get_title() == "Control: Propensity Score Distribution" + assert axes[1, 0].get_title() == "Treated: Calibration" + assert axes[1, 1].get_title() == "Control: Calibration" + plt.close(fig) + + def test_suptitle(self): + np.random.seed(42) + ps = np.random.uniform(0, 1, 50) + tr = (np.random.uniform(0, 1, 50) < ps).astype(int) + fig, axes = plot_propensity_score_calibration(ps, tr) + assert fig.get_suptitle() == "Propensity Score Calibration" + plt.close(fig)