Scaling relational databases with Apache Spark SQL and DataFrames

Wrangle, aggregate, and filter data at scale using your friendly SQL with a twist.

Relational databases are here to stay, regardless of the hype and the advent of newer databases often called NoSQL databases. The simple reason is that relational databases enforce essential structure and constraints, and they provide a nice, declarative language for querying data (which we love): SQL!

However, scale has always been a problem with relational databases. Most enterprises in the 21st century are loaded with rich data stores and repositories and want to take maximum advantage of their Big Data for actionable insights. Relational databases might be popular, but they don't scale very well unless we invest in a proper Big Data management strategy. This involves thinking about potential data sources, data volumes, constraints, schemas, ETL (extract-transform-load), access and querying patterns, and much more!

This article will cover some excellent advances made for leveraging the power of relational databases, but "at scale," using some of the newer components from Apache Spark—Spark SQL and DataFrames. Most notably, we will cover the following topics.

  1. Motivations and challenges with scaling relational databases
  2. Understanding Spark SQL & DataFrames
    • Goals
    • Architecture and features
    • Performance
  3. The second article in this series presents a real-world case study/tutorial on Spark SQL with hands-on examples

We will look at the major challenges and motivations that drove people to work so hard and invest time in building new components in Apache Spark, so we can perform SQL at scale. We will also examine the major architecture, interfaces, features, and performance benchmarks for Spark SQL and DataFrames. Last, but most importantly, we will cover a real-world case study on analyzing intrusion attacks based on the KDD Cup 99 dataset, using Spark SQL and DataFrames and leveraging the Databricks Cloud Platform for Spark.

Motivations and challenges in scaling relational databases for Big Data

Relational data stores are easy to build and query. Also, users and developers often prefer writing easy-to-interpret, declarative queries in a human-like readable language such as SQL. However, as data starts increasing in volume and variety, the relational approach does not scale well enough for building Big Data applications and analytical systems. Following are some major challenges.

  • Dealing with different types and sources of data, which can be structured, semi-structured, and unstructured
  • Building ETL pipelines to and from various data sources, which may lead to developing a lot of specific custom code, thereby increasing technical debt over time
  • Having the capability to perform both traditional business intelligence (BI)-based analytics and advanced analytics (machine learning, statistical modeling, etc.), the latter of which is definitely challenging to perform in relational systems

Big Data analytics is not something that was just invented yesterday! We have had success in this domain with Hadoop and the MapReduce paradigm. This was powerful, but often slow, and gave users a low-level, procedural programming interface that required people to write a lot of code for even very simple data transformations. However, once Spark was released, it really revolutionized the way Big Data analytics was done with a focus on in-memory computing, fault tolerance, high-level abstractions, and ease of use.

Hadoop MapReduce vs. Spark

Since then, several frameworks and systems like Hive, Pig, and Shark (which evolved into Spark SQL) have provided rich relational interfaces and declarative querying mechanisms over Big Data stores. The challenge remained that these tools were either relational or procedural, and we couldn't have the best of both worlds.

However, in the real world, most data and analytical pipelines might involve a combination of relational and procedural code. Forcing users to choose either one ends up complicating things and increasing user efforts in developing, building, and maintaining different applications and systems. Apache Spark SQL builds on the previously mentioned SQL-on-Spark effort called Shark. Instead of forcing users to pick between a relational or a procedural API, Spark SQL tries to enable users to seamlessly intermix the two and perform data querying, retrieval, and analysis at scale on Big Data.

Understanding Spark SQL and DataFrames

Spark SQL essentially tries to bridge the gap between the two models we mentioned previously—the relational and procedural models—with two major components.

  • Spark SQL provides a DataFrame API that can perform relational operations on both external data sources and Spark's built-in distributed collections—at scale!
  • To support a wide variety of diverse data sources and algorithms in Big Data, Spark SQL introduces a novel extensible optimizer called Catalyst, which makes it easy to add data sources, optimization rules, and data types for advanced analytics such as machine learning.

Essentially, Spark SQL leverages the power of Spark to perform distributed, robust, in-memory computations at massive scale on Big Data. Spark SQL provides state-of-the-art SQL performance and also maintains compatibility with all the existing structures and components supported by Apache Hive (a popular Big Data warehouse framework), including data formats, user-defined functions (UDFs), and the metastore. Besides this, it also helps ingest a wide variety of data formats from Big Data sources and enterprise data warehouses, such as JSON, Hive tables, and Parquet, and lets you combine relational and procedural operations for more complex, advanced analytics.
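
To make this concrete, here is a minimal PySpark sketch of ingesting a few of these formats; the file paths and the Hive table name are hypothetical placeholders, not part of any real dataset.

    from pyspark.sql import SparkSession

    # Build (or reuse) a SparkSession; Hive support lets us read Hive tables
    # alongside plain files.
    spark = (SparkSession.builder
             .appName("formats-sketch")
             .enableHiveSupport()
             .getOrCreate())

    # Hypothetical paths and table name, purely for illustration.
    json_df = spark.read.json("/data/events.json")           # semi-structured JSON
    parquet_df = spark.read.parquet("/data/events.parquet")  # columnar Parquet
    hive_df = spark.table("warehouse.events")                # existing Hive table

    # All three are DataFrames, so the same relational operations apply to each.
    json_df.printSchema()

Regardless of where the data came from, each result is an ordinary DataFrame, which is exactly what lets us mix relational and procedural operations later on.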

Goals

Let's look at some of the interesting facts about Spark SQL, including its usage, adoption, and goals, some of which I will shamelessly once again copy from the excellent and original paper on "Relational Data Processing in Spark." Spark SQL was first released in May 2014 and is perhaps now one of the most actively developed components in Spark. Apache Spark is definitely the most active open source project for Big Data processing, with hundreds of contributors.

Besides being an open source project, Spark SQL has started seeing mainstream industry adoption. It has already been deployed in very large-scale environments. Facebook has an excellent case study about "Apache Spark @Scale: A 60 TB+ production use case." The company was doing data preparation for entity ranking, and its Hive jobs used to take several days and had many challenges, but Facebook was successfully able to scale and increase performance using Spark. Check out the interesting challenges they faced on this journey!

Another interesting fact is that two-thirds of Databricks Cloud (a hosted service running Spark) customers use Spark SQL within other programming languages. We will also showcase a hands-on case study using Spark SQL on Databricks in part two of this series.

The major goals for Spark SQL, as defined by its creators, are:

  1. Support relational processing, both within Spark programs (on native RDDs) and on external data sources, using a programmer-friendly API
  2. Provide high performance using established DBMS techniques
  3. Easily support new data sources, including semi-structured data and external databases amenable to query federation
  4. Enable extension with advanced analytics algorithms such as graph processing and machine learning

Architecture and features

We will now take a look at the key features and architecture around Spark SQL and DataFrames. A key concept to keep in mind here is the Spark ecosystem, which has been constantly evolving over time.

Spark ecosystem

RDD (Resilient Distributed Dataset) is perhaps the biggest contributor behind all of Spark's success stories. It is basically a data structure, or rather a distributed memory abstraction to be more precise, that allows programmers to perform in-memory computations on large distributed clusters while retaining aspects like fault tolerance. You can also parallelize a lot of computations and transformations and track the whole lineage of transformations, which can help in efficiently recomputing lost data. Spark enthusiasts may wish to read an excellent paper on RDDs, "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing." Also, Spark works with the concept of drivers and workers, as depicted in the following figure.

Spark works with drivers and workers

You can typically create an RDD by reading in data from files or databases, by parallelizing existing collections, or even by transforming another RDD. Transformations are operations that reshape the data into different aspects and dimensions, depending on how we want to wrangle and process it. They are also lazily evaluated, meaning that, even if you define a transformation, the results are not computed until you apply an action, which typically requires a result to be returned to the driver program (and all the applied transformations are computed at that point!).
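
As a quick illustration of this laziness, here is a small PySpark sketch (the numbers and lambdas are made up for the example):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("rdd-sketch").getOrCreate()
    sc = spark.sparkContext

    # Create an RDD by parallelizing an existing collection.
    numbers = sc.parallelize(range(1, 11))

    # Transformations are lazy: nothing actually runs on the cluster yet.
    squares = numbers.map(lambda x: x * x)
    evens = squares.filter(lambda x: x % 2 == 0)

    # An action triggers the whole lineage of transformations at once.
    print(evens.collect())  # [4, 16, 36, 64, 100]

Until collect() is called, Spark has only recorded the lineage numbers -> squares -> evens; that lineage is also what lets it recompute lost partitions efficiently.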

RDD transformations

Shout out to fellow data scientist and friend Favio Vázquez and his excellent article "Deep Learning With Apache Spark," where I got some excellent ideas and content, including the preceding figure. Check it out!

Now that we know about the general architecture of how Spark works, let's take a closer look into Spark SQL. Typically, Spark SQL runs as a library on top of Spark, as we saw in the figure covering the Spark ecosystem. The following figure gives a more detailed peek into the typical architecture and interfaces of Spark SQL.

Spark SQL architecture and interfaces

The figure clearly shows the various SQL interfaces, which can be accessed through JDBC/ODBC or through a command-line console, as well as the DataFrame API integrated into Spark's supported programming languages (we will be using Python). The DataFrame API is very powerful and allows users to finally intermix procedural and relational code! Advanced functions like UDFs (user-defined functions) can also be exposed in SQL, so they can be used by BI tools.
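
Here is a rough sketch of that intermixing in PySpark; the people data, the age_bucket UDF, and the column names are all invented for illustration:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import IntegerType

    spark = SparkSession.builder.appName("udf-sketch").getOrCreate()

    # A toy DataFrame registered as a temporary view so SQL can see it.
    df = spark.createDataFrame(
        [("alice", 34), ("bob", 45), ("carol", 29)], ["name", "age"])
    df.createOrReplaceTempView("people")

    # Register a Python UDF so it can be called from SQL as well.
    spark.udf.register("age_bucket", lambda age: age // 10 * 10, IntegerType())

    # Relational (SQL) and procedural (DataFrame API) code, intermixed.
    buckets = spark.sql("SELECT name, age_bucket(age) AS bucket FROM people")
    buckets.filter(buckets.bucket >= 30).show()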

Spark DataFrames are very interesting and help us leverage the power of Spark SQL while combining its procedural paradigms as needed. A Spark DataFrame is basically a distributed collection of rows (Row objects) with the same schema; in other words, it is a Spark Dataset organized into named columns. A point to note here is that Datasets are an extension of the DataFrame API that provides a type-safe, object-oriented programming interface. Hence, they are available only in Java and Scala, and we will, therefore, be focusing on DataFrames.

DataFrames architecture

A DataFrame is equivalent to a table in a relational database (but with more optimizations under the hood) and can also be manipulated in similar ways to the "native" distributed collections in Spark (RDDs). Spark DataFrames have some interesting properties, some of which are mentioned below.

  1. Unlike RDDs, DataFrames usually keep track of their schema and support various relational operations that lead to a more optimized execution.
  2. DataFrames can be constructed from tables, such as existing Hive tables in your Big Data infrastructure, or even from existing RDDs.
  3. DataFrames can be manipulated with direct SQL queries and also using the DataFrame DSL (domain-specific language), where we can use various relational operators and transformers such as where and groupBy (see the sketch after this list).
  4. Each DataFrame can also be viewed as an RDD of Row objects, allowing users to call procedural Spark APIs such as map.
  5. Finally, and this is a point to always remember: unlike traditional dataframe APIs (e.g., pandas), Spark DataFrames are lazy, in that each DataFrame object represents a logical plan to compute a dataset, but no execution occurs until the user calls a special "output operation" such as save.
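
Here is the sketch promised above, showing the same aggregation written with the DataFrame DSL and with plain SQL; the sales rows and column names are hypothetical:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("dsl-vs-sql-sketch").getOrCreate()

    # Hypothetical sales data; in practice this could come from a Hive table,
    # Parquet files, or an existing RDD.
    sales = spark.createDataFrame(
        [("US", 100.0), ("US", 250.0), ("IN", 75.0), ("IN", 40.0)],
        ["country", "amount"])

    # DataFrame DSL: where and groupBy lazily build up a logical plan.
    by_country = (sales.where(F.col("amount") > 50)
                       .groupBy("country")
                       .agg(F.sum("amount").alias("total")))

    # The equivalent query expressed in SQL against a temporary view.
    sales.createOrReplaceTempView("sales")
    by_country_sql = spark.sql(
        "SELECT country, SUM(amount) AS total "
        "FROM sales WHERE amount > 50 GROUP BY country")

    # Nothing has executed so far; show() is the "output operation" that does.
    by_country.show()

    # Each DataFrame can also be viewed as an RDD of Row objects.
    print(sales.rdd.map(lambda row: row.country).distinct().collect())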

This should give you enough perspective on Spark SQL, DataFrames, essential features, concepts, architecture, and interfaces. Let's wrap up this section by taking a look at performance benchmarks.

Performance

Releasing a new feature without the right optimizations can be deadly, and the folks who built Spark did tons of performance tests and benchmarking! Let's take a look at some interesting results. The first figure, showcasing some of them, is depicted below.

Performance comparisons

In these experiments, they compared the performance of Spark SQL against Shark and Impala using the AMPLab Big Data benchmark, which uses a web analytics workload developed by Pavlo et al. The benchmark contains four types of queries with different parameters performing scans, aggregation, joins, and a UDF-based MapReduce job. The dataset was 110GB of data after compression using the columnar Parquet format. We see that in all queries, Spark SQL is substantially faster than Shark and generally competitive with Impala. The Catalyst optimizer is responsible for this, as it reduces CPU overhead (we will cover this briefly). This makes Spark SQL competitive with the C++ and LLVM-based Impala engine in many of these queries. The largest gap from Impala is in query 3a, where Impala chooses a better join plan because the selectivity of the query makes one of the tables very small.

The following graphs show some more performance benchmarks comparing DataFrames with regular Spark APIs and with Spark + SQL.

Spark DataFrames vs. RDDs and SQL

Finally, the following graph shows a nice benchmark result of DataFrames vs. RDDs in different languages, which gives an interesting perspective on how optimized DataFrames can be.

Comparing Spark DataFrames and RDDs

Secret to performance: the Catalyst optimizer

Why is Spark SQL so fast and optimized? The reason is its new extensible optimizer, Catalyst, based on functional programming constructs in Scala. While we won't go into extensive details about Catalyst here, it is worth a mention since it helps in optimizing DataFrame operations and queries.

Catalyst

Catalyst's extensible design has two purposes.

  • It makes it easy to add new optimization techniques and features to Spark SQL, especially to tackle diverse problems around Big Data, semi-structured data, and advanced analytics
  • It enables external developers to extend the optimizer, for example, by adding data source-specific rules that can push filtering or aggregation into external storage systems, or support for new data types

Catalyst supports both rule-based and cost-based optimization. While extensible optimizers have been proposed in the past, they have typically required a complex domain-specific language to specify rules. Usually, this leads to having a significant learning curve and maintenance burden. In contrast, Catalyst uses standard features of the Scala programming language, such as pattern-matching, to let developers use the full programming language while still making rules easy to specify.

Catalyst architecture

At its core, Catalyst contains a general library for representing trees and applying rules to manipulate them. On top of this framework, it has libraries specific to relational query processing (e.g., expressions, logical query plans), and several sets of rules that handle different phases of query execution: analysis, logical optimization, physical planning, and code generation to compile parts of queries to Java bytecode. Interested in knowing more details about Catalyst and doing a deep-dive? You can check out this excellent "Deep Dive into Spark SQL’s Catalyst Optimizer" from Databricks.
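
If you want to peek at what Catalyst actually produces, PySpark will happily print the plans for any DataFrame query; the tiny DataFrame below is just an invented example to have something to plan over:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("catalyst-sketch").getOrCreate()

    # A small, made-up DataFrame.
    df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["key", "value"])

    query = df.where(F.col("value") > 1).groupBy("key").count()

    # explain(True) prints the parsed, analyzed, and optimized logical plans
    # plus the physical plan that Catalyst generated for this query.
    query.explain(True)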

Click through to the second article in this series for a hands-on tutorial based on a real-world dataset to learn how to use Spark SQL.


This article originally appeared on Medium's Towards Data Science channel and is republished with permission.


Dipanjan (DJ) Sarkar is a Data Scientist at Red Hat, a published author, consultant and trainer. He has consulted and worked with several startups as well as Fortune 500 companies like Intel. He primarily works on leveraging data science, machine learning and deep learning to build large-scale intelligent systems.

2 Comments

Can you explain this more?

"A point to note here is that Datasets are an extension of the DataFrame API that provides a type-safe, object-oriented programming interface. Hence, they are available only in Java and Scala and we will, therefore, be focusing on DataFrames"

I'm struggling to understand this. If a Dataframe *is* a Dataset[Row], how can the Dataset be an extension of the Dataframe API? It seems to me that the opposite would be true.

Also, what's the logic for ignoring Datasets because they're only available in Spark's first-class languages?

Absolutely. Overall, the data structures are kind of similar yet different, which makes it a bit confusing. But if you check the history of the evolution of Spark (https://stackoverflow.com/questions/31508083/difference-between-datafra…), we first had RDDs, then DataFrames came into the picture in 2013, and finally Datasets spun off from DataFrames in 2015 as a type-safe version of DFs.

Datasets are pretty good and work quite well in native Spark (leveraging Scala), but since we leverage Python in our example, we have to go with Spark DataFrames. Traditionally, though, Datasets have always been slightly slower than DataFrames, but their performance is catching up (https://databricks.com/session/demystifying-dataframe-and-dataset). Hope this helps!

This work is licensed under a Creative Commons Attribution-Share Alike 4.0 International License.