Data lakes give organizations the ability to harness data from multiple sources in less time. Users across different roles are now empowered to collaborate and analyze data in different ways, leading to better, faster decision-making. Amazon Simple Storage Service (Amazon S3) is the highly performant object storage service for structured and unstructured data and the storage service of choice to build a data lake.

However, many use cases like performing change data capture (CDC) from an upstream relational database to an Amazon S3-based data lake require handling data at a record level. Performing an operation like inserting, updating, and deleting individual records from a dataset requires the processing engine to read all the objects (files), make the changes, and rewrite the entire dataset as new files. Furthermore, making the data available in the data lake in near-real time often leads to the data being fragmented over many small files, resulting in poor query performance. Apache Hudi is an open-source data management framework that enables you to manage data at the record level in Amazon S3 data lakes, thereby simplifying building CDC pipelines and making it efficient to do streaming data ingestion. Datasets managed by Hudi are stored in Amazon S3 using open storage formats, and integrations with Presto, Apache Hive, Apache Spark, and the AWS Glue Data Catalog give you near real-time access to updated data using familiar tools. Hudi is supported in Amazon EMR and is automatically installed when you choose Spark, Hive, or Presto when deploying your EMR cluster.

In this post, we show you how to build a CDC pipeline that captures the data from an Amazon Relational Database Service (Amazon RDS) for MySQL database using AWS Database Migration Service (AWS DMS) and applies those changes to a dataset in Amazon S3 using Apache Hudi on Amazon EMR. Apache Hudi includes the utility HoodieDeltaStreamer, which provides an easy way to ingest data from many sources, such as a distributed file system or Kafka. It manages checkpointing, rollback, and recovery so you don’t need to keep track of what data has been read and processed from the source, which makes it easy to consume change data. It also allows for lightweight SQL-based transformations on the data as it is being ingested. For more information, see Writing Hudi Tables. Support for AWS DMS with HoodieDeltaStreamer is provided with Apache Hudi version 0.5.2 and is available on Amazon EMR 5.30.x and 6.1.0.

Architecture overview

The following diagram illustrates the architecture we deploy to build our CDC pipeline.

cdc pipeline hudi 1

In this architecture, we have a MySQL instance on Amazon RDS. AWS DMS pulls full and incremental data (using the CDC feature of AWS DMS) into an S3 bucket in Parquet format. HoodieDeltaStreamer on an EMR cluster is used to process the full and incremental data to create a Hudi dataset. As the data in the MySQL database gets updated, the AWS DMS task picks up the changes and takes them to the raw S3 bucket. The HoodieDeltastreamer job can be run on the EMR cluster at a certain frequency or in a continuous mode to apply these changes to the Hudi dataset in the Amazon S3 data lake. You can query this data with tools such as SparkSQL, Presto, Apache Hive running on the EMR cluster, and Amazon Athena.

Deploying the solution resources

We use AWS CloudFormation to deploy these components in your AWS account. Choose an AWS Region for deployment where the following services are available:

You need to meet the following prerequisites before deploying the CloudFormation template:

  • Have a VPC with at least two public subnets in your account.
  • Have a S3 bucket where you want to collect logs from the EMR cluster. This should be in the same AWS region where you spin up the CloudFormation stack.
  • Have an AWS Identity and Access Management (IAM) role dms-vpc-role. For instructions on creating one, see Security in AWS Database Migration Service.
  • If you’re deploying the stack in an account using the AWS Lake Formation permission model, validate the following settings:
    • The IAM user used to deploy the stack is added as a data lake administrator under Lake Formation or the IAM user used to deploy the stack has IAM privileges to create databases in the AWS Glue Data Catalog.
    • The Data Catalog settings under Lake Formation are configured to use only IAM access control for new databases and new tables in new databases. This makes sure that all access to the newly created databases and tables in the Data Catalog are controlled solely using IAM permissions.
      cdc pipeline hudi 2 2
  • IAMAllowedPrincipals is granted database creator privilege on the Lake Formation Database creators page.
    cdc pipeline hudi 3 2

If this privilege is not in place, grant it by choosing Grant and selecting the Create database permission.

cdc pipeline hudi 4 2

These Lake Formation settings are required so that all permissions to the Data Catalog objects are controlled using IAM only.

Launching the CloudFormation stack

To launch the CloudFormation stack, complete the following steps:

  1. Choose Launch Stack:
    LaunchStack
  2. Provide the mandatory parameters in the Parameters section, including an S3 bucket to store the Amazon EMR logs and a CIDR IP range from where you want to access Amazon RDS for MySQL.
    cdc pipeline hudi 5 2
  3. Follow through the CloudFormation stack creation wizard, leaving rest of the default values unchanged.
  4. On the final page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create stack.
  6. When the stack creation is complete, record the details of the S3 bucket, EMR cluster, and Amazon RDS for MySQL details on the Outputs tab of the CloudFormation stack.
    cdc pipeline hudi 6 2

The CloudFormation template uses m5.xlarge and m5.2xlarge instances for the EMR cluster. If these instance types aren’t available in the Region or Availability Zone you have selected for deployment, the creation of the CloudFormation stack fails. If that happens, choose a Region or subnet where the instance type is available. For more information about working around this issue, see Instance Type Not Supported.

CloudFormation also creates and configures the AWS DMS endpoints and tasks with requisite connection attributes such as dataFormat, timestampColumnName, and parquetTimestampInMillisecond. For more information, see Extra connection attributes when using Amazon S3 as a target for AWS DMS.

The database instance deployed as part of the CloudFormation stack has already been created with the settings needed for AWS DMS to work in CDC mode on the database. These are:

  • binlog_format=ROW
  • binlog_checksum=NONE

Also, automatic backups are enabled on the RDS DB instance. This is a required attribute for AWS DMS to do CDC. For more information, see Using a MySQL-compatible database as a source for AWS DMS.

Running the end-to-end data flow

Now that the CloudFormation stack is deployed, we can run our data flow to get the full and incremental data from MySQL into a Hudi dataset in our data lake.

  1. As a best practice, retain your binlogs for at least 24 hours. Log in to your Amazon RDS for MySQL database using your SQL client and run the following command:
    call mysql.rds_set_configuration('binlog retention hours', 24)

  2. Create a table in the dev database:
    create table dev.retail_transactions(
    tran_id INT,
    tran_date DATE,
    store_id INT,
    store_city varchar(50),
    store_state char(2),
    item_code varchar(50),
    quantity INT,
    total FLOAT);

  3. When the table is created, insert some dummy data into the database:
    insert into dev.retail_transactions values(1,'2019-03-17',1,'CHICAGO','IL','XXXXXX',5,106.25);
    insert into dev.retail_transactions values(2,'2019-03-16',2,'NEW YORK','NY','XXXXXX',6,116.25);
    insert into dev.retail_transactions values(3,'2019-03-15',3,'SPRINGFIELD','IL','XXXXXX',7,126.25);
    insert into dev.retail_transactions values(4,'2019-03-17',4,'SAN FRANCISCO','CA','XXXXXX',8,136.25);
    insert into dev.retail_transactions values(5,'2019-03-11',1,'CHICAGO','IL','XXXXXX',9,146.25);
    insert into dev.retail_transactions values(6,'2019-03-18',1,'CHICAGO','IL','XXXXXX',10,156.25);
    insert into dev.retail_transactions values(7,'2019-03-14',2,'NEW YORK','NY','XXXXXX',11,166.25);
    insert into dev.retail_transactions values(8,'2019-03-11',1,'CHICAGO','IL','XXXXXX',12,176.25);
    insert into dev.retail_transactions values(9,'2019-03-10',4,'SAN FRANCISCO','CA','XXXXXX',13,186.25);
    insert into dev.retail_transactions values(10,'2019-03-13',1,'CHICAGO','IL','XXXXXX',14,196.25);
    insert into dev.retail_transactions values(11,'2019-03-14',5,'CHICAGO','IL','XXXXXX',15,106.25);
    insert into dev.retail_transactions values(12,'2019-03-15',6,'CHICAGO','IL','XXXXXX',16,116.25);
    insert into dev.retail_transactions values(13,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    insert into dev.retail_transactions values(14,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    

    We now use AWS DMS to start pushing this data to Amazon S3.

  4. On the AWS DMS console, run the task hudiblogload.

This task does a full load of the table to Amazon S3 and then starts writing incremental data.

cdc pipeline hudi 7 2

If you’re prompted to test the AWS DMS endpoints while starting the AWS DMS task for the first time, you should do so. It’s generally a good practice to test the source and target endpoints before starting an AWS DMS task for the first time.

In a few minutes, the status of the task changes to Load complete, replication ongoing, which means that the full load is complete and the ongoing replication has started. You can go to the S3 bucket created by the stack and you should see a .parquet file under the dmsdata/dev/retail_transactions folder in your S3 bucket.

cdc pipeline hudi 8 2

  1. On the Hardware tab of your EMR cluster, choose the master instance group and note the EC2 instance ID for the master instance.
  2. On the Systems Manager console, choose Session Manager.
  3. Choose Start Session to start a session with the master node of your cluster.

If you face challenges connecting to the master instance of the EMR cluster, see Troubleshooting Session Manager.

  1. Switch the user to Hadoop by running the following command:
    sudo su hadoop

In a real-life use case, the AWS DMS task starts writing incremental files to the same Amazon S3 location when the full load is complete. The way to distinguish full load vs. incremental load files is that the full load files have a name starting with LOAD, whereas CDC filenames have datetimestamps, as you see in a later step. From a processing perspective, we want to process the full load into the Hudi dataset and then start incremental data processing. To do this, we move the full load files to a different S3 folder under the same S3 bucket and process those before we start processing incremental files.

  1. Run the following command on the master node of the EMR cluster (replace <s3-bucket-name> with your actual bucket name):
    aws s3 mv s3://<s3-bucket-name>/dmsdata/dev/retail_transactions/ s3://<s3-bucket-name>/dmsdata/data-full/dev/retail_transactions/ --exclude "*" --include "LOAD*.parquet" --recursive

With the full table dump available in the data-full folder, we now use the HoodieDeltaStreamer utility on the EMR cluster to populate the Hudi dataset on Amazon S3.

  1. Run the following command to populate the Hudi dataset to the hudi folder in the same S3 bucket (replace <s3-bucket-name> with the name of the S3 bucket created by the CloudFormation stack):
    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \ --master yarn --deploy-mode cluster \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \ --table-type COPY_ON_WRITE \ --source-ordering-field dms_received_ts \ --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-full.properties \ --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \ --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \ --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \ --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ --enable-hive-sync
    

The preceding command runs a Spark job that runs the HoodieDeltaStreamer utility. For more information about the parameters used in this command, see Writing Hudi Tables.

When the Spark job is complete, you can navigate to the AWS Glue console and find a table called retail_transactions created under the hudiblogdb database. The input format for the table is org.apache.hudi.hadoop.HoodieParquetInputFormat.

cdc pipeline hudi 9 2

Next, we query the data and look at the data in the retail_transactions table in the catalog.

  1. In the Systems Manager session established earlier, run the following command (make sure that you have completed all the prerequisites for the post, including adding IAMAllowedPrincipals as a database creator in Lake Formation):
    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" \
    --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
    --jars /usr/lib/hudi/hudi-spark-bundle_2.11-0.5.2-incubating.jar,/usr/lib/spark/external/lib/spark-avro.jar
    

  2. Run the following query on the retail_transactions table:
    spark.sql("Select * from hudiblogdb.retail_transactions order by tran_id").show()

You should see the same data in the table as the MySQL database with a few columns added by the HoodieDeltaStreamer process.

cdc pipeline hudi 10 2

We now run some DML statements on our MySQL database and take these changes through to the Hudi dataset.

  1. Run the following DML statements on the MySQL database:
    insert into dev.retail_transactions values(15,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    update dev.retail_transactions set store_city='SPRINGFIELD' where tran_id=12;
    delete from dev.retail_transactions where tran_id=2;

In a few minutes, you see a new .parquet file created under dmsdata/dev/retail_transactions folder in the S3 bucket.

cdc pipeline hudi 12 2

  1. Run the following command on the EMR cluster to get the incremental data to the Hudi dataset (replace <s3-bucket-name> with the name of the S3 bucket created by the CloudFormation template):
    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \ --master yarn --deploy-mode cluster \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \ --table-type COPY_ON_WRITE \ --source-ordering-field dms_received_ts \ --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-incremental.properties \ --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \ --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \ --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \ --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ --enable-hive-sync \
    --checkpoint 0

The key difference between this command and the previous one is in the properties file that was used as an argument to the –-props and --checkpoint parameters. For the earlier command that performed the full load, we used dfs-source-retail-transactions-full.properties; for the incremental one, we used dfs-source-retail-transactions-incremental.properties. The differences between these two property files are:

  • The location of source data changes between full and incremental data in Amazon S3.
  • The SQL transformer query included a hard-coded Op field for the full load task, because an AWS DMS first-time full load doesn’t include the Op field for Parquet datasets. The Op field can have values of I, U, and D—for Insert, Update and Delete indicators.

We cover the details of the --checkpoint parameter in the Considerations when deploying to production section later in this post.

  1. When the job is complete, run the same query in spark-shell.

You should see these updates applied to the Hudi dataset.

cdc pipeline hudi 11 2

You can use the Hudi CLI to administer Hudi datasets to view information about commits, the filesystem, statistics, and more.

  1. To do this, in the Systems Manager session, run the following command:
    /usr/lib/hudi/cli/bin/hudi-cli.sh

  2. Inside the Hudi-cli, run the following command (replace the <s3-bucket-name> with the S3 bucket created by the Cloud Formation stack):
    connect --path s3://<s3-bucket-name>/hudi/retail_transactions

  3. To inspect commits on your Hudi dataset, run the following command:
    commits show

    cdc pipeline hudi 13 2

You can also query incremental data from the Hudi dataset. This is particularly useful when you want to take incremental data for downstream processing like aggregations. Hudi provides multiple ways of pulling data incrementally which is documented here. An example of how to use this feature is available in the Hudi Quick Start Guide.

Considerations when deploying to production

The preceding setup showed an example of how to build a CDC pipeline from your relational database to your Amazon S3-based data lake. However, if you want to use this solution for production, you should consider the following:

  • To ensure high availability, you can set up the AWS DMS instance in a Multi-AZ configuration.
  • The CloudFormation stack deployed the required properties files needed by the deltastreamer utility into the S3 bucket at s3://<s3-bucket-name>/properties/. You may need to customize these based on your requirements. For more information, see Configurations. There are a few parameters that may need your attention:
    • deltastreamer.transformer.sql – This property exposes an extremely powerful feature of the deltastreamer utility: it enables you to transform data on the fly as it’s being ingested and persisted in the Hudi dataset. In this post, we have shown a basic transformation that casts the tran_date column to a string, but you can apply any transformation as part of this query.
    • parquet.small.file.limit – This field is in bytes and a critical storage configuration specifying how Hudi handles small files on Amazon S3. Small files can happen due to the number of records being processed in each insert per partition. Setting this value allows Hudi to continue to treat inserts in a particular partition as updates to the existing files, causing files that are up to the size of this small.file.limit to be rewritten and keep growing in size.
    • parquet.max.file.size – This is the max file size of a single Parquet in your Hudi dataset, after which a new file is created to store more data. For Amazon S3 storage and data querying needs, we can keep this around 256 MB–1 GB (256x1024x1024 = 268435456).
    • [Insert|Upsert|bulkinsert].shuffle.parallelism – In this post, we dealt with a small dataset of few records only. However, in real-life situations, you might want to bring in hundreds of millions of records in the first load, and then incremental CDC can potentially be in millions per day. There is a very important parameter to set when you want quite predictable control on the number of files in each of your Hudi dataset partitions. This is also needed to ensure you don’t hit an Apache Spark limit of 2 GB for data shuffle blocks when processing large amounts of data. For example, if you plan to load 200 GBs of data in first load and want to keep file sizes of approximately 256 MB, set the shuffle parallelism parameters for this dataset as 800 (200×1024/256). For more information, see Tuning Guide.
  • In the incremental load deltastreamer command, we used an additional parameter: --checkpoint 0. When deltastreamer writes a Hudi dataset, it persists checkpoint information in the .commit files under the .hoodie folder. It uses this information in subsequent runs and only reads that data from Amazon S3, which is created after this checkpoint time. In a production scenario, after you start the AWS DMS task, the task keeps writing incremental data to the target S3 folder as soon as the full load is complete. In the steps that we followed, we ran a command on the EMR cluster to manually move the full load files to another folder and process the data from there. When we did that, the timestamp associated with the S3 objects changes to the most current timestamp. If we run the incremental load without the checkpoint argument, deltastreamer doesn’t pick up any incremental data written to Amazon S3 before we manually moved the full load files. To make sure that all incremental data is processed by deltastreamer the first time, set the checkpoint to 0, which makes it process all incremental data in the folder. However, only use this parameter for the first incremental load and let deltastreamer use its own checkpointing methodology from that point onwards.
  • For this post, we ran the spark-submit command manually. However, in production, you can run it as a step on the EMR cluster.
  • You can either schedule the incremental data load command to run at a regular interval using a scheduling or orchestration tool, or run it in a continuous fashion at a certain frequency by passing additional parameters to the spark-submit command --min-sync-interval-seconds XX –continuous, where XX is the number of seconds between each run of the data pull. For example, if you want to run the processing every 5 minutes, replace XX with 300.

Cleaning up

When you are done exploring the solution, complete the following steps to clean up the resources deployed by CloudFormation:

  1. Empty the S3 bucket created by the CloudFormation stack
  2. Delete any Amazon EMR log files generated under s3://<EMR-Logs-S3-Bucket> /HudiBlogEMRLogs/.
  3. Stop the AWS DMS task Hudiblogload.
  4. Delete the CloudFormation stack.
  5. Delete any Amazon RDS for MySQL database snapshots retained after the CloudFormation template is deleted.

Conclusion

More and more data lakes are being built on Amazon S3, and these data lakes often need to be hydrated with change data from transactional systems. Handling deletes and upserts of data into the data lake using traditional methods involves a lot of heavy lifting. In this post, we saw how to easily build a solution with AWS DMS and HoodieDeltaStreamer on Amazon EMR. We also looked at how to perform lightweight record-level transformations when integrating data into the data lake, and how to use this data for downstream processes like aggregations. We also discussed the important settings and command line options that were used and how you could modify them to suit your requirements.


About the Authors

ninad phatak 100Ninad Phatak is a Senior Analytics Specialist Solutions Architect with Amazon Internet Services Private Limited. He specializes in data engineering and datawarehousing technologies and helps customers architect their analytics use cases and platforms on AWS.

 

 

 

raghu dubey 100Raghu Dubey is a Senior Analytics Specialist Solutions Architect with Amazon Internet Services Private Limited. He specializes in Big Data Analytics, Data warehousing and BI and helps customers build scalable data analytics platforms.