Using Spark’s ML library for Machine Learning
Through this project, I will guide you around the use of Apache Spark’s Machine Learning library. In this exercise, we’ll be working with the ‘flights’ database, which contains information on flights in the US including departure and arrival airport, delays information, airline, among others.
Our objective will be to predict a flight’s departure delay with available data. To do this, we will use different models, tuned through a Parameter map with which we can try different parameter values and finally run through a ‘Magic Loop’ to try both of them out. Feature engineering, if required, will be done with a Pipeline object.
Dataset used may be downloaded from here
A glossary for the variables can be found here
Creating a Spark Session
In this project I scripted the code to test its proper functionality on my computer, using jupyter’s pyspark docker image. After running all of my code locally and making sure there weren’t any errors, I created a cluster on AWS' EMR with 4 r4.large instances which have 4 cores and 30 GBs RAM each. I also changed the following settings to Spark:
spark.executor.memory 20G
spark.executor.cores 4
spark.driver.memory 20G
The following script is as was run on the AWS cluster:
# Added libraries.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml import Transformer
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Bucketizer, RFormula
import pandas as pd
import numpy as np
import time
pd.set_option('display.max_columns', 40)
# The following scripts are to start a SparkSession on local mode
#spark = SparkSession.builder.master("local[*]")\
#.config("spark.executor.memory", "10G")\
#.config("spark.driver.memory", "10G")\
#.config("spark.memory.fraction", ".8")\
#.getOrCreate()
# Using * within the box in local means we grant Spark access to all threads on our computer, this may be changed.
Exploratory Analysis
After this, we’ll load the database into Spark and we’ll take a first look at what we have:
flights = spark.read.csv('s3://daniel-sharp/datos/flights/flights.csv', header =True)
# read.csv has an argument 'inferSchema' which will try to parse columns to their 'appropiate' type. However,
# here we avoid it as it produces unwanted results, such as parsing time "0005" to 5, which means we lose information
# As it is, it will parse all columns to type 'string'
First, we will take a look at what data we have:
num_rows = flights.count()
num_rows
5819079
So we have information from 5,819,079 flights.
Next, we will look at our data and check if we need to make any changes:
flights.limit(10).toPandas()
YEAR | MONTH | DAY | DAY_OF_WEEK | AIRLINE | FLIGHT_NUMBER | TAIL_NUMBER | ORIGIN_AIRPORT | DESTINATION_AIRPORT | SCHEDULED_DEPARTURE | DEPARTURE_TIME | DEPARTURE_DELAY | TAXI_OUT | WHEELS_OFF | SCHEDULED_TIME | ELAPSED_TIME | AIR_TIME | DISTANCE | WHEELS_ON | TAXI_IN | SCHEDULED_ARRIVAL | ARRIVAL_TIME | ARRIVAL_DELAY | DIVERTED | CANCELLED | CANCELLATION_REASON | AIR_SYSTEM_DELAY | SECURITY_DELAY | AIRLINE_DELAY | LATE_AIRCRAFT_DELAY | WEATHER_DELAY | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2015 | 1 | 1 | 4 | AS | 98 | N407AS | ANC | SEA | 0005 | 2354 | -11 | 21 | 0015 | 205 | 194 | 169 | 1448 | 0404 | 4 | 0430 | 0408 | -22 | 0 | 0 | None | None | None | None | None | None |
1 | 2015 | 1 | 1 | 4 | AA | 2336 | N3KUAA | LAX | PBI | 0010 | 0002 | -8 | 12 | 0014 | 280 | 279 | 263 | 2330 | 0737 | 4 | 0750 | 0741 | -9 | 0 | 0 | None | None | None | None | None | None |
2 | 2015 | 1 | 1 | 4 | US | 840 | N171US | SFO | CLT | 0020 | 0018 | -2 | 16 | 0034 | 286 | 293 | 266 | 2296 | 0800 | 11 | 0806 | 0811 | 5 | 0 | 0 | None | None | None | None | None | None |
3 | 2015 | 1 | 1 | 4 | AA | 258 | N3HYAA | LAX | MIA | 0020 | 0015 | -5 | 15 | 0030 | 285 | 281 | 258 | 2342 | 0748 | 8 | 0805 | 0756 | -9 | 0 | 0 | None | None | None | None | None | None |
4 | 2015 | 1 | 1 | 4 | AS | 135 | N527AS | SEA | ANC | 0025 | 0024 | -1 | 11 | 0035 | 235 | 215 | 199 | 1448 | 0254 | 5 | 0320 | 0259 | -21 | 0 | 0 | None | None | None | None | None | None |
5 | 2015 | 1 | 1 | 4 | DL | 806 | N3730B | SFO | MSP | 0025 | 0020 | -5 | 18 | 0038 | 217 | 230 | 206 | 1589 | 0604 | 6 | 0602 | 0610 | 8 | 0 | 0 | None | None | None | None | None | None |
6 | 2015 | 1 | 1 | 4 | NK | 612 | N635NK | LAS | MSP | 0025 | 0019 | -6 | 11 | 0030 | 181 | 170 | 154 | 1299 | 0504 | 5 | 0526 | 0509 | -17 | 0 | 0 | None | None | None | None | None | None |
7 | 2015 | 1 | 1 | 4 | US | 2013 | N584UW | LAX | CLT | 0030 | 0044 | 14 | 13 | 0057 | 273 | 249 | 228 | 2125 | 0745 | 8 | 0803 | 0753 | -10 | 0 | 0 | None | None | None | None | None | None |
8 | 2015 | 1 | 1 | 4 | AA | 1112 | N3LAAA | SFO | DFW | 0030 | 0019 | -11 | 17 | 0036 | 195 | 193 | 173 | 1464 | 0529 | 3 | 0545 | 0532 | -13 | 0 | 0 | None | None | None | None | None | None |
9 | 2015 | 1 | 1 | 4 | DL | 1173 | N826DN | LAS | ATL | 0030 | 0033 | 3 | 12 | 0045 | 221 | 203 | 186 | 1747 | 0651 | 5 | 0711 | 0656 | -15 | 0 | 0 | None | None | None | None | None | None |
# Check for null values, with guidance from:
# https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
flights.select([(count(when(isnan(c) | col(c).isNull(), c))/num_rows).alias(c) for c in flights.columns]).toPandas()
YEAR | MONTH | DAY | DAY_OF_WEEK | AIRLINE | FLIGHT_NUMBER | TAIL_NUMBER | ORIGIN_AIRPORT | DESTINATION_AIRPORT | SCHEDULED_DEPARTURE | DEPARTURE_TIME | DEPARTURE_DELAY | TAXI_OUT | WHEELS_OFF | SCHEDULED_TIME | ELAPSED_TIME | AIR_TIME | DISTANCE | WHEELS_ON | TAXI_IN | SCHEDULED_ARRIVAL | ARRIVAL_TIME | ARRIVAL_DELAY | DIVERTED | CANCELLED | CANCELLATION_REASON | AIR_SYSTEM_DELAY | SECURITY_DELAY | AIRLINE_DELAY | LATE_AIRCRAFT_DELAY | WEATHER_DELAY | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.00253 | 0.0 | 0.0 | 0.0 | 0.014805 | 0.014805 | 0.015303 | 0.015303 | 0.000001 | 0.018056 | 0.018056 | 0.0 | 0.015898 | 0.015898 | 0.0 | 0.015898 | 0.018056 | 0.0 | 0.0 | 0.984554 | 0.81725 | 0.81725 | 0.81725 | 0.81725 | 0.81725 |
flights.withColumn("delay", flights["DEPARTURE_DELAY"].cast(DoubleType())).describe("delay").show()
+-------+------------------+
|summary| delay|
+-------+------------------+
| count| 5732926|
| mean| 9.370158275198389|
| stddev|37.080942496786925|
| min| -82.0|
| max| 1988.0|
+-------+------------------+
Data Cleaning and Feature Engineering
From the latter, we know the following changes can be made:
-
Parse to integer:
- Departure Delay (our response variable)
- Distance
- Arrival delay
- Scheduled time
- Month
- Day of the week
- Day
-
Parse to time - where I will extract the hour
- Scheduled Departure
- Departure time
- Scheduled arrival
- Arrival time
- Scheduled Arrival
-
Columns to delete - too many missing values -
- Cancellation Reason
- Air System Delay
- Security Delay
- Airline Delay
- Late Aircraft Delay
- Weather Delay
- Year (we know all our data is from 2015, so this column has no variance and thus is useless)
-
Doubtful variables - may be redundant to other information (high correlation) & data leakage -
- Taxi in
- Taxi out
- Air time
- Wheels on
- Wheels off
- Air time
- Diverted
- Cancelled
- Elapsed time
I will also remove all remaining rows with Null values, which will represent 1.8% of the data at the most, which is insignificant.
It is also important to consider that some of these variables may produce ‘data leakage’ as they could only be known after or along with the fact, for example: DEPARTURE_TIME, WHEELS_OFF, among others. We need to remove these variables for the same flight.
Since we want to fit all these steps in a pipeline, we need to creates a series of classes that have a transform method within them. These are outlined next:
# To parse strings into integers
class ParserDouble(Transformer):
def __init__(self, columns=[None]):
self.columns = columns
def _transform(self, df):
for col in self.columns:
df = df.withColumn(col, df[col].cast(DoubleType()))
self = df
return self
# To separate time into hours + minutes
class getHour(Transformer):
def __init__(self, columns=[None]):
self.columns = columns
def _transform(self, df):
for col in self.columns:
df = df.withColumn(col+"_HOUR", substring(col,0,2))
self = df
return self
# To remove unwanted columns
class rm_cols(Transformer):
def __init__(self, columns=[None]):
self.columns = columns
def _transform(self, df):
for col in self.columns:
df = df.drop(col)
self = df
return self
# To remove rows with Null values
class rm_nulls(Transformer):
def _transform(self, df):
self = df.na.drop(how="any")
return self
Feature Engineering
It’s important to note some variables aren’t relevant for out prediction, such as those that happened after flight departure, eg. Arrival time at destination for flight of interest. However, this same variable might be relevant for our same flight’s arrival previous to its departure time. We might be even interested in generating a ‘time at airport’ variable, which considers the time from landing to departure. We can do this by lagging rows for every flight tail number. This way we can ‘track’ the specific airplanes.
Also, having categorical values such as Day and Time make our model much more complex as we would have to calculate n-1 parameters for each one. To avoid this, I will ‘Bucketize’ these columns into weeks and periods respectively. This way we can have week 1, week 2, week 3 and week 4 instead of days 1-31. This means our models will calculate 3 parameters instead of 30. The case for hours is analogous, I grouped the time variable into period of 4 hours each. Unfortunately, doing this same exercise for the Aiports or Airlines variables is not possible. Maybe they could be categorized by cost-levels (maybe expecting cheaper airlines to be late more often), but I do not have that knowledge now. Airports could probably be categorized by regions, where maybe regions with more extreme weather face more delays.
We work on this next:
# Lag relevant columns from previous flights
class lagger(Transformer):
def __init__(self, columns=[None], grouper = "TAIL_NUMBER"):
self.columns = columns
self.grouper = grouper
def _transform(self, df):
for col in self.columns:
df = df.withColumn(col+"_PREV",lag(flights[col])
.over(Window.partitionBy(self.grouper).orderBy(self.grouper)))
self = df
return self
# Create column with time difference between events
class t_diff(Transformer):
def __init__(self, col_name = None, columns=[None]):
self.columns = columns
self.col_name = col_name
def _transform(self, df):
self = df.withColumn(self.col_name, when(df["DAY_OF_WEEK"] > df["DAY_OF_WEEK_PREV"],
2400 + df[self.columns[0]] - df[self.columns[1]]).
otherwise(df[self.columns[0]] - df[self.columns[1]]))
#self = df.withColumn(self.col_name, df[self.columns[0]] - df[self.columns[1]])
return self
Afterwards, we configure all the steps that will be used in out pipeline, which include removing columns with high number of null values and those that represent data leakage, lagging variables to get information on the airplanes previous flight and creating new variables, such as the time at airport variable. Finally, for use in our models, we have to ‘One Hot Encode’ our categorical variables.
# Remove columns with significantly high NAs
rm_cols_nas = rm_cols(["CANCELLATION_REASON","AIR_SYSTEM_DELAY","SECURITY_DELAY","AIRLINE_DELAY","LATE_AIRCRAFT_DELAY",
"WEATHER_DELAY"])
# Remove columns not useful to analysis or that present data leakage risk
rm_cols_unnecessary = rm_cols(["WHEELS_OFF", "TAXI_OUT", "CANCELLED", "DIVERTED", "WHEELS_ON",
"AIR_TIME", "TAXI_IN", "YEAR", "ELAPSED_TIME", "SCHEDULED_TIME", "AIR_TIME"])
# Remove rows with null values
rm_nulls1 = rm_nulls()
# Lag columns from the flights previous trip (its grouped by TAIL_NUMBER)
lagger1 = lagger(columns = ["SCHEDULED_ARRIVAL", "DEPARTURE_TIME", "DEPARTURE_DELAY", "ARRIVAL_DELAY",
"DAY_OF_WEEK"], grouper = "TAIL_NUMBER")
# Remove columns not useful to analysis or that present data leakage risk
rm_cols_leakage= rm_cols(["SCHEDULED_ARRIVAL", "ARRIVAL_TIME", "ARRIVAL_DELAY", "DEPARTURE_TIME"])
# Convert to integer
double_maker = ParserDouble(["DEPARTURE_DELAY", "DISTANCE", "ARRIVAL_DELAY_PREV"])
# Create time at airport variable (it doesn't really produce a time value as such, but represents a time 'proxy')
t_at_airport = t_diff(col_name = "SCHEDULED_TIME_AT_AIRPORT", columns = ["SCHEDULED_DEPARTURE","SCHEDULED_ARRIVAL_PREV"])
# Separate time columns into hour-minute columns
h_m_sep = getHour(["SCHEDULED_DEPARTURE"])
# Final parse of strings to ints for relevant variables
double_maker2 = ParserDouble(["DEPARTURE_DELAY_PREV","SCHEDULED_DEPARTURE_HOUR", "DAY"])
# Bucketize factor columns with many labels.
bucketizer_day = Bucketizer(splits=[-float("inf"), 7, 14, 21, float("inf")], inputCol="DAY", outputCol="WEEK")
bucketizer_time = Bucketizer(splits=[-float("inf"), 4, 8, 12, 16, 20, float("inf")], inputCol="SCHEDULED_DEPARTURE_HOUR",
outputCol="SCHED_DEPARTURE_PERIOD")
# Finally, we need to 'One-hot encode' our categorical variables for use in the models
cat_vars = ["AIRLINE", "ORIGIN_AIRPORT", "MONTH", "DAY_OF_WEEK", "WEEK", "SCHED_DEPARTURE_PERIOD"]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_INDEX") for column in cat_vars ]
hot_encoders = [OneHotEncoder(inputCol=column+"_INDEX", outputCol=column+"_HOT") for column in cat_vars ]
# Remove all unnecessary variables remaining
rm_rem= rm_cols(["SCHEDULED_ARRIVAL_PREV", "SCHEDULED_DEPARTURE", "ARRIVAL_TIME_PREV", "DEPARTURE_TIME_PREV",
"AIRLINE_INDEX", 'TAIL_NUMBER','ORIGIN_AIRPORT_INDEX',"AIRLINE",'FLIGHT_NUMBER', 'TAIL_NUMBER',
'ORIGIN_AIRPORT','DESTINATION_AIRPORT', "DAY_OF_WEEK_PREV", "MONTH", "DAY", "DAY_OF_WEEK",
"MONTH_INDEX", "WEEK_INDEX","DAY_OF_WEEK_INDEX", "WEEK", "SCHEDULED_ARRIVAL_PREV",
"SCHEDULED_DEPARTURE_HOUR", "SCHED_DEPARTURE_PERIOD_INDEX", "SCHED_DEPARTURE_PERIOD"])
data_prepare = Pipeline(stages = [rm_cols_nas, rm_cols_unnecessary, rm_nulls1, lagger1, rm_cols_leakage, double_maker,
rm_nulls1, t_at_airport, h_m_sep, double_maker2, bucketizer_day, bucketizer_time] +
indexers + hot_encoders + [rm_rem])
clean_flights = data_prepare.fit(flights).transform(flights)
And now, our data is ready to be run through models!
clean_flights.limit(10).toPandas()
DEPARTURE_DELAY | DISTANCE | DEPARTURE_DELAY_PREV | ARRIVAL_DELAY_PREV | SCHEDULED_TIME_AT_AIRPORT | AIRLINE_HOT | ORIGIN_AIRPORT_HOT | MONTH_HOT | DAY_OF_WEEK_HOT | WEEK_HOT | SCHED_DEPARTURE_PERIOD_HOT | |
---|---|---|---|---|---|---|---|---|---|---|---|
0 | -3.0 | 500.0 | 75.0 | 72.0 | 1389.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 1.0, 0.0) | (0.0, 0.0, 0.0) | (1.0, 0.0, 0.0, 0.0, 0.0) |
1 | 46.0 | 130.0 | -3.0 | -7.0 | 1006.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 1.0, 0.0) | (0.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 0.0, 1.0) |
2 | -6.0 | 130.0 | 46.0 | 36.0 | 817.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 1.0, 0.0) |
3 | -5.0 | 912.0 | -6.0 | -3.0 | 112.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 1.0, 0.0) |
4 | -10.0 | 912.0 | -5.0 | 2.0 | 86.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0) | (1.0, 0.0, 0.0, 0.0, 0.0) |
5 | 67.0 | 912.0 | -10.0 | -17.0 | -30.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0) | (0.0, 1.0, 0.0, 0.0, 0.0) |
6 | 41.0 | 912.0 | 67.0 | 51.0 | 86.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0) | (0.0, 1.0, 0.0, 0.0, 0.0) |
7 | 6.0 | 366.0 | 41.0 | 26.0 | 59.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0) | (0.0, 0.0, 1.0, 0.0, 0.0) |
8 | 0.0 | 366.0 | 6.0 | 3.0 | 927.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0) | (0.0, 0.0, 0.0, 1.0, 0.0) |
9 | -6.0 | 130.0 | 0.0 | 13.0 | 104.0 | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (0.0, 0.0, 0.0) | (1.0, 0.0, 0.0, 0.0, 0.0) |
And just to verify how much data we lost from Null values:
(1 - clean_flights.count()/flights.count()) * 100
1.8897664046148899
We only lost ~1.9% of our data to null values.
Running models and tuning
As stated at the beggining, our objetive is to find the best model to predict a flight’s departure delay. This means we will try to predict a number and, thus, will need a regression model. In this case, we will work with two models: the standard Linear Regression Model and the Random Forest model.
To start with, we have to divide our data in a training and a test sample, which we’ll do 70% and 30% respectively. We can do a random split as we don’t have sequential data to worry about
(train_flights, test_flights) = clean_flights.randomSplit([0.70, 0.30])
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.param import Params
Now we’ll set up a ‘magic loop’ (so called by Rayid Ghani from Chicago’s Data Science, Public Policy, Social Good Center for Data Science and Public Policy). The Magic Loop will iterate through our both models, running each one with different parameters, which we’ll define with the ParamGridBuilder. In each model run we will use 10 fold cross validation and finally will evaluate the ‘best model’s’ performance on the test sample.
Instead of using ‘for’ loops for our Magic Loop, I adapted code from Bryan Cutler’s CV Pipelines, which allows us to try diffferent models on a single CrossValidator object, which acts out as our Magic Loop. The benefit from doing this, is that our CrossValidator will run all the different models we give it and keep the best one.
# Define the base characteristics of the models we will use:
lr = LinearRegression(maxIter=10, featuresCol='features', labelCol='label')
rf = RandomForestRegressor(subsamplingRate=0.15, featuresCol='features', labelCol='label')
# Spark ML models require you input a 'label' column and a 'features' column.
# To do this, we use vector assembler, which produces a 'features' column where each record is a vector of our covariates
formula = RFormula(formula = "DEPARTURE_DELAY ~ .")
#rm_extra = rm_cols(f_vars)
# The following structure was adapted from https://bryancutler.github.io/cv-pipelines/
pipeline = Pipeline(stages=[formula])
paramGrid_lr = ParamGridBuilder() \
.baseOn({pipeline.stages: [formula, lr]}) \
.addGrid(lr.elasticNetParam, [0.5, 1.0, 0.0]) \
.addGrid(lr.regParam, [0.01, 0.001, 0.1])\
.build()
paramGrid_rf = ParamGridBuilder() \
.baseOn({pipeline.stages : [formula, rf]}) \
.addGrid(rf.numTrees, [10, 20, 30]) \
.addGrid(rf.featureSubsetStrategy, ['onethird', '0.5', 'sqrt'])\
.build()
evaluator = RegressionEvaluator(metricName='rmse')
grids = paramGrid_rf + paramGrid_lr
crossval = CrossValidator(estimatorParamMaps=grids,
estimator=pipeline,
evaluator=evaluator,
numFolds=10,
parallelism = 4)
Here we fit out traning data to all our models. cvModel contains our best model along with other information from the Cross Validation runs:
start_time = time.time()
cvModel = crossval.fit(train_flights)
end_time = time.time()
print("--- Magic Loop execution: %s seconds ---" % (end_time - start_time))
--- Magic Loop execution: 15658.50262594223 seconds ---
Running the Magic Loop on the EMR cluster took ~15,659 seconds, which is around 4.3 hours. This might sound like a long time, but we just trained 180 models ($3^2 = 9$ combinations per model, with 10 fold Cross Validation).
Spark’s objetive is more for production rather than for model exploration, which is why identifying model’s performace isn’t as easy as it is in Sci-kit learn, however, it can still be done.
Following, we can see the average performance (RMSE) for each model during its k-fold runs, the metrics are shown in order of exectution, which means that the first half of values belong to the Random Forest model and the second half belong to our Linear Regression runs:
cvModel.avgMetrics
[31.425270934910127,
30.845918655526823,
34.94443101990099,
31.398382173733125,
30.81426642031701,
34.85703232171853,
31.417874346423805,
30.83880871086108,
35.210848036877124,
33.69249481427418,
33.69268537951549,
33.695178433419564,
33.69562579280449,
33.70458705476841,
33.693430289161746,
33.703639429397434,
33.72220262933084,
33.6949916706693]
To obtain the best model we can use find the index of the element with the lowest value (in this case because its RMSE, but in others we might want to find the maximum), and extract its corresponding parameters. This only shows the best parameters as run by the CrossValidors, the rest of the model’s parameters are either at default values, or as defined when the models are declared.
Best Model
cvModel.getEstimatorParamMaps()[ np.argmin(cvModel.avgMetrics) ]
{Param(parent='Pipeline_4d63958e14f5e691468e', name='stages', doc='a list of pipeline stages'): [RFormula_42c8b8393de389d8b760,
RandomForestRegressor_483f833d74c3e990ce8d],
Param(parent='RandomForestRegressor_483f833d74c3e990ce8d', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): '0.5',
Param(parent='RandomForestRegressor_483f833d74c3e990ce8d', name='numTrees', doc='Number of trees to train (>= 1).'): 20}
np.min(cvModel.avgMetrics)
30.81426642031701
The best model is a Random Forest regressor, using a subset strategy of 50% of the variables and 20 trees. It achieved an RMSE of 30.81.
Worst performer
cvModel.getEstimatorParamMaps()[ np.argmax(cvModel.avgMetrics) ]
{Param(parent='Pipeline_4d63958e14f5e691468e', name='stages', doc='a list of pipeline stages'): [RFormula_42c8b8393de389d8b760,
RandomForestRegressor_483f833d74c3e990ce8d],
Param(parent='RandomForestRegressor_483f833d74c3e990ce8d', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'sqrt',
Param(parent='RandomForestRegressor_483f833d74c3e990ce8d', name='numTrees', doc='Number of trees to train (>= 1).'): 30}
np.max(cvModel.avgMetrics)
35.210848036877124
The worst performer was a Random Forest regressor with a subset strategy of using the square root of the number of variables and 30 trees.
We can also manually give the index to find out the parameters for a specific model. (Remember, the index corresponds to the avgMetrics array.
Best Linear Regression
# I use 9+ argmin because we know the first 9 results are from the Random Forest iterations
cvModel.getEstimatorParamMaps()[9 + np.argmin(cvModel.avgMetrics[8:])]
{Param(parent='LinearRegression_4183b499386e856ec022', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0,
Param(parent='LinearRegression_4183b499386e856ec022', name='regParam', doc='regularization parameter (>= 0).'): 0.01,
Param(parent='Pipeline_4d63958e14f5e691468e', name='stages', doc='a list of pipeline stages'): [RFormula_42c8b8393de389d8b760,
LinearRegression_4183b499386e856ec022]}
The best Linear Regression model has regularization parameter 0.01 and using L1 (also known as lasso) regularization. The variance in Linear Regression performance (measured as RMSE) was minimal, with all of them being between 33.2 and 33.73.
Finally, we can test our ‘best’ model from the training phase on our test sample, which is new, unseen data:
evaluator.evaluate(cvModel.transform(test_flights))
31.228861151885553
cvModel.transform(test_flights).limit(10).select("label","prediction").show()
+-----+------------------+
|label| prediction|
+-----+------------------+
|-19.0| 28.88757275873146|
|-19.0| 2.462289130631162|
|-19.0|1.6668822768061389|
|-19.0|2.1210669714752477|
|-18.0| 1.24498749884627|
|-18.0| 2.462289130631162|
|-18.0|1.6836043331825006|
|-18.0|1.6991721935153794|
|-17.0| 1.24498749884627|
|-17.0| 2.462289130631162|
+-----+------------------+