Overview
In part two of this series, we go over the necessary steps to take a time-series model into production by integrating the PyMC ModelBuilder class with Kedro.
Introduction
Picking up where we left off in part one, we are going to focus on putting together the necessary machinery to take a PyMC model to production. Specifically, we are going to walk through the following:
- Create a Kedro ML pipeline
- Build a baseline time-series model
- Configure the ModelBuilder class
- Define custom Kedro datasets
- Register custom Kedro pipelines/sub-pipelines
- Execute the pipeline

If you want to learn more about the ModelBuilder class, check out the full class details here.
Getting Started
To start, we need to install some additional dependencies into the environment that we created in the previous post. PyMC is easier to install using conda because there are some non-Python dependencies, like a C++ compiler, that conda is able to resolve for you. So let's switch our project package manager from pip to conda by redefining our requirements.txt file as an environment.yml file.
# ./environment.yml
name: my_environment_name
channels:
  - conda-forge
  - defaults
dependencies:
  - 'pymc=5.20.1'
  - 'graphviz=12.2.1'
  - 'numpyro=0.17.0'
  - pip:
      - ipython>=8.10
      - jupyterlab>=3.0
      - kedro-datasets>=3.0
      - kedro-viz>=6.7.0
      - kedro[jupyter]~=0.19.11
      - notebook
      - pre-commit~=3.5
      - polars~=1.23.0
      - pyarrow~=18.1.0
      - plotly~=5.24.0
      - openpyxl~=3.1.0
      - kedro-docker~=0.6.2
      - pymc-extras==0.2.3 # Added new dependency
Notice that we are installing PyMC, a probabilistic programming language that will allow us to define our Bayesian time-series model; numpyro, which provides a JAX backend for sampling speed-ups compared to the native sampler; and pymc-extras, which contains the ModelBuilder class that we will use as scaffolding around our model.

You can create this environment using conda at the command line:
conda env create -f environment.yml
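Once the environment is built, activate it (using the name field from environment.yml) before running any Kedro commands:

conda activate my_environment_name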
Creating an ML Pipeline
We need to create a new Kedro pipeline that will contain all of our code related to machine learning.
kedro pipeline create ML
Great! Now, similarly to how we defined the nodes in our data processing pipeline, we will do the same for our ML pipeline. We are going to need to create nodes that do the following:
- split our training data into train and test portions
- train the model
- generate forecasts
- evaluate forecasts
Defining Pipeline Nodes
Train-Test Split
Let's define how we are going to split our dataset so that we can evaluate our model's performance later. If you recall, we are dealing with time-series data across twelve Brazilian cities, and our interest is in forecasting the average monthly temperatures of these cities. Looking at the data, you will notice that different cities started collecting data at different time points, but the last measurement of October 2019 is shared by all cities (even if the measurement is missing). So, let's hold out one year of data (12 measurements) for each city to be our test set.
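For example, with testing_window set to 12, a city whose series ends in October 2019 will have November 2018 through October 2019 held out as its test set.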
# /src/climate_brazil/pipelines/ML/nodes.py
def train_test_split(data: pl.DataFrame, testing_window: int) -> tuple[pl.DataFrame, pl.DataFrame]:
    """
    Splits time-series dataset into train and test splits
    ---
    Params:
        data: The time-series dataset
        testing_window: The size of the testing set
    """
    testing_dataset = (
        data
        .sort("city", "date")
        .group_by("city")
        .agg(pl.all().tail(testing_window))
        .explode("temperature", "date")
        .sort("city", "date")
    )
    training_dataset = (
        data
        .sort("city", "date")
        .group_by("city")
        .agg(pl.all().slice(0, pl.len() - testing_window))
        .explode("temperature", "date")
        .sort("city", "date")
    )
    return training_dataset, testing_dataset
Our train_test_split() function takes a parameter defining the testing set window size. This is a configuration that you may want to change in the future. Kedro generates a configuration file for every pipeline that you create, and that is where we will put all of our configuration parameters. Let's define our testing_window parameter now.
#/conf/base/parameters_ML.yml
testing_window: 12
Bayesian Structural Time Series
We are going to build a simple Bayesian structural time-series model to forecast the monthly average temperatures. This series is focused on engineering, so I won't delve too deeply into the mathematics behind the model. Briefly, our model is going to have two components: a trend component that we are going to estimate using a one-dimensional Gaussian process, and a seasonality component that we are going to estimate as deterministic seasonality using Fourier features.
Defining The Model
We will represent our model as the following:
\[ temperature \sim N(\mu, \sigma) \] \[ \mu = f(x) + \beta_{fourier}X_{fourier\_terms} \] \[ f(x) \sim GP(0, K(x, x'; \eta, \ell)) \] \[ \beta_{fourier} \sim N(0, 0.5) \] \[ \eta \sim HalfNormal(1.0) \] \[ \ell \sim Gamma(48, 2) \] \[ \sigma \sim HalfNormal(0.2) \]
Where our Gaussian process covariance matrix \(K\) uses the Exponentiated Quadratic kernel: \[ K(x, x') = \exp\left[-\frac{(x - x')^{2}}{2\ell^{2}}\right] \]
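Concretely, with \(t\) indexing months, the Fourier design matrix stacks a sine and cosine pair for each of the \(k = 1, \dots, 10\) modes that we construct later in _generate_and_preprocess_model_data(): \[ X_{fourier\_terms} = \left[ \sin\left(\tfrac{2\pi k t}{12}\right),\; \cos\left(\tfrac{2\pi k t}{12}\right) \right]_{k=1}^{10} \]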
In PyMC this would look like the following:

with pm.Model() as model:
    model.add_coord("obs_id", train_time_range)
    model.add_coord(
        "fourier_features",
        np.arange(len(train_fourier_series.to_numpy().T)),
    )
    t = pm.Data("time_range", train_time_range, dims="obs_id")
    fourier_terms = pm.Data(
        "fourier_terms", train_fourier_series.to_numpy().T
    )

    error = pm.HalfNormal("error", 0.2)

    # Trend component
    amplitude_trend = pm.HalfNormal("amplitude_trend", 1.0)
    ls_trend = pm.Gamma("ls_trend", alpha=48, beta=2)
    cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)

    gp_trend = pm.gp.HSGP(
        m=[10],
        c=5.,
        cov_func=cov_trend
    )
    trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

    # Seasonality components
    beta_fourier = pm.Normal(
        "beta_fourier", mu=0, sigma=0.5, dims="fourier_features"
    )
    seasonality = pm.Deterministic(
        "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
    )

    # Combine components
    mu = trend + seasonality

    pm.Normal(
        "temperature",
        mu=mu,
        sigma=error,
        observed=temperature_normalized,
        dims="obs_id",
    )
At this point we could pass in the expected data (time_range, fourier_terms, and temperature_normalized) for a city and train the time-series. However, there would be less back-tracking if we go ahead and define our model along with the necessary processing of the inputs, the method for generating forecasts, and the methods for saving and loading our trained model.
Configuring ModelBuilder
The ModelBuilder class from the pymc-extras package gives us nice scaffolding that facilitates taking PyMC models into production. There are seven required methods that we need to define inside of our model class that inherits from ModelBuilder. However, it is often the case that we need to override some default methods in accordance with our modeling needs. Let's go over the required methods first.

We need to implement _generate_and_preprocess_model_data(), which is a method that performs our final processing of the data before it is fed into the model. In our case, we normalize the data and define our Fourier features to capture yearly (12 month) seasonality.
# ModelBuilder Required Method Implementation
def _generate_and_preprocess_model_data(self, X: pl.DataFrame, y: pl.Series, normalize: bool = False):
    """
    Last mile data processing of inputs expected by the model
    ---
    Params:
        X: The matrix of predictor variables expected by our model
        y: The target variable
        normalize: Whether to Z normalize the variables
    """
    self.train_time_range = np.arange(X.shape[0])
    self.n_modes = 10
    periods = np.array(self.train_time_range) / 12
    self.train_fourier_series = pl.DataFrame(
        {
            f"{func}_order_{order}": getattr(np, func)(
                2 * np.pi * periods * order
            )
            for order in range(1, self.n_modes + 1)
            for func in ("sin", "cos")
        }
    )
    if normalize:
        self.y_mean = np.nanmean(y)
        self.y_std = np.nanstd(y)
        self.y_normalized = (y - self.y_mean) / self.y_std
    else:
        self.y_normalized = y
Since we are only including endogenous variables in our model, the normalization only applies to the target variable.
Next we need to define how we are going to feed new data to our model when we are ready to make forecasts. Since we are building a time-series model with no exogenous predictor variables, we only need to define how far out into the future we want to generate forecasts.
# ModelBuilder Required Method Implementation
def _data_setter(self, n_ahead: int):
    """
    Generates required data for producing forecasts
    ---
    Params:
        n_ahead: How many periods (months) to forecast future temperatures
    """
    self.start = self.train_time_range[-1]
    self.end = self.start + n_ahead
    new_periods = np.arange(self.start, self.end, 1) / 12
    self.test_fourier_series = pl.DataFrame(
        {
            f"{func}_order_{order}": getattr(np, func)(
                2 * np.pi * new_periods * order
            )
            for order in range(1, self.n_modes + 1)
            for func in ("sin", "cos")
        }
    )
Certainly we will need to define how to build our model, using the build_model() method. You'll notice that we call our previously defined method _generate_and_preprocess_model_data() here.
# ModelBuilder Required Method Implementation
def build_model(self, X: pl.DataFrame, y: pl.Series, normalize_target: bool = False, **kwargs):
    """
    Defines the PyMC model structure
    ---
    Params:
        X: Dataframe of features
        y: Array of target values
    """
    self._generate_and_preprocess_model_data(X, y, normalize=normalize_target)
    with pm.Model() as self.model:
        self.model.add_coord("obs_id", self.train_time_range)
        self.model.add_coord(
            "fourier_features",
            np.arange(len(self.train_fourier_series.to_numpy().T)),
        )
        t = pm.Data("time_range", self.train_time_range, dims="obs_id")
        fourier_terms = pm.Data(
            "fourier_terms", self.train_fourier_series.to_numpy().T
        )

        error = pm.HalfNormal("error", self.model_config['error'])

        # Trend component
        amplitude_trend = pm.HalfNormal("amplitude_trend", self.model_config['amplitude_trend'])
        ls_trend = pm.Gamma("ls_trend", alpha=self.model_config['ls_trend']['alpha'], beta=self.model_config['ls_trend']['beta'])
        cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)

        gp_trend = pm.gp.HSGP(
            m=[10],
            c=5.,
            cov_func=cov_trend
        )
        trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

        # Seasonality components
        beta_fourier = pm.Normal(
            "beta_fourier", mu=self.model_config['beta_fourier']['mu'], sigma=self.model_config['beta_fourier']['sigma'], dims="fourier_features"
        )
        seasonality = pm.Deterministic(
            "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
        )

        # Combine components
        mu = trend + seasonality

        pm.Normal(
            "temperature_normalized",
            mu=mu,
            sigma=error,
            observed=self.y_normalized,
            dims="obs_id",
        )
Finally, the last three required methods all define configurations for our model. As we mentioned earlier, all configurations should be handled by Kedro in our project's conf/ directory, so we are going to return empty dicts as our default configurations.
# ModelBuilder Required Methods Implementations
@staticmethod
def get_default_model_config() -> dict:
    model_config = {}
    return model_config

@staticmethod
def get_default_sampler_config() -> dict:
    sampler_config = {}
    return sampler_config

@property
def output_var(self):
    """
    This is basically the name of the target variable. This is used when loading in a saved model.
    """
    return "temperature"

@property
@abstractmethod
def _serializable_model_config(self) -> dict[str, int | float | dict]:
    """
    Only needs to be implemented if your model configurations contain unserializable objects.
    You need to convert the non-serializable objects to serializable ones here.
    """
    return self.model_config
Let's go ahead and define our model and sampler configurations in the parameters_ML.yml file.
#/conf/base/parameters_ML.yml
testing_window: 12

model_config:
  error: 0.2
  amplitude_trend: 1.0
  ls_trend:
    alpha: 48
    beta: 2
  beta_fourier:
    mu: 0
    sigma: 0.5

sampler_config:
  draws: 1000
  tune: 1000
  chains: 4
  target_accept: 0.95
Our model configuration contains the priors that we defined for our model above, whereas the sampler configuration defines parameters related to our MCMC sampler.
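As a quick sanity check on these priors: a Gamma(α = 48, β = 2) distribution has mean α/β = 24, so the lengthscale prior expects the trend to vary smoothly over roughly 24 monthly steps (about two years) rather than chasing the seasonal wiggles.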
With the required ModelBuilder methods out of the way, let's talk about some of the methods that we will override to accommodate our specific use case.

First, since we are using a time-series model, our predictions are generated differently than predictions from a typical regression or classification task. So we need to make changes to the sample_posterior_predictive() method to accommodate a time-series model. We take the approach of defining a new model specifically for forecasting purposes. Notice that we are calling _data_setter() to generate our future model input, which is our forecast horizon.
# ModelBuilder Override method
def sample_posterior_predictive(self, n_ahead: int, extend_idata: bool, combined: bool, **kwargs):
    self._data_setter(n_ahead)
    with pm.Model() as self.model:
        self.model.add_coord("obs_id", self.train_time_range)
        self.model.add_coord(
            "fourier_features",
            np.arange(len(self.train_fourier_series.to_numpy().T)),
        )
        t = pm.Data("time_range", self.train_time_range, dims="obs_id")
        fourier_terms = pm.Data(
            "fourier_terms", self.train_fourier_series.to_numpy().T
        )

        error = pm.HalfNormal("error", self.model_config['error'])

        # Trend component
        amplitude_trend = pm.HalfNormal("amplitude_trend", self.model_config['amplitude_trend'])
        ls_trend = pm.Gamma("ls_trend", alpha=self.model_config['ls_trend']['alpha'], beta=self.model_config['ls_trend']['beta'])
        cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)

        gp_trend = pm.gp.HSGP(
            m=[10],
            c=5.,
            cov_func=cov_trend
        )
        trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

        # Seasonality components
        beta_fourier = pm.Normal(
            "beta_fourier", mu=self.model_config['beta_fourier']['mu'], sigma=self.model_config['beta_fourier']['sigma'], dims="fourier_features"
        )
        seasonality = pm.Deterministic(
            "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
        )

        # Combine components
        mu = trend + seasonality

        pm.Normal(
            "temperature_normalized",
            mu=mu,
            sigma=error,
            observed=self.y_normalized,
            dims="obs_id",
        )

        self.model.add_coords({"obs_id_fut": np.arange(self.start, self.end, 1)})
        t_fut = pm.Data("time_range_fut", np.arange(self.start, self.end, 1))
        fourier_terms_fut = pm.Data("fourier_terms_fut", self.test_fourier_series.to_numpy().T)

        # Trend future component
        trend_fut = gp_trend.conditional("trend_fut", Xnew=t_fut[:, None], dims="obs_id_fut")

        # Seasonality components
        seasonality_fut = pm.Deterministic(
            "seasonal_fut", pm.math.dot(beta_fourier, fourier_terms_fut), dims="obs_id_fut"
        )

        mu_fut = trend_fut + seasonality_fut

        pm.Normal(
            "temperature_normalized_fut",
            mu=mu_fut,
            sigma=error,
            dims="obs_id_fut",
        )

    with self.model:  # sample with new input data
        post_pred = pm.sample_posterior_predictive(self.idata, var_names=["temperature_normalized_fut"], predictions=True, **kwargs)
    if extend_idata:
        self.idata.extend(post_pred, join="right")

    posterior_predictive_samples = az.extract(
        post_pred, "predictions", combined=combined
    )
    return posterior_predictive_samples
We also need to update the fit() method for a couple of reasons. The first is that we added a normalize_target argument and we need to propagate it into the fit() method, which calls our build_model() method. Second, the default fit() method defines additional sampler configurations that we don't want, because we have already centralized all of our configurations to be handled by Kedro.
# ModelBuilder Override method
def fit(self, X: pl.DataFrame, y: pl.Series, normalize_target: bool = False) -> az.InferenceData:
    """
    Fits the model to the provided dataset
    ---
    Params:
        X: The dataset containing predictor variables
        y: The target variable
        normalize_target: Whether to Z normalize the target variable
    """
    self.build_model(X, y, normalize_target=normalize_target)
    self.idata = self.sample_model(**self.sampler_config)

    X_df = X.to_pandas()
    combined_data = pd.concat([X_df, y.to_pandas()], axis=1)
    assert all(combined_data.columns), "All columns must have non-empty names"
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            category=UserWarning,
            message="The group fit_data is not defined in the InferenceData scheme",
        )
        self.idata.add_groups(fit_data=combined_data.to_xarray())
    return self.idata
Lastly (I promise), we need to define a new method that will handle our model evaluations. We are going to use the root mean squared error as a metric to evaluate our model. We will also measure the coverage of our 80% Highest Density Interval (HDI). Additionally, it is a good idea to include in our model serialization the city the model is fitted to, and if we are standardizing the data then we should also include the mean and standard deviation of our target temperature values.
# ModelBuilder override Init
def __init__(
    self,
    model_config: dict | None = None,
    sampler_config: dict | None = None,
    city: str = None
):
    """
    Initializes model configuration and sampler configuration for the model

    Parameters
    ----------
    model_config : Dictionary, optional
        dictionary of parameters that initialise model configuration. Class-default defined by the user default_model_config method.
    sampler_config : Dictionary, optional
        dictionary of parameters that initialise sampler configuration. Class-default defined by the user default_sampler_config method.
    city: The Brazilian city we are modeling monthly average temperatures of
    """
    sampler_config = (
        self.get_default_sampler_config() if sampler_config is None else sampler_config
    )
    self.sampler_config = sampler_config
    model_config = self.get_default_model_config() if model_config is None else model_config
    self.model_config = model_config
    self.model = None
    self.idata: az.InferenceData | None = None
    self.is_fitted_ = False
    self.city = city  # This is our addition
# ModelBuilder new method
def evaluate(self, y_true: pl.Series, forecasts: xr.Dataset, back_transform: bool = False) -> dict:
    """
    Evaluates our forecasts' posterior predictive mean using root mean squared error (RMSE) as the metric and measures the coverage of the highest density interval (HDI)
    ---
    Params:
        y_true: The ground truth temperatures
        forecasts: The forecasts
        back_transform: Whether we need to transform our forecasts back to the original scale
    """
    if back_transform:
        try:
            y_mean = self.y_mean
            y_std = self.y_std
        except AttributeError:
            y_mean = self.idata.attrs['y_mean']
            y_std = self.idata.attrs['y_std']
        posterior_predictive_mean = forecasts[f'{self.output_var}_normalized_fut'].mean(("chain", "draw")).values * y_std + y_mean
        hdi = az.hdi(forecasts[f'{self.output_var}_normalized_fut'], hdi_prob=0.8) * y_std + y_mean
    else:
        posterior_predictive_mean = forecasts[f'{self.output_var}_normalized_fut'].mean(("chain", "draw")).values
        hdi = az.hdi(forecasts[f'{self.output_var}_normalized_fut'], hdi_prob=0.8)

    error = y_true.to_numpy() - posterior_predictive_mean
    RMSE = np.sqrt(
        np.nanmean(
            np.square(error)
        )
    )

    coverage_df = pl.DataFrame(
        {
            "hdi_lower": hdi[f'{self.output_var}_normalized_fut'][:, 0].values,
            "hdi_upper": hdi[f'{self.output_var}_normalized_fut'][:, 1].values,
            "y_true": y_true
        }
    )
    COVERAGE = (
        coverage_df
        .filter(
            pl.col("y_true").is_not_null()
        )
        .with_columns(
            pl.when(
                (pl.col("y_true") <= pl.col("hdi_upper")) &
                (pl.col("y_true") >= pl.col("hdi_lower"))
            )
            .then(1.)
            .otherwise(0.)
            .alias("coverage")
        )
        .select(pl.col("coverage").mean())
        .item()
    )
    return {"RMSE": RMSE, "HDI_COVERAGE": COVERAGE}

def _save_input_params(self, idata) -> None:
    """
    Saves any additional model parameters (other than the dataset) to the idata object.
    """
    idata.attrs["city"] = self.city
    idata.attrs["y_mean"] = self.y_mean
    idata.attrs["y_std"] = self.y_std
Combining everything together we have our time-series model implemented as:
# /src/climate_brazil/pipelines/ML/ts_model.py
import warnings
from abc import abstractmethod

import arviz as az
import numpy as np
import pandas as pd
import polars as pl
import pymc as pm
import xarray as xr
from pymc_extras.model_builder import ModelBuilder


class TSModel(ModelBuilder):
    _model_type = "TimeSeries"
    version = "0.1"

    def __init__(
        self,
        model_config: dict | None = None,
        sampler_config: dict | None = None,
        city: str = None
    ):
        """
        Initializes model configuration and sampler configuration for the model

        Parameters
        ----------
        model_config : Dictionary, optional
            dictionary of parameters that initialise model configuration. Class-default defined by the user default_model_config method.
        sampler_config : Dictionary, optional
            dictionary of parameters that initialise sampler configuration. Class-default defined by the user default_sampler_config method.
        city: The Brazilian city we are modeling monthly average temperatures of
        """
        sampler_config = (
            self.get_default_sampler_config() if sampler_config is None else sampler_config
        )
        self.sampler_config = sampler_config
        model_config = self.get_default_model_config() if model_config is None else model_config
        self.model_config = model_config  # parameters for priors etc.
        self.model = None  # Set by build_model
        self.idata: az.InferenceData | None = None  # idata is generated during fitting
        self.is_fitted_ = False
        self.city = city

    def _generate_and_preprocess_model_data(self, X: pl.DataFrame, y: pl.Series, normalize: bool = False):
        """
        Last mile data processing of inputs expected by the model
        ---
        Params:
            X: The matrix of predictor variables expected by our model
            y: The target variable
            normalize: Whether to Z normalize the variables
        """
        self.train_time_range = np.arange(X.shape[0])
        self.n_modes = 10
        periods = np.array(self.train_time_range) / 12
        self.train_fourier_series = pl.DataFrame(
            {
                f"{func}_order_{order}": getattr(np, func)(
                    2 * np.pi * periods * order
                )
                for order in range(1, self.n_modes + 1)
                for func in ("sin", "cos")
            }
        )
        if normalize:
            self.y_mean = np.nanmean(y)
            self.y_std = np.nanstd(y)
            self.y_normalized = (y - self.y_mean) / self.y_std
        else:
            self.y_normalized = y

    def _data_setter(self, n_ahead: int):
        """
        Generates required data for producing forecasts
        ---
        Params:
            n_ahead: How many periods (months) to forecast future temperatures
        """
        self.start = self.train_time_range[-1]
        self.end = self.start + n_ahead
        new_periods = np.arange(self.start, self.end, 1) / 12
        self.test_fourier_series = pl.DataFrame(
            {
                f"{func}_order_{order}": getattr(np, func)(
                    2 * np.pi * new_periods * order
                )
                for order in range(1, self.n_modes + 1)
                for func in ("sin", "cos")
            }
        )

    def build_model(self, X: pl.DataFrame, y: pl.Series, normalize_target: bool = False, **kwargs):
        """
        Defines the PyMC model structure
        ---
        Params:
            X: Dataframe of features
            y: Array of target values
        """
        self._generate_and_preprocess_model_data(X, y, normalize=normalize_target)
        with pm.Model() as self.model:
            self.model.add_coord("obs_id", self.train_time_range)
            self.model.add_coord(
                "fourier_features",
                np.arange(len(self.train_fourier_series.to_numpy().T)),
            )
            t = pm.Data("time_range", self.train_time_range, dims="obs_id")
            fourier_terms = pm.Data(
                "fourier_terms", self.train_fourier_series.to_numpy().T
            )

            error = pm.HalfNormal("error", self.model_config['error'])

            # Trend component
            amplitude_trend = pm.HalfNormal("amplitude_trend", self.model_config['amplitude_trend'])
            ls_trend = pm.Gamma("ls_trend", alpha=self.model_config['ls_trend']['alpha'], beta=self.model_config['ls_trend']['beta'])
            cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)

            gp_trend = pm.gp.HSGP(
                m=[10],
                c=5.,
                cov_func=cov_trend
            )
            trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

            # Seasonality components
            beta_fourier = pm.Normal(
                "beta_fourier", mu=self.model_config['beta_fourier']['mu'], sigma=self.model_config['beta_fourier']['sigma'], dims="fourier_features"
            )
            seasonality = pm.Deterministic(
                "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
            )

            # Combine components
            mu = trend + seasonality

            pm.Normal(
                "temperature_normalized",
                mu=mu,
                sigma=error,
                observed=self.y_normalized,
                dims="obs_id",
            )

    def sample_posterior_predictive(self, n_ahead: int, extend_idata: bool, combined: bool, **kwargs):
        self._data_setter(n_ahead)
        with pm.Model() as self.model:
            self.model.add_coord("obs_id", self.train_time_range)
            self.model.add_coord(
                "fourier_features",
                np.arange(len(self.train_fourier_series.to_numpy().T)),
            )
            t = pm.Data("time_range", self.train_time_range, dims="obs_id")
            fourier_terms = pm.Data(
                "fourier_terms", self.train_fourier_series.to_numpy().T
            )

            error = pm.HalfNormal("error", self.model_config['error'])

            # Trend component
            amplitude_trend = pm.HalfNormal("amplitude_trend", self.model_config['amplitude_trend'])
            ls_trend = pm.Gamma("ls_trend", alpha=self.model_config['ls_trend']['alpha'], beta=self.model_config['ls_trend']['beta'])
            cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)

            gp_trend = pm.gp.HSGP(
                m=[10],
                c=5.,
                cov_func=cov_trend
            )
            trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

            # Seasonality components
            beta_fourier = pm.Normal(
                "beta_fourier", mu=self.model_config['beta_fourier']['mu'], sigma=self.model_config['beta_fourier']['sigma'], dims="fourier_features"
            )
            seasonality = pm.Deterministic(
                "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
            )

            # Combine components
            mu = trend + seasonality

            pm.Normal(
                "temperature_normalized",
                mu=mu,
                sigma=error,
                observed=self.y_normalized,
                dims="obs_id",
            )

            self.model.add_coords({"obs_id_fut": np.arange(self.start, self.end, 1)})
            t_fut = pm.Data("time_range_fut", np.arange(self.start, self.end, 1))
            fourier_terms_fut = pm.Data("fourier_terms_fut", self.test_fourier_series.to_numpy().T)

            # Trend future component
            trend_fut = gp_trend.conditional("trend_fut", Xnew=t_fut[:, None], dims="obs_id_fut")

            # Seasonality components
            seasonality_fut = pm.Deterministic(
                "seasonal_fut", pm.math.dot(beta_fourier, fourier_terms_fut), dims="obs_id_fut"
            )

            mu_fut = trend_fut + seasonality_fut

            pm.Normal(
                "temperature_normalized_fut",
                mu=mu_fut,
                sigma=error,
                dims="obs_id_fut",
            )

        with self.model:  # sample with new input data
            post_pred = pm.sample_posterior_predictive(self.idata, var_names=[f"{self.output_var}_normalized_fut"], predictions=True, **kwargs)
        if extend_idata:
            self.idata.extend(post_pred, join="right")

        posterior_predictive_samples = az.extract(
            post_pred, "predictions", combined=combined
        )
        return posterior_predictive_samples

    def fit(self, X: pl.DataFrame, y: pl.Series, normalize_target: bool = False) -> az.InferenceData:
        """
        Fits the model to the provided dataset
        ---
        Params:
            X: The dataset containing predictor variables
            y: The target variable
            normalize_target: Whether to Z normalize the target variable
        """
        self.build_model(X, y, normalize_target=normalize_target)
        self.idata = self.sample_model(**self.sampler_config)

        X_df = X.to_pandas()
        combined_data = pd.concat([X_df, y.to_pandas()], axis=1)
        assert all(combined_data.columns), "All columns must have non-empty names"
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                category=UserWarning,
                message="The group fit_data is not defined in the InferenceData scheme",
            )
            self.idata.add_groups(fit_data=combined_data.to_xarray())  # type: ignore
        return self.idata  # type: ignore

    @staticmethod
    def get_default_model_config() -> dict:
        model_config = {
            "error": 0.2,
            "amplitude_trend": 1.0,
            "ls_trend": {"alpha": 48, "beta": 2},
            "beta_fourier": {"mu": 0, "sigma": 0.5},
        }
        return model_config

    @staticmethod
    def get_default_sampler_config() -> dict:
        """
        Returns a class default sampler dict for model builder if no sampler_config is provided on class initialization.
        The sampler config dict is used to send parameters to the sampler.
        It will be used during fitting in case the user doesn't provide any sampler_config of their own.
        """
        sampler_config = {
            "draws": 1_000,
            "tune": 1_000,
            "chains": 4,
            "target_accept": 0.95,
        }
        return sampler_config

    @property
    def output_var(self):
        return "temperature"

    @property
    @abstractmethod
    def _serializable_model_config(self) -> dict[str, int | float | dict]:
        """
        Converts non-serializable values from model_config to their serializable reversable equivalent.
        Data types like pandas DataFrame, Series or datetime aren't JSON serializable,
        so in order to save the model they need to be formatted.

        Returns
        -------
        model_config: dict
        """
        return self.model_config

    def evaluate(self, y_true: pl.Series, forecasts: xr.Dataset, back_transform: bool = False) -> dict:
        """
        Evaluates our forecasts' posterior predictive mean using root mean squared error (RMSE) as the metric and measures the coverage of the highest density interval (HDI)
        ---
        Params:
            y_true: The ground truth temperatures
            forecasts: The forecasts
            back_transform: Whether we need to transform our forecasts back to the original scale
        """
        if back_transform:
            try:
                y_mean = self.y_mean
                y_std = self.y_std
            except AttributeError:
                y_mean = self.idata.attrs['y_mean']
                y_std = self.idata.attrs['y_std']
            posterior_predictive_mean = forecasts[f'{self.output_var}_normalized_fut'].mean(("chain", "draw")).values * y_std + y_mean
            hdi = az.hdi(forecasts[f'{self.output_var}_normalized_fut'], hdi_prob=0.8) * y_std + y_mean
        else:
            posterior_predictive_mean = forecasts[f'{self.output_var}_normalized_fut'].mean(("chain", "draw")).values
            hdi = az.hdi(forecasts[f'{self.output_var}_normalized_fut'], hdi_prob=0.8)

        error = y_true.to_numpy() - posterior_predictive_mean
        RMSE = np.sqrt(
            np.nanmean(
                np.square(error)
            )
        )

        coverage_df = pl.DataFrame(
            {
                "hdi_lower": hdi[f'{self.output_var}_normalized_fut'][:, 0].values,
                "hdi_upper": hdi[f'{self.output_var}_normalized_fut'][:, 1].values,
                "y_true": y_true
            }
        )
        COVERAGE = (
            coverage_df
            .filter(
                pl.col("y_true").is_not_null()
            )
            .with_columns(
                pl.when(
                    (pl.col("y_true") <= pl.col("hdi_upper")) &
                    (pl.col("y_true") >= pl.col("hdi_lower"))
                )
                .then(1.)
                .otherwise(0.)
                .alias("coverage")
            )
            .select(pl.col("coverage").mean())
            .item()
        )
        return {"RMSE": RMSE, "HDI_COVERAGE": COVERAGE}

    def _save_input_params(self, idata: az.InferenceData) -> None:
        """
        Saves any additional model parameters (other than the dataset) to the idata object.
        """
        idata.attrs["city"] = self.city
        idata.attrs["y_mean"] = self.y_mean
        idata.attrs["y_std"] = self.y_std
Training Node
Now that we have our model configured with ModelBuilder, it is time to define our training node:
#/src/climate_brazil/pipelines/ML/nodes.py
def train(training_dataset: pl.DataFrame, model_config: dict, sampler_config: dict) -> dict:
    """
    Train time-series models
    ---
    Params:
        training_dataset: Training data
        model_config: Model configurations
        sampler_config: MCMC sampler configurations
    """
    city_models = {}
    for city in training_dataset['city'].sort().unique(maintain_order=True):
        city_dataset = training_dataset.filter(pl.col("city") == city).sort("date")
        model = TSModel(model_config=model_config, sampler_config=sampler_config, city=city)
        model.fit(X=city_dataset['date'], y=city_dataset['temperature'], normalize_target=True)
        city_models[f"{city}_model"] = model

    return city_models
DO NOT use this training node in production! We are going to refactor this code in the last part of this series to scale more efficiently.
We need to be sure to define catalog entries for our training outputs, otherwise Kedro will designate our trained models as MemoryDatasets. However, there is no built-in dataset in Kedro that knows how to store our model, so we need to define a custom Kedro dataset to accommodate our needs.
Creating a Custom Kedro Dataset
Creating a custom dataset can be quite simple. All we need to do is inherit from Kedro's AbstractDataset and define the __init__, save, load, and describe methods. In our case, the ModelBuilder class has save and load methods that accept a filepath, and we will use those methods in our custom dataset:
# /src/climate_brazil/datasets/pymc_model_dataset.py
from pathlib import Path, PurePosixPath
from typing import Any

from kedro.io import AbstractDataset

from climate_brazil.pipelines.ML.ts_model import TSModel


class PyMCModelDataset(AbstractDataset):
    """
    ``PyMCModelDataset`` loads / saves PyMC models from a given filepath as a TSModel object.
    """
    def __init__(self, filepath: str):
        """Creates a new instance of PyMCModelDataset to load / save PyMC models for a given filepath.

        Args:
            filepath: The location of the model netcdf file to load / save data.
        """
        self._filepath = PurePosixPath(filepath)
        self._path = Path(self._filepath.parent)

    def load(self) -> TSModel:
        """Loads data from the netcdf file.

        Returns:
            loaded TSModel
        """
        model = TSModel.load(self._filepath)
        return model

    def save(self, model: TSModel) -> None:
        """Saves PyMC model to the specified filepath."""
        self._path.mkdir(parents=True, exist_ok=True)
        model.save(self._filepath)

    def _describe(self) -> dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(filepath=self._filepath)
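Kedro will normally instantiate this class for us from the catalog, but as a quick sanity check you can exercise it directly. A sketch, assuming a fitted model object and an illustrative filepath:

# Sketch: exercising the custom dataset outside of a pipeline run
ds = PyMCModelDataset(filepath="data/06_models/Belem_model.nc")
ds.save(model)        # delegates to TSModel.save()
restored = ds.load()  # delegates to TSModel.load()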
Train Catalog Entry
With our custom dataset defined, let's add entries for our training and testing data as well as for our trained models.
# conf/base/catalog.yml

# Raw data ---------------------------------------
cities_temperatures:
  type: partitions.PartitionedDataset
  path: data/01_raw/
  dataset:
    type: polars.LazyPolarsDataset
    file_format: csv

# Processed data ---------------------------------------
cities_temperatures_processed:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: data/02_intermediate/cities_temperatures_processed.pq

cities_ts_plot:
  type: plotly.JSONDataset
  filepath: data/08_reporting/cities_ts_plot.json

# Model inputs ------------------------------------------
training_dataset:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: data/05_model_input/training_dataset.pq

testing_dataset:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: data/05_model_input/testing_dataset.pq

# Models -----------------------------------------
Belem_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Belem_model.nc

Curitiba_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Curitiba_model.nc

Fortaleza_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Fortaleza_model.nc

Goiania_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Goiania_model.nc

Macapa_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Macapa_model.nc

Manaus_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Manaus_model.nc

Recife_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Recife_model.nc

Rio_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Rio_model.nc

Salvador_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Salvador_model.nc

Sao_Luiz_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Sao_Luiz_model.nc

Sao_Paulo_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Sao_Paulo_model.nc

Vitoria_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Vitoria_model.nc
For the sake of clarity, we added one entry for each city. You can use Kedro's dataset factory feature to capture all city models with one entry; we went over an example of how to use a dataset factory in part one of this series.
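For reference, a single factory entry along the following lines would replace the twelve entries above (this sketch assumes the same filepath layout):

# Hypothetical dataset factory entry covering all city models
"{city}_model":
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/{city}_model.nc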
Forecasting Node
Finally, we need to define how to generate our forecasts. We would like our forecasting node to be able to perform evaluations when we pass in the testing dataset as an argument, and for the moment we just want to log our evaluation metrics to stdout.
# /src/climate_brazil/pipelines/ML/nodes.py
# (assumes `import logging` and `from typing import Optional` at the top of nodes.py)
logger = logging.getLogger(__name__)

def forecast(model: TSModel, n_ahead: int, ground_truth: Optional[pl.DataFrame] = None) -> xr.Dataset:
    """
    Generates forecasts from a trained time-series model and produces evaluations if ground truth is passed in.
    ---
    Params:
        model: The trained time-series model
        n_ahead: The forecast horizon
        ground_truth: The actual values to be compared with the forecasts
    """
    forecasts = model.sample_posterior_predictive(n_ahead=n_ahead, extend_idata=True, combined=False)
    if ground_truth is not None:
        evaluations = model.evaluate(
            y_true=ground_truth.filter(pl.col("city") == model.idata.attrs['city'])["temperature"],
            forecasts=forecasts,
            back_transform=True,
        )
        logger.info(f"{model.idata.attrs['city']} Evaluations: {evaluations}")
    return forecasts
Notice that we need to pass in an n_ahead parameter that defines the forecast horizon. Now is a good time to add that parameter to /conf/base/parameters_ML.yml:
testing_window: 12
n_ahead: 12

model_config:
  error: 0.2
  amplitude_trend: 1.0
  ls_trend:
    alpha: 48
    beta: 2
  beta_fourier:
    mu: 0
    sigma: 0.5

sampler_config:
  draws: 1000
  tune: 1000
  chains: 4
  target_accept: 0.95
  mp_ctx: spawn # might need this if you are running on macOS
Our forecast output will be an xarray dataset, and we will go ahead and save it to disk in NetCDF format. Once again, we will create a custom Kedro dataset for that.
# /src/climate_brazil/datasets/pymc_forecast_dataset.py
from pathlib import Path, PurePosixPath
from typing import Any

import xarray as xr
from kedro.io import AbstractDataset


class PyMCForecastDataset(AbstractDataset):
    """
    ``PyMCForecastDataset`` loads / saves PyMC forecasts from a given filepath as an xarray Dataset.
    """
    def __init__(self, filepath: str):
        """Creates a new instance of PyMCForecastDataset to load / save PyMC forecasts for a given filepath.

        Args:
            filepath: The location of the forecast netcdf file to load / save.
        """
        self._filepath = PurePosixPath(filepath)
        self._path = Path(self._filepath.parent)

    def load(self) -> xr.Dataset:
        """Loads data from the netcdf file.

        Returns:
            loaded forecasts
        """
        return xr.load_dataset(self._filepath)

    def save(self, forecast: xr.Dataset) -> None:
        """Saves PyMC forecasts to the specified filepath."""
        self._path.mkdir(parents=True, exist_ok=True)
        forecast.to_netcdf(path=self._filepath)

    def _describe(self) -> dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(filepath=self._filepath)
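The forecast outputs from our forecasting node will also need catalog entries so that they are persisted rather than treated as MemoryDatasets. A sketch using dataset factories (the data/07_model_output/ filepaths are assumed for illustration):

# Hypothetical catalog entries for the forecast outputs
"{city}_forecasts_evaluation":
  type: climate_brazil.datasets.pymc_forecast_dataset.PyMCForecastDataset
  filepath: data/07_model_output/{city}_forecasts_evaluation.nc

"{city}_forecasts":
  type: climate_brazil.datasets.pymc_forecast_dataset.PyMCForecastDataset
  filepath: data/07_model_output/{city}_forecasts.nc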
Defining The ML Pipeline
We are almost set! Our catalog is defined, our model is configured, and our nodes are implemented. Let’s put everything together and define our machine learning pipeline.
Train
Here we are going to introduce tags, another Kedro feature that allows you to run specific nodes by tag category. At this stage our pipeline will be the train-test split node followed by the training node.
# /src/climate_brazil/pipelines/ML/pipeline.py
from kedro.pipeline import node, Pipeline, pipeline  # noqa

from .nodes import train_test_split, train, forecast


def create_pipeline(**kwargs) -> Pipeline:
    split_train_nodes = [
        node(
            func=train_test_split,
            inputs=dict(
                data="cities_temperatures_processed",
                testing_window="params:testing_window"
            ),
            outputs=["training_dataset", "testing_dataset"],
            name="train_test_split",
            tags=["train"]
        ),
        node(
            func=train,
            inputs=dict(
                training_dataset="training_dataset",
                model_config="params:model_config",
                sampler_config="params:sampler_config"
            ),
            outputs=dict(
                Belem_model="Belem_model",
                Curitiba_model="Curitiba_model",
                Fortaleza_model="Fortaleza_model",
                Goiania_model="Goiania_model",
                Macapa_model="Macapa_model",
                Manaus_model="Manaus_model",
                Recife_model="Recife_model",
                Rio_model="Rio_model",
                Salvador_model="Salvador_model",
                Sao_Luiz_model="Sao_Luiz_model",
                Sao_Paulo_model="Sao_Paulo_model",
                Vitoria_model="Vitoria_model"
            ),
            name="train",
            tags=["train"]
        )
    ]
Always make sure that your inputs and outputs match the keys you chose in catalog.yml.
Forecast
This is where we are going to leverage the flexibility of tags. We are going to create two sub-pipelines for our forecasts. The first sub-pipeline will be for training purposes, when we have ground-truth data to pass in for evaluations. The second will be for when we are using our time-series model in production and do not have the ground truth available. Our full ML pipeline will look like this:
# /src/climate_brazil/pipelines/ML/pipeline.py
from kedro.pipeline import node, Pipeline, pipeline  # noqa

from .nodes import train_test_split, train, forecast

cities = ["Belem", "Curitiba", "Fortaleza", "Goiania", "Macapa", "Manaus", "Recife", "Rio", "Salvador", "Sao_Luiz", "Sao_Paulo", "Vitoria"]


def create_pipeline(**kwargs) -> Pipeline:
    split_train_nodes = [
        node(
            func=train_test_split,
            inputs=dict(
                data="cities_temperatures_processed",
                testing_window="params:testing_window"
            ),
            outputs=["training_dataset", "testing_dataset"],
            name="train_test_split",
            tags=["train"]
        ),
        node(
            func=train,
            inputs=dict(
                training_dataset="training_dataset",
                model_config="params:model_config",
                sampler_config="params:sampler_config"
            ),
            outputs=dict(
                Belem_model="Belem_model",
                Curitiba_model="Curitiba_model",
                Fortaleza_model="Fortaleza_model",
                Goiania_model="Goiania_model",
                Macapa_model="Macapa_model",
                Manaus_model="Manaus_model",
                Recife_model="Recife_model",
                Rio_model="Rio_model",
                Salvador_model="Salvador_model",
                Sao_Luiz_model="Sao_Luiz_model",
                Sao_Paulo_model="Sao_Paulo_model",
                Vitoria_model="Vitoria_model"
            ),
            name="train",
            tags=["train"]
        )
    ]

    forecast_training_nodes = [
        node(
            func=forecast,
            inputs=dict(
                model=f"{city}_model",
                n_ahead="params:n_ahead",
                ground_truth="testing_dataset"
            ),
            outputs=f"{city}_forecasts_evaluation",
            name=f"{city}_forecasts_evaluation",
            tags=["train"]
        )
        for city in cities
    ]

    forecast_nodes = [
        node(
            func=forecast,
            inputs=dict(
                model=f"{city}_model",
                n_ahead="params:n_ahead"
            ),
            outputs=f"{city}_forecasts",
            name=f"{city}_forecast",
            tags=["forecast"]
        )
        for city in cities
    ]

    return pipeline(split_train_nodes + forecast_training_nodes + forecast_nodes)
Defining Custom Pipelines
Previously, we let Kedro automatically register our pipelines based on the project pipelines we created. However, we now have sub-pipelines that we would like to define based on our tags. We can do so in src/climate_brazil/pipeline_registry.py:
#src/climate_brazil/pipeline_registry.py
"""Project pipelines."""
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline

from climate_brazil.pipelines.data_processing import create_pipeline as dp
from climate_brazil.pipelines.ML import create_pipeline as ml


def register_pipelines() -> dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    dp_pipeline = dp()
    ml_pipeline = ml()
    training_pipeline = ml_pipeline.only_nodes_with_tags("train")
    forecast_pipeline = ml_pipeline.only_nodes_with_tags("forecast")

    return {
        "data_processing": dp_pipeline,
        "train": dp_pipeline + training_pipeline,
        "forecast": forecast_pipeline,
        "__default__": dp_pipeline + training_pipeline
    }
Notice how we broke down our ML pipeline into two sub-pipelines. Also, notice how we defined the train pipeline to consist of both the data processing pipeline and the training pipeline. We also assigned the train pipeline to be the default pipeline. This means that if we execute kedro run without specifying a pipeline, it will run the train pipeline.
Executing Pipeline Components
Training Pipeline
You can execute your training pipeline by executing either
kedro run
or
kedro run --pipeline train
Let’s go ahead and run one of the above to train the model and generate forecasts on the test set with evaluations.
Forecasting pipeline
If you already have a trained model and you don't have the ground truth for evaluations, you can execute
kedro run --pipeline forecast
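Since the forecast sub-pipeline was carved out purely by tag, you can get the same selection without going through the registered pipeline by filtering on tags directly (assuming a recent Kedro version that supports the --tags flag):

kedro run --tags forecast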
Visualizing Forecasts
We've seen our evaluation metrics logged to stdout (which, by the way, is also written to /info.log). Let's also look at our fitted model and forecasts against the actual data. To do that, let's add a node to our pipeline that will handle generating these plots.
# Additions to /src/climate_brazil/pipelines/ML/nodes.py
# (assumes `from datetime import timedelta` and `import plotly.graph_objects as go` at the top of nodes.py)
def forecast_plot(training_dataset: pl.DataFrame, testing_dataset: pl.DataFrame, model: TSModel, forecast: xr.Dataset) -> go.Figure:
    """
    Generates a plot showing in-sample posterior predictive performance as well as out-of-sample forecasts
    ---
    Params:
        training_dataset: The training split
        testing_dataset: The testing split
        model: The trained model
        forecast: The forecasts from the trained model
    """
    # City specific data
    city = model.idata.attrs['city']
    city_training_dataset = training_dataset.filter(pl.col("city") == city)
    city_testing_dataset = testing_dataset.filter(pl.col("city") == city)

    # Model fit posterior predictive mean and HDI
    posterior_mean_normalized = model.idata.posterior_predictive['temperature_normalized'].mean(('chain', 'draw'))
    hdi_normalized = az.hdi(model.idata.posterior_predictive['temperature_normalized'], hdi_prob=0.8)
    posterior_mean = posterior_mean_normalized * model.idata.attrs['y_std'] + model.idata.attrs['y_mean']
    hdi = hdi_normalized * model.idata.attrs['y_std'] + model.idata.attrs['y_mean']

    # Forecast posterior predictive mean and HDI
    posterior_predictive_mean_normalized = forecast['temperature_normalized_fut'].mean(('chain', 'draw'))
    posterior_predictive_hdi_normalized = az.hdi(forecast['temperature_normalized_fut'], hdi_prob=0.8)
    posterior_predictive_mean = posterior_predictive_mean_normalized * model.idata.attrs['y_std'] + model.idata.attrs['y_mean']
    posterior_predictive_hdi = posterior_predictive_hdi_normalized * model.idata.attrs['y_std'] + model.idata.attrs['y_mean']

    fig = go.Figure()
    fig.add_traces(
        [
            go.Scatter(
                name="",
                x=city_training_dataset["date"],
                y=hdi["temperature_normalized"][:, 1],
                mode="lines",
                marker=dict(color="#eb8c34"),
                line=dict(width=0),
                legendgroup="HDI",
                showlegend=False
            ),
            go.Scatter(
                name="80% HDI",
                x=city_training_dataset["date"],
                y=hdi["temperature_normalized"][:, 0],
                mode="lines",
                marker=dict(color="#eb8c34"),
                line=dict(width=0),
                legendgroup="HDI",
                fill='tonexty',
                fillcolor='rgba(235, 140, 52, 0.5)'
            ),
            go.Scatter(
                x=city_training_dataset["date"],
                y=city_training_dataset["temperature"],
                mode="markers",
                marker_color="#48bbf0",
                name="actuals",
                legendgroup="actuals"
            ),
            go.Scatter(
                x=city_training_dataset["date"],
                y=posterior_mean,
                marker_color="blue",
                name="posterior_mean",
                legendgroup="posterior_mean"
            ),
            go.Scatter(
                name="",
                x=city_testing_dataset["date"],
                y=posterior_predictive_hdi["temperature_normalized_fut"][:, 1],
                mode="lines",
                marker=dict(color="#eb8c34"),
                line=dict(width=0),
                legendgroup="HDI",
                showlegend=False
            ),
            go.Scatter(
                name="",
                x=city_testing_dataset["date"],
                y=posterior_predictive_hdi["temperature_normalized_fut"][:, 0],
                mode="lines",
                marker=dict(color="#eb8c34"),
                line=dict(width=0),
                legendgroup="HDI",
                fill='tonexty',
                fillcolor='rgba(235, 140, 52, 0.5)',
                showlegend=False
            ),
            go.Scatter(
                x=city_testing_dataset["date"],
                y=city_testing_dataset["temperature"],
                mode="markers",
                marker_color="#48bbf0",
                name="",
                legendgroup="actuals",
                showlegend=False
            ),
            go.Scatter(
                x=city_testing_dataset["date"],
                y=posterior_predictive_mean,
                mode="lines",
                marker_color="yellow",
                name="",
                legendgroup="posterior_mean",
                showlegend=False
            ),
        ]
    )
    fig.update_layout(
        title=f"{city.title()} Temperature Forecast",
        xaxis=dict(
            title="Date",
            rangeselector=dict(
                buttons=list(
                    [
                        dict(count=1, label="1y", step="year", stepmode="backward"),
                        dict(count=5, label="5y", step="year", stepmode="backward"),
                        dict(count=10, label="10y", step="year", stepmode="backward"),
                        dict(step="all", label="All"),
                    ]
                )
            ),
            rangeslider=dict(visible=True),
            type="date",
            rangeselector_font_color="black",
            rangeselector_activecolor="hotpink",
            rangeselector_bgcolor="lightblue",
            autorangeoptions=dict(
                clipmax=city_testing_dataset['date'].max() + timedelta(days=30),
                clipmin=city_training_dataset['date'].min() - timedelta(days=30)
            )
        ),
        yaxis=dict(
            title="Temperature"
        )
    )
    return fig
Let’s make sure we define the outputs from our node so that we persist the figures to disk.
# Forecast plots: This is a dataset factory that will save all of our figures
"{city}_plot":
  type: plotly.JSONDataset
  filepath: data/08_reporting/{city}_plot.json
Next, we add the node to our pipeline.
# Add to /src/climate_brazil/pipelines/ML/pipeline.py
# (remember to add forecast_plot to the imports from .nodes)
plot_node = [
    node(
        func=forecast_plot,
        inputs=dict(
            training_dataset="training_dataset",
            testing_dataset="testing_dataset",
            model=f"{city}_model",
            forecast=f"{city}_forecasts_evaluation"
        ),
        outputs=f"{city}_plot",
        name=f"{city}_forecast_plot",
        tags=["train", "forecast"]
    )
    for city in cities
]

return pipeline(
    split_train_nodes
    + forecast_training_nodes
    + forecast_nodes
    + plot_node
)
Great! Now, let's only execute our forecast_plot nodes, since we've already run the pipeline that generates our models and forecasts.
kedro run --nodes forecast_plot
That will generate the following figures:
Looks good! We now have our training and forecasting pipelines up and running.
Summary
In this section we focused on preparing a PyMC model to be production ready using the ModelBuilder class. We walked through the required methods that we needed to define, and adjusted and implemented new methods according to our project's needs. Along the way we learned some more advanced features of Kedro, like defining custom datasets and registering custom pipelines/sub-pipelines. We also looked at building a basic Bayesian structural time-series model.
Coming Next
We evaluated our model by looking at the RMSE and the coverage of our 80% HDI, but what if we wanted to compare different models or different priors? With our current implementation it would be cumbersome to track these metrics. In the next part of this series we are going to talk about common MLOps principles and integrate MLFlow into our Kedro workflow. I hope to see you there!