Reproducibility & Scalability Part 2
PyMC ModelBuilder

Author

Jonathan Dekermanjian

Published

April 2, 2025

Overview

In part two of this series, we go over the necessary steps to take a time-series model into production by integrating the PyMC ModelBuilder class with Kedro.

Introduction

Picking up where we left off in part one, we are going to focus on putting together the necessary machinery to take a PyMC model to production. Specifically, we are going to walkthrough the following:

  • Create a Kedro ML pipeline
  • Build a baseline time-series model
  • Configure the ModelBuilder class
  • Define custom Kedro datasets
  • Register custom Kedro pipelines/sub-pipelines
  • Execute the pipeline

If you want to learn about the ModelBuilder class check out the full class details here.

Getting Started

To start we need to install some additional dependencies into our environment that we created in the previous post. PyMC is easier to install using conda because there are some non-Python dependencies like a C++ compiler that conda is able to resolve for you. So let’s switch from using pip for our project package manager to using conda by redefining our requirements.txt file into an environment.yml file.

# ./environment.yml
name: my_environment_name
channels:
  - conda-forge
  - defaults

dependencies:
  - 'pymc=5.20.1'
  - 'graphviz=12.2.1'
  - 'numpyro=0.17.0'
  - pip:
    - ipython>=8.10
    - jupyterlab>=3.0
    - kedro-datasets>=3.0
    - kedro-viz>=6.7.0
    - kedro[jupyter]~=0.19.11
    - notebook
    - pre-commit~=3.5
    - polars~=1.23.0
    - pyarrow~=18.1.0
    - plotly~=5.24.0
    - openpyxl~=3.1.0
    - kedro-docker~=0.6.2
    - pymc-extras==0.2.3 # Added new dependency

Notice that we are installing PyMC a probabilistic programming language that will allow us to define our Bayesian time-series model, numpyro which provides a Jax backend for sampling speed-ups compared to the native sampler, and pymc-extras which contains the ModelBuilder class that we will use as a scaffolding around our model.

You can create this environment using conda at the command-line:

conda env create -f environment.yml

Creating an ML Pipeline

We need to create a new Kedro pipeline that will contain all of our code related to machine learning.

kedro pipeline create ML

Great! Now similary to how we defined our nodes in our data processing pipeline we will do the same for our ML pipeline. We are going to need to create nodes that do the following:

  • split our training data into train and test portions
  • train the model
  • generate forecasts
  • evaluate forecasts

Defining Pipeline Nodes

Train-Test Split

Let’s define how we are going split our dataset so that we can evaluate our model’s performance later. If you recall, we are dealing with time-series data across twelve Brazilian cities. Our interest is in forecasting the average monthly temperatures of these cities. Looking at the data you will notice that different cities started collecting data at different time points but the last measurement of October 2019 is shared by all cities (even if the measurement is missing). So, let’s hold out one year of data (12 measurements) for each city to be our test set.

# /src/climate_brazil/pipelines/ML/nodes.py
def train_test_split(data: pl.DataFrame, testing_window: int) -> tuple[pl.DataFrame, pl.DataFrame]:
    """
    Splits time-series dataset into test and train splits
    ---
    Params:
        data: The time-series dataset
        testing_window: The size of the testing set
    """

    testing_dataset = (
        data
        .sort("city", "date")
        .group_by("city")
        .agg(
            pl.all().tail(testing_window)
        )
        .explode("temperature", "date")
        .sort("city", "date")
    )

    training_dataset = (
        data
        .sort("city", "date")
        .group_by("city")
        .agg(
            pl.all().slice(0, pl.len() - testing_window)
        )
        .explode("temperature", "date")
        .sort("city", "date")
    )

    return training_dataset, testing_dataset

Our train_test_split() function takes a parameter defining the testing set window size. This is a configuration that you may want to change in the future. Kedro generates configuration files for every pipeline that you created and that is where we will put all of our configuration parameters. Let’s define our testing_window parameter now.

#/conf/base/parameters_ML.yml
testing_window: 12

Bayesian Structural Time Series

We are going to build a simple Bayesian structural time-series model to forecast the monthly average temperatures. This series is focused on engineering so I won’t delve too deeply into the mathematics behind the model. Briefly, our model is going to have two components, a trend component that we are going to estimate using a one-dimensional Gaussian process and a seasonality component that we are going to estimate as deterministic seasonality using fourier features.

Defining The Model

We will represent our model as the following:

temperatureN(μ,σ) μ=f(x)+βfourierXfourier_terms f(x)GP(0,K(x,x;η,)) βfourierN(0,0.5) ηHalfNormal(1.0) Gamma(48,2) σHalfNormal(0.2)

Where our Gaussian Process covariance matrix K uses the Exponentiated Quadratic kernel. K(x,x)=exp[(xx)222]

In PyMC this would look like this:

with pm.Model() as model:
    model.add_coord("obs_id", train_time_range)
    model.add_coord(
        "fourier_features",
        np.arange(len(train_fourier_series.to_numpy().T)),
    )

    t = pm.Data("time_range", train_time_range, dims="obs_id")
    fourier_terms = pm.Data(
        "fourier_terms", train_fourier_series.to_numpy().T
    )

    error = pm.HalfNormal("error", 0.2)

    # Trend component
    amplitude_trend = pm.HalfNormal("amplitude_trend", 1.0)
    ls_trend = pm.Gamma("ls_trend", alpha=48, beta=2)
    cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)
    
    gp_trend = pm.gp.HSGP(
        m=[10], 
        c=5.,
        cov_func=cov_trend
    )
    trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

    # Seasonality components
    beta_fourier = pm.Normal(
        "beta_fourier", mu=0, sigma=0.5, dims="fourier_features"
    )
    seasonality = pm.Deterministic(
        "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
    )

    # Combine components
    mu = trend + seasonality

    pm.Normal(
        "temperature",
        mu=mu,
        sigma=error,
        observed=temperature_normalized,
        dims="obs_id",
        )

At this point we could pass in the expected data (time_range, fourier_terms, and temperature_normalized) for a city and train the time-series. However, there would be less back-tracking if we go ahead and define our model along with the necessary processing of the inputs, the method for generating forecasts, and the methods for saving and loading our trained model.

Configuring ModelBuilder

The ModelBuilder class from the pymc-extras package gives us a nice scaffolding that facilitates taking PyMC models into production. There are seven required methods that we need to define inside of our model class that inherits from ModelBuilder. However, it is often the case that we need to override some default methods in accordance to our modeling needs. Let’s go over the required methods first.

We need to implement _generate_and_preprocess_model_data() which is a method that performs our final processing of the data before it is fed into the model. In our case, we normalize the data and define our fourier features to capture yearly (12 months) seasonality.

# ModelBuilder Required Method Implementation
def _generate_and_preprocess_model_data(self, X: pl.DataFrame, y: pl.Series, normalize: bool = False):
    """
    Last mile data processing of inputs expected by the model
    ---
    Params:
        X: The matrix of predictor variables expected by our model
        y: The target variable
        normalize: Whether to Z normalize the variables
    """
    self.train_time_range = np.arange(X.shape[0])
    self.n_modes = 10
    periods = np.array(self.train_time_range) / (12)
    self.train_fourier_series = pl.DataFrame(
        {
            f"{func}_order_{order}": getattr(np, func)(
                2 * np.pi * periods * order
            )
            for order in range(1, self.n_modes + 1)
            for func in ("sin", "cos")
        }
    )
    if normalize:
        self.y_mean = np.nanmean(y)
        self.y_std = np.nanstd(y)
        self.y_normalized = (y - self.y_mean) / self.y_std
    else:
        self.y_normalized = y
Caution

Since we are only including endogenous variables in our model the normalization only applies to the target variable

Next we need to define how we are going to feed new data to our model when we are ready to make forecasts. Since we are building a time-series model with no exogenous predictor variables, we only need to define how far out into the future we want to generate forecasts.

# ModelBuilder Required Method Implementation
def _data_setter(self, n_ahead: int):
    """
    Generates required data for producing forecasts
    ---
    Params:
        n_ahead: How many periods (months) to forecast future temperatures
    """
    self.start = self.train_time_range[-1]
    self.end = self.start + n_ahead

    new_periods = np.arange(self.start, self.end, 1) / (12)
    self.test_fourier_series = pl.DataFrame(
        {
            f"{func}_order_{order}": getattr(np, func)(
                2 * np.pi * new_periods * order
            )
            for order in range(1, self.n_modes + 1)
            for func in ("sin", "cos")
        }
    )

Certainly we will need to define how to build our model by using the build_model() method. You’ll notice that we call our previously defined method _generate_and_preprocess_model_data() here.

# ModelBuilder Required Method Implementation
def build_model(self, X: pl.DataFrame, y: pl.Series, normalize_target: bool = False, **kwargs):
    """
    Defines the PyMC model structure
    ---
    Params:
        X: Dataframe of features
        y: Array of target values
    """

    self._generate_and_preprocess_model_data(X, y, normalize=normalize_target)

    with pm.Model() as self.model:
        self.model.add_coord("obs_id", self.train_time_range)
        self.model.add_coord(
            "fourier_features",
            np.arange(len(self.train_fourier_series.to_numpy().T)),
        )

        t = pm.Data("time_range", self.train_time_range, dims="obs_id")
        fourier_terms = pm.Data(
            "fourier_terms", self.train_fourier_series.to_numpy().T
        )

        error = pm.HalfNormal("error", self.model_config['error'])

        # Trend component
        amplitude_trend = pm.HalfNormal("amplitude_trend", self.model_config['amplitude_trend'])
        ls_trend = pm.Gamma("ls_trend", alpha=self.model_config['ls_trend']['alpha'], beta=self.model_config['ls_trend']['beta'])
        cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)

        gp_trend = pm.gp.HSGP(
            m=[10], 
            c=5., 
            cov_func=cov_trend
        )
        trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

        # Seasonality components
        beta_fourier = pm.Normal(
            "beta_fourier", mu=self.model_config['beta_fourier']['mu'], sigma=self.model_config['beta_fourier']['sigma'], dims="fourier_features"
        )
        seasonality = pm.Deterministic(
            "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
        )

        # Combine components
        mu = trend + seasonality

        pm.Normal(
            "temperature_normalized",
            mu=mu,
            sigma=error,
            observed=self.y_normalized,
            dims="obs_id",
            )

Finally, the last three required methods are all methods that define configurations for our model. As we mentioned earlier, all configurations should be handled by Kedro in our projects’ conf/ directory. So we are going to return empty dicts as our default configurations.

# ModelBuilder Required Methods Implementations
@staticmethod
def get_default_model_config() -> dict:
    model_config = {}
    return model_config

@staticmethod
def get_default_sampler_config() -> dict:
    sampler_config= {}
    return sampler_config

@property
def output_var(self):
    """
    This is basically the name of the target variable. This is used when loading in a saved model.
    """
    return "temperature"

@property
@abstractmethod
def _serializable_model_config(self) -> dict[str, int | float | dict]:
    """
    Only needs to be implemented if your model configurations contain unserializable objects.
    You need to convert the non-serializable objects to serializable ones here.
    """
    return self.model_config

Let’s go ahead and define our model and sampler configurations in the parameters_ML.yml file.

#/conf/base/parameters_ML.yml
testing_window: 12

model_config:
  error: 0.2
  amplitude_trend: 1.0
  ls_trend:
    alpha: 48
    beta: 2
  beta_fourier:
    mu: 0
    sigma: 0.5

sampler_config:
  draws: 1000
  tune: 1000
  chains: 4
  target_accept: 0.95

Our model configurations contain the priors that we defined for our model above. Whereas, the sampler configurations define parameters related to our MCMC sampler.

With the required ModelBuilder methods out of the way, let’s talk about some of the methods that we will override to accomodate our specific use case.

First, since we are using a time-series model our predictions are generated differently than how you’d generate predictions from a typical regression or classification task. So we need to make changes to the sample_posterior_predictive() method to accomdate a time-series model. We take the approach of defining a new model specifically for forecasting purposes. Notice that we are calling _data_setter() to generate our future model input, which is our forecast horizon.

# ModelBuilder Override method
def sample_posterior_predictive(self, n_ahead: int, extend_idata: bool, combined: bool, **kwargs):
    self._data_setter(n_ahead)

    with pm.Model() as self.model:
        self.model.add_coord("obs_id", self.train_time_range)
        self.model.add_coord(
            "fourier_features",
            np.arange(len(self.train_fourier_series.to_numpy().T)),
        )

        t = pm.Data("time_range", self.train_time_range, dims="obs_id")
        fourier_terms = pm.Data(
            "fourier_terms", self.train_fourier_series.to_numpy().T
        )

        error = pm.HalfNormal("error", self.model_config['error'])

        # Trend component
        amplitude_trend = pm.HalfNormal("amplitude_trend", self.model_config['amplitude_trend'])
        ls_trend = pm.Gamma("ls_trend", alpha=self.model_config['ls_trend']['alpha'], beta=self.model_config['ls_trend']['beta'])
        cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)

        gp_trend = pm.gp.HSGP(
            m=[10], 
            c=5., 
            cov_func=cov_trend
        )
        trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

        # Seasonality components
        beta_fourier = pm.Normal(
            "beta_fourier", mu=self.model_config['beta_fourier']['mu'], sigma=self.model_config['beta_fourier']['sigma'], dims="fourier_features"
        )
        seasonality = pm.Deterministic(
            "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
        )

        # Combine components
        mu = trend + seasonality

        pm.Normal(
            "temperature_normalized",
            mu=mu,
            sigma=error,
            observed=self.y_normalized,
            dims="obs_id",
            )
        
        self.model.add_coords({"obs_id_fut": np.arange(self.start, self.end, 1)})

        t_fut = pm.Data("time_range_fut", np.arange(self.start, self.end, 1))
        fourier_terms_fut = pm.Data("fourier_terms_fut", self.test_fourier_series.to_numpy().T)

        # Trend future component
        trend_fut = gp_trend.conditional("trend_fut", Xnew=t_fut[:, None], dims="obs_id_fut")

        # Seasonality components
        seasonality_fut = pm.Deterministic(
            "seasonal_fut", pm.math.dot(beta_fourier, fourier_terms_fut), dims="obs_id_fut"
        )

        mu_fut = trend_fut + seasonality_fut

        pm.Normal(
            "temperature_normalized_fut",
            mu=mu_fut,
            sigma=error,
            dims="obs_id_fut",
            )

    with self.model:  # sample with new input data
        post_pred = pm.sample_posterior_predictive(self.idata, var_names=["temperature_normalized_fut"], predictions=True, **kwargs)
        if extend_idata:
            self.idata.extend(post_pred, join="right")

    posterior_predictive_samples = az.extract(
        post_pred, "predictions", combined=combined
    )

    return posterior_predictive_samples

We also need to update the fit() method for a couple of reasons. The first is because we implemented a normalize_target method and we need to propogate that into the fit() method which calls our build_model() method. Second, because the defualt fit() method defines additional sampler configurations that we don’t want because we have already centralized all of our configurations to be handled by Kedro.

# ModelBuilder Override method
def fit(self, X: pl.DataFrame, y: pl.Series, normalize_target: bool = False) -> az.InferenceData:
    """
    Fits the model to the provided dataset
    ---
    Params:
        X: The dataset container predictor variables
        y: The target variable
        normalize_target: Whether to Z normalize the target variable
    """
    self.build_model(X, y, normalize_target=normalize_target)
    self.idata = self.sample_model(**self.sampler_config)

    X_df = X.to_pandas()
    combined_data = pd.concat([X_df, y.to_pandas()], axis=1)
    assert all(combined_data.columns), "All columns must have non-empty names"
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            category=UserWarning,
            message="The group fit_data is not defined in the InferenceData scheme",
        )
        self.idata.add_groups(fit_data=combined_data.to_xarray())

    return self.idata

Lastly (I promise), we need to define a new method that will handle our model evaluations. We are going to use the root mean squared error as a metric to evaluate our model. We will also measure the coverage of our 80% Highest Density Interval (HDI). Additionally, it is a good idea to include in our model serialization the city the model is fitted to, and if we are standardizing the data then we should also include the mean and standard deviation of our target temperature values.

# ModelBuilder override Init
def __init__(
        self,
        model_config: dict | None = None,
        sampler_config: dict | None = None,
        city: str = None
    ):
        """
        Initializes model configuration and sampler configuration for the model

        Parameters
        ----------
        model_config : Dictionary, optional
            dictionary of parameters that initialise model configuration. Class-default defined by the user default_model_config method.
        sampler_config : Dictionary, optional
            dictionary of parameters that initialise sampler configuration. Class-default defined by the user default_sampler_config method.
        city: The Brazilian city we are modeling monthly average temperatures of
        """
        sampler_config = (
            self.get_default_sampler_config() if sampler_config is None else sampler_config
        )
        self.sampler_config = sampler_config
        model_config = self.get_default_model_config() if model_config is None else model_config

        self.model_config = model_config  
        self.model = None  
        self.idata: az.InferenceData | None = None
        self.is_fitted_ = False
        self.city = city # This is our Addition

# ModelBuilder new method
def evaluate(self, y_true: pl.Series, forecasts: xr.Dataset, back_transform: bool = False) -> dict:
    """
    Evaluate our forecasts posterior predictive mean using the root mean squared error (RMSE) as the metric and evaluate our highest density interval's (HDI)s coverage
    ---
    Params:
        y_true: The ground truth temperatures
        forecasts: The forecasts
        back_transform: Whether we need to transform our forecasts back to the original scale
    """
    if back_transform:
        try:
            y_mean = self.y_mean
            y_std = self.y_std
        except AttributeError:
            y_mean = self.idata.attrs['y_mean']
            y_std = self.idata.attrs['y_std']
        posterior_predictive_mean = forecasts[f'{self.output_var}_normalized_fut'].mean(("chain", "draw")).values * y_std + y_mean
        hdi = az.hdi(forecasts[f'{self.output_var}_normalized_fut'], hdi_prob=0.8) * y_std + y_mean
    else:
        posterior_predictive_mean = forecasts[f'{self.output_var}_normalized_fut'].mean(("chain", "draw")).values
        hdi = az.hdi(forecasts[f'{self.output_var}_normalized_fut'], hdi_prob=0.8)

    error = y_true.to_numpy() - posterior_predictive_mean
    RMSE = np.sqrt(
        np.nanmean(
            np.square(error)
        )
    )

    coverage_df = pl.DataFrame(
        {
            "hdi_lower": hdi[f'{self.output_var}_normalized_fut'][:, 0].values,
            "hdi_upper": hdi[f'{self.output_var}_normalized_fut'][:, 1].values,
            "y_true": y_true
        }
    )

    COVERAGE = (
        coverage_df
            .filter(
                pl.col("y_true").is_not_null()
            )
            .with_columns(
                pl.when(
                    (pl.col("y_true") <= pl.col("hdi_upper")) &
                    (pl.col("y_true") >= pl.col("hdi_lower"))
                )
                .then(1.)
                .otherwise(0.)
                .alias("coverage")
            )
            .select(pl.col("coverage").mean()).item()
    )
    
    return {"RMSE": RMSE, "HDI_COVERAGE": COVERAGE}

def _save_input_params(self, idata) -> None:
    """
    Saves any additional model parameters (other than the dataset) to the idata object.
    """
    idata.attrs["city"] = self.city
    idata.attrs["y_mean"] = self.y_mean
    idata.attrs["y_std"] = self.y_std

Combining everything together we have our time-series model implemented as:

# /src/climate_brazil/pipelines/ML/ts_model.py
import warnings
from abc import abstractmethod

import arviz as az
import numpy as np
import pandas as pd
import polars as pl
import pymc as pm
import xarray as xr
from pymc_extras.model_builder import ModelBuilder


class TSModel(ModelBuilder):
    _model_type = "TimeSeries"
    version = "0.1"

    def __init__(
        self,
        model_config: dict | None = None,
        sampler_config: dict | None = None,
        city: str = None
    ):
        """
        Initializes model configuration and sampler configuration for the model

        Parameters
        ----------
        model_config : Dictionary, optional
            dictionary of parameters that initialise model configuration. Class-default defined by the user default_model_config method.
        sampler_config : Dictionary, optional
            dictionary of parameters that initialise sampler configuration. Class-default defined by the user default_sampler_config method.
        city: The Brazilian city we are modeling monthly average temperatures of
        """
        sampler_config = (
            self.get_default_sampler_config() if sampler_config is None else sampler_config
        )
        self.sampler_config = sampler_config
        model_config = self.get_default_model_config() if model_config is None else model_config

        self.model_config = model_config  # parameters for priors etc.
        self.model = None  # Set by build_model
        self.idata: az.InferenceData | None = None  # idata is generated during fitting
        self.is_fitted_ = False
        self.city = city

    def _generate_and_preprocess_model_data(self, X: pl.DataFrame, y: pl.Series, normalize: bool = False):
        """
        Last mile data processing of inputs expected by the model
        ---
        Params:
            X: The matrix of predictor variables expected by our model
            y: The target variable
            normalize: Whether to Z normalize the variables
        """
        self.train_time_range = np.arange(X.shape[0])
        self.n_modes = 10
        periods = np.array(self.train_time_range) / (12)
        self.train_fourier_series = pl.DataFrame(
            {
                f"{func}_order_{order}": getattr(np, func)(
                    2 * np.pi * periods * order
                )
                for order in range(1, self.n_modes + 1)
                for func in ("sin", "cos")
            }
        )
        if normalize:
            self.y_mean = np.nanmean(y)
            self.y_std = np.nanstd(y)
            self.y_normalized = (y - self.y_mean) / self.y_std
        else:
            self.y_normalized = y

    def _data_setter(self, n_ahead: int):
        """
        Generates required data for producing forecasts
        ---
        Params:
            n_ahead: How many periods (months) to forecast future temperatures
        """
        self.start = self.train_time_range[-1]
        self.end = self.start + n_ahead

        new_periods = np.arange(self.start, self.end, 1) / (12)
        self.test_fourier_series = pl.DataFrame(
            {
                f"{func}_order_{order}": getattr(np, func)(
                    2 * np.pi * new_periods * order
                )
                for order in range(1, self.n_modes + 1)
                for func in ("sin", "cos")
            }
        )

    def build_model(self, X: pl.DataFrame, y: pl.Series, normalize_target: bool = False, **kwargs):
        """
        Defines the PyMC model structure
        ---
        Params:
            X: Dataframe of features
            y: Array of target values
        """

        self._generate_and_preprocess_model_data(X, y, normalize=normalize_target)

        with pm.Model() as self.model:
            self.model.add_coord("obs_id", self.train_time_range)
            self.model.add_coord(
                "fourier_features",
                np.arange(len(self.train_fourier_series.to_numpy().T)),
            )

            t = pm.Data("time_range", self.train_time_range, dims="obs_id")
            fourier_terms = pm.Data(
                "fourier_terms", self.train_fourier_series.to_numpy().T
            )

            error = pm.HalfNormal("error", self.model_config['error'])

            # Trend component
            amplitude_trend = pm.HalfNormal("amplitude_trend", self.model_config['amplitude_trend'])
            ls_trend = pm.Gamma("ls_trend", alpha=self.model_config['ls_trend']['alpha'], beta=self.model_config['ls_trend']['beta'])
            cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)

            gp_trend = pm.gp.HSGP(
                m=[10],
                c=5.,
                cov_func=cov_trend
            )
            trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

            # Seasonality components
            beta_fourier = pm.Normal(
                "beta_fourier", mu=self.model_config['beta_fourier']['mu'], sigma=self.model_config['beta_fourier']['sigma'], dims="fourier_features"
            )
            seasonality = pm.Deterministic(
                "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
            )

            # Combine components
            mu = trend + seasonality

            pm.Normal(
                "temperature_normalized",
                mu=mu,
                sigma=error,
                observed=self.y_normalized,
                dims="obs_id",
                )

    def sample_posterior_predictive(self, n_ahead: int, extend_idata: bool, combined: bool, **kwargs):
        self._data_setter(n_ahead)

        with pm.Model() as self.model:
            self.model.add_coord("obs_id", self.train_time_range)
            self.model.add_coord(
                "fourier_features",
                np.arange(len(self.train_fourier_series.to_numpy().T)),
            )

            t = pm.Data("time_range", self.train_time_range, dims="obs_id")
            fourier_terms = pm.Data(
                "fourier_terms", self.train_fourier_series.to_numpy().T
            )

            error = pm.HalfNormal("error", self.model_config['error'])

            # Trend component
            amplitude_trend = pm.HalfNormal("amplitude_trend", self.model_config['amplitude_trend'])
            ls_trend = pm.Gamma("ls_trend", alpha=self.model_config['ls_trend']['alpha'], beta=self.model_config['ls_trend']['beta'])
            cov_trend = amplitude_trend * pm.gp.cov.ExpQuad(1, ls_trend)

            gp_trend = pm.gp.HSGP(
                m=[10],
                c=5.,
                cov_func=cov_trend
            )
            trend = gp_trend.prior("trend", X=t[:, None], dims="obs_id")

            # Seasonality components
            beta_fourier = pm.Normal(
                "beta_fourier", mu=self.model_config['beta_fourier']['mu'], sigma=self.model_config['beta_fourier']['sigma'], dims="fourier_features"
            )
            seasonality = pm.Deterministic(
                "seasonal", pm.math.dot(beta_fourier, fourier_terms), dims="obs_id"
            )

            # Combine components
            mu = trend + seasonality

            pm.Normal(
                "temperature_normalized",
                mu=mu,
                sigma=error,
                observed=self.y_normalized,
                dims="obs_id",
                )

            self.model.add_coords({"obs_id_fut": np.arange(self.start, self.end, 1)})

            t_fut = pm.Data("time_range_fut", np.arange(self.start, self.end, 1))
            fourier_terms_fut = pm.Data("fourier_terms_fut", self.test_fourier_series.to_numpy().T)

            # Trend future component
            trend_fut = gp_trend.conditional("trend_fut", Xnew=t_fut[:, None], dims="obs_id_fut")

            # Seasonality components
            seasonality_fut = pm.Deterministic(
                "seasonal_fut", pm.math.dot(beta_fourier, fourier_terms_fut), dims="obs_id_fut"
            )

            mu_fut = trend_fut + seasonality_fut

            pm.Normal(
                "temperature_normalized_fut",
                mu=mu_fut,
                sigma=error,
                dims="obs_id_fut",
                )

        with self.model:  # sample with new input data
            post_pred = pm.sample_posterior_predictive(self.idata, var_names=[f"{self.output_var}_normalized_fut"], predictions=True, **kwargs)
            if extend_idata:
                self.idata.extend(post_pred, join="right")

        posterior_predictive_samples = az.extract(
            post_pred, "predictions", combined=combined
        )

        return posterior_predictive_samples

    def fit(self, X: pl.DataFrame, y: pl.Series, normalize_target: bool = False) -> az.InferenceData:
        """
        Fits the model to the provided dataset
        ---
        Params:
            X: The dataset container predictor variables
            y: The target variable
            normalize_target: Whether to Z normalize the target variable
        """

        self.build_model(X, y, normalize_target=normalize_target)
        self.idata = self.sample_model(**self.sampler_config)

        X_df = X.to_pandas()
        combined_data = pd.concat([X_df, y.to_pandas()], axis=1)
        assert all(combined_data.columns), "All columns must have non-empty names"
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                category=UserWarning,
                message="The group fit_data is not defined in the InferenceData scheme",
            )
            self.idata.add_groups(fit_data=combined_data.to_xarray())  # type: ignore

        return self.idata  # type: ignore

    @staticmethod
    def get_default_model_config() -> dict:
        model_config = {
            "error": 0.2,
            "amplitude_trend": 1.0,
            "ls_trend": {"alpha": 48, "beta": 2},
            "beta_fourier": {"mu": 0, "sigma": 0.5},
        }
        return model_config

    @staticmethod
    def get_default_sampler_config() -> dict:
        """
        Returns a class default sampler dict for model builder if no sampler_config is provided on class initialization.
        The sampler config dict is used to send parameters to the sampler .
        It will be used during fitting in case the user doesn't provide any sampler_config of their own.
        """
        sampler_config= {
            "draws": 1_000,
            "tune": 1_000,
            "chains": 4,
            "target_accept": 0.95,
        }
        return sampler_config

    @property
    def output_var(self):
        return "temperature"

    @property
    @abstractmethod
    def _serializable_model_config(self) -> dict[str, int | float | dict]:
        """
        Converts non-serializable values from model_config to their serializable reversable equivalent.
        Data types like pandas DataFrame, Series or datetime aren't JSON serializable,
        so in order to save the model they need to be formatted.

        Returns
        -------
        model_config: dict
        """
        return self.model_config

    def evaluate(self, y_true: pl.Series, forecasts: xr.Dataset, back_transform: bool = False) -> dict:
        """
        Evaluate our forecasts posterior predictive mean using the root mean squared error (RMSE) as the metric and evaluate our highest density interval's (HDI)s coverage
        ---
        Params:
            y_true: The ground truth temperatures
            forecasts: The forecasts
            back_transform: Whether we need to transform our forecasts back to the original scale
        """
        if back_transform:
            try:
                y_mean = self.y_mean
                y_std = self.y_std
            except AttributeError:
                y_mean = self.idata.attrs['y_mean']
                y_std = self.idata.attrs['y_std']
            posterior_predictive_mean = forecasts[f'{self.output_var}_normalized_fut'].mean(("chain", "draw")).values * y_std + y_mean
            hdi = az.hdi(forecasts[f'{self.output_var}_normalized_fut'], hdi_prob=0.8) * y_std + y_mean
        else:
            posterior_predictive_mean = forecasts[f'{self.output_var}_normalized_fut'].mean(("chain", "draw")).values
            hdi = az.hdi(forecasts[f'{self.output_var}_normalized_fut'], hdi_prob=0.8)

        error = y_true.to_numpy() - posterior_predictive_mean
        RMSE = np.sqrt(
            np.nanmean(
                np.square(error)
            )
        )

        coverage_df = pl.DataFrame(
            {
                "hdi_lower": hdi[f'{self.output_var}_normalized_fut'][:, 0].values,
                "hdi_upper": hdi[f'{self.output_var}_normalized_fut'][:, 1].values,
                "y_true": y_true
            }
        )

        COVERAGE = (
            coverage_df
              .filter(
                  pl.col("y_true").is_not_null()
              )
              .with_columns(
                  pl.when(
                      (pl.col("y_true") <= pl.col("hdi_upper")) &
                      (pl.col("y_true") >= pl.col("hdi_lower"))
                  )
                    .then(1.)
                    .otherwise(0.)
                    .alias("coverage")
              )
              .select(pl.col("coverage").mean()).item()
        )

        return {"RMSE": RMSE, "HDI_COVERAGE": COVERAGE}

    def _save_input_params(self, idata: az.InferenceData) -> None:
        """
        Saves any additional model parameters (other than the dataset) to the idata object.
        """
        idata.attrs["city"] = self.city
        idata.attrs["y_mean"] = self.y_mean
        idata.attrs["y_std"] = self.y_std

Training Node

Now that we have our model configured with ModelBuilder it is time to define our training node:

#/src/climate_brazil/pipelines/ML/nodes.py
def train(training_dataset: pl.DataFrame, model_config: dict, sampler_config: dict) -> dict:
    """
    Train time-series models
    ---
    Params:
        training_dataset: Training data
        model_config: Model Configurations
        sampler_config: MCMC sampler configurations
    """
    city_models = {}
    for city in training_dataset['city'].sort().unique(maintain_order=True):
        city_dataset = training_dataset.filter(pl.col("city")==city).sort("date")
        model = TSModel(model_config=model_config, sampler_config=sampler_config, city=city)
        model.fit(X=city_dataset['date'], y=city_dataset['temperature'], normalize_target=True)
        city_models[f"{city}_model"] = model

    return city_models
Important

DO NOT use this training node in production! We are going to refactor this code in the last part of this series to scale more efficiently.

We need to be sure to define catalog entries for our training output otherwise Kedro will designate our trained models as MemoryDataset’s. However, there is no built-in dataset in Kedro that knows how to store our model. So we need to define a custom Kedro dataset to accomodate our needs.

Creating a Custom Kedro Dataset

Creating a custom dataset can be quite simple. All we need to do is inheret from Kedro’s AbstractDataset and define the init, save, load, and describe methods. In our case, the ModelBuilder class has save and load methods that accept a filepath and we will use those methods in our custom dataset:

# /src/climate_brazil/datasets/pymc_model_dataset.py
from pathlib import Path, PurePosixPath
from typing import Any

from kedro.io import AbstractDataset

from kedro_framework.pipelines.ML.ts_model import TSModel


class PyMCModelDataset(AbstractDataset):
    """
    ``PyMCDataset`` loads / save PyMC models from a given filepath as a TSModel object.
    """

    def __init__(self, filepath: str):
        """Creates a new instance of PyMCDataset to load / save PyMC models for a given filepath.

        Args:
            filepath: The location of the model netcdf file to load / save data.
        """
        self._filepath = PurePosixPath(filepath)
        self._path = Path(self._filepath.parent)

    def load(self) -> TSModel:
        """Loads data from the netcdf file.

        Returns:
            loaded TSModel
        """
        model = TSModel.load(self._filepath)
        return model

    def save(self, model: TSModel) -> None:
        """Saves PyMC model to the specified filepath."""
        self._path.mkdir(parents=True,  exist_ok=True)
        model.save(self._filepath)

    def _describe(self) -> dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(filepath=self._filepath)

Train Catalog Entry

With our custom dataset defined let’s define entries for our training and testing data as well as for our trained models.

# conf/base/catalog.yml
# Raw data ---------------------------------------
cities_temperatures:
  type: partitions.PartitionedDataset
  path: data/01_raw/
  dataset: 
    type: polars.LazyPolarsDataset
    file_format: csv

# Processed data ---------------------------------------
cities_temperatures_processed:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: data/02_intermediate/cities_temperatures_processed.pq

cities_ts_plot:
  type: plotly.JSONDataset
  filepath: data/08_reporting/cities_ts_plot.json

# Model inputs ------------------------------------------
training_dataset:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: data/05_model_input/training_dataset.pq

testing_dataset:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: data/05_model_input/testing_dataset.pq

# Models -----------------------------------------
Belem_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Belem_model.nc

Curitiba_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Curitiba_model.nc

Fortaleza_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Fortaleza_model.nc

Goiania_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Goiania_model.nc

Macapa_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Macapa_model.nc

Manaus_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Manaus_model.nc

Recife_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Recife_model.nc

Rio_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Rio_model.nc

Salvador_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Salvador_model.nc

Sao_Luiz_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Sao_Luiz_model.nc

Sao_Paulo_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Sao_Paulo_model.nc

Vitoria_model:
  type: climate_brazil.datasets.pymc_model_dataset.PyMCModelDataset
  filepath: data/06_models/Vitoria_model.nc
Note

For the sake of clarity, we added one entry for each city. You can use a Kedro’s dataset factory feature to capture all city models with one entry. We went over an example of how to use a dataset factory in part one of this series.

Forecasting Node

Finally, we need to define how to generate our forecasts. We would like our forecasting node to have the ability to perform evaluations if we pass in the testing dataset as an argument and for the moment we just want to log out our evaluation metrics to stdout.

# /src/climate_brazil/datasets/pymc_forecast_dataset.py
def forecast(model: TSModel, n_ahead: int, ground_truth: Optional[pl.DataFrame] = None) -> xr.Dataset:
    """
    Generates forecasts from trained time-series and produces evaluations if ground truth is passed in.
    ---
    Params:
        model: The trained time-series model
        n_ahead: The forecast horizon
        ground_truth: The actual values to be compared with the forecasts
    """
    forecasts = model.sample_posterior_predictive(n_ahead=n_ahead, extend_idata=True, combined=False)
    if ground_truth is not None:
        evaluations = model.evaluate(y_true=ground_truth.filter(pl.col("city")==model.idata.attrs['city'])["temperature"], forecasts=forecasts, back_transform=True)
        logger.info(f"{model.idata.attrs['city']} Evaluations: {evaluations}")
    return forecasts

Notice that we need to pass in an n_ahead parameter that defines the forecast horizon. Now is a good time to add that parameter into /conf/base/parameters_ML.yml

testing_window: 12
n_ahead: 12

model_config:
  error: 0.2
  amplitude_trend: 1.0
  ls_trend:
    alpha: 48
    beta: 2
  beta_fourier:
    mu: 0
    sigma: 0.5

sampler_config:
  draws: 1000
  tune: 1000
  chains: 4
  target_accept: 0.95
  mp_ctx: spawn # might need this if you are running on MacOS

Our forecast output will be an xarray dataset and we will go ahead and save that to disk in NETCDF format. Once again, we will create a custom Kedro dataset for that.

# /src/climate_brazil/datasets/pymc_forecast_dataset.py
from pathlib import Path, PurePosixPath
from typing import Any

import xarray as xr
from kedro.io import AbstractDataset


class PyMCForecastDataset(AbstractDataset):
    """
    ``PyMCForecastDataset`` loads / save PyMC forecasts from a given filepath as an xarray Dataset.
    """

    def __init__(self, filepath: str):
        """Creates a new instance of PyMCForecastDataset to load / save PyMC forecasts for a given filepath.

        Args:
            filepath: The location of the forecast netcdf file to load / save.
        """
        self._filepath = PurePosixPath(filepath)
        self._path = Path(self._filepath.parent)

    def load(self) -> xr.Dataset:
        """Loads data from the netcdf file.

        Returns:
            loaded forecasts
        """
        return xr.load_dataset(self._filepath)

    def save(self, forecast: xr.Dataset) -> None:
        """Saves PyMC forecasts to the specified filepath."""
        self._path.mkdir(parents=True,  exist_ok=True)
        forecast.to_netcdf(path=self._filepath)

    def _describe(self) -> dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(filepath=self._filepath)

Defining The ML Pipeline

We are almost set! Our catalog is defined, our model is configured, and our nodes are implemented. Let’s put everything together and define our machine learning pipeline.

Train

Here we are going to introduce tags, another Kedro feature that allows you to run specific nodes by tag category. At this stage our pipeline will be the training split node followed by the training node.

# /src/climate_brazil/pipelines/ML/pipeline.py
from kedro.pipeline import node, Pipeline, pipeline  # noqa
from .nodes import train_test_split, train, forecast

def create_pipeline(**kwargs) -> Pipeline:
    split_train_nodes =  [
        node(
            func=train_test_split,
            inputs=dict(
                data = "cities_temperatures_processed",
                testing_window="params:testing_window"
            ),
            outputs=["training_dataset", "testing_dataset"],
            name="train_test_split",
            tags=["train"]
        ),
        node(
            func=train,
            inputs=dict(
                training_dataset="training_dataset",
                model_config="params:model_config",
                sampler_config="params:sampler_config"
            ),
            outputs=dict(
                Belem_model = "Belem_model",
                Curitiba_model = "Curitiba_model",
                Fortaleza_model = "Fortaleza_model",
                Goiania_model = "Goiania_model",
                Macapa_model = "Macapa_model",
                Manaus_model = "Manaus_model",
                Recife_model = "Recife_model",
                Rio_model = "Rio_model",
                Salvador_model = "Salvador_model",
                Sao_Luiz_model = "Sao_Luiz_model",
                Sao_Paulo_model = "Sao_Paulo_model",
                Vitoria_model = "Vitoria_model"
            ),
            name="train",
            tags=["train"]
        )
    ]
Caution

Always make sure that your inputs and outputs match the keys you chose in catalog.yml

Forecast

This is where we are going to leverage the flexibility of tags. We are going to create two sub-pipelines for our forecasts. The first sub-pipeline will be for training purposes, this is when we have ground-truth data to pass in for evaluations. The second will be when we are using our time-series model in production and we do not have the ground-truth available. Our full ML pipeline will look like this:

# /src/climate_brazil/pipelines/ML/pipeline.py
from kedro.pipeline import node, Pipeline, pipeline  # noqa
from .nodes import train_test_split, train, forecast

cities = ["Belem", "Curitiba", "Fortaleza", "Goiania", "Macapa", "Manaus", "Recife", "Rio", "Salvador", "Sao_Luiz", "Sao_Paulo", "Vitoria"]

def create_pipeline(**kwargs) -> Pipeline:
    split_train_nodes =  [
        node(
            func=train_test_split,
            inputs=dict(
                data = "cities_temperatures_processed",
                testing_window="params:testing_window"
            ),
            outputs=["training_dataset", "testing_dataset"],
            name="train_test_split",
            tags=["train"]
        ),
        node(
            func=train,
            inputs=dict(
                training_dataset="training_dataset",
                model_config="params:model_config",
                sampler_config="params:sampler_config"
            ),
            outputs=dict(
                Belem_model = "Belem_model",
                Curitiba_model = "Curitiba_model",
                Fortaleza_model = "Fortaleza_model",
                Goiania_model = "Goiania_model",
                Macapa_model = "Macapa_model",
                Manaus_model = "Manaus_model",
                Recife_model = "Recife_model",
                Rio_model = "Rio_model",
                Salvador_model = "Salvador_model",
                Sao_Luiz_model = "Sao_Luiz_model",
                Sao_Paulo_model = "Sao_Paulo_model",
                Vitoria_model = "Vitoria_model"
            ),
            name="train",
            tags=["train"]
        )
    ]

    forecast_training_nodes = [
        node(
            func=forecast,
            inputs=dict(
                model=f"{city}_model",
                n_ahead="params:n_ahead",
                ground_truth="testing_dataset"
            ),
            outputs=f"{city}_forecasts_evaluation",
            name=f"{city}_forecasts_evaluation",
            tags=["train"]
        )
        for city in cities
    ]

    forecast_nodes = [
        node(
            func=forecast,
            inputs=dict(
                model=f"{city}_model",
                n_ahead="params:n_ahead"
            ),
            outputs=f"{city}_forecasts",
            name=f"{city}_forecast",
            tags=["forecast"]
        )
        for city in cities
    ]

    return pipeline(split_train_nodes + forecast_training_nodes + forecast_nodes)

Defining Custom Pipelines

Previously, we let Kedro automatically register our pipelines based on the project pipelines we created. However, we now have sub-pipelines that we would like to define based on our tags. We can do so in src/climate_brazil/pipeline_registry.py:

#src/climate_brazil/pipeline_registry.py
"""Project pipelines."""

from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline

from climate_brazil.pipelines.data_processing import create_pipeline as dp
from climate_brazil.pipelines.ML import create_pipeline as ml

def register_pipelines() -> dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    dp_pipeline = dp()
    ml_pipeline = ml()
    training_pipeline = ml_pipeline.only_nodes_with_tags("train")
    forecast_pipeline = ml_pipeline.only_nodes_with_tags("forecast")

    return {
        "data_processing": dp_pipeline,
        "train": dp_pipeline + training_pipeline,
        "forecast": forecast_pipeline,
        "__default__": dp_pipeline + training_pipeline
    }

Notice how we broke down our ML pipeline into two sub-pipelines. Also, notice how we defined the train pipeline to consist of both the data processing pipeline and the training pipeline. We also assigned the train pipeline to be the default pipeline. This means if we execute kedro run without specifying a specific pipeline it will run the train pipeline.

Executing Pipeline Components

Training Pipeline

You can execute your training pipeline by either executing

kedro run

or

kedro run --pipeline train

Let’s go ahead and run one of the above to train the model and generate forecasts on the test set with evaluations.

Forecasting pipeline

If you already have a trained model and you don’t have the ground truth for evaluations you can execute

kedro run --pipeline forecast

Visualizing Forecasts

We’ve seen our evaluation metrics logged to stdout, which by the way is written to /info.log, let’s also look at our fitted model and forecasts against the actual data. To do that let’s add a node that will handle generating these plots in our pipeline.

def forecast_plot(training_dataset: pl.DataFrame, testing_dataset: pl.DataFrame, model: TSModel, forecast: xr.Dataset) -> go.Figure:
    """
    Generates plot showing in-sample posterior predictive performance as well as out-of-sample forecasts
    ---
    Params:
        training_dataset: The training split
        testing_dataset: The testing split
        model: the trained model
        forecasts: the forecast from the trainined model
    """
    # City specific data
    city = model.idata.attrs['city']
    city_training_dataset = training_dataset.filter(pl.col("city")==city)
    city_testing_dataset = testing_dataset.filter(pl.col("city")==city)

    # Model fit posterior predictive mean and HDI
    posterior_mean_normalized = model.idata.posterior_predictive['temperature_normalized'].mean(('chain', 'draw'))
    hdi_normalized = az.hdi(model.idata.posterior_predictive['temperature_normalized'], hdi_prob=0.8)
    posterior_mean = posterior_mean_normalized * model.idata.attrs['y_std'] + model.idata.attrs['y_mean']
    hdi = hdi_normalized * model.idata.attrs['y_std'] + model.idata.attrs['y_mean']

    # Forecast posterior predictive mean and HDI
    posterior_predictive_mean_normalized = forecast['temperature_normalized_fut'].mean(('chain', 'draw'))
    posterior_predictive_hdi_normalized = az.hdi(forecast['temperature_normalized_fut'], hdi_prob=0.8)
    posterior_predictive_mean = posterior_predictive_mean_normalized * model.idata.attrs['y_std'] + model.idata.attrs['y_mean']
    posterior_predictive_hdi = posterior_predictive_hdi_normalized * model.idata.attrs['y_std'] + model.idata.attrs['y_mean']

    fig = go.Figure()
    fig.add_traces(
        [
            go.Scatter(
                name="", 
                x=city_training_dataset["date"], 
                y=hdi["temperature_normalized"][:, 1], 
                mode="lines", 
                marker=dict(color="#eb8c34"), 
                line=dict(width=0), 
                legendgroup="HDI",
                showlegend=False
            ),
            go.Scatter(
                name="80% HDI", 
                x=city_training_dataset["date"], 
                y=hdi["temperature_normalized"][:, 0], 
                mode="lines", marker=dict(color="#eb8c34"), 
                line=dict(width=0), 
                legendgroup="HDI", 
                fill='tonexty', 
                fillcolor='rgba(235, 140, 52, 0.5)'
            ),
            go.Scatter(
                x = city_training_dataset["date"],
                y = city_training_dataset["temperature"],
                mode="markers",
                marker_color="#48bbf0",
                name="actuals",
                legendgroup="actuals"
            ),
            go.Scatter(
                x = city_training_dataset["date"],
                y = posterior_mean,
                marker_color="blue",
                name="posterior_mean",
                legendgroup="posterior_mean"
            ),
            go.Scatter(
                name="", 
                x=city_testing_dataset["date"], 
                y=posterior_predictive_hdi["temperature_normalized_fut"][:, 1], 
                mode="lines", 
                marker=dict(color="#eb8c34"), 
                line=dict(width=0), 
                legendgroup="HDI",
                showlegend=False
            ),
            go.Scatter(
                name="", 
                x=city_testing_dataset["date"], 
                y=posterior_predictive_hdi["temperature_normalized_fut"][:, 0], 
                mode="lines", marker=dict(color="#eb8c34"), 
                line=dict(width=0), 
                legendgroup="HDI", 
                fill='tonexty', 
                fillcolor='rgba(235, 140, 52, 0.5)',
                showlegend=False
            ),
            go.Scatter(
                x = city_testing_dataset["date"],
                y = city_testing_dataset["temperature"],
                mode="markers",
                marker_color="#48bbf0",
                name="",
                legendgroup="actuals",
                showlegend=False
            ),
            go.Scatter(
                x = city_testing_dataset["date"],
                y = posterior_predictive_mean,
                mode="lines",
                marker_color="yellow",
                name="",
                legendgroup="posterior_mean",
                showlegend=False
            ),
        ]
    )
    fig.update_layout(
        title = f"{city.title()} Temperature Forecast",
        xaxis=dict(
                title="Date",
                rangeselector=dict(
                    buttons=list(
                        [
                            dict(count=1, label="1y", step="year", stepmode="backward"),
                            dict(count=5, label="5y", step="year", stepmode="backward"),
                            dict(count=10, label="10y", step="year", stepmode="backward"),
                            dict(step="all", label="All"),
                        ]
                    )
                ),
                rangeslider=dict(visible=True),
                type="date",
                rangeselector_font_color="black",
                rangeselector_activecolor="hotpink",
                rangeselector_bgcolor="lightblue",
                autorangeoptions=dict(clipmax=city_testing_dataset['date'].max() + timedelta(days=30), clipmin=city_training_dataset['date'].min() - timedelta(days=30))
            ),
        yaxis=dict(
            title="Temperature"
        )
    )
    return fig

Let’s make sure we define the outputs from our node so that we persist the figures to disk.

# Forecast plots: This is a dataset factory that will save all of our figures
"{city}_plot":
  type: plotly.JSONDataset
  filepath: data/08_reporting/{city}_plot.json

Next, we add the node to our pipeline.

# Add to /src/climate_brazil/pipelines/ML/pipeline.py
plot_node = [
    node(
        func=forecast_plot,
        inputs=dict(
            training_dataset="training_dataset",
            testing_dataset="testing_dataset",
            model=f"{city}_model",
            forecast=f"{city}_forecasts_evaluation"
        ),
        outputs=f"{city}_plot",
        name=f"{city}_forecast_plot",
        tags=["train", "forecast"]
    )
    for city in cities
]

return pipeline(
    split_train_nodes 
    + forecast_training_nodes 
    + forecast_nodes 
    + plot_node
)

Great! Now, let’s only execute our forecast_plot node since we’ve already ran the pipeline that generates our models and forecasts.

kedro run --nodes forecast_plot

That will generate the following figures:

19701980199020002010202026272829
80% HDIactualsposterior_mean1y5y10yAllBelem Temperature ForecastTemperatureDate
19701980199020002010202026272829
80% HDIactualsposterior_mean1y5y10yAllBelem Temperature ForecastTemperatureDate
1970198019902000201020201214161820222426
80% HDIactualsposterior_mean1y5y10yAllCuritiba Temperature ForecastTemperatureDate
1970198019902000201020201214161820222426
80% HDIactualsposterior_mean1y5y10yAllCuritiba Temperature ForecastTemperatureDate
1970198019902000201020202526272829
80% HDIactualsposterior_mean1y5y10yAllFortaleza Temperature ForecastTemperatureDate
1970198019902000201020202526272829
80% HDIactualsposterior_mean1y5y10yAllFortaleza Temperature ForecastTemperatureDate
197019801990200020102020202224262830
80% HDIactualsposterior_mean1y5y10yAllGoiania Temperature ForecastTemperatureDate
197019801990200020102020202224262830
80% HDIactualsposterior_mean1y5y10yAllGoiania Temperature ForecastTemperatureDate
197019801990200020102020252627282930
80% HDIactualsposterior_mean1y5y10yAllMacapa Temperature ForecastTemperatureDate
197019801990200020102020252627282930
80% HDIactualsposterior_mean1y5y10yAllMacapa Temperature ForecastTemperatureDate
19201940196019802000202026283032
80% HDIactualsposterior_mean1y5y10yAllManaus Temperature ForecastTemperatureDate
19201940196019802000202026283032
80% HDIactualsposterior_mean1y5y10yAllManaus Temperature ForecastTemperatureDate
1950196019701980199020002010202022242628
80% HDIactualsposterior_mean1y5y10yAllRecife Temperature ForecastTemperatureDate
1950196019701980199020002010202022242628
80% HDIactualsposterior_mean1y5y10yAllRecife Temperature ForecastTemperatureDate
1975198019851990199520002005201020152020202224262830
80% HDIactualsposterior_mean1y5y10yAllRio Temperature ForecastTemperatureDate
1975198019851990199520002005201020152020202224262830
80% HDIactualsposterior_mean1y5y10yAllRio Temperature ForecastTemperatureDate
19701980199020002010202022242628
80% HDIactualsposterior_mean1y5y10yAllSalvador Temperature ForecastTemperatureDate
19701980199020002010202022242628
80% HDIactualsposterior_mean1y5y10yAllSalvador Temperature ForecastTemperatureDate
1980198519901995200020052010201520202627282930
80% HDIactualsposterior_mean1y5y10yAllSao_Luiz Temperature ForecastTemperatureDate
1980198519901995200020052010201520202627282930
80% HDIactualsposterior_mean1y5y10yAllSao_Luiz Temperature ForecastTemperatureDate
1950196019701980199020002010202014161820222426
80% HDIactualsposterior_mean1y5y10yAllSao_Paulo Temperature ForecastTemperatureDate
1950196019701980199020002010202014161820222426
80% HDIactualsposterior_mean1y5y10yAllSao_Paulo Temperature ForecastTemperatureDate
197019801990200020102020202224262830
80% HDIactualsposterior_mean1y5y10yAllVitoria Temperature ForecastTemperatureDate
197019801990200020102020202224262830
80% HDIactualsposterior_mean1y5y10yAllVitoria Temperature ForecastTemperatureDate

Looks good! We now have our training and forecasting pipelines up and running.

Summary

In this section we focused on preparing a PyMC model to be production ready using the ModelBuilder class. We walked through the required methods that we need to define and adjusted and implemented new methods according to our project’s needs. Along the way we learned some more advanced features of Kedro like defining custom datasets and registering custom pipelines/sub-pipelines. We also looked at building a basic Bayesian structural time-series model.

Coming Next

We evaluated our model by looking at the RMSE and the coverage of our 80% HDI, but what if we wanted to compare different models, or different priors. With our current implementation it would be cumbersome to track these metrics. In the next part of this series we are going to talk about common MLOps principles and integrate MLFlow into our Kedro workflow. I hope to see you there!