Before the release of Amazon Kinesis Data Analytics Studio, customers relied on Amazon Kinesis Data Analytics for SQL on Amazon Kinesis Data Streams. With the release of Kinesis Data Analytics Studio, data engineers and analysts can use an Apache Zeppelin notebook within Studio to interactively query streaming data from a variety of sources, such as Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Simple Storage Service (Amazon S3), and others using custom connectors.

In this post, we cover some of the most common query patterns to run on streaming data using Apache Flink relational APIs. Of the two relational APIs that Apache Flink supports, SQL and Table APIs, our focus is on the SQL API. We expect readers to have knowledge of Kinesis Data Streams, AWS Glue, and AWS Identity and Access Management (IAM), as well as a basic knowledge of SQL queries and streaming window concepts. We use a sales transaction use case to walk you through examples of tumbling, sliding, and session window operations, as well as group by and join queries.

Solution architecture

To show a working solution of interactive analytics on streaming data, we use a Kinesis Data Generator UI application to generate the stream of data, which continuously writes to Kinesis Data Streams. For the interactive analytics on Kinesis Data Streams, we use Kinesis Data Analytics Studio, which uses Apache Flink as the processing engine and notebooks powered by Apache Zeppelin. These notebooks come with preconfigured Apache Flink, which allows you to query data from Kinesis Data Streams interactively using SQL APIs. To use SQL queries in the Apache Zeppelin notebook, we configure an AWS Glue Data Catalog table that uses Kinesis Data Streams as its source. This configuration allows you to query the data stream by referring to the AWS Glue table in SQL queries.
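
The CloudFormation template described in the next section creates these Data Catalog tables for you, so you don't need to define them yourself. For orientation only, a roughly equivalent in-notebook definition of a Kinesis-backed table looks like the following minimal sketch, assuming the Apache Flink Kinesis SQL connector, a placeholder stream name, and a placeholder Region; the proctime computed column is the processing-time attribute that the windowed queries later in this post rely on:

%flink.ssql
CREATE TABLE sales (
    customer_card_id BIGINT,
    customer_id      BIGINT,
    price            BIGINT,
    product_id       VARCHAR,
    proctime AS PROCTIME()  -- processing-time attribute used by the windowed queries
) WITH (
    'connector' = 'kinesis',
    'stream' = 'sales-data-stream',   -- placeholder; use the sales stream created by the stack
    'aws.region' = 'us-east-1',       -- placeholder; use the Region where you launch the stack
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json'
)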

We use an AWS CloudFormation template to create the AWS resources shown in the following diagram.

(Architecture diagram)

Set up the environment

After you sign in to your AWS account, launch the CloudFormation template by choosing Launch Stack.

The CloudFormation template configures the following resources in your account:

  • Two Kinesis data streams, one for sales transactions and one for card data
  • A Kinesis Data Analytics Studio application
  • An IAM role (service execution role) for Kinesis Data Analytics Studio
  • Two AWS Glue Data Catalog tables: sales and card

After you complete the setup, sign in to the Kinesis Data Analytics console. On the Kinesis Data Analytics applications page, choose the Studio tab, where you can see the Studio notebook in Ready status. Select the Studio notebook, choose Run, and wait until the notebook is in Running status; this can take a couple of minutes.


To run the analysis on streaming data, select the Apache Zeppelin notebook environment and open it. You have the option to create a new note in the notebook.


Run stream analytics in an interactive application

Before you start running interactive analytics with a Studio notebook, you need to start streaming data into your Kinesis data stream, which you created earlier using the CloudFormation stack. To generate streaming data into the data stream, we use a hosted Kinesis Data Generator UI application.

  1. Create an Amazon Cognito user pool and a user in that pool in your account. For instructions, see the GitHub repo.
  2. Log in to the Kinesis Data Generator application.
  3. Choose the Region where you ran the CloudFormation template to create the Kinesis data streams.
  4. On the drop-down menu, choose the data stream for sales.
  5. Set Records per second to 10.
  6. Use the following code for the record template:
{ "customer_card_id": {{random.number({ "min":1, "max":99 })}}, "customer_id": {{random.number({ "min":100, "max":110 })}}, "price": {{random.number( { "min":10, "max":500 } )}}, "product_id": "{{random.arrayElement( ["4E5750DC2A1D","E6DA5387367B","B552B4B940D0"] )}}"
}

  7. Choose Send Data.

To run the table join queries in the example section, you need to stream sample card data to a separate data stream.

  1. Choose the Region where you created the data streams.
  2. On the drop-down menu, choose the data stream for card.
  3. Set Records per second to 5.
  4. Use the following code for the record template:
{ "card_id": {{random.number({ "min":75, "max":99 })}}, "card_number": {{random.number({ "min":23274397, "max":47547920 })}}, "card_zip": "{{random.arrayElement( ["07422","23738","03863"] )}}", "card_name": "{{random.arrayElement( ["Laura Perez","Peter Han","Karla Johnson"] )}}"
}

  5. Choose Send Data.
  6. Go back to the notebook note and specify the language Studio uses to run the application.

You need to specify one of the Flink interpreters supported by the Apache Zeppelin notebook, such as Python, IPython, stream SQL, or batch SQL. Because we use Flink streaming SQL APIs in this post, we specify the stream SQL interpreter (ssql) as the first statement:

%flink.ssql(type=update)

Common query patterns with Flink SQL

In this section, we walk you through examples of common query patterns using Flink SQL APIs. In all the examples, we refer to the sales table, which is the AWS Glue table created by the CloudFormation template and backed by a Kinesis data stream as its source. It's the same data stream where you publish the sales data using the Kinesis Data Generator application.
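
Before trying the query patterns, you can confirm that the notebook can read from the stream with a simple continuous query. This is only a quick sanity check (not part of the patterns below); it keeps emitting raw records until you stop the paragraph:

%flink.ssql(type=update)
SELECT * FROM sales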

Windows and aggregation

In this section, we cover examples of windowed and aggregate queries: tumbling, sliding, and session window operations.

Tumbling window

In the following example, we use a SUM aggregation on a tumbling window. The query emits each customer's total spend for every 30-second window interval.

The following table shows our input data.

proctime | customer_id | customer_card_id | product_id | price
2021-04-20 21:31:01.10 | 75 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.115 | 78 | 118 | B552B4B940D0 | 80
2021-04-20 21:31:01.328 | 75 | 101 | E6DA5387367B | 60
2021-04-20 21:31:01.504 | 78 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.678 | 75 | 148 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.960 | 78 | 118 | B552B4B940D0 | 80

We use the following code for our query:

%flink.ssql(type=update)
SELECT TUMBLE_END(proctime, INTERVAL '30' SECOND) as window_end_time, customer_id
, SUM(price) as tumbling_30_seconds_sum
FROM sales
GROUP BY TUMBLE(proctime, INTERVAL '30' SECOND), customer_id

The following table shows our results.

window_end_time | customer_id | tumbling_30_seconds_sum
2021-04-20 21:31:01.0 | 75 | 170
2021-04-20 21:31:01.0 | 78 | 80
2021-04-20 21:31:30.0 | 75 | 110
2021-04-20 21:31:30.0 | 78 | 190

Sliding window

In this sliding window example, we run a SUM aggregate query that emits each customer's total spend every 10 seconds over a 30-second window.

The following table shows our input data.

proctime | customer_id | customer_card_id | product_id | price
2021-04-20 21:31:01.10 | 75 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.20 | 78 | 118 | B552B4B940D0 | 80
2021-04-20 21:31:01.28 | 75 | 101 | E6DA5387367B | 60
2021-04-20 21:31:01.30 | 78 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.36 | 75 | 148 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.40 | 78 | 118 | B552B4B940D0 | 80

We use the following code for our query:

%flink.ssql(type=update)
SELECT HOP_END(proctime, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS window_end_time
, customer_id, SUM(price) AS sliding_30_seconds_sum
FROM sales
GROUP BY HOP(proctime, INTERVAL '10' SECOND, INTERVAL '30' SECOND), customer_id

The following table shows our results.

window_end_time | customer_id | sliding_30_seconds_sum
2021-04-20 21:31:01.10 | 75 | 110
2021-04-20 21:31:01.20 | 75 | 110
2021-04-20 21:31:01.20 | 78 | 80
2021-04-20 21:31:30.30 | 75 | 170
2021-04-20 21:31:30.30 | 78 | 190
2021-04-20 21:31:30.40 | 75 | 280
2021-04-20 21:31:30.40 | 78 | 270

Session window

The following session window query finds the total spend per session, where a session is closed by a 1-minute gap of inactivity. To generate the result, we stream data from the Kinesis Data Generator application and then stop streaming for more than a minute to create that gap.

The following table shows our input data.

proctime | customer_id | customer_card_id | product_id | price
2021-04-20 21:31:01.10 | 75 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.20 | 78 | 118 | B552B4B940D0 | 80
2021-04-20 21:31:01.28 | 75 | 101 | E6DA5387367B | 60
2021-04-20 21:32:50.30 | 78 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:32:50.36 | 75 | 148 | 4E5750DC2A1D | 110

We use the following code for our query:

%flink.ssql(type=update)
SELECT customer_id, SESSION_START(proctime, INTERVAL '1' MINUTE) AS session_start_time
, SESSION_PROCTIME(proctime, INTERVAL '1' MINUTE) AS session_end_time, SUM(price) AS total_spend
FROM sales
GROUP BY SESSION(proctime, INTERVAL '1' MINUTE), customer_id

The following table shows our results.

session_start_time | session_end_time | total_spend
2021-04-20 21:31:01.10 | 2021-04-20 21:32:01.28 | 250
2021-04-20 21:32:50.30 | 2021-04-20 21:32:50.36 | 220

Data filter and consolidation

To show an example of filter and union operations, we create two separate datasets using filter conditions and combine them with a UNION operation.

The following table shows our input data.

proctime | customer_id | customer_card_id | product_id | price
2021-04-20 21:31:01.10 | 75 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.20 | 78 | 118 | B552B4B940D0 | 80
2021-04-20 21:31:01.28 | 75 | 101 | E6DA5387367B | 60
2021-04-20 21:32:50.30 | 78 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:32:50.36 | 75 | 148 | 4E5750DC2A1D | 110

We use the following code for our query:

%flink.ssql(type=update)
SELECT * FROM (
(SELECT customer_id, product_id, price FROM sales WHERE price > 100 AND  product_id <> '4E5750DC2A1D')
UNION
(SELECT customer_id, product_id, price FROM sales WHERE product_id = '4E5750DC2A1D' AND price > 250)
)

The following table shows our results.

customer_id | product_id | price
78 | 4E5750DC2A1D | 300
75 | B552B4B940D0 | 170
78 | B552B4B940D0 | 110
75 | 4E5750DC2A1D | 260

Table joins

Flink SQL APIs support different types of join conditions, like inner join, outer join, and interval join. Because a join over unbounded streams has to keep rows from both inputs in state, you want to limit resource utilization from growing indefinitely and run joins efficiently. For that reason, our example uses an interval join. An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. In this example, we join the datasets of the two Kinesis Data Streams tables on the card ID, which is a common field between the two streams. The filter condition in the query adds a time constraint, which keeps the join state, and therefore resource utilization, from growing unbounded.

The following table shows our sales input data.

proctime | customer_id | customer_card_id | product_id | price
2021-04-20 21:31:01.10 | 75 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.20 | 78 | 118 | B552B4B940D0 | 80
2021-04-20 21:31:01.28 | 75 | 101 | E6DA5387367B | 60
2021-04-20 21:32:50.30 | 78 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:32:50.36 | 75 | 148 | 4E5750DC2A1D | 110

The following table shows our cards input data.

card_id | card_number | card_zip | card_name
101 | 23274397 | 23738 | Laura Perez
118 | 54093472 | 7422 | Karla Johnson
101 | 23274397 | 23738 | Laura Perez
101 | 23274397 | 23738 | Laura Perez
148 | 91368810 | 7422 | Peter Han

We use the following code for our query:

%flink.ssql(type=update)
SELECT sales.proctime, customer_card_id, card_zip, product_id, price
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id
WHERE sales.proctime BETWEEN card.proctime - INTERVAL '5' MINUTE AND card.proctime;

The following table shows our results.

proctime | customer_card_id | card_zip | product_id | price
2021-04-20 21:31:01.10 | 101 | 23738 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.20 | 118 | 7422 | B552B4B940D0 | 80
2021-04-20 21:31:01.28 | 101 | 23738 | E6DA5387367B | 60
2021-04-20 21:32:50.30 | 101 | 23738 | 4E5750DC2A1D | 110
2021-04-20 21:32:50.36 | 148 | 7422 | 4E5750DC2A1D | 110
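
For contrast, a regular join without a time bound is also valid Flink SQL, but on unbounded streams it must keep every row of both tables in state, so resource utilization can grow indefinitely. The following sketch is included only to illustrate why we bound the join with a time constraint; the interval join above is the pattern to use:

%flink.ssql(type=update)
-- For illustration only: no time bound, so join state grows without limit on unbounded streams
SELECT sales.proctime, customer_card_id, card_zip, product_id, price
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id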

Data partitioning and ranking

To show an example of Top-N records, we use the same input dataset as in the previous join example. In this example, we run a query to find the top sales record by price in each zip code. We use the OVER window clause to rank sales within each zip code using a PARTITION BY clause, and we order the records in each zip code with an ORDER BY clause on the price field in descending order. The result of this operation is a ranking of each record based on the OVER clause condition. We then use the outer block of the query to filter on that ranking so that we get only the top sale in each zip code.

We use the following code for our query:

%flink.ssql(type=update)
SELECT card_zip, customer_card_id, product_id, price FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY card_zip ORDER BY price DESC) as row_num
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id
WHERE sales.proctime BETWEEN card.proctime - INTERVAL '5' MINUTE AND card.proctime
)
WHERE row_num = 1

The following table shows our results.

card_zip | customer_card_id | product_id | price
23738 | 101 | 4E5750DC2A1D | 110
7422 | 148 | 4E5750DC2A1D | 110

Data transformation

There are times when you want to transform incoming data. The Flink SQL API has many built-in functions to support a wide range of data transformation requirements, including string functions, date functions, arithmetic functions, and so on. For the complete list, see System (Built-in) Functions.

Extract a portion of a string

In this example, we use the SUBSTR string function to drop the first four digits of the card number and return only the last four digits.

The following table shows our sales input data.

proctime | customer_id | customer_card_id | product_id | price
2021-04-20 21:31:01.10 | 75 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.20 | 78 | 118 | B552B4B940D0 | 80
2021-04-20 21:31:01.28 | 75 | 101 | E6DA5387367B | 60
2021-04-20 21:32:50.30 | 78 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:32:50.36 | 75 | 148 | 4E5750DC2A1D | 110

The following table shows our cards input data.

card_id | card_number | card_zip | card_name
101 | 23274397 | 23738 | Laura Perez
118 | 54093472 | 7422 | Karla Johnson
101 | 23274397 | 23738 | Laura Perez
101 | 23274397 | 23738 | Laura Perez
148 | 91368810 | 7422 | Peter Han

We use the following code for our query:

%flink.ssql(type=update)
SELECT proctime, SUBSTR(card_number,5) AS partial_card_number, card_zip, product_id, price
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id

The following table shows our results.

proctime | partial_card_number | card_zip | product_id | price
2021-04-20 21:31:01.10 | 4397 | 23738 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.20 | 3472 | 7422 | B552B4B940D0 | 80
2021-04-20 21:31:01.28 | 4397 | 23738 | E6DA5387367B | 60
2021-04-20 21:32:50.30 | 4397 | 23738 | 4E5750DC2A1D | 110
2021-04-20 21:32:50.36 | 8810 | 7422 | 4E5750DC2A1D | 110

Replace a substring

In this example, we use the REGEXP_REPLACE string function to remove the space and everything after it from the card_name field. Assuming that the first name and last name are separated by a space, the query returns the first name only.

The following table shows our cards input data.

card_id | card_number | card_zip | card_name
101 | 23274397 | 23738 | Laura Perez
118 | 54093472 | 7422 | Karla Johnson
101 | 23274397 | 23738 | Laura Perez
101 | 23274397 | 23738 | Laura Perez
148 | 91368810 | 7422 | Peter Han

We use the following code for our query:

%flink.ssql(type=update)
SELECT card_id, REGEXP_REPLACE(card_name,' .*','') card_name
FROM card

The following table shows our results.

card_id | card_name
101 | Laura
118 | Karla
101 | Laura
101 | Laura
148 | Peter

Split the string field into multiple fields

In this example, we use the SPLIT_INDEX string function to split the card_name field into first_name and last_name, assuming that card_name is a full name with the first and last names separated by a space.

The following table shows our cards input data.

card_id | card_number | card_zip | card_name
101 | 23274397 | 23738 | Laura Perez
118 | 54093472 | 7422 | Karla Johnson
101 | 23274397 | 23738 | Laura Perez
101 | 23274397 | 23738 | Laura Perez
148 | 91368810 | 7422 | Peter Han

We use the following code for our query:

%flink.ssql(type=update)
SELECT card_id, SPLIT_INDEX(card_name,' ',0) first_name, SPLIT_INDEX(card_name,' ',1) last_name
FROM card

The following table shows our results.

card_id | first_name | last_name
101 | Laura | Perez
118 | Karla | Johnson
101 | Laura | Perez
101 | Laura | Perez
148 | Peter | Han

Transform data using a CASE statement

There are times when you want to transform the result value and apply labels to get insights. In this example, we label every customer who purchases within the window with a risk level of low, medium, or high, based on the number of purchases in a 5-minute sliding window that emits results every 30 seconds.

The following table shows our input data.

proctime | customer_id | customer_card_id | product_id | price
2021-04-20 21:31:30.10 | 75 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:38.20 | 78 | 118 | B552B4B940D0 | 80
2021-04-20 21:31:42.28 | 75 | 101 | E6DA5387367B | 60
2021-04-20 21:31:50.30 | 78 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:50.36 | 75 | 148 | 4E5750DC2A1D | 110

We use the following code for our query:

%flink.ssql(type=update)
SELECT customer_id, CASE
WHEN total_purchases BETWEEN 1 AND 2 THEN 'LOW'
WHEN total_purchases BETWEEN 3 AND 10 THEN 'MEDIUM'
ELSE 'HIGH'
END as risk
FROM (
SELECT HOP_END(proctime, INTERVAL '30' SECOND, INTERVAL '5' MINUTE) AS winend
, customer_id, COUNT(1) AS total_purchases
FROM sales
GROUP BY HOP(proctime, INTERVAL '30' SECOND, INTERVAL '5' MINUTE), customer_id
)

The following table shows our results.

customer_id | risk
78 | LOW
75 | HIGH

DateTime data transformation

The Flink SQL API has a wide range of built-in functions to operate on timestamp fields, like extracting the day, month, week, hour, minute, day of the month, and so on. There are also functions to convert timestamp values between types and formats. In this example, we use the MINUTE and HOUR functions to extract the minute and the hour from the timestamp field.

The following table shows our sales input data.

proctime | customer_id | customer_card_id | product_id | price
2021-04-20 21:31:01.10 | 75 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:31:01.20 | 78 | 118 | B552B4B940D0 | 80
2021-04-20 21:31:01.28 | 75 | 101 | E6DA5387367B | 60
2021-04-20 21:32:50.30 | 78 | 101 | 4E5750DC2A1D | 110
2021-04-20 21:32:50.36 | 75 | 148 | 4E5750DC2A1D | 110

We use the following code for our query:

%flink.ssql(type=update)
SELECT HOUR(proctime) AS transaction_hour, MINUTE(proctime) AS transaction_min, customer_id, product_id, price
FROM sales

The following table shows our results.

transaction_hour | transaction_min | customer_id | product_id | price
21 | 31 | 75 | 4E5750DC2A1D | 110
21 | 31 | 78 | B552B4B940D0 | 80
21 | 31 | 75 | E6DA5387367B | 60
21 | 32 | 78 | 4E5750DC2A1D | 110
21 | 32 | 75 | 4E5750DC2A1D | 110
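
As mentioned earlier, the Flink SQL API also includes functions to convert timestamp values. As one illustration beyond the walkthrough above, the built-in DATE_FORMAT function renders the processing-time timestamp as a formatted date string:

%flink.ssql(type=update)
-- Illustrative only: format the processing-time timestamp as a yyyy-MM-dd string
SELECT DATE_FORMAT(proctime, 'yyyy-MM-dd') AS transaction_date, customer_id, product_id, price
FROM sales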

Conclusion

In this post, we used sales and card examples to demonstrate different query patterns to get insight from streaming data using Apache Flink SQL APIs. We walked you through examples of Flink SQL queries that you can run within Kinesis Data Analytics Studio. In just a few minutes, you can start running interactive analytics with the examples in this post.

You can quickly start developing a stream processing application using Studio in any of the supported languages, such as SQL, Python, and Scala. If you want to generate continuous actionable insights, you can easily build and deploy your code as an Apache Flink application with durable state from the notebook within Studio. For more information, see Deploying as an application with durable state.

For further reading on Flink SQL queries that you can use in Kinesis Data Analytics Studio, visit the official page at Apache Flink 1.11 SQL Queries.


About the Authors

Dr. Sam Mokhtari is a Senior Solutions Architect at AWS. His main area of depth is data and analytics, and he has published more than 30 influential articles in this field. He is also a respected data and analytics advisor who has led several large-scale implementation projects across different industries, including energy, health, telecom, and transport.

Mitesh Patel is a Senior Solutions Architect at AWS. He works with SMB customers to help them develop scalable, secure, and cost-effective solutions on AWS. He enjoys helping customers modernize applications using microservices and implement serverless analytics platforms.
