Spark - Query JSON data with SQL

Friday Sep 18, 2020

Introduction

I created a statically generated site to display some charts from the US CFPB complaint database at https://cfpb.saisols.com/

To generate this site, I originally loaded the CFPB data into PostgreSQL and queried it using Postgres' SQL JSON data types (more on that later).

The data load takes forever in PGSQL. Now, I know that I can tune PG to do this faster, but I thought I would learn a bit about Spark and try generating the site that way instead.

Step 1 - Install Scala and Spark

First, we install our tools. I use SDKMAN (https://sdkman.io/) to manage Java and Scala (and other tools) - install them with SDKMAN or use your favorite method. Check them with:

13:58 $ scala -version
Scala code runner version 2.13.3 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.
13:58 $ java -version
openjdk version "1.8.0_212"
OpenJDK Runtime Environment Corretto-8.212.04.1 (build 1.8.0_212-b04)
OpenJDK 64-Bit Server VM Corretto-8.212.04.1 (build 25.212-b04, mixed mode)

Next, download and install Spark from https://spark.apache.org/downloads.html (I got spark-3.0.1-bin-hadoop2.7.tgz)

Now, untar it somewhere and cd into the spark directory:

cd
mkdir spark
cd spark
tar -zxf ~/Downloads/spark*.tgz
cd spark-*

Now, check it by running ./bin/spark-shell:

./bin/spark-shell 
20/09/18 14:07:16 WARN Utils: Your hostname, Mikes-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.3.107 instead (on interface en0)
20/09/18 14:07:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/09/18 14:07:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.3.107:4040
Spark context available as 'sc' (master = local[*], app id = local-1600452458772).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/
         
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

Spark ships with a sample data directory, so let’s look at one of the files in there:

cat data/graphx/followers.txt 
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7

A file with 8 records, perfect. Let’s read it:

scala> val tf = sc.textFile("data/graphx/followers.txt")
tf: org.apache.spark.rdd.RDD[String] = data/graphx/followers.txt MapPartitionsRDD[1] at textFile at <console>:25

scala> tf.toString()
res11: String = data/graphx/followers.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> tf.count()
res12: Long = 8

scala> tf.collect()
res13: Array[String] = Array(2 1, 4 1, 1 2, 6 3, 7 3, 7 6, 6 7, 3 7)

scala> tf.distinct().collect()
res17: Array[String] = Array(7 3, 3 7, 1 2, 7 6, 4 1, 6 3, 2 1, 6 7)

CTRL-D to exit

There are all kinds of methods we can use on an RDD object (that’s what textFile returns); see the RDD API docs: https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html
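For example, here’s a quick sketch of a couple of common transformations on that followers file (the follower-counting logic is my own example, not from the Spark docs):

// A sketch of common RDD transformations on the followers file.
// sc is the SparkContext that spark-shell provides.
val tf = sc.textFile("data/graphx/followers.txt")

// Parse each "follower followee" line into a pair of Ints.
val edges = tf.map { line =>
  val Array(follower, followee) = line.split(" ")
  (follower.toInt, followee.toInt)
}

// Count followers per user with a classic map/reduceByKey.
val followerCounts = edges.map { case (_, followee) => (followee, 1) }.reduceByKey(_ + _)

// Order may vary: Array((1,2), (2,1), (3,2), (6,1), (7,2))
followerCounts.collect()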

Now that we have the plain text files loaded and are able to run some queries on them, let’s look at loading the large CFPB file into Spark and querying it.

Step 2 - Load JSON file from CFPB into Spark

Now, back in a regular shell in the spark “root” directory.

The first task is to get the huge JSON file that is the CFPB complaint database. I found it here:

https://www.consumerfinance.gov/data-research/consumer-complaints/#download-the-data

I put it into my Spark root directory like so:

curl https://files.consumerfinance.gov/ccdb/complaints.json.zip >complaints.json.zip

Then, unzip it to keep things simple:

unzip complaints.json.zip && ls -alh complaints.json
...
-rw-r--r--@ 1 mike  staff   1.6G Sep 18 04:22 complaints.json

How many rows?

wc -l *.json
 1760019 complaints.json

Now we have a 1.6 GB complaints.json file with 1.7+ million rows. Let’s take a look at the first few rows:

head -3 complaints.json
[
{"date_received": "2019-05-28", "product": "Credit reporting, credit repair services, or other personal consumer reports", "sub_product": "Credit reporting", "issue": "Problem with a credit reporting company's investigation into an existing problem", "sub_issue": "Their investigation did not fix an error on your report", "complaint_what_happened": "Although I am checking for and addressing missing and or deficient aspects of REPORTING COMPLIANCES and not contesting any debt of compliant nature, I should make you aware that since unlawful reporting transitions collection into an equally not complaint circumstance. Being still yet not validated by document fact in compliance to requisite standards, it is to be announced yet again that legally I have no knowledge of the validity of the alleged claims of delinquency and or derogatory nature, nor of the certifiably compliant matter to either any of its collection attempts and or its reporting despite previous consumer filed composed complaints checking for each. Might it be known, especially shall I elect to take this matter up to a civil court, any debt and or derogatory claim must be pursued ( particularly for collection ) in a very defined and precisely compliant and physically verifiable or certifiable manner as detailed in the requisite obeyed federal and state collection and reporting regulations associated with any of the above noted said claim ( s ) to include but not limited to the FCBA, FCRA, HIPAA PRIVACY RULE, FACTA,  FDCPA and TCPA, etc. ADDITIONALLY, if an entity acts as a collector and also elects to act as a reporting party of consumer credit they must as well adhere to every single one even each any and all of the regulatory reporting requisites and standards of reporting with legal standing in full accordance of laws and accepted reporting standards. To date, the plaintiff has failed to demonstrate any capacity or willingness to validate the alleged debt much less certify the fair, accurate, complete and compliant reporting of the claims, particularly being significantly deficient is any display of certified metro 2 compliance. As such, given the fact of recent breaches of information collection", "company_public_response": "", "company": "EQUIFAX, INC.", "state": "GA", "zip_code": "310XX", "tags": "", "consumer_consent_provided": "Consent provided", "submitted_via": "Web", "date_sent_to_company": "2019-05-28", "company_response": "Closed with explanation", "timely": "Yes", "consumer_disputed": "N/A", "complaint_id": "3255189"},
{"date_received": "2019-09-24", "product": "Debt collection", "sub_product": "I do not know", "issue": "Attempts to collect debt not owed", "sub_issue": "Debt is not yours", "complaint_what_happened": "transworld systems inc. \nis trying to collect a debt that is not mine, not owed and is inaccurate.", "company_public_response": "", "company": "TRANSWORLD SYSTEMS INC", "state": "FL", "zip_code": "335XX", "tags": "", "consumer_consent_provided": "Consent provided", "submitted_via": "Web", "date_sent_to_company": "2019-09-24", "company_response": "Closed with explanation", "timely": "Yes", "consumer_disputed": "N/A", "complaint_id": "3384392"},

Looks like we have a JSON array with one record per line. This isn’t exactly what Spark wants: Spark’s JSON reader expects JSON Lines, one object per line with no surrounding array (see https://jsonlines.org/examples/), so the array brackets will give us issues. We’ll try it anyway.
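As an aside, Spark’s JSON reader does have a multiLine mode that can parse a whole-file JSON array directly. The catch is that the file is then read as a single unsplittable record, which is a poor fit for a 1.6 GB input, so I’ll stick with the default line-per-record reader. A sketch of what it would look like:

// A sketch: multiLine mode parses the file as one JSON document (the array)
// and turns each array element into a row, but it can't split the file
// across tasks, so it's slow and memory-hungry for a file this size.
val arrayDF = spark.read.option("multiLine", "true").json("complaints.json")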

First, get back into a spark shell (./bin/spark-shell):

scala> val path = "complaints.json"
scala> val complaintsDF = spark.read.json(path)

complaintsDF: org.apache.spark.sql.DataFrame = [_corrupt_record: string, company: string ... 17 more fields]

Looks like it’s complaining about corrupt records (note the _corrupt_record column in the result). We can fix that by removing the array notation from the file, leaving one JSON record per line; in other words, delete the first and last lines of the file ([ and ]).
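If you’d rather not edit a 1.6 GB file by hand, the JSON reader’s mode option can drop unparseable lines at read time instead. A sketch (PERMISSIVE is the default mode, which keeps bad records and routes them to _corrupt_record):

// A sketch: skip the unparseable bracket lines at read time instead of
// editing the file. DROPMALFORMED discards any record that fails to parse.
val cleanDF = spark.read.option("mode", "DROPMALFORMED").json("complaints.json")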

Let’s see how many records we have:

scala> complaintsDF.count()
res0: Long = 1760020   

That matches our line count (one extra row, but remember we still have the corrupt bracket lines in there).
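We can count the corrupt rows directly to confirm. One wrinkle: since Spark 2.3, a query that references only the internal _corrupt_record column of an uncached JSON DataFrame is disallowed, so we cache first. A sketch:

// A sketch: count rows that failed to parse. Caching first works around
// Spark's restriction on queries that touch only _corrupt_record.
complaintsDF.cache()
// If only the "[" and "]" lines fail to parse, this should come out to 2.
complaintsDF.filter(complaintsDF("_corrupt_record").isNotNull).count()

Now let’s check out our schema: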

scala> complaintsDF.printSchema()
root
 |-- _corrupt_record: string (nullable = true)
 |-- company: string (nullable = true)
 |-- company_public_response: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_id: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- date_received: string (nullable = true)
 |-- date_sent_to_company: string (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- sub_issue: string (nullable = true)
 |-- sub_product: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)

So, we have a DataFrame that we can query with its API methods, but wouldn’t it be nice if we could just use SQL? That’s what I was doing with Postgres’ SQL JSON data types in the original version.
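For reference, here’s a sketch of what a top-companies-by-complaint-count query looks like in the DataFrame API (the same aggregation we’ll run as SQL below):

// A sketch: complaint counts per company via the DataFrame API.
import org.apache.spark.sql.functions.desc
complaintsDF.groupBy("company").count().orderBy(desc("count")).show()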

Spark allows us to create a database view-like object that we can query with SQL:

complaintsDF.createOrReplaceTempView("complaints")

Now we can write SQL queries against the new “view”:

scala> val data = spark.sql("SELECT company, count(*) FROM complaints GROUP BY company ORDER BY 2 DESC")
scala> data.show()
+--------------------+--------+                                                 
|             company|count(1)|
+--------------------+--------+
|       EQUIFAX, INC.|  194570|
|Experian Informat...|  184280|
|TRANSUNION INTERM...|  178126|
|BANK OF AMERICA, ...|   93587|
|WELLS FARGO & COM...|   80929|
|JPMORGAN CHASE & CO.|   71956|
|      CITIBANK, N.A.|   59769|
|CAPITAL ONE FINAN...|   46627|
|Navient Solutions...|   33039|
|Ocwen Financial C...|   30240|
| SYNCHRONY FINANCIAL|   28271|
| NATIONSTAR MORTGAGE|   22645|
|        U.S. BANCORP|   21324|
|AMERICAN EXPRESS ...|   17889|
|       PNC Bank N.A.|   16424|
|Ditech Financial LLC|   14949|
|ENCORE CAPITAL GR...|   13813|
|PORTFOLIO RECOVER...|   13783|
|       DISCOVER BANK|   13603|
|TD BANK US HOLDIN...|   12605|
+--------------------+--------+
only showing top 20 rows
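Since the end goal is a statically generated site, the last step would be dumping the aggregates back out as JSON for the site generator to pick up. A sketch, with a hypothetical output path:

// A sketch: write the aggregated results as JSON for the static site
// generator to consume. coalesce(1) yields a single part file; the
// output path "out/complaints_by_company" is just an example.
data.coalesce(1).write.mode("overwrite").json("out/complaints_by_company")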