This is page 5 of 5. Use http://codebase.md/php-mcp/server?page={x} to view the full context.
# Directory Structure
```
├── .github
│ └── workflows
│ ├── changelog.yml
│ └── tests.yml
├── .gitignore
├── .php-cs-fixer.php
├── CHANGELOG.md
├── composer.json
├── CONTRIBUTING.md
├── examples
│ ├── 01-discovery-stdio-calculator
│ │ ├── McpElements.php
│ │ └── server.php
│ ├── 02-discovery-http-userprofile
│ │ ├── McpElements.php
│ │ ├── server.php
│ │ └── UserIdCompletionProvider.php
│ ├── 03-manual-registration-stdio
│ │ ├── server.php
│ │ └── SimpleHandlers.php
│ ├── 04-combined-registration-http
│ │ ├── DiscoveredElements.php
│ │ ├── ManualHandlers.php
│ │ └── server.php
│ ├── 05-stdio-env-variables
│ │ ├── EnvToolHandler.php
│ │ └── server.php
│ ├── 06-custom-dependencies-stdio
│ │ ├── McpTaskHandlers.php
│ │ ├── server.php
│ │ └── Services.php
│ ├── 07-complex-tool-schema-http
│ │ ├── EventTypes.php
│ │ ├── McpEventScheduler.php
│ │ └── server.php
│ └── 08-schema-showcase-streamable
│ ├── SchemaShowcaseElements.php
│ └── server.php
├── LICENSE
├── phpunit.xml
├── README.md
├── src
│ ├── Attributes
│ │ ├── CompletionProvider.php
│ │ ├── McpPrompt.php
│ │ ├── McpResource.php
│ │ ├── McpResourceTemplate.php
│ │ ├── McpTool.php
│ │ └── Schema.php
│ ├── Configuration.php
│ ├── Context.php
│ ├── Contracts
│ │ ├── CompletionProviderInterface.php
│ │ ├── EventStoreInterface.php
│ │ ├── LoggerAwareInterface.php
│ │ ├── LoopAwareInterface.php
│ │ ├── ServerTransportInterface.php
│ │ ├── SessionHandlerInterface.php
│ │ └── SessionInterface.php
│ ├── Defaults
│ │ ├── ArrayCache.php
│ │ ├── BasicContainer.php
│ │ ├── DefaultUuidSessionIdGenerator.php
│ │ ├── EnumCompletionProvider.php
│ │ ├── FileCache.php
│ │ ├── InMemoryEventStore.php
│ │ ├── ListCompletionProvider.php
│ │ └── SystemClock.php
│ ├── Dispatcher.php
│ ├── Elements
│ │ ├── RegisteredElement.php
│ │ ├── RegisteredPrompt.php
│ │ ├── RegisteredResource.php
│ │ ├── RegisteredResourceTemplate.php
│ │ └── RegisteredTool.php
│ ├── Exception
│ │ ├── ConfigurationException.php
│ │ ├── DiscoveryException.php
│ │ ├── McpServerException.php
│ │ ├── ProtocolException.php
│ │ └── TransportException.php
│ ├── Protocol.php
│ ├── Registry.php
│ ├── Server.php
│ ├── ServerBuilder.php
│ ├── Session
│ │ ├── ArraySessionHandler.php
│ │ ├── CacheSessionHandler.php
│ │ ├── Session.php
│ │ ├── SessionManager.php
│ │ └── SubscriptionManager.php
│ ├── Transports
│ │ ├── HttpServerTransport.php
│ │ ├── StdioServerTransport.php
│ │ └── StreamableHttpServerTransport.php
│ └── Utils
│ ├── Discoverer.php
│ ├── DocBlockParser.php
│ ├── HandlerResolver.php
│ ├── SchemaGenerator.php
│ └── SchemaValidator.php
└── tests
├── Fixtures
│ ├── Discovery
│ │ ├── DiscoverablePromptHandler.php
│ │ ├── DiscoverableResourceHandler.php
│ │ ├── DiscoverableTemplateHandler.php
│ │ ├── DiscoverableToolHandler.php
│ │ ├── EnhancedCompletionHandler.php
│ │ ├── InvocablePromptFixture.php
│ │ ├── InvocableResourceFixture.php
│ │ ├── InvocableResourceTemplateFixture.php
│ │ ├── InvocableToolFixture.php
│ │ ├── NonDiscoverableClass.php
│ │ └── SubDir
│ │ └── HiddenTool.php
│ ├── Enums
│ │ ├── BackedIntEnum.php
│ │ ├── BackedStringEnum.php
│ │ ├── PriorityEnum.php
│ │ ├── StatusEnum.php
│ │ └── UnitEnum.php
│ ├── General
│ │ ├── CompletionProviderFixture.php
│ │ ├── DocBlockTestFixture.php
│ │ ├── InvokableHandlerFixture.php
│ │ ├── PromptHandlerFixture.php
│ │ ├── RequestAttributeChecker.php
│ │ ├── ResourceHandlerFixture.php
│ │ ├── ToolHandlerFixture.php
│ │ └── VariousTypesHandler.php
│ ├── Middlewares
│ │ ├── ErrorMiddleware.php
│ │ ├── FirstMiddleware.php
│ │ ├── HeaderMiddleware.php
│ │ ├── RequestAttributeMiddleware.php
│ │ ├── SecondMiddleware.php
│ │ ├── ShortCircuitMiddleware.php
│ │ └── ThirdMiddleware.php
│ ├── Schema
│ │ └── SchemaGenerationTarget.php
│ ├── ServerScripts
│ │ ├── HttpTestServer.php
│ │ ├── StdioTestServer.php
│ │ └── StreamableHttpTestServer.php
│ └── Utils
│ ├── AttributeFixtures.php
│ ├── DockBlockParserFixture.php
│ └── SchemaGeneratorFixture.php
├── Integration
│ ├── DiscoveryTest.php
│ ├── HttpServerTransportTest.php
│ ├── SchemaGenerationTest.php
│ ├── StdioServerTransportTest.php
│ └── StreamableHttpServerTransportTest.php
├── Mocks
│ ├── Clients
│ │ ├── MockJsonHttpClient.php
│ │ ├── MockSseClient.php
│ │ └── MockStreamHttpClient.php
│ └── Clock
│ └── FixedClock.php
├── Pest.php
├── TestCase.php
└── Unit
├── Attributes
│ ├── CompletionProviderTest.php
│ ├── McpPromptTest.php
│ ├── McpResourceTemplateTest.php
│ ├── McpResourceTest.php
│ └── McpToolTest.php
├── ConfigurationTest.php
├── Defaults
│ ├── EnumCompletionProviderTest.php
│ └── ListCompletionProviderTest.php
├── DispatcherTest.php
├── Elements
│ ├── RegisteredElementTest.php
│ ├── RegisteredPromptTest.php
│ ├── RegisteredResourceTemplateTest.php
│ ├── RegisteredResourceTest.php
│ └── RegisteredToolTest.php
├── ProtocolTest.php
├── RegistryTest.php
├── ServerBuilderTest.php
├── ServerTest.php
├── Session
│ ├── ArraySessionHandlerTest.php
│ ├── CacheSessionHandlerTest.php
│ ├── SessionManagerTest.php
│ └── SessionTest.php
└── Utils
├── DocBlockParserTest.php
├── HandlerResolverTest.php
└── SchemaValidatorTest.php
```
# Files
--------------------------------------------------------------------------------
/src/Transports/StreamableHttpServerTransport.php:
--------------------------------------------------------------------------------
```php
<?php
declare(strict_types=1);
namespace PhpMcp\Server\Transports;
use Evenement\EventEmitterTrait;
use PhpMcp\Server\Contracts\EventStoreInterface;
use PhpMcp\Server\Contracts\LoggerAwareInterface;
use PhpMcp\Server\Contracts\LoopAwareInterface;
use PhpMcp\Server\Contracts\ServerTransportInterface;
use PhpMcp\Server\Exception\McpServerException;
use PhpMcp\Server\Exception\TransportException;
use PhpMcp\Schema\JsonRpc\Message;
use PhpMcp\Schema\JsonRpc\BatchRequest;
use PhpMcp\Schema\JsonRpc\BatchResponse;
use PhpMcp\Schema\JsonRpc\Error;
use PhpMcp\Schema\JsonRpc\Parser;
use PhpMcp\Schema\JsonRpc\Request;
use PhpMcp\Schema\JsonRpc\Response;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Http\HttpServer;
use React\Http\Message\Response as HttpResponse;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use React\Socket\SocketServer;
use React\Stream\ThroughStream;
use Throwable;
use function React\Promise\resolve;
use function React\Promise\reject;
class StreamableHttpServerTransport implements ServerTransportInterface, LoggerAwareInterface, LoopAwareInterface
{
use EventEmitterTrait;
protected LoggerInterface $logger;
protected LoopInterface $loop;
private ?SocketServer $socket = null;
private ?HttpServer $http = null;
private bool $listening = false;
private bool $closing = false;
private ?EventStoreInterface $eventStore;
/**
* Stores Deferred objects for POST requests awaiting a direct JSON response.
* Keyed by a unique pendingRequestId.
* @var array<string, Deferred>
*/
private array $pendingRequests = [];
/**
* Stores active SSE streams.
* Key: streamId
* Value: ['stream' => ThroughStream, 'sessionId' => string, 'context' => array]
* @var array<string, array{stream: ThroughStream, sessionId: string, context: array}>
*/
private array $activeSseStreams = [];
private ?ThroughStream $getStream = null;
/**
 * Configures the transport. No socket is opened until listen() is called.
 *
 * @param string $host Address the socket server binds to.
 * @param int $port TCP port the socket server binds to.
 * @param string $mcpPath Path of the MCP endpoint; stored with exactly one leading slash and no trailing slash.
 * @param array|null $sslContext Context options handed to the socket server; when set, the endpoint is logged as https.
 * @param bool $enableJsonResponse If true, a POST that carries requests is answered with a single JSON body
 *                                 instead of an SSE stream. Useful for simple request/response scenarios without streaming.
 * @param bool $stateless If true, every POST is given a freshly generated session id, the Mcp-Session-Id header is
 *                        neither required nor returned, GET (SSE) requests are answered with 405 and DELETE is a no-op.
 * @param EventStoreInterface|null $eventStore If provided, SSE events are stored and replayed to clients that
 *                                             reconnect with a Last-Event-ID header.
 * @param array<callable(\Psr\Http\Message\ServerRequestInterface, callable): (\Psr\Http\Message\ResponseInterface|\React\Promise\PromiseInterface)> $middlewares Middlewares placed in front of the MCP request handler.
 *
 * @throws \InvalidArgumentException If any entry of $middlewares is not callable.
 */
public function __construct(
    private readonly string $host = '127.0.0.1',
    private readonly int $port = 8080,
    private string $mcpPath = '/mcp',
    private ?array $sslContext = null,
    private readonly bool $enableJsonResponse = true,
    private readonly bool $stateless = false,
    ?EventStoreInterface $eventStore = null,
    private array $middlewares = []
) {
    $this->logger = new NullLogger();
    $this->loop = Loop::get();
    $this->eventStore = $eventStore;

    // Normalise the endpoint path so it can be compared verbatim with the request path.
    $this->mcpPath = '/' . trim($mcpPath, '/');

    foreach ($this->middlewares as $middleware) {
        if (is_callable($middleware)) {
            continue;
        }

        throw new \InvalidArgumentException('All provided middlewares must be callable.');
    }
}
/**
 * Produces the random identifier used for session ids, SSE stream ids and pending-request ids.
 *
 * @return string 32 hexadecimal characters (16 random bytes).
 */
protected function generateId(): string
{
    $entropy = random_bytes(16);

    return bin2hex($entropy);
}
/**
 * Replaces the logger used by this transport (the constructor installs a NullLogger).
 */
public function setLogger(LoggerInterface $logger): void
{
$this->logger = $logger;
}
/**
 * Replaces the event loop (the constructor defaults to Loop::get()).
 *
 * The loop is read when listen() creates the socket and HTTP server and whenever
 * timers or future ticks are scheduled.
 */
public function setLoop(LoopInterface $loop): void
{
$this->loop = $loop;
}
/**
 * Binds the socket, mounts the HTTP server (middlewares first, MCP handler last)
 * and emits 'ready' once everything is up.
 *
 * @throws TransportException If the transport is already listening, has been closed, or fails to start.
 */
public function listen(): void
{
    if ($this->listening) {
        throw new TransportException('StreamableHttp transport is already listening.');
    }

    if ($this->closing) {
        throw new TransportException('Cannot listen, transport is closing/closed.');
    }

    $listenAddress = "{$this->host}:{$this->port}";
    $protocol = $this->sslContext ? 'https' : 'http';

    try {
        $this->socket = new SocketServer($listenAddress, $this->sslContext ?? [], $this->loop);

        // The MCP handler is always the innermost element of the middleware chain.
        $chain = array_merge($this->middlewares, [$this->createRequestHandler()]);
        $this->http = new HttpServer($this->loop, ...$chain);
        $this->http->listen($this->socket);

        // A socket-level failure is reported to listeners and shuts the transport down.
        $this->socket->on('error', function (Throwable $socketError) {
            $this->logger->error('Socket server error (StreamableHttp).', ['error' => $socketError->getMessage()]);
            $this->emit('error', [new TransportException("Socket server error: {$socketError->getMessage()}", 0, $socketError)]);
            $this->close();
        });

        $this->logger->info("Server is up and listening on {$protocol}://{$listenAddress} 🚀");
        $this->logger->info("MCP Endpoint: {$protocol}://{$listenAddress}{$this->mcpPath}");

        $this->listening = true;
        $this->closing = false;
        $this->emit('ready');
    } catch (Throwable $startupFailure) {
        $this->logger->error("Failed to start StreamableHttp listener on {$listenAddress}", ['exception' => $startupFailure]);

        throw new TransportException("Failed to start StreamableHttp listener on {$listenAddress}: {$startupFailure->getMessage()}", 0, $startupFailure);
    }
}
/**
 * Builds the innermost HTTP handler: it rejects paths other than the MCP endpoint,
 * answers CORS preflight requests and dispatches GET/POST/DELETE to their handlers.
 *
 * Every response produced for the MCP path carries the CORS headers; the 404 for
 * foreign paths does not.
 *
 * @return callable Handler taking a ServerRequestInterface and returning a response or a promise of one.
 */
private function createRequestHandler(): callable
{
    return function (ServerRequestInterface $request) {
        $method = $request->getMethod();
        $path = $request->getUri()->getPath();

        $this->logger->debug("Request received", ['method' => $method, 'path' => $path, 'target' => $this->mcpPath]);

        if ($path !== $this->mcpPath) {
            $notFound = Error::forInvalidRequest("Not found: {$path}");

            return new HttpResponse(404, ['Content-Type' => 'application/json'], json_encode($notFound));
        }

        $corsHeaders = [
            'Access-Control-Allow-Origin' => '*',
            'Access-Control-Allow-Methods' => 'GET, POST, DELETE, OPTIONS',
            'Access-Control-Allow-Headers' => 'Content-Type, Mcp-Session-Id, Last-Event-ID, Authorization',
        ];

        if ($method === 'OPTIONS') {
            return new HttpResponse(204, $corsHeaders);
        }

        $addCors = function (HttpResponse $response) use ($corsHeaders) {
            foreach ($corsHeaders as $headerName => $headerValue) {
                $response = $response->withAddedHeader($headerName, $headerValue);
            }

            return $response;
        };

        // Shared rejection path: turn any failure into a CORS-decorated error response.
        $onRejected = fn($e) => $addCors($this->handleRequestError($e, $request));

        try {
            $handler = match ($method) {
                'GET' => $this->handleGetRequest(...),
                'POST' => $this->handlePostRequest(...),
                'DELETE' => $this->handleDeleteRequest(...),
                default => null,
            };

            if ($handler === null) {
                return $addCors($this->handleUnsupportedRequest($request));
            }

            return $handler($request)->then($addCors, $onRejected);
        } catch (Throwable $e) {
            // Synchronous failures inside a handler take the same error path as rejected promises.
            return $addCors($this->handleRequestError($e, $request));
        }
    };
}
/**
 * Handles a GET to the MCP endpoint by opening the standalone SSE stream.
 *
 * Rejections: 405 in stateless mode, 406 when the client does not accept
 * text/event-stream, 400 when the Mcp-Session-Id header is missing.
 *
 * The transport keeps a single GET stream; a new GET replaces the stored one.
 * The listeners registered here only clear the stored reference when it still
 * points at the stream they were registered on, so a late close/error of an
 * older connection cannot wipe out a newer stream.
 *
 * @return PromiseInterface Resolves with the HTTP response (streaming on success).
 */
private function handleGetRequest(ServerRequestInterface $request): PromiseInterface
{
    if ($this->stateless) {
        $error = Error::forInvalidRequest("GET requests (SSE streaming) are not supported in stateless mode.");

        return resolve(new HttpResponse(405, ['Content-Type' => 'application/json'], json_encode($error)));
    }

    $acceptHeader = $request->getHeaderLine('Accept');
    if (!str_contains($acceptHeader, 'text/event-stream')) {
        $error = Error::forInvalidRequest("Not Acceptable: Client must accept text/event-stream for GET requests.");

        return resolve(new HttpResponse(406, ['Content-Type' => 'application/json'], json_encode($error)));
    }

    $sessionId = $request->getHeaderLine('Mcp-Session-Id');
    if (empty($sessionId)) {
        $this->logger->warning("GET request without Mcp-Session-Id.");
        $error = Error::forInvalidRequest("Mcp-Session-Id header required for GET requests.");

        return resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($error)));
    }

    $stream = new ThroughStream();
    $this->getStream = $stream;

    // Only forget the stream if it is still the current one (see method docblock).
    $forgetStream = function () use ($stream): void {
        if ($this->getStream === $stream) {
            $this->getStream = null;
        }
    };

    $stream->on('close', function () use ($sessionId, $forgetStream) {
        $this->logger->debug("GET SSE stream closed.", ['sessionId' => $sessionId]);
        $forgetStream();
    });

    $stream->on('error', function (Throwable $e) use ($sessionId, $forgetStream) {
        $this->logger->error("GET SSE stream error.", ['sessionId' => $sessionId, 'error' => $e->getMessage()]);
        $forgetStream();
    });

    $headers = [
        'Content-Type' => 'text/event-stream',
        'Cache-Control' => 'no-cache',
        'Connection' => 'keep-alive',
        'X-Accel-Buffering' => 'no',
    ];

    $response = new HttpResponse(200, $headers, $stream);

    if ($this->eventStore) {
        // NOTE(review): replayed frames are written before the response is returned;
        // confirm the body stream already has a consumer at this point so they are not dropped.
        $lastEventId = $request->getHeaderLine('Last-Event-ID');
        $this->replayEvents($lastEventId, $stream, $sessionId);
    }

    return resolve($response);
}
/**
 * Handles a POST to the MCP endpoint: validates headers and body, resolves the
 * session id, prepares the response channel and hands the parsed message to
 * 'message' listeners on the next loop tick.
 *
 * The chosen response channel is recorded in the context passed to listeners:
 *  - 'post_202_sent': the payload contains no requests; 202 is returned immediately.
 *  - 'post_json':     JSON mode; the HTTP response stays pending until sendMessage()
 *                     resolves it, or a 500 is returned after 30 seconds.
 *  - 'post_sse':      an SSE stream is opened; sendMessage() writes to it and ends it
 *                     once as many responses as requests have been sent.
 *
 * Validation failures yield 406 (Accept), 415 (Content-Type) or 400 (empty body,
 * unparsable body, missing or unexpected Mcp-Session-Id).
 *
 * @return PromiseInterface Resolves with the HTTP response.
 */
private function handlePostRequest(ServerRequestInterface $request): PromiseInterface
{
    $jsonHeaders = ['Content-Type' => 'application/json'];

    // Validation failures are answered right away with a JSON-encoded error body.
    $fail = static fn (int $status, $error): PromiseInterface => resolve(
        new HttpResponse($status, $jsonHeaders, json_encode($error))
    );

    $acceptHeader = $request->getHeaderLine('Accept');
    if (!str_contains($acceptHeader, 'application/json') && !str_contains($acceptHeader, 'text/event-stream')) {
        return $fail(406, Error::forInvalidRequest("Not Acceptable: Client must accept both application/json or text/event-stream"));
    }

    if (!str_contains($request->getHeaderLine('Content-Type'), 'application/json')) {
        return $fail(415, Error::forInvalidRequest("Unsupported Media Type: Content-Type must be application/json"));
    }

    $body = $request->getBody()->getContents();
    if (empty($body)) {
        $this->logger->warning("Received empty POST body");

        return $fail(400, Error::forInvalidRequest("Empty request body."));
    }

    try {
        $message = Parser::parse($body);
    } catch (Throwable $e) {
        $this->logger->error("Failed to parse MCP message from POST body", ['error' => $e->getMessage()]);

        return $fail(400, Error::forParseError("Invalid JSON: " . $e->getMessage()));
    }

    $isInitializeRequest = ($message instanceof Request && $message->method === 'initialize');
    $sessionId = null;

    if ($this->stateless) {
        // Stateless mode: every POST is its own short-lived session.
        $sessionId = $this->generateId();
        $this->emit('client_connected', [$sessionId]);
    } elseif ($isInitializeRequest) {
        if ($request->hasHeader('Mcp-Session-Id')) {
            $this->logger->warning("Client sent Mcp-Session-Id with InitializeRequest. Ignoring.", ['clientSentId' => $request->getHeaderLine('Mcp-Session-Id')]);

            return $fail(400, Error::forInvalidRequest("Invalid request: Session already initialized. Mcp-Session-Id header not allowed with InitializeRequest.", $message->getId()));
        }

        $sessionId = $this->generateId();
        $this->emit('client_connected', [$sessionId]);
    } else {
        $sessionId = $request->getHeaderLine('Mcp-Session-Id');
        if (empty($sessionId)) {
            $this->logger->warning("POST request without Mcp-Session-Id.");

            return $fail(400, Error::forInvalidRequest("Mcp-Session-Id header required for POST requests.", $message->getId()));
        }
    }

    $deferred = new Deferred();
    $context = [
        'is_initialize_request' => $isInitializeRequest,
    ];

    // Number of responses the client is waiting for; anything that is neither a
    // Request nor a BatchRequest counts as zero.
    $nRequests = match (true) {
        $message instanceof Request => 1,
        $message instanceof BatchRequest => $message->nRequests(),
        default => 0,
    };

    if ($nRequests === 0) {
        $deferred->resolve(new HttpResponse(202));
        $context['type'] = 'post_202_sent';
    } elseif ($this->enableJsonResponse) {
        $pendingRequestId = $this->generateId();
        $this->pendingRequests[$pendingRequestId] = $deferred;

        // Safety net: answer with a 500 if nobody resolves the request within 30 seconds.
        $timeoutTimer = $this->loop->addTimer(30, function () use ($pendingRequestId, $sessionId) {
            if (isset($this->pendingRequests[$pendingRequestId])) {
                $deferred = $this->pendingRequests[$pendingRequestId];
                unset($this->pendingRequests[$pendingRequestId]);
                $this->logger->warning("Timeout waiting for direct JSON response processing.", ['pending_request_id' => $pendingRequestId, 'session_id' => $sessionId]);
                $errorResponse = McpServerException::internalError("Request processing timed out.")->toJsonRpcError($pendingRequestId);
                $deferred->resolve(new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($errorResponse->toArray())));
            }
        });

        // Whatever settles the request first also cancels the timer.
        $this->pendingRequests[$pendingRequestId]->promise()->finally(function () use ($timeoutTimer) {
            $this->loop->cancelTimer($timeoutTimer);
        });

        $context['type'] = 'post_json';
        $context['pending_request_id'] = $pendingRequestId;
    } else {
        $streamId = $this->generateId();
        $sseStream = new ThroughStream();

        $this->activeSseStreams[$streamId] = [
            'stream' => $sseStream,
            'sessionId' => $sessionId,
            'context' => ['nRequests' => $nRequests, 'nResponses' => 0]
        ];

        // The listeners capture $sessionId instead of reading it back from
        // $this->activeSseStreams: the entry may already have been removed
        // (e.g. by the 'error' listener) by the time 'close' fires.
        $sseStream->on('close', function () use ($streamId, $sessionId) {
            $this->logger->info("POST SSE stream closed by client/server.", ['streamId' => $streamId, 'sessionId' => $sessionId]);
            unset($this->activeSseStreams[$streamId]);
        });

        $sseStream->on('error', function (Throwable $e) use ($streamId, $sessionId) {
            $this->logger->error("POST SSE stream error.", ['streamId' => $streamId, 'sessionId' => $sessionId, 'error' => $e->getMessage()]);
            unset($this->activeSseStreams[$streamId]);
        });

        $headers = [
            'Content-Type' => 'text/event-stream',
            'Cache-Control' => 'no-cache',
            'Connection' => 'keep-alive',
            'X-Accel-Buffering' => 'no',
        ];

        if (!empty($sessionId) && !$this->stateless) {
            $headers['Mcp-Session-Id'] = $sessionId;
        }

        $deferred->resolve(new HttpResponse(200, $headers, $sseStream));

        $context['type'] = 'post_sse';
        $context['streamId'] = $streamId;
        $context['nRequests'] = $nRequests;
    }

    $context['stateless'] = $this->stateless;
    $context['request'] = $request;

    // Dispatch on the next tick so this handler returns its promise before listeners run.
    $this->loop->futureTick(function () use ($message, $sessionId, $context) {
        $this->emit('message', [$message, $sessionId, $context]);
    });

    return $deferred->promise();
}
/**
 * Handles a DELETE to the MCP endpoint, i.e. an explicit session termination.
 *
 * In stateless mode this is a no-op answered with 204. Otherwise the
 * Mcp-Session-Id header is mandatory (400 if missing); all POST SSE streams of
 * that session are ended, the GET stream is ended if one is open (it is not
 * matched against the session id), and 'client_disconnected' is emitted.
 *
 * @return PromiseInterface Resolves with the HTTP response.
 */
private function handleDeleteRequest(ServerRequestInterface $request): PromiseInterface
{
    if ($this->stateless) {
        return resolve(new HttpResponse(204));
    }

    $sessionId = $request->getHeaderLine('Mcp-Session-Id');
    if (empty($sessionId)) {
        $this->logger->warning("DELETE request without Mcp-Session-Id.");
        $missingSession = Error::forInvalidRequest("Mcp-Session-Id header required for DELETE.");

        return resolve(new HttpResponse(400, ['Content-Type' => 'application/json'], json_encode($missingSession)));
    }

    // Snapshot the matching entries first: ending a stream fires its 'close'
    // listener, which removes the entry from the live registry.
    $sessionStreams = array_filter(
        $this->activeSseStreams,
        static fn (array $entry): bool => $entry['sessionId'] === $sessionId
    );

    foreach ($sessionStreams as $id => $entry) {
        $entry['stream']->end();
        unset($this->activeSseStreams[$id]);
    }

    if ($this->getStream !== null) {
        $this->getStream->end();
        $this->getStream = null;
    }

    $this->emit('client_disconnected', [$sessionId, 'Session terminated by DELETE request']);

    return resolve(new HttpResponse(204));
}
/**
 * Builds the 405 response for HTTP methods the MCP endpoint does not serve,
 * advertising the supported ones in the Allow header.
 */
private function handleUnsupportedRequest(ServerRequestInterface $request): HttpResponse
{
    $payload = json_encode(Error::forInvalidRequest("Method not allowed: {$request->getMethod()}"));

    return new HttpResponse(
        405,
        [
            'Content-Type' => 'application/json',
            'Allow' => 'GET, POST, DELETE, OPTIONS',
        ],
        $payload
    );
}
/**
 * Logs a failure raised while handling a request and converts it into a 500 response.
 *
 * Only TransportException messages are exposed to the client; any other
 * throwable is reported with a generic message.
 */
private function handleRequestError(Throwable $e, ServerRequestInterface $request): HttpResponse
{
    $this->logger->error("Error processing HTTP request", [
        'method' => $request->getMethod(),
        'path' => $request->getUri()->getPath(),
        'exception' => $e->getMessage()
    ]);

    $clientMessage = $e instanceof TransportException
        ? "Transport Error: " . $e->getMessage()
        : "Internal Server Error during HTTP request processing.";

    $error = Error::forInternalError($clientMessage);

    return new HttpResponse(500, ['Content-Type' => 'application/json'], json_encode($error));
}
/**
 * Delivers an outgoing message over the channel described by $context['type']
 * (the context created by the POST handler for the originating request):
 *
 *  - 'post_202_sent': nothing to send, the HTTP exchange is already complete.
 *  - 'post_sse':      written as SSE event(s) to the POST stream named by $context['streamId'];
 *                     the stream is ended once as many responses as requests were sent.
 *  - 'post_json':     resolves the pending HTTP response named by $context['pending_request_id'].
 *  - anything else:   written to the standalone GET SSE stream.
 *
 * In stateless mode 'client_disconnected' is emitted on the next tick once the
 * request has been fully answered.
 *
 * @param Message $message Message to deliver.
 * @param string $sessionId Session the message belongs to.
 * @param array $context Delivery context; see above.
 *
 * @return PromiseInterface Resolves with null on success; rejects with a TransportException when the
 *                          transport is closing or the target stream / pending request is unavailable.
 */
public function sendMessage(Message $message, string $sessionId, array $context = []): PromiseInterface
{
    if ($this->closing) {
        return reject(new TransportException('Transport is closing.'));
    }

    $isInitializeResponse = ($context['is_initialize_request'] ?? false) && ($message instanceof Response);

    switch ($context['type'] ?? null) {
        case 'post_202_sent':
            return resolve(null);

        case 'post_sse':
            $streamId = $context['streamId'];
            if (!isset($this->activeSseStreams[$streamId])) {
                $this->logger->error("SSE stream for POST not found.", ['streamId' => $streamId, 'sessionId' => $sessionId]);

                return reject(new TransportException("SSE stream {$streamId} not found for POST response."));
            }

            $stream = $this->activeSseStreams[$streamId]['stream'];
            if (!$stream->isWritable()) {
                $this->logger->warning("SSE stream for POST is not writable.", ['streamId' => $streamId, 'sessionId' => $sessionId]);

                return reject(new TransportException("SSE stream {$streamId} for POST is not writable."));
            }

            $sentCountThisCall = $this->writeResponsesToSseStream($stream, $message, $streamId);

            if (isset($this->activeSseStreams[$streamId]['context'])) {
                $this->activeSseStreams[$streamId]['context']['nResponses'] += $sentCountThisCall;

                if ($this->activeSseStreams[$streamId]['context']['nResponses'] >= $this->activeSseStreams[$streamId]['context']['nRequests']) {
                    $this->logger->info("All expected responses sent for POST SSE stream. Closing.", ['streamId' => $streamId, 'sessionId' => $sessionId]);
                    $stream->end(); // Will trigger 'close' event.

                    if ($context['stateless'] ?? false) {
                        $this->loop->futureTick(function () use ($sessionId) {
                            $this->emit('client_disconnected', [$sessionId, 'Stateless request completed']);
                        });
                    }
                }
            }

            return resolve(null);

        case 'post_json':
            $pendingRequestId = $context['pending_request_id'];
            if (!isset($this->pendingRequests[$pendingRequestId])) {
                $this->logger->error("Pending direct JSON request not found.", ['pending_request_id' => $pendingRequestId, 'session_id' => $sessionId]);

                return reject(new TransportException("Pending request {$pendingRequestId} not found."));
            }

            $deferred = $this->pendingRequests[$pendingRequestId];
            unset($this->pendingRequests[$pendingRequestId]);

            $responseBody = json_encode($message, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
            $headers = ['Content-Type' => 'application/json'];

            // The session id is announced to the client only on the initialize response.
            if ($isInitializeResponse && !$this->stateless) {
                $headers['Mcp-Session-Id'] = $sessionId;
            }

            $statusCode = $context['status_code'] ?? 200;
            $deferred->resolve(new HttpResponse($statusCode, $headers, $responseBody . "\n"));

            if ($context['stateless'] ?? false) {
                $this->loop->futureTick(function () use ($sessionId) {
                    $this->emit('client_disconnected', [$sessionId, 'Stateless request completed']);
                });
            }

            return resolve(null);

        default:
            if ($this->getStream === null) {
                $this->logger->error("GET SSE stream not found.", ['sessionId' => $sessionId]);

                return reject(new TransportException("GET SSE stream not found."));
            }

            if (!$this->getStream->isWritable()) {
                $this->logger->warning("GET SSE stream is not writable.", ['sessionId' => $sessionId]);

                return reject(new TransportException("GET SSE stream not writable."));
            }

            $this->writeResponsesToSseStream($this->getStream, $message, 'GET_STREAM');

            return resolve(null);
    }
}

/**
 * Writes a response-type message to an SSE stream, one event per JSON-RPC response.
 *
 * A Response or Error produces one event; a BatchResponse produces one event per
 * contained response, each carrying only that response. Any other message type
 * writes nothing. Each payload is stored in the event store first (when one is
 * configured) so the event carries its id.
 *
 * @param ThroughStream $stream Target SSE stream.
 * @param Message $message Message to write.
 * @param string $eventStreamKey Key under which the events are stored in the event store.
 *
 * @return int Number of events handed to the stream.
 */
private function writeResponsesToSseStream(ThroughStream $stream, Message $message, string $eventStreamKey): int
{
    if ($message instanceof Response || $message instanceof Error) {
        $payloads = [$message];
    } elseif ($message instanceof BatchResponse) {
        $payloads = $message->getAll();
    } else {
        $payloads = [];
    }

    $written = 0;
    foreach ($payloads as $payload) {
        $json = json_encode($payload, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
        $eventId = $this->eventStore ? $this->eventStore->storeEvent($eventStreamKey, $json) : null;
        $this->sendSseEventToStream($stream, $json, $eventId);
        $written++;
    }

    return $written;
}
/**
 * Replays the events stored after $lastEventId onto the given SSE stream.
 *
 * Does nothing when the client sent no Last-Event-ID. Failures raised by the
 * event store are logged and swallowed so the stream itself still opens.
 * Callers are expected to have checked that an event store is configured.
 */
private function replayEvents(string $lastEventId, ThroughStream $sseStream, string $sessionId): void
{
    if (empty($lastEventId)) {
        return;
    }

    $forwardToStream = function (string $replayedEventId, string $json) use ($sseStream) {
        $this->logger->debug("Replaying event", ['replayedEventId' => $replayedEventId]);
        $this->sendSseEventToStream($sseStream, $json, $replayedEventId);
    };

    try {
        $this->eventStore->replayEventsAfter($lastEventId, $forwardToStream);
    } catch (Throwable $replayFailure) {
        $this->logger->error("Error during event replay.", ['sessionId' => $sessionId, 'exception' => $replayFailure]);
    }
}
/**
 * Formats $data as a single SSE "message" event and writes it to the stream.
 *
 * Multi-line payloads are split into one "data:" field per line; the optional
 * event id is sent as an "id:" field. A blank line terminates the event.
 *
 * @return bool False when the stream is not writable, otherwise the result of the stream's write().
 */
private function sendSseEventToStream(ThroughStream $stream, string $data, ?string $eventId = null): bool
{
    if (! $stream->isWritable()) {
        return false;
    }

    $fields = ["event: message\n"];

    if ($eventId !== null) {
        $fields[] = "id: {$eventId}\n";
    }

    foreach (explode("\n", $data) as $dataLine) {
        $fields[] = "data: {$dataLine}\n";
    }

    $fields[] = "\n";

    return $stream->write(implode('', $fields));
}
/**
 * Shuts the transport down: closes the socket, ends every open SSE stream,
 * rejects all pending JSON responses, emits 'close' and finally drops all listeners.
 *
 * Calling it again after the first call is a no-op.
 */
public function close(): void
{
    if ($this->closing) {
        return;
    }

    $this->closing = true;
    $this->listening = false;
    $this->logger->info('Closing transport...');

    $this->socket?->close();
    $this->socket = null;

    foreach ($this->activeSseStreams as $entry) {
        $sseStream = $entry['stream'];
        if ($sseStream->isWritable()) {
            $sseStream->end();
        }
    }

    $this->getStream?->end();
    $this->getStream = null;

    foreach ($this->pendingRequests as $pending) {
        $pending->reject(new TransportException('Transport is closing.'));
    }

    $this->activeSseStreams = [];
    $this->pendingRequests = [];

    // Listeners must still be attached when 'close' is announced.
    $this->emit('close', ['Transport closed.']);
    $this->removeAllListeners();
}
}
```
--------------------------------------------------------------------------------
/src/Utils/SchemaGenerator.php:
--------------------------------------------------------------------------------
```php
<?php
namespace PhpMcp\Server\Utils;
use phpDocumentor\Reflection\DocBlock\Tags\Param;
use PhpMcp\Server\Attributes\Schema;
use PhpMcp\Server\Context;
use ReflectionEnum;
use ReflectionIntersectionType;
use ReflectionMethod;
use ReflectionNamedType;
use ReflectionParameter;
use ReflectionType;
use ReflectionUnionType;
use stdClass;
/**
* Generates JSON Schema for method parameters with intelligent Schema attribute handling.
*
* Priority system:
* 1. Schema attributes (method-level and parameter-level)
* 2. Reflection type information
* 3. DocBlock type information
*/
class SchemaGenerator
{
private DocBlockParser $docBlockParser;
/**
 * @param DocBlockParser $docBlockParser Parser stored on the generator; presumably used when
 *                                       collecting parameter docblock information (usage is outside this view).
 */
public function __construct(DocBlockParser $docBlockParser)
{
$this->docBlockParser = $docBlockParser;
}
/**
 * Produces the JSON Schema (as a PHP array) describing the parameters of a method or function.
 *
 * A method-level Schema attribute that carries a complete 'definition' is returned
 * verbatim; otherwise the schema is assembled parameter by parameter, using the
 * method-level schema (if any) as the base.
 */
public function generate(\ReflectionMethod|\ReflectionFunction $reflection): array
{
    $methodSchema = $this->extractMethodLevelSchema($reflection);

    if (isset($methodSchema['definition'])) {
        return $methodSchema['definition'];
    }

    return $this->buildSchemaFromParameters(
        $this->parseParametersInfo($reflection),
        $methodSchema
    );
}
/**
 * Reads the first Schema attribute (or subclass) declared on the method/function.
 *
 * @return array|null The attribute converted to an array, or null when none is declared.
 */
private function extractMethodLevelSchema(\ReflectionMethod|\ReflectionFunction $reflection): ?array
{
    $attributes = $reflection->getAttributes(Schema::class, \ReflectionAttribute::IS_INSTANCEOF);
    $firstAttribute = $attributes[0] ?? null;

    if ($firstAttribute === null) {
        return null;
    }

    return $firstAttribute->newInstance()->toArray();
}
/**
 * Reads the first Schema attribute (or subclass) declared on a single parameter.
 *
 * @return array The attribute converted to an array, or an empty array when none is declared.
 */
private function extractParameterLevelSchema(ReflectionParameter $parameter): array
{
    $attributes = $parameter->getAttributes(Schema::class, \ReflectionAttribute::IS_INSTANCEOF);
    $firstAttribute = $attributes[0] ?? null;

    if ($firstAttribute === null) {
        return [];
    }

    return $firstAttribute->newInstance()->toArray();
}
/**
 * Builds the final object schema from the collected parameter information,
 * using the method-level schema (when present) as the base.
 *
 * For every parameter the property schema is rebuilt from all levels and the
 * 'required' list is synchronised with the parameter's actual optionality:
 * required parameters are added, optional ones are removed even if the
 * method-level schema listed them. Properties declared only at method level
 * are kept untouched.
 *
 * An empty 'properties' map is emitted as stdClass so it serialises as `{}`;
 * an empty 'required' list is dropped.
 *
 * @param array<int, array{
 * name: string,
 * doc_block_tag: Param|null,
 * reflection_param: ReflectionParameter,
 * reflection_type_object: ReflectionType|null,
 * type_string: string,
 * description: string|null,
 * required: bool,
 * allows_null: bool,
 * default_value: mixed|null,
 * has_default: bool,
 * is_variadic: bool,
 * parameter_schema: array<string, mixed>
 * }> $parametersInfo
 *
 * @param array<string, mixed>|null $methodSchema
 *
 * @return array<string, mixed>
 */
private function buildSchemaFromParameters(array $parametersInfo, ?array $methodSchema): array
{
    $schema = [
        'type' => 'object',
        'properties' => [],
        'required' => [],
    ];

    // Apply method-level schema as base, restoring any structural key it nulled out.
    if ($methodSchema) {
        $schema = array_merge($schema, $methodSchema);
        $schema['type'] ??= 'object';
        $schema['properties'] ??= [];
        $schema['required'] ??= [];
    }

    foreach ($parametersInfo as $paramInfo) {
        $paramName = $paramInfo['name'];
        $methodLevelParamSchema = $schema['properties'][$paramName] ?? null;

        $schema['properties'][$paramName] = $this->buildParameterSchema($paramInfo, $methodLevelParamSchema);

        // Strict comparison: parameter names are matched exactly, never via type juggling.
        $requiredKey = array_search($paramName, $schema['required'], true);

        if ($paramInfo['required']) {
            if ($requiredKey === false) {
                $schema['required'][] = $paramName;
            }
        } elseif ($requiredKey !== false) {
            unset($schema['required'][$requiredKey]);
            $schema['required'] = array_values($schema['required']); // Re-index
        }
    }

    // Clean up empty properties
    if (empty($schema['properties'])) {
        $schema['properties'] = new stdClass();
    }

    if (empty($schema['required'])) {
        unset($schema['required']);
    }

    return $schema;
}
/**
 * Produces the schema of one parameter by layering, from weakest to strongest:
 * the inferred schema, the method-level property schema and the parameter-level
 * Schema attribute. A parameter-level 'definition' array replaces everything else.
 * Variadic parameters are delegated to their dedicated builder.
 *
 * @param array{
 * name: string,
 * doc_block_tag: Param|null,
 * reflection_param: ReflectionParameter,
 * reflection_type_object: ReflectionType|null,
 * type_string: string,
 * description: string|null,
 * required: bool,
 * allows_null: bool,
 * default_value: mixed|null,
 * has_default: bool,
 * is_variadic: bool,
 * parameter_schema: array<string, mixed>
 * } $paramInfo
 * @param array<string, mixed>|null $methodLevelParamSchema
 */
private function buildParameterSchema(array $paramInfo, ?array $methodLevelParamSchema = null): array
{
    if ($paramInfo['is_variadic']) {
        return $this->buildVariadicParameterSchema($paramInfo);
    }

    $schema = $this->buildInferredParameterSchema($paramInfo);

    // Method-level overrides what was inferred.
    if ($methodLevelParamSchema) {
        $schema = $this->mergeSchemas($schema, $methodLevelParamSchema);
    }

    $attributeSchema = $paramInfo['parameter_schema'];
    if (empty($attributeSchema)) {
        return $schema;
    }

    // A full definition on the parameter wins outright.
    if (isset($attributeSchema['definition']) && is_array($attributeSchema['definition'])) {
        return $attributeSchema['definition'];
    }

    // Parameter-level overrides everything merged so far.
    return $this->mergeSchemas($schema, $attributeSchema);
}
/**
 * Combines two schemas; on key collisions the dominant schema's value wins.
 *
 * The merge is shallow: string keys from the dominant schema overwrite those of
 * the recessive one, and integer keys are appended and renumbered.
 *
 * @param array $recessiveSchema The schema with lower precedence
 * @param array $dominantSchema The schema with higher precedence
 *
 * @return array The combined schema
 */
private function mergeSchemas(array $recessiveSchema, array $dominantSchema): array
{
    return [...$recessiveSchema, ...$dominantSchema];
}
/**
 * Derives a parameter schema purely from type and docblock information.
 *
 * Sets `type` (a single name, or a list when several apply), `description`
 * and `default` where available, then lets applyEnumConstraints() and
 * applyArrayConstraints() refine the result. Variadic parameters yield an
 * empty array because they are built elsewhere.
 *
 * @param array $paramInfo Parameter description as produced by parseParametersInfo()
 *
 * @return array The inferred schema fragment
 */
private function buildInferredParameterSchema(array $paramInfo): array
{
    // Variadics have their own builder.
    if ($paramInfo['is_variadic']) {
        return [];
    }

    $schema = [];

    // A lone type is emitted as a scalar, several as a list, none is omitted.
    $types = $this->inferParameterTypes($paramInfo);
    $typeCount = count($types);
    if ($typeCount === 1) {
        $schema['type'] = $types[0];
    } elseif ($typeCount > 1) {
        $schema['type'] = $types;
    }

    if ($paramInfo['description']) {
        $schema['description'] = $paramInfo['description'];
    }

    // Keyed on has_default so that an explicit `null` default is still emitted.
    if ($paramInfo['has_default']) {
        $schema['default'] = $paramInfo['default_value'];
    }

    $schema = $this->applyEnumConstraints($schema, $paramInfo);

    return $this->applyArrayConstraints($schema, $paramInfo);
}
/**
 * Builds the schema for a variadic parameter, which is always an array.
 *
 * A parameter-level schema is merged in first, but `type` is forced back to
 * `array`. The docblock description, when present, is then applied. If no
 * `items` entry was supplied, it is inferred from the parameter's type
 * string, provided exactly one non-null JSON type results.
 *
 * @param array $paramInfo Parameter description as produced by parseParametersInfo()
 *
 * @return array The schema fragment for the variadic parameter
 */
private function buildVariadicParameterSchema(array $paramInfo): array
{
    $paramSchema = ['type' => 'array'];

    // Apply parameter-level schema first
    if (!empty($paramInfo['parameter_schema'])) {
        $paramSchema = array_merge($paramSchema, $paramInfo['parameter_schema']);
        // A variadic is an array regardless of what the merged schema said
        $paramSchema['type'] = 'array';
    }

    if ($paramInfo['description']) {
        $paramSchema['description'] = $paramInfo['description'];
    }

    // If no items were specified above, infer them from the type string
    if (!isset($paramSchema['items'])) {
        $itemJsonTypes = $this->mapPhpTypeToJsonSchemaType($paramInfo['type_string']);

        // array_filter() keeps the original keys, so when 'null' is listed first
        // (e.g. "?string" maps to ['null', 'string']) the survivor sits at index 1.
        // Reindex so that reading index 0 below is always valid.
        $nonNullItemTypes = array_values(
            array_filter($itemJsonTypes, static fn($t) => $t !== 'null')
        );

        if (count($nonNullItemTypes) === 1) {
            $paramSchema['items'] = ['type' => $nonNullItemTypes[0]];
        }
    }

    return $paramSchema;
}
/**
 * Determines the JSON Schema type names that apply to a parameter.
 *
 * Starts from the mapping of the parameter's type string and appends `null`
 * when the parameter allows null, is not `mixed`, and `null` is not already
 * listed. When more than one type results, the list is sorted with `null`
 * (if present) moved to the front.
 *
 * @param array $paramInfo Parameter description as produced by parseParametersInfo()
 *
 * @return array List of JSON Schema type names (may be empty)
 */
private function inferParameterTypes(array $paramInfo): array
{
    $types = $this->mapPhpTypeToJsonSchemaType($paramInfo['type_string']);

    $needsNull = $paramInfo['allows_null']
        && strtolower($paramInfo['type_string']) !== 'mixed'
        && !in_array('null', $types);
    if ($needsNull) {
        $types[] = 'null';
    }

    // Nothing to order when there is at most one type.
    if (count($types) <= 1) {
        return $types;
    }

    $nullPosition = array_search('null', $types);
    if ($nullPosition === false) {
        sort($types);

        return $types;
    }

    // Sort the remaining types, then put 'null' first for consistency.
    unset($types[$nullPosition]);
    sort($types);

    return ['null', ...$types];
}
/**
 * Adds `enum` values (and a matching `type`) when the parameter is typed as a PHP enum.
 *
 * The schema is returned untouched unless the reflected type is a named,
 * non-builtin type for which enum_exists() is true. Backed enums contribute
 * their case values and an `integer`/`string` type; pure enums contribute their
 * case names with type `string`. If the incoming `type` is a list containing
 * `null`, the resulting type stays nullable as `['null', <type>]`.
 *
 * @param array $paramSchema Schema built so far
 * @param array $paramInfo Parameter description as produced by parseParametersInfo()
 *
 * @return array The schema, possibly extended with enum constraints
 */
private function applyEnumConstraints(array $paramSchema, array $paramInfo): array
{
    $type = $paramInfo['reflection_type_object'];

    $isEnumType = $type instanceof ReflectionNamedType
        && !$type->isBuiltin()
        && enum_exists($type->getName());
    if (!$isEnumType) {
        return $paramSchema;
    }

    $enumClass = $type->getName();
    $enum = new ReflectionEnum($enumClass);
    $backingType = $enum->getBackingType();

    // Nullability is read from the type inferred before this step.
    $currentType = $paramSchema['type'] ?? null;
    $isNullable = is_array($currentType) && in_array('null', $currentType);

    if ($enum->isBacked() && $backingType instanceof ReflectionNamedType) {
        $paramSchema['enum'] = array_column($enumClass::cases(), 'value');
        $jsonType = match ($backingType->getName()) {
            'int' => 'integer',
            'string' => 'string',
            default => null,
        };

        // Unknown backing type: keep whatever type was already present.
        if ($jsonType === null) {
            return $paramSchema;
        }
    } else {
        // Pure enum: the case names are the accepted values.
        $paramSchema['enum'] = array_column($enumClass::cases(), 'name');
        $jsonType = 'string';
    }

    $paramSchema['type'] = $isNullable ? ['null', $jsonType] : $jsonType;

    return $paramSchema;
}
/**
 * Refines the schema of array-typed parameters.
 *
 * Does nothing when no `type` has been set. A type string starting with
 * `array{` is turned into an object schema when its shape yields properties.
 * Any other type string that maps to a JSON `array` gets an `items` entry
 * when the element type can be inferred, and its `type` is rewritten to
 * `array` (or `['array', 'null']` when null is allowed).
 *
 * @param array $paramSchema Schema built so far
 * @param array $paramInfo Parameter description as produced by parseParametersInfo()
 *
 * @return array The schema, possibly extended with array/object constraints
 */
private function applyArrayConstraints(array $paramSchema, array $paramInfo): array
{
    if (!isset($paramSchema['type'])) {
        return $paramSchema;
    }

    $typeString = $paramInfo['type_string'];
    $nullable = $paramInfo['allows_null'];

    // Shape syntax (array{...}) describes an object, not a list.
    if (preg_match('/^array\s*{/i', $typeString)) {
        $shape = $this->inferArrayItemsType($typeString);
        if (is_array($shape) && isset($shape['properties'])) {
            $paramSchema = array_merge($paramSchema, $shape);
            $paramSchema['type'] = $nullable ? ['object', 'null'] : 'object';
        }

        return $paramSchema;
    }

    if (!in_array('array', $this->mapPhpTypeToJsonSchemaType($typeString))) {
        return $paramSchema;
    }

    $items = $this->inferArrayItemsType($typeString);
    if (is_string($items)) {
        // 'any' means the element type is unknown, so no items entry is emitted.
        if ($items !== 'any') {
            $paramSchema['items'] = ['type' => $items];
        }
    } else {
        // A nested schema with properties but no type is an object.
        if (!isset($items['type']) && isset($items['properties'])) {
            $items = array_merge(['type' => 'object'], $items);
        }
        $paramSchema['items'] = $items;
    }

    // ['array', 'null'] is already in sorted order.
    $paramSchema['type'] = $nullable ? ['array', 'null'] : 'array';

    return $paramSchema;
}
/**
 * Collects, for every parameter of a method or function, the data needed to build its schema.
 *
 * Parameters whose native type is the Context class are skipped and do not
 * appear in the result. Enum default values are flattened: backed enums to
 * their backing value, pure enums to their case name.
 *
 * A parameter is considered nullable when its native type allows null, when
 * its default value is null, when its resolved type string contains `null` as
 * a standalone type token, or when that type string is `mixed`.
 *
 * @param \ReflectionMethod|\ReflectionFunction $reflection The callable to inspect
 *
 * @return array<int, array{
 *     name: string,
 *     doc_block_tag: Param|null,
 *     reflection_param: ReflectionParameter,
 *     reflection_type_object: ReflectionType|null,
 *     type_string: string,
 *     description: string|null,
 *     required: bool,
 *     allows_null: bool,
 *     default_value: mixed|null,
 *     has_default: bool,
 *     is_variadic: bool,
 *     parameter_schema: array<string, mixed>
 * }>
 */
private function parseParametersInfo(\ReflectionMethod|\ReflectionFunction $reflection): array
{
    $docComment = $reflection->getDocComment() ?: null;
    $docBlock = $this->docBlockParser->parseDocBlock($docComment);
    $paramTags = $this->docBlockParser->getParamTags($docBlock);
    $parametersInfo = [];

    foreach ($reflection->getParameters() as $rp) {
        $paramName = $rp->getName();
        $paramTag = $paramTags['$' . $paramName] ?? null;
        $reflectionType = $rp->getType();

        // Context-typed parameters are left out of the generated schema.
        if ($reflectionType instanceof ReflectionNamedType && $reflectionType->getName() === Context::class) {
            continue;
        }

        $typeString = $this->getParameterTypeString($rp, $paramTag);
        $description = $this->docBlockParser->getParamDescription($paramTag);
        $hasDefault = $rp->isDefaultValueAvailable();
        $defaultValue = $hasDefault ? $rp->getDefaultValue() : null;
        $isVariadic = $rp->isVariadic();
        $parameterSchema = $this->extractParameterLevelSchema($rp);

        // Enum instances are replaced by a scalar: backing value, or case name for pure enums.
        if ($defaultValue instanceof \BackedEnum) {
            $defaultValue = $defaultValue->value;
        } elseif ($defaultValue instanceof \UnitEnum) {
            $defaultValue = $defaultValue->name;
        }

        // Match `null` only as a standalone token. A plain substring search would
        // also hit class names that merely contain those letters (e.g.
        // "App\AnnulledOrder") and wrongly mark a non-nullable parameter as nullable.
        $typeStringMentionsNull = preg_match('/(?<![\w\\\\])null(?![\w\\\\])/i', $typeString) === 1;

        $allowsNull = ($reflectionType && $reflectionType->allowsNull())
            || ($hasDefault && $defaultValue === null)
            || $typeStringMentionsNull
            || strtolower($typeString) === 'mixed';

        $parametersInfo[] = [
            'name' => $paramName,
            'doc_block_tag' => $paramTag,
            'reflection_param' => $rp,
            'reflection_type_object' => $reflectionType,
            'type_string' => $typeString,
            'description' => $description,
            'required' => !$rp->isOptional(),
            'allows_null' => $allowsNull,
            'default_value' => $defaultValue,
            'has_default' => $hasDefault,
            'is_variadic' => $isVariadic,
            'parameter_schema' => $parameterSchema,
        ];
    }

    return $parametersInfo;
}
/**
 * Resolves the type string used for a parameter, giving precedence to a specific DocBlock type.
 *
 * A DocBlock type counts as generic when it is missing, empty, `mixed` or
 * `unknown`; in that case the native (reflection) type string is used, with
 * `mixed` as the last resort. A specific DocBlock type is returned as-is,
 * except when it mentions null while the native type string does not: then
 * the native type string with `|null` appended is returned instead.
 *
 * @param ReflectionParameter $rp The reflected parameter
 * @param Param|null $paramTag The matching `@param` tag, if any
 *
 * @return string The resolved type string
 */
private function getParameterTypeString(ReflectionParameter $rp, ?Param $paramTag): string
{
    $docType = $this->docBlockParser->getParamTypeString($paramTag);

    // No tag, or no type in the tag, is treated the same as a generic type.
    $docTypeIsGeneric = $docType === null
        || in_array(strtolower($docType), ['mixed', 'unknown', '']);

    $nativeType = $rp->getType();
    $nativeTypeString = $nativeType
        ? $this->getTypeStringFromReflection($nativeType, $rp->allowsNull())
        : null;

    if ($docTypeIsGeneric) {
        // Reflection wins whenever it has anything to say, even if that is 'mixed'.
        return $nativeTypeString ?? 'mixed';
    }

    // The DocBlock declares null but the native type string does not mention it.
    $docAddsNullability = stripos($docType, 'null') !== false
        && $nativeTypeString
        && stripos($nativeTypeString, 'null') === false
        && $nativeTypeString !== 'mixed';

    return $docAddsNullability ? $nativeTypeString . '|null' : $docType;
}
/**
 * Renders a ReflectionType as a type string.
 *
 * Union members are joined with `|` and intersection members with `&`, each
 * rendered recursively. The names `bool`, `int`, `float`/`double` and `str`
 * are rewritten to `boolean`, `integer`, `number` and `string`. `|null` is
 * appended for nullable types whose string does not already mention null,
 * and leading backslashes are removed from each member.
 *
 * @param ReflectionType|null $type The type to render; null yields `mixed`
 * @param bool $nativeAllowsNull Whether the caller considers the type nullable
 *
 * @return string The rendered type string, `mixed` when nothing could be derived
 */
private function getTypeStringFromReflection(?ReflectionType $type, bool $nativeAllowsNull): string
{
    if ($type === null) {
        return 'mixed';
    }

    if ($type instanceof ReflectionUnionType) {
        $members = array_map(
            fn($member) => $this->getTypeStringFromReflection($member, $member->allowsNull()),
            $type->getTypes()
        );
        // When the whole type is nullable, '|null' is re-added once further down.
        if ($nativeAllowsNull) {
            $members = array_filter($members, fn($member) => strtolower($member) !== 'null');
        }
        $typeString = implode('|', array_unique(array_filter($members)));
    } elseif ($type instanceof ReflectionIntersectionType) {
        $members = array_map(
            fn($member) => $this->getTypeStringFromReflection($member, false),
            $type->getTypes()
        );
        $typeString = implode('&', array_unique(array_filter($members)));
    } elseif ($type instanceof ReflectionNamedType) {
        $typeString = $type->getName();
    } else {
        return 'mixed';
    }

    // Only a lone scalar name can match here; composite strings fall through unchanged.
    $typeString = match (strtolower($typeString)) {
        'bool' => 'boolean',
        'int' => 'integer',
        'float', 'double' => 'number',
        'str' => 'string',
        default => $typeString,
    };

    $isNullable = $nativeAllowsNull
        || ($type instanceof ReflectionNamedType && $type->getName() === 'mixed');

    // A union may still carry an explicit null member.
    if (!$isNullable && $type instanceof ReflectionUnionType) {
        foreach ($type->getTypes() as $member) {
            if ($member instanceof ReflectionNamedType && strtolower($member->getName()) === 'null') {
                $isNullable = true;
                break;
            }
        }
    }

    if ($isNullable && $typeString !== 'mixed' && stripos($typeString, 'null') === false) {
        $typeString .= '|null';
    }

    // Strip leading backslashes member by member, keeping the | and & separators.
    if (str_contains($typeString, '\\')) {
        $segments = preg_split('/([|&])/', $typeString, -1, PREG_SPLIT_DELIM_CAPTURE);
        $typeString = implode('', array_map(fn($segment) => ltrim($segment, '\\'), $segments));
    }

    return $typeString ?: 'mixed';
}
/**
 * Maps a PHP type string (potentially a union) to an array of JSON Schema type names.
 *
 * Rules are applied in order:
 *  1. `array{...}` shape syntax maps to `object`.
 *  2. `T[]` and generic `array<`/`list<`/`iterable<`/`collection<` syntax map to `array`.
 *  3. Unions are split on `|`, each member mapped recursively, duplicates removed.
 *  4. Simple names are looked up directly; anything unrecognised is treated as
 *     `object`, while `mixed`, `void` and `never` yield no type at all.
 *
 * The literal types `true` and `false` map to `boolean` (so `string|false`
 * becomes string-or-boolean), and bare `iterable`/`list` map to `array`, in
 * line with their generic forms in rule 2.
 *
 * @param string $phpTypeString Type string taken from reflection or a docblock
 *
 * @return array List of JSON Schema type names (may be empty)
 */
private function mapPhpTypeToJsonSchemaType(string $phpTypeString): array
{
    $normalizedType = strtolower(trim($phpTypeString));

    // PRIORITY 1: array{} shape syntax is an object
    if (preg_match('/^array\s*{/i', $normalizedType)) {
        return ['object'];
    }

    // PRIORITY 2: array syntax (T[] or generics)
    if (
        str_contains($normalizedType, '[]') ||
        preg_match('/^(array|list|iterable|collection)</i', $normalizedType)
    ) {
        return ['array'];
    }

    // PRIORITY 3: unions (recursive)
    if (str_contains($normalizedType, '|')) {
        $jsonTypes = [];
        foreach (explode('|', $normalizedType) as $type) {
            $jsonTypes = array_merge($jsonTypes, $this->mapPhpTypeToJsonSchemaType(trim($type)));
        }

        return array_values(array_unique($jsonTypes));
    }

    // PRIORITY 4: simple built-in types
    return match ($normalizedType) {
        'string', 'scalar' => ['string'],
        '?string' => ['null', 'string'],
        'int', 'integer' => ['integer'],
        '?int', '?integer' => ['null', 'integer'],
        'float', 'double', 'number' => ['number'],
        '?float', '?double', '?number' => ['null', 'number'],
        // `true`/`false` are valid literal types, typically seen in unions.
        'bool', 'boolean', 'true', 'false' => ['boolean'],
        '?bool', '?boolean' => ['null', 'boolean'],
        // Bare `iterable`/`list` behave like their generic forms above.
        'array', 'iterable', 'list' => ['array'],
        '?array' => ['null', 'array'],
        'object', 'stdclass' => ['object'],
        '?object', '?stdclass' => ['null', 'object'],
        'null' => ['null'],
        'resource', 'callable' => ['object'],
        'mixed' => [],
        'void', 'never' => [],
        default => ['object'],
    };
}
/**
 * Works out the element schema of an array from a DocBlock-style type string.
 *
 * Recognised forms, checked in this order:
 *  - `T[]` and `array<T>`: returns the JSON type name of T;
 *  - `array<array<T>>` and `T[][]`: returns an array schema whose items have type T;
 *  - `array{...}`: returns the object schema parsed from the shape.
 * Anything else yields the string `any`.
 *
 * @param string $phpTypeString The type string to analyse
 *
 * @return string|array A JSON type name, a nested schema, or `any`
 */
private function inferArrayItemsType(string $phpTypeString): string|array
{
    $type = trim($phpTypeString);

    // Flat lists: T[] first, then array<T>.
    $flatPatterns = [
        '/^(\\??)([\w\\\\]+)\\s*\\[\\]$/i',
        '/^(\\??)array\s*<\s*([\w\\\\|]+)\s*>$/i',
    ];
    foreach ($flatPatterns as $pattern) {
        if (preg_match($pattern, $type, $found)) {
            return $this->mapSimpleTypeToJsonSchema(strtolower($found[2]));
        }
    }

    // Two-level lists: array<array<T>> first, then T[][].
    $nestedPatterns = [
        '/^(\\??)array\s*<\s*array\s*<\s*([\w\\\\|]+)\s*>\s*>$/i',
        '/^(\\??)([\w\\\\]+)\s*\[\]\[\]$/i',
    ];
    foreach ($nestedPatterns as $pattern) {
        if (preg_match($pattern, $type, $found)) {
            return [
                'type' => 'array',
                'items' => [
                    'type' => $this->mapSimpleTypeToJsonSchema(strtolower($found[2])),
                ],
            ];
        }
    }

    // Shape syntax, e.g. array{name: string, age: int}.
    if (preg_match('/^(\\??)array\s*\{(.+)\}$/is', $type, $found)) {
        return $this->parseObjectLikeArray($found[2]);
    }

    return 'any';
}
/**
 * Converts the inside of an `array{...}` shape into a JSON Schema object.
 *
 * The string is split on commas that are not enclosed in braces, and every
 * piece is handed to parsePropertyDefinition(). When at least one property
 * was recognised the result carries `properties` and `required`; otherwise
 * it is a bare `['type' => 'object']`.
 *
 * @param string $propertiesStr The text between the outer braces of the shape
 *
 * @return array The resulting object schema
 */
private function parseObjectLikeArray(string $propertiesStr): array
{
    $properties = [];
    $required = [];

    $nesting = 0;
    $segment = '';
    $length = strlen($propertiesStr);

    for ($position = 0; $position < $length; $position++) {
        $char = $propertiesStr[$position];

        // A comma outside any nested braces ends the current property.
        if ($char === ',' && $nesting === 0) {
            $this->parsePropertyDefinition(trim($segment), $properties, $required);
            $segment = '';
            continue;
        }

        if ($char === '{') {
            $nesting++;
        } elseif ($char === '}') {
            $nesting--;
        }

        $segment .= $char;
    }

    // Whatever is left after the last comma is the final property.
    if (!empty($segment)) {
        $this->parsePropertyDefinition(trim($segment), $properties, $required);
    }

    if (empty($properties)) {
        return ['type' => 'object'];
    }

    return [
        'type' => 'object',
        'properties' => $properties,
        'required' => $required
    ];
}
/**
 * Parses one `name: type` entry of an `array{...}` shape and records it.
 *
 * The property schema is written to $properties under the property name.
 * Nested shapes become nested object schemas, `array<T>` and `T[]` become
 * array schemas, everything else is mapped as a simple type. Definitions
 * that do not match the `name: type` form are ignored.
 *
 * A `?` between the name and the colon (`name?: type`) marks the key as
 * optional: the property is still described, but its name is not added to
 * $required.
 *
 * @param string $propDefinition A single trimmed property definition
 * @param array $properties Collected property schemas, modified in place
 * @param array $required Collected required property names, modified in place
 */
private function parsePropertyDefinition(string $propDefinition, array &$properties, array &$required): void
{
    // Property name, optional-key marker, and type
    $pattern = '/^([a-zA-Z_\x80-\xff][a-zA-Z0-9_\x80-\xff]*)\s*(\??)\s*:\s*(.+)$/i';
    if (!preg_match($pattern, $propDefinition, $matches)) {
        return;
    }

    $propName = $matches[1];
    $isOptional = $matches[2] === '?';
    $propType = trim($matches[3]);

    // Only keys without the optional marker are required
    if (!$isOptional) {
        $required[] = $propName;
    }

    // Nested array{} syntax
    if (preg_match('/^array\s*\{(.+)\}$/is', $propType, $nestedMatches)) {
        $properties[$propName] = $this->parseObjectLikeArray($nestedMatches[1]);

        return;
    }

    // array<T> or T[] syntax
    if (
        preg_match('/^array\s*<\s*([\w\\\\|]+)\s*>$/i', $propType, $arrayMatches) ||
        preg_match('/^([\w\\\\]+)\s*\[\]$/i', $propType, $arrayMatches)
    ) {
        $itemType = $arrayMatches[1] ?? 'any';
        $properties[$propName] = [
            'type' => 'array',
            'items' => [
                'type' => $this->mapSimpleTypeToJsonSchema($itemType)
            ]
        ];

        return;
    }

    // Simple type
    $properties[$propName] = ['type' => $this->mapSimpleTypeToJsonSchema($propType)];
}
/**
 * Maps a single basic PHP type name to a JSON Schema type name.
 *
 * Matching is case-insensitive and ignores leading backslashes, so
 * `\DateTime` is treated like `DateTime`. `DateTime`, `DateTimeImmutable`
 * and `DateTimeInterface` all map to `string`; any unrecognised name maps
 * to `object`.
 *
 * @param string $type A single (non-union) PHP type name
 *
 * @return string The JSON Schema type name
 */
private function mapSimpleTypeToJsonSchema(string $type): string
{
    return match (strtolower(ltrim($type, '\\'))) {
        'string' => 'string',
        'int', 'integer' => 'integer',
        'bool', 'boolean' => 'boolean',
        'float', 'double', 'number' => 'number',
        'array' => 'array',
        'object', 'stdclass' => 'object',
        // DateTimeImmutable is mapped like the other two date types.
        'datetime', 'datetimeimmutable', 'datetimeinterface' => 'string',
        default => 'object',
    };
}
}
```
--------------------------------------------------------------------------------
/tests/Unit/DispatcherTest.php:
--------------------------------------------------------------------------------
```php
<?php
namespace PhpMcp\Server\Tests\Unit;
use Mockery;
use Mockery\MockInterface;
use PhpMcp\Schema\ClientCapabilities;
use PhpMcp\Server\Context;
use PhpMcp\Server\Configuration;
use PhpMcp\Server\Contracts\CompletionProviderInterface;
use PhpMcp\Server\Contracts\SessionInterface;
use PhpMcp\Server\Dispatcher;
use PhpMcp\Server\Elements\RegisteredPrompt;
use PhpMcp\Server\Elements\RegisteredResource;
use PhpMcp\Server\Elements\RegisteredResourceTemplate;
use PhpMcp\Server\Elements\RegisteredTool;
use PhpMcp\Server\Exception\McpServerException;
use PhpMcp\Schema\Implementation;
use PhpMcp\Schema\JsonRpc\Notification as JsonRpcNotification;
use PhpMcp\Schema\JsonRpc\Request as JsonRpcRequest;
use PhpMcp\Schema\Prompt as PromptSchema;
use PhpMcp\Schema\PromptArgument;
use PhpMcp\Schema\Request\CallToolRequest;
use PhpMcp\Schema\Request\CompletionCompleteRequest;
use PhpMcp\Schema\Request\GetPromptRequest;
use PhpMcp\Schema\Request\InitializeRequest;
use PhpMcp\Schema\Request\ListToolsRequest;
use PhpMcp\Schema\Request\ReadResourceRequest;
use PhpMcp\Schema\Request\ResourceSubscribeRequest;
use PhpMcp\Schema\Request\SetLogLevelRequest;
use PhpMcp\Schema\Resource as ResourceSchema;
use PhpMcp\Schema\ResourceTemplate as ResourceTemplateSchema;
use PhpMcp\Schema\Result\CallToolResult;
use PhpMcp\Schema\Result\CompletionCompleteResult;
use PhpMcp\Schema\Result\EmptyResult;
use PhpMcp\Schema\Result\GetPromptResult;
use PhpMcp\Schema\Result\InitializeResult;
use PhpMcp\Schema\Result\ReadResourceResult;
use PhpMcp\Schema\ServerCapabilities;
use PhpMcp\Schema\Tool as ToolSchema;
use PhpMcp\Server\Registry;
use PhpMcp\Server\Session\SubscriptionManager;
use PhpMcp\Server\Utils\SchemaValidator;
use Psr\Container\ContainerInterface;
use Psr\Log\NullLogger;
use PhpMcp\Schema\Content\TextContent;
use PhpMcp\Schema\Content\PromptMessage;
use PhpMcp\Schema\Enum\LoggingLevel;
use PhpMcp\Schema\Enum\Role;
use PhpMcp\Schema\PromptReference;
use PhpMcp\Schema\Request\ListPromptsRequest;
use PhpMcp\Schema\Request\ListResourcesRequest;
use PhpMcp\Schema\Request\ListResourceTemplatesRequest;
use PhpMcp\Schema\ResourceReference;
use PhpMcp\Server\Protocol;
use PhpMcp\Server\Tests\Fixtures\Enums\StatusEnum;
use React\EventLoop\Loop;
// Fixed session identifier for the dispatcher tests.
// NOTE(review): not referenced in the part of the file reviewed here — confirm it is still used.
const DISPATCHER_SESSION_ID = 'dispatcher-session-xyz';
// Page size handed to Configuration as `paginationLimit`; the list tests assert their page sizes against it.
const DISPATCHER_PAGINATION_LIMIT = 3;
beforeEach(function () {
    // Mocked collaborators, exposed on $this for the individual tests.
    $this->configuration = Mockery::mock(Configuration::class);
    $this->registry = Mockery::mock(Registry::class);
    $this->subscriptionManager = Mockery::mock(SubscriptionManager::class);
    $this->schemaValidator = Mockery::mock(SchemaValidator::class);
    $this->session = Mockery::mock(SessionInterface::class);
    $this->container = Mockery::mock(ContainerInterface::class);

    $this->context = new Context($this->session);

    // The dispatcher under test is built with a real Configuration instance.
    $this->dispatcher = new Dispatcher(
        new Configuration(
            serverInfo: Implementation::make('DispatcherTestServer', '1.0'),
            capabilities: ServerCapabilities::make(),
            paginationLimit: DISPATCHER_PAGINATION_LIMIT,
            logger: new NullLogger(),
            loop: Loop::get(),
            cache: null,
            container: $this->container
        ),
        $this->registry,
        $this->subscriptionManager,
        $this->schemaValidator
    );
});
it('routes to handleInitialize for initialize request', function () {
    $latestVersion = Protocol::LATEST_PROTOCOL_VERSION;
    $initializeParams = [
        'protocolVersion' => $latestVersion,
        'clientInfo' => ['name' => 'client', 'version' => '1.0'],
        'capabilities' => [],
    ];
    $request = new JsonRpcRequest(jsonrpc: '2.0', id: 1, method: 'initialize', params: $initializeParams);

    // The session must receive the client info and the negotiated protocol version.
    $isExpectedClient = fn($value) => $value['name'] === 'client' && $value['version'] === '1.0';
    $this->session->shouldReceive('set')->with('client_info', Mockery::on($isExpectedClient))->once();
    $this->session->shouldReceive('set')->with('protocol_version', $latestVersion)->once();

    $response = $this->dispatcher->handleRequest($request, $this->context);

    expect($response)->toBeInstanceOf(InitializeResult::class)
        ->and($response->protocolVersion)->toBe($latestVersion)
        ->and($response->serverInfo->name)->toBe('DispatcherTestServer');
});
it('routes to handlePing for ping request', function () {
    $pingRequest = new JsonRpcRequest('2.0', 'id1', 'ping', []);

    // A ping is answered with an empty result.
    expect($this->dispatcher->handleRequest($pingRequest, $this->context))
        ->toBeInstanceOf(EmptyResult::class);
});
it('throws MethodNotFound for unknown request method', function () {
    // The expected exception is declared on the test via ->throws().
    $this->dispatcher->handleRequest(
        new JsonRpcRequest('2.0', 'id1', 'unknown/method', []),
        $this->context
    );
})->throws(McpServerException::class, "Method 'unknown/method' not found.");
it('routes to handleNotificationInitialized for initialized notification', function () {
    // The session is expected to be flagged as initialized exactly once.
    $this->session->shouldReceive('set')->with('initialized', true)->once();

    $this->dispatcher->handleNotification(
        new JsonRpcNotification('2.0', 'notifications/initialized', []),
        $this->session
    );
});
it('does nothing for unknown notification method', function () {
    // An unknown notification must leave the session untouched.
    $this->session->shouldNotReceive('set');

    $this->dispatcher->handleNotification(
        new JsonRpcNotification('2.0', 'unknown/notification', []),
        $this->session
    );
});
it('can handle initialize request', function () {
    $latestVersion = Protocol::LATEST_PROTOCOL_VERSION;
    $client = Implementation::make('TestClient', '0.9.9');
    $initializeRequest = InitializeRequest::make(1, $latestVersion, ClientCapabilities::make(), $client, []);

    $this->session->shouldReceive('set')->with('client_info', $client->toArray())->once();
    $this->session->shouldReceive('set')->with('protocol_version', $latestVersion)->once();

    $response = $this->dispatcher->handleInitialize($initializeRequest, $this->session);

    expect($response->protocolVersion)->toBe($latestVersion)
        ->and($response->serverInfo->name)->toBe('DispatcherTestServer')
        ->and($response->capabilities)->toBeInstanceOf(ServerCapabilities::class);
});
it('can handle initialize request with older supported protocol version', function () {
    $olderVersion = '2024-11-05';
    $client = Implementation::make('TestClient', '0.9.9');
    $initializeRequest = InitializeRequest::make(1, $olderVersion, ClientCapabilities::make(), $client, []);

    // The requested version is expected to be stored and echoed back.
    $this->session->shouldReceive('set')->with('client_info', $client->toArray())->once();
    $this->session->shouldReceive('set')->with('protocol_version', $olderVersion)->once();

    $response = $this->dispatcher->handleInitialize($initializeRequest, $this->session);

    expect($response->protocolVersion)->toBe($olderVersion)
        ->and($response->serverInfo->name)->toBe('DispatcherTestServer')
        ->and($response->capabilities)->toBeInstanceOf(ServerCapabilities::class);
});
it('can handle initialize request with unsupported protocol version', function () {
    $latestVersion = Protocol::LATEST_PROTOCOL_VERSION;
    $client = Implementation::make('TestClient', '0.9.9');
    $initializeRequest = InitializeRequest::make(1, '1999-01-01', ClientCapabilities::make(), $client, []);

    // For an unsupported version the latest protocol version is expected instead.
    $this->session->shouldReceive('set')->with('client_info', $client->toArray())->once();
    $this->session->shouldReceive('set')->with('protocol_version', $latestVersion)->once();

    $response = $this->dispatcher->handleInitialize($initializeRequest, $this->session);

    expect($response->protocolVersion)->toBe($latestVersion)
        ->and($response->serverInfo->name)->toBe('DispatcherTestServer')
        ->and($response->capabilities)->toBeInstanceOf(ServerCapabilities::class);
});
it('can handle tool list request and return paginated tools', function () {
    // One tool more than fits on a single page.
    $tools = array_map(
        fn($name) => ToolSchema::make($name, ['type' => 'object', 'properties' => []]),
        ['tool1', 'tool2', 'tool3', 'tool4']
    );
    $this->registry->shouldReceive('getTools')->andReturn($tools);

    $firstPage = $this->dispatcher->handleToolList(ListToolsRequest::make(1));

    expect($firstPage->tools)->toHaveCount(DISPATCHER_PAGINATION_LIMIT)
        ->and($firstPage->tools[0]->name)->toBe('tool1')
        ->and($firstPage->nextCursor)->toBeString();

    // The cursor from the first page fetches the remainder.
    $secondPage = $this->dispatcher->handleToolList(ListToolsRequest::make(2, $firstPage->nextCursor));

    expect($secondPage->tools)->toHaveCount(count($tools) - DISPATCHER_PAGINATION_LIMIT)
        ->and($secondPage->tools[0]->name)->toBe('tool4')
        ->and($secondPage->nextCursor)->toBeNull();
});
it('can handle tool call request and return result', function () {
    $name = 'my-calculator';
    $arguments = ['a' => 10, 'b' => 5];
    $inputSchema = [
        'type' => 'object',
        'properties' => ['a' => ['type' => 'integer'], 'b' => ['type' => 'integer']],
    ];
    $schema = ToolSchema::make($name, $inputSchema);
    $tool = Mockery::mock(RegisteredTool::class, [$schema, 'MyToolHandler', 'handleTool', false]);

    $this->registry->shouldReceive('getTool')->with($name)->andReturn($tool);
    // An empty list means the arguments passed validation.
    $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->with($arguments, $schema->inputSchema)->andReturn([]);
    $tool->shouldReceive('call')->with($this->container, $arguments, $this->context)->andReturn([TextContent::make("Result: 15")]);

    $response = $this->dispatcher->handleToolCall(CallToolRequest::make(1, $name, $arguments), $this->context);

    expect($response)->toBeInstanceOf(CallToolResult::class)
        ->and($response->content[0]->text)->toBe("Result: 15")
        ->and($response->isError)->toBeFalse();
});
it('can handle tool call request and throw exception if tool not found', function () {
    // The registry reports the tool as unknown.
    $this->registry->shouldReceive('getTool')->with('unknown-tool')->andReturn(null);

    $this->dispatcher->handleToolCall(
        CallToolRequest::make(1, 'unknown-tool', []),
        $this->context
    );
})->throws(McpServerException::class, "Tool 'unknown-tool' not found.");
it('can handle tool call request and throw exception if argument validation fails', function () {
    $toolName = 'strict-tool';
    $args = ['param' => 'wrong_type'];
    $toolSchema = ToolSchema::make($toolName, ['type' => 'object', 'properties' => ['param' => ['type' => 'integer']]]);
    $registeredToolMock = Mockery::mock(RegisteredTool::class, [$toolSchema, 'MyToolHandler', 'handleTool', false]);
    $this->registry->shouldReceive('getTool')->with($toolName)->andReturn($registeredToolMock);

    // The validator reports one error, which should abort the call.
    $validationErrors = [['pointer' => '/param', 'keyword' => 'type', 'message' => 'Expected integer']];
    $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->with($args, $toolSchema->inputSchema)->andReturn($validationErrors);

    $request = CallToolRequest::make(1, $toolName, $args);

    $caught = null;
    try {
        $this->dispatcher->handleToolCall($request, $this->context);
    } catch (McpServerException $e) {
        $caught = $e;
    }

    // Assert outside the catch block: if no exception is raised at all, the
    // test must fail instead of passing without having checked anything.
    expect($caught)->toBeInstanceOf(McpServerException::class);
    expect($caught->getMessage())->toContain("Invalid parameters for tool 'strict-tool'");
    expect($caught->getData()['validation_errors'])->toBeArray();
});
it('can handle tool call request and return error if tool execution throws exception', function () {
    $name = 'failing-tool';
    $schema = ToolSchema::make($name, ['type' => 'object', 'properties' => []]);
    $tool = Mockery::mock(RegisteredTool::class, [$schema, 'MyToolHandler', 'handleTool', false]);

    $this->registry->shouldReceive('getTool')->with($name)->andReturn($tool);
    $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->andReturn([]);
    // The handler itself blows up.
    $tool->shouldReceive('call')->andThrow(new \RuntimeException("Tool crashed!"));

    $response = $this->dispatcher->handleToolCall(CallToolRequest::make(1, $name, []), $this->context);

    expect($response->isError)->toBeTrue()
        ->and($response->content[0]->text)->toBe("Tool execution failed: Tool crashed!");
});
it('can handle tool call request and return error if result formatting fails', function () {
    $name = 'bad-result-tool';
    $schema = ToolSchema::make($name, ['type' => 'object', 'properties' => []]);
    $tool = Mockery::mock(RegisteredTool::class, [$schema, 'MyToolHandler', 'handleTool', false]);

    $this->registry->shouldReceive('getTool')->with($name)->andReturn($tool);
    $this->schemaValidator->shouldReceive('validateAgainstJsonSchema')->andReturn([]);
    // A JsonException stands in for a result that cannot be serialized.
    $tool->shouldReceive('call')->andThrow(new \JsonException("Unencodable."));

    $response = $this->dispatcher->handleToolCall(CallToolRequest::make(1, $name, []), $this->context);

    expect($response->isError)->toBeTrue()
        ->and($response->content[0]->text)->toBe("Failed to serialize tool result: Unencodable.");
});
it('can handle resources list request and return paginated resources', function () {
    // Five resources: res://1 .. res://5 named Resource1 .. Resource5.
    $resources = array_map(
        fn($index) => ResourceSchema::make('res://' . $index, 'Resource' . $index),
        range(1, 5)
    );
    $this->registry->shouldReceive('getResources')->andReturn($resources);
    $namesOf = fn(array $items) => array_map(fn($r) => $r->name, $items);

    // Page 1
    $firstPage = $this->dispatcher->handleResourcesList(ListResourcesRequest::make(1));

    expect($firstPage->resources)->toHaveCount(DISPATCHER_PAGINATION_LIMIT)
        ->and($namesOf($firstPage->resources))->toEqual(['Resource1', 'Resource2', 'Resource3'])
        ->and($firstPage->nextCursor)->toBe(base64_encode('offset=3'));

    // Page 2
    $secondPage = $this->dispatcher->handleResourcesList(ListResourcesRequest::make(2, $firstPage->nextCursor));

    expect($secondPage->resources)->toHaveCount(2)
        ->and($namesOf($secondPage->resources))->toEqual(['Resource4', 'Resource5'])
        ->and($secondPage->nextCursor)->toBeNull();
});
it('can handle resources list request and return empty if registry has no resources', function () {
    $this->registry->shouldReceive('getResources')->andReturn([]);

    $response = $this->dispatcher->handleResourcesList(ListResourcesRequest::make(1));

    // No items and no follow-up page.
    expect($response->resources)->toBeEmpty()
        ->and($response->nextCursor)->toBeNull();
});
it('can handle resource template list request and return paginated templates', function () {
    // Four templates: tpl://{id}/1 .. tpl://{id}/4 named Template1 .. Template4.
    $templates = array_map(
        fn($index) => ResourceTemplateSchema::make('tpl://{id}/' . $index, 'Template' . $index),
        range(1, 4)
    );
    $this->registry->shouldReceive('getResourceTemplates')->andReturn($templates);
    $namesOf = fn(array $items) => array_map(fn($rt) => $rt->name, $items);

    // Page 1
    $firstPage = $this->dispatcher->handleResourceTemplateList(ListResourceTemplatesRequest::make(1));

    expect($firstPage->resourceTemplates)->toHaveCount(DISPATCHER_PAGINATION_LIMIT)
        ->and($namesOf($firstPage->resourceTemplates))->toEqual(['Template1', 'Template2', 'Template3'])
        ->and($firstPage->nextCursor)->toBe(base64_encode('offset=3'));

    // Page 2
    $secondPage = $this->dispatcher->handleResourceTemplateList(
        ListResourceTemplatesRequest::make(2, $firstPage->nextCursor)
    );

    expect($secondPage->resourceTemplates)->toHaveCount(1)
        ->and($namesOf($secondPage->resourceTemplates))->toEqual(['Template4'])
        ->and($secondPage->nextCursor)->toBeNull();
});
it('can handle resource read request and return resource contents', function () {
    $uri = 'file://data.txt';

    // Registered resource double whose read() hands back canned contents.
    $resource = Mockery::mock(RegisteredResource::class, [
        ResourceSchema::make($uri, 'file_resource'),
        ['MyResourceHandler', 'read'],
        false,
    ]);
    $expectedContents = [TextContent::make('File content')];

    $this->registry->shouldReceive('getResource')->with($uri)->andReturn($resource);
    $resource->shouldReceive('read')
        ->with($this->container, $uri, $this->context)
        ->andReturn($expectedContents);

    $result = $this->dispatcher->handleResourceRead(ReadResourceRequest::make(1, $uri), $this->context);

    expect($result)->toBeInstanceOf(ReadResourceResult::class);
    expect($result->contents)->toEqual($expectedContents);
});
it('can handle resource read request and throw exception if resource not found', function () {
    // The registry reports no resource for this URI, so the read is expected to throw.
    $missingUri = 'unknown://uri';
    $this->registry->shouldReceive('getResource')->with($missingUri)->andReturn(null);

    $this->dispatcher->handleResourceRead(ReadResourceRequest::make(1, $missingUri), $this->context);
})->throws(McpServerException::class, "Resource URI 'unknown://uri' not found.");
it('can handle resource subscribe request and call subscription manager', function () {
    $subscribedUri = 'news://updates';
    $this->session->shouldReceive('getId')->andReturn(DISPATCHER_SESSION_ID);

    // Exactly one subscribe() call is expected, keyed by the session id and the URI.
    $this->subscriptionManager
        ->shouldReceive('subscribe')
        ->with(DISPATCHER_SESSION_ID, $subscribedUri)
        ->once();

    $outcome = $this->dispatcher->handleResourceSubscribe(
        ResourceSubscribeRequest::make(1, $subscribedUri),
        $this->session
    );

    expect($outcome)->toBeInstanceOf(EmptyResult::class);
});
it('can handle prompts list request and return paginated prompts', function () {
    // Six prompts (promptA..promptF) are registered, i.e. two full pages per the assertions below.
    $registered = array_map(
        fn (string $suffix) => PromptSchema::make('prompt' . $suffix, '', []),
        ['A', 'B', 'C', 'D', 'E', 'F']
    );
    $this->registry->shouldReceive('getPrompts')->andReturn($registered);

    // Page 1: no cursor supplied.
    $firstPage = $this->dispatcher->handlePromptsList(ListPromptsRequest::make(1));
    expect($firstPage->prompts)->toHaveCount(DISPATCHER_PAGINATION_LIMIT);
    $firstNames = array_map(fn ($prompt) => $prompt->name, $firstPage->prompts);
    expect($firstNames)->toEqual(['promptA', 'promptB', 'promptC']);
    expect($firstPage->nextCursor)->toBe(base64_encode('offset=3'));

    // Page 2: a second full page, after which no further cursor is expected.
    $secondPage = $this->dispatcher->handlePromptsList(
        ListPromptsRequest::make(2, $firstPage->nextCursor)
    );
    expect($secondPage->prompts)->toHaveCount(DISPATCHER_PAGINATION_LIMIT);
    $secondNames = array_map(fn ($prompt) => $prompt->name, $secondPage->prompts);
    expect($secondNames)->toEqual(['promptD', 'promptE', 'promptF']);
    expect($secondPage->nextCursor)->toBeNull();
});
it('can handle prompt get request and return prompt messages', function () {
    $name = 'daily-summary';
    $arguments = ['date' => '2024-07-16'];

    // Registered prompt double whose get() returns a single canned user message.
    $schema = PromptSchema::make($name, 'summary_prompt', [PromptArgument::make('date', required: true)]);
    $prompt = Mockery::mock(RegisteredPrompt::class, [$schema, ['MyPromptHandler', 'get'], false]);
    $expectedMessages = [
        PromptMessage::make(Role::User, TextContent::make("Summary for 2024-07-16")),
    ];

    $this->registry->shouldReceive('getPrompt')->with($name)->andReturn($prompt);
    $prompt->shouldReceive('get')
        ->with($this->container, $arguments, $this->context)
        ->andReturn($expectedMessages);

    $result = $this->dispatcher->handlePromptGet(GetPromptRequest::make(1, $name, $arguments), $this->context);

    expect($result)->toBeInstanceOf(GetPromptResult::class);
    expect($result->messages)->toEqual($expectedMessages);
    expect($result->description)->toBe($schema->description);
});
it('can handle prompt get request and throw exception if required argument is missing', function () {
    $name = 'needs-topic';

    // The schema marks 'topic' as required.
    $schema = PromptSchema::make($name, '', [PromptArgument::make('topic', required: true)]);
    $prompt = Mockery::mock(RegisteredPrompt::class, [$schema, ['MyPromptHandler', 'get'], false]);
    $this->registry->shouldReceive('getPrompt')->with($name)->andReturn($prompt);

    // Only an unrelated argument is sent, so 'topic' is absent from the request.
    $argumentsWithoutTopic = ['other_arg' => 'value'];
    $this->dispatcher->handlePromptGet(
        GetPromptRequest::make(1, $name, $argumentsWithoutTopic),
        $this->context
    );
})->throws(McpServerException::class, "Missing required argument 'topic' for prompt 'needs-topic'.");
it('can handle logging set level request and set log level on session', function () {
    $this->session->shouldReceive('getId')->andReturn(DISPATCHER_SESSION_ID);

    // The session must be told, exactly once, to store 'debug' under 'log_level'.
    $this->session->shouldReceive('set')->with('log_level', 'debug')->once();

    $outcome = $this->dispatcher->handleLoggingSetLevel(
        SetLogLevelRequest::make(1, LoggingLevel::Debug),
        $this->session
    );

    expect($outcome)->toBeInstanceOf(EmptyResult::class);
});
it('can handle completion complete request for prompt and delegate to provider', function () {
    $name = 'my-completable-prompt';
    $argument = 'tagName';
    $typedSoFar = 'php';
    $suggestions = ['php-mcp', 'php-fig'];

    // The provider is registered by class name, so the container is expected to resolve it.
    $provider = Mockery::mock(CompletionProviderInterface::class);
    $providerClass = get_class($provider);

    $prompt = new RegisteredPrompt(
        schema: PromptSchema::make($name, '', [PromptArgument::make($argument)]),
        handler: ['MyPromptHandler', 'get'],
        isManual: false,
        completionProviders: [$argument => $providerClass]
    );

    $this->registry->shouldReceive('getPrompt')->with($name)->andReturn($prompt);
    $this->container->shouldReceive('get')->with($providerClass)->andReturn($provider);
    $provider->shouldReceive('getCompletions')->with($typedSoFar, $this->session)->andReturn($suggestions);

    $request = CompletionCompleteRequest::make(
        1,
        PromptReference::make($name),
        ['name' => $argument, 'value' => $typedSoFar]
    );
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);

    expect($result)->toBeInstanceOf(CompletionCompleteResult::class);
    expect($result->values)->toEqual($suggestions);
    expect($result->total)->toBe(count($suggestions));
    expect($result->hasMore)->toBeFalse();
});
it('can handle completion complete request for resource template and delegate to provider', function () {
    $uriTemplate = 'item://{itemId}/category/{catName}';
    $variable = 'catName';
    $typedSoFar = 'boo';
    $suggestions = ['books', 'boomerangs'];

    // The provider is registered by class name for the 'catName' URI variable.
    $provider = Mockery::mock(CompletionProviderInterface::class);
    $providerClass = get_class($provider);

    $template = new RegisteredResourceTemplate(
        schema: ResourceTemplateSchema::make($uriTemplate, 'item-template'),
        handler: ['MyResourceTemplateHandler', 'get'],
        isManual: false,
        completionProviders: [$variable => $providerClass]
    );

    $this->registry->shouldReceive('getResourceTemplate')->with($uriTemplate)->andReturn($template);
    $this->container->shouldReceive('get')->with($providerClass)->andReturn($provider);
    $provider->shouldReceive('getCompletions')->with($typedSoFar, $this->session)->andReturn($suggestions);

    $request = CompletionCompleteRequest::make(
        1,
        ResourceReference::make($uriTemplate),
        ['name' => $variable, 'value' => $typedSoFar]
    );
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);

    expect($result->values)->toEqual($suggestions);
});
it('can handle completion complete request and return empty if no provider', function () {
    $name = 'no-provider-prompt';

    // The prompt has an argument but no completion provider registered for it.
    $prompt = new RegisteredPrompt(
        schema: PromptSchema::make($name, '', [PromptArgument::make('arg')]),
        handler: ['MyPromptHandler', 'get'],
        isManual: false,
        completionProviders: []
    );
    $this->registry->shouldReceive('getPrompt')->with($name)->andReturn($prompt);

    $request = CompletionCompleteRequest::make(
        1,
        PromptReference::make($name),
        ['name' => 'arg', 'value' => '']
    );
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);

    expect($result->values)->toBeEmpty();
});
it('can handle completion complete request with ListCompletionProvider instance', function () {
    $name = 'list-completion-prompt';
    $argument = 'category';
    $typedSoFar = 'bl';

    // A provider *instance* (not a class name) is registered, so no container lookup is mocked here.
    $provider = new \PhpMcp\Server\Defaults\ListCompletionProvider(['blog', 'news', 'docs', 'api']);
    $prompt = new RegisteredPrompt(
        schema: PromptSchema::make($name, '', [PromptArgument::make($argument)]),
        handler: ['MyPromptHandler', 'get'],
        isManual: false,
        completionProviders: [$argument => $provider]
    );
    $this->registry->shouldReceive('getPrompt')->with($name)->andReturn($prompt);

    $request = CompletionCompleteRequest::make(
        1,
        PromptReference::make($name),
        ['name' => $argument, 'value' => $typedSoFar]
    );
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);

    // Of the four listed values, only 'blog' is expected back for the input 'bl'.
    expect($result->values)->toEqual(['blog']);
    expect($result->total)->toBe(1);
    expect($result->hasMore)->toBeFalse();
});
it('can handle completion complete request with EnumCompletionProvider instance', function () {
    $name = 'enum-completion-prompt';
    $argument = 'status';
    $typedSoFar = 'a';

    // A provider instance built from StatusEnum is registered directly on the prompt.
    $provider = new \PhpMcp\Server\Defaults\EnumCompletionProvider(StatusEnum::class);
    $prompt = new RegisteredPrompt(
        schema: PromptSchema::make($name, '', [PromptArgument::make($argument)]),
        handler: ['MyPromptHandler', 'get'],
        isManual: false,
        completionProviders: [$argument => $provider]
    );
    $this->registry->shouldReceive('getPrompt')->with($name)->andReturn($prompt);

    $request = CompletionCompleteRequest::make(
        1,
        PromptReference::make($name),
        ['name' => $argument, 'value' => $typedSoFar]
    );
    $result = $this->dispatcher->handleCompletionComplete($request, $this->session);

    // Only 'archived' is expected back for the input 'a'.
    expect($result->values)->toEqual(['archived']);
    expect($result->total)->toBe(1);
    expect($result->hasMore)->toBeFalse();
});
it('decodeCursor handles null and invalid cursors', function () {
    // decodeCursor is invoked through reflection. setAccessible(true) is intentionally
    // omitted: it has had no effect since PHP 8.1 (all reflected methods are invokable)
    // and is deprecated as of PHP 8.5.
    $decodeCursor = new \ReflectionMethod(Dispatcher::class, 'decodeCursor');

    // Missing, non-base64 and wrongly formatted cursors are all expected to yield 0.
    expect($decodeCursor->invoke($this->dispatcher, null))->toBe(0);
    expect($decodeCursor->invoke($this->dispatcher, 'not_base64_$$$'))->toBe(0);
    expect($decodeCursor->invoke($this->dispatcher, base64_encode('invalid_format')))->toBe(0);

    // A well-formed 'offset=N' cursor decodes to the integer N.
    expect($decodeCursor->invoke($this->dispatcher, base64_encode('offset=123')))->toBe(123);
});
it('encodeNextCursor generates correct cursor or null', function () {
    // encodeNextCursor is invoked through reflection. setAccessible(true) is intentionally
    // omitted: it has had no effect since PHP 8.1 and is deprecated as of PHP 8.5.
    $encodeNextCursor = new \ReflectionMethod(Dispatcher::class, 'encodeNextCursor');
    $limit = DISPATCHER_PAGINATION_LIMIT;

    // NOTE(review): the arguments look like (offset, items returned, total, limit) —
    // confirm against Dispatcher::encodeNextCursor.

    // More items remain after the first page: a cursor is produced.
    expect($encodeNextCursor->invoke($this->dispatcher, 0, $limit, 10, $limit))->toBe(base64_encode('offset=3'));
    // The first page already covers the whole set: no cursor.
    expect($encodeNextCursor->invoke($this->dispatcher, 0, $limit, $limit, $limit))->toBeNull();
    // A partial second page with one item still remaining: cursor at limit + 2.
    expect($encodeNextCursor->invoke($this->dispatcher, $limit, 2, $limit + 2 + 1, $limit))
        ->toBe(base64_encode('offset=' . ($limit + 2)));
    // The second page ends exactly at the total: no cursor.
    expect($encodeNextCursor->invoke($this->dispatcher, $limit, 1, $limit + 1, $limit))->toBeNull();
    // Nothing was returned on this page: no cursor.
    expect($encodeNextCursor->invoke($this->dispatcher, 0, 0, 10, $limit))->toBeNull();
});
```
--------------------------------------------------------------------------------
/tests/Integration/StreamableHttpServerTransportTest.php:
--------------------------------------------------------------------------------
```php
<?php
use PhpMcp\Server\Protocol;
use PhpMcp\Server\Tests\Mocks\Clients\MockJsonHttpClient;
use PhpMcp\Server\Tests\Mocks\Clients\MockStreamHttpClient;
use React\ChildProcess\Process;
use React\Http\Browser;
use React\Http\Message\ResponseException;
use React\Stream\ReadableStreamInterface;
use function React\Async\await;
use function React\Promise\resolve;
const STREAMABLE_HTTP_SCRIPT_PATH = __DIR__ . '/../Fixtures/ServerScripts/StreamableHttpTestServer.php';
const STREAMABLE_HTTP_PROCESS_TIMEOUT = 9;
const STREAMABLE_HTTP_HOST = '127.0.0.1';
const STREAMABLE_MCP_PATH = 'mcp_streamable_json_mode';
beforeEach(function () {
    // Skip the test when the fixture server script is missing.
    if (!is_file(STREAMABLE_HTTP_SCRIPT_PATH)) {
        $this->markTestSkipped("Server script not found: " . STREAMABLE_HTTP_SCRIPT_PATH);
    }
    if (!is_executable(STREAMABLE_HTTP_SCRIPT_PATH)) {
        chmod(STREAMABLE_HTTP_SCRIPT_PATH, 0755);
    }

    // "<php binary> <script>" prefix shared by all three command lines; the binary
    // path is wrapped in double quotes only when it contains a space.
    $phpBinary = PHP_BINARY ?: 'php';
    $quotedPhpBinary = str_contains($phpBinary, ' ') ? '"' . $phpBinary . '"' : $phpBinary;
    $commandPrefix = $quotedPhpBinary . ' ' . escapeshellarg(STREAMABLE_HTTP_SCRIPT_PATH);

    $this->port = findFreePort();
    $port = (string) $this->port;

    // Builds "<prefix> <host> <port> <path> <flags...>" with every argument shell-escaped.
    $buildCommand = static fn (string ...$flags): string => implode(' ', [
        $commandPrefix,
        ...array_map(
            escapeshellarg(...),
            [STREAMABLE_HTTP_HOST, $port, STREAMABLE_MCP_PATH, ...$flags]
        ),
    ]);

    // Flag order: enableJsonResponse, then (stateless command only) useEventStore, stateless.
    $this->jsonModeCommand = $buildCommand('true');
    $this->streamModeCommand = $buildCommand('false');
    $this->statelessModeCommand = $buildCommand('true', 'false', 'true');

    $this->process = null;
});
afterEach(function () {
    $server = $this->process;

    if ($server instanceof Process && $server->isRunning()) {
        // Close the output pipes first, stdout before stderr.
        foreach (['stdout', 'stderr'] as $pipeName) {
            $pipe = $server->{$pipeName};
            if ($pipe instanceof ReadableStreamInterface) {
                $pipe->close();
            }
        }

        // Ask the process to stop, wait briefly, then force-kill it if it is still running.
        $server->terminate(SIGTERM);
        try {
            await(delay(0.02));
        } catch (\Throwable $ignored) {
            // Best-effort wait: a failure here must not prevent the kill below.
        }
        if ($server->isRunning()) {
            $server->terminate(SIGKILL);
        }
    }

    $this->process = null;
});
describe('JSON MODE', function () {
beforeEach(function () {
    // Launch the fixture server with the JSON-mode command line.
    $server = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process = $server;
    $server->start();

    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);

    // Short pause before the test body runs — presumably to let the server start listening.
    await(delay(0.2));
});
it('server starts, initializes via POST JSON, calls a tool, and closes', function () {
    $client = $this->jsonClient;

    // Step 1: initialize handshake.
    $handshake = await($client->sendRequest('initialize', [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-json-1'));
    $handshakeBody = $handshake['body'];
    expect($handshake['statusCode'])->toBe(200);
    expect($handshakeBody['id'])->toBe('init-json-1');
    expect($handshakeBody)->not->toHaveKey('error');
    expect($handshakeBody['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION);
    expect($handshakeBody['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer');
    expect($client->sessionId)->toBeString()->not->toBeEmpty();

    // Step 2: initialized notification, expected to be answered with 202.
    $ack = await($client->sendNotification('notifications/initialized'));
    expect($ack['statusCode'])->toBe(202);

    // Step 3: tool call.
    $call = await($client->sendRequest('tools/call', [
        'name' => 'greet_streamable_tool',
        'arguments' => ['name' => 'JSON Mode User']
    ], 'tool-json-1'));
    $callBody = $call['body'];
    expect($call['statusCode'])->toBe(200);
    expect($callBody['id'])->toBe('tool-json-1');
    expect($callBody)->not->toHaveKey('error');
    expect($callBody['result']['content'][0]['text'])->toBe('Hello, JSON Mode User!');

    // The server process itself is shut down by the afterEach hook.
})->group('integration', 'streamable_http_json');
it('return HTTP 400 error response for invalid JSON in POST request', function () {
    // The params object is malformed, so the payload is not valid JSON.
    $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-post-1", "method": "tools/list", "params": {"broken"}';
    $promise = $this->jsonClient->browser->post(
        $this->jsonClient->baseUrl,
        ['Content-Type' => 'application/json', 'Accept' => 'application/json'],
        $malformedJson
    );

    try {
        await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        // Without this, a successful response would skip every assertion and the
        // test would pass without verifying anything.
        $this->fail('Expected the malformed JSON POST to fail with HTTP 400, but it succeeded.');
    } catch (ResponseException $e) {
        expect($e->getResponse()->getStatusCode())->toBe(400);

        // The error body is expected to be a JSON-RPC parse error (-32700) with an empty id.
        $bodyContent = (string) $e->getResponse()->getBody();
        $decodedBody = json_decode($bodyContent, true);
        expect($decodedBody['jsonrpc'])->toBe('2.0');
        expect($decodedBody['id'])->toBe('');
        expect($decodedBody['error']['code'])->toBe(-32700);
        expect($decodedBody['error']['message'])->toContain('Invalid JSON');
    }
})->group('integration', 'streamable_http_json');
it('returns JSON-RPC error result for request for non-existent method', function () {
    $client = $this->jsonClient;

    // Step 1: complete the handshake.
    $initParams = [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
        'capabilities' => [],
    ];
    await($client->sendRequest('initialize', $initParams, 'init-json-err'));
    await($client->sendNotification('notifications/initialized'));

    // Step 2: call a method the server does not know; HTTP succeeds, JSON-RPC reports -32601.
    $reply = await($client->sendRequest('non/existentToolViaJson', [], 'err-meth-json-1'));
    $replyBody = $reply['body'];
    expect($reply['statusCode'])->toBe(200);
    expect($replyBody['id'])->toBe('err-meth-json-1');
    expect($replyBody['error']['code'])->toBe(-32601);
    expect($replyBody['error']['message'])->toContain("Method 'non/existentToolViaJson' not found");
})->group('integration', 'streamable_http_json');
it('can handle batch requests correctly', function () {
    $client = $this->jsonClient;

    // Step 1: complete the handshake.
    await($client->sendRequest('initialize', [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-json-batch'));
    expect($client->sessionId)->toBeString()->not->toBeEmpty();
    await($client->sendNotification('notifications/initialized'));

    // Step 2: a batch of three id-bearing requests plus one notification (no id).
    $batch = [
        ['jsonrpc' => '2.0', 'id' => 'batch-req-1', 'method' => 'tools/call', 'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']]],
        ['jsonrpc' => '2.0', 'method' => 'notifications/something'],
        ['jsonrpc' => '2.0', 'id' => 'batch-req-2', 'method' => 'tools/call', 'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]]],
        ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method']
    ];
    $reply = await($client->sendBatchRequest($batch));

    // Returns the first batch entry carrying the given id, or null when none does.
    $pickById = static function (array $entries, $id) {
        $matches = array_filter(
            $entries,
            static fn ($entry) => isset($entry['id']) && $entry['id'] === $id
        );

        return $matches === [] ? null : reset($matches);
    };

    // Three responses are expected for the four batch entries.
    expect($reply['statusCode'])->toBe(200);
    expect($reply['body'])->toBeArray()->toHaveCount(3);

    $greeting = $pickById($reply['body'], 'batch-req-1');
    $sum = $pickById($reply['body'], 'batch-req-2');
    $failure = $pickById($reply['body'], 'batch-req-3');

    expect($greeting['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!');
    expect($sum['result']['content'][0]['text'])->toBe('30');
    expect($failure['error']['code'])->toBe(-32601);
    expect($failure['error']['message'])->toContain("Method 'nonexistent/method' not found");
})->group('integration', 'streamable_http_json');
it('can handle tool list request', function () {
    $client = $this->jsonClient;

    await($client->sendRequest('initialize', [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-json-tools'));
    await($client->sendNotification('notifications/initialized'));

    $listing = await($client->sendRequest('tools/list', [], 'tool-list-json-1'));
    expect($listing['statusCode'])->toBe(200);
    expect($listing['body']['id'])->toBe('tool-list-json-1');

    $tools = $listing['body']['result']['tools'];
    expect($tools)->toBeArray();
    expect(count($tools))->toBe(4);

    // Only the first three of the four tools are checked by name, in listing order.
    foreach (['greet_streamable_tool', 'sum_streamable_tool', 'tool_reads_context'] as $position => $toolName) {
        expect($tools[$position]['name'])->toBe($toolName);
    }
})->group('integration', 'streamable_http_json');
it('passes request in Context', function () {
    $client = $this->jsonClient;

    await($client->sendRequest('initialize', [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-json-context'));
    await($client->sendNotification('notifications/initialized'));

    // The custom header is sent with the tool call; the tool is expected to echo its value back.
    $extraHeaders = ['X-Test-Header' => 'TestValue'];
    $call = await($client->sendRequest('tools/call', [
        'name' => 'tool_reads_context',
        'arguments' => []
    ], 'tool-json-context-1', $extraHeaders));

    $callBody = $call['body'];
    expect($call['statusCode'])->toBe(200);
    expect($callBody['id'])->toBe('tool-json-context-1');
    expect($callBody)->not->toHaveKey('error');
    expect($callBody['result']['content'][0]['text'])->toBe('TestValue');
})->group('integration', 'streamable_http_json');
it('can read a registered resource', function () {
    $client = $this->jsonClient;
    $resourceUri = 'test://streamable/static';

    await($client->sendRequest('initialize', [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-json-res'));
    await($client->sendNotification('notifications/initialized'));

    $read = await($client->sendRequest('resources/read', ['uri' => $resourceUri], 'res-read-json-1'));
    expect($read['statusCode'])->toBe(200);
    expect($read['body']['id'])->toBe('res-read-json-1');

    // The first content entry must echo the URI and carry the fixture's static text.
    $firstContent = $read['body']['result']['contents'][0];
    expect($firstContent['uri'])->toBe($resourceUri);
    expect($firstContent['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent);
})->group('integration', 'streamable_http_json');
it('can get a registered prompt', function () {
    $client = $this->jsonClient;

    $initParams = [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
        'capabilities' => [],
    ];
    await($client->sendRequest('initialize', $initParams, 'init-json-prompt'));
    await($client->sendNotification('notifications/initialized'));

    $reply = await($client->sendRequest('prompts/get', [
        'name' => 'simple_streamable_prompt',
        'arguments' => ['name' => 'JsonPromptUser', 'style' => 'terse']
    ], 'prompt-get-json-1'));
    expect($reply['statusCode'])->toBe(200);
    expect($reply['body']['id'])->toBe('prompt-get-json-1');

    // The first message is expected to combine the requested style and name.
    $firstMessage = $reply['body']['result']['messages'][0];
    expect($firstMessage['content']['text'])->toBe('Craft a terse greeting for JsonPromptUser.');
})->group('integration', 'streamable_http_json');
it('rejects subsequent requests if client does not send initialized notification', function () {
    $client = $this->jsonClient;

    // Step 1: send initialize only; notifications/initialized is deliberately never sent.
    $initParams = [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'],
        'capabilities' => [],
    ];
    await($client->sendRequest('initialize', $initParams, 'init-json-noack'));

    // Step 2: a tool call on the half-initialized session.
    $call = await($client->sendRequest('tools/call', [
        'name' => 'greet_streamable_tool',
        'arguments' => ['name' => 'NoAckJsonUser']
    ], 'tool-json-noack'));

    // HTTP itself succeeds; the rejection is a JSON-RPC Invalid Request (-32600) error.
    $callBody = $call['body'];
    expect($call['statusCode'])->toBe(200);
    expect($callBody['id'])->toBe('tool-json-noack');
    expect($callBody['error']['code'])->toBe(-32600);
    expect($callBody['error']['message'])->toContain('Client session not initialized');
})->group('integration', 'streamable_http_json');
it('returns HTTP 400 error for non-initialize requests without Mcp-Session-Id', function () {
    await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-sess-test'));

    // Drop the session id the client picked up during initialize so the next
    // request is sent without it.
    $this->jsonClient->sessionId = null;

    try {
        await($this->jsonClient->sendRequest('tools/list', [], 'tools-list-no-session'));
        // Without this, a successful response would skip every assertion and the
        // test would pass without verifying anything.
        $this->fail('Expected request to tools/list to fail with 400, but it succeeded.');
    } catch (ResponseException $e) {
        expect($e->getResponse()->getStatusCode())->toBe(400);

        // The error body is expected to be a JSON-RPC Invalid Request (-32600) echoing the request id.
        $bodyContent = (string) $e->getResponse()->getBody();
        $decodedBody = json_decode($bodyContent, true);
        expect($decodedBody['jsonrpc'])->toBe('2.0');
        expect($decodedBody['id'])->toBe('tools-list-no-session');
        expect($decodedBody['error']['code'])->toBe(-32600);
        expect($decodedBody['error']['message'])->toContain('Mcp-Session-Id header required');
    }
})->group('integration', 'streamable_http_json');
});
describe('STREAM MODE', function () {
beforeEach(function () {
    // Launch the fixture server with the stream-mode command line.
    $server = new Process($this->streamModeCommand, getcwd() ?: null, null, []);
    $this->process = $server;
    $server->start();

    $this->streamClient = new MockStreamHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);

    // Short pause before the test body runs — presumably to let the server start listening.
    await(delay(0.2));
});
afterEach(function () {
    // Nothing to close when the client was never created (or is otherwise falsy).
    $client = $this->streamClient ?? null;
    if (!$client) {
        return;
    }

    $client->closeMainSseStream();
});
it('server starts, initializes via POST JSON, calls a tool, and closes', function () {
    $client = $this->streamClient;

    // Step 1: initialize handshake.
    $handshake = await($client->sendInitializeRequest([
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-stream-1'));
    expect($client->sessionId)->toBeString()->not->toBeEmpty();
    expect($handshake['id'])->toBe('init-stream-1');
    expect($handshake)->not->toHaveKey('error');
    expect($handshake['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION);
    expect($handshake['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer');

    // Step 2: initialized notification, expected to be answered with 202.
    $ack = await($client->sendHttpNotification('notifications/initialized'));
    expect($ack['statusCode'])->toBe(202);

    // Step 3: tool call.
    $call = await($client->sendRequest('tools/call', [
        'name' => 'greet_streamable_tool',
        'arguments' => ['name' => 'Stream Mode User']
    ], 'tool-stream-1'));
    expect($call['id'])->toBe('tool-stream-1');
    expect($call)->not->toHaveKey('error');
    expect($call['result']['content'][0]['text'])->toBe('Hello, Stream Mode User!');
})->group('integration', 'streamable_http_stream');
it('return HTTP 400 error response for invalid JSON in POST request', function () {
    // The params object is malformed, so the payload is not valid JSON.
    $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-stream-1", "method": "tools/list", "params": {"broken"}';
    $postPromise = $this->streamClient->browser->post(
        $this->streamClient->baseMcpUrl,
        ['Content-Type' => 'application/json', 'Accept' => 'text/event-stream'],
        $malformedJson
    );

    try {
        await(timeout($postPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        // Without this, a successful response would skip every assertion and the
        // test would pass without verifying anything.
        $this->fail('Expected the malformed JSON POST to fail with HTTP 400, but it succeeded.');
    } catch (ResponseException $e) {
        $httpResponse = $e->getResponse();
        $bodyContent = (string) $httpResponse->getBody();
        $decodedBody = json_decode($bodyContent, true);

        // The error body is expected to be a JSON-RPC parse error (-32700) with an empty id.
        expect($httpResponse->getStatusCode())->toBe(400);
        expect($decodedBody['jsonrpc'])->toBe('2.0');
        expect($decodedBody['id'])->toBe('');
        expect($decodedBody['error']['code'])->toBe(-32700);
        expect($decodedBody['error']['message'])->toContain('Invalid JSON');
    }
})->group('integration', 'streamable_http_stream');
it('returns JSON-RPC error result for request for non-existent method', function () {
    $client = $this->streamClient;

    // Step 1: complete the handshake.
    $initParams = [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
        'capabilities' => [],
    ];
    await($client->sendInitializeRequest($initParams, 'init-stream-err'));
    await($client->sendHttpNotification('notifications/initialized'));

    // Step 2: call a method the server does not know and expect JSON-RPC error -32601.
    $reply = await($client->sendRequest('non/existentToolViaStream', [], 'err-meth-stream-1'));
    expect($reply['id'])->toBe('err-meth-stream-1');
    expect($reply['error']['code'])->toBe(-32601);
    expect($reply['error']['message'])->toContain("Method 'non/existentToolViaStream' not found");
})->group('integration', 'streamable_http_stream');
it('can handle batch requests correctly', function () {
    $client = $this->streamClient;

    await($client->sendInitializeRequest([
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'StreamModeBatchClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-stream-batch'));
    expect($client->sessionId)->toBeString()->not->toBeEmpty();
    await($client->sendHttpNotification('notifications/initialized'));

    // A batch of three id-bearing requests plus one notification (no id).
    $batch = [
        ['jsonrpc' => '2.0', 'id' => 'batch-req-1', 'method' => 'tools/call', 'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']]],
        ['jsonrpc' => '2.0', 'method' => 'notifications/something'],
        ['jsonrpc' => '2.0', 'id' => 'batch-req-2', 'method' => 'tools/call', 'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]]],
        ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method']
    ];
    $replies = await($client->sendBatchRequest($batch));

    // Three responses are expected for the four batch entries.
    expect($replies)->toBeArray()->toHaveCount(3);

    // Returns the first batch entry carrying the given id, or null when none does.
    $pickById = static function (array $entries, $id) {
        $matches = array_filter(
            $entries,
            static fn ($entry) => isset($entry['id']) && $entry['id'] === $id
        );

        return $matches === [] ? null : reset($matches);
    };

    $greeting = $pickById($replies, 'batch-req-1');
    $sum = $pickById($replies, 'batch-req-2');
    $failure = $pickById($replies, 'batch-req-3');

    expect($greeting['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!');
    expect($sum['result']['content'][0]['text'])->toBe('30');
    expect($failure['error']['code'])->toBe(-32601);
    expect($failure['error']['message'])->toContain("Method 'nonexistent/method' not found");
})->group('integration', 'streamable_http_stream');
it('passes request in Context', function () {
    $client = $this->streamClient;

    await($client->sendInitializeRequest([
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-stream-context'));
    expect($client->sessionId)->toBeString()->not->toBeEmpty();
    await($client->sendHttpNotification('notifications/initialized'));

    // The custom header is sent with the tool call; the tool is expected to echo its value back.
    $extraHeaders = ['X-Test-Header' => 'TestValue'];
    $call = await($client->sendRequest('tools/call', [
        'name' => 'tool_reads_context',
        'arguments' => []
    ], 'tool-stream-context-1', $extraHeaders));

    expect($call['id'])->toBe('tool-stream-context-1');
    expect($call)->not->toHaveKey('error');
    expect($call['result']['content'][0]['text'])->toBe('TestValue');
})->group('integration', 'streamable_http_stream');
it('can handle tool list request', function () {
    $client = $this->streamClient;

    $initParams = ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => []];
    await($client->sendInitializeRequest($initParams, 'init-stream-tools'));
    await($client->sendHttpNotification('notifications/initialized'));

    $listing = await($client->sendRequest('tools/list', [], 'tool-list-stream-1'));
    expect($listing['id'])->toBe('tool-list-stream-1');
    expect($listing)->not->toHaveKey('error');

    $tools = $listing['result']['tools'];
    expect($tools)->toBeArray();
    expect(count($tools))->toBe(4);

    // Only the first three of the four tools are checked by name, in listing order.
    foreach (['greet_streamable_tool', 'sum_streamable_tool', 'tool_reads_context'] as $position => $toolName) {
        expect($tools[$position]['name'])->toBe($toolName);
    }
})->group('integration', 'streamable_http_stream');
it('can read a registered resource', function () {
    $client = $this->streamClient;
    $resourceUri = 'test://streamable/static';

    $initParams = ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => []];
    await($client->sendInitializeRequest($initParams, 'init-stream-res'));
    await($client->sendHttpNotification('notifications/initialized'));

    $read = await($client->sendRequest('resources/read', ['uri' => $resourceUri], 'res-read-stream-1'));
    expect($read['id'])->toBe('res-read-stream-1');

    // The first content entry must echo the URI and carry the fixture's static text.
    $firstContent = $read['result']['contents'][0];
    expect($firstContent['uri'])->toBe($resourceUri);
    expect($firstContent['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent);
})->group('integration', 'streamable_http_stream');
it('can get a registered prompt', function () {
    $client = $this->streamClient;

    $initParams = ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => []];
    await($client->sendInitializeRequest($initParams, 'init-stream-prompt'));
    await($client->sendHttpNotification('notifications/initialized'));

    $reply = await($client->sendRequest('prompts/get', [
        'name' => 'simple_streamable_prompt',
        'arguments' => ['name' => 'StreamPromptUser', 'style' => 'formal']
    ], 'prompt-get-stream-1'));
    expect($reply['id'])->toBe('prompt-get-stream-1');

    // The first message is expected to combine the requested style and name.
    $firstMessage = $reply['result']['messages'][0];
    expect($firstMessage['content']['text'])->toBe('Craft a formal greeting for StreamPromptUser.');
})->group('integration', 'streamable_http_stream');
it('rejects subsequent requests if client does not send initialized notification', function () {
    $client = $this->streamClient;

    // Send initialize only; notifications/initialized is deliberately never sent.
    await($client->sendInitializeRequest([
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-stream-noack'));

    // A tool call on the half-initialized session should come back as Invalid Request (-32600).
    $call = await($client->sendRequest('tools/call', [
        'name' => 'greet_streamable_tool',
        'arguments' => ['name' => 'NoAckStreamUser']
    ], 'tool-stream-noack'));

    expect($call['id'])->toBe('tool-stream-noack');
    expect($call['error']['code'])->toBe(-32600);
    expect($call['error']['message'])->toContain('Client session not initialized');
})->group('integration', 'streamable_http_stream');
it('returns HTTP 400 error for non-initialize requests without Mcp-Session-Id', function () {
    $client = $this->streamClient;

    await($client->sendInitializeRequest([
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'StreamModeClient', 'version' => '1.0'],
        'capabilities' => []
    ], 'init-stream-sess-test'));

    // Remember the real session id, then blank it so the next request goes out without one.
    $rememberedSessionId = $client->sessionId;
    $client->sessionId = null;

    try {
        await($client->sendRequest('tools/list', [], 'tools-list-no-session-stream'));
        $this->fail("Expected request to tools/list to fail with 400, but it succeeded.");
    } catch (ResponseException $rejection) {
        // Only the status is checked: the request accepts text/event-stream only,
        // so the body cannot be JSON.
        expect($rejection->getResponse()->getStatusCode())->toBe(400);
    }

    // Put the session id back on the client.
    $client->sessionId = $rememberedSessionId;
})->group('integration', 'streamable_http_stream');
});
/**
* STATELESS MODE TESTS
*
* Tests for the stateless mode of StreamableHttpServerTransport, which:
* - Generates session IDs internally but doesn't expose them to clients
* - Doesn't require session IDs in requests after initialization
* - Doesn't include session IDs in response headers
* - Disables GET requests (SSE streaming)
* - Makes DELETE requests meaningless (but returns 204)
* - Treats each request as independent (no persistent session state)
*
* This mode is designed to work with clients like OpenAI's MCP implementation
* that have issues with session management in "never require approval" mode.
*/
describe('STATELESS MODE', function () {
beforeEach(function () {
    // Start the process described by the stateless-mode command
    // (presumably the server under test; the command is defined outside this block).
    $workingDir = getcwd() ?: null;
    $this->process = new Process($this->statelessModeCommand, $workingDir, null, []);
    $this->process->start();

    // Client used by every test in this group.
    $this->statelessClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);

    // Wait 0.2 s before the test body starts issuing requests.
    await(delay(0.2));
});
it('allows tool calls without having to send initialized notification', function () {
    // Step 1: send the initialize request and check the handshake reply.
    $handshake = await($this->statelessClient->sendRequest(
        'initialize',
        [
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'StatelessModeClient', 'version' => '1.0'],
            'capabilities' => [],
        ],
        'init-stateless-1'
    ));

    expect($handshake['statusCode'])->toBe(200);
    $handshakeBody = $handshake['body'];
    expect($handshakeBody['id'])->toBe('init-stateless-1');
    expect($handshakeBody)->not->toHaveKey('error');
    expect($handshakeBody['result']['protocolVersion'])->toBe(Protocol::LATEST_PROTOCOL_VERSION);
    expect($handshakeBody['result']['serverInfo']['name'])->toBe('StreamableHttpIntegrationServer');

    // After the handshake the client must hold an empty-string session id.
    expect($this->statelessClient->sessionId)->toBeString()->toBeEmpty();

    // Step 2: call a tool right away, with no `notifications/initialized` in between.
    $call = await($this->statelessClient->sendRequest(
        'tools/call',
        [
            'name' => 'greet_streamable_tool',
            'arguments' => ['name' => 'Stateless Mode User'],
        ],
        'tool-stateless-1'
    ));

    expect($call['statusCode'])->toBe(200);
    $callBody = $call['body'];
    expect($callBody['id'])->toBe('tool-stateless-1');
    expect($callBody)->not->toHaveKey('error');
    expect($callBody['result']['content'][0]['text'])->toBe('Hello, Stateless Mode User!');
})->group('integration', 'streamable_http_stateless');
it('return HTTP 400 error response for invalid JSON in POST request', function () {
    // Deliberately malformed payload: "params" holds `{"broken"}` (a key with no
    // value) and the outer object is never closed.
    $malformedJson = '{"jsonrpc":"2.0", "id": "bad-json-stateless-1", "method": "tools/list", "params": {"broken"}';

    // Post the raw string through the browser so the payload is sent exactly as written.
    $postPromise = $this->statelessClient->browser->post(
        $this->statelessClient->baseUrl,
        ['Content-Type' => 'application/json', 'Accept' => 'application/json'],
        $malformedJson
    );

    try {
        await(timeout($postPromise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));

        // Without this guard the test would pass vacuously whenever the server
        // answers the malformed payload with a successful response.
        $this->fail('Expected POST with invalid JSON to fail with 400, but it succeeded.');
    } catch (ResponseException $e) {
        $httpResponse = $e->getResponse();
        $bodyContent = (string) $httpResponse->getBody();
        $decodedBody = json_decode($bodyContent, true);

        // The error response must be a JSON-RPC error with an empty-string id.
        expect($httpResponse->getStatusCode())->toBe(400);
        expect($decodedBody['jsonrpc'])->toBe('2.0');
        expect($decodedBody['id'])->toBe('');
        expect($decodedBody['error']['code'])->toBe(-32700);
        expect($decodedBody['error']['message'])->toContain('Invalid JSON');
    }
})->group('integration', 'streamable_http_stateless');
it('returns JSON-RPC error result for request for non-existent method', function () {
    $reply = await(
        $this->statelessClient->sendRequest('non/existentToolViaStateless', [], 'err-meth-stateless-1')
    );

    // The HTTP exchange itself succeeds; the failure is reported inside the JSON-RPC body.
    expect($reply['statusCode'])->toBe(200);

    $payload = $reply['body'];
    expect($payload['id'])->toBe('err-meth-stateless-1');
    expect($payload['error']['code'])->toBe(-32601);
    expect($payload['error']['message'])->toContain("Method 'non/existentToolViaStateless' not found");
})->group('integration', 'streamable_http_stateless');
it('can handle batch requests correctly', function () {
    // Four entries: three requests carrying an id and one notification without an id.
    $greetCall = [
        'jsonrpc' => '2.0',
        'id' => 'batch-req-1',
        'method' => 'tools/call',
        'params' => ['name' => 'greet_streamable_tool', 'arguments' => ['name' => 'Batch Item 1']],
    ];
    $notification = ['jsonrpc' => '2.0', 'method' => 'notifications/something'];
    $sumCall = [
        'jsonrpc' => '2.0',
        'id' => 'batch-req-2',
        'method' => 'tools/call',
        'params' => ['name' => 'sum_streamable_tool', 'arguments' => ['a' => 10, 'b' => 20]],
    ];
    $unknownCall = ['jsonrpc' => '2.0', 'id' => 'batch-req-3', 'method' => 'nonexistent/method'];

    $reply = await($this->statelessClient->sendBatchRequest([$greetCall, $notification, $sumCall, $unknownCall]));

    // Returns the first batch entry whose id matches, or null when there is none.
    $pickById = function (array $entries, $wantedId) {
        foreach ($entries as $entry) {
            if (!isset($entry['id']) || $entry['id'] !== $wantedId) {
                continue;
            }

            return $entry;
        }

        return null;
    };

    expect($reply['statusCode'])->toBe(200);
    // Three responses are expected for the four entries sent.
    expect($reply['body'])->toBeArray()->toHaveCount(3);

    $greetReply = $pickById($reply['body'], 'batch-req-1');
    $sumReply = $pickById($reply['body'], 'batch-req-2');
    $unknownReply = $pickById($reply['body'], 'batch-req-3');

    expect($greetReply['result']['content'][0]['text'])->toBe('Hello, Batch Item 1!');
    expect($sumReply['result']['content'][0]['text'])->toBe('30');
    expect($unknownReply['error']['code'])->toBe(-32601);
    expect($unknownReply['error']['message'])->toContain("Method 'nonexistent/method' not found");
})->group('integration', 'streamable_http_stateless');
it('passes request in Context', function () {
    // Extra HTTP header sent with the call; the tool's text output is expected to echo its value.
    $extraHeaders = ['X-Test-Header' => 'TestValue'];
    $callParams = [
        'name' => 'tool_reads_context',
        'arguments' => [],
    ];

    $reply = await(
        $this->statelessClient->sendRequest('tools/call', $callParams, 'tool-stateless-context-1', $extraHeaders)
    );

    expect($reply['statusCode'])->toBe(200);

    $payload = $reply['body'];
    expect($payload['id'])->toBe('tool-stateless-context-1');
    expect($payload)->not->toHaveKey('error');
    expect($payload['result']['content'][0]['text'])->toBe('TestValue');
})->group('integration', 'streamable_http_stateless');
it('can handle tool list request', function () {
    $reply = await($this->statelessClient->sendRequest('tools/list', [], 'tool-list-stateless-1'));

    expect($reply['statusCode'])->toBe(200);

    $payload = $reply['body'];
    expect($payload['id'])->toBe('tool-list-stateless-1');
    expect($payload)->not->toHaveKey('error');

    // Four tools are expected in total; only the first three names are pinned here.
    $tools = $payload['result']['tools'];
    expect($tools)->toBeArray();
    expect($tools)->toHaveCount(4);
    expect($tools[0]['name'])->toBe('greet_streamable_tool');
    expect($tools[1]['name'])->toBe('sum_streamable_tool');
    expect($tools[2]['name'])->toBe('tool_reads_context');
})->group('integration', 'streamable_http_stateless');
it('can read a registered resource', function () {
    $resourceUri = 'test://streamable/static';

    $reply = await(
        $this->statelessClient->sendRequest('resources/read', ['uri' => $resourceUri], 'res-read-stateless-1')
    );

    expect($reply['statusCode'])->toBe(200);
    expect($reply['body']['id'])->toBe('res-read-stateless-1');

    // The first content entry must echo the URI and carry the fixture's static text.
    $firstEntry = $reply['body']['result']['contents'][0];
    expect($firstEntry['uri'])->toBe('test://streamable/static');
    expect($firstEntry['text'])->toBe(\PhpMcp\Server\Tests\Fixtures\General\ResourceHandlerFixture::$staticTextContent);
})->group('integration', 'streamable_http_stateless');
it('can get a registered prompt', function () {
    $promptParams = [
        'name' => 'simple_streamable_prompt',
        'arguments' => ['name' => 'StatelessPromptUser', 'style' => 'formal'],
    ];

    $reply = await($this->statelessClient->sendRequest('prompts/get', $promptParams, 'prompt-get-stateless-1'));

    expect($reply['statusCode'])->toBe(200);
    expect($reply['body']['id'])->toBe('prompt-get-stateless-1');

    // The first returned message must contain the rendered prompt text.
    $firstMessage = $reply['body']['result']['messages'][0];
    expect($firstMessage['content']['text'])->toBe('Craft a formal greeting for StatelessPromptUser.');
})->group('integration', 'streamable_http_stateless');
it('does not return session ID in response headers in stateless mode', function () {
    // The request goes through the raw browser so the PSR response headers can be inspected directly.
    $initializeRequest = [
        'jsonrpc' => '2.0',
        'id' => 'init-header-test',
        'method' => 'initialize',
        'params' => [
            'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
            'clientInfo' => ['name' => 'StatelessHeaderTest', 'version' => '1.0'],
            'capabilities' => [],
        ],
    ];
    $requestHeaders = ['Content-Type' => 'application/json', 'Accept' => 'application/json'];

    $pending = $this->statelessClient->browser->post(
        $this->statelessClient->baseUrl,
        $requestHeaders,
        json_encode($initializeRequest)
    );
    $httpResponse = await(timeout($pending, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));

    expect($httpResponse->getStatusCode())->toBe(200);
    expect($httpResponse->hasHeader('Mcp-Session-Id'))->toBeFalse();

    // The initialize call itself must still succeed.
    $decoded = json_decode((string) $httpResponse->getBody(), true);
    expect($decoded['id'])->toBe('init-header-test');
    expect($decoded)->not->toHaveKey('error');
})->group('integration', 'streamable_http_stateless');
it('returns HTTP 405 for GET requests (SSE disabled) in stateless mode', function () {
    try {
        $pending = $this->statelessClient->browser->get(
            $this->statelessClient->baseUrl,
            ['Accept' => 'text/event-stream']
        );
        await(timeout($pending, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        $this->fail("Expected GET request to fail with 405, but it succeeded.");
    } catch (ResponseException $rejection) {
        $errorResponse = $rejection->getResponse();
        expect($errorResponse->getStatusCode())->toBe(405);

        // The body is decoded as JSON and must explain why GET is refused.
        $decoded = json_decode((string) $errorResponse->getBody(), true);
        expect($decoded['error']['message'])->toContain('GET requests (SSE streaming) are not supported in stateless mode');
    }
})->group('integration', 'streamable_http_stateless');
it('returns 204 for DELETE requests in stateless mode (but they are meaningless)', function () {
    $endpoint = $this->statelessClient->baseUrl;
    $pending = $this->statelessClient->browser->delete($endpoint);

    $httpResponse = await(timeout($pending, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));

    // Expect 204 with an empty body.
    expect($httpResponse->getStatusCode())->toBe(204);
    $rawBody = (string) $httpResponse->getBody();
    expect($rawBody)->toBeEmpty();
})->group('integration', 'streamable_http_stateless');
it('handles multiple independent tool calls in stateless mode', function () {
    // Request id => [tool name, tool arguments, expected text of the first content entry].
    $scenarios = [
        'tool-multi-1' => ['greet_streamable_tool', ['name' => 'User 1'], 'Hello, User 1!'],
        'tool-multi-2' => ['sum_streamable_tool', ['a' => 5, 'b' => 10], '15'],
        'tool-multi-3' => ['greet_streamable_tool', ['name' => 'User 3'], 'Hello, User 3!'],
    ];

    // Send all calls one after another before asserting on any of them.
    $replies = [];
    foreach ($scenarios as $requestId => [$toolName, $toolArguments]) {
        $replies[$requestId] = await($this->statelessClient->sendRequest(
            'tools/call',
            ['name' => $toolName, 'arguments' => $toolArguments],
            $requestId
        ));
    }

    // Each reply must succeed, echo its own request id and carry the expected text.
    foreach ($scenarios as $requestId => [, , $expectedText]) {
        $reply = $replies[$requestId];
        expect($reply['statusCode'])->toBe(200);
        expect($reply['body']['id'])->toBe($requestId);
        expect($reply['body']['result']['content'][0]['text'])->toBe($expectedText);
    }
})->group('integration', 'streamable_http_stateless');
});
it('responds to OPTIONS request with CORS headers', function () {
    // Start the process from the JSON-mode command and give it 0.1 s before the request.
    $workingDir = getcwd() ?: null;
    $this->process = new Process($this->jsonModeCommand, $workingDir, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // A fresh browser sends the OPTIONS request to the client's base URL.
    $pending = (new Browser())->request('OPTIONS', $this->jsonClient->baseUrl);
    $httpResponse = await(timeout($pending, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));

    expect($httpResponse->getStatusCode())->toBe(204);
    expect($httpResponse->getHeaderLine('Access-Control-Allow-Origin'))->toBe('*');

    $allowedMethods = $httpResponse->getHeaderLine('Access-Control-Allow-Methods');
    expect($allowedMethods)->toContain('POST');
    expect($allowedMethods)->toContain('GET');

    $allowedHeaders = $httpResponse->getHeaderLine('Access-Control-Allow-Headers');
    expect($allowedHeaders)->toContain('Mcp-Session-Id');
})->group('integration', 'streamable_http');
it('returns 404 for unknown paths', function () {
    // Start the process from the JSON-mode command and give it 0.1 s before the request.
    $workingDir = getcwd() ?: null;
    $this->process = new Process($this->jsonModeCommand, $workingDir, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // Same host and port, but a path different from the one given to the client.
    $unknownUrl = sprintf('http://%s:%s/completely/unknown/path', STREAMABLE_HTTP_HOST, $this->port);
    $pending = (new Browser())->get($unknownUrl);

    try {
        await(timeout($pending, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        $this->fail("Request to unknown path should have failed with 404.");
    } catch (ResponseException $rejection) {
        $errorResponse = $rejection->getResponse();
        expect($errorResponse->getStatusCode())->toBe(404);

        $decoded = json_decode((string)$errorResponse->getBody(), true);
        expect($decoded['error']['message'])->toContain('Not found');
    }
})->group('integration', 'streamable_http');
it('can delete client session with DELETE request', function () {
    // Start the process from the JSON-mode command and give it 0.1 s before the first request.
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // 1. Initialize and capture the session id the client picked up.
    await($this->jsonClient->sendRequest('initialize', ['protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION, 'clientInfo' => ['name' => 'JsonModeClient', 'version' => '1.0'], 'capabilities' => []], 'init-delete-test'));
    $sessionIdForDelete = $this->jsonClient->sessionId;
    expect($sessionIdForDelete)->toBeString();
    await($this->jsonClient->sendNotification('notifications/initialized'));

    // 2. Establish a GET SSE connection bound to that session.
    $sseUrl = $this->jsonClient->baseUrl;
    $browserForSse = (new Browser())->withTimeout(3);
    $ssePromise = $browserForSse->requestStreaming('GET', $sseUrl, [
        'Accept' => 'text/event-stream',
        'Mcp-Session-Id' => $sessionIdForDelete
    ]);
    $ssePsrResponse = await(timeout($ssePromise, 3));
    expect($ssePsrResponse->getStatusCode())->toBe(200);
    expect($ssePsrResponse->getHeaderLine('Content-Type'))->toBe('text/event-stream');

    // Record when the streaming body emits "close" so step 4 can assert on it.
    $sseStream = $ssePsrResponse->getBody();
    assert($sseStream instanceof ReadableStreamInterface);
    $isSseStreamClosed = false;
    $sseStream->on('close', function () use (&$isSseStreamClosed) {
        $isSseStreamClosed = true;
    });

    // 3. Send DELETE request
    $deleteResponse = await($this->jsonClient->sendDeleteRequest());
    expect($deleteResponse['statusCode'])->toBe(204);

    // 4. Assert that the GET SSE stream was closed (after a 0.1 s pause for the close event).
    await(delay(0.1));
    expect($isSseStreamClosed)->toBeTrue("The GET SSE stream for session {$sessionIdForDelete} was not closed after DELETE request.");

    // 5. Assert that the client session was deleted: a follow-up request must be rejected with 404.
    try {
        await($this->jsonClient->sendRequest('tools/list', [], 'tool-list-json-1'));
        // The message names 404 to match the status asserted in the catch block below.
        $this->fail("Expected request to tools/list to fail with 404, but it succeeded.");
    } catch (ResponseException $e) {
        expect($e->getResponse()->getStatusCode())->toBe(404);
        $bodyContent = (string) $e->getResponse()->getBody();
        $decodedBody = json_decode($bodyContent, true);
        expect($decodedBody['error']['code'])->toBe(-32600);
        expect($decodedBody['error']['message'])->toContain('Invalid or expired session');
    }
})->group('integration', 'streamable_http_json');
it('executes middleware that adds headers to response', function () {
    // Start the process from the JSON-mode command and give it 0.1 s before the first request.
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // 1. Send a request. Its return value is not needed: the assertion below
    //    reads the headers the client recorded for its last response.
    await($this->jsonClient->sendRequest('initialize', [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'MiddlewareTestClient'],
        'capabilities' => []
    ], 'init-middleware-headers'));

    // 2. Check that the response has the header added by middleware
    expect($this->jsonClient->lastResponseHeaders)->toContain('X-Test-Middleware: header-added');
})->group('integration', 'streamable_http', 'middleware');
it('executes middleware that modifies request attributes', function () {
    // Start the process from the JSON-mode command and give it 0.1 s before the first request.
    $workingDir = getcwd() ?: null;
    $this->process = new Process($this->jsonModeCommand, $workingDir, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // Step 1: full handshake (initialize followed by the initialized notification).
    $initParams = [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'MiddlewareAttrTestClient', 'version' => '1.0'],
        'capabilities' => [],
    ];
    await($this->jsonClient->sendRequest('initialize', $initParams, 'init-middleware-attr'));
    await($this->jsonClient->sendNotification('notifications/initialized'));

    // Step 2: call the tool whose text output reports the middleware-added attribute.
    $callParams = [
        'name' => 'check_request_attribute_tool',
        'arguments' => [],
    ];
    $reply = await($this->jsonClient->sendRequest('tools/call', $callParams, 'tool-attr-check'));

    $reportedText = $reply['body']['result']['content'][0]['text'];
    expect($reportedText)->toBe('middleware-value-found: middleware-value');
})->group('integration', 'streamable_http', 'middleware');
it('executes middleware that can short-circuit request processing', function () {
    // Start the process from the JSON-mode command and give it 0.1 s before the request.
    $this->process = new Process($this->jsonModeCommand, getcwd() ?: null, null, []);
    $this->process->start();
    await(delay(0.1));

    $browser = new Browser();
    $shortCircuitUrl = "http://" . STREAMABLE_HTTP_HOST . ":" . $this->port . "/" . STREAMABLE_MCP_PATH . "/short-circuit";
    $promise = $browser->get($shortCircuitUrl);

    // Only ResponseException is caught. Any other throwable (a timeout, or the
    // fail() below) propagates unchanged so the runner reports the original
    // exception and trace instead of a re-wrapped message.
    try {
        await(timeout($promise, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        $this->fail("Expected a 418 status code response, but request succeeded");
    } catch (ResponseException $e) {
        expect($e->getResponse()->getStatusCode())->toBe(418);
        $body = (string) $e->getResponse()->getBody();
        expect($body)->toBe('Short-circuited by middleware');
    }
})->group('integration', 'streamable_http', 'middleware');
it('executes multiple middlewares in correct order', function () {
    // Start the process from the JSON-mode command and give it 0.1 s before the first request.
    $workingDir = getcwd() ?: null;
    $this->process = new Process($this->jsonModeCommand, $workingDir, null, []);
    $this->process->start();
    $this->jsonClient = new MockJsonHttpClient(STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    await(delay(0.1));

    // Any request works here; initialize is used to trigger the middleware chain.
    $initParams = [
        'protocolVersion' => Protocol::LATEST_PROTOCOL_VERSION,
        'clientInfo' => ['name' => 'MiddlewareOrderTestClient'],
        'capabilities' => [],
    ];
    await($this->jsonClient->sendRequest('initialize', $initParams, 'init-middleware-order'));

    // The recorded response headers must contain the order header with this exact value.
    $recordedHeaders = $this->jsonClient->lastResponseHeaders;
    expect($recordedHeaders)->toContain('X-Middleware-Order: third,second,first');
})->group('integration', 'streamable_http', 'middleware');
it('handles middleware that throws exceptions gracefully', function () {
    // Start the process from the JSON-mode command and give it 0.1 s before the request.
    $workingDir = getcwd() ?: null;
    $this->process = new Process($this->jsonModeCommand, $workingDir, null, []);
    $this->process->start();
    await(delay(0.1));

    $errorUrl = sprintf('http://%s:%s/%s/error-middleware', STREAMABLE_HTTP_HOST, $this->port, STREAMABLE_MCP_PATH);
    $pending = (new Browser())->get($errorUrl);

    try {
        await(timeout($pending, STREAMABLE_HTTP_PROCESS_TIMEOUT - 2));
        $this->fail("Error middleware should have thrown an exception.");
    } catch (ResponseException $rejection) {
        $errorResponse = $rejection->getResponse();
        expect($errorResponse->getStatusCode())->toBe(500);

        // ReactPHP handles exceptions and returns a generic error message
        $rawBody = (string) $errorResponse->getBody();
        expect($rawBody)->toContain('Internal Server Error');
    }
})->group('integration', 'streamable_http', 'middleware');
```