This is page 4 of 8. Use http://codebase.md/saidsurucu/yargi-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── __main__.py
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│ └── workflows
│ └── publish.yml
├── .gitignore
├── .serena
│ ├── .gitignore
│ └── project.yml
├── 5ire-settings.png
├── analyze_kik_hash_generation.py
├── anayasa_mcp_module
│ ├── __init__.py
│ ├── bireysel_client.py
│ ├── client.py
│ ├── models.py
│ └── unified_client.py
├── asgi_app.py
├── bddk_mcp_module
│ ├── __init__.py
│ ├── client.py
│ └── models.py
├── bedesten_mcp_module
│ ├── __init__.py
│ ├── client.py
│ ├── enums.py
│ └── models.py
├── check_response_format.py
├── CLAUDE.md
├── danistay_mcp_module
│ ├── __init__.py
│ ├── client.py
│ └── models.py
├── docker-compose.yml
├── Dockerfile
├── docs
│ └── DEPLOYMENT.md
├── emsal_mcp_module
│ ├── __init__.py
│ ├── client.py
│ └── models.py
├── example_fastapi_app.py
├── fly-no-auth.toml
├── fly.toml
├── kik_mcp_module
│ ├── __init__.py
│ ├── client_v2.py
│ ├── client.py
│ ├── models_v2.py
│ └── models.py
├── kvkk_mcp_module
│ ├── __init__.py
│ ├── client.py
│ └── models.py
├── LICENSE
├── mcp_auth
│ ├── __init__.py
│ ├── clerk_config.py
│ ├── middleware.py
│ ├── oauth.py
│ ├── policy.py
│ └── storage.py
├── mcp_auth_factory.py
├── mcp_auth_http_adapter.py
├── mcp_auth_http_simple.py
├── mcp_server_main.py
├── nginx.conf
├── ornek.png
├── Procfile
├── pyproject.toml
├── railway.json
├── README.md
├── redis_session_store.py
├── rekabet_mcp_module
│ ├── __init__.py
│ ├── client.py
│ └── models.py
├── requirements.txt
├── run_asgi.py
├── saidsurucu-yargi-mcp-f5fa007
│ ├── __main__.py
│ ├── .dockerignore
│ ├── .env.example
│ ├── .gitattributes
│ ├── .github
│ │ └── workflows
│ │ └── publish.yml
│ ├── .gitignore
│ ├── 5ire-settings.png
│ ├── anayasa_mcp_module
│ │ ├── __init__.py
│ │ ├── bireysel_client.py
│ │ ├── client.py
│ │ ├── models.py
│ │ └── unified_client.py
│ ├── asgi_app.py
│ ├── bddk_mcp_module
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── models.py
│ ├── bedesten_mcp_module
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── enums.py
│ │ └── models.py
│ ├── check_response_format.py
│ ├── danistay_mcp_module
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── models.py
│ ├── docker-compose.yml
│ ├── Dockerfile
│ ├── docs
│ │ └── DEPLOYMENT.md
│ ├── emsal_mcp_module
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── models.py
│ ├── example_fastapi_app.py
│ ├── kik_mcp_module
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── models.py
│ ├── kvkk_mcp_module
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── models.py
│ ├── LICENSE
│ ├── mcp_auth
│ │ ├── __init__.py
│ │ ├── clerk_config.py
│ │ ├── middleware.py
│ │ ├── oauth.py
│ │ ├── policy.py
│ │ └── storage.py
│ ├── mcp_auth_factory.py
│ ├── mcp_auth_http_adapter.py
│ ├── mcp_auth_http_simple.py
│ ├── mcp_server_main.py
│ ├── nginx.conf
│ ├── ornek.png
│ ├── Procfile
│ ├── pyproject.toml
│ ├── railway.json
│ ├── README.md
│ ├── redis_session_store.py
│ ├── rekabet_mcp_module
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── models.py
│ ├── run_asgi.py
│ ├── sayistay_mcp_module
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── enums.py
│ │ ├── models.py
│ │ └── unified_client.py
│ ├── starlette_app.py
│ ├── stripe_webhook.py
│ ├── uyusmazlik_mcp_module
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── models.py
│ └── yargitay_mcp_module
│ ├── __init__.py
│ ├── client.py
│ └── models.py
├── sayistay_mcp_module
│ ├── __init__.py
│ ├── client.py
│ ├── enums.py
│ ├── models.py
│ └── unified_client.py
├── starlette_app.py
├── stripe_webhook.py
├── uv.lock
├── uyusmazlik_mcp_module
│ ├── __init__.py
│ ├── client.py
│ └── models.py
└── yargitay_mcp_module
├── __init__.py
├── client.py
└── models.py
```
# Files
--------------------------------------------------------------------------------
/mcp_auth_http_adapter.py:
--------------------------------------------------------------------------------
```python
"""
HTTP adapter for MCP Auth Toolkit OAuth endpoints
Exposes MCP OAuth tools as HTTP endpoints for Claude.ai integration
"""
import os
import logging
import secrets
import time
from typing import Optional
from urllib.parse import urlencode, quote
from datetime import datetime, timedelta
from fastapi import APIRouter, Request, Query, HTTPException
from fastapi.responses import RedirectResponse, JSONResponse
# The Clerk SDK is an optional dependency: record whether it could be imported
# and leave a ``None`` placeholder behind when it could not.
try:
    from clerk_backend_api import Clerk
except ImportError:
    Clerk = None
    CLERK_AVAILABLE = False
else:
    CLERK_AVAILABLE = True

logger = logging.getLogger(__name__)
router = APIRouter()

# OAuth configuration: public origin used to build every advertised URL.
BASE_URL = os.getenv("BASE_URL", "https://yargimcp.com")
@router.get("/.well-known/oauth-authorization-server")
async def get_oauth_metadata():
    """Serve the OAuth 2.0 Authorization Server Metadata document (RFC 8414).

    Every advertised URL is built from the module-level ``BASE_URL``.
    """
    def under_base(path):
        # Join an absolute path onto the configured public origin.
        return f"{BASE_URL}{path}"

    # NOTE(review): "refresh_token" is advertised here, but token_endpoint in
    # this file rejects every grant type except "authorization_code".
    metadata = {
        "issuer": BASE_URL,
        "authorization_endpoint": under_base("/authorize"),
        "token_endpoint": under_base("/token"),
        "registration_endpoint": under_base("/register"),
        "response_types_supported": ["code"],
        "grant_types_supported": ["authorization_code", "refresh_token"],
        "code_challenge_methods_supported": ["S256"],
        "token_endpoint_auth_methods_supported": ["none"],
        "scopes_supported": ["mcp:tools:read", "mcp:tools:write", "openid", "profile", "email"],
        "service_documentation": under_base("/mcp/"),
    }
    return JSONResponse(metadata)
@router.get("/.well-known/oauth-protected-resource")
async def get_protected_resource_metadata():
    """Serve the OAuth Protected Resource Metadata document (RFC 9728).

    This server names itself (``BASE_URL``) as both the protected resource
    and its only authorization server.
    """
    supported_scopes = ["mcp:tools:read", "mcp:tools:write"]
    document = {
        "resource": BASE_URL,
        "authorization_servers": [BASE_URL],
        "bearer_methods_supported": ["header"],
        "scopes_supported": supported_scopes,
        "resource_documentation": f"{BASE_URL}/docs",
    }
    return JSONResponse(document)
@router.get("/authorize")
async def authorize_endpoint(
    response_type: str = Query(...),
    client_id: str = Query(...),
    redirect_uri: str = Query(...),
    code_challenge: str = Query(...),
    code_challenge_method: str = Query("S256"),
    state: Optional[str] = Query(None),
    scope: Optional[str] = Query(None)
):
    """OAuth 2.1 authorization endpoint: record the request, then hand off to Clerk.

    A server-side session holding the client's PKCE challenge, state,
    redirect URI, client id and scopes is stored under a fresh random id.
    The browser is then redirected to the Clerk-hosted sign-in page, whose
    ``redirect_url`` points back at ``/auth/callback`` and carries
    ``"<state>:<session_id>"`` as its ``state`` parameter.

    Args:
        response_type: Required query parameter; not inspected by this handler.
        client_id: Client identifier, stored in the session.
        redirect_uri: Client redirect target, stored in the session.
        code_challenge: Client PKCE challenge, stored in the session.
        code_challenge_method: Defaults to "S256"; not inspected by this handler.
        state: Client state; a random value is generated when omitted.
        scope: Space-separated scopes; defaults to the two ``mcp:tools`` scopes.

    Returns:
        A redirect response to the Clerk sign-in page.

    Raises:
        HTTPException: 500 when the Clerk SDK is missing, the OAuth provider
            is not configured, or any other error occurs.
    """
    logger.info(f"OAuth authorize request - client_id: {client_id}, redirect_uri: {redirect_uri}")
    if not CLERK_AVAILABLE:
        logger.error("Clerk SDK not available")
        raise HTTPException(status_code=500, detail="Clerk SDK not available")

    # Store OAuth session for later validation
    try:
        from mcp_server_main import app as mcp_app
        from mcp_auth_factory import get_oauth_provider

        oauth_provider = get_oauth_provider(mcp_app)
        if not oauth_provider:
            raise HTTPException(status_code=500, detail="OAuth provider not configured")

        # Generate session and store PKCE
        session_id = secrets.token_urlsafe(32)
        if state is None:
            state = secrets.token_urlsafe(16)

        # Create PKCE challenge
        from mcp_auth.oauth import PKCEChallenge
        pkce = PKCEChallenge()

        # Use one epoch clock for both timestamps. The previous
        # ``datetime.utcnow().timestamp()`` interpreted a naive UTC datetime
        # as local time, shifting the expiry by the host's UTC offset.
        now = time.time()
        session_data = {
            "pkce_verifier": pkce.verifier,
            "pkce_challenge": code_challenge,  # Store the client's challenge
            "state": state,
            "redirect_uri": redirect_uri,
            "client_id": client_id,
            "scopes": scope.split(" ") if scope else ["mcp:tools:read", "mcp:tools:write"],
            "created_at": now,
            "expires_at": now + timedelta(minutes=10).total_seconds(),
        }
        oauth_provider.storage.set_session(session_id, session_data)

        # For Clerk with custom domains, we need to use their hosted sign-in page
        # We'll pass our callback URL and session info in the state
        callback_url = f"{BASE_URL}/auth/callback"

        # Encode session info in state for retrieval after Clerk auth
        combined_state = f"{state}:{session_id}"

        # Use Clerk's sign-in URL with proper parameters
        clerk_domain = os.getenv("CLERK_DOMAIN", "accounts.yargimcp.com")
        sign_in_params = {
            "redirect_url": f"{callback_url}?state={quote(combined_state)}",
        }
        sign_in_url = f"https://{clerk_domain}/sign-in?{urlencode(sign_in_params)}"

        # The full URL embeds the session id, so only the host is logged.
        logger.info(f"Redirecting to Clerk sign-in on {clerk_domain}")
        return RedirectResponse(url=sign_in_url)
    except HTTPException:
        # Deliberate HTTP errors raised above keep their own detail text.
        raise
    except Exception as e:
        logger.exception(f"Authorization failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/auth/callback")
async def oauth_callback(
    request: Request,
    state: Optional[str] = Query(None),
    clerk_token: Optional[str] = Query(None)
):
    """Handle the redirect back from Clerk and issue an authorization code.

    The ``state`` value is expected to be ``"<original_state>:<session_id>"``
    as produced by the authorize endpoint. The stored OAuth session is looked
    up, the caller is authenticated (Clerk JWT, then ``__session`` cookie,
    then an unconditional "trusted redirect" fallback), an authorization code
    record is stored under ``code_<auth_code>``, and the browser is redirected
    to the client's ``redirect_uri`` with ``code`` and the original ``state``.

    Args:
        request: Incoming request; its cookies are checked for ``__session``.
        state: Combined state string from the authorize step.
        clerk_token: Optional Clerk JWT for cross-domain sign-in.

    Returns:
        A redirect response on success, or a JSON error body with status 400
        (missing state / unknown session) or 500 (any other failure).
    """
    # Log names and flags only: ``state`` embeds the session id, and query
    # parameters / cookies may carry Clerk credentials.
    logger.info(f"OAuth callback received - state provided: {bool(state)}")
    logger.info(f"Query param names: {sorted(request.query_params.keys())}")
    logger.info(f"Cookie names: {sorted(request.cookies.keys())}")
    logger.info(f"Clerk JWT token provided: {bool(clerk_token)}")

    # Support both JWT token (for cross-domain) and cookie auth (for subdomain)
    try:
        if not state:
            logger.error("No state parameter provided")
            return JSONResponse(
                status_code=400,
                content={"error": "invalid_request", "error_description": "Missing state parameter"}
            )

        # Split on the LAST colon so a client state containing ":" survives.
        original_state, separator, session_id = state.rpartition(":")
        if not separator:
            # Fallback: no session id was appended, use the state for both.
            original_state = session_id = state

        # Get OAuth provider
        from mcp_server_main import app as mcp_app
        from mcp_auth_factory import get_oauth_provider

        oauth_provider = get_oauth_provider(mcp_app)
        if not oauth_provider:
            raise HTTPException(status_code=500, detail="OAuth provider not configured")

        # Get stored session
        oauth_session = oauth_provider.storage.get_session(session_id)
        if not oauth_session:
            logger.error(f"OAuth session not found for ID: {session_id}")
            return JSONResponse(
                status_code=400,
                content={"error": "invalid_request", "error_description": "OAuth session expired or not found"}
            )

        # Check if we have a JWT token (for cross-domain auth)
        user_authenticated = False
        auth_method = "none"

        if clerk_token:
            logger.info("Attempting JWT token validation")
            try:
                # Validate JWT token with Clerk
                from clerk_backend_api import Clerk
                import jwt

                clerk = Clerk(bearer_auth=os.getenv("CLERK_SECRET_KEY"))

                # The unverified decode is used only to read the Clerk session
                # id; the token itself is checked by ``sessions.verify`` below.
                decoded_token = jwt.decode(clerk_token, options={"verify_signature": False})
                # Kept in its own variable so it cannot clobber the OAuth
                # ``session_id`` used for the authorization code further down.
                clerk_session_id = decoded_token.get("sid") or decoded_token.get("session_id")

                if clerk_session_id:
                    # Verify with Clerk using the Clerk session id
                    session = clerk.sessions.verify(session_id=clerk_session_id, token=clerk_token)
                    user_id = session.user_id if session else None
                else:
                    user_id = None

                if user_id:
                    logger.info(f"JWT token validation successful - user_id: {user_id}")
                    user_authenticated = True
                    auth_method = "jwt_token"
                    # Store user info in session for token exchange
                    oauth_session["user_id"] = user_id
                    oauth_session["auth_method"] = "jwt_token"
                else:
                    logger.error("JWT token validation failed - no user_id in claims")
            except Exception as e:
                logger.error(f"JWT token validation failed: {str(e)}")
                # Fall through to cookie validation

        # If no JWT token or validation failed, check cookies
        if not user_authenticated:
            logger.info("Checking for Clerk session cookies")
            # Check for Clerk session cookies (for subdomain auth)
            clerk_session_cookie = request.cookies.get("__session")
            if clerk_session_cookie:
                # NOTE(review): the cookie's presence is trusted; its value is
                # never verified with Clerk.
                logger.info("Found Clerk session cookie, assuming authenticated")
                user_authenticated = True
                auth_method = "cookie"
                oauth_session["auth_method"] = "cookie"
            else:
                logger.info("No Clerk session cookie found")

        # For custom domains, we'll also trust that Clerk redirected here
        # SECURITY(review): this fallback marks every caller holding a valid
        # state/session id as authenticated, with no proof of a Clerk sign-in.
        # Behaviour is kept as-is because the custom-domain flow relies on it.
        if not user_authenticated:
            logger.info("Trusting Clerk redirect for custom domain flow")
            user_authenticated = True
            auth_method = "trusted_redirect"
            oauth_session["auth_method"] = "trusted_redirect"

        logger.info(f"User authenticated: {user_authenticated}, method: {auth_method}")

        # Generate simple authorization code for custom domain flow
        now = time.time()
        auth_code = f"clerk_custom_{session_id}_{int(now)}"

        # Store the code mapping for token exchange. Both timestamps use the
        # epoch clock; ``datetime.utcnow().timestamp()`` would be skewed by
        # the host's UTC offset.
        code_data = {
            "session_id": session_id,
            "clerk_authenticated": user_authenticated,
            "auth_method": auth_method,
            "custom_domain_flow": True,
            "created_at": now,
            "expires_at": now + timedelta(minutes=5).total_seconds(),
        }
        if "user_id" in oauth_session:
            code_data["user_id"] = oauth_session["user_id"]

        oauth_provider.storage.set_session(f"code_{auth_code}", code_data)

        # Build redirect URL back to Claude, keeping any query string the
        # client's redirect URI already has.
        redirect_params = {
            "code": auth_code,
            "state": original_state
        }
        client_redirect_uri = oauth_session['redirect_uri']
        joiner = "&" if "?" in client_redirect_uri else "?"
        redirect_url = f"{client_redirect_uri}{joiner}{urlencode(redirect_params)}"

        # The authorization code is a credential, so it is not logged.
        logger.info(f"Redirecting back to Claude: {client_redirect_uri}")
        return RedirectResponse(url=redirect_url)
    except Exception as e:
        logger.exception(f"Callback processing failed: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "server_error", "error_description": str(e)}
        )
@router.post("/register")
async def register_client(request: Request):
    """Dynamic Client Registration (RFC 7591).

    Accepts any client: a random ``client_id`` is generated and the submitted
    ``redirect_uris`` and ``client_name`` are echoed back. Nothing is stored
    by this handler.

    Returns:
        The registration response for a public client, or a 400
        ``invalid_client_metadata`` error when the body is not a JSON object.
    """
    try:
        data = await request.json()
    except ValueError:
        # JSON decode errors are ValueError subclasses; without this guard a
        # malformed body surfaced as an unhandled 500.
        logger.warning("Client registration rejected: body is not valid JSON")
        return JSONResponse(
            status_code=400,
            content={
                "error": "invalid_client_metadata",
                "error_description": "Request body must be valid JSON"
            }
        )

    if not isinstance(data, dict):
        # ``data.get`` below requires a JSON object.
        logger.warning("Client registration rejected: body is not a JSON object")
        return JSONResponse(
            status_code=400,
            content={
                "error": "invalid_client_metadata",
                "error_description": "Request body must be a JSON object"
            }
        )

    logger.info(f"Client registration request: {data}")

    # Simple dynamic registration - accept any client
    client_id = f"mcp-client-{os.urandom(8).hex()}"

    return JSONResponse({
        "client_id": client_id,
        "client_secret": None,  # Public client
        "redirect_uris": data.get("redirect_uris", []),
        "grant_types": ["authorization_code", "refresh_token"],
        "response_types": ["code"],
        "client_name": data.get("client_name", "MCP Client"),
        "token_endpoint_auth_method": "none",
        "client_id_issued_at": int(datetime.now().timestamp())
    })
@router.post("/token")
async def token_endpoint(request: Request):
    """OAuth 2.1 token endpoint (mock exchange).

    Reads the form-encoded request, accepts only the ``authorization_code``
    grant, requires ``code`` and ``redirect_uri``, and answers with a
    placeholder bearer token.

    NOTE(review): the code is only length-checked. It is never looked up in
    the ``code_...`` records written by the callback handler, and
    ``client_id`` / ``code_verifier`` are read but unused, so no PKCE check
    takes place.
    """
    def oauth_error(status, error, description=None):
        # Build an OAuth-style JSON error body; the description is optional.
        payload = {"error": error}
        if description is not None:
            payload["error_description"] = description
        return JSONResponse(status_code=status, content=payload)

    def bearer_response(access_token):
        # Wrap an access token in the fixed success envelope.
        return JSONResponse({
            "access_token": access_token,
            "token_type": "Bearer",
            "expires_in": 3600,
            "scope": "yargi.read yargi.search"
        })

    # Parse form data
    form_data = await request.form()
    grant_type = form_data.get("grant_type")
    code = form_data.get("code")
    redirect_uri = form_data.get("redirect_uri")
    client_id = form_data.get("client_id")
    code_verifier = form_data.get("code_verifier")

    logger.info(f"Token exchange - grant_type: {grant_type}, code: {code[:20] if code else 'None'}...")

    if grant_type != "authorization_code":
        return oauth_error(400, "unsupported_grant_type")

    try:
        if not (code and redirect_uri):
            logger.error("Missing required parameters: code or redirect_uri")
            return oauth_error(400, "invalid_request", "Missing code or redirect_uri")

        if not CLERK_AVAILABLE:
            logger.warning("Clerk SDK not available, using mock response")
            return bearer_response("mock_jwt_token_for_development")

        try:
            # The client is constructed but not used for validation; a failure
            # here is reported as an invalid grant by the handler below.
            _clerk = Clerk(bearer_auth=os.getenv("CLERK_SECRET_KEY"))

            # Basic validation: any code of 10 characters or fewer is rejected.
            if len(code) <= 10:
                logger.error(f"Invalid code format: {code}")
                return oauth_error(400, "invalid_grant", "Invalid authorization code")

            # Placeholder token; no real Clerk JWT is issued here.
            return bearer_response(f"mock_clerk_jwt_{code}")
        except Exception as e:
            logger.error(f"Clerk validation failed: {e}")
            return oauth_error(400, "invalid_grant", "Authorization code validation failed")
    except Exception as e:
        logger.exception(f"Token exchange failed: {e}")
        return oauth_error(500, "server_error", str(e))
```
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/mcp_auth_http_adapter.py:
--------------------------------------------------------------------------------
```python
"""
HTTP adapter for MCP Auth Toolkit OAuth endpoints
Exposes MCP OAuth tools as HTTP endpoints for Claude.ai integration
"""
import os
import logging
import secrets
import time
from typing import Optional
from urllib.parse import urlencode, quote
from datetime import datetime, timedelta
from fastapi import APIRouter, Request, Query, HTTPException
from fastapi.responses import RedirectResponse, JSONResponse
# The Clerk SDK is an optional dependency: record whether it could be imported
# and leave a ``None`` placeholder behind when it could not.
try:
    from clerk_backend_api import Clerk
except ImportError:
    Clerk = None
    CLERK_AVAILABLE = False
else:
    CLERK_AVAILABLE = True

logger = logging.getLogger(__name__)
router = APIRouter()

# OAuth configuration: public origin used to build every advertised URL.
BASE_URL = os.getenv("BASE_URL", "https://yargimcp.com")
@router.get("/.well-known/oauth-authorization-server")
async def get_oauth_metadata():
    """Serve the OAuth 2.0 Authorization Server Metadata document (RFC 8414).

    Every advertised URL is built from the module-level ``BASE_URL``.
    """
    def under_base(path):
        # Join an absolute path onto the configured public origin.
        return f"{BASE_URL}{path}"

    # NOTE(review): "refresh_token" is advertised here, but token_endpoint in
    # this file rejects every grant type except "authorization_code".
    metadata = {
        "issuer": BASE_URL,
        "authorization_endpoint": under_base("/authorize"),
        "token_endpoint": under_base("/token"),
        "registration_endpoint": under_base("/register"),
        "response_types_supported": ["code"],
        "grant_types_supported": ["authorization_code", "refresh_token"],
        "code_challenge_methods_supported": ["S256"],
        "token_endpoint_auth_methods_supported": ["none"],
        "scopes_supported": ["mcp:tools:read", "mcp:tools:write", "openid", "profile", "email"],
        "service_documentation": under_base("/mcp/"),
    }
    return JSONResponse(metadata)
@router.get("/.well-known/oauth-protected-resource")
async def get_protected_resource_metadata():
    """Serve the OAuth Protected Resource Metadata document (RFC 9728).

    This server names itself (``BASE_URL``) as both the protected resource
    and its only authorization server.
    """
    supported_scopes = ["mcp:tools:read", "mcp:tools:write"]
    document = {
        "resource": BASE_URL,
        "authorization_servers": [BASE_URL],
        "bearer_methods_supported": ["header"],
        "scopes_supported": supported_scopes,
        "resource_documentation": f"{BASE_URL}/docs",
    }
    return JSONResponse(document)
@router.get("/authorize")
async def authorize_endpoint(
    response_type: str = Query(...),
    client_id: str = Query(...),
    redirect_uri: str = Query(...),
    code_challenge: str = Query(...),
    code_challenge_method: str = Query("S256"),
    state: Optional[str] = Query(None),
    scope: Optional[str] = Query(None)
):
    """OAuth 2.1 authorization endpoint: record the request, then hand off to Clerk.

    A server-side session holding the client's PKCE challenge, state,
    redirect URI, client id and scopes is stored under a fresh random id.
    The browser is then redirected to the Clerk-hosted sign-in page, whose
    ``redirect_url`` points back at ``/auth/callback`` and carries
    ``"<state>:<session_id>"`` as its ``state`` parameter.

    Args:
        response_type: Required query parameter; not inspected by this handler.
        client_id: Client identifier, stored in the session.
        redirect_uri: Client redirect target, stored in the session.
        code_challenge: Client PKCE challenge, stored in the session.
        code_challenge_method: Defaults to "S256"; not inspected by this handler.
        state: Client state; a random value is generated when omitted.
        scope: Space-separated scopes; defaults to the two ``mcp:tools`` scopes.

    Returns:
        A redirect response to the Clerk sign-in page.

    Raises:
        HTTPException: 500 when the Clerk SDK is missing, the OAuth provider
            is not configured, or any other error occurs.
    """
    logger.info(f"OAuth authorize request - client_id: {client_id}, redirect_uri: {redirect_uri}")
    if not CLERK_AVAILABLE:
        logger.error("Clerk SDK not available")
        raise HTTPException(status_code=500, detail="Clerk SDK not available")

    # Store OAuth session for later validation
    try:
        from mcp_server_main import app as mcp_app
        from mcp_auth_factory import get_oauth_provider

        oauth_provider = get_oauth_provider(mcp_app)
        if not oauth_provider:
            raise HTTPException(status_code=500, detail="OAuth provider not configured")

        # Generate session and store PKCE
        session_id = secrets.token_urlsafe(32)
        if state is None:
            state = secrets.token_urlsafe(16)

        # Create PKCE challenge
        from mcp_auth.oauth import PKCEChallenge
        pkce = PKCEChallenge()

        # Use one epoch clock for both timestamps. The previous
        # ``datetime.utcnow().timestamp()`` interpreted a naive UTC datetime
        # as local time, shifting the expiry by the host's UTC offset.
        now = time.time()
        session_data = {
            "pkce_verifier": pkce.verifier,
            "pkce_challenge": code_challenge,  # Store the client's challenge
            "state": state,
            "redirect_uri": redirect_uri,
            "client_id": client_id,
            "scopes": scope.split(" ") if scope else ["mcp:tools:read", "mcp:tools:write"],
            "created_at": now,
            "expires_at": now + timedelta(minutes=10).total_seconds(),
        }
        oauth_provider.storage.set_session(session_id, session_data)

        # For Clerk with custom domains, we need to use their hosted sign-in page
        # We'll pass our callback URL and session info in the state
        callback_url = f"{BASE_URL}/auth/callback"

        # Encode session info in state for retrieval after Clerk auth
        combined_state = f"{state}:{session_id}"

        # Use Clerk's sign-in URL with proper parameters
        clerk_domain = os.getenv("CLERK_DOMAIN", "accounts.yargimcp.com")
        sign_in_params = {
            "redirect_url": f"{callback_url}?state={quote(combined_state)}",
        }
        sign_in_url = f"https://{clerk_domain}/sign-in?{urlencode(sign_in_params)}"

        # The full URL embeds the session id, so only the host is logged.
        logger.info(f"Redirecting to Clerk sign-in on {clerk_domain}")
        return RedirectResponse(url=sign_in_url)
    except HTTPException:
        # Deliberate HTTP errors raised above keep their own detail text.
        raise
    except Exception as e:
        logger.exception(f"Authorization failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/auth/callback")
async def oauth_callback(
    request: Request,
    state: Optional[str] = Query(None),
    clerk_token: Optional[str] = Query(None)
):
    """Handle the redirect back from Clerk and issue an authorization code.

    The ``state`` value is expected to be ``"<original_state>:<session_id>"``
    as produced by the authorize endpoint. The stored OAuth session is looked
    up, the caller is authenticated (Clerk JWT, then ``__session`` cookie,
    then an unconditional "trusted redirect" fallback), an authorization code
    record is stored under ``code_<auth_code>``, and the browser is redirected
    to the client's ``redirect_uri`` with ``code`` and the original ``state``.

    Args:
        request: Incoming request; its cookies are checked for ``__session``.
        state: Combined state string from the authorize step.
        clerk_token: Optional Clerk JWT for cross-domain sign-in.

    Returns:
        A redirect response on success, or a JSON error body with status 400
        (missing state / unknown session) or 500 (any other failure).
    """
    # Log names and flags only: ``state`` embeds the session id, and query
    # parameters / cookies may carry Clerk credentials.
    logger.info(f"OAuth callback received - state provided: {bool(state)}")
    logger.info(f"Query param names: {sorted(request.query_params.keys())}")
    logger.info(f"Cookie names: {sorted(request.cookies.keys())}")
    logger.info(f"Clerk JWT token provided: {bool(clerk_token)}")

    # Support both JWT token (for cross-domain) and cookie auth (for subdomain)
    try:
        if not state:
            logger.error("No state parameter provided")
            return JSONResponse(
                status_code=400,
                content={"error": "invalid_request", "error_description": "Missing state parameter"}
            )

        # Split on the LAST colon so a client state containing ":" survives.
        original_state, separator, session_id = state.rpartition(":")
        if not separator:
            # Fallback: no session id was appended, use the state for both.
            original_state = session_id = state

        # Get OAuth provider
        from mcp_server_main import app as mcp_app
        from mcp_auth_factory import get_oauth_provider

        oauth_provider = get_oauth_provider(mcp_app)
        if not oauth_provider:
            raise HTTPException(status_code=500, detail="OAuth provider not configured")

        # Get stored session
        oauth_session = oauth_provider.storage.get_session(session_id)
        if not oauth_session:
            logger.error(f"OAuth session not found for ID: {session_id}")
            return JSONResponse(
                status_code=400,
                content={"error": "invalid_request", "error_description": "OAuth session expired or not found"}
            )

        # Check if we have a JWT token (for cross-domain auth)
        user_authenticated = False
        auth_method = "none"

        if clerk_token:
            logger.info("Attempting JWT token validation")
            try:
                # Validate JWT token with Clerk
                from clerk_backend_api import Clerk
                import jwt

                clerk = Clerk(bearer_auth=os.getenv("CLERK_SECRET_KEY"))

                # The unverified decode is used only to read the Clerk session
                # id; the token itself is checked by ``sessions.verify`` below.
                decoded_token = jwt.decode(clerk_token, options={"verify_signature": False})
                # Kept in its own variable so it cannot clobber the OAuth
                # ``session_id`` used for the authorization code further down.
                clerk_session_id = decoded_token.get("sid") or decoded_token.get("session_id")

                if clerk_session_id:
                    # Verify with Clerk using the Clerk session id
                    session = clerk.sessions.verify(session_id=clerk_session_id, token=clerk_token)
                    user_id = session.user_id if session else None
                else:
                    user_id = None

                if user_id:
                    logger.info(f"JWT token validation successful - user_id: {user_id}")
                    user_authenticated = True
                    auth_method = "jwt_token"
                    # Store user info in session for token exchange
                    oauth_session["user_id"] = user_id
                    oauth_session["auth_method"] = "jwt_token"
                else:
                    logger.error("JWT token validation failed - no user_id in claims")
            except Exception as e:
                logger.error(f"JWT token validation failed: {str(e)}")
                # Fall through to cookie validation

        # If no JWT token or validation failed, check cookies
        if not user_authenticated:
            logger.info("Checking for Clerk session cookies")
            # Check for Clerk session cookies (for subdomain auth)
            clerk_session_cookie = request.cookies.get("__session")
            if clerk_session_cookie:
                # NOTE(review): the cookie's presence is trusted; its value is
                # never verified with Clerk.
                logger.info("Found Clerk session cookie, assuming authenticated")
                user_authenticated = True
                auth_method = "cookie"
                oauth_session["auth_method"] = "cookie"
            else:
                logger.info("No Clerk session cookie found")

        # For custom domains, we'll also trust that Clerk redirected here
        # SECURITY(review): this fallback marks every caller holding a valid
        # state/session id as authenticated, with no proof of a Clerk sign-in.
        # Behaviour is kept as-is because the custom-domain flow relies on it.
        if not user_authenticated:
            logger.info("Trusting Clerk redirect for custom domain flow")
            user_authenticated = True
            auth_method = "trusted_redirect"
            oauth_session["auth_method"] = "trusted_redirect"

        logger.info(f"User authenticated: {user_authenticated}, method: {auth_method}")

        # Generate simple authorization code for custom domain flow
        now = time.time()
        auth_code = f"clerk_custom_{session_id}_{int(now)}"

        # Store the code mapping for token exchange. Both timestamps use the
        # epoch clock; ``datetime.utcnow().timestamp()`` would be skewed by
        # the host's UTC offset.
        code_data = {
            "session_id": session_id,
            "clerk_authenticated": user_authenticated,
            "auth_method": auth_method,
            "custom_domain_flow": True,
            "created_at": now,
            "expires_at": now + timedelta(minutes=5).total_seconds(),
        }
        if "user_id" in oauth_session:
            code_data["user_id"] = oauth_session["user_id"]

        oauth_provider.storage.set_session(f"code_{auth_code}", code_data)

        # Build redirect URL back to Claude, keeping any query string the
        # client's redirect URI already has.
        redirect_params = {
            "code": auth_code,
            "state": original_state
        }
        client_redirect_uri = oauth_session['redirect_uri']
        joiner = "&" if "?" in client_redirect_uri else "?"
        redirect_url = f"{client_redirect_uri}{joiner}{urlencode(redirect_params)}"

        # The authorization code is a credential, so it is not logged.
        logger.info(f"Redirecting back to Claude: {client_redirect_uri}")
        return RedirectResponse(url=redirect_url)
    except Exception as e:
        logger.exception(f"Callback processing failed: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "server_error", "error_description": str(e)}
        )
@router.post("/register")
async def register_client(request: Request):
    """Dynamic Client Registration (RFC 7591).

    Accepts any client: a random ``client_id`` is generated and the submitted
    ``redirect_uris`` and ``client_name`` are echoed back. Nothing is stored
    by this handler.

    Returns:
        The registration response for a public client, or a 400
        ``invalid_client_metadata`` error when the body is not a JSON object.
    """
    try:
        data = await request.json()
    except ValueError:
        # JSON decode errors are ValueError subclasses; without this guard a
        # malformed body surfaced as an unhandled 500.
        logger.warning("Client registration rejected: body is not valid JSON")
        return JSONResponse(
            status_code=400,
            content={
                "error": "invalid_client_metadata",
                "error_description": "Request body must be valid JSON"
            }
        )

    if not isinstance(data, dict):
        # ``data.get`` below requires a JSON object.
        logger.warning("Client registration rejected: body is not a JSON object")
        return JSONResponse(
            status_code=400,
            content={
                "error": "invalid_client_metadata",
                "error_description": "Request body must be a JSON object"
            }
        )

    logger.info(f"Client registration request: {data}")

    # Simple dynamic registration - accept any client
    client_id = f"mcp-client-{os.urandom(8).hex()}"

    return JSONResponse({
        "client_id": client_id,
        "client_secret": None,  # Public client
        "redirect_uris": data.get("redirect_uris", []),
        "grant_types": ["authorization_code", "refresh_token"],
        "response_types": ["code"],
        "client_name": data.get("client_name", "MCP Client"),
        "token_endpoint_auth_method": "none",
        "client_id_issued_at": int(datetime.now().timestamp())
    })
@router.post("/token")
async def token_endpoint(request: Request):
    """OAuth 2.1 token endpoint (mock exchange).

    Reads the form-encoded request, accepts only the ``authorization_code``
    grant, requires ``code`` and ``redirect_uri``, and answers with a
    placeholder bearer token.

    NOTE(review): the code is only length-checked. It is never looked up in
    the ``code_...`` records written by the callback handler, and
    ``client_id`` / ``code_verifier`` are read but unused, so no PKCE check
    takes place.
    """
    def oauth_error(status, error, description=None):
        # Build an OAuth-style JSON error body; the description is optional.
        payload = {"error": error}
        if description is not None:
            payload["error_description"] = description
        return JSONResponse(status_code=status, content=payload)

    def bearer_response(access_token):
        # Wrap an access token in the fixed success envelope.
        return JSONResponse({
            "access_token": access_token,
            "token_type": "Bearer",
            "expires_in": 3600,
            "scope": "yargi.read yargi.search"
        })

    # Parse form data
    form_data = await request.form()
    grant_type = form_data.get("grant_type")
    code = form_data.get("code")
    redirect_uri = form_data.get("redirect_uri")
    client_id = form_data.get("client_id")
    code_verifier = form_data.get("code_verifier")

    logger.info(f"Token exchange - grant_type: {grant_type}, code: {code[:20] if code else 'None'}...")

    if grant_type != "authorization_code":
        return oauth_error(400, "unsupported_grant_type")

    try:
        if not (code and redirect_uri):
            logger.error("Missing required parameters: code or redirect_uri")
            return oauth_error(400, "invalid_request", "Missing code or redirect_uri")

        if not CLERK_AVAILABLE:
            logger.warning("Clerk SDK not available, using mock response")
            return bearer_response("mock_jwt_token_for_development")

        try:
            # The client is constructed but not used for validation; a failure
            # here is reported as an invalid grant by the handler below.
            _clerk = Clerk(bearer_auth=os.getenv("CLERK_SECRET_KEY"))

            # Basic validation: any code of 10 characters or fewer is rejected.
            if len(code) <= 10:
                logger.error(f"Invalid code format: {code}")
                return oauth_error(400, "invalid_grant", "Invalid authorization code")

            # Placeholder token; no real Clerk JWT is issued here.
            return bearer_response(f"mock_clerk_jwt_{code}")
        except Exception as e:
            logger.error(f"Clerk validation failed: {e}")
            return oauth_error(400, "invalid_grant", "Authorization code validation failed")
    except Exception as e:
        logger.exception(f"Token exchange failed: {e}")
        return oauth_error(500, "server_error", str(e))
```
--------------------------------------------------------------------------------
/kvkk_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# kvkk_mcp_module/client.py
import httpx
from bs4 import BeautifulSoup
from typing import List, Optional, Dict, Any
import logging
import os
import re
import io
import math
from urllib.parse import urljoin, urlparse, parse_qs
from markitdown import MarkItDown
from pydantic import HttpUrl
from .models import (
KvkkSearchRequest,
KvkkDecisionSummary,
KvkkSearchResult,
KvkkDocumentMarkdown
)
logger = logging.getLogger(__name__)

# Install a default logging configuration only when neither this logger nor
# any of its ancestors already has a handler attached.
if not logger.hasHandlers():
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
class KvkkApiClient:
"""
API client for searching and retrieving KVKK (Personal Data Protection Authority) decisions
using Brave Search API for discovery and direct HTTP requests for content retrieval.
"""
BRAVE_API_URL = "https://api.search.brave.com/res/v1/web/search"
KVKK_BASE_URL = "https://www.kvkk.gov.tr"
DOCUMENT_MARKDOWN_CHUNK_SIZE = 5000 # Character limit per page
def __init__(self, request_timeout: float = 60.0):
    """Set up the Brave API token and the shared async HTTP client.

    Args:
        request_timeout: Timeout passed to the ``httpx.AsyncClient``.
    """
    env_token = os.getenv("BRAVE_API_TOKEN")
    if env_token:
        self.brave_api_token = env_token
        logger.info("Using Brave API token from environment variable")
    else:
        # NOTE(review): a credential is hard-coded here as the fallback when
        # BRAVE_API_TOKEN is unset or empty; consider moving it out of source.
        self.brave_api_token = "BSAuaRKB-dvSDSQxIN0ft1p2k6N82Kq"
        logger.info("Using fallback Brave API token (limited free token)")

    # Browser-like default headers sent with every request from this client.
    browser_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }
    self.http_client = httpx.AsyncClient(
        headers=browser_headers,
        timeout=request_timeout,
        verify=True,
        follow_redirects=True
    )
def _construct_search_query(self, keywords: str) -> str:
"""Construct the search query for Brave API."""
base_query = 'site:kvkk.gov.tr "karar özeti"'
if keywords.strip():
return f"{base_query} {keywords.strip()}"
return base_query
def _extract_decision_id_from_url(self, url: str) -> Optional[str]:
"""Extract decision ID from KVKK decision URL."""
try:
# Example URL: https://www.kvkk.gov.tr/Icerik/7288/2021-1303
parsed_url = urlparse(url)
path_parts = parsed_url.path.strip('/').split('/')
if len(path_parts) >= 3 and path_parts[0] == 'Icerik':
# Extract the decision ID from the path
decision_id = '/'.join(path_parts[1:]) # e.g., "7288/2021-1303"
return decision_id
except Exception as e:
logger.debug(f"Could not extract decision ID from URL {url}: {e}")
return None
def _extract_decision_metadata_from_title(self, title: str) -> Dict[str, Optional[str]]:
"""Extract decision metadata from title string."""
metadata = {
"decision_date": None,
"decision_number": None
}
if not title:
return metadata
# Extract decision date (DD/MM/YYYY format)
date_match = re.search(r'(\d{1,2}/\d{1,2}/\d{4})', title)
if date_match:
metadata["decision_date"] = date_match.group(1)
# Extract decision number (YYYY/XXXX format)
number_match = re.search(r'(\d{4}/\d+)', title)
if number_match:
metadata["decision_number"] = number_match.group(1)
return metadata
async def search_decisions(self, params: KvkkSearchRequest) -> KvkkSearchResult:
    """Search for KVKK decisions using the Brave Web Search API.

    Args:
        params: Search request; `keywords`, `page` (1-based) and `pageSize`
            are read from it.

    Returns:
        A KvkkSearchResult with one KvkkDecisionSummary per web result.
        On any failure (transport error, non-2xx response, malformed
        payload) the error is logged and an empty result with
        total_results=0 is returned instead of raising.
    """
    search_query = self._construct_search_query(params.keywords)
    logger.info(f"KvkkApiClient: Searching with query: {search_query}")

    def _empty_result() -> KvkkSearchResult:
        # Shape returned for every failure mode.
        return KvkkSearchResult(
            decisions=[],
            total_results=0,
            page=params.page,
            pageSize=params.pageSize,
            query=search_query
        )

    try:
        # Brave's `offset` is a zero-based PAGE index (used together with
        # `count`), not a number of results to skip. Sending
        # (page - 1) * pageSize overshoots from page 2 onwards and quickly
        # exceeds the documented maximum of 9.
        offset = params.page - 1
        response = await self.http_client.get(
            self.BRAVE_API_URL,
            headers={
                "Accept": "application/json",
                "Accept-Encoding": "gzip",
                "x-subscription-token": self.brave_api_token
            },
            params={
                "q": search_query,
                "country": "TR",
                "search_lang": "tr",
                "ui_lang": "tr-TR",
                "offset": offset,
                "count": params.pageSize
            }
        )
        response.raise_for_status()
        data = response.json()

        # `web` may be absent or null when there are no web results.
        web_results = (data.get("web") or {}).get("results", [])
        decisions = []
        for result in web_results:
            title = result.get("title", "")
            url = result.get("url", "")
            description = result.get("description", "")
            # Date and number come from the title, the ID from the URL path.
            metadata = self._extract_decision_metadata_from_title(title)
            decision_id = self._extract_decision_id_from_url(url)
            decisions.append(
                KvkkDecisionSummary(
                    title=title,
                    url=HttpUrl(url) if url else None,
                    description=description,
                    decision_id=decision_id,
                    publication_date=metadata.get("decision_date"),
                    decision_number=metadata.get("decision_number")
                )
            )

        # Total is only reported when the payload carries it.
        total_results = None
        query_info = data.get("query", {})
        if "total_results" in query_info:
            total_results = query_info["total_results"]

        return KvkkSearchResult(
            decisions=decisions,
            total_results=total_results,
            page=params.page,
            pageSize=params.pageSize,
            query=search_query
        )
    except httpx.HTTPStatusError as e:
        # raise_for_status() errors are not RequestError subclasses; log the
        # status code explicitly instead of reporting them as "unexpected".
        logger.error(f"KvkkApiClient: HTTP error {e.response.status_code} during search: {e}")
        return _empty_result()
    except httpx.RequestError as e:
        logger.error(f"KvkkApiClient: HTTP request error during search: {e}")
        return _empty_result()
    except Exception as e:
        logger.error(f"KvkkApiClient: Unexpected error during search: {e}")
        return _empty_result()
def _extract_decision_content_from_html(self, html: str, url: str) -> Dict[str, Any]:
    """Parse a KVKK decision page and pull out its title, metadata and body.

    Args:
        html: Raw HTML of the decision page.
        url: Page URL; used only in log messages.

    Returns:
        A dict with the keys "title", "decision_date", "decision_number",
        "subject_summary" and "html_content". "html_content" is the
        serialized content container, or None when no container is found.
        Every value is None when parsing raises.
    """
    try:
        soup = BeautifulSoup(html, 'html.parser')

        # Prefer the article heading; fall back to the document <title>.
        heading = soup.find('h3', class_='blog-post-title')
        if heading:
            title = heading.get_text(strip=True)
        elif soup.title:
            title = soup.title.get_text(strip=True)
        else:
            title = None

        # Main content container, with a style-based fallback.
        container = (
            soup.find('div', class_='blog-post-inner')
            or soup.find('div', style='text-align:justify;')
        )

        fields = {
            "decision_date": None,
            "decision_number": None,
            "subject_summary": None,
        }
        if not container:
            logger.warning(f"Could not find decision content div in {url}")
            return {"title": title, **fields, "html_content": None}

        # Rows are read as label / (middle cell) / value; the first marker
        # found in the label decides which field the value belongs to.
        label_to_field = (
            ('Karar Tarihi', "decision_date"),
            ('Karar No', "decision_number"),
            ('Konu Özeti', "subject_summary"),
        )
        table = container.find('table')
        for row in (table.find_all('tr') if table else []):
            cells = row.find_all('td')
            if len(cells) < 3:
                continue
            label = cells[0].get_text(strip=True)
            value = cells[2].get_text(strip=True)
            for marker, field in label_to_field:
                if marker in label:
                    fields[field] = value
                    break

        return {"title": title, **fields, "html_content": str(container)}
    except Exception as e:
        logger.error(f"Error extracting content from HTML for {url}: {e}")
        return {
            "title": None,
            "decision_date": None,
            "decision_number": None,
            "subject_summary": None,
            "html_content": None
        }
def _convert_html_to_markdown(self, html_content: str) -> Optional[str]:
"""Convert HTML content to Markdown using MarkItDown with BytesIO to avoid filename length issues."""
if not html_content:
return None
try:
# Convert HTML string to bytes and create BytesIO stream
html_bytes = html_content.encode('utf-8')
html_stream = io.BytesIO(html_bytes)
# Pass BytesIO stream to MarkItDown to avoid temp file creation
md_converter = MarkItDown(enable_plugins=False)
result = md_converter.convert(html_stream)
return result.text_content
except Exception as e:
logger.error(f"Error converting HTML to Markdown: {e}")
return None
async def get_decision_document(self, decision_url: str, page_number: int = 1) -> KvkkDocumentMarkdown:
    """Fetch a KVKK decision page and return one page of it as Markdown.

    The page is downloaded, its content container converted to Markdown, and
    the Markdown split into chunks of DOCUMENT_MARKDOWN_CHUNK_SIZE
    characters. The requested page number is clamped into the valid range.

    Args:
        decision_url: URL of the decision page.
        page_number: 1-based chunk to return.

    Returns:
        A KvkkDocumentMarkdown. On failure `markdown_chunk` is None,
        `total_pages` is 0 and `error_message` describes the problem.
    """
    logger.info(f"KvkkApiClient: Getting decision document from: {decision_url}, page: {page_number}")

    def _failure(message, meta=None):
        # Common shape of every unsuccessful result; `meta` carries whatever
        # page metadata had already been extracted, if any.
        meta = meta or {}
        return KvkkDocumentMarkdown(
            source_url=HttpUrl(decision_url),
            title=meta.get("title"),
            decision_date=meta.get("decision_date"),
            decision_number=meta.get("decision_number"),
            subject_summary=meta.get("subject_summary"),
            markdown_chunk=None,
            current_page=page_number,
            total_pages=0,
            is_paginated=False,
            error_message=message
        )

    try:
        page_response = await self.http_client.get(decision_url)
        page_response.raise_for_status()

        extracted = self._extract_decision_content_from_html(page_response.text, decision_url)
        raw_html = extracted["html_content"]
        markdown_text = self._convert_html_to_markdown(raw_html) if raw_html else None
        if not markdown_text:
            return _failure("Could not convert document content to Markdown", extracted)

        # Split into fixed-size character chunks; always at least one page.
        chunk_size = self.DOCUMENT_MARKDOWN_CHUNK_SIZE
        page_count = math.ceil(len(markdown_text) / chunk_size) or 1

        # Clamp the requested page into [1, page_count].
        page = min(page_number, page_count)
        if page < 1:
            page = 1
        offset = (page - 1) * chunk_size

        return KvkkDocumentMarkdown(
            source_url=HttpUrl(decision_url),
            title=extracted["title"],
            decision_date=extracted["decision_date"],
            decision_number=extracted["decision_number"],
            subject_summary=extracted["subject_summary"],
            markdown_chunk=markdown_text[offset:offset + chunk_size],
            current_page=page,
            total_pages=page_count,
            is_paginated=(page_count > 1),
            error_message=None
        )
    except httpx.HTTPStatusError as e:
        error_msg = f"HTTP error {e.response.status_code} when fetching decision document"
        logger.error(f"KvkkApiClient: {error_msg}")
        return _failure(error_msg)
    except Exception as e:
        error_msg = f"Unexpected error when fetching decision document: {str(e)}"
        logger.error(f"KvkkApiClient: {error_msg}")
        return _failure(error_msg)
async def close_client_session(self):
    """Close the underlying HTTP client if one exists and is still open.

    Does nothing when the instance has no `http_client`, when it is falsy,
    or when it is already closed.
    """
    client = getattr(self, 'http_client', None)
    if not client or client.is_closed:
        return
    await client.aclose()
    logger.info("KvkkApiClient: HTTP client session closed.")
```
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/kvkk_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# kvkk_mcp_module/client.py
import httpx
from bs4 import BeautifulSoup
from typing import List, Optional, Dict, Any
import logging
import os
import re
import io
import math
from urllib.parse import urljoin, urlparse, parse_qs
from markitdown import MarkItDown
from pydantic import HttpUrl
from .models import (
KvkkSearchRequest,
KvkkDecisionSummary,
KvkkSearchResult,
KvkkDocumentMarkdown
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Apply a default root configuration only when no handler is reachable from
# this logger (hasHandlers() also checks ancestor loggers), so an application
# that has already configured logging is left untouched.
if not logger.hasHandlers():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
class KvkkApiClient:
    """
    API client for searching and retrieving KVKK (Personal Data Protection Authority) decisions
    using Brave Search API for discovery and direct HTTP requests for content retrieval.
    """

    BRAVE_API_URL = "https://api.search.brave.com/res/v1/web/search"
    KVKK_BASE_URL = "https://www.kvkk.gov.tr"
    DOCUMENT_MARKDOWN_CHUNK_SIZE = 5000  # Character limit per page

    def __init__(self, request_timeout: float = 60.0):
        """Initialize the KVKK API client.

        Reads the Brave API token from the BRAVE_API_TOKEN environment
        variable, falling back to a built-in token, and creates the shared
        httpx.AsyncClient used for all requests.

        Args:
            request_timeout: Timeout passed to the HTTP client.
        """
        self.brave_api_token = os.getenv("BRAVE_API_TOKEN")
        if not self.brave_api_token:
            # NOTE(review): this fallback is a credential committed to source
            # control. Kept for backward compatibility; prefer supplying
            # BRAVE_API_TOKEN and consider removing the literal.
            self.brave_api_token = "BSAuaRKB-dvSDSQxIN0ft1p2k6N82Kq"
            logger.info("Using fallback Brave API token (limited free token)")
        else:
            logger.info("Using Brave API token from environment variable")
        # Browser-like default headers; redirects are followed and TLS
        # certificates are verified.
        self.http_client = httpx.AsyncClient(
            headers={
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            },
            timeout=request_timeout,
            verify=True,
            follow_redirects=True
        )

    def _construct_search_query(self, keywords: str) -> str:
        """Build the Brave query string used to find KVKK decision summaries.

        The fixed prefix restricts results to kvkk.gov.tr pages containing
        the phrase "karar özeti"; stripped keywords are appended after a
        single space when they are non-empty.
        """
        site_filter = 'site:kvkk.gov.tr "karar özeti"'
        trimmed = keywords.strip()
        if not trimmed:
            return site_filter
        return f"{site_filter} {trimmed}"

    def _extract_decision_id_from_url(self, url: str) -> Optional[str]:
        """Derive a decision identifier from a KVKK decision URL.

        For https://www.kvkk.gov.tr/Icerik/7288/2021-1303 the identifier is
        "7288/2021-1303". Returns None when the path does not start with
        "Icerik", has fewer than three segments, or cannot be parsed.
        """
        try:
            segments = urlparse(url).path.strip('/').split('/')
            # str.split always yields at least one element.
            if segments[0] != 'Icerik' or len(segments) < 3:
                return None
            return '/'.join(segments[1:])
        except Exception as e:
            # Best effort: an unparsable URL simply has no decision ID.
            logger.debug(f"Could not extract decision ID from URL {url}: {e}")
        return None

    def _extract_decision_metadata_from_title(self, title: str) -> Dict[str, Optional[str]]:
        """Pull the decision date and decision number out of a result title.

        Returns a dict with "decision_date" (first DD/MM/YYYY-shaped match)
        and "decision_number" (first YYYY/NNNN-shaped match); a key is None
        when its pattern does not occur or the title is empty.
        """
        found: Dict[str, Optional[str]] = {"decision_date": None, "decision_number": None}
        if not title:
            return found
        patterns = (
            ("decision_date", r'(\d{1,2}/\d{1,2}/\d{4})'),
            ("decision_number", r'(\d{4}/\d+)'),
        )
        for key, pattern in patterns:
            hit = re.search(pattern, title)
            if hit:
                found[key] = hit.group(1)
        return found

    async def search_decisions(self, params: KvkkSearchRequest) -> KvkkSearchResult:
        """Search for KVKK decisions using the Brave Web Search API.

        Args:
            params: Search request; `keywords`, `page` (1-based) and
                `pageSize` are read from it.

        Returns:
            A KvkkSearchResult with one KvkkDecisionSummary per web result.
            On any failure the error is logged and an empty result with
            total_results=0 is returned instead of raising.
        """
        search_query = self._construct_search_query(params.keywords)
        logger.info(f"KvkkApiClient: Searching with query: {search_query}")

        def _empty_result() -> KvkkSearchResult:
            # Shape returned for every failure mode.
            return KvkkSearchResult(
                decisions=[],
                total_results=0,
                page=params.page,
                pageSize=params.pageSize,
                query=search_query
            )

        try:
            # Brave's `offset` is a zero-based PAGE index (used together
            # with `count`), not a number of results to skip. Sending
            # (page - 1) * pageSize overshoots from page 2 onwards and
            # quickly exceeds the documented maximum of 9.
            offset = params.page - 1
            response = await self.http_client.get(
                self.BRAVE_API_URL,
                headers={
                    "Accept": "application/json",
                    "Accept-Encoding": "gzip",
                    "x-subscription-token": self.brave_api_token
                },
                params={
                    "q": search_query,
                    "country": "TR",
                    "search_lang": "tr",
                    "ui_lang": "tr-TR",
                    "offset": offset,
                    "count": params.pageSize
                }
            )
            response.raise_for_status()
            data = response.json()

            # `web` may be absent or null when there are no web results.
            web_results = (data.get("web") or {}).get("results", [])
            decisions = []
            for result in web_results:
                title = result.get("title", "")
                url = result.get("url", "")
                description = result.get("description", "")
                # Date and number come from the title, the ID from the URL.
                metadata = self._extract_decision_metadata_from_title(title)
                decision_id = self._extract_decision_id_from_url(url)
                decisions.append(
                    KvkkDecisionSummary(
                        title=title,
                        url=HttpUrl(url) if url else None,
                        description=description,
                        decision_id=decision_id,
                        publication_date=metadata.get("decision_date"),
                        decision_number=metadata.get("decision_number")
                    )
                )

            # Total is only reported when the payload carries it.
            total_results = None
            query_info = data.get("query", {})
            if "total_results" in query_info:
                total_results = query_info["total_results"]

            return KvkkSearchResult(
                decisions=decisions,
                total_results=total_results,
                page=params.page,
                pageSize=params.pageSize,
                query=search_query
            )
        except httpx.HTTPStatusError as e:
            # raise_for_status() errors are not RequestError subclasses; log
            # the status code instead of reporting them as "unexpected".
            logger.error(f"KvkkApiClient: HTTP error {e.response.status_code} during search: {e}")
            return _empty_result()
        except httpx.RequestError as e:
            logger.error(f"KvkkApiClient: HTTP request error during search: {e}")
            return _empty_result()
        except Exception as e:
            logger.error(f"KvkkApiClient: Unexpected error during search: {e}")
            return _empty_result()

    def _extract_decision_content_from_html(self, html: str, url: str) -> Dict[str, Any]:
        """Parse a KVKK decision page and pull out its title, metadata and body.

        Returns a dict with the keys "title", "decision_date",
        "decision_number", "subject_summary" and "html_content".
        "html_content" is the serialized content container, or None when no
        container is found. Every value is None when parsing raises.
        """
        try:
            soup = BeautifulSoup(html, 'html.parser')

            # Prefer the article heading; fall back to the document <title>.
            heading = soup.find('h3', class_='blog-post-title')
            if heading:
                title = heading.get_text(strip=True)
            elif soup.title:
                title = soup.title.get_text(strip=True)
            else:
                title = None

            # Main content container, with a style-based fallback.
            container = (
                soup.find('div', class_='blog-post-inner')
                or soup.find('div', style='text-align:justify;')
            )

            fields = {
                "decision_date": None,
                "decision_number": None,
                "subject_summary": None,
            }
            if not container:
                logger.warning(f"Could not find decision content div in {url}")
                return {"title": title, **fields, "html_content": None}

            # Rows are read as label / (middle cell) / value; the first
            # marker found in the label decides the target field.
            label_to_field = (
                ('Karar Tarihi', "decision_date"),
                ('Karar No', "decision_number"),
                ('Konu Özeti', "subject_summary"),
            )
            table = container.find('table')
            for row in (table.find_all('tr') if table else []):
                cells = row.find_all('td')
                if len(cells) < 3:
                    continue
                label = cells[0].get_text(strip=True)
                value = cells[2].get_text(strip=True)
                for marker, field in label_to_field:
                    if marker in label:
                        fields[field] = value
                        break

            return {"title": title, **fields, "html_content": str(container)}
        except Exception as e:
            logger.error(f"Error extracting content from HTML for {url}: {e}")
            return {
                "title": None,
                "decision_date": None,
                "decision_number": None,
                "subject_summary": None,
                "html_content": None
            }

    def _convert_html_to_markdown(self, html_content: str) -> Optional[str]:
        """Convert an HTML fragment to Markdown with MarkItDown.

        The HTML is passed as an in-memory UTF-8 byte stream rather than a
        file, to avoid filename length issues. Returns None when the input
        is empty or the conversion raises (the error is logged).
        """
        if not html_content:
            return None
        try:
            stream = io.BytesIO(html_content.encode('utf-8'))
            return MarkItDown(enable_plugins=False).convert(stream).text_content
        except Exception as e:
            logger.error(f"Error converting HTML to Markdown: {e}")
            return None

    async def get_decision_document(self, decision_url: str, page_number: int = 1) -> KvkkDocumentMarkdown:
        """Fetch a KVKK decision page and return one page of it as Markdown.

        The page is downloaded, its content container converted to Markdown,
        and the Markdown split into chunks of DOCUMENT_MARKDOWN_CHUNK_SIZE
        characters. The requested page number is clamped into the valid range.

        Args:
            decision_url: URL of the decision page.
            page_number: 1-based chunk to return.

        Returns:
            A KvkkDocumentMarkdown. On failure `markdown_chunk` is None,
            `total_pages` is 0 and `error_message` describes the problem.
        """
        logger.info(f"KvkkApiClient: Getting decision document from: {decision_url}, page: {page_number}")

        def _failure(message, meta=None):
            # Common shape of every unsuccessful result; `meta` carries any
            # page metadata that had already been extracted.
            meta = meta or {}
            return KvkkDocumentMarkdown(
                source_url=HttpUrl(decision_url),
                title=meta.get("title"),
                decision_date=meta.get("decision_date"),
                decision_number=meta.get("decision_number"),
                subject_summary=meta.get("subject_summary"),
                markdown_chunk=None,
                current_page=page_number,
                total_pages=0,
                is_paginated=False,
                error_message=message
            )

        try:
            page_response = await self.http_client.get(decision_url)
            page_response.raise_for_status()

            extracted = self._extract_decision_content_from_html(page_response.text, decision_url)
            raw_html = extracted["html_content"]
            markdown_text = self._convert_html_to_markdown(raw_html) if raw_html else None
            if not markdown_text:
                return _failure("Could not convert document content to Markdown", extracted)

            # Split into fixed-size character chunks; always at least one page.
            chunk_size = self.DOCUMENT_MARKDOWN_CHUNK_SIZE
            page_count = math.ceil(len(markdown_text) / chunk_size) or 1

            # Clamp the requested page into [1, page_count].
            page = min(page_number, page_count)
            if page < 1:
                page = 1
            offset = (page - 1) * chunk_size

            return KvkkDocumentMarkdown(
                source_url=HttpUrl(decision_url),
                title=extracted["title"],
                decision_date=extracted["decision_date"],
                decision_number=extracted["decision_number"],
                subject_summary=extracted["subject_summary"],
                markdown_chunk=markdown_text[offset:offset + chunk_size],
                current_page=page,
                total_pages=page_count,
                is_paginated=(page_count > 1),
                error_message=None
            )
        except httpx.HTTPStatusError as e:
            error_msg = f"HTTP error {e.response.status_code} when fetching decision document"
            logger.error(f"KvkkApiClient: {error_msg}")
            return _failure(error_msg)
        except Exception as e:
            error_msg = f"Unexpected error when fetching decision document: {str(e)}"
            logger.error(f"KvkkApiClient: {error_msg}")
            return _failure(error_msg)

    async def close_client_session(self):
        """Close the underlying HTTP client if one exists and is still open."""
        client = getattr(self, 'http_client', None)
        if not client or client.is_closed:
            return
        await client.aclose()
        logger.info("KvkkApiClient: HTTP client session closed.")
```
--------------------------------------------------------------------------------
/anayasa_mcp_module/models.py:
--------------------------------------------------------------------------------
```python
# anayasa_mcp_module/models.py
from pydantic import BaseModel, Field, HttpUrl
from typing import List, Optional, Dict, Any, Literal
from enum import Enum
# --- Enums (AnayasaDonemEnum, etc. - same as before) ---
class AnayasaDonemEnum(str, Enum):
    # String-valued enum: members compare equal to their code strings.
    # Member names suggest constitutional periods (1961 / 1982); the codes
    # match the `period` literals accepted by the search request model.
    TUMU = "ALL"  # no filtering
    DONEM_1961 = "1"
    DONEM_1982 = "2"
class AnayasaVarYokEnum(str, Enum):
    # String-valued tri-state flag: "ALL", "0" or "1". The same three codes
    # are accepted by the has_* literals of the search request model.
    TUMU = "ALL"  # no filtering
    YOK = "0"  # Turkish "yok": absent
    VAR = "1"  # Turkish "var": present
class AnayasaIncelemeSonucuEnum(str, Enum):
    # String-valued codes for review type + outcome. The non-"ALL" codes are
    # the same set accepted by `review_outcomes` in the search request model.
    # Names prefixed ESAS_ / ILK_ presumably distinguish merits review from
    # first review (verify against the upstream search form).
    TUMU = "ALL"
    ESAS_ACILMAMIS_SAYILMA = "1"
    ESAS_IPTAL = "2"
    ESAS_KARAR_YER_OLMADIGI = "3"
    ESAS_RET = "4"
    ILK_ACILMAMIS_SAYILMA = "5"
    ILK_ISIN_GERI_CEVRILMESI = "6"
    ILK_KARAR_YER_OLMADIGI = "7"
    ILK_RET = "8"
    # Codes 9-11 are not defined here; the numbering jumps to 12.
    KANUN_6216_M43_4_IPTAL = "12"
class AnayasaSonucGerekcesiEnum(str, Enum):
    # String-valued codes for the reason behind a decision outcome. The
    # non-"ALL" codes are the same set accepted by `reason_for_final_outcome`
    # in the search request model. Members are not listed in numeric order,
    # and code "28" is not defined.
    TUMU = "ALL"
    ANAYASAYA_AYKIRI_DEGIL = "29"
    ANAYASAYA_ESAS_YONUNDEN_AYKIRILIK = "1"
    ANAYASAYA_ESAS_YONUNDEN_UYGUNLUK = "2"
    ANAYASAYA_SEKIL_ESAS_UYGUNLUK = "30"
    ANAYASAYA_SEKIL_YONUNDEN_AYKIRILIK = "3"
    ANAYASAYA_SEKIL_YONUNDEN_UYGUNLUK = "4"
    # NOTE(review): name marks this as a duplicate of code "1" with its own
    # code - confirm against the upstream form before relying on it.
    AYKIRILIK_ANAYASAYA_ESAS_YONUNDEN_DUPLICATE = "27"
    BASVURU_KARARI = "5"
    DENETIM_DISI = "6"
    DIGER_GEREKCE_1 = "7"
    DIGER_GEREKCE_2 = "8"
    EKSIKLIGIN_GIDERILMEMESI = "9"
    GEREKCE = "10"
    GOREV = "11"
    GOREV_YETKI = "12"
    GOREVLI_MAHKEME = "13"
    GORULMEKTE_OLAN_DAVA = "14"
    MAHKEME = "15"
    NORMDA_DEGISIKLIK_YAPILMASI = "16"
    NORMUN_YURURLUKTEN_KALDIRILMASI = "17"
    ON_YIL_YASAGI = "18"
    SURE = "19"
    USULE_UYMAMA = "20"
    UYGULANACAK_NORM = "21"
    UYGULANAMAZ_HALE_GELME = "22"
    YETKI = "23"
    YETKI_SURE = "24"
    YOK_HUKMUNDE_OLMAMA = "25"
    YOKLUK = "26"
# --- End Enums ---
class AnayasaNormDenetimiSearchRequest(BaseModel):
    """Model for Anayasa Mahkemesi (Norm Denetimi) search request for the MCP tool."""
    # Conventions in this model: text filters default to "" and coded
    # filters default to "ALL"; list filters use default_factory=list so
    # every instance gets its own empty list. The name in parentheses in
    # each description is the corresponding upstream form field.

    # --- Keyword filters ---
    keywords_all: Optional[List[str]] = Field(default_factory=list, description="Keywords for AND logic (KelimeAra[]).")
    keywords_any: Optional[List[str]] = Field(default_factory=list, description="Keywords for OR logic (HerhangiBirKelimeAra[]).")
    keywords_exclude: Optional[List[str]] = Field(default_factory=list, description="Keywords to exclude (BulunmayanKelimeAra[]).")
    # --- Case identification and dates (dates are DD/MM/YYYY strings) ---
    period: Optional[Literal["ALL", "1", "2"]] = Field(default="ALL", description="Constitutional period (Donemler_id).")
    case_number_esas: str = Field("", description="Case registry number (EsasNo), e.g., '2023/123'.")
    decision_number_karar: str = Field("", description="Decision number (KararNo), e.g., '2023/456'.")
    first_review_date_start: str = Field("", description="First review start date (IlkIncelemeTarihiIlk), format DD/MM/YYYY.")
    first_review_date_end: str = Field("", description="First review end date (IlkIncelemeTarihiSon), format DD/MM/YYYY.")
    decision_date_start: str = Field("", description="Decision start date (KararTarihiIlk), format DD/MM/YYYY.")
    decision_date_end: str = Field("", description="Decision end date (KararTarihiSon), format DD/MM/YYYY.")
    # --- Applicant ---
    application_type: Optional[Literal["ALL", "1", "2", "3"]] = Field(default="ALL", description="Type of application (BasvuruTurler_id).")
    applicant_general_name: str = Field("", description="General applicant name (BasvuranGeneller_id).")
    applicant_specific_name: str = Field("", description="Specific applicant name (BasvuranOzeller_id).")
    # --- Official Gazette ---
    official_gazette_date_start: str = Field("", description="Official Gazette start date (ResmiGazeteTarihiIlk), format DD/MM/YYYY.")
    official_gazette_date_end: str = Field("", description="Official Gazette end date (ResmiGazeteTarihiSon), format DD/MM/YYYY.")
    official_gazette_number_start: str = Field("", description="Official Gazette starting number (ResmiGazeteSayisiIlk).")
    official_gazette_number_end: str = Field("", description="Official Gazette ending number (ResmiGazeteSayisiSon).")
    # --- Tri-state flags: "ALL" (no filter), "0", "1" ---
    has_press_release: Optional[Literal["ALL", "0", "1"]] = Field(default="ALL", description="Press release available (BasinDuyurusu).")
    has_dissenting_opinion: Optional[Literal["ALL", "0", "1"]] = Field(default="ALL", description="Dissenting opinion exists (KarsiOy).")
    has_different_reasoning: Optional[Literal["ALL", "0", "1"]] = Field(default="ALL", description="Different reasoning exists (FarkliGerekce).")
    # --- Court composition ---
    attending_members_names: Optional[List[str]] = Field(default_factory=list, description="List of attending members' exact names (Uyeler_id[]).")
    rapporteur_name: str = Field("", description="Rapporteur's exact name (Raportorler_id).")
    # --- Reviewed norm and outcome ---
    norm_type: Optional[Literal["ALL", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "0"]] = Field(default="ALL", description="Type of the reviewed norm (NormunTurler_id).")
    norm_id_or_name: str = Field("", description="Number or name of the norm (NormunNumarasiAdlar_id).")
    norm_article: str = Field("", description="Article number of the norm (NormunMaddeNumarasi).")
    review_outcomes: Optional[List[Literal["1", "2", "3", "4", "5", "6", "7", "8", "12"]]] = Field(default_factory=list, description="List of review types and outcomes (IncelemeTuruKararSonuclar_id[]).")
    reason_for_final_outcome: Optional[Literal["ALL", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "29", "30"]] = Field(default="ALL", description="Main reason for the decision outcome (KararSonucununGerekcesi).")
    basis_constitution_article_numbers: Optional[List[str]] = Field(default_factory=list, description="List of supporting Constitution article numbers (DayanakHukmu[]).")
    # --- Paging and sorting: page size is validated to 1..10, page to >= 1 ---
    results_per_page: int = Field(10, ge=1, le=10, description="Results per page.")
    page_to_fetch: int = Field(1, ge=1, description="Page number to fetch for results list.")
    sort_by_criteria: str = Field("KararTarihi", description="Sort criteria. Options: 'KararTarihi', 'YayinTarihi', 'Toplam' (keyword count).")
class AnayasaReviewedNormInfo(BaseModel):
    """Details of a norm reviewed within an AYM decision summary."""
    # All text fields default to "" so a partially parsed entry still
    # validates; the cited-articles list defaults to a fresh empty list.
    norm_name_or_number: str = Field("", description="Norm name or number")
    article_number: str = Field("", description="Article number")
    review_type_and_outcome: str = Field("", description="Review type and outcome")
    outcome_reason: str = Field("", description="Outcome reason")
    basis_constitution_articles_cited: List[str] = Field(default_factory=list)
    postponement_period: str = Field("", description="Postponement period")
class AnayasaDecisionSummary(BaseModel):
    """Model for a single Anayasa Mahkemesi (Norm Denetimi) decision summary from search results."""
    # Every field has a default, so an empty summary validates.
    decision_reference_no: str = Field("", description="Decision reference number")
    decision_page_url: str = Field("", description="Decision page URL")  # plain str, not validated as a URL
    keywords_found_count: Optional[int] = Field(0, description="Keywords found count")
    application_type_summary: str = Field("", description="Application type summary")
    applicant_summary: str = Field("", description="Applicant summary")
    decision_outcome_summary: str = Field("", description="Decision outcome summary")
    decision_date_summary: str = Field("", description="Decision date summary")
    # One entry per norm reviewed in this decision.
    reviewed_norms: List[AnayasaReviewedNormInfo] = Field(default_factory=list)
class AnayasaSearchResult(BaseModel):
    """Model for the overall search result for Anayasa Mahkemesi Norm Denetimi decisions."""
    # `decisions` is the only required field; the counters have defaults.
    decisions: List[AnayasaDecisionSummary]
    total_records_found: int = Field(0, description="Total records found")
    retrieved_page_number: int = Field(1, description="Retrieved page number")
class AnayasaDocumentMarkdown(BaseModel):
    """
    Model for an Anayasa Mahkemesi (Norm Denetimi) decision document, containing a chunk of Markdown content
    and pagination information.
    """
    # Required: source_url and the three pagination fields (they have no
    # default). Metadata parsed from the page defaults to "".
    source_url: HttpUrl
    decision_reference_no_from_page: str = Field("", description="E.K. No parsed from the document page.")
    decision_date_from_page: str = Field("", description="Decision date parsed from the document page.")
    official_gazette_info_from_page: str = Field("", description="Official Gazette info parsed from the document page.")
    markdown_chunk: str = Field("", description="A 5,000 character chunk of the Markdown content.")  # chunk size stated in the description is 5,000 characters
    current_page: int = Field(description="The current page number of the markdown chunk (1-indexed).")
    total_pages: int = Field(description="Total number of pages for the full markdown content.")
    is_paginated: bool = Field(description="True if the full markdown content is split into multiple pages.")
# --- Models for Anayasa Mahkemesi - Bireysel Başvuru Karar Raporu ---
class AnayasaBireyselReportSearchRequest(BaseModel):
    """Model for Anayasa Mahkemesi (Bireysel Başvuru) 'Karar Arama Raporu' search request."""
    # Only keywords and the page number can be set; the page is validated
    # to be >= 1.
    keywords: Optional[List[str]] = Field(default_factory=list, description="Keywords for AND logic (KelimeAra[]).")
    page_to_fetch: int = Field(1, ge=1, description="Page number to fetch for the report (page). Default is 1.")
class AnayasaBireyselReportDecisionDetail(BaseModel):
    """Details of a specific right/claim within a Bireysel Başvuru decision summary in a report."""
    # Field descriptions are in Turkish and are part of the emitted schema.
    # English gloss: hak = right allegedly violated; mudahale_iddiasi =
    # alleged interference; sonuc = review result; giderim = redress ordered.
    hak: str = Field("", description="İhlal edildiği iddia edilen hak (örneğin, Mülkiyet hakkı).")
    mudahale_iddiasi: str = Field("", description="İhlale neden olan müdahale iddiası.")
    sonuc: str = Field("", description="İnceleme sonucu (örneğin, İhlal, Düşme).")
    giderim: str = Field("", description="Kararlaştırılan giderim (örneğin, Yeniden yargılama).")
class AnayasaBireyselReportDecisionSummary(BaseModel):
    """Model for a single Anayasa Mahkemesi (Bireysel Başvuru) decision summary from a 'Karar Arama Raporu'."""
    # All text fields default to ""; descriptions (partly Turkish) are part
    # of the emitted schema. Dates are kept as DD/MM/YYYY strings.
    title: str = Field("", description="Başvurunun başlığı (e.g., 'HASAN DURMUŞ Başvurusuna İlişkin Karar').")
    decision_reference_no: str = Field("", description="Başvuru Numarası (e.g., '2019/19126').")
    decision_page_url: str = Field("", description="URL to the full decision page.")  # plain str, not validated as a URL
    decision_type_summary: str = Field("", description="Karar Türü (Başvuru Sonucu) (e.g., 'Esas (İhlal)').")
    decision_making_body: str = Field("", description="Kararı Veren Birim (e.g., 'Genel Kurul', 'Birinci Bölüm').")
    application_date_summary: str = Field("", description="Başvuru Tarihi (DD/MM/YYYY).")
    decision_date_summary: str = Field("", description="Karar Tarihi (DD/MM/YYYY).")
    application_subject_summary: str = Field("", description="Başvuru konusunun özeti.")
    # One entry per right/claim examined in the decision.
    details: List[AnayasaBireyselReportDecisionDetail] = Field(default_factory=list, description="İncelenen haklar ve sonuçlarına ilişkin detaylar.")
class AnayasaBireyselReportSearchResult(BaseModel):
    """Model for the overall search result for Anayasa Mahkemesi 'Karar Arama Raporu'."""
    # `decisions` and `retrieved_page_number` are required (no default);
    # `total_records_found` defaults to 0.
    decisions: List[AnayasaBireyselReportDecisionSummary]
    total_records_found: int = Field(0, description="Raporda bulunan toplam karar sayısı.")
    retrieved_page_number: int = Field(description="Alınan rapor sayfa numarası.")
class AnayasaBireyselBasvuruDocumentMarkdown(BaseModel):
    """
    Model for an Anayasa Mahkemesi (Bireysel Başvuru) decision document, containing a chunk of Markdown content
    and pagination information. Fetched from /BB/YYYY/NNNN paths.
    """
    # Required: source_url and the three pagination fields. Unlike the Norm
    # Denetimi document model, page metadata and the chunk default to None
    # rather than "".
    source_url: HttpUrl
    basvuru_no_from_page: Optional[str] = Field(None, description="Başvuru Numarası (B.No) parsed from the document page.")
    karar_tarihi_from_page: Optional[str] = Field(None, description="Decision date parsed from the document page.")
    basvuru_tarihi_from_page: Optional[str] = Field(None, description="Application date parsed from the document page.")
    karari_veren_birim_from_page: Optional[str] = Field(None, description="Deciding body (Bölüm/Genel Kurul) parsed from the document page.")
    karar_turu_from_page: Optional[str] = Field(None, description="Decision type (Başvuru Sonucu) parsed from the document page.")
    resmi_gazete_info_from_page: Optional[str] = Field(None, description="Official Gazette info parsed from the document page, if available.")
    markdown_chunk: Optional[str] = Field(None, description="A 5,000 character chunk of the Markdown content.")
    current_page: int = Field(description="The current page number of the markdown chunk (1-indexed).")
    total_pages: int = Field(description="Total number of pages for the full markdown content.")
    is_paginated: bool = Field(description="True if the full markdown content is split into multiple pages.")
# --- End Models for Bireysel Başvuru ---
# --- Unified Models ---
class AnayasaUnifiedSearchRequest(BaseModel):
    """Unified search request for both Norm Denetimi and Bireysel Başvuru."""
    # `decision_type` is the only required field (Field(...)) and selects
    # which of the two systems the request targets.
    decision_type: Literal["norm_denetimi", "bireysel_basvuru"] = Field(..., description="Decision type: norm_denetimi or bireysel_basvuru")
    # Common parameters: page and page size are both validated to 1..100.
    keywords: List[str] = Field(default_factory=list, description="Keywords to search for")
    page_to_fetch: int = Field(1, ge=1, le=100, description="Page number to fetch (1-100)")
    results_per_page: int = Field(10, ge=1, le=100, description="Results per page (1-100)")
    # Norm Denetimi specific parameters (ignored for bireysel_basvuru)
    keywords_all: List[str] = Field(default_factory=list, description="All keywords must be present (norm_denetimi only)")
    keywords_any: List[str] = Field(default_factory=list, description="Any of these keywords (norm_denetimi only)")
    decision_type_norm: Literal["ALL", "1", "2", "3"] = Field("ALL", description="Decision type for norm denetimi")
    application_date_start: str = Field("", description="Application start date (norm_denetimi only)")
    application_date_end: str = Field("", description="Application end date (norm_denetimi only)")
    # Bireysel Başvuru specific parameters (ignored for norm_denetimi)
    # NOTE(review): `norm_type` uses the same literal set as the Norm
    # Denetimi request model's `norm_type` but is labelled
    # "bireysel_basvuru only" here - confirm which system consumes it.
    decision_start_date: str = Field("", description="Decision start date (bireysel_basvuru only)")
    decision_end_date: str = Field("", description="Decision end date (bireysel_basvuru only)")
    norm_type: Literal["ALL", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "0"] = Field("ALL", description="Norm type (bireysel_basvuru only)")
    subject_category: str = Field("", description="Subject category (bireysel_basvuru only)")
class AnayasaUnifiedSearchResult(BaseModel):
    """Unified search result containing decisions from either system."""

    decision_type: Literal["norm_denetimi", "bireysel_basvuru"] = Field(
        ...,
        description="Type of decisions returned",
    )
    # Plain dicts rather than typed models: the item shape depends on decision_type.
    decisions: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="Decision list (structure varies by type)",
    )
    total_records_found: int = Field(
        default=0,
        description="Total number of records found",
    )
    retrieved_page_number: int = Field(
        default=1,
        description="Page number that was retrieved",
    )
class AnayasaUnifiedDocumentMarkdown(BaseModel):
    """Unified document model for both Norm Denetimi and Bireysel Başvuru."""

    decision_type: Literal["norm_denetimi", "bireysel_basvuru"] = Field(
        ...,
        description="Type of document",
    )
    source_url: HttpUrl = Field(
        ...,
        description="Source URL of the document",
    )
    document_data: Dict[str, Any] = Field(
        default_factory=dict,
        description="Document content and metadata",
    )

    # --- Markdown chunk and its pagination state ---
    markdown_chunk: Optional[str] = Field(
        default=None,
        description="Markdown content chunk",
    )
    current_page: int = Field(
        default=1,
        description="Current page number",
    )
    total_pages: int = Field(
        default=1,
        description="Total number of pages",
    )
    is_paginated: bool = Field(
        default=False,
        description="Whether document is paginated",
    )
```
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/anayasa_mcp_module/models.py:
--------------------------------------------------------------------------------
```python
# anayasa_mcp_module/models.py
from pydantic import BaseModel, Field, HttpUrl
from typing import List, Optional, Dict, Any, Literal
from enum import Enum
# --- Enums ---
# String codes for the constitutional-period filter. The values are the same
# strings accepted by AnayasaNormDenetimiSearchRequest.period ("ALL", "1", "2").
class AnayasaDonemEnum(str, Enum):
    TUMU = "ALL"  # no period filter
    DONEM_1961 = "1"
    DONEM_1982 = "2"
# Tri-state "exists / does not exist / any" code. The values are the same
# strings accepted by the has_* filters of AnayasaNormDenetimiSearchRequest.
class AnayasaVarYokEnum(str, Enum):
    TUMU = "ALL"  # no filter
    YOK = "0"  # absent
    VAR = "1"  # present
# Codes for review type + outcome. Apart from "ALL", the values are the same
# strings accepted by AnayasaNormDenetimiSearchRequest.review_outcomes.
# Member names are prefixed ESAS_ or ILK_; presumably "merits" vs. "first
# review" stage -- TODO confirm against the upstream search form.
class AnayasaIncelemeSonucuEnum(str, Enum):
    TUMU = "ALL"
    ESAS_ACILMAMIS_SAYILMA = "1"
    ESAS_IPTAL = "2"
    ESAS_KARAR_YER_OLMADIGI = "3"
    ESAS_RET = "4"
    ILK_ACILMAMIS_SAYILMA = "5"
    ILK_ISIN_GERI_CEVRILMESI = "6"
    ILK_KARAR_YER_OLMADIGI = "7"
    ILK_RET = "8"
    # The codes jump from "8" to "12"; no members are defined for 9-11.
    KANUN_6216_M43_4_IPTAL = "12"
# Codes for the main reason behind a decision outcome. The values are the same
# strings accepted by AnayasaNormDenetimiSearchRequest.reason_for_final_outcome.
# Members are listed by name, not by numeric code, and no member has the
# value "28".
class AnayasaSonucGerekcesiEnum(str, Enum):
    TUMU = "ALL"
    ANAYASAYA_AYKIRI_DEGIL = "29"
    ANAYASAYA_ESAS_YONUNDEN_AYKIRILIK = "1"
    ANAYASAYA_ESAS_YONUNDEN_UYGUNLUK = "2"
    ANAYASAYA_SEKIL_ESAS_UYGUNLUK = "30"
    ANAYASAYA_SEKIL_YONUNDEN_AYKIRILIK = "3"
    ANAYASAYA_SEKIL_YONUNDEN_UYGUNLUK = "4"
    # NOTE(review): the name marks this as a duplicate of code "1" -- confirm upstream.
    AYKIRILIK_ANAYASAYA_ESAS_YONUNDEN_DUPLICATE = "27"
    BASVURU_KARARI = "5"
    DENETIM_DISI = "6"
    DIGER_GEREKCE_1 = "7"
    DIGER_GEREKCE_2 = "8"
    EKSIKLIGIN_GIDERILMEMESI = "9"
    GEREKCE = "10"
    GOREV = "11"
    GOREV_YETKI = "12"
    GOREVLI_MAHKEME = "13"
    GORULMEKTE_OLAN_DAVA = "14"
    MAHKEME = "15"
    NORMDA_DEGISIKLIK_YAPILMASI = "16"
    NORMUN_YURURLUKTEN_KALDIRILMASI = "17"
    ON_YIL_YASAGI = "18"
    SURE = "19"
    USULE_UYMAMA = "20"
    UYGULANACAK_NORM = "21"
    UYGULANAMAZ_HALE_GELME = "22"
    YETKI = "23"
    YETKI_SURE = "24"
    YOK_HUKMUNDE_OLMAMA = "25"
    YOKLUK = "26"
# --- End Enums ---
class AnayasaNormDenetimiSearchRequest(BaseModel):
    """Model for Anayasa Mahkemesi (Norm Denetimi) search request for the MCP tool."""

    # --- Keyword filters ---
    keywords_all: Optional[List[str]] = Field(
        default_factory=list,
        description="Keywords for AND logic (KelimeAra[]).",
    )
    keywords_any: Optional[List[str]] = Field(
        default_factory=list,
        description="Keywords for OR logic (HerhangiBirKelimeAra[]).",
    )
    keywords_exclude: Optional[List[str]] = Field(
        default_factory=list,
        description="Keywords to exclude (BulunmayanKelimeAra[]).",
    )

    # --- Case identification ---
    period: Optional[Literal["ALL", "1", "2"]] = Field(
        default="ALL",
        description="Constitutional period (Donemler_id).",
    )
    case_number_esas: str = Field(
        default="",
        description="Case registry number (EsasNo), e.g., '2023/123'.",
    )
    decision_number_karar: str = Field(
        default="",
        description="Decision number (KararNo), e.g., '2023/456'.",
    )

    # --- Date ranges ---
    first_review_date_start: str = Field(
        default="",
        description="First review start date (IlkIncelemeTarihiIlk), format DD/MM/YYYY.",
    )
    first_review_date_end: str = Field(
        default="",
        description="First review end date (IlkIncelemeTarihiSon), format DD/MM/YYYY.",
    )
    decision_date_start: str = Field(
        default="",
        description="Decision start date (KararTarihiIlk), format DD/MM/YYYY.",
    )
    decision_date_end: str = Field(
        default="",
        description="Decision end date (KararTarihiSon), format DD/MM/YYYY.",
    )

    # --- Application and applicant ---
    application_type: Optional[Literal["ALL", "1", "2", "3"]] = Field(
        default="ALL",
        description="Type of application (BasvuruTurler_id).",
    )
    applicant_general_name: str = Field(
        default="",
        description="General applicant name (BasvuranGeneller_id).",
    )
    applicant_specific_name: str = Field(
        default="",
        description="Specific applicant name (BasvuranOzeller_id).",
    )

    # --- Official Gazette ---
    official_gazette_date_start: str = Field(
        default="",
        description="Official Gazette start date (ResmiGazeteTarihiIlk), format DD/MM/YYYY.",
    )
    official_gazette_date_end: str = Field(
        default="",
        description="Official Gazette end date (ResmiGazeteTarihiSon), format DD/MM/YYYY.",
    )
    official_gazette_number_start: str = Field(
        default="",
        description="Official Gazette starting number (ResmiGazeteSayisiIlk).",
    )
    official_gazette_number_end: str = Field(
        default="",
        description="Official Gazette ending number (ResmiGazeteSayisiSon).",
    )

    # --- Tri-state flags: "ALL" (no filter), "0", "1" ---
    has_press_release: Optional[Literal["ALL", "0", "1"]] = Field(
        default="ALL",
        description="Press release available (BasinDuyurusu).",
    )
    has_dissenting_opinion: Optional[Literal["ALL", "0", "1"]] = Field(
        default="ALL",
        description="Dissenting opinion exists (KarsiOy).",
    )
    has_different_reasoning: Optional[Literal["ALL", "0", "1"]] = Field(
        default="ALL",
        description="Different reasoning exists (FarkliGerekce).",
    )

    # --- Court members ---
    attending_members_names: Optional[List[str]] = Field(
        default_factory=list,
        description="List of attending members' exact names (Uyeler_id[]).",
    )
    rapporteur_name: str = Field(
        default="",
        description="Rapporteur's exact name (Raportorler_id).",
    )

    # --- Reviewed norm ---
    norm_type: Optional[
        Literal[
            "ALL", "1", "2", "3", "4", "5", "6", "7", "8",
            "9", "10", "11", "12", "13", "14", "0",
        ]
    ] = Field(
        default="ALL",
        description="Type of the reviewed norm (NormunTurler_id).",
    )
    norm_id_or_name: str = Field(
        default="",
        description="Number or name of the norm (NormunNumarasiAdlar_id).",
    )
    norm_article: str = Field(
        default="",
        description="Article number of the norm (NormunMaddeNumarasi).",
    )

    # --- Outcome filters ---
    review_outcomes: Optional[
        List[Literal["1", "2", "3", "4", "5", "6", "7", "8", "12"]]
    ] = Field(
        default_factory=list,
        description="List of review types and outcomes (IncelemeTuruKararSonuclar_id[]).",
    )
    reason_for_final_outcome: Optional[
        Literal[
            "ALL", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10",
            "11", "12", "13", "14", "15", "16", "17", "18", "19", "20",
            "21", "22", "23", "24", "25", "26", "27", "29", "30",
        ]
    ] = Field(
        default="ALL",
        description="Main reason for the decision outcome (KararSonucununGerekcesi).",
    )
    basis_constitution_article_numbers: Optional[List[str]] = Field(
        default_factory=list,
        description="List of supporting Constitution article numbers (DayanakHukmu[]).",
    )

    # --- Paging and sorting ---
    results_per_page: int = Field(
        default=10,
        ge=1,
        le=10,
        description="Results per page.",
    )
    page_to_fetch: int = Field(
        default=1,
        ge=1,
        description="Page number to fetch for results list.",
    )
    sort_by_criteria: str = Field(
        default="KararTarihi",
        description="Sort criteria. Options: 'KararTarihi', 'YayinTarihi', 'Toplam' (keyword count).",
    )
class AnayasaReviewedNormInfo(BaseModel):
    """Details of a norm reviewed within an AYM decision summary."""

    norm_name_or_number: str = Field(
        default="",
        description="Norm name or number",
    )
    article_number: str = Field(
        default="",
        description="Article number",
    )
    review_type_and_outcome: str = Field(
        default="",
        description="Review type and outcome",
    )
    outcome_reason: str = Field(
        default="",
        description="Outcome reason",
    )
    basis_constitution_articles_cited: List[str] = Field(default_factory=list)
    postponement_period: str = Field(
        default="",
        description="Postponement period",
    )
class AnayasaDecisionSummary(BaseModel):
    """Model for a single Anayasa Mahkemesi (Norm Denetimi) decision summary from search results."""

    decision_reference_no: str = Field(
        default="",
        description="Decision reference number",
    )
    decision_page_url: str = Field(
        default="",
        description="Decision page URL",
    )
    keywords_found_count: Optional[int] = Field(
        default=0,
        description="Keywords found count",
    )
    application_type_summary: str = Field(
        default="",
        description="Application type summary",
    )
    applicant_summary: str = Field(
        default="",
        description="Applicant summary",
    )
    decision_outcome_summary: str = Field(
        default="",
        description="Decision outcome summary",
    )
    decision_date_summary: str = Field(
        default="",
        description="Decision date summary",
    )
    # One entry per norm reviewed in this decision.
    reviewed_norms: List[AnayasaReviewedNormInfo] = Field(default_factory=list)
class AnayasaSearchResult(BaseModel):
    """Model for the overall search result for Anayasa Mahkemesi Norm Denetimi decisions."""

    # Required: has no default, unlike the two counters below.
    decisions: List[AnayasaDecisionSummary]
    total_records_found: int = Field(
        default=0,
        description="Total records found",
    )
    retrieved_page_number: int = Field(
        default=1,
        description="Retrieved page number",
    )
class AnayasaDocumentMarkdown(BaseModel):
    """
    Model for an Anayasa Mahkemesi (Norm Denetimi) decision document, containing a chunk of Markdown content
    and pagination information.
    """

    source_url: HttpUrl

    # --- Metadata parsed from the document page ---
    decision_reference_no_from_page: str = Field(
        default="",
        description="E.K. No parsed from the document page.",
    )
    decision_date_from_page: str = Field(
        default="",
        description="Decision date parsed from the document page.",
    )
    official_gazette_info_from_page: str = Field(
        default="",
        description="Official Gazette info parsed from the document page.",
    )

    # --- Markdown chunk and its pagination state (pagination fields are required) ---
    markdown_chunk: str = Field(
        default="",
        description="A 5,000 character chunk of the Markdown content.",
    )
    current_page: int = Field(
        description="The current page number of the markdown chunk (1-indexed).",
    )
    total_pages: int = Field(
        description="Total number of pages for the full markdown content.",
    )
    is_paginated: bool = Field(
        description="True if the full markdown content is split into multiple pages.",
    )
# --- Models for Anayasa Mahkemesi - Bireysel Başvuru Karar Raporu ---
class AnayasaBireyselReportSearchRequest(BaseModel):
    """Model for Anayasa Mahkemesi (Bireysel Başvuru) 'Karar Arama Raporu' search request."""

    keywords: Optional[List[str]] = Field(
        default_factory=list,
        description="Keywords for AND logic (KelimeAra[]).",
    )
    page_to_fetch: int = Field(
        default=1,
        ge=1,
        description="Page number to fetch for the report (page). Default is 1.",
    )
class AnayasaBireyselReportDecisionDetail(BaseModel):
    """Details of a specific right/claim within a Bireysel Başvuru decision summary in a report."""

    # Field names and descriptions are Turkish; they are kept verbatim
    # because they are part of the generated schema.
    hak: str = Field(
        default="",
        description="İhlal edildiği iddia edilen hak (örneğin, Mülkiyet hakkı).",
    )
    mudahale_iddiasi: str = Field(
        default="",
        description="İhlale neden olan müdahale iddiası.",
    )
    sonuc: str = Field(
        default="",
        description="İnceleme sonucu (örneğin, İhlal, Düşme).",
    )
    giderim: str = Field(
        default="",
        description="Kararlaştırılan giderim (örneğin, Yeniden yargılama).",
    )
class AnayasaBireyselReportDecisionSummary(BaseModel):
    """Model for a single Anayasa Mahkemesi (Bireysel Başvuru) decision summary from a 'Karar Arama Raporu'."""

    title: str = Field(
        default="",
        description="Başvurunun başlığı (e.g., 'HASAN DURMUŞ Başvurusuna İlişkin Karar').",
    )
    decision_reference_no: str = Field(
        default="",
        description="Başvuru Numarası (e.g., '2019/19126').",
    )
    decision_page_url: str = Field(
        default="",
        description="URL to the full decision page.",
    )
    decision_type_summary: str = Field(
        default="",
        description="Karar Türü (Başvuru Sonucu) (e.g., 'Esas (İhlal)').",
    )
    decision_making_body: str = Field(
        default="",
        description="Kararı Veren Birim (e.g., 'Genel Kurul', 'Birinci Bölüm').",
    )
    application_date_summary: str = Field(
        default="",
        description="Başvuru Tarihi (DD/MM/YYYY).",
    )
    decision_date_summary: str = Field(
        default="",
        description="Karar Tarihi (DD/MM/YYYY).",
    )
    application_subject_summary: str = Field(
        default="",
        description="Başvuru konusunun özeti.",
    )
    # One entry per examined right/claim.
    details: List[AnayasaBireyselReportDecisionDetail] = Field(
        default_factory=list,
        description="İncelenen haklar ve sonuçlarına ilişkin detaylar.",
    )
class AnayasaBireyselReportSearchResult(BaseModel):
    """Model for the overall search result for Anayasa Mahkemesi 'Karar Arama Raporu'."""

    # `decisions` and `retrieved_page_number` are required; only the total has a default.
    decisions: List[AnayasaBireyselReportDecisionSummary]
    total_records_found: int = Field(
        default=0,
        description="Raporda bulunan toplam karar sayısı.",
    )
    retrieved_page_number: int = Field(
        description="Alınan rapor sayfa numarası.",
    )
class AnayasaBireyselBasvuruDocumentMarkdown(BaseModel):
    """
    Model for an Anayasa Mahkemesi (Bireysel Başvuru) decision document, containing a chunk of Markdown content
    and pagination information. Fetched from /BB/YYYY/NNNN paths.
    """

    source_url: HttpUrl

    # --- Metadata parsed from the document page; None when not available ---
    basvuru_no_from_page: Optional[str] = Field(
        default=None,
        description="Başvuru Numarası (B.No) parsed from the document page.",
    )
    karar_tarihi_from_page: Optional[str] = Field(
        default=None,
        description="Decision date parsed from the document page.",
    )
    basvuru_tarihi_from_page: Optional[str] = Field(
        default=None,
        description="Application date parsed from the document page.",
    )
    karari_veren_birim_from_page: Optional[str] = Field(
        default=None,
        description="Deciding body (Bölüm/Genel Kurul) parsed from the document page.",
    )
    karar_turu_from_page: Optional[str] = Field(
        default=None,
        description="Decision type (Başvuru Sonucu) parsed from the document page.",
    )
    resmi_gazete_info_from_page: Optional[str] = Field(
        default=None,
        description="Official Gazette info parsed from the document page, if available.",
    )

    # --- Markdown chunk and its pagination state (pagination fields are required) ---
    markdown_chunk: Optional[str] = Field(
        default=None,
        description="A 5,000 character chunk of the Markdown content.",
    )
    current_page: int = Field(
        description="The current page number of the markdown chunk (1-indexed).",
    )
    total_pages: int = Field(
        description="Total number of pages for the full markdown content.",
    )
    is_paginated: bool = Field(
        description="True if the full markdown content is split into multiple pages.",
    )
# --- End Models for Bireysel Başvuru ---
# --- Unified Models ---
class AnayasaUnifiedSearchRequest(BaseModel):
    """Unified search request for both Norm Denetimi and Bireysel Başvuru."""

    # Discriminator: selects which of the two decision systems is queried.
    decision_type: Literal["norm_denetimi", "bireysel_basvuru"] = Field(
        ...,
        description="Decision type: norm_denetimi or bireysel_basvuru",
    )

    # --- Parameters shared by both decision types ---
    keywords: List[str] = Field(
        default_factory=list,
        description="Keywords to search for",
    )
    page_to_fetch: int = Field(
        default=1,
        ge=1,
        le=100,
        description="Page number to fetch (1-100)",
    )
    results_per_page: int = Field(
        default=10,
        ge=1,
        le=100,
        description="Results per page (1-100)",
    )

    # --- Norm Denetimi parameters (ignored for bireysel_basvuru) ---
    keywords_all: List[str] = Field(
        default_factory=list,
        description="All keywords must be present (norm_denetimi only)",
    )
    keywords_any: List[str] = Field(
        default_factory=list,
        description="Any of these keywords (norm_denetimi only)",
    )
    decision_type_norm: Literal["ALL", "1", "2", "3"] = Field(
        default="ALL",
        description="Decision type for norm denetimi",
    )
    application_date_start: str = Field(
        default="",
        description="Application start date (norm_denetimi only)",
    )
    application_date_end: str = Field(
        default="",
        description="Application end date (norm_denetimi only)",
    )

    # --- Bireysel Başvuru parameters (ignored for norm_denetimi) ---
    decision_start_date: str = Field(
        default="",
        description="Decision start date (bireysel_basvuru only)",
    )
    decision_end_date: str = Field(
        default="",
        description="Decision end date (bireysel_basvuru only)",
    )
    norm_type: Literal[
        "ALL", "1", "2", "3", "4", "5", "6", "7", "8",
        "9", "10", "11", "12", "13", "14", "0",
    ] = Field(
        default="ALL",
        description="Norm type (bireysel_basvuru only)",
    )
    subject_category: str = Field(
        default="",
        description="Subject category (bireysel_basvuru only)",
    )
class AnayasaUnifiedSearchResult(BaseModel):
    """Unified search result containing decisions from either system."""

    decision_type: Literal["norm_denetimi", "bireysel_basvuru"] = Field(
        ...,
        description="Type of decisions returned",
    )
    # Plain dicts rather than typed models: the item shape depends on decision_type.
    decisions: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="Decision list (structure varies by type)",
    )
    total_records_found: int = Field(
        default=0,
        description="Total number of records found",
    )
    retrieved_page_number: int = Field(
        default=1,
        description="Page number that was retrieved",
    )
class AnayasaUnifiedDocumentMarkdown(BaseModel):
    """Unified document model for both Norm Denetimi and Bireysel Başvuru."""

    decision_type: Literal["norm_denetimi", "bireysel_basvuru"] = Field(
        ...,
        description="Type of document",
    )
    source_url: HttpUrl = Field(
        ...,
        description="Source URL of the document",
    )
    document_data: Dict[str, Any] = Field(
        default_factory=dict,
        description="Document content and metadata",
    )

    # --- Markdown chunk and its pagination state ---
    markdown_chunk: Optional[str] = Field(
        default=None,
        description="Markdown content chunk",
    )
    current_page: int = Field(
        default=1,
        description="Current page number",
    )
    total_pages: int = Field(
        default=1,
        description="Total number of pages",
    )
    is_paginated: bool = Field(
        default=False,
        description="Whether document is paginated",
    )
```
--------------------------------------------------------------------------------
/redis_session_store.py:
--------------------------------------------------------------------------------
```python
"""
Redis Session Store for OAuth Authorization Codes and User Sessions
This module provides Redis-based storage for OAuth authorization codes and user sessions,
enabling multi-machine deployment support by replacing in-memory storage.
Uses Upstash Redis via REST API for serverless-friendly operation.
"""
import os
import json
import time
import logging
from typing import Optional, Dict, Any, Union
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
try:
from upstash_redis import Redis
UPSTASH_AVAILABLE = True
except ImportError:
UPSTASH_AVAILABLE = False
Redis = None
# Use standard Python exceptions for Redis connection errors
import socket
from requests.exceptions import ConnectionError as RequestsConnectionError, Timeout as RequestsTimeout
class RedisSessionStore:
    """
    Redis-based session store for OAuth flows and user sessions.

    Uses Upstash Redis REST API for connection-free operation suitable for
    multi-instance deployments on platforms like Fly.io.

    Every record is stored as a Redis hash whose values are strings
    (see ``_serialize_data`` / ``_deserialize_data``). OAuth codes live under
    ``oauth:code:<code>`` and sessions under ``session:<session_id>``.
    """

    def __init__(self):
        """Create the Upstash Redis client from environment variables.

        Reads ``UPSTASH_REDIS_REST_URL`` and ``UPSTASH_REDIS_REST_TOKEN`` for
        the connection, and ``OAUTH_CODE_TTL`` / ``SESSION_TTL`` (seconds) for
        expirations. No request is sent to Redis here.

        Raises:
            ImportError: If the ``upstash-redis`` package is not installed.
            ValueError: If the URL or token variable is missing, or a TTL
                variable is not an integer.
        """
        if not UPSTASH_AVAILABLE:
            raise ImportError("upstash-redis package is required. Install with: pip install upstash-redis")

        try:
            redis_url = os.getenv("UPSTASH_REDIS_REST_URL")
            redis_token = os.getenv("UPSTASH_REDIS_REST_TOKEN")

            if not redis_url or not redis_token:
                raise ValueError("UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN must be set")

            # Only a URL prefix is logged; the token is never logged.
            logger.info(f"Connecting to Upstash Redis at {redis_url[:30]}...")

            self.redis = Redis(
                url=redis_url,
                token=redis_token
            )

            logger.info("Upstash Redis client created")

            # Skip connection test during initialization to prevent server hang;
            # the connection is exercised by the first actual operation.
            logger.info("Redis client initialized - connection will be tested on first use")

        except Exception as e:
            logger.error(f"Failed to initialize Upstash Redis: {e}")
            raise

        # TTL values (in seconds)
        self.oauth_code_ttl = int(os.getenv("OAUTH_CODE_TTL", "600"))  # 10 minutes
        self.session_ttl = int(os.getenv("SESSION_TTL", "3600"))  # 1 hour

    def _serialize_data(self, data: Dict[str, Any]) -> Dict[str, str]:
        """Convert data to Redis-compatible string format.

        Booleans become ``"true"``/``"false"``, dicts and lists become JSON,
        and everything else goes through ``str()``.

        Args:
            data: Mapping of field name to value.

        Returns:
            A new mapping with the same keys and string values.
        """
        serialized = {}
        for key, value in data.items():
            # bool must be tested first: it is a subclass of int, so a numeric
            # check placed earlier would turn True into "True", which
            # _deserialize_data does not recognise as a boolean.
            if isinstance(value, bool):
                serialized[key] = "true" if value else "false"
            elif isinstance(value, (dict, list)):
                serialized[key] = json.dumps(value)
            else:
                serialized[key] = str(value)
        return serialized

    def _deserialize_data(self, data: Dict[str, str]) -> Dict[str, Any]:
        """Convert Redis string data back to original types.

        Per value, in order: JSON for strings starting with ``[`` or ``{``,
        then int/float for (optionally negative) numeric strings, then
        ``"true"``/``"false"`` to bool. Anything that does not parse is
        returned unchanged, as are non-string values.

        Args:
            data: Hash contents as returned by Redis; may be empty or None.

        Returns:
            A new mapping with decoded values (``{}`` for empty input).
        """
        if not data:
            return {}

        deserialized = {}
        for key, value in data.items():
            deserialized[key] = value  # fallback: keep the value as-is
            if not isinstance(value, str):
                continue

            # Try to deserialize JSON
            if value.startswith(('[', '{')):
                try:
                    deserialized[key] = json.loads(value)
                    continue
                except json.JSONDecodeError:
                    pass

            # Try to convert numbers; one leading minus sign is allowed so
            # negative values written by _serialize_data round-trip.
            unsigned = value[1:] if value.startswith('-') else value
            if unsigned.replace('.', '').isdigit():
                try:
                    deserialized[key] = int(value) if unsigned.isdigit() else float(value)
                    continue
                except ValueError:
                    # e.g. "1.2.3", or "²" (isdigit() is true but int() rejects it)
                    pass

            # Handle booleans
            if value in ("true", "false"):
                deserialized[key] = value == "true"

        return deserialized

    # OAuth Authorization Code Methods

    def set_oauth_code(self, code: str, data: Dict[str, Any]) -> bool:
        """
        Store OAuth authorization code with automatic expiration.

        The hash is written field by field, so a failed write can leave a
        partial record; on failure the key is deleted on a best-effort basis.
        Connection-level errors are retried up to three times.

        Args:
            code: Authorization code string
            data: Code data including user_id, client_id, etc.

        Returns:
            True if stored successfully, False otherwise
        """
        # Bound before the try so the error handler can always reference them.
        max_retries = 3
        key = f"oauth:code:{code}"
        try:
            # Add timestamps; expires_at is also checked manually on retrieval.
            now = time.time()
            data_with_timestamp = data.copy()
            data_with_timestamp.update({
                "created_at": now,
                "expires_at": now + self.oauth_code_ttl
            })

            # Serialize and store - Upstash Redis doesn't support mapping parameter
            serialized_data = self._serialize_data(data_with_timestamp)

            for attempt in range(max_retries):
                try:
                    # Clear any existing data first (also removes leftovers of a failed attempt)
                    self.redis.delete(key)

                    for field, value in serialized_data.items():
                        self.redis.hset(key, field, value)

                    # Set expiration
                    self.redis.expire(key, self.oauth_code_ttl)

                    logger.info(f"Stored OAuth code {code[:10]}... with TTL {self.oauth_code_ttl}s (attempt {attempt + 1})")
                    return True

                except (RequestsConnectionError, RequestsTimeout, OSError, socket.error) as e:
                    logger.warning(f"Redis connection error on attempt {attempt + 1}: {e}")
                    if attempt == max_retries - 1:
                        raise  # Re-raise on final attempt
                    time.sleep(0.5 * (attempt + 1))  # Linear backoff: 0.5s, then 1.0s

        except Exception as e:
            logger.error(f"Failed to store OAuth code {code[:10]}... after {max_retries} attempts: {e}")
            # A partially written hash has no TTL and may lack expires_at, so
            # it would never expire; remove it if Redis is reachable.
            try:
                self.redis.delete(key)
            except Exception as cleanup_error:
                logger.warning(f"Failed to clean up partially stored OAuth code: {cleanup_error}")
            return False

    def get_oauth_code(self, code: str, delete_after_use: bool = True) -> Optional[Dict[str, Any]]:
        """
        Retrieve OAuth authorization code data.

        The read and the delete are separate Redis calls; if the delete
        fails, the data is still returned and a warning is logged.

        Args:
            code: Authorization code string
            delete_after_use: If True, delete the code after retrieval (one-time use)

        Returns:
            Code data dictionary or None if not found/expired
        """
        key = f"oauth:code:{code}"
        max_retries = 3

        for attempt in range(max_retries):
            try:
                data = self.redis.hgetall(key)

                if not data:
                    logger.warning(f"OAuth code {code[:10]}... not found or expired (attempt {attempt + 1})")
                    return None

                deserialized_data = self._deserialize_data(data)

                # Check manual expiration (in case Redis TTL failed)
                expires_at = deserialized_data.get("expires_at", 0)
                if expires_at and time.time() > expires_at:
                    logger.warning(f"OAuth code {code[:10]}... manually expired")
                    try:
                        self.redis.delete(key)
                    except Exception as del_error:
                        logger.warning(f"Failed to delete expired code: {del_error}")
                    return None

                # Delete after use for security (one-time use)
                if delete_after_use:
                    try:
                        self.redis.delete(key)
                        logger.info(f"Retrieved and deleted OAuth code {code[:10]}... (attempt {attempt + 1})")
                    except Exception as del_error:
                        logger.warning(f"Failed to delete code after use: {del_error}")
                        # Continue anyway since we got the data
                else:
                    logger.info(f"Retrieved OAuth code {code[:10]}... (not deleted, attempt {attempt + 1})")

                return deserialized_data

            except (RequestsConnectionError, RequestsTimeout, OSError, socket.error) as e:
                logger.warning(f"Redis connection error on retrieval attempt {attempt + 1}: {e}")
                if attempt == max_retries - 1:
                    logger.error(f"Failed to retrieve OAuth code {code[:10]}... after {max_retries} attempts: {e}")
                    return None
                time.sleep(0.5 * (attempt + 1))  # Linear backoff: 0.5s, then 1.0s

            except Exception as e:
                # Any other error is retried on the same schedule.
                logger.error(f"Failed to retrieve OAuth code {code[:10]}... on attempt {attempt + 1}: {e}")
                if attempt == max_retries - 1:
                    return None
                time.sleep(0.5 * (attempt + 1))

        return None

    # User Session Methods

    def set_session(self, session_id: str, user_data: Dict[str, Any]) -> bool:
        """
        Store user session data with sliding expiration.

        Adds ``session_id``, ``created_at`` and ``last_accessed`` to the
        stored fields. Existing fields of the same key are not cleared first.

        Args:
            session_id: Unique session identifier
            user_data: User session data (user_id, email, scopes, etc.)

        Returns:
            True if stored successfully, False otherwise
        """
        try:
            key = f"session:{session_id}"

            # Add session metadata
            now = time.time()
            session_data = user_data.copy()
            session_data.update({
                "session_id": session_id,
                "created_at": now,
                "last_accessed": now
            })

            # Serialize and store - Upstash Redis doesn't support mapping parameter
            serialized_data = self._serialize_data(session_data)
            for field, value in serialized_data.items():
                self.redis.hset(key, field, value)
            self.redis.expire(key, self.session_ttl)

            logger.info(f"Stored session {session_id[:10]}... with TTL {self.session_ttl}s")
            return True

        except Exception as e:
            logger.error(f"Failed to store session {session_id[:10]}...: {e}")
            return False

    def get_session(self, session_id: str, refresh_ttl: bool = True) -> Optional[Dict[str, Any]]:
        """
        Retrieve user session data.

        Args:
            session_id: Session identifier
            refresh_ttl: If True, extend session TTL on access

        Returns:
            Session data dictionary or None if not found/expired. None is
            also returned when any Redis call fails, including the refresh.
        """
        try:
            key = f"session:{session_id}"

            data = self.redis.hgetall(key)

            if not data:
                logger.warning(f"Session {session_id[:10]}... not found or expired")
                return None

            session_data = self._deserialize_data(data)

            # Update last accessed time and refresh TTL (sliding expiration)
            if refresh_ttl:
                now = time.time()
                session_data["last_accessed"] = now
                self.redis.hset(key, "last_accessed", str(now))
                self.redis.expire(key, self.session_ttl)
                logger.debug(f"Refreshed session {session_id[:10]}... TTL")

            return session_data

        except Exception as e:
            logger.error(f"Failed to retrieve session {session_id[:10]}...: {e}")
            return None

    def delete_session(self, session_id: str) -> bool:
        """
        Delete user session (logout).

        Args:
            session_id: Session identifier

        Returns:
            True if deleted successfully, False otherwise (including when
            the session did not exist)
        """
        try:
            key = f"session:{session_id}"
            result = self.redis.delete(key)

            if result:
                logger.info(f"Deleted session {session_id[:10]}...")
                return True
            else:
                logger.warning(f"Session {session_id[:10]}... not found for deletion")
                return False

        except Exception as e:
            logger.error(f"Failed to delete session {session_id[:10]}...: {e}")
            return False

    # Health Check Methods

    def health_check(self) -> Dict[str, Any]:
        """
        Perform Redis health check.

        Writes a throwaway hash under ``health:check:<unix-seconds>``, reads
        it back and deletes it.

        Returns:
            Health status dictionary. ``status`` is ``"healthy"`` when all
            calls completed without raising, otherwise ``"unhealthy"`` with
            an ``error`` message.
        """
        try:
            # Test basic operations
            test_key = f"health:check:{int(time.time())}"
            test_value = {"timestamp": time.time(), "test": True}

            # Test set - Use individual hset calls for Upstash compatibility
            serialized_test = self._serialize_data(test_value)
            for field, value in serialized_test.items():
                self.redis.hset(test_key, field, value)

            # Test get
            retrieved = self.redis.hgetall(test_key)

            # Test delete
            self.redis.delete(test_key)

            return {
                "status": "healthy",
                "redis_connected": True,
                "operations_working": bool(retrieved),
                "timestamp": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Redis health check failed: {e}")
            return {
                "status": "unhealthy",
                "redis_connected": False,
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat()
            }

    def get_stats(self) -> Dict[str, Any]:
        """
        Get Redis usage statistics.

        Returns:
            Statistics dictionary with the key patterns and a timestamp,
            plus key counts when the KEYS calls succeed, or a ``warning``
            entry when they fail.
        """
        try:
            # Get basic info (not all Upstash plans support INFO command)
            stats = {
                "oauth_codes_pattern": "oauth:code:*",
                "sessions_pattern": "session:*",
                "timestamp": datetime.utcnow().isoformat()
            }

            try:
                # Try to get counts (may fail on some Upstash plans)
                oauth_keys = self.redis.keys("oauth:code:*")
                session_keys = self.redis.keys("session:*")

                stats.update({
                    "active_oauth_codes": len(oauth_keys) if oauth_keys else 0,
                    "active_sessions": len(session_keys) if session_keys else 0
                })
            except Exception as e:
                logger.warning(f"Could not get detailed stats: {e}")
                stats["warning"] = "Detailed stats not available on this Redis plan"

            return stats

        except Exception as e:
            logger.error(f"Failed to get Redis stats: {e}")
            return {"error": str(e), "timestamp": datetime.utcnow().isoformat()}
# Global instance for easy importing
# Lazily created by get_redis_store(); stays None until a store is built.
redis_store = None


def get_redis_store() -> Optional[RedisSessionStore]:
    """
    Return the shared RedisSessionStore, creating it on first use.

    A failed construction is logged and leaves the global unset, so the
    next call attempts the construction again.

    Returns:
        RedisSessionStore instance or None if initialization fails
    """
    global redis_store

    # Fast path: a store was already built by an earlier call.
    if redis_store is not None:
        return redis_store

    try:
        logger.info("Initializing Redis store...")
        redis_store = RedisSessionStore()
        logger.info("Redis store initialized successfully")
    except Exception as init_error:
        logger.error(f"Failed to initialize Redis store: {init_error}")
        redis_store = None

    return redis_store
def init_redis_store() -> RedisSessionStore:
    """
    Initialize Redis store and perform health check.

    Returns:
        RedisSessionStore instance

    Raises:
        RuntimeError: If the store could not be created.
        Exception: If the store was created but its health check did not
            report "healthy".
    """
    store = get_redis_store()

    # get_redis_store() swallows construction errors and returns None; fail
    # with an explicit message instead of an AttributeError on None.
    if store is None:
        raise RuntimeError("Redis store could not be initialized; see earlier log output for the cause")

    # Perform health check
    health = store.health_check()

    if health["status"] != "healthy":
        raise Exception(f"Redis health check failed: {health}")

    logger.info("Redis session store initialized and healthy")
    return store
```
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/redis_session_store.py:
--------------------------------------------------------------------------------
```python
"""
Redis Session Store for OAuth Authorization Codes and User Sessions
This module provides Redis-based storage for OAuth authorization codes and user sessions,
enabling multi-machine deployment support by replacing in-memory storage.
Uses Upstash Redis via REST API for serverless-friendly operation.
"""
import os
import json
import time
import logging
from typing import Optional, Dict, Any, Union
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
try:
from upstash_redis import Redis
UPSTASH_AVAILABLE = True
except ImportError:
UPSTASH_AVAILABLE = False
Redis = None
# Use standard Python exceptions for Redis connection errors
import socket
from requests.exceptions import ConnectionError as RequestsConnectionError, Timeout as RequestsTimeout
class RedisSessionStore:
"""
Redis-based session store for OAuth flows and user sessions.
Uses Upstash Redis REST API for connection-free operation suitable for
multi-instance deployments on platforms like Fly.io.
"""
def __init__(self):
    """Build the Upstash Redis client from environment variables.

    Reads ``UPSTASH_REDIS_REST_URL`` / ``UPSTASH_REDIS_REST_TOKEN`` for the
    client and ``OAUTH_CODE_TTL`` / ``SESSION_TTL`` (seconds) for
    expirations. No request is sent to Redis here.
    """
    if not UPSTASH_AVAILABLE:
        raise ImportError("upstash-redis package is required. Install with: pip install upstash-redis")

    try:
        rest_url = os.getenv("UPSTASH_REDIS_REST_URL")
        rest_token = os.getenv("UPSTASH_REDIS_REST_TOKEN")

        # Both settings are mandatory; an empty string counts as missing.
        if not (rest_url and rest_token):
            raise ValueError("UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN must be set")

        # Only a URL prefix is logged; the token is never logged.
        logger.info(f"Connecting to Upstash Redis at {rest_url[:30]}...")
        self.redis = Redis(url=rest_url, token=rest_token)
        logger.info("Upstash Redis client created")

        # The connection is deliberately not tested here so start-up cannot
        # hang; the first real operation exercises it.
        logger.info("Redis client initialized - connection will be tested on first use")
    except Exception as init_error:
        logger.error(f"Failed to initialize Upstash Redis: {init_error}")
        raise

    # Expirations in seconds: 10 minutes for OAuth codes, 1 hour for sessions.
    self.oauth_code_ttl = int(os.getenv("OAUTH_CODE_TTL", "600"))
    self.session_ttl = int(os.getenv("SESSION_TTL", "3600"))
def _serialize_data(self, data: Dict[str, Any]) -> Dict[str, str]:
"""Convert data to Redis-compatible string format."""
serialized = {}
for key, value in data.items():
if isinstance(value, (dict, list)):
serialized[key] = json.dumps(value)
elif isinstance(value, (int, float)):
serialized[key] = str(value)
elif isinstance(value, bool):
serialized[key] = "true" if value else "false"
else:
serialized[key] = str(value)
return serialized
def _deserialize_data(self, data: Dict[str, str]) -> Dict[str, Any]:
"""Convert Redis string data back to original types."""
if not data:
return {}
deserialized = {}
for key, value in data.items():
if not isinstance(value, str):
deserialized[key] = value
continue
# Try to deserialize JSON
if value.startswith(('[', '{')):
try:
deserialized[key] = json.loads(value)
continue
except json.JSONDecodeError:
pass
# Try to convert numbers
if value.isdigit():
deserialized[key] = int(value)
continue
if value.replace('.', '').isdigit():
try:
deserialized[key] = float(value)
continue
except ValueError:
pass
# Handle booleans
if value in ("true", "false"):
deserialized[key] = value == "true"
continue
# Keep as string
deserialized[key] = value
return deserialized
# OAuth Authorization Code Methods
def set_oauth_code(self, code: str, data: Dict[str, Any]) -> bool:
    """
    Store OAuth authorization code with automatic expiration.

    The payload is copied, stamped with ``created_at`` / ``expires_at``,
    serialized and written field by field to the hash ``oauth:code:<code>``,
    which is then given a TTL of ``self.oauth_code_ttl`` seconds. Connection
    errors are retried up to three times.

    Args:
        code: Authorization code string
        data: Code data including user_id, client_id, etc.
    Returns:
        True if stored successfully, False otherwise
    """
    # Bound before the try block so the failure log below can always reference
    # it, even when the error happens before the first write attempt.
    max_retries = 3
    try:
        key = f"oauth:code:{code}"
        # Add timestamps for debugging; read the clock once so that
        # expires_at is exactly created_at + TTL.
        now = time.time()
        data_with_timestamp = data.copy()
        data_with_timestamp.update({
            "created_at": now,
            "expires_at": now + self.oauth_code_ttl
        })
        # Fields are written one hset call at a time (the original notes the
        # Upstash client does not accept a mapping argument).
        serialized_data = self._serialize_data(data_with_timestamp)
        for attempt in range(max_retries):
            try:
                # Clear any existing data first
                self.redis.delete(key)
                for field, value in serialized_data.items():
                    self.redis.hset(key, field, value)
                # Set expiration
                self.redis.expire(key, self.oauth_code_ttl)
                logger.info(f"Stored OAuth code {code[:10]}... with TTL {self.oauth_code_ttl}s (attempt {attempt + 1})")
                return True
            except (RequestsConnectionError, RequestsTimeout, OSError, socket.error) as e:
                logger.warning(f"Redis connection error on attempt {attempt + 1}: {e}")
                if attempt == max_retries - 1:
                    raise  # Re-raise on final attempt; handled by the outer except
                time.sleep(0.5 * (attempt + 1))  # Linear backoff: 0.5s, then 1.0s
    except Exception as e:
        logger.error(f"Failed to store OAuth code {code[:10]}... after {max_retries} attempts: {e}")
        return False
def get_oauth_code(self, code: str, delete_after_use: bool = True) -> Optional[Dict[str, Any]]:
    """
    Look up the data stored for an OAuth authorization code.

    Up to three attempts are made; any exception triggers a retry after a
    pause of 0.5s times the attempt number. A missing hash, or one whose
    stored ``expires_at`` is already in the past, yields None without retry.

    Args:
        code: Authorization code string
        delete_after_use: If True, delete the code after retrieval (one-time use)
    Returns:
        Code data dictionary or None if not found/expired
    """
    max_retries = 3
    for attempt in range(max_retries):
        is_final_attempt = attempt == max_retries - 1
        pause = 0.5 * (attempt + 1)
        try:
            key = f"oauth:code:{code}"
            stored_fields = self.redis.hgetall(key)
            if not stored_fields:
                logger.warning(f"OAuth code {code[:10]}... not found or expired (attempt {attempt + 1})")
                return None

            payload = self._deserialize_data(stored_fields)

            # Second line of defence in case the Redis TTL was never applied.
            expires_at = payload.get("expires_at", 0)
            if expires_at and time.time() > expires_at:
                logger.warning(f"OAuth code {code[:10]}... manually expired")
                try:
                    self.redis.delete(key)
                except Exception as del_error:
                    logger.warning(f"Failed to delete expired code: {del_error}")
                return None

            if not delete_after_use:
                logger.info(f"Retrieved OAuth code {code[:10]}... (not deleted, attempt {attempt + 1})")
                return payload

            # One-time use: remove the code, but still hand back the data
            # if the removal itself fails.
            try:
                self.redis.delete(key)
                logger.info(f"Retrieved and deleted OAuth code {code[:10]}... (attempt {attempt + 1})")
            except Exception as del_error:
                logger.warning(f"Failed to delete code after use: {del_error}")
            return payload
        except (RequestsConnectionError, RequestsTimeout, OSError, socket.error) as e:
            logger.warning(f"Redis connection error on retrieval attempt {attempt + 1}: {e}")
            if is_final_attempt:
                logger.error(f"Failed to retrieve OAuth code {code[:10]}... after {max_retries} attempts: {e}")
                return None
            time.sleep(pause)
        except Exception as e:
            logger.error(f"Failed to retrieve OAuth code {code[:10]}... on attempt {attempt + 1}: {e}")
            if is_final_attempt:
                return None
            time.sleep(pause)
    return None
# User Session Methods
def set_session(self, session_id: str, user_data: Dict[str, Any]) -> bool:
    """
    Write a user session to the hash ``session:<session_id>``.

    A copy of ``user_data`` is extended with ``session_id``, ``created_at``
    and ``last_accessed``, serialized, written field by field, and the key is
    given a TTL of ``self.session_ttl`` seconds.

    Args:
        session_id: Unique session identifier
        user_data: User session data (user_id, email, scopes, etc.)
    Returns:
        True if stored successfully, False otherwise
    """
    try:
        key = f"session:{session_id}"

        # Session metadata is layered on top of a copy of the caller's data.
        session_data = user_data.copy()
        session_data["session_id"] = session_id
        session_data["created_at"] = time.time()
        session_data["last_accessed"] = time.time()

        # One hset per field (the original notes the Upstash client does not
        # accept a mapping argument).
        for field, value in self._serialize_data(session_data).items():
            self.redis.hset(key, field, value)
        self.redis.expire(key, self.session_ttl)
        logger.info(f"Stored session {session_id[:10]}... with TTL {self.session_ttl}s")
    except Exception as e:
        logger.error(f"Failed to store session {session_id[:10]}...: {e}")
        return False
    return True
def get_session(self, session_id: str, refresh_ttl: bool = True) -> Optional[Dict[str, Any]]:
    """
    Read a user session from the hash ``session:<session_id>``.

    Args:
        session_id: Session identifier
        refresh_ttl: If True, update ``last_accessed`` and reset the key's TTL
            to ``self.session_ttl`` on access
    Returns:
        Session data dictionary or None if not found/expired (or on any error)
    """
    try:
        key = f"session:{session_id}"
        stored_fields = self.redis.hgetall(key)
        if not stored_fields:
            logger.warning(f"Session {session_id[:10]}... not found or expired")
            return None

        session_data = self._deserialize_data(stored_fields)
        if not refresh_ttl:
            return session_data

        # Sliding expiration: record the access and push the expiry out again.
        session_data["last_accessed"] = time.time()
        self.redis.hset(key, "last_accessed", str(time.time()))
        self.redis.expire(key, self.session_ttl)
        logger.debug(f"Refreshed session {session_id[:10]}... TTL")
        return session_data
    except Exception as e:
        logger.error(f"Failed to retrieve session {session_id[:10]}...: {e}")
        return None
def delete_session(self, session_id: str) -> bool:
    """
    Remove the hash ``session:<session_id>`` (logout).

    Args:
        session_id: Session identifier
    Returns:
        True when the delete call reports a truthy result; False when it
        reports nothing removed or when any error occurs
    """
    try:
        removed = self.redis.delete(f"session:{session_id}")
        if not removed:
            logger.warning(f"Session {session_id[:10]}... not found for deletion")
            return False
        logger.info(f"Deleted session {session_id[:10]}...")
        return True
    except Exception as e:
        logger.error(f"Failed to delete session {session_id[:10]}...: {e}")
        return False
# Health Check Methods
def health_check(self) -> Dict[str, Any]:
    """
    Perform Redis health check.

    Writes a small hash under ``health:check:<unix-seconds>``, reads it back
    and deletes it.

    Returns:
        Health status dictionary. ``status`` is "healthy" only when the
        written hash could be read back; if the calls succeed but the
        read-back is empty, ``status`` is "unhealthy" with
        ``redis_connected`` True and ``operations_working`` False. If any
        call raises, ``status`` is "unhealthy", ``redis_connected`` is False
        and ``error`` carries the exception text.
    """
    try:
        # Test basic operations
        test_key = f"health:check:{int(time.time())}"
        test_value = {"timestamp": time.time(), "test": True}
        # Test set - one hset per field, as elsewhere in this store
        serialized_test = self._serialize_data(test_value)
        for field, value in serialized_test.items():
            self.redis.hset(test_key, field, value)
        # Test get
        retrieved = self.redis.hgetall(test_key)
        # Test delete
        self.redis.delete(test_key)

        # A round trip that returns nothing means writes or reads are not
        # actually working, so it must not be reported as healthy.
        operations_working = bool(retrieved)
        return {
            "status": "healthy" if operations_working else "unhealthy",
            "redis_connected": True,
            "operations_working": operations_working,
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        logger.error(f"Redis health check failed: {e}")
        return {
            "status": "unhealthy",
            "redis_connected": False,
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        }
def get_stats(self) -> Dict[str, Any]:
    """
    Report how many OAuth-code and session keys currently exist.

    Returns:
        Statistics dictionary with the two key patterns and a timestamp,
        plus ``active_oauth_codes`` / ``active_sessions`` when the key
        listing works, or a ``warning`` entry when it does not. On an
        unexpected failure only ``error`` and ``timestamp`` are returned.
    """
    try:
        stats = {
            "oauth_codes_pattern": "oauth:code:*",
            "sessions_pattern": "session:*",
            "timestamp": datetime.utcnow().isoformat()
        }
        try:
            # Key listing may be unavailable on some Upstash plans (per the
            # original comment), hence the separate guard.
            oauth_keys = self.redis.keys("oauth:code:*")
            session_keys = self.redis.keys("session:*")
            oauth_count = len(oauth_keys) if oauth_keys else 0
            session_count = len(session_keys) if session_keys else 0
            stats["active_oauth_codes"] = oauth_count
            stats["active_sessions"] = session_count
        except Exception as e:
            logger.warning(f"Could not get detailed stats: {e}")
            stats["warning"] = "Detailed stats not available on this Redis plan"
        return stats
    except Exception as e:
        logger.error(f"Failed to get Redis stats: {e}")
        return {"error": str(e), "timestamp": datetime.utcnow().isoformat()}
# Module-wide store instance; created on first call to get_redis_store()
redis_store = None


def get_redis_store() -> Optional[RedisSessionStore]:
    """
    Return the shared RedisSessionStore, creating it on first use.

    A failed construction is logged and leaves the global unset, so the next
    call tries again.

    Returns:
        RedisSessionStore instance or None if initialization fails
    """
    global redis_store
    if redis_store is not None:
        return redis_store
    try:
        logger.info("Initializing Redis store...")
        redis_store = RedisSessionStore()
        logger.info("Redis store initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize Redis store: {e}")
        redis_store = None
    return redis_store
def init_redis_store() -> RedisSessionStore:
    """
    Initialize Redis store and perform health check.

    Returns:
        RedisSessionStore instance
    Raises:
        RuntimeError: if the store could not be created
        Exception: if the health check does not report "healthy"
    """
    store = get_redis_store()
    # get_redis_store() returns None when construction failed; fail with a
    # clear message instead of an AttributeError on the health_check call.
    if store is None:
        raise RuntimeError("Redis store initialization failed; see earlier log output")
    # Perform health check
    health = store.health_check()
    if health["status"] != "healthy":
        raise Exception(f"Redis health check failed: {health}")
    logger.info("Redis session store initialized and healthy")
    return store
```
--------------------------------------------------------------------------------
/anayasa_mcp_module/bireysel_client.py:
--------------------------------------------------------------------------------
```python
# anayasa_mcp_module/bireysel_client.py
# This client is for Bireysel Başvuru: https://kararlarbilgibankasi.anayasa.gov.tr
import httpx
from bs4 import BeautifulSoup, Tag
from typing import Dict, Any, List, Optional, Tuple
import logging
import html
import re
import io
from urllib.parse import urlencode, urljoin, quote
from markitdown import MarkItDown
import math # For math.ceil for pagination
from .models import (
AnayasaBireyselReportSearchRequest,
AnayasaBireyselReportDecisionDetail,
AnayasaBireyselReportDecisionSummary,
AnayasaBireyselReportSearchResult,
AnayasaBireyselBasvuruDocumentMarkdown, # Model for Bireysel Başvuru document
)
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
class AnayasaBireyselBasvuruApiClient:
BASE_URL = "https://kararlarbilgibankasi.anayasa.gov.tr"
SEARCH_PATH = "/Ara"
DOCUMENT_MARKDOWN_CHUNK_SIZE = 5000 # Character limit per page
def __init__(self, request_timeout: float = 60.0):
    """Create the shared async HTTP client bound to ``BASE_URL``.

    Args:
        request_timeout: Timeout passed to the httpx client.
    """
    # Header set sent with every request made through this client.
    default_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    }
    self.http_client = httpx.AsyncClient(
        base_url=self.BASE_URL,
        headers=default_headers,
        timeout=request_timeout,
        verify=True,
        follow_redirects=True,
    )
def _build_query_params_for_bireysel_report(self, params: AnayasaBireyselReportSearchRequest) -> List[Tuple[str, str]]:
query_params: List[Tuple[str, str]] = []
query_params.append(("KararBulteni", "1")) # Specific to this report type
if params.keywords:
for kw in params.keywords:
query_params.append(("KelimeAra[]", kw))
if params.page_to_fetch and params.page_to_fetch > 1:
query_params.append(("page", str(params.page_to_fetch)))
return query_params
async def search_bireysel_basvuru_report(
    self,
    params: AnayasaBireyselReportSearchRequest
) -> AnayasaBireyselReportSearchResult:
    """Run a report search and parse the HTML result page into summaries.

    Fetches ``SEARCH_PATH`` with the query built from ``params``, reads the
    total hit count from the ``bulunankararsayisi`` div when present, and
    turns every ``KararBulteniBirKarar`` div into a decision summary.

    Raises:
        Any error raised by the HTTP request or status check is logged and
        re-raised unchanged.
    """

    def extract_date(raw_text: str, label: str) -> str:
        # Labelled cell: drop the label. Unlabelled cell: take the first
        # N/N/NNNN-shaped token, or nothing if there is none.
        if not raw_text:
            return ""
        if label in raw_text:
            return raw_text.replace(label, "").strip()
        found = re.search(r'(\d{1,2}/\d{1,2}/\d{4})', raw_text)
        return found.group(1) if found else ""

    def is_subject_div(tag) -> bool:
        # Class-less div whose text begins with the subject label.
        return (
            tag.name == 'div'
            and not tag.has_attr('class')
            and tag.get_text(strip=True).startswith("BAŞVURU KONUSU :")
        )

    final_query_params = self._build_query_params_for_bireysel_report(params)
    request_url = self.SEARCH_PATH
    logger.info(f"AnayasaBireyselBasvuruApiClient: Performing Bireysel Başvuru Report search. Path: {request_url}, Params: {final_query_params}")
    try:
        response = await self.http_client.get(request_url, params=final_query_params)
        response.raise_for_status()
        html_content = response.text
    except httpx.RequestError as e:
        logger.error(f"AnayasaBireyselBasvuruApiClient: HTTP request error during Bireysel Başvuru Report search: {e}")
        raise
    except Exception as e:
        logger.error(f"AnayasaBireyselBasvuruApiClient: Error processing Bireysel Başvuru Report search request: {e}")
        raise

    soup = BeautifulSoup(html_content, 'html.parser')

    # Total hit count, e.g. "<number> Karar Bulundu"; stays None when absent.
    total_records = None
    count_div = soup.find("div", class_="bulunankararsayisi")
    if count_div:
        count_match = re.search(r'(\d+)\s*Karar Bulundu', count_div.get_text(strip=True))
        if count_match:
            total_records = int(count_match.group(1))

    report_content_area = soup.find("div", class_="HaberBulteni")
    if not report_content_area:
        logger.warning("HaberBulteni div not found, attempting to parse decision divs from the whole page.")
        report_content_area = soup
    decision_divs = report_content_area.find_all("div", class_="KararBulteniBirKarar")
    if not decision_divs:
        logger.warning("No KararBulteniBirKarar divs found.")

    processed_decisions: List[AnayasaBireyselReportDecisionSummary] = []
    for decision_div in decision_divs:
        heading = decision_div.find("h4")
        title_text = heading.get_text(strip=True) if heading else ""

        ref_no = dec_type = body = app_date = dec_date = url_path = ""
        header_div = decision_div.find("div", class_="AltiCizili")
        if header_div:
            link_tag = header_div.find("a", href=True)
            if link_tag:
                ref_no = link_tag.get_text(strip=True)
                url_path = link_tag['href']

            # The header line is split on text-node boundaries.
            joined = header_div.get_text(separator="|", strip=True)
            parts = [piece.strip() for piece in joined.split("|")]

            # Drop the reference number from the first piece when it was
            # already taken from the link.
            if ref_no and parts and parts[0].startswith(ref_no):
                parts[0] = parts[0].replace(ref_no, "").strip()
                if not parts[0]:
                    parts.pop(0)

            # Without a link, a leading "digits/digits" piece is treated as
            # the reference number and consumed.
            start = 0
            if not ref_no and len(parts) > start and re.match(r"\d+/\d+", parts[start]):
                ref_no = parts[start]
                start += 1

            # Next four pieces in order: type, body, application date,
            # decision date; missing ones become "".
            padded = (parts[start:start + 4] + [""] * 4)[:4]
            dec_type, body, app_date_raw, dec_date_raw = padded
            app_date = extract_date(app_date_raw, "Başvuru Tarihi :")
            dec_date = extract_date(dec_date_raw, "Karar Tarihi :")

        subject_div = decision_div.find(is_subject_div)
        if subject_div:
            subject_text = subject_div.get_text(strip=True).replace("BAŞVURU KONUSU :", "").strip()
        else:
            subject_text = ""

        details_list: List[AnayasaBireyselReportDecisionDetail] = []
        details_div = decision_div.find_next_sibling("div", id="KararDetaylari")
        if details_div:
            table = details_div.find("table", class_="table")
            tbody = table.find("tbody") if table else None
            if tbody:
                for row in tbody.find_all("tr"):
                    cells = row.find_all("td")
                    if len(cells) != 4:  # Hak, Müdahale İddiası, Sonuç, Giderim
                        continue
                    hak, mudahale, sonuc, giderim = [cell.get_text(strip=True) or "" for cell in cells]
                    details_list.append(AnayasaBireyselReportDecisionDetail(
                        hak=hak,
                        mudahale_iddiasi=mudahale,
                        sonuc=sonuc,
                        giderim=giderim,
                    ))

        full_decision_page_url = urljoin(self.BASE_URL, url_path) if url_path else ""
        processed_decisions.append(AnayasaBireyselReportDecisionSummary(
            title=title_text,
            decision_reference_no=ref_no,
            decision_page_url=full_decision_page_url,
            decision_type_summary=dec_type,
            decision_making_body=body,
            application_date_summary=app_date,
            decision_date_summary=dec_date,
            application_subject_summary=subject_text,
            details=details_list
        ))

    return AnayasaBireyselReportSearchResult(
        decisions=processed_decisions,
        total_records_found=total_records,
        retrieved_page_number=params.page_to_fetch
    )
def _convert_html_to_markdown_bireysel(self, full_decision_html_content: str) -> Optional[str]:
if not full_decision_html_content:
return None
processed_html = html.unescape(full_decision_html_content)
soup = BeautifulSoup(processed_html, "html.parser")
html_input_for_markdown = ""
karar_tab_content = soup.find("div", id="Karar")
if karar_tab_content:
karar_html_span = karar_tab_content.find("span", class_="kararHtml")
if karar_html_span:
word_section = karar_html_span.find("div", class_="WordSection1")
if word_section:
for s in word_section.select('script, style, .item.col-xs-12.col-sm-12, center:has(b)'):
s.decompose()
html_input_for_markdown = str(word_section)
else:
logger.warning("AnayasaBireyselBasvuruApiClient: WordSection1 not found in span.kararHtml. Using span.kararHtml content.")
for s in karar_html_span.select('script, style, .item.col-xs-12.col-sm-12, center:has(b)'):
s.decompose()
html_input_for_markdown = str(karar_html_span)
else:
logger.warning("AnayasaBireyselBasvuruApiClient: span.kararHtml not found in div#Karar. Using div#Karar content.")
for s in karar_tab_content.select('script, style, .item.col-xs-12.col-sm-12, center:has(b)'):
s.decompose()
html_input_for_markdown = str(karar_tab_content)
else:
logger.warning("AnayasaBireyselBasvuruApiClient: div#Karar (KARAR tab) not found. Trying WordSection1 fallback.")
word_section_fallback = soup.find("div", class_="WordSection1")
if word_section_fallback:
for s in word_section_fallback.select('script, style, .item.col-xs-12.col-sm-12, center:has(b)'):
s.decompose()
html_input_for_markdown = str(word_section_fallback)
else:
body_tag = soup.find("body")
if body_tag:
for s in body_tag.select('script, style, .item.col-xs-12.col-sm-12, center:has(b), .banner, .footer, .yazdirmaalani, .filtreler, .menu, .altmenu, .geri, .arabuton, .temizlebutonu, form#KararGetir, .TabBaslik, #KararDetaylari, .share-button-container'):
s.decompose()
html_input_for_markdown = str(body_tag)
else:
html_input_for_markdown = processed_html
markdown_text = None
try:
# Ensure the content is wrapped in basic HTML structure if it's not already
if not html_input_for_markdown.strip().lower().startswith(("<html", "<!doctype")):
html_content = f"<html><head><meta charset=\"UTF-8\"></head><body>{html_input_for_markdown}</body></html>"
else:
html_content = html_input_for_markdown
# Convert HTML string to bytes and create BytesIO stream
html_bytes = html_content.encode('utf-8')
html_stream = io.BytesIO(html_bytes)
# Pass BytesIO stream to MarkItDown to avoid temp file creation
md_converter = MarkItDown()
conversion_result = md_converter.convert(html_stream)
markdown_text = conversion_result.text_content
except Exception as e:
logger.error(f"AnayasaBireyselBasvuruApiClient: MarkItDown conversion error: {e}")
return markdown_text
async def get_decision_document_as_markdown(
    self,
    document_url_path: str,  # e.g. /BB/2021/20295
    page_number: int = 1
) -> AnayasaBireyselBasvuruDocumentMarkdown:
    """Fetch a decision page and return one Markdown chunk of it plus page metadata.

    Metadata comes from the description meta tag first and is then
    completed from the two-column table inside div#KararDetaylari. The
    Markdown is cut into chunks of ``DOCUMENT_MARKDOWN_CHUNK_SIZE``
    characters; ``page_number`` is clamped to the available range.

    Raises:
        Any error raised while fetching or processing is logged and
        re-raised unchanged.
    """
    full_url = urljoin(self.BASE_URL, document_url_path)
    logger.info(f"AnayasaBireyselBasvuruApiClient: Fetching Bireysel Başvuru document for Markdown (page {page_number}) from URL: {full_url}")

    # Metadata scraped from the page; each entry stays None until found.
    meta = {
        "basvuru_no_from_page": None,
        "karar_tarihi_from_page": None,
        "basvuru_tarihi_from_page": None,
        "karari_veren_birim_from_page": None,
        "karar_turu_from_page": None,
        "resmi_gazete_info_from_page": None,
    }

    try:
        response = await self.http_client.get(full_url)
        response.raise_for_status()
        html_content_from_api = response.text

        if not isinstance(html_content_from_api, str) or not html_content_from_api.strip():
            logger.warning(f"AnayasaBireyselBasvuruApiClient: Received empty HTML from {full_url}.")
            return AnayasaBireyselBasvuruDocumentMarkdown(
                source_url=full_url, markdown_chunk=None, current_page=page_number, total_pages=0, is_paginated=False
            )

        soup = BeautifulSoup(html_content_from_api, 'html.parser')

        # First source: the description meta tag.
        meta_desc_tag = soup.find("meta", attrs={"name": "description"})
        if meta_desc_tag and meta_desc_tag.get("content"):
            content = meta_desc_tag["content"]
            bn_match = re.search(r"B\.\s*No:\s*([\d\/]+)", content)
            if bn_match:
                meta["basvuru_no_from_page"] = bn_match.group(1).strip()
            date_match = re.search(r"(\d{1,2}\/\d{1,2}\/\d{4}),\s*§", content)
            if date_match:
                meta["karar_tarihi_from_page"] = date_match.group(1).strip()

        # Second source: label/value rows of the details table. Application
        # number and decision date are only taken when the meta tag did not
        # already provide them.
        karar_detaylari_tab = soup.find("div", id="KararDetaylari")
        table = karar_detaylari_tab.find("table", class_="table") if karar_detaylari_tab else None
        if table:
            for row in table.find_all("tr"):
                cells = row.find_all("td")
                if len(cells) != 2:
                    continue
                label = cells[0].get_text(strip=True)
                value = cells[1].get_text(strip=True)
                if "Kararı Veren Birim" in label:
                    meta["karari_veren_birim_from_page"] = value
                elif "Karar Türü (Başvuru Sonucu)" in label:
                    meta["karar_turu_from_page"] = value
                elif "Başvuru No" in label and not meta["basvuru_no_from_page"]:
                    meta["basvuru_no_from_page"] = value
                elif "Başvuru Tarihi" in label:
                    meta["basvuru_tarihi_from_page"] = value
                elif "Karar Tarihi" in label and not meta["karar_tarihi_from_page"]:
                    meta["karar_tarihi_from_page"] = value
                elif "Resmi Gazete Tarih / Sayı" in label:
                    meta["resmi_gazete_info_from_page"] = value

        full_markdown_content = self._convert_html_to_markdown_bireysel(html_content_from_api)

        if not full_markdown_content:
            return AnayasaBireyselBasvuruDocumentMarkdown(
                source_url=full_url,
                **meta,
                markdown_chunk=None,
                current_page=page_number,
                total_pages=0,
                is_paginated=False
            )

        chunk_size = self.DOCUMENT_MARKDOWN_CHUNK_SIZE
        total_pages = math.ceil(len(full_markdown_content) / chunk_size) or 1
        current_page_clamped = max(1, min(page_number, total_pages))
        start_index = (current_page_clamped - 1) * chunk_size
        markdown_chunk = full_markdown_content[start_index:start_index + chunk_size]

        return AnayasaBireyselBasvuruDocumentMarkdown(
            source_url=full_url,
            **meta,
            markdown_chunk=markdown_chunk,
            current_page=current_page_clamped,
            total_pages=total_pages,
            is_paginated=(total_pages > 1)
        )
    except httpx.RequestError as e:
        logger.error(f"AnayasaBireyselBasvuruApiClient: HTTP error fetching Bireysel Başvuru document from {full_url}: {e}")
        raise
    except Exception as e:
        logger.error(f"AnayasaBireyselBasvuruApiClient: General error processing Bireysel Başvuru document from {full_url}: {e}")
        raise
async def close_client_session(self):
    """Close the HTTP client if one exists and is still open; otherwise do nothing."""
    client = getattr(self, 'http_client', None)
    if not client or client.is_closed:
        return
    await client.aclose()
    logger.info("AnayasaBireyselBasvuruApiClient: HTTP client session closed.")
```
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/anayasa_mcp_module/bireysel_client.py:
--------------------------------------------------------------------------------
```python
# anayasa_mcp_module/bireysel_client.py
# This client is for Bireysel Başvuru: https://kararlarbilgibankasi.anayasa.gov.tr
import httpx
from bs4 import BeautifulSoup, Tag
from typing import Dict, Any, List, Optional, Tuple
import logging
import html
import re
import io
from urllib.parse import urlencode, urljoin, quote
from markitdown import MarkItDown
import math # For math.ceil for pagination
from .models import (
AnayasaBireyselReportSearchRequest,
AnayasaBireyselReportDecisionDetail,
AnayasaBireyselReportDecisionSummary,
AnayasaBireyselReportSearchResult,
AnayasaBireyselBasvuruDocumentMarkdown, # Model for Bireysel Başvuru document
)
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
class AnayasaBireyselBasvuruApiClient:
BASE_URL = "https://kararlarbilgibankasi.anayasa.gov.tr"
SEARCH_PATH = "/Ara"
DOCUMENT_MARKDOWN_CHUNK_SIZE = 5000 # Character limit per page
def __init__(self, request_timeout: float = 60.0):
    """Create the shared async HTTP client bound to ``BASE_URL``.

    Args:
        request_timeout: Timeout passed to the httpx client.
    """
    # Header set sent with every request made through this client.
    default_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    }
    self.http_client = httpx.AsyncClient(
        base_url=self.BASE_URL,
        headers=default_headers,
        timeout=request_timeout,
        verify=True,
        follow_redirects=True,
    )
def _build_query_params_for_bireysel_report(self, params: AnayasaBireyselReportSearchRequest) -> List[Tuple[str, str]]:
query_params: List[Tuple[str, str]] = []
query_params.append(("KararBulteni", "1")) # Specific to this report type
if params.keywords:
for kw in params.keywords:
query_params.append(("KelimeAra[]", kw))
if params.page_to_fetch and params.page_to_fetch > 1:
query_params.append(("page", str(params.page_to_fetch)))
return query_params
async def search_bireysel_basvuru_report(
    self,
    params: AnayasaBireyselReportSearchRequest
) -> AnayasaBireyselReportSearchResult:
    """Run a report search and parse the HTML result page into summaries.

    Fetches ``SEARCH_PATH`` with the query built from ``params``, reads the
    total hit count from the ``bulunankararsayisi`` div when present, and
    turns every ``KararBulteniBirKarar`` div into a decision summary.

    Raises:
        Any error raised by the HTTP request or status check is logged and
        re-raised unchanged.
    """

    def extract_date(raw_text: str, label: str) -> str:
        # Labelled cell: drop the label. Unlabelled cell: take the first
        # N/N/NNNN-shaped token, or nothing if there is none.
        if not raw_text:
            return ""
        if label in raw_text:
            return raw_text.replace(label, "").strip()
        found = re.search(r'(\d{1,2}/\d{1,2}/\d{4})', raw_text)
        return found.group(1) if found else ""

    def is_subject_div(tag) -> bool:
        # Class-less div whose text begins with the subject label.
        return (
            tag.name == 'div'
            and not tag.has_attr('class')
            and tag.get_text(strip=True).startswith("BAŞVURU KONUSU :")
        )

    final_query_params = self._build_query_params_for_bireysel_report(params)
    request_url = self.SEARCH_PATH
    logger.info(f"AnayasaBireyselBasvuruApiClient: Performing Bireysel Başvuru Report search. Path: {request_url}, Params: {final_query_params}")
    try:
        response = await self.http_client.get(request_url, params=final_query_params)
        response.raise_for_status()
        html_content = response.text
    except httpx.RequestError as e:
        logger.error(f"AnayasaBireyselBasvuruApiClient: HTTP request error during Bireysel Başvuru Report search: {e}")
        raise
    except Exception as e:
        logger.error(f"AnayasaBireyselBasvuruApiClient: Error processing Bireysel Başvuru Report search request: {e}")
        raise

    soup = BeautifulSoup(html_content, 'html.parser')

    # Total hit count, e.g. "<number> Karar Bulundu"; stays None when absent.
    total_records = None
    count_div = soup.find("div", class_="bulunankararsayisi")
    if count_div:
        count_match = re.search(r'(\d+)\s*Karar Bulundu', count_div.get_text(strip=True))
        if count_match:
            total_records = int(count_match.group(1))

    report_content_area = soup.find("div", class_="HaberBulteni")
    if not report_content_area:
        logger.warning("HaberBulteni div not found, attempting to parse decision divs from the whole page.")
        report_content_area = soup
    decision_divs = report_content_area.find_all("div", class_="KararBulteniBirKarar")
    if not decision_divs:
        logger.warning("No KararBulteniBirKarar divs found.")

    processed_decisions: List[AnayasaBireyselReportDecisionSummary] = []
    for decision_div in decision_divs:
        heading = decision_div.find("h4")
        title_text = heading.get_text(strip=True) if heading else ""

        ref_no = dec_type = body = app_date = dec_date = url_path = ""
        header_div = decision_div.find("div", class_="AltiCizili")
        if header_div:
            link_tag = header_div.find("a", href=True)
            if link_tag:
                ref_no = link_tag.get_text(strip=True)
                url_path = link_tag['href']

            # The header line is split on text-node boundaries.
            joined = header_div.get_text(separator="|", strip=True)
            parts = [piece.strip() for piece in joined.split("|")]

            # Drop the reference number from the first piece when it was
            # already taken from the link.
            if ref_no and parts and parts[0].startswith(ref_no):
                parts[0] = parts[0].replace(ref_no, "").strip()
                if not parts[0]:
                    parts.pop(0)

            # Without a link, a leading "digits/digits" piece is treated as
            # the reference number and consumed.
            start = 0
            if not ref_no and len(parts) > start and re.match(r"\d+/\d+", parts[start]):
                ref_no = parts[start]
                start += 1

            # Next four pieces in order: type, body, application date,
            # decision date; missing ones become "".
            padded = (parts[start:start + 4] + [""] * 4)[:4]
            dec_type, body, app_date_raw, dec_date_raw = padded
            app_date = extract_date(app_date_raw, "Başvuru Tarihi :")
            dec_date = extract_date(dec_date_raw, "Karar Tarihi :")

        subject_div = decision_div.find(is_subject_div)
        if subject_div:
            subject_text = subject_div.get_text(strip=True).replace("BAŞVURU KONUSU :", "").strip()
        else:
            subject_text = ""

        details_list: List[AnayasaBireyselReportDecisionDetail] = []
        details_div = decision_div.find_next_sibling("div", id="KararDetaylari")
        if details_div:
            table = details_div.find("table", class_="table")
            tbody = table.find("tbody") if table else None
            if tbody:
                for row in tbody.find_all("tr"):
                    cells = row.find_all("td")
                    if len(cells) != 4:  # Hak, Müdahale İddiası, Sonuç, Giderim
                        continue
                    hak, mudahale, sonuc, giderim = [cell.get_text(strip=True) or "" for cell in cells]
                    details_list.append(AnayasaBireyselReportDecisionDetail(
                        hak=hak,
                        mudahale_iddiasi=mudahale,
                        sonuc=sonuc,
                        giderim=giderim,
                    ))

        full_decision_page_url = urljoin(self.BASE_URL, url_path) if url_path else ""
        processed_decisions.append(AnayasaBireyselReportDecisionSummary(
            title=title_text,
            decision_reference_no=ref_no,
            decision_page_url=full_decision_page_url,
            decision_type_summary=dec_type,
            decision_making_body=body,
            application_date_summary=app_date,
            decision_date_summary=dec_date,
            application_subject_summary=subject_text,
            details=details_list
        ))

    return AnayasaBireyselReportSearchResult(
        decisions=processed_decisions,
        total_records_found=total_records,
        retrieved_page_number=params.page_to_fetch
    )
def _convert_html_to_markdown_bireysel(self, full_decision_html_content: str) -> Optional[str]:
if not full_decision_html_content:
return None
processed_html = html.unescape(full_decision_html_content)
soup = BeautifulSoup(processed_html, "html.parser")
html_input_for_markdown = ""
karar_tab_content = soup.find("div", id="Karar")
if karar_tab_content:
karar_html_span = karar_tab_content.find("span", class_="kararHtml")
if karar_html_span:
word_section = karar_html_span.find("div", class_="WordSection1")
if word_section:
for s in word_section.select('script, style, .item.col-xs-12.col-sm-12, center:has(b)'):
s.decompose()
html_input_for_markdown = str(word_section)
else:
logger.warning("AnayasaBireyselBasvuruApiClient: WordSection1 not found in span.kararHtml. Using span.kararHtml content.")
for s in karar_html_span.select('script, style, .item.col-xs-12.col-sm-12, center:has(b)'):
s.decompose()
html_input_for_markdown = str(karar_html_span)
else:
logger.warning("AnayasaBireyselBasvuruApiClient: span.kararHtml not found in div#Karar. Using div#Karar content.")
for s in karar_tab_content.select('script, style, .item.col-xs-12.col-sm-12, center:has(b)'):
s.decompose()
html_input_for_markdown = str(karar_tab_content)
else:
logger.warning("AnayasaBireyselBasvuruApiClient: div#Karar (KARAR tab) not found. Trying WordSection1 fallback.")
word_section_fallback = soup.find("div", class_="WordSection1")
if word_section_fallback:
for s in word_section_fallback.select('script, style, .item.col-xs-12.col-sm-12, center:has(b)'):
s.decompose()
html_input_for_markdown = str(word_section_fallback)
else:
body_tag = soup.find("body")
if body_tag:
for s in body_tag.select('script, style, .item.col-xs-12.col-sm-12, center:has(b), .banner, .footer, .yazdirmaalani, .filtreler, .menu, .altmenu, .geri, .arabuton, .temizlebutonu, form#KararGetir, .TabBaslik, #KararDetaylari, .share-button-container'):
s.decompose()
html_input_for_markdown = str(body_tag)
else:
html_input_for_markdown = processed_html
markdown_text = None
try:
# Ensure the content is wrapped in basic HTML structure if it's not already
if not html_input_for_markdown.strip().lower().startswith(("<html", "<!doctype")):
html_content = f"<html><head><meta charset=\"UTF-8\"></head><body>{html_input_for_markdown}</body></html>"
else:
html_content = html_input_for_markdown
# Convert HTML string to bytes and create BytesIO stream
html_bytes = html_content.encode('utf-8')
html_stream = io.BytesIO(html_bytes)
# Pass BytesIO stream to MarkItDown to avoid temp file creation
md_converter = MarkItDown()
conversion_result = md_converter.convert(html_stream)
markdown_text = conversion_result.text_content
except Exception as e:
logger.error(f"AnayasaBireyselBasvuruApiClient: MarkItDown conversion error: {e}")
return markdown_text
async def get_decision_document_as_markdown(
    self,
    document_url_path: str,  # e.g. /BB/2021/20295
    page_number: int = 1
) -> AnayasaBireyselBasvuruDocumentMarkdown:
    """
    Fetch a Bireysel Başvuru decision page and return one Markdown chunk of it.

    The page is requested from ``BASE_URL`` joined with ``document_url_path``.
    Metadata is scraped from the ``description`` meta tag and from the table in
    ``div#KararDetaylari``; the decision HTML is converted to Markdown and cut
    into chunks of ``DOCUMENT_MARKDOWN_CHUNK_SIZE`` characters.

    Args:
        document_url_path: Path of the decision page, e.g. ``/BB/2021/20295``.
        page_number: 1-based chunk to return; values outside the available
            range are clamped to the first/last chunk.

    Returns:
        An ``AnayasaBireyselBasvuruDocumentMarkdown``. ``markdown_chunk`` is
        ``None`` and ``total_pages`` is 0 when the response body is empty or
        the Markdown conversion yields nothing.

    Raises:
        Any exception raised while fetching or processing the page is logged
        and re-raised unchanged.
    """
    full_url = urljoin(self.BASE_URL, document_url_path)
    logger.info(f"AnayasaBireyselBasvuruApiClient: Fetching Bireysel Başvuru document for Markdown (page {page_number}) from URL: {full_url}")

    # Everything scraped from the page; passed straight through to the result model.
    page_metadata = {
        "basvuru_no_from_page": None,
        "karar_tarihi_from_page": None,
        "basvuru_tarihi_from_page": None,
        "karari_veren_birim_from_page": None,
        "karar_turu_from_page": None,
        "resmi_gazete_info_from_page": None,
    }

    try:
        response = await self.http_client.get(full_url)
        response.raise_for_status()
        raw_html = response.text

        if not (isinstance(raw_html, str) and raw_html.strip()):
            logger.warning(f"AnayasaBireyselBasvuruApiClient: Received empty HTML from {full_url}.")
            return AnayasaBireyselBasvuruDocumentMarkdown(
                source_url=full_url, markdown_chunk=None, current_page=page_number, total_pages=0, is_paginated=False
            )

        soup = BeautifulSoup(raw_html, 'html.parser')

        # First source: the description meta tag (application number and decision date).
        description_meta = soup.find("meta", attrs={"name": "description"})
        if description_meta and description_meta.get("content"):
            description = description_meta["content"]
            number_hit = re.search(r"B\.\s*No:\s*([\d\/]+)", description)
            if number_hit:
                page_metadata["basvuru_no_from_page"] = number_hit.group(1).strip()
            date_hit = re.search(r"(\d{1,2}\/\d{1,2}\/\d{4}),\s*§", description)
            if date_hit:
                page_metadata["karar_tarihi_from_page"] = date_hit.group(1).strip()

        # Second source: the two-column details table; values already taken from
        # the meta tag are not overwritten.
        details_tab = soup.find("div", id="KararDetaylari")
        details_table = details_tab.find("table", class_="table") if details_tab else None
        detail_rows = details_table.find_all("tr") if details_table else []
        for detail_row in detail_rows:
            cells = detail_row.find_all("td")
            if len(cells) != 2:
                continue
            label = cells[0].get_text(strip=True)
            cell_text = cells[1].get_text(strip=True)
            if "Kararı Veren Birim" in label:
                page_metadata["karari_veren_birim_from_page"] = cell_text
            elif "Karar Türü (Başvuru Sonucu)" in label:
                page_metadata["karar_turu_from_page"] = cell_text
            elif "Başvuru No" in label and not page_metadata["basvuru_no_from_page"]:
                page_metadata["basvuru_no_from_page"] = cell_text
            elif "Başvuru Tarihi" in label:
                page_metadata["basvuru_tarihi_from_page"] = cell_text
            elif "Karar Tarihi" in label and not page_metadata["karar_tarihi_from_page"]:
                page_metadata["karar_tarihi_from_page"] = cell_text
            elif "Resmi Gazete Tarih / Sayı" in label:
                page_metadata["resmi_gazete_info_from_page"] = cell_text

        markdown_full = self._convert_html_to_markdown_bireysel(raw_html)
        if not markdown_full:
            return AnayasaBireyselBasvuruDocumentMarkdown(
                source_url=full_url,
                markdown_chunk=None,
                current_page=page_number,
                total_pages=0,
                is_paginated=False,
                **page_metadata
            )

        # Slice the Markdown into fixed-size character chunks and pick the requested one.
        chunk_size = self.DOCUMENT_MARKDOWN_CHUNK_SIZE
        total_pages = math.ceil(len(markdown_full) / chunk_size) or 1
        current_page_clamped = max(1, min(page_number, total_pages))
        chunk_start = (current_page_clamped - 1) * chunk_size

        return AnayasaBireyselBasvuruDocumentMarkdown(
            source_url=full_url,
            markdown_chunk=markdown_full[chunk_start:chunk_start + chunk_size],
            current_page=current_page_clamped,
            total_pages=total_pages,
            is_paginated=(total_pages > 1),
            **page_metadata
        )
    except httpx.RequestError as e:
        logger.error(f"AnayasaBireyselBasvuruApiClient: HTTP error fetching Bireysel Başvuru document from {full_url}: {e}")
        raise
    except Exception as e:
        logger.error(f"AnayasaBireyselBasvuruApiClient: General error processing Bireysel Başvuru document from {full_url}: {e}")
        raise
async def close_client_session(self):
    """Close the HTTP client if this instance has one and it is still open."""
    client = getattr(self, 'http_client', None)
    if not client or client.is_closed:
        # Nothing to release: never created, unset, or already closed.
        return
    await client.aclose()
    logger.info("AnayasaBireyselBasvuruApiClient: HTTP client session closed.")
```
--------------------------------------------------------------------------------
/kik_mcp_module/client_v2.py:
--------------------------------------------------------------------------------
```python
# kik_mcp_module/client_v2.py
import httpx
import requests
import logging
import uuid
import base64
import ssl
from typing import Optional
from datetime import datetime
from .models_v2 import (
KikV2DecisionType, KikV2SearchPayload, KikV2SearchPayloadDk, KikV2SearchPayloadMk,
KikV2RequestData, KikV2QueryRequest, KikV2KeyValuePair,
KikV2SearchResponse, KikV2SearchResponseDk, KikV2SearchResponseMk,
KikV2SearchResult, KikV2CompactDecision, KikV2DocumentMarkdown
)
logger = logging.getLogger(__name__)
class KikV2ApiClient:
    """
    New KIK v2 API Client for https://ekapv2.kik.gov.tr

    This client uses the modern JSON-based API endpoint that provides
    better structured data compared to the legacy form-based API.
    It can search Kurul decisions by type and fetch a single decision
    document converted to Markdown.
    """

    BASE_URL = "https://ekapv2.kik.gov.tr"

    # Endpoint mappings for different decision types
    ENDPOINTS = {
        KikV2DecisionType.UYUSMAZLIK: "/b_ihalearaclari/api/KurulKararlari/GetKurulKararlari",
        KikV2DecisionType.DUZENLEYICI: "/b_ihalearaclari/api/KurulKararlari/GetKurulKararlariDk",
        KikV2DecisionType.MAHKEME: "/b_ihalearaclari/api/KurulKararlari/GetKurulKararlariMk"
    }

    # Browser user agent shared by the API client, the Playwright page and the httpx fallback.
    _BROWSER_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36"

    def __init__(self, request_timeout: float = 60.0):
        """
        Create the async HTTP client used for all KIK v2 API calls.

        Args:
            request_timeout: Timeout in seconds passed to ``httpx.AsyncClient``.
        """
        # Create SSL context with legacy server support.
        # NOTE(security): hostname checking and certificate verification are
        # switched off here, so the server's identity is not validated.
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        # Enable legacy server connect option for older SSL implementations (Python 3.12+)
        if hasattr(ssl, 'OP_LEGACY_SERVER_CONNECT'):
            ssl_context.options |= ssl.OP_LEGACY_SERVER_CONNECT

        # Set broader cipher suite support including legacy ciphers
        ssl_context.set_ciphers('ALL:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!SRP:!CAMELLIA')

        self.http_client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            verify=ssl_context,
            headers={
                "Accept": "application/json",
                "Accept-Language": "tr",
                "Content-Type": "application/json",
                "Origin": self.BASE_URL,
                "Referer": f"{self.BASE_URL}/sorgulamalar/kurul-kararlari",
                "Sec-Fetch-Dest": "empty",
                "Sec-Fetch-Mode": "cors",
                "Sec-Fetch-Site": "same-origin",
                "User-Agent": self._BROWSER_USER_AGENT,
                "api-version": "v1",
                "sec-ch-ua": '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
                "sec-ch-ua-mobile": "?0",
                "sec-ch-ua-platform": '"macOS"'
            },
            timeout=request_timeout
        )

        # Generate security headers (these might need to be updated based on API requirements)
        self.security_headers = self._generate_security_headers()

    def _generate_security_headers(self) -> dict:
        """
        Generate the custom security headers required by KIK v2 API.

        A fresh random GUID is produced on every call; the remaining three
        values are fixed strings. These headers appear to be for request
        validation/encryption.

        Returns:
            Dict with the four ``X-Custom-Request-*`` headers.
        """
        # These are example values - in a real implementation, these might need
        # to be calculated based on the request content or session
        return {
            "X-Custom-Request-Guid": str(uuid.uuid4()),
            "X-Custom-Request-R8id": "hwnOjsN8qdgtDw70x3sKkxab0rj2bQ8Uph4+C+oU+9AMmQqRN3eMOEEeet748DOf",
            "X-Custom-Request-Siv": "p2IQRTitF8z7I39nBjdAqA==",
            "X-Custom-Request-Ts": "1vB3Wwrt8YQ5U6t3XAzZ+Q=="
        }

    def _build_search_payload(self,
                              decision_type: KikV2DecisionType,
                              karar_metni: str = "",
                              karar_no: str = "",
                              basvuran: str = "",
                              idare_adi: str = "",
                              baslangic_tarihi: str = "",
                              bitis_tarihi: str = ""):
        """
        Build the search payload for KIK v2 API.

        Only non-empty criteria are sent. When every criterion is empty a
        single ``KararMetni`` pair with an empty value is sent instead.

        Returns:
            The payload model matching ``decision_type``.

        Raises:
            ValueError: If ``decision_type`` is not one of the supported types.
        """
        criteria = (
            ("KararMetni", karar_metni),
            ("KararNo", karar_no),
            ("BasvuranAdi", basvuran),
            ("IdareAdi", idare_adi),
            ("BaslangicTarihi", baslangic_tarihi),
            ("BitisTarihi", bitis_tarihi),
        )
        key_value_pairs = [
            KikV2KeyValuePair(key=api_key, value=criterion)
            for api_key, criterion in criteria
            if criterion
        ]

        # If no search criteria provided, use a generic search
        if not key_value_pairs:
            key_value_pairs.append(KikV2KeyValuePair(key="KararMetni", value=""))

        query_request = KikV2QueryRequest(keyValueOfstringanyType=key_value_pairs)
        request_data = KikV2RequestData(keyValuePairs=query_request)

        # Return appropriate payload based on decision type
        if decision_type == KikV2DecisionType.UYUSMAZLIK:
            return KikV2SearchPayload(sorgulaKurulKararlari=request_data)
        elif decision_type == KikV2DecisionType.DUZENLEYICI:
            return KikV2SearchPayloadDk(sorgulaKurulKararlariDk=request_data)
        elif decision_type == KikV2DecisionType.MAHKEME:
            return KikV2SearchPayloadMk(sorgulaKurulKararlariMk=request_data)
        else:
            raise ValueError(f"Unsupported decision type: {decision_type}")

    async def search_decisions(self,
                               decision_type: KikV2DecisionType = KikV2DecisionType.UYUSMAZLIK,
                               karar_metni: str = "",
                               karar_no: str = "",
                               basvuran: str = "",
                               idare_adi: str = "",
                               baslangic_tarihi: str = "",
                               bitis_tarihi: str = "") -> KikV2SearchResult:
        """
        Search KIK decisions using the v2 API.

        Args:
            decision_type: Type of decision to search (uyusmazlik/duzenleyici/mahkeme)
            karar_metni: Decision text search
            karar_no: Decision number (e.g., "2025/UH.II-1801")
            basvuran: Applicant name
            idare_adi: Administration name
            baslangic_tarihi: Start date (YYYY-MM-DD format)
            bitis_tarihi: End date (YYYY-MM-DD format)

        Returns:
            KikV2SearchResult with compact decision list. Failures are not
            raised: they are reported through ``error_code`` /
            ``error_message`` with an empty decision list.
        """
        logger.info(f"KikV2ApiClient: Searching {decision_type.value} decisions with criteria - karar_metni: '{karar_metni}', karar_no: '{karar_no}', basvuran: '{basvuran}'")

        try:
            # Build request payload
            payload = self._build_search_payload(
                decision_type=decision_type,
                karar_metni=karar_metni,
                karar_no=karar_no,
                basvuran=basvuran,
                idare_adi=idare_adi,
                baslangic_tarihi=baslangic_tarihi,
                bitis_tarihi=bitis_tarihi
            )

            # Update security headers for this request
            headers = {**self.http_client.headers, **self._generate_security_headers()}

            # Get the appropriate endpoint for this decision type
            endpoint = self.ENDPOINTS[decision_type]

            # Make API request
            response = await self.http_client.post(
                endpoint,
                json=payload.model_dump(),
                headers=headers
            )
            response.raise_for_status()
            response_data = response.json()
            logger.debug(f"KikV2ApiClient: Raw API response structure: {type(response_data)}")

            # Parse the API response based on decision type
            if decision_type == KikV2DecisionType.UYUSMAZLIK:
                api_response = KikV2SearchResponse(**response_data)
                result_data = api_response.SorgulaKurulKararlariResponse.SorgulaKurulKararlariResult
            elif decision_type == KikV2DecisionType.DUZENLEYICI:
                api_response = KikV2SearchResponseDk(**response_data)
                result_data = api_response.SorgulaKurulKararlariDkResponse.SorgulaKurulKararlariDkResult
            elif decision_type == KikV2DecisionType.MAHKEME:
                api_response = KikV2SearchResponseMk(**response_data)
                result_data = api_response.SorgulaKurulKararlariMkResponse.SorgulaKurulKararlariMkResult
            else:
                raise ValueError(f"Unsupported decision type: {decision_type}")

            # Check for API errors
            if result_data.hataKodu and result_data.hataKodu != "0":
                logger.warning(f"KikV2ApiClient: API returned error - Code: {result_data.hataKodu}, Message: {result_data.hataMesaji}")
                return KikV2SearchResult(
                    decisions=[],
                    total_records=0,
                    page=1,
                    error_code=result_data.hataKodu,
                    error_message=result_data.hataMesaji
                )

            # Convert to compact format (the response nests details inside groups)
            compact_decisions = [
                KikV2CompactDecision(
                    kararNo=decision_detail.kararNo,
                    kararTarihi=decision_detail.kararTarihi,
                    basvuran=decision_detail.basvuran,
                    idareAdi=decision_detail.idareAdi,
                    basvuruKonusu=decision_detail.basvuruKonusu,
                    gundemMaddesiId=decision_detail.gundemMaddesiId,
                    decision_type=decision_type.value
                )
                for decision_group in result_data.KurulKararTutanakDetayListesi
                for decision_detail in decision_group.KurulKararTutanakDetayi
            ]
            total_count = len(compact_decisions)
            logger.info(f"KikV2ApiClient: Found {total_count} decisions")

            return KikV2SearchResult(
                decisions=compact_decisions,
                total_records=total_count,
                page=1,
                error_code="0",
                error_message=""
            )
        except httpx.HTTPStatusError as e:
            logger.error(f"KikV2ApiClient: HTTP error during search: {e.response.status_code} - {e.response.text}")
            return KikV2SearchResult(
                decisions=[],
                total_records=0,
                page=1,
                error_code="HTTP_ERROR",
                error_message=f"HTTP {e.response.status_code}: {e.response.text}"
            )
        except Exception as e:
            logger.error(f"KikV2ApiClient: Unexpected error during search: {e}")
            return KikV2SearchResult(
                decisions=[],
                total_records=0,
                page=1,
                error_code="UNEXPECTED_ERROR",
                error_message=str(e)
            )

    @staticmethod
    def _document_result(document_id, source_url="", markdown_content="", error_message=""):
        """Build a KikV2DocumentMarkdown; ``kararNo`` is always left empty."""
        return KikV2DocumentMarkdown(
            document_id=document_id,
            kararNo="",
            markdown_content=markdown_content,
            source_url=source_url,
            error_message=error_message
        )

    async def _fetch_html_with_playwright(self, document_url: str) -> str:
        """
        Render ``document_url`` in headless Chromium and return the page HTML.

        Raises:
            ImportError: If Playwright is not installed (callers fall back to httpx).
        """
        # Imported lazily so a missing Playwright install surfaces as ImportError here.
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True,
                args=['--no-sandbox', '--disable-dev-shm-usage']
            )
            try:
                page = await browser.new_page(user_agent=self._BROWSER_USER_AGENT)

                # Navigate to document page with longer timeout for JS loading
                await page.goto(document_url, wait_until="networkidle", timeout=15000)

                # Wait for the document content to load (KİK pages might need more time for JS execution)
                await page.wait_for_timeout(3000)

                # Wait for Angular/Zone.js to finish loading and document to be ready
                try:
                    # Wait for the Angular zone object to be available
                    await page.wait_for_function(
                        "typeof Zone !== 'undefined' && Zone.current",
                        timeout=10000
                    )

                    # Wait for network to be idle after Angular bootstrap
                    await page.wait_for_load_state("networkidle", timeout=10000)

                    # Wait for specific document content to appear
                    await page.wait_for_function(
                        """
                        document.body.textContent.length > 5000 &&
                        (document.body.textContent.includes('Karar') ||
                         document.body.textContent.includes('KURUL') ||
                         document.body.textContent.includes('Gündem') ||
                         document.body.textContent.includes('Toplantı'))
                        """,
                        timeout=15000
                    )
                    logger.info("KikV2ApiClient: Angular document content loaded successfully")
                except Exception as e:
                    logger.warning(f"KikV2ApiClient: Angular content loading timed out, proceeding anyway: {e}")
                    # Give a bit more time for any remaining content to load
                    await page.wait_for_timeout(5000)

                # Get page content
                html_content = await page.content()
            finally:
                # Close the browser even when navigation or content retrieval fails.
                await browser.close()

        logger.info(f"KikV2ApiClient: Retrieved content via Playwright, length: {len(html_content)}")
        return html_content

    async def _fetch_html_with_httpx(self, document_url: str) -> str:
        """Download ``document_url`` with the shared HTTP client using HTML request headers."""
        response = await self.http_client.get(
            document_url,
            headers={
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "tr,en-US;q=0.5",
                "User-Agent": self._BROWSER_USER_AGENT,
                "Referer": "https://ekap.kik.gov.tr/",
                "Cache-Control": "no-cache"
            }
        )
        response.raise_for_status()
        return response.text

    async def get_document_markdown(self, document_id: str) -> KikV2DocumentMarkdown:
        """
        Get KİK decision document content in Markdown format.

        This method uses a two-step process:
        1. Call GetSorgulamaUrl endpoint to get the actual document URL
           (a fixed legacy EKAP URL is used if that call fails)
        2. Use Playwright to navigate to that URL and extract content
           (plain httpx is used if Playwright is not installed)

        Args:
            document_id: The gundemMaddesiId from search results

        Returns:
            KikV2DocumentMarkdown with document content converted to Markdown.
            Failures are reported through ``error_message`` instead of being raised.
        """
        logger.info(f"KikV2ApiClient: Getting document for ID: {document_id}")

        if not document_id or not document_id.strip():
            return self._document_result(document_id, error_message="Document ID is required")

        try:
            # Step 1: Get the actual document URL using GetSorgulamaUrl endpoint
            logger.info(f"KikV2ApiClient: Step 1 - Getting document URL for ID: {document_id}")

            # Update security headers for this request
            headers = {**self.http_client.headers, **self._generate_security_headers()}

            # Call GetSorgulamaUrl to get the real document URL
            url_response = await self.http_client.post(
                "/b_ihalearaclari/api/KurulKararlari/GetSorgulamaUrl",
                json={"sorguSayfaTipi": 2},
                headers=headers
            )
            url_response.raise_for_status()

            # Get the base document URL from API response
            base_document_url = url_response.json().get("sorgulamaUrl", "")
            if not base_document_url:
                return self._document_result(
                    document_id,
                    error_message="Could not get document URL from GetSorgulamaUrl API"
                )

            # Construct full document URL with the actual document ID
            document_url = f"{base_document_url}?KararId={document_id}"
            logger.info(f"KikV2ApiClient: Step 2 - Retrieved document URL: {document_url}")
        except Exception as e:
            logger.error(f"KikV2ApiClient: Error getting document URL for ID {document_id}: {e}")
            # Fallback to old method if GetSorgulamaUrl fails
            document_url = f"https://ekap.kik.gov.tr/EKAP/Vatandas/KurulKararGoster.aspx?KararId={document_id}"
            logger.info(f"KikV2ApiClient: Falling back to direct URL: {document_url}")

        try:
            # Step 2: Use Playwright to get the actual document content
            logger.info(f"KikV2ApiClient: Step 2 - Using Playwright to retrieve document from: {document_url}")
            try:
                html_content = await self._fetch_html_with_playwright(document_url)
            except ImportError:
                logger.info("KikV2ApiClient: Playwright not available, falling back to httpx")
                html_content = await self._fetch_html_with_httpx(document_url)

            # Convert HTML to Markdown using MarkItDown with BytesIO
            try:
                from markitdown import MarkItDown
                from io import BytesIO

                html_stream = BytesIO(html_content.encode('utf-8'))
                result = MarkItDown().convert_stream(html_stream, file_extension=".html")
                return self._document_result(
                    document_id,
                    source_url=document_url,
                    markdown_content=result.text_content
                )
            except ImportError:
                return self._document_result(
                    document_id,
                    source_url=document_url,
                    markdown_content="MarkItDown library not available",
                    error_message="MarkItDown library not installed"
                )
        except Exception as e:
            logger.error(f"KikV2ApiClient: Error retrieving document {document_id}: {e}")
            return self._document_result(
                document_id,
                source_url=document_url,
                error_message=str(e)
            )

    async def close_client_session(self):
        """Close HTTP client session (no-op when it is already closed)."""
        if self.http_client.is_closed:
            return
        await self.http_client.aclose()
        logger.info("KikV2ApiClient: HTTP client session closed.")
```
--------------------------------------------------------------------------------
/anayasa_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# anayasa_mcp_module/client.py
# This client is for Norm Denetimi: https://normkararlarbilgibankasi.anayasa.gov.tr
import httpx
from bs4 import BeautifulSoup
from typing import Dict, Any, List, Optional, Tuple
import logging
import html
import re
import io
from urllib.parse import urlencode, urljoin, quote
from markitdown import MarkItDown
import math # For math.ceil for pagination
from .models import (
AnayasaNormDenetimiSearchRequest,
AnayasaDecisionSummary,
AnayasaReviewedNormInfo,
AnayasaSearchResult,
AnayasaDocumentMarkdown, # Model for Norm Denetimi document
)
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
class AnayasaMahkemesiApiClient:
BASE_URL = "https://normkararlarbilgibankasi.anayasa.gov.tr"
SEARCH_PATH_SEGMENT = "Ara"
DOCUMENT_MARKDOWN_CHUNK_SIZE = 5000 # Character limit per page
def __init__(self, request_timeout: float = 60.0):
    """
    Create the async HTTP client used for every Norm Denetimi request.

    Args:
        request_timeout: Timeout in seconds handed to ``httpx.AsyncClient``.
    """
    # Browser-like default headers sent with every request.
    default_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    }
    client_options = dict(
        base_url=self.BASE_URL,
        headers=default_headers,
        timeout=request_timeout,
        verify=True,
        follow_redirects=True,
    )
    self.http_client = httpx.AsyncClient(**client_options)
def _build_search_query_params_for_aym(self, params: AnayasaNormDenetimiSearchRequest) -> List[Tuple[str, str]]:
query_params: List[Tuple[str, str]] = []
if params.keywords_all:
for kw in params.keywords_all: query_params.append(("KelimeAra[]", kw))
if params.keywords_any:
for kw in params.keywords_any: query_params.append(("HerhangiBirKelimeAra[]", kw))
if params.keywords_exclude:
for kw in params.keywords_exclude: query_params.append(("BulunmayanKelimeAra[]", kw))
if params.period and params.period and params.period != "ALL": query_params.append(("Donemler_id", params.period))
if params.case_number_esas: query_params.append(("EsasNo", params.case_number_esas))
if params.decision_number_karar: query_params.append(("KararNo", params.decision_number_karar))
if params.first_review_date_start: query_params.append(("IlkIncelemeTarihiIlk", params.first_review_date_start))
if params.first_review_date_end: query_params.append(("IlkIncelemeTarihiSon", params.first_review_date_end))
if params.decision_date_start: query_params.append(("KararTarihiIlk", params.decision_date_start))
if params.decision_date_end: query_params.append(("KararTarihiSon", params.decision_date_end))
if params.application_type and params.application_type and params.application_type != "ALL": query_params.append(("BasvuruTurler_id", params.application_type))
if params.applicant_general_name: query_params.append(("BasvuranGeneller_id", params.applicant_general_name))
if params.applicant_specific_name: query_params.append(("BasvuranOzeller_id", params.applicant_specific_name))
if params.attending_members_names:
for name in params.attending_members_names: query_params.append(("Uyeler_id[]", name))
if params.rapporteur_name: query_params.append(("Raportorler_id", params.rapporteur_name))
if params.norm_type and params.norm_type and params.norm_type != "ALL": query_params.append(("NormunTurler_id", params.norm_type))
if params.norm_id_or_name: query_params.append(("NormunNumarasiAdlar_id", params.norm_id_or_name))
if params.norm_article: query_params.append(("NormunMaddeNumarasi", params.norm_article))
if params.review_outcomes:
for outcome_val in params.review_outcomes:
if outcome_val and outcome_val != "ALL": query_params.append(("IncelemeTuruKararSonuclar_id[]", outcome_val))
if params.reason_for_final_outcome and params.reason_for_final_outcome and params.reason_for_final_outcome != "ALL":
query_params.append(("KararSonucununGerekcesi", params.reason_for_final_outcome))
if params.basis_constitution_article_numbers:
for article_no in params.basis_constitution_article_numbers: query_params.append(("DayanakHukmu[]", article_no))
if params.official_gazette_date_start: query_params.append(("ResmiGazeteTarihiIlk", params.official_gazette_date_start))
if params.official_gazette_date_end: query_params.append(("ResmiGazeteTarihiSon", params.official_gazette_date_end))
if params.official_gazette_number_start: query_params.append(("ResmiGazeteSayisiIlk", params.official_gazette_number_start))
if params.official_gazette_number_end: query_params.append(("ResmiGazeteSayisiSon", params.official_gazette_number_end))
if params.has_press_release and params.has_press_release and params.has_press_release != "ALL": query_params.append(("BasinDuyurusu", params.has_press_release))
if params.has_dissenting_opinion and params.has_dissenting_opinion and params.has_dissenting_opinion != "ALL": query_params.append(("KarsiOy", params.has_dissenting_opinion))
if params.has_different_reasoning and params.has_different_reasoning and params.has_different_reasoning != "ALL": query_params.append(("FarkliGerekce", params.has_different_reasoning))
# Add pagination and sorting parameters as query params instead of URL path
if params.results_per_page and params.results_per_page != 10:
query_params.append(("SatirSayisi", str(params.results_per_page)))
if params.sort_by_criteria and params.sort_by_criteria != "KararTarihi":
query_params.append(("Siralama", params.sort_by_criteria))
if params.page_to_fetch and params.page_to_fetch > 1:
query_params.append(("page", str(params.page_to_fetch)))
return query_params
async def search_norm_denetimi_decisions(
    self,
    params: AnayasaNormDenetimiSearchRequest
) -> AnayasaSearchResult:
    """
    Run a Norm Denetimi search and scrape the returned HTML result list.

    Args:
        params: Search filters; converted to query parameters by
            ``_build_search_query_params_for_aym``.

    Returns:
        AnayasaSearchResult with one summary per ``div.birkarar`` element,
        the total record count when the page states it (otherwise ``None``),
        and the requested page number.

    Raises:
        Any exception raised by the HTTP request is logged and re-raised.
    """
    # Use simple /Ara endpoint - the complex path structure seems to cause 404s
    request_path = f"/{self.SEARCH_PATH_SEGMENT}"
    final_query_params = self._build_search_query_params_for_aym(params)
    logger.info(f"AnayasaMahkemesiApiClient: Performing Norm Denetimi search. Path: {request_path}, Params: {final_query_params}")

    try:
        response = await self.http_client.get(request_path, params=final_query_params)
        response.raise_for_status()
        page_html = response.text
    except httpx.RequestError as e:
        logger.error(f"AnayasaMahkemesiApiClient: HTTP request error during Norm Denetimi search: {e}")
        raise
    except Exception as e:
        logger.error(f"AnayasaMahkemesiApiClient: Error processing Norm Denetimi search request: {e}")
        raise

    soup = BeautifulSoup(page_html, 'html.parser')

    # Result counter: desktop element first, mobile element as fallback.
    total_records = None
    counter_div = (
        soup.find("div", class_="bulunankararsayisi")
        or soup.find("div", class_="bulunankararsayisiMobil")
    )
    if counter_div:
        counter_hit = re.search(r'(\d+)\s*Karar Bulundu', counter_div.get_text(strip=True))
        if counter_hit:
            total_records = int(counter_hit.group(1))

    summaries: List[AnayasaDecisionSummary] = []
    for card in soup.find_all("div", class_="birkarar"):
        # Link to the decision page.
        anchor = card.find("a", href=True)
        href = anchor['href'] if anchor else None
        page_url = urljoin(self.BASE_URL, href) if href else None

        # Title: "E. ... , K. ..." reference, else the text before "Sayılı Karar".
        heading = card.find("div", class_="bkararbaslik")
        heading_text = heading.get_text(strip=True, separator=" ").replace('\xa0', ' ') if heading else ""
        reference_hit = re.search(r"(E\.\s*\d+/\d+\s*,\s*K\.\s*\d+/\d+)", heading_text)
        if reference_hit:
            reference_no = reference_hit.group(1)
        else:
            reference_no = heading_text.split("Sayılı Karar")[0].strip()

        # Number of keyword hits, when the title carries it as a plain integer.
        hit_count = None
        hit_count_div = heading.find("div", class_="BulunanKelimeSayisi") if heading else None
        if hit_count_div:
            hit_count_text = hit_count_div.get_text(strip=True).replace("Bulunan Kelime Sayısı", "").strip()
            if hit_count_text.isdigit():
                hit_count = int(hit_count_text)

        # Info line: up to four "|"-separated fields; missing ones become None.
        info_block = card.find("div", class_="kararbilgileri")
        if info_block:
            info_fields = [piece.strip() for piece in info_block.get_text(separator="|").split("|")]
        else:
            info_fields = []
        application_kind, applicant, outcome, raw_date = (info_fields + [None] * 4)[:4]
        decision_date = raw_date.replace("Karar Tarihi:", "").strip() if raw_date else None

        # Reviewed norms: six-column rows of the table in the following sibling div.
        reviewed_norms: List[AnayasaReviewedNormInfo] = []
        norms_container = card.find_next_sibling("div", class_=re.compile(r"col-sm-12"))
        norms_table = norms_container.find("table", class_="table") if norms_container else None
        if norms_table and norms_table.find("tbody"):
            for norm_row in norms_table.find("tbody").find_all("tr"):
                cell_texts = [cell.get_text(strip=True) for cell in norm_row.find_all("td")]
                if len(cell_texts) != 6:
                    continue
                cited_articles = [
                    article.strip() for article in cell_texts[4].split(',') if article.strip()
                ]
                reviewed_norms.append(AnayasaReviewedNormInfo(
                    norm_name_or_number=cell_texts[0] or None,
                    article_number=cell_texts[1] or None,
                    review_type_and_outcome=cell_texts[2] or None,
                    outcome_reason=cell_texts[3] or None,
                    basis_constitution_articles_cited=cited_articles,
                    postponement_period=cell_texts[5] or None
                ))

        summaries.append(AnayasaDecisionSummary(
            decision_reference_no=reference_no,
            decision_page_url=page_url,
            keywords_found_count=hit_count,
            application_type_summary=application_kind,
            applicant_summary=applicant,
            decision_outcome_summary=outcome,
            decision_date_summary=decision_date,
            reviewed_norms=reviewed_norms
        ))

    return AnayasaSearchResult(
        decisions=summaries,
        total_records_found=total_records,
        retrieved_page_number=params.page_to_fetch
    )
def _convert_html_to_markdown_norm_denetimi(self, full_decision_html_content: str) -> Optional[str]:
"""Converts direct HTML content from an Anayasa Mahkemesi Norm Denetimi decision page to Markdown."""
if not full_decision_html_content:
return None
processed_html = html.unescape(full_decision_html_content)
soup = BeautifulSoup(processed_html, "html.parser")
html_input_for_markdown = ""
karar_tab_content = soup.find("div", id="Karar") # "KARAR" tab content
if karar_tab_content:
karar_metni_div = karar_tab_content.find("div", class_="KararMetni")
if karar_metni_div:
# Remove scripts and styles
for script_tag in karar_metni_div.find_all("script"): script_tag.decompose()
for style_tag in karar_metni_div.find_all("style"): style_tag.decompose()
# Remove "Künye Kopyala" button and other non-content divs
for item_div in karar_metni_div.find_all("div", class_="item col-sm-12"): item_div.decompose()
for modal_div in karar_metni_div.find_all("div", class_="modal fade"): modal_div.decompose() # If any modals
word_section = karar_metni_div.find("div", class_="WordSection1")
html_input_for_markdown = str(word_section) if word_section else str(karar_metni_div)
else:
html_input_for_markdown = str(karar_tab_content)
else:
# Fallback if specific structure is not found
word_section_fallback = soup.find("div", class_="WordSection1")
if word_section_fallback:
html_input_for_markdown = str(word_section_fallback)
else:
# Last resort: use the whole body or the raw HTML
body_tag = soup.find("body")
html_input_for_markdown = str(body_tag) if body_tag else processed_html
markdown_text = None
try:
# Ensure the content is wrapped in basic HTML structure if it's not already
if not html_input_for_markdown.strip().lower().startswith(("<html", "<!doctype")):
html_content = f"<html><head><meta charset=\"UTF-8\"></head><body>{html_input_for_markdown}</body></html>"
else:
html_content = html_input_for_markdown
# Convert HTML string to bytes and create BytesIO stream
html_bytes = html_content.encode('utf-8')
html_stream = io.BytesIO(html_bytes)
# Pass BytesIO stream to MarkItDown to avoid temp file creation
md_converter = MarkItDown()
conversion_result = md_converter.convert(html_stream)
markdown_text = conversion_result.text_content
except Exception as e:
logger.error(f"AnayasaMahkemesiApiClient: MarkItDown conversion error: {e}")
return markdown_text
async def get_decision_document_as_markdown(
    self,
    document_url: str,
    page_number: int = 1
) -> AnayasaDocumentMarkdown:
    """
    Retrieve an Anayasa Mahkemesi (Norm Denetimi) decision page, convert its
    content to Markdown, and return the requested page/chunk.

    Args:
        document_url: Absolute URL (anything starting with "http") or a path
            that is resolved against ``self.BASE_URL``.
        page_number: 1-based chunk index. Values outside the available range
            are clamped to the nearest valid page.

    Returns:
        An ``AnayasaDocumentMarkdown`` carrying the Markdown chunk together
        with the E.K. number, decision date and Official Gazette info scraped
        from the page (empty strings when not found). ``markdown_chunk`` is
        ``None`` and ``total_pages`` is 0 when the response body is empty or
        the Markdown conversion produces nothing.

    Raises:
        httpx.HTTPError: On transport failures and on non-success status codes
            (logged, then re-raised unchanged).
        Exception: Any other failure while processing the page (logged, then
            re-raised unchanged).
    """
    if document_url.startswith("http"):
        full_url = document_url
    else:
        full_url = urljoin(self.BASE_URL, document_url)
    logger.info(f"AnayasaMahkemesiApiClient: Fetching Norm Denetimi document for Markdown (page {page_number}) from URL: {full_url}")

    try:
        get_response = await self.http_client.get(full_url, headers={"Accept": "text/html"})
        get_response.raise_for_status()
        html_content_from_api = get_response.text

        if not isinstance(html_content_from_api, str) or not html_content_from_api.strip():
            logger.warning(f"AnayasaMahkemesiApiClient: Received empty or non-string HTML from URL {full_url}.")
            return AnayasaDocumentMarkdown(
                source_url=full_url, markdown_chunk=None, current_page=page_number, total_pages=0, is_paginated=False
            )

        # Extract metadata from the page content (E.K. No, Date, RG).
        soup = BeautifulSoup(html_content_from_api, "html.parser")
        karar_metni_div = soup.find("div", class_="KararMetni")  # Usually within div#Karar
        if karar_metni_div is None:
            karar_metni_div = soup.find("div", class_="WordSection1")

        decision_ek_no_from_page = ""
        decision_date_from_page = ""
        official_gazette_from_page = ""

        if karar_metni_div is not None:
            gazette_labels = ("Resmî Gazete tarih ve sayısı:", "Resmi Gazete tarih/sayı:")

            def find_bold_paragraph(label):
                # First <p> whose first <b> child contains `label`, or None.
                def matches(tag):
                    if tag.name != "p":
                        return False
                    bold = tag.find("b")
                    return bold is not None and label in bold.get_text()
                return karar_metni_div.find(matches)

            def is_gazette_paragraph(tag):
                # <p> whose text carries either spelling of the gazette label.
                if tag.name != "p":
                    return False
                text = tag.get_text()
                return any(label in text for label in gazette_labels)

            esas_no_tag = find_bold_paragraph("Esas No.:")
            karar_no_tag = find_bold_paragraph("Karar No.:")
            karar_tarihi_tag = find_bold_paragraph("Karar tarihi:")  # Less common on Norm pages
            resmi_gazete_tag = karar_metni_div.find(is_gazette_paragraph)

            # A match from find_bold_paragraph always has a <b> child, so the
            # inner .find("b") below cannot return None.
            if esas_no_tag is not None and karar_no_tag is not None:
                esas_str = esas_no_tag.find("b").get_text(strip=True).replace("Esas No.:", "").strip()
                karar_str = karar_no_tag.find("b").get_text(strip=True).replace("Karar No.:", "").strip()
                decision_ek_no_from_page = f"E.{esas_str}, K.{karar_str}"

            if karar_tarihi_tag is not None:
                decision_date_from_page = (
                    karar_tarihi_tag.find("b").get_text(strip=True).replace("Karar tarihi:", "").strip()
                )
            else:
                # Fallback: search the plain text for "Karar Tarihi : <digits and dots>".
                date_match = re.search(r"Karar Tarihi\s*:\s*([\d\.]+)", karar_metni_div.get_text())
                if date_match:
                    decision_date_from_page = date_match.group(1).strip()

            if resmi_gazete_tag is not None:
                # Prefer the bold part when present, otherwise the whole paragraph.
                bold_rg_tag = resmi_gazete_tag.find("b")
                rg_source = bold_rg_tag if bold_rg_tag is not None else resmi_gazete_tag
                rg_text_content = rg_source.get_text(strip=True)
                for label in gazette_labels:
                    rg_text_content = rg_text_content.replace(label, "")
                official_gazette_from_page = rg_text_content.strip()

        # Fields shared by both the "no content" and the normal result.
        page_metadata = {
            "source_url": full_url,
            "decision_reference_no_from_page": decision_ek_no_from_page,
            "decision_date_from_page": decision_date_from_page,
            "official_gazette_info_from_page": official_gazette_from_page,
        }

        full_markdown_content = self._convert_html_to_markdown_norm_denetimi(html_content_from_api)
        if not full_markdown_content:
            return AnayasaDocumentMarkdown(
                **page_metadata,
                markdown_chunk=None,
                current_page=page_number,
                total_pages=0,
                is_paginated=False
            )

        # Character-based pagination; out-of-range page numbers are clamped.
        chunk_size = self.DOCUMENT_MARKDOWN_CHUNK_SIZE
        total_pages = max(1, math.ceil(len(full_markdown_content) / chunk_size))
        current_page_clamped = max(1, min(page_number, total_pages))
        start_index = (current_page_clamped - 1) * chunk_size
        markdown_chunk = full_markdown_content[start_index:start_index + chunk_size]

        return AnayasaDocumentMarkdown(
            **page_metadata,
            markdown_chunk=markdown_chunk,
            current_page=current_page_clamped,
            total_pages=total_pages,
            is_paginated=(total_pages > 1)
        )
    except httpx.HTTPError as e:
        # raise_for_status() raises HTTPStatusError, which is NOT a RequestError;
        # HTTPError is the common base, so 4xx/5xx responses are reported here
        # instead of falling through to the generic handler below.
        logger.error(f"AnayasaMahkemesiApiClient: HTTP error fetching Norm Denetimi document from {full_url}: {e}")
        raise
    except Exception as e:
        logger.error(f"AnayasaMahkemesiApiClient: General error processing Norm Denetimi document from {full_url}: {e}")
        raise
async def close_client_session(self):
    """
    Close the underlying HTTP client if one exists and is still open.

    Does nothing when the instance has no ``http_client`` attribute, when the
    attribute is falsy, or when the client already reports ``is_closed``.
    """
    client = getattr(self, "http_client", None)
    if not client or client.is_closed:
        return
    await client.aclose()
    logger.info("AnayasaMahkemesiApiClient (Norm Denetimi): HTTP client session closed.")
```