Analyzing NYC motor vehicle data in Spark

A while back I wrote about analyzing NYC’s traffic (motor vehicle) data in q/kdb+. Soon afterwards, I showed how to analyze the same data in Python using the pandas library. Now, I would like to analyze the dataset once more, this time in Apache Spark. As I mentioned in my last post, I am currently learning Spark, so you will be seeing a lot more posts about it in the near future.

If you don’t have Spark installed, please see my previous post on how to set it up on AWS.

In this post, I will show you how to:

  • Load data from a csv
  • Transform a dataframe
  • Aggregate data
  • Sort data
  • Filter data

Getting the data

Download the csv file to your local directory

spark:~$ wget https://nycopendata.socrata.com/api/views/h9gi-nx95/rows.csv

Starting a Spark session

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('enlistq').getOrCreate()

Loading data from csv into a Spark dataframe

In [2]:
df = spark.read.csv('rows.csv', header=True, inferSchema=True)

Once the data is loaded into a dataframe, we can do a lot with it. First, let's take a look at the schema. We can see that the dataframe has many columns of different data types such as string, double and integer.

In [3]:
df.printSchema()
root
 |-- DATE: string (nullable = true)
 |-- TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 3: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 4: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 5: string (nullable = true)
 |-- UNIQUE KEY: integer (nullable = true)
 |-- VEHICLE TYPE CODE 1: string (nullable = true)
 |-- VEHICLE TYPE CODE 2: string (nullable = true)
 |-- VEHICLE TYPE CODE 3: string (nullable = true)
 |-- VEHICLE TYPE CODE 4: string (nullable = true)
 |-- VEHICLE TYPE CODE 5: string (nullable = true)
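
One thing worth noting: inferSchema samples the data to guess each column's type, and here it has inferred some of the count columns (e.g. NUMBER OF PERSONS INJURED) as string while others came back as integer. A quick way to see which is which is the dtypes attribute:

[(name, dtype) for name, dtype in df.dtypes if name.startswith('NUMBER OF')]

We will keep the inferred types as they are, which is why some later arithmetic on PERSONS INJURED shows up as double rather than integer.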

Transforming dataframe

To make it easy to see the entire dataframe and work with it, we will select only a few columns.

In [4]:
df_new = df.select(['DATE', 'BOROUGH',  'NUMBER OF PERSONS INJURED', 'NUMBER OF PERSONS KILLED',
                    'NUMBER OF PEDESTRIANS INJURED', 'NUMBER OF PEDESTRIANS KILLED'])
In [5]:
df_new.printSchema()
root
 |-- DATE: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)

Our new dataframe, df_new, has 6 columns. Besides the DATE and BOROUGH columns, all the other columns have really long names. Let's rename them to something shorter.

In [6]:
df_new = df_new.withColumnRenamed('NUMBER OF PERSONS INJURED', 'PERSONS INJURED') \
                .withColumnRenamed('NUMBER OF PERSONS KILLED', 'PERSONS KILLED') \
                .withColumnRenamed('NUMBER OF PEDESTRIANS INJURED', 'PEDESTRIANS INJURED') \
                .withColumnRenamed('NUMBER OF PEDESTRIANS KILLED', 'PEDESTRIANS KILLED')
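When many columns share the same prefix, a small loop over the columns achieves the same result as the chained calls above; here is a minimal sketch (renamed is just an illustrative name):

renamed = df.select('DATE', 'BOROUGH',
                    'NUMBER OF PERSONS INJURED', 'NUMBER OF PERSONS KILLED',
                    'NUMBER OF PEDESTRIANS INJURED', 'NUMBER OF PEDESTRIANS KILLED')
for c in renamed.columns:
    if c.startswith('NUMBER OF '):
        # strip the common 'NUMBER OF ' prefix from the column name
        renamed = renamed.withColumnRenamed(c, c[len('NUMBER OF '):])
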
In [7]:
df_new.show()
+----------+---------+---------------+--------------+-------------------+------------------+
|      DATE|  BOROUGH|PERSONS INJURED|PERSONS KILLED|PEDESTRIANS INJURED|PEDESTRIANS KILLED|
+----------+---------+---------------+--------------+-------------------+------------------+
|12/12/2017| BROOKLYN|              0|             0|                  0|                 0|
|12/12/2017|MANHATTAN|              0|             0|                  0|                 0|
|12/12/2017|   QUEENS|              0|             0|                  0|                 0|
|12/12/2017|     null|              0|             0|                  0|                 0|
|12/12/2017|     null|              0|             0|                  0|                 0|
|12/12/2017|     null|              0|             0|                  0|                 0|
|12/12/2017|     null|              0|             0|                  0|                 0|
|12/12/2017|     null|              0|             0|                  0|                 0|
|12/12/2017|   QUEENS|              3|             0|                  0|                 0|
|12/12/2017|     null|              0|             0|                  0|                 0|
|12/12/2017|   QUEENS|              0|             0|                  0|                 0|
|12/12/2017|     null|              0|             0|                  0|                 0|
|12/12/2017|MANHATTAN|              0|             0|                  0|                 0|
|12/12/2017|     null|              0|             0|                  0|                 0|
|12/12/2017|    BRONX|              0|             0|                  0|                 0|
|12/12/2017|    BRONX|              0|             0|                  0|                 0|
|12/12/2017|    BRONX|              0|             0|                  0|                 0|
|12/12/2017| BROOKLYN|              0|             0|                  0|                 0|
|12/12/2017| BROOKLYN|              0|             0|                  0|                 0|
|12/12/2017|MANHATTAN|              0|             0|                  0|                 0|
+----------+---------+---------------+--------------+-------------------+------------------+
only showing top 20 rows

The very first thing that catches my attention is the null values in the BOROUGH column. Let's remove all rows containing null values from our dataframe.

In [8]:
df_new = df_new.na.drop()
df_new.show()
+----------+---------+---------------+--------------+-------------------+------------------+
|      DATE|  BOROUGH|PERSONS INJURED|PERSONS KILLED|PEDESTRIANS INJURED|PEDESTRIANS KILLED|
+----------+---------+---------------+--------------+-------------------+------------------+
|12/12/2017| BROOKLYN|              0|             0|                  0|                 0|
|12/12/2017|MANHATTAN|              0|             0|                  0|                 0|
|12/12/2017|   QUEENS|              0|             0|                  0|                 0|
|12/12/2017|   QUEENS|              3|             0|                  0|                 0|
|12/12/2017|   QUEENS|              0|             0|                  0|                 0|
|12/12/2017|MANHATTAN|              0|             0|                  0|                 0|
|12/12/2017|    BRONX|              0|             0|                  0|                 0|
|12/12/2017|    BRONX|              0|             0|                  0|                 0|
|12/12/2017|    BRONX|              0|             0|                  0|                 0|
|12/12/2017| BROOKLYN|              0|             0|                  0|                 0|
|12/12/2017| BROOKLYN|              0|             0|                  0|                 0|
|12/12/2017|MANHATTAN|              0|             0|                  0|                 0|
|12/12/2017|   QUEENS|              0|             0|                  0|                 0|
|12/12/2017|MANHATTAN|              0|             0|                  0|                 0|
|12/12/2017| BROOKLYN|              1|             0|                  0|                 0|
|12/12/2017|   QUEENS|              2|             0|                  0|                 0|
|12/12/2017|   QUEENS|              0|             0|                  0|                 0|
|12/12/2017|   QUEENS|              0|             0|                  0|                 0|
|12/12/2017|   QUEENS|              2|             0|                  0|                 0|
|12/12/2017| BROOKLYN|              0|             0|                  0|                 0|
+----------+---------+---------------+--------------+-------------------+------------------+
only showing top 20 rows
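
Note that na.drop() with no arguments drops a row if any of its columns is null. If we only cared about missing boroughs, we could restrict the check with subset; the how and thresh parameters give further control:

# drop a row only when BOROUGH is null
df_new.na.drop(subset=['BOROUGH'])

# how='all' drops a row only if every column is null;
# thresh=4 keeps rows that have at least 4 non-null values
df_new.na.drop(how='all')
df_new.na.drop(thresh=4)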

We can also add new columns to our dataframe.

In [9]:
# Add a new column which gives total number of PERSONS KILLED and PERSONS INJURED
df_new.withColumn('PERSONS KILLED OR INJURED',
                  df_new['PERSONS KILLED'] + df_new['PERSONS INJURED']) \
      .select(['BOROUGH', 'PERSONS KILLED', 'PERSONS INJURED',
               'PERSONS KILLED OR INJURED']).show()
+---------+--------------+---------------+-------------------------+
|  BOROUGH|PERSONS KILLED|PERSONS INJURED|PERSONS KILLED OR INJURED|
+---------+--------------+---------------+-------------------------+
| BROOKLYN|             0|              0|                      0.0|
|MANHATTAN|             0|              0|                      0.0|
|   QUEENS|             0|              0|                      0.0|
|   QUEENS|             0|              3|                      3.0|
|   QUEENS|             0|              0|                      0.0|
|MANHATTAN|             0|              0|                      0.0|
|    BRONX|             0|              0|                      0.0|
|    BRONX|             0|              0|                      0.0|
|    BRONX|             0|              0|                      0.0|
| BROOKLYN|             0|              0|                      0.0|
| BROOKLYN|             0|              0|                      0.0|
|MANHATTAN|             0|              0|                      0.0|
|   QUEENS|             0|              0|                      0.0|
|MANHATTAN|             0|              0|                      0.0|
| BROOKLYN|             0|              1|                      1.0|
|   QUEENS|             0|              2|                      2.0|
|   QUEENS|             0|              0|                      0.0|
|   QUEENS|             0|              0|                      0.0|
|   QUEENS|             0|              2|                      2.0|
| BROOKLYN|             0|              0|                      0.0|
+---------+--------------+---------------+-------------------------+
only showing top 20 rows
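
The trailing .0 in the new column comes from the schema: PERSONS INJURED was inferred as a string, so Spark implicitly casts the addition to double. If you prefer integer results, a minimal sketch is to cast the column first (this assumes the column holds clean integer text):

from pyspark.sql.functions import col

df_typed = df_new.withColumn('PERSONS INJURED', col('PERSONS INJURED').cast('int'))
df_typed.withColumn('PERSONS KILLED OR INJURED',
                    df_typed['PERSONS KILLED'] + df_typed['PERSONS INJURED']).show(5)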

Aggregation

Now that we have removed all rows with null values, I would like to know how many distinct values of BOROUGH we have in our dataset. NYC has 5 boroughs, so if this dataset covers all of them, we should see 5 distinct values.

In [10]:
df_new.select(['BOROUGH']).distinct().show()
+-------------+
|      BOROUGH|
+-------------+
|       QUEENS|
|     BROOKLYN|
|        BRONX|
|    MANHATTAN|
|STATEN ISLAND|
+-------------+
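
If we just want the number rather than the list, we can count the distinct values directly (countDistinct lives in pyspark.sql.functions):

from pyspark.sql.functions import countDistinct

df_new.select('BOROUGH').distinct().count()   # returns 5
df_new.select(countDistinct('BOROUGH')).show()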

I would also like to see the total number of people killed by BOROUGH.

In [11]:
df_new.groupBy(['BOROUGH']).agg({'PERSONS KILLED': 'sum'}).show()
+-------------+-------------------+
|      BOROUGH|sum(PERSONS KILLED)|
+-------------+-------------------+
|       QUEENS|                266|
|     BROOKLYN|                301|
|        BRONX|                130|
|    MANHATTAN|                181|
|STATEN ISLAND|                 53|
+-------------+-------------------+

Similarly, I can apply 'sum' to more than one column at a time.

In [12]:
df_new.groupBy(['BOROUGH']).agg({'PERSONS KILLED': 'sum', 'PERSONS INJURED': 'sum'}).show()
+-------------+--------------------+-------------------+
|      BOROUGH|sum(PERSONS INJURED)|sum(PERSONS KILLED)|
+-------------+--------------------+-------------------+
|       QUEENS|             57016.0|                266|
|     BROOKLYN|             75586.0|                301|
|        BRONX|             32108.0|                130|
|    MANHATTAN|             36374.0|                181|
|STATEN ISLAND|              9418.0|                 53|
+-------------+--------------------+-------------------+
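
The dict form of agg is concise, but it doesn't let you control the output column names. The function-based API from pyspark.sql.functions lets you alias each aggregate; a sketch:

import pyspark.sql.functions as F

df_new.groupBy('BOROUGH').agg(
    F.sum('PERSONS KILLED').alias('killed'),
    F.sum('PERSONS INJURED').alias('injured')
).show()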

Here is an example of some other aggregations we can do.

In [13]:
df_new.groupBy(['BOROUGH']).agg({'PERSONS KILLED': 'mean',
                                 'PERSONS INJURED': 'min',
                                 'PEDESTRIANS INJURED': 'max',
                                 'PEDESTRIANS KILLED': 'count'}).show()
+-------------+--------------------+--------------------+------------------------+-------------------------+
|      BOROUGH|min(PERSONS INJURED)| avg(PERSONS KILLED)|max(PEDESTRIANS INJURED)|count(PEDESTRIANS KILLED)|
+-------------+--------------------+--------------------+------------------------+-------------------------+
|       QUEENS|                   0|0.001211133320281...|                      15|                   219629|
|     BROOKLYN|                   0|0.001163428907149...|                       9|                   258718|
|        BRONX|                   0|0.001162406001591602|                       9|                   111837|
|    MANHATTAN|                   0|  8.5115587908884E-4|                      28|                   212652|
|STATEN ISLAND|                   0|0.001368165625484...|                       6|                    38738|
+-------------+--------------------+--------------------+------------------------+-------------------------+

Let's plot total number of people killed in each BOROUGH.

In [14]:
total_people_killed = df_new.groupBy(['BOROUGH']) \
                            .agg({'PERSONS KILLED': 'sum'}) \
                            .withColumnRenamed('sum(PERSONS KILLED)', 'total_people_killed')
total_people_killed.show()
+-------------+-------------------+
|      BOROUGH|total_people_killed|
+-------------+-------------------+
|       QUEENS|                266|
|     BROOKLYN|                301|
|        BRONX|                130|
|    MANHATTAN|                181|
|STATEN ISLAND|                 53|
+-------------+-------------------+

In [15]:
# We can convert our Spark dataframe to a pandas dataframe and then plot it
import pandas as pd
people_killed = total_people_killed.toPandas()
In [16]:
%matplotlib inline
import matplotlib.pyplot as plt
x = people_killed['BOROUGH']
y = people_killed['total_people_killed']
plt.bar(x,y);
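
One caveat: toPandas() collects the full result to the driver, which is fine here since the aggregate has only five rows, but be careful with large dataframes. A few extra lines also make the chart more readable:

plt.bar(x, y)
plt.xlabel('Borough')
plt.ylabel('Total people killed')
plt.title('People killed per borough');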

Sorting

Let's see how we can sort our data.

In [17]:
# Sort data by PEDESTRIANS KILLED column in descending order
df_new.sort('PEDESTRIANS KILLED', ascending=False).show()
+----------+-------------+---------------+--------------+-------------------+------------------+
|      DATE|      BOROUGH|PERSONS INJURED|PERSONS KILLED|PEDESTRIANS INJURED|PEDESTRIANS KILLED|
+----------+-------------+---------------+--------------+-------------------+------------------+
|10/31/2017|    MANHATTAN|             12|             8|                  8|                 8|
|08/01/2016|     BROOKLYN|              0|             2|                  0|                 2|
|09/18/2017|       QUEENS|             16|             3|                  3|                 2|
|03/20/2015|        BRONX|              1|             2|                  1|                 2|
|05/08/2016|       QUEENS|              0|             2|                  0|                 2|
|10/05/2015|STATEN ISLAND|              0|             2|                  0|                 2|
|11/11/2013|       QUEENS|              1|             2|                  0|                 2|
|12/03/2017|       QUEENS|              4|             1|                  4|                 1|
|11/22/2017|     BROOKLYN|              0|             1|                  0|                 1|
|07/29/2017|    MANHATTAN|              1|             1|                  0|                 1|
|10/03/2017|     BROOKLYN|              0|             1|                  0|                 1|
|11/14/2017|    MANHATTAN|              0|             1|                  0|                 1|
|10/24/2017|        BRONX|              0|             1|                  0|                 1|
|10/17/2017|       QUEENS|              0|             1|                  0|                 1|
|09/30/2017|    MANHATTAN|              0|             1|                  0|                 1|
|09/03/2017|     BROOKLYN|              1|             1|                  0|                 1|
|08/16/2017|    MANHATTAN|              0|             1|                  0|                 1|
|11/10/2017|    MANHATTAN|              0|             1|                  0|                 1|
|10/17/2017|        BRONX|              0|             1|                  0|                 1|
|10/16/2017|     BROOKLYN|              0|             1|                  0|                 1|
+----------+-------------+---------------+--------------+-------------------+------------------+
only showing top 20 rows

In [18]:
# Sort data by multiple columns in different orders (ascending and descending)
df_new.orderBy(['PERSONS INJURED', 'PEDESTRIANS KILLED'], ascending=[0, 1]) \
      .select(['DATE', 'BOROUGH', 'PERSONS INJURED', 'PEDESTRIANS KILLED']).show()
+----------+-------------+---------------+------------------+
|      DATE|      BOROUGH|PERSONS INJURED|PEDESTRIANS KILLED|
+----------+-------------+---------------+------------------+
|02/14/2015|       QUEENS|              9|                 0|
|12/09/2013|        BRONX|              9|                 0|
|12/06/2017|     BROOKLYN|              9|                 0|
|08/28/2017|    MANHATTAN|              9|                 0|
|09/17/2014|     BROOKLYN|              9|                 0|
|10/17/2016|     BROOKLYN|              9|                 0|
|09/15/2016|        BRONX|              9|                 0|
|08/25/2016|     BROOKLYN|              9|                 0|
|03/11/2017|        BRONX|              9|                 0|
|01/15/2016|     BROOKLYN|              9|                 0|
|04/04/2016|       QUEENS|              9|                 0|
|03/01/2016|     BROOKLYN|              9|                 0|
|01/02/2016|     BROOKLYN|              9|                 0|
|11/06/2015|     BROOKLYN|              9|                 0|
|07/26/2015|     BROOKLYN|              9|                 0|
|06/20/2015|STATEN ISLAND|              9|                 0|
|06/01/2015|        BRONX|              9|                 0|
|07/23/2016|       QUEENS|              9|                 0|
|10/02/2017|       QUEENS|              9|                 0|
|05/27/2016|    MANHATTAN|              9|                 0|
+----------+-------------+---------------+------------------+
only showing top 20 rows
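
The same ordering can also be expressed with column expressions, which some find more readable than the parallel ascending list:

from pyspark.sql.functions import col

df_new.orderBy(col('PERSONS INJURED').desc(), col('PEDESTRIANS KILLED').asc()) \
      .select(['DATE', 'BOROUGH', 'PERSONS INJURED', 'PEDESTRIANS KILLED']).show()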

Filtering

Now, let's do some filtering. There are multiple ways to filter/select data in Spark. We will cover a few of them here.

In [19]:
# Select rows where BOROUGH is QUEENS
df_new.filter(df_new['BOROUGH']=='QUEENS').show()
+----------+-------+---------------+--------------+-------------------+------------------+
|      DATE|BOROUGH|PERSONS INJURED|PERSONS KILLED|PEDESTRIANS INJURED|PEDESTRIANS KILLED|
+----------+-------+---------------+--------------+-------------------+------------------+
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              3|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              2|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              2|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              2|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
+----------+-------+---------------+--------------+-------------------+------------------+
only showing top 20 rows

In [20]:
# Select rows where PERSONS KILLED is greater than 5
df_new.filter(df_new['PERSONS KILLED']>5).show()
+----------+---------+---------------+--------------+-------------------+------------------+
|      DATE|  BOROUGH|PERSONS INJURED|PERSONS KILLED|PEDESTRIANS INJURED|PEDESTRIANS KILLED|
+----------+---------+---------------+--------------+-------------------+------------------+
|10/31/2017|MANHATTAN|             12|             8|                  8|                 8|
+----------+---------+---------------+--------------+-------------------+------------------+
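
Conditions can be combined with & (and), | (or) and ~ (not); each condition needs its own parentheses:

# Rows in QUEENS where at least one person was killed
df_new.filter((df_new['BOROUGH'] == 'QUEENS') &
              (df_new['PERSONS KILLED'] > 0)).show(5)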

In [21]:
# We can also use SQL like syntax by first creating a view and then querying it
df_new.createOrReplaceTempView('sql_example')
spark.sql('select * from sql_example where BOROUGH="QUEENS"').show()
+----------+-------+---------------+--------------+-------------------+------------------+
|      DATE|BOROUGH|PERSONS INJURED|PERSONS KILLED|PEDESTRIANS INJURED|PEDESTRIANS KILLED|
+----------+-------+---------------+--------------+-------------------+------------------+
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              3|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              2|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              2|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              2|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
|12/12/2017| QUEENS|              0|             0|                  0|                 0|
+----------+-------+---------------+--------------+-------------------+------------------+
only showing top 20 rows

In [22]:
# Here is another example where I am doing aggregation using the SQL syntax
spark.sql('select BOROUGH, count("PERSONS INJURED") from sql_example group by BOROUGH').show()
+-------------+----------------------+
|      BOROUGH|count(PERSONS INJURED)|
+-------------+----------------------+
|       QUEENS|                219629|
|     BROOKLYN|                258718|
|        BRONX|                111837|
|    MANHATTAN|                212652|
|STATEN ISLAND|                 38738|
+-------------+----------------------+
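
One subtlety with the SQL syntax: column names containing spaces must be quoted with backticks. In Spark SQL, double quotes create a string literal, so count("PERSONS INJURED") above actually counts a non-null constant, which is equivalent to count(*). To count non-null values of the column itself:

spark.sql('select BOROUGH, count(`PERSONS INJURED`) as injured_count '
          'from sql_example group by BOROUGH').show()

The result here is the same either way, because we have already dropped all rows with nulls.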

I hope you found this post helpful. In the future, I will write more posts about working with machine learning models in Spark.
