Source code in io_bench/io_bench.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
class IOBench:
    """Benchmark performance of standard flat file formats and partitioning schemes."""

    def __init__(
            self, 
            source_file: str, 
            output_dir: str = './data', 
            runs: int = 10, 
            parsers: Optional[List[str]] = None
        ) -> None:
        """
        Benchmark performance of standard flat file formats and partitioning schemes.

        Args:
            source_file (str): Path to the source CSV file.
            output_dir (str): Directory for output files.
            runs (int): Number of benchmark runs per parser.
            parsers (Optional[List[str]]): List of parser names to use.
                Defaults to every available parser. Unknown names are
                skipped with a warning.
        """
        if parsers is None:
            parsers = [
                'avro', 
                'parquet_polars', 
                'parquet_arrow', 
                'parquet_fast', 
                'feather', 
                'feather_arrow'
            ]

        self.source_file = source_file
        self.output_dir = output_dir
        self.runs = runs
        self.benchmark_counter = 0   # used to build unique benchmark ids in run()
        self.partitioned = False     # set True once partition() has produced files
        self.console = Console()

        self.avro_dir = os.path.join(output_dir, 'avro')
        self.parquet_dir = os.path.join(output_dir, 'parquet')
        self.feather_dir = os.path.join(output_dir, 'feather')

        self.available_parsers = {
            'avro': AvroParser(self.avro_dir),
            'parquet_polars': PolarsParquetParser(self.parquet_dir),
            'parquet_arrow': ArrowParquetParser(self.parquet_dir),
            'parquet_fast': FastParquetParser(self.parquet_dir),
            'feather': FeatherParser(self.feather_dir),
            'feather_arrow': ArrowFeatherParser(self.feather_dir)
        }

        # `parsers` is already normalized above, so the previous
        # `parsers if parsers is not None else ...` check was dead code.
        self.parsers = parsers
        unknown = [name for name in self.parsers if name not in self.available_parsers]
        if unknown:
            # Previously unknown names were dropped silently; surface them instead.
            self.console.print(f"[yellow]Ignoring unknown parsers: {', '.join(unknown)}")
        self.selected_parsers = {
            name: self.available_parsers[name]
            for name in self.parsers
            if name in self.available_parsers
        }

    def generate_sample(self, records: int = 100000) -> None:
        """
        Generate sample data and save it to the source file.

        Does nothing if the source file already exists.

        Args:
            records (int): Number of records to generate.
        """
        if os.path.exists(self.source_file):
            self.console.print(f"[yellow]Source file '{self.source_file}' already exists. Skipping data generation.")
            return

        # Calculate the exact number of repetitions needed for the records
        base_repeats = records // 3
        remainder = records % 3

        data = {
            'Region': (['North America', 'Europe', 'Asia'] * base_repeats) + ['North America', 'Europe', 'Asia'][:remainder],
            'Country': (['USA', 'Germany', 'China'] * base_repeats) + ['USA', 'Germany', 'China'][:remainder],
            'Total Cost': ([1000.0, 1500.5, 2000.75] * base_repeats) + [1000.0, 1500.5, 2000.75][:remainder],
            'Sales': ([5000.0, 7000.5, 9000.75] * base_repeats) + [5000.0, 7000.5, 9000.75][:remainder],
            'Profit': ([2500.0, 3500.5, 4500.75] * base_repeats) + [2500.0, 3500.5, 4500.75][:remainder]
        }

        df = pd.DataFrame(data)

        # os.path.dirname() returns '' for a bare filename; os.makedirs('')
        # raises, so only create the parent directory when there is one.
        parent_dir = os.path.dirname(self.source_file)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with self.console.status(f'[cyan]Generating {records} records of data ...', spinner='bouncingBar'):
            df.to_csv(self.source_file, index=False)

    def clear_partitions(self) -> None:
        """
        Clear the partition folders by deleting all files in the avro, parquet, and feather directories.

        Directories that do not exist yet are skipped instead of raising.
        """
        for directory in (self.avro_dir, self.parquet_dir, self.feather_dir):
            if not os.path.isdir(directory):
                # Nothing to clear; avoids FileNotFoundError on first use.
                continue
            for filename in os.listdir(directory):
                file_path = os.path.join(directory, filename)
                try:
                    if os.path.isfile(file_path) or os.path.islink(file_path):
                        os.unlink(file_path)
                except Exception as e:
                    # Best-effort cleanup: report the failure and keep going.
                    self.console.print(f"[red]Failed to delete {file_path}. Reason: {e}")

    def partition(self, rows: Optional[dict] = None) -> None:
        """
        Partition source data into multiple files based on specified row chunks.

        Args:
            rows (Optional[dict]): Rows per partition keyed by base format
                ('avro', 'parquet', 'feather'). Keys not supplied fall back
                to the built-in defaults.
        """
        default_rows = {
            'avro': 500000,
            'parquet': 3000000,
            'feather': 1600000
        }
        if rows:
            # Merge user overrides onto the defaults so a partial dict
            # (e.g. {'avro': 100000}) no longer raises KeyError.
            default_rows.update(rows)
        # Expand the three base formats to the per-parser chunk sizes.
        row_chunks = {
            'avro': default_rows['avro'],
            'parquet_polars': default_rows['parquet'],
            'parquet_arrow': default_rows['parquet'],
            'parquet_fast': default_rows['parquet'],
            'feather': default_rows['feather'],
            'feather_arrow': default_rows['feather']
        }
        with self.console.status('[cyan]writing partitioned data...', spinner='dots'):
            asyncio.run(self._partition(row_chunks))
        self.partitioned = True

    async def _partition(self, row_chunks: Dict[str, int]) -> None:
        """
        Asynchronously partition the data into different formats.

        Args:
            row_chunks (Dict[str, int]): Dictionary specifying the number of rows per partition for each format.
        """
        df = pd.read_csv(self.source_file)

        os.makedirs(self.avro_dir, exist_ok=True)
        os.makedirs(self.parquet_dir, exist_ok=True)
        os.makedirs(self.feather_dir, exist_ok=True)

        total_rows = df.shape[0]

        # Only compute ranges for parsers the user actually selected.
        partition_ranges = {}
        for parser, chunk_size in row_chunks.items():
            if parser in self.selected_parsers:
                partition_ranges[parser] = self._calculate_partition_ranges(total_rows, chunk_size)

        # Remove stale files from any previous partitioning run.
        self.clear_partitions()

        tasks = []
        for parser, ranges in partition_ranges.items():
            for partition_id, (start_idx, end_idx) in enumerate(ranges):
                partition_df = df.iloc[start_idx:end_idx]
                if parser == 'avro':
                    tasks.append(self._write_avro(partition_df, os.path.join(self.avro_dir, f'part_{partition_id}.avro')))
                elif parser.startswith('parquet'):
                    tasks.append(self._write_parquet(partition_df, os.path.join(self.parquet_dir, f'{parser}_part_{partition_id}.parquet')))
                elif parser.startswith('feather'):
                    tasks.append(self._write_feather(partition_df, os.path.join(self.feather_dir, f'{parser}_part_{partition_id}.feather')))

        await asyncio.gather(*tasks)

    @staticmethod
    def _calculate_partition_ranges(total_rows: int, row_chunks: int) -> List[tuple]:
        """
        Calculate the partition ranges given the total number of rows and chunk size.

        Rows are spread as evenly as possible: the first `remainder`
        partitions receive one extra row.

        Args:
            total_rows (int): Total number of rows in the DataFrame.
            row_chunks (int): The chunk size in terms of rows.

        Returns:
            List[tuple]: List of (start, end) index tuples, one per partition.
        """
        num_partitions = max(1, (total_rows + row_chunks - 1) // row_chunks)  # Ensure at least one partition
        rows_per_partition = total_rows // num_partitions
        remainder = total_rows % num_partitions

        partition_ranges = []
        start_idx = 0

        for i in range(num_partitions):
            # Early partitions absorb the remainder one row at a time.
            end_idx = start_idx + rows_per_partition + (1 if i < remainder else 0)
            partition_ranges.append((start_idx, min(end_idx, total_rows)))
            start_idx = end_idx

        return partition_ranges

    @staticmethod
    async def _write_avro(df: pd.DataFrame, file_path: str) -> None:
        """
        Write a DataFrame to an Avro file without needing a predefined schema.

        Args:
            df (pd.DataFrame): DataFrame to write.
            file_path (str): Path to the output Avro file.
        """
        records = df.to_dict('records')

        # Map pandas dtypes to (nullable) Avro types; unknown dtypes fall
        # back to string below.
        avro_type_mapping = {
            'int64': ['long', 'null'],
            'float64': ['double', 'null'],
            'object': ['string', 'null'],
            'bool': ['boolean', 'null'],
            'datetime64[ns]': [{'type': 'long', 'logicalType': 'timestamp-micros'}, 'null']
        }

        # Derive the record schema from the DataFrame's columns and dtypes.
        schema = {
            'type': 'record',
            'name': 'Benchmark',
            'fields': [
                {'name': col, 'type': avro_type_mapping.get(str(dtype), ['string', 'null'])}
                for col, dtype in df.dtypes.items()
            ]
        }

        # Write records
        with open(file_path, 'wb') as out:
            fastavro.writer(out, schema, records)

    @staticmethod
    async def _write_parquet(df: pd.DataFrame, file_path: str) -> None:
        """
        Write a DataFrame to a Parquet file.

        Args:
            df (pd.DataFrame): DataFrame to write.
            file_path (str): Path to the output Parquet file.
        """
        table = pa.Table.from_pandas(df)
        pq.write_table(table, file_path)

    @staticmethod
    async def _write_feather(df: pd.DataFrame, file_path: str) -> None:
        """
        Write a DataFrame to a Feather file.

        Args:
            df (pd.DataFrame): DataFrame to write.
            file_path (str): Path to the output Feather file.
        """
        table = pa.Table.from_pandas(df)
        feather.write_feather(table, file_path)

    def run(self, columns: Optional[List[str]] = None, suffix: Optional[str] = None) -> List[Bench]:
        """
        Run benchmarks using the specified parsers.

        Partitions the source data first if that has not been done yet.

        Args:
            columns (Optional[List[str]]): List of columns to select.
            suffix (Optional[str]): Suffix for benchmark IDs. When omitted,
                an auto-incrementing counter is used.

        Returns:
            List[Bench]: List of benchmark results.
        """
        if not self.partitioned:
            self.partition()

        benchmarks = []

        if suffix is None:
            suffix = f'_{self.benchmark_counter}'
            self.benchmark_counter += 1

        for name, parser in self.selected_parsers.items():
            bench = Bench(parser, columns=columns, num_runs=self.runs, id=f'{name}{suffix}').benchmark()
            benchmarks.append(bench)

        return benchmarks

    @staticmethod
    def report(benchmark_results: List[Bench], report_dir: str = './result') -> None:
        """
        Generate a report from benchmark results.

        Args:
            benchmark_results (List[Bench]): List of benchmark results.
            report_dir (str): Directory to save the report.
        """
        generate_report(benchmark_results, report_dir)

__init__(source_file, output_dir='./data', runs=10, parsers=None)

Benchmark performance of standard flat file formats and partitioning schemes.

Parameters:

Name Type Description Default
source_file str

Path to the source CSV file.

required
output_dir str

Directory for output files.

'./data'
runs int

Number of benchmark runs.

10
parsers Optional[List[str]]

List of parsers to use.

None
Source code in io_bench/io_bench.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(
        self, 
        source_file: str, 
        output_dir: str = './data', 
        runs: int = 10, 
        parsers: Optional[List[str]] = None
    ) -> None:
    """
    Benchmark performance of standard flat file formats and partitioning schemes.

    Args:
        source_file (str): Path to the source CSV file.
        output_dir (str): Directory for output files.
        runs (int): Number of benchmark runs.
        parsers (Optional[List[str]]): List of parsers to use.
    """
    # Default to every supported parser when none are requested.
    if parsers is None:
        parsers = [
            'avro', 
            'parquet_polars', 
            'parquet_arrow', 
            'parquet_fast', 
            'feather', 
            'feather_arrow'
        ]

    self.source_file = source_file
    self.output_dir = output_dir
    self.runs = runs
    # Counter used to build unique benchmark ids when run() is called
    # without an explicit suffix.
    self.benchmark_counter = 0
    self.partitioned = False
    self.console = Console()

    # One output subdirectory per base format.
    self.avro_dir = os.path.join(output_dir, 'avro')
    self.parquet_dir = os.path.join(output_dir, 'parquet')
    self.feather_dir = os.path.join(output_dir, 'feather')

    self.available_parsers = {
        'avro': AvroParser(self.avro_dir),
        'parquet_polars': PolarsParquetParser(self.parquet_dir),
        'parquet_arrow': ArrowParquetParser(self.parquet_dir),
        'parquet_fast': FastParquetParser(self.parquet_dir),
        'feather': FeatherParser(self.feather_dir),
        'feather_arrow': ArrowFeatherParser(self.feather_dir)
    }

    # NOTE(review): `parsers` can no longer be None here (normalized above),
    # so the `is not None` branch is dead code; unknown names are silently
    # dropped by the filter below.
    self.parsers = parsers if parsers is not None else list(self.available_parsers.keys())
    self.selected_parsers = {name: self.available_parsers[name] for name in self.parsers if name in self.available_parsers}

clear_partitions()

Clear the partition folders by deleting all files in the avro, parquet, and feather directories.

Source code in io_bench/io_bench.py
101
102
103
104
105
106
107
108
109
110
111
112
def clear_partitions(self) -> None:
    """
    Clear the partition folders by deleting all files in the avro, parquet, and feather directories.

    NOTE(review): assumes all three directories exist; os.listdir raises
    FileNotFoundError if one is missing (they are created by _partition
    before this is called).
    """
    for directory in [self.avro_dir, self.parquet_dir, self.feather_dir]:
        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)
            try:
                # Only remove regular files and symlinks; subdirectories
                # are left in place.
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
            except Exception as e:
                # Best-effort cleanup: report and continue with other files.
                self.console.print(f"[red]Failed to delete {file_path}. Reason: {e}")

generate_sample(records=100000)

Generate sample data and save it to the source file.

Parameters:

Name Type Description Default
records int

Number of records to generate.

100000
Source code in io_bench/io_bench.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def generate_sample(self, records: int = 100000) -> None:
    """
    Generate sample data and save it to the source file.

    Does nothing if the source file already exists.

    Args:
        records (int): Number of records to generate.
    """
    if os.path.exists(self.source_file):
        self.console.print(f"[yellow]Source file '{self.source_file}' already exists. Skipping data generation.")
        return

    # Calculate the exact number of repetitions needed for the records
    base_repeats = records // 3
    remainder = records % 3

    # Each column cycles through three fixed values, truncated to exactly
    # `records` rows.
    data = {
        'Region': (['North America', 'Europe', 'Asia'] * base_repeats) + ['North America', 'Europe', 'Asia'][:remainder],
        'Country': (['USA', 'Germany', 'China'] * base_repeats) + ['USA', 'Germany', 'China'][:remainder],
        'Total Cost': ([1000.0, 1500.5, 2000.75] * base_repeats) + [1000.0, 1500.5, 2000.75][:remainder],
        'Sales': ([5000.0, 7000.5, 9000.75] * base_repeats) + [5000.0, 7000.5, 9000.75][:remainder],
        'Profit': ([2500.0, 3500.5, 4500.75] * base_repeats) + [2500.0, 3500.5, 4500.75][:remainder]
    }

    df = pd.DataFrame(data)

    # NOTE(review): os.path.dirname returns '' for a bare filename, and
    # os.makedirs('') raises — source_file is expected to include a directory.
    os.makedirs(os.path.dirname(self.source_file), exist_ok=True)
    with self.console.status(f'[cyan]Generating {records} records of data ...', spinner='bouncingBar'):
        df.to_csv(self.source_file, index=False)

partition(rows=None)

Partition source data into multiple files based on specified row chunks.

Parameters:

Name Type Description Default
rows dict

Dictionary specifying number of rows per partition for each file format.

None
Source code in io_bench/io_bench.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def partition(self, rows: dict = None) -> None:
    """
    Partition source data into multiple files based on specified row chunks.

    Args:
        rows (dict): Dictionary specifying number of rows per partition for
            each file format. NOTE(review): when provided it must contain
            all three keys ('avro', 'parquet', 'feather') or the mapping
            below raises KeyError.
    """
    if rows is None:
        default_rows = {
            'avro': 500000,
            'parquet': 3000000,
            'feather': 1600000
        }
    else:
        default_rows = rows
    # map simple chunk sizes: expand the three base formats to one entry
    # per concrete parser.
    row_chunks = {
        'avro': default_rows['avro'],
        'parquet_polars': default_rows['parquet'],
        'parquet_arrow': default_rows['parquet'],
        'parquet_fast': default_rows['parquet'],
        'feather': default_rows['feather'],
        'feather_arrow': default_rows['feather']
    }
    with self.console.status('[cyan]writing partitioned data...', spinner='dots'):
        asyncio.run(self._partition(row_chunks))
    self.partitioned = True

report(benchmark_results, report_dir='./result') staticmethod

Generate a report from benchmark results.

Parameters:

Name Type Description Default
benchmark_results List[Bench]

List of benchmark results.

required
report_dir str

Directory to save the report.

'./result'
Source code in io_bench/io_bench.py
288
289
290
291
292
293
294
295
296
297
@staticmethod
def report(benchmark_results: List[Bench], report_dir: str = './result') -> None:
    """
    Generate a report from benchmark results.

    Thin wrapper that delegates entirely to generate_report.

    Args:
        benchmark_results (List[Bench]): List of benchmark results.
        report_dir (str): Directory to save the report.
    """
    generate_report(benchmark_results, report_dir)

run(columns=None, suffix=None)

Run benchmarks using the specified parsers.

Parameters:

Name Type Description Default
columns Optional[List[str]]

List of columns to select.

None
suffix Optional[str]

Suffix for benchmark IDs.

None

Returns:

Type Description
List[Bench]

List[Bench]: List of benchmark results.

Source code in io_bench/io_bench.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def run(self, columns: Optional[List[str]] = None, suffix: Optional[str] = None) -> List[Bench]:
    """
    Run benchmarks using the specified parsers.

    Partitions the source data first if that has not been done yet.

    Args:
        columns (Optional[List[str]]): List of columns to select.
        suffix (Optional[str]): Suffix for benchmark IDs. When omitted,
            an auto-incrementing counter is used instead.

    Returns:
        List[Bench]: List of benchmark results.
    """
    if not self.partitioned:
        self.partition()

    benchmarks = []

    if suffix is None:
        # Auto-generate a unique suffix so repeated runs get distinct ids.
        suffix = f'_{self.benchmark_counter}'
        self.benchmark_counter += 1

    for name, parser in self.selected_parsers.items():
        bench = Bench(parser, columns=columns, num_runs=self.runs, id=f'{name}{suffix}').benchmark()
        benchmarks.append(bench)

    return benchmarks