Skip to content

API Reference

Auto-generated reference documentation for all public API2MCP classes and functions.


Core

api2mcp.core.ir_schema

api2mcp.core.ir_schema

Intermediate Representation (IR) schema for API2MCP.

The IR is the central data structure bridging ALL parsers to ALL generators. Every parser outputs IR; every generator consumes IR. Changes to the IR schema affect the entire pipeline.

Design decisions: - GraphQL queries/mutations map to Endpoint with method="QUERY"/"MUTATION" - GraphQL fragments resolved during parsing, not stored in IR - Postman variables substituted during parsing, not stored in IR - Pagination patterns detected and stored for smart tool generation

Classes

APISpec dataclass

Top-level Intermediate Representation for a parsed API.

This is the output of every parser and input to every generator.

Source code in src/api2mcp/core/ir_schema.py
@dataclass
class APISpec:
    """Top-level Intermediate Representation for a parsed API.

    This is the output of every parser and input to every generator.
    """

    title: str
    version: str
    description: str = ""
    base_url: str = ""
    servers: list[ServerInfo] = field(default_factory=list)
    endpoints: list[Endpoint] = field(default_factory=list)
    auth_schemes: list[AuthScheme] = field(default_factory=list)
    models: dict[str, ModelDef] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)
    source_format: str = ""  # "openapi3.0", "openapi3.1", "graphql", etc.

Endpoint dataclass

Single API operation.

Source code in src/api2mcp/core/ir_schema.py
@dataclass
class Endpoint:
    """Single API operation."""

    path: str
    method: HttpMethod
    operation_id: str
    summary: str = ""
    description: str = ""
    parameters: list[Parameter] = field(default_factory=list)
    request_body: RequestBody | None = None
    responses: list[Response] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    security: list[dict[str, list[str]]] = field(default_factory=list)
    deprecated: bool = False
    pagination: PaginationConfig | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

Parameter dataclass

API parameter with location and schema.

Source code in src/api2mcp/core/ir_schema.py
@dataclass
class Parameter:
    """API parameter with location and schema."""

    name: str
    location: ParameterLocation
    schema: SchemaRef
    required: bool = False
    description: str = ""
    deprecated: bool = False
    example: Any = None

RequestBody dataclass

Request body definition.

Source code in src/api2mcp/core/ir_schema.py
@dataclass
class RequestBody:
    """Request body definition."""

    content_type: str  # e.g., "application/json"
    schema: SchemaRef
    required: bool = False
    description: str = ""

Response dataclass

API response definition.

Source code in src/api2mcp/core/ir_schema.py
@dataclass
class Response:
    """API response definition."""

    status_code: str  # "200", "default", etc.
    description: str = ""
    content_type: str = ""
    schema: SchemaRef | None = None

SchemaRef dataclass

Unified JSON Schema type reference.

Maps to JSON Schema for MCP tool input_schema generation.

Source code in src/api2mcp/core/ir_schema.py
@dataclass
class SchemaRef:
    """Unified JSON Schema type reference.

    Maps to JSON Schema for MCP tool input_schema generation.
    """

    type: str  # SchemaType value or composite (e.g., "string", "object")
    description: str = ""
    properties: dict[str, SchemaRef] = field(default_factory=dict)
    items: SchemaRef | None = None  # For array types
    required: list[str] = field(default_factory=list)
    enum: list[Any] = field(default_factory=list)
    format: str = ""  # date-time, email, uri, etc.
    default: Any = None
    nullable: bool = False
    additional_properties: SchemaRef | bool | None = None
    one_of: list[SchemaRef] = field(default_factory=list)
    any_of: list[SchemaRef] = field(default_factory=list)
    all_of: list[SchemaRef] = field(default_factory=list)
    ref_name: str = ""  # Original $ref name for traceability
    pattern: str = ""
    min_length: int | None = None
    max_length: int | None = None
    minimum: float | None = None
    maximum: float | None = None
    example: Any = None

    def to_json_schema(self) -> dict[str, Any]:
        """Convert to standard JSON Schema dict."""
        schema: dict[str, Any] = {}

        if self.type:
            schema["type"] = self.type
        if self.description:
            schema["description"] = self.description
        if self.format:
            schema["format"] = self.format
        if self.enum:
            schema["enum"] = self.enum
        if self.default is not None:
            schema["default"] = self.default
        if self.pattern:
            schema["pattern"] = self.pattern
        if self.min_length is not None:
            schema["minLength"] = self.min_length
        if self.max_length is not None:
            schema["maxLength"] = self.max_length
        if self.minimum is not None:
            schema["minimum"] = self.minimum
        if self.maximum is not None:
            schema["maximum"] = self.maximum
        if self.example is not None:
            schema["example"] = self.example

        if self.type == SchemaType.OBJECT or self.properties:
            if self.properties:
                schema["properties"] = {
                    name: prop.to_json_schema()
                    for name, prop in self.properties.items()
                }
            if self.required:
                schema["required"] = self.required
            if self.additional_properties is not None:
                if isinstance(self.additional_properties, SchemaRef):
                    schema["additionalProperties"] = (
                        self.additional_properties.to_json_schema()
                    )
                else:
                    schema["additionalProperties"] = self.additional_properties

        if self.type == SchemaType.ARRAY and self.items:
            schema["items"] = self.items.to_json_schema()

        if self.one_of:
            schema["oneOf"] = [s.to_json_schema() for s in self.one_of]
        if self.any_of:
            schema["anyOf"] = [s.to_json_schema() for s in self.any_of]
        if self.all_of:
            schema["allOf"] = [s.to_json_schema() for s in self.all_of]

        if self.nullable and self.type:
            # JSON Schema 2020-12 style
            schema["type"] = [self.type, "null"]

        return schema
Functions
to_json_schema()

Convert to standard JSON Schema dict.

Source code in src/api2mcp/core/ir_schema.py
def to_json_schema(self) -> dict[str, Any]:
    """Convert to standard JSON Schema dict."""
    schema: dict[str, Any] = {}

    if self.type:
        schema["type"] = self.type
    if self.description:
        schema["description"] = self.description
    if self.format:
        schema["format"] = self.format
    if self.enum:
        schema["enum"] = self.enum
    if self.default is not None:
        schema["default"] = self.default
    if self.pattern:
        schema["pattern"] = self.pattern
    if self.min_length is not None:
        schema["minLength"] = self.min_length
    if self.max_length is not None:
        schema["maxLength"] = self.max_length
    if self.minimum is not None:
        schema["minimum"] = self.minimum
    if self.maximum is not None:
        schema["maximum"] = self.maximum
    if self.example is not None:
        schema["example"] = self.example

    if self.type == SchemaType.OBJECT or self.properties:
        if self.properties:
            schema["properties"] = {
                name: prop.to_json_schema()
                for name, prop in self.properties.items()
            }
        if self.required:
            schema["required"] = self.required
        if self.additional_properties is not None:
            if isinstance(self.additional_properties, SchemaRef):
                schema["additionalProperties"] = (
                    self.additional_properties.to_json_schema()
                )
            else:
                schema["additionalProperties"] = self.additional_properties

    if self.type == SchemaType.ARRAY and self.items:
        schema["items"] = self.items.to_json_schema()

    if self.one_of:
        schema["oneOf"] = [s.to_json_schema() for s in self.one_of]
    if self.any_of:
        schema["anyOf"] = [s.to_json_schema() for s in self.any_of]
    if self.all_of:
        schema["allOf"] = [s.to_json_schema() for s in self.all_of]

    if self.nullable and self.type:
        # JSON Schema 2020-12 style
        schema["type"] = [self.type, "null"]

    return schema

AuthScheme dataclass

Authentication scheme definition.

Source code in src/api2mcp/core/ir_schema.py
@dataclass
class AuthScheme:
    """Authentication scheme definition."""

    name: str
    type: AuthType
    description: str = ""
    # API Key specifics
    api_key_name: str = ""
    api_key_location: str = ""  # "header", "query", "cookie"
    # OAuth2 specifics
    flows: dict[str, Any] = field(default_factory=dict)
    # OpenID Connect
    openid_connect_url: str = ""
    # HTTP specifics
    scheme: str = ""  # "basic", "bearer"
    bearer_format: str = ""

HttpMethod

Bases: str, Enum

HTTP methods + GraphQL virtual methods.

Source code in src/api2mcp/core/ir_schema.py
class HttpMethod(str, Enum):
    """HTTP methods + GraphQL virtual methods."""

    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    PATCH = "PATCH"
    DELETE = "DELETE"
    HEAD = "HEAD"
    OPTIONS = "OPTIONS"
    TRACE = "TRACE"
    # GraphQL virtual methods
    QUERY = "QUERY"
    MUTATION = "MUTATION"
    SUBSCRIPTION = "SUBSCRIPTION"

ParameterLocation

Bases: str, Enum

Where in the request a parameter is sent.

Source code in src/api2mcp/core/ir_schema.py
class ParameterLocation(str, Enum):
    """Where in the request a parameter is sent."""

    PATH = "path"
    QUERY = "query"
    HEADER = "header"
    COOKIE = "cookie"
    BODY = "body"  # GraphQL variables

Parsers

api2mcp.parsers.openapi

api2mcp.parsers.openapi.OpenAPIParser

Bases: BaseParser

Parser for OpenAPI 3.0.x and 3.1.x specifications.

Source code in src/api2mcp/parsers/openapi.py
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
class OpenAPIParser(BaseParser):
    """Parser for OpenAPI 3.0.x and 3.1.x specifications."""

    def detect(self, content: dict[str, Any]) -> bool:
        """Return True if content looks like an OpenAPI 3.x document."""
        version = _detect_openapi_version(content)
        return version is not None and version[0] == 3

    async def validate(
        self, source: str | Path, **kwargs: Any
    ) -> list[ParseError]:
        """Validate an OpenAPI spec without producing full IR."""
        text = await self._load_source(source)
        doc = _parse_yaml_or_json(text, str(source))

        errors = _validate_structure(doc)

        version = _detect_openapi_version(doc)
        if version is None:
            errors.append(ParseError("Not a valid OpenAPI document (missing 'openapi' field)"))
        elif version[0] != 3:
            errors.append(
                ParseError(
                    f"Unsupported OpenAPI version: {doc.get('openapi')}. "
                    f"Only 3.0.x and 3.1.x are supported."
                )
            )

        return errors

    async def parse(self, source: str | Path, **kwargs: Any) -> APISpec:
        """Parse an OpenAPI 3.0/3.1 spec into an IR APISpec.

        Args:
            source: Path to the spec file, or a URL.
            **kwargs: Options:
                - resolve_external_refs (bool): Whether to resolve external $refs.
                  Defaults to True.

        Returns:
            Parsed APISpec.

        Raises:
            ParseException: On invalid input.
            ValidationException: On structural validation failure.
        """
        # Emit PRE_PARSE hook (optional — plugins may not be loaded)
        try:
            from api2mcp.plugins import get_hook_manager
            from api2mcp.plugins.hooks import PRE_PARSE
            await get_hook_manager().emit(PRE_PARSE, path=str(source))
        except Exception:  # noqa: BLE001
            pass  # plugins are optional

        text = await self._load_source(source)
        doc = _parse_yaml_or_json(text, str(source))

        # Detect version
        version = _detect_openapi_version(doc)
        if version is None or version[0] != 3:
            raise ParseException(
                f"Not an OpenAPI 3.x document. Found: {doc.get('openapi', 'missing')}"
            )

        # Validate structure
        errors = _validate_structure(doc)
        hard_errors = [e for e in errors if e.severity == "error"]
        if hard_errors:
            raise ValidationException(
                f"OpenAPI validation failed with {len(hard_errors)} error(s)",
                errors=hard_errors,
            )
        for w in errors:
            if w.severity == "warning":
                logger.warning("Validation warning: %s", w)

        # Determine base path for external $ref resolution
        base_path: Path | None = None
        if isinstance(source, Path):
            base_path = source.parent
        elif isinstance(source, str) and not source.startswith(("http://", "https://")):
            base_path = Path(source).parent

        # Set up ref resolver
        resolver = RefResolver(doc, base_path)

        # Determine source format
        is_31 = version >= (3, 1, 0)
        source_format = "openapi3.1" if is_31 else "openapi3.0"

        # Extract info
        info = doc.get("info", {})
        title = info.get("title", "Untitled API")
        api_version = info.get("version", "0.0.0")
        description = info.get("description", "")

        # Extract servers
        servers = self._parse_servers(doc.get("servers", []))
        base_url = servers[0].url if servers else ""

        # Extract security schemes
        components = doc.get("components", {})
        security_schemes_raw = components.get("securitySchemes", {})
        auth_schemes = await _extract_auth_schemes(security_schemes_raw, resolver)

        # Extract models (components/schemas)
        models = await self._parse_models(components.get("schemas", {}), resolver)

        # Extract endpoints from paths
        global_security = doc.get("security", [])
        endpoints = await self._parse_paths(
            doc.get("paths", {}), resolver, global_security
        )

        # OpenAPI 3.1: also extract from webhooks
        if is_31 and "webhooks" in doc:
            webhook_endpoints = await self._parse_paths(
                doc["webhooks"], resolver, global_security, is_webhook=True
            )
            endpoints.extend(webhook_endpoints)

        api_spec = APISpec(
            title=title,
            version=api_version,
            description=description,
            base_url=base_url,
            servers=servers,
            endpoints=endpoints,
            auth_schemes=auth_schemes,
            models=models,
            metadata={
                "openapi_version": doc.get("openapi", ""),
                "external_docs": doc.get("externalDocs", {}),
                "tags": doc.get("tags", []),
            },
            source_format=source_format,
        )

        # Emit POST_PARSE hook
        try:
            from api2mcp.plugins import get_hook_manager
            from api2mcp.plugins.hooks import POST_PARSE
            await get_hook_manager().emit(POST_PARSE, api_spec=api_spec)
        except Exception:  # noqa: BLE001
            pass

        return api_spec

    # ------------------------------------------------------------------ #
    #  Internal parsing helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _parse_servers(raw_servers: list[dict[str, Any]]) -> list[ServerInfo]:
        """Parse the servers array."""
        servers: list[ServerInfo] = []
        for srv in raw_servers:
            if not isinstance(srv, dict):
                continue
            servers.append(
                ServerInfo(
                    url=srv.get("url", ""),
                    description=srv.get("description", ""),
                    variables=srv.get("variables", {}),
                )
            )
        return servers

    @staticmethod
    async def _parse_models(
        schemas: dict[str, Any], resolver: RefResolver
    ) -> dict[str, ModelDef]:
        """Parse components/schemas into ModelDef dict."""
        models: dict[str, ModelDef] = {}
        for name, schema_raw in schemas.items():
            if "$ref" in schema_raw:
                schema_raw = await resolver.resolve(schema_raw["$ref"])
            models[name] = ModelDef(
                name=name,
                schema=await _schema_to_ir(schema_raw, resolver, ref_name=name),
                description=schema_raw.get("description", ""),
            )
        return models

    async def _parse_paths(
        self,
        paths: dict[str, Any],
        resolver: RefResolver,
        global_security: list[dict[str, list[str]]],
        is_webhook: bool = False,
    ) -> list[Endpoint]:
        """Parse paths object into list of Endpoint."""
        endpoints: list[Endpoint] = []
        op_id_counter: dict[str, int] = {}

        for path_key, path_item_raw in paths.items():
            if not isinstance(path_item_raw, dict):
                continue

            # Resolve $ref at path item level
            if "$ref" in path_item_raw:
                path_item_raw = await resolver.resolve(path_item_raw["$ref"])

            # Path-level parameters
            path_level_params = path_item_raw.get("parameters", [])

            for method_str in _VALID_HTTP_METHODS:
                if method_str not in path_item_raw:
                    continue

                operation = path_item_raw[method_str]
                if not isinstance(operation, dict):
                    continue

                endpoint = await self._parse_operation(
                    path=path_key,
                    method_str=method_str,
                    operation=operation,
                    path_level_params=path_level_params,
                    resolver=resolver,
                    global_security=global_security,
                    op_id_counter=op_id_counter,
                    is_webhook=is_webhook,
                )
                endpoints.append(endpoint)

        return endpoints

    async def _parse_operation(
        self,
        path: str,
        method_str: str,
        operation: dict[str, Any],
        path_level_params: list[dict[str, Any]],
        resolver: RefResolver,
        global_security: list[dict[str, list[str]]],
        op_id_counter: dict[str, int],
        is_webhook: bool,
    ) -> Endpoint:
        """Parse a single operation into an Endpoint."""
        method = HttpMethod(method_str.upper())

        # Generate operation ID
        op_id = operation.get("operationId", "")
        if not op_id:
            op_id = self._generate_operation_id(method_str, path, op_id_counter)

        # Merge path-level + operation-level parameters
        params = await self._merge_parameters(
            path_level_params,
            operation.get("parameters", []),
            resolver,
        )

        # Request body
        request_body = await self._parse_request_body(
            operation.get("requestBody"), resolver
        )

        # Responses
        responses = await self._parse_responses(operation.get("responses", {}), resolver)

        # Security (operation-level overrides global)
        security = operation.get("security", global_security)

        # Detect pagination
        pagination = _detect_pagination(params)

        metadata: dict[str, Any] = {}
        if is_webhook:
            metadata["webhook"] = True

        return Endpoint(
            path=path,
            method=method,
            operation_id=op_id,
            summary=operation.get("summary", ""),
            description=operation.get("description", ""),
            parameters=params,
            request_body=request_body,
            responses=responses,
            tags=operation.get("tags", []),
            security=security,
            deprecated=operation.get("deprecated", False),
            pagination=pagination,
            metadata=metadata,
        )

    @staticmethod
    def _generate_operation_id(
        method: str, path: str, counter: dict[str, int]
    ) -> str:
        """Generate an operation ID from method + path when operationId is missing."""
        # Convert /users/{user_id}/repos to users_user_id_repos
        clean = path.strip("/").replace("{", "").replace("}", "")
        segments = [s for s in clean.split("/") if s]
        candidate = f"{method}_{'_'.join(segments)}" if segments else method

        # Handle collisions
        if candidate in counter:
            counter[candidate] += 1
            return f"{candidate}_{counter[candidate]}"
        counter[candidate] = 0
        return candidate

    @staticmethod
    async def _merge_parameters(
        path_params: list[dict[str, Any]],
        op_params: list[dict[str, Any]],
        resolver: RefResolver,
    ) -> list[Parameter]:
        """Merge path-level and operation-level parameters.

        Operation params override path params when name+in match.
        """
        # Build index of path params
        merged: dict[tuple[str, str], dict[str, Any]] = {}
        for raw in path_params:
            if "$ref" in raw:
                raw = await resolver.resolve(raw["$ref"])
            key = (raw.get("name", ""), raw.get("in", ""))
            merged[key] = raw

        # Operation params override
        for raw in op_params:
            if "$ref" in raw:
                raw = await resolver.resolve(raw["$ref"])
            key = (raw.get("name", ""), raw.get("in", ""))
            merged[key] = raw

        # Convert to IR
        result: list[Parameter] = []
        for raw in merged.values():
            location_str = raw.get("in", "query")
            try:
                location = ParameterLocation(location_str)
            except ValueError:
                logger.warning("Unknown parameter location '%s', defaulting to query", location_str)
                location = ParameterLocation.QUERY

            schema_raw = raw.get("schema", {"type": "string"})
            schema = await _schema_to_ir(schema_raw, resolver)

            result.append(
                Parameter(
                    name=raw.get("name", ""),
                    location=location,
                    schema=schema,
                    required=raw.get("required", location == ParameterLocation.PATH),
                    description=raw.get("description", ""),
                    deprecated=raw.get("deprecated", False),
                    example=raw.get("example"),
                )
            )
        return result

    @staticmethod
    async def _parse_request_body(
        raw: dict[str, Any] | None, resolver: RefResolver
    ) -> RequestBody | None:
        """Parse requestBody into IR RequestBody."""
        if raw is None:
            return None

        if "$ref" in raw:
            raw = await resolver.resolve(raw["$ref"])

        content = raw.get("content", {})
        if not content:
            return None

        # Prefer application/json, fall back to first content type
        if "application/json" in content:
            ct = "application/json"
            media = content[ct]
        else:
            ct = next(iter(content))
            media = content[ct]

        schema_raw = media.get("schema", {"type": "object"})
        schema = await _schema_to_ir(schema_raw, resolver)

        return RequestBody(
            content_type=ct,
            schema=schema,
            required=raw.get("required", False),
            description=raw.get("description", ""),
        )

    @staticmethod
    async def _parse_responses(
        raw_responses: dict[str, Any], resolver: RefResolver
    ) -> list[Response]:
        """Parse responses object into list of IR Response."""
        responses: list[Response] = []
        for status_code, resp_raw in raw_responses.items():
            if "$ref" in resp_raw:
                resp_raw = await resolver.resolve(resp_raw["$ref"])

            content = resp_raw.get("content", {})
            content_type = ""
            schema: SchemaRef | None = None

            if content:
                # Prefer application/json
                if "application/json" in content:
                    content_type = "application/json"
                    media = content[content_type]
                else:
                    content_type = next(iter(content))
                    media = content[content_type]

                if "schema" in media:
                    schema = await _schema_to_ir(media["schema"], resolver)

            responses.append(
                Response(
                    status_code=str(status_code),
                    description=resp_raw.get("description", ""),
                    content_type=content_type,
                    schema=schema,
                )
            )
        return responses

    # ------------------------------------------------------------------ #
    #  Source loading
    # ------------------------------------------------------------------ #

    @staticmethod
    async def _load_source(source: str | Path) -> str:
        """Load spec content from a file path or URL."""
        if isinstance(source, Path):
            path = source
        elif source.startswith(("http://", "https://")):
            async with httpx.AsyncClient() as client:
                resp = await client.get(source, timeout=30, follow_redirects=True)
                resp.raise_for_status()
                return resp.text
        else:
            path = Path(source)

        if not path.exists():
            raise ParseException(f"File not found: {path}")
        return path.read_text(encoding="utf-8")

Functions

detect(content)

Return True if content looks like an OpenAPI 3.x document.

Source code in src/api2mcp/parsers/openapi.py
def detect(self, content: dict[str, Any]) -> bool:
    """Return True if content looks like an OpenAPI 3.x document."""
    version = _detect_openapi_version(content)
    return version is not None and version[0] == 3

validate(source, **kwargs) async

Validate an OpenAPI spec without producing full IR.

Source code in src/api2mcp/parsers/openapi.py
async def validate(
    self, source: str | Path, **kwargs: Any
) -> list[ParseError]:
    """Validate an OpenAPI spec without producing full IR."""
    text = await self._load_source(source)
    doc = _parse_yaml_or_json(text, str(source))

    errors = _validate_structure(doc)

    version = _detect_openapi_version(doc)
    if version is None:
        errors.append(ParseError("Not a valid OpenAPI document (missing 'openapi' field)"))
    elif version[0] != 3:
        errors.append(
            ParseError(
                f"Unsupported OpenAPI version: {doc.get('openapi')}. "
                f"Only 3.0.x and 3.1.x are supported."
            )
        )

    return errors

parse(source, **kwargs) async

Parse an OpenAPI 3.0/3.1 spec into an IR APISpec.

Parameters:

Name Type Description Default
source str | Path

Path to the spec file, or a URL.

required
**kwargs Any

Options: - resolve_external_refs (bool): Whether to resolve external $refs. Defaults to True.

{}

Returns:

Type Description
APISpec

Parsed APISpec.

Raises:

Type Description
ParseException

On invalid input.

ValidationException

On structural validation failure.

Source code in src/api2mcp/parsers/openapi.py
async def parse(self, source: str | Path, **kwargs: Any) -> APISpec:
    """Parse an OpenAPI 3.0/3.1 spec into an IR APISpec.

    Args:
        source: Path to the spec file, or a URL.
        **kwargs: Options:
            - resolve_external_refs (bool): Whether to resolve external $refs.
              Defaults to True.

    Returns:
        Parsed APISpec.

    Raises:
        ParseException: On invalid input.
        ValidationException: On structural validation failure.
    """
    # Emit PRE_PARSE hook (optional — plugins may not be loaded)
    try:
        from api2mcp.plugins import get_hook_manager
        from api2mcp.plugins.hooks import PRE_PARSE
        await get_hook_manager().emit(PRE_PARSE, path=str(source))
    except Exception:  # noqa: BLE001
        pass  # plugins are optional

    text = await self._load_source(source)
    doc = _parse_yaml_or_json(text, str(source))

    # Detect version
    version = _detect_openapi_version(doc)
    if version is None or version[0] != 3:
        raise ParseException(
            f"Not an OpenAPI 3.x document. Found: {doc.get('openapi', 'missing')}"
        )

    # Validate structure
    errors = _validate_structure(doc)
    hard_errors = [e for e in errors if e.severity == "error"]
    if hard_errors:
        raise ValidationException(
            f"OpenAPI validation failed with {len(hard_errors)} error(s)",
            errors=hard_errors,
        )
    for w in errors:
        if w.severity == "warning":
            logger.warning("Validation warning: %s", w)

    # Determine base path for external $ref resolution
    base_path: Path | None = None
    if isinstance(source, Path):
        base_path = source.parent
    elif isinstance(source, str) and not source.startswith(("http://", "https://")):
        base_path = Path(source).parent

    # Set up ref resolver
    resolver = RefResolver(doc, base_path)

    # Determine source format
    is_31 = version >= (3, 1, 0)
    source_format = "openapi3.1" if is_31 else "openapi3.0"

    # Extract info
    info = doc.get("info", {})
    title = info.get("title", "Untitled API")
    api_version = info.get("version", "0.0.0")
    description = info.get("description", "")

    # Extract servers
    servers = self._parse_servers(doc.get("servers", []))
    base_url = servers[0].url if servers else ""

    # Extract security schemes
    components = doc.get("components", {})
    security_schemes_raw = components.get("securitySchemes", {})
    auth_schemes = await _extract_auth_schemes(security_schemes_raw, resolver)

    # Extract models (components/schemas)
    models = await self._parse_models(components.get("schemas", {}), resolver)

    # Extract endpoints from paths
    global_security = doc.get("security", [])
    endpoints = await self._parse_paths(
        doc.get("paths", {}), resolver, global_security
    )

    # OpenAPI 3.1: also extract from webhooks
    if is_31 and "webhooks" in doc:
        webhook_endpoints = await self._parse_paths(
            doc["webhooks"], resolver, global_security, is_webhook=True
        )
        endpoints.extend(webhook_endpoints)

    api_spec = APISpec(
        title=title,
        version=api_version,
        description=description,
        base_url=base_url,
        servers=servers,
        endpoints=endpoints,
        auth_schemes=auth_schemes,
        models=models,
        metadata={
            "openapi_version": doc.get("openapi", ""),
            "external_docs": doc.get("externalDocs", {}),
            "tags": doc.get("tags", []),
        },
        source_format=source_format,
    )

    # Emit POST_PARSE hook
    try:
        from api2mcp.plugins import get_hook_manager
        from api2mcp.plugins.hooks import POST_PARSE
        await get_hook_manager().emit(POST_PARSE, api_spec=api_spec)
    except Exception:  # noqa: BLE001
        pass

    return api_spec

api2mcp.parsers.graphql

api2mcp.parsers.graphql.GraphQLParser

Bases: BaseParser

Parser for GraphQL schemas (SDL and introspection JSON).

Parameters:

Name Type Description Default
graphql_endpoint str

The HTTP path of the GraphQL endpoint that will be stored in the IR. Defaults to "/graphql".

'/graphql'

Example::

parser = GraphQLParser()
spec = await parser.parse("schema.graphql")
# or
spec = await parser.parse("introspection.json")
Source code in src/api2mcp/parsers/graphql.py
class GraphQLParser(BaseParser):
    """Parser for GraphQL schemas (SDL and introspection JSON).

    Args:
        graphql_endpoint: The HTTP path of the GraphQL endpoint that will be
            stored in the IR.  Defaults to ``"/graphql"``.

    Example::

        parser = GraphQLParser()
        spec = await parser.parse("schema.graphql")
        # or
        spec = await parser.parse("introspection.json")
    """

    def __init__(self, graphql_endpoint: str = "/graphql") -> None:
        self._graphql_endpoint = graphql_endpoint

    # ------------------------------------------------------------------
    # BaseParser interface
    # ------------------------------------------------------------------

    def detect(self, content: dict[str, Any]) -> bool:
        """Return ``True`` if *content* looks like a GraphQL introspection result."""
        if "__schema" in content:
            return True
        if "data" in content and isinstance(content["data"], dict):
            return "__schema" in content["data"]
        return False

    async def validate(self, source: str | Path, **_kwargs: Any) -> list[ParseError]:
        """Validate a GraphQL schema without producing full IR."""
        text = await self._load_source(source)
        format_, data = self._detect_format(text)

        if format_ == "introspection":
            assert data is not None
            return _IntrospectionParser(self._graphql_endpoint).validate(data)
        else:
            return _SDLParser(self._graphql_endpoint).validate(text)

    async def parse(self, source: str | Path, **kwargs: Any) -> APISpec:
        """Parse a GraphQL schema into an IR :class:`APISpec`.

        Args:
            source: File path, URL, or raw SDL/JSON string.
            **kwargs: Optional:

                * ``title`` (str) — API title.  Defaults to the filename
                  stem or ``"GraphQL API"``.
                * ``endpoint_url`` (str) — overrides the GraphQL HTTP endpoint
                  URL stored in the IR.
                * ``base_url`` (str) — base URL of the GraphQL server (e.g.
                  ``"https://api.example.com"``).

        Returns:
            Parsed :class:`~api2mcp.core.ir_schema.APISpec`.

        Raises:
            :class:`~api2mcp.core.exceptions.ParseException`: On invalid input.
            :class:`ImportError`: If ``graphql-core`` is not installed.
        """
        text = await self._load_source(source)

        # Determine title from kwargs or source path
        title: str = kwargs.get("title", "")
        if not title:
            if isinstance(source, Path):
                title = source.stem.replace("_", " ").replace("-", " ").title()
            elif isinstance(source, str) and not source.startswith(("http://", "https://")):
                stem = Path(source).stem
                title = stem.replace("_", " ").replace("-", " ").title()
            else:
                title = "GraphQL API"

        base_url: str = kwargs.get("base_url", "")
        endpoint_url: str = kwargs.get("endpoint_url", self._graphql_endpoint)
        gql_parser_endpoint = endpoint_url

        format_, data = self._detect_format(text)

        if format_ == "introspection":
            assert data is not None
            spec = _IntrospectionParser(gql_parser_endpoint).parse(data, title=title)
        else:
            spec = _SDLParser(gql_parser_endpoint).parse(text, title=title)

        # Attach server info if base_url was provided
        if base_url:
            gql_url = base_url.rstrip("/") + endpoint_url
            spec.servers = [ServerInfo(url=gql_url, description="GraphQL endpoint")]
            spec.base_url = gql_url

        return spec

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _detect_format(
        text: str,
    ) -> tuple[str, dict[str, Any] | None]:
        """Detect whether *text* is SDL or an introspection JSON result.

        Returns:
            A ``(format, data)`` tuple where *format* is ``"sdl"`` or
            ``"introspection"`` and *data* is the parsed dict (introspection
            only; ``None`` for SDL).
        """
        stripped = text.lstrip()
        if stripped.startswith(("{", "[")):
            # Looks like JSON — try to parse
            try:
                data = json.loads(text)
                if isinstance(data, dict) and (
                    "__schema" in data
                    or ("data" in data and "__schema" in (data.get("data") or {}))
                ):
                    return "introspection", data
            except json.JSONDecodeError as exc:
                logger.debug("Ignoring GraphQL parse detail: %s", exc)

        return "sdl", None

    @staticmethod
    async def _load_source(source: str | Path) -> str:
        """Load source content from a file path or URL."""
        if isinstance(source, Path):
            if not source.exists():
                raise ParseException(f"File not found: {source}")
            return source.read_text(encoding="utf-8")

        if isinstance(source, str):
            if source.startswith(("http://", "https://")):
                async with httpx.AsyncClient() as client:
                    resp = await client.get(source, timeout=30, follow_redirects=True)
                    resp.raise_for_status()
                    return resp.text
            # Check if it's a file path or raw SDL/JSON string
            path = Path(source)
            if path.suffix in (".graphql", ".gql", ".json") and path.exists():
                return path.read_text(encoding="utf-8")
            # Treat as raw SDL/JSON content
            return source

        raise ParseException(f"Unsupported source type: {type(source).__name__}")

Functions

detect(content)

Return True if content looks like a GraphQL introspection result.

Source code in src/api2mcp/parsers/graphql.py
def detect(self, content: dict[str, Any]) -> bool:
    """Return ``True`` if *content* looks like a GraphQL introspection result."""
    if "__schema" in content:
        return True
    if "data" in content and isinstance(content["data"], dict):
        return "__schema" in content["data"]
    return False

validate(source, **_kwargs) async

Validate a GraphQL schema without producing full IR.

Source code in src/api2mcp/parsers/graphql.py
async def validate(self, source: str | Path, **_kwargs: Any) -> list[ParseError]:
    """Validate a GraphQL schema without producing full IR."""
    text = await self._load_source(source)
    format_, data = self._detect_format(text)

    if format_ == "introspection":
        assert data is not None
        return _IntrospectionParser(self._graphql_endpoint).validate(data)
    else:
        return _SDLParser(self._graphql_endpoint).validate(text)

parse(source, **kwargs) async

Parse a GraphQL schema into an IR :class:APISpec.

Parameters:

Name Type Description Default
source str | Path

File path, URL, or raw SDL/JSON string.

required
**kwargs Any

Optional:

  • title (str) — API title. Defaults to the filename stem or "GraphQL API".
  • endpoint_url (str) — overrides the GraphQL HTTP endpoint URL stored in the IR.
  • base_url (str) — base URL of the GraphQL server (e.g. "https://api.example.com").
{}

Returns:

Name Type Description
Parsed APISpec

class:~api2mcp.core.ir_schema.APISpec.

Raises:

Type Description

class:~api2mcp.core.exceptions.ParseException: On invalid input.

class:ImportError: If graphql-core is not installed.

Source code in src/api2mcp/parsers/graphql.py
async def parse(self, source: str | Path, **kwargs: Any) -> APISpec:
    """Parse a GraphQL schema into an IR :class:`APISpec`.

    Args:
        source: File path, URL, or raw SDL/JSON string.
        **kwargs: Optional:

            * ``title`` (str) — API title.  Defaults to the filename
              stem or ``"GraphQL API"``.
            * ``endpoint_url`` (str) — overrides the GraphQL HTTP endpoint
              URL stored in the IR.
            * ``base_url`` (str) — base URL of the GraphQL server (e.g.
              ``"https://api.example.com"``).

    Returns:
        Parsed :class:`~api2mcp.core.ir_schema.APISpec`.

    Raises:
        :class:`~api2mcp.core.exceptions.ParseException`: On invalid input.
        :class:`ImportError`: If ``graphql-core`` is not installed.
    """
    text = await self._load_source(source)

    # Determine title from kwargs or source path
    title: str = kwargs.get("title", "")
    if not title:
        if isinstance(source, Path):
            title = source.stem.replace("_", " ").replace("-", " ").title()
        elif isinstance(source, str) and not source.startswith(("http://", "https://")):
            stem = Path(source).stem
            title = stem.replace("_", " ").replace("-", " ").title()
        else:
            title = "GraphQL API"

    base_url: str = kwargs.get("base_url", "")
    endpoint_url: str = kwargs.get("endpoint_url", self._graphql_endpoint)
    gql_parser_endpoint = endpoint_url

    format_, data = self._detect_format(text)

    if format_ == "introspection":
        assert data is not None
        spec = _IntrospectionParser(gql_parser_endpoint).parse(data, title=title)
    else:
        spec = _SDLParser(gql_parser_endpoint).parse(text, title=title)

    # Attach server info if base_url was provided
    if base_url:
        gql_url = base_url.rstrip("/") + endpoint_url
        spec.servers = [ServerInfo(url=gql_url, description="GraphQL endpoint")]
        spec.base_url = gql_url

    return spec

api2mcp.parsers.postman

api2mcp.parsers.postman.PostmanParser

Bases: BaseParser

Parse Postman Collection v2.1 files into an :class:APISpec.

Supports: - Collection v2.1 (and v2.0 with a best-effort mapping) - Variable substitution ({{varName}}) - Folder hierarchy → endpoint tags - Collection / folder / request-level auth

Source code in src/api2mcp/parsers/postman.py
class PostmanParser(BaseParser):
    """Parse Postman Collection v2.1 files into an :class:`APISpec`.

    Supports:
    - Collection v2.1 (and v2.0 with a best-effort mapping)
    - Variable substitution (``{{varName}}``)
    - Folder hierarchy → endpoint tags
    - Collection / folder / request-level auth
    """

    # ------------------------------------------------------------------
    # BaseParser interface
    # ------------------------------------------------------------------

    def detect(self, content: dict[str, Any]) -> bool:
        """Return True if *content* looks like a Postman Collection."""
        # Must have "info" with a schema URL and an "item" array
        info = content.get("info")
        if not isinstance(info, dict):
            return False
        schema_url = str(info.get("schema", ""))
        has_schema = "getpostman.com" in schema_url or _SCHEMA_V21 in schema_url or _SCHEMA_V20 in schema_url
        has_items = "item" in content
        return has_schema and has_items

    async def validate(self, source: str | Path, **_kwargs: Any) -> list[ParseError]:
        """Validate a Postman Collection document.

        Returns a list of :class:`ParseError`; empty means valid.
        """
        errors: list[ParseError] = []
        try:
            doc = await self._load_doc(source)
        except ParseException as exc:
            if exc.errors:
                return list(exc.errors)
            return [ParseError(str(exc))]

        if not self.detect(doc):
            errors.append(
                ParseError(
                    "Document does not appear to be a Postman Collection v2.x "
                    "(expected 'info.schema' containing getpostman.com and an 'item' array)"
                )
            )

        info = doc.get("info", {})
        if not isinstance(info, dict) or not info.get("name"):
            errors.append(ParseError("Missing or empty 'info.name' field", path="/info/name"))

        if "item" not in doc:
            errors.append(ParseError("Missing required 'item' array", path="/item"))

        return errors

    async def parse(
        self,
        source: str | Path,
        *,
        title: str | None = None,
        **_kwargs: Any,
    ) -> APISpec:
        """Parse a Postman Collection and return an :class:`APISpec`.

        Args:
            source: File path or raw JSON string.
            title:  Override the collection name as the API title.

        Returns:
            Parsed :class:`APISpec` with ``source_format = "postman"``.

        Raises:
            ParseException: If the document cannot be parsed or is not a
                Postman Collection.
        """
        doc = await self._load_doc(source)

        info = doc.get("info", {})
        schema_url = info.get("schema", "") if isinstance(info, dict) else ""
        # Detect v1 format (no info.schema, has top-level 'requests' key)
        if "requests" in doc and not schema_url:
            raise ParseException(
                "Unsupported Postman Collection version (v1 detected). "
                "Please export your collection as v2.1 format."
            )

        if not self.detect(doc):
            raise ParseException(
                "PostmanParser requires a Postman Collection v2.x document "
                "(expected 'info.schema' containing getpostman.com)"
            )

        # --- Collection metadata ---
        info = doc.get("info", {})
        collection_name = title or str(info.get("name", "Postman Collection"))
        collection_desc = str(info.get("description", ""))
        collection_version = str(info.get("version", "1.0.0"))

        # --- Variables ---
        variables = extract_variables(doc.get("variable", []))

        # --- Collection-level auth ---
        collection_auth = parse_auth(doc.get("auth"))

        # --- Walk all items ---
        endpoints: list[Endpoint] = []
        base_urls: set[str] = set()
        _walk_items(
            doc.get("item", []),
            [],
            variables,
            collection_auth,
            endpoints,
            base_urls,
        )

        # --- Auth schemes ---
        auth_schemes: list[AuthScheme] = []
        if collection_auth:
            auth_schemes.append(collection_auth)
        # Deduplicate auth schemes from endpoints
        seen_auth: set[str] = {a.name for a in auth_schemes}
        for ep in endpoints:
            for sec in ep.security:
                for auth_name in sec:
                    if auth_name not in seen_auth:
                        # We can't reconstruct the full scheme here; add a placeholder
                        seen_auth.add(auth_name)

        # --- Servers ---
        servers: list[ServerInfo] = []
        for url in sorted(base_urls):
            servers.append(ServerInfo(url=url))
        base_url = servers[0].url if servers else ""

        return APISpec(
            title=collection_name,
            version=collection_version,
            description=collection_desc,
            base_url=base_url,
            servers=servers,
            endpoints=endpoints,
            auth_schemes=auth_schemes,
            models={},  # Postman has no explicit schema definitions
            source_format="postman",
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _load_doc(self, source: str | Path) -> dict[str, Any]:
        """Load and parse a Postman Collection from various source types."""
        if isinstance(source, Path):
            if not source.exists():
                raise ParseException(f"File not found: {source}")
            text = source.read_text(encoding="utf-8")
            return self._parse_json(text, str(source))

        text = str(source)

        # Raw JSON string
        stripped = text.lstrip()
        if stripped.startswith("{"):
            return self._parse_json(text, "<string>")

        # File path string
        path = Path(text)
        if path.exists():
            content = path.read_text(encoding="utf-8")
            return self._parse_json(content, str(path))

        raise ParseException(f"Cannot resolve source: {text!r}")

    @staticmethod
    def _parse_json(text: str, source_name: str) -> dict[str, Any]:
        """Parse JSON text into a dict."""
        try:
            data = json.loads(text)
        except json.JSONDecodeError as exc:
            raise ParseException(
                f"Failed to parse {source_name}: {exc}",
                errors=[ParseError(str(exc), line=exc.lineno, column=exc.colno)],
            ) from exc

        if not isinstance(data, dict):
            raise ParseException(
                f"Expected JSON object at root of {source_name}, got {type(data).__name__}"
            )
        return data

Functions

detect(content)

Return True if content looks like a Postman Collection.

Source code in src/api2mcp/parsers/postman.py
def detect(self, content: dict[str, Any]) -> bool:
    """Return True if *content* looks like a Postman Collection."""
    # Must have "info" with a schema URL and an "item" array
    info = content.get("info")
    if not isinstance(info, dict):
        return False
    schema_url = str(info.get("schema", ""))
    has_schema = "getpostman.com" in schema_url or _SCHEMA_V21 in schema_url or _SCHEMA_V20 in schema_url
    has_items = "item" in content
    return has_schema and has_items

validate(source, **_kwargs) async

Validate a Postman Collection document.

Returns a list of :class:ParseError; empty means valid.

Source code in src/api2mcp/parsers/postman.py
async def validate(self, source: str | Path, **_kwargs: Any) -> list[ParseError]:
    """Validate a Postman Collection document.

    Returns a list of :class:`ParseError`; empty means valid.
    """
    errors: list[ParseError] = []
    try:
        doc = await self._load_doc(source)
    except ParseException as exc:
        if exc.errors:
            return list(exc.errors)
        return [ParseError(str(exc))]

    if not self.detect(doc):
        errors.append(
            ParseError(
                "Document does not appear to be a Postman Collection v2.x "
                "(expected 'info.schema' containing getpostman.com and an 'item' array)"
            )
        )

    info = doc.get("info", {})
    if not isinstance(info, dict) or not info.get("name"):
        errors.append(ParseError("Missing or empty 'info.name' field", path="/info/name"))

    if "item" not in doc:
        errors.append(ParseError("Missing required 'item' array", path="/item"))

    return errors

parse(source, *, title=None, **_kwargs) async

Parse a Postman Collection and return an :class:APISpec.

Parameters:

Name Type Description Default
source str | Path

File path or raw JSON string.

required
title str | None

Override the collection name as the API title.

None

Returns:

Name Type Description
Parsed APISpec

class:APISpec with source_format = "postman".

Raises:

Type Description
ParseException

If the document cannot be parsed or is not a Postman Collection.

Source code in src/api2mcp/parsers/postman.py
async def parse(
    self,
    source: str | Path,
    *,
    title: str | None = None,
    **_kwargs: Any,
) -> APISpec:
    """Parse a Postman Collection and return an :class:`APISpec`.

    Args:
        source: File path or raw JSON string.
        title:  Override the collection name as the API title.

    Returns:
        Parsed :class:`APISpec` with ``source_format = "postman"``.

    Raises:
        ParseException: If the document cannot be parsed or is not a
            Postman Collection.
    """
    doc = await self._load_doc(source)

    info = doc.get("info", {})
    schema_url = info.get("schema", "") if isinstance(info, dict) else ""
    # Detect v1 format (no info.schema, has top-level 'requests' key)
    if "requests" in doc and not schema_url:
        raise ParseException(
            "Unsupported Postman Collection version (v1 detected). "
            "Please export your collection as v2.1 format."
        )

    if not self.detect(doc):
        raise ParseException(
            "PostmanParser requires a Postman Collection v2.x document "
            "(expected 'info.schema' containing getpostman.com)"
        )

    # --- Collection metadata ---
    info = doc.get("info", {})
    collection_name = title or str(info.get("name", "Postman Collection"))
    collection_desc = str(info.get("description", ""))
    collection_version = str(info.get("version", "1.0.0"))

    # --- Variables ---
    variables = extract_variables(doc.get("variable", []))

    # --- Collection-level auth ---
    collection_auth = parse_auth(doc.get("auth"))

    # --- Walk all items ---
    endpoints: list[Endpoint] = []
    base_urls: set[str] = set()
    _walk_items(
        doc.get("item", []),
        [],
        variables,
        collection_auth,
        endpoints,
        base_urls,
    )

    # --- Auth schemes ---
    auth_schemes: list[AuthScheme] = []
    if collection_auth:
        auth_schemes.append(collection_auth)
    # Deduplicate auth schemes from endpoints
    seen_auth: set[str] = {a.name for a in auth_schemes}
    for ep in endpoints:
        for sec in ep.security:
            for auth_name in sec:
                if auth_name not in seen_auth:
                    # We can't reconstruct the full scheme here; add a placeholder
                    seen_auth.add(auth_name)

    # --- Servers ---
    servers: list[ServerInfo] = []
    for url in sorted(base_urls):
        servers.append(ServerInfo(url=url))
    base_url = servers[0].url if servers else ""

    return APISpec(
        title=collection_name,
        version=collection_version,
        description=collection_desc,
        base_url=base_url,
        servers=servers,
        endpoints=endpoints,
        auth_schemes=auth_schemes,
        models={},  # Postman has no explicit schema definitions
        source_format="postman",
    )

Generators

api2mcp.generators.tool

api2mcp.generators.tool

MCP Tool Generator — IR to MCP tool definitions (TASK-016, TASK-017).

Consumes IR (APISpec) and produces MCP-compliant tool definitions. Supports: - JSON Schema input_schema generation - Jinja2 template-based server code generation - Edge cases: no parameters, file uploads, multipart, empty responses

Classes

ToolGenerator

Generates MCP tool definitions and server code from IR.

Parameters:

Name Type Description Default
max_depth int

Maximum nesting depth for schema simplification.

5
template_dir Path | None

Override the Jinja2 template directory.

None
Source code in src/api2mcp/generators/tool.py
class ToolGenerator:
    """Generates MCP tool definitions and server code from IR.

    Args:
        max_depth: Maximum nesting depth for schema simplification.
        template_dir: Override the Jinja2 template directory.
    """

    def __init__(
        self,
        max_depth: int = 5,
        template_dir: Path | None = None,
    ) -> None:
        self.max_depth = max_depth
        self.template_dir = template_dir or _TEMPLATE_DIR
        self._jinja_env: jinja2.Environment | None = None

    @property
    def jinja_env(self) -> jinja2.Environment:
        """Lazily create Jinja2 environment."""
        if self._jinja_env is None:
            self._jinja_env = jinja2.Environment(
                loader=jinja2.FileSystemLoader(str(self.template_dir)),
                autoescape=False,
                trim_blocks=True,
                lstrip_blocks=True,
                keep_trailing_newline=True,
            )
        return self._jinja_env

    def generate(self, api_spec: APISpec) -> list[MCPToolDef]:
        """Generate MCP tool definitions from an IR APISpec.

        Args:
            api_spec: Parsed API specification (IR).

        Returns:
            List of MCPToolDef objects, one per endpoint.

        Raises:
            GeneratorException: If generation fails for any endpoint.
        """
        # Emit PRE_GENERATE hook
        try:
            from api2mcp.plugins import get_hook_manager
            from api2mcp.plugins.hooks import PRE_GENERATE
            get_hook_manager().emit_sync(PRE_GENERATE, api_spec=api_spec)
        except Exception:  # noqa: BLE001
            pass  # plugins are optional

        if not api_spec.endpoints:
            logger.warning("API spec '%s' has no endpoints — no tools generated", api_spec.title)
            return []

        # Resolve names with collision handling
        name_map = resolve_collisions(api_spec.endpoints)

        tools: list[MCPToolDef] = []
        for endpoint in api_spec.endpoints:
            try:
                tool = self._endpoint_to_tool(endpoint, name_map)
                tools.append(tool)
            except Exception as exc:
                raise GeneratorException(
                    f"Failed to generate tool for {endpoint.method.value} {endpoint.path}: {exc}",
                    endpoint=f"{endpoint.method.value} {endpoint.path}",
                ) from exc

        logger.info(
            "Generated %d tools from '%s' (v%s)",
            len(tools),
            api_spec.title,
            api_spec.version,
        )

        # Emit POST_GENERATE hook
        try:
            from api2mcp.plugins import get_hook_manager
            from api2mcp.plugins.hooks import POST_GENERATE
            get_hook_manager().emit_sync(POST_GENERATE, tools=tools)
        except Exception:  # noqa: BLE001
            pass  # plugins are optional

        return tools

    def generate_server_code(
        self,
        api_spec: APISpec,
        output_dir: Path,
        server_name: str | None = None,
    ) -> list[Path]:
        """Generate Python MCP server code from IR using Jinja2 templates.

        Args:
            api_spec: Parsed API specification.
            output_dir: Directory to write generated files.
            server_name: Override the server name (defaults to sanitized API title).

        Returns:
            List of generated file paths.
        """
        tools = self.generate(api_spec)
        if not tools:
            return []

        name = server_name or sanitize_name(api_spec.title)
        output_dir.mkdir(parents=True, exist_ok=True)

        generated: list[Path] = []

        # Generate server.py
        server_template = self.jinja_env.get_template("server.py.j2")
        server_code = server_template.render(
            api_spec=api_spec,
            tools=tools,
            server_name=name,
        )
        server_path = output_dir / "server.py"
        server_path.write_text(server_code, encoding="utf-8")
        generated.append(server_path)
        logger.info("Generated server code: %s", server_path)

        return generated

    def _endpoint_to_tool(
        self,
        endpoint: Endpoint,
        name_map: dict[str, str],
    ) -> MCPToolDef:
        """Convert a single IR endpoint to an MCPToolDef."""
        # Look up resolved name
        key = endpoint.operation_id or f"{endpoint.method.value} {endpoint.path}"
        name = name_map.get(key, derive_tool_name(endpoint))

        description = self._build_description(endpoint)
        input_schema = build_input_schema(endpoint, max_depth=self.max_depth)

        # Track body parameter names for template rendering
        body_param_names = self._extract_body_param_names(endpoint, input_schema)

        metadata: dict[str, Any] = {}
        if endpoint.tags:
            metadata["tags"] = endpoint.tags
        if endpoint.deprecated:
            metadata["deprecated"] = True

        return MCPToolDef(
            name=name,
            description=description,
            input_schema=input_schema,
            endpoint=endpoint,
            body_param_names=body_param_names,
            metadata=metadata,
        )

    def _build_description(self, endpoint: Endpoint) -> str:
        """Build a human-readable tool description from endpoint metadata."""
        parts: list[str] = []

        if endpoint.summary:
            parts.append(endpoint.summary)
        elif endpoint.description:
            # Use first sentence of description
            first_sentence = endpoint.description.split(".")[0].strip()
            if first_sentence:
                parts.append(first_sentence)
        else:
            # Fallback: generate from method + path
            parts.append(f"{endpoint.method.value} {endpoint.path}")

        if endpoint.deprecated:
            parts.append("[DEPRECATED]")

        return " ".join(parts)

    def _extract_body_param_names(
        self,
        endpoint: Endpoint,
        input_schema: dict[str, Any],
    ) -> list[str]:
        """Extract the parameter names that came from the request body.

        Used by templates to know which args to send as JSON body vs query/path params.
        """
        if endpoint.request_body is None:
            return []

        # Collect non-body parameter names
        non_body_params = {
            p.name
            for p in endpoint.parameters
            if p.location != ParameterLocation.BODY
        }

        # Body params = all input_schema properties minus non-body params
        all_props = set(input_schema.get("properties", {}).keys())
        body_props = all_props - non_body_params
        return sorted(body_props)
Attributes
jinja_env property

Lazily create Jinja2 environment.

Functions
generate(api_spec)

Generate MCP tool definitions from an IR APISpec.

Parameters:

Name Type Description Default
api_spec APISpec

Parsed API specification (IR).

required

Returns:

Type Description
list[MCPToolDef]

List of MCPToolDef objects, one per endpoint.

Raises:

Type Description
GeneratorException

If generation fails for any endpoint.

Source code in src/api2mcp/generators/tool.py
def generate(self, api_spec: APISpec) -> list[MCPToolDef]:
    """Generate MCP tool definitions from an IR APISpec.

    Args:
        api_spec: Parsed API specification (IR).

    Returns:
        List of MCPToolDef objects, one per endpoint.

    Raises:
        GeneratorException: If generation fails for any endpoint.
    """
    # Emit PRE_GENERATE hook
    try:
        from api2mcp.plugins import get_hook_manager
        from api2mcp.plugins.hooks import PRE_GENERATE
        get_hook_manager().emit_sync(PRE_GENERATE, api_spec=api_spec)
    except Exception:  # noqa: BLE001
        pass  # plugins are optional

    if not api_spec.endpoints:
        logger.warning("API spec '%s' has no endpoints — no tools generated", api_spec.title)
        return []

    # Resolve names with collision handling
    name_map = resolve_collisions(api_spec.endpoints)

    tools: list[MCPToolDef] = []
    for endpoint in api_spec.endpoints:
        try:
            tool = self._endpoint_to_tool(endpoint, name_map)
            tools.append(tool)
        except Exception as exc:
            raise GeneratorException(
                f"Failed to generate tool for {endpoint.method.value} {endpoint.path}: {exc}",
                endpoint=f"{endpoint.method.value} {endpoint.path}",
            ) from exc

    logger.info(
        "Generated %d tools from '%s' (v%s)",
        len(tools),
        api_spec.title,
        api_spec.version,
    )

    # Emit POST_GENERATE hook
    try:
        from api2mcp.plugins import get_hook_manager
        from api2mcp.plugins.hooks import POST_GENERATE
        get_hook_manager().emit_sync(POST_GENERATE, tools=tools)
    except Exception:  # noqa: BLE001
        pass  # plugins are optional

    return tools
generate_server_code(api_spec, output_dir, server_name=None)

Generate Python MCP server code from IR using Jinja2 templates.

Parameters:

Name Type Description Default
api_spec APISpec

Parsed API specification.

required
output_dir Path

Directory to write generated files.

required
server_name str | None

Override the server name (defaults to sanitized API title).

None

Returns:

Type Description
list[Path]

List of generated file paths.

Source code in src/api2mcp/generators/tool.py
def generate_server_code(
    self,
    api_spec: APISpec,
    output_dir: Path,
    server_name: str | None = None,
) -> list[Path]:
    """Generate Python MCP server code from IR using Jinja2 templates.

    Args:
        api_spec: Parsed API specification.
        output_dir: Directory to write generated files.
        server_name: Override the server name (defaults to sanitized API title).

    Returns:
        List of generated file paths.
    """
    tools = self.generate(api_spec)
    if not tools:
        return []

    name = server_name or sanitize_name(api_spec.title)
    output_dir.mkdir(parents=True, exist_ok=True)

    generated: list[Path] = []

    # Generate server.py
    server_template = self.jinja_env.get_template("server.py.j2")
    server_code = server_template.render(
        api_spec=api_spec,
        tools=tools,
        server_name=name,
    )
    server_path = output_dir / "server.py"
    server_path.write_text(server_code, encoding="utf-8")
    generated.append(server_path)
    logger.info("Generated server code: %s", server_path)

    return generated

MCPToolDef dataclass

A generated MCP tool definition.

Represents a single tool in the MCP tools/list response.

Source code in src/api2mcp/generators/tool.py
@dataclass
class MCPToolDef:
    """A generated MCP tool definition.

    Represents a single tool in the MCP tools/list response.
    """

    name: str
    description: str
    input_schema: dict[str, Any]
    endpoint: Endpoint
    body_param_names: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
    timeout: float | None = None

    def to_mcp_dict(self) -> dict[str, Any]:
        """Convert to the dict format expected by MCP protocol."""
        return {
            "name": self.name,
            "description": self.description,
            "inputSchema": self.input_schema,
        }
Functions
to_mcp_dict()

Convert to the dict format expected by MCP protocol.

Source code in src/api2mcp/generators/tool.py
def to_mcp_dict(self) -> dict[str, Any]:
    """Convert to the dict format expected by MCP protocol."""
    return {
        "name": self.name,
        "description": self.description,
        "inputSchema": self.input_schema,
    }

Testing

api2mcp.testing

api2mcp.testing

API2MCP Testing Framework — F6.3.

Built-in testing utilities for generated MCP servers:

  • :class:MCPTestClient — in-process tool execution testing
  • :class:MockResponseGenerator — mock API responses from spec
  • :class:MockScenario — single mock scenario definition
  • :class:SnapshotStore — snapshot testing for generated output
  • :class:CoverageReporter — tool execution coverage tracking
  • :class:CoverageReport — coverage report snapshot
  • :class:ToolResult — result of a tool call

Classes

MCPTestClient

In-process MCP test client backed by mock API responses.

Parameters:

Name Type Description Default
server_dir str | Path

Directory containing spec.yaml (or openapi.yaml).

'.'
scenario str

Default mock scenario name to use ("success").

'success'
seed int | None

Random seed forwarded to :class:MockResponseGenerator.

None

The client tracks which tools are called; use :attr:call_log for assertions and integrate with :class:~api2mcp.testing.coverage.CoverageReporter.

Source code in src/api2mcp/testing/client.py
class MCPTestClient:
    """In-process MCP test client backed by mock API responses.

    Args:
        server_dir: Directory containing ``spec.yaml`` (or ``openapi.yaml``).
        scenario:   Default mock scenario name to use (``"success"``).
        seed:       Random seed forwarded to :class:`MockResponseGenerator`.

    The client tracks which tools are called; use :attr:`call_log` for
    assertions and integrate with :class:`~api2mcp.testing.coverage.CoverageReporter`.
    """

    _SPEC_CANDIDATES = ["spec.yaml", "openapi.yaml", "openapi.yml", "openapi.json"]

    def __init__(
        self,
        server_dir: str | Path = ".",
        *,
        scenario: str = "success",
        seed: int | None = None,
    ) -> None:
        self.server_dir = Path(server_dir)
        self.default_scenario = scenario
        self._seed = seed

        self._api_spec: APISpec | None = None
        self._tools: list[MCPToolDef] = []
        self._mock_gen: MockResponseGenerator | None = None
        self._call_log: list[ToolResult] = []

    # ------------------------------------------------------------------
    # Context manager
    # ------------------------------------------------------------------

    async def __aenter__(self) -> MCPTestClient:
        await self._load()
        return self

    async def __aexit__(self, *_: object) -> None:
        logger.debug("Ignoring test client error: %s", _)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def list_tools(self) -> list[dict[str, Any]]:
        """Return all MCP tool definitions (name + description + inputSchema).

        Returns:
            List of dicts in MCP ``tools/list`` format.
        """
        self._ensure_loaded()
        return [t.to_mcp_dict() for t in self._tools]

    async def call_tool(
        self,
        tool_name: str,
        arguments: dict[str, Any] | None = None,
        *,
        scenario: str | None = None,
    ) -> ToolResult:
        """Execute a tool against a mock API response.

        Args:
            tool_name:  Name of the MCP tool to call.
            arguments:  Input arguments (validated against the tool's schema).
            scenario:   Override the default mock scenario for this call.

        Returns:
            :class:`ToolResult` with status, content, and metadata.

        Raises:
            KeyError: If *tool_name* is not found in the loaded tools.
            ValueError: If required arguments are missing.
        """
        self._ensure_loaded()
        assert self._mock_gen is not None

        # Validate tool exists
        tool = self._find_tool(tool_name)
        if tool is None:
            available = [t.name for t in self._tools]
            raise KeyError(
                f"Tool {tool_name!r} not found. Available: {available}"
            )

        # Validate required arguments
        arguments = arguments or {}
        self._validate_arguments(tool, arguments)

        # Pick mock scenario
        scenario_name = scenario or self.default_scenario
        mock_scenario = self._pick_scenario(tool_name, scenario_name)

        # Build result
        is_success = 200 <= mock_scenario.status_code < 300
        result = ToolResult(
            tool_name=tool_name,
            status="success" if is_success else "error",
            content=mock_scenario.body or {},
            status_code=mock_scenario.status_code,
            scenario=mock_scenario,
        )

        self._call_log.append(result)
        return result

    @property
    def call_log(self) -> list[ToolResult]:
        """All :class:`ToolResult` objects produced since the client was created."""
        return list(self._call_log)

    @property
    def api_spec(self) -> APISpec:
        """The loaded :class:`~api2mcp.core.ir_schema.APISpec`."""
        self._ensure_loaded()
        assert self._api_spec is not None
        return self._api_spec

    @property
    def tools(self) -> list[MCPToolDef]:
        """All generated :class:`~api2mcp.generators.tool.MCPToolDef` objects."""
        self._ensure_loaded()
        return list(self._tools)

    def called_tool_names(self) -> list[str]:
        """Return the names of every tool that has been called (with duplicates)."""
        return [r.tool_name for r in self._call_log]

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _load(self) -> None:
        spec_path = self._find_spec_file()
        parser = OpenAPIParser()
        self._api_spec = await parser.parse(spec_path)
        generator = ToolGenerator()
        self._tools = generator.generate(self._api_spec)
        self._mock_gen = MockResponseGenerator(self._api_spec, seed=self._seed)

    def _find_spec_file(self) -> Path:
        for name in self._SPEC_CANDIDATES:
            path = self.server_dir / name
            if path.is_file():
                return path
        raise FileNotFoundError(
            f"No spec file found in {self.server_dir}. "
            f"Expected one of: {self._SPEC_CANDIDATES}"
        )

    def _ensure_loaded(self) -> None:
        if self._api_spec is None:
            raise RuntimeError(
                "MCPTestClient not loaded. Use 'async with MCPTestClient(...) as client:'"
            )

    def _find_tool(self, name: str) -> MCPToolDef | None:
        for tool in self._tools:
            if tool.name == name:
                return tool
        return None

    def _validate_arguments(
        self, tool: MCPToolDef, arguments: dict[str, Any]
    ) -> None:
        schema = tool.input_schema
        required = schema.get("required", [])
        for field_name in required:
            if field_name not in arguments:
                raise ValueError(
                    f"Tool {tool.name!r} requires argument {field_name!r}"
                )

    def _pick_scenario(self, tool_name: str, scenario_name: str) -> MockScenario:
        assert self._mock_gen is not None

        # Resolve to the tool so we can use its endpoint directly
        tool = self._find_tool(tool_name)
        if tool is not None:
            scenarios = self._mock_gen._generate_scenarios(tool.endpoint)
        else:
            try:
                scenarios = self._mock_gen.scenarios_for(tool_name)
            except KeyError:
                return MockScenario(name="success", status_code=200, body={"ok": True})

        # Find by name, fall back to first scenario
        for s in scenarios:
            if s.name == scenario_name:
                return s
        return scenarios[0]
Attributes
call_log property

All :class:ToolResult objects produced since the client was created.

api_spec property

The loaded :class:~api2mcp.core.ir_schema.APISpec.

tools property

All generated :class:~api2mcp.generators.tool.MCPToolDef objects.

Functions
list_tools() async

Return all MCP tool definitions (name + description + inputSchema).

Returns:

Type Description
list[dict[str, Any]]

List of dicts in MCP tools/list format.

Source code in src/api2mcp/testing/client.py
async def list_tools(self) -> list[dict[str, Any]]:
    """Return all MCP tool definitions (name + description + inputSchema).

    Returns:
        List of dicts in MCP ``tools/list`` format.
    """
    self._ensure_loaded()
    return [t.to_mcp_dict() for t in self._tools]
call_tool(tool_name, arguments=None, *, scenario=None) async

Execute a tool against a mock API response.

Parameters:

Name Type Description Default
tool_name str

Name of the MCP tool to call.

required
arguments dict[str, Any] | None

Input arguments (validated against the tool's schema).

None
scenario str | None

Override the default mock scenario for this call.

None

Returns:

Type Description
ToolResult

class:ToolResult with status, content, and metadata.

Raises:

Type Description
KeyError

If tool_name is not found in the loaded tools.

ValueError

If required arguments are missing.

Source code in src/api2mcp/testing/client.py
async def call_tool(
    self,
    tool_name: str,
    arguments: dict[str, Any] | None = None,
    *,
    scenario: str | None = None,
) -> ToolResult:
    """Execute a tool against a mock API response.

    Args:
        tool_name:  Name of the MCP tool to call.
        arguments:  Input arguments (validated against the tool's schema).
        scenario:   Override the default mock scenario for this call.

    Returns:
        :class:`ToolResult` with status, content, and metadata.

    Raises:
        KeyError: If *tool_name* is not found in the loaded tools.
        ValueError: If required arguments are missing.
    """
    self._ensure_loaded()
    assert self._mock_gen is not None

    # Validate tool exists
    tool = self._find_tool(tool_name)
    if tool is None:
        available = [t.name for t in self._tools]
        raise KeyError(
            f"Tool {tool_name!r} not found. Available: {available}"
        )

    # Validate required arguments
    arguments = arguments or {}
    self._validate_arguments(tool, arguments)

    # Pick mock scenario
    scenario_name = scenario or self.default_scenario
    mock_scenario = self._pick_scenario(tool_name, scenario_name)

    # Build result
    is_success = 200 <= mock_scenario.status_code < 300
    result = ToolResult(
        tool_name=tool_name,
        status="success" if is_success else "error",
        content=mock_scenario.body or {},
        status_code=mock_scenario.status_code,
        scenario=mock_scenario,
    )

    self._call_log.append(result)
    return result
called_tool_names()

Return the names of every tool that has been called (with duplicates).

Source code in src/api2mcp/testing/client.py
def called_tool_names(self) -> list[str]:
    """Return the names of every tool that has been called (with duplicates)."""
    return [r.tool_name for r in self._call_log]

ToolResult dataclass

Result of a :meth:MCPTestClient.call_tool invocation.

Attributes:

Name Type Description
tool_name str

Name of the called tool.

status str

"success" or "error".

content dict[str, Any] | list[Any]

Parsed response body (dict or list).

status_code int

HTTP status code from the mock scenario.

scenario MockScenario

The :class:MockScenario that was applied.

Source code in src/api2mcp/testing/client.py
@dataclass
class ToolResult:
    """Result of a :meth:`MCPTestClient.call_tool` invocation.

    Attributes:
        tool_name:   Name of the called tool.
        status:      ``"success"`` or ``"error"``.
        content:     Parsed response body (dict or list).
        status_code: HTTP status code from the mock scenario.
        scenario:    The :class:`MockScenario` that was applied.
    """

    tool_name: str
    status: str
    content: dict[str, Any] | list[Any]
    status_code: int
    scenario: MockScenario

CoverageReporter

Tracks and reports tool execution coverage.

Parameters:

Name Type Description Default
tools list[MCPToolDef]

List of :class:~api2mcp.generators.tool.MCPToolDef objects from the server under test.

required
Source code in src/api2mcp/testing/coverage.py
class CoverageReporter:
    """Tracks and reports tool execution coverage.

    Args:
        tools: List of :class:`~api2mcp.generators.tool.MCPToolDef` objects
               from the server under test.
    """

    def __init__(self, tools: list[MCPToolDef]) -> None:
        self._all_tools: set[str] = {t.name for t in tools}
        self._call_counts: dict[str, int] = {t.name: 0 for t in tools}

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def record_call(self, tool_name: str) -> None:
        """Record a single call to *tool_name*.

        Unknown tool names are silently ignored so the reporter can be used
        with partial tool sets.
        """
        if tool_name in self._call_counts:
            self._call_counts[tool_name] += 1

    def record_results(self, results: list[ToolResult]) -> None:
        """Bulk-record calls from a :attr:`~api2mcp.testing.client.MCPTestClient.call_log`.

        Args:
            results: List of :class:`~api2mcp.testing.client.ToolResult` objects.
        """
        for r in results:
            self.record_call(r.tool_name)

    def reset(self) -> None:
        """Reset all call counts to zero."""
        for name in self._call_counts:
            self._call_counts[name] = 0

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def report(self) -> CoverageReport:
        """Build and return a :class:`CoverageReport`.

        Returns:
            Snapshot of coverage at the time of the call.
        """
        called = {name for name, count in self._call_counts.items() if count > 0}
        uncalled = self._all_tools - called
        total = len(self._all_tools)
        pct = (len(called) / total * 100.0) if total > 0 else 100.0
        return CoverageReport(
            total_tools=total,
            called_tools=called,
            uncalled_tools=uncalled,
            call_counts=dict(self._call_counts),
            percentage=pct,
        )

    @classmethod
    def from_client(cls, client: Any) -> CoverageReporter:
        """Create a reporter pre-populated from a :class:`~api2mcp.testing.client.MCPTestClient`.

        Args:
            client: A loaded :class:`MCPTestClient` instance.

        Returns:
            :class:`CoverageReporter` with all calls already recorded.
        """
        reporter = cls(client.tools)
        reporter.record_results(client.call_log)
        return reporter
Functions
record_call(tool_name)

Record a single call to tool_name.

Unknown tool names are silently ignored so the reporter can be used with partial tool sets.

Source code in src/api2mcp/testing/coverage.py
def record_call(self, tool_name: str) -> None:
    """Record a single call to *tool_name*.

    Unknown tool names are silently ignored so the reporter can be used
    with partial tool sets.
    """
    if tool_name in self._call_counts:
        self._call_counts[tool_name] += 1
record_results(results)

Bulk-record calls from a :attr:~api2mcp.testing.client.MCPTestClient.call_log.

Parameters:

Name Type Description Default
results list[ToolResult]

List of :class:~api2mcp.testing.client.ToolResult objects.

required
Source code in src/api2mcp/testing/coverage.py
def record_results(self, results: list[ToolResult]) -> None:
    """Bulk-record calls from a :attr:`~api2mcp.testing.client.MCPTestClient.call_log`.

    Args:
        results: List of :class:`~api2mcp.testing.client.ToolResult` objects.
    """
    for r in results:
        self.record_call(r.tool_name)
reset()

Reset all call counts to zero.

Source code in src/api2mcp/testing/coverage.py
def reset(self) -> None:
    """Reset all call counts to zero."""
    for name in self._call_counts:
        self._call_counts[name] = 0
report()

Build and return a :class:CoverageReport.

Returns:

Type Description
CoverageReport

Snapshot of coverage at the time of the call.

Source code in src/api2mcp/testing/coverage.py
def report(self) -> CoverageReport:
    """Build and return a :class:`CoverageReport`.

    Returns:
        Snapshot of coverage at the time of the call.
    """
    called = {name for name, count in self._call_counts.items() if count > 0}
    uncalled = self._all_tools - called
    total = len(self._all_tools)
    pct = (len(called) / total * 100.0) if total > 0 else 100.0
    return CoverageReport(
        total_tools=total,
        called_tools=called,
        uncalled_tools=uncalled,
        call_counts=dict(self._call_counts),
        percentage=pct,
    )
from_client(client) classmethod

Create a reporter pre-populated from a :class:~api2mcp.testing.client.MCPTestClient.

Parameters:

Name Type Description Default
client Any

A loaded :class:MCPTestClient instance.

required

Returns:

Type Description
CoverageReporter

class:CoverageReporter with all calls already recorded.

Source code in src/api2mcp/testing/coverage.py
@classmethod
def from_client(cls, client: Any) -> CoverageReporter:
    """Create a reporter pre-populated from a :class:`~api2mcp.testing.client.MCPTestClient`.

    Args:
        client: A loaded :class:`MCPTestClient` instance.

    Returns:
        :class:`CoverageReporter` with all calls already recorded.
    """
    reporter = cls(client.tools)
    reporter.record_results(client.call_log)
    return reporter

CoverageReport dataclass

Immutable snapshot of coverage at a point in time.

Attributes:

Name Type Description
total_tools int

Total number of tools in the server.

called_tools set[str]

Set of tool names that were called at least once.

uncalled_tools set[str]

Tools not yet covered.

call_counts dict[str, int]

How many times each tool was called.

percentage float

Coverage percentage (0–100).

Source code in src/api2mcp/testing/coverage.py
@dataclass
class CoverageReport:
    """Immutable snapshot of coverage at a point in time.

    Attributes:
        total_tools:   Total number of tools in the server.
        called_tools:  Set of tool names that were called at least once.
        uncalled_tools: Tools not yet covered.
        call_counts:   How many times each tool was called.
        percentage:    Coverage percentage (0–100).
    """

    total_tools: int
    called_tools: set[str]
    uncalled_tools: set[str]
    call_counts: dict[str, int]
    percentage: float

    def summary(self) -> str:
        """Return a human-readable one-line summary."""
        return (
            f"Tool coverage: {len(self.called_tools)}/{self.total_tools} "
            f"({self.percentage:.1f}%)"
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a plain dict (suitable for JSON output)."""
        return {
            "total_tools": self.total_tools,
            "called_tools": sorted(self.called_tools),
            "uncalled_tools": sorted(self.uncalled_tools),
            "call_counts": dict(sorted(self.call_counts.items())),
            "percentage": round(self.percentage, 2),
        }

    def assert_minimum(self, minimum: float) -> None:
        """Assert that coverage meets a minimum threshold.

        Args:
            minimum: Minimum required percentage (0–100).

        Raises:
            AssertionError: If coverage is below *minimum*.
        """
        if self.percentage < minimum:
            raise AssertionError(
                f"Coverage {self.percentage:.1f}% is below minimum {minimum:.1f}%.\n"
                f"Uncalled tools: {sorted(self.uncalled_tools)}"
            )
Functions
summary()

Return a human-readable one-line summary.

Source code in src/api2mcp/testing/coverage.py
def summary(self) -> str:
    """Return a human-readable one-line summary."""
    return (
        f"Tool coverage: {len(self.called_tools)}/{self.total_tools} "
        f"({self.percentage:.1f}%)"
    )
to_dict()

Serialise to a plain dict (suitable for JSON output).

Source code in src/api2mcp/testing/coverage.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a plain dict (suitable for JSON output)."""
    return {
        "total_tools": self.total_tools,
        "called_tools": sorted(self.called_tools),
        "uncalled_tools": sorted(self.uncalled_tools),
        "call_counts": dict(sorted(self.call_counts.items())),
        "percentage": round(self.percentage, 2),
    }
assert_minimum(minimum)

Assert that coverage meets a minimum threshold.

Parameters:

Name Type Description Default
minimum float

Minimum required percentage (0–100).

required

Raises:

Type Description
AssertionError

If coverage is below minimum.

Source code in src/api2mcp/testing/coverage.py
def assert_minimum(self, minimum: float) -> None:
    """Assert that coverage meets a minimum threshold.

    Args:
        minimum: Minimum required percentage (0–100).

    Raises:
        AssertionError: If coverage is below *minimum*.
    """
    if self.percentage < minimum:
        raise AssertionError(
            f"Coverage {self.percentage:.1f}% is below minimum {minimum:.1f}%.\n"
            f"Uncalled tools: {sorted(self.uncalled_tools)}"
        )

SnapshotStore

Manages snapshot files for generated MCP server output.

Each snapshot is stored as a <name>.json file inside snapshot_dir.

Parameters:

Name Type Description Default
snapshot_dir str | Path

Directory to read/write snapshot files. Created automatically if it does not exist.

'tests/snapshots/data'
update bool

When True every :meth:assert_match call writes the current output instead of comparing it. Equivalent to running tests with --snapshot-update.

False
Source code in src/api2mcp/testing/snapshot.py
class SnapshotStore:
    """Manages snapshot files for generated MCP server output.

    Each snapshot is stored as a ``<name>.json`` file inside *snapshot_dir*.

    Args:
        snapshot_dir: Directory to read/write snapshot files.
                      Created automatically if it does not exist.
        update:       When ``True`` every :meth:`assert_match` call writes
                      the current output instead of comparing it.  Equivalent
                      to running tests with ``--snapshot-update``.
    """

    def __init__(
        self,
        snapshot_dir: str | Path = "tests/snapshots/data",
        *,
        update: bool = False,
    ) -> None:
        self.snapshot_dir = Path(snapshot_dir)
        self.update = update
        self._snapshot_dir_created = False

    # ------------------------------------------------------------------
    # Core API
    # ------------------------------------------------------------------

    def assert_match(
        self,
        name: str,
        tools: list[MCPToolDef],
        *,
        update: bool | None = None,
    ) -> None:
        """Assert that *tools* matches the stored snapshot named *name*.

        If no snapshot exists yet (first run), it is created automatically
        and the assertion passes.

        Args:
            name:   Snapshot identifier (used as filename stem).
            tools:  Tool definitions to snapshot.
            update: Override :attr:`update` for this call only.

        Raises:
            :class:`SnapshotMismatch`: When the current output differs from
                the stored snapshot and *update* is ``False``.
        """
        do_update = update if update is not None else self.update
        current = _normalise(_tools_to_snapshot(tools))
        snapshot_path = self._path(name)

        if do_update or not snapshot_path.exists():
            self._write(snapshot_path, current)
            return

        stored = snapshot_path.read_text(encoding="utf-8")
        if stored != current:
            raise SnapshotMismatch(name, stored, current)

    def save(self, name: str, tools: list[MCPToolDef]) -> None:
        """Unconditionally overwrite the snapshot for *name*.

        Args:
            name:  Snapshot identifier.
            tools: Tool definitions to snapshot.
        """
        current = _normalise(_tools_to_snapshot(tools))
        self._write(self._path(name), current)

    def load(self, name: str) -> dict[str, Any]:
        """Load and return the stored snapshot dict for *name*.

        Args:
            name: Snapshot identifier.

        Returns:
            Parsed snapshot dict.

        Raises:
            FileNotFoundError: If the snapshot does not exist.
        """
        path = self._path(name)
        if not path.exists():
            raise FileNotFoundError(f"Snapshot {name!r} not found at {path}")
        return json.loads(path.read_text(encoding="utf-8"))

    def exists(self, name: str) -> bool:
        """Return ``True`` if a snapshot file exists for *name*."""
        return self._path(name).exists()

    def delete(self, name: str) -> None:
        """Delete the snapshot file for *name* (no-op if missing)."""
        path = self._path(name)
        if path.exists():
            path.unlink()

    def list_snapshots(self) -> list[str]:
        """Return the names of all snapshots stored in :attr:`snapshot_dir`."""
        if not self.snapshot_dir.is_dir():
            return []
        return sorted(p.stem for p in self.snapshot_dir.glob("*.json"))

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _path(self, name: str) -> Path:
        return self.snapshot_dir / f"{name}.json"

    def _write(self, path: Path, content: str) -> None:
        if not self._snapshot_dir_created:
            self.snapshot_dir.mkdir(parents=True, exist_ok=True)
            self._snapshot_dir_created = True
        path.write_text(content, encoding="utf-8")
Functions
assert_match(name, tools, *, update=None)

Assert that tools matches the stored snapshot named name.

If no snapshot exists yet (first run), it is created automatically and the assertion passes.

Parameters:

Name Type Description Default
name str

Snapshot identifier (used as filename stem).

required
tools list[MCPToolDef]

Tool definitions to snapshot.

required
update bool | None

Override :attr:update for this call only.

None

Raises:

Type Description

class:SnapshotMismatch: When the current output differs from the stored snapshot and update is False.

Source code in src/api2mcp/testing/snapshot.py
def assert_match(
    self,
    name: str,
    tools: list[MCPToolDef],
    *,
    update: bool | None = None,
) -> None:
    """Assert that *tools* matches the stored snapshot named *name*.

    If no snapshot exists yet (first run), it is created automatically
    and the assertion passes.

    Args:
        name:   Snapshot identifier (used as filename stem).
        tools:  Tool definitions to snapshot.
        update: Override :attr:`update` for this call only.

    Raises:
        :class:`SnapshotMismatch`: When the current output differs from
            the stored snapshot and *update* is ``False``.
    """
    do_update = update if update is not None else self.update
    current = _normalise(_tools_to_snapshot(tools))
    snapshot_path = self._path(name)

    if do_update or not snapshot_path.exists():
        self._write(snapshot_path, current)
        return

    stored = snapshot_path.read_text(encoding="utf-8")
    if stored != current:
        raise SnapshotMismatch(name, stored, current)
save(name, tools)

Unconditionally overwrite the snapshot for name.

Parameters:

Name Type Description Default
name str

Snapshot identifier.

required
tools list[MCPToolDef]

Tool definitions to snapshot.

required
Source code in src/api2mcp/testing/snapshot.py
def save(self, name: str, tools: list[MCPToolDef]) -> None:
    """Unconditionally overwrite the snapshot for *name*.

    Args:
        name:  Snapshot identifier.
        tools: Tool definitions to snapshot.
    """
    current = _normalise(_tools_to_snapshot(tools))
    self._write(self._path(name), current)
load(name)

Load and return the stored snapshot dict for name.

Parameters:

Name Type Description Default
name str

Snapshot identifier.

required

Returns:

Type Description
dict[str, Any]

Parsed snapshot dict.

Raises:

Type Description
FileNotFoundError

If the snapshot does not exist.

Source code in src/api2mcp/testing/snapshot.py
def load(self, name: str) -> dict[str, Any]:
    """Load and return the stored snapshot dict for *name*.

    Args:
        name: Snapshot identifier.

    Returns:
        Parsed snapshot dict.

    Raises:
        FileNotFoundError: If the snapshot does not exist.
    """
    path = self._path(name)
    if not path.exists():
        raise FileNotFoundError(f"Snapshot {name!r} not found at {path}")
    return json.loads(path.read_text(encoding="utf-8"))
exists(name)

Return True if a snapshot file exists for name.

Source code in src/api2mcp/testing/snapshot.py
def exists(self, name: str) -> bool:
    """Return ``True`` if a snapshot file exists for *name*."""
    return self._path(name).exists()
delete(name)

Delete the snapshot file for name (no-op if missing).

Source code in src/api2mcp/testing/snapshot.py
def delete(self, name: str) -> None:
    """Delete the snapshot file for *name* (no-op if missing)."""
    path = self._path(name)
    if path.exists():
        path.unlink()
list_snapshots()

Return the names of all snapshots stored in :attr:snapshot_dir.

Source code in src/api2mcp/testing/snapshot.py
def list_snapshots(self) -> list[str]:
    """Return the names of all snapshots stored in :attr:`snapshot_dir`."""
    if not self.snapshot_dir.is_dir():
        return []
    return sorted(p.stem for p in self.snapshot_dir.glob("*.json"))

MockResponseGenerator

Generate mock responses from an :class:~api2mcp.core.ir_schema.APISpec.

Given an API spec, this class produces per-endpoint mock scenarios that can be used by :class:~api2mcp.testing.client.MCPTestClient to simulate API responses without making real HTTP calls.

Parameters:

Name Type Description Default
api_spec APISpec

The parsed API specification.

required
seed int | None

Optional random seed for deterministic generation.

None

Usage::

generator = MockResponseGenerator(api_spec)
scenarios = generator.scenarios_for("list_issues")
# → [MockScenario("success", 200, ...), MockScenario("not_found", 404, ...)]
Source code in src/api2mcp/testing/mock_generator.py
class MockResponseGenerator:
    """Generate mock responses from an :class:`~api2mcp.core.ir_schema.APISpec`.

    Given an API spec, this class produces per-endpoint mock scenarios that
    can be used by :class:`~api2mcp.testing.client.MCPTestClient` to simulate
    API responses without making real HTTP calls.

    Args:
        api_spec: The parsed API specification.
        seed:     Optional random seed for deterministic generation.

    Usage::

        generator = MockResponseGenerator(api_spec)
        scenarios = generator.scenarios_for("list_issues")
        # → [MockScenario("success", 200, ...), MockScenario("not_found", 404, ...)]
    """

    def __init__(self, api_spec: APISpec, *, seed: int | None = None) -> None:
        self.api_spec = api_spec
        self._rng = random.Random(seed)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def scenarios_for(self, tool_name: str) -> list[MockScenario]:
        """Return mock scenarios for the endpoint matching *tool_name*.

        Args:
            tool_name: The MCP tool name (e.g. ``"list_issues"``).

        Returns:
            List of :class:`MockScenario` objects (success + error cases).

        Raises:
            KeyError: If no endpoint matches *tool_name*.
        """
        endpoint = self._find_endpoint(tool_name)
        if endpoint is None:
            raise KeyError(f"No endpoint found for tool name: {tool_name!r}")
        return self._generate_scenarios(endpoint)

    def all_scenarios(self) -> dict[str, list[MockScenario]]:
        """Return scenarios for every endpoint in the spec.

        Returns:
            Mapping of tool name → list of :class:`MockScenario`.
        """
        result: dict[str, list[MockScenario]] = {}
        for endpoint in self.api_spec.endpoints:
            tool_name = self._endpoint_to_tool_name(endpoint)
            result[tool_name] = self._generate_scenarios(endpoint)
        return result

    def success_body(self, tool_name: str) -> dict[str, Any] | list[Any]:
        """Return just the success scenario body for *tool_name*.

        Args:
            tool_name: MCP tool name.

        Returns:
            Mock response body.
        """
        scenarios = self.scenarios_for(tool_name)
        success = next((s for s in scenarios if s.status_code == 200), scenarios[0])
        return success.body or {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _find_endpoint(self, tool_name: str) -> Endpoint | None:
        for endpoint in self.api_spec.endpoints:
            if self._endpoint_to_tool_name(endpoint) == tool_name:
                return endpoint
        return None

    @staticmethod
    def _endpoint_to_tool_name(endpoint: Endpoint) -> str:
        """Derive a tool name from an endpoint — mirrors ToolGenerator logic."""
        method = endpoint.method.value.lower()
        path_parts = [p.strip("/").replace("{", "").replace("}", "") for p in endpoint.path.split("/") if p]
        name = "_".join([method] + path_parts)
        return name[:64]

    def _generate_scenarios(self, endpoint: Endpoint) -> list[MockScenario]:
        scenarios: list[MockScenario] = []

        # Success scenario
        scenarios.append(
            MockScenario(
                name="success",
                status_code=200,
                body=self._generate_body(endpoint),
            )
        )

        # 404 for GET endpoints with path parameters
        has_path_param = any(
            p.location == ParameterLocation.PATH
            for p in endpoint.parameters
        )
        if endpoint.method.value.upper() == "GET" and has_path_param:
            scenarios.append(
                MockScenario(
                    name="not_found",
                    status_code=404,
                    body={"error": "not_found", "message": "Resource not found"},
                )
            )

        # 401 for any authenticated endpoint
        scenarios.append(
            MockScenario(
                name="unauthorized",
                status_code=401,
                body={"error": "unauthorized", "message": "Authentication required"},
            )
        )

        # 422 / 400 for mutation endpoints
        if endpoint.method.value.upper() in ("POST", "PUT", "PATCH"):
            scenarios.append(
                MockScenario(
                    name="validation_error",
                    status_code=422,
                    body={
                        "error": "validation_error",
                        "detail": [{"field": "body", "msg": "Field required"}],
                    },
                )
            )

        return scenarios

    def _generate_body(self, endpoint: Endpoint) -> dict[str, Any] | list[Any]:
        """Generate a plausible mock response body."""
        # List endpoints return arrays
        if endpoint.method.value.upper() == "GET" and not any(
            p.location == ParameterLocation.PATH for p in endpoint.parameters
        ):
            return [self._generate_item(), self._generate_item()]
        return self._generate_item()

    def _generate_item(self) -> dict[str, Any]:
        return {
            "id": self._rng.randint(1, 9999),
            "name": "".join(self._rng.choices(string.ascii_lowercase, k=8)),
            "created_at": "2025-01-01T00:00:00Z",
            "status": self._rng.choice(["active", "inactive", "pending"]),
        }
Functions
scenarios_for(tool_name)

Return mock scenarios for the endpoint matching tool_name.

Parameters:

Name Type Description Default
tool_name str

The MCP tool name (e.g. "list_issues").

required

Returns:

Type Description
list[MockScenario]

List of :class:MockScenario objects (success + error cases).

Raises:

Type Description
KeyError

If no endpoint matches tool_name.

Source code in src/api2mcp/testing/mock_generator.py
def scenarios_for(self, tool_name: str) -> list[MockScenario]:
    """Return mock scenarios for the endpoint matching *tool_name*.

    Args:
        tool_name: The MCP tool name (e.g. ``"list_issues"``).

    Returns:
        List of :class:`MockScenario` objects (success + error cases).

    Raises:
        KeyError: If no endpoint matches *tool_name*.
    """
    endpoint = self._find_endpoint(tool_name)
    if endpoint is None:
        raise KeyError(f"No endpoint found for tool name: {tool_name!r}")
    return self._generate_scenarios(endpoint)
all_scenarios()

Return scenarios for every endpoint in the spec.

Returns:

Type Description
dict[str, list[MockScenario]]

Mapping of tool name → list of :class:MockScenario.

Source code in src/api2mcp/testing/mock_generator.py
def all_scenarios(self) -> dict[str, list[MockScenario]]:
    """Return scenarios for every endpoint in the spec.

    Returns:
        Mapping of tool name → list of :class:`MockScenario`.
    """
    result: dict[str, list[MockScenario]] = {}
    for endpoint in self.api_spec.endpoints:
        tool_name = self._endpoint_to_tool_name(endpoint)
        result[tool_name] = self._generate_scenarios(endpoint)
    return result
success_body(tool_name)

Return just the success scenario body for tool_name.

Parameters:

Name Type Description Default
tool_name str

MCP tool name.

required

Returns:

Type Description
dict[str, Any] | list[Any]

Mock response body.

Source code in src/api2mcp/testing/mock_generator.py
def success_body(self, tool_name: str) -> dict[str, Any] | list[Any]:
    """Return just the success scenario body for *tool_name*.

    Args:
        tool_name: MCP tool name.

    Returns:
        Mock response body.
    """
    scenarios = self.scenarios_for(tool_name)
    success = next((s for s in scenarios if s.status_code == 200), scenarios[0])
    return success.body or {}

MockScenario

Defines how to generate a mock response.

Attributes:

Name Type Description
name

Human-readable scenario name.

status_code

HTTP status code to return.

body

Response body dict (or None for empty body).

headers

Extra response headers.

Source code in src/api2mcp/testing/mock_generator.py
class MockScenario:
    """Defines how to generate a mock response.

    Attributes:
        name:        Human-readable scenario name.
        status_code: HTTP status code to return.
        body:        Response body dict (or None for empty body).
        headers:     Extra response headers.
    """

    def __init__(
        self,
        name: str,
        status_code: int = 200,
        body: dict[str, Any] | list[Any] | None = None,
        headers: dict[str, str] | None = None,
    ) -> None:
        self.name = name
        self.status_code = status_code
        self.body = body
        self.headers = headers or {}

    def to_dict(self) -> dict[str, Any]:
        return {
            "name": self.name,
            "status_code": self.status_code,
            "body": self.body,
            "headers": self.headers,
        }

Plugins

api2mcp.plugins

api2mcp.plugins

Plugin system for API2MCP (F7.2).

Exports the public API for plugin authors and host code.

Classes

BasePlugin

Abstract base class for all API2MCP plugins.

Subclass this and define the class-level attributes below. The framework will instantiate your class and call :meth:setup/:meth:teardown.

Class attributes

id : str Unique slug identifier. Required. name : str Human-readable name. Required. version : str Semver version string (default "0.1.0"). description : str One-line description. author : str Author name. requires : list[str] IDs of plugins that must be loaded before this one.

Source code in src/api2mcp/plugins/base.py
class BasePlugin:
    """Abstract base class for all API2MCP plugins.

    Subclass this and define the class-level attributes below.  The
    framework will instantiate your class and call :meth:`setup`/:meth:`teardown`.

    Class attributes
    ----------------
    id : str
        Unique slug identifier.  **Required.**
    name : str
        Human-readable name.  **Required.**
    version : str
        Semver version string (default ``"0.1.0"``).
    description : str
        One-line description.
    author : str
        Author name.
    requires : list[str]
        IDs of plugins that must be loaded before this one.
    """

    id: str = ""
    name: str = ""
    version: str = "0.1.0"
    description: str = ""
    author: str = ""
    requires: list[str] = []

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def setup(self, hook_manager: HookManager) -> None:
        """Called when the plugin is loaded.

        Override to register hooks and perform initialisation.

        Args:
            hook_manager: The application :class:`~api2mcp.plugins.hooks.HookManager`.
        """

    def teardown(self) -> None:
        """Called when the plugin is unloaded.

        Override to release resources.
        """

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @classmethod
    def metadata(cls) -> PluginMetadata:
        """Return :class:`PluginMetadata` derived from class attributes."""
        return PluginMetadata(
            id=cls.id,
            name=cls.name,
            version=cls.version,
            description=cls.description,
            author=cls.author,
            requires=list(cls.requires),
        )

    def __repr__(self) -> str:
        return f"{type(self).__name__}(id={self.id!r}, version={self.version!r})"
Functions
setup(hook_manager)

Called when the plugin is loaded.

Override to register hooks and perform initialisation.

Parameters:

Name Type Description Default
hook_manager HookManager

The application :class:~api2mcp.plugins.hooks.HookManager.

required
Source code in src/api2mcp/plugins/base.py
def setup(self, hook_manager: HookManager) -> None:
    """Called when the plugin is loaded.

    Override to register hooks and perform initialisation.

    Args:
        hook_manager: The application :class:`~api2mcp.plugins.hooks.HookManager`.
    """
teardown()

Called when the plugin is unloaded.

Override to release resources.

Source code in src/api2mcp/plugins/base.py
def teardown(self) -> None:
    """Called when the plugin is unloaded.

    Override to release resources.
    """
metadata() classmethod

Return :class:PluginMetadata derived from class attributes.

Source code in src/api2mcp/plugins/base.py
@classmethod
def metadata(cls) -> PluginMetadata:
    """Return :class:`PluginMetadata` derived from class attributes."""
    return PluginMetadata(
        id=cls.id,
        name=cls.name,
        version=cls.version,
        description=cls.description,
        author=cls.author,
        requires=list(cls.requires),
    )

HookManager

Manages plugin hooks throughout the API2MCP pipeline.

Thread-safety note: registration is not thread-safe; register all hooks before starting concurrent work.

Source code in src/api2mcp/plugins/hooks.py
class HookManager:
    """Manages plugin hooks throughout the API2MCP pipeline.

    Thread-safety note: registration is not thread-safe; register all hooks
    before starting concurrent work.
    """

    def __init__(self) -> None:
        self._hooks: dict[str, list[HookRegistration]] = defaultdict(list)
        import threading as _threading
        self._lock = _threading.Lock()  # sync lock — register/unregister are sync; emit snapshots under lock

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def register_hook(
        self,
        event: str,
        callback: Callable[..., Any],
        *,
        plugin_id: str = "",
        priority: int = 100,
    ) -> HookRegistration:
        """Register *callback* to fire when *event* is emitted.

        Args:
            event:     Hook event name (see :data:`KNOWN_HOOKS`).
            callback:  Any callable (sync or async).
            plugin_id: Identifier of the owning plugin.
            priority:  Execution order — lower numbers run first.

        Returns:
            The :class:`HookRegistration` that was created.
        """
        reg = HookRegistration(hook=event, callback=callback, plugin_id=plugin_id, priority=priority)
        with self._lock:
            self._hooks[event].append(reg)
            # Keep sorted by priority
            self._hooks[event].sort(key=lambda r: r.priority)
        log.debug("Registered hook %r for plugin %r (priority=%d)", event, plugin_id, priority)
        return reg

    def unregister_hook(self, registration: HookRegistration) -> bool:
        """Remove a previously registered hook.

        Args:
            registration: The :class:`HookRegistration` to remove.

        Returns:
            ``True`` if found and removed, ``False`` otherwise.
        """
        with self._lock:
            bucket = self._hooks.get(registration.hook, [])
            try:
                bucket.remove(registration)
                return True
            except ValueError:
                return False

    # ------------------------------------------------------------------
    # Emission
    # ------------------------------------------------------------------

    async def emit(self, event: str, **kwargs: Any) -> list[Any]:
        """Emit *event*, invoking all registered callbacks in priority order.

        Async callbacks are awaited; sync callbacks are called directly.
        Exceptions in individual callbacks are logged but do NOT abort the
        remaining callbacks.

        Args:
            event:   Hook event name.
            **kwargs: Keyword arguments forwarded to every callback.

        Returns:
            List of return values from each callback (``None`` values included).
        """
        results: list[Any] = []
        with self._lock:
            hooks = list(self._hooks.get(event, []))
        for reg in hooks:
            try:
                if inspect.iscoroutinefunction(reg.callback):
                    result = await reg.callback(**kwargs)
                else:
                    result = reg.callback(**kwargs)
                results.append(result)
            except Exception as exc:
                log.exception(
                    "Hook %r raised in plugin %r: %s", event, reg.plugin_id, exc
                )
                results.append(None)
        return results

    def emit_sync(self, event: str, **kwargs: Any) -> list[Any]:
        """Synchronous wrapper around :meth:`emit` for non-async contexts.

        Args:
            event:   Hook event name.
            **kwargs: Keyword arguments forwarded to every callback.

        Returns:
            List of return values from each callback.
        """
        try:
            asyncio.get_running_loop()
            # Already inside a running event loop — run in a background thread
            import concurrent.futures
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                fut = pool.submit(asyncio.run, self.emit(event, **kwargs))
                return fut.result()
        except RuntimeError:
            # No running loop — safe to use asyncio.run
            return asyncio.run(self.emit(event, **kwargs))

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def registered_hooks(self, event: str | None = None) -> list[HookRegistration]:
        """Return all registrations, optionally filtered by *event*.

        Args:
            event: If given, return only registrations for this event.

        Returns:
            List of :class:`HookRegistration` objects.
        """
        if event is not None:
            return list(self._hooks.get(event, []))
        return [r for regs in self._hooks.values() for r in regs]

    def clear(self, event: str | None = None) -> None:
        """Remove all registrations, or only those for *event*.

        Args:
            event: If given, clear only this event's registrations.
        """
        if event is not None:
            self._hooks.pop(event, None)
        else:
            self._hooks.clear()

    def hook_count(self, event: str) -> int:
        """Return the number of registered callbacks for *event*."""
        return len(self._hooks.get(event, []))
Functions
register_hook(event, callback, *, plugin_id='', priority=100)

Register callback to fire when event is emitted.

Parameters:

Name Type Description Default
event str

Hook event name (see :data:KNOWN_HOOKS).

required
callback Callable[..., Any]

Any callable (sync or async).

required
plugin_id str

Identifier of the owning plugin.

''
priority int

Execution order — lower numbers run first.

100

Returns:

Name Type Description
The HookRegistration

class:HookRegistration that was created.

Source code in src/api2mcp/plugins/hooks.py
def register_hook(
    self,
    event: str,
    callback: Callable[..., Any],
    *,
    plugin_id: str = "",
    priority: int = 100,
) -> HookRegistration:
    """Register *callback* to fire when *event* is emitted.

    Args:
        event:     Hook event name (see :data:`KNOWN_HOOKS`).
        callback:  Any callable (sync or async).
        plugin_id: Identifier of the owning plugin.
        priority:  Execution order — lower numbers run first.

    Returns:
        The :class:`HookRegistration` that was created.
    """
    reg = HookRegistration(hook=event, callback=callback, plugin_id=plugin_id, priority=priority)
    with self._lock:
        self._hooks[event].append(reg)
        # Keep sorted by priority
        self._hooks[event].sort(key=lambda r: r.priority)
    log.debug("Registered hook %r for plugin %r (priority=%d)", event, plugin_id, priority)
    return reg
unregister_hook(registration)

Remove a previously registered hook.

Parameters:

Name Type Description Default
registration HookRegistration

The :class:HookRegistration to remove.

required

Returns:

Type Description
bool

True if found and removed, False otherwise.

Source code in src/api2mcp/plugins/hooks.py
def unregister_hook(self, registration: HookRegistration) -> bool:
    """Remove a previously registered hook.

    Args:
        registration: The :class:`HookRegistration` to remove.

    Returns:
        ``True`` if found and removed, ``False`` otherwise.
    """
    with self._lock:
        bucket = self._hooks.get(registration.hook, [])
        try:
            bucket.remove(registration)
            return True
        except ValueError:
            return False
emit(event, **kwargs) async

Emit event, invoking all registered callbacks in priority order.

Async callbacks are awaited; sync callbacks are called directly. Exceptions in individual callbacks are logged but do NOT abort the remaining callbacks.

Parameters:

Name Type Description Default
event str

Hook event name.

required
**kwargs Any

Keyword arguments forwarded to every callback.

{}

Returns:

Type Description
list[Any]

List of return values from each callback (None values included).

Source code in src/api2mcp/plugins/hooks.py
async def emit(self, event: str, **kwargs: Any) -> list[Any]:
    """Emit *event*, invoking all registered callbacks in priority order.

    Async callbacks are awaited; sync callbacks are called directly.
    Exceptions in individual callbacks are logged but do NOT abort the
    remaining callbacks.

    Args:
        event:   Hook event name.
        **kwargs: Keyword arguments forwarded to every callback.

    Returns:
        List of return values from each callback (``None`` values included).
    """
    results: list[Any] = []
    with self._lock:
        hooks = list(self._hooks.get(event, []))
    for reg in hooks:
        try:
            if inspect.iscoroutinefunction(reg.callback):
                result = await reg.callback(**kwargs)
            else:
                result = reg.callback(**kwargs)
            results.append(result)
        except Exception as exc:
            log.exception(
                "Hook %r raised in plugin %r: %s", event, reg.plugin_id, exc
            )
            results.append(None)
    return results
emit_sync(event, **kwargs)

Synchronous wrapper around :meth:emit for non-async contexts.

Parameters:

Name Type Description Default
event str

Hook event name.

required
**kwargs Any

Keyword arguments forwarded to every callback.

{}

Returns:

Type Description
list[Any]

List of return values from each callback.

Source code in src/api2mcp/plugins/hooks.py
def emit_sync(self, event: str, **kwargs: Any) -> list[Any]:
    """Synchronous wrapper around :meth:`emit` for non-async contexts.

    Args:
        event:   Hook event name.
        **kwargs: Keyword arguments forwarded to every callback.

    Returns:
        List of return values from each callback.
    """
    try:
        asyncio.get_running_loop()
        # Already inside a running event loop — run in a background thread
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            fut = pool.submit(asyncio.run, self.emit(event, **kwargs))
            return fut.result()
    except RuntimeError:
        # No running loop — safe to use asyncio.run
        return asyncio.run(self.emit(event, **kwargs))
registered_hooks(event=None)

Return all registrations, optionally filtered by event.

Parameters:

Name Type Description Default
event str | None

If given, return only registrations for this event.

None

Returns:

Type Description
list[HookRegistration]

List of :class:HookRegistration objects.

Source code in src/api2mcp/plugins/hooks.py
def registered_hooks(self, event: str | None = None) -> list[HookRegistration]:
    """Return all registrations, optionally filtered by *event*.

    Args:
        event: If given, return only registrations for this event.

    Returns:
        List of :class:`HookRegistration` objects.
    """
    if event is not None:
        return list(self._hooks.get(event, []))
    return [r for regs in self._hooks.values() for r in regs]
clear(event=None)

Remove all registrations, or only those for event.

Parameters:

Name Type Description Default
event str | None

If given, clear only this event's registrations.

None
Source code in src/api2mcp/plugins/hooks.py
def clear(self, event: str | None = None) -> None:
    """Remove all registrations, or only those for *event*.

    Args:
        event: If given, clear only this event's registrations.
    """
    if event is not None:
        self._hooks.pop(event, None)
    else:
        self._hooks.clear()
hook_count(event)

Return the number of registered callbacks for event.

Source code in src/api2mcp/plugins/hooks.py
def hook_count(self, event: str) -> int:
    """Return the number of registered callbacks for *event*."""
    return len(self._hooks.get(event, []))

PluginManager

Orchestrates the full plugin lifecycle.

Parameters:

Name Type Description Default
plugin_dir Path | None

Local directory for directory-based discovery.

None
sandbox PluginSandbox | None

:class:~api2mcp.plugins.sandbox.PluginSandbox to use. If None, hooks run without sandbox wrapping.

None
loader PluginLoader | None

Custom :class:~api2mcp.plugins.discovery.PluginLoader.

None
Source code in src/api2mcp/plugins/manager.py
class PluginManager:
    """Orchestrates the full plugin lifecycle.

    Args:
        plugin_dir:  Local directory for directory-based discovery.
        sandbox:     :class:`~api2mcp.plugins.sandbox.PluginSandbox` to use.
                     If ``None``, hooks run without sandbox wrapping.
        loader:      Custom :class:`~api2mcp.plugins.discovery.PluginLoader`.
    """

    def __init__(
        self,
        plugin_dir: Path | None = None,
        sandbox: PluginSandbox | None = None,
        loader: PluginLoader | None = None,
    ) -> None:
        self._loader = loader or PluginLoader(plugin_dir=plugin_dir)
        self._sandbox = sandbox
        self._hooks = HookManager()
        self._loaded: list[BasePlugin] = []
        self._lock = threading.Lock()  # sync lock — PluginManager lifecycle methods are synchronous

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def hooks(self) -> HookManager:
        """The shared :class:`~api2mcp.plugins.hooks.HookManager`."""
        return self._hooks

    @property
    def loaded_plugins(self) -> list[BasePlugin]:
        """List of currently loaded :class:`~api2mcp.plugins.base.BasePlugin` instances."""
        return list(self._loaded)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def load_plugins(self, plugins: list[BasePlugin]) -> None:
        """Load *plugins* in dependency-resolved order.

        Args:
            plugins: Pre-discovered plugin instances to load.

        Raises:
            :class:`~api2mcp.plugins.dependency.PluginDependencyError`:
                If dependency resolution fails.
        """
        ordered = resolve_load_order(plugins)
        for plugin in ordered:
            try:
                plugin.setup(self._hooks)
                with self._lock:
                    self._loaded.append(plugin)
                log.info("Loaded plugin: %s %s", plugin.id, plugin.version)
            except Exception as exc:
                log.error("Plugin %r setup failed: %s", plugin.id, exc)

    def load_all(self, directory: Path | None = None) -> None:
        """Discover and load all plugins (entry points + directory).

        Args:
            directory: Override local plugin directory for this call.
        """
        discovered = self._loader.discover_all(directory)
        if not discovered:
            log.debug("No plugins discovered")
            return
        try:
            self.load_plugins(discovered)
        except PluginDependencyError as exc:
            log.error("Plugin dependency error: %s", exc)

    def unload_all(self) -> None:
        """Call :meth:`~api2mcp.plugins.base.BasePlugin.teardown` on all plugins
        and clear the hooks."""
        with self._lock:
            plugins_snapshot = list(reversed(self._loaded))
        for plugin in plugins_snapshot:
            try:
                plugin.teardown()
                log.info("Unloaded plugin: %s", plugin.id)
            except Exception as exc:
                log.warning("Plugin %r teardown raised: %s", plugin.id, exc)
        with self._lock:
            self._loaded.clear()
        self._hooks.clear()

    def get_plugin(self, plugin_id: str) -> BasePlugin | None:
        """Return the loaded plugin with *plugin_id*, or ``None``."""
        for p in self._loaded:
            if p.id == plugin_id:
                return p
        return None
Attributes
hooks property

The shared :class:~api2mcp.plugins.hooks.HookManager.

loaded_plugins property

List of currently loaded :class:~api2mcp.plugins.base.BasePlugin instances.

Functions
load_plugins(plugins)

Load plugins in dependency-resolved order.

Parameters:

Name Type Description Default
plugins list[BasePlugin]

Pre-discovered plugin instances to load.

required

Raises:

Type Description

class:~api2mcp.plugins.dependency.PluginDependencyError: If dependency resolution fails.

Source code in src/api2mcp/plugins/manager.py
def load_plugins(self, plugins: list[BasePlugin]) -> None:
    """Load *plugins* in dependency-resolved order.

    Args:
        plugins: Pre-discovered plugin instances to load.

    Raises:
        :class:`~api2mcp.plugins.dependency.PluginDependencyError`:
            If dependency resolution fails.
    """
    ordered = resolve_load_order(plugins)
    for plugin in ordered:
        try:
            plugin.setup(self._hooks)
            with self._lock:
                self._loaded.append(plugin)
            log.info("Loaded plugin: %s %s", plugin.id, plugin.version)
        except Exception as exc:
            log.error("Plugin %r setup failed: %s", plugin.id, exc)
load_all(directory=None)

Discover and load all plugins (entry points + directory).

Parameters:

Name Type Description Default
directory Path | None

Override local plugin directory for this call.

None
Source code in src/api2mcp/plugins/manager.py
def load_all(self, directory: Path | None = None) -> None:
    """Discover and load all plugins (entry points + directory).

    Args:
        directory: Override local plugin directory for this call.
    """
    discovered = self._loader.discover_all(directory)
    if not discovered:
        log.debug("No plugins discovered")
        return
    try:
        self.load_plugins(discovered)
    except PluginDependencyError as exc:
        log.error("Plugin dependency error: %s", exc)
unload_all()

Call :meth:~api2mcp.plugins.base.BasePlugin.teardown on all plugins and clear the hooks.

Source code in src/api2mcp/plugins/manager.py
def unload_all(self) -> None:
    """Call :meth:`~api2mcp.plugins.base.BasePlugin.teardown` on all plugins
    and clear the hooks."""
    with self._lock:
        plugins_snapshot = list(reversed(self._loaded))
    for plugin in plugins_snapshot:
        try:
            plugin.teardown()
            log.info("Unloaded plugin: %s", plugin.id)
        except Exception as exc:
            log.warning("Plugin %r teardown raised: %s", plugin.id, exc)
    with self._lock:
        self._loaded.clear()
    self._hooks.clear()
get_plugin(plugin_id)

Return the loaded plugin with plugin_id, or None.

Source code in src/api2mcp/plugins/manager.py
def get_plugin(self, plugin_id: str) -> BasePlugin | None:
    """Return the loaded plugin with *plugin_id*, or ``None``."""
    for p in self._loaded:
        if p.id == plugin_id:
            return p
    return None

PluginLoader

Discovers and instantiates :class:~api2mcp.plugins.base.BasePlugin subclasses.

Parameters:

Name Type Description Default
plugin_dir Path | None

Local directory to scan for .py plugin files. Defaults to ~/.api2mcp/plugins/.

None
Source code in src/api2mcp/plugins/discovery.py
class PluginLoader:
    """Discovers and instantiates :class:`~api2mcp.plugins.base.BasePlugin` subclasses.

    Args:
        plugin_dir: Local directory to scan for ``.py`` plugin files.
                    Defaults to ``~/.api2mcp/plugins/``.
    """

    def __init__(self, plugin_dir: Path | None = None) -> None:
        self.plugin_dir = plugin_dir or _DEFAULT_PLUGIN_DIR

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def discover_entry_points(self) -> list[BasePlugin]:
        """Load plugins registered via the ``api2mcp.plugins`` entry-point group.

        Returns:
            List of instantiated :class:`BasePlugin` objects.
        """
        plugins: list[BasePlugin] = []
        try:
            from importlib.metadata import entry_points

            eps = entry_points(group=_EP_GROUP)
            for ep in eps:
                try:
                    plugin_cls = ep.load()
                    if self._is_valid_plugin_class(plugin_cls):
                        plugins.append(plugin_cls())
                        log.debug("Loaded entry-point plugin: %r", ep.name)
                    else:
                        log.warning(
                            "Entry point %r did not return a BasePlugin subclass", ep.name
                        )
                except Exception as exc:
                    log.warning("Failed to load entry-point plugin %r: %s", ep.name, exc)
        except Exception as exc:
            log.debug("Entry-point discovery failed: %s", exc)
        return plugins

    def discover_directory(self, directory: Path | None = None) -> list[BasePlugin]:
        """Scan *directory* for ``.py`` files and load plugin classes.

        Each ``.py`` file is imported as a module.  All
        :class:`~api2mcp.plugins.base.BasePlugin` subclasses with a non-empty
        ``id`` attribute are instantiated.

        Args:
            directory: Directory to scan.  Defaults to :attr:`plugin_dir`.

        Returns:
            List of instantiated :class:`BasePlugin` objects.
        """
        directory = directory or self.plugin_dir
        plugins: list[BasePlugin] = []

        if not directory.is_dir():
            log.debug("Plugin directory %s does not exist, skipping", directory)
            return plugins

        for path in sorted(directory.glob("*.py")):
            try:
                module = self._load_module_from_file(path)
                found = self._extract_plugins(module)
                plugins.extend(found)
                log.debug("Loaded %d plugin(s) from %s", len(found), path)
            except Exception as exc:
                log.warning("Failed to load plugin file %s: %s", path, exc)

        return plugins

    def discover_all(self, directory: Path | None = None) -> list[BasePlugin]:
        """Combine entry-point and directory discovery.

        Args:
            directory: Override local plugin directory.

        Returns:
            Deduplicated list of :class:`BasePlugin` instances (by ``id``).
        """
        seen_ids: set[str] = set()
        result: list[BasePlugin] = []

        for plugin in self.discover_entry_points() + self.discover_directory(directory):
            if plugin.id in seen_ids:
                log.debug("Skipping duplicate plugin id %r", plugin.id)
                continue
            seen_ids.add(plugin.id)
            result.append(plugin)

        return result

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _load_module_from_file(path: Path) -> Any:
        """Import a Python file as a fresh module."""
        module_name = f"api2mcp_plugin_{path.stem}"
        spec = importlib.util.spec_from_file_location(module_name, path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot create module spec for {path}")
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)  # type: ignore[union-attr]
        return module

    @staticmethod
    def _extract_plugins(module: Any) -> list[BasePlugin]:
        """Find and instantiate all valid :class:`BasePlugin` subclasses in *module*."""
        plugins: list[BasePlugin] = []
        for _name, obj in inspect.getmembers(module, inspect.isclass):
            if (
                obj is not BasePlugin
                and issubclass(obj, BasePlugin)
                and obj.id  # must have a non-empty id
            ):
                try:
                    plugins.append(obj())
                except Exception as exc:
                    log.warning("Failed to instantiate plugin class %r: %s", obj.__name__, exc)
        return plugins

    @staticmethod
    def _is_valid_plugin_class(plugin_cls: Any) -> bool:
        """Return ``True`` if *plugin_cls* is a concrete :class:`BasePlugin` subclass."""
        return (
            inspect.isclass(plugin_cls)
            and issubclass(plugin_cls, BasePlugin)
            and plugin_cls is not BasePlugin
            and bool(getattr(plugin_cls, "id", ""))
        )
Functions
discover_entry_points()

Load plugins registered via the api2mcp.plugins entry-point group.

Returns:

Type Description
list[BasePlugin]

List of instantiated :class:BasePlugin objects.

Source code in src/api2mcp/plugins/discovery.py
def discover_entry_points(self) -> list[BasePlugin]:
    """Load plugins registered via the ``api2mcp.plugins`` entry-point group.

    Returns:
        List of instantiated :class:`BasePlugin` objects.
    """
    plugins: list[BasePlugin] = []
    try:
        from importlib.metadata import entry_points

        eps = entry_points(group=_EP_GROUP)
        for ep in eps:
            try:
                plugin_cls = ep.load()
                if self._is_valid_plugin_class(plugin_cls):
                    plugins.append(plugin_cls())
                    log.debug("Loaded entry-point plugin: %r", ep.name)
                else:
                    log.warning(
                        "Entry point %r did not return a BasePlugin subclass", ep.name
                    )
            except Exception as exc:
                log.warning("Failed to load entry-point plugin %r: %s", ep.name, exc)
    except Exception as exc:
        log.debug("Entry-point discovery failed: %s", exc)
    return plugins
discover_directory(directory=None)

Scan directory for .py files and load plugin classes.

Each .py file is imported as a module. All :class:~api2mcp.plugins.base.BasePlugin subclasses with a non-empty id attribute are instantiated.

Parameters:

Name Type Description Default
directory Path | None

Directory to scan. Defaults to :attr:plugin_dir.

None

Returns:

Type Description
list[BasePlugin]

List of instantiated :class:BasePlugin objects.

Source code in src/api2mcp/plugins/discovery.py
def discover_directory(self, directory: Path | None = None) -> list[BasePlugin]:
    """Scan *directory* for ``.py`` files and load plugin classes.

    Each ``.py`` file is imported as a module.  All
    :class:`~api2mcp.plugins.base.BasePlugin` subclasses with a non-empty
    ``id`` attribute are instantiated.

    Args:
        directory: Directory to scan.  Defaults to :attr:`plugin_dir`.

    Returns:
        List of instantiated :class:`BasePlugin` objects.
    """
    directory = directory or self.plugin_dir
    plugins: list[BasePlugin] = []

    if not directory.is_dir():
        log.debug("Plugin directory %s does not exist, skipping", directory)
        return plugins

    for path in sorted(directory.glob("*.py")):
        try:
            module = self._load_module_from_file(path)
            found = self._extract_plugins(module)
            plugins.extend(found)
            log.debug("Loaded %d plugin(s) from %s", len(found), path)
        except Exception as exc:
            log.warning("Failed to load plugin file %s: %s", path, exc)

    return plugins
discover_all(directory=None)

Combine entry-point and directory discovery.

Parameters:

Name Type Description Default
directory Path | None

Override local plugin directory.

None

Returns:

Type Description
list[BasePlugin]

Deduplicated list of :class:BasePlugin instances (by id).

Source code in src/api2mcp/plugins/discovery.py
def discover_all(self, directory: Path | None = None) -> list[BasePlugin]:
    """Combine entry-point and directory discovery.

    Args:
        directory: Override local plugin directory.

    Returns:
        Deduplicated list of :class:`BasePlugin` instances (by ``id``).
    """
    seen_ids: set[str] = set()
    result: list[BasePlugin] = []

    for plugin in self.discover_entry_points() + self.discover_directory(directory):
        if plugin.id in seen_ids:
            log.debug("Skipping duplicate plugin id %r", plugin.id)
            continue
        seen_ids.add(plugin.id)
        result.append(plugin)

    return result

PluginSandbox

Runs plugin callbacks with timeout enforcement and exception isolation.

Parameters:

Name Type Description Default
timeout float | None

Maximum seconds a single callback may run. None disables the timeout (useful in tests).

10.0
reraise bool

If True, exceptions from callbacks are re-raised after logging. Default False (swallow & return None).

False
Source code in src/api2mcp/plugins/sandbox.py
class PluginSandbox:
    """Runs plugin callbacks with timeout enforcement and exception isolation.

    Args:
        timeout: Maximum seconds a single callback may run.  ``None`` disables
                 the timeout (useful in tests).
        reraise: If ``True``, exceptions from callbacks are re-raised after
                 logging.  Default ``False`` (swallow & return ``None``).
    """

    def __init__(self, timeout: float | None = 10.0, *, reraise: bool = False) -> None:
        self.timeout = timeout
        self.reraise = reraise

    async def call(
        self, callback: Callable[..., Any], **kwargs: Any
    ) -> Any:
        """Invoke *callback* safely with optional timeout.

        Args:
            callback: Sync or async callable.
            **kwargs: Forwarded to *callback*.

        Returns:
            The return value of *callback*, or ``None`` on error/timeout.

        Raises:
            Exception: Only if :attr:`reraise` is ``True`` and the callback raises.
        """
        try:
            if inspect.iscoroutinefunction(callback):
                coro = callback(**kwargs)
                if self.timeout is not None:
                    return await asyncio.wait_for(coro, timeout=self.timeout)
                return await coro
            else:
                # Run sync callbacks in a thread executor to support timeout
                loop = asyncio.get_event_loop()
                if self.timeout is not None:
                    return await asyncio.wait_for(
                        loop.run_in_executor(None, lambda: callback(**kwargs)),
                        timeout=self.timeout,
                    )
                return callback(**kwargs)

        except TimeoutError:
            log.warning(
                "Plugin callback %r timed out after %.1fs",
                getattr(callback, "__name__", repr(callback)),
                self.timeout,
            )
            if self.reraise:
                raise
            return None

        except SandboxViolation:
            log.error(
                "Plugin callback %r attempted a blocked operation",
                getattr(callback, "__name__", repr(callback)),
            )
            if self.reraise:
                raise
            return None

        except Exception as exc:
            log.exception(
                "Plugin callback %r raised: %s",
                getattr(callback, "__name__", repr(callback)),
                exc,
            )
            if self.reraise:
                raise
            return None
Functions
call(callback, **kwargs) async

Invoke callback safely with optional timeout.

Parameters:

Name Type Description Default
callback Callable[..., Any]

Sync or async callable.

required
**kwargs Any

Forwarded to callback.

{}

Returns:

Type Description
Any

The return value of callback, or None on error/timeout.

Raises:

Type Description
Exception

Only if :attr:reraise is True and the callback raises.

Source code in src/api2mcp/plugins/sandbox.py
async def call(
    self, callback: Callable[..., Any], **kwargs: Any
) -> Any:
    """Invoke *callback* safely with optional timeout.

    Args:
        callback: Sync or async callable.
        **kwargs: Forwarded to *callback*.

    Returns:
        The return value of *callback*, or ``None`` on error/timeout.

    Raises:
        Exception: Only if :attr:`reraise` is ``True`` and the callback raises.
    """
    try:
        if inspect.iscoroutinefunction(callback):
            coro = callback(**kwargs)
            if self.timeout is not None:
                return await asyncio.wait_for(coro, timeout=self.timeout)
            return await coro
        else:
            # Run sync callbacks in a thread executor to support timeout
            loop = asyncio.get_event_loop()
            if self.timeout is not None:
                return await asyncio.wait_for(
                    loop.run_in_executor(None, lambda: callback(**kwargs)),
                    timeout=self.timeout,
                )
            return callback(**kwargs)

    except TimeoutError:
        log.warning(
            "Plugin callback %r timed out after %.1fs",
            getattr(callback, "__name__", repr(callback)),
            self.timeout,
        )
        if self.reraise:
            raise
        return None

    except SandboxViolation:
        log.error(
            "Plugin callback %r attempted a blocked operation",
            getattr(callback, "__name__", repr(callback)),
        )
        if self.reraise:
            raise
        return None

    except Exception as exc:
        log.exception(
            "Plugin callback %r raised: %s",
            getattr(callback, "__name__", repr(callback)),
            exc,
        )
        if self.reraise:
            raise
        return None

Templates

api2mcp.templates

api2mcp.templates

Template registry and installer for F7.1.

Classes

TemplateManifest dataclass

Parsed contents of a template.yaml file.

Attributes:

Name Type Description
id str

Unique slug identifier (e.g. "github-issues").

name str

Human-readable template name.

description str

Short description of what the template does.

author str

Author handle or organisation name.

version str

Current version string (semver recommended).

tags list[str]

Searchable keyword tags.

spec_file str

Relative path to the OpenAPI/Swagger spec inside the template repo.

repository str

URL of the template's git repository.

versions list[VersionEntry]

Ordered list of :class:VersionEntry objects (newest first).

rating float

Average user rating (0.0–5.0).

downloads int

Total install count.

reviews list[ReviewEntry]

List of :class:ReviewEntry objects.

Source code in src/api2mcp/templates/manifest.py
@dataclass
class TemplateManifest:
    """Parsed contents of a ``template.yaml`` file.

    Attributes:
        id:          Unique slug identifier (e.g. ``"github-issues"``).
        name:        Human-readable template name.
        description: Short description of what the template does.
        author:      Author handle or organisation name.
        version:     Current version string (semver recommended).
        tags:        Searchable keyword tags.
        spec_file:   Relative path to the OpenAPI/Swagger spec inside the template repo.
        repository:  URL of the template's git repository.
        versions:    Ordered list of :class:`VersionEntry` objects (newest first).
        rating:      Average user rating (0.0–5.0).
        downloads:   Total install count.
        reviews:     List of :class:`ReviewEntry` objects.
    """

    id: str
    name: str
    description: str = ""
    author: str = ""
    version: str = "0.1.0"
    tags: list[str] = field(default_factory=list)
    spec_file: str = "openapi.yaml"
    repository: str = ""
    versions: list[VersionEntry] = field(default_factory=list)
    rating: float = 0.0
    downloads: int = 0
    reviews: list[ReviewEntry] = field(default_factory=list)

    # ------------------------------------------------------------------
    # Factories
    # ------------------------------------------------------------------

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> TemplateManifest:
        """Build a :class:`TemplateManifest` from a raw parsed dict.

        Args:
            data: Dict loaded from ``template.yaml``.

        Returns:
            A populated :class:`TemplateManifest`.

        Raises:
            ValueError: If required fields (``id``, ``name``) are missing.
        """
        if "id" not in data:
            raise ValueError("template.yaml must contain 'id'")
        if "name" not in data:
            raise ValueError("template.yaml must contain 'name'")

        versions = [VersionEntry.from_dict(v) for v in data.get("versions", [])]
        reviews = [ReviewEntry.from_dict(r) for r in data.get("reviews", [])]

        return cls(
            id=data["id"],
            name=data["name"],
            description=data.get("description", ""),
            author=data.get("author", ""),
            version=str(data.get("version", "0.1.0")),
            tags=list(data.get("tags", [])),
            spec_file=str(data.get("spec_file", "openapi.yaml")),
            repository=str(data.get("repository", "")),
            versions=versions,
            rating=float(data.get("rating", 0.0)),
            downloads=int(data.get("downloads", 0)),
            reviews=reviews,
        )

    @classmethod
    def from_yaml(cls, text: str) -> TemplateManifest:
        """Parse a ``template.yaml`` string.

        Args:
            text: Raw YAML content.

        Returns:
            :class:`TemplateManifest`

        Raises:
            ValueError: If the YAML is invalid or required fields are missing.
        """
        try:
            data = yaml.safe_load(text)
        except yaml.YAMLError as exc:
            raise ValueError(f"Invalid YAML in template manifest: {exc}") from exc
        if not isinstance(data, dict):
            raise ValueError("template.yaml must be a YAML mapping")
        return cls.from_dict(data)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a plain dict (round-trippable via :meth:`from_dict`)."""
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "author": self.author,
            "version": self.version,
            "tags": list(self.tags),
            "spec_file": self.spec_file,
            "repository": self.repository,
            "versions": [v.to_dict() for v in self.versions],
            "rating": self.rating,
            "downloads": self.downloads,
            "reviews": [r.to_dict() for r in self.reviews],
        }

    def latest_version_tag(self) -> str | None:
        """Return the tag of the newest version entry, or ``None`` if empty."""
        return self.versions[0].tag if self.versions else None

    def resolve_version(self, requested: str | None) -> str:
        """Resolve a requested version string to a git tag.

        Args:
            requested: Version tag (e.g. ``"v1.0.0"``) or ``None`` / ``"latest"``.

        Returns:
            The resolved git tag string.

        Raises:
            ValueError: If *requested* does not match any known version.
        """
        if requested is None or requested.lower() in ("latest", ""):
            return self.latest_version_tag() or self.version
        available = {v.tag for v in self.versions}
        if requested in available:
            return requested
        raise ValueError(
            f"Version {requested!r} not found in template {self.id!r}. "
            f"Available: {sorted(available)}"
        )

    def matches_query(self, query: str) -> bool:
        """Return ``True`` if *query* appears in the id, name, description, or tags."""
        q = query.lower()
        return (
            q in self.id.lower()
            or q in self.name.lower()
            or q in self.description.lower()
            or any(q in tag.lower() for tag in self.tags)
        )
Functions
from_dict(data) classmethod

Build a :class:TemplateManifest from a raw parsed dict.

Parameters:

Name Type Description Default
data dict[str, Any]

Dict loaded from template.yaml.

required

Returns:

Type Description
TemplateManifest

A populated :class:TemplateManifest.

Raises:

Type Description
ValueError

If required fields (id, name) are missing.

Source code in src/api2mcp/templates/manifest.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> TemplateManifest:
    """Build a :class:`TemplateManifest` from a raw parsed dict.

    Args:
        data: Dict loaded from ``template.yaml``.

    Returns:
        A populated :class:`TemplateManifest`.

    Raises:
        ValueError: If required fields (``id``, ``name``) are missing.
    """
    if "id" not in data:
        raise ValueError("template.yaml must contain 'id'")
    if "name" not in data:
        raise ValueError("template.yaml must contain 'name'")

    versions = [VersionEntry.from_dict(v) for v in data.get("versions", [])]
    reviews = [ReviewEntry.from_dict(r) for r in data.get("reviews", [])]

    return cls(
        id=data["id"],
        name=data["name"],
        description=data.get("description", ""),
        author=data.get("author", ""),
        version=str(data.get("version", "0.1.0")),
        tags=list(data.get("tags", [])),
        spec_file=str(data.get("spec_file", "openapi.yaml")),
        repository=str(data.get("repository", "")),
        versions=versions,
        rating=float(data.get("rating", 0.0)),
        downloads=int(data.get("downloads", 0)),
        reviews=reviews,
    )
from_yaml(text) classmethod

Parse a template.yaml string.

Parameters:

Name Type Description Default
text str

Raw YAML content.

required

Returns:

Type Description
TemplateManifest

class:TemplateManifest

Raises:

Type Description
ValueError

If the YAML is invalid or required fields are missing.

Source code in src/api2mcp/templates/manifest.py
@classmethod
def from_yaml(cls, text: str) -> TemplateManifest:
    """Parse a ``template.yaml`` string.

    Args:
        text: Raw YAML content.

    Returns:
        :class:`TemplateManifest`

    Raises:
        ValueError: If the YAML is invalid or required fields are missing.
    """
    try:
        data = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        raise ValueError(f"Invalid YAML in template manifest: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("template.yaml must be a YAML mapping")
    return cls.from_dict(data)
to_dict()

Serialise to a plain dict (round-trippable via :meth:from_dict).

Source code in src/api2mcp/templates/manifest.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a plain dict (round-trippable via :meth:`from_dict`)."""
    return {
        "id": self.id,
        "name": self.name,
        "description": self.description,
        "author": self.author,
        "version": self.version,
        "tags": list(self.tags),
        "spec_file": self.spec_file,
        "repository": self.repository,
        "versions": [v.to_dict() for v in self.versions],
        "rating": self.rating,
        "downloads": self.downloads,
        "reviews": [r.to_dict() for r in self.reviews],
    }
latest_version_tag()

Return the tag of the newest version entry, or None if empty.

Source code in src/api2mcp/templates/manifest.py
def latest_version_tag(self) -> str | None:
    """Return the tag of the newest version entry, or ``None`` if empty."""
    return self.versions[0].tag if self.versions else None
resolve_version(requested)

Resolve a requested version string to a git tag.

Parameters:

Name Type Description Default
requested str | None

Version tag (e.g. "v1.0.0") or None / "latest".

required

Returns:

Type Description
str

The resolved git tag string.

Raises:

Type Description
ValueError

If requested does not match any known version.

Source code in src/api2mcp/templates/manifest.py
def resolve_version(self, requested: str | None) -> str:
    """Resolve a requested version string to a git tag.

    Args:
        requested: Version tag (e.g. ``"v1.0.0"``) or ``None`` / ``"latest"``.

    Returns:
        The resolved git tag string.

    Raises:
        ValueError: If *requested* does not match any known version.
    """
    if requested is None or requested.lower() in ("latest", ""):
        return self.latest_version_tag() or self.version
    available = {v.tag for v in self.versions}
    if requested in available:
        return requested
    raise ValueError(
        f"Version {requested!r} not found in template {self.id!r}. "
        f"Available: {sorted(available)}"
    )
matches_query(query)

Return True if query appears in the id, name, description, or tags.

Source code in src/api2mcp/templates/manifest.py
def matches_query(self, query: str) -> bool:
    """Return ``True`` if *query* appears in the id, name, description, or tags."""
    q = query.lower()
    return (
        q in self.id.lower()
        or q in self.name.lower()
        or q in self.description.lower()
        or any(q in tag.lower() for tag in self.tags)
    )

TemplateRegistry

Client for the remote template registry.

Parameters:

Name Type Description Default
index_url str

URL of the remote registry.yaml index file.

_DEFAULT_INDEX_URL
cache_dir Path | None

Local directory for caching the index and downloaded manifests.

None
http_client Any | None

Optional pre-built :class:httpx.AsyncClient (injected for tests).

None
Source code in src/api2mcp/templates/registry.py
class TemplateRegistry:
    """Client for the remote template registry.

    Args:
        index_url:  URL of the remote ``registry.yaml`` index file.
        cache_dir:  Local directory for caching the index and downloaded manifests.
        http_client: Optional pre-built :class:`httpx.AsyncClient` (injected for tests).
    """

    def __init__(
        self,
        index_url: str = _DEFAULT_INDEX_URL,
        cache_dir: Path | None = None,
        http_client: Any | None = None,
    ) -> None:
        self.index_url = index_url
        self.cache_dir = cache_dir or (Path.home() / ".api2mcp" / "templates")
        self._http_client = http_client
        self._index: RegistryIndex | None = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def refresh(self, *, force: bool = False) -> RegistryIndex:
        """Fetch (or re-fetch) the registry index.

        Uses local disk cache if the cached file is still fresh,
        unless *force* is ``True``.

        Args:
            force: Bypass cache and always fetch from remote.

        Returns:
            The loaded :class:`RegistryIndex`.
        """
        cache_file = self.cache_dir / "registry.yaml"

        if not force and cache_file.is_file():
            age = time.time() - cache_file.stat().st_mtime
            if age < _CACHE_TTL_SECONDS:
                text = cache_file.read_text(encoding="utf-8")
                self._index = RegistryIndex.from_yaml(text)
                return self._index

        text = await self._fetch_url(self.index_url)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        cache_file.write_text(text, encoding="utf-8")
        self._index = RegistryIndex.from_yaml(text)
        return self._index

    def search(self, query: str = "") -> list[TemplateManifest]:
        """Filter templates by *query* (case-insensitive substring match).

        Searches template id, name, description, and tags.

        Args:
            query: Search string. Empty string returns all templates.

        Returns:
            List of matching :class:`TemplateManifest` objects.

        Raises:
            RuntimeError: If :meth:`refresh` has not been called yet.
        """
        self._ensure_index()
        assert self._index is not None
        if not query:
            return list(self._index.templates)
        return [t for t in self._index.templates if t.matches_query(query)]

    def get(self, template_id: str) -> TemplateManifest | None:
        """Look up a template by exact id.

        Args:
            template_id: Exact template slug.

        Returns:
            :class:`TemplateManifest` or ``None`` if not found.
        """
        self._ensure_index()
        assert self._index is not None
        for t in self._index.templates:
            if t.id == template_id:
                return t
        return None

    async def fetch_manifest(self, template_id: str) -> TemplateManifest:
        """Fetch the full ``template.yaml`` manifest from the template's repository.

        The summary entry in the registry index only contains subset metadata.
        This method fetches the full manifest from the template's own repo.

        Args:
            template_id: Exact template slug.

        Returns:
            Full :class:`TemplateManifest`.

        Raises:
            KeyError:   If *template_id* is not in the registry.
            ValueError: If the remote manifest YAML is malformed.
        """
        summary = self.get(template_id)
        if summary is None:
            available = [t.id for t in (self._index.templates if self._index else [])]
            raise KeyError(
                f"Template {template_id!r} not found in registry. "
                f"Available: {available}"
            )

        manifest_url = _build_raw_url(summary.repository)
        text = await self._fetch_url(manifest_url)
        return TemplateManifest.from_yaml(text)

    @property
    def index(self) -> RegistryIndex | None:
        """The currently loaded :class:`RegistryIndex`, or ``None``."""
        return self._index

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _fetch_url(self, url: str) -> str:
        """Fetch *url* and return the response body as text.

        Uses an injected ``http_client`` if provided, otherwise creates a
        temporary :class:`httpx.AsyncClient`.
        """
        if self._http_client is not None:
            response = await self._http_client.get(url)
            response.raise_for_status()
            return response.text

        import httpx

        async with httpx.AsyncClient(follow_redirects=True, timeout=30.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            return response.text

    def _ensure_index(self) -> None:
        if self._index is None:
            raise RuntimeError(
                "Registry index not loaded. Call 'await registry.refresh()' first."
            )
Attributes
index property

The currently loaded :class:RegistryIndex, or None.

Functions
refresh(*, force=False) async

Fetch (or re-fetch) the registry index.

Uses local disk cache if the cached file is still fresh, unless force is True.

Parameters:

Name Type Description Default
force bool

Bypass cache and always fetch from remote.

False

Returns:

Type Description
RegistryIndex

The loaded :class:RegistryIndex.

Source code in src/api2mcp/templates/registry.py
async def refresh(self, *, force: bool = False) -> RegistryIndex:
    """Fetch (or re-fetch) the registry index.

    Uses local disk cache if the cached file is still fresh,
    unless *force* is ``True``.

    Args:
        force: Bypass cache and always fetch from remote.

    Returns:
        The loaded :class:`RegistryIndex`.
    """
    cache_file = self.cache_dir / "registry.yaml"

    if not force and cache_file.is_file():
        age = time.time() - cache_file.stat().st_mtime
        if age < _CACHE_TTL_SECONDS:
            text = cache_file.read_text(encoding="utf-8")
            self._index = RegistryIndex.from_yaml(text)
            return self._index

    text = await self._fetch_url(self.index_url)
    self.cache_dir.mkdir(parents=True, exist_ok=True)
    cache_file.write_text(text, encoding="utf-8")
    self._index = RegistryIndex.from_yaml(text)
    return self._index
search(query='')

Filter templates by query (case-insensitive substring match).

Searches template id, name, description, and tags.

Parameters:

Name Type Description Default
query str

Search string. Empty string returns all templates.

''

Returns:

Type Description
list[TemplateManifest]

List of matching :class:TemplateManifest objects.

Raises:

Type Description
RuntimeError

If :meth:refresh has not been called yet.

Source code in src/api2mcp/templates/registry.py
def search(self, query: str = "") -> list[TemplateManifest]:
    """Filter templates by *query* (case-insensitive substring match).

    Searches template id, name, description, and tags.

    Args:
        query: Search string. Empty string returns all templates.

    Returns:
        List of matching :class:`TemplateManifest` objects.

    Raises:
        RuntimeError: If :meth:`refresh` has not been called yet.
    """
    self._ensure_index()
    assert self._index is not None
    if not query:
        return list(self._index.templates)
    return [t for t in self._index.templates if t.matches_query(query)]
get(template_id)

Look up a template by exact id.

Parameters:

Name Type Description Default
template_id str

Exact template slug.

required

Returns:

Type Description
TemplateManifest | None

class:TemplateManifest or None if not found.

Source code in src/api2mcp/templates/registry.py
def get(self, template_id: str) -> TemplateManifest | None:
    """Look up a template by exact id.

    Args:
        template_id: Exact template slug.

    Returns:
        :class:`TemplateManifest` or ``None`` if not found.
    """
    self._ensure_index()
    assert self._index is not None
    for t in self._index.templates:
        if t.id == template_id:
            return t
    return None
fetch_manifest(template_id) async

Fetch the full template.yaml manifest from the template's repository.

The summary entry in the registry index only contains subset metadata. This method fetches the full manifest from the template's own repo.

Parameters:

Name Type Description Default
template_id str

Exact template slug.

required

Returns:

Name Type Description
Full TemplateManifest

class:TemplateManifest.

Raises:

Type Description
KeyError

If template_id is not in the registry.

ValueError

If the remote manifest YAML is malformed.

Source code in src/api2mcp/templates/registry.py
async def fetch_manifest(self, template_id: str) -> TemplateManifest:
    """Fetch the full ``template.yaml`` manifest from the template's repository.

    The summary entry in the registry index only contains subset metadata.
    This method fetches the full manifest from the template's own repo.

    Args:
        template_id: Exact template slug.

    Returns:
        Full :class:`TemplateManifest`.

    Raises:
        KeyError:   If *template_id* is not in the registry.
        ValueError: If the remote manifest YAML is malformed.
    """
    summary = self.get(template_id)
    if summary is None:
        available = [t.id for t in (self._index.templates if self._index else [])]
        raise KeyError(
            f"Template {template_id!r} not found in registry. "
            f"Available: {available}"
        )

    manifest_url = _build_raw_url(summary.repository)
    text = await self._fetch_url(manifest_url)
    return TemplateManifest.from_yaml(text)

TemplateInstaller

Downloads and installs MCP server templates.

Parameters:

Name Type Description Default
registry TemplateRegistry | None

:class:TemplateRegistry to look up templates.

None
git_runner Any | None

Callable (args: list[str]) -> None that executes git commands. Injected for testing; defaults to a real subprocess.run wrapper.

None
Source code in src/api2mcp/templates/installer.py
class TemplateInstaller:
    """Downloads and installs MCP server templates.

    Args:
        registry:     :class:`TemplateRegistry` to look up templates.
        git_runner:   Callable ``(args: list[str]) -> None`` that executes git
                      commands.  Injected for testing; defaults to a real
                      ``subprocess.run`` wrapper.
    """

    def __init__(
        self,
        registry: TemplateRegistry | None = None,
        git_runner: Any | None = None,
    ) -> None:
        self._registry = registry or TemplateRegistry()
        self._git_runner = git_runner or _default_git_runner

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def install(
        self,
        template_id: str,
        *,
        dest: Path,
        version: str | None = None,
    ) -> InstalledTemplate:
        """Install a template into *dest*.

        Fetches the full manifest, resolves the version, clones the
        repository at the requested tag into *dest*, and writes a
        ``installed.yaml`` receipt.

        Args:
            template_id: Template slug from the registry.
            dest:        Directory to install into (created if missing).
            version:     Git tag to install.  ``None`` / ``"latest"`` use the
                         newest version listed in the manifest.

        Returns:
            :class:`InstalledTemplate`

        Raises:
            KeyError:    If *template_id* is not in the registry.
            ValueError:  If *version* is unknown.
            RuntimeError: If git clone fails.
        """
        manifest = await self._registry.fetch_manifest(template_id)
        resolved_version = manifest.resolve_version(version)

        dest = Path(dest)
        dest.mkdir(parents=True, exist_ok=True)

        self._clone(manifest.repository, resolved_version, dest)
        self._write_receipt(dest, manifest, resolved_version)

        return InstalledTemplate(manifest=manifest, version=resolved_version, dest=dest)

    async def update(
        self,
        template_id: str,
        *,
        dest: Path,
        version: str | None = None,
    ) -> InstalledTemplate:
        """Update an existing installation to a newer version.

        Equivalent to a fresh install that overwrites *dest*.

        Args:
            template_id: Template slug.
            dest:        Directory of the existing installation.
            version:     Target version tag.  ``None`` installs the latest.

        Returns:
            :class:`InstalledTemplate`
        """
        if dest.exists():
            shutil.rmtree(dest)
        return await self.install(template_id, dest=dest, version=version)

    @staticmethod
    def read_receipt(dest: Path) -> dict[str, str] | None:
        """Read the ``installed.yaml`` receipt from *dest*.

        Returns:
            Dict with ``id``, ``version``, ``repository`` keys, or ``None``
            if no receipt exists.
        """
        import yaml

        receipt_file = dest / "installed.yaml"
        if not receipt_file.is_file():
            return None
        data = yaml.safe_load(receipt_file.read_text(encoding="utf-8"))
        if isinstance(data, dict):
            return data  # type: ignore[return-value]
        return None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _clone(self, repo_url: str, version: str, dest: Path) -> None:
        """Shallow-clone *repo_url* at *version* into *dest*."""
        # Clone into a temp dir then move contents to avoid nested .git
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp) / "repo"
            self._git_runner(
                [
                    "git",
                    "clone",
                    "--depth", "1",
                    "--branch", version,
                    repo_url,
                    str(tmp_path),
                ]
            )
            # Move all contents (including hidden files) into dest
            for item in tmp_path.iterdir():
                target = dest / item.name
                if target.exists():
                    if target.is_dir():
                        shutil.rmtree(target)
                    else:
                        target.unlink()
                shutil.move(str(item), str(dest))

    def _write_receipt(
        self, dest: Path, manifest: TemplateManifest, version: str
    ) -> None:
        """Write an ``installed.yaml`` receipt to *dest*."""
        import yaml

        receipt = {
            "id": manifest.id,
            "name": manifest.name,
            "version": version,
            "repository": manifest.repository,
        }
        (dest / "installed.yaml").write_text(
            yaml.dump(receipt, default_flow_style=False), encoding="utf-8"
        )
Functions
install(template_id, *, dest, version=None) async

Install a template into dest.

Fetches the full manifest, resolves the version, clones the repository at the requested tag into dest, and writes a installed.yaml receipt.

Parameters:

Name Type Description Default
template_id str

Template slug from the registry.

required
dest Path

Directory to install into (created if missing).

required
version str | None

Git tag to install. None / "latest" use the newest version listed in the manifest.

None

Returns:

Type Description
InstalledTemplate

class:InstalledTemplate

Raises:

Type Description
KeyError

If template_id is not in the registry.

ValueError

If version is unknown.

RuntimeError

If git clone fails.

Source code in src/api2mcp/templates/installer.py
async def install(
    self,
    template_id: str,
    *,
    dest: Path,
    version: str | None = None,
) -> InstalledTemplate:
    """Install a template into *dest*.

    Fetches the full manifest, resolves the version, clones the
    repository at the requested tag into *dest*, and writes a
    ``installed.yaml`` receipt.

    Args:
        template_id: Template slug from the registry.
        dest:        Directory to install into (created if missing).
        version:     Git tag to install.  ``None`` / ``"latest"`` use the
                     newest version listed in the manifest.

    Returns:
        :class:`InstalledTemplate`

    Raises:
        KeyError:    If *template_id* is not in the registry.
        ValueError:  If *version* is unknown.
        RuntimeError: If git clone fails.
    """
    manifest = await self._registry.fetch_manifest(template_id)
    resolved_version = manifest.resolve_version(version)

    dest = Path(dest)
    dest.mkdir(parents=True, exist_ok=True)

    self._clone(manifest.repository, resolved_version, dest)
    self._write_receipt(dest, manifest, resolved_version)

    return InstalledTemplate(manifest=manifest, version=resolved_version, dest=dest)
update(template_id, *, dest, version=None) async

Update an existing installation to a newer version.

Equivalent to a fresh install that overwrites dest.

Parameters:

Name Type Description Default
template_id str

Template slug.

required
dest Path

Directory of the existing installation.

required
version str | None

Target version tag. None installs the latest.

None

Returns:

Type Description
InstalledTemplate

class:InstalledTemplate

Source code in src/api2mcp/templates/installer.py
async def update(
    self,
    template_id: str,
    *,
    dest: Path,
    version: str | None = None,
) -> InstalledTemplate:
    """Update an existing installation to a newer version.

    Equivalent to a fresh install that overwrites *dest*.

    Args:
        template_id: Template slug.
        dest:        Directory of the existing installation.
        version:     Target version tag.  ``None`` installs the latest.

    Returns:
        :class:`InstalledTemplate`
    """
    if dest.exists():
        shutil.rmtree(dest)
    return await self.install(template_id, dest=dest, version=version)
read_receipt(dest) staticmethod

Read the installed.yaml receipt from dest.

Returns:

Type Description
dict[str, str] | None

Dict with id, version, repository keys, or None

dict[str, str] | None

if no receipt exists.

Source code in src/api2mcp/templates/installer.py
@staticmethod
def read_receipt(dest: Path) -> dict[str, str] | None:
    """Read the ``installed.yaml`` receipt from *dest*.

    Returns:
        Dict with ``id``, ``version``, ``repository`` keys, or ``None``
        if no receipt exists.
    """
    import yaml

    receipt_file = dest / "installed.yaml"
    if not receipt_file.is_file():
        return None
    data = yaml.safe_load(receipt_file.read_text(encoding="utf-8"))
    if isinstance(data, dict):
        return data  # type: ignore[return-value]
    return None

Orchestration

api2mcp.orchestration.adapters

api2mcp.orchestration.adapters.base.MCPToolAdapter

Adapts an MCP tool to LangChain's StructuredTool interface.

Do not instantiate directly — use :meth:from_mcp_tool instead.

The adapter tracks per-tool latency metrics and wraps execution with configurable retry (tenacity exponential backoff) and timeout.

Attributes:

Name Type Description
name

Colon-namespaced tool name, e.g. "github:list_issues".

description

Human-readable tool description forwarded to the LLM.

server_name

MCP server identifier.

timeout_seconds

Per-call asyncio timeout.

retry_count

Maximum tenacity retry attempts.

Source code in src/api2mcp/orchestration/adapters/base.py
class MCPToolAdapter:
    """Adapts an MCP tool to LangChain's StructuredTool interface.

    **Do not instantiate directly** — use :meth:`from_mcp_tool` instead.

    The adapter tracks per-tool latency metrics and wraps execution with
    configurable retry (tenacity exponential backoff) and timeout.

    Attributes:
        name: Colon-namespaced tool name, e.g. ``"github:list_issues"``.
        description: Human-readable tool description forwarded to the LLM.
        server_name: MCP server identifier.
        timeout_seconds: Per-call asyncio timeout.
        retry_count: Maximum tenacity retry attempts.
    """

    def __init__(
        self,
        *,
        session: Any,  # mcp.client.session.ClientSession
        mcp_tool_name: str,
        server_name: str,
        name: str,
        description: str,
        args_schema: type[BaseModel],
        timeout_seconds: float = 30.0,
        retry_count: int = 3,
        response_transformer: Callable[[str], str] | None = None,
    ) -> None:
        self._session = session
        self._mcp_tool_name = mcp_tool_name
        self.server_name = server_name
        self.name = name
        self.description = description
        self.args_schema = args_schema
        self.timeout_seconds = timeout_seconds
        self.retry_count = retry_count
        self._response_transformer = response_transformer
        self._call_count: int = 0
        self._total_latency_ms: float = 0.0

    # ------------------------------------------------------------------
    # Factory (returns StructuredTool)
    # ------------------------------------------------------------------

    @classmethod
    async def from_mcp_tool(
        cls,
        session: Any,
        tool: Any,  # mcp.types.Tool
        server_name: str,
        *,
        timeout_seconds: float = 30.0,
        retry_count: int = 3,
        response_transformer: Callable[[str], str] | None = None,
    ) -> StructuredTool:
        """Convert an MCP :class:`~mcp.types.Tool` into a LangChain StructuredTool.

        Args:
            session: Active :class:`~mcp.client.session.ClientSession`.
            tool: The :class:`~mcp.types.Tool` definition from ``list_tools()``.
            server_name: Identifier for the MCP server (used in namespacing).
            timeout_seconds: Per-call asyncio timeout in seconds.
            retry_count: Maximum retry attempts on transient errors.
            response_transformer: Optional callable applied to the raw result
                string before returning to the LLM.

        Returns:
            A :class:`~langchain_core.tools.StructuredTool` ready for use in
            LangGraph agents.

        Example::

            tool = await MCPToolAdapter.from_mcp_tool(session, mcp_tool, "github")
            result = await tool.ainvoke({"owner": "user", "repo": "project"})
        """
        schema: dict[str, Any] = tool.inputSchema if tool.inputSchema else {}
        args_schema = _json_schema_to_pydantic(tool.name, schema)

        namespaced_name = f"{server_name}:{tool.name}"
        description = (
            tool.description
            or f"MCP tool '{tool.name}' from server '{server_name}'"
        )

        adapter = cls(
            session=session,
            mcp_tool_name=tool.name,
            server_name=server_name,
            name=namespaced_name,
            description=description,
            args_schema=args_schema,
            timeout_seconds=timeout_seconds,
            retry_count=retry_count,
            response_transformer=response_transformer,
        )
        return adapter.to_structured_tool()

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------

    async def _execute(self, **kwargs: Any) -> str:
        """Execute the MCP tool call with retry and timeout.

        Retries on :class:`asyncio.TimeoutError` and :class:`ConnectionError`
        using exponential backoff (tenacity).

        Args:
            **kwargs: Arguments matching :attr:`args_schema` fields.

        Returns:
            Text content extracted from the MCP ``CallToolResult``.

        Raises:
            RuntimeError: If the tool returns an error payload or all retries
                are exhausted.
            asyncio.TimeoutError: If the call exceeds :attr:`timeout_seconds`
                on the final attempt.
        """
        arguments = {k: v for k, v in kwargs.items() if v is not None} or None

        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(self.retry_count),
            wait=wait_exponential(multiplier=1, min=1, max=10),
            retry=retry_if_exception_type(
                (asyncio.TimeoutError, ConnectionError, OSError)
            ),
            reraise=True,
        ):
            with attempt:
                t_start = time.monotonic()
                try:
                    result = await asyncio.wait_for(
                        self._session.call_tool(
                            name=self._mcp_tool_name,
                            arguments=arguments,
                        ),
                        timeout=self.timeout_seconds,
                    )
                except TimeoutError:
                    elapsed_ms = (time.monotonic() - t_start) * 1000
                    logger.warning(
                        "Tool '%s' timed out after %.0fms (limit=%.1fs)",
                        self.name,
                        elapsed_ms,
                        self.timeout_seconds,
                    )
                    raise
                finally:
                    elapsed_ms = (time.monotonic() - t_start) * 1000
                    self._call_count += 1
                    self._total_latency_ms += elapsed_ms

                if getattr(result, "isError", False):
                    error_text = _extract_text(result.content)
                    logger.error(
                        "Tool '%s' returned MCP error: %s", self.name, error_text
                    )
                    raise RuntimeError(
                        f"MCP tool '{self._mcp_tool_name}' error: {error_text}"
                    )

                raw = _extract_text(result.content)
                return (
                    self._response_transformer(raw)
                    if self._response_transformer
                    else raw
                )

        # Unreachable — tenacity reraises; silences mypy
        raise RuntimeError(
            f"Tool '{self.name}' failed after {self.retry_count} attempts"
        )

    # ------------------------------------------------------------------
    # StructuredTool factory
    # ------------------------------------------------------------------

    def to_structured_tool(self) -> StructuredTool:
        """Return a :class:`~langchain_core.tools.StructuredTool` wrapping this adapter.

        The returned tool uses the bound :meth:`_execute` coroutine and the
        dynamically generated Pydantic :attr:`args_schema`.
        """
        return StructuredTool.from_function(
            coroutine=self._execute,
            name=self.name,
            description=self.description,
            args_schema=self.args_schema,
        )

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------

    @property
    def call_count(self) -> int:
        """Total number of calls attempted through this adapter."""
        return self._call_count

    @property
    def avg_latency_ms(self) -> float:
        """Average call latency in milliseconds (0.0 if no calls made)."""
        if self._call_count == 0:
            return 0.0
        return self._total_latency_ms / self._call_count

    def metrics(self) -> dict[str, Any]:
        """Return a snapshot of call metrics for this adapter.

        Returns:
            Dict with ``name``, ``server_name``, ``mcp_tool_name``,
            ``call_count``, ``avg_latency_ms``, ``total_latency_ms``.
        """
        return {
            "name": self.name,
            "server_name": self.server_name,
            "mcp_tool_name": self._mcp_tool_name,
            "call_count": self._call_count,
            "avg_latency_ms": round(self.avg_latency_ms, 2),
            "total_latency_ms": round(self._total_latency_ms, 2),
        }

Attributes

call_count property

Total number of calls attempted through this adapter.

avg_latency_ms property

Average call latency in milliseconds (0.0 if no calls made).

Functions

from_mcp_tool(session, tool, server_name, *, timeout_seconds=30.0, retry_count=3, response_transformer=None) async classmethod

Convert an MCP :class:~mcp.types.Tool into a LangChain StructuredTool.

Parameters:

Name Type Description Default
session Any

Active :class:~mcp.client.session.ClientSession.

required
tool Any

The :class:~mcp.types.Tool definition from list_tools().

required
server_name str

Identifier for the MCP server (used in namespacing).

required
timeout_seconds float

Per-call asyncio timeout in seconds.

30.0
retry_count int

Maximum retry attempts on transient errors.

3
response_transformer Callable[[str], str] | None

Optional callable applied to the raw result string before returning to the LLM.

None

Returns:

Name Type Description
A StructuredTool

class:~langchain_core.tools.StructuredTool ready for use in

StructuredTool

LangGraph agents.

Example::

tool = await MCPToolAdapter.from_mcp_tool(session, mcp_tool, "github")
result = await tool.ainvoke({"owner": "user", "repo": "project"})
Source code in src/api2mcp/orchestration/adapters/base.py
@classmethod
async def from_mcp_tool(
    cls,
    session: Any,
    tool: Any,  # mcp.types.Tool
    server_name: str,
    *,
    timeout_seconds: float = 30.0,
    retry_count: int = 3,
    response_transformer: Callable[[str], str] | None = None,
) -> StructuredTool:
    """Convert an MCP :class:`~mcp.types.Tool` into a LangChain StructuredTool.

    Args:
        session: Active :class:`~mcp.client.session.ClientSession`.
        tool: The :class:`~mcp.types.Tool` definition from ``list_tools()``.
        server_name: Identifier for the MCP server (used in namespacing).
        timeout_seconds: Per-call asyncio timeout in seconds.
        retry_count: Maximum retry attempts on transient errors.
        response_transformer: Optional callable applied to the raw result
            string before returning to the LLM.

    Returns:
        A :class:`~langchain_core.tools.StructuredTool` ready for use in
        LangGraph agents.

    Example::

        tool = await MCPToolAdapter.from_mcp_tool(session, mcp_tool, "github")
        result = await tool.ainvoke({"owner": "user", "repo": "project"})
    """
    schema: dict[str, Any] = tool.inputSchema if tool.inputSchema else {}
    args_schema = _json_schema_to_pydantic(tool.name, schema)

    namespaced_name = f"{server_name}:{tool.name}"
    description = (
        tool.description
        or f"MCP tool '{tool.name}' from server '{server_name}'"
    )

    adapter = cls(
        session=session,
        mcp_tool_name=tool.name,
        server_name=server_name,
        name=namespaced_name,
        description=description,
        args_schema=args_schema,
        timeout_seconds=timeout_seconds,
        retry_count=retry_count,
        response_transformer=response_transformer,
    )
    return adapter.to_structured_tool()

to_structured_tool()

Return a :class:~langchain_core.tools.StructuredTool wrapping this adapter.

The returned tool uses the bound :meth:_execute coroutine and the dynamically generated Pydantic :attr:args_schema.

Source code in src/api2mcp/orchestration/adapters/base.py
def to_structured_tool(self) -> StructuredTool:
    """Return a :class:`~langchain_core.tools.StructuredTool` wrapping this adapter.

    The returned tool uses the bound :meth:`_execute` coroutine and the
    dynamically generated Pydantic :attr:`args_schema`.
    """
    return StructuredTool.from_function(
        coroutine=self._execute,
        name=self.name,
        description=self.description,
        args_schema=self.args_schema,
    )

metrics()

Return a snapshot of call metrics for this adapter.

Returns:

Type Description
dict[str, Any]

Dict with name, server_name, mcp_tool_name,

dict[str, Any]

call_count, avg_latency_ms, total_latency_ms.

Source code in src/api2mcp/orchestration/adapters/base.py
def metrics(self) -> dict[str, Any]:
    """Return a snapshot of call metrics for this adapter.

    Returns:
        Dict with ``name``, ``server_name``, ``mcp_tool_name``,
        ``call_count``, ``avg_latency_ms``, ``total_latency_ms``.
    """
    return {
        "name": self.name,
        "server_name": self.server_name,
        "mcp_tool_name": self._mcp_tool_name,
        "call_count": self._call_count,
        "avg_latency_ms": round(self.avg_latency_ms, 2),
        "total_latency_ms": round(self._total_latency_ms, 2),
    }

api2mcp.orchestration.adapters.registry.MCPToolRegistry

Central registry for MCP tools across multiple servers.

Supports two registration modes:

  1. Session-based (eager): pass an already-active :class:~mcp.client.session.ClientSession to :meth:register_server. The caller owns session lifecycle.

  2. Config-based (lazy): pass a :class:ServerConfig to :meth:register_server_config, then call :meth:connect_server / :meth:connect_all to establish subprocess connections managed internally. Call :meth:close to clean up.

Tool names use colon namespacing::

"github:list_issues"
"jira:create_ticket"

Parameters:

Name Type Description Default
default_timeout float

Per-call timeout forwarded to every adapter.

30.0
default_retry_count int

Retry count forwarded to every adapter.

3
Source code in src/api2mcp/orchestration/adapters/registry.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
class MCPToolRegistry:
    """Central registry for MCP tools across multiple servers.

    Supports two registration modes:

    1. **Session-based** (eager): pass an already-active
       :class:`~mcp.client.session.ClientSession` to
       :meth:`register_server`.  The caller owns session lifecycle.

    2. **Config-based** (lazy): pass a :class:`ServerConfig` to
       :meth:`register_server_config`, then call :meth:`connect_server` /
       :meth:`connect_all` to establish subprocess connections managed
       internally.  Call :meth:`close` to clean up.

    Tool names use colon namespacing::

        "github:list_issues"
        "jira:create_ticket"

    Args:
        default_timeout: Per-call timeout forwarded to every adapter.
        default_retry_count: Retry count forwarded to every adapter.
    """

    def __init__(
        self,
        *,
        default_timeout: float = 30.0,
        default_retry_count: int = 3,
    ) -> None:
        self._default_timeout = default_timeout
        self._default_retry_count = default_retry_count
        # server_name → ClientSession
        self._sessions: dict[str, Any] = {}
        # namespaced_name → StructuredTool
        self._tools: dict[str, StructuredTool] = {}
        # namespaced_name → MCPToolAdapter (for usage stats)
        self._adapters: dict[str, MCPToolAdapter] = {}
        # server_name → [namespaced_names]
        self._server_tool_names: dict[str, list[str]] = {}
        # server_name → ServerConfig (for lazy connections)
        self._configs: dict[str, ServerConfig] = {}
        # manages subprocess lifecycles for config-based connections
        self._exit_stack: contextlib.AsyncExitStack = contextlib.AsyncExitStack()
        # protects concurrent mutations to internal dicts
        self._lock: asyncio.Lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Async context manager
    # ------------------------------------------------------------------

    async def __aenter__(self) -> MCPToolRegistry:
        await self._exit_stack.__aenter__()
        return self

    async def __aexit__(self, *args: Any) -> None:
        await self._exit_stack.__aexit__(*args)

    # ------------------------------------------------------------------
    # Config-based (lazy) registration
    # ------------------------------------------------------------------

    async def register_server_config(self, config: ServerConfig) -> None:
        """Register a :class:`ServerConfig` without connecting.

        The actual subprocess connection is deferred to
        :meth:`connect_server` or :meth:`connect_all`.

        Args:
            config: Server configuration to store.
        """
        async with self._lock:
            self._configs[config.name] = config
        logger.debug("Registered config for server '%s'", config.name)

    async def connect_server(self, name: str) -> Any:
        """Establish a subprocess connection for a registered config.

        Launches the subprocess described by the stored
        :class:`ServerConfig`, creates a
        :class:`~mcp.client.session.ClientSession`, initialises the
        protocol, and calls :meth:`register_server` to discover tools.

        If the server is already connected, returns the existing session.

        Args:
            name: Server name matching a previously registered
                :class:`ServerConfig`.

        Returns:
            The active :class:`~mcp.client.session.ClientSession`.

        Raises:
            ValueError: If no config is registered under *name*.
            ImportError: If the ``mcp`` package is not installed.
        """
        if name not in self._configs:
            raise ValueError(
                f"No ServerConfig registered for '{name}'. "
                "Call register_server_config() first."
            )
        if name in self._sessions:
            logger.debug("Server '%s' already connected, reusing session", name)
            return self._sessions[name]

        config = self._configs[name]

        try:
            from mcp import ClientSession, StdioServerParameters
            from mcp.client.stdio import stdio_client
        except ImportError as exc:  # pragma: no cover
            raise ImportError(
                "The 'mcp' package is required for subprocess connections. "
                "Install it with: pip install mcp"
            ) from exc

        server_params = StdioServerParameters(
            command=config.command,
            args=config.args,
            env=config.env if config.env else None,
        )

        read, write = await self._exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        session: Any = await self._exit_stack.enter_async_context(
            ClientSession(read, write)
        )
        await session.initialize()
        await self.register_server(name, session)
        logger.info("Connected to server '%s' (command=%s)", name, config.command)
        return session

    async def connect_all(self) -> None:
        """Connect all registered :class:`ServerConfig` instances.

        Skips servers that are already connected.  Errors from individual
        servers are logged and re-raised immediately.
        """
        async with self._lock:
            pending = [name for name in self._configs if name not in self._sessions]
        for name in pending:
            await self.connect_server(name)

    # ------------------------------------------------------------------
    # Session-based (eager) registration
    # ------------------------------------------------------------------

    async def register_server(
        self,
        server_name: str,
        session: Any,  # mcp.client.session.ClientSession
        *,
        timeout_seconds: float | None = None,
        retry_count: int | None = None,
    ) -> list[str]:
        """Discover all tools from *session* and register them under *server_name*.

        Calls ``session.list_tools()`` once and converts every returned tool
        into a :class:`~langchain_core.tools.StructuredTool`.

        Args:
            server_name: Logical server identifier (namespace prefix).
            session: Active MCP ClientSession.
            timeout_seconds: Per-adapter timeout override (defaults to
                :attr:`default_timeout`).
            retry_count: Per-adapter retry count override (defaults to
                :attr:`default_retry_count`).

        Returns:
            List of registered colon-namespaced tool names.
        """
        timeout = timeout_seconds if timeout_seconds is not None else self._default_timeout
        retries = retry_count if retry_count is not None else self._default_retry_count

        # Perform async I/O outside the lock to avoid holding it across awaits.
        list_result = await session.list_tools()
        mcp_tools = list_result.tools

        registered: list[str] = []
        new_adapters: dict[str, MCPToolAdapter] = {}
        new_tools: dict[str, StructuredTool] = {}
        for mcp_tool in mcp_tools:
            schema: dict[str, Any] = mcp_tool.inputSchema if mcp_tool.inputSchema else {}
            args_schema = _json_schema_to_pydantic(mcp_tool.name, schema)
            namespaced_name = f"{server_name}:{mcp_tool.name}"
            description = (
                mcp_tool.description
                or f"MCP tool '{mcp_tool.name}' from server '{server_name}'"
            )

            adapter = MCPToolAdapter(
                session=session,
                mcp_tool_name=mcp_tool.name,
                server_name=server_name,
                name=namespaced_name,
                description=description,
                args_schema=args_schema,
                timeout_seconds=timeout,
                retry_count=retries,
            )
            new_adapters[namespaced_name] = adapter
            new_tools[namespaced_name] = adapter.to_structured_tool()
            registered.append(namespaced_name)
            logger.debug("Registered tool '%s'", namespaced_name)

        # Commit all dict mutations atomically under the lock.
        async with self._lock:
            self._sessions[server_name] = session
            self._adapters.update(new_adapters)
            self._tools.update(new_tools)
            self._server_tool_names[server_name] = registered

        logger.info(
            "Registered %d tool(s) from server '%s'", len(registered), server_name
        )
        return registered

    async def register_tool(
        self,
        server_name: str,
        tool: StructuredTool,
    ) -> str:
        """Manually register a pre-built :class:`~langchain_core.tools.StructuredTool`.

        Useful for testing or when the tool is built outside the registry.

        Args:
            server_name: Logical server identifier (namespace prefix).
            tool: The StructuredTool to register.

        Returns:
            The colon-namespaced name used in the registry.
        """
        if ":" in tool.name:
            namespaced = tool.name
        else:
            namespaced = f"{server_name}:{tool.name}"

        async with self._lock:
            self._tools[namespaced] = tool
            self._server_tool_names.setdefault(server_name, []).append(namespaced)
        return namespaced

    async def unregister_server(self, server_name: str) -> bool:
        """Remove all tools, adapters, and the session for *server_name*.

        Note: Subprocess connections established via :meth:`connect_server`
        are only fully closed when :meth:`close` is called.  This method
        only removes the server from the registry's lookup tables.

        Returns:
            ``True`` if the server was registered, ``False`` otherwise.
        """
        async with self._lock:
            if server_name not in self._sessions:
                return False

            del self._sessions[server_name]
            for name in self._server_tool_names.pop(server_name, []):
                self._tools.pop(name, None)
                self._adapters.pop(name, None)
            self._configs.pop(server_name, None)

        logger.info("Unregistered server '%s'", server_name)
        return True

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------

    def get_tool(self, namespaced_name: str) -> StructuredTool | None:
        """Return the tool registered as *namespaced_name*, or ``None``.

        Args:
            namespaced_name: Colon-namespaced name, e.g. ``"github:list_issues"``.
        """
        return self._tools.get(namespaced_name)

    def get_tools(
        self,
        *,
        server: str | None = None,
        category: str | None = None,
        pattern: str | None = None,
    ) -> list[StructuredTool]:
        """Return registered tools, optionally filtered.

        Filters are combined (AND semantics).

        Args:
            server: If given, return only tools from this server.
            category: If given, filter by inferred category.  Supported
                values: ``"read"``, ``"write"``, ``"other"``.
            pattern: If given, filter tool names using :mod:`fnmatch`
                glob syntax, e.g. ``"github:list_*"``.

        Returns:
            Filtered list of :class:`~langchain_core.tools.StructuredTool`.
        """
        items: list[tuple[str, StructuredTool]] = list(self._tools.items())

        if server is not None:
            prefix = f"{server}:"
            items = [(n, t) for n, t in items if n.startswith(prefix)]

        if category is not None:
            items = [(n, t) for n, t in items if _infer_category(n) == category]

        if pattern is not None:
            items = [(n, t) for n, t in items if fnmatch.fnmatch(n, pattern)]

        return [t for _, t in items]

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def registered_servers(self) -> list[str]:
        """Return a list of all registered server names."""
        return list(self._sessions.keys())

    def list_servers(self) -> list[str]:
        """Return all registered server names (alias for :meth:`registered_servers`)."""
        return self.registered_servers()

    def list_categories(self) -> list[str]:
        """Return the distinct inferred categories present in the registry.

        Returns:
            Sorted list of category strings (e.g. ``["other", "read", "write"]``).
        """
        return sorted({_infer_category(name) for name in self._tools})

    def registered_tools(self) -> list[str]:
        """Return all registered colon-namespaced tool names."""
        return list(self._tools.keys())

    def tools_for_server(self, server_name: str) -> list[str]:
        """Return colon-namespaced names for all tools from *server_name*.

        Returns an empty list if the server is not registered.
        """
        return list(self._server_tool_names.get(server_name, []))

    # ------------------------------------------------------------------
    # Usage statistics
    # ------------------------------------------------------------------

    def get_usage_stats(self) -> dict[str, dict[str, Any]]:
        """Return usage statistics for all tools with registered adapters.

        Tools registered manually via :meth:`register_tool` are excluded
        (no adapter to track calls).

        Returns:
            Mapping of namespaced tool name → metrics dict.  Each metrics
            dict contains ``name``, ``server_name``, ``mcp_tool_name``,
            ``call_count``, ``avg_latency_ms``, ``total_latency_ms``.
        """
        return {name: adapter.metrics() for name, adapter in self._adapters.items()}

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def close(self) -> None:
        """Close all subprocess connections managed by this registry.

        Calls :meth:`contextlib.AsyncExitStack.aclose` which tears down
        every :class:`~mcp.client.session.ClientSession` and subprocess
        established via :meth:`connect_server`.

        Sessions registered directly with :meth:`register_server` are
        **not** closed here — their lifecycle is managed by the caller.
        """
        await self._exit_stack.aclose()
        logger.info("Closed all managed server connections")

    # ------------------------------------------------------------------
    # Dunder helpers
    # ------------------------------------------------------------------

    def __len__(self) -> int:
        return len(self._tools)

    def __contains__(self, namespaced_name: str) -> bool:
        return namespaced_name in self._tools

Functions

register_server_config(config) async

Register a :class:ServerConfig without connecting.

The actual subprocess connection is deferred to :meth:connect_server or :meth:connect_all.

Parameters:

Name Type Description Default
config ServerConfig

Server configuration to store.

required
Source code in src/api2mcp/orchestration/adapters/registry.py
async def register_server_config(self, config: ServerConfig) -> None:
    """Register a :class:`ServerConfig` without connecting.

    The actual subprocess connection is deferred to
    :meth:`connect_server` or :meth:`connect_all`.

    Args:
        config: Server configuration to store.
    """
    async with self._lock:
        self._configs[config.name] = config
    logger.debug("Registered config for server '%s'", config.name)

connect_server(name) async

Establish a subprocess connection for a registered config.

Launches the subprocess described by the stored :class:ServerConfig, creates a :class:~mcp.client.session.ClientSession, initialises the protocol, and calls :meth:register_server to discover tools.

If the server is already connected, returns the existing session.

Parameters:

Name Type Description Default
name str

Server name matching a previously registered :class:ServerConfig.

required

Returns:

Type Description
Any

The active :class:~mcp.client.session.ClientSession.

Raises:

Type Description
ValueError

If no config is registered under name.

ImportError

If the mcp package is not installed.

Source code in src/api2mcp/orchestration/adapters/registry.py
async def connect_server(self, name: str) -> Any:
    """Establish a subprocess connection for a registered config.

    Launches the subprocess described by the stored
    :class:`ServerConfig`, creates a
    :class:`~mcp.client.session.ClientSession`, initialises the
    protocol, and calls :meth:`register_server` to discover tools.

    If the server is already connected, returns the existing session.

    Args:
        name: Server name matching a previously registered
            :class:`ServerConfig`.

    Returns:
        The active :class:`~mcp.client.session.ClientSession`.

    Raises:
        ValueError: If no config is registered under *name*.
        ImportError: If the ``mcp`` package is not installed.
    """
    if name not in self._configs:
        raise ValueError(
            f"No ServerConfig registered for '{name}'. "
            "Call register_server_config() first."
        )
    if name in self._sessions:
        logger.debug("Server '%s' already connected, reusing session", name)
        return self._sessions[name]

    config = self._configs[name]

    try:
        from mcp import ClientSession, StdioServerParameters
        from mcp.client.stdio import stdio_client
    except ImportError as exc:  # pragma: no cover
        raise ImportError(
            "The 'mcp' package is required for subprocess connections. "
            "Install it with: pip install mcp"
        ) from exc

    server_params = StdioServerParameters(
        command=config.command,
        args=config.args,
        env=config.env if config.env else None,
    )

    read, write = await self._exit_stack.enter_async_context(
        stdio_client(server_params)
    )
    session: Any = await self._exit_stack.enter_async_context(
        ClientSession(read, write)
    )
    await session.initialize()
    await self.register_server(name, session)
    logger.info("Connected to server '%s' (command=%s)", name, config.command)
    return session

connect_all() async

Connect all registered :class:ServerConfig instances.

Skips servers that are already connected. Errors from individual servers are logged and re-raised immediately.

Source code in src/api2mcp/orchestration/adapters/registry.py
async def connect_all(self) -> None:
    """Connect all registered :class:`ServerConfig` instances.

    Skips servers that are already connected.  Errors from individual
    servers are logged and re-raised immediately.
    """
    async with self._lock:
        pending = [name for name in self._configs if name not in self._sessions]
    for name in pending:
        await self.connect_server(name)

register_server(server_name, session, *, timeout_seconds=None, retry_count=None) async

Discover all tools from session and register them under server_name.

Calls session.list_tools() once and converts every returned tool into a :class:~langchain_core.tools.StructuredTool.

Parameters:

Name Type Description Default
server_name str

Logical server identifier (namespace prefix).

required
session Any

Active MCP ClientSession.

required
timeout_seconds float | None

Per-adapter timeout override (defaults to :attr:default_timeout).

None
retry_count int | None

Per-adapter retry count override (defaults to :attr:default_retry_count).

None

Returns:

Type Description
list[str]

List of registered colon-namespaced tool names.

Source code in src/api2mcp/orchestration/adapters/registry.py
async def register_server(
    self,
    server_name: str,
    session: Any,  # mcp.client.session.ClientSession
    *,
    timeout_seconds: float | None = None,
    retry_count: int | None = None,
) -> list[str]:
    """Discover all tools from *session* and register them under *server_name*.

    Calls ``session.list_tools()`` once and converts every returned tool
    into a :class:`~langchain_core.tools.StructuredTool`.

    Args:
        server_name: Logical server identifier (namespace prefix).
        session: Active MCP ClientSession.
        timeout_seconds: Per-adapter timeout override (defaults to
            :attr:`default_timeout`).
        retry_count: Per-adapter retry count override (defaults to
            :attr:`default_retry_count`).

    Returns:
        List of registered colon-namespaced tool names.
    """
    timeout = timeout_seconds if timeout_seconds is not None else self._default_timeout
    retries = retry_count if retry_count is not None else self._default_retry_count

    # Perform async I/O outside the lock to avoid holding it across awaits.
    list_result = await session.list_tools()
    mcp_tools = list_result.tools

    registered: list[str] = []
    new_adapters: dict[str, MCPToolAdapter] = {}
    new_tools: dict[str, StructuredTool] = {}
    for mcp_tool in mcp_tools:
        schema: dict[str, Any] = mcp_tool.inputSchema if mcp_tool.inputSchema else {}
        args_schema = _json_schema_to_pydantic(mcp_tool.name, schema)
        namespaced_name = f"{server_name}:{mcp_tool.name}"
        description = (
            mcp_tool.description
            or f"MCP tool '{mcp_tool.name}' from server '{server_name}'"
        )

        adapter = MCPToolAdapter(
            session=session,
            mcp_tool_name=mcp_tool.name,
            server_name=server_name,
            name=namespaced_name,
            description=description,
            args_schema=args_schema,
            timeout_seconds=timeout,
            retry_count=retries,
        )
        new_adapters[namespaced_name] = adapter
        new_tools[namespaced_name] = adapter.to_structured_tool()
        registered.append(namespaced_name)
        logger.debug("Registered tool '%s'", namespaced_name)

    # Commit all dict mutations atomically under the lock.
    async with self._lock:
        self._sessions[server_name] = session
        self._adapters.update(new_adapters)
        self._tools.update(new_tools)
        self._server_tool_names[server_name] = registered

    logger.info(
        "Registered %d tool(s) from server '%s'", len(registered), server_name
    )
    return registered

register_tool(server_name, tool) async

Manually register a pre-built :class:~langchain_core.tools.StructuredTool.

Useful for testing or when the tool is built outside the registry.

Parameters:

Name Type Description Default
server_name str

Logical server identifier (namespace prefix).

required
tool StructuredTool

The StructuredTool to register.

required

Returns:

Type Description
str

The colon-namespaced name used in the registry.

Source code in src/api2mcp/orchestration/adapters/registry.py
async def register_tool(
    self,
    server_name: str,
    tool: StructuredTool,
) -> str:
    """Manually register a pre-built :class:`~langchain_core.tools.StructuredTool`.

    Useful for testing or when the tool is built outside the registry.

    Args:
        server_name: Logical server identifier (namespace prefix).
        tool: The StructuredTool to register.

    Returns:
        The colon-namespaced name used in the registry.
    """
    if ":" in tool.name:
        namespaced = tool.name
    else:
        namespaced = f"{server_name}:{tool.name}"

    async with self._lock:
        self._tools[namespaced] = tool
        self._server_tool_names.setdefault(server_name, []).append(namespaced)
    return namespaced

unregister_server(server_name) async

Remove all tools, adapters, and the session for server_name.

Note: Subprocess connections established via :meth:connect_server are only fully closed when :meth:close is called. This method only removes the server from the registry's lookup tables.

Returns:

Type Description
bool

True if the server was registered, False otherwise.

Source code in src/api2mcp/orchestration/adapters/registry.py
async def unregister_server(self, server_name: str) -> bool:
    """Remove all tools, adapters, and the session for *server_name*.

    Note: Subprocess connections established via :meth:`connect_server`
    are only fully closed when :meth:`close` is called.  This method
    only removes the server from the registry's lookup tables.

    Returns:
        ``True`` if the server was registered, ``False`` otherwise.
    """
    async with self._lock:
        if server_name not in self._sessions:
            return False

        del self._sessions[server_name]
        for name in self._server_tool_names.pop(server_name, []):
            self._tools.pop(name, None)
            self._adapters.pop(name, None)
        self._configs.pop(server_name, None)

    logger.info("Unregistered server '%s'", server_name)
    return True

get_tool(namespaced_name)

Return the tool registered as namespaced_name, or None.

Parameters:

Name Type Description Default
namespaced_name str

Colon-namespaced name, e.g. "github:list_issues".

required
Source code in src/api2mcp/orchestration/adapters/registry.py
def get_tool(self, namespaced_name: str) -> StructuredTool | None:
    """Return the tool registered as *namespaced_name*, or ``None``.

    Args:
        namespaced_name: Colon-namespaced name, e.g. ``"github:list_issues"``.
    """
    return self._tools.get(namespaced_name)

get_tools(*, server=None, category=None, pattern=None)

Return registered tools, optionally filtered.

Filters are combined (AND semantics).

Parameters:

Name Type Description Default
server str | None

If given, return only tools from this server.

None
category str | None

If given, filter by inferred category. Supported values: "read", "write", "other".

None
pattern str | None

If given, filter tool names using :mod:fnmatch glob syntax, e.g. "github:list_*".

None

Returns:

Type Description
list[StructuredTool]

Filtered list of :class:~langchain_core.tools.StructuredTool.

Source code in src/api2mcp/orchestration/adapters/registry.py
def get_tools(
    self,
    *,
    server: str | None = None,
    category: str | None = None,
    pattern: str | None = None,
) -> list[StructuredTool]:
    """Return registered tools, optionally filtered.

    Filters are combined (AND semantics).

    Args:
        server: If given, return only tools from this server.
        category: If given, filter by inferred category.  Supported
            values: ``"read"``, ``"write"``, ``"other"``.
        pattern: If given, filter tool names using :mod:`fnmatch`
            glob syntax, e.g. ``"github:list_*"``.

    Returns:
        Filtered list of :class:`~langchain_core.tools.StructuredTool`.
    """
    items: list[tuple[str, StructuredTool]] = list(self._tools.items())

    if server is not None:
        prefix = f"{server}:"
        items = [(n, t) for n, t in items if n.startswith(prefix)]

    if category is not None:
        items = [(n, t) for n, t in items if _infer_category(n) == category]

    if pattern is not None:
        items = [(n, t) for n, t in items if fnmatch.fnmatch(n, pattern)]

    return [t for _, t in items]

registered_servers()

Return a list of all registered server names.

Source code in src/api2mcp/orchestration/adapters/registry.py
def registered_servers(self) -> list[str]:
    """Return a list of all registered server names."""
    return list(self._sessions.keys())

list_servers()

Return all registered server names (alias for :meth:registered_servers).

Source code in src/api2mcp/orchestration/adapters/registry.py
def list_servers(self) -> list[str]:
    """Return all registered server names (alias for :meth:`registered_servers`)."""
    return self.registered_servers()

list_categories()

Return the distinct inferred categories present in the registry.

Returns:

Type Description
list[str]

Sorted list of category strings (e.g. ["other", "read", "write"]).

Source code in src/api2mcp/orchestration/adapters/registry.py
def list_categories(self) -> list[str]:
    """Return the distinct inferred categories present in the registry.

    Returns:
        Sorted list of category strings (e.g. ``["other", "read", "write"]``).
    """
    return sorted({_infer_category(name) for name in self._tools})

registered_tools()

Return all registered colon-namespaced tool names.

Source code in src/api2mcp/orchestration/adapters/registry.py
def registered_tools(self) -> list[str]:
    """Return all registered colon-namespaced tool names."""
    return list(self._tools.keys())

tools_for_server(server_name)

Return colon-namespaced names for all tools from server_name.

Returns an empty list if the server is not registered.

Source code in src/api2mcp/orchestration/adapters/registry.py
def tools_for_server(self, server_name: str) -> list[str]:
    """Return colon-namespaced names for all tools from *server_name*.

    Returns an empty list if the server is not registered.
    """
    return list(self._server_tool_names.get(server_name, []))

get_usage_stats()

Return usage statistics for all tools with registered adapters.

Tools registered manually via :meth:register_tool are excluded (no adapter to track calls).

Returns:

Type Description
dict[str, dict[str, Any]]

Mapping of namespaced tool name → metrics dict. Each metrics

dict[str, dict[str, Any]]

dict contains name, server_name, mcp_tool_name,

dict[str, dict[str, Any]]

call_count, avg_latency_ms, total_latency_ms.

Source code in src/api2mcp/orchestration/adapters/registry.py
def get_usage_stats(self) -> dict[str, dict[str, Any]]:
    """Return usage statistics for all tools with registered adapters.

    Tools registered manually via :meth:`register_tool` are excluded
    (no adapter to track calls).

    Returns:
        Mapping of namespaced tool name → metrics dict.  Each metrics
        dict contains ``name``, ``server_name``, ``mcp_tool_name``,
        ``call_count``, ``avg_latency_ms``, ``total_latency_ms``.
    """
    return {name: adapter.metrics() for name, adapter in self._adapters.items()}

close() async

Close all subprocess connections managed by this registry.

Calls :meth:contextlib.AsyncExitStack.aclose which tears down every :class:~mcp.client.session.ClientSession and subprocess established via :meth:connect_server.

Sessions registered directly with :meth:register_server are not closed here — their lifecycle is managed by the caller.

Source code in src/api2mcp/orchestration/adapters/registry.py
async def close(self) -> None:
    """Close all subprocess connections managed by this registry.

    Calls :meth:`contextlib.AsyncExitStack.aclose` which tears down
    every :class:`~mcp.client.session.ClientSession` and subprocess
    established via :meth:`connect_server`.

    Sessions registered directly with :meth:`register_server` are
    **not** closed here — their lifecycle is managed by the caller.
    """
    await self._exit_stack.aclose()
    logger.info("Closed all managed server connections")

Graphs

api2mcp.orchestration.graphs.reactive.ReactiveGraph

Bases: BaseAPIGraph

Single-API reactive (ReAct) agent graph.

Wraps langgraph.prebuilt.create_react_agent with an API2MCP-specific system prompt and tool set sourced from :class:~api2mcp.orchestration.adapters.registry.MCPToolRegistry.

Parameters:

Name Type Description Default
model BaseChatModel

LangChain chat model for the agent node.

required
registry MCPToolRegistry

Tool registry that provides the MCP-backed tools for api_name.

required
api_name str

Logical MCP server name (e.g. "github"). Used both to filter tools from the registry and to render the system prompt template.

required
checkpointer Any

Optional LangGraph checkpointer. When provided, the compiled graph supports multi-turn / persistent memory.

None
max_iterations int

Maximum agent iterations before the graph terminates with a GraphRecursionError. Forwarded as recursion_limit in the run/stream config dict.

10
Source code in src/api2mcp/orchestration/graphs/reactive.py
class ReactiveGraph(BaseAPIGraph):
    """Single-API reactive (ReAct) agent graph.

    Wraps ``langgraph.prebuilt.create_react_agent`` with an
    API2MCP-specific system prompt and tool set sourced from
    :class:`~api2mcp.orchestration.adapters.registry.MCPToolRegistry`.

    Args:
        model: LangChain chat model for the agent node.
        registry: Tool registry that provides the MCP-backed tools
            for *api_name*.
        api_name: Logical MCP server name (e.g. ``"github"``).  Used
            both to filter tools from the registry and to render the
            system prompt template.
        checkpointer: Optional LangGraph checkpointer.  When provided,
            the compiled graph supports multi-turn / persistent memory.
        max_iterations: Maximum agent iterations before the graph
            terminates with a ``GraphRecursionError``.  Forwarded as
            ``recursion_limit`` in the run/stream config dict.
    """

    def __init__(
        self,
        model: BaseChatModel,
        registry: MCPToolRegistry,
        *,
        api_name: str,
        checkpointer: Any = None,
        max_iterations: int = 10,
    ) -> None:
        self.api_name = api_name
        # super().__init__ calls build_graph(), which needs api_name to exist
        super().__init__(
            model,
            registry,
            checkpointer=checkpointer,
            max_iterations=max_iterations,
        )

    # ------------------------------------------------------------------
    # System prompt
    # ------------------------------------------------------------------

    def _build_system_prompt(self) -> str:
        """Build the system prompt for the ReAct agent.

        Renders an API2MCP-specific template that injects ``{api_name}``
        and ``{available_tools}`` so the LLM understands the context and
        tool namespace.

        Returns:
            Rendered system prompt string.
        """
        tools: list[StructuredTool] = self.registry.get_tools(server=self.api_name)
        tool_names: list[str] = [t.name for t in tools]
        available_tools_str = ", ".join(tool_names) if tool_names else "(none registered)"

        return (
            f"You are an intelligent API assistant for the '{self.api_name}' API.\n"
            f"You have access to the following MCP tools: {available_tools_str}.\n"
            "Use these tools to answer the user's request accurately and concisely.\n"
            "Always prefer tool calls over assumptions when live data is needed.\n"
            "If a tool call fails, explain the error and, where possible, suggest "
            "an alternative approach."
        )

    # ------------------------------------------------------------------
    # BaseAPIGraph implementation
    # ------------------------------------------------------------------

    def build_graph(self) -> Any:
        """Compile the ReAct agent graph via ``create_react_agent``.

        Retrieves tools for :attr:`api_name` from the registry, builds
        the system prompt, and delegates to
        ``langgraph.prebuilt.create_react_agent``.

        Returns:
            Compiled LangGraph graph (``CompiledGraph``).

        Raises:
            ImportError: If ``langgraph`` is not installed.
        """
        if create_react_agent is None:  # pragma: no cover
            raise ImportError(
                "The 'langgraph' package is required for ReactiveGraph. "
                "Install it with: pip install langgraph"
            )

        tools: list[StructuredTool] = self.registry.get_tools(server=self.api_name)
        system_prompt: str = self._build_system_prompt()

        logger.info(
            "ReactiveGraph.build_graph: api_name='%s', tools=%d, "
            "checkpointer=%s, max_iterations=%d",
            self.api_name,
            len(tools),
            type(self.checkpointer).__name__,
            self.max_iterations,
        )

        compiled = create_react_agent(
            model=self.model,
            tools=tools,
            state_modifier=system_prompt,
            checkpointer=self.checkpointer,
        )
        return compiled

    # ------------------------------------------------------------------
    # run / stream — override base to add error recovery logging
    # ------------------------------------------------------------------

    async def run(
        self,
        user_input: str,
        *,
        thread_id: str = "default",
        **kwargs: Any,
    ) -> Any:
        """Invoke the reactive agent and return the final state.

        Delegates to :meth:`BaseAPIGraph.run` and catches residual
        ``asyncio.TimeoutError``, ``ConnectionError``, and ``OSError``
        exceptions that may escape the adapter's retry logic (e.g. if
        all retry attempts are exhausted).

        Args:
            user_input: The user's request / prompt.
            thread_id: Checkpointer thread identifier.
            **kwargs: Extra keys passed through to the graph input dict.

        Returns:
            Final graph state dict from ``ainvoke``.

        Raises:
            asyncio.TimeoutError: Re-raised after logging if the MCP
                server does not respond within the configured timeout
                after all retries.
            ConnectionError: Re-raised after logging on persistent
                connection failure.
            OSError: Re-raised after logging on OS-level I/O failure.
        """
        try:
            return await super().run(user_input, thread_id=thread_id, **kwargs)
        except TimeoutError:
            logger.error(
                "ReactiveGraph.run: MCP timeout for api_name='%s', thread_id='%s'",
                self.api_name,
                thread_id,
            )
            raise
        except (ConnectionError, OSError) as exc:
            logger.error(
                "ReactiveGraph.run: MCP connection error for api_name='%s', "
                "thread_id='%s': %s",
                self.api_name,
                thread_id,
                exc,
            )
            raise

    async def stream(
        self,
        user_input: str,
        *,
        thread_id: str = "default",
        **kwargs: Any,
    ) -> AsyncIterator[dict[str, Any]]:
        """Stream events from the reactive agent.

        Delegates to :meth:`BaseAPIGraph.stream` and propagates events
        as an async generator.  Residual MCP errors are logged before
        re-raising.

        Args:
            user_input: The user's request / prompt.
            thread_id: Checkpointer thread identifier.
            **kwargs: Extra keys passed through to the graph input dict.

        Yields:
            LangGraph event dicts (``astream_events`` v2 format).

        Raises:
            asyncio.TimeoutError: Re-raised after logging.
            ConnectionError: Re-raised after logging.
            OSError: Re-raised after logging.
        """
        try:
            async for event in super().stream(user_input, thread_id=thread_id, **kwargs):
                yield event
        except TimeoutError:
            logger.error(
                "ReactiveGraph.stream: MCP timeout for api_name='%s', thread_id='%s'",
                self.api_name,
                thread_id,
            )
            raise
        except (ConnectionError, OSError) as exc:
            logger.error(
                "ReactiveGraph.stream: MCP connection error for api_name='%s', "
                "thread_id='%s': %s",
                self.api_name,
                thread_id,
                exc,
            )
            raise

Functions

build_graph()

Compile the ReAct agent graph via create_react_agent.

Retrieves tools for :attr:api_name from the registry, builds the system prompt, and delegates to langgraph.prebuilt.create_react_agent.

Returns:

Type Description
Any

Compiled LangGraph graph (CompiledGraph).

Raises:

Type Description
ImportError

If langgraph is not installed.

Source code in src/api2mcp/orchestration/graphs/reactive.py
def build_graph(self) -> Any:
    """Compile the ReAct agent graph via ``create_react_agent``.

    Retrieves tools for :attr:`api_name` from the registry, builds
    the system prompt, and delegates to
    ``langgraph.prebuilt.create_react_agent``.

    Returns:
        Compiled LangGraph graph (``CompiledGraph``).

    Raises:
        ImportError: If ``langgraph`` is not installed.
    """
    if create_react_agent is None:  # pragma: no cover
        raise ImportError(
            "The 'langgraph' package is required for ReactiveGraph. "
            "Install it with: pip install langgraph"
        )

    tools: list[StructuredTool] = self.registry.get_tools(server=self.api_name)
    system_prompt: str = self._build_system_prompt()

    logger.info(
        "ReactiveGraph.build_graph: api_name='%s', tools=%d, "
        "checkpointer=%s, max_iterations=%d",
        self.api_name,
        len(tools),
        type(self.checkpointer).__name__,
        self.max_iterations,
    )

    compiled = create_react_agent(
        model=self.model,
        tools=tools,
        state_modifier=system_prompt,
        checkpointer=self.checkpointer,
    )
    return compiled

run(user_input, *, thread_id='default', **kwargs) async

Invoke the reactive agent and return the final state.

Delegates to :meth:BaseAPIGraph.run and catches residual asyncio.TimeoutError, ConnectionError, and OSError exceptions that may escape the adapter's retry logic (e.g. if all retry attempts are exhausted).

Parameters:

Name Type Description Default
user_input str

The user's request / prompt.

required
thread_id str

Checkpointer thread identifier.

'default'
**kwargs Any

Extra keys passed through to the graph input dict.

{}

Returns:

Type Description
Any

Final graph state dict from ainvoke.

Raises:

Type Description
TimeoutError

Re-raised after logging if the MCP server does not respond within the configured timeout after all retries.

ConnectionError

Re-raised after logging on persistent connection failure.

OSError

Re-raised after logging on OS-level I/O failure.

Source code in src/api2mcp/orchestration/graphs/reactive.py
async def run(
    self,
    user_input: str,
    *,
    thread_id: str = "default",
    **kwargs: Any,
) -> Any:
    """Invoke the reactive agent and return the final state.

    Delegates to :meth:`BaseAPIGraph.run` and catches residual
    ``asyncio.TimeoutError``, ``ConnectionError``, and ``OSError``
    exceptions that may escape the adapter's retry logic (e.g. if
    all retry attempts are exhausted).

    Args:
        user_input: The user's request / prompt.
        thread_id: Checkpointer thread identifier.
        **kwargs: Extra keys passed through to the graph input dict.

    Returns:
        Final graph state dict from ``ainvoke``.

    Raises:
        asyncio.TimeoutError: Re-raised after logging if the MCP
            server does not respond within the configured timeout
            after all retries.
        ConnectionError: Re-raised after logging on persistent
            connection failure.
        OSError: Re-raised after logging on OS-level I/O failure.
    """
    try:
        return await super().run(user_input, thread_id=thread_id, **kwargs)
    except TimeoutError:
        logger.error(
            "ReactiveGraph.run: MCP timeout for api_name='%s', thread_id='%s'",
            self.api_name,
            thread_id,
        )
        raise
    except (ConnectionError, OSError) as exc:
        logger.error(
            "ReactiveGraph.run: MCP connection error for api_name='%s', "
            "thread_id='%s': %s",
            self.api_name,
            thread_id,
            exc,
        )
        raise

stream(user_input, *, thread_id='default', **kwargs) async

Stream events from the reactive agent.

Delegates to :meth:BaseAPIGraph.stream and propagates events as an async generator. Residual MCP errors are logged before re-raising.

Parameters:

Name Type Description Default
user_input str

The user's request / prompt.

required
thread_id str

Checkpointer thread identifier.

'default'
**kwargs Any

Extra keys passed through to the graph input dict.

{}

Yields:

Type Description
AsyncIterator[dict[str, Any]]

LangGraph event dicts (astream_events v2 format).

Raises:

Type Description
TimeoutError

Re-raised after logging.

ConnectionError

Re-raised after logging.

OSError

Re-raised after logging.

Source code in src/api2mcp/orchestration/graphs/reactive.py
async def stream(
    self,
    user_input: str,
    *,
    thread_id: str = "default",
    **kwargs: Any,
) -> AsyncIterator[dict[str, Any]]:
    """Stream events from the reactive agent.

    Delegates to :meth:`BaseAPIGraph.stream` and propagates events
    as an async generator.  Residual MCP errors are logged before
    re-raising.

    Args:
        user_input: The user's request / prompt.
        thread_id: Checkpointer thread identifier.
        **kwargs: Extra keys passed through to the graph input dict.

    Yields:
        LangGraph event dicts (``astream_events`` v2 format).

    Raises:
        asyncio.TimeoutError: Re-raised after logging.
        ConnectionError: Re-raised after logging.
        OSError: Re-raised after logging.
    """
    try:
        async for event in super().stream(user_input, thread_id=thread_id, **kwargs):
            yield event
    except TimeoutError:
        logger.error(
            "ReactiveGraph.stream: MCP timeout for api_name='%s', thread_id='%s'",
            self.api_name,
            thread_id,
        )
        raise
    except (ConnectionError, OSError) as exc:
        logger.error(
            "ReactiveGraph.stream: MCP connection error for api_name='%s', "
            "thread_id='%s': %s",
            self.api_name,
            thread_id,
            exc,
        )
        raise

api2mcp.orchestration.graphs.planner.PlannerGraph

Bases: BaseAPIGraph

Plan-and-execute graph for multi-API workflows.

Builds a :class:~langgraph.graph.StateGraph that:

  1. Calls an LLM to generate an :class:ExecutionStep plan.
  2. Validates the plan for dependency cycles.
  3. Executes steps respecting the configured execution mode.
  4. On failure, asks the LLM to revise the remaining plan.
  5. Synthesises a final human-readable result from all step outputs.

Inherits from :class:~api2mcp.orchestration.graphs.base.BaseAPIGraph.

Parameters:

Name Type Description Default
model BaseChatModel

LangChain chat model used for planning, synthesis, and replanning (must support ainvoke).

required
registry MCPToolRegistry

Populated :class:~api2mcp.orchestration.adapters.registry.MCPToolRegistry.

required
api_names list[str]

Names of MCP servers available for this workflow.

required
execution_mode str

"sequential", "parallel", or "mixed".

'sequential'
checkpointer Any | None

LangGraph checkpointer for persistence (optional).

None
max_iterations int

Guard against infinite replan loops.

20
Source code in src/api2mcp/orchestration/graphs/planner.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
class PlannerGraph(BaseAPIGraph):
    """Plan-and-execute graph for multi-API workflows.

    Builds a :class:`~langgraph.graph.StateGraph` that:

    1. Calls an LLM to generate an :class:`ExecutionStep` plan.
    2. Validates the plan for dependency cycles.
    3. Executes steps respecting the configured execution mode.
    4. On failure, asks the LLM to revise the remaining plan.
    5. Synthesises a final human-readable result from all step outputs.

    Inherits from :class:`~api2mcp.orchestration.graphs.base.BaseAPIGraph`.

    Args:
        model: LangChain chat model used for planning, synthesis, and
            replanning (must support ``ainvoke``).
        registry: Populated
            :class:`~api2mcp.orchestration.adapters.registry.MCPToolRegistry`.
        api_names: Names of MCP servers available for this workflow.
        execution_mode: ``"sequential"``, ``"parallel"``, or ``"mixed"``.
        checkpointer: LangGraph checkpointer for persistence (optional).
        max_iterations: Guard against infinite replan loops.
    """

    def __init__(
        self,
        model: BaseChatModel,
        registry: MCPToolRegistry,
        api_names: list[str],
        execution_mode: str = "sequential",
        checkpointer: Any | None = None,
        max_iterations: int = 20,
    ) -> None:
        # Store planner-specific attributes BEFORE calling super().__init__,
        # because super().__init__ calls build_graph() which needs them.
        self._api_names: list[str] = api_names
        self._execution_mode: str = execution_mode

        super().__init__(
            model,
            registry,
            checkpointer=checkpointer,
            max_iterations=max_iterations,
        )

    # ------------------------------------------------------------------
    # BaseAPIGraph interface
    # ------------------------------------------------------------------

    def build_graph(self) -> Any:
        """Construct and compile the LangGraph StateGraph.

        Called automatically by :meth:`BaseAPIGraph.__init__`.

        Returns:
            Compiled graph (``CompiledGraph``) ready for invocation.
        """
        builder: StateGraph = StateGraph(MultiAPIState)

        builder.add_node("planner_node", self._planner_node)
        builder.add_node("validate_plan_node", self._validate_plan_node)
        builder.add_node("executor_node", self._executor_node)
        builder.add_node("step_complete_node", self._step_complete_node)
        builder.add_node("synthesis_node", self._synthesis_node)
        builder.add_node("replan_node", self._replan_node)

        builder.add_edge(START, "planner_node")
        builder.add_edge("planner_node", "validate_plan_node")
        builder.add_edge("validate_plan_node", "executor_node")
        builder.add_edge("executor_node", "step_complete_node")
        builder.add_edge("synthesis_node", END)
        builder.add_edge("replan_node", "executor_node")

        builder.add_conditional_edges(
            "step_complete_node",
            self._route_after_step,
            {
                "synthesis_node": "synthesis_node",
                "executor_node": "executor_node",
                "replan_node": "replan_node",
                END: END,
            },
        )

        return builder.compile(checkpointer=self.checkpointer)

    # ------------------------------------------------------------------
    # Public run / stream (override base to build correct initial state)
    # ------------------------------------------------------------------

    async def run(
        self,
        user_input: str,
        *,
        thread_id: str = "default",
        **kwargs: Any,
    ) -> MultiAPIState:
        """Execute the planner workflow and return the final state.

        Args:
            user_input: Natural-language request for the workflow to fulfil.
            thread_id: Checkpointer thread identifier (defaults to
                ``"default"``).
            **kwargs: Additional keys merged into the initial state.

        Returns:
            Final :class:`MultiAPIState` after the graph terminates.
        """
        state = self._initial_state(user_input, **kwargs)
        config: RunnableConfig = {
            "configurable": {"thread_id": thread_id},
            "recursion_limit": self.max_iterations,
        }
        logger.info("Starting PlannerGraph run (thread_id=%s)", thread_id)
        final_state: MultiAPIState = await self._graph.ainvoke(state, config=config)
        logger.info(
            "PlannerGraph run complete — status=%s",
            final_state.get("workflow_status"),
        )
        return final_state

    async def stream(  # type: ignore[override]
        self,
        user_input: str,
        *,
        thread_id: str = "default",
        **kwargs: Any,
    ) -> AsyncIterator[dict[str, Any]]:
        """Stream graph state updates as they are emitted by each node.

        Yields one dict per node execution, keyed by node name.

        Args:
            user_input: Natural-language request for the workflow to fulfil.
            thread_id: Checkpointer thread identifier.
            **kwargs: Additional keys merged into the initial state.

        Yields:
            Partial state update dicts emitted after each node completes.
        """
        state = self._initial_state(user_input, **kwargs)
        config: RunnableConfig = {
            "configurable": {"thread_id": thread_id},
            "recursion_limit": self.max_iterations,
        }
        logger.info("Starting PlannerGraph stream (thread_id=%s)", thread_id)
        async for chunk in self._graph.astream(state, config=config):
            yield chunk

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------

    def _route_after_step(
        self,
        state: MultiAPIState,
    ) -> Literal["synthesis_node", "executor_node", "replan_node"] | str:
        """Decide what to do after a step completes or fails.

        Returns:
            Name of the next node or ``END``.
        """
        plan: list[dict[str, Any]] = state.get("execution_plan", [])
        idx: int = state.get("current_step_index", 0)
        iteration: int = state.get("iteration_count", 0)
        max_iter: int = state.get("max_iterations", self.max_iterations)

        if iteration >= max_iter:
            logger.warning("Max iterations (%d) reached — terminating", max_iter)
            return END

        if not plan:
            return "synthesis_node"

        # Check if the step that was just executed failed
        last_executed_idx = idx - 1
        if 0 <= last_executed_idx < len(plan):
            if plan[last_executed_idx].get("status") == ToolCallStatus.FAILED.value:
                return "replan_node"

        # All steps executed?
        if idx >= len(plan):
            return "synthesis_node"

        return "executor_node"

    # ------------------------------------------------------------------
    # Node: planner
    # ------------------------------------------------------------------

    async def _planner_node(self, state: MultiAPIState) -> dict[str, Any]:
        """Call the LLM to produce an initial execution plan.

        Args:
            state: Current graph state.

        Returns:
            Partial state update with the raw plan embedded in messages.
        """
        available_tools = self.registry.registered_tools()
        tools_description = "\n".join(
            f"  - {t}"
            for t in available_tools
            if t.split(":")[0] in self._api_names
        )

        user_messages = [
            m for m in state.get("messages", []) if isinstance(m, HumanMessage)
        ]
        user_request = (
            user_messages[-1].content if user_messages else "No request provided."
        )

        prompt = (
            f"Available APIs: {', '.join(self._api_names)}\n"
            f"Available tools:\n{tools_description}\n\n"
            f"User request: {user_request}"
        )

        messages = [
            SystemMessage(content=_PLANNER_SYSTEM),
            HumanMessage(content=prompt),
        ]

        logger.info("Calling LLM for initial plan generation")
        response: AIMessage = await self.model.ainvoke(messages)

        return {
            "messages": [response],
            "workflow_status": "planning",
            "iteration_count": state.get("iteration_count", 0) + 1,
        }

    # ------------------------------------------------------------------
    # Node: validate_plan
    # ------------------------------------------------------------------

    async def _validate_plan_node(self, state: MultiAPIState) -> dict[str, Any]:
        """Parse and validate the LLM-generated plan.

        Extracts JSON from the last AI message, parses it into
        :class:`ExecutionStep` objects, checks for dependency cycles, and
        stores the validated plan in ``execution_plan``.

        Args:
            state: Current graph state.

        Returns:
            Partial state update with ``execution_plan``,
            ``current_step_index``, ``execution_mode``, and
            ``available_apis`` populated.
        """
        messages = state.get("messages", [])
        ai_messages = [m for m in messages if isinstance(m, AIMessage)]

        if not ai_messages:
            logger.error("No AI message found — cannot parse plan")
            return {
                "errors": ["validate_plan: no AI message found"],
                "workflow_status": "failed",
                "execution_plan": [],
            }

        raw_content = str(ai_messages[-1].content)

        try:
            plan_data: list[dict[str, Any]] = _parse_json_from_llm(raw_content)
        except ValueError as exc:
            logger.error("Failed to parse plan JSON: %s", exc)
            return {
                "errors": [f"validate_plan: JSON parse error — {exc}"],
                "workflow_status": "failed",
                "execution_plan": [],
            }

        steps = [ExecutionStep.from_dict(d) for d in plan_data]

        if _has_cycle(steps):
            logger.error("Dependency cycle detected in execution plan")
            return {
                "errors": ["validate_plan: dependency cycle detected"],
                "workflow_status": "failed",
                "execution_plan": [],
            }

        ordered = _topological_order(steps)
        logger.info("Validated plan with %d step(s)", len(ordered))

        return {
            "execution_plan": [s.to_dict() for s in ordered],
            "current_step_index": 0,
            "execution_mode": self._execution_mode,
            "available_apis": self._api_names,
            "intermediate_results": {},
            "data_mappings": {},
            "workflow_status": "executing",
        }

    # ------------------------------------------------------------------
    # Node: executor
    # ------------------------------------------------------------------

    async def _executor_node(self, state: MultiAPIState) -> dict[str, Any]:
        """Execute the step(s) at the current index.

        Behaviour depends on ``execution_mode``:

        - ``"sequential"``: execute one step at a time.
        - ``"parallel"``: execute all remaining pending steps concurrently
          via :func:`asyncio.gather`.
        - ``"mixed"``: execute independent (dependency-ready) steps in
          parallel, then advance to the next sync point.

        Args:
            state: Current graph state.

        Returns:
            Partial state update with ``intermediate_results`` and the
            updated ``execution_plan``.
        """
        plan: list[dict[str, Any]] = list(state.get("execution_plan", []))
        idx: int = state.get("current_step_index", 0)
        intermediate: dict[str, Any] = dict(state.get("intermediate_results", {}))
        mode: str = state.get("execution_mode", "sequential")

        if idx >= len(plan):
            return {}

        if mode == "parallel":
            results, updated_plan = await self._execute_steps_parallel(
                plan, intermediate
            )
        elif mode == "mixed":
            results, updated_plan = await self._execute_mixed(plan, intermediate)
        else:
            # sequential: execute single step at idx
            step_dict = dict(plan[idx])
            step = ExecutionStep.from_dict(step_dict)
            step, result = await self._run_single_step(step, intermediate)
            updated_plan = list(plan)
            updated_plan[idx] = step.to_dict()
            results = {step.step_id: result} if result is not None else {}

        return {
            "execution_plan": updated_plan,
            "intermediate_results": results,
            "iteration_count": state.get("iteration_count", 0) + 1,
        }

    async def _run_single_step(
        self,
        step: ExecutionStep,
        intermediate: dict[str, Any],
    ) -> tuple[ExecutionStep, Any]:
        """Execute a single step against the registry.

        Resolves variable placeholders, invokes the tool, and updates
        the step's status and result/error fields.

        Args:
            step: The step to execute.
            intermediate: Accumulated intermediate results for variable
                substitution.

        Returns:
            Tuple of (updated_step, raw_result).  ``raw_result`` is ``None``
            on failure.
        """
        step.status = ToolCallStatus.RUNNING
        namespaced_name = f"{step.api}:{step.tool}"
        tool = self.registry.get_tool(namespaced_name)

        if tool is None:
            step.status = ToolCallStatus.FAILED
            step.error = f"Tool '{namespaced_name}' not found in registry"
            logger.error(step.error)
            return step, None

        resolved_args = _substitute_variables(step.arguments, intermediate)
        logger.info("Executing step '%s' → %s", step.step_id, namespaced_name)

        try:
            raw_result = await tool.ainvoke(resolved_args)
            step.status = ToolCallStatus.COMPLETED
            step.result = raw_result
            logger.info("Step '%s' completed successfully", step.step_id)
            return step, raw_result
        except Exception as exc:  # noqa: BLE001
            step.status = ToolCallStatus.FAILED
            step.error = str(exc)
            logger.error("Step '%s' failed: %s", step.step_id, exc)
            return step, None

    async def _execute_steps_parallel(
        self,
        plan: list[dict[str, Any]],
        intermediate: dict[str, Any],
    ) -> tuple[dict[str, Any], list[dict[str, Any]]]:
        """Execute all pending steps concurrently via :func:`asyncio.gather`.

        Args:
            plan: Full execution plan as list of dicts.
            intermediate: Current intermediate results.

        Returns:
            Tuple of (new_results_dict, updated_plan_as_list_of_dicts).
        """
        pending_indices = [
            i
            for i, s in enumerate(plan)
            if s.get("status") == ToolCallStatus.PENDING.value
        ]

        steps_to_run = [ExecutionStep.from_dict(plan[i]) for i in pending_indices]
        tasks = [self._run_single_step(s, intermediate) for s in steps_to_run]
        outcomes: list[tuple[ExecutionStep, Any]] = await asyncio.gather(*tasks)

        updated_plan = list(plan)
        new_results: dict[str, Any] = {}

        for plan_idx, (step, result) in zip(pending_indices, outcomes, strict=False):
            updated_plan[plan_idx] = step.to_dict()
            if result is not None:
                new_results[step.step_id] = result

        return new_results, updated_plan

    async def _execute_mixed(
        self,
        plan: list[dict[str, Any]],
        intermediate: dict[str, Any],
    ) -> tuple[dict[str, Any], list[dict[str, Any]]]:
        """Execute the next "wave" of independent steps in parallel.

        A wave is the maximal set of pending steps whose declared
        dependencies are all already completed.

        Args:
            plan: Full execution plan as list of dicts.
            intermediate: Current intermediate results.

        Returns:
            Tuple of (new_results_dict, updated_plan_as_list_of_dicts).
        """
        completed_ids: set[str] = {
            s["step_id"]
            for s in plan
            if s.get("status") == ToolCallStatus.COMPLETED.value
        }

        wave_indices: list[int] = [
            i
            for i, step_dict in enumerate(plan)
            if step_dict.get("status") == ToolCallStatus.PENDING.value
            and set(step_dict.get("dependencies", [])).issubset(completed_ids)
        ]

        if not wave_indices:
            # No ready steps — fall back to running all pending in parallel
            return await self._execute_steps_parallel(plan, intermediate)

        steps_to_run = [ExecutionStep.from_dict(plan[i]) for i in wave_indices]
        tasks = [self._run_single_step(s, intermediate) for s in steps_to_run]
        outcomes: list[tuple[ExecutionStep, Any]] = await asyncio.gather(*tasks)

        updated_plan = list(plan)
        new_results: dict[str, Any] = {}
        for plan_idx, (step, result) in zip(wave_indices, outcomes, strict=False):
            updated_plan[plan_idx] = step.to_dict()
            if result is not None:
                new_results[step.step_id] = result

        return new_results, updated_plan

    # ------------------------------------------------------------------
    # Node: step_complete
    # ------------------------------------------------------------------

    async def _step_complete_node(self, state: MultiAPIState) -> dict[str, Any]:
        """Advance the step index after execution.

        For sequential mode, increments ``current_step_index`` by one.
        For parallel/mixed, sets the index to the next pending step (or
        past the end if all are done) so the router chooses synthesis.

        Args:
            state: Current graph state.

        Returns:
            Partial state update with the new ``current_step_index``.
        """
        plan: list[dict[str, Any]] = state.get("execution_plan", [])
        idx: int = state.get("current_step_index", 0)
        mode: str = state.get("execution_mode", "sequential")

        if mode == "sequential":
            new_idx = idx + 1
        else:
            new_idx = next(
                (
                    i
                    for i, s in enumerate(plan)
                    if s.get("status") == ToolCallStatus.PENDING.value
                ),
                len(plan),
            )

        return {"current_step_index": new_idx}

    # ------------------------------------------------------------------
    # Node: synthesis
    # ------------------------------------------------------------------

    async def _synthesis_node(self, state: MultiAPIState) -> dict[str, Any]:
        """Synthesise a final result from all step outputs via an LLM call.

        Args:
            state: Current graph state.

        Returns:
            Partial state update with ``final_result`` and
            ``workflow_status = "completed"``.
        """
        user_messages = [
            m for m in state.get("messages", []) if isinstance(m, HumanMessage)
        ]
        user_request = (
            user_messages[0].content if user_messages else "No original request."
        )

        intermediate: dict[str, Any] = state.get("intermediate_results", {})
        plan: list[dict[str, Any]] = state.get("execution_plan", [])

        steps_summary_parts: list[str] = []
        for step_dict in plan:
            sid = step_dict["step_id"]
            desc = step_dict.get("description", "")
            result = intermediate.get(sid, step_dict.get("result", "no result"))
            error = step_dict.get("error")
            if error:
                steps_summary_parts.append(f"Step {sid} ({desc}): FAILED — {error}")
            else:
                steps_summary_parts.append(f"Step {sid} ({desc}): {result}")

        steps_summary = "\n".join(steps_summary_parts) or "No steps were executed."

        prompt = (
            f"Original request: {user_request}\n\n"
            f"Step results:\n{steps_summary}\n\n"
            "Please synthesise a concise summary of what was accomplished."
        )

        messages = [
            SystemMessage(content=_SYNTHESIS_SYSTEM),
            HumanMessage(content=prompt),
        ]

        logger.info("Calling LLM for result synthesis")
        response: AIMessage = await self.model.ainvoke(messages)
        final_result = str(response.content)

        return {
            "messages": [response],
            "final_result": final_result,
            "workflow_status": "completed",
        }

    # ------------------------------------------------------------------
    # Node: replan
    # ------------------------------------------------------------------

    async def _replan_node(self, state: MultiAPIState) -> dict[str, Any]:
        """Ask the LLM to revise the plan after a step failure.

        Sends the current plan, failure details, and the original user
        request to the model, then replaces the execution plan with the
        revised plan.

        Args:
            state: Current graph state.

        Returns:
            Partial state update with a revised ``execution_plan`` and
            ``current_step_index`` reset to the first pending step.
        """
        user_messages = [
            m for m in state.get("messages", []) if isinstance(m, HumanMessage)
        ]
        user_request = (
            user_messages[0].content if user_messages else "No original request."
        )

        plan: list[dict[str, Any]] = state.get("execution_plan", [])
        failed_steps = [
            s for s in plan if s.get("status") == ToolCallStatus.FAILED.value
        ]
        failure_details = "; ".join(
            f"{s['step_id']}: {s.get('error', 'unknown error')}"
            for s in failed_steps
        )

        current_plan_json = json.dumps(plan, indent=2)
        prompt = (
            f"Original request: {user_request}\n\n"
            f"Current plan:\n{current_plan_json}\n\n"
            f"Failures: {failure_details}\n\n"
            "Return a revised JSON array of the remaining steps to complete the request."
        )

        messages_to_send = [
            SystemMessage(content=_REPLAN_SYSTEM),
            HumanMessage(content=prompt),
        ]

        logger.info("Calling LLM to replan after failure")
        response: AIMessage = await self.model.ainvoke(messages_to_send)

        try:
            revised_data: list[dict[str, Any]] = _parse_json_from_llm(
                str(response.content)
            )
            revised_steps = [ExecutionStep.from_dict(d) for d in revised_data]
            if _has_cycle(revised_steps):
                logger.error("Revised plan contains a cycle — aborting replan")
                return {
                    "messages": [response],
                    "errors": ["replan: revised plan contains a dependency cycle"],
                    "workflow_status": "failed",
                }
            ordered = _topological_order(revised_steps)
            new_plan = [s.to_dict() for s in ordered]
        except (ValueError, KeyError) as exc:
            logger.error("Failed to parse revised plan: %s", exc)
            return {
                "messages": [response],
                "errors": [f"replan: JSON parse error — {exc}"],
                "workflow_status": "failed",
            }

        first_pending = next(
            (
                i
                for i, s in enumerate(new_plan)
                if s.get("status") == ToolCallStatus.PENDING.value
            ),
            0,
        )

        return {
            "messages": [response],
            "execution_plan": new_plan,
            "current_step_index": first_pending,
            "iteration_count": state.get("iteration_count", 0) + 1,
        }

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _initial_state(self, user_input: str, **kwargs: Any) -> MultiAPIState:
        """Build the initial :class:`MultiAPIState` for a new workflow run.

        Args:
            user_input: The user's natural-language request.
            **kwargs: Additional fields to merge into the state.

        Returns:
            Fully populated initial state dict.
        """
        base: MultiAPIState = {
            "messages": [HumanMessage(content=user_input)],
            "workflow_id": str(uuid.uuid4()),
            "workflow_status": "planning",
            "errors": [],
            "iteration_count": 0,
            "max_iterations": self.max_iterations,
            "available_apis": self._api_names,
            "execution_plan": [],
            "intermediate_results": {},
            "data_mappings": {},
            "current_step_index": 0,
            "execution_mode": self._execution_mode,
            "final_result": None,
        }
        # Allow callers to override individual fields
        base.update(kwargs)  # type: ignore[typeddict-item]
        return base

Functions

build_graph()

Construct and compile the LangGraph StateGraph.

Called automatically by :meth:BaseAPIGraph.__init__.

Returns:

Type Description
Any

Compiled graph (CompiledGraph) ready for invocation.

Source code in src/api2mcp/orchestration/graphs/planner.py
def build_graph(self) -> Any:
    """Construct and compile the LangGraph StateGraph.

    Called automatically by :meth:`BaseAPIGraph.__init__`.

    Returns:
        Compiled graph (``CompiledGraph``) ready for invocation.
    """
    builder: StateGraph = StateGraph(MultiAPIState)

    builder.add_node("planner_node", self._planner_node)
    builder.add_node("validate_plan_node", self._validate_plan_node)
    builder.add_node("executor_node", self._executor_node)
    builder.add_node("step_complete_node", self._step_complete_node)
    builder.add_node("synthesis_node", self._synthesis_node)
    builder.add_node("replan_node", self._replan_node)

    builder.add_edge(START, "planner_node")
    builder.add_edge("planner_node", "validate_plan_node")
    builder.add_edge("validate_plan_node", "executor_node")
    builder.add_edge("executor_node", "step_complete_node")
    builder.add_edge("synthesis_node", END)
    builder.add_edge("replan_node", "executor_node")

    builder.add_conditional_edges(
        "step_complete_node",
        self._route_after_step,
        {
            "synthesis_node": "synthesis_node",
            "executor_node": "executor_node",
            "replan_node": "replan_node",
            END: END,
        },
    )

    return builder.compile(checkpointer=self.checkpointer)

run(user_input, *, thread_id='default', **kwargs) async

Execute the planner workflow and return the final state.

Parameters:

Name Type Description Default
user_input str

Natural-language request for the workflow to fulfil.

required
thread_id str

Checkpointer thread identifier (defaults to "default").

'default'
**kwargs Any

Additional keys merged into the initial state.

{}

Returns:

Name Type Description
Final MultiAPIState

class:MultiAPIState after the graph terminates.

Source code in src/api2mcp/orchestration/graphs/planner.py
async def run(
    self,
    user_input: str,
    *,
    thread_id: str = "default",
    **kwargs: Any,
) -> MultiAPIState:
    """Execute the planner workflow and return the final state.

    Args:
        user_input: Natural-language request for the workflow to fulfil.
        thread_id: Checkpointer thread identifier (defaults to
            ``"default"``).
        **kwargs: Additional keys merged into the initial state.

    Returns:
        Final :class:`MultiAPIState` after the graph terminates.
    """
    state = self._initial_state(user_input, **kwargs)
    config: RunnableConfig = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": self.max_iterations,
    }
    logger.info("Starting PlannerGraph run (thread_id=%s)", thread_id)
    final_state: MultiAPIState = await self._graph.ainvoke(state, config=config)
    logger.info(
        "PlannerGraph run complete — status=%s",
        final_state.get("workflow_status"),
    )
    return final_state

stream(user_input, *, thread_id='default', **kwargs) async

Stream graph state updates as they are emitted by each node.

Yields one dict per node execution, keyed by node name.

Parameters:

Name Type Description Default
user_input str

Natural-language request for the workflow to fulfil.

required
thread_id str

Checkpointer thread identifier.

'default'
**kwargs Any

Additional keys merged into the initial state.

{}

Yields:

Type Description
AsyncIterator[dict[str, Any]]

Partial state update dicts emitted after each node completes.

Source code in src/api2mcp/orchestration/graphs/planner.py
async def stream(  # type: ignore[override]
    self,
    user_input: str,
    *,
    thread_id: str = "default",
    **kwargs: Any,
) -> AsyncIterator[dict[str, Any]]:
    """Stream graph state updates as they are emitted by each node.

    Yields one dict per node execution, keyed by node name.

    Args:
        user_input: Natural-language request for the workflow to fulfil.
        thread_id: Checkpointer thread identifier.
        **kwargs: Additional keys merged into the initial state.

    Yields:
        Partial state update dicts emitted after each node completes.
    """
    state = self._initial_state(user_input, **kwargs)
    config: RunnableConfig = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": self.max_iterations,
    }
    logger.info("Starting PlannerGraph stream (thread_id=%s)", thread_id)
    async for chunk in self._graph.astream(state, config=config):
        yield chunk

api2mcp.orchestration.graphs.conversational.ConversationalGraph

Bases: BaseAPIGraph

Multi-turn conversational agent with human-in-the-loop support.

Builds a custom :class:~langgraph.graph.StateGraph over :class:~api2mcp.orchestration.state.definitions.ConversationalState with four nodes:

  • agent — calls the LLM with memory-filtered messages.
  • tools — executes non-destructive tool calls from the agent.
  • clarify — surfaces a clarification question to the caller.
  • approve — pauses via interrupt() for user approval before executing a destructive tool.

Parameters:

Name Type Description Default
model BaseChatModel

LangChain chat model used by the agent node.

required
registry MCPToolRegistry

Tool registry that provides MCP-backed :class:~langchain_core.tools.StructuredTool instances.

required
api_names list[str] | None

Optional list of MCP server names to expose as tools. When None, all registered tools are made available.

None
memory_strategy str

One of "window", "summary", or "full". Defaults to "window".

'window'
max_history int

Maximum number of non-system messages to keep when the "window" or "summary" strategy is active. Defaults to 20.

20
checkpointer Any

LangGraph checkpointer required for session persistence and human-in-the-loop interrupt/resume support. Recommended: pass a MemorySaver or AsyncSqliteSaver.

None
max_iterations int

Upper bound on agent iterations, forwarded as recursion_limit in the run/stream config. Defaults to 50.

50
Source code in src/api2mcp/orchestration/graphs/conversational.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
class ConversationalGraph(BaseAPIGraph):
    """Multi-turn conversational agent with human-in-the-loop support.

    Builds a custom :class:`~langgraph.graph.StateGraph` over
    :class:`~api2mcp.orchestration.state.definitions.ConversationalState`
    with four nodes:

    - ``agent`` — calls the LLM with memory-filtered messages.
    - ``tools`` — executes non-destructive tool calls from the agent.
    - ``clarify`` — surfaces a clarification question to the caller.
    - ``approve`` — pauses via ``interrupt()`` for user approval before
      executing a destructive tool.

    Args:
        model: LangChain chat model used by the agent node.
        registry: Tool registry that provides MCP-backed
            :class:`~langchain_core.tools.StructuredTool` instances.
        api_names: Optional list of MCP server names to expose as tools.
            When ``None``, all registered tools are made available.
        memory_strategy: One of ``"window"``, ``"summary"``, or ``"full"``.
            Defaults to ``"window"``.
        max_history: Maximum number of non-system messages to keep when
            the ``"window"`` or ``"summary"`` strategy is active.
            Defaults to ``20``.
        checkpointer: LangGraph checkpointer required for session
            persistence and human-in-the-loop interrupt/resume support.
            Recommended: pass a ``MemorySaver`` or ``AsyncSqliteSaver``.
        max_iterations: Upper bound on agent iterations, forwarded as
            ``recursion_limit`` in the run/stream config.  Defaults to
            ``50``.
    """

    def __init__(
        self,
        model: BaseChatModel,
        registry: MCPToolRegistry,
        *,
        api_names: list[str] | None = None,
        memory_strategy: str = "window",
        max_history: int = 20,
        checkpointer: Any = None,
        max_iterations: int = 50,
    ) -> None:
        # Store conversational-specific attributes BEFORE super().__init__
        # because build_graph() is called inside super().__init__.
        self._api_names = api_names
        self._memory_strategy = memory_strategy
        self._max_history = max_history
        super().__init__(
            model,
            registry,
            checkpointer=checkpointer,
            max_iterations=max_iterations,
        )

    # ------------------------------------------------------------------
    # BaseAPIGraph implementation
    # ------------------------------------------------------------------

    def build_graph(self) -> Any:
        """Compile and return the conversational LangGraph graph.

        Constructs a :class:`~langgraph.graph.StateGraph` over
        :class:`~api2mcp.orchestration.state.definitions.ConversationalState`
        with ``agent``, ``tools``, ``clarify``, and ``approve`` nodes, then
        compiles it with the configured checkpointer.

        Returns:
            Compiled LangGraph ``CompiledGraph`` instance.
        """
        graph: StateGraph = StateGraph(ConversationalState)

        # Register nodes
        graph.add_node("agent", self._agent_node)
        graph.add_node("tools", self._tool_node)
        graph.add_node("clarify", self._clarification_node)
        graph.add_node("approve", self._approval_node)

        # Edges
        graph.add_edge(START, "agent")
        graph.add_conditional_edges("agent", self._route_agent_output)
        graph.add_edge("tools", "agent")
        graph.add_edge("clarify", END)
        graph.add_edge("approve", "tools")

        compiled = graph.compile(checkpointer=self.checkpointer)

        tool_count = len(self._get_tools())
        logger.info(
            "ConversationalGraph.build_graph: api_names=%s, tools=%d, "
            "memory_strategy=%r, max_history=%d, checkpointer=%s, "
            "max_iterations=%d",
            self._api_names,
            tool_count,
            self._memory_strategy,
            self._max_history,
            type(self.checkpointer).__name__,
            self.max_iterations,
        )
        return compiled

    # ------------------------------------------------------------------
    # Tool helpers
    # ------------------------------------------------------------------

    def _get_tools(self) -> list[Any]:
        """Return the StructuredTool list for this graph's API scope.

        When ``api_names`` is set, tools are fetched per-server and
        combined.  When ``None``, all registered tools are returned.

        Returns:
            List of :class:`~langchain_core.tools.StructuredTool` objects.
        """
        if self._api_names:
            tools: list[Any] = []
            for api in self._api_names:
                tools.extend(self.registry.get_tools(server=api))
            return tools
        return self.registry.get_tools()

    def _build_system_prompt(self) -> str:
        """Build the system prompt that is prepended to every LLM call.

        Returns:
            Rendered system prompt string.
        """
        tools = self._get_tools()
        tool_names = [t.name for t in tools]
        available_tools_str = (
            ", ".join(tool_names) if tool_names else "(none registered)"
        )
        apis_str = (
            ", ".join(self._api_names)
            if self._api_names
            else "all registered APIs"
        )
        return (
            f"You are an intelligent conversational API assistant with access to: {apis_str}.\n"
            f"Available MCP tools: {available_tools_str}.\n"
            "You can ask the user for clarification when needed — simply phrase your response as a question.\n"
            "For destructive operations (delete, remove, drop, destroy, purge, reset), "
            "always use the corresponding tool so that approval can be requested.\n"
            "Keep responses concise and accurate."
        )

    # ------------------------------------------------------------------
    # Destructive tool detection
    # ------------------------------------------------------------------

    def _is_destructive(self, tool_name: str) -> bool:
        """Return ``True`` if *tool_name* represents a destructive operation.

        Only the tool-local part (after the colon namespace) is inspected.

        Args:
            tool_name: Fully-qualified tool name (e.g. ``"github:delete_issue"``
                or simply ``"delete_issue"``).

        Returns:
            ``True`` when the local name starts with any of the
            :data:`_DESTRUCTIVE_PREFIXES`.

        Examples::

            >>> graph._is_destructive("github:delete_issue")
            True
            >>> graph._is_destructive("github:list_issues")
            False
        """
        tool_part = tool_name.split(":")[-1].lower()
        return any(tool_part.startswith(p) for p in _DESTRUCTIVE_PREFIXES)

    # ------------------------------------------------------------------
    # Memory strategies
    # ------------------------------------------------------------------

    def _apply_memory_strategy(
        self,
        messages: list[BaseMessage],
        strategy: str,
        max_history: int,
    ) -> list[BaseMessage]:
        """Filter *messages* according to *strategy* before an LLM call.

        Three strategies are supported:

        ``"window"``
            Keep all :class:`~langchain_core.messages.SystemMessage` objects
            plus the most recent *max_history* non-system messages.

        ``"summary"``
            Identical to ``"window"`` for now (async summarisation is
            performed externally; this layer applies truncation as a
            fallback so the context window is never exceeded).

        ``"full"``
            Return the complete message list unchanged.

        Args:
            messages: The full conversation history.
            strategy: One of ``"window"``, ``"summary"``, or ``"full"``.
            max_history: Window size for ``"window"`` and ``"summary"``.

        Returns:
            Filtered list of messages appropriate for the next LLM call.
        """
        if strategy == "full":
            return list(messages)

        if strategy in ("window", "summary"):
            system_msgs = [m for m in messages if isinstance(m, SystemMessage)]
            other_msgs = [m for m in messages if not isinstance(m, SystemMessage)]
            truncated = other_msgs[-max_history:] if max_history > 0 else []
            return system_msgs + truncated

        # Unknown strategy — fall back to full
        logger.warning(
            "ConversationalGraph: unknown memory_strategy=%r; falling back to 'full'",
            strategy,
        )
        return list(messages)

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------

    def _route_agent_output(
        self,
        state: ConversationalState,
    ) -> Literal["approve", "clarify", "tools", "__end__"]:
        """Inspect the last AI message and decide which node to visit next.

        Routing rules (evaluated in order):

        1. If the last message has tool calls and **any** tool is destructive
           → ``"approve"``
        2. If the last message has no tool calls and its text content
           contains ``"?"`` → ``"clarify"``
        3. If the last message has tool calls (non-destructive) → ``"tools"``
        4. Otherwise → :data:`~langgraph.graph.END` (``"__end__"``)

        Args:
            state: Current :class:`~api2mcp.orchestration.state.definitions.ConversationalState`.

        Returns:
            The name of the next node, or ``END``.
        """
        messages: Sequence[BaseMessage] = state.get("messages", [])
        if not messages:
            return END  # type: ignore[return-value]

        last: BaseMessage = messages[-1]

        if not isinstance(last, AIMessage):
            return END  # type: ignore[return-value]

        tool_calls: list[dict[str, Any]] = getattr(last, "tool_calls", []) or []

        if tool_calls:
            # Check for destructive operations first
            for tc in tool_calls:
                tc_name: str = tc.get("name", "")
                if self._is_destructive(tc_name):
                    logger.debug(
                        "ConversationalGraph._route_agent_output: "
                        "destructive tool detected=%r → approve",
                        tc_name,
                    )
                    return "approve"
            # Non-destructive tool calls
            return "tools"

        # No tool calls — check for clarification question
        content = last.content if isinstance(last.content, str) else ""
        if "?" in content:
            logger.debug(
                "ConversationalGraph._route_agent_output: "
                "clarification question detected → clarify"
            )
            return "clarify"

        return END  # type: ignore[return-value]

    # ------------------------------------------------------------------
    # Graph nodes
    # ------------------------------------------------------------------

    async def _agent_node(
        self,
        state: ConversationalState,
    ) -> dict[str, Any]:
        """Call the LLM with memory-filtered messages.

        Prepends the system prompt (if not already present), applies the
        configured memory strategy to trim history, then invokes the model.
        The AI response is appended to the ``messages`` list via the
        ``add_messages`` reducer.

        Args:
            state: Current graph state.

        Returns:
            State update dict with ``"messages"`` containing the new
            :class:`~langchain_core.messages.AIMessage`.
        """
        messages: list[BaseMessage] = list(state.get("messages", []))

        # Ensure there is a system message at position 0
        has_system = any(isinstance(m, SystemMessage) for m in messages)
        if not has_system:
            messages = [SystemMessage(content=self._build_system_prompt())] + messages

        # Apply memory strategy before calling the LLM
        filtered = self._apply_memory_strategy(
            messages,
            state.get("memory_strategy", self._memory_strategy),
            state.get("max_history", self._max_history),
        )

        # Bind tools to the model if any are available
        tools = self._get_tools()
        bound_model = self.model.bind_tools(tools) if tools else self.model

        logger.debug(
            "ConversationalGraph._agent_node: calling model with %d messages "
            "(after memory filter from %d total)",
            len(filtered),
            len(messages),
        )

        ai_message: AIMessage = await bound_model.ainvoke(filtered)

        iteration_count: int = state.get("iteration_count", 0) + 1

        return {
            "messages": [ai_message],
            "iteration_count": iteration_count,
            "workflow_status": "executing",
        }

    async def _tool_node(
        self,
        state: ConversationalState,
    ) -> dict[str, Any]:
        """Execute non-destructive tool calls from the last AI message.

        Iterates over ``tool_calls`` in the last
        :class:`~langchain_core.messages.AIMessage`, resolves each tool
        from the registry, invokes it asynchronously, and collects the
        results as :class:`~langchain_core.messages.ToolMessage` objects.

        Args:
            state: Current graph state.

        Returns:
            State update dict with ``"messages"`` containing one
            :class:`~langchain_core.messages.ToolMessage` per tool call.
        """
        messages: list[BaseMessage] = list(state.get("messages", []))
        if not messages:
            return {"messages": []}

        last = messages[-1]
        if not isinstance(last, AIMessage):
            return {"messages": []}

        tool_calls: list[dict[str, Any]] = getattr(last, "tool_calls", []) or []
        tool_messages: list[ToolMessage] = []

        for tc in tool_calls:
            tool_name: str = tc.get("name", "")
            tool_args: dict[str, Any] = tc.get("args", {})
            tool_id: str = tc.get("id", tool_name)

            logger.debug(
                "ConversationalGraph._tool_node: invoking tool=%r args=%r",
                tool_name,
                tool_args,
            )

            try:
                tool = self.registry.get_tool(tool_name)
                if tool is None:
                    result_content = f"Error: tool '{tool_name}' not found in registry."
                else:
                    result_content = str(await tool.ainvoke(tool_args))
            except Exception as exc:  # noqa: BLE001
                logger.error(
                    "ConversationalGraph._tool_node: tool=%r raised %s: %s",
                    tool_name,
                    type(exc).__name__,
                    exc,
                )
                result_content = f"Error executing tool '{tool_name}': {exc}"

            tool_messages.append(
                ToolMessage(
                    content=result_content,
                    tool_call_id=tool_id,
                    name=tool_name,
                )
            )

        return {"messages": tool_messages}

    async def _clarification_node(
        self,
        state: ConversationalState,
    ) -> dict[str, Any]:
        """Handle a clarification request from the agent.

        Sets ``conversation_mode`` to ``"waiting_clarification"`` and
        returns the state update.  The graph then transitions to ``END``
        so the caller can surface the question, collect the human answer,
        and resume the conversation with the next ``run()`` call.

        Args:
            state: Current graph state.

        Returns:
            State update dict with ``conversation_mode`` set.
        """
        logger.debug("ConversationalGraph._clarification_node: waiting for user clarification")
        return {
            "conversation_mode": "waiting_clarification",
            "workflow_status": "executing",
        }

    async def _approval_node(
        self,
        state: ConversationalState,
    ) -> dict[str, Any]:
        """Pause for user approval before executing a destructive tool.

        Sets ``conversation_mode`` to ``"waiting_approval"``, then calls
        ``interrupt()`` with a payload describing the pending action.  The
        graph will pause here until resumed via a
        ``Command(resume=True)`` (approve) or ``Command(resume=False)``
        (reject).

        If the user approves, the graph proceeds to the ``tools`` node.
        If the user rejects, the tool call is removed from
        ``pending_actions`` and the graph still proceeds to ``tools``
        (which will find no matching tool calls to execute for the
        rejected action).

        Args:
            state: Current graph state.

        Returns:
            State update dict after the interrupt is resolved.

        Raises:
            ImportError: If ``langgraph`` is not installed or does not
                expose ``langgraph.types.interrupt``.
        """
        if interrupt is None:  # pragma: no cover
            raise ImportError(
                "LangGraph 1.0+ is required for human-in-the-loop support. "
                "Install it with: pip install 'langgraph>=1.0.0'"
            )

        messages: list[BaseMessage] = list(state.get("messages", []))
        last = messages[-1] if messages else None

        # Collect pending action descriptions from the last AI message
        tool_calls: list[dict[str, Any]] = []
        if isinstance(last, AIMessage):
            tool_calls = getattr(last, "tool_calls", []) or []

        destructive_calls = [
            tc for tc in tool_calls if self._is_destructive(tc.get("name", ""))
        ]

        action_desc = (
            destructive_calls[0].get("name", "unknown action")
            if destructive_calls
            else "unknown action"
        )

        logger.debug(
            "ConversationalGraph._approval_node: requesting approval for action=%r",
            action_desc,
        )

        # Pause and wait for human resume signal
        approved: Any = interrupt({"action": action_desc, "tool_calls": destructive_calls})

        pending_actions: list[dict[str, Any]] = list(state.get("pending_actions", []))

        if approved:
            logger.debug(
                "ConversationalGraph._approval_node: action approved, proceeding to tools"
            )
            return {
                "conversation_mode": "active",
                "pending_actions": pending_actions,
                "workflow_status": "executing",
            }
        else:
            logger.debug(
                "ConversationalGraph._approval_node: action rejected by user"
            )
            # Remove the rejected action from pending if tracked
            return {
                "conversation_mode": "active",
                "pending_actions": [
                    a for a in pending_actions if a.get("name") != action_desc
                ],
                "workflow_status": "executing",
            }

Functions

build_graph()

Compile and return the conversational LangGraph graph.

Constructs a :class:~langgraph.graph.StateGraph over :class:~api2mcp.orchestration.state.definitions.ConversationalState with agent, tools, clarify, and approve nodes, then compiles it with the configured checkpointer.

Returns:

Type Description
Any

Compiled LangGraph CompiledGraph instance.

Source code in src/api2mcp/orchestration/graphs/conversational.py
def build_graph(self) -> Any:
    """Compile and return the conversational LangGraph graph.

    Constructs a :class:`~langgraph.graph.StateGraph` over
    :class:`~api2mcp.orchestration.state.definitions.ConversationalState`
    with ``agent``, ``tools``, ``clarify``, and ``approve`` nodes, then
    compiles it with the configured checkpointer.

    Returns:
        Compiled LangGraph ``CompiledGraph`` instance.
    """
    graph: StateGraph = StateGraph(ConversationalState)

    # Register nodes
    graph.add_node("agent", self._agent_node)
    graph.add_node("tools", self._tool_node)
    graph.add_node("clarify", self._clarification_node)
    graph.add_node("approve", self._approval_node)

    # Edges
    graph.add_edge(START, "agent")
    graph.add_conditional_edges("agent", self._route_agent_output)
    graph.add_edge("tools", "agent")
    graph.add_edge("clarify", END)
    graph.add_edge("approve", "tools")

    compiled = graph.compile(checkpointer=self.checkpointer)

    tool_count = len(self._get_tools())
    logger.info(
        "ConversationalGraph.build_graph: api_names=%s, tools=%d, "
        "memory_strategy=%r, max_history=%d, checkpointer=%s, "
        "max_iterations=%d",
        self._api_names,
        tool_count,
        self._memory_strategy,
        self._max_history,
        type(self.checkpointer).__name__,
        self.max_iterations,
    )
    return compiled