This is page 12 of 16. Use http://codebase.md/fujitsu-ai/mcp-server-for-mas-developments?page={x} to view the full context.
# Directory Structure
```
├── .gitattributes
├── .gitignore
├── agents
│ ├── __init__.py
│ ├── AgentInterface
│ │ ├── __init__.py
│ │ ├── Python
│ │ │ ├── __init__.py
│ │ │ ├── agent.py
│ │ │ ├── color.py
│ │ │ ├── config.py
│ │ │ ├── language.py
│ │ │ ├── local_file_handler.py
│ │ │ └── network.py
│ │ └── requirements.txt
│ ├── AgentMonitoring
│ │ ├── ChatBot-Agent Dashboard Example - Grafana.json
│ │ ├── images
│ │ │ ├── Grafana.png
│ │ │ └── Prometheus.png
│ │ ├── IoT-Agent Dashboard Example - Grafana.json
│ │ ├── OpenAI compatible API - Agent Dashboard Example - Grafana.json
│ │ ├── prometheus Example.yml
│ │ └── README.md
│ ├── ChatBotAgent
│ │ ├── __init__.py
│ │ ├── config.json.example
│ │ ├── html
│ │ │ ├── favicon.ico
│ │ │ ├── index_de.html
│ │ │ ├── index.html
│ │ │ ├── Logo_light.svg
│ │ │ ├── start_http_server.ps1
│ │ │ └── start_http_server.sh
│ │ ├── Python
│ │ │ ├── __init__.py
│ │ │ └── chatbot_agent.py
│ │ ├── README.md
│ │ └── requirements.txt
│ ├── IoTAgent
│ │ ├── config_example.json
│ │ ├── Python
│ │ │ ├── iot_mqtt_agent.py
│ │ │ └── language.py
│ │ ├── README.md
│ │ └── requirements.txt
│ ├── ISMAgent
│ │ ├── config_example.json
│ │ ├── PGPT Scenario Prompts
│ │ │ ├── ISM System Prompt - Detecting Error State.txt
│ │ │ ├── ISM User Post-Prompt - Detecting Error State.txt
│ │ │ ├── ISM User Pre-Prompt - Detecting Error State.txt
│ │ │ └── README.md
│ │ ├── Python
│ │ │ ├── ism_agent.py
│ │ │ └── language.py
│ │ ├── README.md
│ │ ├── requirements.txt
│ │ └── start_ism_agent.ps1
│ ├── MCP-Client
│ │ ├── __init__.py
│ │ ├── .env.example
│ │ ├── Python
│ │ │ ├── __init__.py
│ │ │ ├── chat_handler.py
│ │ │ ├── config.py
│ │ │ ├── environment.py
│ │ │ ├── llm_client.py
│ │ │ ├── mcp_client_sse.py
│ │ │ ├── mcp_client.py
│ │ │ ├── messages
│ │ │ │ ├── __init__.py
│ │ │ │ ├── message_types
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── incrementing_id_message.py
│ │ │ │ │ ├── initialize_message.py
│ │ │ │ │ ├── json_rpc_message.py
│ │ │ │ │ ├── ping_message.py
│ │ │ │ │ ├── prompts_messages.py
│ │ │ │ │ ├── prompts_models.py
│ │ │ │ │ ├── resources_messages.py
│ │ │ │ │ └── tools_messages.py
│ │ │ │ ├── send_call_tool.py
│ │ │ │ ├── send_initialize_message.py
│ │ │ │ ├── send_message.py
│ │ │ │ ├── send_ping.py
│ │ │ │ ├── send_prompts.py
│ │ │ │ ├── send_resources.py
│ │ │ │ └── send_tools_list.py
│ │ │ ├── system_prompt_generator.py
│ │ │ ├── tools_handler.py
│ │ │ └── transport
│ │ │ ├── __init__.py
│ │ │ └── stdio
│ │ │ ├── __init__.py
│ │ │ ├── stdio_client.py
│ │ │ ├── stdio_server_parameters.py
│ │ │ └── stdio_server_shutdown.py
│ │ ├── README.md
│ │ ├── requirements.txt
│ │ └── server_config.json
│ ├── OpenAI_Compatible_API_Agent
│ │ ├── __init__.py
│ │ ├── docker-compose.yml
│ │ ├── Dockerfile
│ │ ├── pgpt_openai_api_mcp.json.example
│ │ ├── pgpt_openai_api_proxy.json.example
│ │ ├── Python
│ │ │ ├── __init__.py
│ │ │ ├── client_tests
│ │ │ │ ├── __init__.py
│ │ │ │ ├── openai_test_client_structured.py
│ │ │ │ ├── openai_test_client_tools.py
│ │ │ │ ├── openai_test_client.py
│ │ │ │ ├── vllm_client_multimodal.py
│ │ │ │ ├── vllm_client.py
│ │ │ │ ├── vllm_structured.py
│ │ │ │ └── vllm_structured2.py
│ │ │ ├── generate_api_key.py
│ │ │ ├── open_ai_helper.py
│ │ │ ├── openai_compatible_api.py
│ │ │ ├── openai_mcp_api.py
│ │ │ ├── pgpt_api.py
│ │ │ ├── privategpt_api.py
│ │ │ └── vllmproxy.py
│ │ ├── README.md
│ │ └── requirements.txt
│ └── SourceManagerAgent
│ ├── __init__.py
│ ├── config.json.example
│ └── Python
│ ├── __init__.py
│ ├── file_tools
│ │ └── loader_factory.py
│ ├── file_upload_agent.py
│ └── local_db.py
├── clients
│ ├── __init__.py
│ ├── C# .Net
│ │ ├── 1.0 mcp_login
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_login.deps.json
│ │ │ │ ├── mcp_login.dll
│ │ │ │ ├── mcp_login.exe
│ │ │ │ ├── mcp_login.pdb
│ │ │ │ ├── mcp_login.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_login.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_login.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_login.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_login.assets.cache
│ │ │ │ │ ├── mcp_login.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_login.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_login.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_login.csproj.Up2Date
│ │ │ │ │ ├── mcp_login.dll
│ │ │ │ │ ├── mcp_login.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_login.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_login.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_login.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_login.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_login.dll
│ │ │ │ ├── mcp_login.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_login.csproj.nuget.g.props
│ │ │ │ ├── mcp_login.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 1.1 mcp_logout
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_logout.deps.json
│ │ │ │ ├── mcp_logout.dll
│ │ │ │ ├── mcp_logout.exe
│ │ │ │ ├── mcp_logout.pdb
│ │ │ │ ├── mcp_logout.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_logout.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_logout.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_logout.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_logout.assets.cache
│ │ │ │ │ ├── mcp_logout.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_logout.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_logout.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_logout.csproj.Up2Date
│ │ │ │ │ ├── mcp_logout.dll
│ │ │ │ │ ├── mcp_logout.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_logout.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_logout.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_logout.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_logout.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_logout.dll
│ │ │ │ ├── mcp_logout.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_logout.csproj.nuget.g.props
│ │ │ │ ├── mcp_logout.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 2.0 mcp_chat
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_chat.deps.json
│ │ │ │ ├── mcp_chat.dll
│ │ │ │ ├── mcp_chat.exe
│ │ │ │ ├── mcp_chat.pdb
│ │ │ │ ├── mcp_chat.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_chat.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_chat.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_chat.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_chat.assets.cache
│ │ │ │ │ ├── mcp_chat.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_chat.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_chat.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_chat.csproj.Up2Date
│ │ │ │ │ ├── mcp_chat.dll
│ │ │ │ │ ├── mcp_chat.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_chat.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_chat.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_chat.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_chat.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_chat.dll
│ │ │ │ ├── mcp_chat.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_chat.csproj.nuget.g.props
│ │ │ │ ├── mcp_chat.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 2.1 mcp_continue_chat
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_continue_chat.deps.json
│ │ │ │ ├── mcp_continue_chat.dll
│ │ │ │ ├── mcp_continue_chat.exe
│ │ │ │ ├── mcp_continue_chat.pdb
│ │ │ │ ├── mcp_continue_chat.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_continue_chat.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_cont.EF178231.Up2Date
│ │ │ │ │ ├── mcp_continue_chat.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_continue_chat.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_continue_chat.assets.cache
│ │ │ │ │ ├── mcp_continue_chat.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_continue_chat.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_continue_chat.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_continue_chat.dll
│ │ │ │ │ ├── mcp_continue_chat.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_continue_chat.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_continue_chat.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_continue_chat.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_continue_chat.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_continue_chat.dll
│ │ │ │ ├── mcp_continue_chat.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_continue_chat.csproj.nuget.g.props
│ │ │ │ ├── mcp_continue_chat.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 2.2 mcp_get_chat_info
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_get_chat_info.deps.json
│ │ │ │ ├── mcp_get_chat_info.dll
│ │ │ │ ├── mcp_get_chat_info.exe
│ │ │ │ ├── mcp_get_chat_info.pdb
│ │ │ │ ├── mcp_get_chat_info.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── Dokumente - Verknüpfung.lnk
│ │ │ ├── mcp_get_chat_info.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_get_.DFF47B4E.Up2Date
│ │ │ │ │ ├── mcp_get_chat_info.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_get_chat_info.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_get_chat_info.assets.cache
│ │ │ │ │ ├── mcp_get_chat_info.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_get_chat_info.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_get_chat_info.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_get_chat_info.dll
│ │ │ │ │ ├── mcp_get_chat_info.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_get_chat_info.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_get_chat_info.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_get_chat_info.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_get_chat_info.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_get_chat_info.dll
│ │ │ │ ├── mcp_get_chat_info.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_get_chat_info.csproj.nuget.g.props
│ │ │ │ ├── mcp_get_chat_info.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 3.0 mcp_create_source
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_create_source.deps.json
│ │ │ │ ├── mcp_create_source.dll
│ │ │ │ ├── mcp_create_source.exe
│ │ │ │ ├── mcp_create_source.pdb
│ │ │ │ ├── mcp_create_source.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_create_source.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_crea.CB4ED912.Up2Date
│ │ │ │ │ ├── mcp_create_source.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_create_source.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_create_source.assets.cache
│ │ │ │ │ ├── mcp_create_source.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_create_source.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_create_source.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_create_source.dll
│ │ │ │ │ ├── mcp_create_source.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_create_source.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_create_source.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_create_source.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_create_source.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_create_source.dll
│ │ │ │ ├── mcp_create_source.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_create_source.csproj.nuget.g.props
│ │ │ │ ├── mcp_create_source.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 3.1 mcp_get_source
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_get_source.deps.json
│ │ │ │ ├── mcp_get_source.dll
│ │ │ │ ├── mcp_get_source.exe
│ │ │ │ ├── mcp_get_source.pdb
│ │ │ │ ├── mcp_get_source.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_get_source.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_get_.4E61956F.Up2Date
│ │ │ │ │ ├── mcp_get_source.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_get_source.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_get_source.assets.cache
│ │ │ │ │ ├── mcp_get_source.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_get_source.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_get_source.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_get_source.dll
│ │ │ │ │ ├── mcp_get_source.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_get_source.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_get_source.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_get_source.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_get_source.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_get_source.dll
│ │ │ │ ├── mcp_get_source.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_get_source.csproj.nuget.g.props
│ │ │ │ ├── mcp_get_source.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 3.2 mcp_list_sources
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_list_sources.deps.json
│ │ │ │ ├── mcp_list_sources.dll
│ │ │ │ ├── mcp_list_sources.exe
│ │ │ │ ├── mcp_list_sources.pdb
│ │ │ │ ├── mcp_list_sources.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_list_sources.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_list_sources.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_list_sources.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_list_sources.assets.cache
│ │ │ │ │ ├── mcp_list_sources.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_list_sources.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_list_sources.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_list_sources.dll
│ │ │ │ │ ├── mcp_list_sources.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_list_sources.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_list_sources.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_list_sources.pdb
│ │ │ │ │ ├── mcp_list.A720E197.Up2Date
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_list_sources.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_list_sources.dll
│ │ │ │ ├── mcp_list_sources.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_list_sources.csproj.nuget.g.props
│ │ │ │ ├── mcp_list_sources.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 3.3 mcp_edit_source
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_edit_source.deps.json
│ │ │ │ ├── mcp_edit_source.dll
│ │ │ │ ├── mcp_edit_source.exe
│ │ │ │ ├── mcp_edit_source.pdb
│ │ │ │ ├── mcp_edit_source.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_edit_source.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_edit_source.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_edit_source.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_edit_source.assets.cache
│ │ │ │ │ ├── mcp_edit_source.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_edit_source.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_edit_source.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_edit_source.dll
│ │ │ │ │ ├── mcp_edit_source.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_edit_source.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_edit_source.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_edit_source.pdb
│ │ │ │ │ ├── mcp_edit.7303BE3B.Up2Date
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_edit_source.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_edit_source.dll
│ │ │ │ ├── mcp_edit_source.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_edit_source.csproj.nuget.g.props
│ │ │ │ ├── mcp_edit_source.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 3.4 mcp_delete_source
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_delete_source.deps.json
│ │ │ │ ├── mcp_delete_source.dll
│ │ │ │ ├── mcp_delete_source.exe
│ │ │ │ ├── mcp_delete_source.pdb
│ │ │ │ ├── mcp_delete_source.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_delete_source.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_dele.67DD13F9.Up2Date
│ │ │ │ │ ├── mcp_delete_source.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_delete_source.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_delete_source.assets.cache
│ │ │ │ │ ├── mcp_delete_source.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_delete_source.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_delete_source.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_delete_source.dll
│ │ │ │ │ ├── mcp_delete_source.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_delete_source.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_delete_source.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_delete_source.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_delete_source.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_delete_source.dll
│ │ │ │ ├── mcp_delete_source.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_delete_source.csproj.nuget.g.props
│ │ │ │ ├── mcp_delete_source.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 4.0 mcp_list_groups
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_list_groups.deps.json
│ │ │ │ ├── mcp_list_groups.dll
│ │ │ │ ├── mcp_list_groups.exe
│ │ │ │ ├── mcp_list_groups.pdb
│ │ │ │ ├── mcp_list_groups.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_list_groups.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_list_groups.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_list_groups.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_list_groups.assets.cache
│ │ │ │ │ ├── mcp_list_groups.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_list_groups.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_list_groups.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_list_groups.dll
│ │ │ │ │ ├── mcp_list_groups.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_list_groups.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_list_groups.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_list_groups.pdb
│ │ │ │ │ ├── mcp_list.EBD5E0D2.Up2Date
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_list_groups.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_list_groups.dll
│ │ │ │ ├── mcp_list_groups.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_list_groups.csproj.nuget.g.props
│ │ │ │ ├── mcp_list_groups.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 4.1 mcp_store_group
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_store_group.deps.json
│ │ │ │ ├── mcp_store_group.dll
│ │ │ │ ├── mcp_store_group.exe
│ │ │ │ ├── mcp_store_group.pdb
│ │ │ │ ├── mcp_store_group.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_store_group.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_stor.AFB4AA35.Up2Date
│ │ │ │ │ ├── mcp_store_group.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_store_group.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_store_group.assets.cache
│ │ │ │ │ ├── mcp_store_group.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_store_group.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_store_group.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_store_group.dll
│ │ │ │ │ ├── mcp_store_group.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_store_group.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_store_group.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_store_group.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_store_group.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_store_group.dll
│ │ │ │ ├── mcp_store_group.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_store_group.csproj.nuget.g.props
│ │ │ │ ├── mcp_store_group.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 4.2 mcp_delete_group
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_delete_group.deps.json
│ │ │ │ ├── mcp_delete_group.dll
│ │ │ │ ├── mcp_delete_group.exe
│ │ │ │ ├── mcp_delete_group.pdb
│ │ │ │ ├── mcp_delete_group.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_delete_group.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_dele.FE1C6298.Up2Date
│ │ │ │ │ ├── mcp_delete_group.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_delete_group.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_delete_group.assets.cache
│ │ │ │ │ ├── mcp_delete_group.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_delete_group.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_delete_group.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_delete_group.dll
│ │ │ │ │ ├── mcp_delete_group.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_delete_group.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_delete_group.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_delete_group.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_delete_group.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_delete_group.dll
│ │ │ │ ├── mcp_delete_group.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_delete_group.csproj.nuget.g.props
│ │ │ │ ├── mcp_delete_group.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 5.0 mcp_store_user
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_store_user.deps.json
│ │ │ │ ├── mcp_store_user.dll
│ │ │ │ ├── mcp_store_user.exe
│ │ │ │ ├── mcp_store_user.pdb
│ │ │ │ ├── mcp_store_user.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_store_user.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_stor.6C0F0C8A.Up2Date
│ │ │ │ │ ├── mcp_store_user.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_store_user.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_store_user.assets.cache
│ │ │ │ │ ├── mcp_store_user.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_store_user.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_store_user.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_store_user.dll
│ │ │ │ │ ├── mcp_store_user.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_store_user.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_store_user.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_store_user.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_store_user.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_store_user.dll
│ │ │ │ ├── mcp_store_user.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_store_user.csproj.nuget.g.props
│ │ │ │ ├── mcp_store_user.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 5.1 mcp_edit_user
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_edit_user.deps.json
│ │ │ │ ├── mcp_edit_user.dll
│ │ │ │ ├── mcp_edit_user.exe
│ │ │ │ ├── mcp_edit_user.pdb
│ │ │ │ ├── mcp_edit_user.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_edit_user.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_edit_user.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_edit_user.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_edit_user.assets.cache
│ │ │ │ │ ├── mcp_edit_user.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_edit_user.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_edit_user.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_edit_user.dll
│ │ │ │ │ ├── mcp_edit_user.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_edit_user.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_edit_user.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_edit_user.pdb
│ │ │ │ │ ├── mcp_edit.94A30270.Up2Date
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_edit_user.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_edit_user.dll
│ │ │ │ ├── mcp_edit_user.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_edit_user.csproj.nuget.g.props
│ │ │ │ ├── mcp_edit_user.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── 5.2 mcp_delete_user
│ │ │ ├── bin
│ │ │ │ └── Debug
│ │ │ │ └── net9.0
│ │ │ │ ├── mcp_delete_user.deps.json
│ │ │ │ ├── mcp_delete_user.dll
│ │ │ │ ├── mcp_delete_user.exe
│ │ │ │ ├── mcp_delete_user.pdb
│ │ │ │ ├── mcp_delete_user.runtimeconfig.json
│ │ │ │ └── Newtonsoft.Json.dll
│ │ │ ├── mcp_delete_user.csproj
│ │ │ ├── obj
│ │ │ │ ├── Debug
│ │ │ │ │ └── net9.0
│ │ │ │ │ ├── .NETCoreApp,Version=v9.0.AssemblyAttributes.cs
│ │ │ │ │ ├── apphost.exe
│ │ │ │ │ ├── mcp_dele.CEB7E33D.Up2Date
│ │ │ │ │ ├── mcp_delete_user.AssemblyInfo.cs
│ │ │ │ │ ├── mcp_delete_user.AssemblyInfoInputs.cache
│ │ │ │ │ ├── mcp_delete_user.assets.cache
│ │ │ │ │ ├── mcp_delete_user.csproj.AssemblyReference.cache
│ │ │ │ │ ├── mcp_delete_user.csproj.CoreCompileInputs.cache
│ │ │ │ │ ├── mcp_delete_user.csproj.FileListAbsolute.txt
│ │ │ │ │ ├── mcp_delete_user.dll
│ │ │ │ │ ├── mcp_delete_user.GeneratedMSBuildEditorConfig.editorconfig
│ │ │ │ │ ├── mcp_delete_user.genruntimeconfig.cache
│ │ │ │ │ ├── mcp_delete_user.GlobalUsings.g.cs
│ │ │ │ │ ├── mcp_delete_user.pdb
│ │ │ │ │ ├── ref
│ │ │ │ │ │ └── mcp_delete_user.dll
│ │ │ │ │ └── refint
│ │ │ │ │ └── mcp_delete_user.dll
│ │ │ │ ├── mcp_delete_user.csproj.nuget.dgspec.json
│ │ │ │ ├── mcp_delete_user.csproj.nuget.g.props
│ │ │ │ ├── mcp_delete_user.csproj.nuget.g.targets
│ │ │ │ ├── project.assets.json
│ │ │ │ └── project.nuget.cache
│ │ │ └── Program.cs
│ │ ├── Code Archiv
│ │ │ ├── mcp_chat.cs
│ │ │ ├── mcp_continue_chat.cs
│ │ │ ├── mcp_create_source.cs
│ │ │ ├── mcp_delete_group.cs
│ │ │ ├── mcp_delete_source.cs
│ │ │ ├── mcp_delete_user.cs
│ │ │ ├── mcp_edit_source.cs
│ │ │ ├── mcp_edit_user.cs
│ │ │ ├── mcp_get_chat_info.cs
│ │ │ ├── mcp_get_source.cs
│ │ │ ├── mcp_list_groups.cs
│ │ │ ├── mcp_list_sources.cs
│ │ │ ├── mcp_login.cs
│ │ │ ├── mcp_logout.cs
│ │ │ ├── mcp_store_group.cs
│ │ │ └── mcp_store_user.cs
│ │ └── README.md
│ ├── C++
│ │ ├── .vscode
│ │ │ └── launch.json
│ │ ├── 1.0 mcp_login
│ │ │ ├── MCPLoginClient.cpp
│ │ │ └── Non-TLS version
│ │ │ ├── MCPLoginClient.cpp
│ │ │ └── MCPLoginClient.exe
│ │ ├── 1.1 mcp_logout
│ │ │ ├── MCPLogoutClient.cpp
│ │ │ └── MCPLogoutClient.exe
│ │ ├── 2.0 mcp_chat
│ │ │ ├── MCPChatClient.cpp
│ │ │ └── MCPChatClient.exe
│ │ ├── 2.1 mcp_continue_chat
│ │ │ ├── MCPChatContinuationClient.cpp
│ │ │ └── MCPChatContinuationClient.exe
│ │ ├── 2.2 mcp_get_chat_info
│ │ │ ├── MCPGetChatInfoClient.cpp
│ │ │ └── MCPGetChatInfoClient.exe
│ │ ├── 3.0 mcp_create_source
│ │ │ ├── MCPCreateSourceClient.cpp
│ │ │ └── MCPCreateSourceClient.exe
│ │ ├── 3.1 mcp_get_source
│ │ │ ├── MCPGetSourceClient.cpp
│ │ │ └── MCPGetSourceClient.exe
│ │ ├── 3.2 mcp_list_sources
│ │ │ ├── MCPListSourcesClient.cpp
│ │ │ └── MCPListSourcesClient.exe
│ │ ├── 3.3 mcp_edit_source
│ │ │ ├── MCPEditSourceClient.cpp
│ │ │ └── MCPEditSourceClient.exe
│ │ ├── 3.4 mcp_delete_source
│ │ │ ├── MCPDeleteSourceClient.cpp
│ │ │ └── MCPDeleteSourceClient.exe
│ │ ├── 4.0 mcp_list_groups
│ │ │ ├── MCPListGroupsClient.cpp
│ │ │ └── MCPListGroupsClient.exe
│ │ ├── 4.1 mcp_store_group
│ │ │ ├── MCPStoreGroupClient.cpp
│ │ │ └── MCPStoreGroupClient.exe
│ │ ├── 4.2 mcp_delete_group
│ │ │ ├── MPCDeleteGroupClient.cpp
│ │ │ └── MPCDeleteGroupClient.exe
│ │ ├── 5.0 mcp_store_user
│ │ │ ├── MCPStoreUserClient.cpp
│ │ │ └── MCPStoreUserClient.exe
│ │ ├── 5.1 mcp_edit_user
│ │ │ ├── MCPEditUserClient.cpp
│ │ │ └── MCPEditUserClient.exe
│ │ ├── 5.2 mcp_delete_user
│ │ │ ├── MCPDeleteUserClient.cpp
│ │ │ └── MCPDeleteUserClient.exe
│ │ ├── 9.0 mcp_keygen
│ │ │ ├── MCPKeygenClient.cpp
│ │ │ └── MCPKeygenClient.exe
│ │ └── README.md
│ ├── Go
│ │ ├── 1.0 mcp_login
│ │ │ ├── go.mod
│ │ │ ├── MCPLoginClient.exe
│ │ │ └── MCPLoginClient.go
│ │ ├── 1.1 mcp_logout
│ │ │ ├── MCPLogoutClient.exe
│ │ │ └── MCPLogoutClient.go
│ │ ├── 2.0 mcp_chat
│ │ │ ├── MCPChatClient.exe
│ │ │ └── MCPChatClient.go
│ │ ├── 2.1 mcp_continue_chat
│ │ │ ├── MCPChatContinuationClient.exe
│ │ │ └── MCPChatContinuationClient.go
│ │ ├── 2.2 mcp_get_chat_info
│ │ │ ├── MCPGetChatInfoClient.exe
│ │ │ └── MCPGetChatInfoClient.go
│ │ ├── 3.0 mcp_create_source
│ │ │ ├── MCPCreateSourceClient.exe
│ │ │ └── MCPCreateSourceClient.go
│ │ ├── 3.1 mcp_get_source
│ │ │ ├── MCPGetSourceClient.exe
│ │ │ └── MCPGetSourceClient.go
│ │ ├── 3.2 mcp_list_sources
│ │ │ ├── MCPListSourcesClient.exe
│ │ │ └── MCPListSourcesClient.go
│ │ ├── 3.3 mcp_edit_source
│ │ │ ├── MCPEditSourceClient.exe
│ │ │ └── MCPEditSourceClient.go
│ │ ├── 3.4 mcp_delete_source
│ │ │ ├── MCPDeleteSourceClient.exe
│ │ │ └── MCPDeleteSourceClient.go
│ │ ├── 4.0 mcp_list_groups
│ │ │ ├── MCPListGroupsClient.exe
│ │ │ └── MCPListGroupsClient.go
│ │ ├── 4.1 mcp_store_group
│ │ │ ├── MCPStoreGroupClient.exe
│ │ │ └── MCPStoreGroupClient.go
│ │ ├── 4.2 mcp_delete_group
│ │ │ ├── MCPDeleteGroupClient.exe
│ │ │ └── MCPDeleteGroupClient.go
│ │ ├── 5.0 mcp_store_user
│ │ │ ├── MCPStoreUserClient.exe
│ │ │ └── MCPStoreUserClient.go
│ │ ├── 5.1 mcp_edit_user
│ │ │ ├── MCPEditUserClient.exe
│ │ │ └── MCPEditUserClient.go
│ │ ├── 5.2 mcp_delete_user
│ │ │ ├── MCPDeleteUserClient.exe
│ │ │ └── MCPDeleteUserClient.go
│ │ ├── 9.0 mcp_keygen
│ │ │ ├── MCPKeygenClient.exe
│ │ │ └── MCPKeygenClient.go
│ │ └── README.md
│ ├── Gradio
│ │ ├── Api.py
│ │ ├── config.json.example
│ │ ├── config.py
│ │ ├── favicon.ico
│ │ ├── file_tools
│ │ │ └── loader_factory.py
│ │ ├── language.py
│ │ ├── logos
│ │ │ ├── fsas.png
│ │ │ └── Logo_dark.svg
│ │ ├── main.py
│ │ ├── mcp_client.py
│ │ ├── mcp_servers
│ │ │ ├── arxiv
│ │ │ │ ├── arxiv-stdio.js
│ │ │ │ ├── package.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ └── server_config.example.json
│ │ │ ├── demo-mcp-server
│ │ │ │ ├── demo-tools-sse.js
│ │ │ │ ├── demo-tools-stdio.js
│ │ │ │ └── tools
│ │ │ │ ├── assets.js
│ │ │ │ ├── calculator.js
│ │ │ │ └── weather.js
│ │ │ ├── filesystem
│ │ │ │ ├── Dockerfile
│ │ │ │ ├── index.ts
│ │ │ │ ├── package.json
│ │ │ │ ├── README.md
│ │ │ │ ├── test
│ │ │ │ │ └── new.txt
│ │ │ │ └── tsconfig.json
│ │ │ ├── moondream
│ │ │ │ └── server.py
│ │ │ ├── pgpt
│ │ │ │ ├── __init__.py
│ │ │ │ ├── Api.py
│ │ │ │ ├── config.json.example
│ │ │ │ ├── config.py
│ │ │ │ ├── language.py
│ │ │ │ ├── pyproject.toml
│ │ │ │ ├── README.md
│ │ │ │ └── server.py
│ │ │ ├── replicate_flux
│ │ │ │ └── server.py
│ │ │ └── sqlite
│ │ │ ├── .python-version
│ │ │ ├── Dockerfile
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ └── mcp_server_sqlite
│ │ │ ├── __init__.py
│ │ │ └── server.py
│ │ ├── messages
│ │ │ ├── __init__.py
│ │ │ ├── message_types
│ │ │ │ ├── __init__.py
│ │ │ │ ├── incrementing_id_message.py
│ │ │ │ ├── initialize_message.py
│ │ │ │ ├── json_rpc_message.py
│ │ │ │ ├── ping_message.py
│ │ │ │ ├── prompts_messages.py
│ │ │ │ ├── prompts_models.py
│ │ │ │ ├── resources_messages.py
│ │ │ │ └── tools_messages.py
│ │ │ ├── send_call_tool.py
│ │ │ ├── send_initialize_message.py
│ │ │ ├── send_message.py
│ │ │ ├── send_ping.py
│ │ │ ├── send_prompts.py
│ │ │ ├── send_resources.py
│ │ │ └── send_tools_list.py
│ │ ├── README.md
│ │ ├── requirements.txt
│ │ ├── server_config.json
│ │ ├── SourceManagement.py
│ │ ├── transport
│ │ │ ├── __init__.py
│ │ │ └── stdio
│ │ │ ├── __init__.py
│ │ │ ├── stdio_client.py
│ │ │ ├── stdio_server_parameters.py
│ │ │ └── stdio_server_shutdown.py
│ │ ├── tsconfig.json
│ │ └── UserManagement.py
│ ├── Java
│ │ ├── 1.0 mcp_login
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPLoginClient.class
│ │ │ └── MCPLoginClient.java
│ │ ├── 1.1 mcp_logout
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPLogoutClient.class
│ │ │ └── MCPLogoutClient.java
│ │ ├── 2.0 mcp_chat
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPChatClient.class
│ │ │ └── MCPChatClient.java
│ │ ├── 2.1 mcp_continue_chat
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPContinueChatClient.class
│ │ │ └── MCPContinueChatClient.java
│ │ ├── 2.2 mcp_get_chat_info
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPGetChatInfoClient.class
│ │ │ └── MCPGetChatInfoClient.java
│ │ ├── 3.0 mcp_create_source
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPCreateSourceClient.class
│ │ │ └── MCPCreateSourceClient.java
│ │ ├── 3.1 mcp_get_source
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPGetSourceClient.class
│ │ │ └── MCPGetSourceClient.java
│ │ ├── 3.2 mcp_list_sources
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPListSourcesClient.class
│ │ │ └── MCPListSourcesClient.java
│ │ ├── 3.3 mcp_edit_source
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPEditSourceClient.class
│ │ │ └── MCPEditSourceClient.java
│ │ ├── 3.4 mcp_delete_source
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPDeleteSourceClient.class
│ │ │ └── MCPDeleteSourceClient.java
│ │ ├── 4.0 mcp_list_groups
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPListGroupsClient.class
│ │ │ └── MCPListGroupsClient.java
│ │ ├── 4.1 mcp_store_group
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPStoreGroupClient.class
│ │ │ └── MCPStoreGroupClient.java
│ │ ├── 4.2 mcp_delete_group
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPDeleteGroupClient.class
│ │ │ └── MCPDeleteGroupClient.java
│ │ ├── 5.0 mcp_store_user
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPStoreUserClient.class
│ │ │ └── MCPStoreUserClient.java
│ │ ├── 5.1 mcp_edit_user
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPEditUserClient.class
│ │ │ └── MCPEditUserClient.java
│ │ ├── 5.2 mcp_delete_user
│ │ │ ├── json-20241224.jar
│ │ │ ├── MCPDeleteUserClient.class
│ │ │ └── MCPDeleteUserClient.java
│ │ └── README.md
│ ├── JavaScript
│ │ ├── 1.0 mcp_login
│ │ │ └── MCPLoginClient.js
│ │ ├── 1.1 mcp_logout
│ │ │ └── MCPLogoutClient.js
│ │ ├── 2.0 mcp_chat
│ │ │ └── MCPChatClient.js
│ │ ├── 2.1 mcp_continue_chat
│ │ │ └── MCPContinueChatClient.js
│ │ ├── 2.2 mcp_get_chat_info
│ │ │ └── MCPGetChatInfoClient.js
│ │ ├── 3.0 mcp_create_source
│ │ │ └── MCPCreateSourceClient.js
│ │ ├── 3.1 mcp_get_source
│ │ │ └── MCPGetSourceClient.js
│ │ ├── 3.2 mcp_list_sources
│ │ │ └── MCPListSourcesClient.js
│ │ ├── 3.3 mcp_edit_source
│ │ │ └── MCPEditSourceClient.js
│ │ ├── 3.4 mcp_delete_source
│ │ │ └── MCPDeleteSourceClient.js
│ │ ├── 4.0 mcp_list_groups
│ │ │ └── MCPListGroupsClient.js
│ │ ├── 4.1 mcp_store_group
│ │ │ └── MCPStoreGroupClient.js
│ │ ├── 4.2 mcp_delete_group
│ │ │ └── MCPDeleteGroupClient.js
│ │ ├── 5.0 mcp_store_user
│ │ │ └── MCPStoreUserClient.js
│ │ ├── 5.1 mcp_edit_user
│ │ │ └── MCPEditUserClient.js
│ │ ├── 5.2 mcp_delete_user
│ │ │ └── MCPDeleteUserClient.js
│ │ ├── 9.0 mcp_keygen
│ │ │ └── MCPKeygenClient.js
│ │ └── README.md
│ ├── PHP
│ │ ├── 1.0 mcp_login
│ │ │ └── MCPLoginClient.php
│ │ ├── 1.1 mcp_logout
│ │ │ └── MCPLogoutClient.php
│ │ ├── 2.0 mcp_chat
│ │ │ └── MCPChatClient.php
│ │ ├── 2.1 mcp_continue_chat
│ │ │ └── MCPContinueChatClient.php
│ │ ├── 2.2 mcp_get_chat_info
│ │ │ └── MCPGetChatInfoClient.php
│ │ ├── 3.0 mcp_create_source
│ │ │ └── MCPCreateSourceClient.php
│ │ ├── 3.1 mcp_get_source
│ │ │ └── MCPGetSourceClient.php
│ │ ├── 3.2 mcp_list_sources
│ │ │ └── MCPListSourcesClient.php
│ │ ├── 3.3 mcp_edit_source
│ │ │ └── MCPEditSourceClient.php
│ │ ├── 3.4 mcp_delete_source
│ │ │ └── MCPDeleteSourceClient.php
│ │ ├── 4.0 mcp_list_groups
│ │ │ └── MCPListGroupsClient.php
│ │ ├── 4.1 mcp_store_group
│ │ │ └── MCPStoreGroupClient.php
│ │ ├── 4.2 mcp_delete_group
│ │ │ └── MCPDeleteGroupClient.php
│ │ ├── 5.0 mcp_store_user
│ │ │ └── MCPStoreUserClient.php
│ │ ├── 5.1 mcp_edit_user
│ │ │ └── MCPEditUserClient.php
│ │ ├── 5.2 mcp_delete_user
│ │ │ └── MCPDeleteUserClient.php
│ │ ├── 9.0 mcp_keygen
│ │ │ └── MCPKeygenClient.php
│ │ └── README.md
│ └── Python
│ ├── __init__.py
│ ├── 1.0 mcp_login
│ │ └── MCPLoginClient.py
│ ├── 1.1 mcp_logout
│ │ └── MCPLogoutClient.py
│ ├── 2.0 mcp_chat
│ │ └── MCPChatClient.py
│ ├── 2.1 mcp_continue_chat
│ │ └── MCPContinueChatClient.py
│ ├── 2.2 mcp_get_chat_info
│ │ └── MCPGetChatInfoClient.py
│ ├── 2.3 mcp_delete_all_chats
│ │ └── MCPDeleteAllChatsClient.py
│ ├── 2.4 mcp_delete_chat
│ │ └── MCPDeleteChatClient.py
│ ├── 3.0 mcp_create_source
│ │ └── MCPCreateSourceClient.py
│ ├── 3.1 mcp_get_source
│ │ └── MCPGetSourceClient.py
│ ├── 3.2 mcp_list_sources
│ │ └── MCPListSourcesClient.py
│ ├── 3.3 mcp_edit_source
│ │ └── MCPEditSourceClient.py
│ ├── 3.4 mcp_delete_source
│ │ └── MCPDeleteSourceClient.py
│ ├── 4.0 mcp_list_groups
│ │ └── MCPListGroupsClient.py
│ ├── 4.1 mcp_store_group
│ │ └── MCPStoreGroupClient.py
│ ├── 4.2 mcp_delete_group
│ │ └── MCPDeleteGroupClient.py
│ ├── 5.0 mcp_store_user
│ │ └── MCPStoreUserClient.py
│ ├── 5.1 mcp_edit_user
│ │ └── MCPEditUserClient.py
│ ├── 5.2 mcp_delete_user
│ │ └── MCPDeleteUserClient.py
│ ├── 9.0 mcp_keygen
│ │ └── MCPKeygenClient.py
│ ├── Gradio
│ │ ├── __init__.py
│ │ └── server_config.json
│ └── README.md
├── examples
│ ├── create_users_from_csv
│ │ ├── config.json.example
│ │ ├── config.py
│ │ ├── create_users_from_csv.py
│ │ └── language.py
│ ├── dynamic_sources
│ │ └── rss_reader
│ │ ├── Api.py
│ │ ├── config.json.example
│ │ ├── config.py
│ │ ├── demo_dynamic_sources.py
│ │ └── rss_parser.py
│ ├── example_users_to_add_no_tz.csv
│ └── sftp_upload_with_id
│ ├── Api.py
│ ├── config_ftp.json.example
│ ├── config.py
│ ├── demo_upload.py
│ ├── language.py
│ └── requirements.txt
├── images
│ ├── alternative mcp client.png
│ ├── favicon
│ │ ├── android-chrome-192x192.png
│ │ ├── android-chrome-512x512.png
│ │ ├── apple-touch-icon.png
│ │ ├── favicon-16x16.png
│ │ ├── favicon-32x32.png
│ │ ├── favicon.ico
│ │ └── site.webmanifest
│ ├── mcp-general-architecture.png
│ ├── privateGPT-MCP.png
│ └── privateGPT.png
├── InstallMPCServer.sh
├── jest.config.js
├── LICENSE
├── package.json
├── pgpt.env.json.example
├── README.md
├── security
│ ├── generate_decrypted_password.js
│ └── generate_encrypted_password.js
├── src
│ ├── helper.js
│ ├── index.js
│ ├── logger.js
│ ├── pgpt-messages.js
│ ├── public
│ │ ├── index.html
│ │ └── pgpt-mcp-logo.png
│ ├── services
│ │ └── pgpt-service.ts
│ └── types
│ └── api.ts
├── start_chatbot_agent.ps1
├── start_chatbot_agent.sh
├── start_iot_agent.ps1
├── start_iot_agent.sh
├── start_openai_compatible_api_agent.ps1
├── start_openai_compatible_api_agent.sh
├── tsconfig.json
├── ver
│ ├── index_np.js
│ └── index_proxy_np.js
└── WORKLOG.md
```
# Files
--------------------------------------------------------------------------------
/examples/dynamic_sources/rss_reader/Api.py:
--------------------------------------------------------------------------------
```python
import json
from pathlib import Path
import requests
import urllib3
import base64
from httpcore import NetworkError
from .config import Config
# Silence urllib3's InsecureRequestWarning; the sessions built by
# initialize_session() set ``verify = False``.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def initialize_session(proxy_user, proxy_password, access_header):
    """Create a ``requests`` session pre-loaded with JSON and proxy headers.

    A custom access header takes precedence: when ``access_header`` is given
    it is sent as ``X-Custom-Header`` and no Basic credentials are added.
    Otherwise, if both ``proxy_user`` and ``proxy_password`` are set, an HTTP
    Basic ``Authorization`` header is built from them.

    Returns:
        The configured session. TLS certificate verification is switched off
        on it (``verify = False``).
    """
    http = requests.Session()
    http.verify = False

    extra = {}
    if access_header is not None:
        extra['X-Custom-Header'] = access_header
    elif not (proxy_user is None or proxy_password is None):
        credentials = f"{proxy_user}:{proxy_password}".encode()
        extra['Authorization'] = 'Basic ' + base64.b64encode(credentials).decode()

    http.headers.update({
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        **extra,
    })
    return http
class PrivateGPTAPI:
def __init__(self, config, client_api_key=None):
    """Initialize the chat client and try to log in.

    Args:
        config: Object queried with ``config.get(key[, default])``. Keys read:
            ``base_url``, ``proxy_user``, ``proxy_password``, ``access_header``,
            ``groups``, ``language``, ``use_public``, ``whitelist_keys`` and,
            when no API key is supplied, ``email`` / ``password``.
        client_api_key: Optional key that ``decrypt_api_key`` turns into an
            ``(email, password)`` pair. When ``whitelist_keys`` is non-empty
            the key must be listed there.

    After construction ``self.logged_in`` tells whether the login succeeded.
    A key rejected by the whitelist prints ``not authorized``, has its
    credentials discarded and is never used for a login attempt (previously
    the message was printed but the login went ahead regardless).
    """
    self.token = None
    self.chat_id = None
    self.base_url = config.get("base_url")

    # Empty strings in the configuration mean "not configured".
    self.proxy_user = config.get("proxy_user", None)
    if self.proxy_user == "":
        self.proxy_user = None
    self.proxy_password = config.get("proxy_password", None)
    if self.proxy_password == "":
        self.proxy_password = None
    self.access_header = config.get("access_header", None)
    if self.access_header == "":
        self.access_header = None

    self.chosen_groups = config.get("groups", [])
    self.language = config.get("language", "en")
    self.use_public = config.get("use_public", True)
    self.whitelist_keys = config.get("whitelist_keys", [])
    self.logged_in = False

    authorized = True
    if client_api_key is not None:
        self.email, self.password = decrypt_api_key(client_api_key)
        if len(self.whitelist_keys) > 0 and client_api_key not in self.whitelist_keys:
            print("not authorized")
            authorized = False
            # Drop the credentials so a later login() cannot use them either.
            self.email = None
            self.password = None
    else:
        self.email = config.get("email", None)
        self.password = config.get("password", None)

    self.session = initialize_session(self.proxy_user, self.proxy_password, self.access_header)
    if authorized and self.login():
        self.logged_in = True
def login(self):
    """Authenticate against ``{base_url}/login`` and install the bearer token.

    On success the token is stored in ``self.token``, the session's
    ``Authorization`` header is rewritten and ``self.chat_id`` is reset to
    ``None``.

    Returns:
        bool: ``True`` when a token was obtained, ``False`` when the request
        failed or the response did not contain ``data.token``.
    """
    url = f"{self.base_url}/login"
    payload = {"email": self.email, "password": self.password}
    try:
        response = self.session.post(url, json=payload)
        # NOTE(review): this prints the raw login response, which includes the token.
        print(response.content)
        response.raise_for_status()
        data = response.json()
        self.token = data['data']['token']

        # Keep any non-bearer credentials (e.g. the proxy's Basic auth) but
        # replace a bearer token left over from an earlier login instead of
        # appending a second one.
        current = self.session.headers.get('Authorization')
        kept = []
        if current:
            for part in current.split(','):
                part = part.strip()
                if part and not part.startswith('Bearer '):
                    kept.append(part)
        kept.append(f'Bearer {self.token}')
        self.session.headers['Authorization'] = ', '.join(kept)

        self.chat_id = None
        print("✅ Login successful.")
        return True
    except (KeyError, TypeError, ValueError) as e:
        # The response was not the expected {"data": {"token": ...}} JSON.
        print(f"❌ Login failed: {e}")
        return False
    except requests.exceptions.RequestException as e:
        print(f"❌ Login failed: {e}")
        return False
def create_chat(self, user_input):
    """Start a new chat session with ``user_input`` as its first question.

    POSTs to ``{base_url}/chats`` and stores the returned chat id in
    ``self.chat_id``. If the request raises a ``requests`` exception it is
    sent once more; the retry is a POST as well (it used to PATCH the
    collection URL) and also records the chat id.

    Returns:
        dict: The decoded JSON response on success. When the answer is a
        JSON-encoded ``{"role": ..., "content": ...}`` message, the answer is
        replaced by its ``content`` and ``data.chatId`` is set to ``"0"``.
        ``{"error": "..."}`` is returned when the retry fails too.
    """
    url = f"{self.base_url}/chats"
    payload = {
        "language": self.language,
        "question": user_input,  # Initial question to start the chat
        "usePublic": self.use_public,
        "groups": self.chosen_groups,
    }
    try:
        response = self.session.post(url, json=payload)
        response.raise_for_status()  # Raise an exception if the response was not successful
        resp = response.json()
        self.chat_id = resp['data']['chatId']  # Store the chat ID for future use
        print("✅ Chat initialized.")
        try:
            answer = resp.get('data', None).get('answer', "error")
        except Exception:
            print(resp)
            resp = {"data": {"answer": "error"}}
            answer = "error"
        # Only string answers can carry a serialized role/content message.
        if isinstance(answer, str) and answer.startswith("{\"role\":"):
            answerj = json.loads(answer)
            resp["data"]["answer"] = answerj["content"]
            resp["data"]["chatId"] = "0"
        print(f"💡 Response: {answer}")
        return resp
    except requests.exceptions.RequestException as e:
        # It seems we get disconnections from time to time, so try once more.
        try:
            response = self.session.post(url, json=payload)
            response.raise_for_status()
            data = response.json()
            chat_id = data.get('data', {}).get('chatId')
            if chat_id is not None:
                self.chat_id = chat_id
            answer = data.get('data', {}).get('answer', "No answer provided.")
            print(f"💡 Response: {answer}")
            return data
        except Exception:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}
def list_personal_groups(self):
    """Return the caller's personal groups from ``GET {base_url}/groups``.

    Returns:
        list: The ``data.personalGroups`` value of the response, or ``[]``
        when ``data`` is empty, the body cannot be parsed, or the request
        fails.
    """
    url = f"{self.base_url}/groups"
    try:
        resp = self.session.get(url)
        j = json.loads(resp.content)
        data_block = j["data"]
        if not data_block:
            return []
        return data_block.get("personalGroups", [])
    except (ValueError, KeyError, TypeError, AttributeError):
        # Body was not JSON or did not have the expected shape.
        return []
    except (requests.exceptions.RequestException, NetworkError):
        # The session raises requests exceptions; httpcore's NetworkError is
        # kept for backward compatibility with the previous handler.
        return []
def get_document_info(self, id):
    """Fetch one source's details from ``GET {base_url}/sources/{id}``.

    Args:
        id: Identifier of the source; interpolated into the URL as-is.

    Returns:
        The response's ``data`` value, or ``[]`` when it is empty, the body
        cannot be parsed, or the request fails.
    """
    url = f"{self.base_url}/sources/{id}"
    try:
        resp = self.session.get(url)
        j = json.loads(resp.content)
        data_block = j["data"]
        if not data_block:
            return []
        return data_block
    except (ValueError, KeyError, TypeError):
        # Body was not JSON or did not have the expected shape.
        return []
    except (requests.exceptions.RequestException, NetworkError):
        # The session raises requests exceptions; httpcore's NetworkError is
        # kept for backward compatibility with the previous handler.
        return []
def query_private_gpt(self, user_input):
    """Send a follow-up question to the current chat.

    PATCHes ``{base_url}/chats/{chat_id}``. The HTTP status of the first
    attempt is not checked; its body is decoded regardless. If that attempt
    raises a ``requests`` exception the request is repeated once, this time
    with a status check.

    Returns:
        ``False`` when no chat has been created yet (``self.chat_id`` is
        unset); otherwise the decoded response as a dict. When the answer is
        a JSON-encoded ``{"role": ..., "content": ...}`` message, the answer
        is replaced by its ``content`` and ``data.chatId`` is set to ``"0"``.
        ``{"error": "..."}`` is returned when the retry fails too.
    """
    if not self.chat_id:
        print("❌ Chat session not initialized.")
        return False
    url = f"{self.base_url}/chats/{self.chat_id}"
    payload = {"question": user_input}
    try:
        response = self.session.patch(url, json=payload)
        resp = response.json()
        try:
            answer = resp.get('data', None).get('answer', "error")
        except Exception:
            # Response did not have the {"data": {...}} shape.
            print(resp)
            resp = {"data": {"answer": "error"}}
            answer = "error"
        # Only string answers can carry a serialized role/content message.
        if isinstance(answer, str) and answer.startswith("{\"role\":"):
            answerj = json.loads(answer)
            resp["data"]["answer"] = answerj["content"]
            resp["data"]["chatId"] = "0"
        print(f"💡 Response: {answer}")
        return resp
    except requests.exceptions.RequestException as e:
        # It seems we get disconnections from time to time, so try once more.
        try:
            response = self.session.patch(url, json=payload)
            response.raise_for_status()
            data = response.json()
            answer = data.get('data', {}).get('answer', "No answer provided.")
            print(f"💡 Response: {answer}")
            return data
        except Exception:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}
def add_source(self, markdown, groups, name):
    """Create a new source via ``POST {base_url}/sources``.

    The request body carries ``name``, ``groups`` and the ``markdown`` text
    (sent as ``content``). Working with version 1.3.3 and newer.

    Returns:
        The response's ``data`` value, ``[]`` when that value is empty, or
        ``{"error": "..."}`` when the request raises a ``requests`` exception.
    """
    endpoint = f"{self.base_url}/sources"
    try:
        body = {"name": name, "groups": groups, "content": markdown}
        reply = self.session.post(endpoint, json=body)
        created = json.loads(reply.content)["data"]
        return created if created else []
    except requests.exceptions.RequestException as e:
        failure = f"❌ Failed to get response: {e}"
        print(failure)
        return {"error": failure}
def update_source(self, source_id, markdown=None, groups=None, name=None):
    """Edit an existing source via ``PATCH {base_url}/sources/{source_id}``.

    Args:
        source_id: Identifier of the source to edit.
        markdown: New content; omitted from the request when ``None``.
        groups: New group list. When ``None`` the source's current groups are
            looked up with ``get_document_info`` and sent back unchanged.
        name: New name; omitted from the request when ``None``.

    Returns:
        The response's ``data`` value, ``[]`` when that value is empty, or
        ``{"error": "..."}`` when the request raises a ``requests`` exception
        or the current groups cannot be determined.
    """
    url = f"{self.base_url}/sources/{source_id}"
    try:
        payload = {}
        if groups is None:
            # get_document_info() returns [] on failure, so make sure a usable
            # record came back before reading its groups.
            existing = self.get_document_info(source_id)
            if not isinstance(existing, dict) or "groups" not in existing:
                failure = (f"❌ Failed to get response: "
                           f"could not read existing groups of source {source_id}")
                print(failure)
                return {"error": failure}
            payload["groups"] = existing["groups"]
        else:
            payload["groups"] = groups
        if markdown is not None:
            payload["content"] = markdown
        if name is not None:
            payload["name"] = name
        resp = self.session.patch(url, json=payload)
        j = json.loads(resp.content)
        data_block = j["data"]
        if not data_block:
            return []
        return data_block
    except requests.exceptions.RequestException as e:
        print(f"❌ Failed to get response: {e}")
        return {"error": f"❌ Failed to get response: {e}"}
def delete_source(self, source_id):
    """Delete a source via ``DELETE {base_url}/sources/{source_id}``.

    Working with version 1.3.3 and newer.

    Returns:
        The response's ``message`` value, the string ``"failed"`` when that
        value is empty, or ``{"error": "..."}`` when the request raises a
        ``requests`` exception.
    """
    endpoint = f"{self.base_url}/sources/{source_id}"
    try:
        reply = self.session.delete(endpoint)
        outcome = json.loads(reply.content)["message"]
        return outcome if outcome else "failed"
    except requests.exceptions.RequestException as e:
        failure = f"❌ Failed to get response: {e}"
        print(failure)
        return {"error": failure}
def get_sources_from_group(self, group):
    """Fetch the details of every source that belongs to ``group``.

    POSTs the group name to ``{base_url}/sources/groups`` and resolves each
    entry of the returned ``sources`` list through ``get_document_info``.
    Working with version 1.3.3 and newer.

    Returns:
        list: One ``get_document_info`` result per listed source, or ``[]``
        when the response's ``data`` value is empty.
        dict: ``{"error": "..."}`` when the request raises a ``requests``
        exception.
    """
    endpoint = f"{self.base_url}/sources/groups"
    try:
        reply = self.session.post(endpoint, json={"groupName": group})
        listing = json.loads(reply.content)["data"]
        if not listing:
            return []
        return [self.get_document_info(source_id) for source_id in listing["sources"]]
    except requests.exceptions.RequestException as e:
        failure = f"❌ Failed to get response: {e}"
        print(failure)
        return {"error": failure}
def respond_with_context(self, messages, response_format=None, request_tools=None):
    """Condense a chat-style message list into one question and send it.

    The question is built from the last ``system`` message (its ``str()``
    form), the content of the last ``user`` message, an optional note about
    the most recent tool call and its result, and prompt fragments produced
    by ``add_response_format`` / ``add_tools``. It is sent with
    ``create_chat`` when no chat exists yet, otherwise with
    ``query_private_gpt``.

    Args:
        messages: Sequence of messages supporting ``message["role"]`` and
            ``message["content"]``. Tool fields are read as attributes, with
            a fallback to key lookup for plain dicts.
        response_format: Optional JSON template the answer should follow.
        request_tools: Optional list of tool descriptions offered to the model.

    Returns:
        The ``data`` dict of the response, with a ``tool_call`` entry added
        when tools were offered and the cleaned answer is valid JSON. On
        failure the error result of the send call is returned. ``None`` is
        returned when the client is not logged in and logging in fails.

    If the client is not logged in, a login is attempted first and, when it
    succeeds, the question is sent (previously the call returned ``None``
    without sending anything). An ``error`` result triggers one fresh login
    and a single resend.
    """
    def _field(message, key):
        # Prefer attribute access (as the original code did for tool fields)
        # and fall back to key lookup so plain dict messages work too.
        value = getattr(message, key, None)
        if value is None and isinstance(message, dict):
            value = message.get(key)
        return value

    last_user_message = next((p for p in reversed(messages) if p["role"] == "user"), None)
    user_input = ""
    for message in messages:
        if message["role"] == "system":
            # NOTE(review): this embeds the whole message object, not just its content.
            user_input = str(message) + "\n"
    if last_user_message is not None:
        user_input += last_user_message["content"]

    last_assistant_message = next((p for p in reversed(messages) if p["role"] == "assistant"), None)
    last_tool_message = next((p for p in reversed(messages) if p["role"] == "tool"), None)
    hastoolresult = False
    tool_calls = None
    if last_assistant_message is not None:
        tool_calls = _field(last_assistant_message, "tool_calls")
    if last_tool_message is not None and tool_calls is not None and len(tool_calls) > 0:
        user_input += ("\nYou called the tool: " + str(tool_calls[0])
                       + ". The result was: " + str(_field(last_tool_message, "content")))
        hastoolresult = True
    print(f"💁 Request: " + user_input)

    # PGPT manages history and context itself so we don't need to forward the history.
    add_context = False
    if add_context:
        messages.pop()
        user_input += "\nHere is some context about the previous conversation:\n"
        for message in messages:
            user_input += f"{message.role}: {message.content}\n"

    if response_format is not None:
        print("Response format: " + str(response_format))
        user_input += add_response_format(response_format)
    if request_tools is not None and not hastoolresult:
        user_input += add_tools(request_tools, last_tool_message)

    if not self.logged_in:
        self.logged_in = bool(self.login())
        if not self.logged_in:
            return None

    def _send():
        # A fresh login resets chat_id, so this also picks the right call on retry.
        if self.chat_id is None:
            return self.create_chat(user_input)
        return self.query_private_gpt(user_input)

    def _finish(result):
        response_data = result.get("data")
        if not isinstance(response_data, dict):
            return response_data
        if request_tools is not None and not hastoolresult:
            cleaned = clean_response(response_data.get("answer") or "")
            if is_json(cleaned):
                response_data["tool_call"] = cleaned
        return response_data

    result = _send()
    if not isinstance(result, dict):
        return result
    if 'data' in result:
        return _finish(result)
    if 'error' in result:
        # Try to login again and send the query once more on error.
        if not self.login():
            return result
        result = _send()
        if isinstance(result, dict) and 'data' in result:
            return _finish(result)
        return result
    return None
def is_json(myjson):
    """Report whether ``myjson`` parses as JSON.

    Returns:
        bool: ``True`` when ``json.loads`` accepts the value, ``False`` when
        it is malformed or not a string/bytes value at all (e.g. ``None``).
    """
    try:
        json.loads(myjson)
    except (ValueError, TypeError):
        # ValueError: malformed JSON text; TypeError: not str/bytes.
        return False
    return True
def add_response_format(response_format):
    """Build the prompt suffix asking the model to fill in a JSON template.

    Returns:
        str: A fixed instruction followed by ``response_format`` serialized
        with ``json.dumps``.
    """
    instruction = (
        "\nPlease fill in the following json template with realistic and appropriate information. "
        "In your reply, only return the generated json. "
        "If you can't answer return an empty json.\n"
    )
    return instruction + json.dumps(response_format)
def add_tools(response_tools, last_tool_message):
    """Build the prompt suffix that offers the given tools to the model.

    Args:
        response_tools: Iterable of JSON-serializable tool descriptions.
        last_tool_message: Unused; kept so existing callers keep working.

    Returns:
        str: A fixed instruction followed by each tool serialized with
        ``json.dumps``, every entry wrapped in newlines.
    """
    prompt = "\nPlease select the fitting provided tool to create your answer. Only return the generated result of the tool. Do not describe what you are doing, just return the json.\n"
    return prompt + "".join("\n" + json.dumps(tool) + "\n" for tool in response_tools)
def clean_response(response):
    """Strip known artefacts from a model reply.

    Currently removes every occurrence of the ``[TOOL_CALLS]`` marker.
    """
    marker = "[TOOL_CALLS]"
    return "".join(response.split(marker))
def decrypt_api_key(api_key):
    """Decode a base64 API key of the form ``email:password``.

    This is PoC code and methods should be replaced with a more secure way to
    deal with credentials (e.g. in a db). The key is only base64-encoded,
    not encrypted.

    Returns:
        tuple: ``(email, password)``. The key is split at the first colon, so
        passwords may themselves contain colons. ``("invalid", "invalid")``
        is returned when the key cannot be decoded or contains no colon.
    """
    try:
        decoded_key = base64.b64decode(api_key.encode("ascii")).decode("ascii")
    except Exception as e:
        print(e)
        decoded_key = "invalid:invalid"
    email, separator, password = decoded_key.partition(":")
    if not separator:
        # A key without a colon used to raise IndexError here.
        return "invalid", "invalid"
    return email, password
def main():
    """Run an interactive question/answer loop on the console.

    Loads ``pgpt_openai_api_proxy.json`` from the directory above this file's
    directory, logs in and forwards each entered line to the API. The first
    question opens the chat (``create_chat``); later ones continue it
    (``query_private_gpt``). Previously every question went to
    ``query_private_gpt``, which refuses to run before a chat exists.
    Typing ``quit`` or pressing Ctrl+C ends the loop.
    """
    config_file = Path.absolute(Path(__file__).parent.parent / "pgpt_openai_api_proxy.json")
    config = Config(config_file=config_file, required_fields=["base_url"])
    chat = PrivateGPTAPI(config)
    print("Type your questions below. Type 'quit' to exit.")
    while True:
        try:
            question = input("❓ Question: ").strip()
            if question.lower() == 'quit':
                break
            if question:
                if chat.chat_id is None:
                    chat.create_chat(question)
                else:
                    chat.query_private_gpt(question)
        except KeyboardInterrupt:
            print("\nExiting chat...")
            break
        except Exception as e:
            print(f"❌ Error: {str(e)}")
            break
# Start the interactive console loop only when this file is run as a script.
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/clients/Gradio/mcp_servers/pgpt/Api.py:
--------------------------------------------------------------------------------
```python
import json
from pathlib import Path
import requests
import urllib3
import base64
from httpcore import NetworkError
from config import Config
# Silence urllib3's InsecureRequestWarning; the sessions built by
# initialize_session() set ``verify = False``.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def initialize_session(proxy_user, proxy_password, access_header):
    """Create a ``requests`` session pre-loaded with JSON and proxy headers.

    A custom access header takes precedence: when ``access_header`` is given
    it is sent as ``X-Custom-Header`` and no Basic credentials are added.
    Otherwise, if both ``proxy_user`` and ``proxy_password`` are set, an HTTP
    Basic ``Authorization`` header is built from them.

    Returns:
        The configured session. TLS certificate verification is switched off
        on it (``verify = False``).
    """
    http = requests.Session()
    http.verify = False

    extra = {}
    if access_header is not None:
        extra['X-Custom-Header'] = access_header
    elif not (proxy_user is None or proxy_password is None):
        credentials = f"{proxy_user}:{proxy_password}".encode()
        extra['Authorization'] = 'Basic ' + base64.b64encode(credentials).decode()

    http.headers.update({
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        **extra,
    })
    return http
class PrivateGPTAPI:
def __init__(self, config, client_api_key=None):
    """Initialize the chat client and try to log in.

    Args:
        config: Object queried with ``config.get(key[, default])``. Keys read:
            ``base_url``, ``proxy_user``, ``proxy_password``, ``access_header``,
            ``groups``, ``language``, ``use_public``, ``whitelist_keys`` and,
            when no API key is supplied, ``email`` / ``password``.
        client_api_key: Optional key that ``decrypt_api_key`` turns into an
            ``(email, password)`` pair. When ``whitelist_keys`` is non-empty
            the key must be listed there.

    After construction ``self.logged_in`` tells whether the login succeeded.
    A key rejected by the whitelist prints ``not authorized``, has its
    credentials discarded and is never used for a login attempt (previously
    the message was printed but the login went ahead regardless).
    """
    self.token = None
    self.chat_id = None
    self.base_url = config.get("base_url")

    # Empty strings in the configuration mean "not configured".
    self.proxy_user = config.get("proxy_user", None)
    if self.proxy_user == "":
        self.proxy_user = None
    self.proxy_password = config.get("proxy_password", None)
    if self.proxy_password == "":
        self.proxy_password = None
    self.access_header = config.get("access_header", None)
    if self.access_header == "":
        self.access_header = None

    self.chosen_groups = config.get("groups", [])
    self.language = config.get("language", "en")
    self.use_public = config.get("use_public", True)
    self.whitelist_keys = config.get("whitelist_keys", [])
    self.logged_in = False

    authorized = True
    if client_api_key is not None:
        self.email, self.password = decrypt_api_key(client_api_key)
        if len(self.whitelist_keys) > 0 and client_api_key not in self.whitelist_keys:
            print("not authorized")
            authorized = False
            # Drop the credentials so a later login() cannot use them either.
            self.email = None
            self.password = None
    else:
        self.email = config.get("email", None)
        self.password = config.get("password", None)

    self.session = initialize_session(self.proxy_user, self.proxy_password, self.access_header)
    if authorized and self.login():
        self.logged_in = True
def login(self):
    """Authenticate against ``{base_url}/login`` and install the bearer token.

    On success the token is stored in ``self.token``, the session's
    ``Authorization`` header is rewritten and ``self.chat_id`` is reset to
    ``None``.

    Returns:
        bool: ``True`` when a token was obtained, ``False`` when the request
        failed or the response did not contain ``data.token``.
    """
    url = f"{self.base_url}/login"
    payload = {"email": self.email, "password": self.password}
    try:
        response = self.session.post(url, json=payload)
        # NOTE(review): this prints the raw login response, which includes the token.
        print(response.content)
        response.raise_for_status()
        data = response.json()
        self.token = data['data']['token']

        # Keep any non-bearer credentials (e.g. the proxy's Basic auth) but
        # replace a bearer token left over from an earlier login instead of
        # appending a second one.
        current = self.session.headers.get('Authorization')
        kept = []
        if current:
            for part in current.split(','):
                part = part.strip()
                if part and not part.startswith('Bearer '):
                    kept.append(part)
        kept.append(f'Bearer {self.token}')
        self.session.headers['Authorization'] = ', '.join(kept)

        self.chat_id = None
        print("✅ Login successful.")
        return True
    except (KeyError, TypeError, ValueError) as e:
        # The response was not the expected {"data": {"token": ...}} JSON.
        print(f"❌ Login failed: {e}")
        return False
    except requests.exceptions.RequestException as e:
        print(f"❌ Login failed: {e}")
        return False
def create_chat(self, user_input):
    """Start a new chat session with ``user_input`` as its first question.

    POSTs to ``{base_url}/chats`` and stores the returned chat id in
    ``self.chat_id``. If the request raises a ``requests`` exception it is
    sent once more; the retry is a POST as well (it used to PATCH the
    collection URL) and also records the chat id.

    Returns:
        dict: The decoded JSON response on success. When the answer is a
        JSON-encoded ``{"role": ..., "content": ...}`` message, the answer is
        replaced by its ``content`` and ``data.chatId`` is set to ``"0"``.
        ``{"error": "..."}`` is returned when the retry fails too.
    """
    url = f"{self.base_url}/chats"
    payload = {
        "language": self.language,
        "question": user_input,  # Initial question to start the chat
        "usePublic": self.use_public,
        "groups": self.chosen_groups,
    }
    try:
        response = self.session.post(url, json=payload)
        response.raise_for_status()  # Raise an exception if the response was not successful
        resp = response.json()
        self.chat_id = resp['data']['chatId']  # Store the chat ID for future use
        print("✅ Chat initialized.")
        try:
            answer = resp.get('data', None).get('answer', "error")
        except Exception:
            print(resp)
            resp = {"data": {"answer": "error"}}
            answer = "error"
        # Only string answers can carry a serialized role/content message.
        if isinstance(answer, str) and answer.startswith("{\"role\":"):
            answerj = json.loads(answer)
            resp["data"]["answer"] = answerj["content"]
            resp["data"]["chatId"] = "0"
        print(f"💡 Response: {answer}")
        return resp
    except requests.exceptions.RequestException as e:
        # It seems we get disconnections from time to time, so try once more.
        try:
            response = self.session.post(url, json=payload)
            response.raise_for_status()
            data = response.json()
            chat_id = data.get('data', {}).get('chatId')
            if chat_id is not None:
                self.chat_id = chat_id
            answer = data.get('data', {}).get('answer', "No answer provided.")
            print(f"💡 Response: {answer}")
            return data
        except Exception:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}
def list_personal_groups(self):
    """Return the caller's personal groups from ``GET {base_url}/groups``.

    Returns:
        list: The ``data.personalGroups`` value of the response, or ``[]``
        when ``data`` is empty, the body cannot be parsed, or the request
        fails.
    """
    url = f"{self.base_url}/groups"
    try:
        resp = self.session.get(url)
        j = json.loads(resp.content)
        data_block = j["data"]
        if not data_block:
            return []
        return data_block.get("personalGroups", [])
    except (ValueError, KeyError, TypeError, AttributeError):
        # Body was not JSON or did not have the expected shape. This replaces
        # the former bare ``except:`` around the parsing step.
        return []
    except (requests.exceptions.RequestException, NetworkError):
        # The session raises requests exceptions; httpcore's NetworkError is
        # kept for backward compatibility with the previous handler.
        return []
def get_document_info(self, id):
    """Fetch one source's details from ``GET {base_url}/sources/{id}``.

    Args:
        id: Identifier of the source; interpolated into the URL as-is.

    Returns:
        The response's ``data`` value, or ``[]`` when it is empty, the body
        cannot be parsed, or the request fails.
    """
    url = f"{self.base_url}/sources/{id}"
    try:
        resp = self.session.get(url)
        j = json.loads(resp.content)
        data_block = j["data"]
        if not data_block:
            return []
        return data_block
    except (ValueError, KeyError, TypeError):
        # Body was not JSON or did not have the expected shape.
        return []
    except (requests.exceptions.RequestException, NetworkError):
        # The session raises requests exceptions; httpcore's NetworkError is
        # kept for backward compatibility with the previous handler.
        return []
def query_private_gpt(self, user_input):
    """Send a follow-up question to the current chat.

    PATCHes ``{base_url}/chats/{chat_id}``. The HTTP status of the first
    attempt is not checked; its body is decoded regardless. If that attempt
    raises a ``requests`` exception the request is repeated once, this time
    with a status check.

    Returns:
        ``False`` when no chat has been created yet (``self.chat_id`` is
        unset); otherwise the decoded response as a dict. When the answer is
        a JSON-encoded ``{"role": ..., "content": ...}`` message, the answer
        is replaced by its ``content`` and ``data.chatId`` is set to ``"0"``.
        ``{"error": "..."}`` is returned when the retry fails too.
    """
    if not self.chat_id:
        print("❌ Chat session not initialized.")
        return False
    url = f"{self.base_url}/chats/{self.chat_id}"
    payload = {"question": user_input}
    try:
        response = self.session.patch(url, json=payload)
        resp = response.json()
        try:
            answer = resp.get('data', None).get('answer', "error")
        except Exception:
            # Response did not have the {"data": {...}} shape.
            print(resp)
            resp = {"data": {"answer": "error"}}
            answer = "error"
        # Only string answers can carry a serialized role/content message.
        if isinstance(answer, str) and answer.startswith("{\"role\":"):
            answerj = json.loads(answer)
            resp["data"]["answer"] = answerj["content"]
            resp["data"]["chatId"] = "0"
        print(f"💡 Response: {answer}")
        return resp
    except requests.exceptions.RequestException as e:
        # It seems we get disconnections from time to time, so try once more.
        try:
            response = self.session.patch(url, json=payload)
            response.raise_for_status()
            data = response.json()
            answer = data.get('data', {}).get('answer', "No answer provided.")
            print(f"💡 Response: {answer}")
            return data
        except Exception:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}
def add_user(self, userName, userEmail, userPassword, userGroups):
    """Create a user via ``POST {base_url}/users``.

    The user is created with ``usePublic`` disabled, language ``"en"`` and
    the single role ``"documents"``.

    Returns:
        The response's ``message`` value, the string ``"failed"`` when that
        value is empty, or ``{"error": "..."}`` when the request raises a
        ``requests`` exception.
    """
    endpoint = f"{self.base_url}/users"
    try:
        # name, email, password and usePublic are necessary; language is
        # optional and defaults to "en". Further optional fields that are
        # not sent: "timezone" (defaults to "Europe/Berlin"), "activateFtp"
        # and "ftpPassword".
        new_user = {
            "name": userName,
            "email": userEmail,
            "password": userPassword,
            "usePublic": False,
            "language": "en",
            "groups": userGroups,
            "roles": ["documents"],
        }
        reply = self.session.post(endpoint, json=new_user)
        outcome = json.loads(reply.content)["message"]
        return outcome if outcome else "failed"
    except requests.exceptions.RequestException as e:
        failure = f"❌ Failed to get response: {e}"
        print(failure)
        return {"error": failure}
def add_source(self, markdown, groups, name):
    """Create a new source via ``POST {base_url}/sources``.

    The request body carries ``name``, ``groups`` and the ``markdown`` text
    (sent as ``content``). Working with version 1.3.3 and newer.

    Returns:
        The response's ``data`` value, ``[]`` when that value is empty, or
        ``{"error": "..."}`` when the request raises a ``requests`` exception.
    """
    endpoint = f"{self.base_url}/sources"
    try:
        body = {"name": name, "groups": groups, "content": markdown}
        reply = self.session.post(endpoint, json=body)
        created = json.loads(reply.content)["data"]
        return created if created else []
    except requests.exceptions.RequestException as e:
        failure = f"❌ Failed to get response: {e}"
        print(failure)
        return {"error": failure}
def delete_source(self, source_id):
    """Delete a source via ``DELETE {base_url}/sources/{source_id}``.

    Working with version 1.3.3 and newer.

    Returns:
        The response's ``message`` value, the string ``"failed"`` when that
        value is empty, or ``{"error": "..."}`` when the request raises a
        ``requests`` exception.
    """
    endpoint = f"{self.base_url}/sources/{source_id}"
    try:
        reply = self.session.delete(endpoint)
        outcome = json.loads(reply.content)["message"]
        return outcome if outcome else "failed"
    except requests.exceptions.RequestException as e:
        failure = f"❌ Failed to get response: {e}"
        print(failure)
        return {"error": failure}
def get_sources_from_group(self, group):
    """Fetch the details of every source that belongs to ``group``.

    POSTs the group name to ``{base_url}/sources/groups`` and resolves each
    entry of the returned ``sources`` list through ``get_document_info``.
    Working with version 1.3.3 and newer.

    Returns:
        list: One ``get_document_info`` result per listed source, or ``[]``
        when the response's ``data`` value is empty or the request raises a
        ``requests`` exception.
    """
    endpoint = f"{self.base_url}/sources/groups"
    try:
        reply = self.session.post(endpoint, json={"groupName": group})
        listing = json.loads(reply.content)["data"]
        if not listing:
            return []
        return [self.get_document_info(source_id) for source_id in listing["sources"]]
    except requests.exceptions.RequestException as e:
        print(f"❌ Failed to get response: {e}")
        return []
def respond_with_context(self, messages, response_format=None, request_tools=None):
    """Condense a chat-style message list into one question and send it.

    The question is built from the last ``system`` message (its ``str()``
    form), the content of the last ``user`` message, an optional note about
    the most recent tool call and its result, and prompt fragments produced
    by ``add_response_format`` / ``add_tools``. It is sent with
    ``create_chat`` when no chat exists yet, otherwise with
    ``query_private_gpt``.

    Args:
        messages: Sequence of messages supporting ``message["role"]`` and
            ``message["content"]``. Tool fields are read as attributes, with
            a fallback to key lookup for plain dicts.
        response_format: Optional JSON template the answer should follow.
        request_tools: Optional list of tool descriptions offered to the model.

    Returns:
        The ``data`` dict of the response, with a ``tool_call`` entry added
        when tools were offered and the cleaned answer is valid JSON. On
        failure the error result of the send call is returned. ``None`` is
        returned when the client is not logged in and logging in fails.

    If the client is not logged in, a login is attempted first and, when it
    succeeds, the question is sent (previously the call returned ``None``
    without sending anything). An ``error`` result triggers one fresh login
    and a single resend.
    """
    def _field(message, key):
        # Prefer attribute access (as the original code did for tool fields)
        # and fall back to key lookup so plain dict messages work too.
        value = getattr(message, key, None)
        if value is None and isinstance(message, dict):
            value = message.get(key)
        return value

    last_user_message = next((p for p in reversed(messages) if p["role"] == "user"), None)
    user_input = ""
    for message in messages:
        if message["role"] == "system":
            # NOTE(review): this embeds the whole message object, not just its content.
            user_input = str(message) + "\n"
    if last_user_message is not None:
        user_input += last_user_message["content"]

    last_assistant_message = next((p for p in reversed(messages) if p["role"] == "assistant"), None)
    last_tool_message = next((p for p in reversed(messages) if p["role"] == "tool"), None)
    hastoolresult = False
    tool_calls = None
    if last_assistant_message is not None:
        tool_calls = _field(last_assistant_message, "tool_calls")
    if last_tool_message is not None and tool_calls is not None and len(tool_calls) > 0:
        user_input += ("\nYou called the tool: " + str(tool_calls[0])
                       + ". The result was: " + str(_field(last_tool_message, "content")))
        hastoolresult = True
    print(f"💁 Request: " + user_input)

    # PGPT manages history and context itself so we don't need to forward the history.
    add_context = False
    if add_context:
        messages.pop()
        user_input += "\nHere is some context about the previous conversation:\n"
        for message in messages:
            user_input += f"{message.role}: {message.content}\n"

    if response_format is not None:
        print("Response format: " + str(response_format))
        user_input += add_response_format(response_format)
    if request_tools is not None and not hastoolresult:
        user_input += add_tools(request_tools, last_tool_message)

    if not self.logged_in:
        self.logged_in = bool(self.login())
        if not self.logged_in:
            return None

    def _send():
        # A fresh login resets chat_id, so this also picks the right call on retry.
        if self.chat_id is None:
            return self.create_chat(user_input)
        return self.query_private_gpt(user_input)

    def _finish(result):
        response_data = result.get("data")
        if not isinstance(response_data, dict):
            return response_data
        if request_tools is not None and not hastoolresult:
            cleaned = clean_response(response_data.get("answer") or "")
            if is_json(cleaned):
                response_data["tool_call"] = cleaned
        return response_data

    result = _send()
    if not isinstance(result, dict):
        return result
    if 'data' in result:
        return _finish(result)
    if 'error' in result:
        # Try to login again and send the query once more on error.
        if not self.login():
            return result
        result = _send()
        if isinstance(result, dict) and 'data' in result:
            return _finish(result)
        return result
    return None
def is_json(myjson):
    """Report whether ``myjson`` parses as JSON.

    Returns:
        bool: ``True`` when ``json.loads`` accepts the value, ``False`` when
        it is malformed or not a string/bytes value at all (e.g. ``None``).
    """
    try:
        json.loads(myjson)
    except (ValueError, TypeError):
        # ValueError: malformed JSON text; TypeError: not str/bytes.
        return False
    return True
def add_response_format(response_format):
    """Build the prompt suffix asking the model to fill in a JSON template.

    Returns:
        str: A fixed instruction followed by ``response_format`` serialized
        with ``json.dumps``.
    """
    instruction = (
        "\nPlease fill in the following json template with realistic and appropriate information. "
        "In your reply, only return the generated json. "
        "If you can't answer return an empty json.\n"
    )
    return instruction + json.dumps(response_format)
def add_tools(response_tools, last_tool_message):
    """Build the prompt suffix that offers the given tools to the model.

    Args:
        response_tools: Iterable of JSON-serializable tool descriptions.
        last_tool_message: Unused; kept so existing callers keep working.

    Returns:
        str: A fixed instruction followed by each tool serialized with
        ``json.dumps``, every entry wrapped in newlines.
    """
    prompt = "\nPlease select the fitting provided tool to create your answer. Only return the generated result of the tool. Do not describe what you are doing, just return the json.\n"
    return prompt + "".join("\n" + json.dumps(tool) + "\n" for tool in response_tools)
def clean_response(response):
    """Strip known artefacts from a model reply.

    Currently removes every occurrence of the ``[TOOL_CALLS]`` marker.
    """
    marker = "[TOOL_CALLS]"
    return "".join(response.split(marker))
def decrypt_api_key(api_key):
    """Decode a base64 API key of the form ``email:password``.

    This is PoC code and methods should be replaced with a more secure way to
    deal with credentials (e.g. in a db). The key is only base64-encoded,
    not encrypted.

    Returns:
        tuple: ``(email, password)``. The key is split at the first colon, so
        passwords may themselves contain colons. ``("invalid", "invalid")``
        is returned when the key cannot be decoded or contains no colon.
    """
    try:
        decoded_key = base64.b64decode(api_key.encode("ascii")).decode("ascii")
    except Exception as e:
        print(e)
        decoded_key = "invalid:invalid"
    email, separator, password = decoded_key.partition(":")
    if not separator:
        # A key without a colon used to raise IndexError here.
        return "invalid", "invalid"
    return email, password
def main():
    """Run an interactive question/answer loop on the console.

    Loads ``pgpt_openai_api_proxy.json`` from the directory above this file's
    directory, logs in and forwards each entered line to the API. The first
    question opens the chat (``create_chat``); later ones continue it
    (``query_private_gpt``). Previously every question went to
    ``query_private_gpt``, which refuses to run before a chat exists.
    Typing ``quit`` or pressing Ctrl+C ends the loop.
    """
    config_file = Path.absolute(Path(__file__).parent.parent / "pgpt_openai_api_proxy.json")
    config = Config(config_file=config_file, required_fields=["base_url"])
    chat = PrivateGPTAPI(config)
    print("Type your questions below. Type 'quit' to exit.")
    while True:
        try:
            question = input("❓ Question: ").strip()
            if question.lower() == 'quit':
                break
            if question:
                if chat.chat_id is None:
                    chat.create_chat(question)
                else:
                    chat.query_private_gpt(question)
        except KeyboardInterrupt:
            print("\nExiting chat...")
            break
        except Exception as e:
            print(f"❌ Error: {str(e)}")
            break
# Start the interactive console loop only when this file is run as a script.
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/examples/sftp_upload_with_id/Api.py:
--------------------------------------------------------------------------------
```python
import json
import os
import posixpath
from pathlib import Path
from time import sleep
import paramiko
import requests
import urllib3
import base64
from httpcore import NetworkError
from .config import Config
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def initialize_session(proxy_user, proxy_password, access_header):
    """Create the HTTP session shared by all API calls.

    Certificate verification is switched off on the returned session. The
    session always sends JSON ``Accept``/``Content-Type`` headers. When
    ``access_header`` is given it is sent as ``X-Custom-Header``; otherwise,
    when both proxy credentials are given, they are sent as HTTP Basic auth.

    Args:
        proxy_user: Proxy user name or None.
        proxy_password: Proxy password or None.
        access_header: Value for ``X-Custom-Header`` or None.

    Returns:
        The configured ``requests.Session``.
    """
    session = requests.Session()
    session.verify = False

    extra = {}
    if access_header is not None:
        extra['X-Custom-Header'] = access_header
    elif not (proxy_user is None or proxy_password is None):
        credentials = f"{proxy_user}:{proxy_password}".encode()
        extra['Authorization'] = 'Basic ' + base64.b64encode(credentials).decode()

    session.headers.update({
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        **extra,
    })
    return session
class PrivateGPTAPI:
    """Client for the PrivateGPT REST API (login, chats, sources) with optional SFTP upload.

    Every HTTP call goes through one session built by ``initialize_session``.
    After a successful login the bearer token is stored in that session's
    ``Authorization`` header.
    """

    def __init__(self, config, client_api_key=None):
        """Read the settings, build the HTTP session and try to log in.

        Args:
            config: Settings object queried through ``config.get(key, default)``.
            client_api_key: Optional base64 ``email:password`` key. When given it
                replaces the configured credentials. If ``whitelist_keys`` is
                non-empty the key must be listed there; otherwise no login is
                attempted and ``authorized`` is False.
        """
        self.token = None
        self.chat_id = None
        self.base_url = config.get("base_url")
        # Empty strings in the configuration are treated as "not set".
        self.proxy_user = self._none_if_empty(config.get("proxy_user", None))
        self.proxy_password = self._none_if_empty(config.get("proxy_password", None))
        self.access_header = self._none_if_empty(config.get("access_header", None))
        self.chosen_groups = config.get("groups", [])
        self.language = config.get("language", "en")
        self.use_public = config.get("use_public", True)
        self.whitelist_keys = config.get("whitelist_keys", [])
        self.logged_in = False
        self.authorized = True

        if client_api_key is not None:
            self.email, self.password = decrypt_api_key(client_api_key)
            if len(self.whitelist_keys) > 0 and client_api_key not in self.whitelist_keys:
                # The whitelist used to be reported but not enforced.
                print("not authorized")
                self.authorized = False
        else:
            self.email = config.get("email", None)
            self.password = config.get("password", None)

        # The SFTP settings are always defined so that upload_sftp can report a
        # missing configuration instead of failing with AttributeError.
        self.ftp_password = config.get("ftp_password", None)
        self.ftp_host = config.get("ftp_host", None)
        self.ftp_port = config.get("ftp_port", None)
        self.ftp_folder = config.get("ftp_folder", "/")
        self.ftp_subfolder = config.get("ftp_subfolder", "temp")

        self.session = initialize_session(self.proxy_user, self.proxy_password, self.access_header)
        if self.authorized and self.login():
            self.logged_in = True

    @staticmethod
    def _none_if_empty(value):
        """Map the empty string to None and return every other value unchanged."""
        return None if value == "" else value

    @staticmethod
    def _field(message, name, default=None):
        """Read ``name`` from a chat message given as a mapping or as an object."""
        if isinstance(message, dict):
            return message.get(name, default)
        return getattr(message, name, default)

    @staticmethod
    def _normalize_answer(resp):
        """Make sure ``resp["data"]["answer"]`` exists and unwrap role-style answers.

        A reply without a ``data`` mapping is replaced by
        ``{"data": {"answer": "error"}}``. An answer that is itself a JSON
        message (it starts with ``{"role":``) is replaced by that message's
        ``content`` and the chat id is set to ``"0"``.
        """
        data = resp.get('data') if isinstance(resp, dict) else None
        if isinstance(data, dict):
            answer = data.get('answer', "error")
        else:
            print(resp)
            resp = {"data": {"answer": "error"}}
            answer = "error"
        if isinstance(answer, str) and answer.startswith("{\"role\":"):
            resp["data"]["answer"] = json.loads(answer)["content"]
            resp["data"]["chatId"] = "0"
        print(f"💡 Response: {answer}")
        return resp

    def login(self):
        """Authenticate with email and password and store the bearer token.

        On success the token is written to the session's ``Authorization``
        header and ``chat_id`` is reset to None.

        Returns:
            bool: True on success, False when the request failed or the reply
            did not contain a token.
        """
        url = f"{self.base_url}/login"
        payload = {"email": self.email, "password": self.password}
        try:
            response = self.session.post(url, json=payload)
            # The response body is not printed: it contains the bearer token.
            response.raise_for_status()
            self.token = response.json()['data']['token']
        except requests.exceptions.RequestException as e:
            print(f"❌ Login failed: {e}")
            return False
        except (KeyError, TypeError, ValueError) as e:
            print(f"❌ Login failed: unexpected response ({e})")
            return False

        # Keep other credentials already in the header (e.g. "Basic ..." for the
        # proxy) but drop bearer tokens from earlier logins; appending on every
        # login made the header grow with stale tokens.
        existing = self.session.headers.get('Authorization', '') or ''
        kept = [part for part in existing.split(', ') if part and not part.startswith('Bearer ')]
        kept.append(f'Bearer {self.token}')
        self.session.headers['Authorization'] = ', '.join(kept)
        self.chat_id = None
        print("✅ Login successful.")
        return True

    def create_chat(self, user_input):
        """Start a new chat session with ``user_input`` as the first question.

        Sends a POST request to the ``/chats`` endpoint and stores the returned
        chat id for follow-up questions. A failed request is retried once.

        Returns:
            dict: The (normalised) JSON reply, or ``{"error": ...}`` when both
            attempts failed.
        """
        url = f"{self.base_url}/chats"
        payload = {
            "language": self.language,
            "question": user_input,  # Initial question to start the chat
            "usePublic": self.use_public,
            "groups": self.chosen_groups
        }
        try:
            response = self.session.post(url, json=payload)
            response.raise_for_status()  # Raise an exception if the response was not successful
            data = response.json()
            self.chat_id = data['data']['chatId']  # Store the chat ID for future use
            print("✅ Chat initialized.")
            return self._normalize_answer(data)
        except requests.exceptions.RequestException as e:
            # It seems we get disconnections from time to time, so try once more.
            # The retry must be a POST as well (it used to PATCH "/chats") and
            # has to remember the chat id like the first attempt does.
            try:
                response = self.session.post(url, json=payload)
                response.raise_for_status()
                data = response.json()
                self.chat_id = data.get('data', {}).get('chatId', self.chat_id)
                answer = data.get('data', {}).get('answer', "No answer provided.")
                print(f"💡 Response: {answer}")
                return data
            except Exception:
                print(f"❌ Failed to get response: {e}")
                return {"error": f"❌ Failed to get response: {e}"}

    def list_personal_groups(self):
        """Return the ``personalGroups`` list from ``/groups``, or [] on any failure."""
        url = f"{self.base_url}/groups"
        try:
            resp = self.session.get(url)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return data_block.get("personalGroups", [])
        except Exception as e:
            # Covers request errors as well as malformed replies. The former
            # handler only named httpcore's NetworkError, which a requests
            # session does not raise.
            print(f"❌ Failed to list groups: {e}")
            return []

    def get_document_info(self, id):
        """Fetch the ``data`` block of ``/sources/{id}``.

        Returns:
            The ``data`` value of the reply, or [] when it is empty or the
            request or parsing failed.
        """
        url = f"{self.base_url}/sources/{id}"
        try:
            resp = self.session.get(url)
            data_block = json.loads(resp.content)["data"]
        except (requests.exceptions.RequestException, NetworkError,
                ValueError, KeyError, TypeError) as e:
            print(f"❌ Failed to get response: {e}")
            return []
        if not data_block:
            return []
        return data_block

    def query_private_gpt(self, user_input):
        """Send a follow-up question to the current chat and return the reply.

        Returns:
            False when no chat has been created yet; otherwise the (normalised)
            JSON reply, or ``{"error": ...}`` when the request failed twice.
        """
        if not self.chat_id:
            print("❌ Chat session not initialized.")
            return False
        url = f"{self.base_url}/chats/{self.chat_id}"
        payload = {"question": user_input}
        try:
            response = self.session.patch(url, json=payload)
            # The status is intentionally not checked on the first attempt
            # (the check was commented out in the original).
            return self._normalize_answer(response.json())
        except requests.exceptions.RequestException as e:
            # It seems we get disconnections from time to time, so try once more.
            try:
                response = self.session.patch(url, json=payload)
                response.raise_for_status()
                data = response.json()
                answer = data.get('data', {}).get('answer', "No answer provided.")
                print(f"💡 Response: {answer}")
                return data
            except Exception:
                print(f"❌ Failed to get response: {e}")
                return {"error": f"❌ Failed to get response: {e}"}

    def add_source(self, markdown, groups, name):
        """Create a new source from markdown text. Working with version 1.3.3 and newer

        Returns:
            The ``data`` block of the reply, [] when it is empty, or
            ``{"error": ...}`` when the request failed.
        """
        url = f"{self.base_url}/sources"
        payload = {
            "name": name,
            "groups": groups,
            "content": markdown
        }
        try:
            resp = self.session.post(url, json=payload)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return data_block
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}

    def update_source(self, source_id, markdown=None, groups=None, name=None):
        """Edit an existing source.

        When ``groups`` is None the source's current groups are looked up and
        sent back unchanged. ``markdown`` and ``name`` are only sent when given.

        Returns:
            The ``data`` block of the reply, [] when it is empty, or
            ``{"error": ...}`` when a request failed.
        """
        url = f"{self.base_url}/sources/{source_id}"
        payload = {}
        if groups is None:
            info = self.get_document_info(source_id)
            # get_document_info returns [] when the lookup failed; indexing that
            # with "groups" used to raise TypeError.
            if not isinstance(info, dict) or "groups" not in info:
                message = f"❌ Failed to get response: could not read groups of source {source_id}"
                print(message)
                return {"error": message}
            payload["groups"] = info["groups"]
        else:
            payload["groups"] = groups
        if markdown is not None:
            payload["content"] = markdown
        if name is not None:
            payload["name"] = name
        try:
            resp = self.session.patch(url, json=payload)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return data_block
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}

    def delete_source(self, source_id):
        """Delete a source. Working with version 1.3.3 and newer

        Returns:
            The ``message`` of the reply, ``"failed"`` when it is empty, or
            ``{"error": ...}`` when the request failed.
        """
        url = f"{self.base_url}/sources/{source_id}"
        try:
            resp = self.session.delete(url)
            message = json.loads(resp.content)["message"]
            if not message:
                return "failed"
            return message
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}

    @staticmethod
    def _enter_remote_dir(sftp, remote_dir):
        """Change into ``remote_dir`` on the SFTP server, creating missing parts."""
        try:
            sftp.chdir(remote_dir)
            return
        except IOError:
            pass
        # Walk one component at a time relative to the current directory. The
        # previous code re-joined the path from "" which dropped the leading
        # "/" and repeated already-entered components.
        if remote_dir.startswith("/"):
            sftp.chdir("/")
        for part in remote_dir.strip("/").split("/"):
            if not part:
                continue
            try:
                sftp.chdir(part)
            except IOError:
                sftp.mkdir(part)
                sftp.chdir(part)

    def upload_sftp(self, file_path, max_checks=None):
        """Upload a local file via SFTP, then poll the "temp" group for sources.

        Args:
            file_path: Path of the local file.
            max_checks: Optional upper bound for the number of status polls
                (one every 2 seconds). None keeps polling until sources appear.

        Returns:
            list: Result of ``get_sources_from_group("temp")``; [] when SFTP is
            not configured, the upload failed, or ``max_checks`` ran out.
        """
        if self.ftp_host is None or self.ftp_port is None or self.ftp_password is None:
            print("❌ SFTP is not configured (ftp_host, ftp_port and ftp_password are required).")
            return []

        uploaded = False
        transport = paramiko.Transport((self.ftp_host, self.ftp_port))
        try:
            transport.connect(username=self.email, password=self.ftp_password)
            sftp = paramiko.SFTPClient.from_transport(transport)
            try:
                remote_base_dir = posixpath.join(self.ftp_folder, self.ftp_subfolder)
                # Ensure the remote directory exists and make it the current one.
                self._enter_remote_dir(sftp, remote_base_dir)
                remote_filename = os.path.basename(file_path)
                remote_path = posixpath.join(remote_base_dir, remote_filename)
                try:
                    # The target directory is current, so the bare file name
                    # lands in it for absolute and relative base dirs alike.
                    sftp.put(file_path, remote_filename)
                    print(f"Uploaded {file_path} to {remote_path} successfully.")
                    uploaded = True
                except Exception as e:
                    print(e)
            finally:
                sftp.close()
        finally:
            # Closed on every path, including failures while connecting or
            # creating directories.
            transport.close()
            print("Connection closed")

        if not uploaded:
            # Nothing will ever show up for a failed upload; do not poll forever.
            return []

        sources = []
        checks = 0
        while len(sources) == 0 and (max_checks is None or checks < max_checks):
            print("Checking file status")
            sleep(2)
            sources = self.get_sources_from_group("temp")
            checks += 1
        return sources

    def get_sources_from_group(self, group):
        """List the documents of a group. Working with version 1.3.3 and newer

        Posts the group name to ``/sources/groups`` and looks up every returned
        source with ``get_document_info``.

        Returns:
            list: One entry per source; [] when there are none or the request failed.
        """
        url = f"{self.base_url}/sources/groups"
        payload = {
            "groupName": group
        }
        try:
            resp = self.session.post(url, json=payload)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return [self.get_document_info(source) for source in data_block["sources"]]
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return []

    def _send(self, user_input):
        """Open a chat with the first question, or continue the current chat."""
        if not self.chat_id:
            return self.create_chat(user_input)
        return self.query_private_gpt(user_input)

    def respond_with_context(self, messages, response_format=None, request_tools=None):
        """Turn an OpenAI-style message list into one PrivateGPT question and send it.

        The question consists of the last system message (in its ``str()``
        form), the last user message, an optional note about the last tool call
        and its result, and optional instructions for ``response_format`` and
        ``request_tools``. Messages may be mappings or objects with attributes.

        Returns:
            The ``data`` block of the reply (with a ``tool_call`` entry when
            tools were offered and the answer is JSON), a dict with an
            ``error`` key on failure, or None when the reply has neither
            ``data`` nor ``error``.
        """
        field = self._field

        def last_with_role(role):
            return next((m for m in reversed(messages) if field(m, "role") == role), None)

        user_input = ""
        for message in messages:
            if field(message, "role") == "system":
                user_input = str(message) + "\n"
        last_user_message = last_with_role("user")
        if last_user_message is not None:
            user_input += field(last_user_message, "content") or ""

        last_assistant_message = last_with_role("assistant")
        last_tool_message = last_with_role("tool")
        tool_calls = None
        if last_assistant_message is not None:
            tool_calls = field(last_assistant_message, "tool_calls")
        hastoolresult = False
        if last_tool_message is not None and tool_calls:
            user_input += ("\nYou called the tool: " + str(tool_calls[0])
                           + ". The result was: " + str(field(last_tool_message, "content")))
            hastoolresult = True
        print("💁 Request: " + user_input)

        # PGPT manages history and context itself so we don't need to forward the history.
        if response_format is not None:
            print("Response format: " + str(response_format))
            user_input += add_response_format(response_format)
        if request_tools is not None and not hastoolresult:
            user_input += add_tools(request_tools, last_tool_message)

        if not getattr(self, "authorized", True):
            print("not authorized")
            return {"error": "not authorized"}
        if not self.logged_in:
            # Previously a late login was attempted but the question was dropped.
            if not self.login():
                return {"error": "❌ Login failed."}
            self.logged_in = True

        result = self._send(user_input)
        if 'data' not in result and 'error' in result:
            # Try to login again and send the query once more on error.
            if not self.login():
                return result
            result = self._send(user_input)
            if 'data' not in result:
                return result
        if 'data' not in result:
            return None

        response_data = result.get("data")
        if request_tools is not None and not hastoolresult and isinstance(response_data, dict):
            answer = response_data.get("answer")
            if isinstance(answer, str):
                cleaned = clean_response(answer)
                if is_json(cleaned):
                    response_data["tool_call"] = cleaned
        return response_data
def is_json(myjson):
    """Tell whether ``myjson`` can be parsed by ``json.loads``.

    Args:
        myjson: Candidate JSON text. Values that ``json.loads`` does not accept
            at all (for example None) count as "not JSON".

    Returns:
        bool: True when parsing succeeds, False otherwise.
    """
    try:
        json.loads(myjson)
    except (ValueError, TypeError):
        # ValueError: malformed text. TypeError: input is not str/bytes, which
        # previously escaped this function.
        return False
    return True
def add_response_format(response_format):
    """Build the prompt suffix asking the model to fill in a JSON template.

    Args:
        response_format: JSON-serialisable template.

    Returns:
        str: The instruction text followed by ``json.dumps(response_format)``.
    """
    instruction = (
        "\nPlease fill in the following json template with realistic and appropriate information. "
        "In your reply, only return the generated json. If you can't answer return an empty json.\n"
    )
    return instruction + json.dumps(response_format)
def add_tools(response_tools, last_tool_message):
    """Build the prompt suffix that asks the model to pick one of the offered tools.

    Args:
        response_tools: Iterable of JSON-serialisable tool descriptions.
        last_tool_message: Not used by this function; kept so existing callers
            keep working.

    Returns:
        str: The instruction text followed by every tool serialised with
        ``json.dumps``, each one wrapped in its own pair of newlines.
    """
    prompt = "\nPlease select the fitting provided tool to create your answer. Only return the generated result of the tool. Do not describe what you are doing, just return the json.\n"
    # A single join replaces the repeated ``+=``; the former ``index`` counter
    # was incremented but never read, so it is gone.
    return prompt + "".join("\n" + json.dumps(tool) + "\n" for tool in response_tools)
def clean_response(response):
    """Strip known model artefacts from a reply.

    Args:
        response: Reply text as a string.

    Returns:
        str: ``response`` with every ``[TOOL_CALLS]`` marker removed.
    """
    # Remove artefacts from reply here
    for artefact in ("[TOOL_CALLS]",):
        response = response.replace(artefact, "")
    return response
def decrypt_api_key(api_key):
    """Decode a base64 ``email:password`` API key into its two parts.

    This is PoC code and should be replaced with a more secure way to deal with
    credentials (e.g. in a db). Base64 is an encoding, not encryption.

    Args:
        api_key: Base64 text whose decoded form is ``"<email>:<password>"``.

    Returns:
        tuple[str, str]: ``(email, password)``, or ``("invalid", "invalid")``
        when the key cannot be decoded or contains no ``:`` separator.
    """
    try:
        # UTF-8 is a superset of ASCII, so previously valid keys decode the same.
        decoded_key = base64.b64decode(api_key.encode("ascii")).decode("utf-8")
    except Exception as e:
        print(e)
        decoded_key = "invalid:invalid"
    # Split on the first colon only, so a password containing ':' stays intact;
    # a key without any colon used to raise IndexError here.
    email, separator, password = decoded_key.partition(":")
    if not separator:
        return "invalid", "invalid"
    return email, password
def main():
    """Run the interactive command-line chat loop.

    Loads ``pgpt_openai_api_proxy.json`` from two directories above this file
    and creates the API client. The first question opens a chat; later
    questions continue it. The loop ends when the user types ``quit``, presses
    Ctrl+C, or an error occurs.
    """
    config_file = Path.absolute(Path(__file__).parent.parent / "pgpt_openai_api_proxy.json")
    config = Config(config_file=config_file, required_fields=["base_url"])
    chat = PrivateGPTAPI(config)
    print("Type your questions below. Type 'quit' to exit.")
    while True:
        try:
            question = input("❓ Question: ").strip()
            if question.lower() == 'quit':
                break
            if not question:
                continue
            # query_private_gpt refuses to run without a chat id, so the first
            # question has to create the chat. Before this check every question
            # ended in "Chat session not initialized."
            if chat.chat_id is None:
                chat.create_chat(question)
            else:
                chat.query_private_gpt(question)
        except KeyboardInterrupt:
            print("\nExiting chat...")
            break
        except Exception as e:
            print(f"❌ Error: {str(e)}")
            break


# Only start the chat loop when this file is executed as a script.
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/clients/Gradio/Api.py:
--------------------------------------------------------------------------------
```python
import json
import os
import posixpath
from pathlib import Path
from time import sleep
import paramiko
import requests
import urllib3
import base64
from httpcore import NetworkError
from clients.Gradio.config import Config
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def initialize_session(proxy_user, proxy_password, access_header):
    """Create the HTTP session shared by all API calls.

    Certificate verification is switched off on the returned session. The
    session always sends JSON ``Accept``/``Content-Type`` headers. When
    ``access_header`` is given it is sent as ``X-Custom-Header``; otherwise,
    when both proxy credentials are given, they are sent as HTTP Basic auth.

    Args:
        proxy_user: Proxy user name or None.
        proxy_password: Proxy password or None.
        access_header: Value for ``X-Custom-Header`` or None.

    Returns:
        The configured ``requests.Session``.
    """
    session = requests.Session()
    session.verify = False

    extra = {}
    if access_header is not None:
        extra['X-Custom-Header'] = access_header
    elif not (proxy_user is None or proxy_password is None):
        credentials = f"{proxy_user}:{proxy_password}".encode()
        extra['Authorization'] = 'Basic ' + base64.b64encode(credentials).decode()

    session.headers.update({
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        **extra,
    })
    return session
class PrivateGPTAPI:
    """Client for the PrivateGPT REST API (login, chats, sources) with optional SFTP upload.

    Every HTTP call goes through one session built by ``initialize_session``.
    After a successful login the bearer token is stored in that session's
    ``Authorization`` header.
    """

    def __init__(self, config, client_api_key=None):
        """Read the settings, build the HTTP session and try to log in.

        Args:
            config: Settings object queried through ``config.get(key, default)``.
            client_api_key: Optional base64 ``email:password`` key. When given it
                replaces the configured credentials. If ``whitelist_keys`` is
                non-empty the key must be listed there; otherwise no login is
                attempted and ``authorized`` is False.
        """
        self.token = None
        self.chat_id = None
        self.base_url = config.get("base_url")
        # Empty strings in the configuration are treated as "not set".
        self.proxy_user = self._none_if_empty(config.get("proxy_user", None))
        self.proxy_password = self._none_if_empty(config.get("proxy_password", None))
        self.access_header = self._none_if_empty(config.get("access_header", None))
        self.chosen_groups = config.get("groups", [])
        self.language = config.get("language", "en")
        self.use_public = config.get("use_public", False)
        self.whitelist_keys = config.get("whitelist_keys", [])
        self.logged_in = False
        self.authorized = True

        if client_api_key is not None:
            self.email, self.password = decrypt_api_key(client_api_key)
            if len(self.whitelist_keys) > 0 and client_api_key not in self.whitelist_keys:
                # The whitelist used to be reported but not enforced.
                print("not authorized")
                self.authorized = False
        else:
            self.email = config.get("email", None)
            self.password = config.get("password", None)

        # The SFTP settings are always defined so that upload_sftp can report a
        # missing configuration instead of failing with AttributeError.
        self.ftp_password = config.get("ftp_password", None)
        self.ftp_host = config.get("ftp_host", None)
        self.ftp_port = config.get("ftp_port", None)
        self.ftp_folder = config.get("ftp_folder", "/")
        self.ftp_subfolder = config.get("ftp_subfolder", "temp")

        self.session = initialize_session(self.proxy_user, self.proxy_password, self.access_header)
        if self.authorized and self.login():
            self.logged_in = True

    @staticmethod
    def _none_if_empty(value):
        """Map the empty string to None and return every other value unchanged."""
        return None if value == "" else value

    @staticmethod
    def _field(message, name, default=None):
        """Read ``name`` from a chat message given as a mapping or as an object."""
        if isinstance(message, dict):
            return message.get(name, default)
        return getattr(message, name, default)

    @staticmethod
    def _normalize_answer(resp):
        """Make sure ``resp["data"]["answer"]`` exists and unwrap role-style answers.

        A reply without a ``data`` mapping is replaced by
        ``{"data": {"answer": "error"}}``. An answer that is itself a JSON
        message (it starts with ``{"role":``) is replaced by that message's
        ``content`` and the chat id is set to ``"0"``.
        """
        data = resp.get('data') if isinstance(resp, dict) else None
        if isinstance(data, dict):
            answer = data.get('answer', "error")
        else:
            print(resp)
            resp = {"data": {"answer": "error"}}
            answer = "error"
        if isinstance(answer, str) and answer.startswith("{\"role\":"):
            resp["data"]["answer"] = json.loads(answer)["content"]
            resp["data"]["chatId"] = "0"
        print(f"💡 Response: {answer}")
        return resp

    def login(self):
        """Authenticate with email and password and store the bearer token.

        On success the token is written to the session's ``Authorization``
        header and ``chat_id`` is reset to None.

        Returns:
            bool: True on success, False when the request failed or the reply
            did not contain a token.
        """
        url = f"{self.base_url}/login"
        payload = {"email": self.email, "password": self.password}
        try:
            response = self.session.post(url, json=payload)
            # The response body is not printed: it contains the bearer token.
            response.raise_for_status()
            self.token = response.json()['data']['token']
        except requests.exceptions.RequestException as e:
            print(f"❌ Login failed: {e}")
            return False
        except (KeyError, TypeError, ValueError) as e:
            print(f"❌ Login failed: unexpected response ({e})")
            return False

        # Keep other credentials already in the header (e.g. "Basic ..." for the
        # proxy) but drop bearer tokens from earlier logins; appending on every
        # login made the header grow with stale tokens.
        existing = self.session.headers.get('Authorization', '') or ''
        kept = [part for part in existing.split(', ') if part and not part.startswith('Bearer ')]
        kept.append(f'Bearer {self.token}')
        self.session.headers['Authorization'] = ', '.join(kept)
        self.chat_id = None
        print("✅ Login successful.")
        return True

    def create_chat(self, user_input):
        """Start a new chat session with ``user_input`` as the first question.

        Sends a POST request to the ``/chats`` endpoint and stores the returned
        chat id for follow-up questions. A failed request is retried once.

        Returns:
            dict: The (normalised) JSON reply, or ``{"error": ...}`` when both
            attempts failed.
        """
        url = f"{self.base_url}/chats"
        payload = {
            "language": self.language,
            "question": user_input,  # Initial question to start the chat
            "usePublic": self.use_public,
            "groups": self.chosen_groups
        }
        try:
            response = self.session.post(url, json=payload)
            response.raise_for_status()  # Raise an exception if the response was not successful
            data = response.json()
            self.chat_id = data['data']['chatId']  # Store the chat ID for future use
            print("✅ Chat initialized.")
            return self._normalize_answer(data)
        except requests.exceptions.RequestException as e:
            # It seems we get disconnections from time to time, so try once more.
            # The retry must be a POST as well (it used to PATCH "/chats") and
            # has to remember the chat id like the first attempt does.
            try:
                response = self.session.post(url, json=payload)
                response.raise_for_status()
                data = response.json()
                self.chat_id = data.get('data', {}).get('chatId', self.chat_id)
                answer = data.get('data', {}).get('answer', "No answer provided.")
                print(f"💡 Response: {answer}")
                return data
            except Exception:
                print(f"❌ Failed to get response: {e}")
                return {"error": f"❌ Failed to get response: {e}"}

    def list_personal_groups(self):
        """Return the ``personalGroups`` list from ``/groups``, or [] on any failure."""
        url = f"{self.base_url}/groups"
        try:
            resp = self.session.get(url)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return data_block.get("personalGroups", [])
        except Exception as e:
            # Covers request errors as well as malformed replies. The former
            # handler only named httpcore's NetworkError, which a requests
            # session does not raise.
            print(f"❌ Failed to list groups: {e}")
            return []

    def get_document_info(self, id):
        """Fetch the ``data`` block of ``/sources/{id}``.

        Returns:
            The ``data`` value of the reply, or [] when it is empty or the
            request or parsing failed.
        """
        url = f"{self.base_url}/sources/{id}"
        try:
            resp = self.session.get(url)
            data_block = json.loads(resp.content)["data"]
        except (requests.exceptions.RequestException, NetworkError,
                ValueError, KeyError, TypeError) as e:
            print(f"❌ Failed to get response: {e}")
            return []
        if not data_block:
            return []
        return data_block

    def query_private_gpt(self, user_input):
        """Send a follow-up question to the current chat and return the reply.

        Returns:
            False when no chat has been created yet; otherwise the (normalised)
            JSON reply, or ``{"error": ...}`` when the request failed twice.
        """
        if not self.chat_id:
            print("❌ Chat session not initialized.")
            return False
        url = f"{self.base_url}/chats/{self.chat_id}"
        payload = {"question": user_input}
        try:
            response = self.session.patch(url, json=payload)
            # The status is intentionally not checked on the first attempt
            # (the check was commented out in the original).
            return self._normalize_answer(response.json())
        except requests.exceptions.RequestException as e:
            # It seems we get disconnections from time to time, so try once more.
            try:
                response = self.session.patch(url, json=payload)
                response.raise_for_status()
                data = response.json()
                answer = data.get('data', {}).get('answer', "No answer provided.")
                print(f"💡 Response: {answer}")
                return data
            except Exception:
                print(f"❌ Failed to get response: {e}")
                return {"error": f"❌ Failed to get response: {e}"}

    def add_source(self, markdown, groups, name):
        """Create a new source from markdown text. Working with version 1.3.3 and newer

        Returns:
            The ``data`` block of the reply, [] when it is empty, or
            ``{"error": ...}`` when the request failed.
        """
        url = f"{self.base_url}/sources"
        payload = {
            "name": name,
            "groups": groups,
            "content": markdown
        }
        try:
            resp = self.session.post(url, json=payload)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return data_block
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}

    def update_source(self, source_id, markdown=None, groups=None, name=None):
        """Edit an existing source.

        When ``groups`` is None the source's current groups are looked up and
        sent back unchanged. ``markdown`` and ``name`` are only sent when given.

        Returns:
            The ``data`` block of the reply, [] when it is empty, or
            ``{"error": ...}`` when a request failed.
        """
        url = f"{self.base_url}/sources/{source_id}"
        payload = {}
        if groups is None:
            info = self.get_document_info(source_id)
            # get_document_info returns [] when the lookup failed; indexing that
            # with "groups" used to raise TypeError.
            if not isinstance(info, dict) or "groups" not in info:
                message = f"❌ Failed to get response: could not read groups of source {source_id}"
                print(message)
                return {"error": message}
            payload["groups"] = info["groups"]
        else:
            payload["groups"] = groups
        if markdown is not None:
            payload["content"] = markdown
        if name is not None:
            payload["name"] = name
        try:
            resp = self.session.patch(url, json=payload)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return data_block
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}

    def delete_source(self, source_id):
        """Delete a source. Working with version 1.3.3 and newer

        Returns:
            The ``message`` of the reply, ``"failed"`` when it is empty, or
            ``{"error": ...}`` when the request failed.
        """
        url = f"{self.base_url}/sources/{source_id}"
        try:
            resp = self.session.delete(url)
            message = json.loads(resp.content)["message"]
            if not message:
                return "failed"
            return message
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}

    @staticmethod
    def _enter_remote_dir(sftp, remote_dir):
        """Change into ``remote_dir`` on the SFTP server, creating missing parts."""
        try:
            sftp.chdir(remote_dir)
            return
        except IOError:
            pass
        # Walk one component at a time relative to the current directory. The
        # previous code re-joined the path from "" which dropped the leading
        # "/" and repeated already-entered components.
        if remote_dir.startswith("/"):
            sftp.chdir("/")
        for part in remote_dir.strip("/").split("/"):
            if not part:
                continue
            try:
                sftp.chdir(part)
            except IOError:
                sftp.mkdir(part)
                sftp.chdir(part)

    def upload_sftp(self, file_path, max_checks=None):
        """Upload a local file via SFTP, then poll the "temp" group for sources.

        Args:
            file_path: Path of the local file.
            max_checks: Optional upper bound for the number of status polls
                (one every 2 seconds). None keeps polling until sources appear.

        Returns:
            list: Result of ``get_sources_from_group("temp")``; [] when SFTP is
            not configured, the upload failed, or ``max_checks`` ran out.
        """
        if self.ftp_host is None or self.ftp_port is None or self.ftp_password is None:
            print("❌ SFTP is not configured (ftp_host, ftp_port and ftp_password are required).")
            return []

        uploaded = False
        transport = paramiko.Transport((self.ftp_host, self.ftp_port))
        try:
            transport.connect(username=self.email, password=self.ftp_password)
            sftp = paramiko.SFTPClient.from_transport(transport)
            try:
                remote_base_dir = posixpath.join(self.ftp_folder, self.ftp_subfolder)
                # Ensure the remote directory exists and make it the current one.
                self._enter_remote_dir(sftp, remote_base_dir)
                remote_filename = os.path.basename(file_path)
                remote_path = posixpath.join(remote_base_dir, remote_filename)
                try:
                    # The target directory is current, so the bare file name
                    # lands in it for absolute and relative base dirs alike.
                    sftp.put(file_path, remote_filename)
                    print(f"Uploaded {file_path} to {remote_path} successfully.")
                    uploaded = True
                except Exception as e:
                    print(e)
            finally:
                sftp.close()
        finally:
            # Closed on every path, including failures while connecting or
            # creating directories.
            transport.close()
            print("Connection closed")

        if not uploaded:
            # Nothing will ever show up for a failed upload; do not poll forever.
            return []

        sources = []
        checks = 0
        while len(sources) == 0 and (max_checks is None or checks < max_checks):
            print("Checking file status")
            sleep(2)
            sources = self.get_sources_from_group("temp")
            checks += 1
        return sources

    def get_sources_from_group(self, group):
        """List the documents of a group. Working with version 1.3.3 and newer

        Posts the group name to ``/sources/groups`` and looks up every returned
        source with ``get_document_info``.

        Returns:
            list: One entry per source; [] when there are none or the request failed.
        """
        url = f"{self.base_url}/sources/groups"
        payload = {
            "groupName": group
        }
        try:
            resp = self.session.post(url, json=payload)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return [self.get_document_info(source) for source in data_block["sources"]]
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return []

    def _send(self, user_input):
        """Open a chat with the first question, or continue the current chat."""
        if not self.chat_id:
            return self.create_chat(user_input)
        return self.query_private_gpt(user_input)

    def respond_with_context(self, messages, response_format=None, request_tools=None):
        """Turn an OpenAI-style message list into one PrivateGPT question and send it.

        The question consists of the last system message (in its ``str()``
        form), the last user message, an optional note about the last tool call
        and its result, and optional instructions for ``response_format`` and
        ``request_tools``. Messages may be mappings or objects with attributes.

        Returns:
            The ``data`` block of the reply (with a ``tool_call`` entry when
            tools were offered and the answer is JSON), a dict with an
            ``error`` key on failure, or None when the reply has neither
            ``data`` nor ``error``.
        """
        field = self._field

        def last_with_role(role):
            return next((m for m in reversed(messages) if field(m, "role") == role), None)

        user_input = ""
        for message in messages:
            if field(message, "role") == "system":
                user_input = str(message) + "\n"
        last_user_message = last_with_role("user")
        if last_user_message is not None:
            user_input += field(last_user_message, "content") or ""

        last_assistant_message = last_with_role("assistant")
        last_tool_message = last_with_role("tool")
        tool_calls = None
        if last_assistant_message is not None:
            tool_calls = field(last_assistant_message, "tool_calls")
        hastoolresult = False
        if last_tool_message is not None and tool_calls:
            user_input += ("\nYou called the tool: " + str(tool_calls[0])
                           + ". The result was: " + str(field(last_tool_message, "content")))
            hastoolresult = True
        print("💁 Request: " + user_input)

        # PGPT manages history and context itself so we don't need to forward the history.
        if response_format is not None:
            print("Response format: " + str(response_format))
            user_input += add_response_format(response_format)
        if request_tools is not None and not hastoolresult:
            user_input += add_tools(request_tools, last_tool_message)

        if not getattr(self, "authorized", True):
            print("not authorized")
            return {"error": "not authorized"}
        if not self.logged_in:
            # Previously a late login was attempted but the question was dropped.
            if not self.login():
                return {"error": "❌ Login failed."}
            self.logged_in = True

        result = self._send(user_input)
        if 'data' not in result and 'error' in result:
            # Try to login again and send the query once more on error.
            if not self.login():
                return result
            result = self._send(user_input)
            if 'data' not in result:
                return result
        if 'data' not in result:
            return None

        response_data = result.get("data")
        if request_tools is not None and not hastoolresult and isinstance(response_data, dict):
            answer = response_data.get("answer")
            if isinstance(answer, str):
                cleaned = clean_response(answer)
                if is_json(cleaned):
                    response_data["tool_call"] = cleaned
        return response_data
def is_json(myjson):
    """Tell whether ``myjson`` can be parsed by ``json.loads``.

    Args:
        myjson: Candidate JSON text. Values that ``json.loads`` does not accept
            at all (for example None) count as "not JSON".

    Returns:
        bool: True when parsing succeeds, False otherwise.
    """
    try:
        json.loads(myjson)
    except (ValueError, TypeError):
        # ValueError: malformed text. TypeError: input is not str/bytes, which
        # previously escaped this function.
        return False
    return True
def add_response_format(response_format):
    """Build the prompt suffix asking the model to fill in a JSON template.

    Args:
        response_format: JSON-serialisable template.

    Returns:
        str: The instruction text followed by ``json.dumps(response_format)``.
    """
    instruction = (
        "\nPlease fill in the following json template with realistic and appropriate information. "
        "In your reply, only return the generated json. If you can't answer return an empty json.\n"
    )
    return instruction + json.dumps(response_format)
def add_tools(response_tools, last_tool_message):
    """Build the prompt suffix that asks the model to pick one of the offered tools.

    Args:
        response_tools: Iterable of JSON-serialisable tool descriptions.
        last_tool_message: Not used by this function; kept so existing callers
            keep working.

    Returns:
        str: The instruction text followed by every tool serialised with
        ``json.dumps``, each one wrapped in its own pair of newlines.
    """
    prompt = "\nPlease select the fitting provided tool to create your answer. Only return the generated result of the tool. Do not describe what you are doing, just return the json.\n"
    # A single join replaces the repeated ``+=``; the former ``index`` counter
    # was incremented but never read, so it is gone.
    return prompt + "".join("\n" + json.dumps(tool) + "\n" for tool in response_tools)
def clean_response(response):
    """Strip known model artefacts from a reply.

    Args:
        response: Reply text as a string.

    Returns:
        str: ``response`` with every ``[TOOL_CALLS]`` marker removed.
    """
    # Remove artefacts from reply here
    for artefact in ("[TOOL_CALLS]",):
        response = response.replace(artefact, "")
    return response
def decrypt_api_key(api_key):
    """Decode a base64 ``email:password`` API key into its two parts.

    This is PoC code and should be replaced with a more secure way to deal with
    credentials (e.g. in a db). Base64 is an encoding, not encryption.

    Args:
        api_key: Base64 text whose decoded form is ``"<email>:<password>"``.

    Returns:
        tuple[str, str]: ``(email, password)``, or ``("invalid", "invalid")``
        when the key cannot be decoded or contains no ``:`` separator.
    """
    try:
        # UTF-8 is a superset of ASCII, so previously valid keys decode the same.
        decoded_key = base64.b64decode(api_key.encode("ascii")).decode("utf-8")
    except Exception as e:
        print(e)
        decoded_key = "invalid:invalid"
    # Split on the first colon only, so a password containing ':' stays intact;
    # a key without any colon used to raise IndexError here.
    email, separator, password = decoded_key.partition(":")
    if not separator:
        return "invalid", "invalid"
    return email, password
def main():
    """Run the interactive command-line chat loop.

    Loads ``config.json`` from the directory of this file and creates the API
    client. The first question opens a chat via ``create_chat``; later ones go
    to ``query_private_gpt``. The loop ends when the user types ``quit``,
    presses Ctrl+C, or an error occurs.
    """
    config_path = (Path(__file__).parent / "config.json").absolute()
    settings = Config(config_file=config_path, required_fields=["base_url"])
    chat = PrivateGPTAPI(settings)
    print("Type your questions below. Type 'quit' to exit.")

    keep_running = True
    while keep_running:
        try:
            question = input("❓ Question: ").strip()
            if question.lower() == 'quit':
                keep_running = False
            elif question:
                # Pick the call depending on whether a chat already exists.
                ask = chat.create_chat if chat.chat_id is None else chat.query_private_gpt
                ask(question)
        except KeyboardInterrupt:
            print("\nExiting chat...")
            keep_running = False
        except Exception as error:
            # Any other failure ends the session as well.
            print(f"❌ Error: {str(error)}")
            keep_running = False


# Only start the chat loop when this file is executed as a script.
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/clients/Gradio/mcp_servers/sqlite/src/mcp_server_sqlite/server.py:
--------------------------------------------------------------------------------
```python
import os
import sys
import sqlite3
import logging
from contextlib import closing
from pathlib import Path
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
from pydantic import AnyUrl
from typing import Any
# The default console encoding on Windows (e.g. windows-1252) is prone to UnicodeEncodeError,
# so switch the standard streams to UTF-8 unless PYTHONIOENCODING was set explicitly.
if sys.platform == "win32" and "PYTHONIOENCODING" not in os.environ:
    for _stream in (sys.stdin, sys.stdout, sys.stderr):
        _stream.reconfigure(encoding="utf-8")

logger = logging.getLogger('mcp_sqlite_server')
logger.info("Starting MCP SQLite Server")
PROMPT_TEMPLATE = """
The assistants goal is to walkthrough an informative demo of MCP. To demonstrate the Model Context Protocol (MCP) we will leverage this example server to interact with an SQLite database.
It is important that you first explain to the user what is going on. The user has downloaded and installed the SQLite MCP Server and is now ready to use it.
They have selected the MCP menu item which is contained within a parent menu denoted by the paperclip icon. Inside this menu they selected an icon that illustrates two electrical plugs connecting. This is the MCP menu.
Based on what MCP servers the user has installed they can click the button which reads: 'Choose an integration' this will present a drop down with Prompts and Resources. The user has selected the prompt titled: 'mcp-demo'.
This text file is that prompt. The goal of the following instructions is to walk the user through the process of using the 3 core aspects of an MCP server. These are: Prompts, Tools, and Resources.
They have already used a prompt and provided a topic. The topic is: {topic}. The user is now ready to begin the demo.
Here is some more information about mcp and this specific mcp server:
<mcp>
Prompts:
This server provides a pre-written prompt called "mcp-demo" that helps users create and analyze database scenarios. The prompt accepts a "topic" argument and guides users through creating tables, analyzing data, and generating insights. For example, if a user provides "retail sales" as the topic, the prompt will help create relevant database tables and guide the analysis process. Prompts basically serve as interactive templates that help structure the conversation with the LLM in a useful way.
Resources:
This server exposes one key resource: "memo://insights", which is a business insights memo that gets automatically updated throughout the analysis process. As users analyze the database and discover insights, the memo resource gets updated in real-time to reflect new findings. Resources act as living documents that provide context to the conversation.
Tools:
This server provides several SQL-related tools:
"read_query": Executes SELECT queries to read data from the database
"write_query": Executes INSERT, UPDATE, or DELETE queries to modify data
"create_table": Creates new tables in the database
"list_tables": Shows all existing tables
"describe_table": Shows the schema for a specific table
"append_insight": Adds a new business insight to the memo resource
</mcp>
<demo-instructions>
You are an AI assistant tasked with generating a comprehensive business scenario based on a given topic.
Your goal is to create a narrative that involves a data-driven business problem, develop a database structure to support it, generate relevant queries, create a dashboard, and provide a final solution.
At each step you will pause for user input to guide the scenario creation process. Overall ensure the scenario is engaging, informative, and demonstrates the capabilities of the SQLite MCP Server.
You should guide the scenario to completion. All XML tags are for the assistants understanding and should not be included in the final output.
1. The user has chosen the topic: {topic}.
2. Create a business problem narrative:
a. Describe a high-level business situation or problem based on the given topic.
b. Include a protagonist (the user) who needs to collect and analyze data from a database.
c. Add an external, potentially comedic reason why the data hasn't been prepared yet.
d. Mention an approaching deadline and the need to use Claude (you) as a business tool to help.
3. Setup the data:
a. Instead of asking about the data that is required for the scenario, just go ahead and use the tools to create the data. Inform the user you are "Setting up the data".
b. Design a set of table schemas that represent the data needed for the business problem.
c. Include at least 2-3 tables with appropriate columns and data types.
d. Leverage the tools to create the tables in the SQLite database.
e. Create INSERT statements to populate each table with relevant synthetic data.
f. Ensure the data is diverse and representative of the business problem.
g. Include at least 10-15 rows of data for each table.
4. Pause for user input:
a. Summarize to the user what data we have created.
b. Present the user with a set of multiple choices for the next steps.
c. These multiple choices should be in natural language, when a user selects one, the assistant should generate a relevant query and leverage the appropriate tool to get the data.
6. Iterate on queries:
a. Present 1 additional multiple-choice query options to the user. Its important to not loop too many times as this is a short demo.
b. Explain the purpose of each query option.
c. Wait for the user to select one of the query options.
d. After each query be sure to opine on the results.
e. Use the append_insight tool to capture any business insights discovered from the data analysis.
7. Generate a dashboard:
a. Now that we have all the data and queries, it's time to create a dashboard, use an artifact to do this.
b. Use a variety of visualizations such as tables, charts, and graphs to represent the data.
c. Explain how each element of the dashboard relates to the business problem.
d. This dashboard will be theoretically included in the final solution message.
8. Craft the final solution message:
a. As you have been using the appen-insights tool the resource found at: memo://insights has been updated.
b. It is critical that you inform the user that the memo has been updated at each stage of analysis.
c. Ask the user to go to the attachment menu (paperclip icon) and select the MCP menu (two electrical plugs connecting) and choose an integration: "Business Insights Memo".
d. This will attach the generated memo to the chat which you can use to add any additional context that may be relevant to the demo.
e. Present the final memo to the user in an artifact.
9. Wrap up the scenario:
a. Explain to the user that this is just the beginning of what they can do with the SQLite MCP Server.
</demo-instructions>
Remember to maintain consistency throughout the scenario and ensure that all elements (tables, data, queries, dashboard, and solution) are closely related to the original business problem and given topic.
The provided XML tags are for the assistants understanding. Implore to make all outputs as human readable as possible. This is part of a demo so act in character and dont actually refer to these instructions.
Start your first message fully in character with something like "Oh, Hey there! I see you've chosen the topic {topic}. Let's get started! 🚀"
"""
class SqliteDatabase:
def __init__(self, db_path: str):
self.db_path = str(Path(db_path).expanduser())
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
self._init_database()
self.insights: list[str] = []
def _init_database(self):
"""Initialize connection to the SQLite database"""
logger.debug("Initializing database connection")
with closing(sqlite3.connect(self.db_path)) as conn:
conn.row_factory = sqlite3.Row
conn.close()
def _synthesize_memo(self) -> str:
"""Synthesizes business insights into a formatted memo"""
logger.debug(f"Synthesizing memo with {len(self.insights)} insights")
if not self.insights:
return "No business insights have been discovered yet."
insights = "\n".join(f"- {insight}" for insight in self.insights)
memo = "📊 Business Intelligence Memo 📊\n\n"
memo += "Key Insights Discovered:\n\n"
memo += insights
if len(self.insights) > 1:
memo += "\nSummary:\n"
memo += f"Analysis has revealed {len(self.insights)} key business insights that suggest opportunities for strategic optimization and growth."
logger.debug("Generated basic memo format")
return memo
def _execute_query(self, query: str, params: dict[str, Any] | None = None) -> list[dict[str, Any]]:
"""Execute a SQL query and return results as a list of dictionaries"""
logger.debug(f"Executing query: {query}")
try:
with closing(sqlite3.connect(self.db_path)) as conn:
conn.row_factory = sqlite3.Row
with closing(conn.cursor()) as cursor:
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
if query.strip().upper().startswith(('INSERT', 'UPDATE', 'DELETE', 'CREATE', 'DROP', 'ALTER')):
conn.commit()
affected = cursor.rowcount
logger.debug(f"Write query affected {affected} rows")
return [{"affected_rows": affected}]
results = [dict(row) for row in cursor.fetchall()]
logger.debug(f"Read query returned {len(results)} rows")
return results
except Exception as e:
logger.error(f"Database error executing query: {e}")
raise
async def main(db_path: str):
    """Run the SQLite MCP server on the stdio transport.

    Creates the database wrapper for ``db_path``, registers the resource, prompt and tool
    handlers on an MCP ``Server`` named "sqlite-manager" and then serves requests from the
    stdio streams.

    Args:
        db_path: Path of the SQLite database file.
    """
    logger.info(f"Starting SQLite MCP Server with DB path: {db_path}")

    db = SqliteDatabase(db_path)
    server = Server("sqlite-manager")

    def string_arg_schema(arg_name: str, arg_description: str) -> dict[str, Any]:
        """Build the JSON schema of a tool that requires exactly one string argument."""
        return {
            "type": "object",
            "properties": {
                arg_name: {"type": "string", "description": arg_description},
            },
            "required": [arg_name],
        }

    def text_result(text: str) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Wrap ``text`` in the single-element content list returned by tool calls."""
        return [types.TextContent(type="text", text=text)]

    # Register handlers
    logger.debug("Registering handlers")

    @server.list_resources()
    async def handle_list_resources() -> list[types.Resource]:
        logger.debug("Handling list_resources request")
        return [
            types.Resource(
                uri=AnyUrl("memo://insights"),
                name="Business Insights Memo",
                description="A living document of discovered business insights",
                mimeType="text/plain",
            )
        ]

    @server.read_resource()
    async def handle_read_resource(uri: AnyUrl) -> str:
        logger.debug(f"Handling read_resource request for URI: {uri}")
        if uri.scheme != "memo":
            logger.error(f"Unsupported URI scheme: {uri.scheme}")
            raise ValueError(f"Unsupported URI scheme: {uri.scheme}")

        path = str(uri).replace("memo://", "")
        # "insights" is the only resource path this server knows.
        if path != "insights":
            logger.error(f"Unknown resource path: {path}")
            raise ValueError(f"Unknown resource path: {path}")

        return db._synthesize_memo()

    @server.list_prompts()
    async def handle_list_prompts() -> list[types.Prompt]:
        logger.debug("Handling list_prompts request")
        return [
            types.Prompt(
                name="mcp-demo",
                description="A prompt to seed the database with initial data and demonstrate what you can do with an SQLite MCP Server + Claude",
                arguments=[
                    types.PromptArgument(
                        name="topic",
                        description="Topic to seed the database with initial data",
                        required=True,
                    )
                ],
            )
        ]

    @server.get_prompt()
    async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult:
        logger.debug(f"Handling get_prompt request for {name} with args {arguments}")
        if name != "mcp-demo":
            logger.error(f"Unknown prompt: {name}")
            raise ValueError(f"Unknown prompt: {name}")

        if not arguments or "topic" not in arguments:
            logger.error("Missing required argument: topic")
            raise ValueError("Missing required argument: topic")

        topic = arguments["topic"]
        prompt = PROMPT_TEMPLATE.format(topic=topic)

        logger.debug(f"Generated prompt template for topic: {topic}")
        return types.GetPromptResult(
            description=f"Demo template for {topic}",
            messages=[
                types.PromptMessage(
                    role="user",
                    content=types.TextContent(type="text", text=prompt.strip()),
                )
            ],
        )

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        """List available tools"""
        return [
            types.Tool(
                name="read_query",
                description="Execute a SELECT query on the SQLite database",
                inputSchema=string_arg_schema("query", "SELECT SQL query to execute"),
            ),
            types.Tool(
                name="write_query",
                description="Execute an INSERT, UPDATE, or DELETE query on the SQLite database",
                inputSchema=string_arg_schema("query", "SQL query to execute"),
            ),
            types.Tool(
                name="create_table",
                description="Create a new table in the SQLite database",
                inputSchema=string_arg_schema("query", "CREATE TABLE SQL statement"),
            ),
            types.Tool(
                name="list_tables",
                description="List all tables in the SQLite database",
                inputSchema={
                    "type": "object",
                    "properties": {},
                },
            ),
            types.Tool(
                name="describe_table",
                description="Get the schema information for a specific table",
                inputSchema=string_arg_schema("table_name", "Name of the table to describe"),
            ),
            types.Tool(
                name="append_insight",
                description="Add a business insight to the memo",
                inputSchema=string_arg_schema("insight", "Business insight discovered from data analysis"),
            ),
        ]

    @server.call_tool()
    async def handle_call_tool(
        name: str, arguments: dict[str, Any] | None
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
        """Handle tool execution requests

        Errors are not raised to the client; they are returned as a text result starting
        with "Database error: " or "Error: ".
        """
        try:
            if name == "list_tables":
                results = db._execute_query(
                    "SELECT name FROM sqlite_master WHERE type='table'"
                )
                return text_result(str(results))

            elif name == "describe_table":
                if not arguments or "table_name" not in arguments:
                    raise ValueError("Missing table_name argument")
                # The table name comes from the client, so it is passed as a bound parameter
                # to the table-valued form of PRAGMA table_info instead of being formatted
                # into the SQL text.
                results = db._execute_query(
                    "SELECT * FROM pragma_table_info(:table_name)",
                    {"table_name": arguments["table_name"]},
                )
                return text_result(str(results))

            elif name == "append_insight":
                if not arguments or "insight" not in arguments:
                    raise ValueError("Missing insight argument")

                db.insights.append(arguments["insight"])
                _ = db._synthesize_memo()

                # Notify clients that the memo resource has changed
                await server.request_context.session.send_resource_updated(AnyUrl("memo://insights"))

                return text_result("Insight added to memo")

            # All remaining tools take a "query" argument.
            if not arguments:
                raise ValueError("Missing arguments")

            if name == "read_query":
                if not arguments["query"].strip().upper().startswith("SELECT"):
                    raise ValueError("Only SELECT queries are allowed for read_query")
                results = db._execute_query(arguments["query"])
                return text_result(str(results))

            elif name == "write_query":
                if arguments["query"].strip().upper().startswith("SELECT"):
                    raise ValueError("SELECT queries are not allowed for write_query")
                results = db._execute_query(arguments["query"])
                return text_result(str(results))

            elif name == "create_table":
                if not arguments["query"].strip().upper().startswith("CREATE TABLE"):
                    raise ValueError("Only CREATE TABLE statements are allowed")
                db._execute_query(arguments["query"])
                return text_result("Table created successfully")

            else:
                raise ValueError(f"Unknown tool: {name}")

        except sqlite3.Error as e:
            return text_result(f"Database error: {str(e)}")
        except Exception as e:
            return text_result(f"Error: {str(e)}")

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        logger.info("Server running with stdio transport")
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="sqlite",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
```
--------------------------------------------------------------------------------
/examples/create_users_from_csv/create_users_from_csv.py:
--------------------------------------------------------------------------------
```python
# https://github.com/Fujitsu-AI/MCP-Server-for-MAS-Developments/tree/main/examples
import json
import os
import posixpath
from pathlib import Path
from time import sleep
import paramiko
import requests
import urllib3
import base64
from httpcore import NetworkError
import pandas as pd
from config import Config
# initialize_session() below creates its session with verify=False; silence the
# InsecureRequestWarning that urllib3 would otherwise emit.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def initialize_session(proxy_user, proxy_password, access_header):
    """Create the ``requests.Session`` used for all API calls.

    Certificate verification is switched off and JSON ``Accept``/``Content-Type`` headers
    are installed. Authentication headers are chosen as follows:

    * ``access_header`` given: it is sent as ``X-Custom-Header``.
    * otherwise, when both ``proxy_user`` and ``proxy_password`` are given: they are sent
      as a Basic ``Authorization`` header.
    * otherwise: no authentication header is added.
    """
    session = requests.Session()
    session.verify = False

    if access_header is not None:
        auth_headers = {'X-Custom-Header': access_header}
    elif proxy_user is not None and proxy_password is not None:
        credentials = f"{proxy_user}:{proxy_password}".encode()
        auth_headers = {'Authorization': 'Basic ' + base64.b64encode(credentials).decode()}
    else:
        auth_headers = {}

    session.headers.update({
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        **auth_headers,
    })
    return session
class PrivateGPTAPI:
    """Client for the PrivateGPT HTTP API used by this example.

    The constructor logs in immediately. Afterwards the instance offers helpers for chats,
    sources, user creation and SFTP upload. All HTTP requests go through ``self.session``.
    """

    def __init__(self, config, client_api_key=None):
        """Initialize the chat client with proxy authentication.

        Args:
            config: Configuration object providing ``get(key, default)``.
            client_api_key: Optional base64 ``user:password`` key. When given, it replaces
                the ``email``/``password`` entries of ``config``.
        """
        def blank_to_none(value):
            # Empty strings in the configuration mean "not set".
            return None if value == "" else value

        self.token = None
        self.chat_id = None

        self.base_url = config.get("base_url")
        self.proxy_user = blank_to_none(config.get("proxy_user", None))
        self.proxy_password = blank_to_none(config.get("proxy_password", None))
        self.access_header = blank_to_none(config.get("access_header", None))

        self.chosen_groups = config.get("groups", [])
        self.language = config.get("language", "en")
        self.use_public = config.get("use_public", True)
        self.whitelist_keys = config.get("whitelist_keys", [])
        self.logged_in = False

        if client_api_key is not None:
            self.email, self.password = decrypt_api_key(client_api_key)
            if len(self.whitelist_keys) > 0 and client_api_key not in self.whitelist_keys:
                # NOTE(review): a non-whitelisted key is only reported here; the login
                # below is still attempted. Confirm whether it should be rejected.
                print("not authorized")
        else:
            self.email = config.get("email", None)
            self.password = config.get("password", None)

        # The SFTP settings are always defined so that upload_sftp() cannot fail on a
        # missing attribute, whichever way the credentials were supplied.
        self.ftp_password = config.get("ftp_password", None)
        self.ftp_host = config.get("ftp_host", None)
        self.ftp_port = config.get("ftp_port", None)
        self.ftp_folder = config.get("ftp_folder", "/")
        self.ftp_subfolder = config.get("ftp_subfolder", "temp")

        self.session = initialize_session(self.proxy_user, self.proxy_password, self.access_header)
        if self.login():
            self.logged_in = True

    def login(self):
        """Authenticate the user and retrieve the token.

        On success the token is stored in ``self.token``, the session's ``Authorization``
        header is updated and ``self.chat_id`` is reset.

        Returns:
            True on success, False when the request fails or the response has no token.
        """
        url = f"{self.base_url}/login"
        payload = {"email": self.email, "password": self.password}
        try:
            # The response body is not printed: it contains the bearer token.
            response = self.session.post(url, json=payload)
            response.raise_for_status()
            data = response.json()
            self.token = data['data']['token']
        except requests.exceptions.RequestException as e:
            print(f"❌ Login failed: {e}")
            return False
        except (KeyError, TypeError, ValueError) as e:
            print(f"❌ Login failed: {e}")
            return False

        # Keep credentials that are already in the header (e.g. the proxy's Basic auth),
        # but drop Bearer tokens of earlier logins so the header does not grow with every
        # re-login.
        existing = self.session.headers.get('Authorization') or ""
        kept = [
            part.strip()
            for part in existing.split(',')
            if part.strip() and not part.strip().startswith('Bearer ')
        ]
        self.session.headers['Authorization'] = ', '.join(kept + [f'Bearer {self.token}'])

        self.chat_id = None
        print("✅ Login successful.")
        return True

    @staticmethod
    def _normalise_chat_response(resp):
        """Return ``(resp, answer)`` for a decoded chat response.

        A body without a ``data`` object is replaced by ``{"data": {"answer": "error"}}``.
        When the answer text is itself a serialized ``{"role": ..., "content": ...}``
        message, ``resp["data"]["answer"]`` is replaced by its ``content`` and ``chatId``
        is set to "0". The returned ``answer`` is always the raw answer.
        """
        data = resp.get('data') if isinstance(resp, dict) else None
        if not isinstance(data, dict):
            print(resp)
            resp = {"data": {"answer": "error"}}
            data = resp["data"]
        answer = data.get('answer', "error")

        if isinstance(answer, str) and answer.startswith("{\"role\":"):
            try:
                content = json.loads(answer)["content"]
            except (ValueError, KeyError, TypeError):
                # Not a well-formed message object: keep the raw answer.
                pass
            else:
                resp["data"]["answer"] = content
                resp["data"]["chatId"] = "0"
        return resp, answer

    def create_chat(self, user_input):
        """Start a new chat session.

        This method sends a POST request to the '/chats' endpoint with the provided parameters.
        It initializes a new chat session and stores the chat ID for future use.

        Returns:
            The decoded response, or ``{"error": ...}`` when the request fails twice.
        """
        url = f"{self.base_url}/chats"
        payload = {
            "language": self.language,
            "question": user_input,  # Initial question to start the chat
            "usePublic": self.use_public,
            "groups": self.chosen_groups
        }
        try:
            response = self.session.post(url, json=payload)
            print(response)
            response.raise_for_status()  # Raise an exception if the response was not successful
            resp = response.json()
            self.chat_id = resp['data']['chatId']  # Store the chat ID for future use
            print("✅ Chat initialized.")

            resp, answer = self._normalise_chat_response(resp)
            print(f"💡 Response: {answer}")
            return resp
        except requests.exceptions.RequestException as e:
            # It seems we get disconnections from time to time, so try once more. The retry
            # must also POST to the collection URL (there is no chat to PATCH yet) and
            # remember the new chat id.
            try:
                response = self.session.post(url, json=payload)
                response.raise_for_status()
                data = response.json()
                chat_id = data.get('data', {}).get('chatId')
                if chat_id is not None:
                    self.chat_id = chat_id
                answer = data.get('data', {}).get('answer', "No answer provided.")
                print(f"💡 Response: {answer}")
                return data
            except Exception:
                print(f"❌ Failed to get response: {e}")
                return {"error": f"❌ Failed to get response: {e}"}

    def list_personal_groups(self):
        """Return the ``personalGroups`` list of '/groups', or ``[]`` on any failure."""
        url = f"{self.base_url}/groups"
        try:
            resp = self.session.get(url)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return data_block.get("personalGroups", [])
        except (requests.exceptions.RequestException, ValueError, KeyError, TypeError, AttributeError):
            return []

    def get_document_info(self, id):
        """Return the ``data`` block of '/sources/{id}', or ``[]`` when it is unavailable."""
        url = f"{self.base_url}/sources/{id}"
        try:
            resp = self.session.get(url)
            data_block = json.loads(resp.content)["data"]
        except (requests.exceptions.RequestException, ValueError, KeyError, TypeError):
            return []
        return data_block if data_block else []

    def query_private_gpt(self, user_input) -> dict:
        """Send a question to the chat and retrieve the response.

        Returns:
            The decoded response, or ``{"error": ...}`` when the request fails twice.
        """
        url = f"{self.base_url}/chats/{self.chat_id}"
        payload = {"question": user_input}

        try:
            response = self.session.patch(url, json=payload)
            resp, answer = self._normalise_chat_response(response.json())
            print(f"💡 Response: {answer}")
            return resp
        except requests.exceptions.RequestException as e:
            # It seems we get disconnections from time to time, so try once more.
            try:
                response = self.session.patch(url, json=payload)
                response.raise_for_status()
                data = response.json()
                answer = data.get('data', {}).get('answer', "No answer provided.")
                print(f"💡 Response: {answer}")
                return data
            except Exception:
                print(f"❌ Failed to get response: {e}")
                return {"error": f"❌ Failed to get response: {e}"}

    def add_source(self, markdown, groups, name):
        """Create a new source from markdown content. Working with version 1.3.3 and newer

        Returns:
            The ``data`` block of the response (``[]`` when empty), or ``{"error": ...}``
            when the request fails.
        """
        url = f"{self.base_url}/sources"
        try:
            payload = {
                "name": name,
                "groups": groups,
                "content": markdown
            }
            resp = self.session.post(url, json=payload)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return data_block
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}

    def update_source(self, source_id, markdown=None, groups=None, name=None):
        """Edit an existing Source

        Only the given fields are changed. When ``groups`` is None the groups currently
        reported for the source are sent back.
        """
        url = f"{self.base_url}/sources/{source_id}"
        try:
            payload = {}
            if groups is None:
                # get_document_info() returns [] when the source cannot be read.
                info = self.get_document_info(source_id)
                payload["groups"] = info.get("groups", []) if isinstance(info, dict) else []
            else:
                payload["groups"] = groups
            if markdown is not None:
                payload["content"] = markdown
            if name is not None:
                payload["name"] = name

            resp = self.session.patch(url, json=payload)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return data_block
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}

    def delete_source(self, source_id):
        """Delete a source. Working with version 1.3.3 and newer

        Returns:
            The ``message`` of the response ("failed" when empty), or ``{"error": ...}``
            when the request fails.
        """
        url = f"{self.base_url}/sources/{source_id}"
        try:
            resp = self.session.delete(url)
            message = json.loads(resp.content)["message"]
            if not message:
                return "failed"
            return message
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return {"error": f"❌ Failed to get response: {e}"}

    @staticmethod
    def _enter_remote_dir(sftp, remote_dir):
        """Change into ``remote_dir`` on the SFTP server, creating missing directories."""
        try:
            sftp.chdir(remote_dir)
            return
        except IOError:
            pass
        # Walk the path one component at a time. Every chdir moves the working directory,
        # so each component is resolved relative to the previous one.
        if remote_dir.startswith("/"):
            sftp.chdir("/")
        for part in (p for p in remote_dir.split("/") if p):
            try:
                sftp.chdir(part)
            except IOError:
                sftp.mkdir(part)
                sftp.chdir(part)

    def upload_sftp(self, file_path):
        """Upload ``file_path`` by SFTP and wait until the target group lists sources.

        The file is stored in ``ftp_folder/ftp_subfolder`` on the server.

        Returns:
            The sources of group ``ftp_subfolder``, or ``[]`` when the upload itself failed.
        """
        transport = paramiko.Transport((self.ftp_host, self.ftp_port))
        uploaded = False
        try:
            transport.connect(username=self.email, password=self.ftp_password)
            sftp = paramiko.SFTPClient.from_transport(transport)
            try:
                remote_base_dir = posixpath.join(self.ftp_folder, self.ftp_subfolder)

                # Ensure the remote directory exists
                self._enter_remote_dir(sftp, remote_base_dir)

                # Determine remote file name
                remote_filename = os.path.basename(file_path)
                remote_path = posixpath.join(remote_base_dir, remote_filename)

                # Upload the file. The working directory is the target directory now, so
                # the bare file name is enough.
                try:
                    sftp.put(file_path, remote_filename)
                    print(f"Uploaded {file_path} to {remote_path} successfully.")
                    uploaded = True
                except Exception as e:
                    print(e)
            finally:
                sftp.close()
        finally:
            # Always release the connection, also when connecting or changing directory fails.
            transport.close()

        if not uploaded:
            # Nothing was uploaded, so waiting for the file to show up would never end.
            return []

        sources = []
        while len(sources) == 0:
            print(f"Waiting for file to be added..")
            sleep(2)
            sources = self.get_sources_from_group(self.ftp_subfolder)

        return sources

    def get_sources_from_group(self, group):
        """Return the document info of every source in ``group``. Working with version 1.3.3 and newer

        Returns ``[]`` when the group has no data or the request fails.
        """
        url = f"{self.base_url}/sources/groups"
        try:
            payload = {
                "groupName": group
            }
            resp = self.session.post(url, json=payload)
            data_block = json.loads(resp.content)["data"]
            if not data_block:
                return []
            return [self.get_document_info(source) for source in data_block["sources"]]
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to get response: {e}")
            return []

    def _send_question(self, user_input):
        """Open a chat with ``user_input`` or continue the current one."""
        if self.chat_id is None:
            return self.create_chat(user_input)
        return self.query_private_gpt(user_input)

    def respond_with_context(self, messages, response_format=None, request_tools=None):
        """Build one prompt from ``messages`` and send it to the chat.

        The prompt consists of the last system message (stringified), the last user
        message, an optional tool result, the optional ``response_format`` template and the
        optional ``request_tools`` list.

        Returns:
            The ``data`` block of the response. On failure the error dictionary is returned.
        """
        last_user_message = next((p for p in reversed(messages) if p["role"] == "user"), None)
        user_input = ""

        for message in messages:
            if message["role"] == "system":
                user_input = str(message) + "\n"

        if last_user_message is not None:
            user_input += last_user_message["content"]

        last_assistant_message = next((p for p in reversed(messages) if p["role"] == "assistant"), None)
        last_tool_message = next((p for p in reversed(messages) if p["role"] == "tool"), None)

        hastoolresult = False
        if last_tool_message is not None and last_assistant_message is not None and last_assistant_message.tool_calls is not None and len(
                last_assistant_message.tool_calls) > 0:
            user_input += "\nYou called the tool: " + str(
                last_assistant_message.tool_calls[0]) + ". The result was: " + last_tool_message.content
            hastoolresult = True

        print(f"💁 Request: " + user_input)

        # PGPT manages history and context itself so we don't need to forward the history.
        add_context = False
        if add_context:
            messages.pop()
            user_input += "\nHere is some context about the previous conversation:\n"
            for message in messages:
                user_input += f"{message.role}: {message.content}\n"

        if response_format is not None:
            print("Response format: " + str(response_format))
            user_input += add_response_format(response_format)

        if request_tools is not None and not hastoolresult:
            user_input += add_tools(request_tools, last_tool_message)

        if not self.logged_in:
            # Not logged in yet: authenticate first, then send the question.
            self.logged_in = self.login()
            if not self.logged_in:
                return {"error": "❌ Login failed."}

        result = self._send_question(user_input)

        if 'data' in result:
            response_data = result.get("data")
            if request_tools is not None and not hastoolresult:
                cleaned = clean_response(response_data.get("answer") or "")
                if is_json(cleaned):
                    response_data["tool_call"] = cleaned
            return response_data
        elif 'error' in result:
            # Try to login again and send the query once more on error.
            if self.login():
                result = self._send_question(user_input)
                if 'data' in result:
                    return result['data']
            return result
        else:
            return result

    def add_user_from_payload(self, payload):
        '''
        Create a user by posting the ``payload`` dictionary to '/users'.

        Returns:
            The response object, or None when the request could not be sent.
        '''
        url = f"{self.base_url}/users"
        try:
            response = self.session.post(url, json=payload)
        except requests.exceptions.RequestException as e:
            # Without a response there is nothing to print below.
            print(f"❌ Failed to get response: {e}")
            return None
        print(response)
        print(response.content)
        return response
def is_json(myjson):
    """Return True when ``myjson`` can be parsed by ``json.loads``, otherwise False."""
    try:
        json.loads(myjson)
    except (ValueError, TypeError):
        # ValueError: malformed JSON text. TypeError: the input is not str/bytes (e.g. None).
        return False
    return True
def add_response_format(response_format):
    """Return the prompt suffix asking the model to fill in ``response_format``.

    The suffix is a fixed instruction followed by the JSON serialization of the template.
    """
    instruction = "\nPlease fill in the following json template with realistic and appropriate information. In your reply, only return the generated json. If you can't answer return an empty json.\n"
    return instruction + json.dumps(response_format)
def add_tools(response_tools, last_tool_message):
    """Return the prompt suffix that lists the tools the model may choose from.

    Args:
        response_tools: Iterable of JSON-serializable tool descriptions.
        last_tool_message: Unused; kept so that existing callers keep working.

    Returns:
        A fixed instruction followed by every tool as JSON, each on its own line.
    """
    instruction = "\nPlease select the fitting provided tool to create your answer. Only return the generated result of the tool. Do not describe what you are doing, just return the json.\n"
    tool_lines = ["\n" + json.dumps(tool) + "\n" for tool in response_tools]
    return instruction + "".join(tool_lines)
def clean_response(response):
    """Return ``response`` with every "[TOOL_CALLS]" artefact removed."""
    marker = "[TOOL_CALLS]"
    # Splitting on the marker and re-joining drops all of its occurrences.
    return "".join(response.split(marker))
def decrypt_api_key(api_key):
    """
    Decode a base64 encoded ``"user:password"`` API key into its two parts.

    This is PoC code and methods should be replaced with a more secure way to deal with credentials (e.g. in a db)

    Args:
        api_key: ASCII string holding the base64 encoding of ``user:password``.

    Returns:
        A ``(user, password)`` tuple. ``("invalid", "invalid")`` is returned when the key
        cannot be decoded or does not contain a ``:`` separator.
    """
    try:
        decoded_key = base64.b64decode(api_key.encode("ascii")).decode("ascii")
    except Exception as e:
        print(e)
        decoded_key = "invalid:invalid"
    # Split on the first ":" only, so passwords that contain ":" stay intact.
    user, separator, password = decoded_key.partition(":")
    if not separator:
        # No separator at all: report the key as invalid instead of raising IndexError.
        return "invalid", "invalid"
    return user, password
def input_with_default(prompt, default):
    """Ask for a line of input, showing ``default`` in brackets after ``prompt``.

    Returns the typed text, or ``default`` when nothing was typed.
    """
    typed = input(f"{prompt} [{default}]: ")
    return typed or default
def main():
    """Create users from a CSV file, then run the interactive chat.

    The CSV file is read with ``;`` as separator. Its ``groups`` and ``roles`` columns are
    parsed as JSON, and every row is posted as one user. Entering 'quit' as file name
    skips the user creation. Afterwards every non-empty line typed by the user is sent to
    the chat until 'quit' is entered, Ctrl+C is pressed or an error occurs.
    """
    config_file = Path.absolute(Path(__file__).parent / "config.json")
    config = Config(config_file=config_file, required_fields=["base_url"])
    chat = PrivateGPTAPI(config)

    print("Type the filename with users to be added or press enter to use the dafault. Type 'quit' to skip the user creation.")
    # The file name is asked for exactly once: every outcome ends the user creation.
    try:
        user_file = input_with_default("Filename: ", "users_to_add_no_tz.csv")
        print(f"You entered: {user_file}")
        if user_file.lower() != 'quit':
            print('')
            users = pd.read_csv(user_file, sep=';')
            json_columns = ('groups', 'roles')
            accounts = [
                {key: (json.loads(value) if key in json_columns else value)
                 for key, value in record.items()}
                for record in users.to_dict('records')
            ]
            # Show all accounts first, then create them.
            for account in accounts:
                print(account)
            for account in accounts:
                chat.add_user_from_payload(account)
    except KeyboardInterrupt:
        print("\nExiting user creation ...")
    except Exception as e:
        print(f"❌ Error: {str(e)}")

    # chat.add_user(name = "api_test", email="[email protected]")

    print("Type your questions below. Type 'quit' to exit.")
    while True:
        try:
            question = input("❓ Question: ").strip()
            if question.lower() == 'quit':
                break
            if question:
                if chat.chat_id is None:
                    chat.create_chat(question)
                else:
                    chat.query_private_gpt(question)
        except KeyboardInterrupt:
            print("\nExiting chat...")
            break
        except Exception as e:
            print(f"❌ Error: {str(e)}")
            break


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/clients/Gradio/mcp_servers/filesystem/index.ts:
--------------------------------------------------------------------------------
```typescript
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
ToolSchema,
} from "@modelcontextprotocol/sdk/types.js";
import fs from "fs/promises";
import path from "path";
import os from 'os';
import { z } from "zod";
import { zodToJsonSchema } from "zod-to-json-schema";
import { diffLines, createTwoFilesPatch } from 'diff';
import { minimatch } from 'minimatch';
// Command line argument parsing
// Everything after the node binary and the script path is an allowed directory.
// Without at least one, the usage text is printed and the process exits with status 1.
const args = process.argv.slice(2);
if (args.length === 0) {
  console.error("Usage: mcp-server-filesystem <allowed-directory> [additional-directories...]");
  process.exit(1);
}
// Normalize all paths consistently
/** Returns `p` passed through `path.normalize`. */
function normalizePath(p: string): string {
  return path.normalize(p);
}
/**
 * Replaces a leading "~" (either "~" alone or "~/...") with the current user's home
 * directory. Any other path is returned unchanged.
 */
function expandHome(filepath: string): string {
  const isHomeRelative = filepath === '~' || filepath.startsWith('~/');
  return isHomeRelative ? path.join(os.homedir(), filepath.slice(1)) : filepath;
}
// Store allowed directories in normalized form
// Each argument is "~"-expanded, resolved to an absolute path and then normalized.
const allowedDirectories = args.map(dir =>
  normalizePath(path.resolve(expandHome(dir)))
);
// Validate that all directories exist and are accessible
// `~` is expanded before the stat call so that a home-relative argument is
// checked at the same location that allowedDirectories stores for it; stat on
// the raw argument would look for a literal "~" directory and fail.
// Any failure is fatal: the message is printed and the process exits with code 1.
await Promise.all(args.map(async (dir) => {
  const expandedDir = expandHome(dir);
  try {
    const stats = await fs.stat(expandedDir);
    if (!stats.isDirectory()) {
      console.error(`Error: ${dir} is not a directory`);
      process.exit(1);
    }
  } catch (error) {
    console.error(`Error accessing directory ${dir}:`, error);
    process.exit(1);
  }
}));
// Security utilities

/**
 * Report whether `candidate` is one of the allowed directories or lies beneath one.
 *
 * The comparison is done on whole path segments: a plain string-prefix test
 * would accept a sibling such as `/allowed-evil` for the allowed directory
 * `/allowed`.
 *
 * @param candidate - A normalized absolute path.
 * @returns `true` when the path is inside (or equal to) an allowed directory.
 */
function isWithinAllowedDirectories(candidate: string): boolean {
  return allowedDirectories.some((dir) => {
    if (candidate === dir) {
      return true;
    }
    // A root directory already ends with the separator; avoid doubling it.
    const prefix = dir.endsWith(path.sep) ? dir : dir + path.sep;
    return candidate.startsWith(prefix);
  });
}

/**
 * Resolve a requested path and verify that it stays inside the allowed directories.
 *
 * The lexical path is checked first, then its real path (symlinks resolved).
 * For a path that does not exist yet, the real path of its parent directory is
 * checked instead so that new files can be created.
 *
 * @param requestedPath - Absolute, relative (to the working directory), or `~`-prefixed path.
 * @returns The real path when the target exists, otherwise the absolute requested path.
 * @throws Error when the path, its symlink target, or its parent directory is outside
 *   the allowed directories, when the parent directory cannot be resolved, or when
 *   resolving the path fails for a reason other than the path not existing.
 */
async function validatePath(requestedPath: string): Promise<string> {
  const expandedPath = expandHome(requestedPath);
  const absolute = path.isAbsolute(expandedPath)
    ? path.resolve(expandedPath)
    : path.resolve(process.cwd(), expandedPath);
  const normalizedRequested = normalizePath(absolute);

  // Check if path is within allowed directories
  if (!isWithinAllowedDirectories(normalizedRequested)) {
    throw new Error(`Access denied - path outside allowed directories: ${absolute} not in ${allowedDirectories.join(', ')}`);
  }

  // Handle symlinks by checking their real path
  let realPath: string;
  try {
    realPath = await fs.realpath(absolute);
  } catch (error) {
    // Only a missing path may fall back to the parent-directory check; any other
    // failure is reported as-is instead of being treated as "new file".
    if ((error as { code?: string }).code !== 'ENOENT') {
      throw error;
    }

    // realpath also reports ENOENT for a symlink whose target is missing. Its
    // destination cannot be verified, and writing through it could create a
    // file outside the allowed directories, so refuse it.
    let isDanglingSymlink = false;
    try {
      isDanglingSymlink = (await fs.lstat(absolute)).isSymbolicLink();
    } catch {
      // Nothing exists at this path at all: the ordinary "new file" case.
    }
    if (isDanglingSymlink) {
      throw new Error("Access denied - cannot verify target of dangling symlink");
    }

    // For new files that don't exist yet, verify parent directory
    const parentDir = path.dirname(absolute);
    let realParentPath: string;
    try {
      realParentPath = await fs.realpath(parentDir);
    } catch {
      throw new Error(`Parent directory does not exist: ${parentDir}`);
    }
    // Checked outside the try above so this error is not rewritten into
    // "Parent directory does not exist".
    if (!isWithinAllowedDirectories(normalizePath(realParentPath))) {
      throw new Error("Access denied - parent directory outside allowed directories");
    }
    return absolute;
  }

  // Checked outside the try above so the denial cannot be caught and overridden
  // by the new-file fallback.
  if (!isWithinAllowedDirectories(normalizePath(realPath))) {
    throw new Error("Access denied - symlink target outside allowed directories");
  }
  return realPath;
}
// Schema definitions
// Factory for the argument shape shared by every tool that takes only a `path`;
// each call returns a fresh, independent schema object.
const singlePathArgs = () => z.object({ path: z.string() });

const ReadFileArgsSchema = singlePathArgs();

const ReadMultipleFilesArgsSchema = z.object({ paths: z.array(z.string()) });

const WriteFileArgsSchema = z.object({ path: z.string(), content: z.string() });

// One search-and-replace step of an edit_file request.
const EditOperation = z.object({
  oldText: z.string().describe('Text to search for - must match exactly'),
  newText: z.string().describe('Text to replace with')
});

const EditFileArgsSchema = z.object({
  path: z.string(),
  edits: z.array(EditOperation),
  dryRun: z.boolean().default(false).describe('Preview changes using git-style diff format')
});

const CreateDirectoryArgsSchema = singlePathArgs();

const ListDirectoryArgsSchema = singlePathArgs();

const DirectoryTreeArgsSchema = singlePathArgs();

const MoveFileArgsSchema = z.object({ source: z.string(), destination: z.string() });

const SearchFilesArgsSchema = z.object({
  path: z.string(),
  pattern: z.string(),
  excludePatterns: z.array(z.string()).optional().default([])
});

const GetFileInfoArgsSchema = singlePathArgs();

// Type of a tool's `inputSchema` field, taken from the SDK's ToolSchema.
const ToolInputSchema = ToolSchema.shape.inputSchema;
type ToolInput = z.infer<typeof ToolInputSchema>;
/**
 * Metadata about a file or directory, as returned by getFileStats.
 */
interface FileInfo {
  // Taken from the `size` field of the fs.stat result.
  size: number;
  // Taken from the stat `birthtime` field.
  created: Date;
  // Taken from the stat `mtime` field.
  modified: Date;
  // Taken from the stat `atime` field.
  accessed: Date;
  isDirectory: boolean;
  isFile: boolean;
  // Last three characters of the stat mode rendered in octal.
  permissions: string;
}
// Server setup
// Identity reported by this server.
const serverInfo = {
  name: "secure-filesystem-server",
  version: "0.2.0",
};
// Only the tools capability is declared.
const serverOptions = {
  capabilities: {
    tools: {},
  },
};
const server = new Server(serverInfo, serverOptions);
// Tool implementations
/**
 * Collect size, timestamps, type flags and permission bits for a path.
 *
 * @param filePath - Path passed to fs.stat.
 * @returns A FileInfo record; `permissions` is the last three octal digits of the mode.
 * @throws Whatever fs.stat rejects with (for example when the path does not exist).
 */
async function getFileStats(filePath: string): Promise<FileInfo> {
  const stats = await fs.stat(filePath);
  const { size, birthtime: created, mtime: modified, atime: accessed } = stats;
  const octalMode = stats.mode.toString(8);
  // Key order is kept stable because callers print the entries in order.
  return {
    size,
    created,
    modified,
    accessed,
    isDirectory: stats.isDirectory(),
    isFile: stats.isFile(),
    permissions: octalMode.slice(-3),
  };
}
/**
 * Recursively collect paths under `rootPath` whose entry name contains `pattern`
 * (case-insensitive substring match).
 *
 * Every entry is passed through validatePath before it is considered; entries
 * that fail validation, or subdirectories that cannot be read, are skipped.
 * Excluded entries are neither reported nor descended into.
 *
 * @param rootPath - Directory where the search starts.
 * @param pattern - Substring to look for in file and directory names.
 * @param excludePatterns - Glob patterns matched against the path relative to
 *   `rootPath`; a pattern without `*` is wrapped as `**​/<pattern>/**`.
 * @returns Full paths of all matching entries, in traversal order.
 * @throws When `rootPath` itself cannot be read.
 */
async function searchFiles(
  rootPath: string,
  pattern: string,
  excludePatterns: string[] = []
): Promise<string[]> {
  const matches: string[] = [];
  const needle = pattern.toLowerCase();

  // True when the root-relative path matches any exclude pattern.
  const isExcluded = (relativePath: string): boolean =>
    excludePatterns.some((exclude) => {
      const glob = exclude.includes('*') ? exclude : `**/${exclude}/**`;
      return minimatch(relativePath, glob, { dot: true });
    });

  async function walk(directory: string): Promise<void> {
    const entries = await fs.readdir(directory, { withFileTypes: true });
    for (const entry of entries) {
      const entryPath = path.join(directory, entry.name);
      try {
        // Validate each path before processing
        await validatePath(entryPath);
        if (isExcluded(path.relative(rootPath, entryPath))) {
          continue;
        }
        if (entry.name.toLowerCase().includes(needle)) {
          matches.push(entryPath);
        }
        if (entry.isDirectory()) {
          await walk(entryPath);
        }
      } catch {
        // Skip invalid paths during search
      }
    }
  }

  await walk(rootPath);
  return matches;
}
// file editing and diffing utilities
/**
 * Convert every CRLF pair in `text` to a single LF; lone CR characters are kept.
 *
 * @param text - Text that may contain Windows line endings.
 * @returns The text with `\r\n` replaced by `\n`.
 */
function normalizeLineEndings(text: string): string {
  return text.split('\r\n').join('\n');
}
/**
 * Build a two-file patch between two versions of the same file.
 *
 * Both inputs have their line endings normalized first so CRLF/LF differences
 * do not show up as changes. The same `filepath` is used for both sides, with
 * the header labels 'original' and 'modified'.
 *
 * @param originalContent - Text before the change.
 * @param newContent - Text after the change.
 * @param filepath - File name shown in the patch header (defaults to 'file').
 * @returns The patch text produced by createTwoFilesPatch.
 */
function createUnifiedDiff(originalContent: string, newContent: string, filepath: string = 'file'): string {
  const [before, after] = [originalContent, newContent].map((text) => normalizeLineEndings(text));
  return createTwoFilesPatch(filepath, filepath, before, after, 'original', 'modified');
}
/**
 * Apply a sequence of search-and-replace edits to a file and return a diff of the result.
 *
 * Each edit is applied to the output of the previous one. An edit is first
 * tried as an exact substring match (first occurrence only); if that fails, a
 * line-by-line match that ignores leading/trailing whitespace is attempted and
 * the replacement is re-indented to follow the matched text.
 *
 * @param filePath - File to edit; read and written as UTF-8.
 * @param edits - Ordered edits; `oldText` is the text to find, `newText` its replacement.
 * @param dryRun - When true the file is left untouched and only the diff is returned.
 * @returns The diff wrapped in a fenced `diff` block followed by a blank line.
 * @throws Error when an edit's `oldText` cannot be found by either matching strategy,
 *   or when reading/writing the file fails.
 */
async function applyFileEdits(
  filePath: string,
  edits: Array<{oldText: string, newText: string}>,
  dryRun = false
): Promise<string> {
  // Read file content and normalize line endings
  const content = normalizeLineEndings(await fs.readFile(filePath, 'utf-8'));

  // Apply edits sequentially
  let modifiedContent = content;
  for (const edit of edits) {
    const normalizedOld = normalizeLineEndings(edit.oldText);
    const normalizedNew = normalizeLineEndings(edit.newText);

    // If exact match exists, use it
    if (modifiedContent.includes(normalizedOld)) {
      // A replacer function is used so that `$&`, `$1`, `$$` and similar
      // sequences in the new text are inserted literally instead of being
      // interpreted as replacement patterns.
      modifiedContent = modifiedContent.replace(normalizedOld, () => normalizedNew);
      continue;
    }

    // Otherwise, try line-by-line matching with flexibility for whitespace
    const oldLines = normalizedOld.split('\n');
    const contentLines = modifiedContent.split('\n');
    let matchFound = false;

    for (let i = 0; i <= contentLines.length - oldLines.length; i++) {
      const potentialMatch = contentLines.slice(i, i + oldLines.length);

      // Compare lines with normalized whitespace
      const isMatch = oldLines.every((oldLine, j) => {
        const contentLine = potentialMatch[j];
        return oldLine.trim() === contentLine.trim();
      });

      if (isMatch) {
        // Preserve original indentation of first line
        const originalIndent = contentLines[i].match(/^\s*/)?.[0] || '';
        const newLines = normalizedNew.split('\n').map((line, j) => {
          if (j === 0) return originalIndent + line.trimStart();
          // For subsequent lines, try to preserve relative indentation
          const oldIndent = oldLines[j]?.match(/^\s*/)?.[0] || '';
          const newIndent = line.match(/^\s*/)?.[0] || '';
          if (oldIndent && newIndent) {
            const relativeIndent = newIndent.length - oldIndent.length;
            return originalIndent + ' '.repeat(Math.max(0, relativeIndent)) + line.trimStart();
          }
          return line;
        });

        contentLines.splice(i, oldLines.length, ...newLines);
        modifiedContent = contentLines.join('\n');
        matchFound = true;
        break;
      }
    }

    if (!matchFound) {
      throw new Error(`Could not find exact match for edit:\n${edit.oldText}`);
    }
  }

  // Create unified diff
  const diff = createUnifiedDiff(content, modifiedContent, filePath);

  // Format diff with appropriate number of backticks: the fence must be longer
  // than any backtick run that appears inside the diff itself.
  let numBackticks = 3;
  while (diff.includes('`'.repeat(numBackticks))) {
    numBackticks++;
  }
  const formattedDiff = `${'`'.repeat(numBackticks)}diff\n${diff}${'`'.repeat(numBackticks)}\n\n`;

  if (!dryRun) {
    await fs.writeFile(filePath, modifiedContent, 'utf-8');
  }

  return formattedDiff;
}
// Tool handlers

// Handler for tool-listing requests: returns the static catalogue of tools this
// server offers. Each entry carries the tool name, a human-readable description
// and an input schema. Schemas are generated from the zod argument schemas via
// zodToJsonSchema, except for list_allowed_directories, which takes no
// arguments and uses a hand-written empty object schema.
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "read_file",
        description:
          "Read the complete contents of a file from the file system. " +
          "Handles various text encodings and provides detailed error messages " +
          "if the file cannot be read. Use this tool when you need to examine " +
          "the contents of a single file. Only works within allowed directories.",
        inputSchema: zodToJsonSchema(ReadFileArgsSchema) as ToolInput,
      },
      {
        name: "read_multiple_files",
        description:
          "Read the contents of multiple files simultaneously. This is more " +
          "efficient than reading files one by one when you need to analyze " +
          "or compare multiple files. Each file's content is returned with its " +
          "path as a reference. Failed reads for individual files won't stop " +
          "the entire operation. Only works within allowed directories.",
        inputSchema: zodToJsonSchema(ReadMultipleFilesArgsSchema) as ToolInput,
      },
      {
        name: "write_file",
        description:
          "Create a new file or completely overwrite an existing file with new content. " +
          "Use with caution as it will overwrite existing files without warning. " +
          "Handles text content with proper encoding. Only works within allowed directories.",
        inputSchema: zodToJsonSchema(WriteFileArgsSchema) as ToolInput,
      },
      {
        name: "edit_file",
        description:
          "Make line-based edits to a text file. Each edit replaces exact line sequences " +
          "with new content. Returns a git-style diff showing the changes made. " +
          "Only works within allowed directories.",
        inputSchema: zodToJsonSchema(EditFileArgsSchema) as ToolInput,
      },
      {
        name: "create_directory",
        description:
          "Create a new directory or ensure a directory exists. Can create multiple " +
          "nested directories in one operation. If the directory already exists, " +
          "this operation will succeed silently. Perfect for setting up directory " +
          "structures for projects or ensuring required paths exist. Only works within allowed directories.",
        inputSchema: zodToJsonSchema(CreateDirectoryArgsSchema) as ToolInput,
      },
      {
        name: "list_directory",
        description:
          "Get a detailed listing of all files and directories in a specified path. " +
          "Results clearly distinguish between files and directories with [FILE] and [DIR] " +
          "prefixes. This tool is essential for understanding directory structure and " +
          "finding specific files within a directory. Only works within allowed directories.",
        inputSchema: zodToJsonSchema(ListDirectoryArgsSchema) as ToolInput,
      },
      {
        name: "directory_tree",
        description:
          "Get a recursive tree view of files and directories as a JSON structure. " +
          "Each entry includes 'name', 'type' (file/directory), and 'children' for directories. " +
          "Files have no children array, while directories always have a children array (which may be empty). " +
          "The output is formatted with 2-space indentation for readability. Only works within allowed directories.",
        inputSchema: zodToJsonSchema(DirectoryTreeArgsSchema) as ToolInput,
      },
      {
        name: "move_file",
        description:
          "Move or rename files and directories. Can move files between directories " +
          "and rename them in a single operation. If the destination exists, the " +
          "operation will fail. Works across different directories and can be used " +
          "for simple renaming within the same directory. Both source and destination must be within allowed directories.",
        inputSchema: zodToJsonSchema(MoveFileArgsSchema) as ToolInput,
      },
      {
        name: "search_files",
        description:
          "Recursively search for files and directories matching a pattern. " +
          "Searches through all subdirectories from the starting path. The search " +
          "is case-insensitive and matches partial names. Returns full paths to all " +
          "matching items. Great for finding files when you don't know their exact location. " +
          "Only searches within allowed directories.",
        inputSchema: zodToJsonSchema(SearchFilesArgsSchema) as ToolInput,
      },
      {
        name: "get_file_info",
        description:
          "Retrieve detailed metadata about a file or directory. Returns comprehensive " +
          "information including size, creation time, last modified time, permissions, " +
          "and type. This tool is perfect for understanding file characteristics " +
          "without reading the actual content. Only works within allowed directories.",
        inputSchema: zodToJsonSchema(GetFileInfoArgsSchema) as ToolInput,
      },
      {
        name: "list_allowed_directories",
        description:
          "Returns the list of directories that this server is allowed to access. " +
          "Use this to understand which directories are available before trying to access files.",
        // No arguments: schema written by hand instead of derived from zod.
        inputSchema: {
          type: "object",
          properties: {},
          required: [],
        },
      },
    ],
  };
});
// Handler for tool-call requests: dispatches on the tool name, validates the
// arguments against the matching zod schema, passes every user-supplied path
// through validatePath, and returns the result as a single text content item.
// Any error thrown along the way is caught at the bottom and returned as a
// text result with isError set, rather than propagating to the caller.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const { name, arguments: args } = request.params;
    switch (name) {
      // Return the file's contents decoded as UTF-8.
      case "read_file": {
        const parsed = ReadFileArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for read_file: ${parsed.error}`);
        }
        const validPath = await validatePath(parsed.data.path);
        const content = await fs.readFile(validPath, "utf-8");
        return {
          content: [{ type: "text", text: content }],
        };
      }
      // Read all requested files concurrently; a failure for one file is
      // reported inline in that file's section and does not abort the others.
      case "read_multiple_files": {
        const parsed = ReadMultipleFilesArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for read_multiple_files: ${parsed.error}`);
        }
        const results = await Promise.all(
          parsed.data.paths.map(async (filePath: string) => {
            try {
              const validPath = await validatePath(filePath);
              const content = await fs.readFile(validPath, "utf-8");
              return `${filePath}:\n${content}\n`;
            } catch (error) {
              const errorMessage = error instanceof Error ? error.message : String(error);
              return `${filePath}: Error - ${errorMessage}`;
            }
          }),
        );
        return {
          content: [{ type: "text", text: results.join("\n---\n") }],
        };
      }
      // Write the given content to the file as UTF-8.
      case "write_file": {
        const parsed = WriteFileArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for write_file: ${parsed.error}`);
        }
        const validPath = await validatePath(parsed.data.path);
        await fs.writeFile(validPath, parsed.data.content, "utf-8");
        return {
          content: [{ type: "text", text: `Successfully wrote to ${parsed.data.path}` }],
        };
      }
      // Apply the edits via applyFileEdits and return the diff it produces;
      // dryRun is forwarded unchanged.
      case "edit_file": {
        const parsed = EditFileArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for edit_file: ${parsed.error}`);
        }
        const validPath = await validatePath(parsed.data.path);
        const result = await applyFileEdits(validPath, parsed.data.edits, parsed.data.dryRun);
        return {
          content: [{ type: "text", text: result }],
        };
      }
      // Create the directory with mkdir's `recursive` option.
      case "create_directory": {
        const parsed = CreateDirectoryArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for create_directory: ${parsed.error}`);
        }
        const validPath = await validatePath(parsed.data.path);
        await fs.mkdir(validPath, { recursive: true });
        return {
          content: [{ type: "text", text: `Successfully created directory ${parsed.data.path}` }],
        };
      }
      // One line per entry, prefixed with [DIR] for directories and [FILE]
      // for everything else.
      case "list_directory": {
        const parsed = ListDirectoryArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for list_directory: ${parsed.error}`);
        }
        const validPath = await validatePath(parsed.data.path);
        const entries = await fs.readdir(validPath, { withFileTypes: true });
        const formatted = entries
          .map((entry) => `${entry.isDirectory() ? "[DIR]" : "[FILE]"} ${entry.name}`)
          .join("\n");
        return {
          content: [{ type: "text", text: formatted }],
        };
      }
      // Build a nested JSON tree of the directory, serialized with 2-space
      // indentation.
      case "directory_tree": {
        const parsed = DirectoryTreeArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for directory_tree: ${parsed.error}`);
        }
        interface TreeEntry {
          name: string;
          type: 'file' | 'directory';
          children?: TreeEntry[];
        }
        // Recursively list `currentPath`. Each directory is validated before
        // it is read; a validation or read failure anywhere aborts the whole
        // tree because nothing here catches it.
        async function buildTree(currentPath: string): Promise<TreeEntry[]> {
          const validPath = await validatePath(currentPath);
          const entries = await fs.readdir(validPath, {withFileTypes: true});
          const result: TreeEntry[] = [];
          for (const entry of entries) {
            const entryData: TreeEntry = {
              name: entry.name,
              type: entry.isDirectory() ? 'directory' : 'file'
            };
            // Only directories get a `children` array.
            if (entry.isDirectory()) {
              const subPath = path.join(currentPath, entry.name);
              entryData.children = await buildTree(subPath);
            }
            result.push(entryData);
          }
          return result;
        }
        const treeData = await buildTree(parsed.data.path);
        return {
          content: [{
            type: "text",
            text: JSON.stringify(treeData, null, 2)
          }],
        };
      }
      // Rename/move after validating both ends.
      // NOTE(review): the tool description says the operation fails when the
      // destination exists, but fs.rename is documented to overwrite an
      // existing destination file — confirm which behavior is intended.
      case "move_file": {
        const parsed = MoveFileArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for move_file: ${parsed.error}`);
        }
        const validSourcePath = await validatePath(parsed.data.source);
        const validDestPath = await validatePath(parsed.data.destination);
        await fs.rename(validSourcePath, validDestPath);
        return {
          content: [{ type: "text", text: `Successfully moved ${parsed.data.source} to ${parsed.data.destination}` }],
        };
      }
      // Delegate to searchFiles; an empty result is reported as
      // "No matches found".
      case "search_files": {
        const parsed = SearchFilesArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for search_files: ${parsed.error}`);
        }
        const validPath = await validatePath(parsed.data.path);
        const results = await searchFiles(validPath, parsed.data.pattern, parsed.data.excludePatterns);
        return {
          content: [{ type: "text", text: results.length > 0 ? results.join("\n") : "No matches found" }],
        };
      }
      // Render the FileInfo record as "key: value" lines.
      case "get_file_info": {
        const parsed = GetFileInfoArgsSchema.safeParse(args);
        if (!parsed.success) {
          throw new Error(`Invalid arguments for get_file_info: ${parsed.error}`);
        }
        const validPath = await validatePath(parsed.data.path);
        const info = await getFileStats(validPath);
        return {
          content: [{ type: "text", text: Object.entries(info)
            .map(([key, value]) => `${key}: ${value}`)
            .join("\n") }],
        };
      }
      // No arguments: report the directories configured at startup.
      case "list_allowed_directories": {
        return {
          content: [{
            type: "text",
            text: `Allowed directories:\n${allowedDirectories.join('\n')}`
          }],
        };
      }
      default:
        throw new Error(`Unknown tool: ${name}`);
    }
  } catch (error) {
    // Convert every failure (bad arguments, access denied, I/O errors,
    // unknown tool) into an error result instead of rejecting the request.
    const errorMessage = error instanceof Error ? error.message : String(error);
    return {
      content: [{ type: "text", text: `Error: ${errorMessage}` }],
      isError: true,
    };
  }
});
// Start server
/**
 * Connect the server to a stdio transport and log startup details.
 *
 * Status messages go to stderr via console.error.
 */
async function runServer() {
  await server.connect(new StdioServerTransport());
  console.error("Secure MCP Filesystem Server running on stdio");
  console.error("Allowed directories:", allowedDirectories);
}

// Any startup failure is fatal: report it and exit with a non-zero status.
const handleFatalError = (error: unknown) => {
  console.error("Fatal error running server:", error);
  process.exit(1);
};
runServer().catch(handleFatalError);
```