This is page 13 of 21. Use http://codebase.md/sparesparrow/mcp-project-orchestrator?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .cursorrules
├── .env.example
├── .github
│ └── workflows
│ ├── build.yml
│ ├── ci-cd.yml
│ ├── ci.yml
│ ├── deploy.yml
│ ├── ecosystem-monitor.yml
│ ├── fan-out-orchestrator.yml
│ └── release.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AUTOMOTIVE_CAMERA_SYSTEM_SUMMARY.md
├── automotive-camera-system
│ ├── docs
│ │ └── IMPLEMENTACE_CS.md
│ └── README.md
├── AWS_MCP_IMPLEMENTATION_SUMMARY.md
├── AWS_MCP_QUICKSTART.md
├── AWS_SIP_TRUNK_DEPLOYMENT_COMPLETE.md
├── aws-sip-trunk
│ ├── .gitignore
│ ├── config
│ │ ├── extensions.conf.j2
│ │ └── pjsip.conf.j2
│ ├── DEPLOYMENT_SUMMARY.md
│ ├── docs
│ │ ├── DEPLOYMENT.md
│ │ └── TROUBLESHOOTING.md
│ ├── PROJECT_INDEX.md
│ ├── pyproject.toml
│ ├── QUICKSTART.md
│ ├── README.md
│ ├── scripts
│ │ ├── deploy-asterisk-aws.sh
│ │ └── user-data.sh
│ ├── terraform
│ │ ├── ec2.tf
│ │ ├── main.tf
│ │ ├── monitoring.tf
│ │ ├── networking.tf
│ │ ├── outputs.tf
│ │ ├── storage.tf
│ │ ├── terraform.tfvars.example
│ │ └── variables.tf
│ ├── tests
│ │ └── test_sip_connectivity.py
│ └── VERIFICATION_CHECKLIST.md
├── CLAUDE.md
├── component_templates.json
├── conanfile.py
├── config
│ ├── default.json
│ └── project_orchestration.json
├── Containerfile
├── cursor-templates
│ └── openssl
│ ├── linux-dev.mdc.jinja2
│ └── shared.mdc.jinja2
├── data
│ └── prompts
│ └── templates
│ ├── advanced-multi-server-template.json
│ ├── analysis-assistant.json
│ ├── analyze-mermaid-diagram.json
│ ├── architecture-design-assistant.json
│ ├── code-diagram-documentation-creator.json
│ ├── code-refactoring-assistant.json
│ ├── code-review-assistant.json
│ ├── collaborative-development.json
│ ├── consolidated-interfaces-template.json
│ ├── could-you-interpret-the-assumed-applicat.json
│ ├── data-analysis-template.json
│ ├── database-query-assistant.json
│ ├── debugging-assistant.json
│ ├── development-system-prompt-zcna0.json
│ ├── development-system-prompt.json
│ ├── development-workflow.json
│ ├── docker-compose-prompt-combiner.json
│ ├── docker-containerization-guide.json
│ ├── docker-mcp-servers-orchestration.json
│ ├── foresight-assistant.json
│ ├── generate-different-types-of-questions-ab.json
│ ├── generate-mermaid-diagram.json
│ ├── image-1-describe-the-icon-in-one-sen.json
│ ├── initialize-project-setup-for-a-new-micro.json
│ ├── install-dependencies-build-run-test.json
│ ├── mcp-code-generator.json
│ ├── mcp-integration-assistant.json
│ ├── mcp-resources-explorer.json
│ ├── mcp-resources-integration.json
│ ├── mcp-server-configurator.json
│ ├── mcp-server-dev-prompt-combiner.json
│ ├── mcp-server-integration-template.json
│ ├── mcp-template-system.json
│ ├── mermaid-analysis-expert.json
│ ├── mermaid-class-diagram-generator.json
│ ├── mermaid-diagram-generator.json
│ ├── mermaid-diagram-modifier.json
│ ├── modify-mermaid-diagram.json
│ ├── monorepo-migration-guide.json
│ ├── multi-resource-context.json
│ ├── project-analysis-assistant.json
│ ├── prompt-combiner-interface.json
│ ├── prompt-templates.json
│ ├── repository-explorer.json
│ ├── research-assistant.json
│ ├── sequential-data-analysis.json
│ ├── solid-code-analysis-visualizer.json
│ ├── task-list-helper-8ithy.json
│ ├── template-based-mcp-integration.json
│ ├── templates.json
│ ├── test-prompt.json
│ └── you-are-limited-to-respond-yes-or-no-onl.json
├── docs
│ ├── AWS_MCP.md
│ ├── AWS.md
│ ├── CONAN.md
│ └── integration.md
├── elevenlabs-agents
│ ├── agent-prompts.json
│ └── README.md
├── IMPLEMENTATION_STATUS.md
├── integration_plan.md
├── LICENSE
├── MANIFEST.in
├── mcp-project-orchestrator
│ └── openssl
│ ├── .github
│ │ └── workflows
│ │ └── validate-cursor-config.yml
│ ├── conanfile.py
│ ├── CURSOR_DEPLOYMENT_POLISH.md
│ ├── cursor-rules
│ │ ├── mcp.json.jinja2
│ │ ├── prompts
│ │ │ ├── fips-compliance.md.jinja2
│ │ │ ├── openssl-coding-standards.md.jinja2
│ │ │ └── pr-review.md.jinja2
│ │ └── rules
│ │ ├── ci-linux.mdc.jinja2
│ │ ├── linux-dev.mdc.jinja2
│ │ ├── macos-dev.mdc.jinja2
│ │ ├── shared.mdc.jinja2
│ │ └── windows-dev.mdc.jinja2
│ ├── docs
│ │ └── cursor-configuration-management.md
│ ├── examples
│ │ └── example-workspace
│ │ ├── .cursor
│ │ │ ├── mcp.json
│ │ │ └── rules
│ │ │ ├── linux-dev.mdc
│ │ │ └── shared.mdc
│ │ ├── .gitignore
│ │ ├── CMakeLists.txt
│ │ ├── conanfile.py
│ │ ├── profiles
│ │ │ ├── linux-gcc-debug.profile
│ │ │ └── linux-gcc-release.profile
│ │ ├── README.md
│ │ └── src
│ │ ├── crypto_utils.cpp
│ │ ├── crypto_utils.h
│ │ └── main.cpp
│ ├── IMPLEMENTATION_SUMMARY.md
│ ├── mcp_orchestrator
│ │ ├── __init__.py
│ │ ├── cli.py
│ │ ├── conan_integration.py
│ │ ├── cursor_config.py
│ │ ├── cursor_deployer.py
│ │ ├── deploy_cursor.py
│ │ ├── env_config.py
│ │ ├── platform_detector.py
│ │ └── yaml_validator.py
│ ├── openssl-cursor-example-workspace-20251014_121133.zip
│ ├── pyproject.toml
│ ├── README.md
│ ├── requirements.txt
│ ├── scripts
│ │ └── create_example_workspace.py
│ ├── setup.py
│ ├── test_deployment.py
│ └── tests
│ ├── __init__.py
│ ├── test_cursor_deployer.py
│ └── test_template_validation.py
├── printcast-agent
│ ├── .env.example
│ ├── config
│ │ └── asterisk
│ │ └── extensions.conf
│ ├── Containerfile
│ ├── docker-compose.yml
│ ├── pyproject.toml
│ ├── README.md
│ ├── scripts
│ │ └── docker-entrypoint.sh
│ ├── src
│ │ ├── integrations
│ │ │ ├── __init__.py
│ │ │ ├── asterisk.py
│ │ │ ├── content.py
│ │ │ ├── delivery.py
│ │ │ ├── elevenlabs.py
│ │ │ └── printing.py
│ │ ├── mcp_server
│ │ │ ├── __init__.py
│ │ │ ├── main.py
│ │ │ └── server.py
│ │ └── orchestration
│ │ ├── __init__.py
│ │ └── workflow.py
│ └── tests
│ └── test_mcp_server.py
├── project_orchestration.json
├── project_templates.json
├── pyproject.toml
├── README.md
├── REFACTORING_COMPLETED.md
├── REFACTORING_RECOMMENDATIONS.md
├── requirements.txt
├── scripts
│ ├── archive
│ │ ├── init_claude_test.sh
│ │ ├── init_postgres.sh
│ │ ├── start_mcp_servers.sh
│ │ └── test_claude_desktop.sh
│ ├── consolidate_mermaid.py
│ ├── consolidate_prompts.py
│ ├── consolidate_resources.py
│ ├── consolidate_templates.py
│ ├── INSTRUCTIONS.md
│ ├── README.md
│ ├── setup_aws_mcp.sh
│ ├── setup_mcp.sh
│ ├── setup_orchestrator.sh
│ ├── setup_project.py
│ └── test_mcp.sh
├── src
│ └── mcp_project_orchestrator
│ ├── __init__.py
│ ├── __main__.py
│ ├── aws_mcp.py
│ ├── cli
│ │ └── __init__.py
│ ├── cli.py
│ ├── commands
│ │ └── openssl_cli.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── config.py
│ │ ├── exceptions.py
│ │ ├── fastmcp.py
│ │ ├── logging.py
│ │ └── managers.py
│ ├── cursor_deployer.py
│ ├── ecosystem_monitor.py
│ ├── fan_out_orchestrator.py
│ ├── fastmcp.py
│ ├── mcp-py
│ │ ├── AggregateVersions.py
│ │ ├── CustomBashTool.py
│ │ ├── FileAnnotator.py
│ │ ├── mcp-client.py
│ │ ├── mcp-server.py
│ │ ├── MermaidDiagramGenerator.py
│ │ ├── NamingAgent.py
│ │ └── solid-analyzer-agent.py
│ ├── mermaid
│ │ ├── __init__.py
│ │ ├── generator.py
│ │ ├── mermaid_orchestrator.py
│ │ ├── renderer.py
│ │ ├── templates
│ │ │ ├── AbstractFactory-diagram.json
│ │ │ ├── Adapter-diagram.json
│ │ │ ├── Analyze_Mermaid_Diagram.json
│ │ │ ├── Builder-diagram.json
│ │ │ ├── Chain-diagram.json
│ │ │ ├── Code_Diagram_Documentation_Creator.json
│ │ │ ├── Command-diagram.json
│ │ │ ├── Decorator-diagram.json
│ │ │ ├── Facade-diagram.json
│ │ │ ├── Factory-diagram.json
│ │ │ ├── flowchart
│ │ │ │ ├── AbstractFactory-diagram.json
│ │ │ │ ├── Adapter-diagram.json
│ │ │ │ ├── Analyze_Mermaid_Diagram.json
│ │ │ │ ├── Builder-diagram.json
│ │ │ │ ├── Chain-diagram.json
│ │ │ │ ├── Code_Diagram_Documentation_Creator.json
│ │ │ │ ├── Command-diagram.json
│ │ │ │ ├── Decorator-diagram.json
│ │ │ │ ├── Facade-diagram.json
│ │ │ │ ├── Factory-diagram.json
│ │ │ │ ├── Generate_Mermaid_Diagram.json
│ │ │ │ ├── generated_diagram.json
│ │ │ │ ├── integration.json
│ │ │ │ ├── Iterator-diagram.json
│ │ │ │ ├── Mediator-diagram.json
│ │ │ │ ├── Memento-diagram.json
│ │ │ │ ├── Mermaid_Analysis_Expert.json
│ │ │ │ ├── Mermaid_Class_Diagram_Generator.json
│ │ │ │ ├── Mermaid_Diagram_Generator.json
│ │ │ │ ├── Mermaid_Diagram_Modifier.json
│ │ │ │ ├── Modify_Mermaid_Diagram.json
│ │ │ │ ├── Observer-diagram.json
│ │ │ │ ├── Prototype-diagram.json
│ │ │ │ ├── Proxy-diagram.json
│ │ │ │ ├── README.json
│ │ │ │ ├── Singleton-diagram.json
│ │ │ │ ├── State-diagram.json
│ │ │ │ ├── Strategy-diagram.json
│ │ │ │ ├── TemplateMethod-diagram.json
│ │ │ │ ├── theme_dark.json
│ │ │ │ ├── theme_default.json
│ │ │ │ ├── theme_pastel.json
│ │ │ │ ├── theme_vibrant.json
│ │ │ │ └── Visitor-diagram.json
│ │ │ ├── Generate_Mermaid_Diagram.json
│ │ │ ├── generated_diagram.json
│ │ │ ├── index.json
│ │ │ ├── integration.json
│ │ │ ├── Iterator-diagram.json
│ │ │ ├── Mediator-diagram.json
│ │ │ ├── Memento-diagram.json
│ │ │ ├── Mermaid_Analysis_Expert.json
│ │ │ ├── Mermaid_Class_Diagram_Generator.json
│ │ │ ├── Mermaid_Diagram_Generator.json
│ │ │ ├── Mermaid_Diagram_Modifier.json
│ │ │ ├── Modify_Mermaid_Diagram.json
│ │ │ ├── Observer-diagram.json
│ │ │ ├── Prototype-diagram.json
│ │ │ ├── Proxy-diagram.json
│ │ │ ├── README.json
│ │ │ ├── Singleton-diagram.json
│ │ │ ├── State-diagram.json
│ │ │ ├── Strategy-diagram.json
│ │ │ ├── TemplateMethod-diagram.json
│ │ │ ├── theme_dark.json
│ │ │ ├── theme_default.json
│ │ │ ├── theme_pastel.json
│ │ │ ├── theme_vibrant.json
│ │ │ └── Visitor-diagram.json
│ │ └── types.py
│ ├── project_orchestration.py
│ ├── prompt_manager
│ │ ├── __init__.py
│ │ ├── loader.py
│ │ ├── manager.py
│ │ └── template.py
│ ├── prompts
│ │ ├── __dirname.json
│ │ ├── __image_1___describe_the_icon_in_one_sen___.json
│ │ ├── __init__.py
│ │ ├── __type.json
│ │ ├── _.json
│ │ ├── _DEFAULT_OPEN_DELIMITER.json
│ │ ├── _emojiRegex.json
│ │ ├── _UUID_CHARS.json
│ │ ├── a.json
│ │ ├── A.json
│ │ ├── Aa.json
│ │ ├── aAnnotationPadding.json
│ │ ├── absoluteThresholdGroup.json
│ │ ├── add.json
│ │ ├── ADDITIONAL_PROPERTY_FLAG.json
│ │ ├── Advanced_Multi-Server_Integration_Template.json
│ │ ├── allOptionsList.json
│ │ ├── analysis
│ │ │ ├── Data_Analysis_Template.json
│ │ │ ├── index.json
│ │ │ ├── Mermaid_Analysis_Expert.json
│ │ │ ├── Sequential_Data_Analysis_with_MCP_Integration.json
│ │ │ └── SOLID_Code_Analysis_Visualizer.json
│ │ ├── Analysis_Assistant.json
│ │ ├── Analyze_Mermaid_Diagram.json
│ │ ├── ANDROID_EVERGREEN_FIRST.json
│ │ ├── ANSI_ESCAPE_BELL.json
│ │ ├── architecture
│ │ │ ├── index.json
│ │ │ └── PromptCombiner_Interface.json
│ │ ├── Architecture_Design_Assistant.json
│ │ ├── argsTag.json
│ │ ├── ARROW.json
│ │ ├── assistant
│ │ │ ├── Analysis_Assistant.json
│ │ │ ├── Architecture_Design_Assistant.json
│ │ │ ├── Code_Refactoring_Assistant.json
│ │ │ ├── Code_Review_Assistant.json
│ │ │ ├── Database_Query_Assistant.json
│ │ │ ├── Debugging_Assistant.json
│ │ │ ├── Foresight_Assistant.json
│ │ │ ├── index.json
│ │ │ ├── MCP_Integration_Assistant.json
│ │ │ ├── Project_Analysis_Assistant.json
│ │ │ └── Research_Assistant.json
│ │ ├── astralRange.json
│ │ ├── at.json
│ │ ├── authorization_endpoint.json
│ │ ├── b.json
│ │ ├── BABELIGNORE_FILENAME.json
│ │ ├── BACKSLASH.json
│ │ ├── backupId.json
│ │ ├── BANG.json
│ │ ├── BASE64_MAP.json
│ │ ├── baseFlags.json
│ │ ├── Basic_Template.json
│ │ ├── bgModel.json
│ │ ├── bignum.json
│ │ ├── blockKeywordsStr.json
│ │ ├── BOMChar.json
│ │ ├── boundary.json
│ │ ├── brackets.json
│ │ ├── BROWSER_VAR.json
│ │ ├── bt.json
│ │ ├── BUILTIN.json
│ │ ├── BULLET.json
│ │ ├── c.json
│ │ ├── C.json
│ │ ├── CACHE_VERSION.json
│ │ ├── cacheControl.json
│ │ ├── cacheProp.json
│ │ ├── category.py
│ │ ├── CHANGE_EVENT.json
│ │ ├── CHAR_CODE_0.json
│ │ ├── chars.json
│ │ ├── cjsPattern.json
│ │ ├── cKeywords.json
│ │ ├── classForPercent.json
│ │ ├── classStr.json
│ │ ├── clientFirstMessageBare.json
│ │ ├── cmd.json
│ │ ├── Code_Diagram_Documentation_Creator.json
│ │ ├── Code_Refactoring_Assistant.json
│ │ ├── Code_Review_Assistant.json
│ │ ├── code.json
│ │ ├── coding
│ │ │ ├── __dirname.json
│ │ │ ├── _.json
│ │ │ ├── _DEFAULT_OPEN_DELIMITER.json
│ │ │ ├── _emojiRegex.json
│ │ │ ├── _UUID_CHARS.json
│ │ │ ├── a.json
│ │ │ ├── A.json
│ │ │ ├── aAnnotationPadding.json
│ │ │ ├── absoluteThresholdGroup.json
│ │ │ ├── add.json
│ │ │ ├── ADDITIONAL_PROPERTY_FLAG.json
│ │ │ ├── allOptionsList.json
│ │ │ ├── ANDROID_EVERGREEN_FIRST.json
│ │ │ ├── ANSI_ESCAPE_BELL.json
│ │ │ ├── argsTag.json
│ │ │ ├── ARROW.json
│ │ │ ├── astralRange.json
│ │ │ ├── at.json
│ │ │ ├── authorization_endpoint.json
│ │ │ ├── BABELIGNORE_FILENAME.json
│ │ │ ├── BACKSLASH.json
│ │ │ ├── BANG.json
│ │ │ ├── BASE64_MAP.json
│ │ │ ├── baseFlags.json
│ │ │ ├── bgModel.json
│ │ │ ├── bignum.json
│ │ │ ├── blockKeywordsStr.json
│ │ │ ├── BOMChar.json
│ │ │ ├── boundary.json
│ │ │ ├── brackets.json
│ │ │ ├── BROWSER_VAR.json
│ │ │ ├── bt.json
│ │ │ ├── BUILTIN.json
│ │ │ ├── BULLET.json
│ │ │ ├── c.json
│ │ │ ├── C.json
│ │ │ ├── CACHE_VERSION.json
│ │ │ ├── cacheControl.json
│ │ │ ├── cacheProp.json
│ │ │ ├── CHANGE_EVENT.json
│ │ │ ├── CHAR_CODE_0.json
│ │ │ ├── chars.json
│ │ │ ├── cjsPattern.json
│ │ │ ├── cKeywords.json
│ │ │ ├── classForPercent.json
│ │ │ ├── classStr.json
│ │ │ ├── clientFirstMessageBare.json
│ │ │ ├── cmd.json
│ │ │ ├── code.json
│ │ │ ├── colorCode.json
│ │ │ ├── comma.json
│ │ │ ├── command.json
│ │ │ ├── configJsContent.json
│ │ │ ├── connectionString.json
│ │ │ ├── cssClassStr.json
│ │ │ ├── currentBoundaryParse.json
│ │ │ ├── d.json
│ │ │ ├── data.json
│ │ │ ├── DATA.json
│ │ │ ├── dataWebpackPrefix.json
│ │ │ ├── debug.json
│ │ │ ├── decodeStateVectorV2.json
│ │ │ ├── DEFAULT_DELIMITER.json
│ │ │ ├── DEFAULT_DIAGRAM_DIRECTION.json
│ │ │ ├── DEFAULT_JS_PATTERN.json
│ │ │ ├── DEFAULT_LOG_TARGET.json
│ │ │ ├── defaultHelpOpt.json
│ │ │ ├── defaultHost.json
│ │ │ ├── deferY18nLookupPrefix.json
│ │ │ ├── DELIM.json
│ │ │ ├── delimiter.json
│ │ │ ├── DEPRECATION.json
│ │ │ ├── destMain.json
│ │ │ ├── DID_NOT_THROW.json
│ │ │ ├── direction.json
│ │ │ ├── displayValue.json
│ │ │ ├── DNS.json
│ │ │ ├── doc.json
│ │ │ ├── DOCUMENTATION_NOTE.json
│ │ │ ├── DOT.json
│ │ │ ├── DOTS.json
│ │ │ ├── dummyCompoundId.json
│ │ │ ├── e.json
│ │ │ ├── E.json
│ │ │ ├── earlyHintsLink.json
│ │ │ ├── elide.json
│ │ │ ├── EMPTY.json
│ │ │ ├── end.json
│ │ │ ├── endpoint.json
│ │ │ ├── environment.json
│ │ │ ├── ERR_CODE.json
│ │ │ ├── errMessage.json
│ │ │ ├── errMsg.json
│ │ │ ├── ERROR_MESSAGE.json
│ │ │ ├── error.json
│ │ │ ├── ERROR.json
│ │ │ ├── ERRORCLASS.json
│ │ │ ├── errorMessage.json
│ │ │ ├── es6Default.json
│ │ │ ├── ESC.json
│ │ │ ├── Escapable.json
│ │ │ ├── escapedChar.json
│ │ │ ├── escapeFuncStr.json
│ │ │ ├── escSlash.json
│ │ │ ├── ev.json
│ │ │ ├── event.json
│ │ │ ├── execaMessage.json
│ │ │ ├── EXPECTED_LABEL.json
│ │ │ ├── expected.json
│ │ │ ├── expectedString.json
│ │ │ ├── expression1.json
│ │ │ ├── EXTENSION.json
│ │ │ ├── f.json
│ │ │ ├── FAIL_TEXT.json
│ │ │ ├── FILE_BROWSER_FACTORY.json
│ │ │ ├── fill.json
│ │ │ ├── findPackageJson.json
│ │ │ ├── fnKey.json
│ │ │ ├── FORMAT.json
│ │ │ ├── formatted.json
│ │ │ ├── from.json
│ │ │ ├── fullpaths.json
│ │ │ ├── FUNC_ERROR_TEXT.json
│ │ │ ├── GenStateSuspendedStart.json
│ │ │ ├── GENSYNC_EXPECTED_START.json
│ │ │ ├── gutter.json
│ │ │ ├── h.json
│ │ │ ├── handlerFuncName.json
│ │ │ ├── HASH_UNDEFINED.json
│ │ │ ├── head.json
│ │ │ ├── helpMessage.json
│ │ │ ├── HINT_ARG.json
│ │ │ ├── HOOK_RETURNED_NOTHING_ERROR_MESSAGE.json
│ │ │ ├── i.json
│ │ │ ├── id.json
│ │ │ ├── identifier.json
│ │ │ ├── Identifier.json
│ │ │ ├── INDENT.json
│ │ │ ├── indentation.json
│ │ │ ├── index.json
│ │ │ ├── INDIRECTION_FRAGMENT.json
│ │ │ ├── input.json
│ │ │ ├── inputText.json
│ │ │ ├── insert.json
│ │ │ ├── insertPromptQuery.json
│ │ │ ├── INSPECT_MAX_BYTES.json
│ │ │ ├── intToCharMap.json
│ │ │ ├── IS_ITERABLE_SENTINEL.json
│ │ │ ├── IS_KEYED_SENTINEL.json
│ │ │ ├── isConfigType.json
│ │ │ ├── isoSentinel.json
│ │ │ ├── isSourceNode.json
│ │ │ ├── j.json
│ │ │ ├── JAKE_CMD.json
│ │ │ ├── JEST_GLOBAL_NAME.json
│ │ │ ├── JEST_GLOBALS_MODULE_NAME.json
│ │ │ ├── JSON_SYNTAX_CHAR.json
│ │ │ ├── json.json
│ │ │ ├── jsonType.json
│ │ │ ├── jupyter_namespaceObject.json
│ │ │ ├── JUPYTERLAB_DOCMANAGER_PLUGIN_ID.json
│ │ │ ├── k.json
│ │ │ ├── KERNEL_STATUS_ERROR_CLASS.json
│ │ │ ├── key.json
│ │ │ ├── l.json
│ │ │ ├── labelId.json
│ │ │ ├── LATEST_PROTOCOL_VERSION.json
│ │ │ ├── LETTERDASHNUMBER.json
│ │ │ ├── LF.json
│ │ │ ├── LIMIT_REPLACE_NODE.json
│ │ │ ├── logTime.json
│ │ │ ├── lstatkey.json
│ │ │ ├── lt.json
│ │ │ ├── m.json
│ │ │ ├── maliciousPayload.json
│ │ │ ├── mask.json
│ │ │ ├── match.json
│ │ │ ├── matchingDelim.json
│ │ │ ├── MAXIMUM_MESSAGE_SIZE.json
│ │ │ ├── mdcContent.json
│ │ │ ├── MERMAID_DOM_ID_PREFIX.json
│ │ │ ├── message.json
│ │ │ ├── messages.json
│ │ │ ├── meth.json
│ │ │ ├── minimatch.json
│ │ │ ├── MOCK_CONSTRUCTOR_NAME.json
│ │ │ ├── MOCKS_PATTERN.json
│ │ │ ├── moduleDirectory.json
│ │ │ ├── msg.json
│ │ │ ├── mtr.json
│ │ │ ├── multipartType.json
│ │ │ ├── n.json
│ │ │ ├── N.json
│ │ │ ├── name.json
│ │ │ ├── NATIVE_PLATFORM.json
│ │ │ ├── newUrl.json
│ │ │ ├── NM.json
│ │ │ ├── NO_ARGUMENTS.json
│ │ │ ├── NO_DIFF_MESSAGE.json
│ │ │ ├── NODE_MODULES.json
│ │ │ ├── nodeInternalPrefix.json
│ │ │ ├── nonASCIIidentifierStartChars.json
│ │ │ ├── nonKey.json
│ │ │ ├── NOT_A_DOT.json
│ │ │ ├── notCharacterOrDash.json
│ │ │ ├── notebookURL.json
│ │ │ ├── notSelector.json
│ │ │ ├── nullTag.json
│ │ │ ├── num.json
│ │ │ ├── NUMBER.json
│ │ │ ├── o.json
│ │ │ ├── O.json
│ │ │ ├── octChar.json
│ │ │ ├── octetStreamType.json
│ │ │ ├── operators.json
│ │ │ ├── out.json
│ │ │ ├── OUTSIDE_JEST_VM_PROTOCOL.json
│ │ │ ├── override.json
│ │ │ ├── p.json
│ │ │ ├── PACKAGE_FILENAME.json
│ │ │ ├── PACKAGE_JSON.json
│ │ │ ├── packageVersion.json
│ │ │ ├── paddedNumber.json
│ │ │ ├── page.json
│ │ │ ├── parseClass.json
│ │ │ ├── path.json
│ │ │ ├── pathExt.json
│ │ │ ├── pattern.json
│ │ │ ├── PatternBoolean.json
│ │ │ ├── pBuiltins.json
│ │ │ ├── pFloatForm.json
│ │ │ ├── pkg.json
│ │ │ ├── PLUGIN_ID_DOC_MANAGER.json
│ │ │ ├── plusChar.json
│ │ │ ├── PN_CHARS.json
│ │ │ ├── point.json
│ │ │ ├── prefix.json
│ │ │ ├── PRETTY_PLACEHOLDER.json
│ │ │ ├── property_prefix.json
│ │ │ ├── pubkey256.json
│ │ │ ├── Q.json
│ │ │ ├── qmark.json
│ │ │ ├── QO.json
│ │ │ ├── query.json
│ │ │ ├── querystringType.json
│ │ │ ├── queryText.json
│ │ │ ├── r.json
│ │ │ ├── R.json
│ │ │ ├── rangeStart.json
│ │ │ ├── re.json
│ │ │ ├── reI.json
│ │ │ ├── REQUIRED_FIELD_SYMBOL.json
│ │ │ ├── reserve.json
│ │ │ ├── resolvedDestination.json
│ │ │ ├── resolverDir.json
│ │ │ ├── responseType.json
│ │ │ ├── result.json
│ │ │ ├── ROOT_DESCRIBE_BLOCK_NAME.json
│ │ │ ├── ROOT_NAMESPACE_NAME.json
│ │ │ ├── ROOT_TASK_NAME.json
│ │ │ ├── route.json
│ │ │ ├── RUNNING_TEXT.json
│ │ │ ├── s.json
│ │ │ ├── SCHEMA_PATH.json
│ │ │ ├── se.json
│ │ │ ├── SEARCHABLE_CLASS.json
│ │ │ ├── secret.json
│ │ │ ├── selector.json
│ │ │ ├── SEMVER_SPEC_VERSION.json
│ │ │ ├── sensitiveHeaders.json
│ │ │ ├── sep.json
│ │ │ ├── separator.json
│ │ │ ├── SHAPE_STATE.json
│ │ │ ├── shape.json
│ │ │ ├── SHARED.json
│ │ │ ├── short.json
│ │ │ ├── side.json
│ │ │ ├── SNAPSHOT_VERSION.json
│ │ │ ├── SOURCE_MAPPING_PREFIX.json
│ │ │ ├── source.json
│ │ │ ├── sourceMapContent.json
│ │ │ ├── SPACE_SYMBOL.json
│ │ │ ├── SPACE.json
│ │ │ ├── sqlKeywords.json
│ │ │ ├── sranges.json
│ │ │ ├── st.json
│ │ │ ├── ST.json
│ │ │ ├── stack.json
│ │ │ ├── START_HIDING.json
│ │ │ ├── START_OF_LINE.json
│ │ │ ├── startNoTraversal.json
│ │ │ ├── STATES.json
│ │ │ ├── stats.json
│ │ │ ├── statSync.json
│ │ │ ├── storageStatus.json
│ │ │ ├── storageType.json
│ │ │ ├── str.json
│ │ │ ├── stringifiedObject.json
│ │ │ ├── stringPath.json
│ │ │ ├── stringResult.json
│ │ │ ├── stringTag.json
│ │ │ ├── strValue.json
│ │ │ ├── style.json
│ │ │ ├── SUB_NAME.json
│ │ │ ├── subkey.json
│ │ │ ├── SUBPROTOCOL.json
│ │ │ ├── SUITE_NAME.json
│ │ │ ├── symbolPattern.json
│ │ │ ├── symbolTag.json
│ │ │ ├── t.json
│ │ │ ├── T.json
│ │ │ ├── templateDir.json
│ │ │ ├── tempName.json
│ │ │ ├── text.json
│ │ │ ├── time.json
│ │ │ ├── titleSeparator.json
│ │ │ ├── tmpl.json
│ │ │ ├── tn.json
│ │ │ ├── toValue.json
│ │ │ ├── transform.json
│ │ │ ├── trustProxyDefaultSymbol.json
│ │ │ ├── typeArgumentsKey.json
│ │ │ ├── typeKey.json
│ │ │ ├── typeMessage.json
│ │ │ ├── typesRegistryPackageName.json
│ │ │ ├── u.json
│ │ │ ├── UNDEFINED.json
│ │ │ ├── unit.json
│ │ │ ├── UNMATCHED_SURROGATE_PAIR_REPLACE.json
│ │ │ ├── ur.json
│ │ │ ├── USAGE.json
│ │ │ ├── value.json
│ │ │ ├── Vr.json
│ │ │ ├── watchmanURL.json
│ │ │ ├── webkit.json
│ │ │ ├── xhtml.json
│ │ │ ├── XP_DEFAULT_PATHEXT.json
│ │ │ └── y.json
│ │ ├── Collaborative_Development_with_MCP_Integration.json
│ │ ├── colorCode.json
│ │ ├── comma.json
│ │ ├── command.json
│ │ ├── completionShTemplate.json
│ │ ├── configJsContent.json
│ │ ├── connectionString.json
│ │ ├── Consolidated_TypeScript_Interfaces_Template.json
│ │ ├── Could_you_interpret_the_assumed_applicat___.json
│ │ ├── cssClassStr.json
│ │ ├── currentBoundaryParse.json
│ │ ├── d.json
│ │ ├── Data_Analysis_Template.json
│ │ ├── data.json
│ │ ├── DATA.json
│ │ ├── Database_Query_Assistant.json
│ │ ├── dataWebpackPrefix.json
│ │ ├── debug.json
│ │ ├── Debugging_Assistant.json
│ │ ├── decodeStateVectorV2.json
│ │ ├── DEFAULT_DELIMITER.json
│ │ ├── DEFAULT_DIAGRAM_DIRECTION.json
│ │ ├── DEFAULT_INDENT.json
│ │ ├── DEFAULT_JS_PATTERN.json
│ │ ├── DEFAULT_LOG_TARGET.json
│ │ ├── defaultHelpOpt.json
│ │ ├── defaultHost.json
│ │ ├── deferY18nLookupPrefix.json
│ │ ├── DELIM.json
│ │ ├── delimiter.json
│ │ ├── DEPRECATION.json
│ │ ├── DESCENDING.json
│ │ ├── destMain.json
│ │ ├── development
│ │ │ ├── Collaborative_Development_with_MCP_Integration.json
│ │ │ ├── Consolidated_TypeScript_Interfaces_Template.json
│ │ │ ├── Development_Workflow.json
│ │ │ ├── index.json
│ │ │ ├── MCP_Server_Development_Prompt_Combiner.json
│ │ │ └── Monorepo_Migration_and_Code_Organization_Guide.json
│ │ ├── Development_System_Prompt.json
│ │ ├── Development_Workflow.json
│ │ ├── devops
│ │ │ ├── Docker_Compose_Prompt_Combiner.json
│ │ │ ├── Docker_Containerization_Guide.json
│ │ │ └── index.json
│ │ ├── DID_NOT_THROW.json
│ │ ├── direction.json
│ │ ├── displayValue.json
│ │ ├── DNS.json
│ │ ├── doc.json
│ │ ├── Docker_Compose_Prompt_Combiner.json
│ │ ├── Docker_Containerization_Guide.json
│ │ ├── Docker_MCP_Servers_Orchestration_Guide.json
│ │ ├── DOCUMENTATION_NOTE.json
│ │ ├── DOT.json
│ │ ├── DOTS.json
│ │ ├── dummyCompoundId.json
│ │ ├── e.json
│ │ ├── E.json
│ │ ├── earlyHintsLink.json
│ │ ├── elide.json
│ │ ├── EMPTY.json
│ │ ├── encoded.json
│ │ ├── end.json
│ │ ├── endpoint.json
│ │ ├── environment.json
│ │ ├── ERR_CODE.json
│ │ ├── errMessage.json
│ │ ├── errMsg.json
│ │ ├── ERROR_MESSAGE.json
│ │ ├── error.json
│ │ ├── ERROR.json
│ │ ├── ERRORCLASS.json
│ │ ├── errorMessage.json
│ │ ├── es6Default.json
│ │ ├── ESC.json
│ │ ├── Escapable.json
│ │ ├── escapedChar.json
│ │ ├── escapeFuncStr.json
│ │ ├── escSlash.json
│ │ ├── ev.json
│ │ ├── event.json
│ │ ├── execaMessage.json
│ │ ├── EXPECTED_LABEL.json
│ │ ├── expected.json
│ │ ├── expectedString.json
│ │ ├── expression1.json
│ │ ├── EXTENSION.json
│ │ ├── f.json
│ │ ├── FAIL_TEXT.json
│ │ ├── FILE_BROWSER_FACTORY.json
│ │ ├── fill.json
│ │ ├── findPackageJson.json
│ │ ├── fnKey.json
│ │ ├── Foresight_Assistant.json
│ │ ├── FORMAT.json
│ │ ├── formatted.json
│ │ ├── from.json
│ │ ├── fullpaths.json
│ │ ├── FUNC_ERROR_TEXT.json
│ │ ├── general
│ │ │ └── index.json
│ │ ├── Generate_different_types_of_questions_ab___.json
│ │ ├── Generate_Mermaid_Diagram.json
│ │ ├── GenStateSuspendedStart.json
│ │ ├── GENSYNC_EXPECTED_START.json
│ │ ├── GitHub_Repository_Explorer.json
│ │ ├── gutter.json
│ │ ├── h.json
│ │ ├── handlerFuncName.json
│ │ ├── HASH_UNDEFINED.json
│ │ ├── head.json
│ │ ├── helpMessage.json
│ │ ├── HINT_ARG.json
│ │ ├── HOOK_RETURNED_NOTHING_ERROR_MESSAGE.json
│ │ ├── i.json
│ │ ├── id.json
│ │ ├── identifier.json
│ │ ├── Identifier.json
│ │ ├── INDENT.json
│ │ ├── indentation.json
│ │ ├── index.json
│ │ ├── INDIRECTION_FRAGMENT.json
│ │ ├── Initialize_project_setup_for_a_new_micro___.json
│ │ ├── input.json
│ │ ├── inputText.json
│ │ ├── insert.json
│ │ ├── insertPromptQuery.json
│ │ ├── INSPECT_MAX_BYTES.json
│ │ ├── install_dependencies__build__run__test____.json
│ │ ├── intToCharMap.json
│ │ ├── IS_ITERABLE_SENTINEL.json
│ │ ├── IS_KEYED_SENTINEL.json
│ │ ├── isConfigType.json
│ │ ├── isoSentinel.json
│ │ ├── isSourceNode.json
│ │ ├── j.json
│ │ ├── J.json
│ │ ├── JAKE_CMD.json
│ │ ├── JEST_GLOBAL_NAME.json
│ │ ├── JEST_GLOBALS_MODULE_NAME.json
│ │ ├── JSON_SYNTAX_CHAR.json
│ │ ├── json.json
│ │ ├── jsonType.json
│ │ ├── jupyter_namespaceObject.json
│ │ ├── JUPYTERLAB_DOCMANAGER_PLUGIN_ID.json
│ │ ├── k.json
│ │ ├── KERNEL_STATUS_ERROR_CLASS.json
│ │ ├── key.json
│ │ ├── l.json
│ │ ├── labelId.json
│ │ ├── LATEST_PROTOCOL_VERSION.json
│ │ ├── LETTERDASHNUMBER.json
│ │ ├── LF.json
│ │ ├── LIMIT_REPLACE_NODE.json
│ │ ├── LINE_FEED.json
│ │ ├── logTime.json
│ │ ├── lstatkey.json
│ │ ├── lt.json
│ │ ├── m.json
│ │ ├── maliciousPayload.json
│ │ ├── manager.py
│ │ ├── marker.json
│ │ ├── mask.json
│ │ ├── match.json
│ │ ├── matchingDelim.json
│ │ ├── MAXIMUM_MESSAGE_SIZE.json
│ │ ├── MCP_Integration_Assistant.json
│ │ ├── MCP_Resources_Explorer.json
│ │ ├── MCP_Resources_Integration_Guide.json
│ │ ├── MCP_Server_Development_Prompt_Combiner.json
│ │ ├── MCP_Server_Integration_Guide.json
│ │ ├── mcp-code-generator.json
│ │ ├── mdcContent.json
│ │ ├── Mermaid_Analysis_Expert.json
│ │ ├── Mermaid_Class_Diagram_Generator.json
│ │ ├── Mermaid_Diagram_Generator.json
│ │ ├── Mermaid_Diagram_Modifier.json
│ │ ├── MERMAID_DOM_ID_PREFIX.json
│ │ ├── message.json
│ │ ├── messages.json
│ │ ├── meth.json
│ │ ├── minimatch.json
│ │ ├── MOBILE_QUERY.json
│ │ ├── MOCK_CONSTRUCTOR_NAME.json
│ │ ├── MOCKS_PATTERN.json
│ │ ├── Modify_Mermaid_Diagram.json
│ │ ├── moduleDirectory.json
│ │ ├── Monorepo_Migration_and_Code_Organization_Guide.json
│ │ ├── msg.json
│ │ ├── mtr.json
│ │ ├── Multi-Resource_Context_Assistant.json
│ │ ├── multipartType.json
│ │ ├── n.json
│ │ ├── N.json
│ │ ├── name.json
│ │ ├── NATIVE_PLATFORM.json
│ │ ├── newUrl.json
│ │ ├── NM.json
│ │ ├── NO_ARGUMENTS.json
│ │ ├── NO_DIFF_MESSAGE.json
│ │ ├── NODE_MODULES.json
│ │ ├── nodeInternalPrefix.json
│ │ ├── nonASCIIidentifierStartChars.json
│ │ ├── nonKey.json
│ │ ├── NOT_A_DOT.json
│ │ ├── notCharacterOrDash.json
│ │ ├── notebookURL.json
│ │ ├── notSelector.json
│ │ ├── nullTag.json
│ │ ├── num.json
│ │ ├── NUMBER.json
│ │ ├── o.json
│ │ ├── O.json
│ │ ├── octChar.json
│ │ ├── octetStreamType.json
│ │ ├── operators.json
│ │ ├── other
│ │ │ ├── __image_1___describe_the_icon_in_one_sen___.json
│ │ │ ├── __type.json
│ │ │ ├── Advanced_Multi-Server_Integration_Template.json
│ │ │ ├── Analyze_Mermaid_Diagram.json
│ │ │ ├── Basic_Template.json
│ │ │ ├── Code_Diagram_Documentation_Creator.json
│ │ │ ├── Collaborative_Development_with_MCP_Integration.json
│ │ │ ├── completionShTemplate.json
│ │ │ ├── Could_you_interpret_the_assumed_applicat___.json
│ │ │ ├── DEFAULT_INDENT.json
│ │ │ ├── Docker_MCP_Servers_Orchestration_Guide.json
│ │ │ ├── Generate_different_types_of_questions_ab___.json
│ │ │ ├── Generate_Mermaid_Diagram.json
│ │ │ ├── GitHub_Repository_Explorer.json
│ │ │ ├── index.json
│ │ │ ├── Initialize_project_setup_for_a_new_micro___.json
│ │ │ ├── install_dependencies__build__run__test____.json
│ │ │ ├── LINE_FEED.json
│ │ │ ├── MCP_Resources_Explorer.json
│ │ │ ├── MCP_Resources_Integration_Guide.json
│ │ │ ├── MCP_Server_Integration_Guide.json
│ │ │ ├── mcp-code-generator.json
│ │ │ ├── Mermaid_Class_Diagram_Generator.json
│ │ │ ├── Mermaid_Diagram_Generator.json
│ │ │ ├── Mermaid_Diagram_Modifier.json
│ │ │ ├── Modify_Mermaid_Diagram.json
│ │ │ ├── Multi-Resource_Context_Assistant.json
│ │ │ ├── output.json
│ │ │ ├── sseUrl.json
│ │ │ ├── string.json
│ │ │ ├── Task_List_Helper.json
│ │ │ ├── Template-Based_MCP_Integration.json
│ │ │ ├── Test_Prompt.json
│ │ │ ├── type.json
│ │ │ ├── VERSION.json
│ │ │ ├── WIN_SLASH.json
│ │ │ └── You_are_limited_to_respond_Yes_or_No_onl___.json
│ │ ├── out.json
│ │ ├── output.json
│ │ ├── OUTSIDE_JEST_VM_PROTOCOL.json
│ │ ├── override.json
│ │ ├── p.json
│ │ ├── PACKAGE_FILENAME.json
│ │ ├── PACKAGE_JSON.json
│ │ ├── packageVersion.json
│ │ ├── paddedNumber.json
│ │ ├── page.json
│ │ ├── parseClass.json
│ │ ├── PATH_NODE_MODULES.json
│ │ ├── path.json
│ │ ├── pathExt.json
│ │ ├── pattern.json
│ │ ├── PatternBoolean.json
│ │ ├── pBuiltins.json
│ │ ├── pFloatForm.json
│ │ ├── pkg.json
│ │ ├── PLUGIN_ID_DOC_MANAGER.json
│ │ ├── plusChar.json
│ │ ├── PN_CHARS.json
│ │ ├── point.json
│ │ ├── prefix.json
│ │ ├── PRETTY_PLACEHOLDER.json
│ │ ├── Project_Analysis_Assistant.json
│ │ ├── ProjectsUpdatedInBackgroundEvent.json
│ │ ├── PromptCombiner_Interface.json
│ │ ├── promptId.json
│ │ ├── property_prefix.json
│ │ ├── pubkey256.json
│ │ ├── Q.json
│ │ ├── qmark.json
│ │ ├── QO.json
│ │ ├── query.json
│ │ ├── querystringType.json
│ │ ├── queryText.json
│ │ ├── r.json
│ │ ├── R.json
│ │ ├── rangeStart.json
│ │ ├── re.json
│ │ ├── reI.json
│ │ ├── REQUIRED_FIELD_SYMBOL.json
│ │ ├── Research_Assistant.json
│ │ ├── reserve.json
│ │ ├── resolvedDestination.json
│ │ ├── resolverDir.json
│ │ ├── responseType.json
│ │ ├── result.json
│ │ ├── ROOT_DESCRIBE_BLOCK_NAME.json
│ │ ├── ROOT_NAMESPACE_NAME.json
│ │ ├── ROOT_TASK_NAME.json
│ │ ├── route.json
│ │ ├── RUNNING_TEXT.json
│ │ ├── RXstyle.json
│ │ ├── s.json
│ │ ├── SCHEMA_PATH.json
│ │ ├── schemaQuery.json
│ │ ├── se.json
│ │ ├── SEARCHABLE_CLASS.json
│ │ ├── secret.json
│ │ ├── selector.json
│ │ ├── SEMVER_SPEC_VERSION.json
│ │ ├── sensitiveHeaders.json
│ │ ├── sep.json
│ │ ├── separator.json
│ │ ├── Sequential_Data_Analysis_with_MCP_Integration.json
│ │ ├── SHAPE_STATE.json
│ │ ├── shape.json
│ │ ├── SHARED.json
│ │ ├── short.json
│ │ ├── side.json
│ │ ├── SNAPSHOT_VERSION.json
│ │ ├── SOLID_Code_Analysis_Visualizer.json
│ │ ├── SOURCE_MAPPING_PREFIX.json
│ │ ├── source.json
│ │ ├── sourceMapContent.json
│ │ ├── SPACE_SYMBOL.json
│ │ ├── SPACE.json
│ │ ├── sqlKeywords.json
│ │ ├── sranges.json
│ │ ├── sseUrl.json
│ │ ├── st.json
│ │ ├── ST.json
│ │ ├── stack.json
│ │ ├── START_HIDING.json
│ │ ├── START_OF_LINE.json
│ │ ├── startNoTraversal.json
│ │ ├── STATES.json
│ │ ├── stats.json
│ │ ├── statSync.json
│ │ ├── status.json
│ │ ├── storageStatus.json
│ │ ├── storageType.json
│ │ ├── str.json
│ │ ├── string.json
│ │ ├── stringifiedObject.json
│ │ ├── stringPath.json
│ │ ├── stringResult.json
│ │ ├── stringTag.json
│ │ ├── strValue.json
│ │ ├── style.json
│ │ ├── SUB_NAME.json
│ │ ├── subkey.json
│ │ ├── SUBPROTOCOL.json
│ │ ├── SUITE_NAME.json
│ │ ├── symbolPattern.json
│ │ ├── symbolTag.json
│ │ ├── system
│ │ │ ├── Aa.json
│ │ │ ├── b.json
│ │ │ ├── Development_System_Prompt.json
│ │ │ ├── index.json
│ │ │ ├── marker.json
│ │ │ ├── PATH_NODE_MODULES.json
│ │ │ ├── ProjectsUpdatedInBackgroundEvent.json
│ │ │ ├── RXstyle.json
│ │ │ ├── status.json
│ │ │ └── versionMajorMinor.json
│ │ ├── t.json
│ │ ├── T.json
│ │ ├── Task_List_Helper.json
│ │ ├── Template-Based_MCP_Integration.json
│ │ ├── template.py
│ │ ├── templateDir.json
│ │ ├── tempName.json
│ │ ├── Test_Prompt.json
│ │ ├── text.json
│ │ ├── time.json
│ │ ├── titleSeparator.json
│ │ ├── tmpl.json
│ │ ├── tn.json
│ │ ├── TOPBAR_FACTORY.json
│ │ ├── toValue.json
│ │ ├── transform.json
│ │ ├── trustProxyDefaultSymbol.json
│ │ ├── txt.json
│ │ ├── type.json
│ │ ├── typeArgumentsKey.json
│ │ ├── typeKey.json
│ │ ├── typeMessage.json
│ │ ├── typesRegistryPackageName.json
│ │ ├── u.json
│ │ ├── UNDEFINED.json
│ │ ├── unit.json
│ │ ├── UNMATCHED_SURROGATE_PAIR_REPLACE.json
│ │ ├── ur.json
│ │ ├── usage.json
│ │ ├── USAGE.json
│ │ ├── user
│ │ │ ├── backupId.json
│ │ │ ├── DESCENDING.json
│ │ │ ├── encoded.json
│ │ │ ├── index.json
│ │ │ ├── J.json
│ │ │ ├── MOBILE_QUERY.json
│ │ │ ├── promptId.json
│ │ │ ├── schemaQuery.json
│ │ │ ├── TOPBAR_FACTORY.json
│ │ │ ├── txt.json
│ │ │ └── usage.json
│ │ ├── value.json
│ │ ├── VERSION.json
│ │ ├── version.py
│ │ ├── versionMajorMinor.json
│ │ ├── Vr.json
│ │ ├── watchmanURL.json
│ │ ├── webkit.json
│ │ ├── WIN_SLASH.json
│ │ ├── xhtml.json
│ │ ├── XP_DEFAULT_PATHEXT.json
│ │ ├── y.json
│ │ └── You_are_limited_to_respond_Yes_or_No_onl___.json
│ ├── resources
│ │ ├── __init__.py
│ │ ├── code_examples
│ │ │ └── index.json
│ │ ├── config
│ │ │ └── index.json
│ │ ├── documentation
│ │ │ └── index.json
│ │ ├── images
│ │ │ └── index.json
│ │ ├── index.json
│ │ └── other
│ │ └── index.json
│ ├── server.py
│ ├── templates
│ │ ├── __init__.py
│ │ ├── AbstractFactory.json
│ │ ├── Adapter.json
│ │ ├── base.py
│ │ ├── Builder.json
│ │ ├── Chain.json
│ │ ├── Command.json
│ │ ├── component
│ │ │ ├── AbstractFactory.json
│ │ │ ├── Adapter.json
│ │ │ ├── Builder.json
│ │ │ ├── Chain.json
│ │ │ ├── Command.json
│ │ │ ├── Decorator.json
│ │ │ ├── Facade.json
│ │ │ ├── Factory.json
│ │ │ ├── Iterator.json
│ │ │ ├── Mediator.json
│ │ │ ├── Memento.json
│ │ │ ├── Observer.json
│ │ │ ├── Prototype.json
│ │ │ ├── Proxy.json
│ │ │ ├── Singleton.json
│ │ │ ├── State.json
│ │ │ ├── Strategy.json
│ │ │ ├── TemplateMethod.json
│ │ │ └── Visitor.json
│ │ ├── component.py
│ │ ├── Decorator.json
│ │ ├── Facade.json
│ │ ├── Factory.json
│ │ ├── index.json
│ │ ├── Iterator.json
│ │ ├── manager.py
│ │ ├── Mediator.json
│ │ ├── Memento.json
│ │ ├── Observer.json
│ │ ├── project.py
│ │ ├── Prototype.json
│ │ ├── Proxy.json
│ │ ├── renderer.py
│ │ ├── Singleton.json
│ │ ├── State.json
│ │ ├── Strategy.json
│ │ ├── template_manager.py
│ │ ├── TemplateMethod.json
│ │ ├── types.py
│ │ └── Visitor.json
│ └── utils
│ └── __init__.py
├── SUMMARY.md
├── TASK_COMPLETION_SUMMARY.md
├── templates
│ └── openssl
│ ├── files
│ │ ├── CMakeLists.txt.jinja2
│ │ ├── conanfile.py.jinja2
│ │ ├── main.cpp.jinja2
│ │ └── README.md.jinja2
│ ├── openssl-consumer.json
│ └── template.json
├── test_openssl_integration.sh
├── test_package
│ └── conanfile.py
└── tests
├── __init__.py
├── conftest.py
├── integration
│ ├── test_core_integration.py
│ ├── test_mermaid_integration.py
│ ├── test_prompt_manager_integration.py
│ └── test_server_integration.py
├── test_aws_mcp.py
├── test_base_classes.py
├── test_config.py
├── test_exceptions.py
├── test_mermaid.py
├── test_prompts.py
└── test_templates.py
```
# Files
--------------------------------------------------------------------------------
/printcast-agent/src/integrations/asterisk.py:
--------------------------------------------------------------------------------
```python
"""
Asterisk SIP integration for PrintCast Agent.
Handles telephony operations including:
- Call routing and management
- IVR interactions
- DTMF processing
- Call recording
"""
import asyncio
import logging
from typing import Any, Callable, Dict, List, Optional
from datetime import datetime
import structlog
from panoramisk import Manager
from pydantic import BaseModel, Field
logger = structlog.get_logger(__name__)
class CallInfo(BaseModel):
    """Information about an active call."""

    channel: str  # Asterisk channel identifier, e.g. "SIP/1001-00000001"
    caller_id: str  # Caller ID number as reported by AMI (may be updated later by NewCallerid events)
    called_number: str  # Dialed extension (AMI "Exten" field)
    start_time: datetime  # When the channel was first seen (local time, set by the manager)
    state: str  # Coarse call state, e.g. "ringing"
    unique_id: str  # AMI "Uniqueid"; used as the key in AsteriskManager.active_calls
    # Free-form per-call storage, e.g. the accumulated "dtmf_buffer" of digits.
    metadata: Dict[str, Any] = Field(default_factory=dict)
class AsteriskManager:
    """
    Manages Asterisk SIP server integration.

    Provides high-level interface for:
    - AMI (Asterisk Manager Interface) operations
    - Call control and routing
    - IVR menu handling
    - DTMF input processing
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize Asterisk manager.

        Args:
            config: Asterisk configuration including:
                - host: Asterisk server hostname
                - port: AMI port (default 5038)
                - username: AMI username
                - password: AMI password
                - context: Default dialplan context
        """
        self.config = config
        # AMI client; created in connect() so construction never touches the network.
        self.ami: "Optional[Manager]" = None
        self.connected = False
        # Active calls keyed by AMI "Uniqueid".
        self.active_calls: "Dict[str, CallInfo]" = {}
        # Custom async handlers keyed by AMI event type (see register_event_handler).
        self.event_handlers: Dict[str, List[Callable]] = {}

        # Configuration (with safe defaults for local development)
        self.host = config.get("host", "localhost")
        self.port = config.get("port", 5038)
        self.username = config.get("username", "admin")
        self.password = config.get("password", "")
        self.context = config.get("context", "printcast-ivr")

        logger.info(
            "Asterisk manager initialized",
            host=self.host,
            port=self.port,
            context=self.context
        )

    async def connect(self) -> bool:
        """
        Connect to Asterisk AMI.

        Returns:
            True if connection successful
        """
        try:
            self.ami = Manager(
                host=self.host,
                port=self.port,
                username=self.username,
                secret=self.password
            )
            # Connect to AMI
            await self.ami.connect()
            # Subscribe to all AMI events; dispatch happens in _handle_ami_event.
            self.ami.register_event("*", self._handle_ami_event)
            self.connected = True
            logger.info("Connected to Asterisk AMI", host=self.host)
            return True
        except Exception as e:
            # Connection failures are reported via the return value, not raised.
            logger.error("Failed to connect to Asterisk", error=str(e))
            self.connected = False
            return False

    async def disconnect(self):
        """Disconnect from Asterisk AMI."""
        if self.ami and self.connected:
            try:
                await self.ami.logoff()
                await self.ami.close()
                logger.info("Disconnected from Asterisk")
            except Exception as e:
                logger.error("Error disconnecting from Asterisk", error=str(e))
            finally:
                # Mark as disconnected even when logoff/close fails, so the
                # manager never keeps using a dead connection.
                self.connected = False

    def is_connected(self) -> bool:
        """Check if connected to Asterisk."""
        return self.connected

    async def _handle_ami_event(self, event: Dict[str, Any]):
        """
        Handle AMI events.

        Dispatches built-in bookkeeping first, then fans out to any
        user-registered handlers (fire-and-forget tasks).

        Args:
            event: AMI event data
        """
        event_type = event.get("Event", "")
        try:
            # Handle specific events
            if event_type == "Newchannel":
                await self._handle_new_channel(event)
            elif event_type == "Hangup":
                await self._handle_hangup(event)
            elif event_type == "DTMF":
                await self._handle_dtmf(event)
            elif event_type == "NewCallerid":
                await self._handle_caller_id(event)
            # Call registered handlers without awaiting them, so one slow
            # handler cannot stall the AMI event loop.
            if event_type in self.event_handlers:
                for handler in self.event_handlers[event_type]:
                    asyncio.create_task(handler(event))
        except Exception as e:
            logger.error(
                "Error handling AMI event",
                event_type=event_type,
                error=str(e)
            )

    async def _handle_new_channel(self, event: Dict[str, Any]):
        """Handle new channel creation."""
        channel = event.get("Channel", "")
        caller_id = event.get("CallerIDNum", "")
        unique_id = event.get("Uniqueid", "")
        call_info = CallInfo(
            channel=channel,
            caller_id=caller_id,
            called_number=event.get("Exten", ""),
            start_time=datetime.now(),
            state="ringing",
            unique_id=unique_id
        )
        self.active_calls[unique_id] = call_info
        logger.info(
            "New call detected",
            channel=channel,
            caller_id=caller_id,
            unique_id=unique_id
        )

    async def _handle_hangup(self, event: Dict[str, Any]):
        """Handle call hangup."""
        unique_id = event.get("Uniqueid", "")
        if unique_id in self.active_calls:
            call_info = self.active_calls[unique_id]
            duration = (datetime.now() - call_info.start_time).total_seconds()
            logger.info(
                "Call ended",
                unique_id=unique_id,
                duration=duration,
                caller_id=call_info.caller_id
            )
            del self.active_calls[unique_id]

    async def _handle_dtmf(self, event: Dict[str, Any]):
        """Handle DTMF digit press."""
        digit = event.get("Digit", "")
        unique_id = event.get("Uniqueid", "")
        if unique_id in self.active_calls:
            call_info = self.active_calls[unique_id]
            # Accumulate digits in metadata so IVR logic can read the buffer.
            if "dtmf_buffer" not in call_info.metadata:
                call_info.metadata["dtmf_buffer"] = ""
            call_info.metadata["dtmf_buffer"] += digit
            logger.debug(
                "DTMF received",
                digit=digit,
                unique_id=unique_id,
                buffer=call_info.metadata["dtmf_buffer"]
            )

    async def _handle_caller_id(self, event: Dict[str, Any]):
        """Handle caller ID update."""
        unique_id = event.get("Uniqueid", "")
        caller_id = event.get("CallerIDNum", "")
        if unique_id in self.active_calls:
            self.active_calls[unique_id].caller_id = caller_id

    async def originate_call(
        self,
        destination: str,
        caller_id: str = "PrintCast",
        timeout: int = 30,
        variables: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """
        Originate an outbound call.

        Args:
            destination: Destination number
            caller_id: Caller ID to present
            timeout: Call timeout in seconds
            variables: Channel variables to set

        Returns:
            Call result information

        Raises:
            RuntimeError: If not connected to Asterisk.
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")
        try:
            action = {
                "Action": "Originate",
                "Channel": f"SIP/{destination}",
                "Context": self.context,
                "Exten": "s",
                "Priority": "1",
                "CallerID": caller_id,
                # AMI Timeout is in milliseconds.
                "Timeout": str(timeout * 1000),
            }
            if variables:
                # AMI expects each channel variable as a "key=value" string;
                # passing a raw dict (as the previous code did) does not
                # serialize into valid Variable headers.
                action["Variable"] = [
                    f"{name}={value}" for name, value in variables.items()
                ]
            response = await self.ami.send_action(action)
            logger.info(
                "Call originated",
                destination=destination,
                caller_id=caller_id
            )
            return {
                "success": response.get("Response") == "Success",
                "message": response.get("Message", ""),
                "action_id": response.get("ActionID", "")
            }
        except Exception as e:
            logger.error("Failed to originate call", error=str(e))
            raise

    async def transfer_call(
        self,
        channel: str,
        destination: str,
        context: Optional[str] = None
    ) -> bool:
        """
        Transfer an active call.

        Args:
            channel: Channel to transfer
            destination: Transfer destination
            context: Optional context (uses default if not specified)

        Returns:
            True if transfer successful

        Raises:
            RuntimeError: If not connected to Asterisk.
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")
        try:
            response = await self.ami.send_action({
                "Action": "Redirect",
                "Channel": channel,
                "Context": context or self.context,
                "Exten": destination,
                "Priority": "1"
            })
            success = response.get("Response") == "Success"
            logger.info(
                "Call transferred",
                channel=channel,
                destination=destination,
                success=success
            )
            return success
        except Exception as e:
            logger.error("Failed to transfer call", error=str(e))
            return False

    async def hangup_call(self, channel: str, cause: int = 16) -> bool:
        """
        Hangup an active call.

        Args:
            channel: Channel to hangup
            cause: Hangup cause code (16 = normal clearing)

        Returns:
            True if hangup successful

        Raises:
            RuntimeError: If not connected to Asterisk.
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")
        try:
            response = await self.ami.send_action({
                "Action": "Hangup",
                "Channel": channel,
                "Cause": str(cause)
            })
            success = response.get("Response") == "Success"
            logger.info(
                "Call hangup requested",
                channel=channel,
                success=success
            )
            return success
        except Exception as e:
            logger.error("Failed to hangup call", error=str(e))
            return False

    async def play_audio(
        self,
        channel: str,
        audio_file: str,
        interrupt_dtmf: bool = True
    ) -> bool:
        """
        Play audio file to channel.

        Args:
            channel: Channel to play audio to
            audio_file: Path to audio file
            interrupt_dtmf: Allow DTMF to interrupt playback

        Returns:
            True if playback started

        Raises:
            RuntimeError: If not connected to Asterisk.
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")
        try:
            response = await self.ami.send_action({
                "Action": "Playback",
                "Channel": channel,
                "Filename": audio_file,
                "Interrupt": "yes" if interrupt_dtmf else "no"
            })
            success = response.get("Response") == "Success"
            logger.info(
                "Audio playback started",
                channel=channel,
                file=audio_file,
                success=success
            )
            return success
        except Exception as e:
            logger.error("Failed to play audio", error=str(e))
            return False

    async def get_channel_variable(
        self,
        channel: str,
        variable: str
    ) -> Optional[str]:
        """
        Get channel variable value.

        Args:
            channel: Channel name
            variable: Variable name

        Returns:
            Variable value or None (also None when not connected or on error)
        """
        if not self.connected:
            return None
        try:
            response = await self.ami.send_action({
                "Action": "GetVar",
                "Channel": channel,
                "Variable": variable
            })
            if response.get("Response") == "Success":
                return response.get("Value")
            return None
        except Exception as e:
            logger.error("Failed to get channel variable", error=str(e))
            return None

    async def set_channel_variable(
        self,
        channel: str,
        variable: str,
        value: str
    ) -> bool:
        """
        Set channel variable.

        Args:
            channel: Channel name
            variable: Variable name
            value: Variable value

        Returns:
            True if variable set successfully (False when not connected or on error)
        """
        if not self.connected:
            return False
        try:
            response = await self.ami.send_action({
                "Action": "SetVar",
                "Channel": channel,
                "Variable": variable,
                "Value": value
            })
            return response.get("Response") == "Success"
        except Exception as e:
            logger.error("Failed to set channel variable", error=str(e))
            return False

    def register_event_handler(
        self,
        event_type: str,
        handler: Callable[[Dict[str, Any]], None]
    ):
        """
        Register custom event handler.

        Args:
            event_type: AMI event type
            handler: Async handler function (scheduled via asyncio.create_task)
        """
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []
        self.event_handlers[event_type].append(handler)
        logger.debug(
            "Event handler registered",
            event_type=event_type,
            handler=handler.__name__
        )

    async def execute_agi_command(
        self,
        channel: str,
        command: str,
        args: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Execute AGI command on channel.

        Args:
            channel: Channel name
            command: AGI command
            args: Command arguments

        Returns:
            Command result

        Raises:
            RuntimeError: If not connected to Asterisk.
        """
        if not self.connected:
            raise RuntimeError("Not connected to Asterisk")
        command_line = command
        if args:
            command_line += " " + " ".join(args)
        try:
            response = await self.ami.send_action({
                "Action": "AGI",
                "Channel": channel,
                "Command": command_line
            })
            return {
                "success": response.get("Response") == "Success",
                "result": response.get("Result", ""),
                "data": response.get("ResultData", "")
            }
        except Exception as e:
            logger.error("Failed to execute AGI command", error=str(e))
            raise

    def get_active_calls(self) -> "List[CallInfo]":
        """Get list of active calls."""
        return list(self.active_calls.values())

    def get_call_by_caller_id(self, caller_id: str) -> "Optional[CallInfo]":
        """Get call info by caller ID (first match, or None)."""
        for call in self.active_calls.values():
            if call.caller_id == caller_id:
                return call
        return None
```
--------------------------------------------------------------------------------
/src/mcp_project_orchestrator/templates/template_manager.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Template Manager module for MCP Project Orchestrator.
This module manages the retrieval, selection, and application of project templates
and component templates. It loads templates from JSON files, allows selection
based on design patterns, applies templates by creating the project structure,
and generates basic documentation.
"""
import os
import json
import re
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
@dataclass
class TemplateVersion:
    """A semantic template version together with creation/update timestamps."""

    major: int
    minor: int
    patch: int
    created_at: datetime
    updated_at: datetime

    @classmethod
    def from_string(cls, version_str: str) -> 'TemplateVersion':
        """Parse a dotted version string such as '1.2.3' into a TemplateVersion.

        Both timestamps are set to the current time.
        """
        parts = version_str.split('.')
        # Exactly three numeric components are required; anything else
        # raises ValueError, matching strict X.Y.Z parsing.
        major, minor, patch = (int(piece) for piece in parts)
        timestamp = datetime.now()
        return cls(major=major, minor=minor, patch=patch,
                   created_at=timestamp, updated_at=timestamp)

    def __str__(self) -> str:
        """Render the version back as 'major.minor.patch'."""
        return ".".join(str(part) for part in (self.major, self.minor, self.patch))
class TemplateManager:
    """
    Manager for project and component templates.

    Attributes:
        templates_path: Optional path to the templates directory or file.
        project_templates: List of project templates loaded from JSON.
        component_templates: List of component templates loaded from JSON.
        template_versions: Dictionary mapping template names to their versions.
        template_inheritance: Dictionary tracking template inheritance relationships.
    """

    def __init__(self, templates_path: Optional[str] = None) -> None:
        """
        Initialize the TemplateManager.

        Args:
            templates_path: Optional path to templates. If not provided, defaults to
                reading 'project_templates.json' and 'component_templates.json'
                from the current working directory.
        """
        # NOTE(review): when templates_path is given, _load_templates probes it
        # first for BOTH project and component templates — so the same file
        # would be used for both. Confirm this is intentional.
        self.templates_path = templates_path
        self.template_versions: Dict[str, TemplateVersion] = {}
        self.template_inheritance: Dict[str, List[str]] = {}
        # Loading populates template_versions/template_inheritance as a side effect.
        self.project_templates = self._load_templates("project_templates.json")
        self.component_templates = self._load_templates("component_templates.json")

    def _validate_template(self, template: Dict[str, Any]) -> Tuple[bool, str]:
        """
        Validate a template's structure and content.

        Args:
            template: The template dictionary to validate.

        Returns:
            A tuple of (is_valid, error_message). error_message is "" when valid.
        """
        required_fields = ["project_name", "version", "description", "components"]
        # Check required fields
        for field in required_fields:
            if field not in template:
                return False, f"Missing required field: {field}"
        # Validate version format (strict semantic version X.Y.Z)
        version = template.get("version", "")
        if not re.match(r'^\d+\.\d+\.\d+$', version):
            return False, "Invalid version format. Expected: X.Y.Z"
        # Validate components structure
        components = template.get("components", [])
        if not isinstance(components, list):
            return False, "Components must be a list"
        for comp in components:
            if not isinstance(comp, dict) or "name" not in comp:
                return False, "Each component must be a dictionary with at least a 'name' field"
        return True, ""

    def _load_templates(self, filename: str) -> List[Dict[str, Any]]:
        """
        Load templates from the specified JSON file.

        Probes several candidate locations in order and returns the contents
        of the FIRST file that parses as a JSON list; invalid entries within
        that file are silently dropped. Also records each valid template's
        version and 'extends' inheritance link as a side effect.

        Args:
            filename: The JSON file name to load templates from.

        Returns:
            A list of template dictionaries. If file not found or error occurs, returns an empty list.
        """
        # Probe order: explicit templates_path, CWD, CWD/templates, ~/.mcp/templates.
        paths_to_try = [
            self.templates_path if self.templates_path else filename,
            os.path.join(os.getcwd(), filename),
            os.path.join(os.getcwd(), "templates", filename),
            os.path.join(Path.home(), ".mcp", "templates", filename)
        ]
        for path in paths_to_try:
            if os.path.exists(path):
                try:
                    with open(path, "r") as f:
                        templates = json.load(f)
                    if not isinstance(templates, list):
                        # Not a template list — try the next candidate path.
                        continue
                    # Validate and process each template
                    valid_templates = []
                    for template in templates:
                        is_valid, error = self._validate_template(template)
                        if is_valid:
                            # Process version
                            name = template["project_name"]
                            version = template.get("version", "0.1.0")
                            self.template_versions[name] = TemplateVersion.from_string(version)
                            # Process inheritance (parent -> list of children)
                            if "extends" in template:
                                parent = template["extends"]
                                if parent not in self.template_inheritance:
                                    self.template_inheritance[parent] = []
                                self.template_inheritance[parent].append(name)
                            valid_templates.append(template)
                    # First parseable list wins, even if it yielded no valid templates.
                    return valid_templates
                except (json.JSONDecodeError, OSError):
                    # Unreadable/corrupt file — fall through to the next candidate.
                    continue
        return []

    def get_template_version(self, template_name: str) -> Optional[TemplateVersion]:
        """
        Get the version information for a template.

        Args:
            template_name: Name of the template.

        Returns:
            TemplateVersion object if found, None otherwise.
        """
        return self.template_versions.get(template_name)

    def get_derived_templates(self, template_name: str) -> List[str]:
        """
        Get all templates that inherit from the specified template.

        Args:
            template_name: Name of the base template.

        Returns:
            List of template names that inherit from the specified template.
        """
        return self.template_inheritance.get(template_name, [])

    def get_project_templates(self) -> List[Dict[str, Any]]:
        """
        Retrieve project templates.

        Returns:
            A list of project templates.
        """
        return self.project_templates

    def get_component_templates(self) -> List[Dict[str, Any]]:
        """
        Retrieve component templates.

        Returns:
            A list of component templates.
        """
        return self.component_templates

    def _merge_templates(self, child: Dict[str, Any], parent: Dict[str, Any]) -> Dict[str, Any]:
        """
        Merge a child template with its parent template.

        Child values win for scalar fields; keywords are unioned; components
        are merged by name with the child's definition overriding the parent's.

        Args:
            child: The child template dictionary.
            parent: The parent template dictionary.

        Returns:
            A new dictionary containing the merged template.
        """
        merged = parent.copy()
        # Merge basic fields (child overrides parent when present)
        for field in ["project_name", "description", "version"]:
            if field in child:
                merged[field] = child[field]
        # Merge keywords (set union; original ordering is not preserved)
        merged["keywords"] = list(set(parent.get("keywords", []) + child.get("keywords", [])))
        # Merge components with override support
        parent_components = {comp["name"]: comp for comp in parent.get("components", [])}
        child_components = {comp["name"]: comp for comp in child.get("components", [])}
        # Start with parent components
        final_components = parent_components.copy()
        # Override or add child components
        final_components.update(child_components)
        merged["components"] = list(final_components.values())
        return merged

    def get_template_with_inheritance(self, template_name: str) -> Optional[Dict[str, Any]]:
        """
        Get a template with all inherited properties merged.

        Args:
            template_name: Name of the template to retrieve.

        Returns:
            The merged template dictionary if found, None otherwise.
        """
        template = next((t for t in self.project_templates if t["project_name"] == template_name), None)
        if not template:
            return None
        # If template extends another, merge with parent
        # NOTE(review): no cycle guard — a circular 'extends' chain would recurse
        # forever. Confirm templates are acyclic or add a visited set.
        if "extends" in template:
            parent_name = template["extends"]
            parent = self.get_template_with_inheritance(parent_name)  # Recursive call for nested inheritance
            if parent:
                template = self._merge_templates(template, parent)
        return template

    def reload_templates(self) -> None:
        """Reload all templates from disk."""
        # Clear derived state first so stale versions/links do not survive a reload.
        self.template_versions.clear()
        self.template_inheritance.clear()
        self.project_templates = self._load_templates("project_templates.json")
        self.component_templates = self._load_templates("component_templates.json")

    def watch_templates(self, callback: Optional[callable] = None) -> None:
        """
        Start watching template files for changes.

        Requires the third-party 'watchdog' package (imported lazily here so
        the manager works without it unless watching is requested).

        Args:
            callback: Optional function to call when templates are reloaded.
        """
        from watchdog.observers import Observer
        from watchdog.events import FileSystemEventHandler

        class TemplateHandler(FileSystemEventHandler):
            # Reloads the owning manager whenever any .json file changes.
            def __init__(self, manager: 'TemplateManager', callback: Optional[callable]):
                self.manager = manager
                self.callback = callback

            def on_modified(self, event):
                if event.src_path.endswith(('.json')):
                    self.manager.reload_templates()
                    if self.callback:
                        self.callback()

        paths_to_watch = [
            os.getcwd(),
            os.path.join(os.getcwd(), "templates"),
            os.path.join(Path.home(), ".mcp", "templates")
        ]
        observer = Observer()
        handler = TemplateHandler(self, callback)
        for path in paths_to_watch:
            if os.path.exists(path):
                observer.schedule(handler, path, recursive=False)
        # NOTE(review): the observer is local and never returned or stored, so
        # the watcher thread cannot be stopped or joined later.
        observer.start()

    def select_template(self, description: str, patterns: List[str]) -> str:
        """
        Select an appropriate template based on the project description and design patterns.

        Scoring: +2 per matching pattern keyword, +1 per word shared with the
        template description, +1 if the template specializes another.

        Args:
            description: Project description.
            patterns: List of identified design patterns.

        Returns:
            The name of the selected template. If no template matches, returns a default template name.
        """
        # Enhanced template selection logic
        best_match = None
        max_score = -1
        for template in self.project_templates:
            score = 0
            # Score based on keyword matches
            keywords = template.get("keywords", [])
            for pattern in patterns:
                if pattern in keywords:
                    score += 2
            # Score based on description similarity
            template_desc = template.get("description", "").lower()
            # NOTE: rebinding the parameter; lowered repeatedly per iteration (harmless).
            description = description.lower()
            common_words = set(template_desc.split()) & set(description.split())
            score += len(common_words)
            # Check inheritance - templates that are more specialized (inherit from others) get a bonus
            if "extends" in template:
                score += 1
            if score > max_score:
                max_score = score
                best_match = template
        if best_match:
            return best_match.get("project_name", "DefaultProject")
        # Fallback to first template if available
        if self.project_templates:
            return self.project_templates[0].get("project_name", "DefaultProject")
        return "DefaultProject"

    def apply_template(self, template_name: str, project_name: str, description: str,
                       patterns: List[str], output_dir: str) -> Dict[str, Any]:
        """
        Apply the selected template: create the project directory structure and placeholder files.

        Args:
            template_name: Name of the template to use.
            project_name: Name of the new project.
            description: Description of the project.
            patterns: List of design patterns.
            output_dir: Directory where the project will be created.

        Returns:
            A dictionary containing the project path and a success message; otherwise, error details.
        """
        # Find the template by matching project_name
        template = next((t for t in self.project_templates if t.get("project_name") == template_name), None)
        if not template:
            return {"error": f"Template '{template_name}' not found."}
        project_path = os.path.join(output_dir, project_name)
        if os.path.exists(project_path):
            # Refuse to overwrite an existing project directory.
            return {"error": f"Project '{project_name}' already exists."}
        try:
            # Create project structure directories
            os.makedirs(os.path.join(project_path, "src", "components"), exist_ok=True)
            os.makedirs(os.path.join(project_path, "src", "interfaces"), exist_ok=True)
            os.makedirs(os.path.join(project_path, "src", "services"), exist_ok=True)
            os.makedirs(os.path.join(project_path, "src", "utils"), exist_ok=True)
            os.makedirs(os.path.join(project_path, "tests"), exist_ok=True)
            os.makedirs(os.path.join(project_path, "docs"), exist_ok=True)
            # Generate placeholder files for each component defined in the template
            components = template.get("components", [])
            for comp in components:
                comp_name = comp.get("name", "Component")
                # Create interface file
                interface_path = os.path.join(project_path, "src", "interfaces", f"i_{comp_name.lower()}.py")
                with open(interface_path, "w") as f:
                    f.write(f"# TODO: Define interface methods for {comp_name}\nclass I{comp_name}:\n    pass\n")
                # Create implementation file
                impl_path = os.path.join(project_path, "src", "components", f"{comp_name.lower()}.py")
                with open(impl_path, "w") as f:
                    f.write(f"# TODO: Implement {comp_name} logic\nclass {comp_name}:\n    pass\n")
                # Create service file (optional placeholder)
                service_path = os.path.join(project_path, "src", "services", f"{comp_name.lower()}_service.py")
                with open(service_path, "w") as f:
                    f.write(f"# TODO: Implement service logic for {comp_name}\n")
            # Create a basic README file
            readme_path = os.path.join(project_path, "README.md")
            with open(readme_path, "w") as f:
                f.write(f"# {project_name}\n\n{description}\n")
            return {"project_path": project_path, "message": "Project created successfully."}
        except Exception as e:
            # Any filesystem failure is reported to the caller rather than raised.
            return {"error": str(e)}

    def generate_documentation(self, project_path: str) -> str:
        """
        Generate documentation for the project at the given path.

        Args:
            project_path: The path to the project directory.

        Returns:
            A string containing the generated documentation in Markdown format.
        """
        # Generate a placeholder README documentation
        doc = f"# Project Documentation\n\nProject path: {project_path}\n\n---\n\nThis documentation is auto-generated based on the project template."
        return doc
```
--------------------------------------------------------------------------------
/data/prompts/templates/mcp-server-integration-template.json:
--------------------------------------------------------------------------------
```json
{
"id": "mcp-server-integration-template",
"name": "MCP Server Integration Guide",
"description": "A comprehensive template for planning, configuring, and integrating multiple MCP servers into a cohesive ecosystem",
"content": "# MCP Server Integration Guide\n\nI'll help you integrate multiple MCP servers to create a powerful AI context ecosystem for {{project_name}}. By combining specialized MCP servers, you can significantly enhance AI capabilities beyond what a single model can provide.\n\n## Project Requirements Analysis\n\n### Core Use Case\n\nYour primary use case for MCP server integration is:\n- **{{primary_use_case}}**\n\n### Key Requirements\n\nBased on your use case, we'll focus on these requirements:\n1. {{requirement_1}}\n2. {{requirement_2}}\n3. {{requirement_3}}\n\n## MCP Server Selection\n\nBased on your requirements, I recommend these MCP servers:\n\n### Core Infrastructure\n- **{{primary_mcp_server}}**: {{primary_server_description}}\n- **{{secondary_mcp_server}}**: {{secondary_server_description}}\n- **{{tertiary_mcp_server}}**: {{tertiary_server_description}}\n\n### Supporting Services\n- Additional servers to consider: {{additional_servers}}\n\n## Integration Architecture\n\n```mermaid\ngraph TD\n Client[AI Client] --> |Requests| Primary[{{primary_mcp_server}}]\n Primary --> |Data Flow| Secondary[{{secondary_mcp_server}}]\n Primary --> |Data Flow| Tertiary[{{tertiary_mcp_server}}]\n \n subgraph \"Core MCP Ecosystem\"\n Primary\n Secondary\n Tertiary\n end\n```\n\n## Configuration and Setup\n\n### Installation Steps\n\n1. **{{primary_mcp_server}}**:\n ```bash\n {{primary_installation_command}}\n ```\n\n2. **{{secondary_mcp_server}}**:\n ```bash\n {{secondary_installation_command}}\n ```\n\n3. 
**{{tertiary_mcp_server}}**:\n ```bash\n {{tertiary_installation_command}}\n ```\n\n### Claude Desktop Configuration\n\n```json\n{\n \"mcpServers\": {\n \"{{primary_mcp_server_id}}\": {\n \"command\": \"{{primary_command}}\",\n \"args\": [{{primary_args}}],\n \"env\": {\n {{primary_env_vars}}\n }\n },\n \"{{secondary_mcp_server_id}}\": {\n \"command\": \"{{secondary_command}}\",\n \"args\": [{{secondary_args}}],\n \"env\": {\n {{secondary_env_vars}}\n }\n },\n \"{{tertiary_mcp_server_id}}\": {\n \"command\": \"{{tertiary_command}}\",\n \"args\": [{{tertiary_args}}],\n \"env\": {\n {{tertiary_env_vars}}\n }\n }\n }\n}\n```\n\n### Docker Compose Integration\n\n```yaml\nversion: '3'\nservices:\n {{primary_mcp_server_id}}:\n image: {{primary_image}}\n environment:\n - {{primary_environment_1}}\n - {{primary_environment_2}}\n volumes:\n - {{primary_volume_mapping}}\n ports:\n - \"{{primary_port_mapping}}\"\n \n {{secondary_mcp_server_id}}:\n image: {{secondary_image}}\n environment:\n - {{secondary_environment_1}}\n - {{secondary_environment_2}}\n volumes:\n - {{secondary_volume_mapping}}\n ports:\n - \"{{secondary_port_mapping}}\"\n \n {{tertiary_mcp_server_id}}:\n image: {{tertiary_image}}\n environment:\n - {{tertiary_environment_1}}\n - {{tertiary_environment_2}}\n volumes:\n - {{tertiary_volume_mapping}}\n ports:\n - \"{{tertiary_port_mapping}}\"\n```\n\n## Integration Patterns\n\n### Data Flow\n\nFor your use case, I recommend the following data flow pattern:\n\n```\n{{data_flow_pattern}}\n```\n\n### Communication Model\n\nThe optimal communication model for your servers is:\n**{{communication_model}}**\n\nRationale: {{communication_rationale}}\n\n## Best Practices for Your Integration\n\n1. **Performance Optimization**: {{performance_recommendation}}\n2. **Security Considerations**: {{security_recommendation}}\n3. **Error Handling**: {{error_handling_recommendation}}\n4. 
**Testing Strategy**: {{testing_recommendation}}\n\n## MCP Server Interaction Examples\n\n### Example 1: {{example_scenario_1}}\n\n```javascript\n// Client-side code example\nuse_mcp_tool({\n server_name: \"{{primary_mcp_server_id}}\",\n tool_name: \"{{example_tool_1}}\",\n arguments: {\n {{example_args_1}}\n }\n});\n```\n\n### Example 2: {{example_scenario_2}}\n\n```javascript\n// Client-side code example\nuse_mcp_tool({\n server_name: \"{{secondary_mcp_server_id}}\",\n tool_name: \"{{example_tool_2}}\",\n arguments: {\n {{example_args_2}}\n }\n});\n```\n\n## Troubleshooting Guide\n\n| Problem | Possible Cause | Solution |\n|---------|----------------|----------|\n| {{problem_1}} | {{cause_1}} | {{solution_1}} |\n| {{problem_2}} | {{cause_2}} | {{solution_2}} |\n| {{problem_3}} | {{cause_3}} | {{solution_3}} |\n\n## Next Steps\n\n1. {{next_step_1}}\n2. {{next_step_2}}\n3. {{next_step_3}}\n\nWould you like me to elaborate on any specific aspect of this MCP server integration plan?",
"variables": [
"project_name",
"primary_use_case",
"requirement_1",
"requirement_2",
"requirement_3",
"primary_mcp_server",
"primary_server_description",
"secondary_mcp_server",
"secondary_server_description",
"tertiary_mcp_server",
"tertiary_server_description",
"additional_servers",
"primary_installation_command",
"secondary_installation_command",
"tertiary_installation_command",
"primary_mcp_server_id",
"primary_command",
"primary_args",
"primary_env_vars",
"secondary_mcp_server_id",
"secondary_command",
"secondary_args",
"secondary_env_vars",
"tertiary_mcp_server_id",
"tertiary_command",
"tertiary_args",
"tertiary_env_vars",
"primary_image",
"primary_environment_1",
"primary_environment_2",
"primary_volume_mapping",
"primary_port_mapping",
"secondary_image",
"secondary_environment_1",
"secondary_environment_2",
"secondary_volume_mapping",
"secondary_port_mapping",
"tertiary_image",
"tertiary_environment_1",
"tertiary_environment_2",
"tertiary_volume_mapping",
"tertiary_port_mapping",
"data_flow_pattern",
"communication_model",
"communication_rationale",
"performance_recommendation",
"security_recommendation",
"error_handling_recommendation",
"testing_recommendation",
"example_scenario_1",
"example_tool_1",
"example_args_1",
"example_scenario_2",
"example_tool_2",
"example_args_2",
"problem_1",
"cause_1",
"solution_1",
"problem_2",
"cause_2",
"solution_2",
"problem_3",
"cause_3",
"solution_3",
"next_step_1",
"next_step_2",
"next_step_3"
],
"examples": [
{
"name": "Development Environment Integration",
"values": {
"project_name": "AI-Enhanced Development Environment",
"primary_use_case": "Creating an integrated development environment that enhances coding, documentation, and testing with AI assistance",
"requirement_1": "Code repository analysis and exploration",
"requirement_2": "Database query and schema analysis",
"requirement_3": "Documentation generation and enhancement",
"primary_mcp_server": "github",
"primary_server_description": "Integrates with GitHub repositories to provide code context and exploration",
"secondary_mcp_server": "filesystem",
"secondary_server_description": "Provides access to local project files and configuration",
"tertiary_mcp_server": "postgres",
"tertiary_server_description": "Allows database exploration and SQL query execution",
"additional_servers": "prompts, sequential-thinking, memory",
"primary_installation_command": "npx -y @modelcontextprotocol/server-github",
"secondary_installation_command": "npx -y @modelcontextprotocol/server-filesystem /path/to/workspace",
"tertiary_installation_command": "npx -y @modelcontextprotocol/server-postgres postgresql://localhost/mydb",
"primary_mcp_server_id": "github",
"primary_command": "npx",
"primary_args": "\"-y\", \"@modelcontextprotocol/server-github\"",
"primary_env_vars": "\"GITHUB_PERSONAL_ACCESS_TOKEN\": \"your-token-here\"",
"secondary_mcp_server_id": "filesystem",
"secondary_command": "npx",
"secondary_args": "\"-y\", \"@modelcontextprotocol/server-filesystem\", \"/path/to/workspace\"",
"secondary_env_vars": "",
"tertiary_mcp_server_id": "postgres",
"tertiary_command": "npx",
"tertiary_args": "\"-y\", \"@modelcontextprotocol/server-postgres\", \"postgresql://localhost/mydb\"",
"tertiary_env_vars": "",
"primary_image": "node:alpine",
"primary_environment_1": "GITHUB_PERSONAL_ACCESS_TOKEN=your-token-here",
"primary_environment_2": "PORT=3001",
"primary_volume_mapping": "./data:/data",
"primary_port_mapping": "3001:3000",
"secondary_image": "node:alpine",
"secondary_environment_1": "PORT=3002",
"secondary_environment_2": "",
"secondary_volume_mapping": "./workspace:/workspace",
"secondary_port_mapping": "3002:3000",
"tertiary_image": "node:alpine",
"tertiary_environment_1": "PORT=3003",
"tertiary_environment_2": "",
"tertiary_volume_mapping": "./pgdata:/var/lib/postgresql/data",
"tertiary_port_mapping": "3003:3000",
"data_flow_pattern": "GitHub → Filesystem → Postgres → Client, with bidirectional flows as needed",
"communication_model": "Hub and Spoke with GitHub as the central hub",
"communication_rationale": "Centralizing around GitHub allows for repository-centric workflows, which matches most development scenarios",
"performance_recommendation": "Use volume mounting for filesystem paths to minimize container rebuild times during development",
"security_recommendation": "Utilize environment variables and Docker secrets for sensitive tokens and credentials",
"error_handling_recommendation": "Implement retries with exponential backoff for GitHub API requests to handle rate limiting",
"testing_recommendation": "Create a test suite with mock repositories to validate cross-server integration before production use",
"example_scenario_1": "Exploring a repository",
"example_tool_1": "list_repositories",
"example_args_1": "owner: \"username\", limit: 5",
"example_scenario_2": "Reading project files",
"example_tool_2": "read_directory",
"example_args_2": "path: \"/workspace/src\"",
"problem_1": "GitHub API rate limiting",
"cause_1": "Too many requests in a short time period",
"solution_1": "Implement caching and rate limiting in the client code",
"problem_2": "Permission denied for filesystem",
"cause_2": "Container user doesn't have access to mounted volumes",
"solution_2": "Check file permissions and user IDs in container",
"problem_3": "Database connection issues",
"cause_3": "Incorrect connection string or database not running",
"solution_3": "Verify database is running and connection parameters are correct",
"next_step_1": "Set up Docker Compose environment with the three core MCP servers",
"next_step_2": "Configure Claude Desktop to use these MCP servers",
"next_step_3": "Create sample prompts that utilize multiple servers for code exploration tasks"
}
},
{
"name": "Content Creation Ecosystem",
"values": {
"project_name": "AI-Powered Content Creation Suite",
"primary_use_case": "Building a sophisticated content creation system with research, drafting, and media generation capabilities",
"requirement_1": "Real-time web research and citation gathering",
"requirement_2": "Automated content generation with template support",
"requirement_3": "Text-to-speech conversion for audio content",
"primary_mcp_server": "brave-search",
"primary_server_description": "Provides up-to-date web search capabilities for research",
"secondary_mcp_server": "prompts",
"secondary_server_description": "Manages content templates and generation patterns",
"tertiary_mcp_server": "elevenlabs",
"tertiary_server_description": "Converts text to high-quality speech for podcasts or audio content",
"additional_servers": "memory, filesystem",
"primary_installation_command": "npx -y @modelcontextprotocol/server-brave-search",
"secondary_installation_command": "npx -y @sparesparrow/mcp-prompts",
"tertiary_installation_command": "uvx elevenlabs-mcp-server",
"primary_mcp_server_id": "brave-search",
"primary_command": "npx",
"primary_args": "\"-y\", \"@modelcontextprotocol/server-brave-search\"",
"primary_env_vars": "\"BRAVE_API_KEY\": \"your-brave-api-key\"",
"secondary_mcp_server_id": "prompts",
"secondary_command": "npx",
"secondary_args": "\"-y\", \"@sparesparrow/mcp-prompts\"",
"secondary_env_vars": "\"STORAGE_TYPE\": \"file\", \"PROMPTS_DIR\": \"/path/to/prompts\"",
"tertiary_mcp_server_id": "elevenlabs",
"tertiary_command": "uvx",
"tertiary_args": "\"elevenlabs-mcp-server\"",
"tertiary_env_vars": "\"ELEVENLABS_API_KEY\": \"your-elevenlabs-api-key\", \"ELEVENLABS_VOICE_ID\": \"preferred-voice-id\"",
"primary_image": "node:alpine",
"primary_environment_1": "BRAVE_API_KEY=your-brave-api-key",
"primary_environment_2": "PORT=3001",
"primary_volume_mapping": "./data:/data",
"primary_port_mapping": "3001:3000",
"secondary_image": "sparesparrow/mcp-prompts:latest",
"secondary_environment_1": "STORAGE_TYPE=file",
"secondary_environment_2": "PROMPTS_DIR=/app/data/prompts",
"secondary_volume_mapping": "./prompts:/app/data/prompts",
"secondary_port_mapping": "3002:3000",
"tertiary_image": "node:alpine",
"tertiary_environment_1": "ELEVENLABS_API_KEY=your-elevenlabs-api-key",
"tertiary_environment_2": "ELEVENLABS_VOICE_ID=preferred-voice-id",
"tertiary_volume_mapping": "./audio:/app/data/audio",
"tertiary_port_mapping": "3003:3000",
"data_flow_pattern": "Brave Search → Prompts → ElevenLabs → Client, with the option to store results in Memory or Filesystem",
"communication_model": "Pipeline Processing",
"communication_rationale": "Content creation naturally follows a linear workflow from research to drafting to audio production",
"performance_recommendation": "Cache search results from Brave Search to minimize API usage and improve response times",
"security_recommendation": "Store all API keys in environment variables and never expose them in generated content",
"error_handling_recommendation": "Implement fallback voices for ElevenLabs in case the primary voice is unavailable",
"testing_recommendation": "Create sample prompts that exercise the full pipeline from research to audio generation",
"example_scenario_1": "Researching a topic",
"example_tool_1": "search",
"example_args_1": "query: \"latest developments in AI assistants 2025\"",
"example_scenario_2": "Generating an article template",
"example_tool_2": "apply_template",
"example_args_2": "template_id: \"blog-article\", variables: {topic: \"AI advancements\", tone: \"educational\"}",
"problem_1": "Brave Search API limits exceeded",
"cause_1": "Too many searches in a short time period",
"solution_1": "Implement rate limiting and caching for search results",
"problem_2": "Missing prompts or templates",
"cause_2": "Incorrect path to prompts directory",
"solution_2": "Verify PROMPTS_DIR environment variable points to existing directory",
"problem_3": "ElevenLabs audio generation fails",
"cause_3": "Invalid API key or voice ID",
"solution_3": "Check API key validity and available voices through ElevenLabs dashboard",
"next_step_1": "Set up Docker Compose environment with all three MCP servers",
"next_step_2": "Create a set of content templates in the prompts server",
"next_step_3": "Develop a sample workflow that demonstrates research, content generation, and audio production"
}
}
],
"categories": ["integration", "multi-server", "configuration", "advanced", "docker"]
}
```
--------------------------------------------------------------------------------
/src/mcp_project_orchestrator/project_orchestration.py:
--------------------------------------------------------------------------------
```python
import os
import json
from typing import List, Dict, Optional
from dotenv import load_dotenv
# Import our own FastMCP implementation
from .fastmcp import FastMCP
# Load environment variables
load_dotenv()
# Import AWS MCP integration (optional — requires boto3 via the .aws_mcp module)
try:
    from .aws_mcp import register_aws_mcp_tools, AWSConfig
    AWS_MCP_AVAILABLE = True
except ImportError:
    # boto3 (or the aws_mcp module) is missing; AWS tools stay disabled below.
    AWS_MCP_AVAILABLE = False
    print("Warning: AWS MCP integration not available. Install boto3 to enable AWS features.")
# Load MCP configuration from JSON file at import time.
# NOTE(review): CONFIG_FILE and 'project_templates.json' are opened relative to
# the current working directory — confirm the server is always launched from
# the directory containing these files, or an uncaught FileNotFoundError is
# raised on import.
CONFIG_FILE = 'project_orchestration.json'
with open(CONFIG_FILE, 'r') as config_file:
    MCP_CONFIG = json.load(config_file)
# MCP configuration details (e.g., communication_protocol, mcp_compliance) are now available in MCP_CONFIG
# Directory for projects; created eagerly so tools can assume it exists.
PROJECTS_DIR = './projects'
os.makedirs(PROJECTS_DIR, exist_ok=True)
# Load project templates from JSON file (list of dicts keyed by "project_name")
with open('project_templates.json', 'r') as f:
    PROJECT_TEMPLATES = json.load(f)
# Comprehensive README template aligned with JSON requirements.
# Placeholders use {{name}} markers that are substituted via plain
# str.replace() in apply_project_template (not str.format or Jinja2).
README_TEMPLATE = """
# {{project_name}}
## Overview
{{project_name}} is designed to {{primary_purpose}} using {{design}} patterns, adhering to systematic approaches for maintainability and scalability.
## Architecture
### Design Patterns
{{design_patterns}}
### Software Architecture
{{software_architecture}}
### Components and Modules
{{components_section}}
### Relationships
{{relationships}}
### Interfaces
{{interfaces_section}}
### Communication Protocols
{{communication_protocols}}
### Technologies
{{technologies}}
### Dependencies
{{dependencies}}
### Commands
- **Installation**: `{{install_command}}`
- **Build**: `{{build_command}}`
- **Run**: `{{run_command}}`
- **Test**: `{{test_command}}`
## File Structure
{{project_structure}}
## Implementation Strategy
{{implementation_strategy}}
## Mermaid Diagrams
{{mermaid_diagrams}}
## Instructions for Composer Implementor Agent
{{instructions}}
"""
# Initialize MCP server
mcp = FastMCP("ProjectOrchestrator")
mcp.config = MCP_CONFIG  # attach configuration to MCP server instance
# NOTE: the triple-quoted string below is a no-op expression statement; it is
# kept purely as in-file documentation of the orchestration workflow.
'''
MCP Project Orchestrator Server
-------------------------------
This MCP server orchestrates the creation and configuration of new software projects.
It performs the following steps:
1. Extracts key design patterns and architecture concepts from user input.
2. Selects an appropriate project template from a standardized catalogue.
3. Applies the template by creating well-structured directories and placeholder files.
4. Generates comprehensive documentation including software architecture, components, process flows, and file structures.
The server configuration is loaded from 'project_orchestration.json', which defines overall settings such as communication protocols and compliance standards.
Developers can extend or modify this orchestration process by updating the template definitions or the configuration JSON.
'''
# Tool: Analyze design patterns and architecture
@mcp.tool()
def analyze_design_patterns(idea: str) -> Dict[str, List[str]]:
    """Analyze the user's idea to identify design patterns and architecture concepts.

    Scans the lowercased idea text for known keywords; each match contributes
    a (pattern, architecture) pair. Falls back to a Modular Monolith when no
    keyword matches, so the result lists are never empty.
    """
    # Keyword -> (design pattern, architecture concept). Insertion order
    # determines the order of the returned lists.
    keyword_map = {
        "microservices": ("Microservices Architecture", "Distributed System"),
        "event": ("Event-Driven Architecture", "Asynchronous Processing"),
        "async": ("Event-Driven Architecture", "Asynchronous Processing"),
        "data": ("Repository Pattern", "Layered Architecture"),
        "repository": ("Repository Pattern", "Layered Architecture"),
        "cqrs": ("CQRS", "Event Sourcing"),
        "client": ("Client-Server", "Request-Response"),
        "server": ("Client-Server", "Request-Response"),
        "modular": ("Modular Monolith", "Monolithic Architecture"),
        "serverless": ("Serverless Architecture", "Function-as-a-Service"),
        "bridge": ("Bridge Pattern", "Abstraction Separation"),
        "composite": ("Composite Pattern", "Tree Structure"),
        "flyweight": ("Flyweight Pattern", "Memory Optimization"),
        "strategy": ("Strategy Pattern", "Behavioral Flexibility"),
        "template": ("Template Method Pattern", "Algorithm Skeleton"),
        "visitor": ("Visitor Pattern", "Operation Separation")
    }
    text = idea.lower()
    patterns: List[str] = []
    architectures: List[str] = []
    for keyword, (pattern, arch) in keyword_map.items():
        if keyword not in text:
            continue
        if pattern not in patterns:
            patterns.append(pattern)
        if arch not in architectures:
            architectures.append(arch)
    if not patterns:
        # No keyword matched: default to a safe, general-purpose choice.
        patterns.append("Modular Monolith")
        architectures.append("Monolithic Architecture")
    return {"design_patterns": patterns, "architectures": architectures}
# Tool: Generate Mermaid diagrams (aligned with JSON's MermaidTool)
@mcp.tool()
def mermaid_tool(diagram_planning: str, template_name: Optional[str] = None) -> str:
    """Generate Mermaid diagrams for visualization based on planning.

    Dispatches on keywords in ``diagram_planning`` ("architecture",
    "file structure", "process flow") and optionally specializes the diagram
    for the named template. Returns a fenced ```mermaid code block.
    """
    request = diagram_planning.lower()
    if "architecture" in request:
        if template_name and "Microservices" in template_name:
            return (
                "```mermaid\n"
                "graph TD\n"
                " A[API Gateway] --> B[UserService]\n"
                " A --> C[OrderService]\n"
                " B --> D[UserDB]\n"
                " C --> E[OrderDB]\n"
                " B --> F[MessageQueue]\n"
                " C --> F\n"
                "```\n"
            )
        if template_name and "EventDriven" in template_name:
            return (
                "```mermaid\n"
                "graph TD\n"
                " A[EventProducer] --> B[EventBus]\n"
                " B --> C[EventConsumer]\n"
                " C --> D[EventStore]\n"
                "```\n"
            )
        # Generic architecture diagram for all other templates.
        return (
            "```mermaid\n"
            "graph TD\n"
            " A[CoreModule] --> B[Services]\n"
            " B --> C[Utilities]\n"
            " A --> D[Database]\n"
            "```\n"
        )
    if "file structure" in request:
        if template_name:
            matched = next((t for t in PROJECT_TEMPLATES if t["project_name"] == template_name), None)
            if matched:
                # One edge per template component, hanging off the components dir.
                component_edges = "\n".join(
                    f" E --> F{idx+1}[{comp['name']}]"
                    for idx, comp in enumerate(matched["components"])
                )
                return (
                    "```mermaid\n"
                    "graph TD\n"
                    " A[ProjectRoot] --> B[src]\n"
                    " A --> C[tests]\n"
                    " A --> D[docs]\n"
                    " B --> E[components]\n"
                    f"{component_edges}\n"
                    " B --> G[interfaces]\n"
                    " B --> H[services]\n"
                    " B --> I[utils]\n"
                    "```\n"
                )
        # Template unknown or unnamed: show the generic project layout.
        return (
            "```mermaid\n"
            "graph TD\n"
            " A[ProjectRoot] --> B[src]\n"
            " A --> C[tests]\n"
            " A --> D[docs]\n"
            " B --> E[components]\n"
            " B --> F[interfaces]\n"
            " B --> G[services]\n"
            " B --> H[utils]\n"
            "```\n"
        )
    if "process flow" in request:
        return (
            "```mermaid\n"
            "sequenceDiagram\n"
            " participant U as User\n"
            " participant S as System\n"
            " U->>S: Initiate Action\n"
            " S-->>U: Process Result\n"
            "```\n"
        )
    return "```mermaid\n%% Placeholder diagram\n```"
# Tool: Apply project template
@mcp.tool()
def apply_project_template(template_name: str, project_name: str, user_idea: str, design_info: Dict[str, List[str]]) -> str:
    """Apply a template and create comprehensive documentation.

    Creates the project directory skeleton under PROJECTS_DIR, writes
    placeholder interface/implementation/service/test files for every
    component declared by the template, and renders README.md from
    README_TEMPLATE via {{placeholder}} substitution.

    Args:
        template_name: "project_name" of an entry in PROJECT_TEMPLATES.
        project_name: Directory name for the new project.
        user_idea: Original user idea text. NOTE(review): currently unused in
            this function — kept for interface stability with callers.
        design_info: Dict with "design_patterns" and "architectures" lists,
            as produced by analyze_design_patterns.

    Returns:
        A success message, or an "Error: ..." string when the template is
        unknown or the project directory already exists.
    """
    template = next((t for t in PROJECT_TEMPLATES if t["project_name"] == template_name), None)
    if not template:
        return f"Error: Template '{template_name}' not found."
    project_path = os.path.join(PROJECTS_DIR, project_name)
    if os.path.exists(project_path):
        return f"Error: Project '{project_name}' already exists."
    # Step 5: Prepare detailed file structure
    os.makedirs(os.path.join(project_path, "src", "components"), exist_ok=True)
    os.makedirs(os.path.join(project_path, "src", "interfaces"), exist_ok=True)
    os.makedirs(os.path.join(project_path, "src", "services"), exist_ok=True)
    os.makedirs(os.path.join(project_path, "src", "utils"), exist_ok=True)
    os.makedirs(os.path.join(project_path, "tests"), exist_ok=True)
    os.makedirs(os.path.join(project_path, "docs"), exist_ok=True)
    # Generate component files with consistent names and TODOs
    components_section = ""
    interfaces_section = ""
    relationships = ""
    # NOTE(review): protocol choice keys off the template NAME containing
    # "Microservices" — confirm template naming stays consistent with this.
    communication_protocols = "REST API, Message Queues" if "Microservices" in template_name else "Internal Function Calls"
    for i, component in enumerate(template["components"]):
        name = component["name"]
        # Interface stub
        interface_file = f"i_{name.lower()}.py"
        with open(os.path.join(project_path, "src", "interfaces", interface_file), "w") as f:
            f.write(f"# TODO: Define interface methods for {name}\nclass I{name}:\n pass\n")
        # Implementation stub
        impl_file = f"{name.lower()}.py"
        with open(os.path.join(project_path, "src", "components", impl_file), "w") as f:
            f.write(f"# TODO: Implement {name} logic\nclass {name}:\n pass\n")
        # Service stub (if applicable)
        service_file = f"{name.lower()}_service.py"
        with open(os.path.join(project_path, "src", "services", service_file), "w") as f:
            f.write(f"# TODO: Implement service logic for {name}\n")
        # Test stub
        test_file = f"test_{name.lower()}.py"
        with open(os.path.join(project_path, "tests", test_file), "w") as f:
            f.write(f"# TODO: Write unit tests for {name}\n")
        components_section += (
            f"- **{name}**: {component.get('description', 'TBD')}\n"
            f" - Interface: [{interface_file}](./src/interfaces/{interface_file})\n"
            f" - Implementation: [{impl_file}](./src/components/{impl_file})\n"
            f" - Service: [{service_file}](./src/services/{service_file})\n"
            f" - Tests: [{test_file}](./tests/{test_file})\n"
        )
        interfaces_section += f"class I{name}:\n # TODO: Define {name} methods\n pass\n\n"
        # Chain consecutive components into a simple linear relationship list.
        if i > 0:
            relationships += f"- {template['components'][i-1]['name']} interacts with {name} via {communication_protocols}\n"
    # Step 4: Comprehensive documentation
    design_patterns = "- " + "\n- ".join(design_info["design_patterns"])
    software_architecture = "- " + "\n- ".join(design_info["architectures"])
    technologies = "Python, Flask, Docker, Kafka" if "Microservices" in template_name else "Python, Django"
    dependencies = "requests, pytest, docker, confluent-kafka" if "Microservices" in template_name else "django, pytest"
    install_command = "pip install -r requirements.txt"
    build_command = "docker build ." if "Microservices" in template_name else "python manage.py migrate"
    run_command = "docker-compose up" if "Microservices" in template_name else "python manage.py runserver"
    test_command = "pytest"
    # File structure visualization (delegates to the mermaid_tool above)
    project_structure = mermaid_tool("file structure", template_name)
    # Step 6: Implementation strategy
    impl_order = "\n".join([f"{i+1}. src/components/{c['name'].lower()}.py" for i, c in enumerate(template["components"])])
    implementation_strategy = (
        f"### File Implementation Order\n{impl_order}\n"
        "### Testing Strategies\n- Unit Tests: Use pytest for component-level testing.\n- Integration Tests: Verify inter-component interactions.\n"
        f"### Build and Deployment\n- Build: `{build_command}`\n- Deploy: Use Docker containers or a cloud platform like AWS.\n"
    )
    # Mermaid diagrams
    mermaid_diagrams = (
        f"### Architecture Diagram\n{mermaid_tool('architecture', template_name)}\n"
        f"### File Structure\n{project_structure}\n"
        f"### Process Flow\n{mermaid_tool('process flow', template_name)}"
    )
    # Instructions for the composer implementor agent
    instructions = (
        "1. Refine the generated documentation in README.md.\n"
        "2. Implement components starting with core logic in src/components/.\n"
        "3. Use mermaid_tool for additional visualizations (e.g., `mermaid_tool 'detailed process flow'`).\n"
        "4. Follow the implementation strategy and test using provided commands."
    )
    # Substitutions for README.
    # NOTE(review): template["description"] is accessed unguarded — confirm
    # every entry in project_templates.json defines a "description" key.
    substitutions = {
        "project_name": project_name,
        "design": ", ".join(design_info["design_patterns"]),
        "primary_purpose": template["description"].split(".")[0],
        "design_patterns": design_patterns,
        "software_architecture": software_architecture,
        "components_section": components_section,
        "relationships": relationships if relationships else "TBD - Define inter-component relationships",
        "interfaces_section": interfaces_section,
        "communication_protocols": communication_protocols,
        "technologies": technologies,
        "dependencies": dependencies,
        "install_command": install_command,
        "build_command": build_command,
        "run_command": run_command,
        "test_command": test_command,
        "project_structure": project_structure,
        "implementation_strategy": implementation_strategy,
        "mermaid_diagrams": mermaid_diagrams,
        "instructions": instructions
    }
    # Generate README: plain str.replace on {{key}} markers (no templating engine)
    readme_content = README_TEMPLATE
    for key, value in substitutions.items():
        readme_content = readme_content.replace("{{" + key + "}}", value)
    with open(os.path.join(project_path, "README.md"), "w") as f:
        f.write(readme_content)
    return f"Project '{project_name}' created successfully at '{project_path}'."
# Helper: Select template
def select_template(idea: str, design_info: Dict[str, List[str]]) -> str:
    """Select a project template based on design patterns and architectures.

    Args:
        idea: The raw user idea. Currently unused (kept for interface
            stability with orchestrate_new_project).
        design_info: Output of analyze_design_patterns, containing a
            "design_patterns" list checked in order.

    Returns:
        The "project_name" of a matching template; the first pattern with a
        mapping wins. Falls back to "ModularMonolithProject" when no pattern
        maps to a template.
    """
    # Design pattern -> template name catalogue.
    template_map = {
        "Microservices Architecture": "MicroservicesArchitectureProject",
        "Event-Driven Architecture": "EventDrivenArchitectureProject",
        "Repository Pattern": "RepositoryPatternProject",
        "CQRS": "CQRSProject",
        "Client-Server": "ClientServerProject",
        "Modular Monolith": "ModularMonolithProject",
        "Serverless Architecture": "ServerlessFunctionProject",
        "Bridge Pattern": "BridgeProject",
        "Composite Pattern": "CompositeProject",
        "Flyweight Pattern": "FlyweightProject",
        "Strategy Pattern": "StrategyProject",
        "Template Method Pattern": "TemplateMethodProject",
        "Visitor Pattern": "VisitorProject"
    }
    # Removed dead code: the original computed idea.lower() into a local that
    # was never read.
    for pattern in design_info["design_patterns"]:
        if pattern in template_map:
            return template_map[pattern]
    return "ModularMonolithProject"  # Default
# Tool: Orchestrate project setup
@mcp.tool()
def orchestrate_new_project(user_idea: str) -> str:
    """Orchestrate the setup of a new software project from the user's idea.

    Pipeline: extract design info from the idea, pick a template, derive a
    project name from the idea text, then apply the template (which creates
    directories, stubs, and README.md).

    Args:
        user_idea: Free-form description of the desired project.

    Returns:
        A human-readable summary of the created project, or the error string
        propagated from apply_project_template.
    """
    # Step 1: Information Extraction
    design_info = analyze_design_patterns(user_idea)
    # Step 2: Design Patterns & Architecture Identification (handled by analyze_design_patterns)
    # Step 3: Project Template Application
    template_name = select_template(user_idea, design_info)
    # Derive a filesystem-safe name, capped at 20 characters.
    project_name = user_idea.lower().replace(" ", "_")[:20]
    # Steps 4-6: Apply template, generate documentation, prepare file structure, and define strategy
    result = apply_project_template(template_name, project_name, user_idea, design_info)
    if "Error" in result:
        return result
    return (
        f"Project '{project_name}' has been initialized with template '{template_name}'.\n"
        f"Design Patterns Identified: {', '.join(design_info['design_patterns'])}\n"
        f"Architecture Concepts: {', '.join(design_info['architectures'])}\n"
        # Fixed: the original emitted the literal text "{project_path}" because
        # this line was not an f-string (and project_path was not in scope).
        f"Next Steps: Review the generated README.md at '{PROJECTS_DIR}/{project_name}/README.md' for detailed documentation and instructions."
    )
# Register AWS MCP tools if available (requires boto3 import to have
# succeeded above AND the AWS_REGION environment variable to be set).
if AWS_MCP_AVAILABLE and os.getenv("AWS_REGION"):
    try:
        register_aws_mcp_tools(mcp)
        print("AWS MCP tools registered successfully")
    except Exception as e:
        # Best-effort: the orchestrator still works without AWS tools.
        print(f"Warning: Failed to register AWS MCP tools: {e}")
# Run the server
if __name__ == "__main__":
    mcp.run()
```
--------------------------------------------------------------------------------
/printcast-agent/src/mcp_server/server.py:
--------------------------------------------------------------------------------
```python
"""
Main MCP Server implementation for PrintCast Agent.
This server orchestrates voice-to-print workflows, integrating multiple services:
- Asterisk SIP for telephony
- ElevenLabs for conversational AI
- GitHub/RSS for content sourcing
- CUPS for printing
- Delivery services for shipping
"""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Sequence
from datetime import datetime
from fastmcp import FastMCP
from pydantic import BaseModel, Field
import structlog
from ..integrations.asterisk import AsteriskManager
from ..integrations.elevenlabs import ElevenLabsAgent
from ..integrations.content import ContentFetcher
from ..integrations.printing import PrintManager
from ..integrations.delivery import DeliveryService
from ..orchestration.workflow import WorkflowOrchestrator
from ..utils.monitoring import MetricsCollector
# Configure structured logging: a stdlib-compatible structlog pipeline that
# renders every event as a single JSON object with an ISO-8601 timestamp.
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)
# Module-level logger used throughout this file.
logger = structlog.get_logger(__name__)
class CallSession(BaseModel):
    """Represents an active call session."""

    session_id: str  # unique id; built as "call_<timestamp>_<caller_id>" by handle_incoming_call
    caller_id: str  # phone number of the caller
    start_time: datetime  # when the session was created
    selected_items: List[str] = Field(default_factory=list)  # IDs of content items chosen by the caller
    delivery_address: Optional[str] = None  # set once the caller provides an address
    status: str = "active"  # "active" until ended, then the end reason (e.g. "completed")
    metadata: Dict[str, Any] = Field(default_factory=dict)  # e.g. language, dtmf_code
class PrintCastMCPServer:
"""
Main MCP server for PrintCast Agent system.
Provides tools and resources for:
- Handling incoming calls through Asterisk
- Managing AI voice conversations
- Fetching and presenting content
- Processing print jobs
- Arranging delivery
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize the PrintCast MCP Server.
Args:
config: Configuration dictionary for all services
"""
self.config = config or {}
self.app = FastMCP("PrintCast Agent")
self.sessions: Dict[str, CallSession] = {}
# Initialize service managers
self.asterisk = AsteriskManager(self.config.get("asterisk", {}))
self.elevenlabs = ElevenLabsAgent(self.config.get("elevenlabs", {}))
self.content = ContentFetcher(self.config.get("content", {}))
self.printer = PrintManager(self.config.get("printing", {}))
self.delivery = DeliveryService(self.config.get("delivery", {}))
self.orchestrator = WorkflowOrchestrator(
asterisk=self.asterisk,
elevenlabs=self.elevenlabs,
content=self.content,
printer=self.printer,
delivery=self.delivery
)
self.metrics = MetricsCollector()
# Register MCP tools
self._register_tools()
# Register MCP resources
self._register_resources()
logger.info("PrintCast MCP Server initialized", config=self.config)
def _register_tools(self):
"""Register all MCP tools."""
@self.app.tool()
async def handle_incoming_call(
caller_id: str,
dtmf_code: Optional[str] = None,
language: str = "cs"
) -> Dict[str, Any]:
"""
Handle an incoming call and initiate the voice workflow.
Args:
caller_id: The phone number of the caller
dtmf_code: Optional DTMF code entered by caller
language: Language preference (cs=Czech, en=English)
Returns:
Session information and next steps
"""
try:
# Create new session
session = CallSession(
session_id=f"call_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{caller_id}",
caller_id=caller_id,
start_time=datetime.now(),
metadata={"language": language, "dtmf_code": dtmf_code}
)
self.sessions[session.session_id] = session
# Start voice agent
agent_response = await self.elevenlabs.start_conversation(
session_id=session.session_id,
language=language
)
# Get initial content options
content_options = await self.content.get_available_content()
logger.info(
"Call session started",
session_id=session.session_id,
caller_id=caller_id
)
return {
"session_id": session.session_id,
"status": "connected",
"agent_ready": agent_response.get("ready", False),
"content_options": content_options,
"message": f"Welcome! Session {session.session_id} started."
}
except Exception as e:
logger.error("Failed to handle incoming call", error=str(e))
raise
@self.app.tool()
async def fetch_trending_content(
content_type: str = "github",
limit: int = 5,
language: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Fetch trending content from various sources.
Args:
content_type: Type of content (github, rss, news)
limit: Maximum number of items to fetch
language: Optional language filter
Returns:
List of trending content items
"""
try:
if content_type == "github":
items = await self.content.fetch_github_trending(
limit=limit,
language=language
)
elif content_type == "rss":
items = await self.content.fetch_rss_feeds(limit=limit)
elif content_type == "news":
items = await self.content.fetch_news(limit=limit)
else:
raise ValueError(f"Unknown content type: {content_type}")
logger.info(
"Fetched trending content",
type=content_type,
count=len(items)
)
return items
except Exception as e:
logger.error("Failed to fetch content", error=str(e))
raise
@self.app.tool()
async def process_user_selection(
session_id: str,
selected_items: List[str],
delivery_address: str,
delivery_method: str = "post"
) -> Dict[str, Any]:
"""
Process user's content selection and initiate print/delivery.
Args:
session_id: Active session ID
selected_items: List of selected item IDs
delivery_address: Delivery address
delivery_method: Delivery method (post, courier)
Returns:
Order confirmation and tracking information
"""
try:
session = self.sessions.get(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
# Update session
session.selected_items = selected_items
session.delivery_address = delivery_address
# Orchestrate the workflow
result = await self.orchestrator.process_order(
session_id=session_id,
selected_items=selected_items,
delivery_address=delivery_address,
delivery_method=delivery_method
)
# Update metrics
await self.metrics.record_order(session_id, len(selected_items))
logger.info(
"Order processed",
session_id=session_id,
items_count=len(selected_items),
tracking_id=result.get("tracking_id")
)
return result
except Exception as e:
logger.error("Failed to process selection", error=str(e))
raise
@self.app.tool()
async def generate_print_preview(
session_id: str,
selected_items: List[str],
format: str = "pdf"
) -> Dict[str, Any]:
"""
Generate a print preview for selected items.
Args:
session_id: Active session ID
selected_items: List of selected item IDs
format: Output format (pdf, html)
Returns:
Preview file path and metadata
"""
try:
# Generate preview document
preview = await self.printer.generate_preview(
items=selected_items,
format=format
)
logger.info(
"Print preview generated",
session_id=session_id,
format=format
)
return {
"preview_url": preview["url"],
"page_count": preview["pages"],
"file_size": preview["size"],
"format": format
}
except Exception as e:
logger.error("Failed to generate preview", error=str(e))
raise
@self.app.tool()
async def get_delivery_quote(
delivery_address: str,
delivery_method: str = "post",
weight_grams: int = 100
) -> Dict[str, Any]:
"""
Get delivery cost estimate.
Args:
delivery_address: Delivery address
delivery_method: Delivery method
weight_grams: Estimated weight in grams
Returns:
Delivery quote with pricing and timing
"""
try:
quote = await self.delivery.get_quote(
address=delivery_address,
method=delivery_method,
weight=weight_grams
)
return {
"price": quote["price"],
"currency": quote["currency"],
"estimated_delivery": quote["estimated_delivery"],
"carrier": quote["carrier"]
}
except Exception as e:
logger.error("Failed to get delivery quote", error=str(e))
raise
@self.app.tool()
async def end_call_session(
session_id: str,
reason: str = "completed"
) -> Dict[str, Any]:
"""
End an active call session.
Args:
session_id: Session to end
reason: Reason for ending (completed, cancelled, error)
Returns:
Session summary
"""
try:
session = self.sessions.get(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
# Update session status
session.status = reason
# Stop voice agent
await self.elevenlabs.end_conversation(session_id)
# Generate session summary
duration = (datetime.now() - session.start_time).total_seconds()
summary = {
"session_id": session_id,
"duration_seconds": duration,
"items_selected": len(session.selected_items),
"status": reason,
"caller_id": session.caller_id
}
# Clean up session after delay
asyncio.create_task(self._cleanup_session(session_id))
logger.info("Call session ended", **summary)
return summary
except Exception as e:
logger.error("Failed to end session", error=str(e))
raise
def _register_resources(self):
"""Register MCP resources for monitoring and configuration."""
@self.app.resource("resource://sessions/active")
async def get_active_sessions() -> str:
"""Get list of active call sessions."""
active = [
{
"session_id": s.session_id,
"caller_id": s.caller_id,
"start_time": s.start_time.isoformat(),
"status": s.status,
"items_selected": len(s.selected_items)
}
for s in self.sessions.values()
if s.status == "active"
]
return json.dumps(active, indent=2)
@self.app.resource("resource://config/services")
async def get_service_config() -> str:
"""Get current service configuration."""
config = {
"asterisk": {
"enabled": self.asterisk.is_connected(),
"host": self.config.get("asterisk", {}).get("host", "localhost")
},
"elevenlabs": {
"enabled": self.elevenlabs.is_configured(),
"model": self.config.get("elevenlabs", {}).get("model", "eleven_multilingual_v2")
},
"printing": {
"enabled": self.printer.is_available(),
"printer": self.config.get("printing", {}).get("default_printer", "default")
},
"delivery": {
"enabled": self.delivery.is_configured(),
"carriers": self.config.get("delivery", {}).get("carriers", [])
}
}
return json.dumps(config, indent=2)
@self.app.resource("resource://metrics/daily")
async def get_daily_metrics() -> str:
"""Get daily usage metrics."""
metrics = await self.metrics.get_daily_stats()
return json.dumps(metrics, indent=2)
async def _cleanup_session(self, session_id: str, delay: int = 300):
"""
Clean up session data after delay.
Args:
session_id: Session to clean up
delay: Delay in seconds before cleanup
"""
await asyncio.sleep(delay)
if session_id in self.sessions:
del self.sessions[session_id]
logger.info("Session cleaned up", session_id=session_id)
async def start(self):
"""Start the MCP server and all services."""
try:
# Initialize all services
await self.asterisk.connect()
await self.elevenlabs.initialize()
await self.printer.initialize()
await self.delivery.initialize()
# Start metrics collection
asyncio.create_task(self.metrics.start_collection())
# Start the MCP server
logger.info("Starting PrintCast MCP Server")
await self.app.run()
except Exception as e:
logger.error("Failed to start server", error=str(e))
raise
async def stop(self):
"""Stop the MCP server and cleanup.

Ends every active call session first (so callers are not left
mid-flow), then shuts each integration down.

Raises:
Exception: re-raised after logging if any shutdown step fails.
"""
try:
# End all active sessions; iterate over a copied key list because
# end_call_session mutates self.sessions.
for session_id in list(self.sessions.keys()):
await self.end_call_session(session_id, reason="shutdown")
# Disconnect services
await self.asterisk.disconnect()
await self.elevenlabs.shutdown()
await self.printer.shutdown()
await self.delivery.shutdown()
logger.info("PrintCast MCP Server stopped")
except Exception as e:
logger.error("Error during shutdown", error=str(e))
raise
```
--------------------------------------------------------------------------------
/mcp-project-orchestrator/openssl/tests/test_template_validation.py:
--------------------------------------------------------------------------------
```python
"""
Tests for template rendering and JSON schema validation.
This module contains tests for Jinja2 template rendering and
JSON configuration schema validation.
"""
import json
import os
import shutil
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch

import pytest

from mcp_orchestrator.cursor_deployer import CursorConfigDeployer
from mcp_orchestrator.env_config import EnvironmentConfig
from mcp_orchestrator.yaml_validator import YAMLFrontmatterValidator
class TestTemplateRendering:
"""Test cases for Jinja2 template rendering.

Builds a throwaway repo/package tree per test, writes two Jinja2
templates into it, and exercises CursorConfigDeployer's private
_render_template_content() against them.
"""
def setup_method(self):
"""Set up test fixtures."""
self.temp_dir = tempfile.mkdtemp()
self.repo_root = Path(self.temp_dir) / "test_repo"
self.package_root = Path(self.temp_dir) / "test_package"
# Create test repository
self.repo_root.mkdir(parents=True)
# Create test package structure
self.package_root.mkdir(parents=True)
(self.package_root / "cursor-rules" / "rules").mkdir(parents=True)
(self.package_root / "cursor-rules" / "prompts").mkdir(parents=True)
# Create test templates
self._create_test_templates()
# Create deployer
self.deployer = CursorConfigDeployer(self.repo_root, self.package_root)
def teardown_method(self):
"""Clean up test fixtures."""
shutil.rmtree(self.temp_dir)
def _create_test_templates(self):
"""Create test template files."""
# Create shared rule template (YAML frontmatter + markdown body)
shared_template = self.package_root / "cursor-rules" / "rules" / "shared.mdc.jinja2"
shared_template.write_text("""---
title: {{ title }}
description: {{ description }}
created: {{ timestamp }}
platform: {{ platform }}
user: {{ user }}
---
# {{ title }}
This is a test shared rule template.
Platform: {{ os }}
User: {{ user }}
Python: {{ python_version }}
""")
# Create MCP config template
mcp_template = self.package_root / "cursor-rules" / "mcp.json.jinja2"
mcp_template.write_text("""{
"mcpServers": {
"test-server": {
"command": "{% if os == 'windows' %}npx.cmd{% else %}npx{% endif %}",
"args": ["-y", "@test/server"],
"env": {
"PLATFORM": "{{ os }}",
"USER": "{{ user }}",
"HOME": "{{ home }}",
"CI": {{ is_ci | lower }}
}
}
},
"platform": {
"os": "{{ os }}",
"architecture": "{{ architecture }}",
"pythonVersion": "{{ python_version }}"
}
}
""")
def test_template_rendering_basic(self):
"""Test basic template rendering."""
platform_info = self.deployer.detect_platform()
# Test shared rule template
content = self.deployer._render_template_content(
"rules/shared.mdc.jinja2",
platform_info
)
assert "title: Shared Rules" in content
assert f"Platform: {platform_info['os']}" in content
assert f"User: {platform_info['user']}" in content
assert f"Python: {platform_info['python_version']}" in content
def test_template_rendering_with_custom_variables(self):
"""Test template rendering with custom variables."""
platform_info = self.deployer.detect_platform()
# Custom keys override/extend what detect_platform() provides.
platform_info.update({
"title": "Custom Test Rules",
"description": "Custom test description",
"platform": "test"
})
content = self.deployer._render_template_content(
"rules/shared.mdc.jinja2",
platform_info
)
assert "title: Custom Test Rules" in content
assert "description: Custom test description" in content
assert "platform: test" in content
def test_mcp_config_rendering(self):
"""Test MCP configuration template rendering."""
platform_info = self.deployer.detect_platform()
content = self.deployer._render_template_content(
"mcp.json.jinja2",
platform_info
)
# Parse as JSON to validate the rendered output is well-formed
config = json.loads(content)
assert "mcpServers" in config
assert "test-server" in config["mcpServers"]
assert "platform" in config
# Check platform-specific command
expected_command = "npx.cmd" if platform_info["os"] == "windows" else "npx"
assert config["mcpServers"]["test-server"]["command"] == expected_command
# Check environment variables
env = config["mcpServers"]["test-server"]["env"]
assert env["PLATFORM"] == platform_info["os"]
assert env["USER"] == platform_info["user"]
assert env["HOME"] == platform_info["home"]
assert env["CI"] == platform_info["is_ci"]
def test_template_rendering_error_handling(self):
"""Test template rendering error handling."""
# Create invalid template referencing an undefined variable
invalid_template = self.package_root / "cursor-rules" / "invalid.jinja2"
invalid_template.write_text("{{ invalid_variable_that_does_not_exist }}")
with pytest.raises(Exception):
self.deployer._render_template_content(
"invalid.jinja2",
{"os": "linux"}
)
def test_template_rendering_with_filters(self):
"""Test template rendering with Jinja2 filters."""
platform_info = self.deployer.detect_platform()
# Test boolean filter ("| lower" renders Python True/False as JSON)
content = self.deployer._render_template_content(
"mcp.json.jinja2",
platform_info
)
config = json.loads(content)
env = config["mcpServers"]["test-server"]["env"]
assert isinstance(env["CI"], bool)
assert env["CI"] == platform_info["is_ci"]
class TestJSONSchemaValidation:
"""Test cases for JSON configuration schema validation.

Renders a realistic mcp.json.jinja2 template and checks the result
against a hand-rolled schema (see _validate_mcp_config_schema).
"""
def setup_method(self):
"""Set up test fixtures."""
self.temp_dir = tempfile.mkdtemp()
self.repo_root = Path(self.temp_dir) / "test_repo"
self.package_root = Path(self.temp_dir) / "test_package"
# Create test repository
self.repo_root.mkdir(parents=True)
# Create test package structure
self.package_root.mkdir(parents=True)
(self.package_root / "cursor-rules").mkdir(parents=True)
# Create test MCP template
self._create_mcp_template()
# Create deployer
self.deployer = CursorConfigDeployer(self.repo_root, self.package_root)
def teardown_method(self):
"""Clean up test fixtures."""
shutil.rmtree(self.temp_dir)
def _create_mcp_template(self):
"""Create MCP configuration template."""
mcp_template = self.package_root / "cursor-rules" / "mcp.json.jinja2"
mcp_template.write_text("""{
"mcpServers": {
"openssl-context": {
"command": "{% if os == 'windows' %}npx.cmd{% else %}npx{% endif %}",
"args": ["-y", "@sparesparrow/mcp-openssl-context"],
"env": {
"OPENSSL_PROJECT_ROOT": "{{ repo_root }}",
"CONAN_USER_HOME": "{{ home }}/.conan2",
"PLATFORM": "{{ os }}",
"ARCHITECTURE": "{{ architecture }}",
"PYTHON_VERSION": "{{ python_version }}",
"USER": "{{ user }}"
}
},
"build-intelligence": {
"command": "{% if os == 'windows' %}npx.cmd{% else %}npx{% endif %}",
"args": ["-y", "@sparesparrow/mcp-build-intelligence"],
"env": {
"OPENSSL_PROJECT_ROOT": "{{ repo_root }}",
"PLATFORM": "{{ os }}",
"ARCHITECTURE": "{{ architecture }}",
"BUILD_TYPE": "{% if is_ci %}release{% else %}debug{% endif %}",
"CONAN_USER_HOME": "{{ home }}/.conan2"
}
}
},
"globalShortcut": "Ctrl+Shift+.",
"logging": {
"level": "{% if is_ci %}error{% else %}info{% endif %}",
"file": "{{ repo_root }}/.cursor/cursor.log",
"maxSize": "10MB",
"maxFiles": 5
},
"features": {
"autoComplete": true,
"syntaxHighlighting": true,
"errorChecking": true,
"codeFormatting": true,
"intelligentSuggestions": true
},
"platform": {
"os": "{{ os }}",
"architecture": "{{ architecture }}",
"pythonVersion": "{{ python_version }}",
"user": "{{ user }}",
"home": "{{ home }}",
"ciEnvironment": {{ is_ci }},
"timestamp": "{{ timestamp }}"
}
}
""")
def test_mcp_config_schema_validation(self):
"""Test MCP configuration JSON schema validation."""
platform_info = self.deployer.detect_platform()
platform_info["repo_root"] = str(self.repo_root)
# Render template
content = self.deployer._render_template_content(
"mcp.json.jinja2",
platform_info
)
# Parse as JSON
config = json.loads(content)
# Validate schema
self._validate_mcp_config_schema(config)
# Shared schema assertions used by the tests above and below.
def _validate_mcp_config_schema(self, config: dict):
"""Validate MCP configuration schema."""
# Check required top-level fields
required_fields = ["mcpServers", "globalShortcut", "logging", "features", "platform"]
for field in required_fields:
assert field in config, f"Missing required field: {field}"
# Validate mcpServers
mcp_servers = config["mcpServers"]
assert isinstance(mcp_servers, dict), "mcpServers must be a dictionary"
for server_name, server_config in mcp_servers.items():
assert isinstance(server_config, dict), f"Server {server_name} config must be a dictionary"
# Check required server fields
required_server_fields = ["command", "args", "env"]
for field in required_server_fields:
assert field in server_config, f"Server {server_name} missing required field: {field}"
# Validate command
assert isinstance(server_config["command"], str), f"Server {server_name} command must be a string"
assert server_config["command"] in ["npx", "npx.cmd"], f"Server {server_name} has invalid command"
# Validate args
assert isinstance(server_config["args"], list), f"Server {server_name} args must be a list"
# Validate env
assert isinstance(server_config["env"], dict), f"Server {server_name} env must be a dictionary"
# Validate logging
logging = config["logging"]
assert "level" in logging, "Logging missing level field"
assert logging["level"] in ["error", "info", "debug", "warn"], "Invalid logging level"
# Validate features
features = config["features"]
boolean_features = ["autoComplete", "syntaxHighlighting", "errorChecking", "codeFormatting", "intelligentSuggestions"]
for feature in boolean_features:
assert feature in features, f"Missing feature: {feature}"
assert isinstance(features[feature], bool), f"Feature {feature} must be boolean"
# Validate platform
platform = config["platform"]
required_platform_fields = ["os", "architecture", "pythonVersion", "user", "home", "ciEnvironment", "timestamp"]
for field in required_platform_fields:
assert field in platform, f"Platform missing required field: {field}"
def test_mcp_config_rendering_consistency(self):
"""Test that MCP configuration rendering is consistent across platforms."""
platforms = [
{"os": "linux", "is_ci": False},
{"os": "macos", "is_ci": False},
{"os": "windows", "is_ci": False},
{"os": "linux", "is_ci": True},
]
for platform_info in platforms:
# Fill in the remaining context variables the template expects.
platform_info.update({
"architecture": "x86_64",
"python_version": "3.9.0",
"user": "testuser",
"home": "/home/testuser",
"timestamp": "2024-01-01T00:00:00",
"repo_root": str(self.repo_root)
})
# Render template
content = self.deployer._render_template_content(
"mcp.json.jinja2",
platform_info
)
# Parse as JSON
config = json.loads(content)
# Validate schema
self._validate_mcp_config_schema(config)
# Check platform-specific values
assert config["platform"]["os"] == platform_info["os"]
assert config["platform"]["ciEnvironment"] == platform_info["is_ci"]
# Check command based on OS
expected_command = "npx.cmd" if platform_info["os"] == "windows" else "npx"
for server_config in config["mcpServers"].values():
assert server_config["command"] == expected_command
class TestYAMLFrontmatterValidation:
"""Test cases for YAML frontmatter validation.

Writes .mdc files with varying frontmatter and checks the result
object returned by YAMLFrontmatterValidator.validate_file()
(is_valid flag, errors list, parsed frontmatter dict).
"""
def setup_method(self):
"""Set up test fixtures."""
self.temp_dir = tempfile.mkdtemp()
self.validator = YAMLFrontmatterValidator()
def teardown_method(self):
"""Clean up test fixtures."""
shutil.rmtree(self.temp_dir)
def test_valid_frontmatter(self):
"""Test validation of valid frontmatter."""
# Create valid .mdc file
mdc_file = Path(self.temp_dir) / "valid.mdc"
mdc_file.write_text("""---
title: Test Rule
description: A test rule for validation
created: 2024-01-01T00:00:00
platform: linux
user: testuser
---
# Test Rule
This is a test rule.
""")
result = self.validator.validate_file(mdc_file)
assert result.is_valid
assert len(result.errors) == 0
assert result.frontmatter is not None
assert result.frontmatter["title"] == "Test Rule"
def test_missing_required_fields(self):
"""Test validation with missing required fields."""
# Create .mdc file with missing required fields
mdc_file = Path(self.temp_dir) / "invalid.mdc"
mdc_file.write_text("""---
title: Test Rule
---
# Test Rule
This is a test rule.
""")
result = self.validator.validate_file(mdc_file)
assert not result.is_valid
assert len(result.errors) > 0
assert any("Missing required field" in error for error in result.errors)
def test_invalid_yaml_syntax(self):
"""Test validation with invalid YAML syntax."""
# Create .mdc file with invalid YAML (unterminated flow sequence)
mdc_file = Path(self.temp_dir) / "invalid_yaml.mdc"
mdc_file.write_text("""---
title: Test Rule
description: A test rule
created: 2024-01-01T00:00:00
platform: linux
user: testuser
invalid_yaml: [unclosed list
---
# Test Rule
This is a test rule.
""")
result = self.validator.validate_file(mdc_file)
assert not result.is_valid
assert any("Invalid YAML syntax" in error for error in result.errors)
def test_invalid_platform(self):
"""Test validation with invalid platform."""
# Create .mdc file with invalid platform
mdc_file = Path(self.temp_dir) / "invalid_platform.mdc"
mdc_file.write_text("""---
title: Test Rule
description: A test rule
created: 2024-01-01T00:00:00
platform: invalid_platform
user: testuser
---
# Test Rule
This is a test rule.
""")
result = self.validator.validate_file(mdc_file)
assert not result.is_valid
assert any("Invalid platform" in error for error in result.errors)
def test_no_frontmatter(self):
"""Test validation with no frontmatter."""
# Create .mdc file without frontmatter
mdc_file = Path(self.temp_dir) / "no_frontmatter.mdc"
mdc_file.write_text("""# Test Rule
This is a test rule without frontmatter.
""")
result = self.validator.validate_file(mdc_file)
assert not result.is_valid
assert any("No YAML frontmatter found" in error for error in result.errors)
class TestEnvironmentConfiguration:
    """Test cases for environment configuration.

    Exercises EnvironmentConfig lookup, validation, and error
    reporting against a patched os.environ.

    Note: ``import os`` now lives with the other module imports at the
    top of the file; it was previously placed at the very bottom,
    after its first textual use sites.
    """

    def setup_method(self):
        """Set up test fixtures."""
        self.env_config = EnvironmentConfig()

    def test_get_conan_home_fallback(self):
        """Test Conan home directory fallback."""
        # Clear cache so earlier lookups cannot mask the env change.
        self.env_config._cache.clear()
        # With the environment variable set, it wins outright.
        with patch.dict(os.environ, {"CONAN_USER_HOME": "/custom/conan/home"}):
            conan_home = self.env_config.get_conan_home()
            assert conan_home == "/custom/conan/home"
        # Without it, fall back to the default ~/.conan2 location.
        with patch.dict(os.environ, {}, clear=True):
            conan_home = self.env_config.get_conan_home()
            assert conan_home.endswith(".conan2")

    def test_validate_required_openssl(self):
        """Test validation of required variables for OpenSSL project."""
        # All required variables present -> valid, nothing missing.
        with patch.dict(os.environ, {
            "CONAN_USER_HOME": "/test/conan",
            "OPENSSL_ROOT_DIR": "/test/openssl"
        }):
            is_valid, missing = self.env_config.validate_required("openssl")
            assert is_valid
            assert len(missing) == 0
        # Empty environment -> both variables reported missing.
        with patch.dict(os.environ, {}, clear=True):
            is_valid, missing = self.env_config.validate_required("openssl")
            assert not is_valid
            assert "CONAN_USER_HOME" in missing
            assert "OPENSSL_ROOT_DIR" in missing

    def test_get_validation_errors(self):
        """Test getting validation error messages."""
        with patch.dict(os.environ, {}, clear=True):
            errors = self.env_config.get_validation_errors("openssl")
            assert len(errors) > 0
            assert any("Missing required environment variables" in error for error in errors)
            assert any("CONAN_USER_HOME" in error for error in errors)
            assert any("OPENSSL_ROOT_DIR" in error for error in errors)
```
--------------------------------------------------------------------------------
/src/mcp_project_orchestrator/core/fastmcp.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Enhanced FastMCP server implementation for the MCP Project Orchestrator.
This module provides a comprehensive MCP server that handles communication
with MCP clients like Claude Desktop, exposing project orchestration,
prompt management, and diagram generation capabilities through the Model
Context Protocol.
"""
import os
import sys
import signal
import logging
import json
import asyncio
from typing import Dict, Any, Optional, Callable, List
# Set up logging at import time. basicConfig configures the root logger,
# so importing this module affects logging for the whole process.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()]
)
# Module-wide logger used by the server classes below.
logger = logging.getLogger("mcp-project-orchestrator")
class MCP_Error(Exception):
    """Base exception raised for errors originating in the MCP server."""
class FastMCPServer:
"""
Enhanced FastMCP server implementation for project orchestration.
This class provides a comprehensive MCP server that handles communication
with MCP clients, exposing orchestration capabilities through
registered tools and resources with robust error handling.
"""
def __init__(self, config):
"""
Initialize the MCP server with the given configuration.
Args:
config: The server configuration object; only ``name`` is guarded
with hasattr here. NOTE(review): start() reads config.host and
config.port unguarded -- confirm the config object always has them.
"""
self.name = config.name if hasattr(config, 'name') else "MCP Project Orchestrator"
self.config = config
self.tools: Dict[str, Dict[str, Any]] = {}
self.resources: Dict[str, Any] = {}
# Set up signal handlers for graceful shutdown.
# NOTE(review): signal.signal raises ValueError when called off the
# main thread -- confirm the server is always constructed there.
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
logger.info(f"Initialized FastMCP server '{self.name}'")
async def initialize(self) -> None:
    """Prepare the server for use; call once after construction.

    Currently a log-only hook point: future asynchronous setup work
    belongs between the two log statements.
    """
    logger.info("Initializing FastMCPServer")
    logger.info("FastMCPServer initialization complete")
async def start(self, host: Optional[str] = None, port: Optional[int] = None) -> None:
"""
Start the server asynchronously.
Args:
host: Optional host to bind to (overrides config)
port: Optional port to bind to (overrides config)
"""
# Use provided values or fall back to config. NOTE(review): ``or``
# treats falsy overrides (e.g. port=0) as absent -- confirm acceptable.
# These attributes are first created here, not in __init__.
self.host = host or self.config.host
self.port = port or self.config.port
logger.info(f"Starting FastMCP server on {self.host}:{self.port}")
# Server startup logic would go here
logger.info(f"FastMCP server started successfully on {self.host}:{self.port}")
async def stop(self) -> None:
    """Shut the server down gracefully.

    Log-only placeholder: real teardown work goes between the two
    log statements.
    """
    logger.info("Stopping FastMCP server")
    logger.info("FastMCP server stopped")
def tool(self, func: Optional[Callable] = None,
name: Optional[str] = None,
description: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None):
"""
Decorator to register a function as an MCP tool.

Usable both bare (``@server.tool``) and parameterized
(``@server.tool(name=..., description=...)``); the final
``if func is None`` branch distinguishes the two.
Args:
func: The function to register
name: Optional name for the tool (defaults to function name)
description: Optional description of the tool
parameters: Optional parameters schema for the tool; when omitted,
a JSON-Schema-style object is derived from the signature
Returns:
The decorated function
"""
def decorator(fn):
tool_name = name or fn.__name__
tool_desc = description or fn.__doc__ or f"Tool {tool_name}"
# Extract parameters from function signature if not provided
tool_params = parameters or {}
if not tool_params:
import inspect
sig = inspect.signature(fn)
tool_params = {
"type": "object",
"properties": {},
"required": []
}
for param_name, param in sig.parameters.items():
if param_name == "self":
continue
# Map Python annotations to JSON-schema type names; anything
# unrecognized stays "string".
param_type = "string" # Default type
if param.annotation is not inspect.Parameter.empty:
if param.annotation == str:
param_type = "string"
elif param.annotation == int:
param_type = "integer"
elif param.annotation == float:
param_type = "number"
elif param.annotation == bool:
param_type = "boolean"
elif param.annotation == dict or param.annotation == Dict:
param_type = "object"
elif param.annotation == list or param.annotation == List:
param_type = "array"
tool_params["properties"][param_name] = {
"type": param_type,
"description": f"Parameter {param_name}"
}
# Add to required params if no default value
if param.default is inspect.Parameter.empty:
tool_params["required"].append(param_name)
self.tools[tool_name] = {
"function": fn,
"description": tool_desc,
"parameters": tool_params
}
logger.info(f"Registered tool '{tool_name}'")
return fn
if func is None:
return decorator
return decorator(func)
def resource(self, name: str, content: Any) -> None:
    """Expose *content* to clients as the MCP resource *name*.

    Args:
        name: Resource identifier (also used to build its URI).
        content: Arbitrary payload returned by mcp/readResource.
    """
    self.resources[name] = content
    logger.info(f"Registered resource '{name}'")
def register_tool(self, name: str, description: str, parameters: Dict[str, Any], handler: Callable):
    """Register *handler* under *name* as a callable MCP tool.

    Args:
        name: Name of the tool.
        description: Human-readable description.
        parameters: JSON-schema-style parameters object.
        handler: Callable invoked by mcp/callTool.
    """
    logger.info(f"Registering tool: {name}")
    entry = {
        "function": handler,
        "description": description,
        "parameters": parameters,
    }
    self.tools[name] = entry
    logger.debug(f"Tool registered: {name} - {description}")
def _handle_signal(self, signum: int, frame: Any) -> None:
"""
Handle termination signals gracefully.
Args:
signum: Signal number
frame: Current stack frame
"""
logger.info(f"Received signal {signum}, shutting down...")
# Create and run an asyncio task to stop the server.
# NOTE(review): asyncio.get_event_loop() outside a running loop is
# deprecated in newer Python, and loop.stop() fires 2 seconds later
# regardless of whether stop() has finished -- confirm acceptable.
loop = asyncio.get_event_loop()
loop.create_task(self.stop())
# Allow some time for cleanup
loop.call_later(2, loop.stop)
def _handle_client_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle an MCP protocol message from a client.
Args:
message: The message from the client
Returns:
The response to send back to the client
"""
try:
if "jsonrpc" not in message or message["jsonrpc"] != "2.0":
return self._error_response(message.get("id"), -32600, "Invalid request")
if "method" not in message:
return self._error_response(message.get("id"), -32600, "Method not specified")
method = message["method"]
params = message.get("params", {})
if method == "mcp/initialize":
return self._handle_initialize(message["id"], params)
elif method == "mcp/listTools":
return self._handle_list_tools(message["id"])
elif method == "mcp/callTool":
return self._handle_call_tool(message["id"], params)
elif method == "mcp/listResources":
return self._handle_list_resources(message["id"])
elif method == "mcp/readResource":
return self._handle_read_resource(message["id"], params)
else:
return self._error_response(message["id"], -32601, f"Method '{method}' not supported")
except Exception as e:
logger.error(f"Error handling message: {str(e)}")
return self._error_response(message.get("id"), -32603, f"Internal error: {str(e)}")
def _error_response(self, id: Any, code: int, message: str) -> Dict[str, Any]:
"""
Create an error response according to the JSON-RPC 2.0 spec.
Args:
id: The request ID
code: The error code
message: The error message
Returns:
The error response
"""
return {
"jsonrpc": "2.0",
"id": id,
"error": {
"code": code,
"message": message
}
}
def _handle_initialize(self, id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle the mcp/initialize method.
Args:
id: The request ID
params: The method parameters
Returns:
The response
"""
# Return server capabilities
return {
"jsonrpc": "2.0",
"id": id,
"result": {
"name": self.name,
"version": "0.1.0",
"capabilities": {
"listTools": True,
"callTool": True,
"listResources": True,
"readResource": True
}
}
}
def _handle_list_tools(self, id: Any) -> Dict[str, Any]:
"""
Handle the mcp/listTools method.
Args:
id: The request ID
Returns:
The response
"""
tools = []
for name, tool in self.tools.items():
tools.append({
"name": name,
"description": tool["description"],
"parameters": tool["parameters"]
})
return {
"jsonrpc": "2.0",
"id": id,
"result": {
"tools": tools
}
}
def _handle_call_tool(self, id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle the mcp/callTool method.
Args:
id: The request ID
params: The method parameters
Returns:
The response
"""
tool_name = params.get("name")
tool_params = params.get("arguments", {})
if not tool_name:
return self._error_response(id, -32602, "Tool name not specified")
if tool_name not in self.tools:
return self._error_response(id, -32602, f"Tool '{tool_name}' not found")
try:
tool = self.tools[tool_name]["function"]
result = tool(**tool_params)
return {
"jsonrpc": "2.0",
"id": id,
"result": {
"result": result
}
}
except Exception as e:
logger.error(f"Error calling tool '{tool_name}': {str(e)}")
return self._error_response(id, -32603, f"Error calling tool '{tool_name}': {str(e)}")
def _handle_list_resources(self, id: Any) -> Dict[str, Any]:
"""
Handle the mcp/listResources method.
Args:
id: The request ID
Returns:
The response
"""
resources = []
for name in self.resources:
resources.append({
"uri": f"mcp://{self.name.lower()}/resources/{name}",
"name": name
})
return {
"jsonrpc": "2.0",
"id": id,
"result": {
"resources": resources
}
}
def _handle_read_resource(self, id: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle the mcp/readResource method.
Args:
id: The request ID
params: The method parameters
Returns:
The response
"""
uri = params.get("uri")
if not uri:
return self._error_response(id, -32602, "Resource URI not specified")
# Parse the URI to get the resource name
resource_name = uri.split("/")[-1]
if resource_name not in self.resources:
return self._error_response(id, -32602, f"Resource '{resource_name}' not found")
return {
"jsonrpc": "2.0",
"id": id,
"result": {
"contents": self.resources[resource_name]
}
}
def run(self, host: str = "127.0.0.1", port: int = 8080) -> None:
"""
Run the MCP server and handle client connections.

Prefers a websocket transport when the third-party ``websockets``
package is importable; otherwise falls back to LSP-style stdio
framing (see _run_stdio), which Claude Desktop uses.
Args:
host: The host to bind to
port: The port to listen on
"""
logger.info(f"FastMCP server '{self.name}' running with configuration: {self.config}")
try:
import asyncio
import websockets
async def handle_websocket(websocket: Any, path: str) -> None:
"""Handle a websocket connection."""
# NOTE(review): the (websocket, path) handler signature is the
# legacy websockets API -- confirm the pinned websockets version.
async for message in websocket:
try:
request = json.loads(message)
logger.debug(f"Received message: {request}")
response = self._handle_client_message(request)
logger.debug(f"Sending response: {response}")
await websocket.send(json.dumps(response))
except json.JSONDecodeError as e:
logger.error(f"Error decoding message: {str(e)}")
await websocket.send(json.dumps(self._error_response(None, -32700, "Parse error")))
except Exception as e:
logger.error(f"Error handling message: {str(e)}")
await websocket.send(json.dumps(self._error_response(None, -32603, f"Internal error: {str(e)}")))
# Start the server.
# NOTE(review): asyncio.get_event_loop() here is deprecated in
# newer Python -- confirm the supported interpreter range.
start_server = websockets.serve(handle_websocket, host, port)
asyncio.get_event_loop().run_until_complete(start_server)
logger.info(f"Server running on {host}:{port}")
logger.info("Press Ctrl+C to stop")
# Keep the event loop running
asyncio.get_event_loop().run_forever()
except ImportError:
# Fallback to stdio for compatibility with Claude Desktop
logger.info("Websockets not available, falling back to stdio")
self._run_stdio()
except KeyboardInterrupt:
logger.info("Keyboard interrupt received, shutting down...")
except Exception as e:
logger.error(f"Error running server: {str(e)}")
finally:
logger.info("Server shutting down")
def _run_stdio(self) -> None:
    """Serve MCP messages over stdin/stdout using LSP-style framing.

    Each request is preceded by a ``Content-Length: N`` header and a
    blank line; responses are written back with the same framing.
    Returns when stdin reaches EOF.
    """
    logger.info("Running in stdio mode")
    # Handle UTF-8 encoding on Windows: switch the raw descriptors to
    # binary mode, then rewrap them as line-buffered UTF-8 text streams.
    if sys.platform == "win32":
        import msvcrt
        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
        msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
        sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf-8', buffering=1)
        sys.stdin = open(sys.stdin.fileno(), mode='r', encoding='utf-8', buffering=1)
    while True:
        try:
            # Read the Content-Length header. readline() returns ''
            # only at EOF; the original code stripped first and hit
            # `continue` on the empty result, busy-looping forever
            # once the client closed stdin.
            raw_line = sys.stdin.readline()
            if not raw_line:
                logger.info("stdin closed, leaving stdio loop")
                break
            header = raw_line.strip()
            if not header:
                # Stray blank line between frames; skip it.
                continue
            if ":" not in header:
                logger.error(f"Error in stdio loop: malformed header {header!r}")
                continue
            content_length = int(header.split(":", 1)[1].strip())
            # Skip the empty line separating header and body
            sys.stdin.readline()
            # Read the message content
            content = sys.stdin.read(content_length)
            # Parse and handle the message
            message = json.loads(content)
            response = self._handle_client_message(message)
            # Send the response with the byte length (not char length)
            # in the header, since the body is UTF-8 encoded.
            response_json = json.dumps(response)
            response_bytes = response_json.encode('utf-8')
            sys.stdout.write(f"Content-Length: {len(response_bytes)}\r\n\r\n")
            sys.stdout.write(response_json)
            sys.stdout.flush()
        except Exception as e:
            logger.error(f"Error in stdio loop: {str(e)}")
            # Try to recover and continue with the next frame
if __name__ == "__main__":
import argparse
from .config import MCPConfig
parser = argparse.ArgumentParser(description="FastMCP Server")
parser.add_argument("--config", help="Path to configuration file")
parser.add_argument("--host", default="127.0.0.1", help="Host to bind to")
parser.add_argument("--port", type=int, default=8080, help="Port to bind to")
args = parser.parse_args()
# Create a config object
config = MCPConfig(args.config) if args.config else MCPConfig()
server = FastMCPServer(config)
server.run(host=args.host, port=args.port)
```
--------------------------------------------------------------------------------
/printcast-agent/src/integrations/printing.py:
--------------------------------------------------------------------------------
```python
"""
Print server integration for PrintCast Agent.
Handles printing operations using CUPS and PDF generation.
"""
import asyncio
import os
import tempfile
from typing import Any, Dict, List, Optional
from datetime import datetime
from pathlib import Path
import subprocess
import base64
import structlog
from pydantic import BaseModel, Field
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import mm
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak, Table, TableStyle
from reportlab.lib import colors
from jinja2 import Template
logger = structlog.get_logger(__name__)
class PrintJob(BaseModel):
"""Represents a print job."""
# Identity: job id plus the call session that requested it
job_id: str
session_id: str
# What to print and on which CUPS printer
document_path: str
printer_name: str
# Lifecycle status; starts "pending", set to "cancelled" on shutdown
status: str = "pending"
pages: int = 0
copies: int = 1
# created_at is stamped at construction; completed_at stays None until done
created_at: datetime = Field(default_factory=datetime.now)
completed_at: Optional[datetime] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class PrintManager:
"""
Manages printing operations.
Features:
- CUPS integration for local/network printers
- PDF generation from content
- Print job queue management
- Print preview generation
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize print manager.
Args:
config: Configuration including:
- default_printer: Default printer name
- cups_server: CUPS server address (host:port)
- temp_dir: Temporary directory for print files
- pdf_settings: PDF generation settings
"""
self.config = config
self.default_printer = config.get("default_printer", "default")
self.cups_server = config.get("cups_server", "localhost:631")
self.temp_dir = Path(config.get("temp_dir", "/tmp/printcast"))
self.pdf_settings = config.get("pdf_settings", {})
# Create temp directory (idempotent; parents created as needed)
self.temp_dir.mkdir(parents=True, exist_ok=True)
# Print job tracking: job_id -> PrintJob
self.jobs: Dict[str, PrintJob] = {}
self.job_counter = 0
# CUPS availability is probed later by initialize(); assume absent
# until then so is_available() is safe to call early.
self.cups_available = False
logger.info(
"Print manager initialized",
default_printer=self.default_printer,
temp_dir=str(self.temp_dir)
)
async def initialize(self):
"""Initialize print manager and check CUPS availability.

Probes CUPS by running ``lpstat -p``; on success records the
printer names it reports. Never raises -- any failure (including
lpstat not being installed) just leaves cups_available False.
"""
try:
# Check if CUPS is available
result = await asyncio.create_subprocess_exec(
"lpstat", "-p",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# communicate() waits for the process to exit, so returncode is set.
stdout, stderr = await result.communicate()
if result.returncode == 0:
self.cups_available = True
logger.info("CUPS is available")
# Parse available printers: lpstat -p lines look like
# "printer <name> is idle. ..."
printers = []
for line in stdout.decode().split("\n"):
if line.startswith("printer"):
parts = line.split()
if len(parts) >= 2:
printers.append(parts[1])
logger.info(
"Available printers",
count=len(printers),
printers=printers
)
else:
logger.warning("CUPS not available", stderr=stderr.decode())
except Exception as e:
logger.warning("Could not check CUPS availability", error=str(e))
self.cups_available = False
async def shutdown(self):
    """Mark any still-pending jobs as cancelled and log shutdown."""
    for queued_job in self.jobs.values():
        if queued_job.status == "pending":
            queued_job.status = "cancelled"
    logger.info("Print manager shutdown")
def is_available(self) -> bool:
    """Report whether CUPS-backed printing was detected during initialize()."""
    return self.cups_available
async def generate_pdf(
    self,
    content: str,
    title: str = "PrintCast Document",
    format: str = "A4",
    output_path: Optional[str] = None
) -> str:
    """
    Generate PDF from content.

    Args:
        content: Content to print (text, HTML, or markdown)
        title: Document title
        format: Page format ("A4", "letter", or "legal"; case-insensitive,
            unknown values fall back to A4)
        output_path: Optional output path

    Returns:
        Path to generated PDF

    Raises:
        Exception: Any PDF generation failure is logged and re-raised.
    """
    try:
        # Generate output path if not provided
        if not output_path:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = self.temp_dir / f"document_{timestamp}.pdf"
        else:
            output_path = Path(output_path)

        # Resolve the requested page format. Previously the `format`
        # parameter was accepted but ignored and A4 was always used.
        from reportlab.lib.pagesizes import letter, legal
        page_sizes = {"a4": A4, "letter": letter, "legal": legal}
        pagesize = page_sizes.get(format.lower(), A4)

        # Create PDF document
        doc = SimpleDocTemplate(
            str(output_path),
            pagesize=pagesize,
            rightMargin=20*mm,
            leftMargin=20*mm,
            topMargin=20*mm,
            bottomMargin=20*mm
        )

        # Container for the 'Flowable' objects
        elements = []

        # Define styles
        styles = getSampleStyleSheet()
        title_style = styles['Title']
        heading_style = styles['Heading1']
        normal_style = styles['Normal']

        # Add title
        elements.append(Paragraph(title, title_style))
        elements.append(Spacer(1, 12))

        # Add generation timestamp below the title
        timestamp_text = f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        elements.append(Paragraph(timestamp_text, normal_style))
        elements.append(Spacer(1, 20))

        # Parse and add content; the format is sniffed from the first
        # characters (HTML tag, markdown heading, else plain text).
        if content.startswith("<html>"):
            # HTML content - parse and convert
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(content, "html.parser")
            for elem in soup.find_all(["h1", "h2", "h3", "p", "ul", "ol"]):
                if elem.name.startswith("h"):
                    elements.append(Paragraph(elem.text, heading_style))
                else:
                    elements.append(Paragraph(elem.text, normal_style))
                elements.append(Spacer(1, 6))
        elif content.startswith("#"):
            # Markdown content - convert headings/paragraphs to PDF elements
            lines = content.split("\n")
            for line in lines:
                if line.startswith("##"):
                    elements.append(Paragraph(line[2:].strip(), heading_style))
                elif line.startswith("#"):
                    elements.append(Paragraph(line[1:].strip(), title_style))
                elif line.strip():
                    elements.append(Paragraph(line, normal_style))
                    elements.append(Spacer(1, 6))
        else:
            # Plain text - split by blank-line-delimited paragraphs
            paragraphs = content.split("\n\n")
            for para in paragraphs:
                if para.strip():
                    elements.append(Paragraph(para, normal_style))
                    elements.append(Spacer(1, 12))

        # Build PDF
        doc.build(elements)

        logger.info(
            "PDF generated",
            path=str(output_path),
            size=output_path.stat().st_size
        )
        return str(output_path)
    except Exception as e:
        logger.error("Failed to generate PDF", error=str(e))
        raise
async def generate_preview(
    self,
    items: List[str],
    format: str = "pdf"
) -> Dict[str, Any]:
    """
    Generate print preview.

    Builds a markdown-style summary with one numbered section per item and
    renders it either as a PDF (via generate_pdf) or as a plain HTML file.

    Args:
        items: Content items to preview
        format: Preview format ("pdf" produces a PDF; any other value
            produces an HTML file with that extension)

    Returns:
        Preview information: file URL, path, estimated page count, size,
        and the format used.
    """
    try:
        # Generate preview document in the manager's temp directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        preview_path = self.temp_dir / f"preview_{timestamp}.{format}"

        # Create preview content as markdown, one "## Item N" per entry
        content = "# Print Preview\n\n"
        for i, item in enumerate(items, 1):
            content += f"## Item {i}\n{item}\n\n"

        # Generate document
        if format == "pdf":
            doc_path = await self.generate_pdf(
                content,
                title="Print Preview",
                output_path=str(preview_path)
            )
        else:
            # HTML preview
            # NOTE(review): items are interpolated without HTML escaping —
            # confirm inputs are trusted before exposing this preview.
            html_content = f"""
            <!DOCTYPE html>
            <html>
            <head><title>Print Preview</title></head>
            <body>
            <h1>Print Preview</h1>
            {''.join(f'<div>{item}</div>' for item in items)}
            </body>
            </html>
            """
            preview_path.write_text(html_content)
            doc_path = str(preview_path)

        # Get file info
        file_stat = preview_path.stat()

        # Estimate page count (rough heuristic: ~3 items per page)
        page_count = max(1, len(items) // 3)

        return {
            "url": f"file://{doc_path}",
            "path": doc_path,
            "pages": page_count,
            "size": file_stat.st_size,
            "format": format
        }
    except Exception as e:
        logger.error("Failed to generate preview", error=str(e))
        raise
async def print_document(
    self,
    document_path: str,
    printer_name: Optional[str] = None,
    copies: int = 1,
    options: Optional[Dict[str, str]] = None
) -> str:
    """Send a document to a CUPS printer via ``lpr``.

    Falls back to a simulated job when CUPS is unavailable.

    Args:
        document_path: Path to document
        printer_name: Printer to use (defaults to the configured printer)
        copies: Number of copies
        options: Extra ``-o key=value`` print options

    Returns:
        Print job ID

    Raises:
        RuntimeError: If ``lpr`` exits with a non-zero status.
    """
    if not self.cups_available:
        logger.warning("CUPS not available, simulating print")
        return await self._simulate_print(document_path)

    try:
        printer = printer_name or self.default_printer

        # Register the job before handing it to CUPS.
        self.job_counter += 1
        job_id = f"job_{self.job_counter}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        job = PrintJob(
            job_id=job_id,
            session_id="",
            document_path=document_path,
            printer_name=printer,
            copies=copies,
            status="pending"
        )
        self.jobs[job_id] = job

        # Assemble the lpr invocation.
        cmd = ["lpr", "-P", printer]
        if copies > 1:
            cmd += ["-#", str(copies)]
        for key, value in (options or {}).items():
            cmd += ["-o", f"{key}={value}"]
        cmd.append(document_path)

        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()

        if proc.returncode != 0:
            job.status = "failed"
            logger.error(
                "Failed to print document",
                job_id=job_id,
                error=stderr.decode()
            )
            raise RuntimeError(f"Print failed: {stderr.decode()}")

        job.status = "printing"
        logger.info(
            "Document sent to printer",
            job_id=job_id,
            printer=printer,
            document=document_path
        )
        # Track completion in the background.
        asyncio.create_task(self._monitor_print_job(job_id))
        return job_id
    except Exception as e:
        logger.error("Failed to print document", error=str(e))
        raise
async def _simulate_print(self, document_path: str) -> str:
    """Record a fake print job for environments without CUPS.

    Waits briefly to mimic printer latency, then marks the job completed.
    """
    self.job_counter += 1
    job_id = f"sim_job_{self.job_counter}"

    job = PrintJob(
        job_id=job_id,
        session_id="",
        document_path=document_path,
        printer_name="simulated",
        status="simulated"
    )
    self.jobs[job_id] = job
    logger.info(
        "Print simulated",
        job_id=job_id,
        document=document_path
    )

    # Pretend the printer needs a moment, then finish the job.
    await asyncio.sleep(2)
    job.status = "completed"
    job.completed_at = datetime.now()
    return job_id
async def _monitor_print_job(self, job_id: str):
    """Poll CUPS once per second (up to 60s) until the job completes.

    If the job never appears in the completed list, its status is left
    as-is (i.e. still "printing").
    """
    job = self.jobs.get(job_id)
    if not job:
        return
    try:
        # Poll job status
        for _ in range(60):  # Monitor for up to 60 seconds
            await asyncio.sleep(1)
            # Check job status using lpstat
            result = await asyncio.create_subprocess_exec(
                "lpstat", "-W", "completed",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, _ = await result.communicate()
            # Simple check - in production would parse lpstat output properly
            # NOTE(review): job_id is this manager's internal id
            # ("job_<n>_<timestamp>"), while lpstat reports CUPS queue ids —
            # confirm this substring match can ever succeed in practice.
            if job_id in stdout.decode():
                job.status = "completed"
                job.completed_at = datetime.now()
                logger.info("Print job completed", job_id=job_id)
                break
    except Exception as e:
        logger.error(
            "Error monitoring print job",
            job_id=job_id,
            error=str(e)
        )
        # NOTE(review): other code paths use "failed"; "error" is not in
        # cancel_print_job's terminal-status list — confirm intended.
        job.status = "error"
async def cancel_print_job(self, job_id: str) -> bool:
    """
    Cancel a print job.

    Args:
        job_id: Job ID to cancel

    Returns:
        True if cancelled successfully, False for unknown jobs, jobs in a
        terminal state, or if cancellation raised an error.
    """
    job = self.jobs.get(job_id)
    if not job:
        return False
    # Jobs in a terminal state cannot be cancelled.
    if job.status in ["completed", "cancelled", "failed"]:
        return False
    try:
        if self.cups_available:
            # Cancel using lprm
            # NOTE(review): lprm expects a CUPS queue job number, but job_id
            # is this manager's internal id — confirm the id reaches CUPS,
            # otherwise this call is effectively a no-op.
            result = await asyncio.create_subprocess_exec(
                "lprm", "-P", job.printer_name, job_id,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            # lprm's exit status is not checked; the job is marked
            # cancelled locally regardless (also covers simulated jobs).
            await result.communicate()
        job.status = "cancelled"
        logger.info("Print job cancelled", job_id=job_id)
        return True
    except Exception as e:
        logger.error(
            "Failed to cancel print job",
            job_id=job_id,
            error=str(e)
        )
        return False
def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]:
    """Return a status snapshot for a tracked job, or None if unknown."""
    job = self.jobs.get(job_id)
    if not job:
        return None
    finished = job.completed_at.isoformat() if job.completed_at else None
    return {
        "job_id": job.job_id,
        "status": job.status,
        "printer": job.printer_name,
        "document": job.document_path,
        "created": job.created_at.isoformat(),
        "completed": finished,
    }
async def get_printer_list(self) -> List[Dict[str, Any]]:
    """Enumerate printers known to CUPS (or a stub when CUPS is absent)."""
    if not self.cups_available:
        # No CUPS: expose a single simulated printer.
        return [{
            "name": "simulated",
            "status": "ready",
            "default": True
        }]

    printers: List[Dict[str, Any]] = []
    try:
        proc = await asyncio.create_subprocess_exec(
            "lpstat", "-p", "-d",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        raw, _ = await proc.communicate()

        default_name = None
        for line in raw.decode().split("\n"):
            if line.startswith("printer"):
                fields = line.split()
                if len(fields) >= 2:
                    printers.append({
                        "name": fields[1],
                        # lpstat marks usable queues with the word "enabled".
                        "status": "ready" if "enabled" in line else "offline",
                        "default": False,
                    })
            elif line.startswith("system default"):
                pieces = line.split(":")
                if len(pieces) >= 2:
                    default_name = pieces[1].strip()

        # Flag the system default printer, if it was reported.
        for entry in printers:
            if entry["name"] == default_name:
                entry["default"] = True
    except Exception as e:
        logger.error("Failed to get printer list", error=str(e))
    return printers
async def create_print_batch(
    self,
    documents: List[Dict[str, Any]],
    printer_name: Optional[str] = None
) -> List[str]:
    """Print several documents, skipping (and logging) any that fail.

    Args:
        documents: Document dicts; each either has "content" (rendered to
            PDF first) or a ready-made "path", plus optional "title"
            and "copies".
        printer_name: Printer to use for the whole batch.

    Returns:
        Job IDs for the documents that were submitted successfully.
    """
    job_ids: List[str] = []
    for doc in documents:
        try:
            # Render inline content to a PDF; otherwise use the given file.
            if "content" in doc:
                path = await self.generate_pdf(
                    doc["content"],
                    title=doc.get("title", "Document")
                )
            else:
                path = doc["path"]
            job_ids.append(
                await self.print_document(
                    path,
                    printer_name=printer_name,
                    copies=doc.get("copies", 1)
                )
            )
        except Exception as e:
            # A single bad document must not abort the whole batch.
            logger.error(
                "Failed to print document in batch",
                document=doc.get("title", "Unknown"),
                error=str(e)
            )
    return job_ids
```
--------------------------------------------------------------------------------
/src/mcp_project_orchestrator/aws_mcp.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
AWS MCP Integration Module.
This module provides AWS-specific Model Context Protocol capabilities including:
- AWS service integrations (S3, EC2, Lambda, CloudFormation, etc.)
- AWS best practices enforcement
- AWS documentation and guidance
- Cost optimization recommendations
- IAM and security configurations
Environment Variables Required:
- AWS_REGION: AWS region (default: us-east-1)
- AWS_ACCESS_KEY_ID: AWS access key ID (optional if using IAM roles)
- AWS_SECRET_ACCESS_KEY: AWS secret access key (optional if using IAM roles)
- AWS_SESSION_TOKEN: AWS session token (optional for temporary credentials)
- AWS_PROFILE: AWS CLI profile name (optional)
"""
import os
import json
import logging
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class AWSConfig:
    """
    AWS Configuration for MCP integration.

    Every field defaults to its corresponding environment variable so the
    config can be built with no arguments in a configured environment.

    Attributes:
        region: AWS region (e.g., us-east-1, eu-west-1)
        access_key_id: AWS access key ID (optional if using IAM roles)
        secret_access_key: AWS secret access key (optional if using IAM roles)
        session_token: AWS session token for temporary credentials
        profile: AWS CLI profile name
        endpoint_url: Custom endpoint URL (for testing or LocalStack)
    """
    region: str = field(default_factory=lambda: os.getenv("AWS_REGION", "us-east-1"))
    access_key_id: Optional[str] = field(default_factory=lambda: os.getenv("AWS_ACCESS_KEY_ID"))
    secret_access_key: Optional[str] = field(
        default_factory=lambda: os.getenv("AWS_SECRET_ACCESS_KEY")
    )
    session_token: Optional[str] = field(default_factory=lambda: os.getenv("AWS_SESSION_TOKEN"))
    profile: Optional[str] = field(default_factory=lambda: os.getenv("AWS_PROFILE"))
    endpoint_url: Optional[str] = field(default_factory=lambda: os.getenv("AWS_ENDPOINT_URL"))

    def to_boto3_config(self) -> Dict[str, Any]:
        """
        Convert AWS config to boto3 client configuration.

        Only truthy optional fields are included in the result.

        Returns:
            Dictionary suitable for boto3.client() or boto3.resource()
        """
        optional = {
            "aws_access_key_id": self.access_key_id,
            "aws_secret_access_key": self.secret_access_key,
            "aws_session_token": self.session_token,
            "endpoint_url": self.endpoint_url,
        }
        config: Dict[str, Any] = {"region_name": self.region}
        config.update({key: value for key, value in optional.items() if value})
        return config

    def validate(self) -> bool:
        """
        Validate AWS configuration.

        A region is mandatory, and static credentials must be provided as a
        complete pair (both key id and secret, or neither).

        Returns:
            True if configuration is valid, False otherwise
        """
        if not self.region:
            logger.error("AWS_REGION is required")
            return False
        has_key = bool(self.access_key_id)
        has_secret = bool(self.secret_access_key)
        if has_key and not has_secret:
            logger.error("AWS_SECRET_ACCESS_KEY is required when AWS_ACCESS_KEY_ID is set")
            return False
        if has_secret and not has_key:
            logger.error("AWS_ACCESS_KEY_ID is required when AWS_SECRET_ACCESS_KEY is set")
            return False
        return True
class AWSMCPIntegration:
    """
    AWS MCP Integration providing AWS service capabilities through MCP.

    This class provides tools and resources for:
    - AWS service management (S3, EC2, Lambda, etc.)
    - AWS best practices and architectural guidance
    - Cost optimization recommendations
    - Security and IAM configurations

    All AWS calls degrade gracefully: errors are logged and an empty
    result (list/dict) is returned rather than raising to the caller.
    """

    def __init__(self, config: Optional[AWSConfig] = None):
        """
        Initialize AWS MCP integration.

        Args:
            config: AWS configuration. If None, loads from environment variables.
        """
        self.config = config or AWSConfig()
        self._boto3_available = False
        # Cache of boto3 clients keyed by service name (see _get_client).
        self._clients: Dict[str, Any] = {}

        # Validate configuration (logged as a warning only; construction
        # still succeeds so non-AWS features keep working).
        if not self.config.validate():
            logger.warning("AWS configuration is invalid. Some features may not work.")

        # Probe boto3 availability once so later calls can fail fast.
        try:
            import boto3
            self._boto3_available = True
            logger.info("boto3 is available for AWS operations")
        except ImportError:
            logger.warning(
                "boto3 is not installed. Install it with: pip install boto3 botocore"
            )

    def _get_client(self, service_name: str):
        """
        Get or create a boto3 client for the specified service.

        Clients are created lazily and cached per service name for reuse.

        Args:
            service_name: AWS service name (e.g., 's3', 'ec2', 'lambda')

        Returns:
            Boto3 client instance

        Raises:
            ImportError: If boto3 is not installed
            Exception: If client creation fails
        """
        if not self._boto3_available:
            raise ImportError("boto3 is not installed")
        if service_name not in self._clients:
            import boto3
            # Use profile if specified, otherwise use explicit credentials
            # from the config (or boto3's default resolution chain).
            if self.config.profile:
                session = boto3.Session(profile_name=self.config.profile)
                self._clients[service_name] = session.client(
                    service_name,
                    region_name=self.config.region,
                    endpoint_url=self.config.endpoint_url
                )
            else:
                self._clients[service_name] = boto3.client(
                    service_name,
                    **self.config.to_boto3_config()
                )
            logger.info(f"Created {service_name} client for region {self.config.region}")
        return self._clients[service_name]

    # S3 Operations
    def list_s3_buckets(self) -> List[Dict[str, Any]]:
        """
        List all S3 buckets in the account.

        Returns:
            List of bucket information dictionaries (empty list on error)
        """
        try:
            s3 = self._get_client('s3')
            response = s3.list_buckets()
            return response.get('Buckets', [])
        except Exception as e:
            logger.error(f"Error listing S3 buckets: {e}")
            return []

    def list_s3_objects(self, bucket_name: str, prefix: str = "") -> List[Dict[str, Any]]:
        """
        List objects in an S3 bucket.

        NOTE(review): uses a single list_objects_v2 call, so results are
        capped at one page (1000 keys) — confirm pagination is not needed.

        Args:
            bucket_name: Name of the S3 bucket
            prefix: Optional prefix to filter objects

        Returns:
            List of object information dictionaries (empty list on error)
        """
        try:
            s3 = self._get_client('s3')
            response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
            return response.get('Contents', [])
        except Exception as e:
            logger.error(f"Error listing S3 objects in {bucket_name}: {e}")
            return []

    def upload_to_s3(self, bucket_name: str, file_path: str, object_key: str) -> bool:
        """
        Upload a file to S3.

        Args:
            bucket_name: Name of the S3 bucket
            file_path: Local file path to upload
            object_key: S3 object key (destination path)

        Returns:
            True if upload successful, False otherwise
        """
        try:
            s3 = self._get_client('s3')
            s3.upload_file(file_path, bucket_name, object_key)
            logger.info(f"Uploaded {file_path} to s3://{bucket_name}/{object_key}")
            return True
        except Exception as e:
            logger.error(f"Error uploading to S3: {e}")
            return False

    # EC2 Operations
    def list_ec2_instances(self) -> List[Dict[str, Any]]:
        """
        List all EC2 instances in the region.

        Returns:
            List of instance information dictionaries (empty list on error)
        """
        try:
            ec2 = self._get_client('ec2')
            response = ec2.describe_instances()
            instances = []
            # describe_instances groups instances under reservations;
            # flatten them into a single list.
            for reservation in response.get('Reservations', []):
                instances.extend(reservation.get('Instances', []))
            return instances
        except Exception as e:
            logger.error(f"Error listing EC2 instances: {e}")
            return []

    def get_ec2_instance_status(self, instance_id: str) -> Dict[str, Any]:
        """
        Get the status of an EC2 instance.

        Args:
            instance_id: EC2 instance ID

        Returns:
            Dictionary with instance status information (empty on error
            or when the instance reports no status)
        """
        try:
            ec2 = self._get_client('ec2')
            response = ec2.describe_instance_status(InstanceIds=[instance_id])
            statuses = response.get('InstanceStatuses', [])
            return statuses[0] if statuses else {}
        except Exception as e:
            logger.error(f"Error getting EC2 instance status: {e}")
            return {}

    # Lambda Operations
    def list_lambda_functions(self) -> List[Dict[str, Any]]:
        """
        List all Lambda functions in the region.

        Returns:
            List of Lambda function information dictionaries (empty on error)
        """
        try:
            lambda_client = self._get_client('lambda')
            response = lambda_client.list_functions()
            return response.get('Functions', [])
        except Exception as e:
            logger.error(f"Error listing Lambda functions: {e}")
            return []

    def invoke_lambda(
        self,
        function_name: str,
        payload: Dict[str, Any],
        invocation_type: str = "RequestResponse"
    ) -> Dict[str, Any]:
        """
        Invoke a Lambda function.

        Args:
            function_name: Name of the Lambda function
            payload: Payload to send to the function (JSON-serialized)
            invocation_type: Type of invocation (RequestResponse, Event, DryRun)

        Returns:
            Dict with 'StatusCode' and decoded 'Payload' (plus
            'FunctionError' when the function raised), or {'error': ...}
            if the invocation itself failed.
        """
        try:
            lambda_client = self._get_client('lambda')
            response = lambda_client.invoke(
                FunctionName=function_name,
                InvocationType=invocation_type,
                Payload=json.dumps(payload)
            )
            result = {
                'StatusCode': response['StatusCode'],
                'Payload': response['Payload'].read().decode('utf-8')
            }
            if 'FunctionError' in response:
                result['FunctionError'] = response['FunctionError']
            return result
        except Exception as e:
            logger.error(f"Error invoking Lambda function: {e}")
            return {'error': str(e)}

    # CloudFormation Operations
    def list_cloudformation_stacks(self) -> List[Dict[str, Any]]:
        """
        List all CloudFormation stacks.

        Returns:
            List of stack information dictionaries (empty list on error)
        """
        try:
            cfn = self._get_client('cloudformation')
            response = cfn.describe_stacks()
            return response.get('Stacks', [])
        except Exception as e:
            logger.error(f"Error listing CloudFormation stacks: {e}")
            return []

    # IAM Operations
    def list_iam_users(self) -> List[Dict[str, Any]]:
        """
        List all IAM users.

        Returns:
            List of IAM user information dictionaries (empty list on error)
        """
        try:
            iam = self._get_client('iam')
            response = iam.list_users()
            return response.get('Users', [])
        except Exception as e:
            logger.error(f"Error listing IAM users: {e}")
            return []

    def list_iam_roles(self) -> List[Dict[str, Any]]:
        """
        List all IAM roles.

        Returns:
            List of IAM role information dictionaries (empty list on error)
        """
        try:
            iam = self._get_client('iam')
            response = iam.list_roles()
            return response.get('Roles', [])
        except Exception as e:
            logger.error(f"Error listing IAM roles: {e}")
            return []

    # AWS Best Practices
    def get_aws_best_practices(self, service: str) -> Dict[str, Any]:
        """
        Get AWS best practices for a specific service.

        This is a static, curated knowledge base covering s3, ec2 and
        lambda; no AWS calls are made.

        Args:
            service: AWS service name (e.g., 's3', 'ec2', 'lambda')

        Returns:
            Dictionary containing best practices and recommendations,
            or a {'message': ...} placeholder for unknown services
        """
        best_practices = {
            's3': {
                'security': [
                    'Enable bucket encryption',
                    'Use bucket policies for access control',
                    'Enable versioning for critical data',
                    'Enable access logging',
                    'Block public access by default'
                ],
                'cost': [
                    'Use appropriate storage classes (Standard, IA, Glacier)',
                    'Enable lifecycle policies',
                    'Delete incomplete multipart uploads',
                    'Use S3 Intelligent-Tiering for unpredictable access patterns'
                ],
                'performance': [
                    'Use CloudFront for content delivery',
                    'Enable Transfer Acceleration for large files',
                    'Use multipart upload for files > 100MB'
                ]
            },
            'ec2': {
                'security': [
                    'Use security groups properly',
                    'Enable detailed monitoring',
                    'Use IAM roles instead of credentials',
                    'Keep AMIs up to date',
                    'Enable EBS encryption'
                ],
                'cost': [
                    'Use Reserved Instances or Savings Plans',
                    'Right-size instances regularly',
                    'Use Auto Scaling',
                    'Stop unused instances',
                    'Use Spot Instances for flexible workloads'
                ],
                'performance': [
                    'Choose appropriate instance types',
                    'Use placement groups for HPC',
                    'Enable enhanced networking',
                    'Use EBS-optimized instances'
                ]
            },
            'lambda': {
                'security': [
                    'Use IAM roles with least privilege',
                    'Enable VPC if accessing private resources',
                    'Use environment variables for configuration',
                    'Enable AWS X-Ray for tracing'
                ],
                'cost': [
                    'Optimize memory allocation',
                    'Reduce cold starts',
                    'Use Lambda Power Tuning',
                    'Monitor and optimize execution time'
                ],
                'performance': [
                    'Reuse execution context',
                    'Minimize deployment package size',
                    'Use Lambda layers for common code',
                    'Configure appropriate timeout values'
                ]
            }
        }
        return best_practices.get(
            service.lower(),
            {'message': f'Best practices for {service} not available'}
        )

    # Cost Optimization
    def estimate_costs(self, service: str, usage: Dict[str, Any]) -> Dict[str, Any]:
        """
        Estimate AWS costs for a service based on usage.

        Args:
            service: AWS service name
            usage: Dictionary describing usage patterns

        Returns:
            Dictionary with 'service', per-line-item 'breakdown', and
            'total_usd' rounded to two decimals
        """
        # This is a simplified example. Real implementation would use AWS Pricing API
        # NOTE(review): the per-unit rates below are hard-coded snapshots —
        # confirm against current AWS pricing before relying on them.
        estimates = {
            's3': {
                'storage_gb_month': usage.get('storage_gb', 0) * 0.023,
                'requests': usage.get('requests', 0) * 0.0004 / 1000,
                'data_transfer_gb': usage.get('data_transfer_gb', 0) * 0.09
            },
            'ec2': {
                't2.micro_hours': usage.get('hours', 0) * 0.0116
            },
            'lambda': {
                'requests': usage.get('requests', 0) * 0.20 / 1000000,
                'compute_gb_seconds': usage.get('gb_seconds', 0) * 0.0000166667
            }
        }
        service_estimate = estimates.get(service.lower(), {})
        total = sum(service_estimate.values()) if service_estimate else 0
        return {
            'service': service,
            'breakdown': service_estimate,
            'total_usd': round(total, 2)
        }
def register_aws_mcp_tools(mcp_server):
    """
    Register AWS MCP tools with a FastMCP server instance.

    A single AWSMCPIntegration instance (configured from the environment)
    is shared by all registered tools.

    Args:
        mcp_server: FastMCP server instance
    """
    aws = AWSMCPIntegration()

    @mcp_server.tool(
        name="aws_list_s3_buckets",
        description="List all S3 buckets in the AWS account"
    )
    def list_s3_buckets() -> str:
        """List all S3 buckets."""
        buckets = aws.list_s3_buckets()
        if not buckets:
            return "No S3 buckets found or unable to list buckets."
        lines = ["S3 Buckets:"]
        lines += [f"- {b['Name']} (Created: {b['CreationDate']})" for b in buckets]
        return "\n".join(lines) + "\n"

    @mcp_server.tool(
        name="aws_list_ec2_instances",
        description="List all EC2 instances in the current region"
    )
    def list_ec2_instances() -> str:
        """List all EC2 instances."""
        instances = aws.list_ec2_instances()
        if not instances:
            return "No EC2 instances found."
        lines = ["EC2 Instances:"]
        for inst in instances:
            lines.append(
                f"- {inst['InstanceId']} ({inst['InstanceType']})"
                f" - State: {inst['State']['Name']}"
            )
        return "\n".join(lines) + "\n"

    @mcp_server.tool(
        name="aws_list_lambda_functions",
        description="List all Lambda functions in the current region"
    )
    def list_lambda_functions() -> str:
        """List all Lambda functions."""
        functions = aws.list_lambda_functions()
        if not functions:
            return "No Lambda functions found."
        lines = ["Lambda Functions:"]
        lines += [f"- {fn['FunctionName']} ({fn['Runtime']})" for fn in functions]
        return "\n".join(lines) + "\n"

    @mcp_server.tool(
        name="aws_best_practices",
        description="Get AWS best practices for a specific service (s3, ec2, lambda)"
    )
    def get_best_practices(service: str) -> str:
        """Get AWS best practices for a service."""
        practices = aws.get_aws_best_practices(service)
        if 'message' in practices:
            return practices['message']
        chunks = [f"AWS Best Practices for {service.upper()}:\n\n"]
        for category, items in practices.items():
            chunks.append(f"{category.upper()}:\n")
            chunks.extend(f" - {item}\n" for item in items)
            chunks.append("\n")
        return "".join(chunks)

    @mcp_server.tool(
        name="aws_estimate_costs",
        description="Estimate AWS costs based on usage (JSON format)"
    )
    def estimate_costs(service: str, usage_json: str) -> str:
        """Estimate AWS costs."""
        try:
            usage = json.loads(usage_json)
        except json.JSONDecodeError:
            return "Error: Invalid JSON format for usage parameter"
        estimate = aws.estimate_costs(service, usage)
        parts = [f"Cost Estimate for {service.upper()}:\n\n", "Breakdown:\n"]
        for item, cost in estimate['breakdown'].items():
            parts.append(f" - {item}: ${cost:.4f}\n")
        parts.append(f"\nTotal: ${estimate['total_usd']} USD\n")
        return "".join(parts)

    logger.info("AWS MCP tools registered successfully")
```