Skip to content

Relational Orchestration

This module handles the complexity of multi-table generation, ensuring referential integrity and statistical correlation between tables.

Orchestrator

syntho_hive.relational.orchestrator.StagedOrchestrator

Manage staged relational synthesis across parent/child tables.

Source code in syntho_hive/relational/orchestrator.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
class StagedOrchestrator:
    """Manage staged relational synthesis across parent/child tables."""

    def __init__(
        self,
        metadata: Metadata,
        spark: Optional[SparkSession] = None,
        io: Optional[Any] = None,
        on_write_failure: Literal["raise", "cleanup", "retry"] = "raise",
        model_cls: Type[ConditionalGenerativeModel] = CTGAN,
    ):
        """Initialize orchestrator dependencies.

        Args:
            metadata: Dataset metadata with relational details.
            spark: SparkSession used for IO and potential UDFs. Required when
                ``io`` is not provided.
            io: Pre-constructed IO backend. When provided, ``spark`` is ignored.
                Useful for testing and environments where SparkIO is not desired.
            on_write_failure: Policy when a write fails during generation with
                ``output_path_base`` set. Options:
                - ``'raise'`` (default): re-raise the exception immediately.
                - ``'cleanup'``: remove all previously written paths before raising.
                - ``'retry'``: attempt one additional write before raising.
            model_cls: Generative model class to instantiate per table. Must be a class
                (not an instance) implementing ``ConditionalGenerativeModel``. The class
                constructor must accept ``(metadata, batch_size, epochs, **kwargs)``.
                Defaults to CTGAN.

        Raises:
            TypeError: If ``model_cls`` is not a ``ConditionalGenerativeModel`` subclass.
            ValueError: If neither ``io`` nor ``spark`` is provided, or if
                ``on_write_failure`` is not one of the documented options.
        """
        if not issubclass(model_cls, ConditionalGenerativeModel):
            raise TypeError(
                f"model_cls must be a subclass of ConditionalGenerativeModel, "
                f"got {model_cls!r}. Implement fit(), sample(), save(), load() "
                f"and subclass ConditionalGenerativeModel."
            )
        if on_write_failure not in ("raise", "cleanup", "retry"):
            raise ValueError(
                f"on_write_failure must be one of 'raise', 'cleanup', 'retry'; "
                f"got {on_write_failure!r}"
            )
        self.metadata = metadata
        self.spark = spark
        if io is not None:
            self.io = io
        else:
            # Fail fast here rather than letting SparkIO(None) surface a
            # confusing error on the first read/write.
            if spark is None:
                raise ValueError(
                    "spark must be provided when io is not given"
                )
            self.io = SparkIO(spark)
        self.on_write_failure = on_write_failure
        self.model_cls = model_cls
        self.graph = SchemaGraph(metadata)
        self.models: Dict[str, ConditionalGenerativeModel] = {}
        self.linkage_models: Dict[str, LinkageModel] = {}

    def fit_all(
        self,
        real_data_paths: Dict[str, str],
        epochs: int = 300,
        batch_size: int = 500,
        progress_bar: bool = True,
        checkpoint_interval: int = 10,
        checkpoint_dir: Optional[str] = None,
        **model_kwargs: Union[int, str, Tuple[int, int]],
    ):
        """Fit CTGAN and linkage models for every table.

        Args:
            real_data_paths: Mapping ``{table_name: 'db.table' or '/path'}``.
            epochs: Number of training epochs for CTGAN.
            batch_size: Training batch size.
            progress_bar: If True (default), display tqdm progress bar to stderr during training.
                Structured log events always emit regardless of this flag.
            checkpoint_interval: Save a validation checkpoint every N epochs. Default 10.
            checkpoint_dir: Optional directory to save best_checkpoint/ and final_checkpoint/
                during training.
            **model_kwargs: Extra parameters forwarded to the model constructor.

        Raises:
            SchemaError: If a table is missing from metadata, or a child table's
                driver parent has no entry in ``real_data_paths``.
        """
        # Training order doesn't strictly matter as long as we have data
        # (linkage needs parent+child, CTGAN needs child + joined parent
        # attributes), but generation order matters.

        self.metadata.validate_schema()

        for table_name in self.metadata.tables:
            log.info("fitting_model", table=table_name)
            data_path = real_data_paths.get(table_name)
            if not data_path:
                log.warning("no_data_path", table=table_name, action="skipping")
                continue

            # Read data
            target_df = self.io.read_dataset(data_path)
            # Convert to Pandas for CTGAN (prototype limitation)
            target_pdf = target_df.toPandas()

            config = self.metadata.get_table(table_name)
            if config is None:
                raise SchemaError(f"Table '{table_name}' not found in metadata")
            if not config.has_dependencies:
                # Root Table
                model = self.model_cls(
                    self.metadata, batch_size=batch_size, epochs=epochs, **model_kwargs
                )
                model.fit(
                    target_pdf,
                    table_name=table_name,
                    progress_bar=progress_bar,
                    checkpoint_interval=checkpoint_interval,
                    checkpoint_dir=checkpoint_dir,
                )
                self.models[table_name] = model
            else:
                # Child Table
                # 1. Identify "Driver" Parent (First FK)
                pk_map = config.fk
                # pk_map is {local_col: "parent_table.parent_col"}

                # Sort keys to ensure deterministic driver selection
                sorted_fks = sorted(pk_map.keys())
                driver_fk = sorted_fks[0]
                driver_ref = pk_map[driver_fk]

                driver_parent_table, driver_parent_pk = driver_ref.split(".")

                parent_path = real_data_paths.get(driver_parent_table)
                # Guard: read_dataset(None) would raise an opaque backend error.
                if not parent_path:
                    raise SchemaError(
                        f"Driver parent table '{driver_parent_table}' required by "
                        f"'{table_name}' has no entry in real_data_paths"
                    )
                parent_df = self.io.read_dataset(parent_path).toPandas()

                # 2. Train Linkage Model on Driver Parent
                log.info(
                    "training_linkage",
                    table=table_name,
                    driver_parent=driver_parent_table,
                )
                # Reuse the already-validated config instead of re-indexing
                # self.metadata.tables.
                linkage = LinkageModel(method=config.linkage_method)
                linkage.fit(
                    parent_df, target_pdf, fk_col=driver_fk, pk_col=driver_parent_pk
                )
                self.linkage_models[table_name] = linkage

                # 3. Train Conditional CTGAN (Conditioning on Driver Parent Context)
                context_cols = config.parent_context_cols
                if context_cols:
                    missing = [c for c in context_cols if c not in parent_df.columns]
                    if missing:
                        raise SchemaError(
                            f"parent_context_cols {missing} not found in parent table "
                            f"'{driver_parent_table}' columns: {list(parent_df.columns)}"
                        )
                    # Prepare parent data for merge
                    right_side = parent_df[[driver_parent_pk] + context_cols].copy()

                    # Prefix context columns so they cannot collide with child
                    # column names during the merge.
                    rename_map = {c: f"__ctx__{c}" for c in context_cols}
                    right_side = right_side.rename(columns=rename_map)

                    joined = target_pdf.merge(
                        right_side,
                        left_on=driver_fk,
                        right_on=driver_parent_pk,
                        how="left",
                    )

                    context_df = joined[list(rename_map.values())].copy()
                    context_df.columns = context_cols
                else:
                    context_df = None

                model = self.model_cls(
                    self.metadata, batch_size=batch_size, epochs=epochs, **model_kwargs
                )
                # Note: We exclude ALL FK columns from CTGAN modeling to avoid them being treated as continuous/categorical features
                # The DataTransformer handles excluding PK/FK if they are marked in metadata.
                # But we must ensure metadata knows about ALL FKs. (It does via config.fk)
                model.fit(
                    target_pdf,
                    context=context_df,
                    table_name=table_name,
                    progress_bar=progress_bar,
                    checkpoint_interval=checkpoint_interval,
                    checkpoint_dir=checkpoint_dir,
                )
                self.models[table_name] = model

    def generate(
        self, num_rows_root: Dict[str, int], output_path_base: Optional[str] = None
    ) -> Dict[str, pd.DataFrame]:
        """Execute the multi-stage generation pipeline.

        Args:
            num_rows_root: Mapping of root table name to number of rows to generate.
                Root tables absent from the mapping default to 1000 rows.
            output_path_base: Base path where generated tables will be stored.
                When set, DataFrames are written to disk and released from memory
                after each table, preventing OOM on large schemas. Child tables
                read parent data from disk via this path. When None, all DataFrames
                are accumulated in memory (original behavior).

        Returns:
            Dictionary of generated DataFrames. When ``output_path_base`` is set,
            the dict contains only tables that could not be released (i.e., an empty
            dict is normal). When ``output_path_base`` is None, all tables are
            returned in memory.

        Raises:
            SchemaError: If a table in the generation order is missing from metadata.
        """
        generation_order = self.graph.get_generation_order()

        generated_tables = {}
        written_paths: List[str] = []
        self._written_paths = written_paths

        for table_name in generation_order:
            config = self.metadata.get_table(table_name)
            if config is None:
                raise SchemaError(f"Table '{table_name}' not found in metadata")
            is_root = not config.fk

            model = self.models[table_name]

            generated_pdf = None

            if is_root:
                log.info("generating_root_table", table=table_name)
                n_rows = num_rows_root.get(table_name, 1000)
                generated_pdf = model.sample(n_rows)
                # Assign PKs — use actual DataFrame length in case model returns different count
                generated_pdf[config.pk] = range(1, len(generated_pdf) + 1)
            else:
                log.info("generating_child_table", table=table_name)

                # 1. Handle Driver Parent (Cardinality & Context)
                pk_map = config.fk
                sorted_fks = sorted(pk_map.keys())
                driver_fk = sorted_fks[0]
                driver_ref = pk_map[driver_fk]
                driver_parent_table, driver_parent_pk = driver_ref.split(".")

                # Read Driver Parent Data (From Output or Memory)
                if output_path_base:
                    parent_path = f"{output_path_base}/{driver_parent_table}"
                    parent_df = self.io.read_dataset(parent_path).toPandas()
                else:
                    parent_df = generated_tables[driver_parent_table]

                linkage = self.linkage_models[table_name]

                # Sample Counts
                counts = linkage.sample_counts(parent_df)

                # Construct Context from Driver
                parent_ids_repeated = np.repeat(
                    parent_df[driver_parent_pk].to_numpy(), counts
                )

                context_cols = config.parent_context_cols
                if context_cols:
                    context_repeated_vals = {}
                    for col in context_cols:
                        context_repeated_vals[col] = np.repeat(
                            parent_df[col].to_numpy(), counts
                        )
                    context_df = pd.DataFrame(context_repeated_vals)
                else:
                    context_df = None

                total_child_rows = len(parent_ids_repeated)

                # 2. Generate Data
                if total_child_rows > 0:
                    generated_pdf = model.sample(total_child_rows, context=context_df)

                    # Assign Driver FK
                    generated_pdf[driver_fk] = parent_ids_repeated

                    # Assign Secondary FKs (Random Sampling from respective Parents)
                    for fk_col in sorted_fks[1:]:
                        ref = pk_map[fk_col]
                        p_table, p_pk = ref.split(".")

                        # Read Secondary Parent
                        if output_path_base:
                            p_path = f"{output_path_base}/{p_table}"
                            p_df = self.io.read_dataset(p_path).toPandas()
                        else:
                            p_df = generated_tables[p_table]

                        valid_pks = p_df[p_pk].to_numpy()

                        # Randomly sample valid PKs for this column
                        generated_pdf[fk_col] = np.random.choice(
                            valid_pks, size=total_child_rows
                        )

                    # Assign PKs
                    generated_pdf[config.pk] = range(1, len(generated_pdf) + 1)
                else:
                    # Zero child rows: create empty DataFrame with correct schema
                    # so downstream grandchild tables don't crash with KeyError
                    log.info(
                        "zero_child_rows",
                        table=table_name,
                        driver_parent=driver_parent_table,
                    )
                    train_columns = (
                        list(model.metadata.get_table(table_name).constraints.keys())
                        if model.metadata.get_table(table_name)
                        else []
                    )
                    generated_pdf = pd.DataFrame(
                        columns=train_columns if train_columns else [config.pk]
                    )

            if generated_pdf is not None:
                if output_path_base:
                    output_path = f"{output_path_base}/{table_name}"
                    _write_with_failure_policy(
                        io=self.io,
                        pdf=generated_pdf,
                        path=output_path,
                        policy=self.on_write_failure,
                        written_paths=written_paths,
                    )
                    log.debug(
                        "table_released_from_memory", table=table_name, path=output_path
                    )
                    # Do NOT store in generated_tables — child tables read from disk via output_path_base
                else:
                    generated_tables[table_name] = generated_pdf

        return generated_tables

fit_all

fit_all(real_data_paths: Dict[str, str], epochs: int = 300, batch_size: int = 500, progress_bar: bool = True, checkpoint_interval: int = 10, checkpoint_dir: Optional[str] = None, **model_kwargs: Union[int, str, Tuple[int, int]])

Fit CTGAN and linkage models for every table.

Parameters:

Name Type Description Default
real_data_paths Dict[str, str]

Mapping {table_name: 'db.table' or '/path'}.

required
epochs int

Number of training epochs for CTGAN.

300
batch_size int

Training batch size.

500
progress_bar bool

If True (default), display tqdm progress bar to stderr during training. Structured log events always emit regardless of this flag.

True
checkpoint_interval int

Save a validation checkpoint every N epochs. Default 10.

10
checkpoint_dir Optional[str]

Optional directory to save best_checkpoint/ and final_checkpoint/ during training.

None
**model_kwargs Union[int, str, Tuple[int, int]]

Extra parameters forwarded to the model constructor.

{}
Source code in syntho_hive/relational/orchestrator.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def fit_all(
    self,
    real_data_paths: Dict[str, str],
    epochs: int = 300,
    batch_size: int = 500,
    progress_bar: bool = True,
    checkpoint_interval: int = 10,
    checkpoint_dir: Optional[str] = None,
    **model_kwargs: Union[int, str, Tuple[int, int]],
):
    """Fit CTGAN and linkage models for every table.

    Args:
        real_data_paths: Mapping ``{table_name: 'db.table' or '/path'}``.
        epochs: Number of training epochs for CTGAN.
        batch_size: Training batch size.
        progress_bar: If True (default), display tqdm progress bar to stderr during training.
            Structured log events always emit regardless of this flag.
        checkpoint_interval: Save a validation checkpoint every N epochs. Default 10.
        checkpoint_dir: Optional directory to save best_checkpoint/ and final_checkpoint/
            during training.
        **model_kwargs: Extra parameters forwarded to the model constructor.

    Raises:
        SchemaError: If a table is missing from metadata, or a child table's
            driver parent has no entry in ``real_data_paths``.
    """
    # Training order doesn't strictly matter as long as we have data
    # (linkage needs parent+child, CTGAN needs child + joined parent
    # attributes), but generation order matters.

    self.metadata.validate_schema()

    for table_name in self.metadata.tables:
        log.info("fitting_model", table=table_name)
        data_path = real_data_paths.get(table_name)
        if not data_path:
            log.warning("no_data_path", table=table_name, action="skipping")
            continue

        # Read data
        target_df = self.io.read_dataset(data_path)
        # Convert to Pandas for CTGAN (prototype limitation)
        target_pdf = target_df.toPandas()

        config = self.metadata.get_table(table_name)
        if config is None:
            raise SchemaError(f"Table '{table_name}' not found in metadata")
        if not config.has_dependencies:
            # Root Table
            model = self.model_cls(
                self.metadata, batch_size=batch_size, epochs=epochs, **model_kwargs
            )
            model.fit(
                target_pdf,
                table_name=table_name,
                progress_bar=progress_bar,
                checkpoint_interval=checkpoint_interval,
                checkpoint_dir=checkpoint_dir,
            )
            self.models[table_name] = model
        else:
            # Child Table
            # 1. Identify "Driver" Parent (First FK)
            pk_map = config.fk
            # pk_map is {local_col: "parent_table.parent_col"}

            # Sort keys to ensure deterministic driver selection
            sorted_fks = sorted(pk_map.keys())
            driver_fk = sorted_fks[0]
            driver_ref = pk_map[driver_fk]

            driver_parent_table, driver_parent_pk = driver_ref.split(".")

            parent_path = real_data_paths.get(driver_parent_table)
            # Guard: read_dataset(None) would raise an opaque backend error.
            if not parent_path:
                raise SchemaError(
                    f"Driver parent table '{driver_parent_table}' required by "
                    f"'{table_name}' has no entry in real_data_paths"
                )
            parent_df = self.io.read_dataset(parent_path).toPandas()

            # 2. Train Linkage Model on Driver Parent
            log.info(
                "training_linkage",
                table=table_name,
                driver_parent=driver_parent_table,
            )
            # Reuse the already-validated config instead of re-indexing
            # self.metadata.tables.
            linkage = LinkageModel(method=config.linkage_method)
            linkage.fit(
                parent_df, target_pdf, fk_col=driver_fk, pk_col=driver_parent_pk
            )
            self.linkage_models[table_name] = linkage

            # 3. Train Conditional CTGAN (Conditioning on Driver Parent Context)
            context_cols = config.parent_context_cols
            if context_cols:
                missing = [c for c in context_cols if c not in parent_df.columns]
                if missing:
                    raise SchemaError(
                        f"parent_context_cols {missing} not found in parent table "
                        f"'{driver_parent_table}' columns: {list(parent_df.columns)}"
                    )
                # Prepare parent data for merge
                right_side = parent_df[[driver_parent_pk] + context_cols].copy()

                # Prefix context columns so they cannot collide with child
                # column names during the merge.
                rename_map = {c: f"__ctx__{c}" for c in context_cols}
                right_side = right_side.rename(columns=rename_map)

                joined = target_pdf.merge(
                    right_side,
                    left_on=driver_fk,
                    right_on=driver_parent_pk,
                    how="left",
                )

                context_df = joined[list(rename_map.values())].copy()
                context_df.columns = context_cols
            else:
                context_df = None

            model = self.model_cls(
                self.metadata, batch_size=batch_size, epochs=epochs, **model_kwargs
            )
            # Note: We exclude ALL FK columns from CTGAN modeling to avoid them being treated as continuous/categorical features
            # The DataTransformer handles excluding PK/FK if they are marked in metadata.
            # But we must ensure metadata knows about ALL FKs. (It does via config.fk)
            model.fit(
                target_pdf,
                context=context_df,
                table_name=table_name,
                progress_bar=progress_bar,
                checkpoint_interval=checkpoint_interval,
                checkpoint_dir=checkpoint_dir,
            )
            self.models[table_name] = model

generate

generate(num_rows_root: Dict[str, int], output_path_base: Optional[str] = None) -> Dict[str, pd.DataFrame]

Execute the multi-stage generation pipeline.

Parameters:

Name Type Description Default
num_rows_root Dict[str, int]

Mapping of root table name to number of rows to generate.

required
output_path_base Optional[str]

Base path where generated tables will be stored. When set, DataFrames are written to disk and released from memory after each table, preventing OOM on large schemas. Child tables read parent data from disk via this path. When None, all DataFrames are accumulated in memory (original behavior).

None

Returns:

Type Description
Dict[str, DataFrame]

Dictionary of generated DataFrames. When output_path_base is set, the dict contains only tables that could not be released (i.e., an empty dict is normal). When output_path_base is None, all tables are returned in memory.

Source code in syntho_hive/relational/orchestrator.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def generate(
    self, num_rows_root: Dict[str, int], output_path_base: Optional[str] = None
) -> Dict[str, pd.DataFrame]:
    """Execute the multi-stage generation pipeline.

    Args:
        num_rows_root: Mapping of root table name to number of rows to generate.
            Root tables absent from the mapping default to 1000 rows.
        output_path_base: Base path where generated tables will be stored.
            When set, DataFrames are written to disk and released from memory
            after each table, preventing OOM on large schemas. Child tables
            read parent data from disk via this path. When None, all DataFrames
            are accumulated in memory (original behavior).

    Returns:
        Dictionary of generated DataFrames. When ``output_path_base`` is set,
        the dict contains only tables that could not be released (i.e., an empty
        dict is normal). When ``output_path_base`` is None, all tables are
        returned in memory.

    Raises:
        SchemaError: If a table in the generation order is missing from metadata.
    """
    generation_order = self.graph.get_generation_order()

    generated_tables = {}
    written_paths: List[str] = []
    self._written_paths = written_paths

    for table_name in generation_order:
        config = self.metadata.get_table(table_name)
        if config is None:
            raise SchemaError(f"Table '{table_name}' not found in metadata")
        is_root = not config.fk

        model = self.models[table_name]

        generated_pdf = None

        if is_root:
            log.info("generating_root_table", table=table_name)
            n_rows = num_rows_root.get(table_name, 1000)
            generated_pdf = model.sample(n_rows)
            # Assign PKs — use actual DataFrame length in case model returns different count
            generated_pdf[config.pk] = range(1, len(generated_pdf) + 1)
        else:
            log.info("generating_child_table", table=table_name)

            # 1. Handle Driver Parent (Cardinality & Context)
            pk_map = config.fk
            sorted_fks = sorted(pk_map.keys())
            driver_fk = sorted_fks[0]
            driver_ref = pk_map[driver_fk]
            driver_parent_table, driver_parent_pk = driver_ref.split(".")

            # Read Driver Parent Data (From Output or Memory)
            if output_path_base:
                parent_path = f"{output_path_base}/{driver_parent_table}"
                parent_df = self.io.read_dataset(parent_path).toPandas()
            else:
                parent_df = generated_tables[driver_parent_table]

            linkage = self.linkage_models[table_name]

            # Sample Counts
            counts = linkage.sample_counts(parent_df)

            # Construct Context from Driver
            parent_ids_repeated = np.repeat(
                parent_df[driver_parent_pk].to_numpy(), counts
            )

            context_cols = config.parent_context_cols
            if context_cols:
                context_repeated_vals = {}
                for col in context_cols:
                    context_repeated_vals[col] = np.repeat(
                        parent_df[col].to_numpy(), counts
                    )
                context_df = pd.DataFrame(context_repeated_vals)
            else:
                context_df = None

            total_child_rows = len(parent_ids_repeated)

            # 2. Generate Data
            if total_child_rows > 0:
                generated_pdf = model.sample(total_child_rows, context=context_df)

                # Assign Driver FK
                generated_pdf[driver_fk] = parent_ids_repeated

                # Assign Secondary FKs (Random Sampling from respective Parents)
                for fk_col in sorted_fks[1:]:
                    ref = pk_map[fk_col]
                    p_table, p_pk = ref.split(".")

                    # Read Secondary Parent
                    if output_path_base:
                        p_path = f"{output_path_base}/{p_table}"
                        p_df = self.io.read_dataset(p_path).toPandas()
                    else:
                        p_df = generated_tables[p_table]

                    valid_pks = p_df[p_pk].to_numpy()

                    # Randomly sample valid PKs for this column
                    generated_pdf[fk_col] = np.random.choice(
                        valid_pks, size=total_child_rows
                    )

                # Assign PKs
                generated_pdf[config.pk] = range(1, len(generated_pdf) + 1)
            else:
                # Zero child rows: create empty DataFrame with correct schema
                # so downstream grandchild tables don't crash with KeyError
                log.info(
                    "zero_child_rows",
                    table=table_name,
                    driver_parent=driver_parent_table,
                )
                train_columns = (
                    list(model.metadata.get_table(table_name).constraints.keys())
                    if model.metadata.get_table(table_name)
                    else []
                )
                generated_pdf = pd.DataFrame(
                    columns=train_columns if train_columns else [config.pk]
                )

        if generated_pdf is not None:
            if output_path_base:
                output_path = f"{output_path_base}/{table_name}"
                _write_with_failure_policy(
                    io=self.io,
                    pdf=generated_pdf,
                    path=output_path,
                    policy=self.on_write_failure,
                    written_paths=written_paths,
                )
                log.debug(
                    "table_released_from_memory", table=table_name, path=output_path
                )
                # Do NOT store in generated_tables — child tables read from disk via output_path_base
            else:
                generated_tables[table_name] = generated_pdf

    return generated_tables

Linkage

syntho_hive.relational.linkage.LinkageModel

Model cardinality relationships between parent and child tables.

Replaces the former GaussianMixture approach (which produced negative counts) with an empirical histogram resampler (default) or optional NegBinom fit.

Source code in syntho_hive/relational/linkage.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class LinkageModel:
    """Model cardinality relationships between parent and child tables.

    Replaces the former GaussianMixture approach (which produced negative counts)
    with an empirical histogram resampler (default) or optional NegBinom fit.
    """

    def __init__(self, method: str = "empirical"):
        """Create a linkage model.

        Args:
            method: Cardinality distribution. 'empirical' (default) draws from observed
                    child counts. 'negbinom' fits scipy.stats.nbinom via method-of-moments.
                    Falls back to empirical if the data is not overdispersed (variance <= mean).
        """
        self.method = method
        self._observed_counts = None  # per-parent child counts (np.ndarray) after fit()
        self._nbinom_n = None
        self._nbinom_p = None
        self.max_children = 0
        self.distribution = None  # set during fit for negbinom edge cases
        self.params = None

    def fit(
        self,
        parent_df: pd.DataFrame,
        child_df: pd.DataFrame,
        fk_col: str,
        pk_col: str = "id",
    ):
        """Fit the distribution of child counts per parent.

        Counts children per parent, including parents with zero children.

        Args:
            parent_df: Parent table with unique primary keys.
            child_df: Child table containing foreign keys to parents.
            fk_col: Name of the foreign key column in the child table.
            pk_col: Name of the primary key column in the parent table.
        """
        # BUGFIX: reset state from any previous fit() so a stale
        # 'constant'/'poisson' fallback or old NegBinom parameters cannot
        # leak into sample_counts() after refitting on new data.
        self.distribution = None
        self.params = None
        self._nbinom_n = None
        self._nbinom_p = None

        counts = child_df[fk_col].value_counts()
        parent_ids = pd.DataFrame(parent_df[pk_col].unique(), columns=[pk_col])
        # Left-join so parents with no children contribute a count of 0.
        count_df = parent_ids.merge(
            counts.rename("child_count"), left_on=pk_col, right_index=True, how="left"
        ).fillna(0)
        X = count_df["child_count"].to_numpy(dtype=int)
        self.max_children = int(X.max())
        self._observed_counts = X

        if self.method == "negbinom":
            mu = float(X.mean())
            var = float(X.var())
            if var == 0:
                # All parents have the same child count — constant distribution
                self.distribution = "constant"
                self.params = {"value": int(round(mu))}
                log.info(
                    "negbinom_constant_fallback",
                    reason="zero variance — all parents have the same child count",
                    fk_col=fk_col,
                    value=int(round(mu)),
                )
                return
            if var <= mu:
                # Underdispersed or Poisson-like — NegBinom is ill-defined
                self.distribution = "poisson"
                self.params = {"mu": mu}
                log.info(
                    "negbinom_poisson_fallback",
                    reason="variance <= mean — NegBinom ill-defined, using Poisson",
                    fk_col=fk_col,
                    mu=mu,
                    var=var,
                )
                return
            if mu > 0:
                # Method-of-moments: p = mu/var, n = mu*p/(1-p) = mu^2/(var-mu)
                p = mu / var
                n = mu * p / (1.0 - p)
                n = max(min(n, 1e6), 0.1)  # bound n to avoid numerical explosion
                self._nbinom_n = n
                self._nbinom_p = p
            else:
                log.warning(
                    "negbinom_fallback_to_empirical",
                    reason="mean is zero — NegBinom ill-defined for this data",
                    fk_col=fk_col,
                )
                self.method = "empirical"  # runtime fallback

    def sample_counts(self, parent_context: pd.DataFrame) -> np.ndarray:
        """Sample child counts for a set of parents.

        Args:
            parent_context: Parent dataframe (only length is used here).

        Returns:
            Numpy array of non-negative integer child counts aligned with parents.

        Raises:
            ValueError: If called before fitting the model.
        """
        if self._observed_counts is None:
            raise ValueError("LinkageModel.sample_counts() called before fit()")
        n_samples = len(parent_context)

        # Handle special distribution types from negbinom edge cases
        if self.distribution == "constant" and self.params is not None:
            return np.full(n_samples, self.params["value"], dtype=int)
        if self.distribution == "poisson" and self.params is not None:
            counts = np.random.poisson(self.params["mu"], size=n_samples)
            return np.clip(counts, 0, None).astype(int)

        if self.method == "negbinom" and self._nbinom_n is not None:
            from scipy import stats

            counts = stats.nbinom.rvs(self._nbinom_n, self._nbinom_p, size=n_samples)
            return np.clip(counts, 0, None).astype(int)
        # Default: empirical — draw from observed distribution
        return np.random.choice(self._observed_counts, size=n_samples, replace=True)

fit

fit(parent_df: DataFrame, child_df: DataFrame, fk_col: str, pk_col: str = 'id')

Fit the distribution of child counts per parent.

Counts children per parent, including parents with zero children.

Parameters:

Name Type Description Default
parent_df DataFrame

Parent table with unique primary keys.

required
child_df DataFrame

Child table containing foreign keys to parents.

required
fk_col str

Name of the foreign key column in the child table.

required
pk_col str

Name of the primary key column in the parent table.

'id'
Source code in syntho_hive/relational/linkage.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def fit(
    self,
    parent_df: pd.DataFrame,
    child_df: pd.DataFrame,
    fk_col: str,
    pk_col: str = "id",
):
    """Learn the per-parent child-count distribution.

    Every parent is represented, including those with zero children.

    Args:
        parent_df: Parent table with unique primary keys.
        child_df: Child table containing foreign keys to parents.
        fk_col: Name of the foreign key column in the child table.
        pk_col: Name of the primary key column in the parent table.
    """
    per_parent = child_df[fk_col].value_counts()
    uniq_parents = pd.DataFrame(parent_df[pk_col].unique(), columns=[pk_col])
    # Left-join so childless parents contribute a count of zero.
    joined = uniq_parents.merge(
        per_parent.rename("child_count"),
        left_on=pk_col,
        right_index=True,
        how="left",
    ).fillna(0)
    child_counts = joined["child_count"].to_numpy(dtype=int)
    self._observed_counts = child_counts
    self.max_children = int(child_counts.max())

    if self.method != "negbinom":
        return

    mean_c = float(child_counts.mean())
    var_c = float(child_counts.var())
    if var_c == 0:
        # Every parent has an identical number of children.
        self.distribution = "constant"
        self.params = {"value": int(round(mean_c))}
        log.info(
            "negbinom_constant_fallback",
            reason="zero variance — all parents have the same child count",
            fk_col=fk_col,
            value=int(round(mean_c)),
        )
        return
    if var_c <= mean_c:
        # Not overdispersed, so NegBinom cannot be parameterised.
        self.distribution = "poisson"
        self.params = {"mu": mean_c}
        log.info(
            "negbinom_poisson_fallback",
            reason="variance <= mean — NegBinom ill-defined, using Poisson",
            fk_col=fk_col,
            mu=mean_c,
            var=var_c,
        )
        return
    if mean_c <= 0:
        log.warning(
            "negbinom_fallback_to_empirical",
            reason="mean is zero — NegBinom ill-defined for this data",
            fk_col=fk_col,
        )
        self.method = "empirical"  # runtime fallback
        return
    # Method-of-moments estimates, with n clamped for numerical stability.
    prob = mean_c / var_c
    size_param = mean_c * prob / (1.0 - prob)
    self._nbinom_n = max(min(size_param, 1e6), 0.1)
    self._nbinom_p = prob

sample_counts

sample_counts(parent_context: DataFrame) -> np.ndarray

Sample child counts for a set of parents.

Parameters:

Name Type Description Default
parent_context DataFrame

Parent dataframe (only length is used here).

required

Returns:

Type Description
ndarray

Numpy array of non-negative integer child counts aligned with parents.

Raises:

Type Description
ValueError

If called before fitting the model.

Source code in syntho_hive/relational/linkage.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def sample_counts(self, parent_context: pd.DataFrame) -> np.ndarray:
    """Draw one child count per row of ``parent_context``.

    Args:
        parent_context: Parent dataframe; only its row count matters.

    Returns:
        Non-negative integer counts, one per parent row.

    Raises:
        ValueError: If the model has not been fitted yet.
    """
    if self._observed_counts is None:
        raise ValueError("LinkageModel.sample_counts() called before fit()")
    size = len(parent_context)

    # Degenerate distributions recorded by fit() for negbinom edge cases.
    if self.params is not None:
        if self.distribution == "constant":
            return np.full(size, self.params["value"], dtype=int)
        if self.distribution == "poisson":
            drawn = np.random.poisson(self.params["mu"], size=size)
            return np.clip(drawn, 0, None).astype(int)

    if self._nbinom_n is not None and self.method == "negbinom":
        from scipy import stats

        drawn = stats.nbinom.rvs(self._nbinom_n, self._nbinom_p, size=size)
        return np.clip(drawn, 0, None).astype(int)
    # Fall back to resampling the observed histogram.
    return np.random.choice(self._observed_counts, size=size, replace=True)

Graph & Schema

syntho_hive.relational.graph.SchemaGraph

DAG representation of table dependencies derived from metadata.

Source code in syntho_hive/relational/graph.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class SchemaGraph:
    """DAG representation of table dependencies derived from metadata."""

    def __init__(self, metadata: Metadata):
        """Create a dependency graph from table metadata.

        Args:
            metadata: Dataset metadata containing FK relationships.
        """
        self.metadata = metadata
        self.adj_list: Dict[str, Set[str]] = {}
        self._build_graph()

    def _build_graph(self):
        """Populate the adjacency list with Parent -> Child edges."""
        self.adj_list = {name: set() for name in self.metadata.tables}
        for child_name, config in self.metadata.tables.items():
            for _ref_col, ref_path in config.fk.items():
                parent_table, _ = ref_path.split(".")
                # Edge direction is Parent -> Child: parents generate first.
                if parent_table != child_name and parent_table in self.adj_list:
                    self.adj_list[parent_table].add(child_name)

    def get_generation_order(self) -> List[str]:
        """Return a topologically sorted list of tables.

        Returns:
            List of table names ordered for parent-before-child generation.

        Raises:
            ValueError: If a cycle is detected in FK relationships.
        """
        finished: List[str] = []
        done = set()
        in_progress = set()

        def _dfs(table):
            if table in in_progress:
                raise ValueError(f"Cycle detected involving {table}")
            if table in done:
                return
            in_progress.add(table)
            done.add(table)
            # Post-order DFS over Parent -> Child edges; reversing the finish
            # order below yields parents before their children.
            for child in self.adj_list.get(table, []):
                _dfs(child)
            in_progress.remove(table)
            finished.append(table)

        # Visit every node (sorted for determinism) so disconnected
        # components are included, not just the roots.
        for table in sorted(self.adj_list.keys()):
            _dfs(table)

        return list(reversed(finished))

get_generation_order

get_generation_order() -> List[str]

Return a topologically sorted list of tables.

Returns:

Type Description
List[str]

List of table names ordered for parent-before-child generation.

Raises:

Type Description
ValueError

If a cycle is detected in FK relationships.

Source code in syntho_hive/relational/graph.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def get_generation_order(self) -> List[str]:
    """Return a topologically sorted list of tables.

    Returns:
        List of table names ordered for parent-before-child generation.

    Raises:
        ValueError: If a cycle is detected in FK relationships.
    """
    finished = []
    done = set()
    in_progress = set()

    def _dfs(table):
        if table in in_progress:
            raise ValueError(f"Cycle detected involving {table}")
        if table in done:
            return
        in_progress.add(table)
        done.add(table)
        # Post-order DFS over Parent -> Child edges; reversing the finish
        # order afterwards yields parents before their children.
        for child in self.adj_list.get(table, []):
            _dfs(child)
        in_progress.remove(table)
        finished.append(table)

    # Sorted iteration over every node keeps the result deterministic and
    # covers disconnected components, not just the roots.
    for table in sorted(self.adj_list.keys()):
        _dfs(table)

    return list(reversed(finished))