mlflow.spark

The mlflow.spark module provides an API for logging and loading Spark MLlib models. This module exports Spark MLlib models with the following flavors:

Spark MLlib (native) format

Allows models to be loaded as Spark Transformers for scoring in a Spark session. Models with this flavor can be loaded as PySpark PipelineModel objects in Python. This is the main flavor and is always produced.

mlflow.pyfunc

Supports deployment outside of Spark by instantiating a SparkContext and reading input data as a Spark DataFrame prior to scoring. Also supports deployment in Spark as a Spark UDF. Models with this flavor can be loaded as Python functions for performing inference. This flavor is always produced.

mlflow.mleap

Enables high-performance deployment outside of Spark by leveraging MLeap’s custom dataframe and pipeline representations. Models with this flavor cannot be loaded back as Python objects. Rather, they must be deserialized in Java using the mlflow/java package. This flavor is produced only if you specify MLeap-compatible arguments.

mlflow.spark.autolog(disable=False, silent=False)[source]

Enables (or disables) and configures logging of Spark datasource paths, versions (if applicable), and formats when they are read. This method is not threadsafe and assumes a SparkSession already exists with the mlflow-spark JAR attached. It should be called on the Spark driver, not on the executors (i.e. do not call this method within a function parallelized by Spark). This API requires Spark 3.0 or above.

Datasource information is cached in memory and logged to all subsequent MLflow runs, including the active MLflow run (if one exists when the data is read). Note that autologging of Spark ML (MLlib) models is not currently supported via this API. Datasource autologging is best-effort, meaning that if Spark is under heavy load or MLflow logging fails for any reason (e.g., if the MLflow server is unavailable), logging may be dropped.

For any unexpected issues with autologging, check Spark driver and executor logs in addition to stderr & stdout generated from your MLflow code - datasource information is pulled from Spark, so logs relevant to debugging may show up amongst the Spark logs.

Example
import mlflow.spark
import os
import shutil
from pyspark.sql import SparkSession
# Create and persist some dummy data
# Note: On environments like Databricks with pre-created SparkSessions,
# ensure the org.mlflow:mlflow-spark:1.11.0 is attached as a library to
# your cluster
spark = (SparkSession.builder
            .config("spark.jars.packages", "org.mlflow:mlflow-spark:1.11.0")
            .master("local[*]")
            .getOrCreate())
df = spark.createDataFrame([
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop")], ["id", "text"])
import tempfile
tempdir = tempfile.mkdtemp()
df.write.csv(os.path.join(tempdir, "my-data-path"), header=True)
# Enable Spark datasource autologging.
mlflow.spark.autolog()
loaded_df = spark.read.csv(os.path.join(tempdir, "my-data-path"),
                header=True, inferSchema=True)
# Call toPandas() to trigger a read of the Spark datasource. Datasource info
# (path and format) is logged to the current active run, or the
# next-created MLflow run if no run is currently active
with mlflow.start_run() as active_run:
    pandas_df = loaded_df.toPandas()
Parameters
  • disable – If True, disables the Spark datasource autologging integration. If False, enables the Spark datasource autologging integration.

  • silent – If True, suppress all event logs and warnings from MLflow during Spark datasource autologging. If False, show all events and warnings during Spark datasource autologging.

mlflow.spark.get_default_conda_env()[source]
Returns

The default Conda environment for MLflow Models produced by calls to save_model() and log_model(). This Conda environment contains the current version of PySpark that is installed on the caller’s system. dev versions of PySpark are replaced with stable versions in the resulting Conda environment (e.g., if you are running PySpark version 2.4.5.dev0, invoking this method produces a Conda environment with a dependency on PySpark version 2.4.5).

mlflow.spark.get_default_pip_requirements()[source]
Returns

A list of default pip requirements for MLflow Models produced by this flavor. Calls to save_model() and log_model() produce a pip environment that, at minimum, contains these requirements.

mlflow.spark.load_model(model_uri, dfs_tmpdir=None, dst_path=None)[source]

Load the Spark MLlib model from the path.

Parameters
  • model_uri

    The location, in URI format, of the MLflow model, for example:

    • /Users/me/path/to/local/model

    • relative/path/to/local/model

    • s3://my_bucket/path/to/model

    • runs:/<mlflow_run_id>/run-relative/path/to/model

    • models:/<model_name>/<model_version>

    • models:/<model_name>/<stage>

    For more information about supported URI schemes, see Referencing Artifacts.

  • dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is loaded from this destination. Defaults to /tmp/mlflow.

  • dst_path – The local filesystem path to which to download the model artifact. This directory must already exist. If unspecified, a local output path will be created.

Returns

pyspark.ml.pipeline.PipelineModel

Example
from mlflow import spark
model = mlflow.spark.load_model("spark-model")
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")], ["id", "text"])
# Make predictions on test documents
prediction = model.transform(test)
mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, code_paths=None, dfs_tmpdir=None, sample_input=None, registered_model_name=None, signature: mlflow.models.signature.ModelSignature = None, input_example: Union[pandas.core.frame.DataFrame, numpy.ndarray, dict, list, csr_matrix, csc_matrix] = None, await_registration_for=300, pip_requirements=None, extra_pip_requirements=None)[source]

Log a Spark MLlib model as an MLflow artifact for the current run. This uses the MLlib persistence format and produces an MLflow Model with the Spark flavor.

Note: If no run is active, it will instantiate a run to obtain a run_id.

Parameters
  • spark_model – Spark model to be saved - MLflow can only save descendants of pyspark.ml.Model or pyspark.ml.Transformer which implement MLReadable and MLWritable.

  • artifact_path – Run relative artifact path.

  • conda_env

    Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this decsribes the environment this model should be run in. At minimum, it should specify the dependencies contained in get_default_conda_env(). If None, the default get_default_conda_env() environment is added to the model. The following is an example dictionary representation of a Conda environment:

    {
        'name': 'mlflow-env',
        'channels': ['defaults'],
        'dependencies': [
            'python=3.7.0',
            'pyspark=2.3.0'
        ]
    }
    

  • dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is written in this destination and then copied into the model’s artifact directory. This is necessary as Spark ML models read from and write to DFS if running on a cluster. If this operation completes successfully, all temporary files created on the DFS are removed. Defaults to /tmp/mlflow.

  • sample_input – A sample input used to add the MLeap flavor to the model. This must be a PySpark DataFrame that the model can evaluate. If sample_input is None, the MLeap flavor is not added.

  • registered_model_name – If given, create a model version under registered_model_name, also creating a registered model if one with the given name does not exist.

  • signature

    ModelSignature describes model input and output Schema. The model signature can be inferred from datasets with valid model input (e.g. the training dataset with target column omitted) and valid model output (e.g. model predictions generated on the training dataset), for example:

    from mlflow.models.signature import infer_signature
    train = df.drop_column("target_label")
    predictions = ... # compute model predictions
    signature = infer_signature(train, predictions)
    

  • input_example – Input example provides one or several instances of valid model input. The example can be used as a hint of what data to feed the model. The given example will be converted to a Pandas DataFrame and then serialized to json using the Pandas split-oriented format. Bytes are base64-encoded.

  • await_registration_for – Number of seconds to wait for the model version to finish being created and is in READY status. By default, the function waits for five minutes. Specify 0 or None to skip waiting.

  • pip_requirements – Either an iterable of pip requirement strings (e.g. ["pyspark", "-r requirements.txt", "-c constraints.txt"]) or the string path to a pip requirements file on the local filesystem (e.g. "requirements.txt"). If provided, this describes the environment this model should be run in. If None, a default list of requirements is inferred by mlflow.models.infer_pip_requirements() from the current software environment. If the requirement inference fails, it falls back to using get_default_pip_requirements(). Both requirements and constraints are automatically parsed and written to requirements.txt and constraints.txt files, respectively, and stored as part of the model. Requirements are also written to the pip section of the model’s conda environment (conda.yaml) file.

  • extra_pip_requirements

    Either an iterable of pip requirement strings (e.g. ["pandas", "-r requirements.txt", "-c constraints.txt"]) or the string path to a pip requirements file on the local filesystem (e.g. "requirements.txt"). If provided, this describes additional pip requirements that are appended to a default set of pip requirements generated automatically based on the user’s current software environment. Both requirements and constraints are automatically parsed and written to requirements.txt and constraints.txt files, respectively, and stored as part of the model. Requirements are also written to the pip section of the model’s conda environment (conda.yaml) file.

    Warning

    The following arguments can’t be specified at the same time:

    • conda_env

    • pip_requirements

    • extra_pip_requirements

    This example demonstrates how to specify pip requirements using pip_requirements and extra_pip_requirements.

Returns

A ModelInfo instance that contains the metadata of the logged model.

Example
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0) ], ["id", "text", "label"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
mlflow.spark.log_model(model, "spark-model")
mlflow.spark.save_model(spark_model, path, mlflow_model=None, conda_env=None, code_paths=None, dfs_tmpdir=None, sample_input=None, signature: mlflow.models.signature.ModelSignature = None, input_example: Union[pandas.core.frame.DataFrame, numpy.ndarray, dict, list, csr_matrix, csc_matrix] = None, pip_requirements=None, extra_pip_requirements=None)[source]

Save a Spark MLlib Model to a local path.

By default, this function saves models using the Spark MLlib persistence mechanism. Additionally, if a sample input is specified using the sample_input parameter, the model is also serialized in MLeap format and the MLeap flavor is added.

Parameters
  • spark_model – Spark model to be saved - MLflow can only save descendants of pyspark.ml.Model or pyspark.ml.Transformer which implement MLReadable and MLWritable.

  • path – Local path where the model is to be saved.

  • mlflow_model – MLflow model config this flavor is being added to.

  • conda_env

    Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this decsribes the environment this model should be run in. At minimum, it should specify the dependencies contained in get_default_conda_env(). If None, the default get_default_conda_env() environment is added to the model. The following is an example dictionary representation of a Conda environment:

    {
        'name': 'mlflow-env',
        'channels': ['defaults'],
        'dependencies': [
            'python=3.7.0',
            'pyspark=2.3.0'
        ]
    }
    

  • dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is be written in this destination and then copied to the requested local path. This is necessary as Spark ML models read from and write to DFS if running on a cluster. All temporary files created on the DFS are removed if this operation completes successfully. Defaults to /tmp/mlflow.

  • sample_input – A sample input that is used to add the MLeap flavor to the model. This must be a PySpark DataFrame that the model can evaluate. If sample_input is None, the MLeap flavor is not added.

  • signature

    ModelSignature describes model input and output Schema. The model signature can be inferred from datasets with valid model input (e.g. the training dataset with target column omitted) and valid model output (e.g. model predictions generated on the training dataset), for example:

    from mlflow.models.signature import infer_signature
    train = df.drop_column("target_label")
    predictions = ... # compute model predictions
    signature = infer_signature(train, predictions)
    

  • input_example – Input example provides one or several instances of valid model input. The example can be used as a hint of what data to feed the model. The given example will be converted to a Pandas DataFrame and then serialized to json using the Pandas split-oriented format. Bytes are base64-encoded.

  • pip_requirements – Either an iterable of pip requirement strings (e.g. ["pyspark", "-r requirements.txt", "-c constraints.txt"]) or the string path to a pip requirements file on the local filesystem (e.g. "requirements.txt"). If provided, this describes the environment this model should be run in. If None, a default list of requirements is inferred by mlflow.models.infer_pip_requirements() from the current software environment. If the requirement inference fails, it falls back to using get_default_pip_requirements(). Both requirements and constraints are automatically parsed and written to requirements.txt and constraints.txt files, respectively, and stored as part of the model. Requirements are also written to the pip section of the model’s conda environment (conda.yaml) file.

  • extra_pip_requirements

    Either an iterable of pip requirement strings (e.g. ["pandas", "-r requirements.txt", "-c constraints.txt"]) or the string path to a pip requirements file on the local filesystem (e.g. "requirements.txt"). If provided, this describes additional pip requirements that are appended to a default set of pip requirements generated automatically based on the user’s current software environment. Both requirements and constraints are automatically parsed and written to requirements.txt and constraints.txt files, respectively, and stored as part of the model. Requirements are also written to the pip section of the model’s conda environment (conda.yaml) file.

    Warning

    The following arguments can’t be specified at the same time:

    • conda_env

    • pip_requirements

    • extra_pip_requirements

    This example demonstrates how to specify pip requirements using pip_requirements and extra_pip_requirements.

Example
from mlflow import spark
from pyspark.ml.pipeline.PipelineModel

# your pyspark.ml.pipeline.PipelineModel type
model = ...
mlflow.spark.save_model(model, "spark-model")