Skip to content

Interface & Config

syntho_hive.interface.synthesizer.Synthesizer

Main entry point that wires metadata, privacy, and orchestration.

Source code in syntho_hive/interface/synthesizer.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
class Synthesizer:
    """Main entry point that wires metadata, privacy, and orchestration."""

    def __init__(
        self,
        metadata: Metadata,
        privacy_config: PrivacyConfig,
        spark_session: Optional[SparkSession] = None,
        model: Type[ConditionalGenerativeModel] = CTGAN,
        embedding_threshold: int = 50,
    ):
        """Instantiate the synthesizer façade.

        Args:
            metadata: Dataset schema and relational configuration.
            privacy_config: Privacy guardrail configuration.
            spark_session: Optional SparkSession required for orchestration.
            model: Generative model class to use for synthesis. Must be a class
                (not an instance) that implements ``ConditionalGenerativeModel``.
                The class constructor must accept ``(metadata, batch_size, epochs,
                **kwargs)`` and instances must implement ``fit()``, ``sample()``,
                ``save()``, and ``load()``.

                Supported classes:
                - ``syntho_hive.core.models.ctgan.CTGAN`` (default)
                - Any custom class implementing ``ConditionalGenerativeModel``

                Existing callers that omit this parameter receive CTGAN behavior
                unchanged.
            embedding_threshold: Cardinality threshold for switching to embeddings.

        Raises:
            TypeError: If ``model`` is not a ``ConditionalGenerativeModel`` subclass.
        """
        # Fail fast on an instance (or unrelated class) before any state is set.
        if not (
            isinstance(model, type) and issubclass(model, ConditionalGenerativeModel)
        ):
            raise TypeError(
                f"model_cls must be a subclass of ConditionalGenerativeModel, "
                f"got {model!r}. Implement fit(), sample(), save(), load() "
                f"and subclass ConditionalGenerativeModel."
            )

        self.metadata = metadata
        self.privacy = privacy_config
        self.spark = spark_session
        self.model_cls = model
        self.embedding_threshold = embedding_threshold

        # Initialize internal components
        if self.spark:
            self.orchestrator = StagedOrchestrator(
                metadata, self.spark, model_cls=self.model_cls
            )
        else:
            self.orchestrator = (
                None  # Mode without Spark (maybe local pandas only in future)
            )

    def fit(
        self,
        data: Any,  # Str (database name) or Dict[str, str] (table paths)
        sampling_strategy: str = "relational_stratified",
        sample_size: int = 5_000_000,
        validate: bool = False,
        epochs: int = 300,
        batch_size: int = 500,
        progress_bar: bool = True,
        checkpoint_interval: int = 10,
        checkpoint_dir: Optional[str] = None,
        **model_kwargs: Union[int, str, Tuple[int, int]],
    ):
        """Fit the generative models on the real database.

        Args:
            data: Database name (str) or mapping of {table: path} (dict).
            sampling_strategy: Strategy for sampling real data. Only ``"full"``
                is implemented today; any other value warns and uses the full dataset.
            sample_size: Number of rows to sample from real data (approx).
            validate: Whether to run validation after fitting.
            epochs: Number of training epochs for CTGAN.
            batch_size: Batch size for training.
            progress_bar: If True (default), display tqdm progress bar to stderr during training.
                Structured log events always emit regardless of this flag.
            checkpoint_interval: Save a validation checkpoint every N epochs. Default 10.
            checkpoint_dir: Optional directory to save best_checkpoint/ and final_checkpoint/
                during training.
            **model_kwargs: Additional args forwarded to the underlying model (e.g., embedding_dim).

        Raises:
            SchemaError: If the data argument is invalid.
            TrainingError: If training fails for any reason.
        """
        if sampling_strategy != "full":
            import warnings

            warnings.warn(
                f"sampling_strategy='{sampling_strategy}' is not yet implemented. "
                "Using full dataset. This will be supported in a future release.",
                UserWarning,
                stacklevel=2,
            )

        try:
            if validate:
                if (
                    isinstance(data, dict)
                    and data
                    and isinstance(next(iter(data.values())), pd.DataFrame)
                ):
                    # User passed actual DataFrames — data-level FK type checks are possible
                    self.metadata.validate_schema(real_data=data)
                else:
                    # String (DB name) or dict of path strings — structural checks only
                    self.metadata.validate_schema()

            if not self.orchestrator:
                raise ValueError("SparkSession required for fit()")

            if sample_size <= 0:
                raise ValueError("sample_size must be positive")

            print(
                f"Fitting on data source with {sampling_strategy} (target: {sample_size} rows)..."
            )
            print(f"Training Config: epochs={epochs}, batch_size={batch_size}")

            # Determine paths
            if isinstance(data, str):
                real_paths = {t: f"{data}.{t}" for t in self.metadata.tables}
            elif isinstance(data, dict):
                real_paths = data
            else:
                raise SchemaError(
                    f"fit() argument 'data' must be a database name (str) or path mapping (dict), "
                    f"got {type(data).__name__}."
                )

            self.orchestrator.fit_all(
                real_paths,
                epochs=epochs,
                batch_size=batch_size,
                progress_bar=progress_bar,
                checkpoint_interval=checkpoint_interval,
                checkpoint_dir=checkpoint_dir,
                **model_kwargs,
            )
        except SynthoHiveError:
            raise
        except Exception as exc:
            log.error("fit_failed", error=str(exc))
            raise TrainingError(f"fit() failed. Original error: {exc}") from exc

    def sample(
        self,
        num_rows: Dict[str, int],
        output_format: str = "delta",
        output_path: Optional[str] = None,
    ) -> Union[Dict[str, str], Dict[str, pd.DataFrame]]:
        """Generate synthetic data for each table.

        Args:
            num_rows: Mapping of table name to number of rows to generate.
            output_format: Storage format for generated datasets (default ``"delta"``).
                NOTE(review): this argument is currently not forwarded to the
                orchestrator — output is always written in the orchestrator's
                default format. Confirm before relying on other formats.
            output_path: Optional path to write files. If None, returns DataFrames in memory.

        Raises:
            TrainingError: If generation fails for any reason.

        Returns:
            Mapping of table name to the output path (if wrote to disk) OR Dictionary of DataFrames (if in-memory).
        """
        try:
            if not self.orchestrator:
                raise ValueError("SparkSession required for sample()")

            print(f"Generating data with {self.model_cls.__name__} backend...")

            # If output_path is explicitly None, we return DataFrames
            if output_path is None:
                return self.orchestrator.generate(num_rows, output_path_base=None)

            output_base = output_path
            self.orchestrator.generate(num_rows, output_base)

            # Return paths mapping
            return {t: f"{output_base}/{t}" for t in self.metadata.tables}
        except SynthoHiveError:
            raise
        except Exception as exc:
            log.error("sample_failed", error=str(exc))
            raise TrainingError(f"sample() failed. Original error: {exc}") from exc

    def save(self, path: str) -> None:
        """Persist the synthesizer state to disk.

        Args:
            path: Filesystem path to write the synthesizer checkpoint to.

        Raises:
            SerializationError: If saving fails for any reason.
        """
        try:
            import joblib

            joblib.dump(self, path)
            log.info("synthesizer_saved", path=path)
        except SynthoHiveError:
            raise
        except Exception as exc:
            log.error("save_failed", path=path, error=str(exc))
            raise SerializationError(
                f"save() failed writing synthesizer to '{path}'. Original error: {exc}"
            ) from exc

    def __getstate__(self):
        """Exclude non-serializable attributes (SparkSession, IO) from pickling."""
        state = self.__dict__.copy()
        state.pop("spark", None)
        # 'io' is not set in __init__; presumably assigned lazily elsewhere — drop
        # it too so pickling never carries a live IO handle. TODO confirm.
        state.pop("io", None)
        return state

    def __setstate__(self, state):
        """Restore instance from pickled state; non-serializable attrs set to None."""
        self.__dict__.update(state)
        self.spark = None
        self.io = None

    @classmethod
    def load(cls, path: str) -> "Synthesizer":
        """Load a synthesizer from a previously saved checkpoint.

        Args:
            path: Filesystem path to the synthesizer checkpoint.

        Raises:
            SerializationError: If loading fails for any reason.

        Returns:
            Loaded Synthesizer instance.
        """
        try:
            import joblib

            instance = joblib.load(path)
            log.info("synthesizer_loaded", path=path)
            return instance
        except SynthoHiveError:
            raise
        except Exception as exc:
            log.error("load_failed", path=path, error=str(exc))
            raise SerializationError(
                f"load() failed reading synthesizer from '{path}'. Original error: {exc}"
            ) from exc

    def generate_validation_report(
        self,
        real_data: Dict[str, str],
        synthetic_data: Dict[str, str],
        output_path: str,
    ):
        """Generate a validation report comparing real vs synthetic datasets.

        Args:
            real_data: Map of table name to real dataset path/table.
            synthetic_data: Map of table name to generated dataset path.
            output_path: Filesystem path for the rendered report.

        Raises:
            SynthoHiveError: If the report generation fails for any reason.
        """
        try:
            if not self.spark:
                raise ValueError(
                    "SparkSession required for validation report generation"
                )

            print("Generating validation report...")
            report_gen = ValidationReport()

            real_dfs = {}
            synth_dfs = {}

            # 1. Load Real Data
            for table, path in real_data.items():
                print(f"Loading real data for {table} from {path}...")
                # Try reading as a catalog table first, then fall back to a
                # Delta path (the originally documented behavior).
                try:
                    df = self.spark.read.table(path)
                except Exception as table_exc:
                    log.warning("table_read_failed_trying_path", error=str(table_exc))
                    try:
                        df = self.spark.read.format("delta").load(path)
                    except Exception as exc:
                        log.warning("delta_read_fallback_failed", error=str(exc))
                        raise SerializationError(
                            f"generate_validation_report() failed reading real data. "
                            f"Original error: {exc}"
                        ) from exc

                real_dfs[table] = df.toPandas()

            # 2. Load Synthetic Data
            for table, path in synthetic_data.items():
                print(f"Loading synthetic data for {table} from {path}...")
                df = self.spark.read.format("delta").load(path)
                synth_dfs[table] = df.toPandas()

            # 3. Generate Report
            report_gen.generate(real_dfs, synth_dfs, output_path)
        except SynthoHiveError:
            raise
        except Exception as exc:
            log.error(
                "generate_validation_report_failed",
                output_path=output_path,
                error=str(exc),
            )
            raise SynthoHiveError(
                f"generate_validation_report() failed. Original error: {exc}"
            ) from exc

    def save_to_hive(
        self, synthetic_data: Dict[str, str], target_db: str, overwrite: bool = True
    ):
        """Register generated datasets as Hive tables.

        Args:
            synthetic_data: Map of table name to generated dataset path.
            target_db: Hive database where tables should be registered.
            overwrite: Whether to drop and recreate existing tables.

        Raises:
            ValueError: If Spark is unavailable or a dataset path contains a quote.
            SchemaError: If the database or a table name fails identifier validation.
        """
        if not self.spark:
            raise ValueError("SparkSession required for Hive registration")

        # Validate database name against allowlist before any SQL interpolation.
        # Raises SchemaError immediately — no Spark context touched for invalid names.
        if not _SAFE_IDENTIFIER.match(target_db):
            raise SchemaError(
                f"SchemaError: Database name '{target_db}' contains invalid characters. "
                f"Only letters, digits, and underscores [a-zA-Z0-9_] are allowed. "
                f"This validation prevents SQL injection via unsanitized user input."
            )

        # Validate table names from synthetic_data keys
        for table_name in synthetic_data:
            if not _SAFE_IDENTIFIER.match(str(table_name)):
                raise SchemaError(
                    f"SchemaError: Table name '{table_name}' contains invalid characters. "
                    f"Only letters, digits, and underscores [a-zA-Z0-9_] are allowed."
                )

        # Validate paths from synthetic_data values
        for table_name, path in synthetic_data.items():
            if "'" in str(path):
                raise ValueError(
                    f"Path for table '{table_name}' contains invalid characters: {path}"
                )

        print(f"Save to Hive database: {target_db}")

        # Ensure DB exists
        self.spark.sql(f"CREATE DATABASE IF NOT EXISTS {target_db}")

        for table, path in synthetic_data.items():
            full_table_name = f"{target_db}.{table}"
            print(f"Registering table {full_table_name} at {path}")

            if overwrite:
                self.spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")

            # Register External Table
            self.spark.sql(
                f"CREATE TABLE {full_table_name} USING DELTA LOCATION '{path}'"
            )

fit

fit(data: Any, sampling_strategy: str = 'relational_stratified', sample_size: int = 5000000, validate: bool = False, epochs: int = 300, batch_size: int = 500, progress_bar: bool = True, checkpoint_interval: int = 10, checkpoint_dir: Optional[str] = None, **model_kwargs: Union[int, str, Tuple[int, int]])

Fit the generative models on the real database.

Parameters:

Name Type Description Default
data Any

Database name (str) or mapping of {table: path} (dict).

required
sampling_strategy str

Strategy for sampling real data.

'relational_stratified'
sample_size int

Number of rows to sample from real data (approx).

5000000
validate bool

Whether to run validation after fitting.

False
epochs int

Number of training epochs for CTGAN.

300
batch_size int

Batch size for training.

500
progress_bar bool

If True (default), display tqdm progress bar to stderr during training. Structured log events always emit regardless of this flag.

True
checkpoint_interval int

Save a validation checkpoint every N epochs. Default 10.

10
checkpoint_dir Optional[str]

Optional directory to save best_checkpoint/ and final_checkpoint/ during training.

None
**model_kwargs Union[int, str, Tuple[int, int]]

Additional args forwarded to the underlying model (e.g., embedding_dim).

{}

Raises:

Type Description
SchemaError

If the data argument is invalid.

TrainingError

If training fails for any reason.

Source code in syntho_hive/interface/synthesizer.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def fit(
    self,
    data: Any,  # Str (database name) or Dict[str, str] (table paths)
    sampling_strategy: str = "relational_stratified",
    sample_size: int = 5_000_000,
    validate: bool = False,
    epochs: int = 300,
    batch_size: int = 500,
    progress_bar: bool = True,
    checkpoint_interval: int = 10,
    checkpoint_dir: Optional[str] = None,
    **model_kwargs: Union[int, str, Tuple[int, int]],
):
    """Train the generative models against the real database.

    Args:
        data: Database name (str) or mapping of {table: path} (dict).
        sampling_strategy: Strategy for sampling real data.
        sample_size: Approximate number of rows to sample from real data.
        validate: Whether to run schema validation as part of fitting.
        epochs: Number of training epochs for CTGAN.
        batch_size: Batch size for training.
        progress_bar: When True (default), show a tqdm progress bar on stderr
            during training. Structured log events are emitted either way.
        checkpoint_interval: Save a validation checkpoint every N epochs.
        checkpoint_dir: Optional directory for best_checkpoint/ and
            final_checkpoint/ artifacts written during training.
        **model_kwargs: Extra keyword args forwarded to the underlying model
            (e.g., embedding_dim).

    Raises:
        SchemaError: If the data argument is invalid.
        TrainingError: If training fails for any reason.
    """
    # Only the "full" strategy exists today; every other value degrades to
    # the full dataset with an explicit warning.
    if sampling_strategy != "full":
        import warnings

        warnings.warn(
            f"sampling_strategy='{sampling_strategy}' is not yet implemented. "
            "Using full dataset. This will be supported in a future release.",
            UserWarning,
            stacklevel=2,
        )

    try:
        if validate:
            first_value = (
                next(iter(data.values()), None) if isinstance(data, dict) else None
            )
            if isinstance(first_value, pd.DataFrame):
                # Caller handed us real DataFrames, so data-level FK type
                # checks are possible.
                self.metadata.validate_schema(real_data=data)
            else:
                # DB name or path strings only: structural checks.
                self.metadata.validate_schema()

        if not self.orchestrator:
            raise ValueError("SparkSession required for fit()")

        if sample_size <= 0:
            raise ValueError("sample_size must be positive")

        print(
            f"Fitting on data source with {sampling_strategy} (target: {sample_size} rows)..."
        )
        print(f"Training Config: epochs={epochs}, batch_size={batch_size}")

        # Resolve each table name to a concrete data location.
        if isinstance(data, str):
            table_paths = {name: f"{data}.{name}" for name in self.metadata.tables}
        elif isinstance(data, dict):
            table_paths = data
        else:
            raise SchemaError(
                f"fit() argument 'data' must be a database name (str) or path mapping (dict), "
                f"got {type(data).__name__}."
            )

        self.orchestrator.fit_all(
            table_paths,
            epochs=epochs,
            batch_size=batch_size,
            progress_bar=progress_bar,
            checkpoint_interval=checkpoint_interval,
            checkpoint_dir=checkpoint_dir,
            **model_kwargs,
        )
    except SynthoHiveError:
        raise
    except Exception as err:
        log.error("fit_failed", error=str(err))
        raise TrainingError(f"fit() failed. Original error: {err}") from err

generate_validation_report

generate_validation_report(real_data: Dict[str, str], synthetic_data: Dict[str, str], output_path: str)

Generate a validation report comparing real vs synthetic datasets.

Parameters:

Name Type Description Default
real_data Dict[str, str]

Map of table name to real dataset path/table.

required
synthetic_data Dict[str, str]

Map of table name to generated dataset path.

required
output_path str

Filesystem path for the rendered report.

required

Raises:

Type Description
SynthoHiveError

If the report generation fails for any reason.

Source code in syntho_hive/interface/synthesizer.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
def generate_validation_report(
    self,
    real_data: Dict[str, str],
    synthetic_data: Dict[str, str],
    output_path: str,
):
    """Generate a validation report comparing real vs synthetic datasets.

    Args:
        real_data: Map of table name to real dataset path/table.
        synthetic_data: Map of table name to generated dataset path.
        output_path: Filesystem path for the rendered report.

    Raises:
        SynthoHiveError: If the report generation fails for any reason.
    """
    try:
        if not self.spark:
            raise ValueError(
                "SparkSession required for validation report generation"
            )

        print("Generating validation report...")
        report_gen = ValidationReport()

        real_dfs = {}
        synth_dfs = {}

        # 1. Load Real Data
        for table, path in real_data.items():
            print(f"Loading real data for {table} from {path}...")
            # Try reading as a catalog table first, then fall back to a
            # Delta path (the originally documented behavior).
            try:
                df = self.spark.read.table(path)
            except Exception as table_exc:
                log.warning("table_read_failed_trying_path", error=str(table_exc))
                try:
                    df = self.spark.read.format("delta").load(path)
                except Exception as exc:
                    log.warning("delta_read_fallback_failed", error=str(exc))
                    raise SerializationError(
                        f"generate_validation_report() failed reading real data. "
                        f"Original error: {exc}"
                    ) from exc

            real_dfs[table] = df.toPandas()

        # 2. Load Synthetic Data
        for table, path in synthetic_data.items():
            print(f"Loading synthetic data for {table} from {path}...")
            df = self.spark.read.format("delta").load(path)
            synth_dfs[table] = df.toPandas()

        # 3. Generate Report
        report_gen.generate(real_dfs, synth_dfs, output_path)
    except SynthoHiveError:
        raise
    except Exception as exc:
        log.error(
            "generate_validation_report_failed",
            output_path=output_path,
            error=str(exc),
        )
        raise SynthoHiveError(
            f"generate_validation_report() failed. Original error: {exc}"
        ) from exc

load classmethod

load(path: str) -> Synthesizer

Load a synthesizer from a previously saved checkpoint.

Parameters:

Name Type Description Default
path str

Filesystem path to the synthesizer checkpoint.

required

Raises:

Type Description
SerializationError

If loading fails for any reason.

Returns:

Type Description
Synthesizer

Loaded Synthesizer instance.

Source code in syntho_hive/interface/synthesizer.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
@classmethod
def load(cls, path: str) -> "Synthesizer":
    """Restore a synthesizer from a checkpoint written by ``save()``.

    Args:
        path: Filesystem path to the synthesizer checkpoint.

    Raises:
        SerializationError: If loading fails for any reason.

    Returns:
        Loaded Synthesizer instance.
    """
    try:
        # joblib imported lazily so the module loads without it installed.
        import joblib

        synth = joblib.load(path)
        log.info("synthesizer_loaded", path=path)
        return synth
    except SynthoHiveError:
        raise
    except Exception as err:
        log.error("load_failed", path=path, error=str(err))
        raise SerializationError(
            f"load() failed reading synthesizer from '{path}'. Original error: {err}"
        ) from err

sample

sample(num_rows: Dict[str, int], output_format: str = 'delta', output_path: Optional[str] = None) -> Union[Dict[str, str], Dict[str, pd.DataFrame]]

Generate synthetic data for each table.

Parameters:

Name Type Description Default
num_rows Dict[str, int]

Mapping of table name to number of rows to generate.

required
output_format str

Storage format for generated datasets (default "delta").

'delta'
output_path Optional[str]

Optional path to write files. If None, returns DataFrames in memory.

None

Raises:

Type Description
TrainingError

If generation fails for any reason.

Returns:

Type Description
Union[Dict[str, str], Dict[str, DataFrame]]

Mapping of table name to the output path (if wrote to disk) OR Dictionary of DataFrames (if in-memory).

Source code in syntho_hive/interface/synthesizer.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def sample(
    self,
    num_rows: Dict[str, int],
    output_format: str = "delta",
    output_path: Optional[str] = None,
) -> Union[Dict[str, str], Dict[str, pd.DataFrame]]:
    """Generate synthetic data for every configured table.

    Args:
        num_rows: Mapping of table name to number of rows to generate.
        output_format: Storage format for generated datasets (default ``"delta"``).
        output_path: Where to write the files; ``None`` keeps results in memory.

    Raises:
        TrainingError: If generation fails for any reason.

    Returns:
        Table-name → output-path mapping when writing to disk, otherwise a
        table-name → DataFrame mapping.
    """
    try:
        if not self.orchestrator:
            raise ValueError("SparkSession required for sample()")

        print(f"Generating data with {self.model_cls.__name__} backend...")

        # No destination given: hand the DataFrames straight back.
        if output_path is None:
            return self.orchestrator.generate(num_rows, output_path_base=None)

        self.orchestrator.generate(num_rows, output_path)

        # One sub-directory per table under the requested base path.
        return {name: f"{output_path}/{name}" for name in self.metadata.tables}
    except SynthoHiveError:
        raise
    except Exception as err:
        log.error("sample_failed", error=str(err))
        raise TrainingError(f"sample() failed. Original error: {err}") from err

save

save(path: str) -> None

Persist the synthesizer state to disk.

Parameters:

Name Type Description Default
path str

Filesystem path to write the synthesizer checkpoint to.

required

Raises:

Type Description
SerializationError

If saving fails for any reason.

Source code in syntho_hive/interface/synthesizer.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def save(self, path: str) -> None:
    """Write the synthesizer's full state to a checkpoint on disk.

    Args:
        path: Filesystem path to write the synthesizer checkpoint to.

    Raises:
        SerializationError: If saving fails for any reason.
    """
    try:
        # joblib imported lazily so the module loads without it installed.
        import joblib

        joblib.dump(self, path)
        log.info("synthesizer_saved", path=path)
    except SynthoHiveError:
        raise
    except Exception as err:
        log.error("save_failed", path=path, error=str(err))
        raise SerializationError(
            f"save() failed writing synthesizer to '{path}'. Original error: {err}"
        ) from err

save_to_hive

save_to_hive(synthetic_data: Dict[str, str], target_db: str, overwrite: bool = True)

Register generated datasets as Hive tables.

Parameters:

Name Type Description Default
synthetic_data Dict[str, str]

Map of table name to generated dataset path.

required
target_db str

Hive database where tables should be registered.

required
overwrite bool

Whether to drop and recreate existing tables.

True

Raises:

Type Description
ValueError

If Spark is unavailable.

Source code in syntho_hive/interface/synthesizer.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
def save_to_hive(
    self, synthetic_data: Dict[str, str], target_db: str, overwrite: bool = True
):
    """Register generated datasets as Hive tables.

    Every identifier is validated against a strict allowlist before being
    interpolated into SQL, which blocks injection via database/table names.

    Args:
        synthetic_data: Map of table name to generated dataset path.
        target_db: Hive database where tables should be registered.
        overwrite: Whether to drop and recreate existing tables.

    Raises:
        ValueError: If Spark is unavailable, or a dataset path contains a
            single quote (which would escape the LOCATION literal below).
        SchemaError: If the database name or a table name contains
            characters outside ``[a-zA-Z0-9_]``.
    """
    if not self.spark:
        raise ValueError("SparkSession required for Hive registration")

    # Validate database name against allowlist before any SQL interpolation.
    # Raises SchemaError immediately — no Spark context touched for invalid names.
    if not _SAFE_IDENTIFIER.match(target_db):
        raise SchemaError(
            f"SchemaError: Database name '{target_db}' contains invalid characters. "
            f"Only letters, digits, and underscores [a-zA-Z0-9_] are allowed. "
            f"This validation prevents SQL injection via unsanitized user input."
        )

    # Validate table names from synthetic_data keys
    for table_name in synthetic_data:
        if not _SAFE_IDENTIFIER.match(str(table_name)):
            raise SchemaError(
                f"SchemaError: Table name '{table_name}' contains invalid characters. "
                f"Only letters, digits, and underscores [a-zA-Z0-9_] are allowed."
            )

    # Validate paths from synthetic_data values. A single quote would break
    # out of the quoted LOCATION string in the CREATE TABLE statement.
    for table_name, path in synthetic_data.items():
        if "'" in str(path):
            raise ValueError(
                f"Path for table '{table_name}' contains invalid characters: {path}"
            )

    # Structured logging for consistency with the rest of the class
    # (save/sample use ``log``); print() bypasses log routing and levels.
    log.info("hive_save_started", target_db=target_db)

    # Ensure DB exists
    self.spark.sql(f"CREATE DATABASE IF NOT EXISTS {target_db}")

    for table, path in synthetic_data.items():
        full_table_name = f"{target_db}.{table}"
        log.info("hive_table_registering", table=full_table_name, path=path)

        if overwrite:
            self.spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")

        # Register External Table
        self.spark.sql(
            f"CREATE TABLE {full_table_name} USING DELTA LOCATION '{path}'"
        )

syntho_hive.interface.config.Metadata

Bases: BaseModel

Schema definition for the entire dataset.

Source code in syntho_hive/interface/config.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class Metadata(BaseModel):
    """Schema definition for the entire dataset."""

    tables: Dict[str, TableConfig] = Field(default_factory=dict)

    def add_table(
        self,
        name: str,
        pk: str,
        **kwargs: Union[List[str], Dict[str, str], Dict[str, Constraint]],
    ):
        """Register a table configuration.

        Args:
            name: Table name.
            pk: Primary key column name.
            **kwargs: Remaining ``TableConfig`` fields (fk, constraints, ...).

        Raises:
            SchemaError: If a table with the same name already exists.
        """
        # Reject duplicates up front so an existing config is never clobbered.
        if name in self.tables:
            raise SchemaError(f"Table '{name}' already exists in metadata.")
        self.tables[name] = TableConfig(name=name, pk=pk, **kwargs)

    def get_table(self, name: str) -> Optional[TableConfig]:
        """Look up a table configuration.

        Args:
            name: Table name to retrieve.

        Returns:
            The matching ``TableConfig``, or ``None`` when absent.
        """
        return self.tables.get(name)

    def validate_schema(
        self, real_data: Optional[Dict[str, "pd.DataFrame"]] = None
    ) -> None:
        """Validate schema integrity, focusing on foreign key references.

        Every problem is accumulated before raising, so a single exception
        reports the complete list.

        Args:
            real_data: Optional mapping of table name to DataFrame. When
                given, FK column-existence and dtype-compatibility checks
                run in addition to the structural (table-existence,
                FK-format) checks.

        Raises:
            SchemaValidationError: When one or more FK references are
                malformed, target a missing table, have type mismatches, or
                reference missing columns.
        """
        problems: List[str] = []

        for tname, cfg in self.tables.items():
            for child_col, ref in cfg.fk.items():
                # Structural check: reference must look like 'table.column'.
                if "." not in ref:
                    problems.append(
                        f"Invalid FK reference '{ref}' in table '{tname}'."
                        f" Format should be 'parent_table.parent_col'."
                    )
                    continue

                parent_tbl, parent_col = ref.split(".", 1)

                if parent_tbl not in self.tables:
                    problems.append(
                        f"Table '{tname}' references non-existent parent table '{parent_tbl}'."
                    )
                    continue

                # Data-level checks only run when DataFrames are supplied.
                if real_data is None:
                    continue
                if tname not in real_data or parent_tbl not in real_data:
                    # Skip type check when data is only partially provided.
                    continue

                child_df = real_data[tname]
                parent_df = real_data[parent_tbl]

                if child_col not in child_df.columns:
                    problems.append(
                        f"FK column '{child_col}' missing from table '{tname}'."
                        f" Add column '{child_col}' to child table '{tname}'."
                    )
                    continue
                if parent_col not in parent_df.columns:
                    problems.append(
                        f"Parent PK column '{parent_col}' missing from table '{parent_tbl}'."
                    )
                    continue

                c_dtype = str(child_df[child_col].dtype)
                p_dtype = str(parent_df[parent_col].dtype)
                if not _dtypes_compatible(c_dtype, p_dtype):
                    problems.append(
                        f"FK type mismatch: '{tname}.{child_col}' is {c_dtype}"
                        f" but '{parent_tbl}.{parent_col}' is {p_dtype}."
                        f" Fix: cast '{tname}.{child_col}' to {p_dtype}"
                        f" or cast '{parent_tbl}.{parent_col}' to {c_dtype}."
                    )

        if problems:
            raise SchemaValidationError("\n".join(problems))

add_table

add_table(name: str, pk: str, **kwargs: Union[List[str], Dict[str, str], Dict[str, Constraint]])

Register a table configuration.

Parameters:

Name Type Description Default
name str

Table name.

required
pk str

Primary key column name.

required
**kwargs Union[List[str], Dict[str, str], Dict[str, Constraint]]

Additional fields to populate TableConfig.

{}

Raises:

Type Description
SchemaError

If a table with the same name already exists.

Source code in syntho_hive/interface/config.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def add_table(
    self,
    name: str,
    pk: str,
    **kwargs: Union[List[str], Dict[str, str], Dict[str, Constraint]],
):
    """Add a new ``TableConfig`` entry to the metadata registry.

    Args:
        name: Table name.
        pk: Primary key column name.
        **kwargs: Remaining ``TableConfig`` fields.

    Raises:
        SchemaError: If *name* is already registered.
    """
    # Refuse duplicates so an existing configuration is never overwritten.
    if name in self.tables:
        raise SchemaError(f"Table '{name}' already exists in metadata.")
    self.tables[name] = TableConfig(name=name, pk=pk, **kwargs)

get_table

get_table(name: str) -> Optional[TableConfig]

Fetch a table configuration by name.

Parameters:

Name Type Description Default
name str

Table name to retrieve.

required

Returns:

Type Description
Optional[TableConfig]

Corresponding TableConfig or None if missing.

Source code in syntho_hive/interface/config.py
112
113
114
115
116
117
118
119
120
121
def get_table(self, name: str) -> Optional[TableConfig]:
    """Look up a table configuration by name.

    Args:
        name: Table name to retrieve.

    Returns:
        The matching ``TableConfig``, or ``None`` when no such table exists.
    """
    # dict.get already provides the None-on-miss semantics we want.
    return self.tables.get(name)

validate_schema

validate_schema(real_data: Optional[Dict[str, DataFrame]] = None) -> None

Validate schema integrity, focusing on foreign key references.

Collects all errors before raising so callers see the complete problem list in a single exception.

Parameters:

Name Type Description Default
real_data Optional[Dict[str, DataFrame]]

Optional mapping of table name to DataFrame. When provided, FK type compatibility and column existence checks are performed in addition to structural (table-existence, FK-format) checks.

None

Raises:

Type Description
SchemaValidationError

When one or more FK references are malformed, target a missing table, have type mismatches, or reference missing columns. The exception message lists all detected problems.

Source code in syntho_hive/interface/config.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def validate_schema(
    self, real_data: Optional[Dict[str, "pd.DataFrame"]] = None
) -> None:
    """Check foreign-key integrity across all configured tables.

    All findings are accumulated and reported together in one exception
    rather than failing on the first problem.

    Args:
        real_data: Optional mapping of table name to DataFrame. When
            supplied, FK column-existence and dtype-compatibility checks
            run on top of the structural (table-existence, FK-format)
            checks.

    Raises:
        SchemaValidationError: If any FK reference is malformed, targets a
            missing table, has a type mismatch, or names a missing column.
            The message lists every detected problem.
    """
    findings: List[str] = []

    for tname, cfg in self.tables.items():
        for child_col, ref in cfg.fk.items():
            # A reference must look like 'parent_table.parent_col'.
            if "." not in ref:
                findings.append(
                    f"Invalid FK reference '{ref}' in table '{tname}'."
                    f" Format should be 'parent_table.parent_col'."
                )
                continue

            parent_tbl, parent_col = ref.split(".", 1)

            if parent_tbl not in self.tables:
                findings.append(
                    f"Table '{tname}' references non-existent parent table '{parent_tbl}'."
                )
                continue

            # Data-level checks only apply when DataFrames were provided.
            if real_data is None:
                continue
            if tname not in real_data or parent_tbl not in real_data:
                # Only partially provided data: dtype checks are skipped.
                continue

            child_df = real_data[tname]
            parent_df = real_data[parent_tbl]

            if child_col not in child_df.columns:
                findings.append(
                    f"FK column '{child_col}' missing from table '{tname}'."
                    f" Add column '{child_col}' to child table '{tname}'."
                )
                continue
            if parent_col not in parent_df.columns:
                findings.append(
                    f"Parent PK column '{parent_col}' missing from table '{parent_tbl}'."
                )
                continue

            c_dt = str(child_df[child_col].dtype)
            p_dt = str(parent_df[parent_col].dtype)
            if not _dtypes_compatible(c_dt, p_dt):
                findings.append(
                    f"FK type mismatch: '{tname}.{child_col}' is {c_dt}"
                    f" but '{parent_tbl}.{parent_col}' is {p_dt}."
                    f" Fix: cast '{tname}.{child_col}' to {p_dt}"
                    f" or cast '{parent_tbl}.{parent_col}' to {c_dt}."
                )

    if findings:
        raise SchemaValidationError("\n".join(findings))

syntho_hive.interface.config.TableConfig

Bases: BaseModel

Configuration for a single table, including keys and constraints.

Source code in syntho_hive/interface/config.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class TableConfig(BaseModel):
    """Configuration for a single table, including keys and constraints."""

    # Table name as registered in the metadata.
    name: str
    # Primary key column name.
    pk: str
    # Columns treated as personally identifiable information.
    pii_cols: List[str] = Field(default_factory=list)
    # Columns flagged as high-cardinality (presumably routed to a special
    # encoder — see ``embedding_threshold`` on the Synthesizer; confirm).
    high_cardinality_cols: List[str] = Field(default_factory=list)
    fk: Dict[str, str] = Field(
        default_factory=dict, description="Map of local_col -> parent_table.parent_col"
    )
    parent_context_cols: List[str] = Field(
        default_factory=list,
        description="List of parent attributes to condition on (e.g., 'users.region')",
    )
    constraints: Dict[str, Constraint] = Field(
        default_factory=dict, description="Map of col_name -> Constraint"
    )
    # Strategy name for linking child rows to parents; semantics of the two
    # options are implemented elsewhere (not visible here).
    linkage_method: Literal["empirical", "negbinom"] = "empirical"

    @property
    def has_dependencies(self) -> bool:
        """Whether the table declares any foreign key dependencies."""
        return bool(self.fk)

has_dependencies property

has_dependencies: bool

Whether the table declares any foreign key dependencies.

syntho_hive.interface.config.PrivacyConfig

Bases: BaseModel

Configuration for privacy guardrails applied during synthesis.

Source code in syntho_hive/interface/config.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class PrivacyConfig(BaseModel):
    """Configuration for privacy guardrails applied during synthesis."""

    # Toggle for differential privacy; consumed elsewhere (not visible here).
    enable_differential_privacy: bool = False
    # Differential-privacy budget; validated below to be strictly positive.
    epsilon: float = 1.0
    # Replacement strategy for PII columns.
    pii_strategy: Literal["mask", "faker", "context_aware_faker"] = (
        "context_aware_faker"
    )
    # Minimum group size for k-anonymity checks (enforced elsewhere).
    k_anonymity_threshold: int = 5
    # Explicit list of columns to treat as PII.
    pii_columns: List[str] = Field(default_factory=list)

    @field_validator("epsilon")
    @classmethod
    def validate_epsilon(cls, v: float) -> float:
        # A non-positive epsilon is meaningless as a privacy budget.
        if v <= 0:
            raise ValueError("epsilon must be positive")
        return v

syntho_hive.validation.report_generator.ValidationReport

Generate summary reports of validation metrics.

Source code in syntho_hive/validation/report_generator.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
class ValidationReport:
    """Generate summary reports of validation metrics.

    Compares real vs. synthetic tables column-by-column and writes the
    result as either a rich HTML page or a JSON document.
    """

    def __init__(self):
        """Initialize statistical validator and metric store."""
        self.validator = StatisticalValidator()
        # Scratch store for computed metrics; kept for API compatibility.
        self.metrics = {}

    def _calculate_detailed_stats(self, real_df: pd.DataFrame, synth_df: pd.DataFrame) -> Dict[str, Any]:
        """Calculate descriptive statistics for side-by-side comparison.

        Args:
            real_df: Real dataframe.
            synth_df: Synthetic dataframe aligned to the real columns.

        Returns:
            Nested dict of summary stats for each column:
            ``{col: {"real": {...}, "synth": {...}}}``.
        """
        stats = {}
        for col in real_df.columns:
            # Only compare columns present in both frames.
            if col not in synth_df.columns:
                continue

            col_stats = {"real": {}, "synth": {}}

            for name, df, res in [("real", real_df, col_stats["real"]), ("synth", synth_df, col_stats["synth"])]:
                series = df[col]
                if pd.api.types.is_numeric_dtype(series):
                    res["mean"] = series.mean()
                    res["std"] = series.std()
                    res["min"] = series.min()
                    res["max"] = series.max()
                else:
                    # Categorical columns: cardinality plus mode statistics.
                    res["unique_count"] = series.nunique()
                    res["top_value"] = series.mode().iloc[0] if not series.mode().empty else "N/A"
                    res["top_freq"] = series.value_counts().iloc[0] if not series.empty else 0

            stats[col] = col_stats
        return stats

    def generate(self, real_data: Dict[str, pd.DataFrame], synth_data: Dict[str, pd.DataFrame], output_path: str):
        """Run validation and save a report.

        Args:
            real_data: Mapping of table name to real dataframe.
            synth_data: Mapping of table name to synthetic dataframe.
            output_path: Destination path; ``.html`` triggers the HTML
                renderer, anything else gets a JSON dump.
        """
        report = {
            "tables": {},
            "summary": "Validation Report"
        }

        for table_name, real_df in real_data.items():
            # Tables without a synthetic counterpart are skipped silently.
            if table_name not in synth_data:
                continue

            synth_df = synth_data[table_name]

            # 1. Column comparisons
            col_metrics = self.validator.compare_columns(real_df, synth_df)

            # 2. Correlation
            corr_diff = self.validator.check_correlations(real_df, synth_df)

            # 3. Detailed Stats
            stats = self._calculate_detailed_stats(real_df, synth_df)

            # 4. Data Preview
            # Use Pandas to_html for easy formatting, strict constraints
            preview = {
                "real_html": real_df.head(10).to_html(index=False, classes='scroll-table', border=0),
                "synth_html": synth_df.head(10).to_html(index=False, classes='scroll-table', border=0)
            }

            report["tables"][table_name] = {
                "column_metrics": col_metrics,
                "correlation_distance": corr_diff,
                "detailed_stats": stats,
                "preview": preview
            }

        if output_path.endswith(".html"):
            self._save_html(report, output_path)
        else:
            # Save to JSON for now (PDF requires more deps)
            # encoding pinned: the platform default (e.g. cp1252 on Windows)
            # can fail on non-ASCII table content.
            with open(output_path, "w", encoding="utf-8") as f:
                json.dump(report, f, indent=2, default=str)

        import os
        print(f"Report saved to {os.path.abspath(output_path)}")

    def _save_html(self, report: Dict[str, Any], output_path: str):
        """Render a rich HTML report with metric explanations, stats, and previews.

        Args:
            report: Structured report dictionary produced by ``generate``.
            output_path: Filesystem path to write the HTML file.
        """
        html_content = [
            """<html>
            <head>
                <style>
                    body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; margin: 20px; background-color: #f9f9f9; color: #333; }
                    h1, h2, h3 { color: #2c3e50; }
                    .container { max-width: 1200px; margin: 0 auto; background: white; padding: 30px; border-radius: 8px; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }

                    /* Tables */
                    table { border-collapse: collapse; width: 100%; margin-bottom: 20px; font-size: 14px; }
                    th, td { border: 1px solid #e1e4e8; padding: 10px; text-align: left; }
                    th { background-color: #f1f8ff; color: #0366d6; font-weight: 600; }
                    tr:nth-child(even) { background-color: #f8f9fa; }

                    /* Status Colors */
                    .pass { color: #28a745; font-weight: bold; }
                    .fail { color: #dc3545; font-weight: bold; }

                    /* Layout */
                    .section { margin-top: 40px; border-top: 1px solid #eee; padding-top: 20px; }
                    .metric-box { background: #f0f4f8; padding: 15px; border-radius: 5px; margin-bottom: 20px; border-left: 5px solid #0366d6; }
                    .row { display: flex; gap: 20px; }
                    .col { flex: 1; overflow-x: auto; }

                    /* Tabs/Previews */
                    .preview-header { font-weight: bold; margin-bottom: 10px; color: #555; }
                    .scroll-table { max-height: 400px; overflow-y: auto; display: block; }
                </style>
            </head>
            <body>
            <div class="container">
                <h1>Validation Report</h1>

                <div class="metric-box">
                    <h3>Metric Explanations</h3>
                    <ul>
                        <li><strong>KS Test (Kolmogorov-Smirnov):</strong> Used for continuous numerical columns. Compares the cumulative distribution functions of the real and synthetic data. <br>
                            <em>Result:</em> Returns a p-value. If p > 0.05, we fail to reject the null hypothesis (i.e., distributions are likely the same).</li>
                        <li><strong>TVD (Total Variation Distance):</strong> Used for categorical or discrete columns. Measures the maximum difference between probabilities assigned to the same event by two distributions. <br>
                            <em>Result:</em> Value between 0 and 1. Lower is better (0 means identical). We consider < 0.1 as passing.</li>
                        <li><strong>Correlation Distance:</strong> Measures how well the pairwise correlations between numerical columns are preserved. Calculated as the Frobenius norm of the difference between correlation matrices. <br>
                            <em>Result:</em> Lower is better (0 means identical correlation structure).</li>
                    </ul>
                </div>
            """]

        for table_name, data in report["tables"].items():
            html_content.append(f"<div class='section'><h2>Table: {table_name}</h2>")

            # --- 1. Correlation & Overall ---
            corr_dist = data.get('correlation_distance', 0.0)
            html_content.append(f"<p><strong>Correlation Distance:</strong> {corr_dist:.4f}</p>")

            # --- 2. Column Metrics ---
            html_content.append("<h3>Column Validation Metrics</h3>")
            html_content.append("<table><tr><th>Column</th><th>Test Type</th><th>Statistic</th><th>P-Value / Score</th><th>Status</th></tr>")

            for col, metrics in data["column_metrics"].items():
                if "error" in metrics:
                    html_content.append(f"<tr><td>{col}</td><td colspan='4' class='fail'>Error: {metrics['error']}</td></tr>")
                    continue

                status = "PASS" if metrics.get("passed", False) else "FAIL"
                cls = "pass" if status == "PASS" else "fail"

                stat = f"{metrics.get('statistic', 0):.4f}"
                # TVD doesn't have a p-value, KS does.
                pval = f"{metrics.get('p_value', 0):.4f}" if metrics.get('p_value') is not None else "N/A"
                test_name = metrics.get('test', 'N/A')

                html_content.append(f"<tr><td>{col}</td><td>{test_name}</td><td>{stat}</td><td>{pval}</td><td class='{cls}'>{status}</td></tr>")

            html_content.append("</table>")

            # --- 3. Detailed Statistics ---
            if "detailed_stats" in data:
                html_content.append("<h3>Detailed Statistics (Real vs Synthetic)</h3>")
                html_content.append("<table><tr><th>Column</th><th>Metric</th><th>Real</th><th>Synthetic</th></tr>")

                for col, stats in data["detailed_stats"].items():
                    # stats has "real": {...}, "synth": {...}
                    real_s = stats.get("real", {})
                    synth_s = stats.get("synth", {})

                    # Merge keys to show
                    all_keys = sorted(list(set(real_s.keys()) | set(synth_s.keys())))
                    # Usually we want mean, std, min, max or unique, top

                    first = True
                    for k in all_keys:
                        r_val = real_s.get(k, "-")
                        s_val = synth_s.get(k, "-")

                        # Format floats
                        if isinstance(r_val, (float, np.floating)): r_val = f"{r_val:.4f}"
                        if isinstance(s_val, (float, np.floating)): s_val = f"{s_val:.4f}"

                        # First row of a column carries a rowspan cell for the name.
                        row_start = f"<tr><td rowspan='{len(all_keys)}'>{col}</td>" if first else "<tr>"
                        row_end = f"<td>{k}</td><td>{r_val}</td><td>{s_val}</td></tr>"
                        html_content.append(row_start + row_end)
                        first = False
                html_content.append("</table>")

            # --- 4. Data Preview ---
            if "preview" in data:
                html_content.append("<h3>Data Preview (First 10 Rows)</h3>")
                html_content.append("<div class='row'>")

                # Real
                html_content.append("<div class='col'>")
                html_content.append("<div class='preview-header'>Original Data (Real)</div>")
                html_content.append(data["preview"]["real_html"])
                html_content.append("</div>")

                # Synth
                html_content.append("<div class='col'>")
                html_content.append("<div class='preview-header'>Synthetic Data (Generated)</div>")
                html_content.append(data["preview"]["synth_html"])
                html_content.append("</div>")

                html_content.append("</div>") # End row

            html_content.append("</div>") # End section

        html_content.append("</div></body></html>")

        # encoding pinned so the report renders identically everywhere.
        with open(output_path, "w", encoding="utf-8") as f:
            f.write("\n".join(html_content))

generate

generate(real_data: Dict[str, DataFrame], synth_data: Dict[str, DataFrame], output_path: str)

Run validation and save a report.

Parameters:

Name Type Description Default
real_data Dict[str, DataFrame]

Mapping of table name to real dataframe.

required
synth_data Dict[str, DataFrame]

Mapping of table name to synthetic dataframe.

required
output_path str

Destination path for HTML or JSON report.

required
Source code in syntho_hive/validation/report_generator.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def generate(self, real_data: Dict[str, pd.DataFrame], synth_data: Dict[str, pd.DataFrame], output_path: str):
    """Run validation and save a report.

    Args:
        real_data: Mapping of table name to real dataframe.
        synth_data: Mapping of table name to synthetic dataframe.
        output_path: Destination path; ``.html`` triggers the HTML
            renderer, anything else gets a JSON dump.
    """
    report = {
        "tables": {},
        "summary": "Validation Report"
    }

    for table_name, real_df in real_data.items():
        # Tables without a synthetic counterpart are skipped silently.
        if table_name not in synth_data:
            continue

        synth_df = synth_data[table_name]

        # 1. Column comparisons
        col_metrics = self.validator.compare_columns(real_df, synth_df)

        # 2. Correlation
        corr_diff = self.validator.check_correlations(real_df, synth_df)

        # 3. Detailed Stats
        stats = self._calculate_detailed_stats(real_df, synth_df)

        # 4. Data Preview
        # Use Pandas to_html for easy formatting, strict constraints
        preview = {
            "real_html": real_df.head(10).to_html(index=False, classes='scroll-table', border=0),
            "synth_html": synth_df.head(10).to_html(index=False, classes='scroll-table', border=0)
        }

        report["tables"][table_name] = {
            "column_metrics": col_metrics,
            "correlation_distance": corr_diff,
            "detailed_stats": stats,
            "preview": preview
        }

    if output_path.endswith(".html"):
        self._save_html(report, output_path)
    else:
        # Save to JSON for now (PDF requires more deps)
        # encoding pinned: the platform default (e.g. cp1252 on Windows)
        # can fail on non-ASCII table content.
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, default=str)

    import os
    print(f"Report saved to {os.path.abspath(output_path)}")

Exceptions

syntho_hive.exceptions.SynthoHiveError

Bases: Exception

Base exception for all SynthoHive errors.

Source code in syntho_hive/exceptions.py
10
11
12
13
class SynthoHiveError(Exception):
    """Common ancestor of every exception raised by SynthoHive."""

syntho_hive.exceptions.SchemaError

Bases: SynthoHiveError

Raised for invalid metadata, missing FK definitions, unsupported column types, or invalid identifier names (e.g., SQL injection attempt via database/table name).

Source code in syntho_hive/exceptions.py
16
17
18
19
20
21
22
23
class SchemaError(SynthoHiveError):
    """Invalid metadata: missing FK definitions, unsupported column types,
    or unsafe identifier names (e.g., an SQL-injection attempt smuggled in
    via a database or table name).
    """

syntho_hive.exceptions.SchemaValidationError

Bases: SchemaError

Raised by validate_schema() when FK type mismatches, missing FK columns, or invalid FK references are detected. Collects all errors before raising so callers see the complete problem list in a single exception.

Source code in syntho_hive/exceptions.py
26
27
28
29
30
31
32
33
class SchemaValidationError(SchemaError):
    """Aggregate error raised by ``validate_schema()``.

    Covers FK type mismatches, missing FK columns, and malformed FK
    references; all findings are gathered first so a single exception
    carries the complete problem list.
    """

syntho_hive.exceptions.TrainingError

Bases: SynthoHiveError

Raised for NaN loss, training divergence, GPU OOM, or any other failure that occurs during Synthesizer.fit().

Source code in syntho_hive/exceptions.py
36
37
38
39
40
41
42
class TrainingError(SynthoHiveError):
    """Any failure during ``Synthesizer.fit()`` — NaN loss, training
    divergence, GPU out-of-memory, and similar.
    """

syntho_hive.exceptions.SerializationError

Bases: SynthoHiveError

Raised for save/load failures, corrupt checkpoints, missing checkpoint components, or version mismatches that prevent successful loading.

Source code in syntho_hive/exceptions.py
45
46
47
48
49
50
51
class SerializationError(SynthoHiveError):
    """Save/load failure: corrupt checkpoints, missing checkpoint
    components, or version mismatches that prevent successful loading.
    """

syntho_hive.exceptions.ConstraintViolationError

Bases: SynthoHiveError

Raised when generated output violates numeric constraints (min, max, dtype) defined in the table Metadata.

Source code in syntho_hive/exceptions.py
54
55
56
57
58
59
60
class ConstraintViolationError(SynthoHiveError):
    """Generated output broke a numeric constraint (min, max, dtype)
    declared in the table Metadata.
    """

syntho_hive.exceptions.GenerationError

Bases: SynthoHiveError

Raised when synthetic data generation fails.

Source code in syntho_hive/exceptions.py
63
64
65
66
class GenerationError(SynthoHiveError):
    """Synthetic data generation failed."""

syntho_hive.exceptions.PrivacyError

Bases: SynthoHiveError

Raised when privacy sanitization fails.

Source code in syntho_hive/exceptions.py
69
70
71
72
class PrivacyError(SynthoHiveError):
    """Privacy sanitization failed."""