diff --git a/autogen/structure/__init__.py b/autogen/structure/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/autogen/structure/models/__init__.py b/autogen/structure/models/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/autogen/structure/models/field_model.py b/autogen/structure/models/field_model.py new file mode 100644 index 0000000000..8733526d1a --- /dev/null +++ b/autogen/structure/models/field_model.py @@ -0,0 +1,120 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any, Callable + +from pydantic import BaseModel, ConfigDict, Field, field_validator +from pydantic.fields import FieldInfo +from pydantic_core import PydanticUndefined + + +class FieldModel(BaseModel): + """Model for defining and managing field definitions. + + Provides a structured way to define fields with: + - Type annotations and validation + - Default values and factories + - Documentation and metadata + - Serialization options + + Example: + ```python + field = FieldModel( + name="age", + annotation=int, + default=0, + description="User age in years", + validator=lambda v: v if v >= 0 else 0 + ) + ``` + + Attributes: + default: Default field value + default_factory: Function to generate default value + title: Field title for documentation + description: Field description + examples: Example values + validators: Validation functions + exclude: Exclude from serialization + deprecated: Mark as deprecated + frozen: Mark as immutable + alias: Alternative field name + alias_priority: Priority for alias resolution + name: Field name (required) + annotation: Type annotation + validator: Validation function + validator_kwargs: Validator parameters + + Notes: + - All attributes except 'name' can be UNDEFINED + - validator_kwargs are passed to field_validator decorator + - Cannot have both default and default_factory + """ + + model_config = ConfigDict( + extra="allow", + validate_default=False, + arbitrary_types_allowed=True, + use_enum_values=True, + ) + + # Field configuration attributes + default: Any = PydanticUndefined # Default value + default_factory: Callable = PydanticUndefined # Factory function for default value + title: str = PydanticUndefined # Field title + description: str = PydanticUndefined # Field description + examples: list = PydanticUndefined # Example values + validators: list = PydanticUndefined # Validation functions + exclude: bool = PydanticUndefined # Exclude from serialization + deprecated: bool = PydanticUndefined # Mark as deprecated + frozen: bool = PydanticUndefined # Mark as immutable + alias: str = PydanticUndefined # Alternative field name + alias_priority: int = PydanticUndefined # Priority for alias resolution + + # Core field attributes + name: str = Field(..., exclude=True) # Field name (required) + annotation: type | Any = Field(PydanticUndefined, exclude=True) # Type annotation + validator: Callable | Any = Field(PydanticUndefined, exclude=True) # Validation function + validator_kwargs: dict | Any = Field(default_factory=dict, exclude=True) # Validator parameters + + @property + def field_info(self) -> FieldInfo: + """Generate Pydantic FieldInfo object from field configuration. 
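+
+        Example (a minimal sketch; the values are illustrative):
+            ```python
+            fm = FieldModel(name="age", annotation=int, default=0)
+            info = fm.field_info  # FieldInfo with default=0 and annotation int
+            ```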
+
+        Returns:
+            FieldInfo object with all configured attributes
+
+        Notes:
+            - Uses clean dict to exclude UNDEFINED values
+            - Sets annotation to Any if not specified
+            - Preserves all metadata in field_info
+        """
+        annotation = self.annotation if self.annotation is not PydanticUndefined else Any
+        field_obj: FieldInfo = Field(**self.model_dump(exclude_unset=True))  # type: ignore
+        field_obj.annotation = annotation
+        return field_obj
+
+    @property
+    def field_validator(self) -> dict[str, Callable] | None:
+        """Generate field validator configuration.
+
+        Returns:
+            Dictionary mapping validator name to function,
+            or None if no validator defined
+
+        Notes:
+            - Validator name is f"{field_name}_validator"
+            - Uses validator_kwargs if provided
+            - Returns None if validator is UNDEFINED
+        """
+        if self.validator is PydanticUndefined:
+            return None
+        kwargs = self.validator_kwargs or {}
+        return {f"{self.name}_validator": field_validator(self.name, **kwargs)(self.validator)}
+
+
+__all__ = ["FieldModel"]
diff --git a/autogen/structure/models/instruct.py b/autogen/structure/models/instruct.py
new file mode 100644
index 0000000000..fcea06dfad
--- /dev/null
+++ b/autogen/structure/models/instruct.py
@@ -0,0 +1,102 @@
+# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Any
+
+from pydantic import BaseModel, JsonValue, field_validator
+
+from .field_model import FieldModel
+from .prompts import (
+    context_examples,
+    context_field_description,
+    guidance_examples,
+    guidance_field_description,
+    instruction_examples,
+    instruction_field_description,
+)
+
+
+def validate_instruction(cls, value) -> JsonValue | None:
+    """Validate that the instruction is neither None nor an empty string.
+
+    Args:
+        cls: The model class (supplied by Pydantic's validator protocol; unused).
+        value (JsonValue | None): The instruction value to validate.
+
+    Returns:
+        JsonValue | None: The original value, or None if the value is None
+        or a blank/whitespace-only string.
+    """
+    if value is None or (isinstance(value, str) and not value.strip()):
+        return None
+    return value
+
+
+# Field Models
+INSTRUCTION_FIELD = FieldModel(
+    name="instruction",
+    annotation=JsonValue | None,
+    default=None,
+    title="Primary Instruction",
+    description=instruction_field_description,
+    examples=instruction_examples,
+    validator=validate_instruction,
+    validator_kwargs={"mode": "before"},
+)
+
+GUIDANCE_FIELD = FieldModel(
+    name="guidance",
+    annotation=JsonValue | None,
+    default=None,
+    title="Execution Guidance",
+    description=guidance_field_description,
+    examples=guidance_examples,
+)
+
+CONTEXT_FIELD = FieldModel(
+    name="context",
+    annotation=JsonValue | None,
+    default=None,
+    title="Task Context",
+    description=context_field_description,
+    examples=context_examples,
+)
+
+
+class Instruct(BaseModel):
+    """Model for defining instruction parameters and execution requirements.
+
+    Attributes:
+        instruction (JsonValue | None): The primary instruction.
+        guidance (JsonValue | None): Execution guidance.
+        context (JsonValue | None): Task context.
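+
+    Example (illustrative values):
+        ```python
+        instruct = Instruct(
+            instruction="Summarize the quarterly sales report",
+            guidance="Keep the summary under 100 words",
+            context={"quarter": "Q3"},
+        )
+        ```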
+ """ + + instruction: JsonValue | None = INSTRUCTION_FIELD.field_info + guidance: JsonValue | None = GUIDANCE_FIELD.field_info + context: JsonValue | None = CONTEXT_FIELD.field_info + + @field_validator("instruction", **INSTRUCTION_FIELD.validator_kwargs) + def _validate_instruction(cls, v): + """Field validator for the 'instruction' field. + + Args: + v: The value to validate. + + Returns: + JsonValue | None: The validated instruction value. + """ + return INSTRUCTION_FIELD.validator(cls, v) + + +class InstructResponse(BaseModel): + instruct: Instruct + response: Any = None + + +__all__ = ["Instruct", "InstructResponse"] diff --git a/autogen/structure/models/new_model_params.py b/autogen/structure/models/new_model_params.py new file mode 100644 index 0000000000..2f60396f17 --- /dev/null +++ b/autogen/structure/models/new_model_params.py @@ -0,0 +1,173 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License +# SPDX-License-Identifier: Apache-2.0 + +import inspect +from typing import Callable + +from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, create_model, field_validator, model_validator +from pydantic.fields import FieldInfo + +from .field_model import FieldModel + + +class NewModelParams(BaseModel): + """Configuration class for dynamically creating new Pydantic models.""" + + model_config = ConfigDict( + extra="forbid", + arbitrary_types_allowed=True, + use_enum_values=True, + ) + + name: str | None = None + parameter_fields: dict[str, FieldInfo] = Field(default_factory=dict) + base_type: type[BaseModel] = Field(default=BaseModel) + field_models: list[FieldModel] = Field(default_factory=list) + exclude_fields: list = Field(default_factory=list) + field_descriptions: dict = Field(default_factory=dict) + inherit_base: bool = Field(default=True) + config_dict: dict | None = Field(default=None) + doc: str | None = Field(default=None) + frozen: bool = False + _validators: dict[str, Callable] | None = PrivateAttr(default=None) + _use_keys: set[str] = PrivateAttr(default_factory=set) + + @property + def use_fields(self): + """Get field definitions to use in new model.""" + params = {k: v for k, v in self.parameter_fields.items() if k in self._use_keys} + params.update({f.name: f.field_info for f in self.field_models if f.name in self._use_keys}) + return {k: (v.annotation, v) for k, v in params.items()} + + @field_validator("parameter_fields", mode="before") + def validate_parameters(cls, value): + """Validate parameter field definitions.""" + if value is None: + return {} + if not isinstance(value, dict): + raise ValueError("Fields must be a dictionary.") + for k, v in value.items(): + if not isinstance(k, str): + raise ValueError("Field names must be strings.") + if not isinstance(v, FieldInfo): + raise ValueError("Field values must be FieldInfo objects.") + return value.copy() + + @field_validator("base_type", mode="before") + def validate_base(cls, value) -> type[BaseModel]: + """Validate base model type.""" + if value is None: + return BaseModel + if isinstance(value, type) and issubclass(value, BaseModel): + return value + if isinstance(value, BaseModel): + return value.__class__ + raise ValueError("Base must be a BaseModel subclass or instance.") + + @field_validator("exclude_fields", mode="before") + def validate_fields(cls, value) -> list[str]: + """Validate excluded fields list.""" + if value is None: + return [] + if 
isinstance(value, dict): + value = list(value.keys()) + if isinstance(value, set | tuple): + value = list(value) + if isinstance(value, list): + if not all(isinstance(i, str) for i in value): + raise ValueError("Field names must be strings.") + return value.copy() + raise ValueError("Fields must be a list, set, or dictionary.") + + @field_validator("field_descriptions", mode="before") + def validate_field_descriptions(cls, value) -> dict[str, str]: + """Validate field descriptions dictionary.""" + if value is None: + return {} + if not isinstance(value, dict): + raise ValueError("Field descriptions must be a dictionary.") + for k, v in value.items(): + if not isinstance(k, str): + raise ValueError("Field names must be strings.") + if not isinstance(v, str): + raise ValueError("Field descriptions must be strings.") + return value + + @field_validator("name", mode="before") + def validate_name(cls, value) -> str: + """Validate model name.""" + if value is None: + return "StepModel" + if not isinstance(value, str): + raise ValueError("Name must be a string.") + return value + + @field_validator("field_models", mode="before") + def _validate_field_models(cls, value): + """Validate field model definitions.""" + if value is None: + return [] + value = [value] if not isinstance(value, list) else value + if not all(isinstance(i, FieldModel) for i in value): + raise ValueError("Field models must be FieldModel objects.") + return value + + @model_validator(mode="after") + def validate_param_model(self): + """Validate complete model configuration.""" + if self.base_type is not None: + self.parameter_fields.update(self.base_type.model_fields) + + self.parameter_fields.update({f.name: f.field_info for f in self.field_models}) + + use_keys = list(self.parameter_fields.keys()) + use_keys.extend(list(self._use_keys)) + + if self.exclude_fields: + use_keys = [i for i in use_keys if i not in self.exclude_fields] + + self._use_keys = set(use_keys) + + validators = {} + + for i in self.field_models: + if i.field_validator is not None: + validators.update(i.field_validator) + self._validators = validators + + if self.field_descriptions: + for i in self.field_models: + if i.name in self.field_descriptions: + i.description = self.field_descriptions[i.name] + + if not isinstance(self.name, str): + if hasattr(self.base_type, "class_name"): + if callable(self.base_type.class_name): + self.name = self.base_type.class_name() + else: + self.name = self.base_type.class_name + elif inspect.isclass(self.base_type): + self.name = self.base_type.__name__ + + return self + + def create_new_model(self) -> type[BaseModel]: + """Create new Pydantic model with specified configuration.""" + a: type[BaseModel] = create_model( + self.name, + __config__=self.config_dict, + __doc__=self.doc, + __base__=self.base_type if self.inherit_base else None, + __validators__=self._validators, + **self.use_fields, + ) + if self.frozen: + a.model_config["frozen"] = True + return a + + +__all__ = ["NewModelParams"] diff --git a/autogen/structure/models/operative.py b/autogen/structure/models/operative.py new file mode 100644 index 0000000000..f0327d95cf --- /dev/null +++ b/autogen/structure/models/operative.py @@ -0,0 +1,159 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License +# SPDX-License-Identifier: Apache-2.0 + +from pydantic import BaseModel, Field, PrivateAttr, model_validator +from 
pydantic.fields import FieldInfo + +from autogen.structure.utils import to_json, validate_keys + +from .field_model import FieldModel +from .new_model_params import NewModelParams + + +class Operative(BaseModel): + """Class representing an operative that handles request and response models for operations.""" + + name: str | None = None + + request_params: NewModelParams | None = Field(default=None) + request_type: type[BaseModel] | None = Field(default=None) + + response_params: NewModelParams | None = Field(default=None) + response_type: type[BaseModel] | None = Field(default=None) + response_model: BaseModel | None = Field(default=None) + response_str_dict: dict | str | None = Field(default=None) + + auto_retry_parse: bool = True + max_retries: int = 3 + _should_retry: bool = PrivateAttr(default=None) + + @model_validator(mode="after") + def _validate(self): + """Validates the operative instance after initialization.""" + if self.request_type is None: + self.request_type = self.request_params.create_new_model() + if self.name is None: + self.name = self.request_params.name or self.request_type.__name__ + return self + + def raise_validate_pydantic(self, text: str) -> None: + """Validates and updates the response model using strict matching. + + Args: + text (str): The text to validate and parse into the response model. + + Raises: + Exception: If the validation fails. + """ + d_ = to_json(text, fuzzy_parse=True) + if isinstance(d_, list | tuple) and len(d_) == 1: + d_ = d_[0] + try: + d_ = validate_keys(d_, self.request_type.model_fields, handle_unmatched="raise") + self.response_model = self.request_type.model_validate(d_) + self._should_retry = False + except Exception: + self.response_str_dict = d_ + self._should_retry = True + + def force_validate_pydantic(self, text: str): + """Forcibly validates and updates the response model, allowing unmatched fields. + + Args: + text (str): The text to validate and parse into the response model. + """ + d_ = text + try: + d_ = to_json(text, fuzzy_parse=True) + if isinstance(d_, list | tuple) and len(d_) == 1: + d_ = d_[0] + d_ = validate_keys(d_, self.request_type.model_fields, handle_unmatched="force") + self.response_model = self.request_type.model_validate(d_) + self._should_retry = False + except Exception: + self.response_str_dict = d_ + self.response_model = None + self._should_retry = True + + def update_response_model(self, text: str | None = None, data: dict | None = None) -> BaseModel | dict | str | None: + """Updates the response model based on the provided text or data. + + Args: + text (str, optional): The text to parse and validate. + data (dict, optional): The data to update the response model with. + + Returns: + BaseModel | dict | str | None: The updated response model or raw data. + + Raises: + ValueError: If neither text nor data is provided. 
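+
+        Example (a minimal sketch; the field definitions are illustrative):
+            ```python
+            params = NewModelParams(
+                name="Step",
+                field_models=[FieldModel(name="answer", annotation=int, default=0)],
+            )
+            op = Operative(request_params=params)
+            result = op.update_response_model(text='{"answer": 42}')
+            # result is an instance of the dynamically created "Step" model
+            ```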
+ """ + if text is None and data is None: + raise ValueError("Either text or data must be provided.") + + if text: + self.response_str_dict = text + try: + self.raise_validate_pydantic(text) + except Exception: + self.force_validate_pydantic(text) + + if data and self.response_type: + d_ = self.response_model.model_dump() + d_.update(data) + self.response_model = self.response_type.model_validate(d_) + + if not self.response_model and isinstance(self.response_str_dict, list): + try: + self.response_model = [self.request_type.model_validate(d_) for d_ in self.response_str_dict] + except Exception: + pass + + return self.response_model or self.response_str_dict + + def create_response_type( + self, + response_params: NewModelParams | None = None, + field_models: list[FieldModel] | None = None, + parameter_fields: dict[str, FieldInfo] | None = None, + exclude_fields: list[str] | None = None, + field_descriptions: dict[str, str] | None = None, + inherit_base: bool = True, + config_dict: dict | None = None, + doc: str | None = None, + frozen: bool = False, + validators: dict | None = None, + ) -> None: + """Creates a new response type based on the provided parameters. + + Args: + response_params (NewModelParams, optional): Parameters for the new response model. + field_models (list[FieldModel], optional): List of field models. + parameter_fields (dict[str, FieldInfo], optional): Dictionary of parameter fields. + exclude_fields (list, optional): List of fields to exclude. + field_descriptions (dict, optional): Dictionary of field descriptions. + inherit_base (bool, optional): Whether to inherit the base model. + config_dict (dict | None, optional): Configuration dictionary. + doc (str | None, optional): Documentation string. + frozen (bool, optional): Whether the model is frozen. + validators (dict, optional): Dictionary of validators. + """ + self.response_params = response_params or NewModelParams( + parameter_fields=parameter_fields, + field_models=field_models, + exclude_fields=exclude_fields, + field_descriptions=field_descriptions, + inherit_base=inherit_base, + config_dict=config_dict, + doc=doc, + frozen=frozen, + base_type=self.request_params.base_type, + ) + if validators and isinstance(validators, dict): + self.response_params._validators.update(validators) + + self.response_type = self.response_params.create_new_model() diff --git a/autogen/structure/models/prompts.py b/autogen/structure/models/prompts.py new file mode 100644 index 0000000000..c5784051b4 --- /dev/null +++ b/autogen/structure/models/prompts.py @@ -0,0 +1,85 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License +# SPDX-License-Identifier: Apache-2.0 + +from pydantic import JsonValue + +instruction_field_description = ( + "Define the core task or instruction to be executed. Your instruction should:\n\n" + "1. Be specific and actionable\n" + "2. Clearly state the expected outcome\n" + "3. 
Include any critical constraints or requirements\n\n" + "**Guidelines for writing effective instructions:**\n" + "- Start with a clear action verb (e.g., analyze, create, evaluate)\n" + "- Specify the scope and boundaries of the task\n" + "- Include success criteria when applicable\n" + "- Break complex tasks into distinct steps\n\n" + "**Examples:**\n" + "- 'Analyze the provided sales data and identify top 3 performing products'\n" + "- 'Generate a Python function that validates email addresses'\n" + "- 'Create a data visualization showing monthly revenue trends'" +) + +guidance_field_description = ( + "Provide strategic direction and constraints for task execution.\n\n" + "**Key components to include:**\n" + "1. Methodological preferences\n" + "2. Quality standards and requirements\n" + "3. Specific limitations or boundaries\n" + "4. Performance expectations\n\n" + "**Best practices:**\n" + "- Be explicit about any assumptions that should be made\n" + "- Specify preferred approaches or techniques\n" + "- Detail any constraints on resources or methods\n" + "- Include relevant standards or compliance requirements\n\n" + "Leave as None if no specific guidance is needed beyond the instruction." +) + +context_field_description = ( + "Supply essential background information and current state data required for " + "task execution.\n\n" + "**Include relevant details about:**\n" + "1. Environmental conditions\n" + "2. Historical context\n" + "3. Related systems or processes\n" + "4. Previous outcomes or decisions\n\n" + "**Context should:**\n" + "- Be directly relevant to the task\n" + "- Provide necessary background without excess detail\n" + "- Include any dependencies or prerequisites\n" + "- Specify the current state of the system\n\n" + "Set to None if no additional context is required." 
+) + + +# Example structures for each field to demonstrate proper formatting +instruction_examples: list[JsonValue] = [ + "Analyze the dataset 'sales_2023.csv' and identify revenue trends", + "Create a Python function to process customer feedback data", + { + "task": "data_analysis", + "target": "sales_performance", + "scope": ["revenue", "growth", "seasonality"], + }, +] + +guidance_examples: list[JsonValue] = [ + "Use statistical methods for trend analysis", + "Optimize for readability and maintainability", + { + "methods": ["regression", "time_series"], + "constraints": {"memory": "2GB", "time": "5min"}, + }, +] + +context_examples: list[JsonValue] = [ + "Previous analysis showed seasonal patterns", + { + "prior_results": {"accuracy": 0.95}, + "system_state": "production", + "dependencies": ["numpy", "pandas"], + }, +] diff --git a/autogen/structure/utils/__init__.py b/autogen/structure/utils/__init__.py new file mode 100644 index 0000000000..7fa4303031 --- /dev/null +++ b/autogen/structure/utils/__init__.py @@ -0,0 +1,19 @@ +from .break_down_pydantic import break_down_pydantic_annotation +from .string_similarity import SIMILARITY_TYPE, string_similarity +from .to_json import fuzzy_parse_json, to_dict, to_json +from .validate_keys import validate_keys +from .validate_mapping import validate_mapping +from .xml_parser import dict_to_xml, xml_to_dict + +__all__ = [ + "break_down_pydantic_annotation", + "string_similarity", + "SIMILARITY_TYPE", + "to_json", + "to_dict", + "fuzzy_parse_json", + "validate_keys", + "validate_mapping", + "xml_to_dict", + "dict_to_xml", +] diff --git a/autogen/structure/utils/break_down_pydantic.py b/autogen/structure/utils/break_down_pydantic.py new file mode 100644 index 0000000000..ebc8f78c66 --- /dev/null +++ b/autogen/structure/utils/break_down_pydantic.py @@ -0,0 +1,81 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License +# SPDX-License-Identifier: Apache-2.0 + +from inspect import isclass +from typing import Any, Dict, TypeVar, get_args, get_origin + +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +def break_down_pydantic_annotation( + model: type[T], max_depth: int | None = None, current_depth: int = 0 +) -> Dict[str, Any]: + """ + Break down the type annotations of a Pydantic model into a dictionary. + + This function recursively processes Pydantic models, converting their + field annotations into a dictionary structure. It handles nested models + and lists of models. + + Args: + model: The Pydantic model class to break down. + max_depth: Maximum depth for recursion. None for no limit. + current_depth: Current recursion depth (used internally). + + Returns: + A dictionary representing the structure of the model's annotations. + + Raises: + TypeError: If the input is not a Pydantic model. + RecursionError: If max recursion depth is reached. + + Example: + >>> from pydantic import BaseModel + >>> class SubModel(BaseModel): + ... field1: int + ... field2: str + >>> class MainModel(BaseModel): + ... sub: SubModel + ... 
items: list[SubModel]
+        >>> result = break_down_pydantic_annotation(MainModel)
+        >>> print(result)
+        {
+            'sub': {'field1': <class 'int'>, 'field2': <class 'str'>},
+            'items': [{'field1': <class 'int'>, 'field2': <class 'str'>}]
+        }
+    """
+
+    if not _is_pydantic_model(model):
+        raise TypeError("Input must be a Pydantic model")
+
+    if max_depth is not None and current_depth >= max_depth:
+        raise RecursionError("Maximum recursion depth reached")
+
+    out: Dict[str, Any] = {}
+    for k, v in model.__annotations__.items():
+        origin = get_origin(v)
+        if _is_pydantic_model(v):
+            out[k] = break_down_pydantic_annotation(v, max_depth, current_depth + 1)
+        elif origin is list:
+            args = get_args(v)
+            if args and _is_pydantic_model(args[0]):
+                out[k] = [break_down_pydantic_annotation(args[0], max_depth, current_depth + 1)]
+            else:
+                out[k] = [args[0] if args else Any]
+        else:
+            out[k] = v
+
+    return out
+
+
+def _is_pydantic_model(x: Any) -> bool:
+    return isclass(x) and issubclass(x, BaseModel)
+
+
+__all__ = ["break_down_pydantic_annotation"]
diff --git a/autogen/structure/utils/prepare_assistant_response.py b/autogen/structure/utils/prepare_assistant_response.py
new file mode 100644
index 0000000000..6ec873b1fb
--- /dev/null
+++ b/autogen/structure/utils/prepare_assistant_response.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License
+# SPDX-License-Identifier: Apache-2.0
+
+
+from pydantic import BaseModel
+
+
+def prepare_assistant_response(assistant_response: BaseModel | list[BaseModel] | dict | str) -> dict:
+    """
+    Prepare an assistant's response for storage and transmission.
+
+    This function handles various input formats including:
+    - Single model outputs (response.choices[0].message.content)
+    - Streaming responses (response[i].choices[0].delta.content)
+    - Direct content in dictionaries or strings
+
+    Args:
+        assistant_response: The response content in any supported format
+
+    Returns:
+        dict: Formatted response content, keyed by "assistant_response" and,
+        where a model object was provided, "model_response"
+    """
+    if assistant_response:
+        content = {}
+        # Handle model.choices[0].message.content format
+        if isinstance(assistant_response, BaseModel):
+            content["assistant_response"] = assistant_response.choices[0].message.content or ""
+            content["model_response"] = assistant_response.model_dump(exclude_none=True, exclude_unset=True)
+        # Handle streaming response[i].choices[0].delta.content format
+        elif isinstance(assistant_response, list):
+            msg = "".join([i.choices[0].delta.content or "" for i in assistant_response])
+            content["assistant_response"] = msg
+            content["model_response"] = [
+                i.model_dump(
+                    exclude_none=True,
+                    exclude_unset=True,
+                )
+                for i in assistant_response
+            ]
+        elif isinstance(assistant_response, dict) and "content" in assistant_response:
+            content["assistant_response"] = assistant_response["content"]
+        elif isinstance(assistant_response, str):
+            content["assistant_response"] = assistant_response
+        else:
+            content["assistant_response"] = str(assistant_response)
+        return content
+    else:
+        return {"assistant_response": "", "model_response": ""}
diff --git a/autogen/structure/utils/prepare_instruction.py b/autogen/structure/utils/prepare_instruction.py
new file mode 100644
index 0000000000..8d30cd347d
--- /dev/null
+++ b/autogen/structure/utils/prepare_instruction.py
@@ -0,0 +1,205 @@
+# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+# Portions derived from 
https://github.com/lion-agi/lion-core are under the Apache-2.0 License
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Any, Literal
+
+from pydantic import BaseModel
+
+from .break_down_pydantic import break_down_pydantic_annotation
+
+DEFAULT_SYSTEM = "You are a helpful AI assistant. Let's think step by step."
+
+CONTENT_KEYS = [
+    "guidance",
+    "instruction",
+    "context",
+    "request_response_format",
+    "tool_schemas",
+]
+
+
+def prepare_request_response_format(request_fields: dict) -> str:
+    """
+    Prepare a standardized format for request responses.
+
+    Args:
+        request_fields: Dictionary of fields to include in response
+
+    Returns:
+        str: Formatted response template
+    """
+    return (
+        f"**MUST RETURN JSON-PARSEABLE RESPONSE ENCLOSED BY JSON CODE BLOCKS.** \n```json\n{request_fields}\n```"
+    ).strip()
+
+
+def format_image_item(idx: str, x: str, /) -> dict[str, Any]:
+    """
+    Create an image_url dictionary for content formatting.
+
+    Args:
+        idx: Base64 encoded image data
+        x: Image detail level
+
+    Returns:
+        dict: Formatted image item
+    """
+    return {
+        "type": "image_url",
+        "image_url": {
+            "url": f"data:image/jpeg;base64,{idx}",
+            "detail": x,
+        },
+    }
+
+
+def format_text_item(item: Any) -> str:
+    """
+    Format a text item or list of items into a string.
+
+    Args:
+        item: Text item(s) to format
+
+    Returns:
+        str: Formatted text
+    """
+    msg = ""
+    item = [item] if not isinstance(item, list) else item
+    for j in item:
+        if isinstance(j, dict):
+            for k, v in j.items():
+                msg += f"- {k}: {v} \n\n"
+        else:
+            msg += f"{j}\n"
+    return msg
+
+
+def format_text_content(content: dict) -> str:
+    """
+    Format dictionary content into a structured text format.
+
+    Args:
+        content: Dictionary containing content sections
+
+    Returns:
+        str: Formatted text content
+    """
+    if "plain_content" in content and isinstance(content["plain_content"], str):
+        return content["plain_content"]
+
+    msg = "\n---\n # Task\n"
+    for k, v in content.items():
+        if k in CONTENT_KEYS:
+            if k == "request_response_format":
+                k = "response format"
+            msg += f"## **Task {k}**\n{format_text_item(v)}\n\n"
+    msg += "\n\n---\n"
+    return msg
+
+
+def format_image_content(
+    text_content: str,
+    images: list,
+    image_detail: Literal["low", "high", "auto"],
+) -> list[dict[str, Any]]:
+    """
+    Format text content with images for message content.
+
+    Args:
+        text_content: The text content to format
+        images: List of images to include
+        image_detail: Level of detail for images
+
+    Returns:
+        list: Formatted content entries with text and images
+    """
+    content = [{"type": "text", "text": text_content}]
+    content.extend(format_image_item(i, image_detail) for i in images)
+    return content
+
+
+def prepare_instruction_content(
+    guidance: str | None = None,
+    instruction: str | None = None,
+    context: str | dict | list | None = None,
+    request_fields: dict | list[str] | None = None,
+    plain_content: str | None = None,
+    request_model: type[BaseModel] | None = None,
+    images: str | list | None = None,
+    image_detail: Literal["low", "high", "auto"] | None = None,
+    tool_schemas: dict | None = None,
+) -> dict:
+    """
+    Prepare the content for an instruction message.
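+
+    Example (illustrative; the keys shown are the ones this function emits):
+        ```python
+        content = prepare_instruction_content(
+            instruction="Summarize the report",
+            context={"audience": "executives"},
+            request_fields=["summary"],
+        )
+        # content -> {"context": [...], "instruction": ..., "request_fields": ...,
+        #             "request_response_format": ...}
+        ```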
+
+    Args:
+        guidance: Optional guidance text
+        instruction: Main instruction content
+        context: Additional context information
+        request_fields: Fields to request in response
+        plain_content: Plain text content
+        request_model: Pydantic model class for structured requests
+        images: Images to include
+        image_detail: Level of detail for images
+        tool_schemas: Tool schemas to include
+
+    Returns:
+        dict: Prepared instruction content
+
+    Raises:
+        ValueError: If both request_fields and request_model are provided
+    """
+    if request_fields and request_model:
+        raise ValueError("only one of request_fields or request_model can be provided")
+
+    out_ = {"context": []}
+    if guidance:
+        out_["guidance"] = guidance
+    if instruction:
+        out_["instruction"] = instruction
+    if context:
+        if isinstance(context, list):
+            out_["context"].extend(context)
+        else:
+            out_["context"].append(context)
+    if images:
+        out_["images"] = images if isinstance(images, list) else [images]
+        out_["image_detail"] = image_detail or "low"
+
+    if tool_schemas:
+        out_["tool_schemas"] = tool_schemas
+
+    if request_model:
+        out_["request_model"] = request_model
+        request_fields = break_down_pydantic_annotation(request_model)
+        out_["context"].append({"respond_schema_info": request_model.model_json_schema()})
+
+    if request_fields:
+        _fields = request_fields if isinstance(request_fields, dict) else {}
+        if not isinstance(request_fields, dict):
+            _fields = {i: "..." for i in request_fields}
+        out_["request_fields"] = _fields
+        out_["request_response_format"] = prepare_request_response_format(request_fields=_fields)
+
+    if plain_content:
+        out_["plain_content"] = plain_content
+
+    return {k: v for k, v in out_.items() if v is not None}
+
+
+def format_instruction_content(content: dict) -> dict[str, Any]:
+    """Format the content of the instruction."""
+    text_content = format_text_content(content)
+    if "images" not in content:
+        return {"role": "user", "content": text_content}
+    else:
+        content_ = format_image_content(
+            text_content=text_content,
+            images=content["images"],
+            image_detail=content["image_detail"],
+        )
+        return {"role": "user", "content": content_}
diff --git a/autogen/structure/utils/string_similarity.py b/autogen/structure/utils/string_similarity.py
new file mode 100644
index 0000000000..adfcf75641
--- /dev/null
+++ b/autogen/structure/utils/string_similarity.py
@@ -0,0 +1,300 @@
+# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License
+# SPDX-License-Identifier: Apache-2.0
+from dataclasses import dataclass
+from difflib import SequenceMatcher
+from itertools import product
+from typing import Callable, Dict, List, Optional, Sequence, TypeVar, Union
+
+from pydantic import BaseModel
+from typing_extensions import Literal
+
+T = TypeVar("T", bound=BaseModel)
+
+
+def cosine_similarity(s1: str, s2: str) -> float:
+    """Calculate the cosine similarity between two strings.
+
+    Each string is treated as a set of characters, so repeated characters
+    are ignored.
+
+    Args:
+        s1: First input string
+        s2: Second input string
+
+    Returns:
+        float: Cosine similarity score between 0 and 1
+    """
+    if not s1 or not s2:
+        return 0.0
+
+    set1, set2 = set(s1), set(s2)
+    intersection = set1.intersection(set2)
+
+    if not set1 or not set2:
+        return 0.0
+
+    return len(intersection) / ((len(set1) * len(set2)) ** 0.5)
+
+
+def hamming_similarity(s1: str, s2: str) -> float:
+    """Calculate the Hamming similarity between two strings.
+
+    The strings must be of equal length. 
Returns the proportion of positions + at which corresponding symbols are the same. + + Args: + s1: First input string + s2: Second input string + + Returns: + float: Hamming similarity score between 0 and 1 + """ + if not s1 or not s2 or len(s1) != len(s2): + return 0.0 + + matches = sum(c1 == c2 for c1, c2 in zip(s1, s2)) + return matches / len(s1) + + +def jaro_distance(s: str, t: str) -> float: + """Calculate the Jaro distance between two strings. + + Args: + s: First input string + t: Second input string + + Returns: + float: Jaro distance score between 0 and 1 + """ + s_len = len(s) + t_len = len(t) + + if s_len == 0 and t_len == 0: + return 1.0 + elif s_len == 0 or t_len == 0: + return 0.0 + + match_distance = (max(s_len, t_len) // 2) - 1 + match_distance = max(0, match_distance) # Ensure non-negative + + s_matches = [False] * s_len + t_matches = [False] * t_len + + matches = 0 + transpositions = 0 + + # Identify matches + for i in range(s_len): + start = max(0, i - match_distance) + end = min(i + match_distance + 1, t_len) + + for j in range(start, end): + if t_matches[j] or s[i] != t[j]: + continue + s_matches[i] = t_matches[j] = True + matches += 1 + break + + if matches == 0: + return 0.0 + + # Count transpositions + k = 0 + for i in range(s_len): + if not s_matches[i]: + continue + while not t_matches[k]: + k += 1 + if s[i] != t[k]: + transpositions += 1 + k += 1 + + transpositions //= 2 + + return (matches / s_len + matches / t_len + (matches - transpositions) / matches) / 3.0 + + +def jaro_winkler_similarity(s: str, t: str, scaling: float = 0.1) -> float: + """Calculate the Jaro-Winkler similarity between two strings. + + Args: + s: First input string + t: Second input string + scaling: Scaling factor for common prefix adjustment + + Returns: + float: Jaro-Winkler similarity score between 0 and 1 + + Raises: + ValueError: If scaling factor is not between 0 and 0.25 + """ + if not 0 <= scaling <= 0.25: + raise ValueError("Scaling factor must be between 0 and 0.25") + + jaro_sim = jaro_distance(s, t) + + # Find length of common prefix (up to 4 chars) + prefix_len = 0 + for s_char, t_char in zip(s, t): + if s_char != t_char: + break + prefix_len += 1 + if prefix_len == 4: + break + + return jaro_sim + (prefix_len * scaling * (1 - jaro_sim)) + + +def levenshtein_distance(a: str, b: str) -> int: + """Calculate the Levenshtein (edit) distance between two strings. + + Args: + a: First input string + b: Second input string + + Returns: + int: Minimum number of single-character edits needed to change one + string into the other + """ + if not a: + return len(b) + if not b: + return len(a) + + m, n = len(a), len(b) + d = [[0] * (n + 1) for _ in range(m + 1)] + + for i in range(m + 1): + d[i][0] = i + for j in range(n + 1): + d[0][j] = j + + for i, j in product(range(1, m + 1), range(1, n + 1)): + cost = 0 if a[i - 1] == b[j - 1] else 1 + d[i][j] = min( + d[i - 1][j] + 1, # deletion + d[i][j - 1] + 1, # insertion + d[i - 1][j - 1] + cost, # substitution + ) + + return d[m][n] + + +def levenshtein_similarity(s1: str, s2: str) -> float: + """Calculate the Levenshtein similarity between two strings. + + Converts Levenshtein distance to a similarity score between 0 and 1. 
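+    The score is computed as 1 - distance / max(len(s1), len(s2)); for
+    example, levenshtein_similarity("kitten", "sitting") is 1 - 3/7 ≈ 0.571.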
+ + Args: + s1: First input string + s2: Second input string + + Returns: + float: Levenshtein similarity score between 0 and 1 + """ + if not s1 and not s2: + return 1.0 + if not s1 or not s2: + return 0.0 + + distance = levenshtein_distance(s1, s2) + max_len = max(len(s1), len(s2)) + return 1 - (distance / max_len) + + +# Type definitions +SIMILARITY_ALGO_MAP: Dict[str, Callable[[str, str], float]] = { + "jaro_winkler": jaro_winkler_similarity, + "levenshtein": levenshtein_similarity, + "sequence_matcher": lambda s1, s2: SequenceMatcher(None, s1, s2).ratio(), + "hamming": hamming_similarity, + "cosine": cosine_similarity, +} + + +SIMILARITY_TYPE = Literal[ + "jaro_winkler", + "levenshtein", + "sequence_matcher", + "hamming", + "cosine", +] + + +@dataclass(frozen=True) +class MatchResult: + """Represents a string matching result.""" + + word: str + score: float + index: int + + +def string_similarity( + word: str, + correct_words: Sequence[str], + algorithm: SIMILARITY_TYPE | Callable[[str, str], float] = "jaro_winkler", + threshold: float = 0.0, + case_sensitive: bool = False, + return_most_similar: bool = False, +) -> Optional[Union[str, List[str]]]: + """Find similar strings using specified similarity algorithm.""" + if not correct_words: + raise ValueError("correct_words must not be empty") + + if not 0.0 <= threshold <= 1.0: + raise ValueError("threshold must be between 0.0 and 1.0") + + # Convert inputs to strings + compare_word = str(word) + original_words = [str(w) for w in correct_words] + + # Handle case sensitivity + if not case_sensitive: + compare_word = compare_word.lower() + compare_words = [w.lower() for w in original_words] + else: + compare_words = original_words.copy() + + # Get scoring function + if isinstance(algorithm, str): + score_func = SIMILARITY_ALGO_MAP.get(algorithm) + if score_func is None: + raise ValueError(f"Unsupported algorithm: {algorithm}") + elif callable(algorithm): + score_func = algorithm + else: + raise ValueError("algorithm must be a string specifying a built-in algorithm or " "a callable") + + # Calculate similarities + results = [] + for idx, (orig_word, comp_word) in enumerate(zip(original_words, compare_words)): + # Skip different length strings for hamming similarity + if algorithm == "hamming" and len(comp_word) != len(compare_word): + continue + + score = score_func(compare_word, comp_word) + if score >= threshold: + results.append(MatchResult(orig_word, score, idx)) + + # Return None if no matches + if not results: + return None + + # Sort by score (descending) and index (ascending) for stable ordering + results.sort(key=lambda x: (-x.score, x.index)) + + # Return results + if return_most_similar: + return results[0].word + + # Filter exact matches for case sensitive comparisons + if case_sensitive: + max_score = results[0].score + results = [r for r in results if r.score == max_score] + + return [r.word for r in results] + + +__all__ = ["string_similarity", "SIMILARITY_TYPE"] diff --git a/autogen/structure/utils/to_json.py b/autogen/structure/utils/to_json.py new file mode 100644 index 0000000000..38bd31d510 --- /dev/null +++ b/autogen/structure/utils/to_json.py @@ -0,0 +1,551 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License +# SPDX-License-Identifier: Apache-2.0 +import json +import re +from collections.abc import Callable, Iterable, Mapping +from typing import Any, Dict, List, 
Sequence, Union, overload
+
+from pydantic_core import PydanticUndefinedType
+from typing_extensions import Literal
+
+from .xml_parser import xml_to_dict
+
+
+def to_json(string: str | List[str], /, fuzzy_parse: bool = False) -> Union[List[Dict[str, Any]], Dict]:
+    """Extract and parse JSON content from a string or markdown code blocks.
+
+    This function attempts to parse JSON directly from the input string first.
+    If that fails, it looks for JSON content within markdown code blocks
+    (denoted by ```json).
+
+    Args:
+        string: Input string or list of strings to parse. If a list is provided,
+            it will be joined with newlines.
+        fuzzy_parse: If True, repair common formatting issues (via
+            fuzzy_parse_json) instead of plain json.loads.
+
+    Returns:
+        - A dictionary if a single JSON object is found
+        - A list of dictionaries if multiple JSON objects are found
+        - An empty list if no valid JSON is found
+
+    Examples:
+        >>> to_json('{"key": "value"}')
+        {'key': 'value'}
+
+        >>> to_json('''
+        ... ```json
+        ... {"key": "value"}
+        ... ```
+        ... ''')
+        {'key': 'value'}
+
+        >>> to_json('''
+        ... ```json
+        ... {"key1": "value1"}
+        ... ```
+        ... ```json
+        ... {"key2": "value2"}
+        ... ```
+        ... ''')
+        [{'key1': 'value1'}, {'key2': 'value2'}]
+    """
+
+    if isinstance(string, list):
+        string = "\n".join(string)
+
+    # Try direct JSON parsing first
+    try:
+        if fuzzy_parse:
+            return fuzzy_parse_json(string)
+        return json.loads(string)
+    except Exception:
+        pass
+
+    # Look for JSON in markdown code blocks
+    pattern = r"```json\s*(.*?)\s*```"
+    matches = re.findall(pattern, string, re.DOTALL)
+
+    if not matches:
+        return []
+
+    if len(matches) == 1:
+        return fuzzy_parse_json(matches[0]) if fuzzy_parse else json.loads(matches[0])
+
+    if fuzzy_parse:
+        return [fuzzy_parse_json(match) for match in matches]
+    return [json.loads(match) for match in matches]
+
+
+def fuzzy_parse_json(str_to_parse: str, /) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
+    """Parse a JSON string with automatic fixing of common formatting issues.
+
+    Args:
+        str_to_parse: The JSON string to parse
+
+    Returns:
+        The parsed JSON object as a dictionary
+
+    Raises:
+        ValueError: If the string cannot be parsed as valid JSON
+        TypeError: If the input is not a string or the result is not a dict
+    """
+    if not isinstance(str_to_parse, str):
+        raise TypeError("Input must be a string")
+
+    if not str_to_parse.strip():
+        raise ValueError("Input string is empty")
+
+    try:
+        return json.loads(str_to_parse)
+    except Exception:
+        pass
+
+    cleaned = _clean_json_string(str_to_parse)
+    try:
+        return json.loads(cleaned)
+    except Exception:
+        pass
+
+    try:
+        fixed = fix_json_string(cleaned)
+        return json.loads(fixed)
+    except Exception as e:
+        raise ValueError(f"Failed to parse JSON string after all fixing attempts: {e}") from e
+
+
+def _clean_json_string(s: str) -> str:
+    """Clean and standardize a JSON string."""
+    # Replace unescaped single quotes with double quotes
+    s = re.sub(r"(?<!\\)'", '"', s)
+    # Collapse runs of whitespace
+    s = re.sub(r"\s+", " ", s)
+    # Quote unquoted object keys
+    s = re.sub(r'([{,])\s*([^"\s]+)\s*:', r'\1"\2":', s)
+    return s.strip()
+
+
+def fix_json_string(str_to_parse: str, /) -> str:
+    """Fix a JSON string by ensuring all brackets are properly closed.
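+
+    For example, fix_json_string('{"a": [1, {"b": 2') returns
+    '{"a": [1, {"b": 2}]}'.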
+ + Args: + str_to_parse: JSON string to fix + + Returns: + Fixed JSON string with proper bracket closure + + Raises: + ValueError: If mismatched or extra closing brackets are found + """ + if not str_to_parse: + raise ValueError("Input string is empty") + + brackets = {"{": "}", "[": "]"} + open_brackets = [] + pos = 0 + length = len(str_to_parse) + + while pos < length: + char = str_to_parse[pos] + + # Handle escape sequences + if char == "\\": + pos += 2 # Skip escape sequence + continue + + # Handle string content + if char == '"': + pos += 1 + # Skip until closing quote, accounting for escapes + while pos < length: + if str_to_parse[pos] == "\\": + pos += 2 # Skip escape sequence + continue + if str_to_parse[pos] == '"': + break + pos += 1 + pos += 1 + continue + + # Handle brackets + if char in brackets: + open_brackets.append(brackets[char]) + elif char in brackets.values(): + if not open_brackets: + raise ValueError(f"Extra closing bracket '{char}' at position {pos}") + if open_brackets[-1] != char: + raise ValueError(f"Mismatched bracket '{char}' at position {pos}") + open_brackets.pop() + + pos += 1 + + # Add missing closing brackets + closing_brackets = "".join(reversed(open_brackets)) + return str_to_parse + closing_brackets + + +@overload +def to_dict(input_: type[None] | PydanticUndefinedType, /) -> dict[str, Any]: ... + + +@overload +def to_dict(input_: Mapping, /) -> dict[str, Any]: ... + + +@overload +def to_dict(input_: set, /) -> dict[Any, Any]: ... + + +@overload +def to_dict(input_: Sequence, /) -> dict[str, Any]: ... + + +@overload +def to_dict( + input_: Any, + /, + *, + use_model_dump: bool = True, + fuzzy_parse: bool = False, + suppress: bool = False, + str_type: Literal["json", "xml"] | None = "json", + parser: Callable[[str], Any] | None = None, + recursive: bool = False, + max_recursive_depth: int = None, + exclude_types: tuple = (), + recursive_python_only: bool = True, + **kwargs: Any, +) -> dict[str, Any]: ... + + +def to_dict( + input_: Any, + /, + *, + use_model_dump: bool = True, + fuzzy_parse: bool = False, + suppress: bool = False, + str_type: Literal["json", "xml"] | None = "json", + parser: Callable[[str], Any] | None = None, + recursive: bool = False, + max_recursive_depth: int = None, + exclude_types: tuple = (), + recursive_python_only: bool = True, + **kwargs: Any, +): + """ + Convert various input types to a dictionary, with optional recursive processing. + + Args: + input_: The input to convert. + use_model_dump: Use model_dump() for Pydantic models if available. + fuzzy_parse: Use fuzzy parsing for string inputs. + suppress: Return empty dict on errors if True. + str_type: Input string type ("json" or "xml"). + parser: Custom parser function for string inputs. + recursive: Enable recursive conversion of nested structures. + max_recursive_depth: Maximum recursion depth (default 5, max 10). + exclude_types: Tuple of types to exclude from conversion. + recursive_python_only: If False, attempts to convert custom types recursively. + **kwargs: Additional arguments for parsing functions. + + Returns: + dict[str, Any]: A dictionary derived from the input. + + Raises: + ValueError: If parsing fails and suppress is False. 
+
+    Examples:
+        >>> to_dict({"a": 1, "b": [2, 3]})
+        {'a': 1, 'b': [2, 3]}
+        >>> to_dict('{"x": 10}', str_type="json")
+        {'x': 10}
+        >>> to_dict({"a": {"b": {"c": 1}}}, recursive=True, max_recursive_depth=2)
+        {'a': {'b': {'c': 1}}}
+    """
+    try:
+        if recursive:
+            return recursive_to_dict(
+                input_,
+                use_model_dump=use_model_dump,
+                fuzzy_parse=fuzzy_parse,
+                str_type=str_type,
+                parser=parser,
+                max_recursive_depth=max_recursive_depth,
+                exclude_types=exclude_types,
+                recursive_custom_types=not recursive_python_only,
+                **kwargs,
+            )
+
+        return _to_dict(
+            input_,
+            fuzzy_parse=fuzzy_parse,
+            parser=parser,
+            str_type=str_type,
+            use_model_dump=use_model_dump,
+            exclude_types=exclude_types,
+            **kwargs,
+        )
+    except Exception as e:
+        if suppress:
+            return {}
+        raise e
+
+
+def _to_dict(
+    input_: Any,
+    /,
+    *,
+    use_model_dump: bool = True,
+    fuzzy_parse: bool = False,
+    str_type: Literal["json", "xml"] | None = "json",
+    parser: Callable[[str], Any] | None = None,
+    exclude_types: tuple = (),
+    **kwargs: Any,
+) -> dict[str, Any]:
+    """Convert various input types to a dictionary.
+
+    Handles multiple input types, including None, Mappings, strings, and more.
+
+    Args:
+        input_: The input to convert to a dictionary.
+        use_model_dump: Use model_dump() for Pydantic models if available.
+        fuzzy_parse: Use fuzzy parsing for string inputs.
+        str_type: Input string type, either "json" or "xml".
+        parser: Custom parser function for string inputs.
+        exclude_types: Tuple of types to return unchanged.
+        **kwargs: Additional arguments passed to parsing functions.
+
+    Returns:
+        A dictionary derived from the input.
+
+    Raises:
+        ValueError: If string parsing fails.
+
+    Examples:
+        >>> to_dict({"a": 1, "b": 2})
+        {'a': 1, 'b': 2}
+        >>> to_dict('{"x": 10}', str_type="json")
+        {'x': 10}
+        >>> to_dict("<a>1</a>", str_type="xml")
+        {'a': '1'}
+    """
+    if isinstance(exclude_types, tuple) and len(exclude_types) > 0:
+        if isinstance(input_, exclude_types):
+            return input_
+
+    if isinstance(input_, dict):
+        return input_
+
+    if use_model_dump and hasattr(input_, "model_dump"):
+        return input_.model_dump(**kwargs)
+
+    if isinstance(input_, type(None) | PydanticUndefinedType):
+        return _undefined_to_dict(input_)
+    if isinstance(input_, Mapping):
+        return _mapping_to_dict(input_)
+
+    if isinstance(input_, str):
+        if fuzzy_parse:
+            parser = fuzzy_parse_json
+        try:
+            a = _str_to_dict(
+                input_,
+                str_type=str_type,
+                parser=parser,
+                **kwargs,
+            )
+            if isinstance(a, dict):
+                return a
+        except Exception as e:
+            raise ValueError("Failed to convert string to dictionary") from e
+
+    if isinstance(input_, set):
+        return _set_to_dict(input_)
+    if isinstance(input_, Iterable):
+        return _iterable_to_dict(input_)
+
+    return _generic_type_to_dict(input_, **kwargs)
+
+
+def _recursive_to_dict(
+    input_: Any,
+    /,
+    *,
+    max_recursive_depth: int,
+    current_depth: int = 0,
+    recursive_custom_types: bool = False,
+    exclude_types: tuple = (),
+    **kwargs: Any,
+) -> Any:
+
+    if current_depth >= max_recursive_depth:
+        return input_
+
+    if isinstance(input_, str):
+        try:
+            # Attempt to parse the string
+            parsed = _to_dict(input_, **kwargs)
+            # Recursively process the parsed result
+            return _recursive_to_dict(
+                parsed,
+                max_recursive_depth=max_recursive_depth,
+                current_depth=current_depth + 1,
+                recursive_custom_types=recursive_custom_types,
+                exclude_types=exclude_types,
+                **kwargs,
+            )
+        except Exception:
+            # Return the original string if parsing fails
+            return input_
+
+    elif isinstance(input_, dict):
+        # 
Recursively process dictionary values + return { + key: _recursive_to_dict( + value, + max_recursive_depth=max_recursive_depth, + current_depth=current_depth + 1, + recursive_custom_types=recursive_custom_types, + exclude_types=exclude_types, + **kwargs, + ) + for key, value in input_.items() + } + + elif isinstance(input_, (list, tuple)): + # Recursively process list or tuple elements + processed = [ + _recursive_to_dict( + element, + max_recursive_depth=max_recursive_depth, + current_depth=current_depth + 1, + recursive_custom_types=recursive_custom_types, + exclude_types=exclude_types, + **kwargs, + ) + for element in input_ + ] + return type(input_)(processed) + + elif recursive_custom_types: + # Process custom classes if enabled + try: + obj_dict = to_dict(input_, **kwargs) + return _recursive_to_dict( + obj_dict, + max_recursive_depth=max_recursive_depth, + current_depth=current_depth + 1, + recursive_custom_types=recursive_custom_types, + exclude_types=exclude_types, + **kwargs, + ) + except Exception: + return input_ + + else: + # Return the input as is for other data types + return input_ + + +def recursive_to_dict( + input_: Any, + /, + *, + max_recursive_depth: int = None, + exclude_types: tuple = (), + recursive_custom_types: bool = False, + **kwargs: Any, +) -> Any: + + if not isinstance(max_recursive_depth, int): + max_recursive_depth = 5 + else: + if max_recursive_depth < 0: + raise ValueError("max_recursive_depth must be a non-negative integer") + if max_recursive_depth == 0: + return input_ + if max_recursive_depth > 10: + raise ValueError("max_recursive_depth must be less than or equal to 10") + + return _recursive_to_dict( + input_, + max_recursive_depth=max_recursive_depth, + current_depth=0, + recursive_custom_types=recursive_custom_types, + exclude_types=exclude_types, + **kwargs, + ) + + +def _undefined_to_dict( + input_: type[None] | PydanticUndefinedType, + /, +) -> dict: + return {} + + +def _mapping_to_dict(input_: Mapping, /) -> dict: + return dict(input_) + + +def _str_to_dict( + input_: str, + /, + *, + str_type: Literal["json", "xml"] | None = "json", + parser: Callable[[str], Any] | None = None, + **kwargs: Any, +) -> dict[str, Any] | list[dict[str, Any]]: + """Handle string inputs.""" + if not input_: + return {} + + if str_type == "json": + try: + return json.loads(input_, **kwargs) if parser is None else parser(input_, **kwargs) + except json.JSONDecodeError as e: + raise ValueError("Failed to parse JSON string") from e + + if str_type == "xml": + try: + if parser is None: + return xml_to_dict(input_, **kwargs) + return parser(input_, **kwargs) + except Exception as e: + raise ValueError("Failed to parse XML string") from e + + raise ValueError(f"Unsupported string type for `to_dict`: {str_type}, it should " "be 'json' or 'xml'.") + + +def _set_to_dict(input_: set, /) -> dict: + return {value: value for value in input_} + + +def _iterable_to_dict(input_: Iterable, /) -> dict: + return {idx: v for idx, v in enumerate(input_)} + + +def _generic_type_to_dict( + input_, + /, + **kwargs: Any, +) -> dict[str, Any]: + + try: + for method in ["to_dict", "dict", "json", "to_json"]: + if hasattr(input_, method): + result = getattr(input_, method)(**kwargs) + return json.loads(result) if isinstance(result, str) else result + except Exception: + pass + + if hasattr(input_, "__dict__"): + return input_.__dict__ + + try: + return dict(input_) + except Exception as e: + raise ValueError(f"Unable to convert input to dictionary: {e}") + + +__all__ = ["to_json", 
"fuzzy_parse_json", "to_dict"] diff --git a/autogen/structure/utils/validate_keys.py b/autogen/structure/utils/validate_keys.py new file mode 100644 index 0000000000..392041df89 --- /dev/null +++ b/autogen/structure/utils/validate_keys.py @@ -0,0 +1,149 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License +# SPDX-License-Identifier: Apache-2.0 +from collections.abc import Callable, Sequence +from typing import Any, Literal, TypedDict + +from .string_similarity import SIMILARITY_ALGO_MAP, SIMILARITY_TYPE, string_similarity + + +class KeysDict(TypedDict): + """Dictionary mapping keys to their expected types.""" + + pass + + +def validate_keys( + d_: dict[str, Any], + keys: Sequence[str] | KeysDict, + /, + *, + similarity_algo: SIMILARITY_TYPE | Callable[[str, str], float] = "jaro_winkler", + similarity_threshold: float = 0.85, + fuzzy_match: bool = True, + handle_unmatched: Literal["ignore", "raise", "remove", "fill", "force"] = "ignore", + fill_value: Any = None, + fill_mapping: dict[str, Any] | None = None, + strict: bool = False, +) -> dict[str, Any]: + """ + Validate and correct dictionary keys based on expected keys using string similarity. + + Args: + d_: The dictionary to validate and correct keys for. + keys: List of expected keys or dictionary mapping keys to types. + similarity_algo: String similarity algorithm to use or custom function. + similarity_threshold: Minimum similarity score for fuzzy matching. + fuzzy_match: If True, use fuzzy matching for key correction. + handle_unmatched: Specifies how to handle unmatched keys: + - "ignore": Keep unmatched keys in output. + - "raise": Raise ValueError if unmatched keys exist. + - "remove": Remove unmatched keys from output. + - "fill": Fill unmatched keys with default value/mapping. + - "force": Combine "fill" and "remove" behaviors. + fill_value: Default value for filling unmatched keys. + fill_mapping: Dictionary mapping unmatched keys to default values. + strict: If True, raise ValueError if any expected key is missing. + + Returns: + A new dictionary with validated and corrected keys. + + Raises: + ValueError: If validation fails based on specified parameters. + TypeError: If input types are invalid. + AttributeError: If key validation fails. 
+ """ + # Input validation + if not isinstance(d_, dict): + raise TypeError("First argument must be a dictionary") + if keys is None: + raise TypeError("Keys argument cannot be None") + if not 0.0 <= similarity_threshold <= 1.0: + raise ValueError("similarity_threshold must be between 0.0 and 1.0") + + # Extract expected keys + fields_set = set(keys) if isinstance(keys, list) else set(keys.keys()) + if not fields_set: + return d_.copy() # Return copy of original if no expected keys + + # Initialize output dictionary and tracking sets + corrected_out = {} + matched_expected = set() + matched_input = set() + + # Get similarity function + if isinstance(similarity_algo, str): + if similarity_algo not in SIMILARITY_ALGO_MAP: + raise ValueError(f"Unknown similarity algorithm: {similarity_algo}") + similarity_func = SIMILARITY_ALGO_MAP[similarity_algo] + else: + similarity_func = similarity_algo + + # First pass: exact matches + for key in d_: + if key in fields_set: + corrected_out[key] = d_[key] + matched_expected.add(key) + matched_input.add(key) + + # Second pass: fuzzy matching if enabled + if fuzzy_match: + remaining_input = set(d_.keys()) - matched_input + remaining_expected = fields_set - matched_expected + + for key in remaining_input: + if not remaining_expected: + break + + matches = string_similarity( + key, + list(remaining_expected), + algorithm=similarity_func, + threshold=similarity_threshold, + return_most_similar=True, + ) + + if matches: + match = matches + corrected_out[match] = d_[key] + matched_expected.add(match) + matched_input.add(key) + remaining_expected.remove(match) + elif handle_unmatched == "ignore": + corrected_out[key] = d_[key] + + # Handle unmatched keys based on handle_unmatched parameter + unmatched_input = set(d_.keys()) - matched_input + unmatched_expected = fields_set - matched_expected + + if handle_unmatched == "raise" and unmatched_input: + raise ValueError(f"Unmatched keys found: {unmatched_input}") + + elif handle_unmatched == "ignore": + for key in unmatched_input: + corrected_out[key] = d_[key] + + elif handle_unmatched in ("fill", "force"): + # Fill missing expected keys + for key in unmatched_expected: + if fill_mapping and key in fill_mapping: + corrected_out[key] = fill_mapping[key] + else: + corrected_out[key] = fill_value + + # For "fill" mode, also keep unmatched original keys + if handle_unmatched == "fill": + for key in unmatched_input: + corrected_out[key] = d_[key] + + # Check strict mode + if strict and unmatched_expected: + raise ValueError(f"Missing required keys: {unmatched_expected}") + + return corrected_out + + +__all__ = ["validate_keys"] diff --git a/autogen/structure/utils/validate_mapping.py b/autogen/structure/utils/validate_mapping.py new file mode 100644 index 0000000000..be1fae5bd7 --- /dev/null +++ b/autogen/structure/utils/validate_mapping.py @@ -0,0 +1,103 @@ +# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai +# +# SPDX-License-Identifier: Apache-2.0 +# +# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License +# SPDX-License-Identifier: Apache-2.0 +from collections.abc import Callable, Sequence +from typing import Any, Literal + +from .string_similarity import SIMILARITY_TYPE +from .to_json import to_dict, to_json +from .validate_keys import KeysDict, validate_keys + + +def validate_mapping( + d: Any, + keys: Sequence[str] | KeysDict, + /, + *, + similarity_algo: SIMILARITY_TYPE | Callable[[str, str], float] = "jaro_winkler", + similarity_threshold: float = 0.85, + 
diff --git a/autogen/structure/utils/validate_mapping.py b/autogen/structure/utils/validate_mapping.py
new file mode 100644
index 0000000000..be1fae5bd7
--- /dev/null
+++ b/autogen/structure/utils/validate_mapping.py
@@ -0,0 +1,103 @@
+# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License
+# SPDX-License-Identifier: Apache-2.0
+from collections.abc import Callable, Sequence
+from typing import Any, Literal
+
+from .string_similarity import SIMILARITY_TYPE
+from .to_json import to_dict, to_json
+from .validate_keys import KeysDict, validate_keys
+
+
+def validate_mapping(
+    d: Any,
+    keys: Sequence[str] | KeysDict,
+    /,
+    *,
+    similarity_algo: SIMILARITY_TYPE | Callable[[str, str], float] = "jaro_winkler",
+    similarity_threshold: float = 0.85,
+    fuzzy_match: bool = True,
+    handle_unmatched: Literal["ignore", "raise", "remove", "fill", "force"] = "ignore",
+    fill_value: Any = None,
+    fill_mapping: dict[str, Any] | None = None,
+    strict: bool = False,
+    suppress_conversion_errors: bool = False,
+) -> dict[str, Any]:
+    """
+    Validate and correct any input into a dictionary with expected keys.
+
+    Args:
+        d: Input to validate. Can be:
+            - Dictionary
+            - JSON string or markdown code block
+            - XML string
+            - Object with to_dict/model_dump method
+            - Any type convertible to dictionary
+        keys: List of expected keys or dictionary mapping keys to types.
+        similarity_algo: String similarity algorithm or custom function.
+        similarity_threshold: Minimum similarity score for fuzzy matching.
+        fuzzy_match: If True, use fuzzy matching for key correction.
+        handle_unmatched: How to handle unmatched keys:
+            - "ignore": Keep unmatched keys
+            - "raise": Raise error for unmatched keys
+            - "remove": Remove unmatched keys
+            - "fill": Fill missing keys with default values
+            - "force": Combine "fill" and "remove" behaviors
+        fill_value: Default value for filling unmatched keys.
+        fill_mapping: Dictionary mapping keys to default values.
+        strict: Raise error if any expected key is missing.
+        suppress_conversion_errors: Return empty dict on conversion errors.
+
+    Returns:
+        Validated and corrected dictionary.
+
+    Raises:
+        ValueError: If input cannot be converted or validation fails.
+        TypeError: If input types are invalid.
+    """
+    if d is None:
+        raise TypeError("Input cannot be None")
+
+    # Try converting to dictionary
+    try:
+        if isinstance(d, str):
+            # First try to_json for JSON strings and code blocks
+            try:
+                json_result = to_json(d)
+                dict_input = json_result[0] if isinstance(json_result, list) else json_result
+            except Exception:
+                # Fall back to to_dict for other string formats
+                dict_input = to_dict(d, str_type="json", fuzzy_parse=True, suppress=True)
+        else:
+            dict_input = to_dict(d, use_model_dump=True, fuzzy_parse=True, suppress=True)
+    except Exception as e:
+        if suppress_conversion_errors:
+            dict_input = {}
+        else:
+            raise ValueError(f"Failed to convert input to dictionary: {e}") from e
+
+    # Checked outside the try block so the error message is not wrapped twice
+    if not isinstance(dict_input, dict):
+        if suppress_conversion_errors:
+            dict_input = {}
+        else:
+            raise ValueError(f"Failed to convert input to dictionary: {type(dict_input)}")
+
+    # Validate the dictionary
+    return validate_keys(
+        dict_input,
+        keys,
+        similarity_algo=similarity_algo,
+        similarity_threshold=similarity_threshold,
+        fuzzy_match=fuzzy_match,
+        handle_unmatched=handle_unmatched,
+        fill_value=fill_value,
+        fill_mapping=fill_mapping,
+        strict=strict,
+    )
+
+
+__all__ = ["validate_mapping"]
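To illustrate the conversion-plus-correction pipeline on the kind of fenced JSON an LLM might emit (the typo correction again assumes the default scorer clears the 0.85 threshold):

```python
# Illustrative only; not part of the diff.
from autogen.structure.utils.validate_mapping import validate_mapping

llm_reply = '```json\n{"instruciton": "summarize", "context": "Q3 report"}\n```'

# Parses the fenced JSON, fuzzy-corrects "instruciton" -> "instruction",
# and fills the missing "guidance" key ("force" fills and removes unmatched).
parsed = validate_mapping(
    llm_reply,
    ["instruction", "guidance", "context"],
    handle_unmatched="force",
    fill_value=None,
)
# parsed == {"instruction": "summarize", "context": "Q3 report", "guidance": None}
```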
diff --git a/autogen/structure/utils/xml_parser.py b/autogen/structure/utils/xml_parser.py
new file mode 100644
index 0000000000..35890f91f2
--- /dev/null
+++ b/autogen/structure/utils/xml_parser.py
@@ -0,0 +1,146 @@
+# Copyright (c) 2023 - 2024, Owners of https://github.com/ag2ai
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+# Portions derived from https://github.com/lion-agi/lion-core are under the Apache-2.0 License
+# SPDX-License-Identifier: Apache-2.0
+
+import re
+import xml.etree.ElementTree as ET
+from typing import Any
+
+
+def xml_to_dict(
+    xml_string: str,
+    /,
+    suppress: bool = False,
+    remove_root: bool = True,
+    root_tag: str | None = None,
+) -> dict[str, Any] | None:
+    """
+    Parse an XML string into a nested dictionary structure.
+
+    This function converts an XML string into a dictionary where:
+    - Element tags become dictionary keys
+    - Text content is assigned directly to the tag key if there are no children
+    - Attributes are stored in a '@attributes' key
+    - Multiple child elements with the same tag are stored as lists
+
+    Args:
+        xml_string: The XML string to parse.
+        suppress: If True, return None instead of raising on parse errors.
+        remove_root: If True, strip the root element from the result.
+        root_tag: Name of the root element to strip (defaults to "root").
+
+    Returns:
+        A dictionary representation of the XML structure, or None if
+        parsing fails and suppress is True.
+
+    Raises:
+        ValueError: If the XML is malformed or parsing fails.
+    """
+    try:
+        parsed = XMLParser(xml_string).parse()
+        if remove_root and (root_tag or "root") in parsed:
+            parsed = parsed[root_tag or "root"]
+        return parsed
+    except ValueError:
+        if not suppress:
+            raise
+        return None
+
+
+def dict_to_xml(data: dict, /, root_tag: str = "root") -> str:
+    """Serialize a dictionary into an XML string under a single root element."""
+    root = ET.Element(root_tag)
+
+    def convert(dict_obj: dict, parent: Any) -> None:
+        for key, val in dict_obj.items():
+            element = ET.SubElement(parent, key)
+            if isinstance(val, dict):
+                convert(dict_obj=val, parent=element)
+            else:
+                # Non-dict values (including lists) are stringified
+                element.text = str(val)
+
+    convert(dict_obj=data, parent=root)
+    return ET.tostring(root, encoding="unicode")
+
+
+class XMLParser:
+    def __init__(self, xml_string: str):
+        self.xml_string = xml_string.strip()
+        self.index = 0
+
+    def parse(self) -> dict[str, Any]:
+        """Parse the XML string and return the root element as a dictionary."""
+        return self._parse_element()
+
+    def _parse_element(self) -> dict[str, Any]:
+        """Parse a single XML element and its children."""
+        self._skip_whitespace()
+        if self.xml_string[self.index] != "<":
+            raise ValueError(f"Expected '<', found '{self.xml_string[self.index]}'")
+
+        tag, attributes = self._parse_opening_tag()
+        children: dict[str, str | list | dict] = {}
+        text = ""
+
+        while self.index < len(self.xml_string):
+            self._skip_whitespace()
+            if self.xml_string.startswith("</", self.index):
+                # Closing tag: must match the element currently being parsed
+                closing_tag = self._parse_closing_tag()
+                if closing_tag != tag:
+                    raise ValueError(f"Mismatched tags: '{tag}' and '{closing_tag}'")
+                break
+            elif self.xml_string.startswith("<", self.index):
+                # Nested element: recurse, collecting repeated tags into lists
+                child = self._parse_element()
+                child_tag, child_data = next(iter(child.items()))
+                if child_tag in children:
+                    if not isinstance(children[child_tag], list):
+                        children[child_tag] = [children[child_tag]]
+                    children[child_tag].append(child_data)
+                else:
+                    children[child_tag] = child_data
+            else:
+                # Plain text content between tags
+                text += self._parse_text()
+
+        result: dict[str, Any] = {}
+        if attributes:
+            result["@attributes"] = attributes
+        if children:
+            result.update(children)
+        elif text.strip():
+            if attributes:
+                result["#text"] = text.strip()
+            else:
+                # Text-only element: assign the text directly to the tag key
+                return {tag: text.strip()}
+        return {tag: result}
+
+    def _parse_opening_tag(self) -> tuple[str, dict[str, str]]:
+        """Parse an opening XML tag and its attributes."""
+        match = re.match(
+            r'<(\w+)((?:\s+\w+="[^"]*")*)\s*/?>',
+            self.xml_string[self.index :],  # noqa
+        )
+        if not match:
+            raise ValueError("Invalid opening tag")
+        self.index += match.end()
+        tag = match.group(1)
+        attributes = dict(re.findall(r'(\w+)="([^"]*)"', match.group(2)))
+        return tag, attributes
+
+    def _parse_closing_tag(self) -> str:
+        """Parse a closing XML tag."""
+        match = re.match(r"</(\w+)>", self.xml_string[self.index :])  # noqa
+        if not match:
+            raise ValueError("Invalid closing tag")
+        self.index += match.end()
+        return match.group(1)
+
+    def _parse_text(self) -> str:
+        """Parse text content between XML tags."""
+        start = self.index
+        while self.index < len(self.xml_string) and self.xml_string[self.index] != "<":
+            self.index += 1
+        return self.xml_string[start : self.index]  # noqa
+
+    def _skip_whitespace(self) -> None:
+        """Skip any whitespace characters at the current parsing position."""
+        remaining = self.xml_string[self.index :]  # noqa
+        self.index += len(remaining) - len(remaining.lstrip())
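Finally, a small round-trip sketch of the two entry points above. The exact shape follows the contract in `xml_to_dict`'s docstring; treat the inline results as expected, not verified, output:

```python
# Illustrative only; not part of the diff.
from autogen.structure.utils.xml_parser import dict_to_xml, xml_to_dict

data = {"user": {"name": "Ada", "age": "36"}}

xml = dict_to_xml(data)
# xml == '<root><user><name>Ada</name><age>36</age></user></root>'

back = xml_to_dict(xml)  # remove_root=True strips the synthetic "root" wrapper
# back == {"user": {"name": "Ada", "age": "36"}}
```

Note that `dict_to_xml` stringifies all non-dict values, so round-tripping preserves structure but not scalar types (the integer 36 would come back as "36").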