Using Spark’s ML library for Machine Learning

Through this project, I will guide you through the use of Apache Spark’s Machine Learning library. In this exercise, we’ll be working with the ‘flights’ dataset, which contains information on US flights, including departure and arrival airports, delay information, and airline, among other variables.

Our objective will be to predict a flight’s departure delay from the available data. To do this, we will try different models, each tuned through a parameter map that lets us test several parameter values, and finally run both of them through a ‘Magic Loop’. Any required feature engineering will be done with a Pipeline object.

The dataset used may be downloaded from here
A glossary for the variables can be found here

Creating a Spark Session

In this project I wrote the code and tested that it ran properly on my computer, using Jupyter’s pyspark Docker image. After running all of my code locally and making sure there weren’t any errors, I created a cluster on AWS EMR with 4 r4.large instances, each with 4 cores and 30 GB of RAM. I also changed the following Spark settings:

spark.executor.memory 20G
spark.executor.cores 4
spark.driver.memory 20G
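These settings can be supplied through the cluster’s Spark configuration (e.g. spark-defaults.conf) or directly on submission. For example (a sketch; the script name flights_delay.py is hypothetical):

spark-submit --conf spark.executor.memory=20G --conf spark.executor.cores=4 --conf spark.driver.memory=20G flights_delay.py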

The following script is as it was run on the AWS cluster:

# Added libraries.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml import Transformer
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Bucketizer, RFormula
import pandas as pd
import numpy as np
import time
pd.set_option('display.max_columns', 40)

# The following (commented) lines are to start a SparkSession in local mode
#spark = SparkSession.builder.master("local[*]")\
#.config("spark.executor.memory", "10G")\
#.config("spark.driver.memory", "10G")\
#.config("spark.memory.fraction", ".8")\
#.getOrCreate()
# Using local[*] grants Spark access to all threads on the computer; this can be changed.
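On the cluster itself, the session can simply be obtained (or created, if none exists) as below; this is a minimal sketch and the application name is arbitrary:

# Get or create the SparkSession on the cluster; resource settings come from the cluster configuration above
spark = SparkSession.builder.appName("flights-delay").getOrCreate()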

Exploratory Analysis

With the session in place, we’ll load the dataset into Spark and take a first look at what we have:

flights = spark.read.csv('s3://daniel-sharp/datos/flights/flights.csv', header =True) 
# read.csv has an argument 'inferSchema' which tries to parse each column to its 'appropriate' type. However,
# we avoid it here because it produces unwanted results, such as parsing the time "0005" to 5, which loses information.
# As it is, every column will be parsed as type 'string'.
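Since we skip schema inference, we can confirm that every column indeed came in as a string (output omitted here):

flights.printSchema()  # every column should be reported as type 'string' at this point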

First, let’s check how many records we have:

num_rows = flights.count()
num_rows
5819079

So we have information on 5,819,079 flights.

Next, we will look at our data and check if we need to make any changes:

flights.limit(10).toPandas()

YEAR MONTH DAY DAY_OF_WEEK AIRLINE FLIGHT_NUMBER TAIL_NUMBER ORIGIN_AIRPORT DESTINATION_AIRPORT SCHEDULED_DEPARTURE DEPARTURE_TIME DEPARTURE_DELAY TAXI_OUT WHEELS_OFF SCHEDULED_TIME ELAPSED_TIME AIR_TIME DISTANCE WHEELS_ON TAXI_IN SCHEDULED_ARRIVAL ARRIVAL_TIME ARRIVAL_DELAY DIVERTED CANCELLED CANCELLATION_REASON AIR_SYSTEM_DELAY SECURITY_DELAY AIRLINE_DELAY LATE_AIRCRAFT_DELAY WEATHER_DELAY
0 2015 1 1 4 AS 98 N407AS ANC SEA 0005 2354 -11 21 0015 205 194 169 1448 0404 4 0430 0408 -22 0 0 None None None None None None
1 2015 1 1 4 AA 2336 N3KUAA LAX PBI 0010 0002 -8 12 0014 280 279 263 2330 0737 4 0750 0741 -9 0 0 None None None None None None
2 2015 1 1 4 US 840 N171US SFO CLT 0020 0018 -2 16 0034 286 293 266 2296 0800 11 0806 0811 5 0 0 None None None None None None
3 2015 1 1 4 AA 258 N3HYAA LAX MIA 0020 0015 -5 15 0030 285 281 258 2342 0748 8 0805 0756 -9 0 0 None None None None None None
4 2015 1 1 4 AS 135 N527AS SEA ANC 0025 0024 -1 11 0035 235 215 199 1448 0254 5 0320 0259 -21 0 0 None None None None None None
5 2015 1 1 4 DL 806 N3730B SFO MSP 0025 0020 -5 18 0038 217 230 206 1589 0604 6 0602 0610 8 0 0 None None None None None None
6 2015 1 1 4 NK 612 N635NK LAS MSP 0025 0019 -6 11 0030 181 170 154 1299 0504 5 0526 0509 -17 0 0 None None None None None None
7 2015 1 1 4 US 2013 N584UW LAX CLT 0030 0044 14 13 0057 273 249 228 2125 0745 8 0803 0753 -10 0 0 None None None None None None
8 2015 1 1 4 AA 1112 N3LAAA SFO DFW 0030 0019 -11 17 0036 195 193 173 1464 0529 3 0545 0532 -13 0 0 None None None None None None
9 2015 1 1 4 DL 1173 N826DN LAS ATL 0030 0033 3 12 0045 221 203 186 1747 0651 5 0711 0656 -15 0 0 None None None None None None
# Check for null values, with guidance from:
# https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
flights.select([(count(when(isnan(c) | col(c).isNull(), c))/num_rows).alias(c) for c in flights.columns]).toPandas()

YEAR MONTH DAY DAY_OF_WEEK AIRLINE FLIGHT_NUMBER TAIL_NUMBER ORIGIN_AIRPORT DESTINATION_AIRPORT SCHEDULED_DEPARTURE DEPARTURE_TIME DEPARTURE_DELAY TAXI_OUT WHEELS_OFF SCHEDULED_TIME ELAPSED_TIME AIR_TIME DISTANCE WHEELS_ON TAXI_IN SCHEDULED_ARRIVAL ARRIVAL_TIME ARRIVAL_DELAY DIVERTED CANCELLED CANCELLATION_REASON AIR_SYSTEM_DELAY SECURITY_DELAY AIRLINE_DELAY LATE_AIRCRAFT_DELAY WEATHER_DELAY
0 0.0 0.0 0.0 0.0 0.0 0.0 0.00253 0.0 0.0 0.0 0.014805 0.014805 0.015303 0.015303 0.000001 0.018056 0.018056 0.0 0.015898 0.015898 0.0 0.015898 0.018056 0.0 0.0 0.984554 0.81725 0.81725 0.81725 0.81725 0.81725
flights.withColumn("delay", flights["DEPARTURE_DELAY"].cast(DoubleType())).describe("delay").show()
+-------+------------------+
|summary|             delay|
+-------+------------------+
|  count|           5732926|
|   mean| 9.370158275198389|
| stddev|37.080942496786925|
|    min|             -82.0|
|    max|            1988.0|
+-------+------------------+

Data Cleaning and Feature Engineering

From the above, we can see that the following changes should be made:

  • Parse to numeric (double):

    • Departure Delay (our response variable)
    • Distance
    • Arrival delay
    • Scheduled time
    • Month
    • Day of the week
    • Day
  • Parse to time - where I will extract the hour

    • Scheduled Departure
    • Departure time
    • Scheduled arrival
    • Arrival time
  • Columns to delete (too many missing values):

    • Cancellation Reason
    • Air System Delay
    • Security Delay
    • Airline Delay
    • Late Aircraft Delay
    • Weather Delay
    • Year (we know all our data is from 2015, so this column has no variance and thus is useless)
  • Doubtful variables (may be redundant with other information, i.e. highly correlated, or pose a data-leakage risk):

    • Taxi in
    • Taxi out
    • Air time
    • Wheels on
    • Wheels off
    • Diverted
    • Cancelled
    • Elapsed time

I will also remove all remaining rows with null values, which represent at most about 1.8% of the data, an insignificant amount.

It is also important to consider that some of these variables would produce ‘data leakage’, as they could only be known at or after departure, for example DEPARTURE_TIME and WHEELS_OFF, among others. We need to remove these variables for the flight whose delay we are predicting, although, as discussed below, they remain useful when lagged from the aircraft’s previous flight.

Since we want to fit all these steps into a pipeline, we need to create a series of classes that implement a _transform method. These are outlined next:

# To parse string columns into doubles
class ParserDouble(Transformer):

    def __init__(self, columns=[None]):
        self.columns = columns

    def _transform(self, df):
        for col in self.columns:
            df = df.withColumn(col, df[col].cast(DoubleType()))
        return df

# To extract the hour from a time column (e.g. "0005" -> "00")
class getHour(Transformer):

    def __init__(self, columns=[None]):
        self.columns = columns

    def _transform(self, df):
        for col in self.columns:
            df = df.withColumn(col+"_HOUR", substring(col, 0, 2))
        return df

# To remove unwanted columns
class rm_cols(Transformer):

    def __init__(self, columns=[None]):
        self.columns = columns

    def _transform(self, df):
        for col in self.columns:
            df = df.drop(col)
        return df

# To remove rows with null values
class rm_nulls(Transformer):
    def _transform(self, df):
        return df.na.drop(how="any")
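As a quick sanity check (a sketch, not part of the original run), any of these transformers can also be applied on its own, outside a Pipeline:

# Hypothetical standalone use: cast one column and inspect the resulting type
parsed = ParserDouble(["DEPARTURE_DELAY"]).transform(flights)
parsed.select("DEPARTURE_DELAY").printSchema()  # should now report 'double'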

Feature Engineering

It’s important to note that some variables aren’t relevant for our prediction, such as those that occur after the flight’s departure, e.g. the arrival time at the destination of the flight of interest. However, that same variable from the aircraft’s previous flight (its arrival prior to the departure we are predicting) might well be relevant. We might even be interested in generating a ‘time at airport’ variable, which considers the time from landing to the next departure. We can do this by lagging rows over every tail number; this way we can ‘track’ each specific airplane.

Also, keeping categorical variables such as Day and Time at full granularity makes our model much more complex, since we would have to estimate n-1 parameters for each one. To avoid this, I will ‘bucketize’ these columns into weeks and periods, respectively. This way we get weeks 1 through 4 instead of days 1-31, so our models estimate 3 parameters instead of 30. The case for hours is analogous: I grouped the time variable into periods of 4 hours each. Unfortunately, doing the same for the Airports or Airlines variables is not as straightforward. Airlines could perhaps be categorized by cost level (maybe expecting cheaper airlines to be late more often), but I do not have that information at hand. Airports could probably be categorized by region, where regions with more extreme weather might face more delays.

We work on this next:

# Lag relevant columns from the airplane's previous flight
class lagger(Transformer):

    def __init__(self, columns=[None], grouper="TAIL_NUMBER"):
        self.columns = columns
        self.grouper = grouper

    def _transform(self, df):
        for col in self.columns:
            df = df.withColumn(col+"_PREV", lag(df[col])
                               .over(Window.partitionBy(self.grouper).orderBy(self.grouper)))
        return df

# Create a column with the time difference between two events
class t_diff(Transformer):

    def __init__(self, col_name=None, columns=[None]):
        self.columns = columns
        self.col_name = col_name

    def _transform(self, df):
        # If the previous flight arrived on an earlier day, add 2400 to account for the day change
        return df.withColumn(self.col_name, when(df["DAY_OF_WEEK"] > df["DAY_OF_WEEK_PREV"],
                                                 2400 + df[self.columns[0]] - df[self.columns[1]])
                             .otherwise(df[self.columns[0]] - df[self.columns[1]]))
        # Alternative without the day-change correction:
        # return df.withColumn(self.col_name, df[self.columns[0]] - df[self.columns[1]])
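To illustrate the time-at-airport proxy: if the aircraft’s previous flight was scheduled to arrive at 2300 and its next departure is scheduled at 0600 on the following day, the proxy is 2400 + 600 - 2300 = 700. Since the values are treated as HHMM numbers, this is only a proxy for roughly seven hours at the airport, not an exact duration.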

Afterwards, we configure all the steps that will be used in our pipeline: removing columns with a high number of null values and those that represent data leakage, lagging variables to get information on the airplane’s previous flight, and creating new variables such as the time-at-airport variable. Finally, for use in our models, we have to ‘one-hot encode’ our categorical variables.

# Remove columns with significantly high NAs
rm_cols_nas = rm_cols(["CANCELLATION_REASON","AIR_SYSTEM_DELAY","SECURITY_DELAY","AIRLINE_DELAY","LATE_AIRCRAFT_DELAY",
                    "WEATHER_DELAY"])

# Remove columns not useful to analysis or that present data leakage risk
rm_cols_unnecessary = rm_cols(["WHEELS_OFF", "TAXI_OUT", "CANCELLED", "DIVERTED", "WHEELS_ON",
                              "AIR_TIME", "TAXI_IN", "YEAR", "ELAPSED_TIME", "SCHEDULED_TIME", "AIR_TIME"])

# Remove rows with null values
rm_nulls1 = rm_nulls()

# Lag columns from the flight's previous trip (grouped by TAIL_NUMBER)
lagger1 = lagger(columns = ["SCHEDULED_ARRIVAL", "DEPARTURE_TIME", "DEPARTURE_DELAY", "ARRIVAL_DELAY",
                           "DAY_OF_WEEK"], grouper = "TAIL_NUMBER")

# Remove columns that present a data leakage risk (only known at or after departure)
rm_cols_leakage= rm_cols(["SCHEDULED_ARRIVAL", "ARRIVAL_TIME", "ARRIVAL_DELAY", "DEPARTURE_TIME"])

# Parse strings to doubles
double_maker = ParserDouble(["DEPARTURE_DELAY", "DISTANCE", "ARRIVAL_DELAY_PREV"])

# Create time at airport variable (it doesn't really produce a time value as such, but represents a time 'proxy')
t_at_airport = t_diff(col_name = "SCHEDULED_TIME_AT_AIRPORT", columns = ["SCHEDULED_DEPARTURE","SCHEDULED_ARRIVAL_PREV"])

# Separate time columns into hour-minute columns
h_m_sep = getHour(["SCHEDULED_DEPARTURE"])

# Final parse of strings to doubles for the remaining relevant variables
double_maker2 = ParserDouble(["DEPARTURE_DELAY_PREV","SCHEDULED_DEPARTURE_HOUR", "DAY"])

# Bucketize factor columns with many labels.
bucketizer_day = Bucketizer(splits=[-float("inf"), 7, 14, 21, float("inf")], inputCol="DAY", outputCol="WEEK")
bucketizer_time = Bucketizer(splits=[-float("inf"), 4, 8, 12, 16, 20, float("inf")], inputCol="SCHEDULED_DEPARTURE_HOUR",
                             outputCol="SCHED_DEPARTURE_PERIOD")
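# Note on Bucketizer semantics: each bucket covers [lower, upper), with the last bucket also including its upper bound.
# So with the splits above, days 1-6 map to week 0.0, 7-13 to 1.0, 14-20 to 2.0 and 21-31 to 3.0,
# and the departure hour maps to one of six 4-hour periods.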

# Finally, we need to 'One-hot encode' our categorical variables for use in the models
cat_vars = ["AIRLINE", "ORIGIN_AIRPORT", "MONTH", "DAY_OF_WEEK", "WEEK", "SCHED_DEPARTURE_PERIOD"]

indexers = [StringIndexer(inputCol=column, outputCol=column+"_INDEX") for column in cat_vars ]
hot_encoders = [OneHotEncoder(inputCol=column+"_INDEX", outputCol=column+"_HOT") for column in cat_vars ]

# Remove all unnecessary variables remaining
rm_rem = rm_cols(["SCHEDULED_ARRIVAL_PREV", "SCHEDULED_DEPARTURE", "ARRIVAL_TIME_PREV", "DEPARTURE_TIME_PREV",
                  "AIRLINE_INDEX", "TAIL_NUMBER", "ORIGIN_AIRPORT_INDEX", "AIRLINE", "FLIGHT_NUMBER",
                  "ORIGIN_AIRPORT", "DESTINATION_AIRPORT", "DAY_OF_WEEK_PREV", "MONTH", "DAY", "DAY_OF_WEEK",
                  "MONTH_INDEX", "WEEK_INDEX", "DAY_OF_WEEK_INDEX", "WEEK",
                  "SCHEDULED_DEPARTURE_HOUR", "SCHED_DEPARTURE_PERIOD_INDEX", "SCHED_DEPARTURE_PERIOD"])
data_prepare = Pipeline(stages = [rm_cols_nas, rm_cols_unnecessary, rm_nulls1, lagger1, rm_cols_leakage, double_maker, 
                                  rm_nulls1, t_at_airport, h_m_sep, double_maker2, bucketizer_day, bucketizer_time] + 
                        indexers + hot_encoders + [rm_rem])
clean_flights = data_prepare.fit(flights).transform(flights)

And now, our data is ready to be run through models!

clean_flights.limit(10).toPandas()

DEPARTURE_DELAY DISTANCE DEPARTURE_DELAY_PREV ARRIVAL_DELAY_PREV SCHEDULED_TIME_AT_AIRPORT AIRLINE_HOT ORIGIN_AIRPORT_HOT MONTH_HOT DAY_OF_WEEK_HOT WEEK_HOT SCHED_DEPARTURE_PERIOD_HOT
0 -3.0 500.0 75.0 72.0 1389.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 1.0, 0.0) (0.0, 0.0, 0.0) (1.0, 0.0, 0.0, 0.0, 0.0)
1 46.0 130.0 -3.0 -7.0 1006.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 1.0, 0.0) (0.0, 0.0, 0.0) (0.0, 0.0, 0.0, 0.0, 1.0)
2 -6.0 130.0 46.0 36.0 817.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) (0.0, 0.0, 0.0) (0.0, 0.0, 0.0, 1.0, 0.0)
3 -5.0 912.0 -6.0 -3.0 112.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) (0.0, 0.0, 0.0) (0.0, 0.0, 0.0, 1.0, 0.0)
4 -10.0 912.0 -5.0 2.0 86.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) (0.0, 0.0, 0.0) (1.0, 0.0, 0.0, 0.0, 0.0)
5 67.0 912.0 -10.0 -17.0 -30.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) (0.0, 0.0, 0.0) (0.0, 1.0, 0.0, 0.0, 0.0)
6 41.0 912.0 67.0 51.0 86.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) (0.0, 0.0, 0.0) (0.0, 1.0, 0.0, 0.0, 0.0)
7 6.0 366.0 41.0 26.0 59.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 1.0, 0.0, 0.0, 0.0) (0.0, 0.0, 0.0) (0.0, 0.0, 1.0, 0.0, 0.0)
8 0.0 366.0 6.0 3.0 927.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) (0.0, 0.0, 0.0) (0.0, 0.0, 0.0, 1.0, 0.0)
9 -6.0 130.0 0.0 13.0 104.0 (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (1.0, 0.0, 0.0, 0.0, 0.0, 0.0) (0.0, 0.0, 0.0) (1.0, 0.0, 0.0, 0.0, 0.0)

And just to verify how much data we lost from Null values:

(1 - clean_flights.count()/flights.count()) * 100
1.8897664046148899

We only lost ~1.9% of our data to null values.

Running models and tuning

As stated at the beginning, our objective is to find the best model to predict a flight’s departure delay. This means we will be predicting a number and, thus, will need a regression model. In this case, we will work with two models: a standard Linear Regression and a Random Forest regressor.

To start, we have to split our data into training and test samples, with 70% and 30% of the data respectively. We can do a random split, as we don’t have sequential data to worry about.

(train_flights, test_flights) = clean_flights.randomSplit([0.70, 0.30])
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.param import Params

Now we’ll set up a ‘Magic Loop’ (so called by Rayid Ghani of the University of Chicago’s Center for Data Science and Public Policy). The Magic Loop will iterate through both of our models, running each one with different parameters, which we define with a ParamGridBuilder. Each model run uses 10-fold cross-validation, and at the end we will evaluate the best model’s performance on the test sample.

Instead of using ‘for’ loops for our Magic Loop, I adapted code from Bryan Cutler’s CV Pipelines, which lets us try different models within a single CrossValidator object that acts as our Magic Loop. The benefit of doing this is that the CrossValidator will run all the different models we give it and keep the best one.

# Define the base characteristics of the models we will use:
lr = LinearRegression(maxIter=10, featuresCol='features', labelCol='label')
rf = RandomForestRegressor(subsamplingRate=0.15, featuresCol='features', labelCol='label')

# Spark ML models require a 'label' column and a 'features' column.
# RFormula produces both: a 'features' column where each record is a vector of our covariates, and a 'label' column from the response.
formula = RFormula(formula = "DEPARTURE_DELAY ~ .")
#rm_extra = rm_cols(f_vars)
# The following structure was adapted from https://bryancutler.github.io/cv-pipelines/

pipeline = Pipeline(stages=[formula])

paramGrid_lr = ParamGridBuilder() \
    .baseOn({pipeline.stages: [formula, lr]}) \
    .addGrid(lr.elasticNetParam, [0.5, 1.0, 0.0]) \
    .addGrid(lr.regParam, [0.01, 0.001, 0.1])\
    .build()
    
paramGrid_rf = ParamGridBuilder() \
    .baseOn({pipeline.stages : [formula, rf]}) \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.featureSubsetStrategy, ['onethird', '0.5', 'sqrt'])\
    .build()
    
evaluator = RegressionEvaluator(metricName='rmse')

grids = paramGrid_rf + paramGrid_lr
crossval = CrossValidator(estimatorParamMaps=grids,
                          estimator=pipeline,
                          evaluator=evaluator,
                          numFolds=10,
                         parallelism = 4)
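As a quick check of how much work this implies (a sketch, not part of the original run), the CrossValidator fits one model per parameter combination per fold:

len(grids) * 10  # 18 parameter combinations x 10 folds = 180 model fits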

Here we fit our training data with all of our models. cvModel contains our best model along with other information from the cross-validation runs:

start_time = time.time()
cvModel = crossval.fit(train_flights)
end_time = time.time()
print("--- Magic Loop execution: %s seconds ---" % (end_time - start_time))
--- Magic Loop execution: 15658.50262594223 seconds ---

Running the Magic Loop on the EMR cluster took ~15,659 seconds, which is around 4.3 hours. This might sound like a long time, but we just trained 180 models ($3^2 = 9$ parameter combinations per model, for 2 models, with 10-fold cross-validation).

Spark is aimed more at production than at model exploration, which is why examining model performance isn’t as easy as it is in scikit-learn; however, it can still be done.

Below, we can see the average performance (RMSE) of each model across its k-fold runs. The metrics are shown in order of execution, which means the first half of the values belongs to the Random Forest runs and the second half to the Linear Regression runs:

cvModel.avgMetrics
[31.425270934910127,
 30.845918655526823,
 34.94443101990099,
 31.398382173733125,
 30.81426642031701,
 34.85703232171853,
 31.417874346423805,
 30.83880871086108,
 35.210848036877124,
 33.69249481427418,
 33.69268537951549,
 33.695178433419564,
 33.69562579280449,
 33.70458705476841,
 33.693430289161746,
 33.703639429397434,
 33.72220262933084,
 33.6949916706693]

To obtain the best model, we can find the index of the element with the lowest value (the lowest in this case because the metric is RMSE; for other metrics we might want the maximum) and extract its corresponding parameters. This only shows the parameters varied by the CrossValidator; the rest of the model’s parameters are either at their default values or as defined when the models were declared.
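Alternatively (a sketch, not part of the original run), the metrics can be paired directly with their parameter maps to get a quick ranking:

# Pair each mean CV RMSE with its parameter map and sort best-first
results = sorted(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()), key=lambda x: x[0])
for rmse, params in results[:3]:
    # 'stages' is excluded because it holds the pipeline stage objects rather than a tuned value
    print(round(rmse, 2), {p.name: v for p, v in params.items() if p.name != 'stages'})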

Best Model

cvModel.getEstimatorParamMaps()[ np.argmin(cvModel.avgMetrics) ]
{Param(parent='Pipeline_4d63958e14f5e691468e', name='stages', doc='a list of pipeline stages'): [RFormula_42c8b8393de389d8b760,
  RandomForestRegressor_483f833d74c3e990ce8d],
 Param(parent='RandomForestRegressor_483f833d74c3e990ce8d', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): '0.5',
 Param(parent='RandomForestRegressor_483f833d74c3e990ce8d', name='numTrees', doc='Number of trees to train (>= 1).'): 20}
np.min(cvModel.avgMetrics)
30.81426642031701

The best model is a Random Forest regressor, using a subset strategy of 50% of the variables and 20 trees. It achieved an RMSE of 30.81.
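The fitted model itself can also be pulled out of the CrossValidatorModel for further inspection (a sketch, not part of the original run):

# cvModel.bestModel is the fitted PipelineModel: [RFormulaModel, RandomForestRegressionModel]
best_rf = cvModel.bestModel.stages[-1]
print(best_rf.featureImportances)  # relative importance of each assembled feature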

Worst performer

cvModel.getEstimatorParamMaps()[ np.argmax(cvModel.avgMetrics) ]
{Param(parent='Pipeline_4d63958e14f5e691468e', name='stages', doc='a list of pipeline stages'): [RFormula_42c8b8393de389d8b760,
  RandomForestRegressor_483f833d74c3e990ce8d],
 Param(parent='RandomForestRegressor_483f833d74c3e990ce8d', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'sqrt',
 Param(parent='RandomForestRegressor_483f833d74c3e990ce8d', name='numTrees', doc='Number of trees to train (>= 1).'): 30}
np.max(cvModel.avgMetrics)
35.210848036877124

The worst performer was a Random Forest regressor with a subset strategy of using the square root of the number of variables and 30 trees.

We can also manually supply the index to find out the parameters of a specific model (remember, the index corresponds to a position in the avgMetrics array).

Best Linear Regression

# I use 9 + argmin because we know the first 9 results are from the Random Forest iterations
cvModel.getEstimatorParamMaps()[9 + np.argmin(cvModel.avgMetrics[9:])]
{Param(parent='LinearRegression_4183b499386e856ec022', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0,
 Param(parent='LinearRegression_4183b499386e856ec022', name='regParam', doc='regularization parameter (>= 0).'): 0.01,
 Param(parent='Pipeline_4d63958e14f5e691468e', name='stages', doc='a list of pipeline stages'): [RFormula_42c8b8393de389d8b760,
  LinearRegression_4183b499386e856ec022]}

The best Linear Regression model has a regularization parameter of 0.01 and uses L1 (also known as lasso) regularization. The variance in Linear Regression performance (measured as RMSE) was minimal, with all runs falling between 33.69 and 33.73.

Finally, we can test our ‘best’ model from the training phase on our test sample, which is new, unseen data:

evaluator.evaluate(cvModel.transform(test_flights))
31.228861151885553
cvModel.transform(test_flights).limit(10).select("label","prediction").show()
+-----+------------------+
|label|        prediction|
+-----+------------------+
|-19.0| 28.88757275873146|
|-19.0| 2.462289130631162|
|-19.0|1.6668822768061389|
|-19.0|2.1210669714752477|
|-18.0|  1.24498749884627|
|-18.0| 2.462289130631162|
|-18.0|1.6836043331825006|
|-18.0|1.6991721935153794|
|-17.0|  1.24498749884627|
|-17.0| 2.462289130631162|
+-----+------------------+