Apache Parquet, a columnar storage file format, has become a standard for data storage due to its efficient data compression and encoding schemes.
However, streaming data into Parquet files in a memory-efficient manner is challenging, especially in memory-constrained environments. Streaming is usually a record-based operation, while Parquet is a columnar format.
Estuary Flow, a data integration and streaming platform, ran into these challenges head-on while developing a connector that streams data into Apache Iceberg. Meeting them required a “2-pass write” to stream data into Parquet files efficiently.
The Challenges
The primary challenge in streaming data into Parquet files lies in the memory constraints of the connectors. Estuary Flow connectors are designed to run with a minimal amount of resources to keep the operational costs of the system (and by direct extension: the price the user pays to use the system) as low as possible. Data is streamed to materialization connectors row-by-row, and ideally, the connector will only need to buffer a single row at a time while it passes it on to the destination.
But Parquet files are written column-by-column, with many rows grouped into row groups.
A conventional strategy for writing Parquet files is to buffer enough rows in memory to make up a row group, and then write out the whole row group at once. These row groups need to be large enough for the file to be read efficiently: the Parquet documentation recommends large row groups of 512 MB to 1 GB (source: https://parquet.apache.org/docs/file-format/configurations). While 1 GB of RAM isn’t a big deal for the average engineer’s laptop, it’s quite a bit of overhead for a connector that would otherwise only care about a single row at a time.
In addition, the connector needs to handle highly variable user data. Some users have relatively small rows, while others have single rows that are multiple MBs in size, so even a fixed number of rows per row group can require very different amounts of memory.
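To make that variability concrete, here is a minimal Python sketch. The `size_of_row` helper, its per-type byte costs, and the 10,000-row group size are hypothetical assumptions for illustration, not the connector's actual estimator. The point is only that a fixed row count translates into very different memory footprints.

```python
from typing import Any


def _size_of_value(value: Any) -> int:
    """Hypothetical per-value size estimate in bytes (simplifying assumptions)."""
    if value is None:
        return 0
    if isinstance(value, bool):  # check before int: bool is a subclass of int
        return 1
    if isinstance(value, (int, float)):
        return 8
    if isinstance(value, str):
        return len(value.encode("utf-8"))
    if isinstance(value, (bytes, bytearray)):
        return len(value)
    if isinstance(value, dict):
        return sum(_size_of_value(v) for v in value.values())
    if isinstance(value, (list, tuple)):
        return sum(_size_of_value(v) for v in value)
    raise TypeError(f"unsupported type: {type(value).__name__}")


def size_of_row(row: dict[str, Any]) -> int:
    """Rough estimate of a row's in-memory size: the sum of its field sizes."""
    return sum(_size_of_value(v) for v in row.values())


ROWS_PER_ROW_GROUP = 10_000  # hypothetical fixed row count per row group

small_row = {"id": 1, "name": "alice", "active": True}
large_row = {"id": 2, "payload": "x" * 5_000_000}  # a single ~5 MB row

for label, row in (("small", small_row), ("large", large_row)):
    buffered = size_of_row(row) * ROWS_PER_ROW_GROUP
    print(f"{label}: ~{buffered / 1e6:,.2f} MB to buffer one row group")
```

With these inputs the loop reports about 0.14 MB for the small rows versus about 50,000 MB (50 GB) for the large ones. Budgeting the buffer by estimated bytes rather than by row count avoids that spread.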
The need to stream data into Parquet files without exhausting memory led to the 2-pass write solution. Data is first written to disk in much smaller row groups, and these are then consolidated into larger row groups in a memory-efficient manner.
The 2-Pass Write Solution
The core of this methodology involves “transposing” incoming streaming data from a row-oriented to a column-oriented structure using an intermediate scratch file that is stored on disk rather than in memory. This transposition is crucial for the memory-efficient streaming of data into Parquet files.
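At its core, the transposition turns a list of records into one list of values per column. The minimal sketch below shows that reshaping in memory on a tiny batch. `transpose_rows` is a hypothetical helper, and in the 2-pass write only a small batch is reshaped at a time before it lands in the on-disk scratch file.

```python
from typing import Any


def transpose_rows(rows: list[dict[str, Any]], columns: list[str]) -> dict[str, list[Any]]:
    """Turn row-oriented records into column-oriented lists.

    Missing fields become None, so every column has exactly one entry per
    row and the columns stay aligned row-for-row.
    """
    result: dict[str, list[Any]] = {name: [] for name in columns}
    for row in rows:
        for name in columns:
            result[name].append(row.get(name))
    return result


rows = [
    {"id": 1, "name": "alice"},
    {"id": 2, "name": "bob", "email": "bob@example.com"},
    {"id": 3},
]
print(transpose_rows(rows, ["id", "name", "email"]))
# {'id': [1, 2, 3], 'name': ['alice', 'bob', None], 'email': [None, 'bob@example.com', None]}
```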
The 2-pass write method is designed to handle large datasets efficiently. By splitting the process into two passes and working column-by-column in the second pass, it keeps memory usage low without giving up large output row groups. Here’s a breakdown of each pass:
Pass 1: Initial Parquet File with Small Row Groups
In the first pass, the data is written row-by-row to a scratch file on disk that will subsequently be read column-by-column. While there are a variety of conceivable file formats that could be used for this, it turns out that Parquet itself works pretty well for reading column-by-column, and as long as the Parquet scratch file uses very small row groups (a few tens of megabytes), the amount of RAM the connector uses is minimized.
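A row-group-structured file works for column-by-column reads because Parquet's metadata records where each column chunk lives, so a reader can jump straight to one column's chunks. The sketch below imitates that access pattern with a hypothetical `ScratchFile` class: it stores each column chunk as JSON in a temporary file and keeps an in-memory offset index in place of Parquet's footer. It is not the Parquet format and not the connector's code.

```python
import json
import os
import tempfile
from typing import Any


class ScratchFile:
    """Toy stand-in for a Parquet scratch file (NOT the Parquet format).

    Each small row group is stored as one JSON chunk per column, and an
    in-memory index plays the role of Parquet's footer metadata.
    """

    def __init__(self, columns: list[str]) -> None:
        self.columns = columns
        self._file = tempfile.TemporaryFile()  # binary read/write, on disk
        # One entry per row group: column name -> (offset, length).
        self.row_groups: list[dict[str, tuple[int, int]]] = []

    def write_row_group(self, rows: list[dict[str, Any]]) -> int:
        """Write buffered rows as one small row group; return bytes written."""
        self._file.seek(0, os.SEEK_END)
        index: dict[str, tuple[int, int]] = {}
        written = 0
        for name in self.columns:
            chunk = json.dumps([row.get(name) for row in rows]).encode("utf-8")
            index[name] = (self._file.tell(), len(chunk))
            self._file.write(chunk)
            written += len(chunk)
        self.row_groups.append(index)
        return written

    def read_column_chunk(self, row_group: int, name: str) -> list[Any]:
        """Read one column chunk of one row group by seeking straight to it."""
        offset, length = self.row_groups[row_group][name]
        self._file.seek(offset)
        return json.loads(self._file.read(length))

    def close(self) -> None:
        self._file.close()


scratch = ScratchFile(["id", "name"])
scratch.write_row_group([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
scratch.write_row_group([{"id": 3, "name": "c"}])
# Read only the "id" column across all row groups; "name" is never touched.
print([scratch.read_column_chunk(i, "id") for i in range(len(scratch.row_groups))])
# [[1, 2], [3]]
scratch.close()
```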
The high-level pseudocode below shows the main loop that drives writing the scratch file and, later, reading it back (more on that in the next section). You can check out the actual code our connector uses here.
```python
bufferedSizeBytes = 0
scratchSizeBytes = 0
buffer = []

for row in streamedRows:
    # Append the row's data to our limited in-memory buffer.
    buffer.append(row)
    # Account for the size of the row in the buffer. The sizeOfRow function
    # produces a best estimate of the amount of memory each "row" consumes. It's
    # not exact, but with some simplifying assumptions it's close enough.
    bufferedSizeBytes += sizeOfRow(row)

    # Only allow for a relatively small amount of buffering.
    if bufferedSizeBytes > MEMORY_LIMIT_BYTES:
        scratchSizeBytes += writeBufferToScratchFile(buffer)
        # Clear the buffer now that its data has been written to the scratch
        # file.
        buffer = []
        bufferedSizeBytes = 0

    # Once the scratch file is as large as the target row group size,
    # read it column-by-column and stream the data to the output.
    if scratchSizeBytes > TARGET_ROW_GROUP_SIZE_BYTES:
        writeScratchFileToOutputStream()
        scratchSizeBytes = 0

# Flush any remaining rows once the stream ends.
if buffer:
    scratchSizeBytes += writeBufferToScratchFile(buffer)
if scratchSizeBytes > 0:
    writeScratchFileToOutputStream()
```
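To run this control flow end to end, the same loop can be written as a self-contained Python function. The callbacks and limits are hypothetical parameters that stand in for the connector's real scratch-file and output code. This version resets both byte counters after each flush. When the stream ends, it spills any remaining buffered rows and runs the second pass one last time.

```python
from typing import Any, Callable, Iterable


def run_first_pass(
    rows: Iterable[Any],
    size_of_row: Callable[[Any], int],
    write_buffer_to_scratch: Callable[[list[Any]], int],
    flush_scratch_to_output: Callable[[], None],
    memory_limit_bytes: int,
    target_row_group_bytes: int,
) -> None:
    """Buffer rows in memory, spill small batches to the scratch file, and
    trigger the second pass once the scratch file holds a full output row
    group's worth of data."""
    buffer: list[Any] = []
    buffered_bytes = 0
    scratch_bytes = 0
    for row in rows:
        buffer.append(row)
        buffered_bytes += size_of_row(row)
        if buffered_bytes > memory_limit_bytes:
            scratch_bytes += write_buffer_to_scratch(buffer)
            buffer, buffered_bytes = [], 0
        if scratch_bytes > target_row_group_bytes:
            flush_scratch_to_output()
            scratch_bytes = 0
    # Flush whatever remains once the stream ends.
    if buffer:
        scratch_bytes += write_buffer_to_scratch(buffer)
    if scratch_bytes > 0:
        flush_scratch_to_output()


# Hypothetical stubs that record what would be written.
scratch_batches: list[list[int]] = []
output_row_groups: list[list[int]] = []


def write_buffer(buffer: list[int]) -> int:
    scratch_batches.append(list(buffer))
    return len(buffer) * 10  # pretend each row encodes to 10 bytes


def flush_scratch() -> None:
    output_row_groups.append([row for batch in scratch_batches for row in batch])
    scratch_batches.clear()


run_first_pass(range(25), lambda row: 10, write_buffer, flush_scratch,
               memory_limit_bytes=30, target_row_group_bytes=100)
print(output_row_groups)
```

Here each row is assumed to cost 10 bytes, so every 4 rows spill to the scratch file and every 3 spills (12 rows) become one output row group. The final row lands in a short trailing row group.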
This strategy requires interacting with Parquet files at a relatively low level, with direct access to reading and writing column chunks and row groups. Our code for this is currently written in Go and uses the Apache Parquet package, in particular its file module, which provides direct access to column chunk readers and writers.
Pass 2: Transition from Scratch File to Output Row Groups
Once the scratch file reaches the size of one output row group, the process transitions to the second pass. The low-level Apache Parquet package is used to read the scratch file column-by-column, compacting its many small row groups into a single row group that is streamed to the output. The entire process then repeats to add more row groups to the final output file.
The high-level algorithm for the second pass is shown below:
```python
# Process the data column-by-column.
for colIdx in range(numColumns):
    # This output column writer will be used for all of the values read from all
    # of the row groups.
    columnWriter = getWriterForColumn(colIdx)
    for rowGroup in rowGroups:
        # Each scratch file row group is read for the values of the current
        # column.
        columnReader = getColumnReaderForColumn(rowGroup, colIdx)
        # Copy every value in this column chunk to the output column.
        while columnReader.HasNext():
            columnWriter.Write(columnReader.Read())
```
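A runnable miniature of this compaction might look like the following, assuming the scratch row groups have already been decoded into a hypothetical `{column name: values}` mapping. For clarity, the sketch builds each output column chunk as a Python list. A real column writer would encode values as they arrive rather than holding the whole chunk.

```python
from typing import Any, Iterator


def compact_row_groups(
    scratch_row_groups: list[dict[str, list[Any]]],
    columns: list[str],
) -> Iterator[tuple[str, list[Any]]]:
    """Merge many small row groups into one output row group, one column at
    a time, yielding (column name, merged values) pairs."""
    for name in columns:
        column_chunk: list[Any] = []
        for row_group in scratch_row_groups:
            # Visit the small row groups in the same order for every column
            # so values stay aligned row-for-row across columns.
            column_chunk.extend(row_group[name])
        yield name, column_chunk


scratch = [
    {"id": [1, 2], "name": ["a", "b"]},
    {"id": [3], "name": ["c"]},
    {"id": [4, 5], "name": ["d", None]},
]
output_row_group = dict(compact_row_groups(scratch, ["id", "name"]))
print(output_row_group)
# {'id': [1, 2, 3, 4, 5], 'name': ['a', 'b', 'c', 'd', None]}
```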
The final output is a stream of bytes that can be piped directly to an object store or a similar destination. Our connector code uses a file.Writer from the Apache Parquet package to construct column writers, and that file writer writes directly to files in S3.
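Parquet can be written front to back in a single stream because its metadata lives in a footer at the end of the file. The layout is the magic bytes `PAR1`, the column chunk data, the footer metadata, a 4-byte little-endian footer length, and `PAR1` again. The toy writer below imitates that layout to show why no seeking is needed. It is a hypothetical illustration, not the Apache Parquet package's API. The footer here is JSON, whereas real Parquet encodes it with Thrift.

```python
import io
import json
import struct
from typing import Any, BinaryIO

MAGIC = b"PAR1"


class ForwardOnlyWriter:
    """Toy writer mimicking Parquet's byte layout (JSON footer, not Thrift).

    Offsets are tracked with a running counter, so the sink only needs
    write(); it never has to support seek() or tell().
    """

    def __init__(self, sink: BinaryIO) -> None:
        self._sink = sink
        self._offset = 0
        self._row_groups: list[dict[str, list[int]]] = []
        self._write(MAGIC)

    def _write(self, data: bytes) -> None:
        self._sink.write(data)
        self._offset += len(data)

    def write_row_group(self, columns: dict[str, list[Any]]) -> None:
        index: dict[str, list[int]] = {}
        for name, values in columns.items():
            chunk = json.dumps(values).encode("utf-8")
            index[name] = [self._offset, len(chunk)]  # [offset, length]
            self._write(chunk)
        self._row_groups.append(index)

    def close(self) -> None:
        footer = json.dumps({"row_groups": self._row_groups}).encode("utf-8")
        self._write(footer)
        self._write(struct.pack("<I", len(footer)))
        self._write(MAGIC)


def read_footer(data: bytes) -> dict[str, Any]:
    """Locate the footer from the end of the file, as a Parquet reader does."""
    if data[:4] != MAGIC or data[-4:] != MAGIC:
        raise ValueError("missing magic bytes")
    (footer_len,) = struct.unpack("<I", data[-8:-4])
    return json.loads(data[-8 - footer_len:-8])


sink = io.BytesIO()
writer = ForwardOnlyWriter(sink)
writer.write_row_group({"id": [1, 2, 3], "name": ["a", "b", "c"]})
writer.close()
footer = read_footer(sink.getvalue())
offset, length = footer["row_groups"][0]["id"]
print(json.loads(sink.getvalue()[offset:offset + length]))  # [1, 2, 3]
```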
Limitations of a 2-Pass Write
Although we have had good success with this strategy, we have encountered a couple of limitations that are good to be aware of.
Second Pass Overhead
It’s probably obvious that there’s going to be extra overhead from encoding the data for the first pass, and then decoding that data and encoding it again for the second pass. It would be faster to use enough memory to buffer an entire row group and do a single-pass write. In isolation, the 2-pass write of Parquet is also slower than writing data in a more stream-oriented file format like Avro or even NDJSON.
In practice, we have seen that the 2-pass write is still quite fast and is unlikely to be a serious performance bottleneck in real production data pipelines. The higher costs of enabling a single-pass write don’t seem worth it, and Parquet is a very desirable format for the end users who will query this data.
Scratch File Metadata Size
Opening a Parquet file involves reading its metadata and holding it in memory. It turns out that a Parquet file with a large number of columns (1000+) and many row groups can have quite a lot of metadata. If left unchecked, it could be enough to blow up the connector's memory usage in extreme cases.
To prevent excessive metadata sizes, we use a simple heuristic that triggers writing the scratch file to the output once it exceeds a certain number of column chunks. This means that datasets with huge numbers of columns may end up with smaller output row groups than they otherwise would. Schemas like this are fairly rare, and we haven’t seen any issues with excessively small row groups yet. This limitation could be addressed by using a more memory-efficient Parquet implementation, such as one written in Rust, or by using a different scratch file format.
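The heuristic can be expressed as one check that runs alongside the size-based trigger. The sketch below uses a hypothetical `should_flush_scratch` function. The column-chunk cap and the other numbers in the example are made-up illustrations, not the connector's actual limits.

```python
def should_flush_scratch(
    scratch_bytes: int,
    scratch_row_groups: int,
    num_columns: int,
    target_row_group_bytes: int,
    max_column_chunks: int,
) -> bool:
    """Decide whether to run the second pass now.

    Flush when the scratch file holds a full output row group's worth of
    data, or when its column-chunk count (row groups x columns), used here
    as a proxy for footer metadata size, exceeds a cap.
    """
    column_chunks = scratch_row_groups * num_columns
    return scratch_bytes > target_row_group_bytes or column_chunks > max_column_chunks


# Hypothetical limits for illustration only.
TARGET = 1_000_000_000
MAX_CHUNKS = 50_000

# 20 row groups x 10 columns = 200 chunks, and only 200 MB: keep buffering.
print(should_flush_scratch(200_000_000, 20, 10, TARGET, MAX_CHUNKS))     # False
# 20 row groups x 3,000 columns = 60,000 chunks: flush early.
print(should_flush_scratch(200_000_000, 20, 3_000, TARGET, MAX_CHUNKS))  # True
```

The trade-off is the one described above: a wide schema hits the chunk cap before the byte target, so its output row groups come out smaller.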