Big Data on Amazon Elastic MapReduce (EMR)

Big Data on Amazon EMR – Apache Spark  vs  Apache Hadoop

Apache Spark Ecosystem

Apache Spark on Amazon EMR

Amazon EMR is the best place to run Apache Spark. You can quickly and easily create managed Spark clusters from the AWS Management Console, AWS CLI, or the Amazon EMR API. Additionally, you can leverage additional Amazon EMR features, including fast Amazon S3 connectivity using the Amazon EMR File System (EMRFS), integration with the Amazon EC2 Spot market and the AWS Glue Data Catalog, and EMR Managed Scaling to add or remove instances from your cluster. AWS Lake Formation brings fine-grained access control, while integration with AWS Step Functions helps with orchestrating your data pipelines. EMR Studio (preview) is an integrated development environment (IDE) that makes it easy for data scientists and data engineers to develop, visualize, and debug data engineering and data science applications written in R, Python, Scala, and PySpark. EMR Studio provides fully managed Jupyter Notebooks, and tools like Spark UI and YARN Timeline Service to simplify debugging. EMR Notebooks make it easy for you to experiment and build applications with Spark. If you prefer, you can use Apache Zeppelin to create interactive and collaborative notebooks for data exploration using Spark.

Features & benefits

Fast Performance

EMR features the Amazon EMR runtime for Apache Spark, a performance-optimized runtime environment for Apache Spark that is active by default on Amazon EMR clusters. The Amazon EMR runtime for Apache Spark can be over 3x faster than clusters without it, and it has 100% API compatibility with standard Apache Spark. This improved performance means your workloads run faster and you save on compute costs, without making any changes to your applications.

By using a directed acyclic graph (DAG) execution engine, Spark can create efficient query plans for data transformations. Spark also stores input, output, and intermediate data in memory as resilient distributed datasets (RDDs), which allows fast processing without I/O cost and boosts the performance of iterative or interactive workloads.
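As a rough illustration of why in-memory caching matters for iterative work, here is a minimal PySpark sketch (the sample rows and column names are made up) that caches a DataFrame once and then reuses the in-memory data for several queries:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cache-example").getOrCreate()

# Illustrative data; in practice this would be read from Amazon S3 or HDFS.
events = spark.createDataFrame(
    [("search", 120), ("checkout", 45), ("search", 98), ("browse", 300)],
    ["action", "duration_ms"],
)

events.cache()  # keep the dataset in memory once the first action materializes it

# Subsequent queries reuse the cached data instead of recomputing the full DAG.
events.groupBy("action").count().show()
print(events.filter(events.duration_ms > 100).count())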

Develop Applications Quickly & Collaboratively

Apache Spark natively supports Java, Scala, SQL, and Python, which gives you a variety of languages for building your applications. Also, you can submit SQL or HiveQL queries using the Spark SQL module. In addition to running applications, you can use the Spark API interactively with Python or Scala directly in the Spark shell, via EMR Studio, or in Jupyter notebooks on your cluster. Support for Apache Hadoop 3.0 in EMR 6.0 brings Docker container support to simplify managing dependencies. You can also leverage cluster-independent EMR Notebooks (based on Jupyter) or use Zeppelin to create interactive and collaborative notebooks for data exploration and visualization. You can tune and debug your workloads in the EMR console, which has an off-cluster, persistent Spark History Server.
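For instance, the following PySpark sketch (the temporary view name and sample rows are illustrative) answers the same question through the DataFrame API and through the Spark SQL module:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-sql-example").getOrCreate()

# Illustrative data; in practice this would come from S3, HDFS, or a catalog table.
people = spark.createDataFrame(
    [("alice", 34), ("bob", 45), ("carol", 29)],
    ["name", "age"],
)

# DataFrame API in Python ...
people.filter(people.age > 30).show()

# ... and the same query expressed in SQL through the Spark SQL module.
people.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 30").show()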

Create Diverse Workflows

Apache Spark includes several libraries to help build applications for machine learning (MLlib), stream processing (Spark Streaming), and graph processing (GraphX). These libraries are tightly integrated in the Spark ecosystem, and they can be leveraged out of the box to address a variety of use cases. Additionally, you can use deep learning frameworks like Apache MXNet with your Spark applications. Integration with AWS Step Functions enables you to add serverless workflow automation and orchestration to your applications.
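As a small, hedged example of using one of these libraries out of the box, the following PySpark sketch trains an MLlib logistic regression model on made-up data:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("mllib-example").getOrCreate()

# Tiny illustrative training set: a binary label and two numeric features.
train = spark.createDataFrame(
    [(0.0, 1.1, 0.2), (1.0, 8.5, 6.3), (0.0, 0.9, 0.1), (1.0, 7.2, 5.9)],
    ["label", "f1", "f2"],
)

# Assemble the feature columns into the single vector column MLlib expects.
assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
features = assembler.transform(train)

model = LogisticRegression(maxIter=10).fit(features)
model.transform(features).select("label", "prediction").show()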

Integration with Amazon EMR Feature Set

Submit Apache Spark jobs with the EMR Step API, use Spark with EMRFS to directly access data in S3, save costs using EC2 Spot capacity, use EMR Managed Scaling to dynamically add and remove capacity, and launch long-running or transient clusters to match your workload. You can also easily configure Spark encryption and authentication with Kerberos using an EMR security configuration. Additionally, you can use the AWS Glue Data Catalog to store Spark SQL table metadata or use Amazon SageMaker with your Spark machine learning pipelines. EMR installs and manages Spark on Hadoop YARN, and you can also add other big data applications on your cluster. EMR with Apache Hudi lets you more efficiently manage change data capture (CDC) and helps with privacy regulations like GDPR and CCPA by simplifying record deletion. See the Amazon EMR documentation for more details about EMR features.
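For example, a Spark application can be submitted programmatically through the EMR Step API; the following boto3 sketch is illustrative only, with the cluster ID, step name, and script location as placeholders:

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Submit a Spark application as an EMR step; the cluster ID and S3 path are placeholders.
response = emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",
    Steps=[{
        "Name": "spark-job-example",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster",
                     "s3://<bucket>/scripts/my_spark_job.py"],
        },
    }],
)
print(response["StepIds"])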

Apache Hadoop Ecosystem

Apache Hadoop on Amazon EMR

• Apache Hadoop is an open source software project that can be used to efficiently process large datasets. Instead of using one large computer to process and store the data, Hadoop allows clustering commodity hardware together to analyze massive data sets in parallel.

• There are many applications and execution engines in the Hadoop ecosystem, providing a variety of tools to match the needs of your analytics workloads. Amazon EMR makes it easy to create and manage fully configured, elastic clusters of Amazon EC2 instances running Hadoop and other applications in the Hadoop ecosystem.

Applications & Frameworks in the Hadoop Ecosystem

Hadoop commonly refers to the actual Apache Hadoop project, which includes MapReduce (execution framework), YARN (resource manager), and HDFS (distributed storage). You can also install Apache Tez, a next-generation framework which can be used instead of Hadoop MapReduce as an execution engine. Amazon EMR also includes EMRFS, a connector allowing Hadoop to use Amazon S3 as a storage layer.

However, there are also other applications and frameworks in the Hadoop ecosystem, including tools that enable low-latency queries, GUIs for interactive querying, a variety of interfaces like SQL, and distributed NoSQL databases. The Hadoop ecosystem includes many open source tools designed to build additional functionality on Hadoop core components, and you can use Amazon EMR to easily install and configure tools such as Hive, Pig, Hue, Ganglia, Oozie, and HBase on your cluster. You can also run other frameworks, like Apache Spark for in-memory processing, or Presto for interactive SQL, in addition to Hadoop on Amazon EMR.

Hadoop: the basic components

Amazon EMR programmatically installs and configures applications in the Hadoop project, including Hadoop MapReduce, YARN, HDFS, and Apache Tez across the nodes in your cluster.

Processing with Hadoop MapReduce, Tez, and YARN

Hadoop MapReduce and Tez, execution engines in the Hadoop ecosystem, process workloads using frameworks that break down jobs into smaller pieces of work that can be distributed across nodes in your Amazon EMR cluster. They are built with the expectation that any given machine in your cluster could fail at any time and are designed for fault tolerance. If a server running a task fails, Hadoop reruns that task on another machine until completion.

You can write MapReduce and Tez programs in Java, use Hadoop Streaming to execute custom scripts in parallel, use Hive and Pig for higher-level abstractions over MapReduce and Tez, or use other tools to interact with Hadoop.
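For example, Hadoop Streaming can run plain Python scripts as the mapper and reducer. The single illustrative script below implements word count; the file name and the way it is passed to the streaming jar (as both -mapper and -reducer with a mode argument) are assumptions that vary by setup:

#!/usr/bin/env python
# Minimal Hadoop Streaming word count; run with "mapper" or "reducer" as the argument.
import sys

def mapper():
    for line in sys.stdin:
        for word in line.split():
            print("%s\t1" % word)

def reducer():
    # Streaming sorts map output by key, so identical words arrive grouped together.
    current, count = None, 0
    for line in sys.stdin:
        word, n = line.rstrip("\n").split("\t")
        if word != current:
            if current is not None:
                print("%s\t%d" % (current, count))
            current, count = word, 0
        count += int(n)
    if current is not None:
        print("%s\t%d" % (current, count))

if __name__ == "__main__":
    mapper() if sys.argv[1] == "mapper" else reducer()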

Starting with Hadoop 2, resource management is managed by Yet Another Resource Negotiator (YARN). YARN keeps track of all the resources across your cluster, and it ensures that these resources are dynamically allocated to accomplish the tasks in your processing job. YARN is able to manage Hadoop MapReduce and Tez workloads as well as other distributed frameworks such as Apache Spark.

Storage using Amazon S3 and EMRFS

By using the EMR File System (EMRFS) on your Amazon EMR cluster, you can leverage Amazon S3 as your data layer for Hadoop. Amazon S3 is highly scalable, low cost, and designed for durability, making it a great data store for big data processing. By storing your data in Amazon S3, you can decouple your compute layer from your storage layer, allowing you to size your Amazon EMR cluster for the amount of CPU and memory required for your workloads instead of having extra nodes in your cluster to maximize on-cluster storage. Additionally, you can terminate your Amazon EMR cluster when it is idle to save costs, while your data remains in Amazon S3.

EMRFS is optimized for Hadoop to directly read and write in parallel to Amazon S3 performantly, and can process objects encrypted with Amazon S3 server-side and client-side encryption. EMRFS allows you to use Amazon S3 as your data lake, and Hadoop in Amazon EMR can be used as an elastic query layer.
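As a minimal sketch (the bucket name and prefixes are placeholders), Spark on EMR reads from and writes back to Amazon S3 through EMRFS using ordinary s3:// URIs:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("emrfs-example").getOrCreate()

# EMRFS lets Spark address S3 directly; the bucket and prefixes below are placeholders.
raw = spark.read.csv("s3://<bucket>/raw/clicks/", header=True)

# Transform, then write the result straight back to S3 as Parquet.
clicks_by_page = raw.groupBy("page").count()
clicks_by_page.write.mode("overwrite").parquet("s3://<bucket>/curated/clicks-by-page/")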

On-cluster storage with HDFS

Hadoop also includes a distributed storage system, the Hadoop Distributed File System (HDFS), which stores data across local disks of your cluster in large blocks. HDFS has a configurable replication factor (with a default of 3x), giving increased availability and durability. HDFS monitors replication and balances your data across your nodes as nodes fail and new nodes are added.

HDFS is automatically installed with Hadoop on your Amazon EMR cluster, and you can use HDFS along with Amazon S3 to store your input and output data. You can easily encrypt HDFS using an Amazon EMR security configuration. Also, Amazon EMR configures Hadoop to use HDFS and local disk for intermediate data created during your Hadoop MapReduce jobs, even if your input data is located in Amazon S3.

Big Data Architectural Patterns & Best Practices on AWS

2017 Slide Deck
2018 Slide Deck

Building AWS Glue Spark ETL jobs using Amazon DocumentDB (with MongoDB compatibility) & MongoDB on EC2

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. AWS Glue has native connectors to connect to supported data sources on AWS or elsewhere using JDBC drivers. Additionally, AWS Glue now supports reading and writing to Amazon DocumentDB (with MongoDB compatibility) and MongoDB collections using AWS Glue Spark ETL jobs. This feature enables you to connect to and read, transform, and load (write) data from and to Amazon DocumentDB and MongoDB collections into services such as Amazon Simple Storage Service (Amazon S3) and Amazon Redshift for downstream analytics. For more information, see Connection Types and Options for ETL in AWS Glue. This post shows how to build AWS Glue Spark ETL jobs and set up connections with Amazon DocumentDB or MongoDB to read and load data using ConnectionType.

The following diagram illustrates the three components of the solution architecture:

Prerequisites

Before getting started, you must complete the following prerequisites:

  1. Create an AWS Identity and Access Management (IAM) user with sufficient permissions to interact with the AWS Management Console. Your IAM permissions must also include access to create IAM roles and policies created by the AWS CloudFormation template provided in this post.
  2. Create an IAM policy for AWS Glue.
  3. Save the following code as DocumentDB-Glue-ETL.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    output_path = "s3://<bucket>/<folder>/" + str(time.time()) + "/"
    documentdb_uri = "mongodb://<host name>:27017"
    documentdb_write_uri = "mongodb://<host name>:27017"
    
    read_docdb_options = {
        "uri": documentdb_uri,
        "database": "test",
        "collection": "profiles",
        "username": "<username>",
        "password": "<password>",
        "ssl": "true",
        "ssl.domain_match": "false",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id"
    }
    
    write_documentdb_options = {
        "uri": documentdb_write_uri,
        "database": "test",
        "collection": "collection1",
        "username": "<username>",
        "password": "<password>",
        "ssl": "true",
        "ssl.domain_match": "false",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id"
    }
    
    # Get DynamicFrame from DocumentDB
    dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
                                                                   connection_options=read_docdb_options)
    
    # Write DynamicFrame to DocumentDB
    glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
                                                 connection_options=write_documentdb_options)
    
    job.commit()
  4. Save the following code as MongoDB-Glue-ETL.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    output_path = "s3://<bucket>/<folder>/" + str(time.time()) + "/"
    mongo_uri = "mongodb://<host name or IP>:27017"
    write_uri = "mongodb://<host name or IP>:27017"
    
    read_mongo_options = {
        "uri": mongo_uri,
        "database": "test",
        "collection": "profiles",
        "username": "<username>",
        "password": "<password>",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id"}
    
    write_mongo_options = {
        "uri": write_uri,
        "database": "test",
        "collection": "collection1",
        "username": "<username>",
        "password": "<password>"
    }
    
    
    # Get DynamicFrame from MongoDB
    dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
                                                                  connection_options=read_mongo_options)
    # Write DynamicFrame to MongoDB 
    glueContext.write_dynamic_frame.from_options(dynamic_frame, connection_type="mongodb", connection_options=write_mongo_options)
    
    job.commit()

Provisioning resources with AWS CloudFormation

For this post, we provide CloudFormation templates for you to review and customize to your needs. Some of the resources deployed by this stack incur costs as long as they remain in use, such as Amazon DocumentDB and Amazon EC2.

For instructions on launching your stacks, see Launching an Amazon DocumentDB AWS CloudFormation Stack and MongoDB on the AWS Cloud: Quick Start Reference Deployment.

The Amazon DocumentDB stack creation can take up to 15 minutes, and MongoDB stack creation can take up to 60 minutes.

When stack creation is complete, go to the Outputs tab for the stack on the AWS CloudFormation console and note down the following values (you use these in later steps):

  • DocumentDB CloudFormation – ClusterEndpoint and ClusterPort
  • MongoDB CloudFormation – PrimaryReplicaNodeIp

Preparing your collection

When the CloudFormation stack is complete, use an EC2 instance to connect to your Amazon DocumentDB cluster. For instructions, see Install the mongo shell, Connect to your Amazon DocumentDB cluster, and Insert and query data.

For instructions on accessing Amazon DocumentDB from Amazon EC2 in the same VPC, see Connect Using Amazon EC2.

For more information about MongoDB, see Connect to MongoDB nodes and Testing MongoDB.

Before creating your AWS Glue ETL job, use the mongo shell to insert a few entries into a collection titled profiles. See the following code:

s0:PRIMARY> use test
s0:PRIMARY> db.profiles.insertMany([
            { "_id" : 1, "name" : "Matt", "status": "active", "level": 12, "score":202},
            { "_id" : 2, "name" : "Frank", "status": "inactive", "level": 2, "score":9},
            { "_id" : 3, "name" : "Karen", "status": "active", "level": 7, "score":87},
            { "_id" : 4, "name" : "Katie", "status": "active", "level": 3, "score":27}
            ])

You’re now ready to configure AWS Glue ETL jobs using Amazon DocumentDB and MongoDB ConnectionType.

Setting up AWS Glue connections

You set up two separate connections for Amazon DocumentDB and MongoDB when the databases are in two different VPCs (or if you deployed the databases using the provided CloudFormation template). Complete the following steps for both connections. We first walk you through the Amazon DocumentDB connection.

  1. On the AWS Glue console, under Databases, choose Connections.
  2. Choose Add connection.
  3. For Connection name, enter a name for your connection.
  4. If you have SSL enabled on your Amazon DocumentDB cluster (which is what the CloudFormation template in this post used), select Require SSL connection.
  5. For Connection Type, choose Amazon DocumentDB or MongoDB.
  6. Choose Next.


  7. For Amazon DocumentDB URL, enter a URL using the output from the CloudFormation stack, such as mongodb://host:port/databasename (use the default port, 27017).
  8. For Username and Password, enter the credentials you entered as parameters when creating the CloudFormation stack.
  9. For VPC, choose the VPC in which you created the databases (Amazon DocumentDB and MongoDB).
  10. For Subnet, choose the subnet within your VPC.
  11. For Security groups, select your security group.
  12. Choose Next.


  13. Review the connection details and choose Finish.


Similarly, add the connection for MongoDB with the following changes to the steps:

  • If you used the CloudFormation template in this post, don’t select Require SSL connection for MongoDB
  • For Connection Type, choose MongoDB
  • For MongoDB URL, enter a URL using the output from the CloudFormation stack, such as mongodb://host:port/databasename (use the default port, 27017)

Creating an AWS Glue endpoint, S3 endpoint, and security group

Before testing the connections, make sure you create an AWS Glue endpoint and S3 endpoint in the VPC in which the databases are created. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:

  1. To create your AWS Glue endpoint, on the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. For Service Name, choose AWS Glue.
  4. Search for and select com.amazonaws.<region>.glue (for example, com.amazonaws.us-west-2.glue). Enter the appropriate Region where the database instance was created.
  5. For VPC, choose the VPC of the Amazon DocumentDB cluster.


  6. For Security group, select the security groups of the Amazon DocumentDB cluster.
  7. Choose Create endpoint.


  8. To create your S3 endpoint, on the Amazon VPC console, choose Endpoints.
  9. Choose Create endpoint.
  10. For Service Name, choose Amazon S3.
  11. Search for and select com.amazonaws.<region>.s3 (for example, com.amazonaws.us-west-2.s3). Enter the appropriate Region.
  12. For VPC, choose the VPC of the Amazon DocumentDB cluster.
  13. For Configure route tables, select the route table ID of the associated subnet of the database.


  14. Choose Create endpoint.


Similarly, add an AWS Glue endpoint and S3 endpoint for MongoDB with the following changes:

  • Choose the VPC of the MongoDB instance

The security group must include itself as a source in its inbound rules. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately (a programmatic sketch follows these steps):

  1. On the Security Groups page, choose Edit Inbound Rules.
  2. Choose Add rule.
  3. For Type, choose All traffic.
  4. For Source, choose the same security group.
  5. Choose Save rules.
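If you prefer to add this rule programmatically rather than in the console, a minimal boto3 sketch (the security group ID is a placeholder) looks like the following:

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Placeholder security group ID; this adds an all-traffic rule whose source is the group itself.
group_id = "sg-XXXXXXXXXXXXXXXXX"
ec2.authorize_security_group_ingress(
    GroupId=group_id,
    IpPermissions=[{
        "IpProtocol": "-1",  # all traffic
        "UserIdGroupPairs": [{"GroupId": group_id}],
    }],
)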


The objective of setting up a connection is to establish private connections between the Amazon DocumentDB and MongoDB instances in the VPC and AWS Glue via the S3 endpoint, AWS Glue endpoint, and security group. It’s not required to test the connection because that connection is established by the AWS Glue job when you run it. At the time of writing, testing an AWS Glue connection is not supported for Amazon DocumentDB connections.

Code for building the AWS Glue ETL job

The following sample code sets up a read connection with Amazon DocumentDB for your AWS Glue ETL job (PySpark):

read_docdb_options = {
    "uri": documentdb_uri,
    "database": "test",
    "collection": "profiles",
    "username": "<username>",
    "password": "<password>",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

The following sample code sets up a write connection with Amazon DocumentDB for your AWS Glue ETL job (PySpark):

write_documentdb_options = {
    "uri": documentdb_write_uri,
    "database": "test",
    "collection": "collection1",
    "username": "<username>",
    "password": "<password>",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

The following sample code creates an AWS Glue DynamicFrame by using the read and write connections for your AWS Glue ETL job (PySpark):

# Get DynamicFrame from DocumentDB
dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
                                                               connection_options=read_docdb_options)

# Write DynamicFrame to DocumentDB
glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
                                             connection_options=write_documentdb_options)

Setting up AWS Glue ETL jobs

You’re now ready to set up your ETL job in AWS Glue. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Job Name, enter a name.
  4. For IAM role, choose the IAM role you created as a prerequisite.
  5. For Type, choose Spark.
  6. For Glue version, choose the latest available Glue version with Python 3 support.
  7. For This job runs, choose An existing script that you provide.
  8. Choose the Amazon S3 path where the script (DocumentDB-Glue-ETL.py) is stored.
  9. Under Advanced properties, enable Job bookmark.

Job bookmarks help AWS Glue maintain state information and prevent the reprocessing of old data.

  10. Keep the remaining settings at their defaults and choose Next.
  11. For Connections, choose the Amazon DocumentDB connection you created.
  12. Choose Save job and edit scripts.
  13. Edit the following parameters:
    1. documentdb_uri or mongo_uri
    2. documentdb_write_uri or write_uri
    3. username
    4. password
    5. output_path
  14. Choose Run job.

When the job is finished, validate the data loaded in the collection.

Similarly, add the job for MongoDB with the following changes:

  • Choose the Amazon S3 path where the script (MongoDB-Glue-ETL.py) is stored
  • For Connections, choose the MongoDB connection you created
  • Change the parameters applicable to MongoDB (mongo_uri and write_uri)
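If you would rather script the job setup than click through the console, the following boto3 sketch creates a Glue Spark job with the connection attached and job bookmarks enabled, then starts a run. The job name, IAM role, connection name, script location, and Glue version are placeholders for the resources created earlier:

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Names, role, script path, and Glue version below are illustrative placeholders.
glue.create_job(
    Name="documentdb-glue-etl",
    Role="<glue-etl-iam-role>",
    Command={"Name": "glueetl",
             "ScriptLocation": "s3://<bucket>/scripts/DocumentDB-Glue-ETL.py",
             "PythonVersion": "3"},
    Connections={"Connections": ["<documentdb-connection-name>"]},
    DefaultArguments={"--job-bookmark-option": "job-bookmark-enable"},
    GlueVersion="2.0",
)

run = glue.start_job_run(JobName="documentdb-glue-etl")
print(run["JobRunId"])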

Cleaning up

After you finish, don’t forget to delete the CloudFormation stack, because some of the AWS resources deployed by the stack in this post incur a cost as long as you continue to use them.

You can delete the CloudFormation stack to delete all AWS resources created by the stack.

  1. On the AWS CloudFormation console, on the Stacks page, select the stack to delete. The stack must be currently running.
  2. On the stack details page, choose Delete.
  3. Choose Delete stack when prompted.

Additionally, delete the AWS Glue endpoint, S3 endpoint, AWS Glue connections, and AWS Glue ETL jobs.

Summary

In this post, we showed you how to build AWS Glue Spark ETL jobs and set up connections with ConnectionType for Amazon DocumentDB and MongoDB databases provisioned using AWS CloudFormation. You can use this solution to read data from Amazon DocumentDB or MongoDB, transform it, and write it to Amazon DocumentDB, MongoDB, or other targets such as Amazon S3 (queried using Amazon Athena), Amazon Redshift, Amazon DynamoDB, Amazon Elasticsearch Service (Amazon ES), and more.

Using Spark SQL for ETL

Original Article by Ben Snively, May 2016

With big data, you deal with many different formats and large volumes of data. SQL-style queries have been around for nearly four decades. Many systems support SQL-style syntax on top of the data layers, and the Hadoop/Spark ecosystem is no exception. This allows companies to try new technologies quickly without learning a new query syntax for basic retrievals, joins, and aggregations.

Amazon EMR is a managed service for the Hadoop and Spark ecosystem that allows customers to quickly focus on the analytics they want to run, not the heavy lifting of cluster management.

In this post, we demonstrate how you can leverage big data platforms and still write queries using a SQL-style syntax over data that is in different data formats within a data lake. We first show how you can use Hue within EMR to perform SQL-style queries quickly on top of Apache Hive. Then we show you how to query the dataset much faster using the Zeppelin web interface on the Spark execution engine. Lastly, we show you how to take the result from a Spark SQL query and store it in Amazon DynamoDB.

Hive and Spark SQL history

For versions <= 1.x, Apache Hive executed native Hadoop MapReduce to run the analytics and often required the interpreter to write multiple jobs that were chained together in phases.  This allowed massive datasets to be queried but was slow due to the overhead of Hadoop MapReduce jobs.

SparkSQL adds this same SQL interface to Spark, just as Hive added to the Hadoop MapReduce capabilities. SparkSQL is built on top of the Spark Core, which leverages in-memory computations and RDDs that allow it to be much faster than Hadoop MapReduce.

Spark integrates easily with many big data repositories.  The following illustration shows some of these integrations.

Using SparkSQL for ETL

In the second part of this post, we walk through a basic example using data sources stored in different formats in Amazon S3. Using a SQL syntax language, we fuse and aggregate the different datasets, and finally load that data into DynamoDB as a full ETL process.

This post uses three MovieLens datasets stored in Amazon S3: user movie ratings (tab-delimited), movie details (hash-delimited, with a pipe-delimited genre array), and user details (pipe-delimited).

Create a table in Hive/Hue

Hive and SparkSQL let you share a metadata catalogue. This allows you to create table definitions one time and use either query execution engine as needed. You could also have created all of the table definitions in either tool exclusively.
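As a minimal sketch for Spark 2.x and later, enabling Hive support on the PySpark session is what makes the shared metastore visible to SparkSQL; the query below assumes the tables created later in this post already exist:

from pyspark.sql import SparkSession

# With Hive support enabled, Spark SQL reads table definitions from the shared Hive metastore.
spark = (SparkSession.builder
         .appName("shared-metastore-example")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("SHOW TABLES").show()  # lists the tables defined through Hive/Hue
spark.sql("SELECT * FROM usermovieratings LIMIT 5").show()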

First, launch an EMR cluster with Hive, Hue, Spark, and Zeppelin configured. It’s recommended that you run a cluster with at least four core nodes if the default instance size is m3.xlarge.

Then launch a Hue browser and navigate to the query section.  To learn how to enable web interface access to Hue, see View Web Interfaces Hosted on Amazon EMR Clusters.

The first table to create is the ratings table.  The table definition specifies the tab-separated values in the ROW FORMAT line below:

CREATE EXTERNAL TABLE IF NOT EXISTS UserMovieRatings (
userId int,
movieId int,
rating int,
unixTimestamp bigint
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION 's3://us-east-1.elasticmapreduce.samples/sparksql/movielens/user-movie-ratings'

After you create the table, you select the row icon to the left of the table to refresh the table listing on the left side and see sample data.

Next, create the MovieDetails table to query over.  This data has two delimiters: a hash for the columns and a pipe for the elements in the genre array.

CREATE EXTERNAL TABLE IF NOT EXISTS MovieDetails (
movieId int,
title string,
genres array<string>
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '#'
collection items terminated by '|'
STORED AS TEXTFILE
LOCATION 's3://us-east-1.elasticmapreduce.samples/sparksql/movielens/movie-details'

After you create the table, the genre array appears in the sample data browser.

Transform the data using SparkSQL/Zeppelin

Now interact with SparkSQL through a Zeppelin UI, but re-use the table definitions you created in the Hive metadata store.   You’ll create another table in SparkSQL later in this post to show how that would have been done there.

Connect to the Zeppelin UI and create a new notebook under the Notebook tab. Query to show the tables. You can see that the two tables you created in Hive are also available in SparkSQL.

%sql SHOW tables

Using SparkSQL, you can perform the same query as you did in Hive in a previous step.

Note: The last semi-colon at the end of the statement was removed.

%sql SELECT title, count(*) numberOf5Ratings FROM usermovieratings r
JOIN moviedetails d ON (r.movieid = d.movieid)
WHERE rating = 5
GROUP BY title
ORDER BY numberOf5Ratings desc limit 5

This time, it will usually take less than 30 seconds for SparkSQL to query the data and return the results.  The actual response time depends on the size of the EMR cluster.

Spark lets you leverage an RDD for data that is queried and iterated over.  You can tell Spark to do this with your usermovieratings table, by executing the following command:

%sql cache table usermovieratings

and:

%sql cache table moviedetails

Now, execute the query again:

%sql SELECT title, count(*) numberOf5Ratings FROM usermovieratings r
JOIN moviedetails d ON (r.movieid = d.movieid)
WHERE rating = 5
GROUP BY title
ORDER BY numberOf5Ratings desc limit 5

This time, the query returned within a couple seconds so that analysts can quickly interact with the large data set in the RDD.

Suppose you want the same information as the previous query, but this time broken out as the top five movies for males and the top five for females. To do this, bring in the user-details data set, which contains information such as gender and occupation and is pipe-delimited.

%sql CREATE EXTERNAL TABLE IF NOT EXISTS UserDetails (
userId int,
age int,
gender CHAR(1),
occupation string,
zipCode String
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 's3://us-east-1.elasticmapreduce.samples/sparksql/movielens/user-details'

This query combines two queries in a union statement.  The first query gets the five top-rated movies for males using all three datasets and then combines the results with the five top-rated movies for females:

%sql SELECT * FROM (SELECT 'M' AS gender, title, count(*) numberOf5Ratings FROM usermovieratings r
JOIN moviedetails d ON (r.movieid = d.movieid)
JOIN userdetails u ON (r.userid = u.userid)
WHERE rating = 5
AND gender = 'M'
GROUP BY title, gender 
ORDER BY numberOf5Ratings desc limit 5) AS m
UNION
SELECT * FROM (SELECT 'F' AS gender, title, count(*) numberOf5Ratings FROM usermovieratings r
JOIN moviedetails d ON (r.movieid = d.movieid)
JOIN userdetails u ON (r.userid = u.userid)
WHERE rating = 5
AND gender = 'F'
GROUP BY title, gender 
ORDER BY numberOf5Ratings desc limit 5) AS f
ORDER BY gender desc, numberof5Ratings desc

Because the ratings table is still cached in the SparkContext, the query happens quickly (in this case, four seconds).

Load the transformed data into DynamoDB

Next, create a new DynamoDB table that saves the number of ratings that users voted on, per genre and rating number. To produce this, you first need to figure out which movies were voted on, then combine that information with the movie details data and the movies' genres to determine how users are voting per genre.

The following SQL statement queries for that information and returns the counts:

SELECT genre, rating, count(*)  ratingCount
FROM UserMovieRatings r
JOIN (SELECT movieid, title, explode(genres) AS genre FROM moviedetails) m
ON (r.movieid = m.movieid)
GROUP BY genre, rating

Notice that you are exploding the genre list in the moviedetails table, because that column type is the list of genres for a single movie.

Create a new DynamoDB table to store the results of the SQL query, in the same Region in which you are running (this post uses us-east-1). Use the following settings: name the table GenreRatingCounts, with genre as the partition (hash) key and rating as the range key.

Note: Set the type of the range key to Number, because the code below stores the rating as a number.
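If you prefer to create the table programmatically, a minimal boto3 sketch with that key schema (the provisioned throughput values are illustrative) looks like the following:

import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Key schema mirrors what the Spark job writes: genre (string) as the hash key
# and rating (number) as the range key. Throughput values are illustrative.
dynamodb.create_table(
    TableName="GenreRatingCounts",
    KeySchema=[
        {"AttributeName": "genre", "KeyType": "HASH"},
        {"AttributeName": "rating", "KeyType": "RANGE"},
    ],
    AttributeDefinitions=[
        {"AttributeName": "genre", "AttributeType": "S"},
        {"AttributeName": "rating", "AttributeType": "N"},
    ],
    ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)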

Next, SSH to the master node for the EMR cluster.  Here’s how to use the EMR-DDB connector in conjunction with SparkSQL to store data in DynamoDB.

Start a Spark shell, using the EMR-DDB connector JAR file name:

spark-shell --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar

To learn how this works, see the Analyze Your Data on Amazon DynamoDB with Apache Spark blog post.

Paste this code into the Spark shell prompt:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable
import java.util.HashMap
 
var ddbConf = new JobConf(sc.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "GenreRatingCounts")
ddbConf.set("dynamodb.throughput.write.percent", "0.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

 
var genreRatingsCount = sqlContext.sql("select genre, rating, count(*)  ratingCount from UserMovieRatings r join (select movieid, title, explode(genres) as genre from moviedetails) m on (r.movieid = m.movieid) group by genre, rating order by genre, rating")

var ddbInsertFormattedRDD = genreRatingsCount.map(a => {
var ddbMap = new HashMap[String, AttributeValue]()

var catValue = new AttributeValue()
catValue.setS(a.get(0).toString)
ddbMap.put("genre", catValue)

var ratingValue = new AttributeValue()
ratingValue.setN(a.get(1).toString)
ddbMap.put("rating", ratingValue)

var countValue = new AttributeValue()
countValue.setN(a.get(2).toString)
ddbMap.put("count", countValue)
var item = new DynamoDBItemWritable()
item.setItem(ddbMap)

(new Text(""), item)
}
)

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)

After you run the code, notice that the DynamoDB table now has 95 entries, each containing a genre, a rating, and the number of ratings for that combination.

The ddbConf defines the Hadoop configuration that allows Spark to use a custom Hadoop input/output for reading and writing the RDD being created.

The next major piece of code executes the SparkSQL statement.  At this point, query the different datasets in S3 to get the data to store in DynamoDB.

var genreRatingsCount = sqlContext.sql("select genre, rating, count(*)  ratingCount from UserMovieRatings r join (select movieid, title, explode(genres) as genre from moviedetails) m on (r.movieid = m.movieid) group by genre, rating order by genre, rating")

The query result is stored in a Spark DataFrame that you can use in your code.

After you have the DataFrame, perform a transformation to have an RDD that matches the types that the DynamoDB custom output format knows how to write.  The custom output format expects a tuple containing the Text and DynamoDBItemWritable types.

Create a new RDD with those types in it, in the following map call:

var ddbInsertFormattedRDD = genreRatingsCount.map(a => {
var ddbMap = new HashMap[String, AttributeValue]()
<Lines omitted, complete version is above…>
var item = new DynamoDBItemWritable()
item.setItem(ddbMap)
(new Text(""), item)
}
)

The ddbInsertFormattedRDD now contains elements that look like this for the DynamoDBItemWritable element in the tuple:

{count={N: 4049,}, genre={S: Action,}, rating={N: 3,}}
{count={N: 5560,}, genre={S: Action,}, rating={N: 4,}}
{count={N: 3718,}, genre={S: Action,}, rating={N: 5,}}
{count={N: 654,}, genre={S: Adventure,}, rating={N: 1,}}
{count={N: 1126,}, genre={S: Adventure,}, rating={N: 2,}}

This last call uses the job configuration that defines the EMR-DDB connector to write out the new RDD you created in the expected format:

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)

Conclusion

EMR makes it easy to run SQL-style analytics in both Spark and Hive. As this post has shown, connectors within EMR and the open source community let you easily talk to many data sources, including DynamoDB. Rather than focusing on standing up the software and managing the cluster, with EMR you can quickly process and analyze your data and store the results in destinations such as NoSQL repositories and data warehouses.

If you have a question or suggestion, please leave a comment below.

————————————-

Related

Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming
