Parallel model training with Prophet and Spark

Note: code/cluster optimization is not taken into account here, so that the focus stays solely on the mechanics of parallel model training. The same goes for the (hyper) parameter values.

https://pixabay.com/illustrations/prophet-man-beard-old-bible-god-1568108/

2. Background

You, as a data engineer or machine learning engineer, are given a mission: create forecasts from a time-series dataset. Your lovely data scientist has already implemented a basic setup using Prophet in a local environment. Things work as expected, but the issue lies in scaling. What if we have gazillions of data points? Could we still run it on a single machine? Technically, yes. But what if we have an SLA per batch of forecasts? Then one of the options is parallel/distributed processing with something like Apache Spark.

3. Requirements

As a first step towards running Prophet on Spark, our initial requirements are as follows.

4. Tutorial

To make this concrete, I’ll walk through the Spark/Prophet flow using a sample dataset from the World Health Organisation. The goal is to forecast daily new cases for the US and Korea in the near future.

4.1) Set-up Spark

In my case, I used AWS EMR to host a Spark cluster, but feel free to use any other platform you like. Just make sure you can get a SparkSession.

from pyspark.sql.session import SparkSession

spark = SparkSession \
    .builder \
    .appName("Prophet Forecast Project") \
    .getOrCreate()
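One thing worth verifying up front: applyInPandas, which we rely on later, is only available from Spark 3.0 onwards and needs pyarrow installed on the workers, so check what you are running:

# applyInPandas (used later in this post) requires Spark 3.0+
print(spark.version)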

4.2) Read data and load it as a DataFrame

I uploaded a CSV to S3 and loaded it with the following command.

# put the correct path
INPUT_PATH = "s3://BUCKET_NAME/S3_KEY"
df_input = spark.read.load(INPUT_PATH, format="csv", header=True)

Check how it looks.
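If you are following along, a quick way to inspect it (the exact columns depend on the WHO export you download, but the snippets below assume Date_reported, Country_code and New_Cases):

# Peek at the schema and the first few rows of the raw input
df_input.printSchema()
df_input.show(5, truncate=False)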

4.3) Preprocess data

In order to meet Prophet’s requirements, we need to create a dataframe with two columns: ds and y. Since our interest is in daily new cases for the US and Korea, we need to take that into account as well.

from pyspark.sql import functions as f
cols = ["Date_reported", "Country_code", "New_Cases"]
is_kr_us = f.col("Country_code").isin(["KR", "US"])
df_input_kr_us = df_input.select(cols).filter(is_kr_us)
new_cols = ["ds", "country_code", "y"]
df_input_kr_us = df_input_kr_us.toDF(*new_cols)

Didn’t I just say Prophet only accepts two known columns? So why do we have an extra column (i.e. country_code) here? I will get back to this soon. For now, just let it go :)

4.4) Read (hyper) parameter values

So far, we’ve only read the input data file. We also need (hyper) parameter values to iterate over models with. My approach was to save them as JSON and read them as a dataframe. Every parameter is stored in an array so that data scientists can list whatever values they want to tune with.

$ cat parameter.json
{
    "growth": ["linear"],
    "season_mode": ["additive"],
    "changepoint_range": [0.5],
    "m_season_name": ["monthly"],
    "m_season_period": [28, 29, 30, 31],
    "m_season_fourier_order": [5]
}

Let’s read this file as a dataframe.

# put the correct values
PARAM_PATH = "s3://S3_BUCKET/S3_KEY"
df_param = spark.read.load(
    PARAM_PATH, format="json", multiLine=True)

4.5) Preprocess (hyper) parameter dataframe

The next step is to flatten the arrays and create a row with a single value per column. We can simply call explode for this task.

for col in df_param.columns:
    df_param = df_param.withColumn(col, f.explode(col))
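As a quick sanity check (with the parameter.json above, only m_season_period holds more than one value, so four rows are expected):

# After exploding, there is one row per parameter combination
df_param.show(truncate=False)
print(df_param.count())  # expected: 4 with the file above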

At this point, we have a dataframe with input data and another one with all different combinations of parameters. The next step is to merge them into one.

4.6) Merge input and (hyper) parameter dataframe

Recall what we are trying to achieve. We created parameter.json to hold the parameters we need to train Prophet with. But df_input_kr_us and df_param have no connection so far; they live in two separate dataframes. That’s why we need this step. Just be mindful that it might introduce huge overhead depending on your dataset size and the number of parameter values in the parameter.json file.

df_input_param = df_input_kr_us.crossJoin(df_param)

Before we move on, we need to understand what just happened. If you take a closer look, it’s apparent that each row of df_input_kr_us appears four times in df_input_param after the cross join. Now our dataset, coupled with the (hyper) parameters, is ready to be consumed by Prophet.
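If you want to convince yourself of that multiplication, a quick sanity check (count() triggers a full computation, so only do this on small data) could be:

# The cross join should contain input_rows * parameter_combinations rows
assert df_input_param.count() == \
    df_input_kr_us.count() * df_param.count()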

4.7) Prepare a Holiday dataframe

To the best of my knowledge, holidays/special events and (hyper) parameters are equally important when modelling time-series data. If you have some background knowledge of your data, you know that not every Wednesday is the same Wednesday.

There are multiple ways to get a list of holidays per country but, for the brevity of this post, I’ll simply use Prophet’s built-in functionality (add_country_holidays). For your information, I’ve used holidays/holidays in my own project.
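As an aside, if the built-in country holidays don’t cover your case, a minimal sketch of building a custom holiday dataframe with the holidays package could look like this (the country and years are just examples; Prophet accepts such a dataframe through its holidays argument):

import pandas as pd
import holidays

# Collect Korean public holidays for a couple of example years
kr_holidays = holidays.KR(years=[2020, 2021])

# Prophet expects at least "ds" and "holiday" columns
df_kr_holidays = pd.DataFrame({
    "ds": pd.to_datetime(list(kr_holidays.keys())),
    "holiday": list(kr_holidays.values())})

# It could then be passed as Prophet(holidays=df_kr_holidays)
# instead of calling add_country_holidays later on.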

4.8) Run cross validation

Now that all the preparation is taken care of, we can finally do something serious. Take a deep breath and have a look at the following function.

import pandas as pd

from fbprophet import Prophet
from fbprophet.diagnostics import (
    cross_validation, performance_metrics)


def run_cross_validation(keys, df):
    # keys holds the grouping values, in the order of group_by_col
    country_code, changepoint_range, growth, \
        m_season_fourier_order, m_season_name, \
        m_season_period, season_mode = keys
    model = Prophet(
        growth=growth,
        seasonality_mode=season_mode,
        changepoint_range=changepoint_range,
        daily_seasonality=False,
        weekly_seasonality=False,
        yearly_seasonality=False)
    model \
        .add_seasonality(
            name=m_season_name,
            period=m_season_period,
            fourier_order=m_season_fourier_order) \
        .add_country_holidays(country_name=country_code)
    model.fit(df)
    df_cv = cross_validation(
        model,
        initial="180 days",
        period="30 days",
        cutoffs=None,
        horizon="45 days",
        parallel="processes")
    df_perf_metrics = performance_metrics(
        df_cv,
        metrics=["mape"],
        rolling_window=0.1)
    # aggregate MAPE into a single representative value
    series = df_perf_metrics.agg({"mape": "sum"})
    df_perf_metrics_mape = pd.DataFrame(
        data=series,
        columns=["mape"])
    # attach the parameters used so we know where the MAPE came from
    mapping = {
        "country_code": country_code,
        "growth": growth,
        "changepoint_range": changepoint_range,
        "season_mode": season_mode,
        "m_season_name": m_season_name,
        "m_season_fourier_order": m_season_fourier_order,
        "m_season_period": m_season_period}
    df_perf_metrics_mape = df_perf_metrics_mape.assign(**mapping)
    cols = [
        "country_code", "mape", "growth", "season_mode",
        "changepoint_range", "m_season_name",
        "m_season_period", "m_season_fourier_order"]
    return df_perf_metrics_mape[cols]

Daunting? Before I go through it line by line, let me show you how we use this function. We first define the schema of the output dataframe returned by the function we are going to parallelize.

from pyspark.sql.types import (
    IntegerType, FloatType, DateType,
    StringType, StructField, StructType)

cross_validation_schema = StructType([
    StructField("country_code", StringType()),
    StructField("mape", FloatType()),
    StructField("growth", StringType()),
    StructField("season_mode", StringType()),
    StructField("changepoint_range", FloatType()),
    StructField("m_season_name", StringType()),
    StructField("m_season_period", IntegerType()),
    StructField("m_season_fourier_order", IntegerType())])

Then we put the two together with the applyInPandas API.

group_by_col = ["country_code", "changepoint_range", "growth",
"m_season_fourier_order", m_season_name",
"m_season_period", "season_mode"]
df_cross_validation = df_input_param \
.groupBy(group_by_col) \
.applyInPandas(
func=run_cross_validation,
schema=cross_validation_schema)

You can read the official documentation for the details of applyInPandas. In a nutshell, we need a GroupedData object before calling applyInPandas. The pre-defined UDF passed as an argument (e.g. run_cross_validation) then gets run per group in parallel. When the UDF has run for all groups, the results are returned as a single dataframe (e.g. df_cross_validation).

Now let’s go into the function and have a look at what’s happening inside.

def run_cross_validation(keys, df):
    country_code, changepoint_range, growth, \
        m_season_fourier_order, m_season_name, \
        m_season_period, season_mode = keys

First of all, a UDF passed to applyInPandas only accepts two parameters: keys and df. The former is a tuple holding the grouping values of the current group. But I didn’t pass any keys explicitly when calling the function above. Well, the dataframe df_input_param is grouped by group_by_col; I will leave it up to you to decide whether that is explicit enough. In other words, keys inside run_cross_validation contains the values of the grouping columns, in this order.

keys = ["country_code", "changepoint_range", "growth",
"m_season_fourier_order", m_season_name",
"m_season_period", "season_mode"]

So, after unpacking, we can assign each value to its own variable.
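If the implicit keys argument still feels opaque, the following toy example, entirely separate from the pipeline (show_keys, toy and toy_schema are made-up names), shows what gets passed in:

import pandas as pd
from pyspark.sql.types import (
    LongType, StringType, StructField, StructType)

def show_keys(keys, df):
    # keys is a tuple with one value per grouping column,
    # in the same order as the columns passed to groupBy
    return pd.DataFrame({"keys": [str(keys)], "n_rows": [len(df)]})

toy = spark.createDataFrame(
    [("KR", 1), ("KR", 2), ("US", 3)], ["country_code", "value"])
toy_schema = StructType([
    StructField("keys", StringType()),
    StructField("n_rows", LongType())])
toy.groupBy("country_code") \
    .applyInPandas(show_keys, schema=toy_schema) \
    .show()
# one output row per group, e.g. keys "('KR',)" with n_rows 2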

def run_cross_validation(keys, df):
    ...
    model = Prophet(
        growth=growth,
        seasonality_mode=season_mode,
        changepoint_range=changepoint_range,
        daily_seasonality=False,
        weekly_seasonality=False,
        yearly_seasonality=False)
    model \
        .add_seasonality(
            name=m_season_name,
            period=m_season_period,
            fourier_order=m_season_fourier_order) \
        .add_country_holidays(country_name=country_code)
    model.fit(df)

This is trivial model-building code. Once we can retrieve the parameters, we just have to pass them on correctly. Note that here we also access df, which was passed as the second argument. Similar to keys, df is handed to us implicitly: it is the slice of df_input_param belonging to the current group, delivered as a Pandas dataframe.

def run_cross_validation(keys, df):
    ...
    df_cv = cross_validation(
        model,
        initial="180 days",
        period="30 days",
        cutoffs=None,
        horizon="45 days",
        parallel="processes")
    df_perf_metrics = performance_metrics(
        df_cv,
        metrics=["mape"],
        rolling_window=0.1)

Now that we have a model, we need to cross-validate it and compute performance metrics. Here I used MAPE (Mean Absolute Percentage Error), but feel free to use other metrics: MSE, RMSE, MAE, MDAPE.
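For reference, asking for several metrics at once is just a change to the metrics argument; a small variation on the call above, reusing the same df_cv:

# Compute several error metrics in one pass over the CV results
df_perf_multi = performance_metrics(
    df_cv,
    metrics=["mse", "rmse", "mae", "mape"],
    rolling_window=0.1)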

In case you are interested in what each intermediary dataframe looks like, here they are.

And here is df_perf_metrics. At first glance, the horizon column might not make sense. In human-readable form, the first three rows represent 5 days, 6 days and 7 days respectively.
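If you prefer plain integers, horizon is a timedelta column and can be converted in place (just a convenience, not required by the pipeline):

# horizon comes back as a pandas timedelta; express it in whole days
df_perf_metrics["horizon_days"] = df_perf_metrics["horizon"].dt.days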

def run_cross_validation(keys, df):
    ...
    series = df_perf_metrics.agg({"mape": "sum"})
    df_perf_metrics_mape = pd.DataFrame(
        data=series, columns=["mape"])

Here, I’m taking the sum of the whole mape column. My intention is to find the set of parameters that produces the smallest mape, and for that I need a single representative value per parameter combination. My choice was the sum, but you could just as well use another aggregation such as the median or the mean.
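Swapping in a different representative value is a one-line change; for example, the median instead of the sum (everything downstream stays the same):

# Use the median MAPE per parameter combination instead of the sum
series = df_perf_metrics.agg({"mape": "median"})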

def run_cross_validation(keys, df):
    ...
    mapping = {
        "country_code": country_code,
        "growth": growth,
        "changepoint_range": changepoint_range,
        "season_mode": season_mode,
        "m_season_name": m_season_name,
        "m_season_fourier_order": m_season_fourier_order,
        "m_season_period": m_season_period}
    df_perf_metrics_mape = df_perf_metrics_mape.assign(**mapping)
    cols = [
        "country_code", "mape", "growth", "season_mode",
        "changepoint_range", "m_season_name",
        "m_season_period", "m_season_fourier_order"]
    return df_perf_metrics_mape[cols]

The last couple of blocks are simple Pandas wrangling to make the final output match the expected shape. In case you wonder why this is needed, scroll up and check df_perf_metrics. If that were the return value of run_cross_validation, how would we know which country_code this MAPE belongs to and which parameters were used to calculate it? In the end, here’s what df_cross_validation looks like when the function returns.

So how should we interpret this? Take KR, for example: it has four rows with different mape values and parameters. Our task is to select the smallest mape and the parameters that produced it.

4.9) Select best-performing parameters that produced the smallest mape per country_code

df_cv_min_mape = df_cross_validation.groupBy(["country_code"]) \
    .agg(f.min("mape").alias("mape"))

First of all, let’s select the rows with the smallest mape per country; these are the 4th and 7th rows of df_cross_validation above, respectively.

Now that we know which rows to use, we need to pick up the parameters that were used. Since mape is no longer needed, and would only get in the way of the downstream function, it is dropped.

df_best_param = df_cross_validation.join(
    df_cv_min_mape, on=["country_code", "mape"], how="inner")
df_best_param = df_best_param.drop("mape")

It’s a coincidence that the parameters for both countries turn out the same; that is not always guaranteed. Since the smallest mape is calculated per country_code, it’s perfectly possible for the US and KR to end up with different parameters. Anyway, let’s now create a dataframe containing only the best-performing parameters.

df_input_best_param = df_input_kr_us \
    .join(df_best_param, on=["country_code"], how="left")

4.10) Create a forecast with best-performing parameters

Ready to press the button? Good job following along this far. Now that the hard work is all sorted out, let’s enjoy the magical moment.

def create_forecast(keys, df):
    country_code, changepoint_range, growth, \
        m_season_fourier_order, m_season_name, m_season_period, \
        season_mode = keys

    model = Prophet(
        growth=growth,
        seasonality_mode=season_mode,
        changepoint_range=changepoint_range,
        daily_seasonality=False,
        weekly_seasonality=False,
        yearly_seasonality=False)
    model \
        .add_seasonality(
            name=m_season_name,
            period=m_season_period,
            fourier_order=m_season_fourier_order) \
        .add_country_holidays(country_name=country_code)
    model.fit(df)

    # forecast 90 days beyond the training data
    df_future = model.make_future_dataframe(
        periods=90,
        freq="d",
        include_history=True)
    df_forecast_tmp = model.predict(df_future)

    # bring the actuals (y) back next to the predictions (yhat)
    df_forecast = (df_forecast_tmp.set_index("ds")).join(
        other=df.set_index("ds"), how="left")
    df_forecast = df_forecast[
        ["country_code", "y", "yhat"]].reset_index(level=0)
    mapping = {
        "country_code": country_code,
        "changepoint_range": changepoint_range,
        "growth": growth,
        "season_mode": season_mode,
        "m_season_fourier_order": m_season_fourier_order,
        "m_season_period": m_season_period}
    df_forecast = df_forecast.assign(**mapping)
    df_forecast_new = df_forecast.rename(
        columns={"ds": "date", "yhat": "forecast"})
    df_forecast_new["forecast"] = df_forecast_new["forecast"] \
        .astype("int64")
    cols = [
        "date", "country_code", "forecast", "growth",
        "season_mode", "changepoint_range",
        "m_season_period", "m_season_fourier_order"]
    return df_forecast_new[cols]

I won’t go into details here since this is pretty much identical to the run_cross_validation function apart from some Pandas operations. As before, the interface between the Pandas UDF and Spark is applyInPandas.

forecast_schema = StructType([
    StructField("date", DateType()),
    StructField("country_code", StringType()),
    StructField("forecast", IntegerType()),
    StructField("growth", StringType()),
    StructField("season_mode", StringType()),
    StructField("changepoint_range", FloatType()),
    StructField("m_season_period", IntegerType()),
    StructField("m_season_fourier_order", IntegerType())])

df_forecast = df_input_best_param \
    .groupBy(group_by_col) \
    .applyInPandas(create_forecast, schema=forecast_schema)

4.11) Add a column for input data

Now that we have a forecast, we could serve this data to end users. For clarity, though, I recommend adding a data-source column as a final touch so that we know which parameters and which data were used for each forecast. This is definitely optional and may not be needed if you use a model registry like MLflow.

# put a meaningful reference to your training data here,
# e.g. the S3 key of the input file
TRAIN_DATA = INPUT_PATH
df_forecast = df_forecast \
    .withColumn("train_data", f.lit(TRAIN_DATA))

5. Conclusion

That was a long journey. Congratulations if you made it this far. To briefly wrap up what was covered in this post: I set up a Spark cluster to train Prophet models in parallel. Along the way, I had to shape the input/parameter dataframes into the form Prophet expects, and write two UDFs, one for cross validation and one for forecasting. That’s how I took advantage of parallelisation.

Plus, you probably don’t want to run this line by line in production. I packaged the code above into small Python modules (e.g. utils.py, schema.py) and passed the compressed zip file to spark-submit with the --py-files option. But again, if you are a decent Spark user, I’m confident that’s a piece of cake.

If you came back to this post to complain that this code doesn’t work in your environment, well, that’s quite likely to happen. For one, you might run into memory issues when cross-joining the two dataframes. This post may be rough around the edges, but I believe you will still find some useful clues for using Spark with Prophet.
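One mitigation worth trying for that particular issue (a sketch, not a guaranteed fix) is to broadcast the tiny parameter dataframe so the cross join doesn’t shuffle the large input:

from pyspark.sql import functions as f

# df_param only has a handful of rows, so broadcasting it lets Spark
# replicate it to every executor instead of shuffling df_input_kr_us
df_input_param = df_input_kr_us.crossJoin(f.broadcast(df_param))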

Any feedback, advice or corrections would be appreciated.

6. Source

Data Engineer, higee.io