Organizations across the globe are striving to improve the scalability and cost efficiency of the data warehouse. Offloading data and data processing from a data warehouse to a data lake empowers companies to introduce new use cases like ad hoc data analysis and AI and machine learning (ML), reusing the same data stored on Amazon Simple Storage Service (Amazon S3). This approach avoids data silos and allows you to process the data at very large scale while keeping the data access cost-effective.
Starting off with this new approach can bring with it several challenges:
- Choosing the most performant data format
- Using the Spark API instead of plain SQL
- Handling historical data change on Amazon S3
In this post, I focus on demonstrating how to handle historical data change for a star schema by implementing Slowly Changing Dimension Type 2 (SCD2) with Apache Hudi using Apache Spark on Amazon EMR, and storing the data on Amazon S3.
Star schema and SCD2 concept overview
In data warehousing, a star schema is the simplest type of dimensional model, in which the center of the star can have one fact table and a number of associated dimension tables. A fact is an event that is counted, such as a single sale. A dimension contains reference information about the fact, such as product details or customer information.
SCD2 is a dimension that stores and manages current and historical data over time in a data warehouse. The purpose of an SCD2 is to preserve the history of changes. If a customer changes their address, for example, or any other attribute, an SCD2 allows analysts to link facts back to the customer and their attributes in the state they were at the time of the fact event.
The following diagram illustrates a star schema with a
Sale fact table and
Customer dimension table, which is managed as an SCD2 table.
Let’s have a deeper look at the
Customer dimension table schema. You can categorize the columns into three different groups:
- Key –
customer_dim_key, also called a surrogate key, has a unique value, generated automatically. It’s used as a foreign key for the sale fact table.
- Attributes –
countryhave a business value used in business intelligence (BI) reports.
- SCD2 metadata –
is_currentare designed to manage the state of the record.
eff_end_datecontain the time interval when the record is effective.
- Metadata –
timestampis the actual time when the customer record was generated.
SCD2 implementation challenge
Implementing SCD2 in a data lake without using an additional framework like Apache Hudi introduces the challenge of updating data stored on immutable Amazon S3 storage, and as a result requires the implementor to create multiple copies of intermediate results. This situation may lead to a significant maintenance effort and potential data loss or data inconsistency.
Apache Hudi is an open-source data management framework used to simplify incremental data processing and data pipeline development. Hudi enables you to manage data at the record level in Amazon S3 and helps to handle data privacy use cases requiring record-level updates and deletes. Hudi is supported by Amazon EMR starting from version 5.28 and is automatically installed when you choose Spark, Hive, or Presto when deploying your EMR cluster.
Using the Apache Hudi
upsert operation allows Spark clients to update dimension records without any additional overhead, and also guarantees data consistency.
With copy-on-write mode, Hudi rewrites the files on Amazon S3 by performing a synchronous merge during the write operation. In addition, to enable fast file lookup as a part of the
select query, it has an indexing mechanism by mapping a given record key consistently to a file ID.
For more information about Hudi concepts, see Concepts.
In a real-world use case, sale and customer records are ingested to a data lake continuously as a replication from operational databases. For this post, we generate customer and sale records inline.
To demonstrate the solution, I walk through the following steps:
- Generate customer records using a Spark DataFrame.
- Store customer records on Amazon S3 using Apache Hudi.
- Generate sale records.
- Look up a customer surrogate key used as a dimension key.
- Store sale records on Amazon S3.
- Run a BI query to calculate the amount of sales per country.
- Generate customer records to reflect the scenario when one customer updates the address and one new customer is created.
- Apply SCD2 logic for the
- Generate new sale records.
- Run a BI query to calculate the amount of sales per country to validate the final result.
Before you get started, make sure you meet the following prerequisites:
- Have an AWS account.
- Be able to run the code sample and create an EMR cluster with EMR notebook. It’s important to use an Amazon EMR version greater than 5.28, which is the first version that supports Apache Hudi.
- Enable Apache Hudi on Amazon EMR.
- Create an S3 bucket where Hudi files are stored.
Create customer schema and records
To create your customer schema and records, use the following code:
To generate unique surrogate keys of the customer, we get a current time in microseconds resolution, using the Python
time library defined as Spark UDF:
The best practice is to use
eff_end_date with a specific future value so you can run range queries in the following steps.
Store customer records using Apache Hudi
For an initial insert, you only use the Hudi
insert operation with
Overwrite Spark mode:
For more information about Hudi configuration, see Configurations.
To validate the write operation, read the
Customer Hudi table.
Create sale records
Next, we generate sales records of existing customers. The sales record contains a
customer_id, which is a customer business key used to find a corresponding record in the
Customer table. See the following code:
The following screenshot shows our results.
Customer dimension key lookup
To find a proper customer dimension surrogate key, we use LEFT OUTER JOIN. If there is no match,
customer_dim_key gets -1 value by default. The customer dimension record may be missing due to the delay. This may happen when the components involved in a replication system can’t keep up with a pace of the replication from the operational database and are lagging behind. See the following code:
The join condition describes the expression where a sale record belongs to the specific customer record, which has an effective date interval that contains the sale record timestamp.
The following screenshot shows our output.
Save sale records
Sales represent an event that happened in a point of time in the past and never changes. We use Parquet format with append mode to store sale records:
To demonstrate the dimensional query, we join the
Sale table with the
Customer table and count the amount of sales per country. SQL is commonly used for analytics queries, so we use SparkSQL to query the SCD2 data:
The following screenshot shows our results.
Handle customer address change
Let’s assume that the customer Susan updates their address from US to France. Also, a new customer, Bastian, is created. See the following code:
According to the SCD2 concept, when a new customer record is created, the historical record needs to expire. To implement the expiration, we find Susan’s customer record in the existing dataset, set
eff_end_date to the
eff_start_date of the newest record, and set the
is_current value to false:
We create a union with the new customer records and store them using the Hudi
upsert operation and Spark
Customer Hudi table shows two records existing for the customer Susan; one is historical and one is current. The historical one with US location has an updated
is_current set to
false. Also, the new customer Bastian was added.
We can generate new sales made by Susan and look up a
Customer dimension key using LEFT OUTER JOIN exactly as we did earlier.
Running the query to get a number of sales per country reflects an address change, showing two sales for Susan with the up-to-date address in France.
In this post, I demonstrated how to continuously build SCD2 using Apache Hudi, while maintaining low operational overhead and fully eliminating the need to handle intermediate results on Amazon S3.
By combining the low-cost storage of Amazon S3, the ability to separate storage and compute, and Hudi’s native integration with Amazon EMR, we now have an effective way to store SCD2 data in our data lake.
You should now have a good understanding of SCD2 and be able to experiment and reuse my example notebook to implement a solution with your own data.
About the author
David Greenshtein is a Specialist Solutions Architect for Analytics at AWS with a passion for ETL and automation. He works with AWS customers to design and build analytics solutions enabling business to make data-driven decisions. In his free time, he likes jogging and riding bikes with his son.