This is page 5 of 5. Use http://codebase.md/php-mcp/server?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .github │ └── workflows │ ├── changelog.yml │ └── tests.yml ├── .gitignore ├── .php-cs-fixer.php ├── CHANGELOG.md ├── composer.json ├── CONTRIBUTING.md ├── examples │ ├── 01-discovery-stdio-calculator │ │ ├── McpElements.php │ │ └── server.php │ ├── 02-discovery-http-userprofile │ │ ├── McpElements.php │ │ ├── server.php │ │ └── UserIdCompletionProvider.php │ ├── 03-manual-registration-stdio │ │ ├── server.php │ │ └── SimpleHandlers.php │ ├── 04-combined-registration-http │ │ ├── DiscoveredElements.php │ │ ├── ManualHandlers.php │ │ └── server.php │ ├── 05-stdio-env-variables │ │ ├── EnvToolHandler.php │ │ └── server.php │ ├── 06-custom-dependencies-stdio │ │ ├── McpTaskHandlers.php │ │ ├── server.php │ │ └── Services.php │ ├── 07-complex-tool-schema-http │ │ ├── EventTypes.php │ │ ├── McpEventScheduler.php │ │ └── server.php │ └── 08-schema-showcase-streamable │ ├── SchemaShowcaseElements.php │ └── server.php ├── LICENSE ├── phpunit.xml ├── README.md ├── src │ ├── Attributes │ │ ├── CompletionProvider.php │ │ ├── McpPrompt.php │ │ ├── McpResource.php │ │ ├── McpResourceTemplate.php │ │ ├── McpTool.php │ │ └── Schema.php │ ├── Configuration.php │ ├── Context.php │ ├── Contracts │ │ ├── CompletionProviderInterface.php │ │ ├── EventStoreInterface.php │ │ ├── LoggerAwareInterface.php │ │ ├── LoopAwareInterface.php │ │ ├── ServerTransportInterface.php │ │ ├── SessionHandlerInterface.php │ │ └── SessionInterface.php │ ├── Defaults │ │ ├── ArrayCache.php │ │ ├── BasicContainer.php │ │ ├── DefaultUuidSessionIdGenerator.php │ │ ├── EnumCompletionProvider.php │ │ ├── FileCache.php │ │ ├── InMemoryEventStore.php │ │ ├── ListCompletionProvider.php │ │ └── SystemClock.php │ ├── Dispatcher.php │ ├── Elements │ │ ├── RegisteredElement.php │ │ ├── RegisteredPrompt.php │ │ ├── RegisteredResource.php │ │ ├── RegisteredResourceTemplate.php │ │ └── RegisteredTool.php │ ├── Exception │ │ ├── ConfigurationException.php │ │ ├── DiscoveryException.php │ │ ├── McpServerException.php │ │ ├── ProtocolException.php │ │ └── TransportException.php │ ├── Protocol.php │ ├── Registry.php │ ├── Server.php │ ├── ServerBuilder.php │ ├── Session │ │ ├── ArraySessionHandler.php │ │ ├── CacheSessionHandler.php │ │ ├── Session.php │ │ ├── SessionManager.php │ │ └── SubscriptionManager.php │ ├── Transports │ │ ├── HttpServerTransport.php │ │ ├── StdioServerTransport.php │ │ └── StreamableHttpServerTransport.php │ └── Utils │ ├── Discoverer.php │ ├── DocBlockParser.php │ ├── HandlerResolver.php │ ├── SchemaGenerator.php │ └── SchemaValidator.php └── tests ├── Fixtures │ ├── Discovery │ │ ├── DiscoverablePromptHandler.php │ │ ├── DiscoverableResourceHandler.php │ │ ├── DiscoverableTemplateHandler.php │ │ ├── DiscoverableToolHandler.php │ │ ├── EnhancedCompletionHandler.php │ │ ├── InvocablePromptFixture.php │ │ ├── InvocableResourceFixture.php │ │ ├── InvocableResourceTemplateFixture.php │ │ ├── InvocableToolFixture.php │ │ ├── NonDiscoverableClass.php │ │ └── SubDir │ │ └── HiddenTool.php │ ├── Enums │ │ ├── BackedIntEnum.php │ │ ├── BackedStringEnum.php │ │ ├── PriorityEnum.php │ │ ├── StatusEnum.php │ │ └── UnitEnum.php │ ├── General │ │ ├── CompletionProviderFixture.php │ │ ├── DocBlockTestFixture.php │ │ ├── InvokableHandlerFixture.php │ │ ├── PromptHandlerFixture.php │ │ ├── RequestAttributeChecker.php │ │ ├── ResourceHandlerFixture.php │ │ ├── ToolHandlerFixture.php │ │ └── VariousTypesHandler.php │ ├── Middlewares │ │ ├── ErrorMiddleware.php │ │ ├── FirstMiddleware.php │ │ ├── HeaderMiddleware.php │ │ ├── RequestAttributeMiddleware.php │ │ ├── SecondMiddleware.php │ │ ├── ShortCircuitMiddleware.php │ │ └── ThirdMiddleware.php │ ├── Schema │ │ └── SchemaGenerationTarget.php │ ├── ServerScripts │ │ ├── HttpTestServer.php │ │ ├── StdioTestServer.php │ │ └── StreamableHttpTestServer.php │ └── Utils │ ├── AttributeFixtures.php │ ├── DockBlockParserFixture.php │ └── SchemaGeneratorFixture.php ├── Integration │ ├── DiscoveryTest.php │ ├── HttpServerTransportTest.php │ ├── SchemaGenerationTest.php │ ├── StdioServerTransportTest.php │ └── StreamableHttpServerTransportTest.php ├── Mocks │ ├── Clients │ │ ├── MockJsonHttpClient.php │ │ ├── MockSseClient.php │ │ └── MockStreamHttpClient.php │ └── Clock │ └── FixedClock.php ├── Pest.php ├── TestCase.php └── Unit ├── Attributes │ ├── CompletionProviderTest.php │ ├── McpPromptTest.php │ ├── McpResourceTemplateTest.php │ ├── McpResourceTest.php │ └── McpToolTest.php ├── ConfigurationTest.php ├── Defaults │ ├── EnumCompletionProviderTest.php │ └── ListCompletionProviderTest.php ├── DispatcherTest.php ├── Elements │ ├── RegisteredElementTest.php │ ├── RegisteredPromptTest.php │ ├── RegisteredResourceTemplateTest.php │ ├── RegisteredResourceTest.php │ └── RegisteredToolTest.php ├── ProtocolTest.php ├── RegistryTest.php ├── ServerBuilderTest.php ├── ServerTest.php ├── Session │ ├── ArraySessionHandlerTest.php │ ├── CacheSessionHandlerTest.php │ ├── SessionManagerTest.php │ └── SessionTest.php └── Utils ├── DocBlockParserTest.php ├── HandlerResolverTest.php └── SchemaValidatorTest.php ``` # Files -------------------------------------------------------------------------------- /src/Transports/StreamableHttpServerTransport.php: -------------------------------------------------------------------------------- ```php <?php declare(strict_types=1); namespace PhpMcp\Server\Transports; use Evenement\EventEmitterTrait; use PhpMcp\Server\Contracts\EventStoreInterface; use PhpMcp\Server\Contracts\LoggerAwareInterface; use PhpMcp\Server\Contracts\LoopAwareInterface; use PhpMcp\Server\Contracts\ServerTransportInterface; use PhpMcp\Server\Exception\McpServerException; use PhpMcp\Server\Exception\TransportException; use PhpMcp\Schema\JsonRpc\Message; use PhpMcp\Schema\JsonRpc\BatchRequest; use PhpMcp\Schema\JsonRpc\BatchResponse; use PhpMcp\Schema\JsonRpc\Error; use PhpMcp\Schema\JsonRpc\Parser; use PhpMcp\Schema\JsonRpc\Request; use PhpMcp\Schema\JsonRpc\Response; use Psr\Http\Message\ServerRequestInterface; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use React\Http\HttpServer; use React\Http\Message\Response as HttpResponse; use React\Promise\Deferred; use React\Promise\PromiseInterface; use React\Socket\SocketServer; use React\Stream\ThroughStream; use Throwable; use function React\Promise\resolve; use function React\Promise\reject; class StreamableHttpServerTransport implements ServerTransportInterface, LoggerAwareInterface, LoopAwareInterface { use EventEmitterTrait; protected LoggerInterface $logger; protected LoopInterface $loop; private ?SocketServer $socket = null; private ?HttpServer $http = null; private bool $listening = false; private bool $closing = false; private ?EventStoreInterface $eventStore; /** * Stores Deferred objects for POST requests awaiting a direct JSON response. * Keyed by a unique pendingRequestId. * @var array<string, Deferred> */ private array $pendingRequests = []; /** * Stores active SSE streams. * Key: streamId * Value: ['stream' => ThroughStream, 'sessionId' => string, 'context' => array] * @var array<string, array{stream: ThroughStream, sessionId: string, context: array}> */ private array $activeSseStreams = []; private ?ThroughStream $getStream = null; /** * @param bool $enableJsonResponse If true, the server will return JSON responses instead of starting an SSE stream. * @param bool $stateless If true, the server will not emit client_connected events. * @param EventStoreInterface $eventStore If provided, the server will replay events to the client. * @param array<callable(\Psr\Http\Message\ServerRequestInterface, callable): (\Psr\Http\Message\ResponseInterface|\React\Promise\PromiseInterface)> $middlewares Middlewares to be applied to the HTTP server. * This can be useful for simple request/response scenarios without streaming. */ public function __construct( private readonly string $host = '127.0.0.1', private readonly int $port = 8080, private string $mcpPath = '/mcp', private ?array $sslContext = null, private readonly bool $enableJsonResponse = true, private readonly bool $stateless = false, ?EventStoreInterface $eventStore = null, private array $middlewares = [] ) { $this->logger = new NullLogger(); $this->loop = Loop::get(); $this->mcpPath = '/' . trim($mcpPath, '/'); $this->eventStore = $eventStore; foreach ($this->middlewares as $mw) { if (!is_callable($mw)) { throw new \InvalidArgumentException('All provided middlewares must be callable.'); } } } protected function generateId(): string { return bin2hex(random_bytes(16)); // 32 hex characters } public function setLogger(LoggerInterface $logger): void { $this->logger = $logger; } public function setLoop(LoopInterface $loop): void { $this->loop = $loop; } public function listen(): void { if ($this->listening) { throw new TransportException('StreamableHttp transport is already listening.'); } if ($this->closing) { throw new TransportException('Cannot listen, transport is closing/closed.'); } $listenAddress = "{$this->host}:{$this->port}"; $protocol = $this->sslContext ? 'https' : 'http'; try { $this->socket = new SocketServer( $listenAddress, $this->sslContext ?? [], $this->loop ); $handlers = array_merge($this->middlewares, [$this->createRequestHandler()]); $this->http = new HttpServer($this->loop, ...$handlers); $this->http->listen($this->socket); $this->socket->on('error', function (Throwable $error) { $this->logger->error('Socket server error (StreamableHttp).', ['error' => $error->getMessage()]); $this->emit('error', [new TransportException("Socket server error: {$error->getMessage()}", 0, $error)]); $this->close(); }); $this->logger->info("Server is up and listening on {$protocol}://{$listenAddress} 🚀"); $this->logger->info("MCP Endpoint: {$protocol}://{$listenAddress}{$this->mcpPath}"); $this->listening = true; $this->closing = false; $this->emit('ready'); } catch (Throwable $e) { $this->logger->error("Failed to start StreamableHttp listener on {$listenAddress}", ['exception' => $e]); throw new TransportException("Failed to start StreamableHttp listener on {$listenAddress}: {$e->getMessage()}", 0, $e); } } private function createRequestHandler(): callable { return function (ServerRequestInterface $request) { $path = $request->getUri()->getPath(); $method = $request->getMethod(); $this->logger->debug("Request received", ['method' => $method, 'path' => $path, 'target' => $this->mcpPath]); if ($path !== $this->mcpPath) { $error = Error::forInvalidRequest("Not found: {$path}"); return new HttpResponse(404, ['Content-Type' => 'application/json'], json_encode($error)); } $corsHeaders = [ 'Access-Control-Allow-Origin' => '*', 'Access-Control-Allow-Methods' => 'GET, POST, DELETE, OPTIONS', 'Access-Control-Allow-Headers' => 'Content-Type, Mcp-Session-Id, Last-Event-ID, Authorization', ]; if ($method === 'OPTIONS') { return new HttpResponse(204, $corsHeaders); } $addCors = function (HttpResponse $r) use ($corsHeaders) { foreach ($corsHeaders as $key => $value) { $r = $r->withAddedHeader($key, $value); } return $r; }; try { return match ($method) { 'GET' => $this->handleGetRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))), 'POST' => $this->handlePostRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))), 'DELETE' => $this->handleDeleteRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))), default => $addCors($this->handleUnsupportedRequest($request)), }; } catch (Throwable $e) { return $addCors($this->handleRequestError($e, $request)); } }; } private function handleGetRequest(ServerRequestInterface $request): PromiseInterface { if ($this->stateless) { $error = Error::forInvalidRequest("GET requests (SSE streaming) are not supported in stateless mode."); return resolve(new HttpResponse(405, ['Content-Type' => 'application/json'], json_encode($error))); } $acceptHeader = $request->getHeaderLine('Accept'); if (!str_contains($acceptHeader, 'text/event-stream')) { $error = Error::forInvalidRequest("Not Acceptable: Client must accept text/event-stream for GET requests."); return resolve(new HttpResponse(406, ['Content-Type' => 'application/json'], json_encode($error))); } $sessionId = $request->getHeaderLine('Mcp-Session-Id'); if (empty($sessionId)) { $this->logger->warning("GET request without Mcp-Session-Id."); $error = Error::forInvalidRequest("Mcp-Session-Id header required for GET requests."); return resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); } $this->getStream = new ThroughStream(); $this->getStream->on('close', function () use ($sessionId) { $this->logger->debug("GET SSE stream closed.", ['sessionId' => $sessionId]); $this->getStream = null; }); $this->getStream->on('error', function (Throwable $e) use ($sessionId) { $this->logger->error("GET SSE stream error.", ['sessionId' => $sessionId, 'error' => $e->getMessage()]); $this->getStream = null; }); $headers = [ 'Content-Type' => 'text/event-stream', 'Cache-Control' => 'no-cache', 'Connection' => 'keep-alive', 'X-Accel-Buffering' => 'no', ]; $response = new HttpResponse(200, $headers, $this->getStream); if ($this->eventStore) { $lastEventId = $request->getHeaderLine('Last-Event-ID'); $this->replayEvents($lastEventId, $this->getStream, $sessionId); } return resolve($response); } private function handlePostRequest(ServerRequestInterface $request): PromiseInterface { $deferred = new Deferred(); $acceptHeader = $request->getHeaderLine('Accept'); if (!str_contains($acceptHeader, 'application/json') && !str_contains($acceptHeader, 'text/event-stream')) { $error = Error::forInvalidRequest("Not Acceptable: Client must accept both application/json or text/event-stream"); $deferred->resolve(new HttpResponse(406, ['Content-Type' => 'application/json'], json_encode($error))); return $deferred->promise(); } if (!str_contains($request->getHeaderLine('Content-Type'), 'application/json')) { $error = Error::forInvalidRequest("Unsupported Media Type: Content-Type must be application/json"); $deferred->resolve(new HttpResponse(415, ['Content-Type' => 'application/json'], json_encode($error))); return $deferred->promise(); } $body = $request->getBody()->getContents(); if (empty($body)) { $this->logger->warning("Received empty POST body"); $error = Error::forInvalidRequest("Empty request body."); $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); return $deferred->promise(); } try { $message = Parser::parse($body); } catch (Throwable $e) { $this->logger->error("Failed to parse MCP message from POST body", ['error' => $e->getMessage()]); $error = Error::forParseError("Invalid JSON: " . $e->getMessage()); $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); return $deferred->promise(); } $isInitializeRequest = ($message instanceof Request && $message->method === 'initialize'); $sessionId = null; if ($this->stateless) { $sessionId = $this->generateId(); $this->emit('client_connected', [$sessionId]); } else { if ($isInitializeRequest) { if ($request->hasHeader('Mcp-Session-Id')) { $this->logger->warning("Client sent Mcp-Session-Id with InitializeRequest. Ignoring.", ['clientSentId' => $request->getHeaderLine('Mcp-Session-Id')]); $error = Error::forInvalidRequest("Invalid request: Session already initialized. Mcp-Session-Id header not allowed with InitializeRequest.", $message->getId()); $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); return $deferred->promise(); } $sessionId = $this->generateId(); $this->emit('client_connected', [$sessionId]); } else { $sessionId = $request->getHeaderLine('Mcp-Session-Id'); if (empty($sessionId)) { $this->logger->warning("POST request without Mcp-Session-Id."); $error = Error::forInvalidRequest("Mcp-Session-Id header required for POST requests.", $message->getId()); $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); return $deferred->promise(); } } } $context = [ 'is_initialize_request' => $isInitializeRequest, ]; $nRequests = match (true) { $message instanceof Request => 1, $message instanceof BatchRequest => $message->nRequests(), default => 0, }; if ($nRequests === 0) { $deferred->resolve(new HttpResponse(202)); $context['type'] = 'post_202_sent'; } else { if ($this->enableJsonResponse) { $pendingRequestId = $this->generateId(); $this->pendingRequests[$pendingRequestId] = $deferred; $timeoutTimer = $this->loop->addTimer(30, function () use ($pendingRequestId, $sessionId) { if (isset($this->pendingRequests[$pendingRequestId])) { $deferred = $this->pendingRequests[$pendingRequestId]; unset($this->pendingRequests[$pendingRequestId]); $this->logger->warning("Timeout waiting for direct JSON response processing.", ['pending_request_id' => $pendingRequestId, 'session_id' => $sessionId]); $errorResponse = McpServerException::internalError("Request processing timed out.")->toJsonRpcError($pendingRequestId); $deferred->resolve(new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($errorResponse->toArray()))); } }); $this->pendingRequests[$pendingRequestId]->promise()->finally(function () use ($timeoutTimer) { $this->loop->cancelTimer($timeoutTimer); }); $context['type'] = 'post_json'; $context['pending_request_id'] = $pendingRequestId; } else { $streamId = $this->generateId(); $sseStream = new ThroughStream(); $this->activeSseStreams[$streamId] = [ 'stream' => $sseStream, 'sessionId' => $sessionId, 'context' => ['nRequests' => $nRequests, 'nResponses' => 0] ]; $sseStream->on('close', function () use ($streamId) { $this->logger->info("POST SSE stream closed by client/server.", ['streamId' => $streamId, 'sessionId' => $this->activeSseStreams[$streamId]['sessionId']]); unset($this->activeSseStreams[$streamId]); }); $sseStream->on('error', function (Throwable $e) use ($streamId) { $this->logger->error("POST SSE stream error.", ['streamId' => $streamId, 'sessionId' => $this->activeSseStreams[$streamId]['sessionId'], 'error' => $e->getMessage()]); unset($this->activeSseStreams[$streamId]); }); $headers = [ 'Content-Type' => 'text/event-stream', 'Cache-Control' => 'no-cache', 'Connection' => 'keep-alive', 'X-Accel-Buffering' => 'no', ]; if (!empty($sessionId) && !$this->stateless) { $headers['Mcp-Session-Id'] = $sessionId; } $deferred->resolve(new HttpResponse(200, $headers, $sseStream)); $context['type'] = 'post_sse'; $context['streamId'] = $streamId; $context['nRequests'] = $nRequests; } } $context['stateless'] = $this->stateless; $context['request'] = $request; $this->loop->futureTick(function () use ($message, $sessionId, $context) { $this->emit('message', [$message, $sessionId, $context]); }); return $deferred->promise(); } private function handleDeleteRequest(ServerRequestInterface $request): PromiseInterface { if ($this->stateless) { return resolve(new HttpResponse(204)); } $sessionId = $request->getHeaderLine('Mcp-Session-Id'); if (empty($sessionId)) { $this->logger->warning("DELETE request without Mcp-Session-Id."); $error = Error::forInvalidRequest("Mcp-Session-Id header required for DELETE."); return resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error))); } $streamsToClose = []; foreach ($this->activeSseStreams as $streamId => $streamInfo) { if ($streamInfo['sessionId'] === $sessionId) { $streamsToClose[] = $streamId; } } foreach ($streamsToClose as $streamId) { $this->activeSseStreams[$streamId]['stream']->end(); unset($this->activeSseStreams[$streamId]); } if ($this->getStream !== null) { $this->getStream->end(); $this->getStream = null; } $this->emit('client_disconnected', [$sessionId, 'Session terminated by DELETE request']); return resolve(new HttpResponse(204)); } private function handleUnsupportedRequest(ServerRequestInterface $request): HttpResponse { $error = Error::forInvalidRequest("Method not allowed: {$request->getMethod()}"); $headers = [ 'Content-Type' => 'application/json', 'Allow' => 'GET, POST, DELETE, OPTIONS', ]; return new HttpResponse(405, $headers, json_encode($error)); } private function handleRequestError(Throwable $e, ServerRequestInterface $request): HttpResponse { $this->logger->error("Error processing HTTP request", [ 'method' => $request->getMethod(), 'path' => $request->getUri()->getPath(), 'exception' => $e->getMessage() ]); if ($e instanceof TransportException) { $error = Error::forInternalError("Transport Error: " . $e->getMessage()); return new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($error)); } $error = Error::forInternalError("Internal Server Error during HTTP request processing."); return new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($error)); } public function sendMessage(Message $message, string $sessionId, array $context = []): PromiseInterface { if ($this->closing) { return reject(new TransportException('Transport is closing.')); } $isInitializeResponse = ($context['is_initialize_request'] ?? false) && ($message instanceof Response); switch ($context['type'] ?? null) { case 'post_202_sent': return resolve(null); case 'post_sse': $streamId = $context['streamId']; if (!isset($this->activeSseStreams[$streamId])) { $this->logger->error("SSE stream for POST not found.", ['streamId' => $streamId, 'sessionId' => $sessionId]); return reject(new TransportException("SSE stream {$streamId} not found for POST response.")); } $stream = $this->activeSseStreams[$streamId]['stream']; if (!$stream->isWritable()) { $this->logger->warning("SSE stream for POST is not writable.", ['streamId' => $streamId, 'sessionId' => $sessionId]); return reject(new TransportException("SSE stream {$streamId} for POST is not writable.")); } $sentCountThisCall = 0; if ($message instanceof Response || $message instanceof Error) { $json = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); $eventId = $this->eventStore ? $this->eventStore->storeEvent($streamId, $json) : null; $this->sendSseEventToStream($stream, $json, $eventId); $sentCountThisCall = 1; } elseif ($message instanceof BatchResponse) { foreach ($message->getAll() as $singleResponse) { $json = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); $eventId = $this->eventStore ? $this->eventStore->storeEvent($streamId, $json) : null; $this->sendSseEventToStream($stream, $json, $eventId); $sentCountThisCall++; } } if (isset($this->activeSseStreams[$streamId]['context'])) { $this->activeSseStreams[$streamId]['context']['nResponses'] += $sentCountThisCall; if ($this->activeSseStreams[$streamId]['context']['nResponses'] >= $this->activeSseStreams[$streamId]['context']['nRequests']) { $this->logger->info("All expected responses sent for POST SSE stream. Closing.", ['streamId' => $streamId, 'sessionId' => $sessionId]); $stream->end(); // Will trigger 'close' event. if ($context['stateless'] ?? false) { $this->loop->futureTick(function () use ($sessionId) { $this->emit('client_disconnected', [$sessionId, 'Stateless request completed']); }); } } } return resolve(null); case 'post_json': $pendingRequestId = $context['pending_request_id']; if (!isset($this->pendingRequests[$pendingRequestId])) { $this->logger->error("Pending direct JSON request not found.", ['pending_request_id' => $pendingRequestId, 'session_id' => $sessionId]); return reject(new TransportException("Pending request {$pendingRequestId} not found.")); } $deferred = $this->pendingRequests[$pendingRequestId]; unset($this->pendingRequests[$pendingRequestId]); $responseBody = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); $headers = ['Content-Type' => 'application/json']; if ($isInitializeResponse && !$this->stateless) { $headers['Mcp-Session-Id'] = $sessionId; } $statusCode = $context['status_code'] ?? 200; $deferred->resolve(new HttpResponse($statusCode, $headers, $responseBody . "\n")); if ($context['stateless'] ?? false) { $this->loop->futureTick(function () use ($sessionId) { $this->emit('client_disconnected', [$sessionId, 'Stateless request completed']); }); } return resolve(null); default: if ($this->getStream === null) { $this->logger->error("GET SSE stream not found.", ['sessionId' => $sessionId]); return reject(new TransportException("GET SSE stream not found.")); } if (!$this->getStream->isWritable()) { $this->logger->warning("GET SSE stream is not writable.", ['sessionId' => $sessionId]); return reject(new TransportException("GET SSE stream not writable.")); } if ($message instanceof Response || $message instanceof Error) { $json = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); $eventId = $this->eventStore ? $this->eventStore->storeEvent('GET_STREAM', $json) : null; $this->sendSseEventToStream($this->getStream, $json, $eventId); } elseif ($message instanceof BatchResponse) { foreach ($message->getAll() as $singleResponse) { $json = json_encode($singleResponse, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); $eventId = $this->eventStore ? $this->eventStore->storeEvent('GET_STREAM', $json) : null; $this->sendSseEventToStream($this->getStream, $json, $eventId); } } return resolve(null); } } private function replayEvents(string $lastEventId, ThroughStream $sseStream, string $sessionId): void { if (empty($lastEventId)) { return; } try { $this->eventStore->replayEventsAfter( $lastEventId, function (string $replayedEventId, string $json) use ($sseStream) { $this->logger->debug("Replaying event", ['replayedEventId' => $replayedEventId]); $this->sendSseEventToStream($sseStream, $json, $replayedEventId); } ); } catch (Throwable $e) { $this->logger->error("Error during event replay.", ['sessionId' => $sessionId, 'exception' => $e]); } } private function sendSseEventToStream(ThroughStream $stream, string $data, ?string $eventId = null): bool { if (! $stream->isWritable()) { return false; } $frame = "event: message\n"; if ($eventId !== null) { $frame .= "id: {$eventId}\n"; } $lines = explode("\n", $data); foreach ($lines as $line) { $frame .= "data: {$line}\n"; } $frame .= "\n"; return $stream->write($frame); } public function close(): void { if ($this->closing) { return; } $this->closing = true; $this->listening = false; $this->logger->info('Closing transport...'); if ($this->socket) { $this->socket->close(); $this->socket = null; } foreach ($this->activeSseStreams as $streamId => $streamInfo) { if ($streamInfo['stream']->isWritable()) { $streamInfo['stream']->end(); } } if ($this->getStream !== null) { $this->getStream->end(); $this->getStream = null; } foreach ($this->pendingRequests as $pendingRequestId => $deferred) { $deferred->reject(new TransportException('Transport is closing.')); } $this->activeSseStreams = []; $this->pendingRequests = []; $this->emit('close', ['Transport closed.']); $this->removeAllListeners(); } } ``` -------------------------------------------------------------------------------- /src/Utils/SchemaGenerator.php: -------------------------------------------------------------------------------- ```php <?php namespace PhpMcp\Server\Utils; use phpDocumentor\Reflection\DocBlock\Tags\Param; use PhpMcp\Server\Attributes\Schema; use PhpMcp\Server\Context; use ReflectionEnum; use ReflectionIntersectionType; use ReflectionMethod; use ReflectionNamedType; use ReflectionParameter; use ReflectionType; use ReflectionUnionType; use stdClass; /** * Generates JSON Schema for method parameters with intelligent Schema attribute handling. * * Priority system: * 1. Schema attributes (method-level and parameter-level) * 2. Reflection type information * 3. DocBlock type information */ class SchemaGenerator { private DocBlockParser $docBlockParser; public function __construct(DocBlockParser $docBlockParser) { $this->docBlockParser = $docBlockParser; } /** * Generates a JSON Schema object (as a PHP array) for a method's or function's parameters. */ public function generate(\ReflectionMethod|\ReflectionFunction $reflection): array { $methodSchema = $this->extractMethodLevelSchema($reflection); if ($methodSchema && isset($methodSchema['definition'])) { return $methodSchema['definition']; } $parametersInfo = $this->parseParametersInfo($reflection); return $this->buildSchemaFromParameters($parametersInfo, $methodSchema); } /** * Extracts method-level or function-level Schema attribute. */ private function extractMethodLevelSchema(\ReflectionMethod|\ReflectionFunction $reflection): ?array { $schemaAttrs = $reflection->getAttributes(Schema::class, \ReflectionAttribute::IS_INSTANCEOF); if (empty($schemaAttrs)) { return null; } $schemaAttr = $schemaAttrs[0]->newInstance(); return $schemaAttr->toArray(); } /** * Extracts parameter-level Schema attribute. */ private function extractParameterLevelSchema(ReflectionParameter $parameter): array { $schemaAttrs = $parameter->getAttributes(Schema::class, \ReflectionAttribute::IS_INSTANCEOF); if (empty($schemaAttrs)) { return []; } $schemaAttr = $schemaAttrs[0]->newInstance(); return $schemaAttr->toArray(); } /** * Builds the final schema from parameter information and method-level schema. * * @param array<int, array{ * name: string, * doc_block_tag: Param|null, * reflection_param: ReflectionParameter, * reflection_type_object: ReflectionType|null, * type_string: string, * description: string|null, * required: bool, * allows_null: bool, * default_value: mixed|null, * has_default: bool, * is_variadic: bool, * parameter_schema: array<string, mixed> * }> $parametersInfo * * @param array<string, mixed>|null $methodSchema * * @return array<string, mixed> */ private function buildSchemaFromParameters(array $parametersInfo, ?array $methodSchema): array { $schema = [ 'type' => 'object', 'properties' => [], 'required' => [], ]; // Apply method-level schema as base if ($methodSchema) { $schema = array_merge($schema, $methodSchema); if (!isset($schema['type'])) { $schema['type'] = 'object'; } if (!isset($schema['properties'])) { $schema['properties'] = []; } if (!isset($schema['required'])) { $schema['required'] = []; } } foreach ($parametersInfo as $paramInfo) { $paramName = $paramInfo['name']; $methodLevelParamSchema = $schema['properties'][$paramName] ?? null; $paramSchema = $this->buildParameterSchema($paramInfo, $methodLevelParamSchema); $schema['properties'][$paramName] = $paramSchema; if ($paramInfo['required'] && !in_array($paramName, $schema['required'])) { $schema['required'][] = $paramName; } elseif (!$paramInfo['required'] && ($key = array_search($paramName, $schema['required'])) !== false) { unset($schema['required'][$key]); $schema['required'] = array_values($schema['required']); // Re-index } } // Clean up empty properties if (empty($schema['properties'])) { $schema['properties'] = new stdClass(); } if (empty($schema['required'])) { unset($schema['required']); } return $schema; } /** * Builds the final schema for a single parameter by merging all three levels. * * @param array{ * name: string, * doc_block_tag: Param|null, * reflection_param: ReflectionParameter, * reflection_type_object: ReflectionType|null, * type_string: string, * description: string|null, * required: bool, * allows_null: bool, * default_value: mixed|null, * has_default: bool, * is_variadic: bool, * parameter_schema: array<string, mixed> * } $paramInfo * @param array<string, mixed>|null $methodLevelParamSchema */ private function buildParameterSchema(array $paramInfo, ?array $methodLevelParamSchema = null): array { if ($paramInfo['is_variadic']) { return $this->buildVariadicParameterSchema($paramInfo); } $inferredSchema = $this->buildInferredParameterSchema($paramInfo); // Method-level takes precedence over inferred schema $mergedSchema = $inferredSchema; if ($methodLevelParamSchema) { $mergedSchema = $this->mergeSchemas($inferredSchema, $methodLevelParamSchema); } // Parameter-level takes highest precedence $parameterLevelSchema = $paramInfo['parameter_schema']; if (!empty($parameterLevelSchema)) { if (isset($parameterLevelSchema['definition']) && is_array($parameterLevelSchema['definition'])) { return $parameterLevelSchema['definition']; } $mergedSchema = $this->mergeSchemas($mergedSchema, $parameterLevelSchema); } return $mergedSchema; } /** * Merge two schemas where the dominant schema takes precedence over the recessive one. * * @param array $recessiveSchema The schema with lower precedence * @param array $dominantSchema The schema with higher precedence */ private function mergeSchemas(array $recessiveSchema, array $dominantSchema): array { $mergedSchema = array_merge($recessiveSchema, $dominantSchema); return $mergedSchema; } /** * Builds parameter schema from inferred type and docblock information only. * Returns empty array for variadic parameters (handled separately). */ private function buildInferredParameterSchema(array $paramInfo): array { $paramSchema = []; // Variadic parameters are handled separately if ($paramInfo['is_variadic']) { return []; } // Infer JSON Schema types $jsonTypes = $this->inferParameterTypes($paramInfo); if (count($jsonTypes) === 1) { $paramSchema['type'] = $jsonTypes[0]; } elseif (count($jsonTypes) > 1) { $paramSchema['type'] = $jsonTypes; } // Add description from docblock if ($paramInfo['description']) { $paramSchema['description'] = $paramInfo['description']; } // Add default value only if parameter actually has a default if ($paramInfo['has_default']) { $paramSchema['default'] = $paramInfo['default_value']; } // Handle enums $paramSchema = $this->applyEnumConstraints($paramSchema, $paramInfo); // Handle array items $paramSchema = $this->applyArrayConstraints($paramSchema, $paramInfo); return $paramSchema; } /** * Builds schema for variadic parameters. */ private function buildVariadicParameterSchema(array $paramInfo): array { $paramSchema = ['type' => 'array']; // Apply parameter-level Schema attributes first if (!empty($paramInfo['parameter_schema'])) { $paramSchema = array_merge($paramSchema, $paramInfo['parameter_schema']); // Ensure type is always array for variadic $paramSchema['type'] = 'array'; } if ($paramInfo['description']) { $paramSchema['description'] = $paramInfo['description']; } // If no items specified by Schema attribute, infer from type if (!isset($paramSchema['items'])) { $itemJsonTypes = $this->mapPhpTypeToJsonSchemaType($paramInfo['type_string']); $nonNullItemTypes = array_filter($itemJsonTypes, fn($t) => $t !== 'null'); if (count($nonNullItemTypes) === 1) { $paramSchema['items'] = ['type' => $nonNullItemTypes[0]]; } } return $paramSchema; } /** * Infers JSON Schema types for a parameter. */ private function inferParameterTypes(array $paramInfo): array { $jsonTypes = $this->mapPhpTypeToJsonSchemaType($paramInfo['type_string']); if ($paramInfo['allows_null'] && strtolower($paramInfo['type_string']) !== 'mixed' && !in_array('null', $jsonTypes)) { $jsonTypes[] = 'null'; } if (count($jsonTypes) > 1) { // Sort but ensure null comes first for consistency $nullIndex = array_search('null', $jsonTypes); if ($nullIndex !== false) { unset($jsonTypes[$nullIndex]); sort($jsonTypes); array_unshift($jsonTypes, 'null'); } else { sort($jsonTypes); } } return $jsonTypes; } /** * Applies enum constraints to parameter schema. */ private function applyEnumConstraints(array $paramSchema, array $paramInfo): array { $reflectionType = $paramInfo['reflection_type_object']; if (!($reflectionType instanceof ReflectionNamedType) || $reflectionType->isBuiltin() || !enum_exists($reflectionType->getName())) { return $paramSchema; } $enumClass = $reflectionType->getName(); $enumReflection = new ReflectionEnum($enumClass); $backingTypeReflection = $enumReflection->getBackingType(); if ($enumReflection->isBacked() && $backingTypeReflection instanceof ReflectionNamedType) { $paramSchema['enum'] = array_column($enumClass::cases(), 'value'); $jsonBackingType = match ($backingTypeReflection->getName()) { 'int' => 'integer', 'string' => 'string', default => null, }; if ($jsonBackingType) { if (isset($paramSchema['type']) && is_array($paramSchema['type']) && in_array('null', $paramSchema['type'])) { $paramSchema['type'] = ['null', $jsonBackingType]; } else { $paramSchema['type'] = $jsonBackingType; } } } else { // Non-backed enum - use names as enum values $paramSchema['enum'] = array_column($enumClass::cases(), 'name'); if (isset($paramSchema['type']) && is_array($paramSchema['type']) && in_array('null', $paramSchema['type'])) { $paramSchema['type'] = ['null', 'string']; } else { $paramSchema['type'] = 'string'; } } return $paramSchema; } /** * Applies array-specific constraints to parameter schema. */ private function applyArrayConstraints(array $paramSchema, array $paramInfo): array { if (!isset($paramSchema['type'])) { return $paramSchema; } $typeString = $paramInfo['type_string']; $allowsNull = $paramInfo['allows_null']; // Handle object-like arrays using array{} syntax if (preg_match('/^array\s*{/i', $typeString)) { $objectSchema = $this->inferArrayItemsType($typeString); if (is_array($objectSchema) && isset($objectSchema['properties'])) { $paramSchema = array_merge($paramSchema, $objectSchema); $paramSchema['type'] = $allowsNull ? ['object', 'null'] : 'object'; } } // Handle regular arrays elseif (in_array('array', $this->mapPhpTypeToJsonSchemaType($typeString))) { $itemsType = $this->inferArrayItemsType($typeString); if ($itemsType !== 'any') { if (is_string($itemsType)) { $paramSchema['items'] = ['type' => $itemsType]; } else { if (!isset($itemsType['type']) && isset($itemsType['properties'])) { $itemsType = array_merge(['type' => 'object'], $itemsType); } $paramSchema['items'] = $itemsType; } } if ($allowsNull) { $paramSchema['type'] = ['array', 'null']; sort($paramSchema['type']); } else { $paramSchema['type'] = 'array'; } } return $paramSchema; } /** * Parses detailed information about a method's parameters. * * @return array<int, array{ * name: string, * doc_block_tag: Param|null, * reflection_param: ReflectionParameter, * reflection_type_object: ReflectionType|null, * type_string: string, * description: string|null, * required: bool, * allows_null: bool, * default_value: mixed|null, * has_default: bool, * is_variadic: bool, * parameter_schema: array<string, mixed> * }> */ private function parseParametersInfo(\ReflectionMethod|\ReflectionFunction $reflection): array { $docComment = $reflection->getDocComment() ?: null; $docBlock = $this->docBlockParser->parseDocBlock($docComment); $paramTags = $this->docBlockParser->getParamTags($docBlock); $parametersInfo = []; foreach ($reflection->getParameters() as $rp) { $paramName = $rp->getName(); $paramTag = $paramTags['$' . $paramName] ?? null; $reflectionType = $rp->getType(); if ($reflectionType instanceof ReflectionNamedType && $reflectionType?->getName() === Context::class) { continue; } $typeString = $this->getParameterTypeString($rp, $paramTag); $description = $this->docBlockParser->getParamDescription($paramTag); $hasDefault = $rp->isDefaultValueAvailable(); $defaultValue = $hasDefault ? $rp->getDefaultValue() : null; $isVariadic = $rp->isVariadic(); $parameterSchema = $this->extractParameterLevelSchema($rp); if ($defaultValue instanceof \BackedEnum) { $defaultValue = $defaultValue->value; } if ($defaultValue instanceof \UnitEnum) { $defaultValue = $defaultValue->name; } $allowsNull = false; if ($reflectionType && $reflectionType->allowsNull()) { $allowsNull = true; } elseif ($hasDefault && $defaultValue === null) { $allowsNull = true; } elseif (str_contains($typeString, 'null') || strtolower($typeString) === 'mixed') { $allowsNull = true; } $parametersInfo[] = [ 'name' => $paramName, 'doc_block_tag' => $paramTag, 'reflection_param' => $rp, 'reflection_type_object' => $reflectionType, 'type_string' => $typeString, 'description' => $description, 'required' => !$rp->isOptional(), 'allows_null' => $allowsNull, 'default_value' => $defaultValue, 'has_default' => $hasDefault, 'is_variadic' => $isVariadic, 'parameter_schema' => $parameterSchema, ]; } return $parametersInfo; } /** * Determines the type string for a parameter, prioritizing DocBlock. */ private function getParameterTypeString(ReflectionParameter $rp, ?Param $paramTag): string { $docBlockType = $this->docBlockParser->getParamTypeString($paramTag); $isDocBlockTypeGeneric = false; if ($docBlockType !== null) { if (in_array(strtolower($docBlockType), ['mixed', 'unknown', ''])) { $isDocBlockTypeGeneric = true; } } else { $isDocBlockTypeGeneric = true; // No tag or no type in tag implies generic } $reflectionType = $rp->getType(); $reflectionTypeString = null; if ($reflectionType) { $reflectionTypeString = $this->getTypeStringFromReflection($reflectionType, $rp->allowsNull()); } // Prioritize Reflection if DocBlock type is generic AND Reflection provides a more specific type if ($isDocBlockTypeGeneric && $reflectionTypeString !== null && $reflectionTypeString !== 'mixed') { return $reflectionTypeString; } // Otherwise, use the DocBlock type if it was valid and non-generic if ($docBlockType !== null && !$isDocBlockTypeGeneric) { // Consider if DocBlock adds nullability missing from reflection if (stripos($docBlockType, 'null') !== false && $reflectionTypeString && stripos($reflectionTypeString, 'null') === false && !str_ends_with($reflectionTypeString, '|null')) { // If reflection didn't capture null, but docblock did, append |null (if not already mixed) if ($reflectionTypeString !== 'mixed') { return $reflectionTypeString . '|null'; } } return $docBlockType; } // Fallback to Reflection type even if it was generic ('mixed') if ($reflectionTypeString !== null) { return $reflectionTypeString; } // Default to 'mixed' if nothing else found return 'mixed'; } /** * Converts a ReflectionType object into a type string representation. */ private function getTypeStringFromReflection(?ReflectionType $type, bool $nativeAllowsNull): string { if ($type === null) { return 'mixed'; } $types = []; if ($type instanceof ReflectionUnionType) { foreach ($type->getTypes() as $innerType) { $types[] = $this->getTypeStringFromReflection($innerType, $innerType->allowsNull()); } if ($nativeAllowsNull) { $types = array_filter($types, fn($t) => strtolower($t) !== 'null'); } $typeString = implode('|', array_unique(array_filter($types))); } elseif ($type instanceof ReflectionIntersectionType) { foreach ($type->getTypes() as $innerType) { $types[] = $this->getTypeStringFromReflection($innerType, false); } $typeString = implode('&', array_unique(array_filter($types))); } elseif ($type instanceof ReflectionNamedType) { $typeString = $type->getName(); } else { return 'mixed'; } $typeString = match (strtolower($typeString)) { 'bool' => 'boolean', 'int' => 'integer', 'float', 'double' => 'number', 'str' => 'string', default => $typeString, }; $isNullable = $nativeAllowsNull; if ($type instanceof ReflectionNamedType && $type->getName() === 'mixed') { $isNullable = true; } if ($type instanceof ReflectionUnionType && !$nativeAllowsNull) { foreach ($type->getTypes() as $innerType) { if ($innerType instanceof ReflectionNamedType && strtolower($innerType->getName()) === 'null') { $isNullable = true; break; } } } if ($isNullable && $typeString !== 'mixed' && stripos($typeString, 'null') === false) { if (!str_ends_with($typeString, '|null') && !str_ends_with($typeString, '&null')) { $typeString .= '|null'; } } // Remove leading backslash from class names, but handle built-ins like 'int' or unions like 'int|string' if (str_contains($typeString, '\\')) { $parts = preg_split('/([|&])/', $typeString, -1, PREG_SPLIT_DELIM_CAPTURE); $processedParts = array_map(fn($part) => str_starts_with($part, '\\') ? ltrim($part, '\\') : $part, $parts); $typeString = implode('', $processedParts); } return $typeString ?: 'mixed'; } /** * Maps a PHP type string (potentially a union) to an array of JSON Schema type names. */ private function mapPhpTypeToJsonSchemaType(string $phpTypeString): array { $normalizedType = strtolower(trim($phpTypeString)); // PRIORITY 1: Check for array{} syntax which should be treated as object if (preg_match('/^array\s*{/i', $normalizedType)) { return ['object']; } // PRIORITY 2: Check for array syntax first (T[] or generics) if ( str_contains($normalizedType, '[]') || preg_match('/^(array|list|iterable|collection)</i', $normalizedType) ) { return ['array']; } // PRIORITY 3: Handle unions (recursive) if (str_contains($normalizedType, '|')) { $types = explode('|', $normalizedType); $jsonTypes = []; foreach ($types as $type) { $mapped = $this->mapPhpTypeToJsonSchemaType(trim($type)); $jsonTypes = array_merge($jsonTypes, $mapped); } return array_values(array_unique($jsonTypes)); } // PRIORITY 4: Handle simple built-in types return match ($normalizedType) { 'string', 'scalar' => ['string'], '?string' => ['null', 'string'], 'int', 'integer' => ['integer'], '?int', '?integer' => ['null', 'integer'], 'float', 'double', 'number' => ['number'], '?float', '?double', '?number' => ['null', 'number'], 'bool', 'boolean' => ['boolean'], '?bool', '?boolean' => ['null', 'boolean'], 'array' => ['array'], '?array' => ['null', 'array'], 'object', 'stdclass' => ['object'], '?object', '?stdclass' => ['null', 'object'], 'null' => ['null'], 'resource', 'callable' => ['object'], 'mixed' => [], 'void', 'never' => [], default => ['object'], }; } /** * Infers the 'items' schema type for an array based on DocBlock type hints. */ private function inferArrayItemsType(string $phpTypeString): string|array { $normalizedType = trim($phpTypeString); // Case 1: Simple T[] syntax (e.g., string[], int[], bool[], etc.) if (preg_match('/^(\\??)([\w\\\\]+)\\s*\\[\\]$/i', $normalizedType, $matches)) { $itemType = strtolower($matches[2]); return $this->mapSimpleTypeToJsonSchema($itemType); } // Case 2: Generic array<T> syntax (e.g., array<string>, array<int>, etc.) if (preg_match('/^(\\??)array\s*<\s*([\w\\\\|]+)\s*>$/i', $normalizedType, $matches)) { $itemType = strtolower($matches[2]); return $this->mapSimpleTypeToJsonSchema($itemType); } // Case 3: Nested array<array<T>> syntax or T[][] syntax if ( preg_match('/^(\\??)array\s*<\s*array\s*<\s*([\w\\\\|]+)\s*>\s*>$/i', $normalizedType, $matches) || preg_match('/^(\\??)([\w\\\\]+)\s*\[\]\[\]$/i', $normalizedType, $matches) ) { $innerType = $this->mapSimpleTypeToJsonSchema(isset($matches[2]) ? strtolower($matches[2]) : 'any'); // Return a schema for array with items being arrays return [ 'type' => 'array', 'items' => [ 'type' => $innerType ] ]; } // Case 4: Object-like array syntax (e.g., array{name: string, age: int}) if (preg_match('/^(\\??)array\s*\{(.+)\}$/is', $normalizedType, $matches)) { return $this->parseObjectLikeArray($matches[2]); } return 'any'; } /** * Parses object-like array syntax into a JSON Schema object */ private function parseObjectLikeArray(string $propertiesStr): array { $properties = []; $required = []; // Parse properties from the string, handling nested structures $depth = 0; $buffer = ''; for ($i = 0; $i < strlen($propertiesStr); $i++) { $char = $propertiesStr[$i]; // Track nested braces if ($char === '{') { $depth++; $buffer .= $char; } elseif ($char === '}') { $depth--; $buffer .= $char; } // Property separator (comma) elseif ($char === ',' && $depth === 0) { // Process the completed property $this->parsePropertyDefinition(trim($buffer), $properties, $required); $buffer = ''; } else { $buffer .= $char; } } // Process the last property if (!empty($buffer)) { $this->parsePropertyDefinition(trim($buffer), $properties, $required); } if (!empty($properties)) { return [ 'type' => 'object', 'properties' => $properties, 'required' => $required ]; } return ['type' => 'object']; } /** * Parses a single property definition from an object-like array syntax */ private function parsePropertyDefinition(string $propDefinition, array &$properties, array &$required): void { // Match property name and type if (preg_match('/^([a-zA-Z_\x80-\xff][a-zA-Z0-9_\x80-\xff]*)\s*:\s*(.+)$/i', $propDefinition, $matches)) { $propName = $matches[1]; $propType = trim($matches[2]); // Add to required properties $required[] = $propName; // Check for nested array{} syntax if (preg_match('/^array\s*\{(.+)\}$/is', $propType, $nestedMatches)) { $nestedSchema = $this->parseObjectLikeArray($nestedMatches[1]); $properties[$propName] = $nestedSchema; } // Check for array<T> or T[] syntax elseif ( preg_match('/^array\s*<\s*([\w\\\\|]+)\s*>$/i', $propType, $arrayMatches) || preg_match('/^([\w\\\\]+)\s*\[\]$/i', $propType, $arrayMatches) ) { $itemType = $arrayMatches[1] ?? 'any'; $properties[$propName] = [ 'type' => 'array', 'items' => [ 'type' => $this->mapSimpleTypeToJsonSchema($itemType) ] ]; } // Simple type else { $properties[$propName] = ['type' => $this->mapSimpleTypeToJsonSchema($propType)]; } } } /** * Helper method to map basic PHP types to JSON Schema types */ private function mapSimpleTypeToJsonSchema(string $type): string { return match (strtolower($type)) { 'string' => 'string', 'int', 'integer' => 'integer', 'bool', 'boolean' => 'boolean', 'float', 'double', 'number' => 'number', 'array' => 'array', 'object', 'stdclass' => 'object', default => in_array(strtolower($type), ['datetime', 'datetimeinterface']) ? 'string' : 'object', }; } } ``` -------------------------------------------------------------------------------- /tests/Unit/DispatcherTest.php: -------------------------------------------------------------------------------- ```php <?php namespace PhpMcp\Server\Tests\Unit; use Mockery; use Mockery\MockInterface; use PhpMcp\Schema\ClientCapabilities; use PhpMcp\Server\Context; use PhpMcp\Server\Configuration; use PhpMcp\Server\Contracts\CompletionProviderInterface; use PhpMcp\Server\Contracts\SessionInterface; use PhpMcp\Server\Dispatcher; use PhpMcp\Server\Elements\RegisteredPrompt; use PhpMcp\Server\Elements\RegisteredResource; use PhpMcp\Server\Elements\RegisteredResourceTemplate; use PhpMcp\Server\Elements\RegisteredTool; use PhpMcp\Server\Exception\McpServerException; use PhpMcp\Schema\Implementation; use PhpMcp\Schema\JsonRpc\Notification as JsonRpcNotification; use PhpMcp\Schema\JsonRpc\Request as JsonRpcRequest; use PhpMcp\Schema\Prompt as PromptSchema; use PhpMcp\Schema\PromptArgument; use PhpMcp\Schema\Request\CallToolRequest; use PhpMcp\Schema\Request\CompletionCompleteRequest; use PhpMcp\Schema\Request\GetPromptRequest; use PhpMcp\Schema\Request\InitializeRequest; use PhpMcp\Schema\Request\ListToolsRequest; use PhpMcp\Schema\Request\ReadResourceRequest; use PhpMcp\Schema\Request\ResourceSubscribeRequest; use PhpMcp\Schema\Request\SetLogLevelRequest; use PhpMcp\Schema\Resource as ResourceSchema; use PhpMcp\Schema\ResourceTemplate as ResourceTemplateSchema; use PhpMcp\Schema\Result\CallToolResult; use PhpMcp\Schema\Result\CompletionCompleteResult; use PhpMcp\Schema\Result\EmptyResult; use PhpMcp\Schema\Result\GetPromptResult; use PhpMcp\Schema\Result\InitializeResult; use PhpMcp\Schema\Result\ReadResourceResult; use PhpMcp\Schema\ServerCapabilities; use PhpMcp\Schema\Tool as ToolSchema; use PhpMcp\Server\Registry; use PhpMcp\Server\Session\SubscriptionManager; use PhpMcp\Server\Utils\SchemaValidator; use Psr\Container\ContainerInterface; use Psr\Log\NullLogger; use PhpMcp\Schema\Content\TextContent; use PhpMcp\Schema\Content\PromptMessage; use PhpMcp\Schema\Enum\LoggingLevel; use PhpMcp\Schema\Enum\Role; use PhpMcp\Schema\PromptReference; use PhpMcp\Schema\Request\ListPromptsRequest; use PhpMcp\Schema\Request\ListResourcesRequest; use PhpMcp\Schema\Request\ListResourceTemplatesRequest; use PhpMcp\Schema\ResourceReference; use PhpMcp\Server\Protocol; use PhpMcp\Server\Tests\Fixtures\Enums\StatusEnum; use React\EventLoop\Loop; const DISPATCHER_SESSION_ID = 'dispatcher-session-xyz'; const DISPATCHER_PAGINATION_LIMIT = 3; beforeEach(function () { /** @var MockInterface&Configuration $configuration */ $this->configuration = Mockery::mock(Configuration::class); /** @var MockInterface&Registry $registry */ $this->registry = Mockery::mock(Registry::class); /** @var MockInterface&SubscriptionManager $subscriptionManager */ $this->subscriptionManager = Mockery::mock(SubscriptionManager::class); /** @var MockInterface&SchemaValidator $schemaValidator */ $this->schemaValidator = Mockery::mock(SchemaValidator::class); /** @var MockInterface&SessionInterface $session */ $this->session = Mockery::mock(SessionInterface::class); /** @var MockInterface&ContainerInterface $container */ $this->container = Mockery::mock(ContainerInterface::class); $this->context = new Context($this->session); $configuration = new Configuration( serverInfo: Implementation::make('DispatcherTestServer', '1.0'), capabilities: ServerCapabilities::make(), paginationLimit: DISPATCHER_PAGINATION_LIMIT, logger: new NullLogger(), loop: Loop::get(), cache: null, container: $this->container ); $this->dispatcher = new Dispatcher( $configuration, $this->registry, $this->subscriptionManager, $this->schemaValidator ); }); it('routes to handleInitialize for initialize request', function () { $request = new JsonRpcRequest( jsonrpc: '2.0', id: 1, method: 'initialize', params: [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'client', 'version' => '1.0'], 'capabilities' => [], ] ); $this->session->shouldReceive('set')->with('client_info', Mockery::on(fn($value) => $value['name'] === 'client' && $value['version'] === '1.0'))->once(); $this->session->shouldReceive('set')->with('protocol_version', Protocol::LATEST_PROTOCOL_VERSION)->once(); $result = $this->dispatcher->handleRequest($request, $this->context); expect($result)->toBeInstanceOf(InitializeResult::class); expect($result->protocolVersion)->toBe(Protocol::LATEST_PROTOCOL_VERSION); expect($result->serverInfo->name)->toBe('DispatcherTestServer'); }); it('routes to handlePing for ping request', function () { $request = new JsonRpcRequest('2.0', 'id1', 'ping', []); $result = $this->dispatcher->handleRequest($request, $this->context); expect($result)->toBeInstanceOf(EmptyResult::class); }); it('throws MethodNotFound for unknown request method', function () { $rawRequest = new JsonRpcRequest('2.0', 'id1', 'unknown/method', []); $this->dispatcher->handleRequest($rawRequest, $this->context); })->throws(McpServerException::class, "Method 'unknown/method' not found."); it('routes to handleNotificationInitialized for initialized notification', function () { $notification = new JsonRpcNotification('2.0', 'notifications/initialized', []); $this->session->shouldReceive('set')->with('initialized', true)->once(); $this->dispatcher->handleNotification($notification, $this->session); }); it('does nothing for unknown notification method', function () { $rawNotification = new JsonRpcNotification('2.0', 'unknown/notification', []); $this->session->shouldNotReceive('set'); $this->dispatcher->handleNotification($rawNotification, $this->session); }); it('can handle initialize request', function () { $clientInfo = Implementation::make('TestClient', '0.9.9'); $request = InitializeRequest::make(1, Protocol::LATEST_PROTOCOL_VERSION, ClientCapabilities::make(), $clientInfo, []); $this->session->shouldReceive('set')->with('client_info', $clientInfo->toArray())->once(); $this->session->shouldReceive('set')->with('protocol_version', Protocol::LATEST_PROTOCOL_VERSION)->once(); $result = $this->dispatcher->handleInitialize($request, $this->session); expect($result->protocolVersion)->toBe(Protocol::LATEST_PROTOCOL_VERSION); expect($result->serverInfo->name)->toBe('DispatcherTestServer'); expect($result->capabilities)->toBeInstanceOf(ServerCapabilities::class); }); it('can handle initialize request with older supported protocol version', function () { $clientInfo = Implementation::make('TestClient', '0.9.9'); $clientRequestedVersion = '2024-11-05'; $request = InitializeRequest::make(1, $clientRequestedVersion, ClientCapabilities::make(), $clientInfo, []); $this->session->shouldReceive('set')->with('client_info', $clientInfo->toArray())->once(); $this->session->shouldReceive('set')->with('protocol_version', $clientRequestedVersion)->once(); $result = $this->dispatcher->handleInitialize($request, $this->session); expect($result->protocolVersion)->toBe($clientRequestedVersion); expect($result->serverInfo->name)->toBe('DispatcherTestServer'); expect($result->capabilities)->toBeInstanceOf(ServerCapabilities::class); }); it('can handle initialize request with unsupported protocol version', function () { $clientInfo = Implementation::make('TestClient', '0.9.9'); $unsupportedVersion = '1999-01-01'; $request = InitializeRequest::make(1, $unsupportedVersion, ClientCapabilities::make(), $clientInfo, []); $this->session->shouldReceive('set')->with('client_info', $clientInfo->toArray())->once(); $this->session->shouldReceive('set')->with('protocol_version', Protocol::LATEST_PROTOCOL_VERSION)->once(); $result = $this->dispatcher->handleInitialize($request, $this->session); expect($result->protocolVersion)->toBe(Protocol::LATEST_PROTOCOL_VERSION); expect($result->serverInfo->name)->toBe('DispatcherTestServer'); expect($result->capabilities)->toBeInstanceOf(ServerCapabilities::class); }); it('can handle tool list request and return paginated tools', function () { $toolSchemas = [ ToolSchema::make('tool1', ['type' => 'object', 'properties' => []]), ToolSchema::make('tool2', ['type' => 'object', 'properties' => []]), ToolSchema::make('tool3', ['type' => 'object', 'properties' => []]), ToolSchema::make('tool4', ['type' => 'object', 'properties' => []]), ]; $this->registry->shouldReceive('getTools')->andReturn($toolSchemas); $request = ListToolsRequest::make(1); $result = $this->dispatcher->handleToolList($request); expect($result->tools)->toHaveCount(DISPATCHER_PAGINATION_LIMIT); expect($result->tools[0]->name)->toBe('tool1'); expect($result->nextCursor)->toBeString(); $nextCursor = $result->nextCursor; $requestPage2 = ListToolsRequest::make(2, $nextCursor); $resultPage2 = $this->dispatcher->handleToolList($requestPage2); expect($resultPage2->tools)->toHaveCount(count($toolSchemas) - DISPATCHER_PAGINATION_LIMIT); expect($resultPage2->tools[0]->name)->toBe('tool4'); expect($resultPage2->nextCursor)->toBeNull(); }); it('can handle tool call request and return result', function () { $toolName = 'my-calculator'; $args = ['a' => 10, 'b' => 5]; $toolSchema = ToolSchema::make($toolName, ['type' => 'object', 'properties' => ['a' => ['type' => 'integer'], 'b' => ['type' => 'integer']]]); $registeredToolMock = Mockery::mock(RegisteredTool::class, [$toolSchema, 'MyToolHandler', 'handleTool', false]); $this->registry->shouldReceive('getTool')->with($toolName)->andReturn($registeredToolMock); $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->with($args, $toolSchema->inputSchema)->andReturn([]); // No validation errors $registeredToolMock->shouldReceive('call')->with($this->container, $args, $this->context)->andReturn([TextContent::make("Result: 15")]); $request = CallToolRequest::make(1, $toolName, $args); $result = $this->dispatcher->handleToolCall($request, $this->context); expect($result)->toBeInstanceOf(CallToolResult::class); expect($result->content[0]->text)->toBe("Result: 15"); expect($result->isError)->toBeFalse(); }); it('can handle tool call request and throw exception if tool not found', function () { $this->registry->shouldReceive('getTool')->with('unknown-tool')->andReturn(null); $request = CallToolRequest::make(1, 'unknown-tool', []); $this->dispatcher->handleToolCall($request, $this->context); })->throws(McpServerException::class, "Tool 'unknown-tool' not found."); it('can handle tool call request and throw exception if argument validation fails', function () { $toolName = 'strict-tool'; $args = ['param' => 'wrong_type']; $toolSchema = ToolSchema::make($toolName, ['type' => 'object', 'properties' => ['param' => ['type' => 'integer']]]); $registeredToolMock = Mockery::mock(RegisteredTool::class, [$toolSchema, 'MyToolHandler', 'handleTool', false]); $this->registry->shouldReceive('getTool')->with($toolName)->andReturn($registeredToolMock); $validationErrors = [['pointer' => '/param', 'keyword' => 'type', 'message' => 'Expected integer']]; $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->with($args, $toolSchema->inputSchema)->andReturn($validationErrors); $request = CallToolRequest::make(1, $toolName, $args); try { $this->dispatcher->handleToolCall($request, $this->context); } catch (McpServerException $e) { expect($e->getMessage())->toContain("Invalid parameters for tool 'strict-tool'"); expect($e->getData()['validation_errors'])->toBeArray(); } }); it('can handle tool call request and return error if tool execution throws exception', function () { $toolName = 'failing-tool'; $toolSchema = ToolSchema::make($toolName, ['type' => 'object', 'properties' => []]); $registeredToolMock = Mockery::mock(RegisteredTool::class, [$toolSchema, 'MyToolHandler', 'handleTool', false]); $this->registry->shouldReceive('getTool')->with($toolName)->andReturn($registeredToolMock); $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->andReturn([]); $registeredToolMock->shouldReceive('call')->andThrow(new \RuntimeException("Tool crashed!")); $request = CallToolRequest::make(1, $toolName, []); $result = $this->dispatcher->handleToolCall($request, $this->context); expect($result->isError)->toBeTrue(); expect($result->content[0]->text)->toBe("Tool execution failed: Tool crashed!"); }); it('can handle tool call request and return error if result formatting fails', function () { $toolName = 'bad-result-tool'; $toolSchema = ToolSchema::make($toolName, ['type' => 'object', 'properties' => []]); $registeredToolMock = Mockery::mock(RegisteredTool::class, [$toolSchema, 'MyToolHandler', 'handleTool', false]); $this->registry->shouldReceive('getTool')->with($toolName)->andReturn($registeredToolMock); $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->andReturn([]); $registeredToolMock->shouldReceive('call')->andThrow(new \JsonException("Unencodable.")); $request = CallToolRequest::make(1, $toolName, []); $result = $this->dispatcher->handleToolCall($request, $this->context); expect($result->isError)->toBeTrue(); expect($result->content[0]->text)->toBe("Failed to serialize tool result: Unencodable."); }); it('can handle resources list request and return paginated resources', function () { $resourceSchemas = [ ResourceSchema::make('res://1', 'Resource1'), ResourceSchema::make('res://2', 'Resource2'), ResourceSchema::make('res://3', 'Resource3'), ResourceSchema::make('res://4', 'Resource4'), ResourceSchema::make('res://5', 'Resource5') ]; $this->registry->shouldReceive('getResources')->andReturn($resourceSchemas); $requestP1 = ListResourcesRequest::make(1); $resultP1 = $this->dispatcher->handleResourcesList($requestP1); expect($resultP1->resources)->toHaveCount(DISPATCHER_PAGINATION_LIMIT); expect(array_map(fn($r) => $r->name, $resultP1->resources))->toEqual(['Resource1', 'Resource2', 'Resource3']); expect($resultP1->nextCursor)->toBe(base64_encode('offset=3')); // Page 2 $requestP2 = ListResourcesRequest::make(2, $resultP1->nextCursor); $resultP2 = $this->dispatcher->handleResourcesList($requestP2); expect($resultP2->resources)->toHaveCount(2); expect(array_map(fn($r) => $r->name, $resultP2->resources))->toEqual(['Resource4', 'Resource5']); expect($resultP2->nextCursor)->toBeNull(); }); it('can handle resources list request and return empty if registry has no resources', function () { $this->registry->shouldReceive('getResources')->andReturn([]); $request = ListResourcesRequest::make(1); $result = $this->dispatcher->handleResourcesList($request); expect($result->resources)->toBeEmpty(); expect($result->nextCursor)->toBeNull(); }); it('can handle resource template list request and return paginated templates', function () { $templateSchemas = [ ResourceTemplateSchema::make('tpl://{id}/1', 'Template1'), ResourceTemplateSchema::make('tpl://{id}/2', 'Template2'), ResourceTemplateSchema::make('tpl://{id}/3', 'Template3'), ResourceTemplateSchema::make('tpl://{id}/4', 'Template4'), ]; $this->registry->shouldReceive('getResourceTemplates')->andReturn($templateSchemas); // Page 1 $requestP1 = ListResourceTemplatesRequest::make(1); $resultP1 = $this->dispatcher->handleResourceTemplateList($requestP1); expect($resultP1->resourceTemplates)->toHaveCount(DISPATCHER_PAGINATION_LIMIT); expect(array_map(fn($rt) => $rt->name, $resultP1->resourceTemplates))->toEqual(['Template1', 'Template2', 'Template3']); expect($resultP1->nextCursor)->toBe(base64_encode('offset=3')); // Page 2 $requestP2 = ListResourceTemplatesRequest::make(2, $resultP1->nextCursor); $resultP2 = $this->dispatcher->handleResourceTemplateList($requestP2); expect($resultP2->resourceTemplates)->toHaveCount(1); expect(array_map(fn($rt) => $rt->name, $resultP2->resourceTemplates))->toEqual(['Template4']); expect($resultP2->nextCursor)->toBeNull(); }); it('can handle resource read request and return resource contents', function () { $uri = 'file://data.txt'; $resourceSchema = ResourceSchema::make($uri, 'file_resource'); $registeredResourceMock = Mockery::mock(RegisteredResource::class, [$resourceSchema, ['MyResourceHandler', 'read'], false]); $resourceContents = [TextContent::make('File content')]; $this->registry->shouldReceive('getResource')->with($uri)->andReturn($registeredResourceMock); $registeredResourceMock->shouldReceive('read')->with($this->container, $uri, $this->context)->andReturn($resourceContents); $request = ReadResourceRequest::make(1, $uri); $result = $this->dispatcher->handleResourceRead($request, $this->context); expect($result)->toBeInstanceOf(ReadResourceResult::class); expect($result->contents)->toEqual($resourceContents); }); it('can handle resource read request and throw exception if resource not found', function () { $this->registry->shouldReceive('getResource')->with('unknown://uri')->andReturn(null); $request = ReadResourceRequest::make(1, 'unknown://uri'); $this->dispatcher->handleResourceRead($request, $this->context); })->throws(McpServerException::class, "Resource URI 'unknown://uri' not found."); it('can handle resource subscribe request and call subscription manager', function () { $uri = 'news://updates'; $this->session->shouldReceive('getId')->andReturn(DISPATCHER_SESSION_ID); $this->subscriptionManager->shouldReceive('subscribe')->with(DISPATCHER_SESSION_ID, $uri)->once(); $request = ResourceSubscribeRequest::make(1, $uri); $result = $this->dispatcher->handleResourceSubscribe($request, $this->session); expect($result)->toBeInstanceOf(EmptyResult::class); }); it('can handle prompts list request and return paginated prompts', function () { $promptSchemas = [ PromptSchema::make('promptA', '', []), PromptSchema::make('promptB', '', []), PromptSchema::make('promptC', '', []), PromptSchema::make('promptD', '', []), PromptSchema::make('promptE', '', []), PromptSchema::make('promptF', '', []), ]; // 6 prompts $this->registry->shouldReceive('getPrompts')->andReturn($promptSchemas); // Page 1 $requestP1 = ListPromptsRequest::make(1); $resultP1 = $this->dispatcher->handlePromptsList($requestP1); expect($resultP1->prompts)->toHaveCount(DISPATCHER_PAGINATION_LIMIT); expect(array_map(fn($p) => $p->name, $resultP1->prompts))->toEqual(['promptA', 'promptB', 'promptC']); expect($resultP1->nextCursor)->toBe(base64_encode('offset=3')); // Page 2 $requestP2 = ListPromptsRequest::make(2, $resultP1->nextCursor); $resultP2 = $this->dispatcher->handlePromptsList($requestP2); expect($resultP2->prompts)->toHaveCount(DISPATCHER_PAGINATION_LIMIT); // 3 more expect(array_map(fn($p) => $p->name, $resultP2->prompts))->toEqual(['promptD', 'promptE', 'promptF']); expect($resultP2->nextCursor)->toBeNull(); // End of list }); it('can handle prompt get request and return prompt messages', function () { $promptName = 'daily-summary'; $args = ['date' => '2024-07-16']; $promptSchema = PromptSchema::make($promptName, 'summary_prompt', [PromptArgument::make('date', required: true)]); $registeredPromptMock = Mockery::mock(RegisteredPrompt::class, [$promptSchema, ['MyPromptHandler', 'get'], false]); $promptMessages = [PromptMessage::make(Role::User, TextContent::make("Summary for 2024-07-16"))]; $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPromptMock); $registeredPromptMock->shouldReceive('get')->with($this->container, $args, $this->context)->andReturn($promptMessages); $request = GetPromptRequest::make(1, $promptName, $args); $result = $this->dispatcher->handlePromptGet($request, $this->context); expect($result)->toBeInstanceOf(GetPromptResult::class); expect($result->messages)->toEqual($promptMessages); expect($result->description)->toBe($promptSchema->description); }); it('can handle prompt get request and throw exception if required argument is missing', function () { $promptName = 'needs-topic'; $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make('topic', required: true)]); $registeredPromptMock = Mockery::mock(RegisteredPrompt::class, [$promptSchema, ['MyPromptHandler', 'get'], false]); $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPromptMock); $request = GetPromptRequest::make(1, $promptName, ['other_arg' => 'value']); // 'topic' is missing $this->dispatcher->handlePromptGet($request, $this->context); })->throws(McpServerException::class, "Missing required argument 'topic' for prompt 'needs-topic'."); it('can handle logging set level request and set log level on session', function () { $level = LoggingLevel::Debug; $this->session->shouldReceive('getId')->andReturn(DISPATCHER_SESSION_ID); $this->session->shouldReceive('set')->with('log_level', 'debug')->once(); $request = SetLogLevelRequest::make(1, $level); $result = $this->dispatcher->handleLoggingSetLevel($request, $this->session); expect($result)->toBeInstanceOf(EmptyResult::class); }); it('can handle completion complete request for prompt and delegate to provider', function () { $promptName = 'my-completable-prompt'; $argName = 'tagName'; $currentValue = 'php'; $completions = ['php-mcp', 'php-fig']; $mockCompletionProvider = Mockery::mock(CompletionProviderInterface::class); $providerClass = get_class($mockCompletionProvider); $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make($argName)]); $registeredPrompt = new RegisteredPrompt( schema: $promptSchema, handler: ['MyPromptHandler', 'get'], isManual: false, completionProviders: [$argName => $providerClass] ); $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPrompt); $this->container->shouldReceive('get')->with($providerClass)->andReturn($mockCompletionProvider); $mockCompletionProvider->shouldReceive('getCompletions')->with($currentValue, $this->session)->andReturn($completions); $request = CompletionCompleteRequest::make(1, PromptReference::make($promptName), ['name' => $argName, 'value' => $currentValue]); $result = $this->dispatcher->handleCompletionComplete($request, $this->session); expect($result)->toBeInstanceOf(CompletionCompleteResult::class); expect($result->values)->toEqual($completions); expect($result->total)->toBe(count($completions)); expect($result->hasMore)->toBeFalse(); }); it('can handle completion complete request for resource template and delegate to provider', function () { $templateUri = 'item://{itemId}/category/{catName}'; $uriVarName = 'catName'; $currentValue = 'boo'; $completions = ['books', 'boomerangs']; $mockCompletionProvider = Mockery::mock(CompletionProviderInterface::class); $providerClass = get_class($mockCompletionProvider); $templateSchema = ResourceTemplateSchema::make($templateUri, 'item-template'); $registeredTemplate = new RegisteredResourceTemplate( schema: $templateSchema, handler: ['MyResourceTemplateHandler', 'get'], isManual: false, completionProviders: [$uriVarName => $providerClass] ); $this->registry->shouldReceive('getResourceTemplate')->with($templateUri)->andReturn($registeredTemplate); $this->container->shouldReceive('get')->with($providerClass)->andReturn($mockCompletionProvider); $mockCompletionProvider->shouldReceive('getCompletions')->with($currentValue, $this->session)->andReturn($completions); $request = CompletionCompleteRequest::make(1, ResourceReference::make($templateUri), ['name' => $uriVarName, 'value' => $currentValue]); $result = $this->dispatcher->handleCompletionComplete($request, $this->session); expect($result->values)->toEqual($completions); }); it('can handle completion complete request and return empty if no provider', function () { $promptName = 'no-provider-prompt'; $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make('arg')]); $registeredPrompt = new RegisteredPrompt( schema: $promptSchema, handler: ['MyPromptHandler', 'get'], isManual: false, completionProviders: [] ); $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPrompt); $request = CompletionCompleteRequest::make(1, PromptReference::make($promptName), ['name' => 'arg', 'value' => '']); $result = $this->dispatcher->handleCompletionComplete($request, $this->session); expect($result->values)->toBeEmpty(); }); it('can handle completion complete request with ListCompletionProvider instance', function () { $promptName = 'list-completion-prompt'; $argName = 'category'; $currentValue = 'bl'; $expectedCompletions = ['blog']; $listProvider = new \PhpMcp\Server\Defaults\ListCompletionProvider(['blog', 'news', 'docs', 'api']); $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make($argName)]); $registeredPrompt = new RegisteredPrompt( schema: $promptSchema, handler: ['MyPromptHandler', 'get'], isManual: false, completionProviders: [$argName => $listProvider] ); $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPrompt); $request = CompletionCompleteRequest::make(1, PromptReference::make($promptName), ['name' => $argName, 'value' => $currentValue]); $result = $this->dispatcher->handleCompletionComplete($request, $this->session); expect($result->values)->toEqual($expectedCompletions); expect($result->total)->toBe(1); expect($result->hasMore)->toBeFalse(); }); it('can handle completion complete request with EnumCompletionProvider instance', function () { $promptName = 'enum-completion-prompt'; $argName = 'status'; $currentValue = 'a'; $expectedCompletions = ['archived']; $enumProvider = new \PhpMcp\Server\Defaults\EnumCompletionProvider(StatusEnum::class); $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make($argName)]); $registeredPrompt = new RegisteredPrompt( schema: $promptSchema, handler: ['MyPromptHandler', 'get'], isManual: false, completionProviders: [$argName => $enumProvider] ); $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPrompt); $request = CompletionCompleteRequest::make(1, PromptReference::make($promptName), ['name' => $argName, 'value' => $currentValue]); $result = $this->dispatcher->handleCompletionComplete($request, $this->session); expect($result->values)->toEqual($expectedCompletions); expect($result->total)->toBe(1); expect($result->hasMore)->toBeFalse(); }); it('decodeCursor handles null and invalid cursors', function () { $method = new \ReflectionMethod(Dispatcher::class, 'decodeCursor'); $method->setAccessible(true); expect($method->invoke($this->dispatcher, null))->toBe(0); expect($method->invoke($this->dispatcher, 'not_base64_$$$'))->toBe(0); expect($method->invoke($this->dispatcher, base64_encode('invalid_format')))->toBe(0); expect($method->invoke($this->dispatcher, base64_encode('offset=123')))->toBe(123); }); it('encodeNextCursor generates correct cursor or null', function () { $method = new \ReflectionMethod(Dispatcher::class, 'encodeNextCursor'); $method->setAccessible(true); $limit = DISPATCHER_PAGINATION_LIMIT; expect($method->invoke($this->dispatcher, 0, $limit, 10, $limit))->toBe(base64_encode('offset=3')); expect($method->invoke($this->dispatcher, 0, $limit, $limit, $limit))->toBeNull(); expect($method->invoke($this->dispatcher, $limit, 2, $limit + 2 + 1, $limit))->toBe(base64_encode('offset=' . ($limit + 2))); expect($method->invoke($this->dispatcher, $limit, 1, $limit + 1, $limit))->toBeNull(); expect($method->invoke($this->dispatcher, 0, 0, 10, $limit))->toBeNull(); }); ``` -------------------------------------------------------------------------------- /tests/Integration/StreamableHttpServerTransportTest.php: -------------------------------------------------------------------------------- ```php <?php use PhpMcp\Server\Protocol; use PhpMcp\Server\Tests\Mocks\Clients\MockJsonHttpClient; use PhpMcp\Server\Tests\Mocks\Clients\MockStreamHttpClient; use React\ChildProcess\Process; use React\Http\Browser; use React\Http\Message\ResponseException; use React\Stream\ReadableStreamInterface; use function React\Async\await; use function React\Promise\resolve; const STREAMABLE_HTTP_SCRIPT_PATH = __DIR__ . '/../Fixtures/ServerScripts/StreamableHttpTestServer.php'; const STREAMABLE_HTTP_PROCESS_TIMEOUT = 9; const STREAMABLE_HTTP_HOST = '127.0.0.1'; const STREAMABLE_MCP_PATH = 'mcp_streamable_json_mode'; beforeEach(function () { if (!is_file(STREAMABLE_HTTP_SCRIPT_PATH)) { $this->markTestSkipped("Server script not found: " . STREAMABLE_HTTP_SCRIPT_PATH); } if (!is_executable(STREAMABLE_HTTP_SCRIPT_PATH)) { chmod(STREAMABLE_HTTP_SCRIPT_PATH, 0755); } $phpPath = PHP_BINARY ?: 'php'; $commandPhpPath = str_contains($phpPath, ' ') ? '"' . $phpPath . '"' : $phpPath; $commandScriptPath = escapeshellarg(STREAMABLE_HTTP_SCRIPT_PATH); $this->port = findFreePort(); $jsonModeCommandArgs = [ escapeshellarg(STREAMABLE_HTTP_HOST), escapeshellarg((string)$this->port), escapeshellarg(STREAMABLE_MCP_PATH), escapeshellarg('true'), // enableJsonResponse = true ]; $this->jsonModeCommand = $commandPhpPath . ' ' . $commandScriptPath . ' ' . implode(' ', $jsonModeCommandArgs); $streamModeCommandArgs = [ escapeshellarg(STREAMABLE_HTTP_HOST), escapeshellarg((string)$this->port), escapeshellarg(STREAMABLE_MCP_PATH), escapeshellarg('false'), // enableJsonResponse = false ]; $this->streamModeCommand = $commandPhpPath . ' ' . $commandScriptPath . ' ' . implode(' ', $streamModeCommandArgs); $statelessModeCommandArgs = [ escapeshellarg(STREAMABLE_HTTP_HOST), escapeshellarg((string)$this->port), escapeshellarg(STREAMABLE_MCP_PATH), escapeshellarg('true'), // enableJsonResponse = true escapeshellarg('false'), // useEventStore = false escapeshellarg('true'), // stateless = true ]; $this->statelessModeCommand = $commandPhpPath . ' ' . $commandScriptPath . ' ' . implode(' ', $statelessModeCommandArgs); $this->process = null; }); afterEach(function () { if ($this->process instanceof Process && $this->process->isRunning()) { if ($this->process->stdout instanceof ReadableStreamInterface) { $this->process->stdout->close(); } if ($this->process->stderr instanceof ReadableStreamInterface) { $this->process->stderr->close(); } $this->process->terminate(SIGTERM); try { await(delay(0.02)); } catch (\Throwable $e) { } if ($this->process->isRunning()) { $this->process->terminate(SIGKILL); } } $this->process = null; }); describe('JSON MODE', function () { beforeEach(function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start(); $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); await(delay(0.2)); }); it('server starts, initializes via POST JSON, calls a tool, and closes', function () { // 1. Initialize $initResult = await($this->jsonClient->sendRequest('initialize', [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-json-1')); expect($initResult['statusCode'])->toBe(200); expect($initResult['body']['id'])->toBe('init-json-1'); expect($initResult['body'])->not->toHaveKey('error'); expect($initResult['body']['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION); expect($initResult['body']['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer'); expect($this->jsonClient->sessionId)->toBeString()->not->toBeEmpty(); // 2. Initialized notification $notifResult = await($this->jsonClient->sendNotification('notifications/initialized')); expect($notifResult['statusCode'])->toBe(202); // 3. Call a tool $toolResult = await($this->jsonClient->sendRequest('tools/call', [ 'name' => 'greet_streamable_tool', 'arguments' => ['name' => 'JSON Mode User'] ], 'tool-json-1')); expect($toolResult['statusCode'])->toBe(200); expect($toolResult['body']['id'])->toBe('tool-json-1'); expect($toolResult['body'])->not->toHaveKey('error'); expect($toolResult['body']['result']['content'][0]['text'])->toBe('Hello, JSON Mode User!'); // Server process is terminated in afterEach })->group('integration', 'streamable_http_json'); it('return HTTP 400 error response for invalid JSON in POST request', function () { $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-post-1", "method": "tools/list", "params": {"broken"}'; $promise = $this->jsonClient->browser->post( $this->jsonClient->baseUrl, ['Content-Type' => 'application/json', 'Accept' => 'application/json'], $malformedJson ); try { await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); } catch (ResponseException $e) { expect($e->getResponse()->getStatusCode())->toBe(400); $bodyContent = (string) $e->getResponse()->getBody(); $decodedBody = json_decode($bodyContent, true); expect($decodedBody['jsonrpc'])->toBe('2.0'); expect($decodedBody['id'])->toBe(''); expect($decodedBody['error']['code'])->toBe(-32700); expect($decodedBody['error']['message'])->toContain('Invalid JSON'); } })->group('integration', 'streamable_http_json'); it('returns JSON-RPC error result for request for non-existent method', function () { // 1. Initialize await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-json-err')); await($this->jsonClient->sendNotification('notifications/initialized')); // 2. Request non-existent method $errorResult = await($this->jsonClient->sendRequest('non/existentToolViaJson', [], 'err-meth-json-1')); expect($errorResult['statusCode'])->toBe(200); expect($errorResult['body']['id'])->toBe('err-meth-json-1'); expect($errorResult['body']['error']['code'])->toBe(-32601); expect($errorResult['body']['error']['message'])->toContain("Method 'non/existentToolViaJson' not found"); })->group('integration', 'streamable_http_json'); it('can handle batch requests correctly', function () { // 1. Initialize await($this->jsonClient->sendRequest('initialize', [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-json-batch')); expect($this->jsonClient->sessionId)->toBeString()->not->toBeEmpty(); await($this->jsonClient->sendNotification('notifications/initialized')); // 2. Send Batch Request $batchRequests = [ ['jsonrpc' => '2.0', 'id' => 'batch-req-1', 'method' => 'tools/call', 'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']]], ['jsonrpc' => '2.0', 'method' => 'notifications/something'], ['jsonrpc' => '2.0', 'id' => 'batch-req-2', 'method' => 'tools/call', 'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]]], ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method'] ]; $batchResponse = await($this->jsonClient->sendBatchRequest($batchRequests)); $findResponseById = function (array $batch, $id) { foreach ($batch as $item) { if (isset($item['id']) && $item['id'] === $id) { return $item; } } return null; }; expect($batchResponse['statusCode'])->toBe(200); expect($batchResponse['body'])->toBeArray()->toHaveCount(3); $response1 = $findResponseById($batchResponse['body'], 'batch-req-1'); $response2 = $findResponseById($batchResponse['body'], 'batch-req-2'); $response3 = $findResponseById($batchResponse['body'], 'batch-req-3'); expect($response1['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!'); expect($response2['result']['content'][0]['text'])->toBe('30'); expect($response3['error']['code'])->toBe(-32601); expect($response3['error']['message'])->toContain("Method 'nonexistent/method' not found"); })->group('integration', 'streamable_http_json'); it('can handle tool list request', function () { await($this->jsonClient->sendRequest('initialize', [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-json-tools')); await($this->jsonClient->sendNotification('notifications/initialized')); $toolListResult = await($this->jsonClient->sendRequest('tools/list', [], 'tool-list-json-1')); expect($toolListResult['statusCode'])->toBe(200); expect($toolListResult['body']['id'])->toBe('tool-list-json-1'); expect($toolListResult['body']['result']['tools'])->toBeArray(); expect(count($toolListResult['body']['result']['tools']))->toBe(4); expect($toolListResult['body']['result']['tools'][0]['name'])->toBe('greet_streamable_tool'); expect($toolListResult['body']['result']['tools'][1]['name'])->toBe('sum_streamable_tool'); expect($toolListResult['body']['result']['tools'][2]['name'])->toBe('tool_reads_context'); })->group('integration', 'streamable_http_json'); it('passes request in Context', function () { await($this->jsonClient->sendRequest('initialize', [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-json-context')); await($this->jsonClient->sendNotification('notifications/initialized')); $toolResult = await($this->jsonClient->sendRequest('tools/call', [ 'name' => 'tool_reads_context', 'arguments' => [] ], 'tool-json-context-1', ['X-Test-Header' => 'TestValue'])); expect($toolResult['statusCode'])->toBe(200); expect($toolResult['body']['id'])->toBe('tool-json-context-1'); expect($toolResult['body'])->not->toHaveKey('error'); expect($toolResult['body']['result']['content'][0]['text'])->toBe('TestValue'); })->group('integration', 'streamable_http_json'); it('can read a registered resource', function () { await($this->jsonClient->sendRequest('initialize', [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-json-res')); await($this->jsonClient->sendNotification('notifications/initialized')); $resourceResult = await($this->jsonClient->sendRequest('resources/read', ['uri' => 'test://streamable/static'], 'res-read-json-1')); expect($resourceResult['statusCode'])->toBe(200); expect($resourceResult['body']['id'])->toBe('res-read-json-1'); $contents = $resourceResult['body']['result']['contents']; expect($contents[0]['uri'])->toBe('test://streamable/static'); expect($contents[0]['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent); })->group('integration', 'streamable_http_json'); it('can get a registered prompt', function () { await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-json-prompt')); await($this->jsonClient->sendNotification('notifications/initialized')); $promptResult = await($this->jsonClient->sendRequest('prompts/get', [ 'name' => 'simple_streamable_prompt', 'arguments' => ['name' => 'JsonPromptUser', 'style' => 'terse'] ], 'prompt-get-json-1')); expect($promptResult['statusCode'])->toBe(200); expect($promptResult['body']['id'])->toBe('prompt-get-json-1'); $messages = $promptResult['body']['result']['messages']; expect($messages[0]['content']['text'])->toBe('Craft a terse greeting for JsonPromptUser.'); })->group('integration', 'streamable_http_json'); it('rejects subsequent requests if client does not send initialized notification', function () { // 1. Initialize ONLY await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-json-noack')); // Client "forgets" to send notifications/initialized back // 2. Attempt to Call a tool $toolResult = await($this->jsonClient->sendRequest('tools/call', [ 'name' => 'greet_streamable_tool', 'arguments' => ['name' => 'NoAckJsonUser'] ], 'tool-json-noack')); expect($toolResult['statusCode'])->toBe(200); // HTTP is fine expect($toolResult['body']['id'])->toBe('tool-json-noack'); expect($toolResult['body']['error']['code'])->toBe(-32600); // Invalid Request expect($toolResult['body']['error']['message'])->toContain('Client session not initialized'); })->group('integration', 'streamable_http_json'); it('returns HTTP 400 error for non-initialize requests without Mcp-Session-Id', function () { await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-sess-test')); $this->jsonClient->sessionId = null; try { await($this->jsonClient->sendRequest('tools/list', [], 'tools-list-no-session')); } catch (ResponseException $e) { expect($e->getResponse()->getStatusCode())->toBe(400); $bodyContent = (string) $e->getResponse()->getBody(); $decodedBody = json_decode($bodyContent, true); expect($decodedBody['jsonrpc'])->toBe('2.0'); expect($decodedBody['id'])->toBe('tools-list-no-session'); expect($decodedBody['error']['code'])->toBe(-32600); expect($decodedBody['error']['message'])->toContain('Mcp-Session-Id header required'); } })->group('integration', 'streamable_http_json'); }); describe('STREAM MODE', function () { beforeEach(function () { $this->process = new Process($this->streamModeCommand, getcwd() ?: null, null, []); $this->process->start(); $this->streamClient = new MockStreamHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); await(delay(0.2)); }); afterEach(function () { if ($this->streamClient ?? null) { $this->streamClient->closeMainSseStream(); } }); it('server starts, initializes via POST JSON, calls a tool, and closes', function () { // 1. Initialize Request $initResponse = await($this->streamClient->sendInitializeRequest([ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-stream-1')); expect($this->streamClient->sessionId)->toBeString()->not->toBeEmpty(); expect($initResponse['id'])->toBe('init-stream-1'); expect($initResponse)->not->toHaveKey('error'); expect($initResponse['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION); expect($initResponse['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer'); // 2. Send Initialized Notification $notifResult = await($this->streamClient->sendHttpNotification('notifications/initialized')); expect($notifResult['statusCode'])->toBe(202); // 3. Call a tool $toolResponse = await($this->streamClient->sendRequest('tools/call', [ 'name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Stream Mode User'] ], 'tool-stream-1')); expect($toolResponse['id'])->toBe('tool-stream-1'); expect($toolResponse)->not->toHaveKey('error'); expect($toolResponse['result']['content'][0]['text'])->toBe('Hello, Stream Mode User!'); })->group('integration', 'streamable_http_stream'); it('return HTTP 400 error response for invalid JSON in POST request', function () { $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-stream-1", "method": "tools/list", "params": {"broken"}'; $postPromise = $this->streamClient->browser->post( $this->streamClient->baseMcpUrl, ['Content-Type' => 'application/json', 'Accept' => 'text/event-stream'], $malformedJson ); try { await(timeout($postPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); } catch (ResponseException $e) { $httpResponse = $e->getResponse(); $bodyContent = (string) $httpResponse->getBody(); $decodedBody = json_decode($bodyContent, true); expect($httpResponse->getStatusCode())->toBe(400); expect($decodedBody['jsonrpc'])->toBe('2.0'); expect($decodedBody['id'])->toBe(''); expect($decodedBody['error']['code'])->toBe(-32700); expect($decodedBody['error']['message'])->toContain('Invalid JSON'); } })->group('integration', 'streamable_http_stream'); it('returns JSON-RPC error result for request for non-existent method', function () { // 1. Initialize await($this->streamClient->sendInitializeRequest([ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-stream-err')); await($this->streamClient->sendHttpNotification('notifications/initialized')); // 2. Send Request $errorResponse = await($this->streamClient->sendRequest('non/existentToolViaStream', [], 'err-meth-stream-1')); expect($errorResponse['id'])->toBe('err-meth-stream-1'); expect($errorResponse['error']['code'])->toBe(-32601); expect($errorResponse['error']['message'])->toContain("Method 'non/existentToolViaStream' not found"); })->group('integration', 'streamable_http_stream'); it('can handle batch requests correctly', function () { await($this->streamClient->sendInitializeRequest([ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'StreamModeBatchClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-stream-batch')); expect($this->streamClient->sessionId)->toBeString()->not->toBeEmpty(); await($this->streamClient->sendHttpNotification('notifications/initialized')); $batchRequests = [ ['jsonrpc' => '2.0', 'id' => 'batch-req-1', 'method' => 'tools/call', 'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']]], ['jsonrpc' => '2.0', 'method' => 'notifications/something'], ['jsonrpc' => '2.0', 'id' => 'batch-req-2', 'method' => 'tools/call', 'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]]], ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method'] ]; $batchResponseArray = await($this->streamClient->sendBatchRequest($batchRequests)); expect($batchResponseArray)->toBeArray()->toHaveCount(3); $findResponseById = function (array $batch, $id) { foreach ($batch as $item) { if (isset($item['id']) && $item['id'] === $id) { return $item; } } return null; }; $response1 = $findResponseById($batchResponseArray, 'batch-req-1'); $response2 = $findResponseById($batchResponseArray, 'batch-req-2'); $response3 = $findResponseById($batchResponseArray, 'batch-req-3'); expect($response1['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!'); expect($response2['result']['content'][0]['text'])->toBe('30'); expect($response3['error']['code'])->toBe(-32601); expect($response3['error']['message'])->toContain("Method 'nonexistent/method' not found"); })->group('integration', 'streamable_http_stream'); it('passes request in Context', function () { await($this->streamClient->sendInitializeRequest([ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-stream-context')); expect($this->streamClient->sessionId)->toBeString()->not->toBeEmpty(); await($this->streamClient->sendHttpNotification('notifications/initialized')); $toolResult = await($this->streamClient->sendRequest('tools/call', [ 'name' => 'tool_reads_context', 'arguments' => [] ], 'tool-stream-context-1', ['X-Test-Header' => 'TestValue'])); expect($toolResult['id'])->toBe('tool-stream-context-1'); expect($toolResult)->not->toHaveKey('error'); expect($toolResult['result']['content'][0]['text'])->toBe('TestValue'); })->group('integration', 'streamable_http_stream'); it('can handle tool list request', function () { await($this->streamClient->sendInitializeRequest(['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => []], 'init-stream-tools')); await($this->streamClient->sendHttpNotification('notifications/initialized')); $toolListResponse = await($this->streamClient->sendRequest('tools/list', [], 'tool-list-stream-1')); expect($toolListResponse['id'])->toBe('tool-list-stream-1'); expect($toolListResponse)->not->toHaveKey('error'); expect($toolListResponse['result']['tools'])->toBeArray(); expect(count($toolListResponse['result']['tools']))->toBe(4); expect($toolListResponse['result']['tools'][0]['name'])->toBe('greet_streamable_tool'); expect($toolListResponse['result']['tools'][1]['name'])->toBe('sum_streamable_tool'); expect($toolListResponse['result']['tools'][2]['name'])->toBe('tool_reads_context'); })->group('integration', 'streamable_http_stream'); it('can read a registered resource', function () { await($this->streamClient->sendInitializeRequest(['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => []], 'init-stream-res')); await($this->streamClient->sendHttpNotification('notifications/initialized')); $resourceResponse = await($this->streamClient->sendRequest('resources/read', ['uri' => 'test://streamable/static'], 'res-read-stream-1')); expect($resourceResponse['id'])->toBe('res-read-stream-1'); $contents = $resourceResponse['result']['contents']; expect($contents[0]['uri'])->toBe('test://streamable/static'); expect($contents[0]['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent); })->group('integration', 'streamable_http_stream'); it('can get a registered prompt', function () { await($this->streamClient->sendInitializeRequest(['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => []], 'init-stream-prompt')); await($this->streamClient->sendHttpNotification('notifications/initialized')); $promptResponse = await($this->streamClient->sendRequest('prompts/get', [ 'name' => 'simple_streamable_prompt', 'arguments' => ['name' => 'StreamPromptUser', 'style' => 'formal'] ], 'prompt-get-stream-1')); expect($promptResponse['id'])->toBe('prompt-get-stream-1'); $messages = $promptResponse['result']['messages']; expect($messages[0]['content']['text'])->toBe('Craft a formal greeting for StreamPromptUser.'); })->group('integration', 'streamable_http_stream'); it('rejects subsequent requests if client does not send initialized notification', function () { await($this->streamClient->sendInitializeRequest([ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-stream-noack')); $toolResponse = await($this->streamClient->sendRequest('tools/call', [ 'name' => 'greet_streamable_tool', 'arguments' => ['name' => 'NoAckStreamUser'] ], 'tool-stream-noack')); expect($toolResponse['id'])->toBe('tool-stream-noack'); expect($toolResponse['error']['code'])->toBe(-32600); expect($toolResponse['error']['message'])->toContain('Client session not initialized'); })->group('integration', 'streamable_http_stream'); it('returns HTTP 400 error for non-initialize requests without Mcp-Session-Id', function () { await($this->streamClient->sendInitializeRequest([ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-stream-sess-test')); $validSessionId = $this->streamClient->sessionId; $this->streamClient->sessionId = null; try { await($this->streamClient->sendRequest('tools/list', [], 'tools-list-no-session-stream')); $this->fail("Expected request to tools/list to fail with 400, but it succeeded."); } catch (ResponseException $e) { expect($e->getResponse()->getStatusCode())->toBe(400); // Body can't be a json since the header accepts only text/event-stream } $this->streamClient->sessionId = $validSessionId; })->group('integration', 'streamable_http_stream'); }); /** * STATELESS MODE TESTS * * Tests for the stateless mode of StreamableHttpServerTransport, which: * - Generates session IDs internally but doesn't expose them to clients * - Doesn't require session IDs in requests after initialization * - Doesn't include session IDs in response headers * - Disables GET requests (SSE streaming) * - Makes DELETE requests meaningless (but returns 204) * - Treats each request as independent (no persistent session state) * * This mode is designed to work with clients like OpenAI's MCP implementation * that have issues with session management in "never require approval" mode. */ describe('STATELESS MODE', function () { beforeEach(function () { $this->process = new Process($this->statelessModeCommand, getcwd() ?: null, null, []); $this->process->start(); $this->statelessClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); await(delay(0.2)); }); it('allows tool calls without having to send initialized notification', function () { // 1. Initialize Request $initResult = await($this->statelessClient->sendRequest('initialize', [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'StatelessModeClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-stateless-1')); expect($initResult['statusCode'])->toBe(200); expect($initResult['body']['id'])->toBe('init-stateless-1'); expect($initResult['body'])->not->toHaveKey('error'); expect($initResult['body']['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION); expect($initResult['body']['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer'); expect($this->statelessClient->sessionId)->toBeString()->toBeEmpty(); // 2. Call a tool $toolResult = await($this->statelessClient->sendRequest('tools/call', [ 'name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Stateless Mode User'] ], 'tool-stateless-1')); expect($toolResult['statusCode'])->toBe(200); expect($toolResult['body']['id'])->toBe('tool-stateless-1'); expect($toolResult['body'])->not->toHaveKey('error'); expect($toolResult['body']['result']['content'][0]['text'])->toBe('Hello, Stateless Mode User!'); })->group('integration', 'streamable_http_stateless'); it('return HTTP 400 error response for invalid JSON in POST request', function () { $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-stateless-1", "method": "tools/list", "params": {"broken"}'; $postPromise = $this->statelessClient->browser->post( $this->statelessClient->baseUrl, ['Content-Type' => 'application/json', 'Accept' => 'application/json'], $malformedJson ); try { await(timeout($postPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); } catch (ResponseException $e) { $httpResponse = $e->getResponse(); $bodyContent = (string) $httpResponse->getBody(); $decodedBody = json_decode($bodyContent, true); expect($httpResponse->getStatusCode())->toBe(400); expect($decodedBody['jsonrpc'])->toBe('2.0'); expect($decodedBody['id'])->toBe(''); expect($decodedBody['error']['code'])->toBe(-32700); expect($decodedBody['error']['message'])->toContain('Invalid JSON'); } })->group('integration', 'streamable_http_stateless'); it('returns JSON-RPC error result for request for non-existent method', function () { $errorResult = await($this->statelessClient->sendRequest('non/existentToolViaStateless', [], 'err-meth-stateless-1')); expect($errorResult['statusCode'])->toBe(200); expect($errorResult['body']['id'])->toBe('err-meth-stateless-1'); expect($errorResult['body']['error']['code'])->toBe(-32601); expect($errorResult['body']['error']['message'])->toContain("Method 'non/existentToolViaStateless' not found"); })->group('integration', 'streamable_http_stateless'); it('can handle batch requests correctly', function () { $batchRequests = [ ['jsonrpc' => '2.0', 'id' => 'batch-req-1', 'method' => 'tools/call', 'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']]], ['jsonrpc' => '2.0', 'method' => 'notifications/something'], ['jsonrpc' => '2.0', 'id' => 'batch-req-2', 'method' => 'tools/call', 'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]]], ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method'] ]; $batchResponse = await($this->statelessClient->sendBatchRequest($batchRequests)); $findResponseById = function (array $batch, $id) { foreach ($batch as $item) { if (isset($item['id']) && $item['id'] === $id) { return $item; } } return null; }; expect($batchResponse['statusCode'])->toBe(200); expect($batchResponse['body'])->toBeArray()->toHaveCount(3); $response1 = $findResponseById($batchResponse['body'], 'batch-req-1'); $response2 = $findResponseById($batchResponse['body'], 'batch-req-2'); $response3 = $findResponseById($batchResponse['body'], 'batch-req-3'); expect($response1['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!'); expect($response2['result']['content'][0]['text'])->toBe('30'); expect($response3['error']['code'])->toBe(-32601); expect($response3['error']['message'])->toContain("Method 'nonexistent/method' not found"); })->group('integration', 'streamable_http_stateless'); it('passes request in Context', function () { $toolResult = await($this->statelessClient->sendRequest('tools/call', [ 'name' => 'tool_reads_context', 'arguments' => [] ], 'tool-stateless-context-1', ['X-Test-Header' => 'TestValue'])); expect($toolResult['statusCode'])->toBe(200); expect($toolResult['body']['id'])->toBe('tool-stateless-context-1'); expect($toolResult['body'])->not->toHaveKey('error'); expect($toolResult['body']['result']['content'][0]['text'])->toBe('TestValue'); })->group('integration', 'streamable_http_stateless'); it('can handle tool list request', function () { $toolListResult = await($this->statelessClient->sendRequest('tools/list', [], 'tool-list-stateless-1')); expect($toolListResult['statusCode'])->toBe(200); expect($toolListResult['body']['id'])->toBe('tool-list-stateless-1'); expect($toolListResult['body'])->not->toHaveKey('error'); expect($toolListResult['body']['result']['tools'])->toBeArray(); expect(count($toolListResult['body']['result']['tools']))->toBe(4); expect($toolListResult['body']['result']['tools'][0]['name'])->toBe('greet_streamable_tool'); expect($toolListResult['body']['result']['tools'][1]['name'])->toBe('sum_streamable_tool'); expect($toolListResult['body']['result']['tools'][2]['name'])->toBe('tool_reads_context'); })->group('integration', 'streamable_http_stateless'); it('can read a registered resource', function () { $resourceResult = await($this->statelessClient->sendRequest('resources/read', ['uri' => 'test://streamable/static'], 'res-read-stateless-1')); expect($resourceResult['statusCode'])->toBe(200); expect($resourceResult['body']['id'])->toBe('res-read-stateless-1'); $contents = $resourceResult['body']['result']['contents']; expect($contents[0]['uri'])->toBe('test://streamable/static'); expect($contents[0]['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent); })->group('integration', 'streamable_http_stateless'); it('can get a registered prompt', function () { $promptResult = await($this->statelessClient->sendRequest('prompts/get', [ 'name' => 'simple_streamable_prompt', 'arguments' => ['name' => 'StatelessPromptUser', 'style' => 'formal'] ], 'prompt-get-stateless-1')); expect($promptResult['statusCode'])->toBe(200); expect($promptResult['body']['id'])->toBe('prompt-get-stateless-1'); $messages = $promptResult['body']['result']['messages']; expect($messages[0]['content']['text'])->toBe('Craft a formal greeting for StatelessPromptUser.'); })->group('integration', 'streamable_http_stateless'); it('does not return session ID in response headers in stateless mode', function () { $promise = $this->statelessClient->browser->post( $this->statelessClient->baseUrl, ['Content-Type' => 'application/json', 'Accept' => 'application/json'], json_encode([ 'jsonrpc' => '2.0', 'id' => 'init-header-test', 'method' => 'initialize', 'params' => [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'StatelessHeaderTest', 'version' => '1.0'], 'capabilities' => [] ] ]) ); $response = await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); expect($response->getStatusCode())->toBe(200); expect($response->hasHeader('Mcp-Session-Id'))->toBeFalse(); $body = json_decode((string) $response->getBody(), true); expect($body['id'])->toBe('init-header-test'); expect($body)->not->toHaveKey('error'); })->group('integration', 'streamable_http_stateless'); it('returns HTTP 405 for GET requests (SSE disabled) in stateless mode', function () { try { $getPromise = $this->statelessClient->browser->get( $this->statelessClient->baseUrl, ['Accept' => 'text/event-stream'] ); await(timeout($getPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); $this->fail("Expected GET request to fail with 405, but it succeeded."); } catch (ResponseException $e) { expect($e->getResponse()->getStatusCode())->toBe(405); $bodyContent = (string) $e->getResponse()->getBody(); $decodedBody = json_decode($bodyContent, true); expect($decodedBody['error']['message'])->toContain('GET requests (SSE streaming) are not supported in stateless mode'); } })->group('integration', 'streamable_http_stateless'); it('returns 204 for DELETE requests in stateless mode (but they are meaningless)', function () { $deletePromise = $this->statelessClient->browser->delete($this->statelessClient->baseUrl); $response = await(timeout($deletePromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); expect($response->getStatusCode())->toBe(204); expect((string) $response->getBody())->toBeEmpty(); })->group('integration', 'streamable_http_stateless'); it('handles multiple independent tool calls in stateless mode', function () { $toolResult1 = await($this->statelessClient->sendRequest('tools/call', [ 'name' => 'greet_streamable_tool', 'arguments' => ['name' => 'User 1'] ], 'tool-multi-1')); $toolResult2 = await($this->statelessClient->sendRequest('tools/call', [ 'name' => 'sum_streamable_tool', 'arguments' => ['a' => 5, 'b' => 10] ], 'tool-multi-2')); $toolResult3 = await($this->statelessClient->sendRequest('tools/call', [ 'name' => 'greet_streamable_tool', 'arguments' => ['name' => 'User 3'] ], 'tool-multi-3')); expect($toolResult1['statusCode'])->toBe(200); expect($toolResult1['body']['id'])->toBe('tool-multi-1'); expect($toolResult1['body']['result']['content'][0]['text'])->toBe('Hello, User 1!'); expect($toolResult2['statusCode'])->toBe(200); expect($toolResult2['body']['id'])->toBe('tool-multi-2'); expect($toolResult2['body']['result']['content'][0]['text'])->toBe('15'); expect($toolResult3['statusCode'])->toBe(200); expect($toolResult3['body']['id'])->toBe('tool-multi-3'); expect($toolResult3['body']['result']['content'][0]['text'])->toBe('Hello, User 3!'); })->group('integration', 'streamable_http_stateless'); }); it('responds to OPTIONS request with CORS headers', function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start(); $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); await(delay(0.1)); $browser = new Browser(); $optionsUrl = $this->jsonClient->baseUrl; $promise = $browser->request('OPTIONS', $optionsUrl); $response = await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); expect($response->getStatusCode())->toBe(204); expect($response->getHeaderLine('Access-Control-Allow-Origin'))->toBe('*'); expect($response->getHeaderLine('Access-Control-Allow-Methods'))->toContain('POST'); expect($response->getHeaderLine('Access-Control-Allow-Methods'))->toContain('GET'); expect($response->getHeaderLine('Access-Control-Allow-Headers'))->toContain('Mcp-Session-Id'); })->group('integration', 'streamable_http'); it('returns 404 for unknown paths', function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start(); $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); await(delay(0.1)); $browser = new Browser(); $unknownUrl = "http://" . STREAMABLE_HTTP_HOST . ":" . $this->port . "/completely/unknown/path"; $promise = $browser->get($unknownUrl); try { await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); $this->fail("Request to unknown path should have failed with 404."); } catch (ResponseException $e) { expect($e->getResponse()->getStatusCode())->toBe(404); $decodedBody = json_decode((string)$e->getResponse()->getBody(), true); expect($decodedBody['error']['message'])->toContain('Not found'); } })->group('integration', 'streamable_http'); it('can delete client session with DELETE request', function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start(); $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); await(delay(0.1)); // 1. Initialize await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-delete-test')); $sessionIdForDelete = $this->jsonClient->sessionId; expect($sessionIdForDelete)->toBeString(); await($this->jsonClient->sendNotification('notifications/initialized')); // 2. Establish a GET SSE connection $sseUrl = $this->jsonClient->baseUrl; $browserForSse = (new Browser())->withTimeout(3); $ssePromise = $browserForSse->requestStreaming('GET', $sseUrl, [ 'Accept' => 'text/event-stream', 'Mcp-Session-Id' => $sessionIdForDelete ]); $ssePsrResponse = await(timeout($ssePromise, 3)); expect($ssePsrResponse->getStatusCode())->toBe(200); expect($ssePsrResponse->getHeaderLine('Content-Type'))->toBe('text/event-stream'); $sseStream = $ssePsrResponse->getBody(); assert($sseStream instanceof ReadableStreamInterface); $isSseStreamClosed = false; $sseStream->on('close', function () use (&$isSseStreamClosed) { $isSseStreamClosed = true; }); // 3. Send DELETE request $deleteResponse = await($this->jsonClient->sendDeleteRequest()); expect($deleteResponse['statusCode'])->toBe(204); // 4. Assert that the GET SSE stream was closed await(delay(0.1)); expect($isSseStreamClosed)->toBeTrue("The GET SSE stream for session {$sessionIdForDelete} was not closed after DELETE request."); // 5. Assert that the client session was deleted try { await($this->jsonClient->sendRequest('tools/list', [], 'tool-list-json-1')); $this->fail("Expected request to tools/list to fail with 400, but it succeeded."); } catch (ResponseException $e) { expect($e->getResponse()->getStatusCode())->toBe(404); $bodyContent = (string) $e->getResponse()->getBody(); $decodedBody = json_decode($bodyContent, true); expect($decodedBody['error']['code'])->toBe(-32600); expect($decodedBody['error']['message'])->toContain('Invalid or expired session'); } })->group('integration', 'streamable_http_json'); it('executes middleware that adds headers to response', function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start(); $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); await(delay(0.1)); // 1. Send a request and check that middleware-added header is present $response = await($this->jsonClient->sendRequest('initialize', [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'MiddlewareTestClient'], 'capabilities' => [] ], 'init-middleware-headers')); // Check that the response has the header added by middleware expect($this->jsonClient->lastResponseHeaders)->toContain('X-Test-Middleware: header-added'); })->group('integration', 'streamable_http', 'middleware'); it('executes middleware that modifies request attributes', function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start(); $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); await(delay(0.1)); // 1. Initialize await($this->jsonClient->sendRequest('initialize', [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'MiddlewareAttrTestClient', 'version' => '1.0'], 'capabilities' => [] ], 'init-middleware-attr')); await($this->jsonClient->sendNotification('notifications/initialized')); // 2. Call tool that checks for middleware-added attribute $toolResponse = await($this->jsonClient->sendRequest('tools/call', [ 'name' => 'check_request_attribute_tool', 'arguments' => [] ], 'tool-attr-check')); expect($toolResponse['body']['result']['content'][0]['text'])->toBe('middleware-value-found: middleware-value'); })->group('integration', 'streamable_http', 'middleware'); it('executes middleware that can short-circuit request processing', function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start(); await(delay(0.1)); $browser = new Browser(); $shortCircuitUrl = "http://" . STREAMABLE_HTTP_HOST . ":" . $this->port . "/" . STREAMABLE_MCP_PATH . "/short-circuit"; $promise = $browser->get($shortCircuitUrl); try { $response = await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); $this->fail("Expected a 418 status code response, but request succeeded"); } catch (ResponseException $e) { expect($e->getResponse()->getStatusCode())->toBe(418); $body = (string) $e->getResponse()->getBody(); expect($body)->toBe('Short-circuited by middleware'); } catch (\Throwable $e) { $this->fail("Short-circuit middleware test failed: " . $e->getMessage()); } })->group('integration', 'streamable_http', 'middleware'); it('executes multiple middlewares in correct order', function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start(); $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH); await(delay(0.1)); // 1. Send a request and check middleware order await($this->jsonClient->sendRequest('initialize', [ 'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'MiddlewareOrderTestClient'], 'capabilities' => [] ], 'init-middleware-order')); // Check that headers from multiple middlewares are present in correct order expect($this->jsonClient->lastResponseHeaders)->toContain('X-Middleware-Order: third,second,first'); })->group('integration', 'streamable_http', 'middleware'); it('handles middleware that throws exceptions gracefully', function () { $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []); $this->process->start(); await(delay(0.1)); $browser = new Browser(); $errorUrl = "http://" . STREAMABLE_HTTP_HOST . ":" . $this->port . "/" . STREAMABLE_MCP_PATH . "/error-middleware"; $promise = $browser->get($errorUrl); try { await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2)); $this->fail("Error middleware should have thrown an exception."); } catch (ResponseException $e) { expect($e->getResponse()->getStatusCode())->toBe(500); $body = (string) $e->getResponse()->getBody(); // ReactPHP handles exceptions and returns a generic error message expect($body)->toContain('Internal Server Error'); } })->group('integration', 'streamable_http', 'middleware'); ```