Source code for mlflow.tracking._tracking_service.utils

import os
from functools import partial
import logging
from pathlib import Path
from typing import Union
from contextlib import contextmanager

from mlflow.environment_variables import MLFLOW_TRACKING_AWS_SIGV4
from mlflow.store.tracking import DEFAULT_LOCAL_FILE_AND_ARTIFACT_PATH
from mlflow.store.db.db_types import DATABASE_ENGINES
from mlflow.store.tracking.file_store import FileStore
from mlflow.store.tracking.rest_store import RestStore
from mlflow.tracking._tracking_service.registry import TrackingStoreRegistry
from mlflow.utils import env, rest_utils
from mlflow.utils.file_utils import path_to_local_file_uri
from mlflow.utils.databricks_utils import get_databricks_host_creds
from mlflow.utils.uri import _DATABRICKS_UNITY_CATALOG_SCHEME

_TRACKING_URI_ENV_VAR = "MLFLOW_TRACKING_URI"

# Extra environment variables which take precedence for setting the basic/bearer
# auth on http requests.
_TRACKING_USERNAME_ENV_VAR = "MLFLOW_TRACKING_USERNAME"
_TRACKING_PASSWORD_ENV_VAR = "MLFLOW_TRACKING_PASSWORD"
_TRACKING_TOKEN_ENV_VAR = "MLFLOW_TRACKING_TOKEN"

# sets verify param of 'requests.request' function
# see https://requests.readthedocs.io/en/master/api/
_TRACKING_INSECURE_TLS_ENV_VAR = "MLFLOW_TRACKING_INSECURE_TLS"
_TRACKING_SERVER_CERT_PATH_ENV_VAR = "MLFLOW_TRACKING_SERVER_CERT_PATH"

# sets cert param of 'requests.request' function
# see https://requests.readthedocs.io/en/master/api/
_TRACKING_CLIENT_CERT_PATH_ENV_VAR = "MLFLOW_TRACKING_CLIENT_CERT_PATH"

_logger = logging.getLogger(__name__)
_tracking_uri = None


[docs]def is_tracking_uri_set(): """Returns True if the tracking URI has been set, False otherwise.""" if _tracking_uri or env.get_env(_TRACKING_URI_ENV_VAR): return True return False
[docs]def set_tracking_uri(uri: Union[str, Path]) -> None: """ Set the tracking server URI. This does not affect the currently active run (if one exists), but takes effect for successive runs. :param uri: - An empty string, or a local file path, prefixed with ``file:/``. Data is stored locally at the provided file (or ``./mlruns`` if empty). - An HTTP URI like ``https://my-tracking-server:5000``. - A Databricks workspace, provided as the string "databricks" or, to use a Databricks CLI `profile <https://github.com/databricks/databricks-cli#installation>`_, "databricks://<profileName>". - A :py:class:`pathlib.Path` instance .. test-code-block:: python :caption: Example import mlflow mlflow.set_tracking_uri("file:///tmp/my_tracking") tracking_uri = mlflow.get_tracking_uri() print("Current tracking uri: {}".format(tracking_uri)) .. code-block:: text :caption: Output Current tracking uri: file:///tmp/my_tracking """ if isinstance(uri, Path): # On Windows with Python3.8 (https://bugs.python.org/issue38671) # .resolve() doesn't return the absolute path if the directory doesn't exist # so we're calling .absolute() first to get the absolute path on Windows, # then .resolve() to clean the path uri = uri.absolute().resolve().as_uri() global _tracking_uri _tracking_uri = uri
@contextmanager def _use_tracking_uri(uri: str, local_store_root_path: str = None) -> None: """ Similar to `mlflow.tracking.set_tracking_uri` function but return a context manager. :param uri: tracking URI to use. :param local_store_root_path: the local store root path for the tracking URI. """ global _tracking_uri cwd = os.getcwd() old_tracking_uri = _tracking_uri try: if local_store_root_path is not None: os.chdir(local_store_root_path) _tracking_uri = uri yield finally: _tracking_uri = old_tracking_uri os.chdir(cwd) def _resolve_tracking_uri(tracking_uri=None): return tracking_uri or get_tracking_uri()
[docs]def get_tracking_uri() -> str: """ Get the current tracking URI. This may not correspond to the tracking URI of the currently active run, since the tracking URI can be updated via ``set_tracking_uri``. :return: The tracking URI. .. code-block:: python :caption: Example import mlflow # Get the current tracking uri tracking_uri = mlflow.get_tracking_uri() print("Current tracking uri: {}".format(tracking_uri)) .. code-block:: text :caption: Output Current tracking uri: file:///.../mlruns """ global _tracking_uri if _tracking_uri is not None: return _tracking_uri elif env.get_env(_TRACKING_URI_ENV_VAR) is not None: return env.get_env(_TRACKING_URI_ENV_VAR) else: return path_to_local_file_uri(os.path.abspath(DEFAULT_LOCAL_FILE_AND_ARTIFACT_PATH))
def _get_file_store(store_uri, **_): return FileStore(store_uri, store_uri) def _get_sqlalchemy_store(store_uri, artifact_uri): from mlflow.store.tracking.sqlalchemy_store import SqlAlchemyStore if artifact_uri is None: artifact_uri = DEFAULT_LOCAL_FILE_AND_ARTIFACT_PATH return SqlAlchemyStore(store_uri, artifact_uri) def _get_default_host_creds(store_uri): return rest_utils.MlflowHostCreds( host=store_uri, username=os.environ.get(_TRACKING_USERNAME_ENV_VAR), password=os.environ.get(_TRACKING_PASSWORD_ENV_VAR), token=os.environ.get(_TRACKING_TOKEN_ENV_VAR), aws_sigv4=MLFLOW_TRACKING_AWS_SIGV4.get(), ignore_tls_verification=os.environ.get(_TRACKING_INSECURE_TLS_ENV_VAR) == "true", client_cert_path=os.environ.get(_TRACKING_CLIENT_CERT_PATH_ENV_VAR), server_cert_path=os.environ.get(_TRACKING_SERVER_CERT_PATH_ENV_VAR), ) def _get_rest_store(store_uri, **_): return RestStore(partial(_get_default_host_creds, store_uri)) def _get_databricks_rest_store(store_uri, **_): return RestStore(partial(get_databricks_host_creds, store_uri)) def _get_databricks_uc_rest_store(store_uri, **_): from mlflow.exceptions import MlflowException from mlflow.version import VERSION global _tracking_store_registry supported_schemes = [ scheme for scheme in _tracking_store_registry._registry if scheme != _DATABRICKS_UNITY_CATALOG_SCHEME ] raise MlflowException( f"Detected Unity Catalog tracking URI '{store_uri}'. " "Setting the tracking URI to a Unity Catalog backend is not supported in the current " f"version of the MLflow client ({VERSION}). " "Please specify a different tracking URI via mlflow.set_tracking_uri, with " "one of the supported schemes: " f"{supported_schemes}. If you're trying to access models in the Unity " "Catalog, please upgrade to the latest version of the MLflow Python " "client, then specify a Unity Catalog model registry URI via " f"mlflow.set_registry_uri('{_DATABRICKS_UNITY_CATALOG_SCHEME}') or " f"mlflow.set_registry_uri('{_DATABRICKS_UNITY_CATALOG_SCHEME}://profile_name'), where " "'profile_name' is the name of the Databricks CLI profile to use for " "authentication. Be sure to leave the tracking URI configured to use " "one of the supported schemes listed above." ) _tracking_store_registry = TrackingStoreRegistry() _tracking_store_registry.register("", _get_file_store) _tracking_store_registry.register("file", _get_file_store) _tracking_store_registry.register("databricks", _get_databricks_rest_store) _tracking_store_registry.register(_DATABRICKS_UNITY_CATALOG_SCHEME, _get_databricks_uc_rest_store) for scheme in ["http", "https"]: _tracking_store_registry.register(scheme, _get_rest_store) for scheme in DATABASE_ENGINES: _tracking_store_registry.register(scheme, _get_sqlalchemy_store) _tracking_store_registry.register_entrypoints() def _get_store(store_uri=None, artifact_uri=None): return _tracking_store_registry.get_store(store_uri, artifact_uri) # TODO(sueann): move to a projects utils module def _get_git_url_if_present(uri): """ Return the path git_uri#sub_directory if the URI passed is a local path that's part of a Git repo, or returns the original URI otherwise. :param uri: The expanded uri :return: The git_uri#sub_directory if the uri is part of a Git repo, otherwise return the original uri """ if "#" in uri: # Already a URI in git repo format return uri try: from git import Repo, InvalidGitRepositoryError, GitCommandNotFound, NoSuchPathError except ImportError as e: _logger.warning( "Failed to import Git (the git executable is probably not on your PATH)," " so Git SHA is not available. Error: %s", e, ) return uri try: # Check whether this is part of a git repo repo = Repo(uri, search_parent_directories=True) # Repo url repo_url = "file://%s" % repo.working_tree_dir # Sub directory rlpath = uri.replace(repo.working_tree_dir, "") if rlpath == "": git_path = repo_url elif rlpath[0] == "/": git_path = repo_url + "#" + rlpath[1:] else: git_path = repo_url + "#" + rlpath return git_path except (InvalidGitRepositoryError, GitCommandNotFound, ValueError, NoSuchPathError): return uri