Source code for risksyn.generator

import json
import warnings
from pathlib import Path
from typing import Optional, Union

import numpy as np
import pandas as pd
from dpmm.pipelines import AIMPipeline
from dpmm.pipelines.base import GenerativePipeline

from risksyn.processing import (
    _apply_column_schema,
    _auto_categorize,
    _detect_column_schema,
    _numeric_cols_without_bounds,
)
from risksyn.risk import Risk, calibrate_parameters_to_risk, _epsilon_to_rho

# Default epsilon budget reserved for data preprocessing (private domain
# estimation), used as the default ``proc_epsilon`` of ``AIMGenerator``.
# Empirically tested default from dpmm library.
_DEFAULT_PROC_EPSILON = 0.1


class AIMGenerator:
    """Generate synthetic data with interpretable privacy risk guarantees.

    Uses the AIM (Adaptive and Iterative Mechanism) pipeline for synthesis.

    Parameters
    ----------
    risk : Risk
        Risk specification defining the privacy guarantee.
    degree : int, default 2
        Maximum degree of marginals used by AIM.
    max_model_size : int, default 80
        Maximum model size parameter for AIM.
    compress : bool, default True
        Whether to use compression in AIM.
    proc_epsilon : float, default 0.1
        Epsilon budget allocated to data preprocessing (domain estimation).
        Only used if domain bounds are not provided for numeric columns.
    n_jobs : int, default -1
        Number of parallel jobs for AIM's internal graphical model.

    Examples
    --------
    >>> from risksyn import Risk, AIMGenerator
    >>> risk = Risk.from_advantage(0.2)
    >>> gen = AIMGenerator(risk=risk, degree=3)
    >>> gen.fit(df, domain={"age": {"lower": 0, "upper": 100}})
    >>> synthetic_df = gen.generate(count=1000)
    """

    def __init__(
        self,
        risk: Risk,
        degree: int = 2,
        max_model_size: int = 80,
        compress: bool = True,
        proc_epsilon: float = _DEFAULT_PROC_EPSILON,
        n_jobs: int = -1,
    ):
        """Store the configuration; no model is built until ``fit`` is called."""
        self._risk = risk
        self._degree = degree
        self._max_model_size = max_model_size
        self._compress = compress
        self._proc_epsilon = proc_epsilon
        self._n_jobs = n_jobs
        # Both are populated by fit() (or load()); None means "not fitted yet".
        self._pipeline = None
        self._column_schema = None

    def fit(
        self,
        data: pd.DataFrame,
        domain: Optional[dict] = None,
        unsafe_infer_bounds: bool = False,
    ) -> "AIMGenerator":
        """Fit the generator to the data.

        Parameters
        ----------
        data : pd.DataFrame
            DataFrame to fit the model on.
        domain : dict, optional
            Domain specification for columns. For numeric columns, use
            ``{"col": {"lower": min, "upper": max}}``. For categorical
            columns, use ``{"col": {"categories": ["val1", "val2", ...]}}``.
            Required when privacy budget is low to avoid private domain
            estimation failures.
        unsafe_infer_bounds : bool, default False
            If True, infer domain bounds from data min/max for numeric
            columns that lack explicit bounds. This leaks information about
            the data and weakens the privacy guarantee.

        Returns
        -------
        AIMGenerator
            Returns self for method chaining.

        Raises
        ------
        ValueError
            If numeric columns lack domain bounds and ``proc_epsilon`` is
            ``None`` or not strictly positive, or if the privacy budget is
            insufficient for the required preprocessing.

        Warns
        -----
        UserWarning
            If privacy budget for generation is smaller than for processing,
            if the per-column budget for private bounds estimation is too
            small, or if ``unsafe_infer_bounds`` is used.
        """
        # Remember the original column schema so generate() can restore it.
        self._column_schema = _detect_column_schema(data)
        data, domain = _auto_categorize(data, domain)
        unbounded_cols = _numeric_cols_without_bounds(data, domain)

        if unbounded_cols and unsafe_infer_bounds:
            # Copy so the caller's domain dict is never mutated.
            domain = dict(domain) if domain else {}
            for col in unbounded_cols:
                series = data[col]
                lower, upper = float(series.min()), float(series.max())
                domain[col] = {"lower": lower, "upper": upper}
                warnings.warn(
                    f"PrivacyLeakage: Inferring bounds for numeric column "
                    f"'{col}' from data (lower={lower}, upper={upper}). "
                    f"This leaks information and weakens the privacy guarantee. "
                    f"Provide explicit bounds via the domain parameter to avoid this.",
                    UserWarning,
                    stacklevel=2,
                )
            # Every numeric column now has bounds; no private estimation needed.
            unbounded_cols = []

        if unbounded_cols:
            proc_epsilon = self._proc_epsilon
            # The message below asks for "proc_epsilon > 0", so reject zero and
            # negative budgets up front instead of letting them reach the
            # budget split and calibration further down.
            if proc_epsilon is None or proc_epsilon <= 0:
                raise ValueError(
                    f"No domain bounds for numeric columns {unbounded_cols}. "
                    "Provide domain bounds, set proc_epsilon > 0, or use "
                    "unsafe_infer_bounds=True."
                )
            params = calibrate_parameters_to_risk(self._risk, proc_epsilon=proc_epsilon)

            n_cols = len(unbounded_cols)
            eps_per_col = proc_epsilon / n_cols
            # Each column further splits its budget in half for bounds estimation
            eps_for_bounds = eps_per_col / 2

            proc_rho = _epsilon_to_rho(proc_epsilon)
            gen_rho = self._risk.zcdp - proc_rho
            if gen_rho < proc_rho:
                warnings.warn(
                    f"Privacy budget for generation ({gen_rho:.6f}) is smaller than for "
                    f"processing ({proc_rho:.6f}). Consider providing domain bounds for "
                    "numeric columns, relaxing the risk requirement, or decreasing proc_epsilon.",
                    UserWarning,
                    stacklevel=2,
                )
            if eps_for_bounds < 0.05:
                warnings.warn(
                    f"Epsilon budget for private bounds estimation is too small "
                    f"(proc_epsilon={proc_epsilon} / {n_cols} columns / 2 = "
                    f"{eps_for_bounds:.4f} per column). Private bounds estimation "
                    f"will fail at this level. Provide explicit domain bounds for "
                    f"columns {unbounded_cols}, e.g.:\n"
                    f" domain={{'{unbounded_cols[0]}': {{'lower': <min>, 'upper': <max>}}}}",
                    UserWarning,
                    stacklevel=2,
                )
        else:
            # No preprocessing budget is needed when all bounds are known.
            params = calibrate_parameters_to_risk(self._risk)

        self._pipeline = AIMPipeline(
            epsilon=params["epsilon"],
            delta=params["delta"],
            compress=self._compress,
            max_model_size=self._max_model_size,
            proc_epsilon=params.get("proc_epsilon"),
            n_jobs=self._n_jobs,
            gen_kwargs={"degree": self._degree},
        )
        self._pipeline.fit(data, domain)
        return self

    def generate(self, count: int) -> pd.DataFrame:
        """Generate synthetic records.

        Parameters
        ----------
        count : int
            Number of records to generate.

        Returns
        -------
        pd.DataFrame
            DataFrame with synthetic data matching the schema of the fitted
            data.

        Raises
        ------
        RuntimeError
            If ``fit()`` has not been called.
        """
        if self._pipeline is None:
            raise RuntimeError("Must call fit() before generate()")
        synth = self._pipeline.generate(n_records=count)
        if self._column_schema:
            synth = _apply_column_schema(synth, self._column_schema)
        return synth

    def store(self, path: Union[str, Path]) -> None:
        """Store the fitted generator to disk.

        Writes ``metadata.json`` (risk and column schema) and a ``pipeline``
        sub-directory holding the fitted pipeline.

        Parameters
        ----------
        path : str or Path
            Directory path to store the generator.

        Raises
        ------
        RuntimeError
            If ``fit()`` has not been called.
        """
        if self._pipeline is None:
            raise RuntimeError("Must call fit() before store()")

        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)

        # dtype objects are not JSON-serialisable, so persist their string form.
        column_schema = {}
        if self._column_schema:
            for col, entry in self._column_schema.items():
                col_entry = {"dtype": str(entry["dtype"])}
                if "precision" in entry:
                    col_entry["precision"] = entry["precision"]
                column_schema[col] = col_entry

        metadata = {
            "risk_zcdp": self._risk.zcdp,
            "column_schema": column_schema,
        }
        with open(path / "metadata.json", "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2)

        pipeline_path = path / "pipeline"
        pipeline_path.mkdir(parents=True, exist_ok=True)
        self._pipeline.store(pipeline_path)

    @classmethod
    def load(cls, path: Union[str, Path]) -> "AIMGenerator":
        """Load a fitted generator from disk.

        Only the risk, the column schema and the fitted pipeline are
        restored; the other constructor arguments are not persisted by
        ``store`` and take their default values on the returned instance.

        Parameters
        ----------
        path : str or Path
            Directory path containing the stored generator.

        Returns
        -------
        AIMGenerator
            The loaded generator, ready for generation.
        """
        path = Path(path)
        with open(path / "metadata.json", encoding="utf-8") as f:
            metadata = json.load(f)

        risk = Risk.from_zcdp(metadata["risk_zcdp"])
        gen = cls(risk=risk)

        schema_raw = metadata.get("column_schema")
        if schema_raw:
            schema = {}
            for col, entry in schema_raw.items():
                # store() saved str(dtype). pandas_dtype parses NumPy dtype
                # strings exactly like np.dtype, and also accepts pandas
                # extension dtype names (e.g. "category"), which np.dtype
                # rejects with a TypeError.
                col_entry = {"dtype": pd.api.types.pandas_dtype(entry["dtype"])}
                if "precision" in entry:
                    col_entry["precision"] = entry["precision"]
                schema[col] = col_entry
            gen._column_schema = schema

        gen._pipeline = GenerativePipeline.load(path / "pipeline")
        return gen