Skip to content

Connectors

syntho_hive.connectors.spark_io.SparkIO

Utility for reading and writing datasets via Spark and Delta Lake.

Source code in syntho_hive/connectors/spark_io.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class SparkIO:
    """Utility for reading and writing datasets via Spark and Delta Lake."""

    def __init__(self, spark: SparkSession):
        """Initialize the IO helper.

        Args:
            spark: Active SparkSession used for all IO.
        """
        self.spark = spark

    def read_dataset(
        self,
        path_or_table: str,
        format: Optional[str] = None,
        **kwargs: Union[str, int, bool, float],
    ) -> DataFrame:
        """Read a dataset from a table name or filesystem path.

        Args:
            path_or_table: Hive table name or filesystem/URI path.
            format: Optional explicit format override (e.g., ``"csv"``).
            **kwargs: Additional Spark read options.

        Returns:
            Spark DataFrame loaded from the specified source.
        """
        # Heuristic: slashes or a file:// scheme mean a filesystem path;
        # anything else is treated as a Hive table name.
        looks_like_path = (
            "/" in path_or_table
            or "\\" in path_or_table
            or path_or_table.startswith("file://")
        )
        if not looks_like_path:
            return self.spark.table(path_or_table)

        # An explicit format override bypasses extension sniffing and the
        # CSV convenience options below.
        if format:
            return self.spark.read.format(format).load(path_or_table, **kwargs)

        if path_or_table.endswith(".csv"):
            return (
                self.spark.read.format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .option("multiLine", "true")
                .load(path_or_table, **kwargs)
            )
        # Explicit .parquet files and bare directories both default to
        # parquet, matching the write_dataset default format.
        return self.spark.read.format("parquet").load(path_or_table, **kwargs)

    def write_dataset(
        self,
        df: DataFrame,
        target_path: str,
        mode: str = "overwrite",
        partition_by: Optional[str] = None,
        format: str = "parquet",
    ) -> None:
        """Write a Spark DataFrame to storage.

        Args:
            df: Spark DataFrame to persist.
            target_path: Output path (directory or table location).
            mode: Save mode, e.g., ``"overwrite"`` or ``"append"``.
            partition_by: Optional column name to partition by.
            format: Output format, defaults to ``"parquet"``.
        """
        writer = df.write.format(format).mode(mode)
        if partition_by:
            writer = writer.partitionBy(partition_by)
        writer.save(target_path)

    def write_pandas(
        self,
        pdf: pd.DataFrame,
        target_path: str,
        mode: str = "overwrite",
        format: str = "parquet",
    ) -> None:
        """Write a Pandas DataFrame using Spark-backed persistence.

        Args:
            pdf: Pandas DataFrame to persist.
            target_path: Output path for the written dataset.
            mode: Save mode for Spark writer (default ``"overwrite"``).
            format: Storage format, defaults to ``"parquet"``.
        """
        # Convert to a Spark DataFrame so all persistence goes through the
        # single write_dataset code path.
        sdf = self.spark.createDataFrame(pdf)
        self.write_dataset(sdf, target_path, mode=mode, format=format)

read_dataset

read_dataset(path_or_table: str, format: str = None, **kwargs: Union[str, int, bool, float]) -> DataFrame

Read a dataset from a table name or filesystem path.

Parameters:

Name Type Description Default
path_or_table str

Hive table name or filesystem/URI path.

required
format str

Optional explicit format override (e.g., "csv").

None
**kwargs Union[str, int, bool, float]

Additional Spark read options.

{}

Returns:

Type Description
DataFrame

Spark DataFrame loaded from the specified source.

Source code in syntho_hive/connectors/spark_io.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def read_dataset(
    self,
    path_or_table: str,
    format: Optional[str] = None,
    **kwargs: Union[str, int, bool, float],
) -> DataFrame:
    """Read a dataset from a table name or filesystem path.

    Args:
        path_or_table: Hive table name or filesystem/URI path.
        format: Optional explicit format override (e.g., ``"csv"``).
        **kwargs: Additional Spark read options.

    Returns:
        Spark DataFrame loaded from the specified source.
    """
    # Heuristic: slashes or a file:// scheme mean a filesystem path;
    # anything else is treated as a Hive table name.
    looks_like_path = (
        "/" in path_or_table
        or "\\" in path_or_table
        or path_or_table.startswith("file://")
    )
    if not looks_like_path:
        return self.spark.table(path_or_table)

    # An explicit format override bypasses extension sniffing and the
    # CSV convenience options below.
    if format:
        return self.spark.read.format(format).load(path_or_table, **kwargs)

    if path_or_table.endswith(".csv"):
        return (
            self.spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("multiLine", "true")
            .load(path_or_table, **kwargs)
        )
    # Explicit .parquet files and bare directories both default to
    # parquet, matching the write_dataset default format.
    return self.spark.read.format("parquet").load(path_or_table, **kwargs)

write_dataset

write_dataset(df: DataFrame, target_path: str, mode: str = 'overwrite', partition_by: Optional[str] = None, format: str = 'parquet')

Write a Spark DataFrame to storage.

Parameters:

Name Type Description Default
df DataFrame

Spark DataFrame to persist.

required
target_path str

Output path (directory or table location).

required
mode str

Save mode, e.g., "overwrite" or "append".

'overwrite'
partition_by Optional[str]

Optional column name to partition by.

None
format str

Output format, defaults to "parquet".

'parquet'
Source code in syntho_hive/connectors/spark_io.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def write_dataset(
    self,
    df: DataFrame,
    target_path: str,
    mode: str = "overwrite",
    partition_by: Optional[str] = None,
    format: str = "parquet",
):
    """Persist a Spark DataFrame to the given storage location.

    Args:
        df: Spark DataFrame to persist.
        target_path: Output path (directory or table location).
        mode: Save mode, e.g., ``"overwrite"`` or ``"append"``.
        partition_by: Optional column name to partition by.
        format: Output format, defaults to ``"parquet"``.
    """
    # Build the writer incrementally so partitioning stays optional.
    dataset_writer = df.write.mode(mode).format(format)
    if partition_by:
        dataset_writer = dataset_writer.partitionBy(partition_by)
    dataset_writer.save(target_path)

write_pandas

write_pandas(pdf: DataFrame, target_path: str, mode: str = 'overwrite', format: str = 'parquet')

Write a Pandas DataFrame using Spark-backed persistence.

Parameters:

Name Type Description Default
pdf DataFrame

Pandas DataFrame to persist.

required
target_path str

Output path for the written dataset.

required
mode str

Save mode for Spark writer (default "overwrite").

'overwrite'
format str

Storage format, defaults to "parquet".

'parquet'
Source code in syntho_hive/connectors/spark_io.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def write_pandas(
    self,
    pdf: pd.DataFrame,
    target_path: str,
    mode: str = "overwrite",
    format: str = "parquet",
):
    """Write a Pandas DataFrame using Spark-backed persistence.

    Args:
        pdf: Pandas DataFrame to persist.
        target_path: Output path for the written dataset.
        mode: Save mode for Spark writer (default ``"overwrite"``).
        format: Storage format, defaults to ``"parquet"``.
    """
    # Promote the Pandas frame to Spark, then delegate to the common
    # write_dataset path so both entry points behave identically.
    spark_frame = self.spark.createDataFrame(pdf)
    self.write_dataset(spark_frame, target_path, mode=mode, format=format)

syntho_hive.connectors.sampling.RelationalSampler

Relational stratified sampler for parent-child table hierarchies.

Source code in syntho_hive/connectors/sampling.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class RelationalSampler:
    """Relational stratified sampler for parent-child table hierarchies."""

    def __init__(self, metadata: Metadata, spark: SparkSession):
        """Initialize the sampler.

        Args:
            metadata: Metadata describing tables and their keys.
            spark: Active SparkSession for table access.
        """
        self.metadata = metadata
        self.spark = spark

    def sample_relational(
        self, root_table: str, sample_size: int, stratify_by: Optional[str] = None
    ) -> Dict[str, DataFrame]:
        """Sample a root table and cascade the sample to child tables.

        Args:
            root_table: Name of the parent/root table to sample.
            sample_size: Approximate number of rows to retain from the root.
            stratify_by: Optional column for stratified sampling.

        Returns:
            Dictionary mapping table name to sampled Spark DataFrame.
        """
        sampled_data: Dict[str, DataFrame] = {}

        # 1. Sample Root
        print(f"Sampling root table: {root_table}")
        root_df = self.spark.table(root_table)

        # Derive the sampling fraction from the requested sample size,
        # guarding against an empty root table (ZeroDivisionError).
        total_count = root_df.count()
        fraction = min(1.0, sample_size / total_count) if total_count else 0.0

        if stratify_by:
            # Approximate stratified sampling: apply the same global
            # fraction to every stratum so the overall retained row count
            # is close to ``sample_size``.
            strata = [
                row[0] for row in root_df.select(stratify_by).distinct().collect()
            ]
            fractions = {value: fraction for value in strata}
            sampled_root = root_df.sampleBy(stratify_by, fractions, seed=42)
        else:
            sampled_root = root_df.sample(
                withReplacement=False, fraction=fraction, seed=42
            )

        sampled_data[root_table] = sampled_root

        # 2. Cascade to Children using BFS for multi-level hierarchies
        tables_to_process = [root_table]
        processed = set()

        while tables_to_process:
            current = tables_to_process.pop(0)
            if current in processed:
                continue
            processed.add(current)

            current_pk = self.metadata.get_table(current).pk
            current_sampled = sampled_data[current]

            # Find children of the current table
            for child_name, config in self.metadata.tables.items():
                if child_name in processed:
                    continue
                for child_col, parent_ref in config.fk.items():
                    parent_table = parent_ref.split(".")[0]
                    if parent_table != current:
                        continue
                    print(f"Cascading sample to child: {child_name}")
                    child_df = self.spark.table(child_name)

                    # Semi-join: keep only child rows whose FK matches a
                    # sampled parent PK. Alias the PK to the child's FK
                    # column so the join resolves against the derived
                    # DataFrame (referencing current_sampled[current_pk]
                    # directly would be unresolved after the select()).
                    parent_keys = current_sampled.select(
                        F.col(current_pk).alias(child_col)
                    ).distinct()
                    child_sampled = child_df.join(
                        parent_keys, on=child_col, how="left_semi"
                    )

                    sampled_data[child_name] = child_sampled
                    tables_to_process.append(child_name)
                    break  # Only process the first FK match per child table

        return sampled_data

sample_relational

sample_relational(root_table: str, sample_size: int, stratify_by: Optional[str] = None) -> Dict[str, DataFrame]

Sample a root table and cascade the sample to child tables.

Parameters:

Name Type Description Default
root_table str

Name of the parent/root table to sample.

required
sample_size int

Approximate number of rows to retain from the root.

required
stratify_by Optional[str]

Optional column for stratified sampling.

None

Returns:

Type Description
Dict[str, DataFrame]

Dictionary mapping table name to sampled Spark DataFrame.

Source code in syntho_hive/connectors/sampling.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def sample_relational(
    self, root_table: str, sample_size: int, stratify_by: Optional[str] = None
) -> Dict[str, DataFrame]:
    """Sample a root table and cascade the sample to child tables.

    Args:
        root_table: Name of the parent/root table to sample.
        sample_size: Approximate number of rows to retain from the root.
        stratify_by: Optional column for stratified sampling.

    Returns:
        Dictionary mapping table name to sampled Spark DataFrame.
    """
    sampled_data: Dict[str, DataFrame] = {}

    # 1. Sample Root
    print(f"Sampling root table: {root_table}")
    root_df = self.spark.table(root_table)

    # Derive the sampling fraction from the requested sample size,
    # guarding against an empty root table (ZeroDivisionError).
    total_count = root_df.count()
    fraction = min(1.0, sample_size / total_count) if total_count else 0.0

    if stratify_by:
        # Approximate stratified sampling: apply the same global
        # fraction to every stratum so the overall retained row count
        # is close to ``sample_size``.
        strata = [
            row[0] for row in root_df.select(stratify_by).distinct().collect()
        ]
        fractions = {value: fraction for value in strata}
        sampled_root = root_df.sampleBy(stratify_by, fractions, seed=42)
    else:
        sampled_root = root_df.sample(
            withReplacement=False, fraction=fraction, seed=42
        )

    sampled_data[root_table] = sampled_root

    # 2. Cascade to Children using BFS for multi-level hierarchies
    tables_to_process = [root_table]
    processed = set()

    while tables_to_process:
        current = tables_to_process.pop(0)
        if current in processed:
            continue
        processed.add(current)

        current_pk = self.metadata.get_table(current).pk
        current_sampled = sampled_data[current]

        # Find children of the current table
        for child_name, config in self.metadata.tables.items():
            if child_name in processed:
                continue
            for child_col, parent_ref in config.fk.items():
                parent_table = parent_ref.split(".")[0]
                if parent_table != current:
                    continue
                print(f"Cascading sample to child: {child_name}")
                child_df = self.spark.table(child_name)

                # Semi-join: keep only child rows whose FK matches a
                # sampled parent PK. Alias the PK to the child's FK
                # column so the join resolves against the derived
                # DataFrame (referencing current_sampled[current_pk]
                # directly would be unresolved after the select()).
                parent_keys = current_sampled.select(
                    F.col(current_pk).alias(child_col)
                ).distinct()
                child_sampled = child_df.join(
                    parent_keys, on=child_col, how="left_semi"
                )

                sampled_data[child_name] = child_sampled
                tables_to_process.append(child_name)
                break  # Only process the first FK match per child table

    return sampled_data