Building AWS Glue Spark ETL jobs using Amazon DocumentDB (with MongoDB compatibility) & MongoDB on EC2
This post shows how to build AWS Glue Spark ETL jobs against Amazon DocumentDB (with MongoDB compatibility) and MongoDB on Amazon EC2 using the ConnectionType parameter. The solution architecture has the following three components:
- Amazon DocumentDB
- AWS Glue
- MongoDB on Amazon Elastic Compute Cloud (Amazon EC2)
Prerequisites
Before getting started, you must complete the following prerequisites:
- Create an AWS Identity and Access Management (IAM) user with sufficient permissions to interact with the AWS Management Console. Your IAM permissions must also allow you to create the IAM roles and policies deployed by the AWS CloudFormation template provided in this post.
- Create an IAM policy for AWS Glue.
- Save your AWS Glue ETL script for Amazon DocumentDB (see the sample code in the Code for building the AWS Glue ETL job section later in this post) as DocumentDB-Glue-ETL.py in your S3 bucket.
- Save your AWS Glue ETL script for MongoDB as MongoDB-Glue-ETL.py in your S3 bucket.
Provisioning resources with AWS CloudFormation
For this post, we provide CloudFormation templates for you to review and customize to your needs. Some of the resources deployed by this stack incur costs as long as they remain in use, such as Amazon DocumentDB and Amazon EC2.
For instructions on launching your stacks, see Launching an Amazon DocumentDB AWS CloudFormation Stack and MongoDB on the AWS Cloud: Quick Start Reference Deployment.
The Amazon DocumentDB stack creation can take up to 15 minutes, and MongoDB stack creation can take up to 60 minutes.
When stack creation is complete, go to the Outputs tab for the stack on the AWS CloudFormation console and note down the following values (you use these in later steps):
- DocumentDB CloudFormation – ClusterEndpoint and ClusterPort
- MongoDB CloudFormation – PrimaryReplicaNodeIp
Preparing your collection
When the CloudFormation stack is complete, use an EC2 instance to connect to your Amazon DocumentDB cluster. For instructions, see Install the mongo shell, Connect to your Amazon DocumentDB cluster, and Insert and query data.
For instructions on accessing Amazon DocumentDB from Amazon EC2 in the same VPC, see Connect Using Amazon EC2.
For more information about MongoDB, see Connect to MongoDB nodes and Testing MongoDB.
Before creating your AWS Glue ETL job, use the mongo shell to insert a few entries into a collection titled profiles. See the following code:
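A minimal sketch of such inserts, assuming a database named test; the document fields and values below are purely illustrative:

```
use test
db.profiles.insertMany([
  { "_id": 1, "name": "Matt", "status": "active", "level": 12, "score": 202 },
  { "_id": 2, "name": "Frank", "status": "inactive", "level": 2, "score": 9 },
  { "_id": 3, "name": "Karen", "status": "active", "level": 7, "score": 87 },
  { "_id": 4, "name": "Katie", "status": "active", "level": 3, "score": 27 }
])
```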
You’re now ready to configure AWS Glue ETL jobs using the Amazon DocumentDB and MongoDB ConnectionType.
Setting up AWS Glue connections
You set up two separate connections for Amazon DocumentDB and MongoDB when the databases are in two different VPCs (or if you deployed the databases using the provided CloudFormation template). Complete the following steps for both connections. We first walk you through the Amazon DocumentDB connection.
- On the AWS Glue console, under Databases, choose Connections.
- Choose Add connection.
- For Connection name, enter a name for your connection.
- If you have SSL enabled on your Amazon DocumentDB cluster (which is what the CloudFormation template in this post used), select Require SSL connection.
- For Connection Type, choose Amazon DocumentDB or MongoDB.
- Choose Next.
- For Amazon DocumentDB URL, enter a URL using the output from the CloudFormation stack, such as mongodb://host:port/databasename (use the default port, 27017).
- For Username and Password, enter the credentials you entered as parameters when creating the CloudFormation stack.
- For VPC, choose the VPC in which you created databases (Amazon DocumentDB and MongoDB).
- For Subnet, choose the subnet within your VPC.
- For Security groups, select your security group.
- Choose Next.
- Review the connection details and choose Finish.
Similarly, add the connection for MongoDB with the following changes to the steps:
- If you used the CloudFormation template in this post, don’t select Require SSL connection for MongoDB
- For Connection Type, choose MongoDB
- For MongoDB URL, enter a URL using the output from the CloudFormation stack, such as mongodb://host:port/databasename (use the default port, 27017)
Creating an AWS Glue endpoint, S3 endpoint, and security group
Before testing the connections, make sure you create an AWS Glue endpoint and S3 endpoint in the VPC in which the databases are created. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:
- To create your AWS Glue endpoint, on the Amazon VPC console, choose Endpoints.
- Choose Create endpoint.
- For Service Name, choose AWS Glue.
- Search for and select com.amazonaws.<region>.glue (for example, com.amazonaws.us-west-2.glue), using the Region where the database instance was created.
- For VPC, choose the VPC of the Amazon DocumentDB cluster.
- For Security group, select the security groups of the Amazon DocumentDB cluster.
- Choose Create endpoint.
- To create your S3 endpoint, on the Amazon VPC console, choose Endpoints.
- Choose Create endpoint.
- For Service Name, choose Amazon S3.
- Search for and select com.amazonaws.<region>.s3 (for example, com.amazonaws.us-west-2.s3), using the appropriate Region.
- For VPC, choose the VPC of the Amazon DocumentDB cluster.
- For Configure route tables, select the route table ID of the subnet associated with the database.
- Choose Create endpoint.
Similarly, add an AWS Glue endpoint and S3 endpoint for MongoDB with the following changes:
- Choose the VPC of the MongoDB instance
The security group must include itself as a source in its inbound rules. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:
- On the Security Groups page, choose Edit Inbound Rules.
- Choose Add rule.
- For Type, choose All traffic.
- For Source, choose the same security group.
- Choose Save rules.
The objective of setting up a connection is to establish private connections between the Amazon DocumentDB and MongoDB instances in the VPC and AWS Glue via the S3 endpoint, AWS Glue endpoint, and security group. It’s not required to test the connection because that connection is established by the AWS Glue job when you run it. At the time of writing, testing an AWS Glue connection is not supported for Amazon DocumentDB connections.
Code for building the AWS Glue ETL job
The following sample code sets up a read connection with Amazon DocumentDB for your AWS Glue ETL job (PySpark):
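The original snippet isn't reproduced here, so the following is a sketch of what it might look like, assuming the profiles collection created earlier in a database named test; the variable names (documentdb_uri, user, password) and placeholder values are illustrative, while the option keys follow the AWS Glue connection options for the documentdb/mongodb connection types:

```python
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Placeholder values -- taken from the CloudFormation outputs and the
# credentials you chose when creating the stack
documentdb_uri = "mongodb://<ClusterEndpoint>:<ClusterPort>"
user = "<username>"
password = "<password>"

# Read connection options for connection_type="documentdb"
read_docdb_options = {
    "uri": documentdb_uri,
    "database": "test",
    "collection": "profiles",
    "username": user,
    "password": password,
    "ssl": "true",              # the DocumentDB cluster in this post has SSL enabled
    "ssl.domain_match": "false",
}
```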
The following sample code sets up a write connection with Amazon DocumentDB for your AWS Glue ETL job (PySpark):
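Continuing the same sketch, the write options mirror the read options; the target collection name here is a hypothetical choice:

```python
documentdb_write_uri = "mongodb://<ClusterEndpoint>:<ClusterPort>"

# Write connection options for connection_type="documentdb"
write_documentdb_options = {
    "uri": documentdb_write_uri,
    "database": "test",
    "collection": "profiles_copy",   # hypothetical target collection
    "username": user,
    "password": password,
    "ssl": "true",
    "ssl.domain_match": "false",
}
```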
The following sample code creates an AWS Glue DynamicFrame by using the read and write connections for your AWS Glue ETL job (PySpark):
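And, under the same assumptions, the read and write calls themselves; for the MongoDB job, the only change would be connection_type="mongodb" with the corresponding options:

```python
# Read from Amazon DocumentDB into a DynamicFrame
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="documentdb",
    connection_options=read_docdb_options,
)

# ...apply any transformations here...

# Write the DynamicFrame back to Amazon DocumentDB
glueContext.write_dynamic_frame.from_options(
    dynamic_frame,
    connection_type="documentdb",
    connection_options=write_documentdb_options,
)

job.commit()
```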
Setting up AWS Glue ETL jobs
You’re now ready to set up your ETL job in AWS Glue. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:
- On the AWS Glue console, under ETL, choose Jobs.
- Choose Add job.
- For Job Name, enter a name.
- For IAM role, choose the IAM role you created as a prerequisite.
- For Type, choose Spark.
- For Glue version, choose the latest available version that runs Spark with Python 3.
- For This job runs, choose An existing script that you provide.
- Choose the Amazon S3 path where the script (DocumentDB-Glue-ETL.py) is stored.
- Under Advanced properties, enable Job bookmark.
Job bookmarks help AWS Glue maintain state information and prevent the reprocessing of old data.
- Keep the remaining settings at their defaults and choose Next.
- For Connections, choose the Amazon DocumentDB connection you created.
- Choose Save job and edit scripts.
- Edit the following parameters: documentdb_uri or mongo_uri, documentdb_write_uri or write_uri, user, password, and output_path.
- Choose Run job.
When the job is finished, validate the data loaded in the collection.
Similarly, add the job for MongoDB with the following changes:
- Choose the Amazon S3 path where the script (MongoDB-Glue-ETL.py) is stored
- For Connections, choose the MongoDB connection you created
- Change the parameters applicable to MongoDB (mongo_uri and write_uri)
Cleaning up
After you finish, delete the CloudFormation stack; some of the AWS resources deployed by the stack incur costs as long as you continue to use them. Deleting the stack deletes all the AWS resources it created.
- On the AWS CloudFormation console, on the Stacks page, select the stack to delete. The stack must be currently running.
- On the stack details page, choose Delete.
- Choose Delete stack when prompted.
Additionally, delete the AWS Glue endpoint, S3 endpoint, AWS Glue connections, and AWS Glue ETL jobs.
Summary
In this post, we showed you how to build AWS Glue Spark ETL jobs and set up connections using the ConnectionType parameter for Amazon DocumentDB and MongoDB databases deployed with AWS CloudFormation. You can use this solution to read data from Amazon DocumentDB or MongoDB, transform it, and write it to Amazon DocumentDB, MongoDB, or other targets such as Amazon S3 (queried with Amazon Athena), Amazon Redshift, Amazon DynamoDB, and Amazon Elasticsearch Service (Amazon ES).
Using Spark SQL for ETL
Original Article by Ben Snively, May 2016
With big data, you deal with many different formats and large volumes of data. SQL-style queries have been around for nearly four decades. Many systems support SQL-style syntax on top of the data layers, and the Hadoop/Spark ecosystem is no exception. This allows companies to try new technologies quickly without learning a new query syntax for basic retrievals, joins, and aggregations.
Amazon EMR is a managed service for the Hadoop and Spark ecosystem that allows customers to quickly focus on the analytics they want to run, not the heavy lifting of cluster management.
In this post, we demonstrate how you can leverage big data platforms and still write queries using a SQL-style syntax over data that is in different data formats within a data lake. We first show how you can use Hue within EMR to perform SQL-style queries quickly on top of Apache Hive. Then we show you how to query the dataset much faster using the Zeppelin web interface on the Spark execution engine. Lastly, we show you how to take the result from a Spark SQL query and store it in Amazon DynamoDB.
Hive and Spark SQL history
For versions <= 1.x, Apache Hive executed native Hadoop MapReduce to run the analytics and often required the interpreter to write multiple jobs that were chained together in phases. This allowed massive datasets to be queried but was slow due to the overhead of Hadoop MapReduce jobs.
SparkSQL adds the same SQL interface to Spark that Hive added on top of Hadoop MapReduce. SparkSQL is built on Spark Core, which leverages in-memory computation and RDDs, allowing it to be much faster than Hadoop MapReduce.
Spark integrates easily with many big data repositories. The following illustration shows some of these integrations.
Using SparkSQL for ETL
In the second part of this post, we walk through a basic example using data sources stored in different formats in Amazon S3. Using a SQL syntax language, we fuse and aggregate the different datasets, and finally load that data into DynamoDB as a full ETL process.
This post uses three datasets: a tab-separated movie ratings dataset, a movie details dataset that uses a hash to separate columns and a pipe to separate the elements of its genre array, and a pipe-delimited user details dataset with attributes such as gender and occupation.
Create a table in Hive/Hue
Hive and SparkSQL let you share a metadata catalogue. This allows you to create table definitions one time and use either query execution engine as needed. All table definitions could have been created in either tool exclusively as well.
First, launch an EMR cluster with Hive, Hue, Spark, and Zeppelin configured. It’s recommended that you run a cluster with at least four core nodes if the default instance size is m3.xlarge.
Then launch a Hue browser and navigate to the query section. To learn how to enable web interface access to Hue, see View Web Interfaces Hosted on Amazon EMR Clusters.
The first table to create is the ratings table. The table definition specifies the tab-separated values in the ROW FORMAT line below:
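The original DDL isn't shown in this excerpt; here is a sketch, assuming a MovieLens-style ratings layout and a placeholder S3 location for the data:

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS usermovieratings (
  userid        INT,
  movieid       INT,
  rating        INT,
  unixtimestamp BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION 's3://<your-bucket>/movielens/user-movie-ratings/';
```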
After you create the table, you select the row icon to the left of the table to refresh the table listing on the left side and see sample data.
Next, create the MovieDetails table to query over. This data has two delimiters: a hash for the columns and a pipe for the elements in the genre array.
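A sketch under the same assumptions, with the hash and pipe delimiters described above and a placeholder S3 location:

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS moviedetails (
  movieid INT,
  title   STRING,
  genres  ARRAY<STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '#'
COLLECTION ITEMS TERMINATED BY '|'
LOCATION 's3://<your-bucket>/movielens/movie-details/';
```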
After you create the table, the genres appear in the sample data browser.
Transform the data using SparkSQL/Zeppelin
Now interact with SparkSQL through a Zeppelin UI, but re-use the table definitions you created in the Hive metadata store. You’ll create another table in SparkSQL later in this post to show how that would have been done there.
Connect to the Zeppelin UI and create a new notebook under the Notebook tab. Run a query to show the tables; you can see that the two tables you created in Hive are also available in SparkSQL.
Using SparkSQL, you can perform the same query as you did in Hive in a previous step.
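The exact Hive query isn't reproduced in this excerpt, so the following is an illustrative top-rated-movies query in a Zeppelin %sql paragraph, using the table definitions above:

```sql
%sql
SELECT m.title, avg(r.rating) AS avgRating, count(*) AS numberOfRatings
FROM usermovieratings r
JOIN moviedetails m ON m.movieid = r.movieid
GROUP BY m.title
ORDER BY avgRating DESC, numberOfRatings DESC
LIMIT 10
```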
Note: The last semi-colon at the end of the statement was removed.
This time, it will usually take less than 30 seconds for SparkSQL to query the data and return the results. The actual response time depends on the size of the EMR cluster.
Spark lets you leverage an RDD for data that is queried and iterated over. You can tell Spark to do this with your usermovieratings table, by executing the following command:
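A sketch (CACHE TABLE is standard Spark SQL; the table name matches the Hive definition above):

```sql
%sql
CACHE TABLE usermovieratings
```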
and:
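presumably the movie details table is cached the same way (an assumption; the original second command isn't shown):

```sql
%sql
CACHE TABLE moviedetails
```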
Now, execute the same query again. This time, it returns within a couple of seconds, so analysts can quickly interact with the large dataset in the RDD.
Suppose you want the same information as the previous query, but this time broken out by the top five movies for males and the top five for females. To do this, bring in the data set user-details. This data set contains information such as gender and occupation. This data set is pipe delimited.
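This is the table that gets created directly in SparkSQL, as mentioned earlier. A sketch, assuming MovieLens-style user attributes and a placeholder S3 location:

```sql
%sql
CREATE EXTERNAL TABLE IF NOT EXISTS userdetails (
  userid     INT,
  age        INT,
  gender     STRING,
  occupation STRING,
  zipcode    STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
LOCATION 's3://<your-bucket>/movielens/user-details/'
```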
This query combines two queries in a union statement. The first query gets the five top-rated movies for males using all three datasets and then combines the results with the five top-rated movies for females:
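A sketch of such a query, using the table and column names assumed above:

```sql
%sql
SELECT title, avgRating, gender FROM (
  SELECT m.title, avg(r.rating) AS avgRating, u.gender
  FROM usermovieratings r
  JOIN moviedetails m ON m.movieid = r.movieid
  JOIN userdetails u ON u.userid = r.userid
  WHERE u.gender = 'M'
  GROUP BY m.title, u.gender
  ORDER BY avgRating DESC
  LIMIT 5
) topMale
UNION ALL
SELECT title, avgRating, gender FROM (
  SELECT m.title, avg(r.rating) AS avgRating, u.gender
  FROM usermovieratings r
  JOIN moviedetails m ON m.movieid = r.movieid
  JOIN userdetails u ON u.userid = r.userid
  WHERE u.gender = 'F'
  GROUP BY m.title, u.gender
  ORDER BY avgRating DESC
  LIMIT 5
) topFemale
```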
Because the ratings table is still cached in the SparkContext, the query happens quickly (in this case, four seconds).
Load the transformed data into DynamoDB
Next, create a new DynamoDB table that saves the number of ratings that users voted on, per genre and rating number. To query this, you first need to figure out which movies were voted on. Combine that information with the movie details data and each movie’s genres to know how users are voting per genre.
The following SQL statement queries for that information and returns the counts:
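A sketch of that statement, assuming the table definitions above; explode() flattens the genre array so each genre gets its own row:

```sql
%sql
SELECT genre, rating, count(*) AS ratingCount
FROM usermovieratings r
JOIN (
  SELECT movieid, explode(genres) AS genre
  FROM moviedetails
) g ON g.movieid = r.movieid
GROUP BY genre, rating
ORDER BY genre, rating
```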
Notice that you are exploding the genre list in the moviedetails table, because that column type is the list of genres for a single movie.
Create a new DynamoDB table, in the same Region in which you are running the cluster, to store the results of the SQL query. In this post, we use us-east-1. Use the following settings:
Note: Change the type for the range key, because the code below stores the rating as a number.
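For example, assuming a table named GenreRatingCounts (a hypothetical name used throughout the rest of this walkthrough), the equivalent AWS CLI call would use category as the String partition key and rating as the Number sort key:

```bash
aws dynamodb create-table \
  --region us-east-1 \
  --table-name GenreRatingCounts \
  --attribute-definitions AttributeName=category,AttributeType=S AttributeName=rating,AttributeType=N \
  --key-schema AttributeName=category,KeyType=HASH AttributeName=rating,KeyType=RANGE \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
```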
Next, SSH to the master node for the EMR cluster. Here’s how to use the EMR-DDB connector in conjunction with SparkSQL to store data in DynamoDB.
Start a Spark shell, using the EMR-DDB connector JAR file name:
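A sketch of the launch command; the JAR path below is where the EMR-DDB connector is typically installed on an EMR master node (verify the path on your cluster):

```bash
spark-shell --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar
```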
To learn how this works, see the Analyze Your Data on Amazon DynamoDB with Apache Spark blog post.
Paste this code into the Spark shell prompt:
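The original listing isn't reproduced in this excerpt, so the following Scala sketch reconstructs the flow described below, assuming the hypothetical GenreRatingCounts table created earlier and the table definitions from the previous sections (on Spark 2.x, replace sqlContext with spark):

```scala
import java.util.HashMap

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import com.amazonaws.services.dynamodbv2.model.AttributeValue

// Hadoop job configuration that tells Spark to use the EMR-DDB connector's
// custom input/output formats and which DynamoDB table to write to
val ddbConf = new JobConf(sc.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "GenreRatingCounts")   // hypothetical table name
ddbConf.set("dynamodb.throughput.write.percent", "0.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

// Run the SparkSQL statement against the datasets in S3; the result is a DataFrame
val ratingCountsDF = sqlContext.sql(
  """SELECT genre, rating, count(*) AS ratingCount
     FROM usermovieratings r
     JOIN (SELECT movieid, explode(genres) AS genre FROM moviedetails) g
       ON g.movieid = r.movieid
     GROUP BY genre, rating""")

// Convert each row into the (Text, DynamoDBItemWritable) tuple that the
// DynamoDB custom output format expects
val ddbInsertFormattedRDD = ratingCountsDF.rdd.map(row => {
  val ddbMap = new HashMap[String, AttributeValue]()

  val category = new AttributeValue()
  category.setS(row.getString(0))
  ddbMap.put("category", category)

  val rating = new AttributeValue()
  rating.setN(row.get(1).toString)
  ddbMap.put("rating", rating)

  val count = new AttributeValue()
  count.setN(row.get(2).toString)
  ddbMap.put("count", count)

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)

  (new Text(""), item)
})

// Write the formatted RDD to DynamoDB through the connector
ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)
```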
After you run the code, notice that the DynamoDB table now has 95 entries, each containing a genre, a rating, and the number of ratings for that combination.
The ddbConf defines the Hadoop configuration that allows Spark to use a custom Hadoop input/output for reading and writing the RDD being created.
The next major piece of code executes the SparkSQL statement. At this point, query the different datasets in S3 to get the data to store in DynamoDB.
The query result is stored in a Spark DataFrame that you can use in your code.
After you have the DataFrame, perform a transformation to have an RDD that matches the types that the DynamoDB custom output format knows how to write. The custom output format expects a tuple containing the Text and DynamoDBItemWritable types.
Create a new RDD with those types in it, in the following map call:
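(This is the map call from the sketch above, repeated for reference.)

```scala
val ddbInsertFormattedRDD = ratingCountsDF.rdd.map(row => {
  val ddbMap = new HashMap[String, AttributeValue]()

  val category = new AttributeValue()
  category.setS(row.getString(0))
  ddbMap.put("category", category)

  val rating = new AttributeValue()
  rating.setN(row.get(1).toString)
  ddbMap.put("rating", rating)

  val count = new AttributeValue()
  count.setN(row.get(2).toString)
  ddbMap.put("count", count)

  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)

  (new Text(""), item)
})
```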
The ddbInsertFormattedRDD now contains elements that look like this for the DynamoDBItemWritable element in the tuple:
{count={N: 4049,}, category={S: Action,}, rating={N: 3,}}
{count={N: 5560,}, category={S: Action,}, rating={N: 4,}}
{count={N: 3718,}, category={S: Action,}, rating={N: 5,}}
{count={N: 654,}, category={S: Adventure,}, rating={N: 1,}}
{count={N: 1126,}, category={S: Adventure,}, rating={N: 2,}}
This last call uses the job configuration that defines the EMR-DDB connector to write out the new RDD you created in the expected format:
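As in the sketch above:

```scala
ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)
```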
Conclusion
EMR makes it easy to run SQL-style analytics in both Spark and Hive. As this post has shown, connectors within EMR and the open source community let you easily talk to many data sources, including DynamoDB. Rather than focusing on standing up the software and managing the cluster, with EMR you can quickly process and analyze your data and store the results in destinations such as NoSQL repositories and data warehouses.
If you have a question or suggestion, please leave a comment below.
————————————-
Related
Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming