#
tokens: 33106/50000 4/154 files (page 5/5)
lines: off (toggle) GitHub
raw markdown copy
This is page 5 of 5. Use http://codebase.md/php-mcp/server?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .github
│   └── workflows
│       ├── changelog.yml
│       └── tests.yml
├── .gitignore
├── .php-cs-fixer.php
├── CHANGELOG.md
├── composer.json
├── CONTRIBUTING.md
├── examples
│   ├── 01-discovery-stdio-calculator
│   │   ├── McpElements.php
│   │   └── server.php
│   ├── 02-discovery-http-userprofile
│   │   ├── McpElements.php
│   │   ├── server.php
│   │   └── UserIdCompletionProvider.php
│   ├── 03-manual-registration-stdio
│   │   ├── server.php
│   │   └── SimpleHandlers.php
│   ├── 04-combined-registration-http
│   │   ├── DiscoveredElements.php
│   │   ├── ManualHandlers.php
│   │   └── server.php
│   ├── 05-stdio-env-variables
│   │   ├── EnvToolHandler.php
│   │   └── server.php
│   ├── 06-custom-dependencies-stdio
│   │   ├── McpTaskHandlers.php
│   │   ├── server.php
│   │   └── Services.php
│   ├── 07-complex-tool-schema-http
│   │   ├── EventTypes.php
│   │   ├── McpEventScheduler.php
│   │   └── server.php
│   └── 08-schema-showcase-streamable
│       ├── SchemaShowcaseElements.php
│       └── server.php
├── LICENSE
├── phpunit.xml
├── README.md
├── src
│   ├── Attributes
│   │   ├── CompletionProvider.php
│   │   ├── McpPrompt.php
│   │   ├── McpResource.php
│   │   ├── McpResourceTemplate.php
│   │   ├── McpTool.php
│   │   └── Schema.php
│   ├── Configuration.php
│   ├── Context.php
│   ├── Contracts
│   │   ├── CompletionProviderInterface.php
│   │   ├── EventStoreInterface.php
│   │   ├── LoggerAwareInterface.php
│   │   ├── LoopAwareInterface.php
│   │   ├── ServerTransportInterface.php
│   │   ├── SessionHandlerInterface.php
│   │   └── SessionInterface.php
│   ├── Defaults
│   │   ├── ArrayCache.php
│   │   ├── BasicContainer.php
│   │   ├── DefaultUuidSessionIdGenerator.php
│   │   ├── EnumCompletionProvider.php
│   │   ├── FileCache.php
│   │   ├── InMemoryEventStore.php
│   │   ├── ListCompletionProvider.php
│   │   └── SystemClock.php
│   ├── Dispatcher.php
│   ├── Elements
│   │   ├── RegisteredElement.php
│   │   ├── RegisteredPrompt.php
│   │   ├── RegisteredResource.php
│   │   ├── RegisteredResourceTemplate.php
│   │   └── RegisteredTool.php
│   ├── Exception
│   │   ├── ConfigurationException.php
│   │   ├── DiscoveryException.php
│   │   ├── McpServerException.php
│   │   ├── ProtocolException.php
│   │   └── TransportException.php
│   ├── Protocol.php
│   ├── Registry.php
│   ├── Server.php
│   ├── ServerBuilder.php
│   ├── Session
│   │   ├── ArraySessionHandler.php
│   │   ├── CacheSessionHandler.php
│   │   ├── Session.php
│   │   ├── SessionManager.php
│   │   └── SubscriptionManager.php
│   ├── Transports
│   │   ├── HttpServerTransport.php
│   │   ├── StdioServerTransport.php
│   │   └── StreamableHttpServerTransport.php
│   └── Utils
│       ├── Discoverer.php
│       ├── DocBlockParser.php
│       ├── HandlerResolver.php
│       ├── SchemaGenerator.php
│       └── SchemaValidator.php
└── tests
    ├── Fixtures
    │   ├── Discovery
    │   │   ├── DiscoverablePromptHandler.php
    │   │   ├── DiscoverableResourceHandler.php
    │   │   ├── DiscoverableTemplateHandler.php
    │   │   ├── DiscoverableToolHandler.php
    │   │   ├── EnhancedCompletionHandler.php
    │   │   ├── InvocablePromptFixture.php
    │   │   ├── InvocableResourceFixture.php
    │   │   ├── InvocableResourceTemplateFixture.php
    │   │   ├── InvocableToolFixture.php
    │   │   ├── NonDiscoverableClass.php
    │   │   └── SubDir
    │   │       └── HiddenTool.php
    │   ├── Enums
    │   │   ├── BackedIntEnum.php
    │   │   ├── BackedStringEnum.php
    │   │   ├── PriorityEnum.php
    │   │   ├── StatusEnum.php
    │   │   └── UnitEnum.php
    │   ├── General
    │   │   ├── CompletionProviderFixture.php
    │   │   ├── DocBlockTestFixture.php
    │   │   ├── InvokableHandlerFixture.php
    │   │   ├── PromptHandlerFixture.php
    │   │   ├── RequestAttributeChecker.php
    │   │   ├── ResourceHandlerFixture.php
    │   │   ├── ToolHandlerFixture.php
    │   │   └── VariousTypesHandler.php
    │   ├── Middlewares
    │   │   ├── ErrorMiddleware.php
    │   │   ├── FirstMiddleware.php
    │   │   ├── HeaderMiddleware.php
    │   │   ├── RequestAttributeMiddleware.php
    │   │   ├── SecondMiddleware.php
    │   │   ├── ShortCircuitMiddleware.php
    │   │   └── ThirdMiddleware.php
    │   ├── Schema
    │   │   └── SchemaGenerationTarget.php
    │   ├── ServerScripts
    │   │   ├── HttpTestServer.php
    │   │   ├── StdioTestServer.php
    │   │   └── StreamableHttpTestServer.php
    │   └── Utils
    │       ├── AttributeFixtures.php
    │       ├── DockBlockParserFixture.php
    │       └── SchemaGeneratorFixture.php
    ├── Integration
    │   ├── DiscoveryTest.php
    │   ├── HttpServerTransportTest.php
    │   ├── SchemaGenerationTest.php
    │   ├── StdioServerTransportTest.php
    │   └── StreamableHttpServerTransportTest.php
    ├── Mocks
    │   ├── Clients
    │   │   ├── MockJsonHttpClient.php
    │   │   ├── MockSseClient.php
    │   │   └── MockStreamHttpClient.php
    │   └── Clock
    │       └── FixedClock.php
    ├── Pest.php
    ├── TestCase.php
    └── Unit
        ├── Attributes
        │   ├── CompletionProviderTest.php
        │   ├── McpPromptTest.php
        │   ├── McpResourceTemplateTest.php
        │   ├── McpResourceTest.php
        │   └── McpToolTest.php
        ├── ConfigurationTest.php
        ├── Defaults
        │   ├── EnumCompletionProviderTest.php
        │   └── ListCompletionProviderTest.php
        ├── DispatcherTest.php
        ├── Elements
        │   ├── RegisteredElementTest.php
        │   ├── RegisteredPromptTest.php
        │   ├── RegisteredResourceTemplateTest.php
        │   ├── RegisteredResourceTest.php
        │   └── RegisteredToolTest.php
        ├── ProtocolTest.php
        ├── RegistryTest.php
        ├── ServerBuilderTest.php
        ├── ServerTest.php
        ├── Session
        │   ├── ArraySessionHandlerTest.php
        │   ├── CacheSessionHandlerTest.php
        │   ├── SessionManagerTest.php
        │   └── SessionTest.php
        └── Utils
            ├── DocBlockParserTest.php
            ├── HandlerResolverTest.php
            └── SchemaValidatorTest.php
```

# Files

--------------------------------------------------------------------------------
/src/Transports/StreamableHttpServerTransport.php:
--------------------------------------------------------------------------------

```php
<?php

declare(strict_types=1);

namespace PhpMcp\Server\Transports;

use Evenement\EventEmitterTrait;
use PhpMcp\Server\Contracts\EventStoreInterface;
use PhpMcp\Server\Contracts\LoggerAwareInterface;
use PhpMcp\Server\Contracts\LoopAwareInterface;
use PhpMcp\Server\Contracts\ServerTransportInterface;
use PhpMcp\Server\Exception\McpServerException;
use PhpMcp\Server\Exception\TransportException;
use PhpMcp\Schema\JsonRpc\Message;
use PhpMcp\Schema\JsonRpc\BatchRequest;
use PhpMcp\Schema\JsonRpc\BatchResponse;
use PhpMcp\Schema\JsonRpc\Error;
use PhpMcp\Schema\JsonRpc\Parser;
use PhpMcp\Schema\JsonRpc\Request;
use PhpMcp\Schema\JsonRpc\Response;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Http\HttpServer;
use React\Http\Message\Response as HttpResponse;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use React\Socket\SocketServer;
use React\Stream\ThroughStream;
use Throwable;

use function React\Promise\resolve;
use function React\Promise\reject;

class StreamableHttpServerTransport implements ServerTransportInterface, LoggerAwareInterface, LoopAwareInterface
{
    use EventEmitterTrait;

    protected LoggerInterface $logger;
    protected LoopInterface $loop;

    private ?SocketServer $socket = null;
    private ?HttpServer $http = null;
    private bool $listening = false;
    private bool $closing = false;

    private ?EventStoreInterface $eventStore;

    /**
     * Stores Deferred objects for POST requests awaiting a direct JSON response.
     * Keyed by a unique pendingRequestId.
     * @var array<string, Deferred>
     */
    private array $pendingRequests = [];

    /**
     * Stores active SSE streams.
     * Key: streamId
     * Value: ['stream' => ThroughStream, 'sessionId' => string, 'context' => array]
     * @var array<string, array{stream: ThroughStream, sessionId: string, context: array}>
     */
    private array $activeSseStreams = [];

    private ?ThroughStream $getStream = null;

    /**
     * @param bool $enableJsonResponse If true, the server will return JSON responses instead of starting an SSE stream.
     * @param bool $stateless If true, the server will not emit client_connected events.
     * @param EventStoreInterface $eventStore If provided, the server will replay events to the client.
     * @param array<callable(\Psr\Http\Message\ServerRequestInterface, callable): (\Psr\Http\Message\ResponseInterface|\React\Promise\PromiseInterface)> $middlewares Middlewares to be applied to the HTTP server.
     * This can be useful for simple request/response scenarios without streaming.
     */
    public function __construct(
        private readonly string $host = '127.0.0.1',
        private readonly int $port = 8080,
        private string $mcpPath = '/mcp',
        private ?array $sslContext = null,
        private readonly bool $enableJsonResponse = true,
        private readonly bool $stateless = false,
        ?EventStoreInterface $eventStore = null,
        private array $middlewares = []
    ) {
        $this->logger = new NullLogger();
        $this->loop = Loop::get();
        $this->mcpPath = '/' . trim($mcpPath, '/');
        $this->eventStore = $eventStore;

        foreach ($this->middlewares as $mw) {
            if (!is_callable($mw)) {
                throw new \InvalidArgumentException('All provided middlewares must be callable.');
            }
        }
    }

    protected function generateId(): string
    {
        return bin2hex(random_bytes(16)); // 32 hex characters
    }

    public function setLogger(LoggerInterface $logger): void
    {
        $this->logger = $logger;
    }

    public function setLoop(LoopInterface $loop): void
    {
        $this->loop = $loop;
    }

    public function listen(): void
    {
        if ($this->listening) {
            throw new TransportException('StreamableHttp transport is already listening.');
        }

        if ($this->closing) {
            throw new TransportException('Cannot listen, transport is closing/closed.');
        }

        $listenAddress = "{$this->host}:{$this->port}";
        $protocol = $this->sslContext ? 'https' : 'http';

        try {
            $this->socket = new SocketServer(
                $listenAddress,
                $this->sslContext ?? [],
                $this->loop
            );

            $handlers = array_merge($this->middlewares, [$this->createRequestHandler()]);
            $this->http = new HttpServer($this->loop, ...$handlers);
            $this->http->listen($this->socket);

            $this->socket->on('error', function (Throwable $error) {
                $this->logger->error('Socket server error (StreamableHttp).', ['error' => $error->getMessage()]);
                $this->emit('error', [new TransportException("Socket server error: {$error->getMessage()}", 0, $error)]);
                $this->close();
            });

            $this->logger->info("Server is up and listening on {$protocol}://{$listenAddress} 🚀");
            $this->logger->info("MCP Endpoint: {$protocol}://{$listenAddress}{$this->mcpPath}");

            $this->listening = true;
            $this->closing = false;
            $this->emit('ready');
        } catch (Throwable $e) {
            $this->logger->error("Failed to start StreamableHttp listener on {$listenAddress}", ['exception' => $e]);
            throw new TransportException("Failed to start StreamableHttp listener on {$listenAddress}: {$e->getMessage()}", 0, $e);
        }
    }

    private function createRequestHandler(): callable
    {
        return function (ServerRequestInterface $request) {
            $path = $request->getUri()->getPath();
            $method = $request->getMethod();

            $this->logger->debug("Request received", ['method' => $method, 'path' => $path, 'target' => $this->mcpPath]);

            if ($path !== $this->mcpPath) {
                $error = Error::forInvalidRequest("Not found: {$path}");
                return new HttpResponse(404, ['Content-Type' => 'application/json'], json_encode($error));
            }

            $corsHeaders = [
                'Access-Control-Allow-Origin' => '*',
                'Access-Control-Allow-Methods' => 'GET, POST, DELETE, OPTIONS',
                'Access-Control-Allow-Headers' => 'Content-Type, Mcp-Session-Id, Last-Event-ID, Authorization',
            ];

            if ($method === 'OPTIONS') {
                return new HttpResponse(204, $corsHeaders);
            }

            $addCors = function (HttpResponse $r) use ($corsHeaders) {
                foreach ($corsHeaders as $key => $value) {
                    $r = $r->withAddedHeader($key, $value);
                }
                return $r;
            };

            try {
                return match ($method) {
                    'GET' => $this->handleGetRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))),
                    'POST' => $this->handlePostRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))),
                    'DELETE' => $this->handleDeleteRequest($request)->then($addCors, fn($e) => $addCors($this->handleRequestError($e, $request))),
                    default => $addCors($this->handleUnsupportedRequest($request)),
                };
            } catch (Throwable $e) {
                return $addCors($this->handleRequestError($e, $request));
            }
        };
    }

    private function handleGetRequest(ServerRequestInterface $request): PromiseInterface
    {
        if ($this->stateless) {
            $error = Error::forInvalidRequest("GET requests (SSE streaming) are not supported in stateless mode.");
            return resolve(new HttpResponse(405, ['Content-Type' => 'application/json'], json_encode($error)));
        }

        $acceptHeader = $request->getHeaderLine('Accept');
        if (!str_contains($acceptHeader, 'text/event-stream')) {
            $error = Error::forInvalidRequest("Not Acceptable: Client must accept text/event-stream for GET requests.");
            return resolve(new HttpResponse(406, ['Content-Type' => 'application/json'], json_encode($error)));
        }

        $sessionId = $request->getHeaderLine('Mcp-Session-Id');
        if (empty($sessionId)) {
            $this->logger->warning("GET request without Mcp-Session-Id.");
            $error = Error::forInvalidRequest("Mcp-Session-Id header required for GET requests.");
            return resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error)));
        }

        $this->getStream = new ThroughStream();

        $this->getStream->on('close', function () use ($sessionId) {
            $this->logger->debug("GET SSE stream closed.", ['sessionId' => $sessionId]);
            $this->getStream = null;
        });

        $this->getStream->on('error', function (Throwable $e) use ($sessionId) {
            $this->logger->error("GET SSE stream error.", ['sessionId' => $sessionId, 'error' => $e->getMessage()]);
            $this->getStream = null;
        });

        $headers = [
            'Content-Type' => 'text/event-stream',
            'Cache-Control' => 'no-cache',
            'Connection' => 'keep-alive',
            'X-Accel-Buffering' => 'no',
        ];

        $response = new HttpResponse(200, $headers, $this->getStream);

        if ($this->eventStore) {
            $lastEventId = $request->getHeaderLine('Last-Event-ID');
            $this->replayEvents($lastEventId, $this->getStream, $sessionId);
        }

        return resolve($response);
    }

    private function handlePostRequest(ServerRequestInterface $request): PromiseInterface
    {
        $deferred = new Deferred();

        $acceptHeader = $request->getHeaderLine('Accept');
        if (!str_contains($acceptHeader, 'application/json') && !str_contains($acceptHeader, 'text/event-stream')) {
            $error = Error::forInvalidRequest("Not Acceptable: Client must accept both application/json or text/event-stream");
            $deferred->resolve(new HttpResponse(406, ['Content-Type' => 'application/json'], json_encode($error)));
            return $deferred->promise();
        }

        if (!str_contains($request->getHeaderLine('Content-Type'), 'application/json')) {
            $error = Error::forInvalidRequest("Unsupported Media Type: Content-Type must be application/json");
            $deferred->resolve(new HttpResponse(415, ['Content-Type' => 'application/json'], json_encode($error)));
            return $deferred->promise();
        }

        $body = $request->getBody()->getContents();

        if (empty($body)) {
            $this->logger->warning("Received empty POST body");
            $error = Error::forInvalidRequest("Empty request body.");
            $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error)));
            return $deferred->promise();
        }

        try {
            $message = Parser::parse($body);
        } catch (Throwable $e) {
            $this->logger->error("Failed to parse MCP message from POST body", ['error' => $e->getMessage()]);
            $error = Error::forParseError("Invalid JSON: " . $e->getMessage());
            $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error)));
            return $deferred->promise();
        }

        $isInitializeRequest = ($message instanceof Request && $message->method === 'initialize');
        $sessionId = null;

        if ($this->stateless) {
            $sessionId = $this->generateId();
            $this->emit('client_connected', [$sessionId]);
        } else {
            if ($isInitializeRequest) {
                if ($request->hasHeader('Mcp-Session-Id')) {
                    $this->logger->warning("Client sent Mcp-Session-Id with InitializeRequest. Ignoring.", ['clientSentId' => $request->getHeaderLine('Mcp-Session-Id')]);
                    $error = Error::forInvalidRequest("Invalid request: Session already initialized. Mcp-Session-Id header not allowed with InitializeRequest.", $message->getId());
                    $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error)));
                    return $deferred->promise();
                }

                $sessionId = $this->generateId();
                $this->emit('client_connected', [$sessionId]);
            } else {
                $sessionId = $request->getHeaderLine('Mcp-Session-Id');

                if (empty($sessionId)) {
                    $this->logger->warning("POST request without Mcp-Session-Id.");
                    $error = Error::forInvalidRequest("Mcp-Session-Id header required for POST requests.", $message->getId());
                    $deferred->resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error)));
                    return $deferred->promise();
                }
            }
        }

        $context = [
            'is_initialize_request' => $isInitializeRequest,
        ];

        $nRequests = match (true) {
            $message instanceof Request => 1,
            $message instanceof BatchRequest => $message->nRequests(),
            default => 0,
        };

        if ($nRequests === 0) {
            $deferred->resolve(new HttpResponse(202));
            $context['type'] = 'post_202_sent';
        } else {
            if ($this->enableJsonResponse) {
                $pendingRequestId = $this->generateId();
                $this->pendingRequests[$pendingRequestId] = $deferred;

                $timeoutTimer = $this->loop->addTimer(30, function () use ($pendingRequestId, $sessionId) {
                    if (isset($this->pendingRequests[$pendingRequestId])) {
                        $deferred = $this->pendingRequests[$pendingRequestId];
                        unset($this->pendingRequests[$pendingRequestId]);
                        $this->logger->warning("Timeout waiting for direct JSON response processing.", ['pending_request_id' => $pendingRequestId, 'session_id' => $sessionId]);
                        $errorResponse = McpServerException::internalError("Request processing timed out.")->toJsonRpcError($pendingRequestId);
                        $deferred->resolve(new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($errorResponse->toArray())));
                    }
                });

                $this->pendingRequests[$pendingRequestId]->promise()->finally(function () use ($timeoutTimer) {
                    $this->loop->cancelTimer($timeoutTimer);
                });

                $context['type'] = 'post_json';
                $context['pending_request_id'] = $pendingRequestId;
            } else {
                $streamId = $this->generateId();
                $sseStream = new ThroughStream();
                $this->activeSseStreams[$streamId] = [
                    'stream' => $sseStream,
                    'sessionId' => $sessionId,
                    'context' => ['nRequests' => $nRequests, 'nResponses' => 0]
                ];

                $sseStream->on('close', function () use ($streamId) {
                    $this->logger->info("POST SSE stream closed by client/server.", ['streamId' => $streamId, 'sessionId' => $this->activeSseStreams[$streamId]['sessionId']]);
                    unset($this->activeSseStreams[$streamId]);
                });
                $sseStream->on('error', function (Throwable $e) use ($streamId) {
                    $this->logger->error("POST SSE stream error.", ['streamId' => $streamId, 'sessionId' => $this->activeSseStreams[$streamId]['sessionId'], 'error' => $e->getMessage()]);
                    unset($this->activeSseStreams[$streamId]);
                });

                $headers = [
                    'Content-Type' => 'text/event-stream',
                    'Cache-Control' => 'no-cache',
                    'Connection' => 'keep-alive',
                    'X-Accel-Buffering' => 'no',
                ];

                if (!empty($sessionId) && !$this->stateless) {
                    $headers['Mcp-Session-Id'] = $sessionId;
                }

                $deferred->resolve(new HttpResponse(200, $headers, $sseStream));
                $context['type'] = 'post_sse';
                $context['streamId'] = $streamId;
                $context['nRequests'] = $nRequests;
            }
        }

        $context['stateless'] = $this->stateless;
        $context['request'] = $request;

        $this->loop->futureTick(function () use ($message, $sessionId, $context) {
            $this->emit('message', [$message, $sessionId, $context]);
        });

        return $deferred->promise();
    }

    private function handleDeleteRequest(ServerRequestInterface $request): PromiseInterface
    {
        if ($this->stateless) {
            return resolve(new HttpResponse(204));
        }

        $sessionId = $request->getHeaderLine('Mcp-Session-Id');
        if (empty($sessionId)) {
            $this->logger->warning("DELETE request without Mcp-Session-Id.");
            $error = Error::forInvalidRequest("Mcp-Session-Id header required for DELETE.");
            return resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error)));
        }

        $streamsToClose = [];
        foreach ($this->activeSseStreams as $streamId => $streamInfo) {
            if ($streamInfo['sessionId'] === $sessionId) {
                $streamsToClose[] = $streamId;
            }
        }

        foreach ($streamsToClose as $streamId) {
            $this->activeSseStreams[$streamId]['stream']->end();
            unset($this->activeSseStreams[$streamId]);
        }

        if ($this->getStream !== null) {
            $this->getStream->end();
            $this->getStream = null;
        }

        $this->emit('client_disconnected', [$sessionId, 'Session terminated by DELETE request']);

        return resolve(new HttpResponse(204));
    }

    private function handleUnsupportedRequest(ServerRequestInterface $request): HttpResponse
    {
        $error = Error::forInvalidRequest("Method not allowed: {$request->getMethod()}");
        $headers = [
            'Content-Type' => 'application/json',
            'Allow' => 'GET, POST, DELETE, OPTIONS',
        ];
        return new HttpResponse(405, $headers, json_encode($error));
    }

    private function handleRequestError(Throwable $e, ServerRequestInterface $request): HttpResponse
    {
        $this->logger->error("Error processing HTTP request", [
            'method' => $request->getMethod(),
            'path' => $request->getUri()->getPath(),
            'exception' => $e->getMessage()
        ]);

        if ($e instanceof TransportException) {
            $error = Error::forInternalError("Transport Error: " . $e->getMessage());
            return new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($error));
        }

        $error = Error::forInternalError("Internal Server Error during HTTP request processing.");
        return new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($error));
    }

    public function sendMessage(Message $message, string $sessionId, array $context = []): PromiseInterface
    {
        if ($this->closing) {
            return reject(new TransportException('Transport is closing.'));
        }

        $isInitializeResponse = ($context['is_initialize_request'] ?? false) && ($message instanceof Response);

        switch ($context['type'] ?? null) {
            case 'post_202_sent':
                return resolve(null);

            case 'post_sse':
                $streamId = $context['streamId'];
                if (!isset($this->activeSseStreams[$streamId])) {
                    $this->logger->error("SSE stream for POST not found.", ['streamId' => $streamId, 'sessionId' => $sessionId]);
                    return reject(new TransportException("SSE stream {$streamId} not found for POST response."));
                }

                $stream = $this->activeSseStreams[$streamId]['stream'];
                if (!$stream->isWritable()) {
                    $this->logger->warning("SSE stream for POST is not writable.", ['streamId' => $streamId, 'sessionId' => $sessionId]);
                    return reject(new TransportException("SSE stream {$streamId} for POST is not writable."));
                }

                $sentCountThisCall = 0;

                if ($message instanceof Response || $message instanceof Error) {
                    $json = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
                    $eventId = $this->eventStore ? $this->eventStore->storeEvent($streamId, $json) : null;
                    $this->sendSseEventToStream($stream, $json, $eventId);
                    $sentCountThisCall = 1;
                } elseif ($message instanceof BatchResponse) {
                    foreach ($message->getAll() as $singleResponse) {
                        $json = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
                        $eventId = $this->eventStore ? $this->eventStore->storeEvent($streamId, $json) : null;
                        $this->sendSseEventToStream($stream, $json, $eventId);
                        $sentCountThisCall++;
                    }
                }

                if (isset($this->activeSseStreams[$streamId]['context'])) {
                    $this->activeSseStreams[$streamId]['context']['nResponses'] += $sentCountThisCall;
                    if ($this->activeSseStreams[$streamId]['context']['nResponses'] >= $this->activeSseStreams[$streamId]['context']['nRequests']) {
                        $this->logger->info("All expected responses sent for POST SSE stream. Closing.", ['streamId' => $streamId, 'sessionId' => $sessionId]);
                        $stream->end(); // Will trigger 'close' event.

                        if ($context['stateless'] ?? false) {
                            $this->loop->futureTick(function () use ($sessionId) {
                                $this->emit('client_disconnected', [$sessionId, 'Stateless request completed']);
                            });
                        }
                    }
                }

                return resolve(null);

            case 'post_json':
                $pendingRequestId = $context['pending_request_id'];
                if (!isset($this->pendingRequests[$pendingRequestId])) {
                    $this->logger->error("Pending direct JSON request not found.", ['pending_request_id' => $pendingRequestId, 'session_id' => $sessionId]);
                    return reject(new TransportException("Pending request {$pendingRequestId} not found."));
                }

                $deferred = $this->pendingRequests[$pendingRequestId];
                unset($this->pendingRequests[$pendingRequestId]);

                $responseBody = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
                $headers = ['Content-Type' => 'application/json'];
                if ($isInitializeResponse && !$this->stateless) {
                    $headers['Mcp-Session-Id'] = $sessionId;
                }

                $statusCode = $context['status_code'] ?? 200;
                $deferred->resolve(new HttpResponse($statusCode, $headers, $responseBody . "\n"));

                if ($context['stateless'] ?? false) {
                    $this->loop->futureTick(function () use ($sessionId) {
                        $this->emit('client_disconnected', [$sessionId, 'Stateless request completed']);
                    });
                }

                return resolve(null);

            default:
                if ($this->getStream === null) {
                    $this->logger->error("GET SSE stream not found.", ['sessionId' => $sessionId]);
                    return reject(new TransportException("GET SSE stream not found."));
                }

                if (!$this->getStream->isWritable()) {
                    $this->logger->warning("GET SSE stream is not writable.", ['sessionId' => $sessionId]);
                    return reject(new TransportException("GET SSE stream not writable."));
                }
                if ($message instanceof Response || $message instanceof Error) {
                    $json = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
                    $eventId = $this->eventStore ? $this->eventStore->storeEvent('GET_STREAM', $json) : null;
                    $this->sendSseEventToStream($this->getStream, $json, $eventId);
                } elseif ($message instanceof BatchResponse) {
                    foreach ($message->getAll() as $singleResponse) {
                        $json = json_encode($singleResponse, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
                        $eventId = $this->eventStore ? $this->eventStore->storeEvent('GET_STREAM', $json) : null;
                        $this->sendSseEventToStream($this->getStream, $json, $eventId);
                    }
                }
                return resolve(null);
        }
    }

    private function replayEvents(string $lastEventId, ThroughStream $sseStream, string $sessionId): void
    {
        if (empty($lastEventId)) {
            return;
        }

        try {
            $this->eventStore->replayEventsAfter(
                $lastEventId,
                function (string $replayedEventId, string $json) use ($sseStream) {
                    $this->logger->debug("Replaying event", ['replayedEventId' => $replayedEventId]);
                    $this->sendSseEventToStream($sseStream, $json, $replayedEventId);
                }
            );
        } catch (Throwable $e) {
            $this->logger->error("Error during event replay.", ['sessionId' => $sessionId, 'exception' => $e]);
        }
    }

    private function sendSseEventToStream(ThroughStream $stream, string $data, ?string $eventId = null): bool
    {
        if (! $stream->isWritable()) {
            return false;
        }

        $frame = "event: message\n";
        if ($eventId !== null) {
            $frame .= "id: {$eventId}\n";
        }

        $lines = explode("\n", $data);
        foreach ($lines as $line) {
            $frame .= "data: {$line}\n";
        }
        $frame .= "\n";

        return $stream->write($frame);
    }

    public function close(): void
    {
        if ($this->closing) {
            return;
        }

        $this->closing = true;
        $this->listening = false;
        $this->logger->info('Closing transport...');

        if ($this->socket) {
            $this->socket->close();
            $this->socket = null;
        }

        foreach ($this->activeSseStreams as $streamId => $streamInfo) {
            if ($streamInfo['stream']->isWritable()) {
                $streamInfo['stream']->end();
            }
        }

        if ($this->getStream !== null) {
            $this->getStream->end();
            $this->getStream = null;
        }

        foreach ($this->pendingRequests as $pendingRequestId => $deferred) {
            $deferred->reject(new TransportException('Transport is closing.'));
        }

        $this->activeSseStreams = [];
        $this->pendingRequests = [];

        $this->emit('close', ['Transport closed.']);
        $this->removeAllListeners();
    }
}

```

--------------------------------------------------------------------------------
/src/Utils/SchemaGenerator.php:
--------------------------------------------------------------------------------

```php
<?php

namespace PhpMcp\Server\Utils;

use phpDocumentor\Reflection\DocBlock\Tags\Param;
use PhpMcp\Server\Attributes\Schema;
use PhpMcp\Server\Context;
use ReflectionEnum;
use ReflectionIntersectionType;
use ReflectionMethod;
use ReflectionNamedType;
use ReflectionParameter;
use ReflectionType;
use ReflectionUnionType;
use stdClass;

/**
 * Generates JSON Schema for method parameters with intelligent Schema attribute handling.
 *
 * Priority system:
 * 1. Schema attributes (method-level and parameter-level)
 * 2. Reflection type information
 * 3. DocBlock type information
 */
class SchemaGenerator
{
    private DocBlockParser $docBlockParser;

    public function __construct(DocBlockParser $docBlockParser)
    {
        $this->docBlockParser = $docBlockParser;
    }

    /**
     * Generates a JSON Schema object (as a PHP array) for a method's or function's parameters.
     */
    public function generate(\ReflectionMethod|\ReflectionFunction $reflection): array
    {
        $methodSchema = $this->extractMethodLevelSchema($reflection);

        if ($methodSchema && isset($methodSchema['definition'])) {
            return $methodSchema['definition'];
        }

        $parametersInfo = $this->parseParametersInfo($reflection);

        return $this->buildSchemaFromParameters($parametersInfo, $methodSchema);
    }

    /**
     * Extracts method-level or function-level Schema attribute.
     */
    private function extractMethodLevelSchema(\ReflectionMethod|\ReflectionFunction $reflection): ?array
    {
        $schemaAttrs = $reflection->getAttributes(Schema::class, \ReflectionAttribute::IS_INSTANCEOF);
        if (empty($schemaAttrs)) {
            return null;
        }

        $schemaAttr = $schemaAttrs[0]->newInstance();
        return $schemaAttr->toArray();
    }

    /**
     * Extracts parameter-level Schema attribute.
     */
    private function extractParameterLevelSchema(ReflectionParameter $parameter): array
    {
        $schemaAttrs = $parameter->getAttributes(Schema::class, \ReflectionAttribute::IS_INSTANCEOF);
        if (empty($schemaAttrs)) {
            return [];
        }

        $schemaAttr = $schemaAttrs[0]->newInstance();
        return $schemaAttr->toArray();
    }

    /**
     * Builds the final schema from parameter information and method-level schema.
     *
     * @param array<int, array{
     *     name: string,
     *     doc_block_tag: Param|null,
     *     reflection_param: ReflectionParameter,
     *     reflection_type_object: ReflectionType|null,
     *     type_string: string,
     *     description: string|null,
     *     required: bool,
     *     allows_null: bool,
     *     default_value: mixed|null,
     *     has_default: bool,
     *     is_variadic: bool,
     *     parameter_schema: array<string, mixed>
     * }> $parametersInfo
     *
     * @param array<string, mixed>|null $methodSchema
     *
     * @return array<string, mixed>
     */
    private function buildSchemaFromParameters(array $parametersInfo, ?array $methodSchema): array
    {
        $schema = [
            'type' => 'object',
            'properties' => [],
            'required' => [],
        ];

        // Apply method-level schema as base
        if ($methodSchema) {
            $schema = array_merge($schema, $methodSchema);
            if (!isset($schema['type'])) {
                $schema['type'] = 'object';
            }
            if (!isset($schema['properties'])) {
                $schema['properties'] = [];
            }
            if (!isset($schema['required'])) {
                $schema['required'] = [];
            }
        }

        foreach ($parametersInfo as $paramInfo) {
            $paramName = $paramInfo['name'];

            $methodLevelParamSchema = $schema['properties'][$paramName] ?? null;

            $paramSchema = $this->buildParameterSchema($paramInfo, $methodLevelParamSchema);

            $schema['properties'][$paramName] = $paramSchema;

            if ($paramInfo['required'] && !in_array($paramName, $schema['required'])) {
                $schema['required'][] = $paramName;
            } elseif (!$paramInfo['required'] && ($key = array_search($paramName, $schema['required'])) !== false) {
                unset($schema['required'][$key]);
                $schema['required'] = array_values($schema['required']); // Re-index
            }
        }

        // Clean up empty properties
        if (empty($schema['properties'])) {
            $schema['properties'] = new stdClass();
        }
        if (empty($schema['required'])) {
            unset($schema['required']);
        }

        return $schema;
    }

    /**
     * Builds the final schema for a single parameter by merging all three levels.
     *
     * @param array{
     *     name: string,
     *     doc_block_tag: Param|null,
     *     reflection_param: ReflectionParameter,
     *     reflection_type_object: ReflectionType|null,
     *     type_string: string,
     *     description: string|null,
     *     required: bool,
     *     allows_null: bool,
     *     default_value: mixed|null,
     *     has_default: bool,
     *     is_variadic: bool,
     *     parameter_schema: array<string, mixed>
     * } $paramInfo
     * @param array<string, mixed>|null $methodLevelParamSchema
     */
    private function buildParameterSchema(array $paramInfo, ?array $methodLevelParamSchema = null): array
    {
        if ($paramInfo['is_variadic']) {
            return $this->buildVariadicParameterSchema($paramInfo);
        }

        $inferredSchema = $this->buildInferredParameterSchema($paramInfo);

        // Method-level takes precedence over inferred schema
        $mergedSchema = $inferredSchema;
        if ($methodLevelParamSchema) {
            $mergedSchema = $this->mergeSchemas($inferredSchema, $methodLevelParamSchema);
        }

        // Parameter-level takes highest precedence
        $parameterLevelSchema = $paramInfo['parameter_schema'];
        if (!empty($parameterLevelSchema)) {
            if (isset($parameterLevelSchema['definition']) && is_array($parameterLevelSchema['definition'])) {
                return $parameterLevelSchema['definition'];
            }

            $mergedSchema = $this->mergeSchemas($mergedSchema, $parameterLevelSchema);
        }

        return $mergedSchema;
    }

    /**
     * Merge two schemas where the dominant schema takes precedence over the recessive one.
     *
     * @param array $recessiveSchema The schema with lower precedence
     * @param array $dominantSchema The schema with higher precedence
     */
    private function mergeSchemas(array $recessiveSchema, array $dominantSchema): array
    {
        $mergedSchema = array_merge($recessiveSchema, $dominantSchema);

        return $mergedSchema;
    }

    /**
     * Builds parameter schema from inferred type and docblock information only.
     * Returns empty array for variadic parameters (handled separately).
     */
    private function buildInferredParameterSchema(array $paramInfo): array
    {
        $paramSchema = [];

        // Variadic parameters are handled separately
        if ($paramInfo['is_variadic']) {
            return [];
        }

        // Infer JSON Schema types
        $jsonTypes = $this->inferParameterTypes($paramInfo);

        if (count($jsonTypes) === 1) {
            $paramSchema['type'] = $jsonTypes[0];
        } elseif (count($jsonTypes) > 1) {
            $paramSchema['type'] = $jsonTypes;
        }

        // Add description from docblock
        if ($paramInfo['description']) {
            $paramSchema['description'] = $paramInfo['description'];
        }

        // Add default value only if parameter actually has a default
        if ($paramInfo['has_default']) {
            $paramSchema['default'] = $paramInfo['default_value'];
        }

        // Handle enums
        $paramSchema = $this->applyEnumConstraints($paramSchema, $paramInfo);

        // Handle array items
        $paramSchema = $this->applyArrayConstraints($paramSchema, $paramInfo);

        return $paramSchema;
    }

    /**
     * Builds schema for variadic parameters.
     */
    private function buildVariadicParameterSchema(array $paramInfo): array
    {
        $paramSchema = ['type' => 'array'];

        // Apply parameter-level Schema attributes first
        if (!empty($paramInfo['parameter_schema'])) {
            $paramSchema = array_merge($paramSchema, $paramInfo['parameter_schema']);
            // Ensure type is always array for variadic
            $paramSchema['type'] = 'array';
        }

        if ($paramInfo['description']) {
            $paramSchema['description'] = $paramInfo['description'];
        }

        // If no items specified by Schema attribute, infer from type
        if (!isset($paramSchema['items'])) {
            $itemJsonTypes = $this->mapPhpTypeToJsonSchemaType($paramInfo['type_string']);
            $nonNullItemTypes = array_filter($itemJsonTypes, fn($t) => $t !== 'null');

            if (count($nonNullItemTypes) === 1) {
                $paramSchema['items'] = ['type' => $nonNullItemTypes[0]];
            }
        }

        return $paramSchema;
    }

    /**
     * Infers JSON Schema types for a parameter.
     */
    private function inferParameterTypes(array $paramInfo): array
    {
        $jsonTypes = $this->mapPhpTypeToJsonSchemaType($paramInfo['type_string']);

        if ($paramInfo['allows_null'] && strtolower($paramInfo['type_string']) !== 'mixed' && !in_array('null', $jsonTypes)) {
            $jsonTypes[] = 'null';
        }

        if (count($jsonTypes) > 1) {
            // Sort but ensure null comes first for consistency
            $nullIndex = array_search('null', $jsonTypes);
            if ($nullIndex !== false) {
                unset($jsonTypes[$nullIndex]);
                sort($jsonTypes);
                array_unshift($jsonTypes, 'null');
            } else {
                sort($jsonTypes);
            }
        }

        return $jsonTypes;
    }

    /**
     * Applies enum constraints to parameter schema.
     */
    private function applyEnumConstraints(array $paramSchema, array $paramInfo): array
    {
        $reflectionType = $paramInfo['reflection_type_object'];

        if (!($reflectionType instanceof ReflectionNamedType) || $reflectionType->isBuiltin() || !enum_exists($reflectionType->getName())) {
            return $paramSchema;
        }

        $enumClass = $reflectionType->getName();
        $enumReflection = new ReflectionEnum($enumClass);
        $backingTypeReflection = $enumReflection->getBackingType();

        if ($enumReflection->isBacked() && $backingTypeReflection instanceof ReflectionNamedType) {
            $paramSchema['enum'] = array_column($enumClass::cases(), 'value');
            $jsonBackingType = match ($backingTypeReflection->getName()) {
                'int' => 'integer',
                'string' => 'string',
                default => null,
            };

            if ($jsonBackingType) {
                if (isset($paramSchema['type']) && is_array($paramSchema['type']) && in_array('null', $paramSchema['type'])) {
                    $paramSchema['type'] = ['null', $jsonBackingType];
                } else {
                    $paramSchema['type'] = $jsonBackingType;
                }
            }
        } else {
            // Non-backed enum - use names as enum values
            $paramSchema['enum'] = array_column($enumClass::cases(), 'name');
            if (isset($paramSchema['type']) && is_array($paramSchema['type']) && in_array('null', $paramSchema['type'])) {
                $paramSchema['type'] = ['null', 'string'];
            } else {
                $paramSchema['type'] = 'string';
            }
        }

        return $paramSchema;
    }

    /**
     * Applies array-specific constraints to parameter schema.
     */
    private function applyArrayConstraints(array $paramSchema, array $paramInfo): array
    {
        if (!isset($paramSchema['type'])) {
            return $paramSchema;
        }

        $typeString = $paramInfo['type_string'];
        $allowsNull = $paramInfo['allows_null'];

        // Handle object-like arrays using array{} syntax
        if (preg_match('/^array\s*{/i', $typeString)) {
            $objectSchema = $this->inferArrayItemsType($typeString);
            if (is_array($objectSchema) && isset($objectSchema['properties'])) {
                $paramSchema = array_merge($paramSchema, $objectSchema);
                $paramSchema['type'] = $allowsNull ? ['object', 'null'] : 'object';
            }
        }
        // Handle regular arrays
        elseif (in_array('array', $this->mapPhpTypeToJsonSchemaType($typeString))) {
            $itemsType = $this->inferArrayItemsType($typeString);
            if ($itemsType !== 'any') {
                if (is_string($itemsType)) {
                    $paramSchema['items'] = ['type' => $itemsType];
                } else {
                    if (!isset($itemsType['type']) && isset($itemsType['properties'])) {
                        $itemsType = array_merge(['type' => 'object'], $itemsType);
                    }
                    $paramSchema['items'] = $itemsType;
                }
            }

            if ($allowsNull) {
                $paramSchema['type'] = ['array', 'null'];
                sort($paramSchema['type']);
            } else {
                $paramSchema['type'] = 'array';
            }
        }

        return $paramSchema;
    }

    /**
     * Parses detailed information about a method's parameters.
     *
     * @return array<int, array{
     *     name: string,
     *     doc_block_tag: Param|null,
     *     reflection_param: ReflectionParameter,
     *     reflection_type_object: ReflectionType|null,
     *     type_string: string,
     *     description: string|null,
     *     required: bool,
     *     allows_null: bool,
     *     default_value: mixed|null,
     *     has_default: bool,
     *     is_variadic: bool,
     *     parameter_schema: array<string, mixed>
     * }>
     */
    private function parseParametersInfo(\ReflectionMethod|\ReflectionFunction $reflection): array
    {
        $docComment = $reflection->getDocComment() ?: null;
        $docBlock = $this->docBlockParser->parseDocBlock($docComment);
        $paramTags = $this->docBlockParser->getParamTags($docBlock);
        $parametersInfo = [];

        foreach ($reflection->getParameters() as $rp) {
            $paramName = $rp->getName();
            $paramTag = $paramTags['$' . $paramName] ?? null;

            $reflectionType = $rp->getType();

            if ($reflectionType instanceof ReflectionNamedType && $reflectionType?->getName() === Context::class) {
                continue;
            }

            $typeString = $this->getParameterTypeString($rp, $paramTag);
            $description = $this->docBlockParser->getParamDescription($paramTag);
            $hasDefault = $rp->isDefaultValueAvailable();
            $defaultValue = $hasDefault ? $rp->getDefaultValue() : null;
            $isVariadic = $rp->isVariadic();

            $parameterSchema = $this->extractParameterLevelSchema($rp);

            if ($defaultValue instanceof \BackedEnum) {
                $defaultValue = $defaultValue->value;
            }

            if ($defaultValue instanceof \UnitEnum) {
                $defaultValue = $defaultValue->name;
            }

            $allowsNull = false;
            if ($reflectionType && $reflectionType->allowsNull()) {
                $allowsNull = true;
            } elseif ($hasDefault && $defaultValue === null) {
                $allowsNull = true;
            } elseif (str_contains($typeString, 'null') || strtolower($typeString) === 'mixed') {
                $allowsNull = true;
            }

            $parametersInfo[] = [
                'name' => $paramName,
                'doc_block_tag' => $paramTag,
                'reflection_param' => $rp,
                'reflection_type_object' => $reflectionType,
                'type_string' => $typeString,
                'description' => $description,
                'required' => !$rp->isOptional(),
                'allows_null' => $allowsNull,
                'default_value' => $defaultValue,
                'has_default' => $hasDefault,
                'is_variadic' => $isVariadic,
                'parameter_schema' => $parameterSchema,
            ];
        }

        return $parametersInfo;
    }

    /**
     * Determines the type string for a parameter, prioritizing DocBlock.
     */
    private function getParameterTypeString(ReflectionParameter $rp, ?Param $paramTag): string
    {
        $docBlockType = $this->docBlockParser->getParamTypeString($paramTag);
        $isDocBlockTypeGeneric = false;

        if ($docBlockType !== null) {
            if (in_array(strtolower($docBlockType), ['mixed', 'unknown', ''])) {
                $isDocBlockTypeGeneric = true;
            }
        } else {
            $isDocBlockTypeGeneric = true; // No tag or no type in tag implies generic
        }

        $reflectionType = $rp->getType();
        $reflectionTypeString = null;
        if ($reflectionType) {
            $reflectionTypeString = $this->getTypeStringFromReflection($reflectionType, $rp->allowsNull());
        }

        // Prioritize Reflection if DocBlock type is generic AND Reflection provides a more specific type
        if ($isDocBlockTypeGeneric && $reflectionTypeString !== null && $reflectionTypeString !== 'mixed') {
            return $reflectionTypeString;
        }

        // Otherwise, use the DocBlock type if it was valid and non-generic
        if ($docBlockType !== null && !$isDocBlockTypeGeneric) {
            // Consider if DocBlock adds nullability missing from reflection
            if (stripos($docBlockType, 'null') !== false && $reflectionTypeString && stripos($reflectionTypeString, 'null') === false && !str_ends_with($reflectionTypeString, '|null')) {
                // If reflection didn't capture null, but docblock did, append |null (if not already mixed)
                if ($reflectionTypeString !== 'mixed') {
                    return $reflectionTypeString . '|null';
                }
            }

            return $docBlockType;
        }

        // Fallback to Reflection type even if it was generic ('mixed')
        if ($reflectionTypeString !== null) {
            return $reflectionTypeString;
        }

        // Default to 'mixed' if nothing else found
        return 'mixed';
    }

    /**
     * Converts a ReflectionType object into a type string representation.
     */
    private function getTypeStringFromReflection(?ReflectionType $type, bool $nativeAllowsNull): string
    {
        if ($type === null) {
            return 'mixed';
        }

        $types = [];
        if ($type instanceof ReflectionUnionType) {
            foreach ($type->getTypes() as $innerType) {
                $types[] = $this->getTypeStringFromReflection($innerType, $innerType->allowsNull());
            }
            if ($nativeAllowsNull) {
                $types = array_filter($types, fn($t) => strtolower($t) !== 'null');
            }
            $typeString = implode('|', array_unique(array_filter($types)));
        } elseif ($type instanceof ReflectionIntersectionType) {
            foreach ($type->getTypes() as $innerType) {
                $types[] = $this->getTypeStringFromReflection($innerType, false);
            }
            $typeString = implode('&', array_unique(array_filter($types)));
        } elseif ($type instanceof ReflectionNamedType) {
            $typeString = $type->getName();
        } else {
            return 'mixed';
        }

        $typeString = match (strtolower($typeString)) {
            'bool' => 'boolean',
            'int' => 'integer',
            'float', 'double' => 'number',
            'str' => 'string',
            default => $typeString,
        };

        $isNullable = $nativeAllowsNull;
        if ($type instanceof ReflectionNamedType && $type->getName() === 'mixed') {
            $isNullable = true;
        }

        if ($type instanceof ReflectionUnionType && !$nativeAllowsNull) {
            foreach ($type->getTypes() as $innerType) {
                if ($innerType instanceof ReflectionNamedType && strtolower($innerType->getName()) === 'null') {
                    $isNullable = true;
                    break;
                }
            }
        }

        if ($isNullable && $typeString !== 'mixed' && stripos($typeString, 'null') === false) {
            if (!str_ends_with($typeString, '|null') && !str_ends_with($typeString, '&null')) {
                $typeString .= '|null';
            }
        }

        // Remove leading backslash from class names, but handle built-ins like 'int' or unions like 'int|string'
        if (str_contains($typeString, '\\')) {
            $parts = preg_split('/([|&])/', $typeString, -1, PREG_SPLIT_DELIM_CAPTURE);
            $processedParts = array_map(fn($part) => str_starts_with($part, '\\') ? ltrim($part, '\\') : $part, $parts);
            $typeString = implode('', $processedParts);
        }

        return $typeString ?: 'mixed';
    }

    /**
     * Maps a PHP type string (potentially a union) to an array of JSON Schema type names.
     */
    private function mapPhpTypeToJsonSchemaType(string $phpTypeString): array
    {
        $normalizedType = strtolower(trim($phpTypeString));

        // PRIORITY 1: Check for array{} syntax which should be treated as object
        if (preg_match('/^array\s*{/i', $normalizedType)) {
            return ['object'];
        }

        // PRIORITY 2: Check for array syntax first (T[] or generics)
        if (
            str_contains($normalizedType, '[]') ||
            preg_match('/^(array|list|iterable|collection)</i', $normalizedType)
        ) {
            return ['array'];
        }

        // PRIORITY 3: Handle unions (recursive)
        if (str_contains($normalizedType, '|')) {
            $types = explode('|', $normalizedType);
            $jsonTypes = [];
            foreach ($types as $type) {
                $mapped = $this->mapPhpTypeToJsonSchemaType(trim($type));
                $jsonTypes = array_merge($jsonTypes, $mapped);
            }

            return array_values(array_unique($jsonTypes));
        }

        // PRIORITY 4: Handle simple built-in types
        return match ($normalizedType) {
            'string', 'scalar' => ['string'],
            '?string' => ['null', 'string'],
            'int', 'integer' => ['integer'],
            '?int', '?integer' => ['null', 'integer'],
            'float', 'double', 'number' => ['number'],
            '?float', '?double', '?number' => ['null', 'number'],
            'bool', 'boolean' => ['boolean'],
            '?bool', '?boolean' => ['null', 'boolean'],
            'array' => ['array'],
            '?array' => ['null', 'array'],
            'object', 'stdclass' => ['object'],
            '?object', '?stdclass' => ['null', 'object'],
            'null' => ['null'],
            'resource', 'callable' => ['object'],
            'mixed' => [],
            'void', 'never' => [],
            default => ['object'],
        };
    }

    /**
     * Infers the 'items' schema type for an array based on DocBlock type hints.
     */
    private function inferArrayItemsType(string $phpTypeString): string|array
    {
        $normalizedType = trim($phpTypeString);

        // Case 1: Simple T[] syntax (e.g., string[], int[], bool[], etc.)
        if (preg_match('/^(\\??)([\w\\\\]+)\\s*\\[\\]$/i', $normalizedType, $matches)) {
            $itemType = strtolower($matches[2]);
            return $this->mapSimpleTypeToJsonSchema($itemType);
        }

        // Case 2: Generic array<T> syntax (e.g., array<string>, array<int>, etc.)
        if (preg_match('/^(\\??)array\s*<\s*([\w\\\\|]+)\s*>$/i', $normalizedType, $matches)) {
            $itemType = strtolower($matches[2]);
            return $this->mapSimpleTypeToJsonSchema($itemType);
        }

        // Case 3: Nested array<array<T>> syntax or T[][] syntax
        if (
            preg_match('/^(\\??)array\s*<\s*array\s*<\s*([\w\\\\|]+)\s*>\s*>$/i', $normalizedType, $matches) ||
            preg_match('/^(\\??)([\w\\\\]+)\s*\[\]\[\]$/i', $normalizedType, $matches)
        ) {
            $innerType = $this->mapSimpleTypeToJsonSchema(isset($matches[2]) ? strtolower($matches[2]) : 'any');
            // Return a schema for array with items being arrays
            return [
                'type' => 'array',
                'items' => [
                    'type' => $innerType
                ]
            ];
        }

        // Case 4: Object-like array syntax (e.g., array{name: string, age: int})
        if (preg_match('/^(\\??)array\s*\{(.+)\}$/is', $normalizedType, $matches)) {
            return $this->parseObjectLikeArray($matches[2]);
        }

        return 'any';
    }

    /**
     * Parses object-like array syntax into a JSON Schema object
     */
    private function parseObjectLikeArray(string $propertiesStr): array
    {
        $properties = [];
        $required = [];

        // Parse properties from the string, handling nested structures
        $depth = 0;
        $buffer = '';

        for ($i = 0; $i < strlen($propertiesStr); $i++) {
            $char = $propertiesStr[$i];

            // Track nested braces
            if ($char === '{') {
                $depth++;
                $buffer .= $char;
            } elseif ($char === '}') {
                $depth--;
                $buffer .= $char;
            }
            // Property separator (comma)
            elseif ($char === ',' && $depth === 0) {
                // Process the completed property
                $this->parsePropertyDefinition(trim($buffer), $properties, $required);
                $buffer = '';
            } else {
                $buffer .= $char;
            }
        }

        // Process the last property
        if (!empty($buffer)) {
            $this->parsePropertyDefinition(trim($buffer), $properties, $required);
        }

        if (!empty($properties)) {
            return [
                'type' => 'object',
                'properties' => $properties,
                'required' => $required
            ];
        }

        return ['type' => 'object'];
    }

    /**
     * Parses a single property definition from an object-like array syntax
     */
    private function parsePropertyDefinition(string $propDefinition, array &$properties, array &$required): void
    {
        // Match property name and type
        if (preg_match('/^([a-zA-Z_\x80-\xff][a-zA-Z0-9_\x80-\xff]*)\s*:\s*(.+)$/i', $propDefinition, $matches)) {
            $propName = $matches[1];
            $propType = trim($matches[2]);

            // Add to required properties
            $required[] = $propName;

            // Check for nested array{} syntax
            if (preg_match('/^array\s*\{(.+)\}$/is', $propType, $nestedMatches)) {
                $nestedSchema = $this->parseObjectLikeArray($nestedMatches[1]);
                $properties[$propName] = $nestedSchema;
            }
            // Check for array<T> or T[] syntax
            elseif (
                preg_match('/^array\s*<\s*([\w\\\\|]+)\s*>$/i', $propType, $arrayMatches) ||
                preg_match('/^([\w\\\\]+)\s*\[\]$/i', $propType, $arrayMatches)
            ) {
                $itemType = $arrayMatches[1] ?? 'any';
                $properties[$propName] = [
                    'type' => 'array',
                    'items' => [
                        'type' => $this->mapSimpleTypeToJsonSchema($itemType)
                    ]
                ];
            }
            // Simple type
            else {
                $properties[$propName] = ['type' => $this->mapSimpleTypeToJsonSchema($propType)];
            }
        }
    }

    /**
     * Helper method to map basic PHP types to JSON Schema types
     */
    private function mapSimpleTypeToJsonSchema(string $type): string
    {
        return match (strtolower($type)) {
            'string' => 'string',
            'int', 'integer' => 'integer',
            'bool', 'boolean' => 'boolean',
            'float', 'double', 'number' => 'number',
            'array' => 'array',
            'object', 'stdclass' => 'object',
            default => in_array(strtolower($type), ['datetime', 'datetimeinterface']) ? 'string' : 'object',
        };
    }
}

```

--------------------------------------------------------------------------------
/tests/Unit/DispatcherTest.php:
--------------------------------------------------------------------------------

```php
<?php

namespace PhpMcp\Server\Tests\Unit;

use Mockery;
use Mockery\MockInterface;
use PhpMcp\Schema\ClientCapabilities;
use PhpMcp\Server\Context;
use PhpMcp\Server\Configuration;
use PhpMcp\Server\Contracts\CompletionProviderInterface;
use PhpMcp\Server\Contracts\SessionInterface;
use PhpMcp\Server\Dispatcher;
use PhpMcp\Server\Elements\RegisteredPrompt;
use PhpMcp\Server\Elements\RegisteredResource;
use PhpMcp\Server\Elements\RegisteredResourceTemplate;
use PhpMcp\Server\Elements\RegisteredTool;
use PhpMcp\Server\Exception\McpServerException;
use PhpMcp\Schema\Implementation;
use PhpMcp\Schema\JsonRpc\Notification as JsonRpcNotification;
use PhpMcp\Schema\JsonRpc\Request as JsonRpcRequest;
use PhpMcp\Schema\Prompt as PromptSchema;
use PhpMcp\Schema\PromptArgument;
use PhpMcp\Schema\Request\CallToolRequest;
use PhpMcp\Schema\Request\CompletionCompleteRequest;
use PhpMcp\Schema\Request\GetPromptRequest;
use PhpMcp\Schema\Request\InitializeRequest;
use PhpMcp\Schema\Request\ListToolsRequest;
use PhpMcp\Schema\Request\ReadResourceRequest;
use PhpMcp\Schema\Request\ResourceSubscribeRequest;
use PhpMcp\Schema\Request\SetLogLevelRequest;
use PhpMcp\Schema\Resource as ResourceSchema;
use PhpMcp\Schema\ResourceTemplate as ResourceTemplateSchema;
use PhpMcp\Schema\Result\CallToolResult;
use PhpMcp\Schema\Result\CompletionCompleteResult;
use PhpMcp\Schema\Result\EmptyResult;
use PhpMcp\Schema\Result\GetPromptResult;
use PhpMcp\Schema\Result\InitializeResult;
use PhpMcp\Schema\Result\ReadResourceResult;
use PhpMcp\Schema\ServerCapabilities;
use PhpMcp\Schema\Tool as ToolSchema;
use PhpMcp\Server\Registry;
use PhpMcp\Server\Session\SubscriptionManager;
use PhpMcp\Server\Utils\SchemaValidator;
use Psr\Container\ContainerInterface;
use Psr\Log\NullLogger;
use PhpMcp\Schema\Content\TextContent;
use PhpMcp\Schema\Content\PromptMessage;
use PhpMcp\Schema\Enum\LoggingLevel;
use PhpMcp\Schema\Enum\Role;
use PhpMcp\Schema\PromptReference;
use PhpMcp\Schema\Request\ListPromptsRequest;
use PhpMcp\Schema\Request\ListResourcesRequest;
use PhpMcp\Schema\Request\ListResourceTemplatesRequest;
use PhpMcp\Schema\ResourceReference;
use PhpMcp\Server\Protocol;
use PhpMcp\Server\Tests\Fixtures\Enums\StatusEnum;
use React\EventLoop\Loop;

const DISPATCHER_SESSION_ID = 'dispatcher-session-xyz';
const DISPATCHER_PAGINATION_LIMIT = 3;

beforeEach(function () {
    /** @var MockInterface&Configuration $configuration */
    $this->configuration = Mockery::mock(Configuration::class);
    /** @var MockInterface&Registry $registry */
    $this->registry = Mockery::mock(Registry::class);
    /** @var MockInterface&SubscriptionManager $subscriptionManager */
    $this->subscriptionManager = Mockery::mock(SubscriptionManager::class);
    /** @var MockInterface&SchemaValidator $schemaValidator */
    $this->schemaValidator = Mockery::mock(SchemaValidator::class);
    /** @var MockInterface&SessionInterface $session */
    $this->session = Mockery::mock(SessionInterface::class);
    /** @var MockInterface&ContainerInterface $container */
    $this->container = Mockery::mock(ContainerInterface::class);
    $this->context = new Context($this->session);

    $configuration = new Configuration(
        serverInfo: Implementation::make('DispatcherTestServer', '1.0'),
        capabilities: ServerCapabilities::make(),
        paginationLimit: DISPATCHER_PAGINATION_LIMIT,
        logger: new NullLogger(),
        loop: Loop::get(),
        cache: null,
        container: $this->container
    );

    $this->dispatcher = new Dispatcher(
        $configuration,
        $this->registry,
        $this->subscriptionManager,
        $this->schemaValidator
    );
});

it('routes to handleInitialize for initialize request', function () {
    $request = new JsonRpcRequest(
        jsonrpc: '2.0',
        id: 1,
        method: 'initialize',
        params: [
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'client', 'version' => '1.0'],
            'capabilities' => [],
        ]
    );
    $this->session->shouldReceive('set')->with('client_info', Mockery::on(fn($value) => $value['name'] === 'client' && $value['version'] === '1.0'))->once();
    $this->session->shouldReceive('set')->with('protocol_version', Protocol::LATEST_PROTOCOL_VERSION)->once();

    $result = $this->dispatcher->handleRequest($request, $this->context);
    expect($result)->toBeInstanceOf(InitializeResult::class);
    expect($result->protocolVersion)->toBe(Protocol::LATEST_PROTOCOL_VERSION);
    expect($result->serverInfo->name)->toBe('DispatcherTestServer');
});

it('routes to handlePing for ping request', function () {
    $request = new JsonRpcRequest('2.0', 'id1', 'ping', []);
    $result = $this->dispatcher->handleRequest($request, $this->context);
    expect($result)->toBeInstanceOf(EmptyResult::class);
});

it('throws MethodNotFound for unknown request method', function () {
    $rawRequest = new JsonRpcRequest('2.0', 'id1', 'unknown/method', []);
    $this->dispatcher->handleRequest($rawRequest, $this->context);
})->throws(McpServerException::class, "Method 'unknown/method' not found.");

it('routes to handleNotificationInitialized for initialized notification', function () {
    $notification = new JsonRpcNotification('2.0', 'notifications/initialized', []);
    $this->session->shouldReceive('set')->with('initialized', true)->once();
    $this->dispatcher->handleNotification($notification, $this->session);
});

it('does nothing for unknown notification method', function () {
    $rawNotification = new JsonRpcNotification('2.0', 'unknown/notification', []);
    $this->session->shouldNotReceive('set');
    $this->dispatcher->handleNotification($rawNotification, $this->session);
});


it('can handle initialize request', function () {
    $clientInfo = Implementation::make('TestClient', '0.9.9');
    $request = InitializeRequest::make(1, Protocol::LATEST_PROTOCOL_VERSION, ClientCapabilities::make(), $clientInfo, []);
    $this->session->shouldReceive('set')->with('client_info', $clientInfo->toArray())->once();
    $this->session->shouldReceive('set')->with('protocol_version', Protocol::LATEST_PROTOCOL_VERSION)->once();

    $result = $this->dispatcher->handleInitialize($request, $this->session);
    expect($result->protocolVersion)->toBe(Protocol::LATEST_PROTOCOL_VERSION);
    expect($result->serverInfo->name)->toBe('DispatcherTestServer');
    expect($result->capabilities)->toBeInstanceOf(ServerCapabilities::class);
});

it('can handle initialize request with older supported protocol version', function () {
    $clientInfo = Implementation::make('TestClient', '0.9.9');
    $clientRequestedVersion = '2024-11-05';
    $request = InitializeRequest::make(1, $clientRequestedVersion, ClientCapabilities::make(), $clientInfo, []);
    $this->session->shouldReceive('set')->with('client_info', $clientInfo->toArray())->once();
    $this->session->shouldReceive('set')->with('protocol_version', $clientRequestedVersion)->once();

    $result = $this->dispatcher->handleInitialize($request, $this->session);
    expect($result->protocolVersion)->toBe($clientRequestedVersion);
    expect($result->serverInfo->name)->toBe('DispatcherTestServer');
    expect($result->capabilities)->toBeInstanceOf(ServerCapabilities::class);
});

it('can handle initialize request with unsupported protocol version', function () {
    $clientInfo = Implementation::make('TestClient', '0.9.9');
    $unsupportedVersion = '1999-01-01';
    $request = InitializeRequest::make(1, $unsupportedVersion, ClientCapabilities::make(), $clientInfo, []);
    $this->session->shouldReceive('set')->with('client_info', $clientInfo->toArray())->once();
    $this->session->shouldReceive('set')->with('protocol_version', Protocol::LATEST_PROTOCOL_VERSION)->once();

    $result = $this->dispatcher->handleInitialize($request, $this->session);
    expect($result->protocolVersion)->toBe(Protocol::LATEST_PROTOCOL_VERSION);
    expect($result->serverInfo->name)->toBe('DispatcherTestServer');
    expect($result->capabilities)->toBeInstanceOf(ServerCapabilities::class);
});

it('can handle tool list request and return paginated tools', function () {
    $toolSchemas = [
        ToolSchema::make('tool1', ['type' => 'object', 'properties' => []]),
        ToolSchema::make('tool2', ['type' => 'object', 'properties' => []]),
        ToolSchema::make('tool3', ['type' => 'object', 'properties' => []]),
        ToolSchema::make('tool4', ['type' => 'object', 'properties' => []]),
    ];
    $this->registry->shouldReceive('getTools')->andReturn($toolSchemas);

    $request = ListToolsRequest::make(1);
    $result = $this->dispatcher->handleToolList($request);
    expect($result->tools)->toHaveCount(DISPATCHER_PAGINATION_LIMIT);
    expect($result->tools[0]->name)->toBe('tool1');
    expect($result->nextCursor)->toBeString();

    $nextCursor = $result->nextCursor;
    $requestPage2 = ListToolsRequest::make(2, $nextCursor);
    $resultPage2 = $this->dispatcher->handleToolList($requestPage2);
    expect($resultPage2->tools)->toHaveCount(count($toolSchemas) - DISPATCHER_PAGINATION_LIMIT);
    expect($resultPage2->tools[0]->name)->toBe('tool4');
    expect($resultPage2->nextCursor)->toBeNull();
});

it('can handle tool call request and return result', function () {
    $toolName = 'my-calculator';
    $args = ['a' => 10, 'b' => 5];
    $toolSchema = ToolSchema::make($toolName, ['type' => 'object', 'properties' => ['a' => ['type' => 'integer'], 'b' => ['type' => 'integer']]]);
    $registeredToolMock = Mockery::mock(RegisteredTool::class, [$toolSchema, 'MyToolHandler', 'handleTool', false]);

    $this->registry->shouldReceive('getTool')->with($toolName)->andReturn($registeredToolMock);
    $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->with($args, $toolSchema->inputSchema)->andReturn([]); // No validation errors
    $registeredToolMock->shouldReceive('call')->with($this->container, $args, $this->context)->andReturn([TextContent::make("Result: 15")]);

    $request = CallToolRequest::make(1, $toolName, $args);
    $result = $this->dispatcher->handleToolCall($request, $this->context);

    expect($result)->toBeInstanceOf(CallToolResult::class);
    expect($result->content[0]->text)->toBe("Result: 15");
    expect($result->isError)->toBeFalse();
});

it('can handle tool call request and throw exception if tool not found', function () {
    $this->registry->shouldReceive('getTool')->with('unknown-tool')->andReturn(null);
    $request = CallToolRequest::make(1, 'unknown-tool', []);
    $this->dispatcher->handleToolCall($request, $this->context);
})->throws(McpServerException::class, "Tool 'unknown-tool' not found.");

it('can handle tool call request and throw exception if argument validation fails', function () {
    $toolName = 'strict-tool';
    $args = ['param' => 'wrong_type'];
    $toolSchema = ToolSchema::make($toolName, ['type' => 'object', 'properties' => ['param' => ['type' => 'integer']]]);
    $registeredToolMock = Mockery::mock(RegisteredTool::class, [$toolSchema, 'MyToolHandler', 'handleTool', false]);

    $this->registry->shouldReceive('getTool')->with($toolName)->andReturn($registeredToolMock);
    $validationErrors = [['pointer' => '/param', 'keyword' => 'type', 'message' => 'Expected integer']];
    $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->with($args, $toolSchema->inputSchema)->andReturn($validationErrors);

    $request = CallToolRequest::make(1, $toolName, $args);
    try {
        $this->dispatcher->handleToolCall($request, $this->context);
    } catch (McpServerException $e) {
        expect($e->getMessage())->toContain("Invalid parameters for tool 'strict-tool'");
        expect($e->getData()['validation_errors'])->toBeArray();
    }
});

it('can handle tool call request and return error if tool execution throws exception', function () {
    $toolName = 'failing-tool';
    $toolSchema = ToolSchema::make($toolName, ['type' => 'object', 'properties' => []]);
    $registeredToolMock = Mockery::mock(RegisteredTool::class, [$toolSchema, 'MyToolHandler', 'handleTool', false]);

    $this->registry->shouldReceive('getTool')->with($toolName)->andReturn($registeredToolMock);
    $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->andReturn([]);
    $registeredToolMock->shouldReceive('call')->andThrow(new \RuntimeException("Tool crashed!"));

    $request = CallToolRequest::make(1, $toolName, []);
    $result = $this->dispatcher->handleToolCall($request, $this->context);

    expect($result->isError)->toBeTrue();
    expect($result->content[0]->text)->toBe("Tool execution failed: Tool crashed!");
});

it('can handle tool call request and return error if result formatting fails', function () {
    $toolName = 'bad-result-tool';
    $toolSchema = ToolSchema::make($toolName, ['type' => 'object', 'properties' => []]);
    $registeredToolMock = Mockery::mock(RegisteredTool::class, [$toolSchema, 'MyToolHandler', 'handleTool', false]);

    $this->registry->shouldReceive('getTool')->with($toolName)->andReturn($registeredToolMock);
    $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->andReturn([]);
    $registeredToolMock->shouldReceive('call')->andThrow(new \JsonException("Unencodable."));


    $request = CallToolRequest::make(1, $toolName, []);
    $result = $this->dispatcher->handleToolCall($request, $this->context);

    expect($result->isError)->toBeTrue();
    expect($result->content[0]->text)->toBe("Failed to serialize tool result: Unencodable.");
});

it('can handle resources list request and return paginated resources', function () {
    $resourceSchemas = [
        ResourceSchema::make('res://1', 'Resource1'),
        ResourceSchema::make('res://2', 'Resource2'),
        ResourceSchema::make('res://3', 'Resource3'),
        ResourceSchema::make('res://4', 'Resource4'),
        ResourceSchema::make('res://5', 'Resource5')
    ];
    $this->registry->shouldReceive('getResources')->andReturn($resourceSchemas);

    $requestP1 = ListResourcesRequest::make(1);
    $resultP1 = $this->dispatcher->handleResourcesList($requestP1);
    expect($resultP1->resources)->toHaveCount(DISPATCHER_PAGINATION_LIMIT);
    expect(array_map(fn($r) => $r->name, $resultP1->resources))->toEqual(['Resource1', 'Resource2', 'Resource3']);
    expect($resultP1->nextCursor)->toBe(base64_encode('offset=3'));

    // Page 2
    $requestP2 = ListResourcesRequest::make(2, $resultP1->nextCursor);
    $resultP2 = $this->dispatcher->handleResourcesList($requestP2);
    expect($resultP2->resources)->toHaveCount(2);
    expect(array_map(fn($r) => $r->name, $resultP2->resources))->toEqual(['Resource4', 'Resource5']);
    expect($resultP2->nextCursor)->toBeNull();
});

it('can handle resources list request and return empty if registry has no resources', function () {
    $this->registry->shouldReceive('getResources')->andReturn([]);
    $request = ListResourcesRequest::make(1);
    $result = $this->dispatcher->handleResourcesList($request);
    expect($result->resources)->toBeEmpty();
    expect($result->nextCursor)->toBeNull();
});

it('can handle resource template list request and return paginated templates', function () {
    $templateSchemas = [
        ResourceTemplateSchema::make('tpl://{id}/1', 'Template1'),
        ResourceTemplateSchema::make('tpl://{id}/2', 'Template2'),
        ResourceTemplateSchema::make('tpl://{id}/3', 'Template3'),
        ResourceTemplateSchema::make('tpl://{id}/4', 'Template4'),
    ];
    $this->registry->shouldReceive('getResourceTemplates')->andReturn($templateSchemas);

    // Page 1
    $requestP1 = ListResourceTemplatesRequest::make(1);
    $resultP1 = $this->dispatcher->handleResourceTemplateList($requestP1);
    expect($resultP1->resourceTemplates)->toHaveCount(DISPATCHER_PAGINATION_LIMIT);
    expect(array_map(fn($rt) => $rt->name, $resultP1->resourceTemplates))->toEqual(['Template1', 'Template2', 'Template3']);
    expect($resultP1->nextCursor)->toBe(base64_encode('offset=3'));

    // Page 2
    $requestP2 = ListResourceTemplatesRequest::make(2, $resultP1->nextCursor);
    $resultP2 = $this->dispatcher->handleResourceTemplateList($requestP2);
    expect($resultP2->resourceTemplates)->toHaveCount(1);
    expect(array_map(fn($rt) => $rt->name, $resultP2->resourceTemplates))->toEqual(['Template4']);
    expect($resultP2->nextCursor)->toBeNull();
});

it('can handle resource read request and return resource contents', function () {
    $uri = 'file://data.txt';
    $resourceSchema = ResourceSchema::make($uri, 'file_resource');
    $registeredResourceMock = Mockery::mock(RegisteredResource::class, [$resourceSchema, ['MyResourceHandler', 'read'], false]);
    $resourceContents = [TextContent::make('File content')];

    $this->registry->shouldReceive('getResource')->with($uri)->andReturn($registeredResourceMock);
    $registeredResourceMock->shouldReceive('read')->with($this->container, $uri, $this->context)->andReturn($resourceContents);

    $request = ReadResourceRequest::make(1, $uri);
    $result = $this->dispatcher->handleResourceRead($request, $this->context);

    expect($result)->toBeInstanceOf(ReadResourceResult::class);
    expect($result->contents)->toEqual($resourceContents);
});

it('can handle resource read request and throw exception if resource not found', function () {
    $this->registry->shouldReceive('getResource')->with('unknown://uri')->andReturn(null);
    $request = ReadResourceRequest::make(1, 'unknown://uri');
    $this->dispatcher->handleResourceRead($request, $this->context);
})->throws(McpServerException::class, "Resource URI 'unknown://uri' not found.");

it('can handle resource subscribe request and call subscription manager', function () {
    $uri = 'news://updates';
    $this->session->shouldReceive('getId')->andReturn(DISPATCHER_SESSION_ID);
    $this->subscriptionManager->shouldReceive('subscribe')->with(DISPATCHER_SESSION_ID, $uri)->once();
    $request = ResourceSubscribeRequest::make(1, $uri);
    $result = $this->dispatcher->handleResourceSubscribe($request, $this->session);
    expect($result)->toBeInstanceOf(EmptyResult::class);
});

it('can handle prompts list request and return paginated prompts', function () {
    $promptSchemas = [
        PromptSchema::make('promptA', '', []),
        PromptSchema::make('promptB', '', []),
        PromptSchema::make('promptC', '', []),
        PromptSchema::make('promptD', '', []),
        PromptSchema::make('promptE', '', []),
        PromptSchema::make('promptF', '', []),
    ]; // 6 prompts
    $this->registry->shouldReceive('getPrompts')->andReturn($promptSchemas);

    // Page 1
    $requestP1 = ListPromptsRequest::make(1);
    $resultP1 = $this->dispatcher->handlePromptsList($requestP1);
    expect($resultP1->prompts)->toHaveCount(DISPATCHER_PAGINATION_LIMIT);
    expect(array_map(fn($p) => $p->name, $resultP1->prompts))->toEqual(['promptA', 'promptB', 'promptC']);
    expect($resultP1->nextCursor)->toBe(base64_encode('offset=3'));

    // Page 2
    $requestP2 = ListPromptsRequest::make(2, $resultP1->nextCursor);
    $resultP2 = $this->dispatcher->handlePromptsList($requestP2);
    expect($resultP2->prompts)->toHaveCount(DISPATCHER_PAGINATION_LIMIT); // 3 more
    expect(array_map(fn($p) => $p->name, $resultP2->prompts))->toEqual(['promptD', 'promptE', 'promptF']);
    expect($resultP2->nextCursor)->toBeNull(); // End of list
});

it('can handle prompt get request and return prompt messages', function () {
    $promptName = 'daily-summary';
    $args = ['date' => '2024-07-16'];
    $promptSchema = PromptSchema::make($promptName, 'summary_prompt', [PromptArgument::make('date', required: true)]);
    $registeredPromptMock = Mockery::mock(RegisteredPrompt::class, [$promptSchema, ['MyPromptHandler', 'get'], false]);
    $promptMessages = [PromptMessage::make(Role::User, TextContent::make("Summary for 2024-07-16"))];

    $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPromptMock);
    $registeredPromptMock->shouldReceive('get')->with($this->container, $args, $this->context)->andReturn($promptMessages);

    $request = GetPromptRequest::make(1, $promptName, $args);
    $result = $this->dispatcher->handlePromptGet($request, $this->context);

    expect($result)->toBeInstanceOf(GetPromptResult::class);
    expect($result->messages)->toEqual($promptMessages);
    expect($result->description)->toBe($promptSchema->description);
});

it('can handle prompt get request and throw exception if required argument is missing', function () {
    $promptName = 'needs-topic';
    $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make('topic', required: true)]);
    $registeredPromptMock = Mockery::mock(RegisteredPrompt::class, [$promptSchema, ['MyPromptHandler', 'get'], false]);
    $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPromptMock);

    $request = GetPromptRequest::make(1, $promptName, ['other_arg' => 'value']); // 'topic' is missing
    $this->dispatcher->handlePromptGet($request, $this->context);
})->throws(McpServerException::class, "Missing required argument 'topic' for prompt 'needs-topic'.");


it('can handle logging set level request and set log level on session', function () {
    $level = LoggingLevel::Debug;
    $this->session->shouldReceive('getId')->andReturn(DISPATCHER_SESSION_ID);
    $this->session->shouldReceive('set')->with('log_level', 'debug')->once();

    $request = SetLogLevelRequest::make(1, $level);
    $result = $this->dispatcher->handleLoggingSetLevel($request, $this->session);

    expect($result)->toBeInstanceOf(EmptyResult::class);
});

it('can handle completion complete request for prompt and delegate to provider', function () {
    $promptName = 'my-completable-prompt';
    $argName = 'tagName';
    $currentValue = 'php';
    $completions = ['php-mcp', 'php-fig'];
    $mockCompletionProvider = Mockery::mock(CompletionProviderInterface::class);
    $providerClass = get_class($mockCompletionProvider);

    $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make($argName)]);
    $registeredPrompt = new RegisteredPrompt(
        schema: $promptSchema,
        handler: ['MyPromptHandler', 'get'],
        isManual: false,
        completionProviders: [$argName => $providerClass]
    );

    $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPrompt);
    $this->container->shouldReceive('get')->with($providerClass)->andReturn($mockCompletionProvider);
    $mockCompletionProvider->shouldReceive('getCompletions')->with($currentValue, $this->session)->andReturn($completions);

    $request = CompletionCompleteRequest::make(1, PromptReference::make($promptName), ['name' => $argName, 'value' => $currentValue]);
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);

    expect($result)->toBeInstanceOf(CompletionCompleteResult::class);
    expect($result->values)->toEqual($completions);
    expect($result->total)->toBe(count($completions));
    expect($result->hasMore)->toBeFalse();
});

it('can handle completion complete request for resource template and delegate to provider', function () {
    $templateUri = 'item://{itemId}/category/{catName}';
    $uriVarName = 'catName';
    $currentValue = 'boo';
    $completions = ['books', 'boomerangs'];
    $mockCompletionProvider = Mockery::mock(CompletionProviderInterface::class);
    $providerClass = get_class($mockCompletionProvider);

    $templateSchema = ResourceTemplateSchema::make($templateUri, 'item-template');
    $registeredTemplate = new RegisteredResourceTemplate(
        schema: $templateSchema,
        handler: ['MyResourceTemplateHandler', 'get'],
        isManual: false,
        completionProviders: [$uriVarName => $providerClass]
    );

    $this->registry->shouldReceive('getResourceTemplate')->with($templateUri)->andReturn($registeredTemplate);
    $this->container->shouldReceive('get')->with($providerClass)->andReturn($mockCompletionProvider);
    $mockCompletionProvider->shouldReceive('getCompletions')->with($currentValue, $this->session)->andReturn($completions);

    $request = CompletionCompleteRequest::make(1, ResourceReference::make($templateUri), ['name' => $uriVarName, 'value' => $currentValue]);
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);

    expect($result->values)->toEqual($completions);
});

it('can handle completion complete request and return empty if no provider', function () {
    $promptName = 'no-provider-prompt';
    $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make('arg')]);
    $registeredPrompt = new RegisteredPrompt(
        schema: $promptSchema,
        handler: ['MyPromptHandler', 'get'],
        isManual: false,
        completionProviders: []
    );
    $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPrompt);

    $request = CompletionCompleteRequest::make(1, PromptReference::make($promptName), ['name' => 'arg', 'value' => '']);
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);
    expect($result->values)->toBeEmpty();
});

it('can handle completion complete request with ListCompletionProvider instance', function () {
    $promptName = 'list-completion-prompt';
    $argName = 'category';
    $currentValue = 'bl';
    $expectedCompletions = ['blog'];

    $listProvider = new \PhpMcp\Server\Defaults\ListCompletionProvider(['blog', 'news', 'docs', 'api']);

    $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make($argName)]);
    $registeredPrompt = new RegisteredPrompt(
        schema: $promptSchema,
        handler: ['MyPromptHandler', 'get'],
        isManual: false,
        completionProviders: [$argName => $listProvider]
    );

    $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPrompt);

    $request = CompletionCompleteRequest::make(1, PromptReference::make($promptName), ['name' => $argName, 'value' => $currentValue]);
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);

    expect($result->values)->toEqual($expectedCompletions);
    expect($result->total)->toBe(1);
    expect($result->hasMore)->toBeFalse();
});

it('can handle completion complete request with EnumCompletionProvider instance', function () {
    $promptName = 'enum-completion-prompt';
    $argName = 'status';
    $currentValue = 'a';
    $expectedCompletions = ['archived'];

    $enumProvider = new \PhpMcp\Server\Defaults\EnumCompletionProvider(StatusEnum::class);

    $promptSchema = PromptSchema::make($promptName, '', [PromptArgument::make($argName)]);
    $registeredPrompt = new RegisteredPrompt(
        schema: $promptSchema,
        handler: ['MyPromptHandler', 'get'],
        isManual: false,
        completionProviders: [$argName => $enumProvider]
    );

    $this->registry->shouldReceive('getPrompt')->with($promptName)->andReturn($registeredPrompt);

    $request = CompletionCompleteRequest::make(1, PromptReference::make($promptName), ['name' => $argName, 'value' => $currentValue]);
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);

    expect($result->values)->toEqual($expectedCompletions);
    expect($result->total)->toBe(1);
    expect($result->hasMore)->toBeFalse();
});


it('decodeCursor handles null and invalid cursors', function () {
    $method = new \ReflectionMethod(Dispatcher::class, 'decodeCursor');
    $method->setAccessible(true);

    expect($method->invoke($this->dispatcher, null))->toBe(0);
    expect($method->invoke($this->dispatcher, 'not_base64_$$$'))->toBe(0);
    expect($method->invoke($this->dispatcher, base64_encode('invalid_format')))->toBe(0);
    expect($method->invoke($this->dispatcher, base64_encode('offset=123')))->toBe(123);
});

it('encodeNextCursor generates correct cursor or null', function () {
    $method = new \ReflectionMethod(Dispatcher::class, 'encodeNextCursor');
    $method->setAccessible(true);
    $limit = DISPATCHER_PAGINATION_LIMIT;

    expect($method->invoke($this->dispatcher, 0, $limit, 10, $limit))->toBe(base64_encode('offset=3'));
    expect($method->invoke($this->dispatcher, 0, $limit, $limit, $limit))->toBeNull();
    expect($method->invoke($this->dispatcher, $limit, 2, $limit + 2 + 1, $limit))->toBe(base64_encode('offset=' . ($limit + 2)));
    expect($method->invoke($this->dispatcher, $limit, 1, $limit + 1, $limit))->toBeNull();
    expect($method->invoke($this->dispatcher, 0, 0, 10, $limit))->toBeNull();
});

```

--------------------------------------------------------------------------------
/tests/Integration/StreamableHttpServerTransportTest.php:
--------------------------------------------------------------------------------

```php
<?php

use PhpMcp\Server\Protocol;
use PhpMcp\Server\Tests\Mocks\Clients\MockJsonHttpClient;
use PhpMcp\Server\Tests\Mocks\Clients\MockStreamHttpClient;
use React\ChildProcess\Process;
use React\Http\Browser;
use React\Http\Message\ResponseException;
use React\Stream\ReadableStreamInterface;

use function React\Async\await;
use function React\Promise\resolve;

const STREAMABLE_HTTP_SCRIPT_PATH = __DIR__ . '/../Fixtures/ServerScripts/StreamableHttpTestServer.php';
const STREAMABLE_HTTP_PROCESS_TIMEOUT = 9;
const STREAMABLE_HTTP_HOST = '127.0.0.1';
const STREAMABLE_MCP_PATH = 'mcp_streamable_json_mode';

beforeEach(function () {
    if (!is_file(STREAMABLE_HTTP_SCRIPT_PATH)) {
        $this->markTestSkipped("Server script not found: " . STREAMABLE_HTTP_SCRIPT_PATH);
    }
    if (!is_executable(STREAMABLE_HTTP_SCRIPT_PATH)) {
        chmod(STREAMABLE_HTTP_SCRIPT_PATH, 0755);
    }

    $phpPath = PHP_BINARY ?: 'php';
    $commandPhpPath = str_contains($phpPath, ' ') ? '"' . $phpPath . '"' : $phpPath;
    $commandScriptPath = escapeshellarg(STREAMABLE_HTTP_SCRIPT_PATH);
    $this->port = findFreePort();

    $jsonModeCommandArgs = [
        escapeshellarg(STREAMABLE_HTTP_HOST),
        escapeshellarg((string)$this->port),
        escapeshellarg(STREAMABLE_MCP_PATH),
        escapeshellarg('true'), // enableJsonResponse = true
    ];
    $this->jsonModeCommand = $commandPhpPath . ' ' . $commandScriptPath . ' ' . implode(' ', $jsonModeCommandArgs);

    $streamModeCommandArgs = [
        escapeshellarg(STREAMABLE_HTTP_HOST),
        escapeshellarg((string)$this->port),
        escapeshellarg(STREAMABLE_MCP_PATH),
        escapeshellarg('false'), // enableJsonResponse = false
    ];
    $this->streamModeCommand = $commandPhpPath . ' ' . $commandScriptPath . ' ' . implode(' ', $streamModeCommandArgs);

    $statelessModeCommandArgs = [
        escapeshellarg(STREAMABLE_HTTP_HOST),
        escapeshellarg((string)$this->port),
        escapeshellarg(STREAMABLE_MCP_PATH),
        escapeshellarg('true'), // enableJsonResponse = true
        escapeshellarg('false'), // useEventStore = false
        escapeshellarg('true'), // stateless = true
    ];
    $this->statelessModeCommand = $commandPhpPath . ' ' . $commandScriptPath . ' ' . implode(' ', $statelessModeCommandArgs);

    $this->process = null;
});

afterEach(function () {
    if ($this->process instanceof Process && $this->process->isRunning()) {
        if ($this->process->stdout instanceof ReadableStreamInterface) {
            $this->process->stdout->close();
        }
        if ($this->process->stderr instanceof ReadableStreamInterface) {
            $this->process->stderr->close();
        }

        $this->process->terminate(SIGTERM);
        try {
            await(delay(0.02));
        } catch (\Throwable $e) {
        }
        if ($this->process->isRunning()) {
            $this->process->terminate(SIGKILL);
        }
    }
    $this->process = null;
});

describe('JSON MODE', function () {
    beforeEach(function () {
        $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
        $this->process->start();

        $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);

        await(delay(0.2));
    });

    it('server starts, initializes via POST JSON, calls a tool, and closes', function () {
        // 1. Initialize
        $initResult = await($this->jsonClient->sendRequest('initialize', [
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-json-1'));

        expect($initResult['statusCode'])->toBe(200);
        expect($initResult['body']['id'])->toBe('init-json-1');
        expect($initResult['body'])->not->toHaveKey('error');
        expect($initResult['body']['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION);
        expect($initResult['body']['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer');
        expect($this->jsonClient->sessionId)->toBeString()->not->toBeEmpty();

        // 2. Initialized notification
        $notifResult = await($this->jsonClient->sendNotification('notifications/initialized'));
        expect($notifResult['statusCode'])->toBe(202);

        // 3. Call a tool
        $toolResult = await($this->jsonClient->sendRequest('tools/call', [
            'name' => 'greet_streamable_tool',
            'arguments' => ['name' => 'JSON Mode User']
        ], 'tool-json-1'));

        expect($toolResult['statusCode'])->toBe(200);
        expect($toolResult['body']['id'])->toBe('tool-json-1');
        expect($toolResult['body'])->not->toHaveKey('error');
        expect($toolResult['body']['result']['content'][0]['text'])->toBe('Hello, JSON Mode User!');

        // Server process is terminated in afterEach
    })->group('integration', 'streamable_http_json');


    it('return HTTP 400 error response for invalid JSON in POST request', function () {
        $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-post-1", "method": "tools/list", "params": {"broken"}';

        $promise = $this->jsonClient->browser->post(
            $this->jsonClient->baseUrl,
            ['Content-Type' => 'application/json', 'Accept' => 'application/json'],
            $malformedJson
        );

        try {
            await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        } catch (ResponseException $e) {
            expect($e->getResponse()->getStatusCode())->toBe(400);
            $bodyContent = (string) $e->getResponse()->getBody();
            $decodedBody = json_decode($bodyContent, true);

            expect($decodedBody['jsonrpc'])->toBe('2.0');
            expect($decodedBody['id'])->toBe('');
            expect($decodedBody['error']['code'])->toBe(-32700);
            expect($decodedBody['error']['message'])->toContain('Invalid JSON');
        }
    })->group('integration', 'streamable_http_json');

    it('returns JSON-RPC error result for request for non-existent method', function () {
        // 1. Initialize
        await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-json-err'));
        await($this->jsonClient->sendNotification('notifications/initialized'));

        // 2. Request non-existent method
        $errorResult = await($this->jsonClient->sendRequest('non/existentToolViaJson', [], 'err-meth-json-1'));

        expect($errorResult['statusCode'])->toBe(200);
        expect($errorResult['body']['id'])->toBe('err-meth-json-1');
        expect($errorResult['body']['error']['code'])->toBe(-32601);
        expect($errorResult['body']['error']['message'])->toContain("Method 'non/existentToolViaJson' not found");
    })->group('integration', 'streamable_http_json');

    it('can handle batch requests correctly', function () {
        // 1. Initialize
        await($this->jsonClient->sendRequest('initialize', [
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-json-batch'));
        expect($this->jsonClient->sessionId)->toBeString()->not->toBeEmpty();
        await($this->jsonClient->sendNotification('notifications/initialized'));

        // 2. Send Batch Request
        $batchRequests = [
            ['jsonrpc' => '2.0', 'id' => 'batch-req-1', 'method' => 'tools/call', 'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']]],
            ['jsonrpc' => '2.0', 'method' => 'notifications/something'],
            ['jsonrpc' => '2.0', 'id' => 'batch-req-2', 'method' => 'tools/call', 'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]]],
            ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method']
        ];

        $batchResponse = await($this->jsonClient->sendBatchRequest($batchRequests));



        $findResponseById = function (array $batch, $id) {
            foreach ($batch as $item) {
                if (isset($item['id']) && $item['id'] === $id) {
                    return $item;
                }
            }
            return null;
        };

        expect($batchResponse['statusCode'])->toBe(200);
        expect($batchResponse['body'])->toBeArray()->toHaveCount(3);

        $response1 = $findResponseById($batchResponse['body'], 'batch-req-1');
        $response2 = $findResponseById($batchResponse['body'], 'batch-req-2');
        $response3 = $findResponseById($batchResponse['body'], 'batch-req-3');

        expect($response1['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!');
        expect($response2['result']['content'][0]['text'])->toBe('30');
        expect($response3['error']['code'])->toBe(-32601);
        expect($response3['error']['message'])->toContain("Method 'nonexistent/method' not found");
    })->group('integration', 'streamable_http_json');

    it('can handle tool list request', function () {
        await($this->jsonClient->sendRequest('initialize', [
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-json-tools'));
        await($this->jsonClient->sendNotification('notifications/initialized'));

        $toolListResult = await($this->jsonClient->sendRequest('tools/list', [], 'tool-list-json-1'));

        expect($toolListResult['statusCode'])->toBe(200);
        expect($toolListResult['body']['id'])->toBe('tool-list-json-1');
        expect($toolListResult['body']['result']['tools'])->toBeArray();
        expect(count($toolListResult['body']['result']['tools']))->toBe(4);
        expect($toolListResult['body']['result']['tools'][0]['name'])->toBe('greet_streamable_tool');
        expect($toolListResult['body']['result']['tools'][1]['name'])->toBe('sum_streamable_tool');
        expect($toolListResult['body']['result']['tools'][2]['name'])->toBe('tool_reads_context');
    })->group('integration', 'streamable_http_json');

    it('passes request in Context', function () {
        await($this->jsonClient->sendRequest('initialize', [
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-json-context'));
        await($this->jsonClient->sendNotification('notifications/initialized'));

        $toolResult = await($this->jsonClient->sendRequest('tools/call', [
            'name' => 'tool_reads_context',
            'arguments' => []
        ], 'tool-json-context-1', ['X-Test-Header' => 'TestValue']));

        expect($toolResult['statusCode'])->toBe(200);
        expect($toolResult['body']['id'])->toBe('tool-json-context-1');
        expect($toolResult['body'])->not->toHaveKey('error');
        expect($toolResult['body']['result']['content'][0]['text'])->toBe('TestValue');
    })->group('integration', 'streamable_http_json');

    it('can read a registered resource', function () {
        await($this->jsonClient->sendRequest('initialize', [
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-json-res'));
        await($this->jsonClient->sendNotification('notifications/initialized'));

        $resourceResult = await($this->jsonClient->sendRequest('resources/read', ['uri' => 'test://streamable/static'], 'res-read-json-1'));

        expect($resourceResult['statusCode'])->toBe(200);
        expect($resourceResult['body']['id'])->toBe('res-read-json-1');
        $contents = $resourceResult['body']['result']['contents'];
        expect($contents[0]['uri'])->toBe('test://streamable/static');
        expect($contents[0]['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent);
    })->group('integration', 'streamable_http_json');

    it('can get a registered prompt', function () {
        await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-json-prompt'));
        await($this->jsonClient->sendNotification('notifications/initialized'));

        $promptResult = await($this->jsonClient->sendRequest('prompts/get', [
            'name' => 'simple_streamable_prompt',
            'arguments' => ['name' => 'JsonPromptUser', 'style' => 'terse']
        ], 'prompt-get-json-1'));

        expect($promptResult['statusCode'])->toBe(200);
        expect($promptResult['body']['id'])->toBe('prompt-get-json-1');
        $messages = $promptResult['body']['result']['messages'];
        expect($messages[0]['content']['text'])->toBe('Craft a terse greeting for JsonPromptUser.');
    })->group('integration', 'streamable_http_json');

    it('rejects subsequent requests if client does not send initialized notification', function () {
        // 1. Initialize ONLY
        await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-json-noack'));
        // Client "forgets" to send notifications/initialized back

        // 2. Attempt to Call a tool
        $toolResult = await($this->jsonClient->sendRequest('tools/call', [
            'name' => 'greet_streamable_tool',
            'arguments' => ['name' => 'NoAckJsonUser']
        ], 'tool-json-noack'));

        expect($toolResult['statusCode'])->toBe(200); // HTTP is fine
        expect($toolResult['body']['id'])->toBe('tool-json-noack');
        expect($toolResult['body']['error']['code'])->toBe(-32600); // Invalid Request
        expect($toolResult['body']['error']['message'])->toContain('Client session not initialized');
    })->group('integration', 'streamable_http_json');

    it('returns HTTP 400 error for non-initialize requests without Mcp-Session-Id', function () {
        await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-sess-test'));
        $this->jsonClient->sessionId = null;

        try {
            await($this->jsonClient->sendRequest('tools/list', [], 'tools-list-no-session'));
        } catch (ResponseException $e) {
            expect($e->getResponse()->getStatusCode())->toBe(400);
            $bodyContent = (string) $e->getResponse()->getBody();
            $decodedBody = json_decode($bodyContent, true);

            expect($decodedBody['jsonrpc'])->toBe('2.0');
            expect($decodedBody['id'])->toBe('tools-list-no-session');
            expect($decodedBody['error']['code'])->toBe(-32600);
            expect($decodedBody['error']['message'])->toContain('Mcp-Session-Id header required');
        }
    })->group('integration', 'streamable_http_json');
});

describe('STREAM MODE', function () {
    beforeEach(function () {
        $this->process = new Process($this->streamModeCommand, getcwd() ?: null, null, []);
        $this->process->start();
        $this->streamClient = new MockStreamHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
        await(delay(0.2));
    });
    afterEach(function () {
        if ($this->streamClient ?? null) {
            $this->streamClient->closeMainSseStream();
        }
    });

    it('server starts, initializes via POST JSON, calls a tool, and closes', function () {
        // 1. Initialize Request
        $initResponse = await($this->streamClient->sendInitializeRequest([
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-stream-1'));

        expect($this->streamClient->sessionId)->toBeString()->not->toBeEmpty();
        expect($initResponse['id'])->toBe('init-stream-1');
        expect($initResponse)->not->toHaveKey('error');
        expect($initResponse['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION);
        expect($initResponse['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer');

        // 2. Send Initialized Notification
        $notifResult = await($this->streamClient->sendHttpNotification('notifications/initialized'));
        expect($notifResult['statusCode'])->toBe(202);

        // 3. Call a tool
        $toolResponse = await($this->streamClient->sendRequest('tools/call', [
            'name' => 'greet_streamable_tool',
            'arguments' => ['name' => 'Stream Mode User']
        ], 'tool-stream-1'));

        expect($toolResponse['id'])->toBe('tool-stream-1');
        expect($toolResponse)->not->toHaveKey('error');
        expect($toolResponse['result']['content'][0]['text'])->toBe('Hello, Stream Mode User!');
    })->group('integration', 'streamable_http_stream');

    it('return HTTP 400 error response for invalid JSON in POST request', function () {
        $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-stream-1", "method": "tools/list", "params": {"broken"}';

        $postPromise = $this->streamClient->browser->post(
            $this->streamClient->baseMcpUrl,
            ['Content-Type' => 'application/json', 'Accept' => 'text/event-stream'],
            $malformedJson
        );

        try {
            await(timeout($postPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        } catch (ResponseException $e) {
            $httpResponse = $e->getResponse();
            $bodyContent = (string) $httpResponse->getBody();
            $decodedBody = json_decode($bodyContent, true);

            expect($httpResponse->getStatusCode())->toBe(400);
            expect($decodedBody['jsonrpc'])->toBe('2.0');
            expect($decodedBody['id'])->toBe('');
            expect($decodedBody['error']['code'])->toBe(-32700);
            expect($decodedBody['error']['message'])->toContain('Invalid JSON');
        }
    })->group('integration', 'streamable_http_stream');

    it('returns JSON-RPC error result for request for non-existent method', function () {
        // 1. Initialize
        await($this->streamClient->sendInitializeRequest([
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-stream-err'));
        await($this->streamClient->sendHttpNotification('notifications/initialized'));

        // 2. Send Request
        $errorResponse = await($this->streamClient->sendRequest('non/existentToolViaStream', [], 'err-meth-stream-1'));

        expect($errorResponse['id'])->toBe('err-meth-stream-1');
        expect($errorResponse['error']['code'])->toBe(-32601);
        expect($errorResponse['error']['message'])->toContain("Method 'non/existentToolViaStream' not found");
    })->group('integration', 'streamable_http_stream');

    it('can handle batch requests correctly', function () {
        await($this->streamClient->sendInitializeRequest([
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'StreamModeBatchClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-stream-batch'));
        expect($this->streamClient->sessionId)->toBeString()->not->toBeEmpty();
        await($this->streamClient->sendHttpNotification('notifications/initialized'));

        $batchRequests = [
            ['jsonrpc' => '2.0', 'id' => 'batch-req-1', 'method' => 'tools/call', 'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']]],
            ['jsonrpc' => '2.0', 'method' => 'notifications/something'],
            ['jsonrpc' => '2.0', 'id' => 'batch-req-2', 'method' => 'tools/call', 'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]]],
            ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method']
        ];

        $batchResponseArray = await($this->streamClient->sendBatchRequest($batchRequests));

        expect($batchResponseArray)->toBeArray()->toHaveCount(3);

        $findResponseById = function (array $batch, $id) {
            foreach ($batch as $item) {
                if (isset($item['id']) && $item['id'] === $id) {
                    return $item;
                }
            }
            return null;
        };

        $response1 = $findResponseById($batchResponseArray, 'batch-req-1');
        $response2 = $findResponseById($batchResponseArray, 'batch-req-2');
        $response3 = $findResponseById($batchResponseArray, 'batch-req-3');

        expect($response1['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!');
        expect($response2['result']['content'][0]['text'])->toBe('30');
        expect($response3['error']['code'])->toBe(-32601);
        expect($response3['error']['message'])->toContain("Method 'nonexistent/method' not found");
    })->group('integration', 'streamable_http_stream');

    it('passes request in Context', function () {
        await($this->streamClient->sendInitializeRequest([
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-stream-context'));
        expect($this->streamClient->sessionId)->toBeString()->not->toBeEmpty();
        await($this->streamClient->sendHttpNotification('notifications/initialized'));

        $toolResult = await($this->streamClient->sendRequest('tools/call', [
            'name' => 'tool_reads_context',
            'arguments' => []
        ], 'tool-stream-context-1', ['X-Test-Header' => 'TestValue']));

        expect($toolResult['id'])->toBe('tool-stream-context-1');
        expect($toolResult)->not->toHaveKey('error');
        expect($toolResult['result']['content'][0]['text'])->toBe('TestValue');
    })->group('integration', 'streamable_http_stream');

    it('can handle tool list request', function () {
        await($this->streamClient->sendInitializeRequest(['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => []], 'init-stream-tools'));
        await($this->streamClient->sendHttpNotification('notifications/initialized'));

        $toolListResponse = await($this->streamClient->sendRequest('tools/list', [], 'tool-list-stream-1'));

        expect($toolListResponse['id'])->toBe('tool-list-stream-1');
        expect($toolListResponse)->not->toHaveKey('error');
        expect($toolListResponse['result']['tools'])->toBeArray();
        expect(count($toolListResponse['result']['tools']))->toBe(4);
        expect($toolListResponse['result']['tools'][0]['name'])->toBe('greet_streamable_tool');
        expect($toolListResponse['result']['tools'][1]['name'])->toBe('sum_streamable_tool');
        expect($toolListResponse['result']['tools'][2]['name'])->toBe('tool_reads_context');
    })->group('integration', 'streamable_http_stream');

    it('can read a registered resource', function () {
        await($this->streamClient->sendInitializeRequest(['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => []], 'init-stream-res'));
        await($this->streamClient->sendHttpNotification('notifications/initialized'));

        $resourceResponse = await($this->streamClient->sendRequest('resources/read', ['uri' => 'test://streamable/static'], 'res-read-stream-1'));

        expect($resourceResponse['id'])->toBe('res-read-stream-1');
        $contents = $resourceResponse['result']['contents'];
        expect($contents[0]['uri'])->toBe('test://streamable/static');
        expect($contents[0]['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent);
    })->group('integration', 'streamable_http_stream');

    it('can get a registered prompt', function () {
        await($this->streamClient->sendInitializeRequest(['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => []], 'init-stream-prompt'));
        await($this->streamClient->sendHttpNotification('notifications/initialized'));

        $promptResponse = await($this->streamClient->sendRequest('prompts/get', [
            'name' => 'simple_streamable_prompt',
            'arguments' => ['name' => 'StreamPromptUser', 'style' => 'formal']
        ], 'prompt-get-stream-1'));

        expect($promptResponse['id'])->toBe('prompt-get-stream-1');
        $messages = $promptResponse['result']['messages'];
        expect($messages[0]['content']['text'])->toBe('Craft a formal greeting for StreamPromptUser.');
    })->group('integration', 'streamable_http_stream');

    it('rejects subsequent requests if client does not send initialized notification', function () {
        await($this->streamClient->sendInitializeRequest([
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-stream-noack'));

        $toolResponse = await($this->streamClient->sendRequest('tools/call', [
            'name' => 'greet_streamable_tool',
            'arguments' => ['name' => 'NoAckStreamUser']
        ], 'tool-stream-noack'));

        expect($toolResponse['id'])->toBe('tool-stream-noack');
        expect($toolResponse['error']['code'])->toBe(-32600);
        expect($toolResponse['error']['message'])->toContain('Client session not initialized');
    })->group('integration', 'streamable_http_stream');

    it('returns HTTP 400 error for non-initialize requests without Mcp-Session-Id', function () {
        await($this->streamClient->sendInitializeRequest([
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-stream-sess-test'));
        $validSessionId = $this->streamClient->sessionId;
        $this->streamClient->sessionId = null;

        try {
            await($this->streamClient->sendRequest('tools/list', [], 'tools-list-no-session-stream'));
            $this->fail("Expected request to tools/list to fail with 400, but it succeeded.");
        } catch (ResponseException $e) {
            expect($e->getResponse()->getStatusCode())->toBe(400);
            // Body can't be a json since the header accepts only text/event-stream
        }

        $this->streamClient->sessionId = $validSessionId;
    })->group('integration', 'streamable_http_stream');
});

/**
 * STATELESS MODE TESTS
 * 
 * Tests for the stateless mode of StreamableHttpServerTransport, which:
 * - Generates session IDs internally but doesn't expose them to clients
 * - Doesn't require session IDs in requests after initialization
 * - Doesn't include session IDs in response headers
 * - Disables GET requests (SSE streaming) 
 * - Makes DELETE requests meaningless (but returns 204)
 * - Treats each request as independent (no persistent session state)
 * 
 * This mode is designed to work with clients like OpenAI's MCP implementation
 * that have issues with session management in "never require approval" mode.
 */
describe('STATELESS MODE', function () {
    beforeEach(function () {
        $this->process = new Process($this->statelessModeCommand, getcwd() ?: null, null, []);
        $this->process->start();
        $this->statelessClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
        await(delay(0.2));
    });

    it('allows tool calls without having to send initialized notification', function () {
        // 1. Initialize Request
        $initResult = await($this->statelessClient->sendRequest('initialize', [
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'StatelessModeClient', 'version' => '1.0'],
            'capabilities' => []
        ], 'init-stateless-1'));

        expect($initResult['statusCode'])->toBe(200);
        expect($initResult['body']['id'])->toBe('init-stateless-1');
        expect($initResult['body'])->not->toHaveKey('error');
        expect($initResult['body']['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION);
        expect($initResult['body']['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer');
        expect($this->statelessClient->sessionId)->toBeString()->toBeEmpty();

        // 2. Call a tool
        $toolResult = await($this->statelessClient->sendRequest('tools/call', [
            'name' => 'greet_streamable_tool',
            'arguments' => ['name' => 'Stateless Mode User']
        ], 'tool-stateless-1'));

        expect($toolResult['statusCode'])->toBe(200);
        expect($toolResult['body']['id'])->toBe('tool-stateless-1');
        expect($toolResult['body'])->not->toHaveKey('error');
        expect($toolResult['body']['result']['content'][0]['text'])->toBe('Hello, Stateless Mode User!');
    })->group('integration', 'streamable_http_stateless');

    it('return HTTP 400 error response for invalid JSON in POST request', function () {
        $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-stateless-1", "method": "tools/list", "params": {"broken"}';

        $postPromise = $this->statelessClient->browser->post(
            $this->statelessClient->baseUrl,
            ['Content-Type' => 'application/json', 'Accept' => 'application/json'],
            $malformedJson
        );

        try {
            await(timeout($postPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        } catch (ResponseException $e) {
            $httpResponse = $e->getResponse();
            $bodyContent = (string) $httpResponse->getBody();
            $decodedBody = json_decode($bodyContent, true);

            expect($httpResponse->getStatusCode())->toBe(400);
            expect($decodedBody['jsonrpc'])->toBe('2.0');
            expect($decodedBody['id'])->toBe('');
            expect($decodedBody['error']['code'])->toBe(-32700);
            expect($decodedBody['error']['message'])->toContain('Invalid JSON');
        }
    })->group('integration', 'streamable_http_stateless');

    it('returns JSON-RPC error result for request for non-existent method', function () {
        $errorResult = await($this->statelessClient->sendRequest('non/existentToolViaStateless', [], 'err-meth-stateless-1'));

        expect($errorResult['statusCode'])->toBe(200);
        expect($errorResult['body']['id'])->toBe('err-meth-stateless-1');
        expect($errorResult['body']['error']['code'])->toBe(-32601);
        expect($errorResult['body']['error']['message'])->toContain("Method 'non/existentToolViaStateless' not found");
    })->group('integration', 'streamable_http_stateless');

    it('can handle batch requests correctly', function () {
        $batchRequests = [
            ['jsonrpc' => '2.0', 'id' => 'batch-req-1', 'method' => 'tools/call', 'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']]],
            ['jsonrpc' => '2.0', 'method' => 'notifications/something'],
            ['jsonrpc' => '2.0', 'id' => 'batch-req-2', 'method' => 'tools/call', 'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]]],
            ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method']
        ];

        $batchResponse = await($this->statelessClient->sendBatchRequest($batchRequests));

        $findResponseById = function (array $batch, $id) {
            foreach ($batch as $item) {
                if (isset($item['id']) && $item['id'] === $id) {
                    return $item;
                }
            }
            return null;
        };

        expect($batchResponse['statusCode'])->toBe(200);
        expect($batchResponse['body'])->toBeArray()->toHaveCount(3);

        $response1 = $findResponseById($batchResponse['body'], 'batch-req-1');
        $response2 = $findResponseById($batchResponse['body'], 'batch-req-2');
        $response3 = $findResponseById($batchResponse['body'], 'batch-req-3');

        expect($response1['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!');
        expect($response2['result']['content'][0]['text'])->toBe('30');
        expect($response3['error']['code'])->toBe(-32601);
        expect($response3['error']['message'])->toContain("Method 'nonexistent/method' not found");
    })->group('integration', 'streamable_http_stateless');

    it('passes request in Context', function () {
        $toolResult = await($this->statelessClient->sendRequest('tools/call', [
            'name' => 'tool_reads_context',
            'arguments' => []
        ], 'tool-stateless-context-1', ['X-Test-Header' => 'TestValue']));

        expect($toolResult['statusCode'])->toBe(200);
        expect($toolResult['body']['id'])->toBe('tool-stateless-context-1');
        expect($toolResult['body'])->not->toHaveKey('error');
        expect($toolResult['body']['result']['content'][0]['text'])->toBe('TestValue');
    })->group('integration', 'streamable_http_stateless');

    it('can handle tool list request', function () {
        $toolListResult = await($this->statelessClient->sendRequest('tools/list', [], 'tool-list-stateless-1'));

        expect($toolListResult['statusCode'])->toBe(200);
        expect($toolListResult['body']['id'])->toBe('tool-list-stateless-1');
        expect($toolListResult['body'])->not->toHaveKey('error');
        expect($toolListResult['body']['result']['tools'])->toBeArray();
        expect(count($toolListResult['body']['result']['tools']))->toBe(4);
        expect($toolListResult['body']['result']['tools'][0]['name'])->toBe('greet_streamable_tool');
        expect($toolListResult['body']['result']['tools'][1]['name'])->toBe('sum_streamable_tool');
        expect($toolListResult['body']['result']['tools'][2]['name'])->toBe('tool_reads_context');
    })->group('integration', 'streamable_http_stateless');

    it('can read a registered resource', function () {
        $resourceResult = await($this->statelessClient->sendRequest('resources/read', ['uri' => 'test://streamable/static'], 'res-read-stateless-1'));

        expect($resourceResult['statusCode'])->toBe(200);
        expect($resourceResult['body']['id'])->toBe('res-read-stateless-1');
        $contents = $resourceResult['body']['result']['contents'];
        expect($contents[0]['uri'])->toBe('test://streamable/static');
        expect($contents[0]['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent);
    })->group('integration', 'streamable_http_stateless');

    it('can get a registered prompt', function () {
        $promptResult = await($this->statelessClient->sendRequest('prompts/get', [
            'name' => 'simple_streamable_prompt',
            'arguments' => ['name' => 'StatelessPromptUser', 'style' => 'formal']
        ], 'prompt-get-stateless-1'));

        expect($promptResult['statusCode'])->toBe(200);
        expect($promptResult['body']['id'])->toBe('prompt-get-stateless-1');
        $messages = $promptResult['body']['result']['messages'];
        expect($messages[0]['content']['text'])->toBe('Craft a formal greeting for StatelessPromptUser.');
    })->group('integration', 'streamable_http_stateless');

    it('does not return session ID in response headers in stateless mode', function () {
        $promise = $this->statelessClient->browser->post(
            $this->statelessClient->baseUrl,
            ['Content-Type' => 'application/json', 'Accept' => 'application/json'],
            json_encode([
                'jsonrpc' => '2.0',
                'id' => 'init-header-test',
                'method' => 'initialize',
                'params' => [
                    'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
                    'clientInfo' => ['name' => 'StatelessHeaderTest', 'version' => '1.0'],
                    'capabilities' => []
                ]
            ])
        );

        $response = await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));

        expect($response->getStatusCode())->toBe(200);
        expect($response->hasHeader('Mcp-Session-Id'))->toBeFalse();

        $body = json_decode((string) $response->getBody(), true);
        expect($body['id'])->toBe('init-header-test');
        expect($body)->not->toHaveKey('error');
    })->group('integration', 'streamable_http_stateless');

    it('returns HTTP 405 for GET requests (SSE disabled) in stateless mode', function () {
        try {
            $getPromise = $this->statelessClient->browser->get(
                $this->statelessClient->baseUrl,
                ['Accept' => 'text/event-stream']
            );
            await(timeout($getPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
            $this->fail("Expected GET request to fail with 405, but it succeeded.");
        } catch (ResponseException $e) {
            expect($e->getResponse()->getStatusCode())->toBe(405);
            $bodyContent = (string) $e->getResponse()->getBody();
            $decodedBody = json_decode($bodyContent, true);
            expect($decodedBody['error']['message'])->toContain('GET requests (SSE streaming) are not supported in stateless mode');
        }
    })->group('integration', 'streamable_http_stateless');

    it('returns 204 for DELETE requests in stateless mode (but they are meaningless)', function () {
        $deletePromise = $this->statelessClient->browser->delete($this->statelessClient->baseUrl);
        $response = await(timeout($deletePromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));

        expect($response->getStatusCode())->toBe(204);
        expect((string) $response->getBody())->toBeEmpty();
    })->group('integration', 'streamable_http_stateless');

    it('handles multiple independent tool calls in stateless mode', function () {
        $toolResult1 = await($this->statelessClient->sendRequest('tools/call', [
            'name' => 'greet_streamable_tool',
            'arguments' => ['name' => 'User 1']
        ], 'tool-multi-1'));

        $toolResult2 = await($this->statelessClient->sendRequest('tools/call', [
            'name' => 'sum_streamable_tool',
            'arguments' => ['a' => 5, 'b' => 10]
        ], 'tool-multi-2'));

        $toolResult3 = await($this->statelessClient->sendRequest('tools/call', [
            'name' => 'greet_streamable_tool',
            'arguments' => ['name' => 'User 3']
        ], 'tool-multi-3'));

        expect($toolResult1['statusCode'])->toBe(200);
        expect($toolResult1['body']['id'])->toBe('tool-multi-1');
        expect($toolResult1['body']['result']['content'][0]['text'])->toBe('Hello, User 1!');

        expect($toolResult2['statusCode'])->toBe(200);
        expect($toolResult2['body']['id'])->toBe('tool-multi-2');
        expect($toolResult2['body']['result']['content'][0]['text'])->toBe('15');

        expect($toolResult3['statusCode'])->toBe(200);
        expect($toolResult3['body']['id'])->toBe('tool-multi-3');
        expect($toolResult3['body']['result']['content'][0]['text'])->toBe('Hello, User 3!');
    })->group('integration', 'streamable_http_stateless');
});

it('responds to OPTIONS request with CORS headers', function () {
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    $browser = new Browser();
    $optionsUrl = $this->jsonClient->baseUrl;

    $promise = $browser->request('OPTIONS', $optionsUrl);
    $response = await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));

    expect($response->getStatusCode())->toBe(204);
    expect($response->getHeaderLine('Access-Control-Allow-Origin'))->toBe('*');
    expect($response->getHeaderLine('Access-Control-Allow-Methods'))->toContain('POST');
    expect($response->getHeaderLine('Access-Control-Allow-Methods'))->toContain('GET');
    expect($response->getHeaderLine('Access-Control-Allow-Headers'))->toContain('Mcp-Session-Id');
})->group('integration', 'streamable_http');

it('returns 404 for unknown paths', function () {
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    $browser = new Browser();
    $unknownUrl = "http://" . STREAMABLE_HTTP_HOST . ":" . $this->port . "/completely/unknown/path";

    $promise = $browser->get($unknownUrl);

    try {
        await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        $this->fail("Request to unknown path should have failed with 404.");
    } catch (ResponseException $e) {
        expect($e->getResponse()->getStatusCode())->toBe(404);
        $decodedBody = json_decode((string)$e->getResponse()->getBody(), true);
        expect($decodedBody['error']['message'])->toContain('Not found');
    }
})->group('integration', 'streamable_http');

it('can delete client session with DELETE request', function () {
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // 1. Initialize
    await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-delete-test'));
    $sessionIdForDelete = $this->jsonClient->sessionId;
    expect($sessionIdForDelete)->toBeString();
    await($this->jsonClient->sendNotification('notifications/initialized'));

    // 2. Establish a GET SSE connection
    $sseUrl = $this->jsonClient->baseUrl;
    $browserForSse = (new Browser())->withTimeout(3);
    $ssePromise = $browserForSse->requestStreaming('GET', $sseUrl, [
        'Accept' => 'text/event-stream',
        'Mcp-Session-Id' => $sessionIdForDelete
    ]);
    $ssePsrResponse = await(timeout($ssePromise, 3));
    expect($ssePsrResponse->getStatusCode())->toBe(200);
    expect($ssePsrResponse->getHeaderLine('Content-Type'))->toBe('text/event-stream');

    $sseStream = $ssePsrResponse->getBody();
    assert($sseStream instanceof ReadableStreamInterface);

    $isSseStreamClosed = false;
    $sseStream->on('close', function () use (&$isSseStreamClosed) {
        $isSseStreamClosed = true;
    });

    // 3. Send DELETE request
    $deleteResponse = await($this->jsonClient->sendDeleteRequest());
    expect($deleteResponse['statusCode'])->toBe(204);

    // 4. Assert that the GET SSE stream was closed
    await(delay(0.1));
    expect($isSseStreamClosed)->toBeTrue("The GET SSE stream for session {$sessionIdForDelete} was not closed after DELETE request.");

    // 5. Assert that the client session was deleted
    try {
        await($this->jsonClient->sendRequest('tools/list', [], 'tool-list-json-1'));
        $this->fail("Expected request to tools/list to fail with 400, but it succeeded.");
    } catch (ResponseException $e) {
        expect($e->getResponse()->getStatusCode())->toBe(404);
        $bodyContent = (string) $e->getResponse()->getBody();
        $decodedBody = json_decode($bodyContent, true);
        expect($decodedBody['error']['code'])->toBe(-32600);
        expect($decodedBody['error']['message'])->toContain('Invalid or expired session');
    }
})->group('integration', 'streamable_http_json');

it('executes middleware that adds headers to response', function () {
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // 1. Send a request and check that middleware-added header is present
    $response = await($this->jsonClient->sendRequest('initialize', [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'MiddlewareTestClient'],
        'capabilities' => []
    ], 'init-middleware-headers'));

    // Check that the response has the header added by middleware
    expect($this->jsonClient->lastResponseHeaders)->toContain('X-Test-Middleware: header-added');
})->group('integration', 'streamable_http', 'middleware');

it('executes middleware that modifies request attributes', function () {
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // 1. Initialize 
    await($this->jsonClient->sendRequest('initialize', [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'MiddlewareAttrTestClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-middleware-attr'));
    await($this->jsonClient->sendNotification('notifications/initialized'));

    // 2. Call tool that checks for middleware-added attribute
    $toolResponse = await($this->jsonClient->sendRequest('tools/call', [
        'name' => 'check_request_attribute_tool',
        'arguments' => []
    ], 'tool-attr-check'));

    expect($toolResponse['body']['result']['content'][0]['text'])->toBe('middleware-value-found: middleware-value');
})->group('integration', 'streamable_http', 'middleware');

it('executes middleware that can short-circuit request processing', function () {
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    await(delay(0.1));

    $browser = new Browser();
    $shortCircuitUrl = "http://" . STREAMABLE_HTTP_HOST . ":" . $this->port . "/" . STREAMABLE_MCP_PATH . "/short-circuit";

    $promise = $browser->get($shortCircuitUrl);

    try {
        $response = await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        $this->fail("Expected a 418 status code response, but request succeeded");
    } catch (ResponseException $e) {
        expect($e->getResponse()->getStatusCode())->toBe(418);
        $body = (string) $e->getResponse()->getBody();
        expect($body)->toBe('Short-circuited by middleware');
    } catch (\Throwable $e) {
        $this->fail("Short-circuit middleware test failed: " . $e->getMessage());
    }
})->group('integration', 'streamable_http', 'middleware');

it('executes multiple middlewares in correct order', function () {
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // 1. Send a request and check middleware order
    await($this->jsonClient->sendRequest('initialize', [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'MiddlewareOrderTestClient'],
        'capabilities' => []
    ], 'init-middleware-order'));

    // Check that headers from multiple middlewares are present in correct order
    expect($this->jsonClient->lastResponseHeaders)->toContain('X-Middleware-Order: third,second,first');
})->group('integration', 'streamable_http', 'middleware');

it('handles middleware that throws exceptions gracefully', function () {
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    await(delay(0.1));

    $browser = new Browser();
    $errorUrl = "http://" . STREAMABLE_HTTP_HOST . ":" . $this->port . "/" . STREAMABLE_MCP_PATH . "/error-middleware";

    $promise = $browser->get($errorUrl);

    try {
        await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        $this->fail("Error middleware should have thrown an exception.");
    } catch (ResponseException $e) {
        expect($e->getResponse()->getStatusCode())->toBe(500);
        $body = (string) $e->getResponse()->getBody();
        // ReactPHP handles exceptions and returns a generic error message
        expect($body)->toContain('Internal Server Error');
    }
})->group('integration', 'streamable_http', 'middleware');

```
Page 5/5FirstPrevNextLast