On July 16, 2021, Amazon Athena upgraded its Apache Hudi integration with new features and support for Hudi’s latest 0.8.0 release. Hudi is an open-source storage management framework that provides incremental data processing primitives for Hadoop-compatible data lakes. The upgraded integration brings the latest community improvements to Hudi along with important new features, including snapshot queries, which provide near real-time views of table data, and support for reading bootstrapped tables, which provide efficient migration of existing table data.

In this series of posts on Athena and Hudi, we will provide a short overview of key Hudi capabilities along with detailed procedures for using read-optimized queries, snapshot queries, and bootstrapped tables.

Overview

With Apache Hudi, you can perform record-level inserts, updates, and deletes on Amazon S3, allowing you to comply with data privacy laws, consume real-time streams and change data captures, reinstate late-arriving data, and track history and rollbacks in an open, vendor-neutral format. Apache Hudi uses Apache Parquet and Apache Avro storage formats for data storage, and includes built-in integrations with Apache Spark, Apache Hive, and Presto, which enable you to query Apache Hudi datasets using the same tools that you use today, with near-real-time access to fresh data.

An Apache Hudi dataset can be one of the following table types:

  • Copy on Write (CoW) – Data is stored in columnar format (Parquet), and each update creates a new version of the base file on a write commit. A CoW table type typically lends itself to read-heavy workloads on data that changes less frequently.
  • Merge on Read (MoR) – Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files. A MoR table type is typically suited for write-heavy or change-heavy workloads with fewer reads.

Apache Hudi provides three logical views for accessing data:

  • Read-optimized – Provides the latest committed dataset from CoW tables and the latest compacted dataset from MoR tables
  • Incremental – Provides a change stream between two actions out of a CoW dataset to feed downstream jobs and extract, transform, load (ETL) workflows
  • Real-time – Provides the latest committed data from a MoR table by merging the columnar and row-based files inline

As of this writing, Athena supports read-optimized and real-time views.
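
To make the difference between the two views that Athena supports more concrete, the following is a minimal toy model in plain Scala. It is not Hudi code: the Reading case class, the baseFile and deltaLog values, and both functions are hypothetical names chosen for illustration, and real Hudi merges records using its own file formats and payload logic.

```scala
// Toy model only: this is NOT Hudi code, just an illustration of the two views.
final case class Reading(key: String, ts: Long, temperature: Double)

// Hypothetical MoR table state: compacted columnar base data plus a row-based delta log.
val baseFile = Seq(Reading("r1", 1L, 6.4), Reading("r2", 1L, 8.0))
val deltaLog = Seq(Reading("r1", 2L, 10.0))

// Read-optimized view: only the compacted base data is read.
def readOptimized(base: Seq[Reading]): Map[String, Reading] =
  base.map(r => r.key -> r).toMap

// Real-time view: base data merged with delta records; the newest ts wins per key.
def realTime(base: Seq[Reading], delta: Seq[Reading]): Map[String, Reading] =
  (base ++ delta).groupBy(_.key).map { case (k, rs) => k -> rs.maxBy(_.ts) }

val ro = readOptimized(baseFile)
val rt = realTime(baseFile, deltaLog)

println(ro("r1").temperature) // 6.4
println(rt("r1").temperature) // 10.0
```

In this model, the read-optimized view still returns the compacted temperature of 6.4 for r1, while the real-time view returns the newer value of 10.0 from the delta log.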

Using read-optimized queries

In this post, you will use Athena to query an Apache Hudi read-optimized view on data residing in Amazon S3. The walkthrough includes the following high-level steps:

  1. Store raw data in an S3 data lake.
  2. Transform the raw data to Apache Hudi CoW and MoR tables using Apache Spark on Amazon EMR.
  3. Query and analyze the tables on Amazon S3 with Athena on a read-optimized view.
  4. Perform an update to a row in the Apache Hudi dataset.
  5. Query and analyze the updated dataset using Athena.

Architecture

The following diagram illustrates our solution architecture.

[Diagram: solution architecture]

In this architecture, you have high-velocity weather data stored in an S3 data lake. This raw dataset is processed on Amazon EMR and stored in an Apache Hudi dataset in Amazon S3 for further analysis by Athena. If the data is updated, Apache Hudi performs an update on the existing record, and these updates are reflected in the results fetched by the Athena query.

Let’s build this architecture.

Prerequisites

Before getting started, we set up our resources. For this post, we use the us-east-1 Region.

  1. Create an Amazon Elastic Compute Cloud (Amazon EC2) key pair. For instructions, see Create a key pair using Amazon EC2.
  2. Create an S3 bucket for storing the raw weather data (for this post, we call it weather-raw-bucket).
  3. Create two folders in the S3 bucket: parquet_file and delta_parquet.
  4. Download all the data files, Scala scripts (data_insertion_cow_delta_script, data_insertion_cow_script, data_insertion_mor_delta_script, and data_insertion_mor_script), and Athena DDL code (athena_weather_hudi_cow.sql and athena_weather_hudi_mor.sql) from the GitHub repo.
  5. Upload the weather_oct_2020.parquet file to weather-raw-bucket/parquet_file.
  6. Upload the file weather_delta.parquet to weather-raw-bucket/delta_parquet. This file contains an update to an existing weather record, changing relative_humidity from 81 to 50 and temperature from 6.4 to 10.
  7. Create another S3 bucket for storing the Apache Hudi dataset. For this post, we create a bucket with a corresponding subfolder named athena-hudi-bucket/hudi_weather.
  8. Deploy the EMR cluster using the provided AWS CloudFormation template (choose Launch Stack).
  9. Enter a name for your stack.
  10. Choose a pre-created key pair name.

This is required to connect to the EMR cluster nodes. For more information, see Connect to the Master Node Using SSH.

  11. Accept all the defaults and choose Next.
  12. Acknowledge that AWS CloudFormation might create AWS Identity and Access Management (IAM) resources.
  13. Choose Create stack.

Use Apache Hudi with Amazon EMR

When the cluster is ready, you can use the provided key pair to SSH into the primary node.

  1. Use the following bash command to load the spark-shell to work with Apache Hudi:
    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

  2. On the spark-shell, run the following Scala code in the script data_insertion_cow_script to import weather data from the S3 data lake to an Apache Hudi dataset using the CoW storage type:
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor

    // Set up various input values as variables
    val inputDataPath = "s3://weather-raw-bucket/parquet_file/"
    val hudiTableName = "weather_hudi_cow"
    val hudiTablePath = "s3://athena-hudi-bucket/hudi_weather/" + hudiTableName

    // Set up our Hudi Data Source Options
    // Note: RECORDKEY_FIELD_OPT_KEY is set twice; a Scala Map keeps the last value, so the effective record key is "timestamp"
    val hudiOptions = Map[String,String](
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "city_id",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "timestamp",
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "date",
      HoodieWriteConfig.TABLE_NAME -> hudiTableName,
      DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
      DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
      DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "date",
      DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)

    // Read data from S3 and create a DataFrame with Partition and Record Key
    val inputDF = spark.read.format("parquet").load(inputDataPath)

    // Write data into the Hudi dataset
    inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)

Replace the S3 bucket path for inputDataPath and hudiTablePath in the preceding code with your S3 bucket.

For more information about DataSourceWriteOptions, see Work with a Hudi Dataset.
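
One detail of the options map is worth checking before you adapt it: RECORDKEY_FIELD_OPT_KEY is listed twice. A Scala Map keeps only the last value for a repeated key, so the map ends up with "timestamp" as the record key field. The short sketch below shows this in plain Scala. It uses the literal string hoodie.datasource.write.recordkey.field, which is our understanding of the value behind that constant, so treat the exact string as an assumption.

```scala
// Plain Scala, no Hudi dependency needed.
// Assumption: this string is the config name behind RECORDKEY_FIELD_OPT_KEY.
val recordKeyOption = "hoodie.datasource.write.recordkey.field"

val options = Map[String, String](
  recordKeyOption -> "city_id",
  recordKeyOption -> "timestamp" // same key again: this value replaces "city_id"
)

println(options.size)             // 1
println(options(recordKeyOption)) // timestamp
```

If you need a composite key built from city_id and timestamp, Hudi documents key generator settings for that purpose; check the documentation for the Hudi version on your EMR release.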

  3. In the spark-shell, count the total number of records in the input DataFrame that was written to the Apache Hudi dataset:
    scala> inputDF.count()
    res1: Long = 1000

  4. Repeat the same step to create a MoR table using data_insertion_mor_script, which sets the table type to MERGE_ON_READ (the default is COPY_ON_WRITE).
  5. Run spark.sql("show tables").show(); to list three tables: one for CoW and two, with the suffixes _rt and _ro, for MoR.

The following screenshot shows our output.

[Screenshot: output of show tables]
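
The table names listed by show tables follow a simple pattern, which can be sketched as follows. This is only a model of the naming described in this post, not Hudi's Hive sync code; syncedTableNames is a hypothetical helper, and the table-type strings are written the way the Hudi table types are spelled in this post.

```scala
// Hypothetical helper that mirrors the naming described above.
def syncedTableNames(tableName: String, tableType: String): Seq[String] =
  tableType match {
    // A MoR table is exposed as a read-optimized (_ro) and a real-time (_rt) table.
    case "MERGE_ON_READ" => Seq(tableName + "_ro", tableName + "_rt")
    // A CoW table is exposed under its own name.
    case _               => Seq(tableName)
  }

val tables =
  syncedTableNames("weather_hudi_cow", "COPY_ON_WRITE") ++
    syncedTableNames("weather_hudi_mor", "MERGE_ON_READ")

tables.foreach(println)
// weather_hudi_cow
// weather_hudi_mor_ro
// weather_hudi_mor_rt
```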

Let’s check the processed Apache Hudi dataset in the S3 data lake.

  6. On the Amazon S3 console, confirm the subfolders weather_hudi_cow and weather_hudi_mor are in athena-hudi-bucket/hudi_weather.
  7. Navigate to the weather_hudi_cow subfolder to see the Apache Hudi dataset, which is partitioned by the date key, with one folder for each date in our dataset.
  8. On the Athena console, create a hudi_athena_test database using the following command:
    create database hudi_athena_test;

You use this database to create all your tables.

  9. Create an Athena table using the athena_weather_hudi_cow.sql script:
    CREATE EXTERNAL TABLE weather_partition_cow(
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key` string,
      `_hoodie_partition_path` string,
      `_hoodie_file_name` string,
      `city_id` string,
      `timestamp` string,
      `relative_humidity` decimal(3,1),
      `temperature` decimal(3,1),
      `absolute_humidity` decimal(5,4))
    PARTITIONED BY (
      `date` string)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION 's3://athena-hudi-bucket/hudi_weather/weather_hudi_cow'

Replace the S3 bucket path in the preceding code with your S3 bucket (Hudi table path) in LOCATION.

  10. Add partitions to the table by running the following query from the athena_weather_hudi_cow.sql script on the Athena console:
    ALTER TABLE weather_partition_cow ADD
    PARTITION (date = '2020-10-01') LOCATION 's3://athena-hudi-bucket/hudi_weather/weather_hudi_cow/2020-10-01/'
    PARTITION (date = '2020-10-02') LOCATION 's3://athena-hudi-bucket/hudi_weather/weather_hudi_cow/2020-10-02/'
    PARTITION (date = '2020-10-03') LOCATION 's3://athena-hudi-bucket/hudi_weather/weather_hudi_cow/2020-10-03/'
    PARTITION (date = '2020-10-04') LOCATION 's3://athena-hudi-bucket/hudi_weather/weather_hudi_cow/2020-10-04/';

Replace the S3 bucket path in the preceding code with your S3 bucket (Hudi table path) in LOCATION.
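
Writing one PARTITION clause per date by hand becomes tedious for longer date ranges. As a rough sketch, the statement could be generated with a few lines of plain Scala. The function addPartitionsSql is a hypothetical helper (not part of Athena or Hudi), and it assumes one folder per day, named in yyyy-MM-dd form, directly under the table path, as in this walkthrough.

```scala
import java.time.LocalDate

// Hypothetical helper: builds the ALTER TABLE statement for a run of daily partitions.
def addPartitionsSql(table: String, tablePath: String, first: LocalDate, days: Int): String = {
  val clauses = (0 until days).map { i =>
    val d = first.plusDays(i.toLong) // LocalDate prints as yyyy-MM-dd
    s"PARTITION (date = '$d') LOCATION '$tablePath/$d/'"
  }
  (s"ALTER TABLE $table ADD" +: clauses).mkString("\n") + ";"
}

val sql = addPartitionsSql(
  "weather_partition_cow",
  "s3://athena-hudi-bucket/hudi_weather/weather_hudi_cow",
  LocalDate.of(2020, 10, 1),
  4)

println(sql)
```

For the four dates in this dataset, the generated text has the same form as the ALTER TABLE statement in the preceding step and can be pasted into the Athena console.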

  11. Confirm the total number of records in the Apache Hudi dataset with the following query:
    SELECT count(*) FROM "hudi_athena_test"."weather_partition_cow";

It should return a single row with a count of 1,000.

Now let’s check the record that we want to update.

  12. Run the following query on the Athena console:
    SELECT * FROM "hudi_athena_test"."weather_partition_cow"
    where city_id ='1'
    and date ='2020-10-04'
    and timestamp = '2020-10-04T07:19:12Z';

The output should look like the following screenshot. Note the values of relative_humidity and temperature.

[Screenshot: query output]

  13. Return to the Amazon EMR primary node and run the following code from the data_insertion_cow_delta_script script at the spark-shell prompt to update the data:
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor

    // Set up various input values as variables
    val inputDataPath = "s3://weather-raw-bucket/delta_parquet/"
    val hudiTableName = "weather_hudi_cow"
    val hudiTablePath = "s3://athena-hudi-bucket/hudi_weather/" + hudiTableName

    // Set up our Hudi Data Source Options
    // Note: RECORDKEY_FIELD_OPT_KEY is set twice; a Scala Map keeps the last value, so the effective record key is "timestamp"
    // UPSERT replaces the existing record that has the same key instead of appending a new one
    val hudiOptions = Map[String,String](
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "city_id",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "timestamp",
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "date",
      HoodieWriteConfig.TABLE_NAME -> hudiTableName,
      DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
      DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
      DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "date",
      DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)

    // Read data from S3 and create a DataFrame with Partition and Record Key
    val inputDF = spark.read.format("parquet").load(inputDataPath)

    // Write data into the Hudi dataset
    inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)

Replace the S3 bucket path for inputDataPath and hudiTablePath in the preceding code with your S3 bucket.

  14. Run the following query on the Athena console to confirm no change occurred to the total number of records:
    SELECT count(*) FROM "hudi_athena_test"."weather_partition_cow";

The following screenshot shows our query results.

[Screenshot: query results]
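
An unchanged count is what distinguishes an upsert from a plain insert: a record whose key already exists is replaced rather than appended. The toy model below illustrates that behavior in plain Scala. It is not Hudi code; WeatherRow, insert, and upsert are hypothetical names, the three-row table stands in for the 1,000-row dataset, and only the humidity and temperature values of the first row (81 to 50, 6.4 to 10) come from this walkthrough.

```scala
// Toy model only: illustrates upsert vs. insert semantics, not Hudi internals.
final case class WeatherRow(recordKey: String, relativeHumidity: Double, temperature: Double)

// Insert: incoming rows are appended, even if their key already exists.
def insert(table: Vector[WeatherRow], incoming: Seq[WeatherRow]): Vector[WeatherRow] =
  table ++ incoming

// Upsert: rows with an existing key are replaced, rows with a new key are appended.
def upsert(table: Vector[WeatherRow], incoming: Seq[WeatherRow]): Vector[WeatherRow] = {
  val byKey = incoming.map(r => r.recordKey -> r).toMap
  val existingKeys = table.map(_.recordKey).toSet
  val replaced = table.map(r => byKey.getOrElse(r.recordKey, r))
  replaced ++ incoming.filterNot(r => existingKeys.contains(r.recordKey))
}

val table = Vector(
  WeatherRow("2020-10-04T07:19:12Z", 81.0, 6.4),
  WeatherRow("2020-10-04T07:20:12Z", 80.0, 6.5), // made-up neighbor row
  WeatherRow("2020-10-04T07:21:12Z", 79.0, 6.6)  // made-up neighbor row
)
val delta = Seq(WeatherRow("2020-10-04T07:19:12Z", 50.0, 10.0))

val upserted = upsert(table, delta)
println(upserted.size)              // 3
println(upserted.head.temperature)  // 10.0
println(insert(table, delta).size)  // 4
```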

  15. Run the following query again on the Athena console to check for the update:
    SELECT * FROM "hudi_athena_test"."weather_partition_cow"
    where city_id ='1'
    and date ='2020-10-04'
    and timestamp = '2020-10-04T07:19:12Z';

The relative_humidity and temperature values for the relevant record are updated.

  16. Repeat similar steps for the MoR table.

Clean up the resources

You must clean up the resources you created earlier to avoid ongoing charges.

  1. On the AWS CloudFormation console, delete the stack you launched.
  2. On the Amazon S3 console, empty the buckets weather-raw-bucket and athena-hudi-bucket and delete the buckets.

Conclusion

In this post, we used Apache Hudi support in Amazon EMR to develop a data pipeline that simplifies incremental data management use cases requiring record-level insert and update operations. We then used Athena to query the read-optimized view of an Apache Hudi dataset in an S3 data lake.


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Sameer Goel is a Solutions Architect in The Netherlands, who drives customer success by building prototypes on cutting-edge initiatives. Prior to joining AWS, Sameer graduated with a master’s degree from NEU Boston, with a Data Science concentration. He enjoys building and experimenting with creative projects and applications.

Imtiaz (Taz) Sayed is the WW Tech Master for Analytics at AWS. He enjoys engaging with the community on all things data and analytics.