
How to Write Data into Parquet

Column-oriented data stores have proven their value for many analytical workloads. Such success was shown by the RCFile and Apache ORC formats and their wide adoption across distributed data processing tools in the Hadoop ecosystem.

Another columnar format that has gained popularity lately is Apache Parquet, which is now a top-level Apache project. It is supported by many data processing tools, including Spark and Presto.

Recently I’ve been experimenting with storing data in the Parquet format, so I thought it might be a good idea to share a few examples. This post covers the basics of how to write data into Parquet.

Non-Hadoop writer

I’m going to show how to implement a simple non-Hadoop writer. It will be enough to start experimenting with Parquet and its options, and it can also be handy when you have to deal with a serial data import and there is no need for big guns like Spark.

To write data in Parquet we need to define a schema. Fortunately, Parquet provides support for popular data serialization libraries, like Avro, Protocol Buffers and Thrift. For demo purposes I’ll simply use protobuf.

Do not be confused: protobuf is a serialization library, but here it is used only to define a record schema, so that we can use the ProtoParquetWriter that comes with Parquet out of the box. Parquet does not use the serialization functionality of any of those libraries; it has its own binary format.

Frankly, in most cases protobuf is not the best choice for defining a record schema, since it lacks many of the types that Parquet provides, like DECIMAL or INT96 for timestamps.
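
For illustration only, a hypothetical Event definition could look like the sketch below; the message and field names here are made up for this post, not taken from any real schema. Note how a timestamp has to be stored as a plain int64, since protobuf has no richer type for it.

// Hypothetical record definition used in the examples that follow (proto2 syntax).
message Event {
  required string id = 1;              // unique event id
  optional int64 timestamp_millis = 2; // stored as int64; no DECIMAL/INT96 equivalent in protobuf
  optional string payload = 3;         // arbitrary event data
}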

If you are interested in the low-level details of how to write custom data classes, check out the examples and benchmarks in the Parquet repository.

Suppose we have an Event class generated from the protobuf definition; then we can write a collection of events like this:

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.proto.ProtoParquetWriter

val compressionCodecName = CompressionCodecName.GZIP
val blockSize = 256 * 1024 * 1024  // row group size: 256 MB
val pageSize = 1 * 1024 * 1024     // page size: 1 MB
val outputPath = new Path("data.parquet")

val parquetWriter = new ProtoParquetWriter[Event](outputPath, classOf[Event], compressionCodecName, blockSize, pageSize)

events.foreach(parquetWriter.write)
parquetWriter.close()

Page size is the amount of data buffered before it is written out as a page; this setting may affect compression performance. Block (row group) size is the amount of data buffered in memory before it is written to disk. For more details about what pages and row groups are, please see the Parquet format documentation.
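
As a rough sketch (not from the original post), one way to get a feel for these settings is to write the same events with different codecs and sizes and compare the resulting files. This reuses the same imports, Event class and events collection as in the example above.

// Write the same data with different codec / row group / page size settings,
// then compare file sizes and write times.
def writeWith(codec: CompressionCodecName, blockSize: Int, pageSize: Int, out: String): Unit = {
  val writer = new ProtoParquetWriter[Event](new Path(out), classOf[Event], codec, blockSize, pageSize)
  try events.foreach(writer.write) finally writer.close()
}

// For example, compare GZIP against SNAPPY with identical row group and page sizes.
writeWith(CompressionCodecName.GZIP, 128 * 1024 * 1024, 1024 * 1024, "events-gzip.parquet")
writeWith(CompressionCodecName.SNAPPY, 128 * 1024 * 1024, 1024 * 1024, "events-snappy.parquet")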

Writing Parquet files to S3

Now let’s see how to write Parquet files directly to Amazon S3. This can be done using the Hadoop S3 file systems; more precisely, here we’ll use the S3A file system. It requires a few code changes: we’ll use the ParquetWriter class directly so that we can pass a Configuration object with the AWS settings.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.proto.ProtoWriteSupport

val compressionCodecName = CompressionCodecName.GZIP
val blockSize = 256 * 1024 * 1024
val pageSize = 1 * 1024 * 1024
val outputPath = new Path("s3a://bucket/data.parquet")

// S3A credentials are passed to the writer via the Hadoop configuration object.
val accessKey = "aws_access_key"
val secretKey = "aws_secret_key"
val conf = new Configuration
conf.set("fs.s3a.access.key", accessKey)
conf.set("fs.s3a.secret.key", secretKey)

val writeSupport = new ProtoWriteSupport[Event](classOf[Event])

// The sixth argument is the dictionary page size; here it reuses pageSize.
val parquetWriter = new ParquetWriter[Event](outputPath,
  writeSupport, compressionCodecName, blockSize, pageSize, pageSize,
  ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
  ParquetWriter.DEFAULT_WRITER_VERSION,
  conf
)
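
Writing the events and closing the writer then works exactly the same way as in the first example:

events.foreach(parquetWriter.write)
parquetWriter.close()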

Now you should have enough information to start experimenting with Parquet. You can find the examples from this post as a single project in this repository.
