From 6d39b4d285c49f04cc9197f0e720d7575b155b9b Mon Sep 17 00:00:00 2001 From: OceanLi <122793010+ohdearquant@users.noreply.github.com> Date: Tue, 19 Nov 2024 22:56:03 -0500 Subject: [PATCH] updated structure util into a folder --- autogen/structure/__init__.py | 0 autogen/structure/utils/__init__.py | 19 + .../structure/utils/break_down_pydantic.py | 77 +++ autogen/structure/utils/string_similarity.py | 297 ++++++++++ autogen/structure/utils/to_json.py | 548 ++++++++++++++++++ autogen/structure/utils/validate_keys.py | 146 +++++ autogen/structure/utils/validate_mapping.py | 99 ++++ autogen/structure/utils/xml_parser.py | 143 +++++ autogen/structure_utils.py | 544 ----------------- 9 files changed, 1329 insertions(+), 544 deletions(-) create mode 100644 autogen/structure/__init__.py create mode 100644 autogen/structure/utils/__init__.py create mode 100644 autogen/structure/utils/break_down_pydantic.py create mode 100644 autogen/structure/utils/string_similarity.py create mode 100644 autogen/structure/utils/to_json.py create mode 100644 autogen/structure/utils/validate_keys.py create mode 100644 autogen/structure/utils/validate_mapping.py create mode 100644 autogen/structure/utils/xml_parser.py delete mode 100644 autogen/structure_utils.py diff --git a/autogen/structure/__init__.py b/autogen/structure/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/autogen/structure/utils/__init__.py b/autogen/structure/utils/__init__.py new file mode 100644 index 0000000000..7fa4303031 --- /dev/null +++ b/autogen/structure/utils/__init__.py @@ -0,0 +1,19 @@ +from .break_down_pydantic import break_down_pydantic_annotation +from .string_similarity import SIMILARITY_TYPE, string_similarity +from .to_json import fuzzy_parse_json, to_dict, to_json +from .validate_keys import validate_keys +from .validate_mapping import validate_mapping +from .xml_parser import dict_to_xml, xml_to_dict + +__all__ = [ + "break_down_pydantic_annotation", + "string_similarity", + 
"SIMILARITY_TYPE", + "to_json", + "to_dict", + "fuzzy_parse_json", + "validate_keys", + "validate_mapping", + "xml_to_dict", + "dict_to_xml", +] diff --git a/autogen/structure/utils/break_down_pydantic.py b/autogen/structure/utils/break_down_pydantic.py new file mode 100644 index 0000000000..661b6271ef --- /dev/null +++ b/autogen/structure/utils/break_down_pydantic.py @@ -0,0 +1,77 @@ +# copied from https://github.com/lion-agi/lion-os/blob/main/lion/integrations/pydantic_/break_down_annotation.py +# copyright by HaiyangLi, APACHE LICENSE 2.0 + +from inspect import isclass +from typing import Any, Dict, TypeVar, get_args, get_origin + +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +def break_down_pydantic_annotation( + model: type[T], max_depth: int | None = None, current_depth: int = 0 +) -> Dict[str, Any]: + """ + Break down the type annotations of a Pydantic model into a dictionary. + + This function recursively processes Pydantic models, converting their + field annotations into a dictionary structure. It handles nested models + and lists of models. + + Args: + model: The Pydantic model class to break down. + max_depth: Maximum depth for recursion. None for no limit. + current_depth: Current recursion depth (used internally). + + Returns: + A dictionary representing the structure of the model's annotations. + + Raises: + TypeError: If the input is not a Pydantic model. + RecursionError: If max recursion depth is reached. + + Example: + >>> from pydantic import BaseModel + >>> class SubModel(BaseModel): + ... field1: int + ... field2: str + >>> class MainModel(BaseModel): + ... sub: SubModel + ... 
def cosine_similarity(s1: str, s2: str) -> float:
    """Return the cosine similarity of the character sets of two strings.

    Each string is reduced to the set of distinct characters it contains
    (character order and repetition are ignored). The score is the size of
    the intersection divided by the geometric mean of the two set sizes.

    Args:
        s1: First input string.
        s2: Second input string.

    Returns:
        A score between 0.0 and 1.0. Returns 0.0 when either string is empty.
    """
    if not s1 or not s2:
        return 0.0

    # A non-empty string always yields a non-empty set, so the denominator
    # below can never be zero and no further emptiness check is required.
    chars1, chars2 = set(s1), set(s2)
    shared = len(chars1 & chars2)
    return shared / ((len(chars1) * len(chars2)) ** 0.5)
def levenshtein_distance(a: str, b: str) -> int:
    """Calculate the Levenshtein (edit) distance between two strings.

    Uses the classic dynamic-programming recurrence but keeps only the
    previous and current rows, so memory use is proportional to the shorter
    string instead of the product of both lengths.

    Args:
        a: First input string.
        b: Second input string.

    Returns:
        The minimum number of single-character insertions, deletions and
        substitutions needed to turn one string into the other.
    """
    if not a:
        return len(b)
    if not b:
        return len(a)

    # The distance is symmetric; iterate rows over the longer string so the
    # row buffers are sized by the shorter one.
    if len(b) > len(a):
        a, b = b, a

    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(
                min(
                    previous[j] + 1,  # deletion
                    current[j - 1] + 1,  # insertion
                    previous[j - 1] + cost,  # substitution
                )
            )
        previous = current

    return previous[-1]
+ + Args: + s1: First input string + s2: Second input string + + Returns: + float: Levenshtein similarity score between 0 and 1 + """ + if not s1 and not s2: + return 1.0 + if not s1 or not s2: + return 0.0 + + distance = levenshtein_distance(s1, s2) + max_len = max(len(s1), len(s2)) + return 1 - (distance / max_len) + + +# Type definitions +SIMILARITY_ALGO_MAP: Dict[str, Callable[[str, str], float]] = { + "jaro_winkler": jaro_winkler_similarity, + "levenshtein": levenshtein_similarity, + "sequence_matcher": lambda s1, s2: SequenceMatcher(None, s1, s2).ratio(), + "hamming": hamming_similarity, + "cosine": cosine_similarity, +} + + +SIMILARITY_TYPE = Literal[ + "jaro_winkler", + "levenshtein", + "sequence_matcher", + "hamming", + "cosine", +] + + +@dataclass(frozen=True) +class MatchResult: + """Represents a string matching result.""" + + word: str + score: float + index: int + + +def string_similarity( + word: str, + correct_words: Sequence[str], + algorithm: SIMILARITY_TYPE | Callable[[str, str], float] = "jaro_winkler", + threshold: float = 0.0, + case_sensitive: bool = False, + return_most_similar: bool = False, +) -> Optional[Union[str, List[str]]]: + """Find similar strings using specified similarity algorithm.""" + if not correct_words: + raise ValueError("correct_words must not be empty") + + if not 0.0 <= threshold <= 1.0: + raise ValueError("threshold must be between 0.0 and 1.0") + + # Convert inputs to strings + compare_word = str(word) + original_words = [str(w) for w in correct_words] + + # Handle case sensitivity + if not case_sensitive: + compare_word = compare_word.lower() + compare_words = [w.lower() for w in original_words] + else: + compare_words = original_words.copy() + + # Get scoring function + if isinstance(algorithm, str): + score_func = SIMILARITY_ALGO_MAP.get(algorithm) + if score_func is None: + raise ValueError(f"Unsupported algorithm: {algorithm}") + elif callable(algorithm): + score_func = algorithm + else: + raise 
ValueError("algorithm must be a string specifying a built-in algorithm or " "a callable") + + # Calculate similarities + results = [] + for idx, (orig_word, comp_word) in enumerate(zip(original_words, compare_words)): + # Skip different length strings for hamming similarity + if algorithm == "hamming" and len(comp_word) != len(compare_word): + continue + + score = score_func(compare_word, comp_word) + if score >= threshold: + results.append(MatchResult(orig_word, score, idx)) + + # Return None if no matches + if not results: + return None + + # Sort by score (descending) and index (ascending) for stable ordering + results.sort(key=lambda x: (-x.score, x.index)) + + # Return results + if return_most_similar: + return results[0].word + + # Filter exact matches for case sensitive comparisons + if case_sensitive: + max_score = results[0].score + results = [r for r in results if r.score == max_score] + + return [r.word for r in results] + + +__all__ = ["string_similarity", "SIMILARITY_TYPE"] diff --git a/autogen/structure/utils/to_json.py b/autogen/structure/utils/to_json.py new file mode 100644 index 0000000000..1e64bb6db7 --- /dev/null +++ b/autogen/structure/utils/to_json.py @@ -0,0 +1,548 @@ +# copied from https://github.com/lion-agi/lion-os/tree/main/lion/libs/parse/json +# copyright by HaiyangLi, APACHE LICENSE 2.0 + +import json +import re +from collections.abc import Callable, Iterable, Mapping +from typing import Any, Dict, List, Sequence, Union, overload + +from pydantic_core import PydanticUndefinedType +from typing_extensions import Literal + +from .xml_parser import xml_to_dict + + +def to_json(string: str | List[str], /, fuzzy_parse: bool = False) -> Union[List[Dict[str, Any]], Dict]: + """Extract and parse JSON content from a string or markdown code blocks. + + This function attempts to parse JSON directly from the input string first. + If that fails, it looks for JSON content within markdown code blocks + (denoted by ```json). 
+ + Args: + string: Input string or list of strings to parse. If a list is provided, + it will be joined with newlines. + + Returns: + - A dictionary if a single JSON object is found + - A list of dictionaries if multiple JSON objects are found + - An empty list if no valid JSON is found + + Examples: + >>> to_json('{"key": "value"}') + {'key': 'value'} + + >>> to_json(''' + ... ```json + ... {"key": "value"} + ... ``` + ... ''') + {'key': 'value'} + + >>> to_json(''' + ... ```json + ... {"key1": "value1"} + ... ``` + ... ```json + ... {"key2": "value2"} + ... ``` + ... ''') + [{'key1': 'value1'}, {'key2': 'value2'}] + """ + + if isinstance(string, list): + string = "\n".join(string) + + # Try direct JSON parsing first + try: + if fuzzy_parse: + return fuzzy_parse_json(string) + return json.loads(string) + except Exception: + pass + + # Look for JSON in markdown code blocks + pattern = r"```json\s*(.*?)\s*```" + matches = re.findall(pattern, string, re.DOTALL) + + if not matches: + return [] + + if len(matches) == 1: + return json.loads(matches[0]) + + if fuzzy_parse: + return [fuzzy_parse_json(match) for match in matches] + return [json.loads(match) for match in matches] + + +def fuzzy_parse_json(str_to_parse: str, /) -> Union[Dict[str, Any], List[Dict[str, Any]]]: + """Parse a JSON string with automatic fixing of common formatting issues. 
def fix_json_string(str_to_parse: str) -> str:
    """Fix a JSON string by closing any string or brackets left open.

    The input is scanned once. Brackets inside string literals are ignored,
    and a backslash always skips the character that follows it. If the input
    ends inside a string literal, a closing quote is appended first; then
    every bracket that is still open is closed in the correct order.

    Args:
        str_to_parse: JSON string to fix.

    Returns:
        The input with a closing quote (if needed) and the missing closing
        brackets appended.

    Raises:
        ValueError: If the input is empty, or if mismatched or extra closing
            brackets are found.
    """
    if not str_to_parse:
        raise ValueError("Input string is empty")

    brackets = {"{": "}", "[": "]"}
    closing_chars = set(brackets.values())
    open_brackets: list[str] = []
    in_string = False
    pos = 0
    length = len(str_to_parse)

    while pos < length:
        char = str_to_parse[pos]

        # Handle escape sequences (inside or outside a string literal)
        if char == "\\":
            pos += 2  # Skip escape sequence
            continue

        if in_string:
            # Only an unescaped quote ends the string; everything else,
            # including brackets, is plain content.
            if char == '"':
                in_string = False
        elif char == '"':
            in_string = True
        elif char in brackets:
            open_brackets.append(brackets[char])
        elif char in closing_chars:
            if not open_brackets:
                raise ValueError(f"Extra closing bracket '{char}' at position {pos}")
            if open_brackets[-1] != char:
                raise ValueError(f"Mismatched bracket '{char}' at position {pos}")
            open_brackets.pop()

        pos += 1

    # pos overshoots the end only when the last character is a lone
    # backslash; a quote appended there would itself be escaped, so leave
    # that case alone.
    dangling_escape = pos > length
    closing_quote = '"' if in_string and not dangling_escape else ""

    # Add missing closing brackets, innermost first
    closing_brackets = "".join(reversed(open_brackets))
    return str_to_parse + closing_quote + closing_brackets
+ **kwargs: Additional arguments for parsing functions. + + Returns: + dict[str, Any]: A dictionary derived from the input. + + Raises: + ValueError: If parsing fails and suppress is False. + + Examples: + >>> to_dict({"a": 1, "b": [2, 3]}) + {'a': 1, 'b': [2, 3]} + >>> to_dict('{"x": 10}', str_type="json") + {'x': 10} + >>> to_dict({"a": {"b": {"c": 1}}}, recursive=True, max_recursive_depth=2) + {'a': {'b': {'c': 1}}} + """ + try: + if recursive: + return recursive_to_dict( + input_, + use_model_dump=use_model_dump, + fuzzy_parse=fuzzy_parse, + str_type=str_type, + parser=parser, + max_recursive_depth=max_recursive_depth, + exclude_types=exclude_types, + recursive_custom_types=not recursive_python_only, + **kwargs, + ) + + return _to_dict( + input_, + fuzzy_parse=fuzzy_parse, + parser=parser, + str_type=str_type, + use_model_dump=use_model_dump, + exclude_types=exclude_types, + **kwargs, + ) + except Exception as e: + if suppress: + return {} + raise e + + +def _to_dict( + input_: Any, + /, + *, + use_model_dump: bool = True, + fuzzy_parse: bool = False, + str_type: Literal["json", "xml"] | None = "json", + parser: Callable[[str], Any] | None = None, + exclude_types: tuple = (), + **kwargs: Any, +) -> dict[str, Any]: + """Convert various input types to a dictionary. + + Handles multiple input types, including None, Mappings, strings, and more. + + Args: + input_: The input to convert to a dictionary. + use_model_dump: Use model_dump() for Pydantic models if available. + fuzzy_parse: Use fuzzy parsing for string inputs. + suppress: Return empty dict on parsing errors if True. + str_type: Input string type, either "json" or "xml". + parser: Custom parser function for string inputs. + **kwargs: Additional arguments passed to parsing functions. + + Returns: + A dictionary derived from the input. + + Raises: + ValueError: If string parsing fails and suppress is False. 
+ + Examples: + >>> to_dict({"a": 1, "b": 2}) + {'a': 1, 'b': 2} + >>> to_dict('{"x": 10}', str_type="json") + {'x': 10} + >>> to_dict("1", str_type="xml") + {'a': '1'} + """ + if isinstance(exclude_types, tuple) and len(exclude_types) > 0: + if isinstance(input_, exclude_types): + return input_ + + if isinstance(input_, dict): + return input_ + + if use_model_dump and hasattr(input_, "model_dump"): + return input_.model_dump(**kwargs) + + if isinstance(input_, type(None) | PydanticUndefinedType): + return _undefined_to_dict(input_) + if isinstance(input_, Mapping): + return _mapping_to_dict(input_) + + if isinstance(input_, str): + if fuzzy_parse: + parser = fuzzy_parse_json + try: + a = _str_to_dict( + input_, + str_type=str_type, + parser=parser, + **kwargs, + ) + if isinstance(a, dict): + return a + except Exception as e: + raise ValueError("Failed to convert string to dictionary") from e + + if isinstance(input_, set): + return _set_to_dict(input_) + if isinstance(input_, Iterable): + return _iterable_to_dict(input_) + + return _generic_type_to_dict(input_, **kwargs) + + +def _recursive_to_dict( + input_: Any, + /, + *, + max_recursive_depth: int, + current_depth: int = 0, + recursive_custom_types: bool = False, + exclude_types: tuple = (), + **kwargs: Any, +) -> Any: + + if current_depth >= max_recursive_depth: + return input_ + + if isinstance(input_, str): + try: + # Attempt to parse the string + parsed = _to_dict(input_, **kwargs) + # Recursively process the parsed result + return _recursive_to_dict( + parsed, + max_recursive_depth=max_recursive_depth, + current_depth=current_depth + 1, + recursive_custom_types=recursive_custom_types, + exclude_types=exclude_types, + **kwargs, + ) + except Exception: + # Return the original string if parsing fails + return input_ + + elif isinstance(input_, dict): + # Recursively process dictionary values + return { + key: _recursive_to_dict( + value, + max_recursive_depth=max_recursive_depth, + current_depth=current_depth 
def recursive_to_dict(
    input_: Any,
    /,
    *,
    max_recursive_depth: int = None,
    exclude_types: tuple = (),
    recursive_custom_types: bool = False,
    **kwargs: Any,
) -> Any:
    """Validate the depth limit and start the recursive conversion.

    Args:
        input_: The value to convert.
        max_recursive_depth: Depth limit. Any non-integer value (including
            None) selects the default of 5. 0 returns the input unchanged.
        exclude_types: Forwarded unchanged to the recursive worker.
        recursive_custom_types: Forwarded unchanged to the recursive worker.
        **kwargs: Forwarded unchanged to the recursive worker.

    Returns:
        The input itself when the depth limit is 0, otherwise whatever the
        recursive worker returns.

    Raises:
        ValueError: If the depth limit is an integer below 0 or above 10.
    """
    depth = max_recursive_depth

    if isinstance(depth, int):
        if depth < 0:
            raise ValueError("max_recursive_depth must be a non-negative integer")
        if depth == 0:
            # Nothing to unwrap at depth zero.
            return input_
        if depth > 10:
            raise ValueError("max_recursive_depth must be less than or equal to 10")
    else:
        depth = 5

    return _recursive_to_dict(
        input_,
        max_recursive_depth=depth,
        current_depth=0,
        recursive_custom_types=recursive_custom_types,
        exclude_types=exclude_types,
        **kwargs,
    )
= None, + **kwargs: Any, +) -> dict[str, Any] | list[dict[str, Any]]: + """Handle string inputs.""" + if not input_: + return {} + + if str_type == "json": + try: + return json.loads(input_, **kwargs) if parser is None else parser(input_, **kwargs) + except json.JSONDecodeError as e: + raise ValueError("Failed to parse JSON string") from e + + if str_type == "xml": + try: + if parser is None: + return xml_to_dict(input_, **kwargs) + return parser(input_, **kwargs) + except Exception as e: + raise ValueError("Failed to parse XML string") from e + + raise ValueError(f"Unsupported string type for `to_dict`: {str_type}, it should " "be 'json' or 'xml'.") + + +def _set_to_dict(input_: set, /) -> dict: + return {value: value for value in input_} + + +def _iterable_to_dict(input_: Iterable, /) -> dict: + return {idx: v for idx, v in enumerate(input_)} + + +def _generic_type_to_dict( + input_, + /, + **kwargs: Any, +) -> dict[str, Any]: + + try: + for method in ["to_dict", "dict", "json", "to_json"]: + if hasattr(input_, method): + result = getattr(input_, method)(**kwargs) + return json.loads(result) if isinstance(result, str) else result + except Exception: + pass + + if hasattr(input_, "__dict__"): + return input_.__dict__ + + try: + return dict(input_) + except Exception as e: + raise ValueError(f"Unable to convert input to dictionary: {e}") + + +__all__ = ["to_json", "fuzzy_parse_json", "to_dict"] diff --git a/autogen/structure/utils/validate_keys.py b/autogen/structure/utils/validate_keys.py new file mode 100644 index 0000000000..7cd0184a18 --- /dev/null +++ b/autogen/structure/utils/validate_keys.py @@ -0,0 +1,146 @@ +# copied from https://github.com/lion-agi/lion-os/blob/main/lion/libs/parse/validate/keys.py +# copyright by HaiyangLi, APACHE LICENSE 2.0 + +from collections.abc import Callable, Sequence +from typing import Any, Literal, TypedDict + +from .string_similarity import SIMILARITY_ALGO_MAP, SIMILARITY_TYPE, string_similarity + + +class KeysDict(TypedDict): 
def validate_keys(
    d_: dict[str, Any],
    keys: Sequence[str] | KeysDict,
    /,
    *,
    similarity_algo: SIMILARITY_TYPE | Callable[[str, str], float] = "jaro_winkler",
    similarity_threshold: float = 0.85,
    fuzzy_match: bool = True,
    handle_unmatched: Literal["ignore", "raise", "remove", "fill", "force"] = "ignore",
    fill_value: Any = None,
    fill_mapping: dict[str, Any] | None = None,
    strict: bool = False,
) -> dict[str, Any]:
    """
    Validate and correct dictionary keys based on expected keys using string similarity.

    Input keys that equal an expected key are kept as they are. With
    ``fuzzy_match`` enabled, each remaining input key is renamed to the most
    similar expected key that is still unclaimed, provided the score reaches
    ``similarity_threshold``. Input keys are processed in the order they
    appear in ``d_`` and candidates in the order given by ``keys``, so the
    result does not depend on hash ordering.

    Args:
        d_: The dictionary to validate and correct keys for.
        keys: Expected keys: any iterable of key names (list, tuple, ...) or
            a mapping whose keys are the expected names.
        similarity_algo: String similarity algorithm to use or custom function.
        similarity_threshold: Minimum similarity score for fuzzy matching.
        fuzzy_match: If True, use fuzzy matching for key correction.
        handle_unmatched: Specifies how to handle unmatched keys:
            - "ignore": Keep unmatched input keys in the output.
            - "raise": Raise ValueError if unmatched input keys exist.
            - "remove": Drop unmatched input keys from the output.
            - "fill": Add missing expected keys with a default value and keep
              unmatched input keys.
            - "force": Add missing expected keys with a default value and
              drop unmatched input keys.
        fill_value: Default value for missing expected keys.
        fill_mapping: Per-key defaults that take precedence over ``fill_value``.
        strict: If True, raise ValueError if any expected key was not found
            in the input (even when it has been filled with a default).

    Returns:
        A new dictionary with validated and corrected keys. If ``keys`` is
        empty, a shallow copy of ``d_``.

    Raises:
        TypeError: If ``d_`` is not a dictionary or ``keys`` is None.
        ValueError: If ``similarity_threshold`` is outside [0.0, 1.0], the
            algorithm name is unknown, or validation fails under "raise" or
            ``strict``.
    """
    # Input validation
    if not isinstance(d_, dict):
        raise TypeError("First argument must be a dictionary")
    if keys is None:
        raise TypeError("Keys argument cannot be None")
    if not 0.0 <= similarity_threshold <= 1.0:
        raise ValueError("similarity_threshold must be between 0.0 and 1.0")

    # Extract expected keys. Mapping-like inputs contribute their keys; any
    # other iterable is taken as the key names themselves. dict.fromkeys
    # removes duplicates while preserving the caller's order.
    key_source = keys.keys() if hasattr(keys, "keys") else keys
    expected = list(dict.fromkeys(key_source))
    if not expected:
        return d_.copy()  # Return copy of original if no expected keys

    # Get similarity function
    if isinstance(similarity_algo, str):
        if similarity_algo not in SIMILARITY_ALGO_MAP:
            raise ValueError(f"Unknown similarity algorithm: {similarity_algo}")
        similarity_func = SIMILARITY_ALGO_MAP[similarity_algo]
    else:
        similarity_func = similarity_algo

    # First pass: exact matches
    expected_lookup = set(expected)
    corrected_out = {key: value for key, value in d_.items() if key in expected_lookup}
    matched_expected = set(corrected_out)
    matched_input = set(corrected_out)

    # Second pass: fuzzy matching if enabled
    if fuzzy_match:
        remaining_expected = [key for key in expected if key not in matched_expected]

        for key in d_:
            if key in matched_input:
                continue
            if not remaining_expected:
                break

            match = string_similarity(
                key,
                remaining_expected,
                algorithm=similarity_func,
                threshold=similarity_threshold,
                return_most_similar=True,
            )

            if match:
                corrected_out[match] = d_[key]
                matched_expected.add(match)
                matched_input.add(key)
                # Each expected key may be claimed by one input key only.
                remaining_expected.remove(match)

    # Handle unmatched keys based on handle_unmatched parameter
    unmatched_input = [key for key in d_ if key not in matched_input]
    unmatched_expected = [key for key in expected if key not in matched_expected]

    if handle_unmatched == "raise" and unmatched_input:
        raise ValueError(f"Unmatched keys found: {set(unmatched_input)}")

    elif handle_unmatched == "ignore":
        for key in unmatched_input:
            corrected_out[key] = d_[key]

    elif handle_unmatched in ("fill", "force"):
        # Fill missing expected keys
        for key in unmatched_expected:
            if fill_mapping and key in fill_mapping:
                corrected_out[key] = fill_mapping[key]
            else:
                corrected_out[key] = fill_value

        # For "fill" mode, also keep unmatched original keys
        if handle_unmatched == "fill":
            for key in unmatched_input:
                corrected_out[key] = d_[key]

    # Check strict mode
    if strict and unmatched_expected:
        raise ValueError(f"Missing required keys: {set(unmatched_expected)}")

    return corrected_out
Can be: + - Dictionary + - JSON string or markdown code block + - XML string + - Object with to_dict/model_dump method + - Any type convertible to dictionary + keys: List of expected keys or dictionary mapping keys to types. + similarity_algo: String similarity algorithm or custom function. + similarity_threshold: Minimum similarity score for fuzzy matching. + fuzzy_match: If True, use fuzzy matching for key correction. + handle_unmatched: How to handle unmatched keys: + - "ignore": Keep unmatched keys + - "raise": Raise error for unmatched keys + - "remove": Remove unmatched keys + - "fill": Fill missing keys with default values + - "force": Combine "fill" and "remove" behaviors + fill_value: Default value for filling unmatched keys. + fill_mapping: Dictionary mapping keys to default values. + strict: Raise error if any expected key is missing. + suppress_conversion_errors: Return empty dict on conversion errors. + + Returns: + Validated and corrected dictionary. + + Raises: + ValueError: If input cannot be converted or validation fails. + TypeError: If input types are invalid. 
+ """ + if d is None: + raise TypeError("Input cannot be None") + + # Try converting to dictionary + try: + if isinstance(d, str): + # First try to_json for JSON strings and code blocks + try: + json_result = to_json(d) + dict_input = json_result[0] if isinstance(json_result, list) else json_result + except Exception: + # Fall back to to_dict for other string formats + dict_input = to_dict(d, str_type="json", fuzzy_parse=True, suppress=True) + else: + dict_input = to_dict(d, use_model_dump=True, fuzzy_parse=True, suppress=True) + + if not isinstance(dict_input, dict): + if suppress_conversion_errors: + dict_input = {} + else: + raise ValueError(f"Failed to convert input to dictionary: {type(dict_input)}") + + except Exception as e: + if suppress_conversion_errors: + dict_input = {} + else: + raise ValueError(f"Failed to convert input to dictionary: {e}") + + # Validate the dictionary + return validate_keys( + dict_input, + keys, + similarity_algo=similarity_algo, + similarity_threshold=similarity_threshold, + fuzzy_match=fuzzy_match, + handle_unmatched=handle_unmatched, + fill_value=fill_value, + fill_mapping=fill_mapping, + strict=strict, + ) + + +__all__ = ["validate_mapping"] diff --git a/autogen/structure/utils/xml_parser.py b/autogen/structure/utils/xml_parser.py new file mode 100644 index 0000000000..0a6c58abc3 --- /dev/null +++ b/autogen/structure/utils/xml_parser.py @@ -0,0 +1,143 @@ +# copied from https://github.com/lion-agi/lion-os/tree/main/lion/libs/parse/xml +# copyright by HaiyangLi, APACHE LICENSE 2.0 + + +import re +import xml.etree.ElementTree as ET +from typing import Any + + +def xml_to_dict( + xml_string: str, + /, + suppress=False, + remove_root: bool = True, + root_tag: str = None, +) -> dict[str, Any]: + """ + Parse an XML string into a nested dictionary structure. 
+ + This function converts an XML string into a dictionary where: + - Element tags become dictionary keys + - Text content is assigned directly to the tag key if there are no children + - Attributes are stored in a '@attributes' key + - Multiple child elements with the same tag are stored as lists + + Args: + xml_string: The XML string to parse. + + Returns: + A dictionary representation of the XML structure. + + Raises: + ValueError: If the XML is malformed or parsing fails. + """ + try: + a = XMLParser(xml_string).parse() + if remove_root and (root_tag or "root") in a: + a = a[root_tag or "root"] + return a + except ValueError as e: + if not suppress: + raise e + + +def dict_to_xml(data: dict, /, root_tag: str = "root") -> str: + + root = ET.Element(root_tag) + + def convert(dict_obj: dict, parent: Any) -> None: + for key, val in dict_obj.items(): + if isinstance(val, dict): + element = ET.SubElement(parent, key) + convert(dict_obj=val, parent=element) + else: + element = ET.SubElement(parent, key) + element.text = str(object=val) + + convert(dict_obj=data, parent=root) + return ET.tostring(root, encoding="unicode") + + +class XMLParser: + def __init__(self, xml_string: str): + self.xml_string = xml_string.strip() + self.index = 0 + + def parse(self) -> dict[str, Any]: + """Parse the XML string and return the root element as a dictionary.""" + return self._parse_element() + + def _parse_element(self) -> dict[str, Any]: + """Parse a single XML element and its children.""" + self._skip_whitespace() + if self.xml_string[self.index] != "<": + raise ValueError(f"Expected '<', found '{self.xml_string[self.index]}'") + + tag, attributes = self._parse_opening_tag() + children: dict[str, str | list | dict] = {} + text = "" + + while self.index < len(self.xml_string): + self._skip_whitespace() + if self.xml_string.startswith(" tuple[str, dict[str, str]]: + """Parse an opening XML tag and its attributes.""" + match = re.match( + r'<(\w+)((?:\s+\w+="[^"]*")*)\s*/?>', + 
self.xml_string[self.index :], # noqa + ) + if not match: + raise ValueError("Invalid opening tag") + self.index += match.end() + tag = match.group(1) + attributes = dict(re.findall(r'(\w+)="([^"]*)"', match.group(2))) + return tag, attributes + + def _parse_closing_tag(self) -> str: + """Parse a closing XML tag.""" + match = re.match(r"", self.xml_string[self.index :]) # noqa + if not match: + raise ValueError("Invalid closing tag") + self.index += match.end() + return match.group(1) + + def _parse_text(self) -> str: + """Parse text content between XML tags.""" + start = self.index + while self.index < len(self.xml_string) and self.xml_string[self.index] != "<": + self.index += 1 + return self.xml_string[start : self.index] # noqa + + def _skip_whitespace(self) -> None: + """Skip any whitespace characters at the current parsing position.""" + p_ = len(self.xml_string[self.index :]) # noqa + m_ = len(self.xml_string[self.index :].lstrip()) # noqa + + self.index += p_ - m_ diff --git a/autogen/structure_utils.py b/autogen/structure_utils.py deleted file mode 100644 index 62de3f6873..0000000000 --- a/autogen/structure_utils.py +++ /dev/null @@ -1,544 +0,0 @@ -# by HaiyangLi, -# most of the codes are from https://github.com/lion-agi/lion-os -# APACHE LICENSE 2.0, copyright 2024, HaiyangLi - -import json -import re -from dataclasses import dataclass -from difflib import SequenceMatcher -from inspect import isclass -from itertools import product -from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar, Union, get_args, get_origin - -from pydantic import BaseModel -from typing_extensions import Literal - -T = TypeVar("T", bound=BaseModel) - - -# string_similarity -# copied from https://github.com/lion-agi/lion-os/blob/main/lion/libs/string_similarity.py -# copyright by HaiyangLi, APACHE LICENSE 2.0 -def cosine_similarity(s1: str, s2: str) -> float: - """Calculate the cosine similarity between two strings. 
- - Args: - s1: First input string - s2: Second input string - - Returns: - float: Cosine similarity score between 0 and 1 - """ - if not s1 or not s2: - return 0.0 - - set1, set2 = set(s1), set(s2) - intersection = set1.intersection(set2) - - if not set1 or not set2: - return 0.0 - - return len(intersection) / ((len(set1) * len(set2)) ** 0.5) - - -def hamming_similarity(s1: str, s2: str) -> float: - """Calculate the Hamming similarity between two strings. - - The strings must be of equal length. Returns the proportion of positions - at which corresponding symbols are the same. - - Args: - s1: First input string - s2: Second input string - - Returns: - float: Hamming similarity score between 0 and 1 - """ - if not s1 or not s2 or len(s1) != len(s2): - return 0.0 - - matches = sum(c1 == c2 for c1, c2 in zip(s1, s2)) - return matches / len(s1) - - -def jaro_distance(s: str, t: str) -> float: - """Calculate the Jaro distance between two strings. - - Args: - s: First input string - t: Second input string - - Returns: - float: Jaro distance score between 0 and 1 - """ - s_len = len(s) - t_len = len(t) - - if s_len == 0 and t_len == 0: - return 1.0 - elif s_len == 0 or t_len == 0: - return 0.0 - - match_distance = (max(s_len, t_len) // 2) - 1 - match_distance = max(0, match_distance) # Ensure non-negative - - s_matches = [False] * s_len - t_matches = [False] * t_len - - matches = 0 - transpositions = 0 - - # Identify matches - for i in range(s_len): - start = max(0, i - match_distance) - end = min(i + match_distance + 1, t_len) - - for j in range(start, end): - if t_matches[j] or s[i] != t[j]: - continue - s_matches[i] = t_matches[j] = True - matches += 1 - break - - if matches == 0: - return 0.0 - - # Count transpositions - k = 0 - for i in range(s_len): - if not s_matches[i]: - continue - while not t_matches[k]: - k += 1 - if s[i] != t[k]: - transpositions += 1 - k += 1 - - transpositions //= 2 - - return (matches / s_len + matches / t_len + (matches - transpositions) 
/ matches) / 3.0 - - -def jaro_winkler_similarity(s: str, t: str, scaling: float = 0.1) -> float: - """Calculate the Jaro-Winkler similarity between two strings. - - Args: - s: First input string - t: Second input string - scaling: Scaling factor for common prefix adjustment - - Returns: - float: Jaro-Winkler similarity score between 0 and 1 - - Raises: - ValueError: If scaling factor is not between 0 and 0.25 - """ - if not 0 <= scaling <= 0.25: - raise ValueError("Scaling factor must be between 0 and 0.25") - - jaro_sim = jaro_distance(s, t) - - # Find length of common prefix (up to 4 chars) - prefix_len = 0 - for s_char, t_char in zip(s, t): - if s_char != t_char: - break - prefix_len += 1 - if prefix_len == 4: - break - - return jaro_sim + (prefix_len * scaling * (1 - jaro_sim)) - - -def levenshtein_distance(a: str, b: str) -> int: - """Calculate the Levenshtein (edit) distance between two strings. - - Args: - a: First input string - b: Second input string - - Returns: - int: Minimum number of single-character edits needed to change one - string into the other - """ - if not a: - return len(b) - if not b: - return len(a) - - m, n = len(a), len(b) - d = [[0] * (n + 1) for _ in range(m + 1)] - - for i in range(m + 1): - d[i][0] = i - for j in range(n + 1): - d[0][j] = j - - for i, j in product(range(1, m + 1), range(1, n + 1)): - cost = 0 if a[i - 1] == b[j - 1] else 1 - d[i][j] = min( - d[i - 1][j] + 1, # deletion - d[i][j - 1] + 1, # insertion - d[i - 1][j - 1] + cost, # substitution - ) - - return d[m][n] - - -def levenshtein_similarity(s1: str, s2: str) -> float: - """Calculate the Levenshtein similarity between two strings. - - Converts Levenshtein distance to a similarity score between 0 and 1. 
- - Args: - s1: First input string - s2: Second input string - - Returns: - float: Levenshtein similarity score between 0 and 1 - """ - if not s1 and not s2: - return 1.0 - if not s1 or not s2: - return 0.0 - - distance = levenshtein_distance(s1, s2) - max_len = max(len(s1), len(s2)) - return 1 - (distance / max_len) - - -# Type definitions -SIMILARITY_ALGO_MAP: Dict[str, Callable[[str, str], float]] = { - "jaro_winkler": jaro_winkler_similarity, - "levenshtein": levenshtein_similarity, - "sequence_matcher": lambda s1, s2: SequenceMatcher(None, s1, s2).ratio(), - "hamming": hamming_similarity, - "cosine": cosine_similarity, -} - - -SIMILARITY_TYPE = Literal[ - "jaro_winkler", - "levenshtein", - "sequence_matcher", - "hamming", - "cosine", -] - - -@dataclass(frozen=True) -class MatchResult: - """Represents a string matching result.""" - - word: str - score: float - index: int - - -def string_similarity( - word: str, - correct_words: Sequence[str], - algorithm: SIMILARITY_TYPE | Callable[[str, str], float] = "jaro_winkler", - threshold: float = 0.0, - case_sensitive: bool = False, - return_most_similar: bool = False, -) -> Optional[Union[str, List[str]]]: - """Find similar strings using specified similarity algorithm.""" - if not correct_words: - raise ValueError("correct_words must not be empty") - - if not 0.0 <= threshold <= 1.0: - raise ValueError("threshold must be between 0.0 and 1.0") - - # Convert inputs to strings - compare_word = str(word) - original_words = [str(w) for w in correct_words] - - # Handle case sensitivity - if not case_sensitive: - compare_word = compare_word.lower() - compare_words = [w.lower() for w in original_words] - else: - compare_words = original_words.copy() - - # Get scoring function - if isinstance(algorithm, str): - score_func = SIMILARITY_ALGO_MAP.get(algorithm) - if score_func is None: - raise ValueError(f"Unsupported algorithm: {algorithm}") - elif callable(algorithm): - score_func = algorithm - else: - raise 
ValueError("algorithm must be a string specifying a built-in algorithm or " "a callable") - - # Calculate similarities - results = [] - for idx, (orig_word, comp_word) in enumerate(zip(original_words, compare_words)): - # Skip different length strings for hamming similarity - if algorithm == "hamming" and len(comp_word) != len(compare_word): - continue - - score = score_func(compare_word, comp_word) - if score >= threshold: - results.append(MatchResult(orig_word, score, idx)) - - # Return None if no matches - if not results: - return None - - # Sort by score (descending) and index (ascending) for stable ordering - results.sort(key=lambda x: (-x.score, x.index)) - - # Return results - if return_most_similar: - return results[0].word - - # Filter exact matches for case sensitive comparisons - if case_sensitive: - max_score = results[0].score - results = [r for r in results if r.score == max_score] - - return [r.word for r in results] - - -# copied from https://github.com/lion-agi/lion-os/blob/main/lion/integrations/pydantic_/break_down_annotation.py -# copyright by HaiyangLi, APACHE LICENSE 2.0 -def break_down_pydantic_annotation( - model: type[T], max_depth: int | None = None, current_depth: int = 0 -) -> Dict[str, Any]: - """ - Break down the type annotations of a Pydantic model into a dictionary. - - This function recursively processes Pydantic models, converting their - field annotations into a dictionary structure. It handles nested models - and lists of models. - - Args: - model: The Pydantic model class to break down. - max_depth: Maximum depth for recursion. None for no limit. - current_depth: Current recursion depth (used internally). - - Returns: - A dictionary representing the structure of the model's annotations. - - Raises: - TypeError: If the input is not a Pydantic model. - RecursionError: If max recursion depth is reached. - - Example: - >>> from pydantic import BaseModel - >>> class SubModel(BaseModel): - ... field1: int - ... 
field2: str - >>> class MainModel(BaseModel): - ... sub: SubModel - ... items: list[SubModel] - >>> result = break_down_annotation(MainModel) - >>> print(result) - { - 'sub': {'field1': , 'field2': }, - 'items': [{'field1': , 'field2': }] - } - """ - - if not _is_pydantic_model(model): - raise TypeError("Input must be a Pydantic model") - - if max_depth is not None and current_depth >= max_depth: - raise RecursionError("Maximum recursion depth reached") - - out: Dict[str, Any] = {} - for k, v in model.__annotations__.items(): - origin = get_origin(v) - if _is_pydantic_model(v): - out[k] = break_down_pydantic_annotation(v, max_depth, current_depth + 1) - elif origin is list: - args = get_args(v) - if args and _is_pydantic_model(args[0]): - out[k] = [break_down_pydantic_annotation(args[0], max_depth, current_depth + 1)] - else: - out[k] = [args[0] if args else Any] - else: - out[k] = v - - return out - - -def _is_pydantic_model(x: Any) -> bool: - return isclass(x) and issubclass(x, BaseModel) - - -# copied from https://github.com/lion-agi/lion-os/blob/main/lion/libs/parse.py -# copyright by HaiyangLi, APACHE LICENSE 2.0 -def to_json(string: str | List[str], /, fuzzy_parse: bool = False) -> Union[List[Dict[str, Any]], Dict]: - """Extract and parse JSON content from a string or markdown code blocks. - - This function attempts to parse JSON directly from the input string first. - If that fails, it looks for JSON content within markdown code blocks - (denoted by ```json). - - Args: - string: Input string or list of strings to parse. If a list is provided, - it will be joined with newlines. - - Returns: - - A dictionary if a single JSON object is found - - A list of dictionaries if multiple JSON objects are found - - An empty list if no valid JSON is found - - Examples: - >>> to_json('{"key": "value"}') - {'key': 'value'} - - >>> to_json(''' - ... ```json - ... {"key": "value"} - ... ``` - ... ''') - {'key': 'value'} - - >>> to_json(''' - ... ```json - ... 
{"key1": "value1"} - ... ``` - ... ```json - ... {"key2": "value2"} - ... ``` - ... ''') - [{'key1': 'value1'}, {'key2': 'value2'}] - """ - - if isinstance(string, list): - string = "\n".join(string) - - # Try direct JSON parsing first - try: - if fuzzy_parse: - return fuzzy_parse_json(string) - return json.loads(string) - except Exception: - pass - - # Look for JSON in markdown code blocks - pattern = r"```json\s*(.*?)\s*```" - matches = re.findall(pattern, string, re.DOTALL) - - if not matches: - return [] - - if len(matches) == 1: - return json.loads(matches[0]) - - if fuzzy_parse: - return [fuzzy_parse_json(match) for match in matches] - return [json.loads(match) for match in matches] - - -def fuzzy_parse_json(str_to_parse: str, /) -> Union[Dict[str, Any], List[Dict[str, Any]]]: - """Parse a JSON string with automatic fixing of common formatting issues. - - Args: - str_to_parse: The JSON string to parse - - Returns: - The parsed JSON object as a dictionary - - Raises: - ValueError: If the string cannot be parsed as valid JSON - TypeError: If the input is not a string or the result is not a dict - """ - if not isinstance(str_to_parse, str): - raise TypeError("Input must be a string") - - if not str_to_parse.strip(): - raise ValueError("Input string is empty") - - try: - return json.loads(str_to_parse) - except Exception: - pass - - cleaned = _clean_json_string(str_to_parse) - try: - return json.loads(cleaned) - except Exception: - pass - - try: - fixed = fix_json_string(cleaned) - return json.loads(fixed) - except Exception as e: - raise ValueError(f"Failed to parse JSON string after all fixing attempts: {e}") from e - - -def _clean_json_string(s: str) -> str: - """Clean and standardize a JSON string.""" - s = re.sub(r"(? str: - """Fix a JSON string by ensuring all brackets are properly closed. 
- - Args: - str_to_parse: JSON string to fix - - Returns: - Fixed JSON string with proper bracket closure - - Raises: - ValueError: If mismatched or extra closing brackets are found - """ - if not str_to_parse: - raise ValueError("Input string is empty") - - brackets = {"{": "}", "[": "]"} - open_brackets = [] - pos = 0 - length = len(str_to_parse) - - while pos < length: - char = str_to_parse[pos] - - # Handle escape sequences - if char == "\\": - pos += 2 # Skip escape sequence - continue - - # Handle string content - if char == '"': - pos += 1 - # Skip until closing quote, accounting for escapes - while pos < length: - if str_to_parse[pos] == "\\": - pos += 2 # Skip escape sequence - continue - if str_to_parse[pos] == '"': - break - pos += 1 - pos += 1 - continue - - # Handle brackets - if char in brackets: - open_brackets.append(brackets[char]) - elif char in brackets.values(): - if not open_brackets: - raise ValueError(f"Extra closing bracket '{char}' at position {pos}") - if open_brackets[-1] != char: - raise ValueError(f"Mismatched bracket '{char}' at position {pos}") - open_brackets.pop() - - pos += 1 - - # Add missing closing brackets - closing_brackets = "".join(reversed(open_brackets)) - return str_to_parse + closing_brackets - - -# TODO: add a recursive to dict into AG2 -# check `recursive_to_dict` under https://github.com/lion-agi/lion-os/blob/main/lion/libs/parse.py - -# TODO: add fuzzy matching key, fuzzy matching mapping, need to modify the implementation of to_dict from LION-OS