By Preetam Roy, Sr. Director of Technology at Cognizant
By Anirban De, Principal Architect of Technology at Cognizant
By Sumalya Bhattacharyya, Sr. Architect of Technology at Cognizant

Cognizant_Logo-4
Cognizant-APN Badge-4.1
Connect with Cognizant-2

In the context of data replication, Change Data Capture (CDC) is the technique of systematically tracking incremental change in data at the source, and subsequently applying these changes at the target to maintain synchronization.

You can implement CDC in diverse scenarios using a variety of tools and technologies, including commercial products as well as bespoke solutions.

Cognizant is an AWS Partner Network (APN) Premier Consulting Partner and AWS Managed Service Provider. Cognizant specializes in cloud infrastructure, platform, security, application rationalization, and business analytics. We have AWS Competencies in healthcare, life sciences, migration, and financial services.

In this post, we explain how to implement a custom CDC solution using Amazon Web Services (AWS).

A Hypothetical Scenario

To better illustrate how to implement our solution, we have constructed a hypothetical scenario for the retail industry. Our hypothetical retailer offers a loyalty program to its customers. Depending on spending propensity, customers are assigned loyalty categories ranging from Basic to Platinum.

Customers can use several channels (online, at the store, through partner merchants) to register for the loyalty program. The retailer maintains an on-premises Microsoft SQL Server database as the system of record for customer loyalty.

Recently, the retailer decided to build a data and analytics platform on AWS to better understand customer behavior, their spending patterns, and how that correlates with the state of the loyalty program.

For example, how long does a customer typically take to upgrade from the Basic to Silver category? Or, what is the average spend of customers who did not qualify for a loyalty category upgrade in the past 12 months?

To achieve this goal of data analytics on AWS, the retailer needs to maintain up-to-date data within the AWS environment. With respect to customer loyalty data, the on-premises SQL Server remains the system of record; however, a copy of the data must also be maintained on AWS.

The retailer decides to use AWS Database Migration Service (AWS DMS) for near real-time replication of the customer loyalty data. It also selects Amazon Redshift as the data warehouse.

To implement CDC, the retailer uses an Online Transaction Processing (OLTP) system backed by a Microsoft SQL Server database. It uses an Amazon Redshift data warehouse as the target. The OLTP system acts as a system of record for a customer loyalty program. The data warehouse aggregates loyalty data on a daily basis, and stores this information for consumption by analytics and visualization tools.

Solution Design

In our hypothetical scenario, a typical record at the source database looks like this:

event_idevent_tscust_idbusiness_op_codecust_loyalty_category
142019-04-01 10:10:00Cust100REGNBasic
  • event_id is an AUTO_INCREMENT integer.
  • event_ts is a timestamp of when the event is recorded in the system. The value usually defaults to GETDATE().
  • cust_id is the unique identifier of a customer enrolled in the loyalty program.
  • business_op_code indicates the action that resulted in this record. The action can be REGN for a new customer registration, UPGD for a loyalty status upgrade, or DNGD for a loyalty status downgrade.
  • cust_loyalty_category is the current loyalty benefit category; for example, Basic, Silver, Gold, or Platinum.

When a new customer enrolls in the loyalty program, a record is inserted and cust_loyalty_category is set to Basic. Subsequently, based on customer actions and spending history, the loyalty category may be upgraded or downgraded.

In these cases, the record corresponding to the customer is updated with the new loyalty category, and the event timestamp is updated to indicate the date and time of this action. If a customer chooses to exit the loyalty program, their record is deleted.

Our objective is to build aggregated customer loyalty data on AWS that provides a daily count of the total number of customers across the different loyalty categories. It would look something like this:

dtBasicSilverGoldPlatinum
2019-04-011000800500200
2019-04-021050810503201

The source OLTP database on the on-premises Microsoft MySQL Server database is typically used for answering this question:

  • What is the current loyalty category of a given customer?

On the other hand, the data warehouse on AWS does not record loyalty status of individual customers, and is more suited to respond to this question:

  • What is/was the status of the loyalty program on a given date?

The purpose of the data at source and target is different, and so is the database design. A simple one-to-one replication from the source OLTP system to the target data warehouse will not suffice. We need a transformation component in the CDC solution as well.

To address these requirements, we propose the following architecture using AWS Glue and AWS Serverless.

Change Data Capture (CDC) Solution Architecture Overview

Figure 1 – Solution Architecture of CDC system.

Acquiring Replicated OLTP Transactions

Committed transactions from the source SQL Server database are replicated as Apache Parquet files to an Amazon Simple Storage Service (Amazon S3) bucket (the Raw bucket in Figure 1). In the raw bucket, AWS DMS creates a new Parquet file for each transaction at the source. These Parquet files have the same prefix and are not partitioned.

The Amazon S3 target endpoint is configured with the following extra connection attributes:

dataFormat=parquet;parquetTimestampInMillisecond=true;timestampColumnName=TIMESTAMP;

With these connection attributes, AWS DMS adds two extra columns to each record when storing in the raw S3 bucket:

  • TIMESTAMP — a timestamp indicating when AWS DMS processed the record.
  • Op — a source operation indicator with a value of I, U, or D, corresponding to INSERT, UPDATE and DELETE.

Curating the Data

The curated Amazon S3 bucket is intended as a staging location for Amazon Redshift. As such, it contains Parquet files partitioned by date and sorted on customer loyalty category. To create these files, we build an AWS Glue job.

This AWS Glue job accepts two timestamp values as arguments (startDate and endDate). It identifies the objects in the raw bucket modified at any time between the startDate and endDate values. Then, it creates an AWS Glue dynamic frame from the data contained in the identified objects.

Next, it applies a SQL query on the dynamic frame. This SQL query is the heart of the AWS Glue job, and it performs two sub-tasks:

  1. Eliminates all records for customers with a D in the Op field. This means if there is an I followed by a U, and finally a D in the Op field for a particular customer ID, then all three records are dropped. This is because, ultimately, the record for this customer ID was deleted at the source.
    .
  2. Groups the remaining records by customer ID, and finds the maximum value of the event timestamp for each customer. This task identifies the most recent activity for each customer. The effective value of the loyalty category is chosen from the most recent activity.

When this SQL query is run on the dynamic frame, a new dynamic frame is created with the aggregate data set.

The final step in the AWS Glue job is to write this new dynamic frame into the curated S3 bucket. However, instead of writing the AWS Glue dynamic frame directly, we first convert it into an Apache Spark data frame. Before writing the data frame to Amazon S3, we set the following parameter:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

Then, we use the overwrite mode with the data frame writer to ensure duplicate files are silently removed. This strategy is important to maintain the idempotence of the AWS Glue job. In other words, if the job is run multiple times with the same date range as input, Apache Spark retains only one file in the curated bucket.

When writing the data frame to the curated bucket, we partition the data on the fly using this property:

partitionBy("event_dt", "cust_loyalty_category")

This creates a primary partition by event date and a sub-partition by customer loyalty category, which results in an S3 object key similar to this:

curated/cust_loyalty/event_dt=2019-04-01/cust_loyalty_category=Basic/part-00000-408186e2-68f6-40c9-ba12-2fb1b0bc3c21.c000.snappy.parquet

A quick S3 Select on this object shows the content:

{ "cust_id": "Cust100", "TIMESTAMP": 4.535740773078597e+25 }

The TIMESTAMP value refers to when the most recent record for this customer ID was recorded by AWS DMS. This may not be directly relevant for our aggregation use case, but it could serve some purpose in context of tracing back activity in the AWS environment.

At this point, the schema of the curated bucket looks like this:

Column NameData TypePartition Key
cust_idstring
Timestamptimestamp
event_dtstringPartition (0)
cust_loyalty_categorystringPartition (1)

And here’s a sample record:

cust_idtimestampevent_dtcust_loyalty_category
Cust1002019-12-12 07:25:182019-04-01Basic

Note that the primary partition of the curated schema aligns with the dt column of the Amazon Redshift table. This column also contains the distribution key (DISTKEY), as well as the sort key (SORTKEY).

The last sub-task of the AWS Glue job is to run a pre-configured AWS Glue crawler so the AWS Glue Data Catalog is updated with the new Parquet files written by Spark. This ensures any downstream consumer of curated data has accurate metadata to work with.

Loading the Target Table

The final step is to transpose the data such that a count of customers for each of the four loyalty categories is available for each value of event_dt. This is best done using SQL, and so this step is implemented as an Amazon Redshift stored procedure.

For Amazon Redshift to access data residing in the Parquet files in the curated bucket, we configure Amazon Redshift Spectrum to use the AWS Glue Data Catalog updated by the AWS Glue job.

We create an external schema in the Amazon Redshift database pointing to the database in the AWS Glue Data Catalog that contains the table corresponding to the curated bucket. The command is similar to this:

create external schema cdcmeta from data catalog database 'cdcmetadb' iam_role 'arn:aws:iam::xxxxxxxxxxxx:role/myRedshiftRole'

It is not necessary to create an external table in Amazon Redshift, since this information is picked up directly from the AWS Glue Data Catalog. However, the identity and access management (IAM) role must have policies in place to access the AWS Glue Data Catalog.

Over an extended period of time, the curated bucket will contain a large number of partitions (one for each day). Assuming we refresh Amazon Redshift on a daily basis, we must restrict Amazon Redshift Spectrum to scanning only those S3 objects for the dates we want.

This is a critical consideration from the point of view of performance as well as cost. To achieve this, we pass two arguments to the stored procedure: startDate and endDate. This is similar to the AWS Glue job we created earlier for the curated data.

The stored procedure first creates a temporary table from the curated data with partitions belonging to the date range specified by these input values.

There can be an overlap in the values of the dt column in the temporary table and the main table in Amazon Redshift. If we simply insert the records from the temporary table into the main table, we may create duplicate records, since Amazon Redshift does not enforce any unique constraint.

Therefore, we must first delete records from the main table for values of dt that are present in the temporary table. Then, we insert all records from temporary table into the main table.

Operationally, the delete statement becomes effectively a NO operation if successive runs of the script are called with a discrete date range. In effect, there is no overlap in the date range (startDate – endDate) between successive invocations of the stored procedure.

Here’s a screen listing of a call to the Amazon Redshift stored procedure:

rsdemo=> select * from dashboard.cust_loyalty; dt | basic | silver | gold | platinum
----+-------+--------+------+----------
(0 rows) rsdemo=> call cust_loyalty_agg_load('2019-01-01','2019-12-31');
INFO: startDate 2019-01-01, endDate 2019-12-31
INFO: Found 1 qualifying date value(s)
INFO: 0 record(s) deleted
INFO: 1 record(s) inserted
CALL
rsdemo=> select * from dashboard.cust_loyalty; dt | basic | silver | gold | platinum
------------+-------+--------+------+---------- 2019-04-01 | 1 | | |
(1 row)

Scheduling

The AWS Glue job and Amazon Redshift stored procedure are designed so they do not need to be called synchronously one after the other. Both of these jobs accept a date range that allows them to work independently with no awareness of the other job’s status or run history.

Also, as mentioned earlier, the AWS Glue job is idempotent, so there is no risk of running successive iterations with overlapping date range. The same holds true for the Amazon Redshift stored procedure since we logically do an UPSERT or MERGE.

As an example, assume that both jobs need to run once daily:

  • At 00:01, the AWS Glue job is initiated with a startDate of the previous day at 00:00:00 hours, and an endDate of the current (today) date at 00:00:00 hours.
    .
  • At 04:00, the Amazon Redshift stored procedure is called with the previous day’s date assigned to the startDate and endDate parameters.

For scheduling, we use the cron scheduler on an Amazon Elastic Compute Cloud (Amazon EC2) instance with Amazon Linux. For passing values of the input arguments to the jobs based on the current date value, we need a little bit of Python scripting.

Python also allows us to use the Boto3 library for interacting with AWS resources. For example, we can create an AWS Glue connection for the Amazon Redshift cluster. Then, in the Python script, we fetch the connection details from this resource instead of hard coding these in the script. We schedule these scripts using cron.

Scalability and Throughput

The first aspect of the solution we consider in context of throughput is the rate of change at the source; in effect, the Microsoft SQL Server database on premises.

Functionally, we are only interested in tracking and replicating changes to customer loyalty data; we do not need to know about customer orders, spending, billing, or returns.

During seasonal peaks or periods of heavy commercial activity, the rate of change of customer loyalty data is likely to increase as well, given that new customers sign up for the loyalty program, and a significant portion of existing customers upgrade to the next category based on spend.

During these periods, we create additional AWS DMS tasks and use the source filter feature of AWS DMS to scale out the tasks. For example, if we want to dedicate a DMS task to new customer registration, then we use the business_op_code column at the source to filter for values matching REGN.

The other AWS DMS task also implements a filter, but it matches against the values UPGD or DNGD. Of course, vertical scaling is also at our disposal—we simply change the type of DMS replication instance to cater to the anticipated increase or decrease in the replication workload.

For curating the data, the AWS Glue job is normally run once daily. That means the job should be configured with enough memory to fit all of the data accumulated through the previous day. Assuming that on an average AWS DMS creates objects with a gross size of 20GB in the raw bucket every day, the AWS Glue job should be configured with two workers of type G.1X or one worker of type G.2X.

Either of these configuration provides compute capacity equivalent to 8 CPUs and 32GB of memory. To maintain predictable throughput during seasonal peaks, the job configuration is updated with a higher number of workers.

Also, the AWS Glue job metrics are monitored to determine whether the job is under-provisioned or over-provisioned.

Finally, the throughput of the Amazon Redshift stored procedure is mainly impacted by the volume of curated data it needs to process. At the curated layer, data is aggregated such that every record corresponds to one and only one customer enrolled in the loyalty program.

This means that during seasonal peaks, when more than the usual number of customers sign up for the loyalty program and a significantly large number of customers qualify for a loyalty category upgrade because of increased spending, the curated data volume per partition also grows.

Again, to maintain predictable throughput of the stored procedure, we use the Workload Management (WLM) feature of Amazon Redshift. The stored procedure is run with a specific query group. During periods of high activity, the WLM queue corresponding to this query group is updated with a higher proportion of memory. The elastic resize feature is also used to scale up the overall capacity of the Amazon Redshift cluster.

Conclusion

In this post, we used a hypothetical retailer with a customer loyalty program to demonstrate how Change Data Capture (CDC) can synchronize incremental changes in customer activity with the main body of data already stored about a customer.

We explained how to acquire replicated OLTP transactions and capture them in an Amazon S3 bucket. Then, we showed how you can curate that data by running AWS Glue jobs on the Amazon Redshift database.

We also demonstrated how to use AWS Glue jobs to curate the Parquet files that contained customer loyalty data to identify customers of interest.

Finally, we made recommendations for scalability and throughput so that our CDC solution can handle periods of high customer activity.

The content and opinions in this blog are those of the third party author and AWS is not responsible for the content or accuracy of this post.

.
Cognizant-APN-Blog-CTA-1
.


Cognizant – APN Partner Spotlight

Cognizant is an APN Premier Consulting Partner. They transform customers’ business, operating, and technology models for the digital era by helping organizations envision, build, and run more innovative and efficient businesses.

Contact Cognizant | Practice Overview | AWS Marketplace

*Already worked with Cognizant? Rate this Partner

*To review an APN Partner, you must be an AWS customer that has worked with them directly on a project.