MLflow Models

An MLflow Model is a standard format for packaging machine learning models that can be used in a variety of downstream tools—for example, real-time serving through a REST API or batch inference on Apache Spark. The format defines a convention that lets you save a model in different “flavors” that can be understood by different downstream tools.

Storage Format

Each MLflow Model is a directory containing arbitrary files, together with an MLmodel file in the root of the directory that can define multiple flavors that the model can be viewed in.

Flavors are the key concept that makes MLflow Models powerful: they are a convention that deployment tools can use to understand the model, which makes it possible to write tools that work with models from any ML library without having to integrate each tool with each library. MLflow defines several “standard” flavors that all of its built-in deployment tools support, such as a “Python function” flavor that describes how to run the model as a Python function. However, libraries can also define and use other flavors. For example, MLflow’s mlflow.sklearn library allows loading models back as a scikit-learn Pipeline object for use in code that is aware of scikit-learn, or as a generic Python function for use in tools that just need to apply the model (for example, the mlflow deployments tool with the option -t sagemaker for deploying models to Amazon SageMaker).

All of the flavors that a particular model supports are defined in its MLmodel file in YAML format. For example, mlflow.sklearn outputs models as follows:

# Directory written by mlflow.sklearn.save_model(model, "my_model")
my_model/
├── MLmodel
├── model.pkl
├── conda.yaml
├── python_env.yaml
└── requirements.txt

And its MLmodel file describes two flavors:

time_created: 2018-05-25T17:28:53.35

flavors:
  sklearn:
    sklearn_version: 0.19.1
    pickled_model: model.pkl
  python_function:
    loader_module: mlflow.sklearn

This model can then be used with any tool that supports either the sklearn or python_function model flavor. For example, the mlflow models serve command can serve a model with the python_function or the crate (R Function) flavor:

mlflow models serve -m my_model

Note

If you wish to serve a model from inside a docker container (or to query it from another machine), you need to change the network address to 0.0.0.0 using the -h argument.

mlflow models serve -h 0.0.0.0 -m my_model

In addition, the mlflow deployments command-line tool can package and deploy models to AWS SageMaker as long as they support the python_function flavor:

mlflow deployments create -t sagemaker -m my_model [other options]

Fields in the MLmodel Format

Apart from a flavors field listing the model flavors, the MLmodel YAML format can contain the following fields:

time_created

Date and time when the model was created, in UTC ISO 8601 format.

run_id

ID of the run that created the model, if the model was saved using MLflow Tracking.

signature

model signature in JSON format.

input_example

reference to an artifact with input example.

databricks_runtime

Databricks runtime version and type, if the model was trained in a Databricks notebook or job.

mlflow_version

The version of MLflow that was used to log the model.

Additional Logged Files

For environment recreation, we automatically log conda.yaml, python_env.yaml, and requirements.txt files whenever a model is logged. These files can then be used to reinstall dependencies using conda or virtualenv with pip.

Note

Anaconda Inc. updated their terms of service for anaconda.org channels. Based on the new terms of service you may require a commercial license if you rely on Anaconda’s packaging and distribution. See Anaconda Commercial Edition FAQ for more information. Your use of any Anaconda channels is governed by their terms of service.

MLflow models logged before v1.18 were by default logged with the conda defaults channel (https://repo.anaconda.com/pkgs/) as a dependency. Because of this license change, MLflow has stopped the use of the defaults channel for models logged using MLflow v1.18 and above. The default channel logged is now conda-forge, which points at the community managed https://conda-forge.org/.

If you logged a model before MLflow v1.18 without excluding the defaults channel from the conda environment for the model, that model may have a dependency on the defaults channel that you may not have intended. To manually confirm whether a model has this dependency, you can examine channel value in the conda.yaml file that is packaged with the logged model. For example, a model’s conda.yaml with a defaults channel dependency may look like this:

name: mlflow-env
channels:
- defaults
dependencies:
- python=3.8.8
- pip
- pip:
    - mlflow==2.3
    - scikit-learn==0.23.2
    - cloudpickle==1.6.0

If you would like to change the channel used in a model’s environment, you can re-register the model to the model registry with a new conda.yaml. You can do this by specifying the channel in the conda_env parameter of log_model().

For more information on the log_model() API, see the MLflow documentation for the model flavor you are working with, for example, mlflow.sklearn.log_model().

conda.yaml

When saving a model, MLflow provides the option to pass in a conda environment parameter that can contain dependencies used by the model. If no conda environment is provided, a default environment is created based on the flavor of the model. This conda environment is then saved in conda.yaml.

python_env.yaml

This file contains the following information that’s required to restore a model environment using virtualenv:

  • Python version

  • Version specifiers for pip, setuptools, and wheel

  • Pip requirements of the model (reference to requirements.txt)

requirements.txt

The requirements file is created from the pip portion of the conda.yaml environment specification. Additional pip dependencies can be added to requirements.txt by including them as a pip dependency in a conda environment and logging the model with the environment or using the pip_requirements argument of the mlflow.<flavor>.log_model API.

The following shows an example of saving a model with a manually specified conda environment and the corresponding content of the generated conda.yaml and requirements.txt files.

conda_env = {
    "channels": ["conda-forge"],
    "dependencies": ["python=3.8.8", "pip"],
    "pip": ["mlflow==2.3", "scikit-learn==0.23.2", "cloudpickle==1.6.0"],
    "name": "mlflow-env",
}
mlflow.sklearn.log_model(..., conda_env=conda_env)

The written conda.yaml file:

name: mlflow-env
channels:
  - conda-forge
dependencies:
- python=3.8.8
- pip
- pip:
  - mlflow==2.3
  - scikit-learn==0.23.2
  - cloudpickle==1.6.0

The written python_env.yaml file:

python: 3.8.8
build_dependencies:
  - pip==21.1.3
  - setuptools==57.4.0
  - wheel==0.37.0
dependencies:
  - -r requirements.txt

The written requirements.txt file:

mlflow==2.3
scikit-learn==0.23.2
cloudpickle==1.6.0

Model Signature And Input Example

When working with ML models you often need to know some basic functional properties of the model at hand, such as “What inputs does it expect?” and “What output does it produce?”. MLflow models can include the following additional metadata about model inputs, outputs and params that can be used by downstream tooling:

Model Inference Params

Inference params are parameters that are passed to the model at inference time. These parameters do not need to be specified when training the model, but could be useful for inference. With the advances in foundational models, more often “inference configuration” is used to modify the behavior of a model. In some cases, especially popular LLMs, the same model may require different parameter configurations for different samples at inference time.

With this newly introduced feature, you can now specify a dictionary of inference params during model inference, providing a broader utility and improved control over the generated inference results, particularly for LLM use cases. By passing different params such as temperature, max_length, etc. to the model at inference time, you can easily control the output of the model.

In order to use params at inference time, a valid Model Signature with params must be defined. The params are passed to the model at inference time as a dictionary and each param value will be validated against the corresponding param type defined in the model signature. Valid param types are DataType or a list of DataType as listed below.

  • DataType.string or an array of DataType.string

  • DataType.integer or an array of DataType.integer

  • DataType.boolean or an array of DataType.boolean

  • DataType.double or an array of DataType.double

  • DataType.float or an array of DataType.float

  • DataType.long or an array of DataType.long

  • DataType.datetime or an array of DataType.datetime

Note

When validating param values, the values will be converted to python native types. For example, np.float32(0.1) will be converted to float(0.1).

A simple example of using params for model inference:

import mlflow
from mlflow.models import infer_signature


class MyModel(mlflow.pyfunc.PythonModel):
    def predict(self, ctx, model_input, params):
        return list(params.values())


params = {"str_param": "string", "int_array": [1, 2, 3]}
# params' default values are saved with ModelSignature
signature = infer_signature(["input"], params=params)

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        python_model=MyModel(), artifact_path="my_model", signature=signature
    )

loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

# Not passing params -- predict with default values
loaded_predict = loaded_model.predict(["input"])
assert loaded_predict == ["string", [1, 2, 3]]

# Passing some params -- add default values
loaded_predict = loaded_model.predict(["input"], params={"str_param": "new_string"})
assert loaded_predict == ["new_string", [1, 2, 3]]

# Passing all params -- override
loaded_predict = loaded_model.predict(
    ["input"], params={"str_param": "new_string", "int_array": [4, 5, 6]}
)
assert loaded_predict == ["new_string", [4, 5, 6]]

Model Signature

Model signatures define input, output and parameters schemas for MLflow models, providing a standard interface to codify and enforce the correct use of your models. Signatures are fetched by the MLflow Tracking UI and Model Registry UI to display model inputs, outputs and params. They are also utilized by MLflow model deployment tools to validate inference inputs according to the model’s assigned signature (see the Signature enforcement section for more details).

To include a signature with your model, pass a model input example as an argument to the appropriate log_model or save_model call, e.g. sklearn.log_model(), and the model signature will be automatically inferred (see the How to log models with signatures section for more details). The model signature is stored in JSON format in the MLmodel file in your model artifacts, together with other model metadata. To set a signature on a logged or saved model, use the set_signature() API (see the How to set signatures on models section for more details).

Model Signature Types

A model signature consists on inputs and outputs schemas, each of which can be either column-based or tensor-based. Column-based schemas are a sequence of (optionally) named columns with type specified as one of the MLflow data types. Tensor-based schemas are a sequence of (optionally) named tensors with type specified as one of the numpy data types. Params schema is a sequence of ParamSpec, each of which contains name, type, default and shape fields. type field must be specified as one of the MLflow data types, and shape field should be None for scalar parameters, or (-1,) for list parameters. See some examples of constructing them below.

Column-based Signature Example

Each column-based input and output is represented by a type corresponding to one of MLflow data types and an optional name. Input columns can also be marked as optional, indicating whether they are required as input to the model or can be omitted. The following example displays a modified MLmodel file excerpt containing the model signature for a classification model trained on the Iris dataset. The input has 4 named, numeric columns and 1 named, optional string column. The output is an unnamed integer specifying the predicted class.

signature:
    inputs: '[{"name": "sepal length (cm)", "type": "double"}, {"name": "sepal width
      (cm)", "type": "double"}, {"name": "petal length (cm)", "type": "double"}, {"name":
      "petal width (cm)", "type": "double"}, {"name": "class", "type": "string", "optional": "true"}]'
    outputs: '[{"type": "integer"}]'
    params: null
Tensor-based Signature Example

Each tensor-based input and output is represented by a dtype corresponding to one of numpy data types, shape and an optional name. Tensor-based signatures do not support optional inputs. When specifying the shape, -1 is used for axes that may be variable in size. The following example displays an MLmodel file excerpt containing the model signature for a classification model trained on the MNIST dataset. The input has one named tensor where input sample is an image represented by a 28 × 28 × 1 array of float32 numbers. The output is an unnamed tensor that has 10 units specifying the likelihood corresponding to each of the 10 classes. Note that the first dimension of the input and the output is the batch size and is thus set to -1 to allow for variable batch sizes.

signature:
    inputs: '[{"name": "images", "dtype": "uint8", "shape": [-1, 28, 28, 1]}]'
    outputs: '[{"shape": [-1, 10], "dtype": "float32"}]'
    params: null
Signature with params Example

The params field is optional and is used to specify parameters that can be used for model inference. Params accept scalar values of type MLflow data types, or a list of such values. The default value of a parameter is specified by setting the default field, and the value should be of the type specified by type field. The shape field can be used to specify the shape of the value, it should be None for scalar values and (-1,) for a list.

signature:
    inputs: '[{"name": "text", "type": "string"}]'
    outputs: '[{"name": "output", "type": "string"}]'
    params: '[{"name": "temperature", "type": "float", "default": 0.5, "shape": null},
              {"name": "top_k", "type": "integer", "default": 1, "shape": null},
              {"name": "suppress_tokens", "type": "integer", "default": [101, 102], "shape": [-1]}]'

Signature Enforcement

Schema enforcement checks the provided input and params against the model’s signature and raises an exception if the input is not compatible and will issue a warning or raise an exception if the params are incompatible. This enforcement is applied in MLflow before calling the underlying model implementation, and during model inference process. Note that this enforcement only applies when using MLflow model deployment tools or when loading models as python_function. In particular, it is not applied to models that are loaded in their native format (e.g. by calling mlflow.sklearn.load_model()).

Name Ordering Enforcement

The input names are checked against the model signature. If there are any missing required inputs, MLflow will raise an exception. Missing optional inputs will not raise an exception. Extra inputs that were not declared in the signature will be ignored. If the input schema in the signature defines input names, input matching is done by name and the inputs are reordered to match the signature. If the input schema does not have input names, matching is done by position (i.e. MLflow will only check the number of inputs).

Input Type Enforcement

The input types are checked against the signature.

For models with column-based signatures (i.e DataFrame inputs), MLflow will perform safe type conversions if necessary. Generally, only conversions that are guaranteed to be lossless are allowed. For example, int -> long or int -> double conversions are ok, long -> double is not. If the types cannot be made compatible, MLflow will raise an error.

For models with tensor-based signatures, type checking is strict (i.e an exception will be thrown if the input type does not match the type specified by the schema).

Params Type and Shape Enforcement

The params types and shapes are checked against the signature.

MLflow verifies the compatibility of each parameter provided during inference by comparing its type and shape with those specified in the signature. Scalar values should have a shape of None, while list values should have a shape of (-1,). If the parameter’s type or shape is incompatible, an exception will be raised. Additionally, the value of the parameter is validated against the specified type in the signature. We attempt to convert the value to the specified type, and if this conversion fails, an MlflowException will be raised. A valid list of params is documented in Model Inference Params section. Models that have signatures and are used for inference with declared params not part of the logged signature will have a warning issued with each request and the invalid params ignored during inference.

Handling Integers With Missing Values

Integer data with missing values is typically represented as floats in Python. Therefore, data types of integer columns in Python can vary depending on the data sample. This type variance can cause schema enforcement errors at runtime since integer and float are not compatible types. For example, if your training data did not have any missing values for integer column c, its type will be integer. However, when you attempt to score a sample of the data that does include a missing value in column c, its type will be float. If your model signature specified c to have integer type, MLflow will raise an error since it can not convert float to int. Note that MLflow uses python to serve models and to deploy models to Spark, so this can affect most model deployments. The best way to avoid this problem is to declare integer columns as doubles (float64) whenever there can be missing values.

Handling Date and Timestamp

For datetime values, Python has precision built into the type. For example, datetime values with day precision have numpy type datetime64[D], while values with nanosecond precision have type datetime64[ns]. Datetime precision is ignored for column-based model signature but is enforced for tensor-based signatures.

Handling Ragged Arrays

Ragged arrays can be created in numpy and are produced with a shape of (-1,) and a dytpe of object. This will be handled by default when using infer_signature, resulting in a signature containing Tensor('object', (-1,)). A similar signature can be manually created containing a more detailed representation of a ragged array, for a more expressive signature, such as Tensor('float64', (-1, -1, -1, 3)). Enforcement will then be done on as much detail as possible given the signature provided, and will support ragged input arrays as well.

How To Log Models With Signatures

To include a signature with your model, pass a model input example to the appropriate log_model or save_model call, e.g. sklearn.log_model(), and the model signature will be automatically inferred from the input example and the model’s predicted output of the input example.

You may also include a signature object with your model by passing a signature object as an argument to your log_model or save_model call. The model signature object can be created by hand or inferred from datasets with valid model inputs (e.g. the training dataset with target column omitted), valid model outputs (e.g. model predictions generated on the training dataset), and valid model parameters (a dictionary of parameters passed to model for inference; e.g. Generation Configs for transformers).

Note

Model signatures are utilized in MLflow model deployment tools, which commonly serve the Python Function (PyFunc) flavor of MLflow models. Hence, when passing a signature object to your log_model or save_model call, it is recommended that the signature represent the inputs and outputs of the model’s PyFunc flavor. This is especially important when the model loaded as a PyFunc model has an input schema that is different from the test dataset schema (as is the case with the pmdarima model flavor).

Column-based Signature Example

The following example demonstrates how to store a model signature for a simple classifier trained on the Iris dataset:

import pandas as pd
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
import mlflow
from mlflow.models import infer_signature

iris = datasets.load_iris()
iris_train = pd.DataFrame(iris.data, columns=iris.feature_names)
clf = RandomForestClassifier(max_depth=7, random_state=0)

with mlflow.start_run():
    clf.fit(iris_train, iris.target)
    # Take the first row of the training dataset as the model input example.
    input_example = iris_train.iloc[[0]]
    # The signature is automatically inferred from the input example and its predicted output.
    mlflow.sklearn.log_model(clf, "iris_rf", input_example=input_example)

The same signature can be explicitly created and logged as follows:

from mlflow.models import ModelSignature, infer_signature
from mlflow.types.schema import Schema, ColSpec

# Option 1: Manually construct the signature object
input_schema = Schema(
    [
        ColSpec("double", "sepal length (cm)"),
        ColSpec("double", "sepal width (cm)"),
        ColSpec("double", "petal length (cm)"),
        ColSpec("double", "petal width (cm)"),
    ]
)
output_schema = Schema([ColSpec("long")])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# Option 2: Infer the signature
signature = infer_signature(iris_train, clf.predict(iris_train))

with mlflow.start_run():
    mlflow.sklearn.log_model(clf, "iris_rf", signature=signature)
Tensor-based Signature Example

The following example demonstrates how to store a model signature for a simple classifier trained on the MNIST dataset:

import tensorflow as tf
import mlflow

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

model = tf.keras.models.Sequential(
    [
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10),
    ]
)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"])

with mlflow.start_run():
    model.fit(x_train, y_train, epochs=5)
    # Take the first three training examples as the model input example.
    input_example = x_train[:3, :]
    mlflow.tensorflow.log_model(model, "mnist_cnn", input_example=input_example)

The same signature can be explicitly created and logged as follows:

import numpy as np
from mlflow.models import ModelSignature, infer_signature
from mlflow.types.schema import Schema, TensorSpec

# Option 1: Manually construct the signature object
input_schema = Schema(
    [
        TensorSpec(np.dtype(np.float64), (-1, 28, 28, 1)),
    ]
)
output_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, 10))])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# Option 2: Infer the signature
signature = infer_signature(testX, model.predict(testX))

with mlflow.start_run():
    mlflow.tensorflow.log_model(model, "mnist_cnn", signature=signature)
Signature with params Example

The following example demonstrates how to store a model signature with params for a simple transformers model:

import mlflow
from mlflow.models import infer_signature
import transformers

architecture = "mrm8488/t5-base-finetuned-common_gen"
model = transformers.pipeline(
    task="text2text-generation",
    tokenizer=transformers.T5TokenizerFast.from_pretrained(architecture),
    model=transformers.T5ForConditionalGeneration.from_pretrained(architecture),
)
data = "pencil draw paper"

params = {
    "top_k": 2,
    "num_beams": 5,
    "max_length": 30,
    "temperature": 0.62,
    "top_p": 0.85,
    "repetition_penalty": 1.15,
    "begin_suppress_tokens": [1, 2, 3],
}

# infer signature with params
signature = infer_signature(
    data,
    mlflow.transformers.generate_signature_output(model, data),
    params,
)

# save model with signature
mlflow.transformers.save_model(
    model,
    "text2text",
    signature=signature,
)
pyfunc_loaded = mlflow.pyfunc.load_model("text2text")

# predict with params
result = pyfunc_loaded.predict(data, params=params)

The same signature can be created explicitly as follows:

from mlflow.models import ModelSignature
from mlflow.types.schema import ColSpec, ParamSchema, ParamSpec, Schema

input_schema = Schema([ColSpec(type="string")])
output_schema = Schema([ColSpec(type="string")])
params_schema = ParamSchema(
    [
        ParamSpec("top_k", "long", 2),
        ParamSpec("num_beams", "long", 5),
        ParamSpec("max_length", "long", 30),
        ParamSpec("temperature", "double", 0.62),
        ParamSpec("top_p", "double", 0.85),
        ParamSpec("repetition_penalty", "double", 1.15),
        ParamSpec("begin_suppress_tokens", "long", [1, 2, 3], (-1,)),
    ]
)
signature = ModelSignature(
    inputs=input_schema, outputs=output_schema, params=params_schema
)

How To Set Signatures on Models

Models can be saved with without model signatures or with incorrect ones. To add a signature to an existing logged model, use the mlflow.models.set_signature() API. Here are some examples.

Set Signature on Logged Model

The following example demonstrates how to set a model signature on a logged sklearn model. Suppose you’ve logged a sklearn model without a signature like below:

import pandas as pd
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
import mlflow

X, y = datasets.load_iris(return_X_y=True, as_frame=True)
clf = RandomForestClassifier(max_depth=7, random_state=0)
with mlflow.start_run() as run:
    clf.fit(X, y)
    mlflow.sklearn.log_model(clf, "iris_rf")

You can set a signature on the logged model as follows:

import pandas as pd
from sklearn import datasets
import mlflow
from mlflow.models.model import get_model_info
from mlflow.models import infer_signature, set_signature

# load the logged model
model_uri = f"runs:/{run.info.run_id}/iris_rf"
model = mlflow.pyfunc.load_model(model_uri)

# construct the model signature from test dataset
X_test, _ = datasets.load_iris(return_X_y=True, as_frame=True)
signature = infer_signature(X_test, model.predict(X_test))

# set the signature for the logged model
set_signature(model_uri, signature)

# now when you load the model again, it will have the desired signature
assert get_model_info(model_uri).signature == signature

Note that model signatures can also be set on model artifacts saved outside of MLflow Tracking. As an example, you can easily set a signature on a locally saved iris model by altering the model_uri variable in the previous code snippet to point to the model’s local directory.

Set Signature on Model Version

As MLflow Model Registry artifacts are meant to be read-only, you cannot directly set a signature on a model version or model artifacts represented by models:/ URI schemes. Instead, you should first set the signature on the source model artifacts and generate a new model version using the updated model artifacts. The following example illustrates how this can be done.

Supposed you have created the following model version without a signature like below:

from sklearn.ensemble import RandomForestClassifier
import mlflow
from mlflow.client import MlflowClient

model_name = "add_signature_model"

with mlflow.start_run() as run:
    mlflow.sklearn.log_model(RandomForestClassifier(), "sklearn-model")

model_uri = f"runs:/{run.info.run_id}/sklearn-model"
mlflow.register_model(model_uri=model_uri, name=model_name)

To set a signature on the model version, create a duplicate model version with the new signature as follows:

from sklearn.ensemble import RandomForestClassifier
import mlflow
from mlflow.store.artifact.models_artifact_repo import ModelsArtifactRepository

client = mlflow.client.MlflowClient()
model_name = "add_signature_model"
model_version = 1
mv = client.get_model_version(name=model_name, version=model_version)

# set a dummy signature on the model vesion source
signature = infer_signature(np.array([1]))
set_signature(mv.source, signature)

# create a new model version with the updated source
client.create_model_version(name=model_name, source=mv.source, run_id=mv.run_id)

Note that this process overwrites the model artifacts from the source run of model version 1 with a new model signature.

Model Input Example

A model input example provides an instance of a valid model input. Input examples are stored with the model as separate artifacts and are referenced in the MLmodel file. To include an input example with your model, add it to the appropriate log_model call, e.g. sklearn.log_model(). Input examples are also used to infer model signatures in log_model calls when signatures aren’t specified.

Similar to model signatures, model inputs can be column-based (i.e DataFrames) or tensor-based (i.e numpy.ndarrays). See examples below:

How To Log Model With Column-based Example

For models accepting column-based inputs, an example can be a single record or a batch of records. The sample input can be in the following formats:

  • Pandas DataFrame

  • dict (of scalars, strings, or lists of scalar values)

  • list

  • str

  • bytes

The given example will be converted to a Pandas DataFrame and then serialized to json using the Pandas split-oriented format. Bytes are base64-encoded. The following example demonstrates how you can log a column-based input example with your model:

input_example = {
    "sepal length (cm)": 5.1,
    "sepal width (cm)": 3.5,
    "petal length (cm)": 1.4,
    "petal width (cm)": 0.2,
}
mlflow.sklearn.log_model(..., input_example=input_example)

How To Log Model With Tensor-based Example

For models accepting tensor-based inputs, an example must be a batch of inputs. By default, the axis 0 is the batch axis unless specified otherwise in the model signature. The sample input can be passed in as any of the following formats:

  • numpy ndarray

  • Python dict mapping a string to a numpy array

  • Scipy csr_matrix (sparse matrix)

  • Scipy csc_matrix (sparse matrix).

The following example demonstrates how you can log a tensor-based input example with your model:

# each input has shape (4, 4)
input_example = np.array(
    [
        [[0, 0, 0, 0], [0, 134, 25, 56], [253, 242, 195, 6], [0, 93, 82, 82]],
        [[0, 23, 46, 0], [33, 13, 36, 166], [76, 75, 0, 255], [33, 44, 11, 82]],
    ],
    dtype=np.uint8,
)
mlflow.tensorflow.log_model(..., input_example=input_example)

Model API

You can save and load MLflow Models in multiple ways. First, MLflow includes integrations with several common libraries. For example, mlflow.sklearn contains save_model, log_model, and load_model functions for scikit-learn models. Second, you can use the mlflow.models.Model class to create and write models. This class has four key functions:

  • add_flavor to add a flavor to the model. Each flavor has a string name and a dictionary of key-value attributes, where the values can be any object that can be serialized to YAML.

  • save to save the model to a local directory.

  • log to log the model as an artifact in the current run using MLflow Tracking.

  • load to load a model from a local directory or from an artifact in a previous run.

Built-In Model Flavors

MLflow provides several standard flavors that might be useful in your applications. Specifically, many of its deployment tools support these flavors, so you can export your own model in one of these flavors to benefit from all these tools:

Python Function (python_function)

The python_function model flavor serves as a default model interface for MLflow Python models. Any MLflow Python model is expected to be loadable as a python_function model. This enables other MLflow tools to work with any python model regardless of which persistence module or framework was used to produce the model. This interoperability is very powerful because it allows any Python model to be productionized in a variety of environments.

In addition, the python_function model flavor defines a generic filesystem model format for Python models and provides utilities for saving and loading models to and from this format. The format is self-contained in the sense that it includes all the information necessary to load and use a model. Dependencies are stored either directly with the model or referenced via conda environment. This model format allows other tools to integrate their models with MLflow.

How To Save Model As Python Function

Most python_function models are saved as part of other model flavors - for example, all mlflow built-in flavors include the python_function flavor in the exported models. In addition, the mlflow.pyfunc module defines functions for creating python_function models explicitly. This module also includes utilities for creating custom Python models, which is a convenient way of adding custom python code to ML models. For more information, see the custom Python models documentation.

How To Load And Score Python Function Models

You can load python_function models in Python by calling the mlflow.pyfunc.load_model() function. Note that the load_model function assumes that all dependencies are already available and will not check nor install any dependencies ( see model deployment section for tools to deploy models with automatic dependency management).

Once loaded, you can score the model by calling the predict method, which has the following signature:

predict(data: Union[pandas.(Series | DataFrame), numpy.ndarray, csc_matrix, csr_matrix, List[Any], Dict[str, Any], str],
        params: Optional[Dict[str, Any]] = None) → Union[pandas.(Series | DataFrame), numpy.ndarray, list, str]

All PyFunc models will support pandas.DataFrame as an input. In addition to pandas.DataFrame, DL PyFunc models will also support tensor inputs in the form of numpy.ndarrays. To verify whether a model flavor supports tensor inputs, please check the flavor’s documentation.

For models with a column-based schema, inputs are typically provided in the form of a pandas.DataFrame. If a dictionary mapping column name to values is provided as input for schemas with named columns or if a python List or a numpy.ndarray is provided as input for schemas with unnamed columns, MLflow will cast the input to a DataFrame. Schema enforcement and casting with respect to the expected data types is performed against the DataFrame.

For models with a tensor-based schema, inputs are typically provided in the form of a numpy.ndarray or a dictionary mapping the tensor name to its np.ndarray value. Schema enforcement will check the provided input’s shape and type against the shape and type specified in the model’s schema and throw an error if they do not match.

For models where no schema is defined, no changes to the model inputs and outputs are made. MLflow will propagate any errors raised by the model if the model does not accept the provided input type.

The python environment that a PyFunc model is loaded into for prediction or inference may differ from the environment in which it was trained. In the case of an environment mismatch, a warning message will be printed when calling mlflow.pyfunc.load_model(). This warning statement will identify the packages that have a version mismatch between those used during training and the current environment. In order to get the full dependencies of the environment in which the model was trained, you can call mlflow.pyfunc.get_model_dependencies(). Furthermore, if you want to run model inference in the same environment used in model training, you can call mlflow.pyfunc.spark_udf() with the env_manager argument set as “conda”. This will generate the environment from the conda.yaml file, ensuring that the python UDF will execute with the exact package versions that were used during training.

R Function (crate)

The crate model flavor defines a generic model format for representing an arbitrary R prediction function as an MLflow model using the crate function from the carrier package. The prediction function is expected to take a dataframe as input and produce a dataframe, a vector or a list with the predictions as output.

This flavor requires R to be installed in order to be used.

crate usage

For a minimal crate model, an example configuration for the predict function is:

library(mlflow)
library(carrier)
# Load iris dataset
data("iris")

# Learn simple linear regression model
model <- lm(Sepal.Width~Sepal.Length, data = iris)

# Define a crate model
# call package functions with an explicit :: namespace.
crate_model <- crate(
  function(new_obs)  stats::predict(model, data.frame("Sepal.Length" = new_obs)),
  model = model
)

# log the model
model_path <- mlflow_log_model(model = crate_model, artifact_path = "iris_prediction")

# load the logged model and make a prediction
model_uri <- paste0(mlflow_get_run()$artifact_uri, "/iris_prediction")
mlflow_model <- mlflow_load_model(model_uri = model_uri,
                                  flavor = NULL,
                                  client = mlflow_client())

prediction <- mlflow_predict(model = mlflow_model, data = 5)
print(prediction)

H2O (h2o)

The h2o model flavor enables logging and loading H2O models.

The mlflow.h2o module defines save_model() and log_model() methods in python, and mlflow_save_model and mlflow_log_model in R for saving H2O models in MLflow Model format. These methods produce MLflow Models with the python_function flavor, allowing you to load them as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can be scored with only DataFrame input. When you load MLflow Models with the h2o flavor using mlflow.pyfunc.load_model(), the h2o.init() method is called. Therefore, the correct version of h2o(-py) must be installed in the loader’s environment. You can customize the arguments given to h2o.init() by modifying the init entry of the persisted H2O model’s YAML configuration file: model.h2o/h2o.yaml.

Finally, you can use the mlflow.h2o.load_model() method to load MLflow Models with the h2o flavor as H2O model objects.

For more information, see mlflow.h2o.

h2o pyfunc usage

For a minimal h2o model, here is an example of the pyfunc predict() method in a classification scenario :

import mlflow
import h2o

h2o.init()
from h2o.estimators.glm import H2OGeneralizedLinearEstimator

# import the prostate data
df = h2o.import_file(
    "http://s3.amazonaws.com/h2o-public-test-data/smalldata/prostate/prostate.csv.zip"
)

# convert the columns to factors
df["CAPSULE"] = df["CAPSULE"].asfactor()
df["RACE"] = df["RACE"].asfactor()
df["DCAPS"] = df["DCAPS"].asfactor()
df["DPROS"] = df["DPROS"].asfactor()

# split the data
train, test, valid = df.split_frame(ratios=[0.7, 0.15])

# generate a GLM model
glm_classifier = H2OGeneralizedLinearEstimator(
    family="binomial", lambda_=0, alpha=0.5, nfolds=5, compute_p_values=True
)

with mlflow.start_run():
    glm_classifier.train(
        y="CAPSULE", x=["AGE", "RACE", "VOL", "GLEASON"], training_frame=train
    )
    metrics = glm_classifier.model_performance()
    metrics_to_track = ["MSE", "RMSE", "r2", "logloss"]
    metrics_to_log = {
        key: value
        for key, value in metrics._metric_json.items()
        if key in metrics_to_track
    }
    params = glm_classifier.params
    mlflow.log_params(params)
    mlflow.log_metrics(metrics_to_log)
    model_info = mlflow.h2o.log_model(glm_classifier, artifact_path="h2o_model_info")

# load h2o model and make a prediction
h2o_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
test_df = test.as_data_frame()
predictions = h2o_pyfunc.predict(test_df)
print(predictions)

# it is also possible to load the model and predict using h2o methods on the h2o frame

# h2o_model = mlflow.h2o.load_model(model_info.model_uri)
# predictions = h2o_model.predict(test)

Keras (keras)

The keras model flavor enables logging and loading Keras models. It is available in both Python and R clients. In R, you can save or log the model using mlflow_save_model and mlflow_log_model. These functions serialize Keras models models as HDF5 files using the Keras library’s built-in model persistence functions. You can use mlflow_load_model function in R to load MLflow Models with the keras flavor as Keras Model objects.

Keras pyfunc usage

For a minimal Sequential model, an example configuration for the pyfunc predict() method is:

import mlflow
import numpy as np
import pathlib
import shutil
from tensorflow import keras

mlflow.tensorflow.autolog()

X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)
y = np.array([0, 0, 1, 1, 1, 0])
model = keras.Sequential(
    [
        keras.Input(shape=(1,)),
        keras.layers.Dense(1, activation="sigmoid"),
    ]
)
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
model.fit(X, y, batch_size=3, epochs=5, validation_split=0.2)

local_artifact_dir = "/tmp/mlflow/keras_model"
pathlib.Path(local_artifact_dir).mkdir(parents=True, exist_ok=True)

model_uri = f"runs:/{mlflow.last_active_run().info.run_id}/model"
keras_pyfunc = mlflow.pyfunc.load_model(
    model_uri=model_uri, dst_path=local_artifact_dir
)

data = np.array([-4, 1, 0, 10, -2, 1]).reshape(-1, 1)
predictions = keras_pyfunc.predict(data)

shutil.rmtree(local_artifact_dir)

MLeap (mleap)

Warning

The mleap model flavor is deprecated as of MLflow 2.6.0 and will be removed in a future release.

The mleap model flavor supports saving Spark models in MLflow format using the MLeap persistence mechanism. MLeap is an inference-optimized format and execution engine for Spark models that does not depend on SparkContext to evaluate inputs.

Note

You can save Spark models in MLflow format with the mleap flavor by specifying the sample_input argument of the mlflow.spark.save_model() or mlflow.spark.log_model() method (recommended). For more details see Spark MLlib.

The mlflow.mleap module also defines save_model() and log_model() methods for saving MLeap models in MLflow format, but these methods do not include the python_function flavor in the models they produce. Similarly, mleap models can be saved in R with mlflow_save_model and loaded with mlflow_load_model, with mlflow_save_model requiring sample_input to be specified as a sample Spark dataframe containing input data to the model is required by MLeap for data schema inference.

A companion module for loading MLflow Models with the MLeap flavor is available in the mlflow/java package.

For more information, see mlflow.spark, mlflow.mleap, and the MLeap documentation.

PyTorch (pytorch)

The pytorch model flavor enables logging and loading PyTorch models.

The mlflow.pytorch module defines utilities for saving and loading MLflow Models with the pytorch flavor. You can use the mlflow.pytorch.save_model() and mlflow.pytorch.log_model() methods to save PyTorch models in MLflow format; both of these functions use the torch.save() method to serialize PyTorch models. Additionally, you can use the mlflow.pytorch.load_model() method to load MLflow Models with the pytorch flavor as PyTorch model objects. This loaded PyFunc model can be scored with both DataFrame input and numpy array input. Finally, models produced by mlflow.pytorch.save_model() and mlflow.pytorch.log_model() contain the python_function flavor, allowing you to load them as generic Python functions for inference via mlflow.pyfunc.load_model().

Note

When using the PyTorch flavor, if a GPU is available at prediction time, the default GPU will be used to run inference. To disable this behavior, users can use the MLFLOW_DEFAULT_PREDICTION_DEVICE or pass in a device with the device parameter for the predict function.

Note

In case of multi gpu training, ensure to save the model only with global rank 0 gpu. This avoids logging multiple copies of the same model.

PyTorch pyfunc usage

For a minimal PyTorch model, an example configuration for the pyfunc predict() method is:

import numpy as np
import mlflow
from mlflow.models import infer_signature
import torch
from torch import nn


net = nn.Linear(6, 1)
loss_function = nn.L1Loss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)

X = torch.randn(6)
y = torch.randn(1)

epochs = 5
for epoch in range(epochs):
    optimizer.zero_grad()
    outputs = net(X)

    loss = loss_function(outputs, y)
    loss.backward()

    optimizer.step()

with mlflow.start_run() as run:
    signature = infer_signature(X.numpy(), net(X).detach().numpy())
    model_info = mlflow.pytorch.log_model(net, "model", signature=signature)

pytorch_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)

predictions = pytorch_pyfunc.predict(torch.randn(6).numpy())
print(predictions)

For more information, see mlflow.pytorch.

Scikit-learn (sklearn)

The sklearn model flavor provides an easy-to-use interface for saving and loading scikit-learn models. The mlflow.sklearn module defines save_model() and log_model() functions that save scikit-learn models in MLflow format, using either Python’s pickle module (Pickle) or CloudPickle for model serialization. These functions produce MLflow Models with the python_function flavor, allowing them to be loaded as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can only be scored with DataFrame input. Finally, you can use the mlflow.sklearn.load_model() method to load MLflow Models with the sklearn flavor as scikit-learn model objects.

Scikit-learn pyfunc usage

For a Scikit-learn LogisticRegression model, an example configuration for the pyfunc predict() method is:

import mlflow
from mlflow.models import infer_signature
import numpy as np
from sklearn.linear_model import LogisticRegression

with mlflow.start_run():
    X = np.array([-2, -1, 0, 1, 2, 1]).reshape(-1, 1)
    y = np.array([0, 0, 1, 1, 1, 0])
    lr = LogisticRegression()
    lr.fit(X, y)
    signature = infer_signature(X, lr.predict(X))

    model_info = mlflow.sklearn.log_model(
        sk_model=lr, artifact_path="model", signature=signature
    )

sklearn_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)

data = np.array([-4, 1, 0, 10, -2, 1]).reshape(-1, 1)

predictions = sklearn_pyfunc.predict(data)

For more information, see mlflow.sklearn.

Spark MLlib (spark)

The spark model flavor enables exporting Spark MLlib models as MLflow Models.

The mlflow.spark module defines

MLflow Models produced by these functions contain the python_function flavor, allowing you to load them as generic Python functions via mlflow.pyfunc.load_model(). This loaded PyFunc model can only be scored with DataFrame input. When a model with the spark flavor is loaded as a Python function via mlflow.pyfunc.load_model(), a new SparkContext is created for model inference; additionally, the function converts all Pandas DataFrame inputs to Spark DataFrames before scoring. While this initialization overhead and format translation latency is not ideal for high-performance use cases, it enables you to easily deploy any MLlib PipelineModel to any production environment supported by MLflow (SageMaker, AzureML, etc).

Spark MLlib pyfunc usage

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
import mlflow

# Prepare training data from a list of (label, features) tuples.
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()
training = spark.createDataFrame(
    [
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5])),
    ],
    ["label", "features"],
)

# Create and fit a LogisticRegression instance
lr = LogisticRegression(maxIter=10, regParam=0.01)
lr_model = lr.fit(training)

# Serialize the Model
with mlflow.start_run():
    model_info = mlflow.spark.log_model(lr_model, "spark-model")

# Load saved model
lr_model_saved = mlflow.pyfunc.load_model(model_info.model_uri)

# Make predictions on test data.
# The DataFrame used in the predict method must be a Pandas DataFrame
test = spark.createDataFrame(
    [
        (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
        (0.0, Vectors.dense([3.0, 2.0, -0.1])),
        (1.0, Vectors.dense([0.0, 2.2, -1.5])),
    ],
    ["label", "features"],
).toPandas()

prediction = lr_model_saved.predict(test)

Note

Note that when the sample_input parameter is provided to log_model() or save_model(), the Spark model is automatically saved as an mleap flavor by invoking mlflow.mleap.add_to_model().

For example, the follow code block:

training_df = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0) ], ["id", "text", "label"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training_df)

mlflow.spark.log_model(model, "spark-model", sample_input=training_df)

results in the following directory structure logged to the MLflow Experiment:

# Directory written by with the addition of mlflow.mleap.add_to_model(model, "spark-model", training_df)
# Note the addition of the mleap directory
spark-model/
├── mleap
├── sparkml
├── MLmodel
├── conda.yaml
├── python_env.yaml
└── requirements.txt

For more information, see mlflow.mleap.

For more information, see mlflow.spark.

TensorFlow (tensorflow)

The simple example below shows how to log params and metrics in mlflow for a custom training loop using low-level TensorFlow API. See tf-keras-example. for an example of mlflow and tf.keras models.

import numpy as np
import tensorflow as tf

import mlflow

x = np.linspace(-4, 4, num=512)
y = 3 * x + 10

# estimate w and b where y = w * x + b
learning_rate = 0.1
x_train = tf.Variable(x, trainable=False, dtype=tf.float32)
y_train = tf.Variable(y, trainable=False, dtype=tf.float32)

# initial values
w = tf.Variable(1.0)
b = tf.Variable(1.0)

with mlflow.start_run():
    mlflow.log_param("learning_rate", learning_rate)

    for i in range(1000):
        with tf.GradientTape(persistent=True) as tape:
            # calculate MSE = 0.5 * (y_predict - y_train)^2
            y_predict = w * x_train + b
            loss = 0.5 * tf.reduce_mean(tf.square(y_predict - y_train))
            mlflow.log_metric("loss", value=loss.numpy(), step=i)

        # Update the trainable variables
        # w = w - learning_rate * gradient of loss function w.r.t. w
        # b = b - learning_rate * gradient of loss function w.r.t. b
        w.assign_sub(learning_rate * tape.gradient(loss, w))
        b.assign_sub(learning_rate * tape.gradient(loss, b))

print(f"W = {w.numpy():.2f}, b = {b.numpy():.2f}")

ONNX (onnx)

The onnx model flavor enables logging of ONNX models in MLflow format via the mlflow.onnx.save_model() and mlflow.onnx.log_model() methods. These methods also add the python_function flavor to the MLflow Models that they produce, allowing the models to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can be scored with both DataFrame input and numpy array input. The python_function representation of an MLflow ONNX model uses the ONNX Runtime execution engine for evaluation. Finally, you can use the mlflow.onnx.load_model() method to load MLflow Models with the onnx flavor in native ONNX format.

For more information, see mlflow.onnx and http://onnx.ai/.

ONNX pyfunc usage example

For an ONNX model, an example configuration that uses pytorch to train a dummy model, converts it to ONNX, logs to mlflow and makes a prediction using pyfunc predict() method is:

import numpy as np
import mlflow
from mlflow.models import infer_signature
import onnx
import torch
from torch import nn

# define a torch model
net = nn.Linear(6, 1)
loss_function = nn.L1Loss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)

X = torch.randn(6)
y = torch.randn(1)

# run model training
epochs = 5
for epoch in range(epochs):
    optimizer.zero_grad()
    outputs = net(X)

    loss = loss_function(outputs, y)
    loss.backward()

    optimizer.step()

# convert model to ONNX and load it
torch.onnx.export(net, X, "model.onnx")
onnx_model = onnx.load_model("model.onnx")

# log the model into a mlflow run
with mlflow.start_run():
    signature = infer_signature(X.numpy(), net(X).detach().numpy())
    model_info = mlflow.onnx.log_model(onnx_model, "model", signature=signature)

# load the logged model and make a prediction
onnx_pyfunc = mlflow.pyfunc.load_model(model_info.model_uri)

predictions = onnx_pyfunc.predict(X.numpy())
print(predictions)

MXNet Gluon (gluon)

Warning

The gluon model flavor is deprecated and will be removed in a future release.

The gluon model flavor enables logging of Gluon models in MLflow format via the mlflow.gluon.save_model() and mlflow.gluon.log_model() methods. These methods also add the python_function flavor to the MLflow Models that they produce, allowing the models to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can be scored with both DataFrame input and numpy array input. You can also use the mlflow.gluon.load_model() method to load MLflow Models with the gluon flavor in native Gluon format.

Gluon pyfunc usage

For a minimal gluon model, here is an example of the pyfunc predict() method with a logistic regression model :

import mlflow
import mxnet as mx
from mxnet import nd, autograd, gluon
from mxnet.gluon import nn, Trainer
from mxnet.gluon.data import DataLoader, ArrayDataset
import numpy as np

# this example requires a compatible version of numpy : numpy == 1.23.1
# `pip uninstall numpy`  `python -m pip install numpy==1.23.1`


def get_random_data(size, ctx):
    x = nd.normal(0, 1, shape=(size, 10), ctx=ctx)
    y = x.sum(axis=1) > 3
    return x, y


# use cpu for this example, gpu could be used with ctx=gpu()
ctx = mx.cpu()
train_data_size = 1000
val_data_size = 100
batch_size = 10

train_x, train_ground_truth_class = get_random_data(train_data_size, ctx)
train_dataset = ArrayDataset(train_x, train_ground_truth_class)
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
)

val_x, val_ground_truth_class = get_random_data(val_data_size, ctx)
val_dataset = ArrayDataset(val_x, val_ground_truth_class)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

net = nn.HybridSequential()

with net.name_scope():
    net.add(nn.Dense(units=10, activation="relu"))  # input layer
    net.add(nn.Dense(units=10, activation="relu"))  # inner layer 1
    net.add(nn.Dense(units=10, activation="relu"))  # inner layer 2
    net.add(nn.Dense(units=1))  # output layer: must have only 1 neuron

net.initialize(mx.init.Xavier())

loss = gluon.loss.SigmoidBinaryCrossEntropyLoss()
trainer = Trainer(
    params=net.collect_params(),
    optimizer="sgd",
    optimizer_params={"learning_rate": 0.1},
)

accuracy = mx.metric.Accuracy()
f1 = mx.metric.F1()
threshold = 0.5


def train_model():
    cumulative_train_loss = 0

    for i, (data, label) in enumerate(train_dataloader):
        with autograd.record():
            # do forward pass on a batch of training data
            output = net(data)
            # calculate loss for the training data batch
            loss_result = loss(output, label)
        # calculate gradients
        loss_result.backward()
        # update parameters of the network
        trainer.step(batch_size)
        # sum losses of every batch
        cumulative_train_loss += nd.sum(loss_result).asscalar()

    return cumulative_train_loss


def validate_model(threshold):
    cumulative_val_loss = 0

    for i, (val_data, val_ground_truth_class) in enumerate(val_dataloader):
        # do forward pass on a batch of validation data
        output = net(val_data)
        # calculate cumulative validation loss
        cumulative_val_loss += nd.sum(loss(output, val_ground_truth_class)).asscalar()
        # prediction as a sigmoid
        prediction = net(val_data).sigmoid()
        # converting neuron outputs to classes
        predicted_classes = mx.nd.ceil(prediction - threshold)
        # update validation accuracy
        accuracy.update(val_ground_truth_class, predicted_classes.reshape(-1))
        # calculate probabilities of belonging to different classes
        prediction = prediction.reshape(-1)
        probabilities = mx.nd.stack(1 - prediction, prediction, axis=1)

        f1.update(val_ground_truth_class, probabilities)

    return cumulative_val_loss


# train model and get metrics
cumulative_train_loss = train_model()
cumulative_val_loss = validate_model(threshold)
net.collect_params().initialize()
metrics_to_log = {
    "training_loss": cumulative_train_loss,
    "val_loss": cumulative_val_loss,
    "f1": f1.get()[1],
    "accuracy": accuracy.get()[1],
}
params_to_log = {"learning_rate": trainer.learning_rate, "threshold": threshold}

# the model needs to be hybridized and run forward at least once before export is called
net.hybridize()
net.forward(train_x)

with mlflow.start_run():
    mlflow.log_params(params_to_log)
    mlflow.log_metrics(metrics_to_log)
    model_info = mlflow.gluon.log_model(net, "model")

# load the model
pytorch_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)

# make a prediction
X = np.random.randn(10, 10)
predictions = pytorch_pyfunc.predict(X)
print(predictions)

For more information, see mlflow.gluon.

XGBoost (xgboost)

The xgboost model flavor enables logging of XGBoost models in MLflow format via the mlflow.xgboost.save_model() and mlflow.xgboost.log_model() methods in python and mlflow_save_model and mlflow_log_model in R respectively. These methods also add the python_function flavor to the MLflow Models that they produce, allowing the models to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can only be scored with DataFrame input. You can also use the mlflow.xgboost.load_model() method to load MLflow Models with the xgboost model flavor in native XGBoost format.

Note that the xgboost model flavor only supports an instance of xgboost.Booster, not models that implement the scikit-learn API.

XGBoost pyfunc usage

The example below

  • Loads the IRIS dataset from scikit-learn

  • Trains an XGBoost Classifier

  • Logs the model and params using mlflow

  • Loads the logged model and makes predictions

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
import mlflow
from mlflow.models import infer_signature

data = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    data["data"], data["target"], test_size=0.2
)

xgb_classifier = XGBClassifier(
    n_estimators=10,
    max_depth=3,
    learning_rate=1,
    objective="binary:logistic",
    random_state=123,
)

# log fitted model and XGBClassifier parameters
with mlflow.start_run():
    xgb_classifier.fit(X_train, y_train)
    clf_params = xgb_classifier.get_xgb_params()
    mlflow.log_params(clf_params)
    signature = infer_signature(X_train, xgb_classifier.predict(X_train))
    model_info = mlflow.xgboost.log_model(
        xgb_classifier, "iris-classifier", signature=signature
    )

# Load saved model and make predictions
xgb_classifier_saved = mlflow.pyfunc.load_model(model_info.model_uri)
y_pred = xgb_classifier_saved.predict(X_test)

For more information, see mlflow.xgboost.

LightGBM (lightgbm)

The lightgbm model flavor enables logging of LightGBM models in MLflow format via the mlflow.lightgbm.save_model() and mlflow.lightgbm.log_model() methods. These methods also add the python_function flavor to the MLflow Models that they produce, allowing the models to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). You can also use the mlflow.lightgbm.load_model() method to load MLflow Models with the lightgbm model flavor in native LightGBM format.

Note that the scikit-learn API for LightGBM is now supported. For more information, see mlflow.lightgbm.

LightGBM pyfunc usage

The example below

  • Loads the IRIS dataset from scikit-learn

  • Trains a LightGBM LGBMClassifier

  • Logs the model and feature importance’s using mlflow

  • Loads the logged model and makes predictions

from lightgbm import LGBMClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import mlflow
from mlflow.models import infer_signature

data = load_iris()

# Remove special characters from feature names to be able to use them as keys for mlflow metrics
feature_names = [
    name.replace(" ", "_").replace("(", "").replace(")", "")
    for name in data["feature_names"]
]
X_train, X_test, y_train, y_test = train_test_split(
    data["data"], data["target"], test_size=0.2
)
# create model instance
lgb_classifier = LGBMClassifier(
    n_estimators=10,
    max_depth=3,
    learning_rate=1,
    objective="binary:logistic",
    random_state=123,
)

# Fit and save model and LGBMClassifier feature importances as mlflow metrics
with mlflow.start_run():
    lgb_classifier.fit(X_train, y_train)
    feature_importances = dict(zip(feature_names, lgb_classifier.feature_importances_))
    feature_importance_metrics = {
        f"feature_importance_{feature_name}": imp_value
        for feature_name, imp_value in feature_importances.items()
    }
    mlflow.log_metrics(feature_importance_metrics)
    signature = infer_signature(X_train, lgb_classifier.predict(X_train))
    model_info = mlflow.lightgbm.log_model(
        lgb_classifier, "iris-classifier", signature=signature
    )

# Load saved model and make predictions
lgb_classifier_saved = mlflow.pyfunc.load_model(model_info.model_uri)
y_pred = lgb_classifier_saved.predict(X_test)
print(y_pred)

CatBoost (catboost)

The catboost model flavor enables logging of CatBoost models in MLflow format via the mlflow.catboost.save_model() and mlflow.catboost.log_model() methods. These methods also add the python_function flavor to the MLflow Models that they produce, allowing the models to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). You can also use the mlflow.catboost.load_model() method to load MLflow Models with the catboost model flavor in native CatBoost format.

For more information, see mlflow.catboost.

CatBoost pyfunc usage

For a CatBoost Classifier model, an example configuration for the pyfunc predict() method is:

import mlflow
from mlflow.models import infer_signature
from catboost import CatBoostClassifier
from sklearn import datasets

# prepare data
X, y = datasets.load_wine(as_frame=False, return_X_y=True)

# train the model
model = CatBoostClassifier(
    iterations=5,
    loss_function="MultiClass",
    allow_writing_files=False,
)
model.fit(X, y)

# create model signature
predictions = model.predict(X)
signature = infer_signature(X, predictions)

# log the model into a mlflow run
with mlflow.start_run():
    model_info = mlflow.catboost.log_model(model, "model", signature=signature)

# load the logged model and make a prediction
catboost_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
print(catboost_pyfunc.predict(X[:5]))

Spacy(spaCy)

The spaCy model flavor enables logging of spaCy models in MLflow format via the mlflow.spacy.save_model() and mlflow.spacy.log_model() methods. Additionally, these methods add the python_function flavor to the MLflow Models that they produce, allowing the models to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can only be scored with DataFrame input. You can also use the mlflow.spacy.load_model() method to load MLflow Models with the spacy model flavor in native spaCy format.

For more information, see mlflow.spacy.

Spacy pyfunc usage

The example below shows how to train a Spacy TextCategorizer model, log the model artifact and metrics to the mlflow tracking server and then load the saved model to make predictions. For this example, we will be using the Polarity 2.0 dataset available in the nltk package. This dataset consists of 10000 positive and 10000 negative short movie reviews.

First we convert the texts and sentiment labels (“pos” or “neg”) from NLTK native format to Spacy’s DocBin format:

import pandas as pd
import spacy
from nltk.corpus import movie_reviews
from spacy import Language
from spacy.tokens import DocBin

nltk.download("movie_reviews")


def get_sentences(sentiment_type: str) -> pd.DataFrame:
    """Reconstruct the sentences from the word lists for each review record for a specific ``sentiment_type``
    as a pandas DataFrame with two columns: 'sentence' and 'sentiment'.
    """
    file_ids = movie_reviews.fileids(sentiment_type)
    sent_df = []
    for file_id in file_ids:
        sentence = " ".join(movie_reviews.words(file_id))
        sent_df.append({"sentence": sentence, "sentiment": sentiment_type})
    return pd.DataFrame(sent_df)


def convert(data_df: pd.DataFrame, target_file: str):
    """Convert a DataFrame with 'sentence' and 'sentiment' columns to a
    spacy DocBin object and save it to 'target_file'.
    """
    nlp = spacy.blank("en")
    sentiment_labels = data_df.sentiment.unique()
    spacy_doc = DocBin()

    for _, row in data_df.iterrows():
        sent_tokens = nlp.make_doc(row["sentence"])
        # To train a Spacy TextCategorizer model, the label must be attached to the "cats" dictionary of the "Doc"
        # object, e.g. {"pos": 1.0, "neg": 0.0} for a "pos" label.
        for label in sentiment_labels:
            sent_tokens.cats[label] = 1.0 if label == row["sentiment"] else 0.0
        spacy_doc.add(sent_tokens)

    spacy_doc.to_disk(target_file)


# Build a single DataFrame with both positive and negative reviews, one row per review
review_data = [get_sentences(sentiment_type) for sentiment_type in ("pos", "neg")]
review_data = pd.concat(review_data, axis=0)

# Split the DataFrame into a train and a dev set
train_df = review_data.groupby("sentiment", group_keys=False).apply(
    lambda x: x.sample(frac=0.7, random_state=100)
)
dev_df = review_data.loc[review_data.index.difference(train_df.index), :]

# Save the train and dev data files to the current directory as "corpora.train" and "corpora.dev", respectively
convert(train_df, "corpora.train")
convert(dev_df, "corpora.dev")

To set up the training job, we first need to generate a configuration file as described in the Spacy Documentation For simplicity, we will only use a TextCategorizer in the pipeline.

python -m spacy init config --pipeline textcat --lang en mlflow-textcat.cfg

Change the default train and dev paths in the config file to the current directory:

  [paths]
- train = null
- dev = null
+ train = "."
+ dev = "."

In Spacy, the training loop is defined internally in Spacy’s code. Spacy provides a “logging” extension point where we can use mlflow. To do this,

  • We have to define a function to write metrics / model input to mlfow

  • Register it as a logger in Spacy’s component registry

  • Change the default console logger in the Spacy’s configuration file (mlflow-textcat.cfg)

from typing import IO, Callable, Tuple, Dict, Any, Optional
import spacy
from spacy import Language
import mlflow


@spacy.registry.loggers("mlflow_logger.v1")
def mlflow_logger():
    """Returns a function, ``setup_logger`` that returns two functions:

    * ``log_step`` is called internally by Spacy for every evaluation step. We can log the intermediate train and
    validation scores to the mlflow tracking server here.
    * ``finalize``: is called internally by Spacy after training is complete. We can log the model artifact to the
    mlflow tracking server here.
    """

    def setup_logger(
        nlp: Language,
        stdout: IO = sys.stdout,
        stderr: IO = sys.stderr,
    ) -> Tuple[Callable, Callable]:
        def log_step(info: Optional[Dict[str, Any]]):
            if info:
                step = info["step"]
                score = info["score"]
                metrics = {}

                for pipe_name in nlp.pipe_names:
                    loss = info["losses"][pipe_name]
                    metrics[f"{pipe_name}_loss"] = loss
                    metrics[f"{pipe_name}_score"] = score
                mlflow.log_metrics(metrics, step=step)

        def finalize():
            uri = mlflow.spacy.log_model(nlp, "mlflow_textcat_example")
            mlflow.end_run()

        return log_step, finalize

    return setup_logger

Check the spacy-loggers library <https://pypi.org/project/spacy-loggers/> _ for a more complete implementation.

Point to our mlflow logger in Spacy configuration file. For this example, we will lower the number of training steps and eval frequency:

  [training.logger]
- @loggers = "spacy.ConsoleLogger.v1"
- dev = null
+ @loggers = "mlflow_logger.v1"

  [training]
- max_steps = 20000
- eval_frequency = 100
+ max_steps = 100
+ eval_frequency = 10

Train our model:

from spacy.cli.train import train as spacy_train

spacy_train("mlflow-textcat.cfg")

To make predictions, we load the saved model from the last run:

from mlflow import MlflowClient

# look up the last run info from mlflow
client = MlflowClient()
last_run = client.search_runs(experiment_ids=["0"], max_results=1)[0]

# We need to append the spacy model directory name to the artifact uri
spacy_model = mlflow.pyfunc.load_model(
    f"{last_run.info.artifact_uri}/mlflow_textcat_example"
)
predictions_in = dev_df.loc[:, ["sentence"]]
predictions_out = spacy_model.predict(predictions_in).squeeze().tolist()
predicted_labels = [
    "pos" if row["pos"] > row["neg"] else "neg" for row in predictions_out
]
print(dev_df.assign(predicted_sentiment=predicted_labels))

Fastai(fastai)

The fastai model flavor enables logging of fastai Learner models in MLflow format via the mlflow.fastai.save_model() and mlflow.fastai.log_model() methods. Additionally, these methods add the python_function flavor to the MLflow Models that they produce, allowing the models to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can only be scored with DataFrame input. You can also use the mlflow.fastai.load_model() method to load MLflow Models with the fastai model flavor in native fastai format.

The interface for utilizing a fastai model loaded as a pyfunc type for generating predictions uses a Pandas DataFrame argument.

This example runs the fastai tabular tutorial, logs the experiments, saves the model in fastai format and loads the model to get predictions using a fastai data loader:

from fastai.data.external import URLs, untar_data
from fastai.tabular.core import Categorify, FillMissing, Normalize, TabularPandas
from fastai.tabular.data import TabularDataLoaders
from fastai.tabular.learner import tabular_learner
from fastai.data.transforms import RandomSplitter
from fastai.metrics import accuracy
from fastcore.basics import range_of
import pandas as pd
import mlflow
import mlflow.fastai


def print_auto_logged_info(r):
    tags = {k: v for k, v in r.data.tags.items() if not k.startswith("mlflow.")}
    artifacts = [
        f.path for f in mlflow.MlflowClient().list_artifacts(r.info.run_id, "model")
    ]
    print(f"run_id: {r.info.run_id}")
    print(f"artifacts: {artifacts}")
    print(f"params: {r.data.params}")
    print(f"metrics: {r.data.metrics}")
    print(f"tags: {tags}")


def main(epochs=5, learning_rate=0.01):
    path = untar_data(URLs.ADULT_SAMPLE)
    path.ls()

    df = pd.read_csv(path / "adult.csv")

    dls = TabularDataLoaders.from_csv(
        path / "adult.csv",
        path=path,
        y_names="salary",
        cat_names=[
            "workclass",
            "education",
            "marital-status",
            "occupation",
            "relationship",
            "race",
        ],
        cont_names=["age", "fnlwgt", "education-num"],
        procs=[Categorify, FillMissing, Normalize],
    )

    splits = RandomSplitter(valid_pct=0.2)(range_of(df))

    to = TabularPandas(
        df,
        procs=[Categorify, FillMissing, Normalize],
        cat_names=[
            "workclass",
            "education",
            "marital-status",
            "occupation",
            "relationship",
            "race",
        ],
        cont_names=["age", "fnlwgt", "education-num"],
        y_names="salary",
        splits=splits,
    )

    dls = to.dataloaders(bs=64)

    model = tabular_learner(dls, metrics=accuracy)

    mlflow.fastai.autolog()

    with mlflow.start_run() as run:
        model.fit(5, 0.01)
        mlflow.fastai.log_model(model, "model")

    print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id))

    model_uri = f"runs:/{run.info.run_id}/model"
    loaded_model = mlflow.fastai.load_model(model_uri)

    test_df = df.copy()
    test_df.drop(["salary"], axis=1, inplace=True)
    dl = learn.dls.test_dl(test_df)

    predictions, _ = loaded_model.get_preds(dl=dl)
    px = pd.DataFrame(predictions).astype("float")
    px.head(5)


main()

Output (Pandas DataFrame):

Index

Probability of first class

Probability of second class

0

0.545088

0.454912

1

0.503172

0.496828

2

0.962663

0.037337

3

0.206107

0.793893

4

0.807599

0.192401

Alternatively, when using the python_function flavor, get predictions from a DataFrame.

from fastai.data.external import URLs, untar_data
from fastai.tabular.core import Categorify, FillMissing, Normalize, TabularPandas
from fastai.tabular.data import TabularDataLoaders
from fastai.tabular.learner import tabular_learner
from fastai.data.transforms import RandomSplitter
from fastai.metrics import accuracy
from fastcore.basics import range_of
import pandas as pd
import mlflow
import mlflow.fastai

model_uri = ...

path = untar_data(URLs.ADULT_SAMPLE)
df = pd.read_csv(path / "adult.csv")
test_df = df.copy()
test_df.drop(["salary"], axis=1, inplace=True)

loaded_model = mlflow.pyfunc.load_model(model_uri)
loaded_model.predict(test_df)

Output (Pandas DataFrame):

Index

Probability of first class, Probability of second class

0

[0.5450878, 0.45491222]

1

[0.50317234, 0.49682766]

2

[0.9626626, 0.037337445]

3

[0.20610662, 0.7938934]

4

[0.8075987, 0.19240129]

For more information, see mlflow.fastai.

Statsmodels (statsmodels)

The statsmodels model flavor enables logging of Statsmodels models in MLflow format via the mlflow.statsmodels.save_model() and mlflow.statsmodels.log_model() methods. These methods also add the python_function flavor to the MLflow Models that they produce, allowing the models to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can only be scored with DataFrame input. You can also use the mlflow.statsmodels.load_model() method to load MLflow Models with the statsmodels model flavor in native statsmodels format.

As for now, automatic logging is restricted to parameters, metrics and models generated by a call to fit on a statsmodels model.

Statsmodels pyfunc usage

The following 2 examples illustrate usage of a basic regression model (OLS) and an ARIMA time series model from the following statsmodels apis : statsmodels.formula.api and statsmodels.tsa.api

For a minimal statsmodels regression model, here is an example of the pyfunc predict() method :

import mlflow
import pandas as pd
from sklearn.datasets import load_diabetes
import statsmodels.formula.api as smf

# load the diabetes dataset from sklearn
diabetes = load_diabetes()

# create X and y dataframes for the features and target
X = pd.DataFrame(data=diabetes.data, columns=diabetes.feature_names)
y = pd.DataFrame(data=diabetes.target, columns=["target"])

# concatenate X and y dataframes
df = pd.concat([X, y], axis=1)

# create the linear regression model (ordinary least squares)
model = smf.ols(
    formula="target ~ age + sex + bmi + bp + s1 + s2 + s3 + s4 + s5 + s6", data=df
)

mlflow.statsmodels.autolog(
    log_models=True,
    disable=False,
    exclusive=False,
    disable_for_unsupported_versions=False,
    silent=False,
    registered_model_name=None,
)

with mlflow.start_run():
    res = model.fit(method="pinv", use_t=True)
    model_info = mlflow.statsmodels.log_model(res, artifact_path="OLS_model")

# load the pyfunc model
statsmodels_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)

# generate predictions
predictions = statsmodels_pyfunc.predict(X)
print(predictions)

For a minimal time series ARIMA model, here is an example of the pyfunc predict() method :

import mlflow
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA

# create a time series dataset with seasonality
np.random.seed(0)

# generate a time index with a daily frequency
dates = pd.date_range(start="2022-12-01", end="2023-12-01", freq="D")

# generate the seasonal component (weekly)
seasonality = np.sin(np.arange(len(dates)) * (2 * np.pi / 365.25) * 7)

# generate the trend component
trend = np.linspace(-5, 5, len(dates)) + 2 * np.sin(
    np.arange(len(dates)) * (2 * np.pi / 365.25) * 0.1
)

# generate the residual component
residuals = np.random.normal(0, 1, len(dates))

# generate the final time series by adding the components
time_series = seasonality + trend + residuals

# create a dataframe from the time series
data = pd.DataFrame({"date": dates, "value": time_series})
data.set_index("date", inplace=True)

order = (1, 0, 0)
# create the ARIMA model
model = ARIMA(data, order=order)

mlflow.statsmodels.autolog(
    log_models=True,
    disable=False,
    exclusive=False,
    disable_for_unsupported_versions=False,
    silent=False,
    registered_model_name=None,
)

with mlflow.start_run():
    res = model.fit()
    mlflow.log_params(
        {
            "order": order,
            "trend": model.trend,
            "seasonal_order": model.seasonal_order,
        }
    )
    mlflow.log_params(res.params)
    mlflow.log_metric("aic", res.aic)
    mlflow.log_metric("bic", res.bic)
    model_info = mlflow.statsmodels.log_model(res, artifact_path="ARIMA_model")

# load the pyfunc model
statsmodels_pyfunc = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)

# prediction dataframes for a TimeSeriesModel must have exactly one row and include columns called start and end
start = pd.to_datetime("2024-01-01")
end = pd.to_datetime("2024-01-07")

# generate predictions
prediction_data = pd.DataFrame({"start": start, "end": end}, index=[0])
predictions = statsmodels_pyfunc.predict(prediction_data)
print(predictions)

For more information, see mlflow.statsmodels.

Prophet (prophet)

The prophet model flavor enables logging of Prophet models in MLflow format via the mlflow.prophet.save_model() and mlflow.prophet.log_model() methods. These methods also add the python_function flavor to the MLflow Models that they produce, allowing the models to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can only be scored with DataFrame input. You can also use the mlflow.prophet.load_model() method to load MLflow Models with the prophet model flavor in native prophet format.

Prophet pyfunc usage

This example uses a time series dataset from Prophet’s GitHub repository, containing log number of daily views to Peyton Manning’s Wikipedia page for several years. A sample of the dataset is as follows:

ds

y

2007-12-10

9.59076113897809

2007-12-11

8.51959031601596

2007-12-12

8.18367658262066

2007-12-13

8.07246736935477

import numpy as np
import pandas as pd
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics

import mlflow
from mlflow.models import infer_signature

# starts on 2007-12-10, ends on 2016-01-20
train_df = pd.read_csv(
    "https://raw.githubusercontent.com/facebook/prophet/main/examples/example_wp_log_peyton_manning.csv"
)

# Create a "test" DataFrame with the "ds" column containing 10 days after the end date in train_df
test_dates = pd.date_range(start="2016-01-21", end="2016-01-31", freq="D")
test_df = pd.Series(data=test_dates.values, name="ds").to_frame()

prophet_model = Prophet(changepoint_prior_scale=0.5, uncertainty_samples=7)

with mlflow.start_run():
    prophet_model.fit(train_df)

    # extract and log parameters such as changepoint_prior_scale in the mlflow run
    model_params = {
        name: value for name, value in vars(prophet_model).items() if np.isscalar(value)
    }
    mlflow.log_params(model_params)

    # cross validate with 900 days of data initially, predictions for next 30 days
    # walk forward by 30 days
    cv_results = cross_validation(
        prophet_model, initial="900 days", period="30 days", horizon="30 days"
    )

    # Calculate metrics from cv_results, then average each metric across all backtesting windows and log to mlflow
    cv_metrics = ["mse", "rmse", "mape"]
    metrics_results = performance_metrics(cv_results, metrics=cv_metrics)
    average_metrics = metrics_results.loc[:, cv_metrics].mean(axis=0).to_dict()
    mlflow.log_metrics(average_metrics)

    # Calculate model signature
    train = prophet_model.history
    predictions = prophet_model.predict(prophet_model.make_future_dataframe(30))
    signature = infer_signature(train, predictions)

    model_info = mlflow.prophet.log_model(
        prophet_model, "prophet-model", signature=signature
    )

# Load saved model
prophet_model_saved = mlflow.pyfunc.load_model(model_info.model_uri)
predictions = prophet_model_saved.predict(test_df)

Output (Pandas DataFrame):

Index

ds

yhat

yhat_upper

yhat_lower

0

2016-01-21

8.526513

8.827397

8.328563

1

2016-01-22

8.541355

9.434994

8.112758

2

2016-01-23

8.308332

8.633746

8.201323

3

2016-01-24

8.676326

9.534593

8.020874

4

2016-01-25

8.983457

9.430136

8.121798

For more information, see mlflow.prophet.

Pmdarima (pmdarima)

The pmdarima model flavor enables logging of pmdarima models in MLflow format via the mlflow.pmdarima.save_model() and mlflow.pmdarima.log_model() methods. These methods also add the python_function flavor to the MLflow Models that they produce, allowing the model to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can only be scored with a DataFrame input. You can also use the mlflow.pmdarima.load_model() method to load MLflow Models with the pmdarima model flavor in native pmdarima formats.

The interface for utilizing a pmdarima model loaded as a pyfunc type for generating forecast predictions uses a single-row Pandas DataFrame configuration argument. The following columns in this configuration Pandas DataFrame are supported:

  • n_periods (required) - specifies the number of future periods to generate starting from the last datetime value

    of the training dataset, utilizing the frequency of the input training series when the model was trained. (for example, if the training data series elements represent one value per hour, in order to forecast 3 days of future data, set the column n_periods to 72.

  • X (optional) - exogenous regressor values (only supported in pmdarima version >= 1.8.0) a 2D array of values for

    future time period events. For more information, read the underlying library explanation.

  • return_conf_int (optional) - a boolean (Default: False) for whether to return confidence interval values.

    See above note.

  • alpha (optional) - the significance value for calculating confidence intervals. (Default: 0.05)

An example configuration for the pyfunc predict of a pmdarima model is shown below, with a future period prediction count of 100, a confidence interval calculation generation, no exogenous regressor elements, and a default alpha of 0.05:

Index

n_periods

return_conf_int

0

100

True

Warning

The Pandas DataFrame passed to a pmdarima pyfunc flavor must only contain 1 row.

Note

When predicting a pmdarima flavor, the predict method’s DataFrame configuration column return_conf_int’s value controls the output format. When the column’s value is set to False or None (which is the default if this column is not supplied in the configuration DataFrame), the schema of the returned Pandas DataFrame is a single column: ["yhat"]. When set to True, the schema of the returned DataFrame is: ["yhat", "yhat_lower", "yhat_upper"] with the respective lower (yhat_lower) and upper (yhat_upper) confidence intervals added to the forecast predictions (yhat).

Example usage of pmdarima artifact loaded as a pyfunc with confidence intervals calculated:

import pmdarima
import mlflow
import pandas as pd

data = pmdarima.datasets.load_airpassengers()

with mlflow.start_run():
    model = pmdarima.auto_arima(data, seasonal=True)
    mlflow.pmdarima.save_model(model, "/tmp/model.pmd")

loaded_pyfunc = mlflow.pyfunc.load_model("/tmp/model.pmd")

prediction_conf = pd.DataFrame(
    [{"n_periods": 4, "return_conf_int": True, "alpha": 0.1}]
)

predictions = loaded_pyfunc.predict(prediction_conf)

Output (Pandas DataFrame):

Index

yhat

yhat_lower

yhat_upper

0

467.573731

423.30995

511.83751

1

490.494467

416.17449

564.81444

2

509.138684

420.56255

597.71117

3

492.554714

397.30634

587.80309

Warning

Signature logging for pmdarima will not function correctly if return_conf_int is set to True from a non-pyfunc artifact. The output of the native ARIMA.predict() when returning confidence intervals is not a recognized signature type.

OpenAI (openai) (Experimental)

Attention

The openai flavor is in active development and is marked as Experimental. Public APIs may change and new features are subject to be added as additional functionality is brought to the flavor.

The openi model flavor enables logging of OpenAI models in MLflow format via the mlflow.openai.save_model() and mlflow.openai.log_model() functions. Use of these functions also adds the python_function flavor to the MLflow Models that they produce, allowing the model to be interpreted as a generic Python function for inference via mlflow.pyfunc.load_model(). You can also use the mlflow.openai.load_model() function to load a saved or logged MLflow Model with the openai flavor as a dictionary of the model’s attributes.

Example:

import logging
import os

import openai
import pandas as pd

import mlflow

logging.getLogger("mlflow").setLevel(logging.ERROR)

# Uncomment the following lines to run this script without using a real OpenAI API key.
# os.environ["MLFLOW_TESTING"] = "true"
# os.environ["OPENAI_API_KEY"] = "test"

assert "OPENAI_API_KEY" in os.environ, "Please set the OPENAI_API_KEY environment variable."


# On Databricks, set the stored OpenAI API key scope here for automatically loading the API key
# for real time inference. See https://docs.databricks.com/security/secrets/index.html on
# how to add a scope and API key.
os.environ["MLFLOW_OPENAI_SECRET_SCOPE"] = "<scope-name>"

print(
    """
# ******************************************************************************
# Single variable
# ******************************************************************************
"""
)
with mlflow.start_run():
    model_info = mlflow.openai.log_model(
        model="gpt-3.5-turbo",
        task=openai.ChatCompletion,
        artifact_path="model",
        messages=[{"role": "user", "content": "Tell me a joke about {animal}."}],
    )


model = mlflow.pyfunc.load_model(model_info.model_uri)
df = pd.DataFrame(
    {
        "animal": [
            "cats",
            "dogs",
        ]
    }
)
print(model.predict(df))

list_of_dicts = [
    {"animal": "cats"},
    {"animal": "dogs"},
]
print(model.predict(list_of_dicts))

list_of_strings = [
    "cats",
    "dogs",
]
print(model.predict(list_of_strings))
print(
    """
# ******************************************************************************
# Multiple variables
# ******************************************************************************
"""
)
with mlflow.start_run():
    model_info = mlflow.openai.log_model(
        model="gpt-3.5-turbo",
        task=openai.ChatCompletion,
        artifact_path="model",
        messages=[{"role": "user", "content": "Tell me a {adjective} joke about {animal}."}],
    )


model = mlflow.pyfunc.load_model(model_info.model_uri)
df = pd.DataFrame(
    {
        "adjective": ["funny", "scary"],
        "animal": ["cats", "dogs"],
    }
)
print(model.predict(df))


list_of_dicts = [
    {"adjective": "funny", "animal": "cats"},
    {"adjective": "scary", "animal": "dogs"},
]
print(model.predict(list_of_dicts))

print(
    """
# ******************************************************************************
# Multiple prompts
# ******************************************************************************
"""
)
with mlflow.start_run():
    model_info = mlflow.openai.log_model(
        model="gpt-3.5-turbo",
        task=openai.ChatCompletion,
        artifact_path="model",
        messages=[
            {"role": "system", "content": "You are {person}"},
            {"role": "user", "content": "Let me hear your thoughts on {topic}"},
        ],
    )


model = mlflow.pyfunc.load_model(model_info.model_uri)
df = pd.DataFrame(
    {
        "person": ["Elon Musk", "Jeff Bezos"],
        "topic": ["AI", "ML"],
    }
)
print(model.predict(df))

list_of_dicts = [
    {"person": "Elon Musk", "topic": "AI"},
    {"person": "Jeff Bezos", "topic": "ML"},
]
print(model.predict(list_of_dicts))


print(
    """
# ******************************************************************************
# No input variables
# ******************************************************************************
"""
)
with mlflow.start_run():
    model_info = mlflow.openai.log_model(
        model="gpt-3.5-turbo",
        task=openai.ChatCompletion,
        artifact_path="model",
        messages=[{"role": "system", "content": "You are Elon Musk"}],
    )

model = mlflow.pyfunc.load_model(model_info.model_uri)
df = pd.DataFrame(
    {
        "question": [
            "Let me hear your thoughts on AI",
            "Let me hear your thoughts on ML",
        ],
    }
)
print(model.predict(df))

list_of_dicts = [
    {"question": "Let me hear your thoughts on AI"},
    {"question": "Let me hear your thoughts on ML"},
]
model = mlflow.pyfunc.load_model(model_info.model_uri)
print(model.predict(list_of_dicts))

list_of_strings = [
    "Let me hear your thoughts on AI",
    "Let me hear your thoughts on ML",
]
model = mlflow.pyfunc.load_model(model_info.model_uri)
print(model.predict(list_of_strings))

LangChain (langchain) (Experimental)

Attention

The langchain flavor is in active development and is marked as Experimental. Public APIs may change and new features are subject to be added as additional functionality is brought to the flavor.

The langchain model flavor enables logging of LangChain models in MLflow format via the mlflow.langchain.save_model() and mlflow.langchain.log_model() functions. Use of these functions also adds the python_function flavor to the MLflow Models that they produce, allowing the model to be interpreted as a generic Python function for inference via mlflow.pyfunc.load_model(). You can also use the mlflow.langchain.load_model() function to load a saved or logged MLflow Model with the langchain flavor as a dictionary of the model’s attributes.

Example: Log a LangChain LLMChain

import os

from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

import mlflow

assert "OPENAI_API_KEY" in os.environ, "Please set the OPENAI_API_KEY environment variable."

llm = OpenAI(temperature=0.9)
prompt = PromptTemplate(
    input_variables=["product"],
    template="What is a good name for a company that makes {product}?",
)
chain = LLMChain(llm=llm, prompt=prompt)

with mlflow.start_run():
    logged_model = mlflow.langchain.log_model(chain, "langchain_model")

loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict([{"product": "colorful socks"}]))

Example: Log a LangChain Agent

import os

from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.llms import OpenAI

import mlflow

assert "OPENAI_API_KEY" in os.environ, "Please set the OPENAI_API_KEY environment variable."
assert "SERPAPI_API_KEY" in os.environ, "Please set the SERPAPI_API_KEY environment variable."

# First, let's load the language model we're going to use to control the agent.
llm = OpenAI(temperature=0)

# Next, let's load some tools to use. Note that the `llm-math` tool uses an LLM, so we need to pass that in.
tools = load_tools(["serpapi", "llm-math"], llm=llm)

# Finally, let's initialize an agent with the tools, the language model, and the type of agent we want to use.
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

with mlflow.start_run():
    logged_model = mlflow.langchain.log_model(agent, "langchain_model")

loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(
    loaded_model.predict(
        [
            {
                "input": "What was the high temperature in SF yesterday in Fahrenheit? What is that number raised to the .023 power?"
            }
        ]
    )
)

Logging RetrievalQA Chains

In MLflow, you can use the langchain flavor to save a RetrievalQA chain, including the retriever object.

Native LangChain requires the user to handle the serialization and deserialization of the retriever object, but MLflow’s langchain flavor handles that for you.

Here are the two things you need to tell MLflow:

  1. Where the retriever object is stored (persist_dir).

  2. How to load the retriever object from that location (loader_fn).

After you define these, MLflow takes care of the rest, saving both the content in the persist_dir and pickling the loader_fn function.

Example: Log a LangChain RetrievalQA Chain

import os
import tempfile

from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS

import mlflow

assert "OPENAI_API_KEY" in os.environ, "Please set the OPENAI_API_KEY environment variable."

with tempfile.TemporaryDirectory() as temp_dir:
    persist_dir = os.path.join(temp_dir, "faiss_index")

    # Create the vector db, persist the db to a local fs folder
    loader = TextLoader("tests/langchain/state_of_the_union.txt")
    documents = loader.load()
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    docs = text_splitter.split_documents(documents)
    embeddings = OpenAIEmbeddings()
    db = FAISS.from_documents(docs, embeddings)
    db.save_local(persist_dir)

    # Create the RetrievalQA chain
    retrievalQA = RetrievalQA.from_llm(llm=OpenAI(), retriever=db.as_retriever())

    # Log the retrievalQA chain
    def load_retriever(persist_directory):
        embeddings = OpenAIEmbeddings()
        vectorstore = FAISS.load_local(persist_directory, embeddings)
        return vectorstore.as_retriever()

    with mlflow.start_run() as run:
        logged_model = mlflow.langchain.log_model(
            retrievalQA,
            artifact_path="retrieval_qa",
            loader_fn=load_retriever,
            persist_dir=persist_dir,
        )

# Load the retrievalQA chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
loaded_model.predict([{"query": "What did the president say about Ketanji Brown Jackson"}])

Logging a retriever and evaluate it individually

The langchain flavor provides the functionality to log a retriever object and evaluate it individually. This is useful if you want to evaluate the quality of the relevant documents returned by a retriever object without directing these documents through a large language model (LLM) to yield a summarized response.

In order to log the retriever object in the langchain flavor, it is also required to specify persist_dir and loader_fn, the same as logging the RetrievalQA chain. See the previous section for details about these parameters.

See the following example for more details.

Example: Log a LangChain Retriever

import os
import tempfile

from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS

import mlflow

assert "OPENAI_API_KEY" in os.environ, "Please set the OPENAI_API_KEY environment variable."

with tempfile.TemporaryDirectory() as temp_dir:
    persist_dir = os.path.join(temp_dir, "faiss_index")

    # Create the vector db, persist the db to a local fs folder
    loader = TextLoader("tests/langchain/state_of_the_union.txt")
    documents = loader.load()
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    docs = text_splitter.split_documents(documents)
    embeddings = OpenAIEmbeddings()
    db = FAISS.from_documents(docs, embeddings)
    db.save_local(persist_dir)

    def load_retriever(persist_directory):
        embeddings = OpenAIEmbeddings()
        vectorstore = FAISS.load_local(persist_directory, embeddings)
        return vectorstore.as_retriever()

    # Log the retriever
    with mlflow.start_run() as run:
        logged_model = mlflow.langchain.log_model(
            db.as_retriever(),
            artifact_path="retriever",
            loader_fn=load_retriever,
            persist_dir=persist_dir,
        )

# Load the retriever chain
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
print(loaded_model.predict([{"query": "What did the president say about Ketanji Brown Jackson"}]))

John Snow Labs (johnsnowlabs) (Experimental)

Attention

The johnsnowlabs flavor is in active development and is marked as Experimental. Public APIs may change and new features are subject to be added as additional functionality is brought to the flavor.

The johnsnowlabs model flavor gives you access to 20.000+ state-of-the-art enterprise NLP models in 200+ languages for medical, finance, legal and many more domains.

You can use mlflow.johnsnowlabs.log_model() to log and export your model as mlflow.pyfunc.PyFuncModel.

This enables you to integrate any John Snow Labs model into the MLflow framework. You can easily deploy your models for inference with MLflows serve functionalities. Models are interpreted as a generic Python function for inference via mlflow.pyfunc.load_model(). You can also use the mlflow.johnsnowlabs.load_model() function to load a saved or logged MLflow Model with the johnsnowlabs flavor from an stored artifact.

Features include: LLM’s, Text Summarization, Question Answering, Named Entity Recognition, Relation Extraction, Sentiment Analysis, Spell Checking, Image Classification, Automatic Speech Recognition and much more, powered by the latest Transformer Architectures. The models are provided by John Snow Labs and requires a John Snow Labs Enterprise NLP License. You can reach out to us for a research or industry license.

Example: Export a John Snow Labs to MLflow format

import json
import os

import pandas as pd
from johnsnowlabs import nlp

import mlflow
from mlflow.pyfunc import spark_udf

# 1) Write your raw license.json string into the 'JOHNSNOWLABS_LICENSE_JSON' env variable for MLflow
creds = {
    "AWS_ACCESS_KEY_ID": "...",
    "AWS_SECRET_ACCESS_KEY": "...",
    "SPARK_NLP_LICENSE": "...",
    "SECRET": "...",
}
os.environ["JOHNSNOWLABS_LICENSE_JSON"] = json.dumps(creds)

# 2) Install enterprise libraries
nlp.install()
# 3) Start a Spark session with enterprise libraries
spark = nlp.start()

# 4) Load a model and test it
nlu_model = "en.classify.bert_sequence.covid_sentiment"
model_save_path = "my_model"
johnsnowlabs_model = nlp.load(nlu_model)
johnsnowlabs_model.predict(["I hate COVID,", "I love COVID"])

# 5) Export model with pyfunc and johnsnowlabs flavors
with mlflow.start_run():
    model_info = mlflow.johnsnowlabs.log_model(johnsnowlabs_model, model_save_path)

# 6) Load model with johnsnowlabs flavor
mlflow.johnsnowlabs.load_model(model_info.model_uri)

# 7) Load model with pyfunc flavor
mlflow.pyfunc.load_model(model_save_path)

pandas_df = pd.DataFrame({"text": ["Hello World"]})
spark_df = spark.createDataFrame(pandas_df).coalesce(1)
pyfunc_udf = spark_udf(
    spark=spark,
    model_uri=model_save_path,
    env_manager="virtualenv",
    result_type="string",
)
new_df = spark_df.withColumn("prediction", pyfunc_udf(*pandas_df.columns))

# 9) You can now use the mlflow models serve command to serve the model see next section

# 10)  You can also use x command to deploy model inside of a container see next section

To deploy the John Snow Labs model as a container

  1. Start the Docker Container

docker run -p 5001:8080 -e JOHNSNOWLABS_LICENSE_JSON=your_json_string "mlflow-pyfunc"
  1. Query server

curl http://127.0.0.1:5001/invocations -H 'Content-Type: application/json' -d '{
  "dataframe_split": {
      "columns": ["text"],
      "data": [["I hate covid"], ["I love covid"]]
  }
}'

To deploy the John Snow Labs model without a container

  1. Export env variable and start server

export JOHNSNOWLABS_LICENSE_JSON=your_json_string
mlflow models serve -m <model_uri>
  1. Query server

curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '{
  "dataframe_split": {
      "columns": ["text"],
      "data": [["I hate covid"], ["I love covid"]]
  }
}'

Diviner (diviner)

The diviner model flavor enables logging of diviner models in MLflow format via the mlflow.diviner.save_model() and mlflow.diviner.log_model() methods. These methods also add the python_function flavor to the MLflow Models that they produce, allowing the model to be interpreted as generic Python functions for inference via mlflow.pyfunc.load_model(). This loaded PyFunc model can only be scored with a DataFrame input. You can also use the mlflow.diviner.load_model() method to load MLflow Models with the diviner model flavor in native diviner formats.

Diviner Types

Diviner is a library that provides an orchestration framework for performing time series forecasting on groups of related series. Forecasting in diviner is accomplished through wrapping popular open source libraries such as prophet and pmdarima. The diviner library offers a simplified set of APIs to simultaneously generate distinct time series forecasts for multiple data groupings using a single input DataFrame and a unified high-level API.

Metrics and Parameters logging for Diviner

Unlike other flavors that are supported in MLflow, Diviner has the concept of grouped models. As a collection of many (perhaps thousands) of individual forecasting models, the burden to the tracking server to log individual metrics and parameters for each of these models is significant. For this reason, metrics and parameters are exposed for retrieval from Diviner’s APIs as Pandas DataFrames, rather than discrete primitive values.

To illustrate, let us assume we are forecasting hourly electricity consumption from major cities around the world. A sample of our input data looks like this:

country

city

datetime

watts

US

NewYork

2022-03-01 00:01:00

23568.9

US

NewYork

2022-03-01 00:02:00

22331.7

US

Boston

2022-03-01 00:01:00

14220.1

US

Boston

2022-03-01 00:02:00

14183.4

CA

Toronto

2022-03-01 00:01:00

18562.2

CA

Toronto

2022-03-01 00:02:00

17681.6

MX

MexicoCity

2022-03-01 00:01:00

19946.8

MX

MexicoCity

2022-03-01 00:02:00

19444.0

If we were to fit a model on this data, supplying the grouping keys as:

grouping_keys = ["country", "city"]

We will have a model generated for each of the grouping keys that have been supplied:

[("US", "NewYork"), ("US", "Boston"), ("CA", "Toronto"), ("MX", "MexicoCity")]

With a model constructed for each of these, entering each of their metrics and parameters wouldn’t be an issue for the MLflow tracking server. What would become a problem, however, is if we modeled each major city on the planet and ran this forecasting scenario every day. If we were to adhere to the conditions of the World Bank, that would mean just over 10,000 models as of 2022. After a mere few weeks of running this forecasting every day we would have a very large metrics table.

To eliminate this issue for large-scale forecasting, the metrics and parameters for diviner are extracted as a grouping key indexed Pandas DataFrame, as shown below for example (float values truncated for visibility):

grouping_key_columns

country

city

mse

rmse

mae

mape

mdape

smape

“(‘country’, ‘city’)”

CA

Toronto

8276851.6

2801.7

2417.7

0.16

0.16

0.159

“(‘country’, ‘city’)”

MX

MexicoCity

3548872.4

1833.8

1584.5

0.15

0.16

0.159

“(‘country’, ‘city’)”

US

NewYork

3167846.4

1732.4

1498.2

0.15

0.16

0.158

“(‘country’, ‘city’)”

US

Boston

14082666.4

3653.2

3156.2

0.15

0.16

0.159

There are two recommended means of logging the metrics and parameters from a diviner model :

import os
import mlflow
import tempfile

with tempfile.TemporaryDirectory() as tmpdir:
    params = model.extract_model_params()
    metrics = model.cross_validate_and_score(
        horizon="72 hours",
        period="240 hours",
        initial="480 hours",
        parallel="threads",
        rolling_window=0.1,
        monthly=False,
    )
    params.to_csv(f"{tmpdir}/params.csv", index=False, header=True)
    metrics.to_csv(f"{tmpdir}/metrics.csv", index=False, header=True)

    mlflow.log_artifacts(tmpdir, artifact_path="data")

Note

The parameters extract from diviner models may require casting (or dropping of columns) if using the pd.DataFrame.to_dict() approach due to the inability of this method to serialize objects.

import mlflow

params = model.extract_model_params()
metrics = model.cross_validate_and_score(
    horizon="72 hours",
    period="240 hours",
    initial="480 hours",
    parallel="threads",
    rolling_window=0.1,
    monthly=False,
)
params["t_scale"] = params["t_scale"].astype(str)
params["start"] = params["start"].astype(str)
params = params.drop("stan_backend", axis=1)

mlflow.log_dict(params.to_dict(), "params.json")
mlflow.log_dict(metrics.to_dict(), "metrics.json")

Logging of the model artifact is shown in the pyfunc example below.

Diviner pyfunc usage

The MLflow Diviner flavor includes an implementation of the pyfunc interface for Diviner models. To control prediction behavior, you can specify configuration arguments in the first row of a Pandas DataFrame input.

As this configuration is dependent upon the underlying model type (i.e., the diviner.GroupedProphet.forecast() method has a different signature than does diviner.GroupedPmdarima.predict()), the Diviner pyfunc implementation attempts to coerce arguments to the types expected by the underlying model.

Note

Diviner models support both “full group” and “partial group” forecasting. If a column named "groups" is present in the configuration DataFrame submitted to the pyfunc flavor, the grouping key values in the first row will be used to generate a subset of forecast predictions. This functionality removes the need to filter a subset from the full output of all groups forecasts if the results of only a few (or one) groups are needed.

For a GroupedPmdarima model, an example configuration for the pyfunc predict() method is:

import mlflow
import pandas as pd
from pmdarima.arima.auto import AutoARIMA
from diviner import GroupedPmdarima

with mlflow.start_run():
    base_model = AutoARIMA(out_of_sample_size=96, maxiter=200)
    model = GroupedPmdarima(model_template=base_model).fit(
        df=df,
        group_key_columns=["country", "city"],
        y_col="watts",
        datetime_col="datetime",
        silence_warnings=True,
    )

    mlflow.diviner.save_model(diviner_model=model, path="/tmp/diviner_model")

diviner_pyfunc = mlflow.pyfunc.load_model(model_uri="/tmp/diviner_model")

predict_conf = pd.DataFrame(
    {
        "n_periods": 120,
        "groups": [
            ("US", "NewYork"),
            ("CA", "Toronto"),
            ("MX", "MexicoCity"),
        ],  # NB: List of tuples required.
        "predict_col": "wattage_forecast",
        "alpha": 0.1,
        "return_conf_int": True,
        "on_error": "warn",
    },
    index=[0],
)

subset_forecasts = diviner_pyfunc.predict(predict_conf)

Note

There are several instances in which a configuration DataFrame submitted to the pyfunc predict() method will cause an MlflowException to be raised:

  • If neither horizon or n_periods are provided.

  • The value of n_periods or horizon is not an integer.

  • If the model is of type GroupedProphet, frequency as a string type must be provided.

  • If both horizon and n_periods are provided with different values.

Transformers (transformers) (Experimental)

Attention

The transformers flavor is in active development and is marked as Experimental. Public APIs may change and new features are subject to be added as additional functionality is brought to the flavor.

The transformers model flavor enables logging of transformers models, components, and pipelines in MLflow format via the mlflow.transformers.save_model() and mlflow.transformers.log_model() functions. Use of these functions also adds the python_function flavor to the MLflow Models that they produce, allowing the model to be interpreted as a generic Python function for inference via mlflow.pyfunc.load_model(). You can also use the mlflow.transformers.load_model() function to load a saved or logged MLflow Model with the transformers flavor in the native transformers formats.

Input and Output types for PyFunc

The transformers python_function (pyfunc) model flavor simplifies and standardizes both the inputs and outputs of pipeline inference. This conformity allows for serving and batch inference by coercing the data structures that are required for transformers inference pipelines to formats that are compatible with json serialization and casting to Pandas DataFrames.

Note

Certain TextGenerationPipeline types, particularly instructional-based ones, may return the original prompt and included line-formatting carriage returns “n” in their outputs. For these pipeline types, if you would like to disable the prompt return, you can set the following in the inference_config dictionary when saving or logging the model: “include_prompt”: False. To remove the newline characters from within the body of the generated text output, you can add the “collapse_whitespace”: True option to the inference_config dictionary. If the pipeline type being saved does not inherit from TextGenerationPipeline, these options will not perform any modification to the output returned from pipeline inference.

Attention

Not all transformers pipeline types are supported. See the table below for the list of currently supported Pipeline types that can be loaded as pyfunc.

In the current version, audio and text-based large language models are supported for use with pyfunc, while computer vision, multi-modal, timeseries, reinforcement learning, and graph models are only supported for native type loading via mlflow.transformers.load_model()

Future releases of MLflow will introduce pyfunc support for these additional types.

The table below shows the mapping of transformers pipeline types to the python_function (pyfunc) model flavor data type inputs and outputs.

Important

The inputs and outputs of the pyfunc implementation of these pipelines are not guaranteed to match the input types and output types that would return from a native use of a given pipeline type. If your use case requires access to scores, top_k results, or other additional references within the output from a pipeline inference call, please use the native implementation by loading via mlflow.transformers.load_model() to receive the full output.

Similarly, if your use case requires the use of raw tensor outputs or processing of outputs through an external processor module, load the model components directly as a dict by calling mlflow.transformers.load_model() and specify the return_type argument as ‘components’.

Supported transformers Pipeline types for Pyfunc

Pipeline Type

Input Type

Output Type

Instructional Text Generation

str or List[str]

List[str]

Conversational

str or List[str]

List[str]

Summarization

str or List[str]

List[str]

Text Classification

str or List[str]

pd.DataFrame (dtypes: {‘label’: str, ‘score’: double})

Text Generation

str or List[str]

List[str]

Text2Text Generation

str or List[str]

List[str]

Token Classification

str or List[str]

List[str]

Translation

str or List[str]

List[str]

ZeroShot Classification*

Dict[str, [List[str] | str]*

pd.DataFrame (dtypes: {‘sequence’: str, ‘labels’: str, ‘scores’: double})

Table Question Answering**

Dict[str, [List[str] | str]**

List[str]

Question Answering***

Dict[str, str]***

List[str]

Fill Mask****

str or List[str]****

List[str]

Feature Extraction

str or List[str]

np.ndarray

AutomaticSpeechRecognition

bytes*****, str, or np.ndarray

List[str]

AudioClassification

bytes*****, str, or np.ndarray

pd.DataFrame (dtypes: {‘label’: str, ‘score’: double})

* A collection of these inputs can also be passed. The standard required key names are ‘sequences’ and ‘candidate_labels’, but these may vary. Check the input requirments for the architecture that you’re using to ensure that the correct dictionary key names are provided.

** A collection of these inputs can also be passed. The reference table must be a json encoded dict (i.e. {‘query’: ‘what did we sell most of?’, ‘table’: json.dumps(table_as_dict)})

*** A collection of these inputs can also be passed. The standard required key names are ‘question’ and ‘context’. Verify the expected input key names match the expected input to the model to ensure your inference request can be read properly.

**** The mask syntax for the model that you’ve chosen is going to be specific to that model’s implementation. Some are ‘[MASK]’, while others are ‘<mask>’. Verify the expected syntax to avoid failed inference requests.

***** If using pyfunc in MLflow Model Serving for realtime inference, the raw audio in bytes format must be base64 encoded prior to submitting to the endpoint. String inputs will be interpreted as uri locations.

Using inference_config and model signature params for transformers inference

For transformers inference, there are two ways to pass in additional arguments to the pipeline.

  • Use inference_config when saving/logging the model

  • Specify params at inference time when calling predict()

Note

If both inference_config and ModelSignature with schema are saved when logging model, both of them will be used for inference. The default params in ModelSignature will override the params in inference_config. If extra params are provided at inference time, they take precedence over all params.

  • Using inference_config

import mlflow
from mlflow.models import infer_signature
from mlflow.transformers import generate_signature_output
import transformers

architecture = "mrm8488/t5-base-finetuned-common_gen"
model = transformers.pipeline(
    task="text2text-generation",
    tokenizer=transformers.T5TokenizerFast.from_pretrained(architecture),
    model=transformers.T5ForConditionalGeneration.from_pretrained(architecture),
)
data = "pencil draw paper"

# Infer the signature
signature = infer_signature(
    data,
    generate_signature_output(model, data),
)

# Define an inference_config
inference_config = {
    "num_beams": 5,
    "max_length": 30,
    "do_sample": True,
    "remove_invalid_values": True,
}

# Saving inference_config with the model
mlflow.transformers.save_model(
    model,
    path="text2text",
    inference_config=inference_config,
    signature=signature,
)

pyfunc_loaded = mlflow.pyfunc.load_model("text2text")
# inference_config will be applied
result = pyfunc_loaded.predict(data)
  • Specifying params at inference time

import mlflow
from mlflow.models import infer_signature
from mlflow.transformers import generate_signature_output
import transformers

architecture = "mrm8488/t5-base-finetuned-common_gen"
model = transformers.pipeline(
    task="text2text-generation",
    tokenizer=transformers.T5TokenizerFast.from_pretrained(architecture),
    model=transformers.T5ForConditionalGeneration.from_pretrained(architecture),
)
data = "pencil draw paper"

# Define an inference_config
inference_config = {
    "num_beams": 5,
    "max_length": 30,
    "do_sample": True,
    "remove_invalid_values": True,
}

# Infer the signature including params
signature_with_params = infer_signature(
    data,
    generate_signature_output(model, data),
    params=inference_config,
)

# Saving model without inference_config
mlflow.transformers.save_model(
    model,
    path="text2text",
    signature=signature_with_params,
)

pyfunc_loaded = mlflow.pyfunc.load_model("text2text")

# Pass params at inference time
params = {
    "max_length": 20,
    "do_sample": False,
}

# In this case we only override max_length and do_sample,
# other params will use the default one saved in ModelSignature.
# The final params used for prediction is as follows:
# {
#    "num_beams": 5,
#    "max_length": 20,
#    "do_sample": False,
#    "remove_invalid_values": True,
# }
result = pyfunc_loaded.predict(data, params=params)
Example of loading a transformers model as a python function

In the below example, a simple pre-trained model is used within a pipeline. After logging to MLflow, the pipeline is loaded as a pyfunc and used to generate a response from a passed-in list of strings.

import mlflow
import transformers

# Read a pre-trained conversation pipeline from HuggingFace hub
conversational_pipeline = transformers.pipeline(model="microsoft/DialoGPT-medium")

# Define the signature
signature = mlflow.models.infer_signature(
    "Hi there, chatbot!",
    mlflow.transformers.generate_signature_output(
        conversational_pipeline, "Hi there, chatbot!"
    ),
)

# Log the pipeline
with mlflow.start_run():
    model_info = mlflow.transformers.log_model(
        transformers_model=conversational_pipeline,
        artifact_path="chatbot",
        task="conversational",
        signature=signature,
        input_example="A clever and witty question",
    )

# Load the saved pipeline as pyfunc
chatbot = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)

# Ask the chatbot a question
response = chatbot.predict("What is machine learning?")

print(response)

# >> [It's a new thing that's been around for a while.]

Save and Load options for transformers

The transformers flavor for MLflow provides support for saving either components of a model or a pipeline object that contains the customized components in an easy to use interface that is optimized for inference.

Note

MLflow by default uses a 500 MB max_shard_size to save the model object in mlflow.transformers.save_model() or mlflow.transformers.log_model() APIs. You can use the environment variable MLFLOW_HUGGINGFACE_MODEL_MAX_SHARD_SIZE to override the value.

Note

For component-based logging, the only requirement that must be met in the submitted dict is that a model is provided. All other elements of the dict are optional.

Logging a components-based model

The example below shows logging components of a transformers model via a dictionary mapping of specific named components. The names of the keys within the submitted dictionary must be in the set: {"model", "tokenizer", "feature_extractor", "image_processor"}. Processor type objects (some image processors, audio processors, and multi-modal processors) must be saved explicitly with the processor argument in the mlflow.transformers.save_model() or mlflow.transformers.log_model() APIs.

After logging, the components are automatically inserted into the appropriate Pipeline type for the task being performed and are returned, ready for inference.

Note

The components that are logged can be retrieved in their original structure (a dictionary) by setting the attribute return_type to “components” in the load_model() API.

Attention

Not all model types are compatible with the pipeline API constructor via component elements. Incompatible models will raise an MLflowException error stating that the model is missing the name_or_path attribute. In the event that this occurs, please construct the model directly via the transformers.pipeline(<repo name>) API and save the pipeline object directly.

import mlflow
import transformers

task = "text-classification"
architecture = "distilbert-base-uncased-finetuned-sst-2-english"
model = transformers.AutoModelForSequenceClassification.from_pretrained(architecture)
tokenizer = transformers.AutoTokenizer.from_pretrained(architecture)

# Define the components of the model in a dictionary
transformers_model = {"model": model, "tokenizer": tokenizer}

# Log the model components
with mlflow.start_run():
    model_info = mlflow.transformers.log_model(
        transformers_model=transformers_model,
        artifact_path="text_classifier",
        task=task,
    )

# Load the components as a pipeline
loaded_pipeline = mlflow.transformers.load_model(
    model_info.model_uri, return_type="pipeline"
)

print(type(loaded_pipeline).__name__)
# >> TextClassificationPipeline

loaded_pipeline(["MLflow is awesome!", "Transformers is a great library!"])

# >> [{'label': 'POSITIVE', 'score': 0.9998478889465332},
# >>  {'label': 'POSITIVE', 'score': 0.9998030066490173}]
Saving a pipeline and loading components

Some use cases can benefit from the simplicity of defining a solution as a pipeline, but need the component-level access for performing a micro-services based deployment strategy where pre / post-processing is performed on containers that do not house the model itself. For this paradigm, a pipeline can be loaded as its constituent parts, as shown below.

import transformers
import mlflow

translation_pipeline = transformers.pipeline(
    task="translation_en_to_fr",
    model=transformers.T5ForConditionalGeneration.from_pretrained("t5-small"),
    tokenizer=transformers.T5TokenizerFast.from_pretrained(
        "t5-small", model_max_length=100
    ),
)

with mlflow.start_run():
    model_info = mlflow.transformers.log_model(
        transformers_model=translation_pipeline,
        artifact_path="french_translator",
    )

translation_components = mlflow.transformers.load_model(
    model_info.model_uri, return_type="components"
)

for key, value in translation_components.items():
    print(f"{key} -> {type(value).__name__}")

# >> task -> str
# >> model -> T5ForConditionalGeneration
# >> tokenizer -> T5TokenizerFast

response = translation_pipeline("MLflow is great!")

print(response)

# >> [{'translation_text': 'MLflow est formidable!'}]

reconstructed_pipeline = transformers.pipeline(**translation_components)

reconstructed_response = reconstructed_pipeline(
    "transformers makes using Deep Learning models easy and fun!"
)

print(reconstructed_response)

# >> [{'translation_text': "Les transformateurs rendent l'utilisation de modèles Deep Learning facile et amusante!"}]
Automatic Metadata and ModelCard logging

In order to provide as much information as possible for saved models, the transformers flavor will automatically fetch the ModelCard for any model or pipeline that is saved that has a stored card on the HuggingFace hub. This card will be logged as part of the model artifact, viewable at the same directory level as the MLmodel file and the stored model object.

In addition to the ModelCard, the components that comprise any Pipeline (or the individual components if saving a dictionary of named components) will have their source types stored. The model type, pipeline type, task, and classes of any supplementary component (such as a Tokenizer or ImageProcessor) will be stored in the MLmodel file as well.

Automatic Signature inference

For pipelines that support pyfunc, there are 3 means of attaching a model signature to the MLmodel file.

  • Provide a model signature explicitly via setting a valid ModelSignature to the signature attribute. This can be generated via the helper utility mlflow.transformers.generate_signature_output()

  • Provide an input_example. The signature will be inferred and validated that it matches the appropriate input type. The output type will be validated by performing inference automatically (if the model is a pyfunc supported type).

  • Do nothing. The transformers flavor will automatically apply the appropriate general signature that the pipeline type supports (only for a single-entity; collections will not be inferred).

Scalability for inference

A common configuration for lowering the total memory pressure for pytorch models within transformers pipelines is to modify the processing data type. This is achieved through setting the torch_dtype argument when creating a Pipeline. For a full reference of these tunable arguments for configuration of pipelines, see the training docs .

Note

This feature does not exist in versions of transformers < 4.26.x

In order to apply these configurations to a saved or logged run, there are two options:

  • Save a pipeline with the torch_dtype argument set to the encoding type of your choice.

Example:

import transformers
import torch
import mlflow

task = "translation_en_to_fr"

my_pipeline = transformers.pipeline(
    task=task,
    model=transformers.T5ForConditionalGeneration.from_pretrained("t5-small"),
    tokenizer=transformers.T5TokenizerFast.from_pretrained(
        "t5-small", model_max_length=100
    ),
    framework="pt",
    torch_dtype=torch.bfloat16,
)

with mlflow.start_run():
    model_info = mlflow.transformers.log_model(
        transformers_model=my_pipeline,
        artifact_path="my_pipeline",
    )

# Illustrate that the torch data type is recorded in the flavor configuration
print(model_info.flavors["transformers"])

Result:

{'transformers_version': '4.28.1',
 'code': None,
 'task': 'translation_en_to_fr',
 'instance_type': 'TranslationPipeline',
 'source_model_name': 't5-small',
 'pipeline_model_type': 'T5ForConditionalGeneration',
 'framework': 'pt',
 'torch_dtype': 'torch.bfloat16',
 'tokenizer_type': 'T5TokenizerFast',
 'components': ['tokenizer'],
 'pipeline': 'pipeline'}
  • Specify the torch_dtype argument when loading the model to override any values set during logging or saving.

Example:

import transformers
import torch
import mlflow

task = "translation_en_to_fr"

my_pipeline = transformers.pipeline(
    task=task,
    model=transformers.T5ForConditionalGeneration.from_pretrained("t5-small"),
    tokenizer=transformers.T5TokenizerFast.from_pretrained(
        "t5-small", model_max_length=100
    ),
    framework="pt",
    torch_dtype=torch.bfloat16,
)

with mlflow.start_run():
    model_info = mlflow.transformers.log_model(
        transformers_model=my_pipeline,
        artifact_path="my_pipeline",
    )

loaded_pipeline = mlflow.transformers.load_model(
    model_info.model_uri, return_type="pipeline", torch_dtype=torch.float64
)

print(loaded_pipeline.torch_dtype)

Result:

torch.float64

Note

Logging or saving a model in ‘components’ mode (using a dictionary to declare components) does not support setting the data type for a constructed pipeline. If you need to override the default behavior of how data is encoded, please save or log a pipeline object.

Note

Overriding the data type for a pipeline when loading as a python_function (pyfunc) model flavor is not supported. The value set for torch_dtype during save_model() or log_model() will persist when loading as pyfunc.

Input data types for audio pipelines

Note that passing raw data to an audio pipeline (raw bytes) requires two separate elements of the same effective library. In order to use the bitrate transposition and conversion of the audio bytes data into numpy nd.array format, the library ffmpeg is required. Installing this package directly from pypi (pip install ffmpeg) does not install the underlying c dll’s that are required to make ffmpeg function. Please consult with the documentation at the ffmpeg website for guidance on your given operating system.

The Audio Pipeline types, when loaded as a python_function (pyfunc) model flavor have three input types available:

  • str

The string input type is meant for blob references (uri locations) that are accessible to the instance of the pyfunc model. This input mode is useful when doing large batch processing of audio inference in Spark due to the inherent limitations of handling large bytes data in Spark DataFrames. Ensure that you have ffmpeg installed in the environment that the pyfunc model is running in order to use str input uri-based inference. If this package is not properly installed (both from pypi and from the ffmpeg binaries), an Exception will be thrown at inference time.

Warning

If using a uri (str) as an input type for a pyfunc model that you are intending to host for realtime inference through the MLflow Model Server, you must specify a custom model signature when logging or saving the model. The default signature input value type of bytes will, in MLflow Model serving, force the conversion of the uri string to bytes, which will cause an Exception to be thrown from the serving process stating that the soundfile is corrupt.

An example of specifying an appropriate uri-based input model signature for an audio model is shown below:

from mlflow.models import infer_signature
from mlflow.transformers import generate_signature_output

url = "https://www.mywebsite.com/sound/files/for/transcription/file111.mp3"
signature = infer_signature(url, generate_signature_output(my_audio_pipeline, url))
with mlflow.start_run():
    mlflow.transformers.log_model(
        transformers_model=my_audio_pipeline,
        artifact_path="my_transcriber",
        signature=signature,
    )
  • bytes

This is the default serialization format of audio files. It is the easiest format to utilize due to the fact that Pipeline implementations will automatically convert the audio bitrate from the file with the use of ffmpeg (a required dependency if using this format) to the bitrate required by the underlying model within the Pipeline. When using the pyfunc representation of the pipeline directly (not through serving), the sound file can be passed directly as bytes without any modification. When used through serving, the bytes data must be base64 encoded.

  • np.ndarray

This input format requires that both the bitrate has been set prior to conversion to numpy.ndarray (i.e., through the use of a package like librosa or pydub) and that the model has been saved with a signature that uses the np.ndarray format for the input.

Note

Audio models being used for serving that intend to utilize pre-formatted audio in np.ndarray format must have the model saved with a signature configuration that reflects this schema. Failure to do so will result in type casting errors due to the default signature for audio transformers pipelines being set as expecting binary (bytes) data. The serving endpoint cannot accept a union of types, so a particular model instance must choose one or the other as an allowed input type.

SentenceTransformers (sentence_transformers) (Experimental)

Attention

The sentence_transformers flavor is in active development and is marked as Experimental. Public APIs may change and new features are subject to be added as additional functionality is brought to the flavor.

The sentence_transformers model flavor enables logging of sentence-transformers models in MLflow format via the mlflow.sentence_transformers.save_model() and mlflow.sentence_transformers.log_model() functions. Use of these functions also adds the python_function flavor to the MLflow Models that they produce, allowing the model to be interpreted as a generic Python function for inference via mlflow.pyfunc.load_model(). You can also use the mlflow.sentence_transformers.load_model() function to load a saved or logged MLflow Model with the sentence_transformers flavor as a native sentence-transformers model.

Example:

from sentence_transformers import SentenceTransformer

import mlflow
import mlflow.sentence_transformers

model = SentenceTransformer("all-MiniLM-L6-v2")

example_sentences = ["This is a sentence.", "This is another sentence."]

# Define the signature
signature = mlflow.models.infer_signature(
    model_input=example_sentences,
    model_output=model.encode(example_sentences),
)

# Log the model using mlflow
with mlflow.start_run():
    logged_model = mlflow.sentence_transformers.log_model(
        model=model,
        artifact_path="sbert_model",
        signature=signature,
        input_example=example_sentences,
    )

# Load option 1: mlflow.pyfunc.load_model returns a PyFuncModel
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
embeddings1 = loaded_model.predict(["hello world", "i am mlflow"])

# Load option 2: mlflow.sentence_transformers.load_model returns a SentenceTransformer
loaded_model = mlflow.sentence_transformers.load_model(logged_model.model_uri)
embeddings2 = loaded_model.encode(["hello world", "i am mlflow"])

print(embeddings1)

"""
>> [[-3.44772562e-02  3.10232025e-02  6.73496164e-03  2.61089969e-02
  ...
  2.37922110e-02 -2.28897743e-02  3.89375277e-02  3.02067865e-02]
 [ 4.81191138e-03 -9.33756605e-02  6.95968643e-02  8.09735525e-03
  ...
   6.57437667e-02 -2.72239652e-02  4.02687863e-02 -1.05599344e-01]] 
"""

Model Evaluation

After building and training your MLflow Model, you can use the mlflow.evaluate() API to evaluate its performance on one or more datasets of your choosing. mlflow.evaluate() currently supports evaluation of MLflow Models with the python_function (pyfunc) model flavor for classification, regression, and numerous language modeling tasks (see Evaluating with LLMs), computing a variety of task-specific performance metrics, model performance plots, and model explanations. Evaluation results are logged to MLflow Tracking.

The following example from the MLflow GitHub Repository uses mlflow.evaluate() to evaluate the performance of a classifier on the UCI Adult Data Set, logging a comprehensive collection of MLflow Metrics and Artifacts that provide insight into model performance and behavior:

import xgboost
import shap
import mlflow
from mlflow.models import infer_signature
from sklearn.model_selection import train_test_split

# Load the UCI Adult Dataset
X, y = shap.datasets.adult()

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, random_state=42
)

# Fit an XGBoost binary classifier on the training data split
model = xgboost.XGBClassifier().fit(X_train, y_train)

# Create a model signature
signature = infer_signature(X_test, model.predict(X_test))

# Build the Evaluation Dataset from the test set
eval_data = X_test
eval_data["label"] = y_test

with mlflow.start_run() as run:
    # Log the baseline model to MLflow
    mlflow.sklearn.log_model(model, "model", signature=signature)
    model_uri = mlflow.get_artifact_uri("model")

    # Evaluate the logged model
    result = mlflow.evaluate(
        model_uri,
        eval_data,
        targets="label",
        model_type="classifier",
        evaluators=["default"],
    )

eval_metrics_img eval_importance_img

Evaluating with LLMs

As of MLflow 2.4.0, mlflow.evaluate() has built-in support for a variety of tasks with LLMs, including text summarization, text classification, question answering, and text generation. The following example uses mlflow.evaluate() to evaluate a model that answers questions about MLflow (note that you must have the OPENAI_API_TOKEN environment variable set in your current system environment in order to run the example):

import os
import pandas as pd

import mlflow
import openai

# Create a question answering model using prompt engineering with OpenAI. Log the
# prompt and the model to MLflow Tracking
mlflow.start_run()
system_prompt = (
    "Your job is to answer questions about MLflow. When you are asked a question about MLflow,"
    " respond to it. Make sure to include code examples. If the question is not related to"
    " MLflow, refuse to answer and say that the question is unrelated."
)
mlflow.log_param("system_prompt", system_prompt)
logged_model = mlflow.openai.log_model(
    model="gpt-3.5-turbo",
    task=openai.ChatCompletion,
    artifact_path="model",
    messages=[
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": "{question}"},
    ],
)

# Evaluate the model on some example questions
questions = pd.DataFrame(
    {
        "question": [
            "How do you create a run with MLflow?",
            "How do you log a model with MLflow?",
            "What is the capital of France?",
        ]
    }
)
mlflow.evaluate(
    model=logged_model.model_uri,
    model_type="question-answering",
    data=questions,
)

# Load and inspect the evaluation results
results: pd.DataFrame = mlflow.load_table(
    "eval_results_table.json", extra_columns=["run_id", "params.system_prompt"]
)
print("Evaluation results:")
print(results)

MLflow also provides an Artifact View UI for comparing inputs and outputs across multiple models built with LLMs. For example, after evaluating multiple prompts for question answering (see the MLflow OpenAI question answering full example), you can navigate to the Artifact View to view the questions and compare the answers for each model:

_images/artifact-view-ui.png


For additional examples demonstrating the use of mlflow.evaluate() with LLMs, check out the MLflow LLMs example repository.

Evaluating with Custom Metrics

If the default set of metrics is insufficient, you can supply custom_metrics and custom_artifacts to mlflow.evaluate() to produce custom metrics and artifacts for the model(s) that you’re evaluating. The following short example from the MLflow GitHub Repository uses mlflow.evaluate() with a custom metric function to evaluate the performance of a regressor on the California Housing Dataset.

import os

import matplotlib.pyplot as plt
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

import mlflow
from mlflow.models import infer_signature, make_metric

# loading the California housing dataset
cali_housing = fetch_california_housing(as_frame=True)

# split the dataset into train and test partitions
X_train, X_test, y_train, y_test = train_test_split(
    cali_housing.data, cali_housing.target, test_size=0.2, random_state=123
)

# train the model
lin_reg = LinearRegression().fit(X_train, y_train)

# Infer model signature
predictions = lin_reg.predict(X_train)
signature = infer_signature(X_train, predictions)

# creating the evaluation dataframe
eval_data = X_test.copy()
eval_data["target"] = y_test


def squared_diff_plus_one(eval_df, _builtin_metrics):
    """
    This example custom metric function creates a metric based on the ``prediction`` and
    ``target`` columns in ``eval_df`.
    """
    return np.sum(np.abs(eval_df["prediction"] - eval_df["target"] + 1) ** 2)


def sum_on_target_divided_by_two(_eval_df, builtin_metrics):
    """
    This example custom metric function creates a metric derived from existing metrics in
    ``builtin_metrics``.
    """
    return builtin_metrics["sum_on_target"] / 2


def prediction_target_scatter(eval_df, _builtin_metrics, artifacts_dir):
    """
    This example custom artifact generates and saves a scatter plot to ``artifacts_dir`` that
    visualizes the relationship between the predictions and targets for the given model to a
    file as an image artifact.
    """
    plt.scatter(eval_df["prediction"], eval_df["target"])
    plt.xlabel("Targets")
    plt.ylabel("Predictions")
    plt.title("Targets vs. Predictions")
    plot_path = os.path.join(artifacts_dir, "example_scatter_plot.png")
    plt.savefig(plot_path)
    return {"example_scatter_plot_artifact": plot_path}


with mlflow.start_run() as run:
    mlflow.sklearn.log_model(lin_reg, "model", signature=signature)
    model_uri = mlflow.get_artifact_uri("model")
    result = mlflow.evaluate(
        model=model_uri,
        data=eval_data,
        targets="target",
        model_type="regressor",
        evaluators=["default"],
        custom_metrics=[
            make_metric(
                eval_fn=squared_diff_plus_one,
                greater_is_better=False,
            ),
            make_metric(
                eval_fn=sum_on_target_divided_by_two,
                greater_is_better=True,
            ),
        ],
        custom_artifacts=[prediction_target_scatter],
    )

print(f"metrics:\n{result.metrics}")
print(f"artifacts:\n{result.artifacts}")

For a more comprehensive custom metrics usage example, refer to this example from the MLflow GitHub Repository.

Performing Model Validation

You can also use the mlflow.evaluate() API to perform some checks on the metrics generated during model evaluation to validate the quality of your model. By specifying a validation_thresholds dictionary mapping metric names to mlflow.models.MetricThreshold objects, you can specify value thresholds that your model’s evaluation metrics must exceed as well as absolute and relative gains your model must have in comparison to a specified baseline_model. If your model fails to clear specified thresholds, mlflow.evaluate() will throw a ModelValidationFailedException detailing the validation failure.

import xgboost
import shap
from sklearn.model_selection import train_test_split
from sklearn.dummy import DummyClassifier
import mlflow
from mlflow.models import infer_signature
from mlflow.models import MetricThreshold

# load UCI Adult Data Set; segment it into training and test sets
X, y = shap.datasets.adult()
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.33, random_state=42
)

# train a candidate XGBoost model
candidate_model = xgboost.XGBClassifier().fit(X_train, y_train)

# train a baseline dummy model
baseline_model = DummyClassifier(strategy="uniform").fit(X_train, y_train)

# create signature that is shared by the two models
signature = infer_signature(X_test, y_test)

# construct an evaluation dataset from the test set
eval_data = X_test
eval_data["label"] = y_test

# Define criteria for model to be validated against
thresholds = {
    "accuracy_score": MetricThreshold(
        threshold=0.8,  # accuracy should be >=0.8
        min_absolute_change=0.05,  # accuracy should be at least 0.05 greater than baseline model accuracy
        min_relative_change=0.05,  # accuracy should be at least 5 percent greater than baseline model accuracy
        greater_is_better=True,
    ),
}

with mlflow.start_run() as run:
    candidate_model_uri = mlflow.sklearn.log_model(
        candidate_model, "candidate_model", signature=signature
    ).model_uri
    baseline_model_uri = mlflow.sklearn.log_model(
        baseline_model, "baseline_model", signature=signature
    ).model_uri

    mlflow.evaluate(
        candidate_model_uri,
        eval_data,
        targets="label",
        model_type="classifier",
        validation_thresholds=thresholds,
        baseline_model=baseline_model_uri,
    )

Refer to mlflow.models.MetricThreshold to see details on how the thresholds are specified and checked. For a more comprehensive demonstration on how to use mlflow.evaluate() to perform model validation, refer to the Model Validation example from the MLflow GitHub Repository.

The logged output within the MLflow UI for the comprehensive example is shown below. Note the two model artifacts that have been logged: ‘baseline_model’ and ‘candidate_model’ for comparison purposes in the example.

eval_importance_compare_img

Note

Limitations (when the default evaluator is used):

  • Model validation results are not included in the active MLflow run.

  • No metrics are logged nor artifacts produced for the baseline model in the active MLflow run.

Additional information about model evaluation behaviors and outputs is available in the mlflow.evaluate() API docs.

Note

There are plugins that support in-depth model validation with features that are not supported directly in MLflow. To learn more, see:

Note

Differences in the computation of Area under Curve Precision Recall score (metric name precision_recall_auc) between multi and binary classifiers:

Multiclass classifier models, when evaluated, utilize the standard scoring metric from sklearn: sklearn.metrics.roc_auc_score to calculate the area under the precision recall curve. This algorithm performs a linear interpolation calculation utilizing the trapezoidal rule to estimate the area under the precision recall curve. It is well-suited for use in evaluating multi-class classification models to provide a single numeric value of the quality of fit.

Binary classifier models, on the other hand, use the sklearn.metrics.average_precision_score to avoid the shortcomings of the roc_auc_score implementation when applied to heavily imbalanced classes in binary classification. Usage of the roc_auc_score for imbalanced datasets can give a misleading result (optimistically better than the model’s actual ability to accurately predict the minority class membership).

For additional information on the topic of why different algorithms are employed for this, as well as links to the papers that informed the implementation of these metrics within the sklearn.metrics module, refer to the documentation.

For simplicity purposes, both methodologies evaluation metric results (whether for multi-class or binary classification) are unified in the single metric: precision_recall_auc.

Model Validation with Giskard’s plugin

To extend the validation capabilities of MLflow and anticipate issues before they go to production, a plugin has been built by Giskard allowing users to:

See the following plugin example notebooks for a demo:

For more information on the plugin, see the giskard-mlflow docs.

Model Validation with Trubrics’ plugin

To extend the validation capabilities of MLflow, a plugin has been built by Trubrics allowing users:

  • to use a large number of out-of-the-box validations

  • to validate a run with any custom python functions

  • to view all validation results in a .json file, for diagnosis of why an MLflow run could have failed

See the plugin example notebook for a demo.

For more information on the plugin, see the trubrics-mlflow docs.

Model Customization

While MLflow’s built-in model persistence utilities are convenient for packaging models from various popular ML libraries in MLflow Model format, they do not cover every use case. For example, you may want to use a model from an ML library that is not explicitly supported by MLflow’s built-in flavors. Alternatively, you may want to package custom inference code and data to create an MLflow Model. Fortunately, MLflow provides two solutions that can be used to accomplish these tasks: Custom Python Models and Custom Flavors.

Custom Python Models

The mlflow.pyfunc module provides save_model() and log_model() utilities for creating MLflow Models with the python_function flavor that contain user-specified code and artifact (file) dependencies. These artifact dependencies may include serialized models produced by any Python ML library.

Because these custom models contain the python_function flavor, they can be deployed to any of MLflow’s supported production environments, such as SageMaker, AzureML, or local REST endpoints.

The following examples demonstrate how you can use the mlflow.pyfunc module to create custom Python models. For additional information about model customization with MLflow’s python_function utilities, see the python_function custom models documentation.

Example: Creating a custom “add n” model

This example defines a class for a custom model that adds a specified numeric value, n, to all columns of a Pandas DataFrame input. Then, it uses the mlflow.pyfunc APIs to save an instance of this model with n = 5 in MLflow Model format. Finally, it loads the model in python_function format and uses it to evaluate a sample input.

import mlflow.pyfunc


# Define the model class
class AddN(mlflow.pyfunc.PythonModel):
    def __init__(self, n):
        self.n = n

    def predict(self, context, model_input, params=None):
        return model_input.apply(lambda column: column + self.n)


# Construct and save the model
model_path = "add_n_model"
add5_model = AddN(n=5)
mlflow.pyfunc.save_model(path=model_path, python_model=add5_model)

# Load the model in `python_function` format
loaded_model = mlflow.pyfunc.load_model(model_path)

# Evaluate the model
import pandas as pd

model_input = pd.DataFrame([range(10)])
model_output = loaded_model.predict(model_input)
assert model_output.equals(pd.DataFrame([range(5, 15)]))

Example: Saving an XGBoost model in MLflow format

This example begins by training and saving a gradient boosted tree model using the XGBoost library. Next, it defines a wrapper class around the XGBoost model that conforms to MLflow’s python_function inference API. Then, it uses the wrapper class and the saved XGBoost model to construct an MLflow Model that performs inference using the gradient boosted tree. Finally, it loads the MLflow Model in python_function format and uses it to evaluate test data.

# Load training and test datasets
from sys import version_info
import xgboost as xgb
from sklearn import datasets
from sklearn.model_selection import train_test_split

PYTHON_VERSION = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
iris = datasets.load_iris()
x = iris.data[:, 2:]
y = iris.target
x_train, x_test, y_train, _ = train_test_split(x, y, test_size=0.2, random_state=42)
dtrain = xgb.DMatrix(x_train, label=y_train)

# Train and save an XGBoost model
xgb_model = xgb.train(params={"max_depth": 10}, dtrain=dtrain, num_boost_round=10)
xgb_model_path = "xgb_model.pth"
xgb_model.save_model(xgb_model_path)

# Create an `artifacts` dictionary that assigns a unique name to the saved XGBoost model file.
# This dictionary will be passed to `mlflow.pyfunc.save_model`, which will copy the model file
# into the new MLflow Model's directory.
artifacts = {"xgb_model": xgb_model_path}

# Define the model class
import mlflow.pyfunc


class XGBWrapper(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        import xgboost as xgb

        self.xgb_model = xgb.Booster()
        self.xgb_model.load_model(context.artifacts["xgb_model"])

    def predict(self, context, model_input, params=None):
        input_matrix = xgb.DMatrix(model_input.values)
        return self.xgb_model.predict(input_matrix)


# Create a Conda environment for the new MLflow Model that contains all necessary dependencies.
import cloudpickle

conda_env = {
    "channels": ["defaults"],
    "dependencies": [
        f"python={PYTHON_VERSION}",
        "pip",
        {
            "pip": [
                f"mlflow=={mlflow.__version__}",
                f"xgboost=={xgb.__version__}",
                f"cloudpickle=={cloudpickle.__version__}",
            ],
        },
    ],
    "name": "xgb_env",
}

# Save the MLflow Model
mlflow_pyfunc_model_path = "xgb_mlflow_pyfunc"
mlflow.pyfunc.save_model(
    path=mlflow_pyfunc_model_path,
    python_model=XGBWrapper(),
    artifacts=artifacts,
    conda_env=conda_env,
)

# Load the model in `python_function` format
loaded_model = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path)

# Evaluate the model
import pandas as pd

test_predictions = loaded_model.predict(pd.DataFrame(x_test))
print(test_predictions)

Example: Logging a transformers model with hf:/ schema to avoid copying large files

This example shows how to use a special schema hf:/ to log a transformers model from huggingface hub directly. This is useful when the model is too large and especially when you want to serve the model directly, but it doesn’t save extra space if you want to download and test the model locally.

import mlflow
from mlflow.models import infer_signature
import numpy as np
import transformers


# Define a custom PythonModel
class QAModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        """
        This method initializes the tokenizer and language model
        using the specified snapshot location from model context.
        """
        snapshot_location = context.artifacts["bert-tiny-model"]
        # Initialize tokenizer and language model
        tokenizer = transformers.AutoTokenizer.from_pretrained(snapshot_location)
        model = transformers.BertForQuestionAnswering.from_pretrained(snapshot_location)
        self.pipeline = transformers.pipeline(
            task="question-answering", model=model, tokenizer=tokenizer
        )

    def predict(self, context, model_input, params=None):
        question = model_input["question"][0]
        if isinstance(question, np.ndarray):
            question = question.item()
        ctx = model_input["context"][0]
        if isinstance(ctx, np.ndarray):
            ctx = ctx.item()
        return self.pipeline(question=question, context=ctx)


# Log the model
data = {"question": "Who's house?", "context": "The house is owned by Run."}
pyfunc_artifact_path = "question_answering_model"
with mlflow.start_run() as run:
    model_info = mlflow.pyfunc.log_model(
        artifact_path=pyfunc_artifact_path,
        python_model=QAModel(),
        artifacts={"bert-tiny-model": "hf:/prajjwal1/bert-tiny"},
        input_example=data,
        signature=infer_signature(data, ["Run"]),
        extra_pip_requirements=["torch", "accelerate", "transformers", "numpy"],
    )

Custom Flavors

You can also create custom MLflow Models by writing a custom flavor.

As discussed in the Model API and Storage Format sections, an MLflow Model is defined by a directory of files that contains an MLmodel configuration file. This MLmodel file describes various model attributes, including the flavors in which the model can be interpreted. The MLmodel file contains an entry for each flavor name; each entry is a YAML-formatted collection of flavor-specific attributes.

To create a new flavor to support a custom model, you define the set of flavor-specific attributes to include in the MLmodel configuration file, as well as the code that can interpret the contents of the model directory and the flavor’s attributes. A detailed example of constructing a custom model flavor and its usage is shown below. New custom flavors not considered for official inclusion into MLflow should be introduced as separate GitHub repositories with documentation provided in the Community Model Flavors page.

Example: Creating a custom “sktime” flavor

This example illustrates the creation of a custom flavor for sktime time series library. The library provides a unified interface for multiple learning tasks including time series forecasting. While the custom flavor in this example is specific in terms of the sktime inference API and model serialization format, its interface design is similar to many of the existing built-in flavors. Particularly, the interface for utilizing the custom model loaded as a python_function flavor for generating predictions uses a single-row Pandas DataFrame configuration argument to expose the paramters of the sktime inference API. The complete code for this example is included in the flavor.py module of the sktime example directory.

Let’s examine the custom flavor module in more detail. The first step is to import several modules inluding sktime library, various MLflow utilities as well as the MLflow pyfunc module which is required to add the pyfunc specification to the MLflow model configuration. Note also the import of the flavor module itself. This will be passed to the mlflow.models.Model.log() method to log the model as an artifact to the current Mlflow run.

import logging
import os
import pickle

import flavor
import mlflow
import numpy as np
import pandas as pd
import sktime
import yaml
from mlflow import pyfunc
from mlflow.exceptions import MlflowException
from mlflow.models import Model
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.utils import _save_example
from mlflow.protos.databricks_pb2 import INTERNAL_ERROR, INVALID_PARAMETER_VALUE
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.environment import (
    _CONDA_ENV_FILE_NAME,
    _CONSTRAINTS_FILE_NAME,
    _PYTHON_ENV_FILE_NAME,
    _REQUIREMENTS_FILE_NAME,
    _mlflow_conda_env,
    _process_conda_env,
    _process_pip_requirements,
    _PythonEnv,
    _validate_env_arguments,
)
from mlflow.utils.file_utils import write_to
from mlflow.utils.model_utils import (
    _add_code_from_conf_to_system_path,
    _get_flavor_configuration,
    _validate_and_copy_code_paths,
    _validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
from sktime.utils.multiindex import flatten_multiindex

_logger = logging.getLogger(__name__)

We continue by defining a set of important variables used throughout the code that follows.

The flavor name needs to be provided for every custom flavor and should reflect the name of the library to be supported. It is saved as part of the flavor-specific attributes to the MLmodel configuration file. This example also defines some sktime specific variables. For illustration purposes, only a subset of the available predict methods to be exposed via the _SktimeModelWrapper class is included when loading the model in its python_function flavor (additional methods could be added in a similar fashion). Additionaly, the model serialization formats, namely pickle (default) and cloudpickle, are defined. Note that both serialization modules require using the same python environment (version) in whatever environment this model is used for inference to ensure that the model will load with the appropriate version of pickle (cloudpickle).

FLAVOR_NAME = "sktime"

SKTIME_PREDICT = "predict"
SKTIME_PREDICT_INTERVAL = "predict_interval"
SKTIME_PREDICT_QUANTILES = "predict_quantiles"
SKTIME_PREDICT_VAR = "predict_var"
SUPPORTED_SKTIME_PREDICT_METHODS = [
    SKTIME_PREDICT,
    SKTIME_PREDICT_INTERVAL,
    SKTIME_PREDICT_QUANTILES,
    SKTIME_PREDICT_VAR,
]

SERIALIZATION_FORMAT_PICKLE = "pickle"
SERIALIZATION_FORMAT_CLOUDPICKLE = "cloudpickle"
SUPPORTED_SERIALIZATION_FORMATS = [
    SERIALIZATION_FORMAT_PICKLE,
    SERIALIZATION_FORMAT_CLOUDPICKLE,
]

Similar to the MLflow built-in flavors, a custom flavor logs the model in MLflow format via the save_model() and log_model() functions. In the save_model() function, the sktime model is saved to a specified output directory. Additionally, save_model() leverages the mlflow.models.Model.add_flavor() and mlflow.models.Model.save() methods to produce the MLmodel configuration containing the sktime and the python_function flavor. The resulting configuration has several flavor-specific attributes, such as the flavor name and sktime_version, which denotes the version of the sktime library that was used to train the model. An example of the output directoy for the custom sktime model is shown below:

# Directory written by flavor.save_model(model, "my_model")
my_model/
├── MLmodel
├── conda.yaml
├── model.pkl
├── python_env.yaml
└── requirements.txt

And its YAML-formatted MLmodel file describes the two flavors:

flavors:
  python_function:
    env:
      conda: conda.yaml
      virtualenv: python_env.yaml
    loader_module: flavor
    model_path: model.pkl
    python_version: 3.8.15
  sktime:
    code: null
    pickled_model: model.pkl
    serialization_format: pickle
    sktime_version: 0.16.0

The save_model() function also provides flexibility to add additional paramters which can be added as flavor-specific attributes to the model configuration. In this example there is only one flavor-specific parameter for specifying the model serialization format. All other paramters are non-flavor specific (for a detailed description of these parameters take a look at mlflow.sklearn.save_model). Note: When creating your own custom flavor, be sure rename the sktime_model parameter in both the save_model() and log_model() functions to reflect the name of your custom model flavor.

def save_model(
    sktime_model,
    path,
    conda_env=None,
    code_paths=None,
    mlflow_model=None,
    signature=None,
    input_example=None,
    pip_requirements=None,
    extra_pip_requirements=None,
    serialization_format=SERIALIZATION_FORMAT_PICKLE,
):
    _validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)

    if serialization_format not in SUPPORTED_SERIALIZATION_FORMATS:
        raise MlflowException(
            message=(
                f"Unrecognized serialization format: {serialization_format}. "
                "Please specify one of the following supported formats: "
                "{SUPPORTED_SERIALIZATION_FORMATS}."
            ),
            error_code=INVALID_PARAMETER_VALUE,
        )

    _validate_and_prepare_target_save_path(path)
    code_dir_subpath = _validate_and_copy_code_paths(code_paths, path)

    if mlflow_model is None:
        mlflow_model = Model()
    if signature is not None:
        mlflow_model.signature = signature
    if input_example is not None:
        _save_example(mlflow_model, input_example, path)

    model_data_subpath = "model.pkl"
    model_data_path = os.path.join(path, model_data_subpath)
    _save_model(
        sktime_model, model_data_path, serialization_format=serialization_format
    )

    pyfunc.add_to_model(
        mlflow_model,
        loader_module="flavor",
        model_path=model_data_subpath,
        conda_env=_CONDA_ENV_FILE_NAME,
        python_env=_PYTHON_ENV_FILE_NAME,
        code=code_dir_subpath,
    )

    mlflow_model.add_flavor(
        FLAVOR_NAME,
        pickled_model=model_data_subpath,
        sktime_version=sktime.__version__,
        serialization_format=serialization_format,
        code=code_dir_subpath,
    )
    mlflow_model.save(os.path.join(path, MLMODEL_FILE_NAME))

    if conda_env is None:
        if pip_requirements is None:
            include_cloudpickle = (
                serialization_format == SERIALIZATION_FORMAT_CLOUDPICKLE
            )
            default_reqs = get_default_pip_requirements(include_cloudpickle)
            inferred_reqs = mlflow.models.infer_pip_requirements(
                path, FLAVOR_NAME, fallback=default_reqs
            )
            default_reqs = sorted(set(inferred_reqs).union(default_reqs))
        else:
            default_reqs = None
        conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
            default_reqs, pip_requirements, extra_pip_requirements
        )
    else:
        conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)

    with open(os.path.join(path, _CONDA_ENV_FILE_NAME), "w") as f:
        yaml.safe_dump(conda_env, stream=f, default_flow_style=False)

    if pip_constraints:
        write_to(os.path.join(path, _CONSTRAINTS_FILE_NAME), "\n".join(pip_constraints))

    write_to(os.path.join(path, _REQUIREMENTS_FILE_NAME), "\n".join(pip_requirements))

    _PythonEnv.current().to_yaml(os.path.join(path, _PYTHON_ENV_FILE_NAME))


def _save_model(model, path, serialization_format):
    with open(path, "wb") as out:
        if serialization_format == SERIALIZATION_FORMAT_PICKLE:
            pickle.dump(model, out)
        else:
            import cloudpickle

            cloudpickle.dump(model, out)

The save_model() function also writes the model dependencies to a requirements.txt and conda.yaml file in the model output directory. For this purpose the set of pip dependecies produced by this flavor need to be added to the get_default_pip_requirements() function. In this example only the minimum required dependencies are provided. In practice, additional requirements needed for preprocessing or post-processing steps could be included. Note that for any custom flavor, the mlflow.models.infer_pip_requirements() method in the save_model() function will return the default requirements defined in get_default_pip_requirements() as package imports are only inferred for built-in flavors.

def get_default_pip_requirements(include_cloudpickle=False):
    pip_deps = [_get_pinned_requirement("sktime")]
    if include_cloudpickle:
        pip_deps += [_get_pinned_requirement("cloudpickle")]

    return pip_deps


def get_default_conda_env(include_cloudpickle=False):
    return _mlflow_conda_env(
        additional_pip_deps=get_default_pip_requirements(include_cloudpickle)
    )

Next, we add the log_model() function. This function is little more than a wrapper around the mlflow.models.Model.log() method to enable logging our custom model as an artifact to the curren MLflow run. Any flavor-specific parameters (e.g. serialization_format) introduced in the save_model() function also need to be added in the log_model() function. We also need to pass the flavor module to the mlflow.models.Model.log() method which internally calls the save_model() function from above to persist the model.

def log_model(
    sktime_model,
    artifact_path,
    conda_env=None,
    code_paths=None,
    registered_model_name=None,
    signature=None,
    input_example=None,
    await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
    pip_requirements=None,
    extra_pip_requirements=None,
    serialization_format=SERIALIZATION_FORMAT_PICKLE,
    **kwargs,
):
    return Model.log(
        artifact_path=artifact_path,
        flavor=flavor,
        registered_model_name=registered_model_name,
        sktime_model=sktime_model,
        conda_env=conda_env,
        code_paths=code_paths,
        signature=signature,
        input_example=input_example,
        await_registration_for=await_registration_for,
        pip_requirements=pip_requirements,
        extra_pip_requirements=extra_pip_requirements,
        serialization_format=serialization_format,
        **kwargs,
    )

To interpret model directories produced by save_model(), the custom flavor must also define a load_model() function. The load_model() function reads the MLmodel configuration from the specified model directory and uses the configuration attributes to load and return the sktime model from its serialized representation.

def load_model(model_uri, dst_path=None):
    local_model_path = _download_artifact_from_uri(
        artifact_uri=model_uri, output_path=dst_path
    )
    flavor_conf = _get_flavor_configuration(
        model_path=local_model_path, flavor_name=FLAVOR_NAME
    )
    _add_code_from_conf_to_system_path(local_model_path, flavor_conf)
    sktime_model_file_path = os.path.join(
        local_model_path, flavor_conf["pickled_model"]
    )
    serialization_format = flavor_conf.get(
        "serialization_format", SERIALIZATION_FORMAT_PICKLE
    )
    return _load_model(
        path=sktime_model_file_path, serialization_format=serialization_format
    )


def _load_model(path, serialization_format):
    with open(path, "rb") as pickled_model:
        if serialization_format == SERIALIZATION_FORMAT_PICKLE:
            return pickle.load(pickled_model)
        elif serialization_format == SERIALIZATION_FORMAT_CLOUDPICKLE:
            import cloudpickle

            return cloudpickle.load(pickled_model)

The _load_pyfunc() function will be called by the mlflow.pyfunc.load_model() method to load the custom model flavor as a pyfunc type. The MLmodel flavor configuration is used to pass any flavor-specific attributes to the _load_model() function (i.e., the path to the python_function flavor in the model directory and the model serialization format).

def _load_pyfunc(path):
    try:
        sktime_flavor_conf = _get_flavor_configuration(
            model_path=path, flavor_name=FLAVOR_NAME
        )
        serialization_format = sktime_flavor_conf.get(
            "serialization_format", SERIALIZATION_FORMAT_PICKLE
        )
    except MlflowException:
        _logger.warning(
            "Could not find sktime flavor configuration during model "
            "loading process. Assuming 'pickle' serialization format."
        )
        serialization_format = SERIALIZATION_FORMAT_PICKLE

    pyfunc_flavor_conf = _get_flavor_configuration(
        model_path=path, flavor_name=pyfunc.FLAVOR_NAME
    )
    path = os.path.join(path, pyfunc_flavor_conf["model_path"])

    return _SktimeModelWrapper(
        _load_model(path, serialization_format=serialization_format)
    )

The final step is to create the model wrapper class defining the python_function flavor. The design of the wrapper class determines how the flavor’s inference API is exposed when making predictions using the python_function flavor. Just like the built-in flavors, the predict() method of the sktime wrapper class accepts a single-row Pandas DataFrame configuration argument. For an example of how to construct this configuration DataFrame refer to the usage example in the next section. A detailed description of the supported paramaters and input formats is provided in the flavor.py module docstrings.

class _SktimeModelWrapper:
    def __init__(self, sktime_model):
        self.sktime_model = sktime_model

    def predict(self, dataframe, params=None) -> pd.DataFrame:
        df_schema = dataframe.columns.values.tolist()

        if len(dataframe) > 1:
            raise MlflowException(
                f"The provided prediction pd.DataFrame contains {len(dataframe)} rows. "
                "Only 1 row should be supplied.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        # Convert the configuration dataframe into a dictionary to simplify the
        # extraction of parameters passed to the sktime predcition methods.
        attrs = dataframe.to_dict(orient="index").get(0)
        predict_method = attrs.get("predict_method")

        if not predict_method:
            raise MlflowException(
                f"The provided prediction configuration pd.DataFrame columns ({df_schema}) do not "
                "contain the required column `predict_method` for specifying the prediction method.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        if predict_method not in SUPPORTED_SKTIME_PREDICT_METHODS:
            raise MlflowException(
                "Invalid `predict_method` value."
                f"The supported prediction methods are {SUPPORTED_SKTIME_PREDICT_METHODS}",
                error_code=INVALID_PARAMETER_VALUE,
            )

        # For inference parameters 'fh', 'X', 'coverage', 'alpha', and 'cov'
        # the respective sktime default value is used if the value was not
        # provided in the configuration dataframe.
        fh = attrs.get("fh", None)

        # Any model that is trained with exogenous regressor elements will need
        # to provide `X` entries as a numpy ndarray to the predict method.
        X = attrs.get("X", None)

        # When the model is served via REST API the exogenous regressor must be
        # provided as a list to the configuration DataFrame to be JSON serializable.
        # Below we convert the list back to ndarray type as required by sktime
        # predict methods.
        if isinstance(X, list):
            X = np.array(X)

        # For illustration purposes only a subset of the available sktime prediction
        # methods is exposed. Additional methods (e.g. predict_proba) could be added
        # in a similar fashion.
        if predict_method == SKTIME_PREDICT:
            predictions = self.sktime_model.predict(fh=fh, X=X)

        if predict_method == SKTIME_PREDICT_INTERVAL:
            coverage = attrs.get("coverage", 0.9)
            predictions = self.sktime_model.predict_interval(
                fh=fh, X=X, coverage=coverage
            )

        if predict_method == SKTIME_PREDICT_QUANTILES:
            alpha = attrs.get("alpha", None)
            predictions = self.sktime_model.predict_quantiles(fh=fh, X=X, alpha=alpha)

        if predict_method == SKTIME_PREDICT_VAR:
            cov = attrs.get("cov", False)
            predictions = self.sktime_model.predict_var(fh=fh, X=X, cov=cov)

        # Methods predict_interval() and predict_quantiles() return a pandas
        # MultiIndex column structure. As MLflow signature inference does not
        # support MultiIndex column structure the columns must be flattened.
        if predict_method in [SKTIME_PREDICT_INTERVAL, SKTIME_PREDICT_QUANTILES]:
            predictions.columns = flatten_multiindex(predictions)

        return predictions

Example: Using the custom “sktime” flavor

This example trains a sktime NaiveForecaster model using the Longley dataset for forecasting with exogenous variables. It shows a custom model type implementation that logs the training hyper-parameters, evaluation metrics and the trained model as an artifact. The single-row configuration DataFrame for this example defines an interval forecast with nominal coverage values [0.9,0.95], a future forecast horizon of four periods, and an exogenous regressor.

import json

import flavor
import pandas as pd
from sktime.datasets import load_longley
from sktime.forecasting.model_selection import temporal_train_test_split
from sktime.forecasting.naive import NaiveForecaster
from sktime.performance_metrics.forecasting import (
    mean_absolute_error,
    mean_absolute_percentage_error,
)

import mlflow

ARTIFACT_PATH = "model"

with mlflow.start_run() as run:
    y, X = load_longley()
    y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)

    forecaster = NaiveForecaster()
    forecaster.fit(
        y_train,
        X=X_train,
    )

    # Extract parameters
    parameters = forecaster.get_params()

    # Evaluate model
    y_pred = forecaster.predict(fh=[1, 2, 3, 4], X=X_test)
    metrics = {
        "mae": mean_absolute_error(y_test, y_pred),
        "mape": mean_absolute_percentage_error(y_test, y_pred),
    }

    print(f"Parameters: \n{json.dumps(parameters, indent=2)}")
    print(f"Metrics: \n{json.dumps(metrics, indent=2)}")

    # Log parameters and metrics
    mlflow.log_params(parameters)
    mlflow.log_metrics(metrics)

    # Log model using custom model flavor with pickle serialization (default).
    flavor.log_model(
        sktime_model=forecaster,
        artifact_path=ARTIFACT_PATH,
        serialization_format="pickle",
    )
    model_uri = mlflow.get_artifact_uri(ARTIFACT_PATH)

# Load model in native sktime flavor and pyfunc flavor
loaded_model = flavor.load_model(model_uri=model_uri)
loaded_pyfunc = flavor.pyfunc.load_model(model_uri=model_uri)

# Convert test data to 2D numpy array so it can be passed to pyfunc predict using
# a single-row Pandas DataFrame configuration argument
X_test_array = X_test.to_numpy()

# Create configuration DataFrame
predict_conf = pd.DataFrame(
    [
        {
            "fh": [1, 2, 3, 4],
            "predict_method": "predict_interval",
            "coverage": [0.9, 0.95],
            "X": X_test_array,
        }
    ]
)

# Generate interval forecasts with native sktime flavor and pyfunc flavor
print(
    f"\nNative sktime 'predict_interval':\n${loaded_model.predict_interval(fh=[1, 2, 3], X=X_test, coverage=[0.9, 0.95])}"
)
print(f"\nPyfunc 'predict_interval':\n${loaded_pyfunc.predict(predict_conf)}")

# Print the run id wich is used for serving the model to a local REST API endpoint
print(f"\nMLflow run id:\n{run.info.run_id}")

When opening the MLflow runs detail page the serialized model artifact will show up, such as:

_images/tracking_artifact_ui_custom_flavor.png

To serve the model to a local REST API endpoint run the following MLflow CLI command substituting the run id printed during execution of the previous block (for more details refer to the Deploy MLflow models section):

mlflow models serve -m runs:/<run_id>/model --env-manager local --host 127.0.0.1

An example of requesting a prediction from the served model is shown below. The exogenous regressor needs to be provided as a list to be JSON serializable. The wrapper instance will convert the list back to numpy ndarray type as required by sktime inference API.

import pandas as pd
import requests

from sktime.datasets import load_longley
from sktime.forecasting.model_selection import temporal_train_test_split

y, X = load_longley()
y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)

# Define local host and endpoint url
host = "127.0.0.1"
url = f"http://{host}:5000/invocations"

# Create configuration DataFrame
X_test_list = X_test.to_numpy().tolist()
predict_conf = pd.DataFrame(
    [
        {
            "fh": [1, 2, 3, 4],
            "predict_method": "predict_interval",
            "coverage": [0.9, 0.95],
            "X": X_test_list,
        }
    ]
)

# Create dictionary with pandas DataFrame in the split orientation
json_data = {"dataframe_split": predict_conf.to_dict(orient="split")}

# Score model
response = requests.post(url, json=json_data)
print(f"\nPyfunc 'predict_interval':\n${response.json()}")

Built-In Deployment Tools

MLflow provides tools for deploying MLflow models on a local machine and to several production environments. Not all deployment methods are available for all model flavors.

Deploy MLflow models

MLflow can deploy models locally as local REST API endpoints or to directly score files. In addition, MLflow can package models as self-contained Docker images with the REST API endpoint. The image can be used to safely deploy the model to various environments such as Kubernetes.

You deploy MLflow model locally or generate a Docker image using the CLI interface to the mlflow.models module.

The REST API defines 4 endpoints:

  • /ping used for health check

  • /health (same as /ping)

  • /version used for getting the mlflow version

  • /invocations used for scoring

The REST API server accepts csv or json input. The input format must be specified in Content-Type header. The value of the header must be either application/json or application/csv.

The csv input must be a valid pandas.DataFrame csv representation. For example, data = pandas_df.to_csv().

The json input must be a dictionary with exactly one of the following fields that further specify the type and encoding of the input data

  • dataframe_split field with pandas DataFrames in the split orientation. For example, data = {"dataframe_split": pandas_df.to_dict(orient='split').

  • dataframe_records field with pandas DataFrame in the records orientation. For example, data = {"dataframe_records": pandas_df.to_dict(orient='records').*We do not recommend using this format because it is not guaranteed to preserve column ordering.*

  • instances field with tensor input formatted as described in TF Serving’s API docs where the provided inputs will be cast to Numpy arrays.

  • inputs field with tensor input formatted as described in TF Serving’s API docs where the provided inputs will be cast to Numpy arrays.

The json input also has an optional field params that can be used to pass additional parameters. Valid parameters types are Union[DataType, List[DataType], None] where DataType is MLflow data types. In order to pass params, a valid Model Signature with params must be defined.

Note

Since JSON loses type information, MLflow will cast the JSON input to the input type specified in the model’s schema if available. If your model is sensitive to input types, it is recommended that a schema is provided for the model to ensure that type mismatch errors do not occur at inference time. In particular, DL models are typically strict about input types and will need model schema in order for the model to score correctly. For complex data types, see Encoding complex data below.

Example requests:

# split-oriented DataFrame input
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '{
  "dataframe_split": {
      "columns": ["a", "b", "c"],
      "data": [[1, 2, 3], [4, 5, 6]]
  }
}'

# record-oriented DataFrame input (fine for vector rows, loses ordering for JSON records)
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '{
  "dataframe_records": [
    {"a": 1,"b": 2,"c": 3},
    {"a": 4,"b": 5,"c": 6}
  ]
}'

# numpy/tensor input using TF serving's "instances" format
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '{
    "instances": [
        {"a": "s1", "b": 1, "c": [1, 2, 3]},
        {"a": "s2", "b": 2, "c": [4, 5, 6]},
        {"a": "s3", "b": 3, "c": [7, 8, 9]}
    ]
}'

# numpy/tensor input using TF serving's "inputs" format
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '{
    "inputs": {"a": ["s1", "s2", "s3"], "b": [1, 2, 3], "c": [[1, 2, 3], [4, 5, 6], [7, 8, 9]]}
}'

# inference with params
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '{
    "inputs": {"question": ["What color is it?"],
               "context": ["Some people said it was green but I know that it is pink."]},
    "params": {"max_answer_len": 10}
}'

For more information about serializing pandas DataFrames, see pandas.DataFrame.to_json.

For more information about serializing tensor inputs using the TF serving format, see TF serving’s request format docs.

Serving with MLServer

Python models can be deployed using Seldon’s MLServer as alternative inference server. MLServer is integrated with two leading open source model deployment tools, Seldon Core and KServe (formerly known as KFServing), and can be used to test and deploy models using these frameworks. This is especially powerful when building docker images since the docker image built with MLServer can be deployed directly with both of these frameworks.

MLServer exposes the same scoring API through the /invocations endpoint. In addition, it supports the standard V2 Inference Protocol.

Note

To use MLServer with MLflow, please install mlflow as:

pip install mlflow[extras]

To serve a MLflow model using MLServer, you can use the --enable-mlserver flag, such as:

mlflow models serve -m my_model --enable-mlserver

Similarly, to build a Docker image built with MLServer you can use the --enable-mlserver flag, such as:

mlflow models build-docker -m my_model --enable-mlserver -n my-model

To read more about the integration between MLflow and MLServer, please check the end-to-end example in the MLServer documentation or visit the MLServer docs.

Encoding complex data

Complex data types, such as dates or binary, do not have a native JSON representation. If you include a model signature, MLflow can automatically decode supported data types from JSON. The following data type conversions are supported:

  • binary: data is expected to be base64 encoded, MLflow will automatically base64 decode.

  • datetime: data is expected as string according to ISO 8601 specification. MLflow will parse this into the appropriate datetime representation on the given platform.

Example requests:

# record-oriented DataFrame input with binary column "b"
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '[
    {"a": 0, "b": "dGVzdCBiaW5hcnkgZGF0YSAw"},
    {"a": 1, "b": "dGVzdCBiaW5hcnkgZGF0YSAx"},
    {"a": 2, "b": "dGVzdCBiaW5hcnkgZGF0YSAy"}
]'

# record-oriented DataFrame input with datetime column "b"
curl http://127.0.0.1:5000/invocations -H 'Content-Type: application/json' -d '[
    {"a": 0, "b": "2020-01-01T00:00:00Z"},
    {"a": 1, "b": "2020-02-01T12:34:56Z"},
    {"a": 2, "b": "2021-03-01T00:00:00Z"}
]'

Command Line Interface

MLflow also has a CLI that supports the following commands:

  • serve deploys the model as a local REST API server.

  • build_docker packages a REST API endpoint serving the model as a docker image.

  • predict uses the model to generate a prediction for a local CSV or JSON file. Note that this method only supports DataFrame input.

For more info, see:

mlflow models --help
mlflow models serve --help
mlflow models predict --help
mlflow models build-docker --help

Environment Management Tools

MLflow currently supports the following environment management tools to restore model environments:

local

Use the local environment. No extra tools are required.

virtualenv (preferred)

Create environments using virtualenv and pyenv (for python version management). Virtualenv and pyenv (for Linux and macOS) or pyenv-win (for Windows) must be installed for this mode of environment reconstruction.

conda

Create environments using conda. Conda must be installed for this mode of environment reconstruction.

Warning

By using conda, you’re responsible for adhering to Anaconda’s terms of service.

The mlflow models CLI commands provide an optional --env-manager argument that selects a specific environment management configuration to be used, as shown below:

# Use virtualenv
mlflow models predict ... --env-manager=virtualenv
# Use conda
mlflow models serve ... --env-manager=conda

Deploy a python_function model on Microsoft Azure ML

The MLflow plugin azureml-mlflow can deploy models to Azure ML, either to Azure Kubernetes Service (AKS) or Azure Container Instances (ACI) for real-time serving.

The resulting deployment accepts the following data formats as input:

  • JSON-serialized pandas DataFrames in the split orientation. For example, data = pandas_df.to_json(orient='split'). This format is specified using a Content-Type request header value of application/json.

Warning

The TensorSpec input format is not fully supported for deployments on Azure Machine Learning at the moment. Be aware that many autolog() implementations may use TensorSpec for model’s signatures when logging models and hence those deployments will fail in Azure ML.

Deployments can be generated using both the Python API or MLflow CLI. In both cases, a JSON configuration file can be indicated with the details of the deployment you want to achieve. If not indicated, then a default deployment is done using Azure Container Instances (ACI) and a minimal configuration. The full specification of this configuration file can be checked at Deployment configuration schema. Also, you will also need the Azure ML MLflow Tracking URI of your particular Azure ML Workspace where you want to deploy your model. You can obtain this URI in several ways:

  • Through the Azure ML Studio:

    • Navigate to Azure ML Studio and select the workspace you are working on.

    • Click on the name of the workspace at the upper right corner of the page.

    • Click “View all properties in Azure Portal” on the pane popup.

    • Copy the MLflow tracking URI value from the properties section.

  • Programmatically, using Azure ML SDK with the method Workspace.get_mlflow_tracking_uri(). If you are running inside Azure ML Compute, like for instance a Compute Instance, you can get this value also from the environment variable os.environ["MLFLOW_TRACKING_URI"].

  • Manually, for a given Subscription ID, Resource Group and Azure ML Workspace, the URI is as follows: azureml://eastus.api.azureml.ms/mlflow/v1.0/subscriptions/<SUBSCRIPTION_ID>/resourceGroups/<RESOURCE_GROUP_NAME>/providers/Microsoft.MachineLearningServices/workspaces/<WORKSPACE_NAME>

Configuration example for ACI deployment

{
  "computeType": "aci",
  "containerResourceRequirements":
  {
    "cpu": 1,
    "memoryInGB": 1
  },
  "location": "eastus2",
}
Remarks:
  • If containerResourceRequirements is not indicated, a deployment with minimal compute configuration is applied (cpu: 0.1 and memory: 0.5).

  • If location is not indicated, it defaults to the location of the workspace.

Configuration example for an AKS deployment

{
  "computeType": "aks",
  "computeTargetName": "aks-mlflow"
}
Remarks:
  • In above example, aks-mlflow is the name of an Azure Kubernetes Cluster registered/created in Azure Machine Learning.

The following examples show how to create a deployment in ACI. Please, ensure you have azureml-mlflow installed before continuing.

Example: Workflow using the Python API

import json
from mlflow.deployments import get_deploy_client

# Create the deployment configuration.
# If no deployment configuration is provided, then the deployment happens on ACI.
deploy_config = {"computeType": "aci"}

# Write the deployment configuration into a file.
deployment_config_path = "deployment_config.json"
with open(deployment_config_path, "w") as outfile:
    outfile.write(json.dumps(deploy_config))

# Set the tracking uri in the deployment client.
client = get_deploy_client("<azureml-mlflow-tracking-url>")

# MLflow requires the deployment configuration to be passed as a dictionary.
config = {"deploy-config-file": deployment_config_path}
model_name = "mymodel"
model_version = 1

# define the model path and the name is the service name
# if model is not registered, it gets registered automatically and a name is autogenerated using the "name" parameter below
client.create_deployment(
    model_uri=f"models:/{model_name}/{model_version}",
    config=config,
    name="mymodel-aci-deployment",
)

# After the model deployment completes, requests can be posted via HTTP to the new ACI
# webservice's scoring URI.
print("Scoring URI is: %s", webservice.scoring_uri)

# The following example posts a sample input from the wine dataset
# used in the MLflow ElasticNet example:
# https://github.com/mlflow/mlflow/tree/master/examples/sklearn_elasticnet_wine

# `sample_input` is a JSON-serialized pandas DataFrame with the `split` orientation
import requests
import json

# `sample_input` is a JSON-serialized pandas DataFrame with the `split` orientation
sample_input = {
    "columns": [
        "alcohol",
        "chlorides",
        "citric acid",
        "density",
        "fixed acidity",
        "free sulfur dioxide",
        "pH",
        "residual sugar",
        "sulphates",
        "total sulfur dioxide",
        "volatile acidity",
    ],
    "data": [[8.8, 0.045, 0.36, 1.001, 7, 45, 3, 20.7, 0.45, 170, 0.27]],
}
response = requests.post(
    url=webservice.scoring_uri,
    data=json.dumps(sample_input),
    headers={"Content-type": "application/json"},
)
response_json = json.loads(response.text)
print(response_json)

Example: Workflow using the MLflow CLI

echo "{ computeType: aci }" > deployment_config.json
mlflow deployments create --name <deployment-name> -m models:/<model-name>/<model-version> -t <azureml-mlflow-tracking-url> --deploy-config-file deployment_config.json

# After the deployment completes, requests can be posted via HTTP to the new ACI
# webservice's scoring URI.

scoring_uri=$(az ml service show --name <deployment-name> -v | jq -r ".scoringUri")

# The following example posts a sample input from the wine dataset
# used in the MLflow ElasticNet example:
# https://github.com/mlflow/mlflow/tree/master/examples/sklearn_elasticnet_wine

# `sample_input` is a JSON-serialized pandas DataFrame with the `split` orientation
sample_input='
{
    "columns": [
        "alcohol",
        "chlorides",
        "citric acid",
        "density",
        "fixed acidity",
        "free sulfur dioxide",
        "pH",
        "residual sugar",
        "sulphates",
        "total sulfur dioxide",
        "volatile acidity"
    ],
    "data": [
        [8.8, 0.045, 0.36, 1.001, 7, 45, 3, 20.7, 0.45, 170, 0.27]
    ]
}'

echo $sample_input | curl -s -X POST $scoring_uri\
-H 'Cache-Control: no-cache'\
-H 'Content-Type: application/json'\
-d @-

You can also test your deployments locally first using the option run-local:

mlflow deployments run-local --name <deployment-name> -m models:/<model-name>/<model-version> -t <azureml-mlflow-tracking-url>

For more info, see:

mlflow deployments help -t azureml

Deploy a python_function model on Amazon SageMaker

The mlflow.deployments and mlflow.sagemaker modules can deploy python_function models locally in a Docker container with SageMaker compatible environment and remotely on SageMaker. To deploy remotely to SageMaker you need to set up your environment and user accounts. To export a custom model to SageMaker, you need a MLflow-compatible Docker image to be available on Amazon ECR. MLflow provides a default Docker image definition; however, it is up to you to build the image and upload it to ECR. MLflow includes the utility function build_and_push_container to perform this step. Once built and uploaded, you can use the MLflow container for all MLflow Models. Model webservers deployed using the mlflow.deployments module accept the following data formats as input, depending on the deployment flavor:

  • python_function: For this deployment flavor, the endpoint accepts the same formats described in the local model deployment documentation.

  • mleap: For this deployment flavor, the endpoint accepts only JSON-serialized pandas DataFrames in the split orientation. For example, data = pandas_df.to_json(orient='split'). This format is specified using a Content-Type request header value of application/json.

Commands

  • mlflow deployments run-local -t sagemaker deploys the model locally in a Docker container. The image and the environment should be identical to how the model would be run remotely and it is therefore useful for testing the model prior to deployment.

  • mlflow sagemaker build-and-push-container builds an MLfLow Docker image and uploads it to ECR. The caller must have the correct permissions set up. The image is built locally and requires Docker to be present on the machine that performs this step.

  • mlflow deployments create -t sagemaker deploys the model on Amazon SageMaker. MLflow uploads the Python Function model into S3 and starts an Amazon SageMaker endpoint serving the model.

Example workflow using the MLflow CLI

mlflow sagemaker build-and-push-container  # build the container (only needs to be called once)
mlflow deployments run-local -t sagemaker --name <deployment-name> -m <path-to-model>  # test the model locally
mlflow deployments sagemaker create -t  # deploy the model remotely

For more info, see:

mlflow sagemaker --help
mlflow sagemaker build-and-push-container --help
mlflow deployments run-local --help
mlflow deployments help -t sagemaker

Export a python_function model as an Apache Spark UDF

You can output a python_function model as an Apache Spark UDF, which can be uploaded to a Spark cluster and used to score the model.

Example

from pyspark.sql.functions import struct
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, "<path-to-model>")
df = spark_df.withColumn("prediction", pyfunc_udf(struct([...])))

If a model contains a signature, the UDF can be called without specifying column name arguments. In this case, the UDF will be called with column names from signature, so the evaluation dataframe’s column names must match the model signature’s column names.

Example

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, "<path-to-model-with-signature>")
df = spark_df.withColumn("prediction", pyfunc_udf())

If a model contains a signature with tensor spec inputs, you will need to pass a column of array type as a corresponding UDF argument. The values in this column must be comprised of one-dimensional arrays. The UDF will reshape the array values to the required shape with ‘C’ order (i.e. read / write the elements using C-like index order) and cast the values as the required tensor spec type. For example, assuming a model requires input ‘a’ of shape (-1, 2, 3) and input ‘b’ of shape (-1, 4, 5). In order to perform inference on this data, we need to prepare a Spark DataFrame with column ‘a’ containing arrays of length 6 and column ‘b’ containing arrays of length 20. We can then invoke the UDF like following example code:

Example

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Assuming the model requires input 'a' of shape (-1, 2, 3) and input 'b' of shape (-1, 4, 5)
model_path = "<path-to-model-requiring-multidimensional-inputs>"
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_path)
# The `spark_df` has column 'a' containing arrays of length 6 and
# column 'b' containing arrays of length 20
df = spark_df.withColumn("prediction", pyfunc_udf(struct("a", "b")))

The resulting UDF is based on Spark’s Pandas UDF and is currently limited to producing either a single value, an array of values, or a struct containing multiple field values of the same type per observation. By default, we return the first numeric column as a double. You can control what result is returned by supplying result_type argument. The following values are supported:

  • 'int' or IntegerType: The leftmost integer that can fit in int32 result is returned or an exception is raised if there are none.

  • 'long' or LongType: The leftmost long integer that can fit in int64 result is returned or an exception is raised if there are none.

  • ArrayType (IntegerType | LongType): Return all integer columns that can fit into the requested size.

  • 'float' or FloatType: The leftmost numeric result cast to float32 is returned or an exception is raised if there are no numeric columns.

  • 'double' or DoubleType: The leftmost numeric result cast to double is returned or an exception is raised if there are no numeric columns.

  • ArrayType ( FloatType | DoubleType ): Return all numeric columns cast to the requested type. An exception is raised if there are no numeric columns.

  • 'string' or StringType: Result is the leftmost column cast as string.

  • ArrayType ( StringType ): Return all columns cast as string.

  • 'bool' or 'boolean' or BooleanType: The leftmost column cast to bool is returned or an exception is raised if the values cannot be coerced.

  • 'field1 FIELD1_TYPE, field2 FIELD2_TYPE, ...': A struct type containing multiple fields separated by comma, each field type must be one of types listed above.

Example

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Suppose the PyFunc model `predict` method returns a dict like:
# `{'prediction': 1-dim_array, 'probability': 2-dim_array}`
# You can supply result_type to be a struct type containing
# 2 fields 'prediction' and 'probability' like following.
pyfunc_udf = mlflow.pyfunc.spark_udf(
    spark, "<path-to-model>", result_type="prediction float, probability: array<float>"
)
df = spark_df.withColumn("prediction", pyfunc_udf())

Example

from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(
    spark, "path/to/model", result_type=ArrayType(FloatType())
)
# The prediction column will contain all the numeric columns returned by the model as floats
df = spark_df.withColumn("prediction", pyfunc_udf(struct("name", "age")))

If you want to use conda to restore the python environment that was used to train the model, set the env_manager argument when calling mlflow.pyfunc.spark_udf().

Example

from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import struct
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
pyfunc_udf = mlflow.pyfunc.spark_udf(
    spark,
    "path/to/model",
    result_type=ArrayType(FloatType()),
    env_manager="conda",  # Use conda to restore the environment used in training
)
df = spark_df.withColumn("prediction", pyfunc_udf(struct("name", "age")))

Deployment to Custom Targets

In addition to the built-in deployment tools, MLflow provides a pluggable mlflow.deployments Python API and mlflow deployments CLI for deploying models to custom targets and environments. To deploy to a custom target, you must first install an appropriate third-party Python plugin. See the list of known community-maintained plugins here.

Commands

The mlflow deployments CLI contains the following commands, which can also be invoked programmatically using the mlflow.deployments Python API:

  • Create: Deploy an MLflow model to a specified custom target

  • Delete: Delete a deployment

  • Update: Update an existing deployment, for example to deploy a new model version or change the deployment’s configuration (e.g. increase replica count)

  • List: List IDs of all deployments

  • Get: Print a detailed description of a particular deployment

  • Run Local: Deploy the model locally for testing

  • Help: Show the help string for the specified target

For more info, see:

mlflow deployments --help
mlflow deployments create --help
mlflow deployments delete --help
mlflow deployments update --help
mlflow deployments list --help
mlflow deployments get --help
mlflow deployments run-local --help
mlflow deployments help --help

Community Model Flavors

Go to the Community Model Flavors page to get an overview of other useful MLflow flavors, which are developed and maintained by the MLflow community.