mlflow.pipelines

MLflow Pipelines is an opinionated framework for structuring MLOps workflows that simplifies and standardizes machine learning application development and productionization. MLflow Pipelines makes it easy for data scientists to follow best practices for creating production-ready ML deliverables, allowing them to focus on developing excellent models. MLflow Pipelines also enables ML engineers and DevOps teams to seamlessly deploy these models to production and incorporate them into applications.

MLflow Pipelines provides production-quality Pipeline Templates for common ML problem types, such as regression & classification, and MLOps tasks, such as batch scoring. Pipelines are structured as git repositories with YAML-based configuration files and Python code, offering developers a declarative approach to ML application development that reduces boilerplate.

MLflow Pipelines also implements a cache-aware executor for pipeline steps, ensuring that steps are only executed when associated code or configurations have changed. This enables data scientists, ML engineers, and DevOps teams to iterate very quickly within their domains of expertise. MLflow offers run() APIs for executing pipelines, as well as an mlflow pipelines run CLI.
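For example, assuming the current working directory is the root of an MLflow Pipeline repository (such as a checkout of a pipeline template) containing a pipeline.yaml file, the cache-aware behavior can be sketched directly with the Python API; the mlflow pipelines run CLI provides equivalent functionality.

from mlflow.pipelines import Pipeline

# Construct a pipeline from the current working directory using the "local" profile
# (profile names are defined by the pipeline template).
pipeline = Pipeline(profile="local")

# The first execution runs every step and caches each step's outputs.
pipeline.run()

# A second execution re-runs only the steps whose code or configuration changed
# (e.g. edits to pipeline.yaml or to files under steps/); unchanged steps are skipped.
pipeline.run()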

For more information, see the MLflow Pipelines Overview.

class mlflow.pipelines.Pipeline[source]

Note

Experimental: This class may change or be removed in a future release without warning.

A factory class that creates an instance of a pipeline for a particular ML problem (e.g. regression, classification) or MLOps task (e.g. batch scoring) based on the current working directory and supplied configuration.

Example
import os
from mlflow.pipelines import Pipeline

os.chdir("~/mlp-regression-template")
regression_pipeline = Pipeline(profile="local")
regression_pipeline.run(step="train")
static __new__(cls, profile: str) -> mlflow.pipelines.regression.v1.pipeline.RegressionPipeline[source]

Note

Experimental: This method may change or be removed in a future release without warning.

Creates an instance of an MLflow Pipeline for a particular ML problem or MLOps task based on the current working directory and supplied configuration. The current working directory must be the root directory of an MLflow Pipeline repository or a subdirectory of an MLflow Pipeline repository.

Parameters

profile – The name of the profile to use for configuring the problem-specific or task-specific pipeline. Profiles customize the configuration of one or more pipeline steps, and pipeline executions with different profiles often produce different results.

Returns

A pipeline for a particular ML problem or MLOps task. For example, an instance of RegressionPipeline for regression problems.

Example
import os
from mlflow.pipelines import Pipeline

os.chdir("~/mlp-regression-template")
regression_pipeline = Pipeline(profile="local")
regression_pipeline.run(step="train")

Regression Pipeline

The MLflow Regression Pipeline is an MLflow Pipeline for developing high-quality regression models. It is designed for developing models using scikit-learn and frameworks that integrate with scikit-learn, such as the XGBRegressor API from XGBoost. The corresponding pipeline template repository is available at https://github.com/mlflow/mlp-regression-template, and the RegressionPipeline API Documentation provides instructions for executing the pipeline and inspecting its results.

The training pipeline contains the following sequential steps:

ingest -> split -> transform -> train -> evaluate -> register

The batch scoring pipeline contains the following sequential steps:

ingest_scoring -> predict

The pipeline steps are defined as follows:

  • ingest
    • The ingest step resolves the dataset specified by the ‘data’ section in pipeline.yaml and converts it to parquet format, leveraging the custom dataset parsing code defined in steps/ingest.py if necessary. Subsequent steps convert this dataset into training, validation, & test sets and use them to develop a model.

      Note

      If you make changes to the dataset referenced by the ingest step (e.g. by adding new records or columns), you must manually re-run the ingest step in order to use the updated dataset in the pipeline. The ingest step does not automatically detect changes in the dataset.

  • split
    • The split step splits the ingested dataset produced by the ingest step into a training dataset for model training, a validation dataset for model performance evaluation & tuning, and a test dataset for model performance evaluation. The fraction of records allocated to each dataset is defined by the split_ratios attribute of the ‘split’ step definition in pipeline.yaml. The split step also preprocesses the datasets using logic defined in steps/split.py. Subsequent steps use these datasets to develop a model and measure its performance.

  • transform
    • The transform step uses the training dataset created by split to fit a transformer that performs the transformations defined in steps/transform.py (see the sketch of steps/transform.py and steps/train.py after this list). The transformer is then applied to the training dataset and the validation dataset, creating transformed datasets that are used by subsequent steps for estimator training and model performance evaluation.

  • train
    • The train step uses the transformed training dataset output from the transform step to fit an estimator with the type and parameters defined in steps/train.py. The estimator is then joined with the fitted transformer output from the transform step to create a model pipeline. Finally, this model pipeline is evaluated against the transformed training and validation datasets to compute performance metrics; custom metrics are computed according to definitions in steps/custom_metrics.py and the ‘metrics’ section of pipeline.yaml. The model pipeline and its associated parameters, performance metrics, and lineage information are logged to MLflow Tracking, producing an MLflow Run.

  • evaluate
    • The evaluate step evaluates the model pipeline created by the train step on the test dataset output from the split step, computing performance metrics and model explanations. Performance metrics are compared against configured thresholds to compute a model_validation_status, which indicates whether or not a model is good enough to be registered to the MLflow Model Registry by the subsequent register step. Custom performance metrics are computed according to definitions in steps/custom_metrics.py and the ‘metrics’ section of pipeline.yaml. Model performance thresholds are defined in the ‘validation_criteria’ section of the ‘evaluate’ step definition in pipeline.yaml. Model performance metrics and explanations are logged to the same MLflow Tracking Run used by the train step.

  • register
    • The register step checks the model_validation_status output of the preceding evaluate step and, if model validation was successful (as indicated by the 'VALIDATED' status), registers the model pipeline created by the train step to the MLflow Model Registry. If the model_validation_status does not indicate that the model passed validation checks (i.e. its value is 'REJECTED'), the model pipeline is not registered to the MLflow Model Registry. If the model pipeline is registered to the MLflow Model Registry, a registered_model_version is produced containing the model name and the model version.

      Note

      The model validation status check can be disabled by specifying allow_non_validated_model: true in the ‘register’ step definition of pipeline.yaml, in which case the model pipeline is always registered with the MLflow Model Registry when the register step is executed.

  • predict
    • The predict step uses the scoring dataset created by the ingest_scoring step and applies the specified model to the dataset, producing a scored dataset.
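The transform and train steps above are driven by user-provided code in the pipeline repository's steps/transform.py and steps/train.py files. The following is a minimal, hypothetical sketch of what those files might contain; the function names transformer_fn and estimator_fn are assumptions based on the mlp-regression-template conventions (they are referenced from pipeline.yaml), so confirm the exact names and signatures against the template repository.

# steps/transform.py (hypothetical sketch)
from sklearn.preprocessing import StandardScaler


def transformer_fn():
    # Return an *unfitted* transformer; the transform step fits it on the training
    # dataset produced by the split step. A real transformer would typically select
    # and encode specific columns (e.g. via sklearn.compose.ColumnTransformer).
    return StandardScaler()


# steps/train.py (hypothetical sketch)
from sklearn.linear_model import LinearRegression


def estimator_fn():
    # Return an *unfitted* scikit-learn-compatible regressor; the train step fits it
    # on the transformed training dataset and joins it with the fitted transformer
    # to produce the model pipeline that is logged to MLflow Tracking.
    return LinearRegression()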

class mlflow.pipelines.regression.v1.pipeline.RegressionPipeline[source]

Note

Experimental: This class may change or be removed in a future release without warning.

A pipeline for developing high-quality regression models. The pipeline is designed for developing models using scikit-learn and frameworks that integrate with scikit-learn, such as the XGBRegressor API from XGBoost. The corresponding pipeline template repository is available at https://github.com/mlflow/mlp-regression-template. The training pipeline contains the following sequential steps:

ingest -> split -> transform -> train -> evaluate -> register

while the batch scoring pipeline contains this set of sequential steps:

ingest_scoring -> predict

Example
import os
from mlflow.pipelines import Pipeline

os.chdir("~/mlp-regression-template")
regression_pipeline = Pipeline(profile="local")
# Display a visual overview of the pipeline graph
regression_pipeline.inspect()
# Run the full pipeline
regression_pipeline.run()
# Display a summary of results from the 'train' step, including the trained model
# and associated performance metrics computed from the training & validation datasets
regression_pipeline.inspect(step="train")
# Display a summary of results from the 'evaluate' step, including model explanations
# computed from the validation dataset and metrics computed from the test dataset
regression_pipeline.inspect(step="evaluate")
clean(step: Optional[str] = None) -> None[source]

Removes all pipeline outputs from the cache, or removes the cached outputs of a particular pipeline step if specified. After cached outputs are cleaned for a particular step, the step will be re-executed in its entirety the next time it is run.

Parameters

step – String name of the step to clean within the pipeline. If not specified, cached outputs are removed for all pipeline steps.

Example
import os
from mlflow.pipelines import Pipeline

os.chdir("~/mlp-regression-template")
regression_pipeline = Pipeline(profile="local")
# Run the 'train' step and preceding steps
regression_pipeline.run(step="train")
# Clean the cache of the 'transform' step
regression_pipeline.clean(step="transform")
# Run the 'split' step; outputs are still cached because 'split' precedes
# 'transform' & 'train'
regression_pipeline.run(step="split")
# Run the 'train' step again; the 'transform' and 'train' steps are re-executed because:
# 1. the cache of the preceding 'transform' step was cleaned and 2. 'train' occurs after
# 'transform'. The 'ingest' and 'split' steps are not re-executed because their outputs
# are still cached
regression_pipeline.run(step="train")
get_artifact(artifact_name: str) -> Optional[Any][source]

Note

Experimental: This method may change or be removed in a future release without warning.

Reads an artifact from the pipeline’s outputs. Supported artifact names can be obtained by examining the pipeline graph visualization displayed by RegressionPipeline.inspect().

Parameters

artifact_name

The string name of the artifact. Supported artifact values are:

  • "ingested_data": returns the ingested dataset created in the ingest step as a pandas DataFrame.

  • "training_data": returns the training dataset created in the split step as a pandas DataFrame.

  • "validation_data": returns the validation dataset created in the split step as a pandas DataFrame.

  • "test_data": returns the test dataset created in the split step as a pandas DataFrame.

  • "ingested_scoring_data": returns the scoring dataset created in the ingest_scoring step as a pandas DataFrame.

  • "transformed_training_data": returns the transformed training dataset created in the transform step as a pandas DataFrame.

  • "transformed_validation_data": returns the transformed validation dataset created in the transform step as a pandas DataFrame.

  • "model": returns the MLflow Model pipeline created in the train step as a PyFuncModel instance.

  • "transformer": returns the scikit-learn transformer created in the transform step.

  • "run": returns the MLflow Tracking Run containing the model pipeline created in the train step and its associated parameters, as well as performance metrics and model explanations created during the train and evaluate steps.

  • "registered_model_version”: returns the MLflow Model Registry ModelVersion created by the register step.

  • "scored_data": returns the scored dataset created in the predict step as a pandas DataFrame.

Returns

An object representation of the artifact corresponding to the specified name, as described in the artifact_name parameter docstring. If the artifact is not present because its corresponding step has not been executed or its output cache has been cleaned, None is returned.

Example
import os
import pandas as pd
from mlflow.pipelines import Pipeline
from mlflow.pyfunc import PyFuncModel

os.chdir("~/mlp-regression-template")
regression_pipeline = Pipeline(profile="local")
regression_pipeline.run()
train_df: pd.DataFrame = regression_pipeline.get_artifact("training_data")
trained_model: PyFuncModel = regression_pipeline.get_artifact("model")
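Per the Returns description above, get_artifact returns None when an artifact's corresponding step has not been executed or its output cache has been cleaned. Continuing the example, a defensive check avoids surprises; the .name and .version attribute accesses assume the ModelVersion entity described in the artifact list above.

# Returns None if the 'register' step has not produced a registered model version.
model_version = regression_pipeline.get_artifact("registered_model_version")
if model_version is not None:
    print(model_version.name, model_version.version)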
inspect(step: Optional[str] = None) -> None[source]

Displays a visual overview of the pipeline graph, or displays a summary of results from a particular pipeline step if specified. If the specified step has not been executed, nothing is displayed.

Parameters

step – String name of the pipeline step for which to display a results summary. If unspecified, a visual overview of the pipeline graph is displayed.

Example
import os
from mlflow.pipelines import Pipeline

os.chdir("~/mlp-regression-template")
regression_pipeline = Pipeline(profile="local")
# Display a visual overview of the pipeline graph.
regression_pipeline.inspect()
# Run the 'train' pipeline step
regression_pipeline.run(step="train")
# Display a summary of results from the preceding 'transform' step
regression_pipeline.inspect(step="transform")
run(step: Optional[str] = None) -> None[source]

Runs the full pipeline or a particular pipeline step, producing outputs and displaying a summary of results upon completion. Step outputs are cached from previous executions, and steps are only re-executed if configuration or code changes have been made to the step or to any of its dependent steps (e.g. changes to the pipeline’s pipeline.yaml file or steps/ingest.py file) since the previous execution.

Parameters

step

String name of the step to run within the regression pipeline. The step and its dependencies are executed sequentially. If a step is not specified, the entire pipeline is executed. Supported steps, in their order of execution, are:

  • "ingest": resolves the dataset specified by the data/training section in the pipeline’s configuration file (pipeline.yaml) and converts it to parquet format.

  • "ingest_scoring": resolves the dataset specified by the data/scoring section in the pipeline’s configuration file (pipeline.yaml) and converts it to parquet format.

  • "split": splits the ingested dataset produced by the ingest step into a training dataset for model training, a validation dataset for model performance evaluation & tuning, and a test dataset for model performance evaluation.

  • "transform": uses the training dataset created by the split step to fit a transformer that performs the transformations defined in the pipeline’s steps/transform.py file. Then, applies the transformer to the training dataset and the validation dataset, creating transformed datasets that are used by subsequent steps for estimator training and model performance evaluation.

  • "train": uses the transformed training dataset output from the transform step to fit an estimator with the type and parameters defined in in the pipeline’s steps/train.py file. Then, joins the estimator with the fitted transformer output from the transform step to create a model pipeline. Finally, evaluates the model pipeline against the transformed training and validation datasets to compute performance metrics.

  • "evaluate": evaluates the model pipeline created by the train step on the validation and test dataset outputs from the split step, computing performance metrics and model explanations. Then, compares performance metrics against thresholds configured in the pipeline’s pipeline.yaml configuration file to compute a model_validation_status, which indicates whether or not the model is good enough to be registered to the MLflow Model Registry by the subsequent register step.

  • "register": checks the model_validation_status output of the preceding evaluate step and, if model validation was successful (as indicated by the 'VALIDATED' status), registers the model pipeline created by the train step to the MLflow Model Registry.

  • "predict": uses the ingested dataset for scoring created by the ingest_scoring step and applies the specified model to the dataset.

Example
import os
from mlflow.pipelines import Pipeline

os.chdir("~/mlp-regression-template")
regression_pipeline = Pipeline(profile="local")
# Run the 'train' step and preceding steps
regression_pipeline.run(step="train")
# Run the 'register' step and preceding steps; the 'train' step and all steps
# prior to 'train' are not re-executed because their outputs are already cached
regression_pipeline.run(step="register")
# Run all pipeline steps; equivalent to running 'register'; no steps are re-executed
# because the outputs of all steps are already cached
regression_pipeline.run()