As businesses around the world are embarking on building innovative solutions, we’re seeing a growing trend adopting data science workloads across various industries. Recently, we’ve seen a greater push towards reducing the friction between data engineers and data scientists. Data scientists are now enabled to run their experiments on their local machine and port to it powerful clusters that can scale without rewriting the code.

You have many options for running data science workloads, such as running it on your own managed Spark cluster. Alternatively there are cloud options such as Amazon SageMaker, Amazon EMR and Amazon Elastic Kubernetes Service (Amazon EKS) clusters. We’re also seeing customers adopting Dask—a distributed data science computing framework that natively integrates with Python libraries such as Pandas, NumPy, and Scikit-learn machine learning (ML) libraries. These libraries were developed in Python and originally optimized for single-machine processing. Dask was developed to help scale these widely used packages for big data processing. In the past few years, Dask has matured to solve CPU and memory-bound ML problems such as big data processing, regression modeling, and hyperparameter optimization.

Dask provides the distributed computing framework to scale your CPU and memory-bound ML problems beyond a single machine. However, the underlying infrastructure resources have to be provided to the framework. You can use AWS Fargate to provide those infrastructure resources. Fargate is a serverless compute engine for containers that works with both Amazon Elastic Container Service (Amazon ECS) and Amazon EKS. Fargate makes it easy for you to focus on building your applications. Fargate removes the need to provision and manage servers, lets you specify and pay for resources per application, and improves security through application isolation by design. Fargate ensures that the infrastructure your containers run on is always up to date with the required patches.

In this post, we demonstrate how to solve common ML problems such as exploratory data analysis on large datasets, preprocessing (such as one-hot encoding), and linear regression modeling using Fargate as the backend. We show you how to connect to a distributed Dask Fargate cluster from a SageMaker notebook, scale out Dask workers, and perform exploratory data analysis work on large public New York taxi trip datasets, containing over 100 million trips. Then we demonstrate how you can run regression algorithms on a distributed Dask cluster. Finally, we demonstrate how you can monitor the operational metrics of a Dask cluster that is fronted by a Network Load Balancer to access the cluster-monitoring dashboard from the internet.

Solution overview

Our use case is to demonstrate how to perform exploratory data analysis on large datasets (over 10 GB with hundreds of millions of records) and run a linear regression algorithm on a distributed Dask cluster. For this use case, we use the publicly available New York City Taxi and Limousine Commission (TLC) Trip Record Data. We use a SageMaker notebook with the backend integrated with a scalable distributed Dask cluster running on Amazon ECS on Fargate.

The following diagram illustrates the solution architecture.

The following diagram illustrates the solution architecture.

We provide an AWS CloudFormation template to provision the following resources:

You then complete the following manual setup steps:

  1. Register the Dask schedulers task’s private IP as the target in the NLB.
  2. Upload the example notebook to the SageMaker notebook instance.
  3. Follow the instructions in the notebook, which we also walk through in this post.

Implementing distributed Dask on Fargate using AWS CloudFormation

To provision your resources with AWS CloudFormation, complete the following steps:

  1. Log in to your AWS account and choose your Region.
  2. On the AWS CloudFormation console, create a stack using the following template.

On the AWS CloudFormation console, create a stack using the following template.

  1. Provide the stack parameters.

Provide the stack parameters.

  1. Acknowledge that AWS CloudFormation might need additional resources and capabilities.
  2. Choose Create stack.

Choose Create stack.

Implementing distributed Dask on Fargate using the AWS CLI

To implement distributed Dask using the AWS Command Line Interface (AWS CLI), complete the following steps:

  1. Install the AWS CLI.
  2. Run the following command to create the CloudFormation stack:
    aws cloudformation create-stack --template-url https://aws-ml-blog.s3.amazonaws.com/artifacts/machine-learning-on-distributed-dask/dask-fargate-main.template --stack-name dask-fargate --capabilities "CAPABILITY_AUTO_EXPAND" "CAPABILITY_IAM" "CAPABILITY_NAMED_IAM" --region us-east-1

Setting up Network Load Balancer to monitor the Dask cluster

To set up NLB to monitor your Fargate Dask cluster, complete the following steps:

  1. On the Amazon ECS console, choose Clusters.
  2. Choose your cluster.
  3. Choose the Dask scheduler service.
  4. On the Tasks tab, choose the running task.

On the Tasks tab, choose the running task.

  1. Copy the value for Private IP.

Copy the value for Private IP.

  1. On the Amazon Elastic Compute Cloud (Amazon EC2) console, choose Target groups.
  2. Choose dask-scheduler-tg1.
  3. On the Targets tab, choose Register targets.

On the Targets tab, choose Register targets.

  1. For Network, choose dask-vpc-main.
  2. For IP, enter the IP you copied earlier.
  3. For Ports, enter 8787.
  4. Choose Include as pending below.

Choose Include as pending below.

  1. Wait until the targets are registered and then navigate to the Load Balancers page on the Amazon EC2 console.
  2. On the Description tab, copy the value for DNS name.

On the Description tab, copy the value for DNS name.

  1. Enter the DNS name into your browser to view the Dask dashboard.

Enter the DNS name into your browser to view the Dask dashboard.

For demo purposes, we set up our Network Load Balancer in a public subnet without certificates. We recommend securing the Network Load Balancer with certificates and appropriate firewall rules.

Machine learning using Dask on Fargate: Notebook overview

To walk through the accompanying notebook, complete the following steps:

  1. On the Amazon ECS console, choose Clusters.
  2. Ensure that Fargate-Dask-Cluster is running with one task each for Dask-Scheduler and Dask-Workers.
  3. On the SageMaker console, choose Notebook instances.
  4. Open Jupyter and upload dask-sm-fargate-example.ipynb.
  5. Run each cell of the notebook and observe the results (see the following sections for details on each cell).
  6. Use the Network Load Balancer public DNS to monitor the performance of the cluster as you run the notebook cells.

Setting up conda package dependencies

The SageMaker notebook’s conda_python3 environment ships with a set of four packages. To run a distributed dask, you need to bring a few new packages as well the updated version of the existing packages. Run conda install for these packages: scikit-learn 0.23, dask-ml 1.6.0, and cloudpickle 1.6.0. See the following code:

!conda install scikit-learn=0.23.2 -c conda-forge -n python3 -y
!conda install -n python3 dask-ml=1.6.0 -c conda-forge -y
!conda install cloudpickle=1.6.0 -c conda-forge -y
!conda install s3fs=0.4.0 -c conda-forge -y

Each command takes about 5 minutes to install because it needs to resolve dependencies.

Setting up the Dask client

The client is the primary entry point for users of distributed Dask. You register the client to the distributed Dask scheduler that is deployed and running in the Fargate cluster. See the following code:

from dask.distributed import Client
client = Client('Dask-Scheduler.local-dask:8786')

Scaling Dask workers

Distributed Dask is a centrally managed, distributed, dynamic task scheduler. The central dask-scheduler process coordinates the actions of several dask-worker processes spread across multiple machines and the concurrent requests of several clients. Internally, the scheduler tracks all work as a constantly changing directed acyclic graph of tasks.

You can scale the Dask workers to add more compute and memory capacity for your data science workload. See the following code:

!sudo aws ecs update-service --service Dask-Workers --desired-count 20 --cluster Fargate-Dask-Cluster
client.restart()

Alternatively, you can use the Service Auto Scaling feature of Fargate to automatically scale the resources (number of tasks).

After client restart, you now have 40 cores with over 80 GB of memory, as shown in the following screenshot.

After client restart, you now have 40 cores with over 80 GB of memory, as shown in the following screenshot.

You can verify this on the Amazon ECS console by checking the Fargate Dask workers.

Exploratory data analysis of New York taxi trips with Dask DataFrame

A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. These Pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask DataFrame operation triggers many operations on the constituent Pandas DataFrames. The following diagram illustrates a Dask DataFrame.

The following diagram illustrates a Dask DataFrame.

Lazy loading taxi trips from Amazon S3 to a Dask DataFrame

Use the s3fs and dask.dataframe libraries to load the taxi trips from the public Amazon Simple Storage Service (Amazon S3) bucket into a distributed Dask DataFrame. S3FS builds on botocore to provide a convenient Python file system interface for Amazon S3. See the following code:

import s3fs
import dask.dataframe as dd df = dd.read_csv( 's3://nyc-tlc/trip data/yellow_tripdata_2018-*.csv', storage_options={'anon': True}, parse_dates=['tpep_pickup_datetime','tpep_dropoff_datetime']
)

Dask lazily loads the records as computations are performed, unlike the Pandas DataFrame. The datasets contain over 100 million trips, which are loaded into the distributed Dask DataFrame, as shown in the following screenshot.

The datasets contain over 100 million trips, which are loaded into the distributed Dask DataFrame, as shown in the following screenshot.

Taxi trip data structure

You can determine the data structure of the loaded Dask DataFrame using df.dtypes:

VendorID int64
tpep_pickup_datetime datetime64[ns]
tpep_dropoff_datetime datetime64[ns]
passenger_count int64
trip_distance float64
RatecodeID int64
store_and_fwd_flag object
PULocationID int64
DOLocationID int64
payment_type int64
fare_amount float64
extra float64
mta_tax float64
tip_amount float64
tolls_amount float64
improvement_surcharge float64
total_amount float64
trip_dur_secs int64
pickup_date object

Calculate the maximum trip duration

You have over 100 million trips loaded into the Dask DataFrame, and now you need to determine of all the trips that took the maximum amount of time to complete. This is a memory and compute-intensive operation and hard to perform on a single machine with limited resources. Dask allows you to use the familiar DataFrame operations, but on the backend runs those operations on a scalable distributed Fargate cluster of nodes. See the following code:

df['trip_dur_secs'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.seconds
max_trip_duration = df.trip_dur_secs.max().compute()

The following screenshot shows the output.

The following screenshot shows the output.

Calculating the average number of passengers by pickup date

Now you want to know the average number of passengers across all trips for each pickup date. You can easily compute that across millions of trips using the following code:

df['pickup_date'] = df['tpep_dropoff_datetime'].dt.date
df_mean_psngr_pickup_date = df.groupby('pickup_date').passenger_count.mean().compute()

The following screenshot shows the output.

The following screenshot shows the output.

You can also perform aggregate operations across the entire dataset to identify the total number of trips and trip distance for each vendor, as shown in the following screenshot.

You can also perform aggregate operations across the entire dataset to identify the total number of trips and trip distance for each vendor

You can determine the memory usage across the Dask cluster nodes using the Dask workers dashboard, as shown in the following screenshot. For the preceding operation, the consumed memory across workers amounts to 31 GB.

For the preceding operation, the consumed memory across workers amounts to 31 GB.

 

As you run the preceding code, navigate to the Dask dashboard to see how Dask performs those operations. The following screenshot shows an example visualization of the Dask dashboard.

The following screenshot shows an example visualization of the Dask dashboard.

The visualization shows from-delayed in the progress pane. Sometimes we face problems that are parallelizable, but don’t fit into high-level abstractions like Dask Array or Dask DataFrame. For those problems, the Dask delayed function decorates your functions so they operate lazily. Rather than running your function immediately, it defers running and places the function and its arguments into a task graph.

Persisting collections into memory

You can pin the data in distributed memory of the worker nodes using the distributed Dask’s persist API:

df_persisted = client.persist(df)

Typically, we use asynchronous methods like client.persist to set up large collections and then use df.compute() for fast analyses. For example, you can compute the maximum trip distance across all trips:

max_trip_dist = df_persisted.trip_distance.max().compute()

The following screenshot shows the output.

The following screenshot shows the output.

In the preceding cell, this computation across 102 million trips took just 344 milliseconds. This is because the entire Dask DataFrame was persisted into the memory of the Fargate worker nodes, thereby enabling computations to run significantly faster.

Visualizing trips with Dask DataFrame

Dask DataFrame supports visualization with matplotlib, which is similar to Pandas DataFrame.

Imagine you want to know the top 10 expensive rides by pickup location. You can visualize that as in the following screenshot.

You can visualize that as in the following screenshot.

Predicting trip duration with Dask ML linear regression

As we have explored the taxi trips dataset, now we predict the duration of trips when no pickup and drop-off times are available. We use the historical trips that have a labeled trip duration to train a linear regression model and then use that model to predict trip duration for new trips that have no pickup and drop-off times.

We use LinearRegression from dask_ml.linear_model and the Dask Fargate cluster as the backend for training the model. Dask ML provides scalable ML in Python using Dask alongside popular ML libraries like Scikit-learn, XGBoost, and others.

The dask_ml.linear_model module implements linear models for classification and regression. Dask ML algorithms integrates with joblib to submit jobs to the distributed Dask cluster.

For training the model, we need to prepare the training and testing datasets. We use the dask_ml.model_selection library to create those datasets, as shown in the following screenshot.

We use the dask_ml.model_selection library to create those datasets, as shown in the following screenshot.

We use the dask_ml.model_selection library to create those datasets, as shown in the following screenshot.

After you train the model, you can run a prediction against the model to predict the trip duration using the testing dataset, as shown in the following screenshot.

After you train the model, you can run a prediction against the model to predict the trip duration using the testing dataset

Logging and monitoring

You can monitor, troubleshoot, and set alarms for all your Dask cluster resources running on Fargate using Amazon CloudWatch Container Insights. This fully managed service collects, aggregates, and summarizes Amazon ECS metrics and logs. The following screenshot shows an example of the Container Insights UI for the preceding notebook run.

The following screenshot shows an example of the Container Insights UI for the preceding notebook run.

Cleaning up

To clean up your resources, delete your main stack on the CloudFormation console or use the following AWS CLI command:

aws cloudformation delete-stack --stack-name dask-fargate --region us-east-1

Conclusion

In this post, we showed how to stand up a highly scalable infrastructure for performing ML on distributed Dask with a SageMaker notebook and Fargate. We demonstrated the core concepts of how to use Dask DataFrame to perform big data processing such as aggregating records by pickup date and finding the longest trip of over 100 million taxi trips. Then we did an exploratory data analysis and visualized expensive rides. Finally, we showed how to use the Dask ML library to perform linear regression algorithm to predict trip duration.

Give distributed Dask a try for your ML use cases, such as performing exploratory data analysis, preprocessing, and linear regression, and leave your feedback in the comments section. You can also access the resources for this post in the GitHub repo.

References

For resources and more information about the tools and services in this post, see the following:


About the Authors

Ram VittalRam Vittal is an enterprise solutions architect at AWS. His current focus is to help enterprise customers with their cloud adoption and optimization journey to improve their business outcomes. In his spare time, he enjoys tennis, photography, and movies.

 

 

sireesha muppala 100Sireesha Muppala is an AI/ML Specialist Solutions Architect at AWS, providing guidance to customers on architecting and implementing machine learning solutions at scale. She received her Ph.D. in Computer Science from University of Colorado, Colorado Springs. In her spare time, Sireesha loves to run and hike Colorado trails.