"""
The ``mlflow.projects`` module provides an API for running MLflow projects locally or remotely.
"""
import json
import logging
import os
import yaml
import mlflow.projects.databricks
import mlflow.utils.uri
from mlflow import tracking
from mlflow.entities import RunStatus
from mlflow.exceptions import ExecutionException, MlflowException
from mlflow.projects.backend import loader
from mlflow.projects.submitted_run import SubmittedRun
from mlflow.projects.utils import (
    MLFLOW_LOCAL_BACKEND_RUN_ID_CONFIG,
    PROJECT_BUILD_IMAGE,
    PROJECT_DOCKER_ARGS,
    PROJECT_DOCKER_AUTH,
    PROJECT_ENV_MANAGER,
    PROJECT_STORAGE_DIR,
    PROJECT_SYNCHRONOUS,
    fetch_and_validate_project,
    get_entry_point_command,
    get_or_create_run,
    get_run_env_vars,
    load_project,
)
from mlflow.tracking.fluent import _get_experiment_id
from mlflow.utils import env_manager as _EnvManager
from mlflow.utils.mlflow_tags import (
    MLFLOW_DOCKER_IMAGE_ID,
    MLFLOW_PROJECT_BACKEND,
    MLFLOW_PROJECT_ENV,
    MLFLOW_RUN_NAME,
)
_logger = logging.getLogger(__name__)
def _resolve_experiment_id(experiment_name=None, experiment_id=None):
    """
    Resolve experiment.
    Verifies either one or other is specified - cannot be both selected.
    If ``experiment_name`` is provided and does not exist, an experiment
    of that name is created and its id is returned.
    Args:
        experiment_name: Name of experiment under which to launch the run.
        experiment_id: ID of experiment under which to launch the run.
    Returns:
        str
    """
    if experiment_name and experiment_id:
        raise MlflowException("Specify only one of 'experiment_name' or 'experiment_id'.")
    if experiment_id:
        return str(experiment_id)
    if experiment_name:
        client = tracking.MlflowClient()
        exp = client.get_experiment_by_name(experiment_name)
        if exp:
            return exp.experiment_id
        else:
            _logger.info("'%s' does not exist. Creating a new experiment", experiment_name)
            return client.create_experiment(experiment_name)
    return _get_experiment_id()
def _run(
    uri,
    experiment_id,
    entry_point,
    version,
    parameters,
    docker_args,
    backend_name,
    backend_config,
    storage_dir,
    env_manager,
    synchronous,
    run_name,
    build_image,
    docker_auth,
):
    """
    Helper that delegates to the project-running method corresponding to the passed-in backend.
    Returns a ``SubmittedRun`` corresponding to the project run.
    """
    tracking_store_uri = tracking.get_tracking_uri()
    backend_config[PROJECT_ENV_MANAGER] = env_manager
    backend_config[PROJECT_SYNCHRONOUS] = synchronous
    backend_config[PROJECT_DOCKER_ARGS] = docker_args
    backend_config[PROJECT_STORAGE_DIR] = storage_dir
    backend_config[PROJECT_BUILD_IMAGE] = build_image
    backend_config[PROJECT_DOCKER_AUTH] = docker_auth
    # TODO: remove this check once kubernetes execution has been refactored
    if backend_name not in {"databricks", "kubernetes"}:
        backend = loader.load_backend(backend_name)
        if backend:
            submitted_run = backend.run(
                uri,
                entry_point,
                parameters,
                version,
                backend_config,
                tracking_store_uri,
                experiment_id,
            )
            tracking.MlflowClient().set_tag(
                submitted_run.run_id, MLFLOW_PROJECT_BACKEND, backend_name
            )
            if run_name is not None:
                tracking.MlflowClient().set_tag(submitted_run.run_id, MLFLOW_RUN_NAME, run_name)
            return submitted_run
    work_dir = fetch_and_validate_project(uri, version, entry_point, parameters)
    project = load_project(work_dir)
    _validate_execution_environment(project, backend_name)
    active_run = get_or_create_run(
        None, uri, experiment_id, work_dir, version, entry_point, parameters
    )
    if run_name is not None:
        tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_RUN_NAME, run_name)
    if backend_name == "databricks":
        tracking.MlflowClient().set_tag(
            active_run.info.run_id, MLFLOW_PROJECT_BACKEND, "databricks"
        )
        from mlflow.projects.databricks import run_databricks, run_databricks_spark_job
        if project.databricks_spark_job_spec is not None:
            return run_databricks_spark_job(
                remote_run=active_run,
                uri=uri,
                work_dir=work_dir,
                experiment_id=experiment_id,
                cluster_spec=backend_config,
                project_spec=project,
                entry_point=entry_point,
                parameters=parameters,
            )
        return run_databricks(
            remote_run=active_run,
            uri=uri,
            entry_point=entry_point,
            work_dir=work_dir,
            parameters=parameters,
            experiment_id=experiment_id,
            cluster_spec=backend_config,
            env_manager=env_manager,
        )
    elif backend_name == "kubernetes":
        from mlflow.projects import kubernetes as kb
        from mlflow.projects.docker import (
            build_docker_image,
            validate_docker_env,
            validate_docker_installation,
        )
        tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_ENV, "docker")
        tracking.MlflowClient().set_tag(
            active_run.info.run_id, MLFLOW_PROJECT_BACKEND, "kubernetes"
        )
        validate_docker_env(project)
        validate_docker_installation()
        kube_config = _parse_kubernetes_config(backend_config)
        image = build_docker_image(
            work_dir=work_dir,
            repository_uri=kube_config["repository-uri"],
            base_image=project.docker_env.get("image"),
            run_id=active_run.info.run_id,
            build_image=build_image,
            docker_auth=docker_auth,
        )
        image_digest = kb.push_image_to_registry(image.tags[0])
        tracking.MlflowClient().set_tag(
            active_run.info.run_id, MLFLOW_DOCKER_IMAGE_ID, image_digest
        )
        return kb.run_kubernetes_job(
            project.name,
            active_run,
            image.tags[0],
            image_digest,
            get_entry_point_command(project, entry_point, parameters, storage_dir),
            get_run_env_vars(
                run_id=active_run.info.run_id, experiment_id=active_run.info.experiment_id
            ),
            kube_config.get("kube-context", None),
            kube_config["kube-job-template"],
        )
    supported_backends = ["databricks", "kubernetes"] + list(loader.MLFLOW_BACKENDS.keys())
    raise ExecutionException(
        f"Got unsupported execution mode {backend_name}. Supported values: {supported_backends}"
    )
[docs]def run(
    uri,
    entry_point="main",
    version=None,
    parameters=None,
    docker_args=None,
    experiment_name=None,
    experiment_id=None,
    backend="local",
    backend_config=None,
    storage_dir=None,
    synchronous=True,
    run_id=None,
    run_name=None,
    env_manager=None,
    build_image=False,
    docker_auth=None,
):
    """
    Run an MLflow project. The project can be local or stored at a Git URI.
    MLflow provides built-in support for running projects locally or remotely on a Databricks or
    Kubernetes cluster. You can also run projects against other targets by installing an appropriate
    third-party plugin. See `Community Plugins <../plugins.html#community-plugins>`_ for more
    information.
    For information on using this method in chained workflows, see `Building Multistep Workflows
    <../projects.html#building-multistep-workflows>`_.
    Raises:
        :py:class:`mlflow.exceptions.ExecutionException` If a run launched in blocking mode
            is unsuccessful.
    Args:
        uri: URI of project to run. A local filesystem path
            or a Git repository URI (e.g. https://github.com/mlflow/mlflow-example)
            pointing to a project directory containing an MLproject file.
        entry_point: Entry point to run within the project. If no entry point with the specified
            name is found, runs the project file ``entry_point`` as a script,
            using "python" to run ``.py`` files and the default shell (specified by
            environment variable ``$SHELL``) to run ``.sh`` files.
        version: For Git-based projects, either a commit hash or a branch name.
        parameters: Parameters (dictionary) for the entry point command.
        docker_args: Arguments (dictionary) for the docker command.
        experiment_name: Name of experiment under which to launch the run.
        experiment_id: ID of experiment under which to launch the run.
        backend: Execution backend for the run: MLflow provides built-in support for "local",
            "databricks", and "kubernetes" (experimental) backends. If running against
            Databricks, will run against a Databricks workspace determined as follows:
            if a Databricks tracking URI of the form ``databricks://profile`` has been set
            (e.g. by setting the MLFLOW_TRACKING_URI environment variable), will run
            against the workspace specified by <profile>. Otherwise, runs against the
            workspace specified by the default Databricks CLI profile.
        backend_config: A dictionary, or a path to a JSON file (must end in '.json'), which will
            be passed as config to the backend. The exact content which should be
            provided is different for each execution backend and is documented
            at https://www.mlflow.org/docs/latest/projects.html.
        storage_dir: Used only if ``backend`` is "local". MLflow downloads artifacts from
            distributed URIs passed to parameters of type ``path`` to subdirectories of
            ``storage_dir``.
        synchronous: Whether to block while waiting for a run to complete. Defaults to True.
            Note that if ``synchronous`` is False and ``backend`` is "local", this
            method will return, but the current process will block when exiting until
            the local run completes. If the current process is interrupted, any
            asynchronous runs launched via this method will be terminated. If
            ``synchronous`` is True and the run fails, the current process will
            error out as well.
        run_id: Note: this argument is used internally by the MLflow project APIs and should
            not be specified. If specified, the run ID will be used instead of
            creating a new run.
        run_name: The name to give the MLflow Run associated with the project execution.
            If ``None``, the MLflow Run name is left unset.
        env_manager: Specify an environment manager to create a new environment for the run and
            install project dependencies within that environment. The following values
            are supported:
            - local: use the local environment
            - virtualenv: use virtualenv (and pyenv for Python version management)
            - conda: use conda
            If unspecified, MLflow automatically determines the environment manager to
            use by inspecting files in the project directory. For example, if
            ``python_env.yaml`` is present, virtualenv will be used.
        build_image: Whether to build a new docker image of the project or to reuse an existing
            image. Default: False (reuse an existing image)
        docker_auth: A dictionary representing information to authenticate with a Docker
            registry. See `docker.client.DockerClient.login
            <https://docker-py.readthedocs.io/en/stable/client.html#docker.client.DockerClient.login>`_
            for available options.
    Returns:
        :py:class:`mlflow.projects.SubmittedRun` exposing information (e.g. run ID)
        about the launched run.
    .. code-block:: python
        :caption: Example
        import mlflow
        project_uri = "https://github.com/mlflow/mlflow-example"
        params = {"alpha": 0.5, "l1_ratio": 0.01}
        # Run MLflow project and create a reproducible conda environment
        # on a local host
        mlflow.run(project_uri, parameters=params)
    .. code-block:: text
        :caption: Output
        ...
        ...
        Elasticnet model (alpha=0.500000, l1_ratio=0.010000):
        RMSE: 0.788347345611717
        MAE: 0.6155576449938276
        R2: 0.19729662005412607
        ... mlflow.projects: === Run (ID '6a5109febe5e4a549461e149590d0a7c') succeeded ===
    """
    backend_config_dict = backend_config if backend_config is not None else {}
    if (
        backend_config
        and type(backend_config) != dict
        and os.path.splitext(backend_config)[-1] == ".json"
    ):
        with open(backend_config) as handle:
            try:
                backend_config_dict = json.load(handle)
            except ValueError:
                _logger.error(
                    "Error when attempting to load and parse JSON cluster spec from file %s",
                    backend_config,
                )
                raise
    if env_manager is not None:
        _EnvManager.validate(env_manager)
    if backend == "databricks":
        mlflow.projects.databricks.before_run_validations(mlflow.get_tracking_uri(), backend_config)
    elif backend == "local" and run_id is not None:
        backend_config_dict[MLFLOW_LOCAL_BACKEND_RUN_ID_CONFIG] = run_id
    experiment_id = _resolve_experiment_id(
        experiment_name=experiment_name, experiment_id=experiment_id
    )
    submitted_run_obj = _run(
        uri=uri,
        experiment_id=experiment_id,
        entry_point=entry_point,
        version=version,
        parameters=parameters,
        docker_args=docker_args,
        backend_name=backend,
        backend_config=backend_config_dict,
        env_manager=env_manager,
        storage_dir=storage_dir,
        synchronous=synchronous,
        run_name=run_name,
        build_image=build_image,
        docker_auth=docker_auth,
    )
    if synchronous:
        _wait_for(submitted_run_obj)
    return submitted_run_obj 
def _wait_for(submitted_run_obj):
    """Wait on the passed-in submitted run, reporting its status to the tracking server."""
    run_id = submitted_run_obj.run_id
    active_run = None
    # Note: there's a small chance we fail to report the run's status to the tracking server if
    # we're interrupted before we reach the try block below
    try:
        active_run = tracking.MlflowClient().get_run(run_id) if run_id is not None else None
        if submitted_run_obj.wait():
            _logger.info("=== Run (ID '%s') succeeded ===", run_id)
            _maybe_set_run_terminated(active_run, "FINISHED")
        else:
            _maybe_set_run_terminated(active_run, "FAILED")
            raise ExecutionException(f"Run (ID '{run_id}') failed")
    except KeyboardInterrupt:
        _logger.error("=== Run (ID '%s') interrupted, cancelling run ===", run_id)
        submitted_run_obj.cancel()
        _maybe_set_run_terminated(active_run, "FAILED")
        raise
def _maybe_set_run_terminated(active_run, status):
    """
    If the passed-in active run is defined and still running (i.e. hasn't already been terminated
    within user code), mark it as terminated with the passed-in status.
    """
    if active_run is None:
        return
    run_id = active_run.info.run_id
    cur_status = tracking.MlflowClient().get_run(run_id).info.status
    if RunStatus.is_terminated(cur_status):
        return
    tracking.MlflowClient().set_terminated(run_id, status)
def _validate_execution_environment(project, backend):
    if project.docker_env and backend == "databricks":
        raise ExecutionException(
            "Running docker-based projects on Databricks is not yet supported."
        )
def _parse_kubernetes_config(backend_config):
    """
    Creates build context tarfile containing Dockerfile and project code, returning path to tarfile
    """
    if not backend_config:
        raise ExecutionException("Backend_config file not found.")
    kube_config = backend_config.copy()
    if "kube-job-template-path" not in backend_config.keys():
        raise ExecutionException(
            "'kube-job-template-path' attribute must be specified in backend_config."
        )
    kube_job_template = backend_config["kube-job-template-path"]
    if os.path.exists(kube_job_template):
        with open(kube_job_template) as job_template:
            yaml_obj = yaml.safe_load(job_template.read())
        kube_job_template = yaml_obj
        kube_config["kube-job-template"] = kube_job_template
    else:
        raise ExecutionException(f"Could not find 'kube-job-template-path': {kube_job_template}")
    if "kube-context" not in backend_config.keys():
        _logger.debug(
            "Could not find kube-context in backend_config."
            " Using current context or in-cluster config."
        )
    if "repository-uri" not in backend_config.keys():
        raise ExecutionException("Could not find 'repository-uri' in backend_config.")
    return kube_config
__all__ = ["run", "SubmittedRun"]