Source code for mlflow.genai.optimize.optimize

import logging
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext
from typing import TYPE_CHECKING, Any, Callable

import mlflow
from mlflow.entities.model_registry import PromptVersion
from mlflow.environment_variables import MLFLOW_GENAI_EVAL_MAX_WORKERS
from mlflow.exceptions import MlflowException
from mlflow.genai.evaluation.utils import (
    _convert_eval_set_to_df,
)
from mlflow.genai.optimize.optimizers import BasePromptOptimizer
from mlflow.genai.optimize.types import (
    AggregationFn,
    EvaluationResultRecord,
    PromptOptimizationResult,
)
from mlflow.genai.optimize.util import (
    create_metric_from_scorers,
    prompt_optimization_autolog,
    validate_train_data,
)
from mlflow.genai.prompts import load_prompt, register_prompt
from mlflow.genai.scorers import Scorer
from mlflow.genai.utils.trace_utils import convert_predict_fn
from mlflow.prompt.constants import PROMPT_TEXT_TAG_KEY
from mlflow.telemetry.events import PromptOptimizationEvent
from mlflow.telemetry.track import record_usage_event
from mlflow.utils import gorilla
from mlflow.utils.annotations import experimental
from mlflow.utils.autologging_utils.safety import _wrap_patch

if TYPE_CHECKING:
    from mlflow.genai.evaluation.utils import EvaluationDatasetTypes

_logger = logging.getLogger(__name__)


@experimental(version="3.5.0")
@record_usage_event(PromptOptimizationEvent)
def optimize_prompts(
    *,
    predict_fn: Callable[..., Any],
    train_data: "EvaluationDatasetTypes",
    prompt_uris: list[str],
    optimizer: BasePromptOptimizer,
    scorers: list[Scorer],
    aggregation: AggregationFn | None = None,
    enable_tracking: bool = True,
) -> PromptOptimizationResult:
    """
    Automatically optimize prompts using evaluation metrics and training data.

    This function uses the provided optimization algorithm to improve prompt quality
    based on your evaluation criteria and dataset.

    Args:
        predict_fn: A target function that uses the prompts to be optimized. The callable
            should receive inputs as keyword arguments and return the response.
            The function should use the MLflow prompt registry and call
            `PromptVersion.format` during execution in order for this API to optimize
            the prompt. This function should return the same type as the outputs in
            the dataset.
        train_data: An evaluation dataset used for the optimization. It should include
            the inputs and outputs fields, where each inputs value is a dict.
            The data must be one of the following formats:

            * An EvaluationDataset entity
            * Pandas DataFrame
            * Spark DataFrame
            * List of dictionaries

            The dataset must include the following columns:

            - inputs: A column containing single inputs in dict format. Each input
              should contain keys matching the variables in the prompt template.
            - outputs: A column containing the output that the predict_fn should
              produce for each input.
        prompt_uris: A list of URIs of the prompts to be optimized. The prompt templates
            should be used by the predict_fn.
        optimizer: A prompt optimizer object that optimizes a set of prompts with the
            training dataset and scorers. For example,
            GepaPromptOptimizer(reflection_model="openai:/gpt-4o").
        scorers: List of scorers that evaluate the inputs, outputs and expectations.
            Required parameter. Use builtin scorers like Equivalence or Correctness,
            or define custom scorers with the @scorer decorator.
        aggregation: A callable that computes the overall performance metric from
            individual scorer outputs. Takes a dict mapping scorer names to scores
            and returns a float value (greater is better). If None and all scorers
            return numerical values, the sum of the scores is used by default.
        enable_tracking: If True (default), automatically creates an MLflow run if no
            active run exists and logs the following information:

            - The optimization scores (initial, final, improvement)
            - Links to the optimized prompt versions
            - The optimizer name and parameters
            - Optimization progress

            If False, no MLflow run is created and no tracking occurs.

    Returns:
        The optimization result object that includes the optimized prompts as a list of
        prompt versions, evaluation scores, and the optimizer name.

    Examples:

    .. code-block:: python

        import mlflow
        import openai
        from mlflow.genai.optimize.optimizers import GepaPromptOptimizer
        from mlflow.genai.scorers import Correctness

        prompt = mlflow.genai.register_prompt(
            name="qa",
            template="Answer the following question: {{question}}",
        )


        def predict_fn(question: str) -> str:
            completion = openai.OpenAI().chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt.format(question=question)}],
            )
            return completion.choices[0].message.content


        dataset = [
            {"inputs": {"question": "What is the capital of France?"}, "outputs": "Paris"},
            {"inputs": {"question": "What is the capital of Germany?"}, "outputs": "Berlin"},
        ]

        result = mlflow.genai.optimize_prompts(
            predict_fn=predict_fn,
            train_data=dataset,
            prompt_uris=[prompt.uri],
            optimizer=GepaPromptOptimizer(reflection_model="openai:/gpt-4o"),
            scorers=[Correctness(model="openai:/gpt-4o")],
        )

        print(result.optimized_prompts[0].template)

    **Example: Using custom scorers with an objective function**

    .. code-block:: python

        import mlflow
        from mlflow.genai.optimize.optimizers import GepaPromptOptimizer
        from mlflow.genai.scorers import scorer


        # Define custom scorers
        @scorer(name="accuracy")
        def accuracy_scorer(outputs, expectations):
            return 1.0 if outputs.lower() == expectations.lower() else 0.0


        @scorer(name="brevity")
        def brevity_scorer(outputs):
            # Prefer shorter outputs (max 50 chars gets score of 1.0)
            return min(1.0, 50 / max(len(outputs), 1))


        # Define objective to combine scores
        def weighted_objective(scores):
            return 0.7 * scores["accuracy"] + 0.3 * scores["brevity"]


        result = mlflow.genai.optimize_prompts(
            predict_fn=predict_fn,
            train_data=dataset,
            prompt_uris=[prompt.uri],
            optimizer=GepaPromptOptimizer(reflection_model="openai:/gpt-4o"),
            scorers=[accuracy_scorer, brevity_scorer],
            aggregation=weighted_objective,
        )
    """
    train_data_df = _convert_eval_set_to_df(train_data)
    converted_train_data = train_data_df.to_dict("records")
    validate_train_data(converted_train_data)
    predict_fn = convert_predict_fn(
        predict_fn=predict_fn, sample_input=converted_train_data[0]["inputs"]
    )
    metric_fn = create_metric_from_scorers(scorers, aggregation)
    eval_fn = _build_eval_fn(predict_fn, metric_fn)
    target_prompts = [load_prompt(prompt_uri) for prompt_uri in prompt_uris]
    if not all(prompt.is_text_prompt for prompt in target_prompts):
        raise MlflowException("Only text prompts can be optimized")
    target_prompts_dict = {prompt.name: prompt.template for prompt in target_prompts}

    with (
        prompt_optimization_autolog(
            optimizer_name=optimizer.__class__.__name__,
            num_prompts=len(target_prompts),
            num_training_samples=len(converted_train_data),
            train_data_df=train_data_df,
        )
        if enable_tracking
        else nullcontext({})
    ) as log_results:
        optimizer_output = optimizer.optimize(
            eval_fn, converted_train_data, target_prompts_dict, enable_tracking
        )
        optimized_prompts = [
            register_prompt(name=prompt_name, template=prompt)
            for prompt_name, prompt in optimizer_output.optimized_prompts.items()
        ]
        log_results["optimizer_output"] = optimizer_output
        log_results["optimized_prompts"] = optimized_prompts

    return PromptOptimizationResult(
        optimized_prompts=optimized_prompts,
        optimizer_name=optimizer.__class__.__name__,
        initial_eval_score=optimizer_output.initial_eval_score,
        final_eval_score=optimizer_output.final_eval_score,
    )
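# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module): one way to consume the
# PromptOptimizationResult returned by `optimize_prompts`. The attribute names
# mirror the object constructed above; the helper name below is hypothetical.
# ---------------------------------------------------------------------------
def _example_consume_result(result: PromptOptimizationResult) -> None:
    # `optimized_prompts` holds the newly registered PromptVersion entities.
    print(f"Optimizer: {result.optimizer_name}")
    print(f"Score: {result.initial_eval_score} -> {result.final_eval_score}")
    for prompt_version in result.optimized_prompts:
        # Each optimized prompt can be reloaded from the registry by its URI.
        reloaded = load_prompt(prompt_version.uri)
        print(reloaded.name, reloaded.template)

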
def _build_eval_fn(
    predict_fn: Callable[..., Any],
    metric_fn: Callable[[dict[str, Any], dict[str, Any], dict[str, Any]], float],
) -> Callable[[dict[str, str], list[dict[str, Any]]], list[EvaluationResultRecord]]:
    """
    Build an evaluation function that uses the candidate prompts to evaluate the predict_fn.

    Args:
        predict_fn: The function to evaluate.
        metric_fn: Metric function created from scorers that takes
            (inputs, outputs, expectations).

    Returns:
        An evaluation function.
    """
    from mlflow.pyfunc import Context, set_prediction_context

    def eval_fn(
        candidate_prompts: dict[str, str], dataset: list[dict[str, Any]]
    ) -> list[EvaluationResultRecord]:
        used_prompts = set()

        # Replacement for the PromptVersion.template property: return the candidate
        # template when one is provided for this prompt name, otherwise fall back to
        # the registered template text stored in the prompt's tags.
        @property
        def _template_patch(self) -> str:
            template_name = self.name
            if template_name in candidate_prompts:
                used_prompts.add(template_name)
                return candidate_prompts[template_name]
            return self._tags.get(PROMPT_TEXT_TAG_KEY, "")

        # Patch PromptVersion.template so prompts loaded inside predict_fn resolve
        # to the candidate templates during evaluation.
        patch = _wrap_patch(PromptVersion, "template", _template_patch)

        def _run_single(record: dict[str, Any]):
            inputs = record["inputs"]
            # use expectations if provided, otherwise use outputs
            outputs = record.get("expectations") or {"expected_response": record.get("outputs")}
            eval_request_id = str(uuid.uuid4())
            # set prediction context to retrieve the trace by the request id,
            # and set is_evaluate to True to disable async trace logging
            with set_prediction_context(Context(request_id=eval_request_id, is_evaluate=True)):
                try:
                    program_outputs = predict_fn(inputs)
                except Exception as e:
                    program_outputs = f"Failed to invoke the predict_fn with {inputs}: {e}"
                trace = mlflow.get_trace(eval_request_id, silent=True)

            # Use metric function created from scorers
            score = metric_fn(inputs=inputs, outputs=program_outputs, expectations=outputs)

            return EvaluationResultRecord(
                inputs=inputs,
                outputs=program_outputs,
                expectations=outputs,
                score=score,
                trace=trace,
            )

        try:
            with ThreadPoolExecutor(
                max_workers=MLFLOW_GENAI_EVAL_MAX_WORKERS.get(),
                thread_name_prefix="MLflowPromptOptimization",
            ) as executor:
                futures = [executor.submit(_run_single, record) for record in dataset]
                results = [future.result() for future in futures]

            # Check for unused prompts and warn
            if unused_prompts := set(candidate_prompts.keys()) - used_prompts:
                _logger.warning(
                    "The following prompts were not used during evaluation: "
                    f"{sorted(unused_prompts)}. This may indicate that predict_fn is "
                    "not calling format() for these prompts, or the prompt names don't match. "
                    "Please verify that your predict_fn uses all prompts specified in prompt_uris."
                )

            return results
        finally:
            gorilla.revert(patch)

    return eval_fn
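

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module): a minimal custom optimizer
# showing the contract used above. `optimize_prompts` calls
# `optimizer.optimize(eval_fn, train_data, target_prompts, enable_tracking)`
# and reads `optimized_prompts`, `initial_eval_score`, and `final_eval_score`
# from the returned object. The `_ExampleOptimizerOutput` dataclass and the
# exact signature of `BasePromptOptimizer.optimize` are assumptions here;
# consult the BasePromptOptimizer definition for the real interface.
# ---------------------------------------------------------------------------
from dataclasses import dataclass


@dataclass
class _ExampleOptimizerOutput:
    optimized_prompts: dict[str, str]
    initial_eval_score: float | None = None
    final_eval_score: float | None = None


class _IdentityPromptOptimizer(BasePromptOptimizer):
    """Evaluates the target prompts once and returns them unchanged."""

    def optimize(self, eval_fn, train_data, target_prompts, enable_tracking=True):
        # `eval_fn` (built by `_build_eval_fn`) takes candidate prompts plus the
        # dataset and returns one EvaluationResultRecord per record.
        records = eval_fn(target_prompts, train_data)
        score = sum(record.score for record in records) / max(len(records), 1)
        return _ExampleOptimizerOutput(
            optimized_prompts=target_prompts,
            initial_eval_score=score,
            final_eval_score=score,
        )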