This is page 3 of 8. Use http://codebase.md/saidsurucu/yargi-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── __main__.py
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│   └── workflows
│       └── publish.yml
├── .gitignore
├── .serena
│   ├── .gitignore
│   └── project.yml
├── 5ire-settings.png
├── analyze_kik_hash_generation.py
├── anayasa_mcp_module
│   ├── __init__.py
│   ├── bireysel_client.py
│   ├── client.py
│   ├── models.py
│   └── unified_client.py
├── asgi_app.py
├── bddk_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── bedesten_mcp_module
│   ├── __init__.py
│   ├── client.py
│   ├── enums.py
│   └── models.py
├── check_response_format.py
├── CLAUDE.md
├── danistay_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── docker-compose.yml
├── Dockerfile
├── docs
│   └── DEPLOYMENT.md
├── emsal_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── example_fastapi_app.py
├── fly-no-auth.toml
├── fly.toml
├── kik_mcp_module
│   ├── __init__.py
│   ├── client_v2.py
│   ├── client.py
│   ├── models_v2.py
│   └── models.py
├── kvkk_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── LICENSE
├── mcp_auth
│   ├── __init__.py
│   ├── clerk_config.py
│   ├── middleware.py
│   ├── oauth.py
│   ├── policy.py
│   └── storage.py
├── mcp_auth_factory.py
├── mcp_auth_http_adapter.py
├── mcp_auth_http_simple.py
├── mcp_server_main.py
├── nginx.conf
├── ornek.png
├── Procfile
├── pyproject.toml
├── railway.json
├── README.md
├── redis_session_store.py
├── rekabet_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── requirements.txt
├── run_asgi.py
├── saidsurucu-yargi-mcp-f5fa007
│   ├── __main__.py
│   ├── .dockerignore
│   ├── .env.example
│   ├── .gitattributes
│   ├── .github
│   │   └── workflows
│   │       └── publish.yml
│   ├── .gitignore
│   ├── 5ire-settings.png
│   ├── anayasa_mcp_module
│   │   ├── __init__.py
│   │   ├── bireysel_client.py
│   │   ├── client.py
│   │   ├── models.py
│   │   └── unified_client.py
│   ├── asgi_app.py
│   ├── bddk_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── bedesten_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   ├── enums.py
│   │   └── models.py
│   ├── check_response_format.py
│   ├── danistay_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── docker-compose.yml
│   ├── Dockerfile
│   ├── docs
│   │   └── DEPLOYMENT.md
│   ├── emsal_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── example_fastapi_app.py
│   ├── kik_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── kvkk_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── LICENSE
│   ├── mcp_auth
│   │   ├── __init__.py
│   │   ├── clerk_config.py
│   │   ├── middleware.py
│   │   ├── oauth.py
│   │   ├── policy.py
│   │   └── storage.py
│   ├── mcp_auth_factory.py
│   ├── mcp_auth_http_adapter.py
│   ├── mcp_auth_http_simple.py
│   ├── mcp_server_main.py
│   ├── nginx.conf
│   ├── ornek.png
│   ├── Procfile
│   ├── pyproject.toml
│   ├── railway.json
│   ├── README.md
│   ├── redis_session_store.py
│   ├── rekabet_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── run_asgi.py
│   ├── sayistay_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   ├── enums.py
│   │   ├── models.py
│   │   └── unified_client.py
│   ├── starlette_app.py
│   ├── stripe_webhook.py
│   ├── uyusmazlik_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   └── yargitay_mcp_module
│       ├── __init__.py
│       ├── client.py
│       └── models.py
├── sayistay_mcp_module
│   ├── __init__.py
│   ├── client.py
│   ├── enums.py
│   ├── models.py
│   └── unified_client.py
├── starlette_app.py
├── stripe_webhook.py
├── uv.lock
├── uyusmazlik_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
└── yargitay_mcp_module
    ├── __init__.py
    ├── client.py
    └── models.py
```
# Files
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/bedesten_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# bedesten_mcp_module/client.py
import httpx
import base64
from typing import Optional
import logging
from markitdown import MarkItDown
import io

from .models import (
    BedestenSearchRequest, BedestenSearchResponse,
    BedestenDocumentRequest, BedestenDocumentResponse,
    BedestenDocumentMarkdown, BedestenDocumentRequestData
)
from .enums import get_full_birim_adi

logger = logging.getLogger(__name__)


class BedestenApiClient:
    """
    API Client for Bedesten (bedesten.adalet.gov.tr) - Alternative legal decision search system.
    Currently used for Yargıtay decisions, but can be extended for other court types.
    """
    BASE_URL = "https://bedesten.adalet.gov.tr"
    SEARCH_ENDPOINT = "/emsal-karar/searchDocuments"
    DOCUMENT_ENDPOINT = "/emsal-karar/getDocumentContent"

    def __init__(self, request_timeout: float = 60.0):
        self.http_client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            headers={
                "Accept": "*/*",
                "Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
                "AdaletApplicationName": "UyapMevzuat",
                "Content-Type": "application/json; charset=utf-8",
                "Origin": "https://mevzuat.adalet.gov.tr",
                "Referer": "https://mevzuat.adalet.gov.tr/",
                "Sec-Fetch-Dest": "empty",
                "Sec-Fetch-Mode": "cors",
                "Sec-Fetch-Site": "same-site",
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
            },
            timeout=request_timeout
        )

    async def search_documents(self, search_request: BedestenSearchRequest) -> BedestenSearchResponse:
        """
        Search for documents using Bedesten API.
        Currently supports: YARGITAYKARARI, DANISTAYKARARI, YERELHUKMAHKARARI, etc.
        """
        logger.info(f"BedestenApiClient: Searching documents with phrase: {search_request.data.phrase}")
        # Map abbreviated birimAdi to full Turkish name before sending to API
        original_birim_adi = search_request.data.birimAdi
        mapped_birim_adi = get_full_birim_adi(original_birim_adi)
        search_request.data.birimAdi = mapped_birim_adi
        if original_birim_adi != "ALL":
            logger.info(f"BedestenApiClient: Mapped birimAdi '{original_birim_adi}' to '{mapped_birim_adi}'")
        try:
            # Create request dict and remove birimAdi if empty
            request_dict = search_request.model_dump()
            if not request_dict["data"]["birimAdi"]:  # Remove if empty string
                del request_dict["data"]["birimAdi"]
            response = await self.http_client.post(
                self.SEARCH_ENDPOINT,
                json=request_dict
            )
            response.raise_for_status()
            response_json = response.json()
            # Parse and return the response
            return BedestenSearchResponse(**response_json)
        except httpx.RequestError as e:
            logger.error(f"BedestenApiClient: HTTP request error during search: {e}")
            raise
        except Exception as e:
            logger.error(f"BedestenApiClient: Error processing search response: {e}")
            raise

    async def get_document_as_markdown(self, document_id: str) -> BedestenDocumentMarkdown:
        """
        Get document content and convert to markdown.
        Handles both HTML (text/html) and PDF (application/pdf) content types.
        """
        logger.info(f"BedestenApiClient: Fetching document for markdown conversion (ID: {document_id})")
        try:
            # Prepare request
            doc_request = BedestenDocumentRequest(
                data=BedestenDocumentRequestData(documentId=document_id)
            )
            # Get document
            response = await self.http_client.post(
                self.DOCUMENT_ENDPOINT,
                json=doc_request.model_dump()
            )
            response.raise_for_status()
            response_json = response.json()
            doc_response = BedestenDocumentResponse(**response_json)
            # Decode base64 content
            content_bytes = base64.b64decode(doc_response.data.content)
            mime_type = doc_response.data.mimeType
            logger.info(f"BedestenApiClient: Document mime type: {mime_type}")
            # Convert to markdown based on mime type
            if mime_type == "text/html":
                html_content = content_bytes.decode('utf-8')
                markdown_content = self._convert_html_to_markdown(html_content)
            elif mime_type == "application/pdf":
                markdown_content = self._convert_pdf_to_markdown(content_bytes)
            else:
                logger.warning(f"Unsupported mime type: {mime_type}")
                markdown_content = f"Unsupported content type: {mime_type}. Unable to convert to markdown."
            return BedestenDocumentMarkdown(
                documentId=document_id,
                markdown_content=markdown_content,
                source_url=f"{self.BASE_URL}/document/{document_id}",
                mime_type=mime_type
            )
        except httpx.RequestError as e:
            logger.error(f"BedestenApiClient: HTTP error fetching document {document_id}: {e}")
            raise
        except Exception as e:
            logger.error(f"BedestenApiClient: Error processing document {document_id}: {e}")
            raise

    def _convert_html_to_markdown(self, html_content: str) -> Optional[str]:
        """Convert HTML to Markdown using MarkItDown"""
        if not html_content:
            return None
        try:
            # Convert HTML string to bytes and create BytesIO stream
            html_bytes = html_content.encode('utf-8')
            html_stream = io.BytesIO(html_bytes)
            # Pass BytesIO stream to MarkItDown to avoid temp file creation
            md_converter = MarkItDown()
            result = md_converter.convert(html_stream)
            markdown_content = result.text_content
            logger.info("Successfully converted HTML to Markdown")
            return markdown_content
        except Exception as e:
            logger.error(f"Error converting HTML to Markdown: {e}")
            return f"Error converting HTML content: {str(e)}"

    def _convert_pdf_to_markdown(self, pdf_bytes: bytes) -> Optional[str]:
        """Convert PDF to Markdown using MarkItDown"""
        if not pdf_bytes:
            return None
        try:
            # Create BytesIO stream from PDF bytes
            pdf_stream = io.BytesIO(pdf_bytes)
            # Pass BytesIO stream to MarkItDown to avoid temp file creation
            md_converter = MarkItDown()
            result = md_converter.convert(pdf_stream)
            markdown_content = result.text_content
            logger.info("Successfully converted PDF to Markdown")
            return markdown_content
        except Exception as e:
            logger.error(f"Error converting PDF to Markdown: {e}")
            return f"Error converting PDF content: {str(e)}. The document may be corrupted or in an unsupported format."

    async def close_client_session(self):
        """Close HTTP client session"""
        await self.http_client.aclose()
        logger.info("BedestenApiClient: HTTP client session closed.")
```
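A minimal usage sketch for this client. Pydantic accepts a plain dict for the nested request model, and the field names shown are the ones the client code above actually reads (`data.phrase`, `data.birimAdi`); any further, possibly required, fields are defined in `models.py`, and the document ID is illustrative:
```python
import asyncio
from bedesten_mcp_module.client import BedestenApiClient
from bedesten_mcp_module.models import BedestenSearchRequest

async def main():
    client = BedestenApiClient()
    try:
        # Field names beyond phrase/birimAdi live in models.py and may be required.
        request = BedestenSearchRequest(data={"phrase": "tazminat", "birimAdi": "ALL"})
        results = await client.search_documents(request)
        print(results)  # response shape is defined in models.py
        # Fetch a decision as Markdown; the document ID here is illustrative.
        doc = await client.get_document_as_markdown("example-document-id")
        print(doc.mime_type, (doc.markdown_content or "")[:200])
    finally:
        await client.close_client_session()

asyncio.run(main())
```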
--------------------------------------------------------------------------------
/docs/DEPLOYMENT.md:
--------------------------------------------------------------------------------
```markdown
# Yargı MCP Server Deployment Guide

This guide covers the deployment options for running Yargı MCP Server as an ASGI web service.

## Contents

- [Quick Start](#quick-start)
- [Local Development](#local-development)
- [Production Deployment](#production-deployment)
- [Cloud Deployment](#cloud-deployment)
- [Docker Deployment](#docker-deployment)
- [Security Considerations](#security-considerations)
- [Monitoring](#monitoring)

## Quick Start

### 1. Install Dependencies

```bash
# Install uvicorn as the ASGI server
pip install uvicorn

# Or install together with all project dependencies
pip install -e .
pip install uvicorn
```

### 2. Run the Server

```bash
# Basic startup
python run_asgi.py

# Or directly with uvicorn
uvicorn asgi_app:app --host 0.0.0.0 --port 8000
```

The server will be available at:

- MCP endpoint: `http://localhost:8000/mcp/`
- Health check: `http://localhost:8000/health`
- API status: `http://localhost:8000/status`

## Local Development

### Development Server with Auto-Reload

```bash
python run_asgi.py --reload --log-level debug
```

### Using the FastAPI Integration

For additional REST API endpoints:

```bash
uvicorn fastapi_app:app --reload
```

This provides:

- Interactive API documentation at `/docs`
- A tool listing at `/api/tools`
- Database information at `/api/databases`

### Environment Variables

Create a `.env` file based on `.env.example`:

```bash
cp .env.example .env
```

Key variables (a sample `.env` follows the list):

- `HOST`: Server host address (default: 127.0.0.1)
- `PORT`: Server port (default: 8000)
- `ALLOWED_ORIGINS`: CORS origins (comma-separated)
- `LOG_LEVEL`: Log level (debug, info, warning, error)
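A minimal `.env` sketch built only from the variables documented above plus the `API_TOKEN` introduced under Security Considerations; all values are illustrative:

```bash
# .env - illustrative values only
HOST=0.0.0.0
PORT=8000
ALLOWED_ORIGINS=https://app.example.com
LOG_LEVEL=info
API_TOKEN=change-me   # see "Security Considerations" below
```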
## Production Deployment

### 1. Multiple Workers with Uvicorn

```bash
python run_asgi.py --host 0.0.0.0 --port 8000 --workers 4
```

### 2. Using Gunicorn

```bash
pip install gunicorn
gunicorn asgi_app:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
```

### 3. Behind an Nginx Reverse Proxy

1. Install Nginx
2. Use the provided `nginx.conf`:

```bash
sudo cp nginx.conf /etc/nginx/sites-available/yargi-mcp
sudo ln -s /etc/nginx/sites-available/yargi-mcp /etc/nginx/sites-enabled/
sudo nginx -t
sudo systemctl reload nginx
```

### 4. Systemd Service

Create `/etc/systemd/system/yargi-mcp.service`:

```ini
[Unit]
Description=Yargı MCP Server
After=network.target

[Service]
Type=exec
User=www-data
WorkingDirectory=/opt/yargi-mcp
Environment="PATH=/opt/yargi-mcp/venv/bin"
ExecStart=/opt/yargi-mcp/venv/bin/uvicorn asgi_app:app --host 0.0.0.0 --port 8000 --workers 4
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
```

Enable and start it:

```bash
sudo systemctl enable yargi-mcp
sudo systemctl start yargi-mcp
```

## Cloud Deployment

### Heroku

1. Create a `Procfile`:

```
web: uvicorn asgi_app:app --host 0.0.0.0 --port $PORT
```

2. Deploy:

```bash
heroku create your-app-name
git push heroku main
```

### Railway

1. Add a `railway.json`:

```json
{
  "build": {
    "builder": "NIXPACKS"
  },
  "deploy": {
    "startCommand": "uvicorn asgi_app:app --host 0.0.0.0 --port $PORT"
  }
}
```

2. Deploy via the Railway CLI or the GitHub integration

### Google Cloud Run

1. Build the container:

```bash
docker build -t yargi-mcp .
docker tag yargi-mcp gcr.io/YOUR_PROJECT/yargi-mcp
docker push gcr.io/YOUR_PROJECT/yargi-mcp
```

2. Deploy:

```bash
gcloud run deploy yargi-mcp \
  --image gcr.io/YOUR_PROJECT/yargi-mcp \
  --platform managed \
  --region us-central1 \
  --allow-unauthenticated
```

### AWS Lambda (using Mangum)

1. Install Mangum:

```bash
pip install mangum
```

2. Create `lambda_handler.py`:

```python
from mangum import Mangum
from asgi_app import app

handler = Mangum(app, lifespan="off")
```

3. Deploy with AWS SAM or the Serverless Framework

## Docker Deployment

### Single Container

```bash
# Build
docker build -t yargi-mcp .

# Run
docker run -p 8000:8000 --env-file .env yargi-mcp
```

### Docker Compose

```bash
# Development
docker-compose up

# Production with Nginx
docker-compose --profile production up

# With Redis caching
docker-compose --profile with-cache up
```

### Kubernetes

Create a Deployment YAML:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: yargi-mcp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: yargi-mcp
  template:
    metadata:
      labels:
        app: yargi-mcp
    spec:
      containers:
        - name: yargi-mcp
          image: yargi-mcp:latest
          ports:
            - containerPort: 8000
          env:
            - name: HOST
              value: "0.0.0.0"
            - name: PORT
              value: "8000"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: yargi-mcp-service
spec:
  selector:
    app: yargi-mcp
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
```

## Security Considerations

### 1. Authentication

Enable token authentication by setting the `API_TOKEN` environment variable:

```bash
export API_TOKEN=your-secret-token
```

Then include it in requests:

```bash
curl -H "Authorization: Bearer your-secret-token" http://localhost:8000/api/tools
```

### 2. HTTPS/SSL

Always use HTTPS in production:

1. Obtain an SSL certificate (Let's Encrypt, etc.)
2. Configure it in Nginx or at your cloud provider
3. Update `ALLOWED_ORIGINS` to use https:// URLs

### 3. Rate Limiting

The provided Nginx configuration includes rate limiting:

- API endpoints: 10 requests/second
- MCP endpoint: 100 requests/second

### 4. CORS Configuration

Allow only specific origins in production:

```bash
ALLOWED_ORIGINS=https://app.yourdomain.com,https://www.yourdomain.com
```

## Monitoring

### Health Checks

Monitor the `/health` endpoint:

```bash
curl http://localhost:8000/health
```

Response:

```json
{
  "status": "healthy",
  "timestamp": "2024-12-26T10:00:00",
  "uptime_seconds": 3600,
  "tools_operational": true
}
```

### Logging

Configure the log level via an environment variable:

```bash
LOG_LEVEL=info  # or debug, warning, error
```

Logs are written to:

- The console (stdout)
- The `logs/mcp_server.log` file

### Metrics (Optional)

For OpenTelemetry support:

```bash
pip install opentelemetry-instrumentation-fastapi
```

Set the environment variables:

```bash
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_SERVICE_NAME=yargi-mcp-server
```

## Troubleshooting

### Port Already in Use

```bash
# Find the process using port 8000
lsof -i :8000

# Kill the process
kill -9 <PID>
```

### Permission Errors

Make sure file permissions are correct:

```bash
chmod +x run_asgi.py
chown -R www-data:www-data /opt/yargi-mcp
```

### Memory Issues

Increase worker resources for large document processing:

```bash
# in the systemd service
Environment="PYTHONMALLOC=malloc"
LimitNOFILE=65536
```

### Timeout Issues

Adjust timeouts:

1. Uvicorn: `--timeout-keep-alive 75`
2. Nginx: `proxy_read_timeout 300s;`
3. Cloud providers: check platform-specific timeout settings

## Performance Tuning

### 1. Worker Processes

- Development: 1 worker
- Production: 2-4 workers per CPU core

### 2. Connection Pooling

The server uses connection pooling via httpx by default, as sketched below.
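As a rough illustration of what pooling means here, an `httpx.AsyncClient` can be given explicit pool limits; the numbers are illustrative, not the server's actual configuration:

```python
import httpx

# Illustrative pool limits; the project's clients rely on httpx defaults.
limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
client = httpx.AsyncClient(limits=limits, timeout=60.0)
```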
### 3. Caching (Future Enhancement)

Redis caching can be enabled with docker-compose:

```bash
docker-compose --profile with-cache up
```

### 4. Per-Database Timeouts

Adjust per-database timeouts in `.env`:

```bash
YARGITAY_TIMEOUT=60
DANISTAY_TIMEOUT=60
ANAYASA_TIMEOUT=90
```

## Support

For issues and questions:

- GitHub Issues: https://github.com/saidsurucu/yargi-mcp/issues
- Documentation: see README.md
```
--------------------------------------------------------------------------------
/emsal_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# emsal_mcp_module/client.py
import httpx
# from bs4 import BeautifulSoup # Uncomment if needed for advanced HTML pre-processing
from typing import Dict, Any, List, Optional
import logging
import html
import re
import io
from markitdown import MarkItDown

from .models import (
    EmsalSearchRequest,
    EmsalDetailedSearchRequestData,
    EmsalApiResponse,
    EmsalDocumentMarkdown
)

logger = logging.getLogger(__name__)
if not logger.hasHandlers():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


class EmsalApiClient:
    """API Client for Emsal (UYAP Precedent Decision) search system."""
    BASE_URL = "https://emsal.uyap.gov.tr"
    DETAILED_SEARCH_ENDPOINT = "/aramadetaylist"
    DOCUMENT_ENDPOINT = "/getDokuman"

    def __init__(self, request_timeout: float = 30.0):
        self.http_client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            headers={
                "Content-Type": "application/json; charset=UTF-8",
                "Accept": "application/json, text/plain, */*",
                "X-Requested-With": "XMLHttpRequest",
            },
            timeout=request_timeout,
            verify=False  # As per user's original FastAPI code
        )

    async def search_detailed_decisions(
        self,
        params: EmsalSearchRequest
    ) -> EmsalApiResponse:
        """Performs a detailed search for Emsal decisions."""
        data_for_api_payload = EmsalDetailedSearchRequestData(
            arananKelime=params.keyword or "",
            Bam_Hukuk_Mahkemeleri=params.selected_bam_civil_court,  # Uses alias "Bam Hukuk Mahkemeleri"
            Hukuk_Mahkemeleri=params.selected_civil_court,  # Uses alias "Hukuk Mahkemeleri"
            birimHukukMah="+".join(params.selected_regional_civil_chambers) if params.selected_regional_civil_chambers else "",
            esasYil=params.case_year_esas or "",
            esasIlkSiraNo=params.case_start_seq_esas or "",
            esasSonSiraNo=params.case_end_seq_esas or "",
            kararYil=params.decision_year_karar or "",
            kararIlkSiraNo=params.decision_start_seq_karar or "",
            kararSonSiraNo=params.decision_end_seq_karar or "",
            baslangicTarihi=params.start_date or "",
            bitisTarihi=params.end_date or "",
            siralama=params.sort_criteria,
            siralamaDirection=params.sort_direction,
            pageSize=params.page_size,
            pageNumber=params.page_number
        )
        # Create request dict and remove empty string fields to avoid API issues
        payload_dict = data_for_api_payload.model_dump(by_alias=True, exclude_none=True)
        # Remove empty string fields that might cause API issues
        cleaned_payload = {k: v for k, v in payload_dict.items() if v != ""}
        final_payload = {"data": cleaned_payload}
        logger.info(f"EmsalApiClient: Performing DETAILED search with payload: {final_payload}")
        return await self._execute_api_search(self.DETAILED_SEARCH_ENDPOINT, final_payload)

    async def _execute_api_search(self, endpoint: str, payload: Dict) -> EmsalApiResponse:
        """Helper method to execute search POST request and process response for Emsal."""
        try:
            response = await self.http_client.post(endpoint, json=payload)
            response.raise_for_status()
            response_json_data = response.json()
            logger.debug(f"EmsalApiClient: Raw API response from {endpoint}: {response_json_data}")
            api_response_parsed = EmsalApiResponse(**response_json_data)
            if api_response_parsed.data and api_response_parsed.data.data:
                for decision_item in api_response_parsed.data.data:
                    if decision_item.id:
                        decision_item.document_url = f"{self.BASE_URL}{self.DOCUMENT_ENDPOINT}?id={decision_item.id}"
            return api_response_parsed
        except httpx.RequestError as e:
            logger.error(f"EmsalApiClient: HTTP request error during Emsal search to {endpoint}: {e}")
            raise
        except Exception as e:
            logger.error(f"EmsalApiClient: Error processing or validating Emsal search response from {endpoint}: {e}")
            raise

    def _clean_html_and_convert_to_markdown_emsal(self, html_content_from_api_data_field: str) -> Optional[str]:
        """
        Cleans HTML (from Emsal API 'data' field containing HTML string)
        and converts it to Markdown using MarkItDown.
        This assumes Emsal /getDokuman response is JSON with HTML in "data" field,
        similar to Yargitay and the last Emsal /getDokuman example.
        """
        if not html_content_from_api_data_field:
            return None
        # Basic HTML unescaping and fixing common escaped characters
        # Based on user's original fix_html_content in app/routers/emsal.py
        content = html.unescape(html_content_from_api_data_field)
        content = content.replace('\\"', '"')
        content = content.replace('\\r\\n', '\n')
        content = content.replace('\\n', '\n')
        content = content.replace('\\t', '\t')
        # The HTML string from "data" field starts with "<html><head>..."
        html_input_for_markdown = content
        markdown_text = None
        try:
            # Convert HTML string to bytes and create BytesIO stream
            html_bytes = html_input_for_markdown.encode('utf-8')
            html_stream = io.BytesIO(html_bytes)
            # Pass BytesIO stream to MarkItDown to avoid temp file creation
            md_converter = MarkItDown()
            conversion_result = md_converter.convert(html_stream)
            markdown_text = conversion_result.text_content
            logger.info("EmsalApiClient: HTML to Markdown conversion successful.")
        except Exception as e:
            logger.error(f"EmsalApiClient: Error during MarkItDown HTML to Markdown conversion for Emsal: {e}")
        return markdown_text

    async def get_decision_document_as_markdown(self, id: str) -> EmsalDocumentMarkdown:
        """
        Retrieves a specific Emsal decision by ID and returns its content as Markdown.
        Assumes Emsal /getDokuman endpoint returns JSON with HTML content in the 'data' field.
        """
        document_api_url = f"{self.DOCUMENT_ENDPOINT}?id={id}"
        source_url = f"{self.BASE_URL}{document_api_url}"
        logger.info(f"EmsalApiClient: Fetching Emsal document for Markdown (ID: {id}) from {source_url}")
        try:
            response = await self.http_client.get(document_api_url)
            response.raise_for_status()
            # Emsal /getDokuman returns JSON with HTML in 'data' field (confirmed by user example)
            response_json = response.json()
            html_content_from_api = response_json.get("data")
            if not isinstance(html_content_from_api, str) or not html_content_from_api.strip():
                logger.warning(f"EmsalApiClient: Received empty or non-string HTML in 'data' field for Emsal ID {id}.")
                return EmsalDocumentMarkdown(id=id, markdown_content=None, source_url=source_url)
            markdown_content = self._clean_html_and_convert_to_markdown_emsal(html_content_from_api)
            return EmsalDocumentMarkdown(
                id=id,
                markdown_content=markdown_content,
                source_url=source_url
            )
        except httpx.RequestError as e:
            logger.error(f"EmsalApiClient: HTTP error fetching Emsal document (ID: {id}): {e}")
            raise
        except ValueError as e:
            logger.error(f"EmsalApiClient: ValueError processing Emsal document response (ID: {id}): {e}")
            raise
        except Exception as e:
            logger.error(f"EmsalApiClient: General error processing Emsal document (ID: {id}): {e}")
            raise

    async def close_client_session(self):
        """Closes the HTTPX client session."""
        if self.http_client and not self.http_client.is_closed:
            await self.http_client.aclose()
            logger.info("EmsalApiClient: HTTP client session closed.")
```
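A minimal usage sketch, assuming `EmsalSearchRequest` exposes the snake_case fields the client reads above (`keyword`, `page_size`, `page_number`, and so on); fields not referenced in the client code, and their defaults, are defined in `models.py`:
```python
import asyncio
from emsal_mcp_module.client import EmsalApiClient
from emsal_mcp_module.models import EmsalSearchRequest

async def main():
    client = EmsalApiClient()
    try:
        # Field names mirror those the client reads from `params` above.
        params = EmsalSearchRequest(keyword="kira tespiti", page_size=10, page_number=1)
        result = await client.search_detailed_decisions(params)
        if result.data and result.data.data:
            first = result.data.data[0]
            doc = await client.get_decision_document_as_markdown(first.id)
            print(doc.source_url)
    finally:
        await client.close_client_session()

asyncio.run(main())
```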
--------------------------------------------------------------------------------
/bedesten_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# bedesten_mcp_module/client.py
import httpx
import base64
from typing import Optional
import logging
from markitdown import MarkItDown
import io

from .models import (
    BedestenSearchRequest, BedestenSearchResponse,
    BedestenDocumentRequest, BedestenDocumentResponse,
    BedestenDocumentMarkdown, BedestenDocumentRequestData
)
from .enums import get_full_birim_adi

logger = logging.getLogger(__name__)


class BedestenApiClient:
    """
    API Client for Bedesten (bedesten.adalet.gov.tr) - Alternative legal decision search system.
    Currently used for Yargıtay decisions, but can be extended for other court types.
    """
    BASE_URL = "https://bedesten.adalet.gov.tr"
    SEARCH_ENDPOINT = "/emsal-karar/searchDocuments"
    DOCUMENT_ENDPOINT = "/emsal-karar/getDocumentContent"

    def __init__(self, request_timeout: float = 60.0):
        self.http_client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            headers={
                "Accept": "*/*",
                "Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
                "AdaletApplicationName": "UyapMevzuat",
                "Content-Type": "application/json; charset=utf-8",
                "Origin": "https://mevzuat.adalet.gov.tr",
                "Referer": "https://mevzuat.adalet.gov.tr/",
                "Sec-Fetch-Dest": "empty",
                "Sec-Fetch-Mode": "cors",
                "Sec-Fetch-Site": "same-site",
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
            },
            timeout=request_timeout
        )

    async def search_documents(self, search_request: BedestenSearchRequest) -> BedestenSearchResponse:
        """
        Search for documents using Bedesten API.
        Currently supports: YARGITAYKARARI, DANISTAYKARARI, YERELHUKMAHKARARI, etc.
        """
        logger.info(f"BedestenApiClient: Searching documents with phrase: {search_request.data.phrase}")
        # Map abbreviated birimAdi to full Turkish name before sending to API
        original_birim_adi = search_request.data.birimAdi
        mapped_birim_adi = get_full_birim_adi(original_birim_adi)
        search_request.data.birimAdi = mapped_birim_adi
        if original_birim_adi != "ALL":
            logger.info(f"BedestenApiClient: Mapped birimAdi '{original_birim_adi}' to '{mapped_birim_adi}'")
        try:
            # Create request dict and remove birimAdi if empty
            request_dict = search_request.model_dump()
            if not request_dict["data"]["birimAdi"]:  # Remove if empty string
                del request_dict["data"]["birimAdi"]
            response = await self.http_client.post(
                self.SEARCH_ENDPOINT,
                json=request_dict
            )
            response.raise_for_status()
            response_json = response.json()
            # Parse and return the response
            return BedestenSearchResponse(**response_json)
        except httpx.RequestError as e:
            logger.error(f"BedestenApiClient: HTTP request error during search: {e}")
            raise
        except Exception as e:
            logger.error(f"BedestenApiClient: Error processing search response: {e}")
            raise

    async def get_document_as_markdown(self, document_id: str) -> BedestenDocumentMarkdown:
        """
        Get document content and convert to markdown.
        Handles both HTML (text/html) and PDF (application/pdf) content types.
        """
        logger.info(f"BedestenApiClient: Fetching document for markdown conversion (ID: {document_id})")
        try:
            # Prepare request
            doc_request = BedestenDocumentRequest(
                data=BedestenDocumentRequestData(documentId=document_id)
            )
            # Get document
            response = await self.http_client.post(
                self.DOCUMENT_ENDPOINT,
                json=doc_request.model_dump()
            )
            response.raise_for_status()
            response_json = response.json()
            doc_response = BedestenDocumentResponse(**response_json)
            # Add null safety checks for document data
            if not hasattr(doc_response, 'data') or doc_response.data is None:
                raise ValueError("Document response does not contain data")
            if not hasattr(doc_response.data, 'content') or doc_response.data.content is None:
                raise ValueError("Document data does not contain content")
            if not hasattr(doc_response.data, 'mimeType') or doc_response.data.mimeType is None:
                raise ValueError("Document data does not contain mimeType")
            # Decode base64 content with error handling
            try:
                content_bytes = base64.b64decode(doc_response.data.content)
            except Exception as e:
                raise ValueError(f"Failed to decode base64 content: {str(e)}")
            mime_type = doc_response.data.mimeType
            logger.info(f"BedestenApiClient: Document mime type: {mime_type}")
            # Convert to markdown based on mime type
            if mime_type == "text/html":
                html_content = content_bytes.decode('utf-8')
                markdown_content = self._convert_html_to_markdown(html_content)
            elif mime_type == "application/pdf":
                markdown_content = self._convert_pdf_to_markdown(content_bytes)
            else:
                logger.warning(f"Unsupported mime type: {mime_type}")
                markdown_content = f"Unsupported content type: {mime_type}. Unable to convert to markdown."
            return BedestenDocumentMarkdown(
                documentId=document_id,
                markdown_content=markdown_content,
                source_url=f"{self.BASE_URL}/document/{document_id}",
                mime_type=mime_type
            )
        except httpx.RequestError as e:
            logger.error(f"BedestenApiClient: HTTP error fetching document {document_id}: {e}")
            raise
        except Exception as e:
            logger.error(f"BedestenApiClient: Error processing document {document_id}: {e}")
            raise

    def _convert_html_to_markdown(self, html_content: str) -> Optional[str]:
        """Convert HTML to Markdown using MarkItDown"""
        if not html_content:
            return None
        try:
            # Convert HTML string to bytes and create BytesIO stream
            html_bytes = html_content.encode('utf-8')
            html_stream = io.BytesIO(html_bytes)
            # Pass BytesIO stream to MarkItDown to avoid temp file creation
            md_converter = MarkItDown()
            result = md_converter.convert(html_stream)
            markdown_content = result.text_content
            logger.info("Successfully converted HTML to Markdown")
            return markdown_content
        except Exception as e:
            logger.error(f"Error converting HTML to Markdown: {e}")
            return f"Error converting HTML content: {str(e)}"

    def _convert_pdf_to_markdown(self, pdf_bytes: bytes) -> Optional[str]:
        """Convert PDF to Markdown using MarkItDown"""
        if not pdf_bytes:
            return None
        try:
            # Create BytesIO stream from PDF bytes
            pdf_stream = io.BytesIO(pdf_bytes)
            # Pass BytesIO stream to MarkItDown to avoid temp file creation
            md_converter = MarkItDown()
            result = md_converter.convert(pdf_stream)
            markdown_content = result.text_content
            logger.info("Successfully converted PDF to Markdown")
            return markdown_content
        except Exception as e:
            logger.error(f"Error converting PDF to Markdown: {e}")
            return f"Error converting PDF content: {str(e)}. The document may be corrupted or in an unsupported format."

    async def close_client_session(self):
        """Close HTTP client session"""
        await self.http_client.aclose()
        logger.info("BedestenApiClient: HTTP client session closed.")
```
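This copy hardens `get_document_as_markdown` with explicit null checks and base64 validation, surfacing malformed responses as `ValueError` rather than opaque attribute errors. A minimal sketch of consuming that contract (the document ID is illustrative):
```python
import asyncio
from typing import Optional
from bedesten_mcp_module.client import BedestenApiClient

async def fetch_markdown(document_id: str) -> Optional[str]:
    client = BedestenApiClient()
    try:
        doc = await client.get_document_as_markdown(document_id)
        return doc.markdown_content
    except ValueError as e:
        # Raised by the guards above: missing data/content/mimeType,
        # or content that fails base64 decoding.
        print(f"Malformed document response for {document_id}: {e}")
        return None
    finally:
        await client.close_client_session()

asyncio.run(fetch_markdown("example-document-id"))
```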
--------------------------------------------------------------------------------
/yargitay_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# yargitay_mcp_module/client.py
import httpx
from bs4 import BeautifulSoup  # Available for HTML pre-processing; the current conversion path uses MarkItDown directly
from typing import Dict, Any, List, Optional
import logging
import html
import re
import io
from markitdown import MarkItDown

from .models import (
    YargitayDetailedSearchRequest,
    YargitayApiSearchResponse,
    YargitayApiDecisionEntry,
    YargitayDocumentMarkdown,
    CompactYargitaySearchResult
)

logger = logging.getLogger(__name__)
# Basic logging configuration if no handlers are configured
if not logger.hasHandlers():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


class YargitayOfficialApiClient:
    """
    API Client for Yargitay's official decision search system.
    Targets the detailed search endpoint (e.g., /aramadetaylist) based on user-provided payload.
    """
    BASE_URL = "https://karararama.yargitay.gov.tr"
    # The form action was "/detayliArama". This often maps to an API endpoint like "/aramadetaylist".
    # This should be confirmed with the actual API.
    DETAILED_SEARCH_ENDPOINT = "/aramadetaylist"
    DOCUMENT_ENDPOINT = "/getDokuman"

    def __init__(self, request_timeout: float = 60.0):
        self.http_client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            headers={
                "Content-Type": "application/json; charset=UTF-8",
                "Accept": "application/json, text/plain, */*",
                "X-Requested-With": "XMLHttpRequest",
                "X-KL-KIS-Ajax-Request": "Ajax_Request",  # Seen in a Yargitay client example
                "Referer": f"{self.BASE_URL}/"  # Some APIs might check referer
            },
            timeout=request_timeout,
            verify=False  # SSL verification disabled as per original user code - use with caution
        )

    async def search_detailed_decisions(
        self,
        search_params: YargitayDetailedSearchRequest
    ) -> YargitayApiSearchResponse:
        """
        Performs a detailed search for decisions in Yargitay
        using the structured search_params.
        """
        # Create the main payload structure with the 'data' key
        request_payload = {"data": search_params.model_dump(exclude_none=True, by_alias=True)}
        logger.info(f"YargitayOfficialApiClient: Performing detailed search with payload: {request_payload}")
        try:
            response = await self.http_client.post(self.DETAILED_SEARCH_ENDPOINT, json=request_payload)
            response.raise_for_status()  # Raise an exception for HTTP 4xx or 5xx status codes
            response_json_data = response.json()
            logger.debug(f"YargitayOfficialApiClient: Raw API response: {response_json_data}")
            # Handle None or empty data response from API
            if response_json_data is None:
                logger.warning("YargitayOfficialApiClient: API returned None response")
                response_json_data = {"data": {"data": [], "recordsTotal": 0, "recordsFiltered": 0}}
            elif not isinstance(response_json_data, dict):
                logger.warning(f"YargitayOfficialApiClient: API returned unexpected response type: {type(response_json_data)}")
                response_json_data = {"data": {"data": [], "recordsTotal": 0, "recordsFiltered": 0}}
            elif response_json_data.get("data") is None:
                logger.warning("YargitayOfficialApiClient: API response data field is None")
                response_json_data["data"] = {"data": [], "recordsTotal": 0, "recordsFiltered": 0}
            # Validate and parse the response using Pydantic models
            api_response = YargitayApiSearchResponse(**response_json_data)
            # Populate the document_url for each decision entry
            if api_response.data and api_response.data.data:
                for decision_item in api_response.data.data:
                    decision_item.document_url = f"{self.BASE_URL}{self.DOCUMENT_ENDPOINT}?id={decision_item.id}"
            return api_response
        except httpx.RequestError as e:
            logger.error(f"YargitayOfficialApiClient: HTTP request error during detailed search: {e}")
            raise  # Re-raise to be handled by the calling MCP tool
        except Exception as e:  # Catches Pydantic ValidationErrors as well
            logger.error(f"YargitayOfficialApiClient: Error processing or validating detailed search response: {e}")
            raise

    def _convert_html_to_markdown(self, html_from_api_data_field: str) -> Optional[str]:
        """
        Takes raw HTML string (from Yargitay API 'data' field for a document),
        pre-processes it, and converts it to Markdown using MarkItDown.
        Returns only the Markdown string or None if conversion fails.
        """
        if not html_from_api_data_field:
            return None
        # Pre-process HTML: unescape entities and fix common escaped sequences
        # Based on user's original fix_html_content
        processed_html = html.unescape(html_from_api_data_field)
        processed_html = processed_html.replace('\\"', '"')
        processed_html = processed_html.replace('\\r\\n', '\n')
        processed_html = processed_html.replace('\\n', '\n')
        processed_html = processed_html.replace('\\t', '\t')
        # MarkItDown often works best with a full HTML document structure.
        # The Yargitay /getDokuman response already provides a full <html>...</html> string.
        # If it were just a fragment, we might wrap it like:
        # html_to_convert = f"<html><head><meta charset=\"UTF-8\"></head><body>{processed_html}</body></html>"
        # But since it's already a full HTML string in "data":
        html_to_convert = processed_html
        markdown_output = None
        try:
            # Convert HTML string to bytes and create BytesIO stream
            html_bytes = html_to_convert.encode('utf-8')
            html_stream = io.BytesIO(html_bytes)
            # Pass BytesIO stream to MarkItDown to avoid temp file creation
            md_converter = MarkItDown()
            conversion_result = md_converter.convert(html_stream)
            markdown_output = conversion_result.text_content
            logger.info("Successfully converted HTML to Markdown.")
        except Exception as e:
            logger.error(f"Error during MarkItDown HTML to Markdown conversion: {e}")
        return markdown_output

    async def get_decision_document_as_markdown(self, id: str) -> YargitayDocumentMarkdown:
        """
        Retrieves a specific Yargitay decision by its ID and returns its content
        as Markdown.
        Based on user-provided /getDokuman response structure.
        """
        document_api_url = f"{self.DOCUMENT_ENDPOINT}?id={id}"
        source_url = f"{self.BASE_URL}{document_api_url}"  # The original URL of the document
        logger.info(f"YargitayOfficialApiClient: Fetching document for Markdown conversion (ID: {id})")
        try:
            response = await self.http_client.get(document_api_url)
            response.raise_for_status()
            # Expecting JSON response with HTML content in the 'data' field
            response_json = response.json()
            html_content_from_api = response_json.get("data")
            if not isinstance(html_content_from_api, str):
                logger.error(f"YargitayOfficialApiClient: 'data' field in API response is not a string or not found (ID: {id}).")
                raise ValueError("Expected HTML content not found in API response's 'data' field.")
            markdown_content = self._convert_html_to_markdown(html_content_from_api)
            return YargitayDocumentMarkdown(
                id=id,
                markdown_content=markdown_content,
                source_url=source_url
            )
        except httpx.RequestError as e:
            logger.error(f"YargitayOfficialApiClient: HTTP error fetching document for Markdown (ID: {id}): {e}")
raise
except ValueError as e: # For JSON parsing errors or missing 'data' field
logger.error(f"YargitayOfficialApiClient: Error processing document response for Markdown (ID: {id}): {e}")
raise
except Exception as e: # For other unexpected errors
logger.error(f"YargitayOfficialApiClient: General error fetching/processing document for Markdown (ID: {id}): {e}")
raise
async def close_client_session(self):
"""Closes the HTTPX client session."""
await self.http_client.aclose()
logger.info("YargitayOfficialApiClient: HTTP client session closed.")
```
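A minimal usage sketch for `YargitayOfficialApiClient` follows. The search request field names are assumptions for illustration only; the real fields are defined in `yargitay_mcp_module/models.py`, which is not reproduced on this page.
```python
# Hypothetical usage sketch for YargitayOfficialApiClient. The request
# field names (arananKelime, pageSize) are assumptions for illustration;
# the real fields are defined in yargitay_mcp_module/models.py.
import asyncio

from yargitay_mcp_module.client import YargitayOfficialApiClient
from yargitay_mcp_module.models import YargitayDetailedSearchRequest

async def main() -> None:
    client = YargitayOfficialApiClient()
    try:
        params = YargitayDetailedSearchRequest(arananKelime="tazminat", pageSize=10)
        results = await client.search_detailed_decisions(params)
        if results.data and results.data.data:
            first = results.data.data[0]
            # document_url was populated by the client after the search
            print(first.document_url)
            doc = await client.get_decision_document_as_markdown(first.id)
            print((doc.markdown_content or "")[:500])
    finally:
        await client.close_client_session()

asyncio.run(main())
```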
--------------------------------------------------------------------------------
/danistay_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# danistay_mcp_module/client.py
import httpx
from bs4 import BeautifulSoup
from typing import Dict, Any, List, Optional
import logging
import html
import re
import io
from markitdown import MarkItDown
from .models import (
DanistayKeywordSearchRequest,
DanistayDetailedSearchRequest,
DanistayApiResponse,
DanistayDocumentMarkdown,
DanistayKeywordSearchRequestData,
DanistayDetailedSearchRequestData
)
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
class DanistayApiClient:
BASE_URL = "https://karararama.danistay.gov.tr"
KEYWORD_SEARCH_ENDPOINT = "/aramalist"
DETAILED_SEARCH_ENDPOINT = "/aramadetaylist"
DOCUMENT_ENDPOINT = "/getDokuman"
def __init__(self, request_timeout: float = 30.0):
self.http_client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers={
"Content-Type": "application/json; charset=UTF-8", # Arama endpoint'leri için
"Accept": "application/json, text/plain, */*", # Arama endpoint'leri için
"X-Requested-With": "XMLHttpRequest",
},
timeout=request_timeout,
verify=False
)
def _prepare_keywords_for_api(self, keywords: List[str]) -> List[str]:
return ['"' + k.strip('"') + '"' for k in keywords if k and k.strip()]
async def search_keyword_decisions(
self,
params: DanistayKeywordSearchRequest
) -> DanistayApiResponse:
data_for_payload = DanistayKeywordSearchRequestData(
andKelimeler=self._prepare_keywords_for_api(params.andKelimeler),
orKelimeler=self._prepare_keywords_for_api(params.orKelimeler),
notAndKelimeler=self._prepare_keywords_for_api(params.notAndKelimeler),
notOrKelimeler=self._prepare_keywords_for_api(params.notOrKelimeler),
pageSize=params.pageSize,
pageNumber=params.pageNumber
)
final_payload = {"data": data_for_payload.model_dump(exclude_none=True)}
logger.info(f"DanistayApiClient: Performing KEYWORD search via {self.KEYWORD_SEARCH_ENDPOINT} with payload: {final_payload}")
return await self._execute_api_search(self.KEYWORD_SEARCH_ENDPOINT, final_payload)
async def search_detailed_decisions(
self,
params: DanistayDetailedSearchRequest
) -> DanistayApiResponse:
data_for_payload = DanistayDetailedSearchRequestData(
daire=params.daire or "",
esasYil=params.esasYil or "",
esasIlkSiraNo=params.esasIlkSiraNo or "",
esasSonSiraNo=params.esasSonSiraNo or "",
kararYil=params.kararYil or "",
kararIlkSiraNo=params.kararIlkSiraNo or "",
kararSonSiraNo=params.kararSonSiraNo or "",
baslangicTarihi=params.baslangicTarihi or "",
bitisTarihi=params.bitisTarihi or "",
mevzuatNumarasi=params.mevzuatNumarasi or "",
mevzuatAdi=params.mevzuatAdi or "",
madde=params.madde or "",
siralama="1",
siralamaDirection="desc",
pageSize=params.pageSize,
pageNumber=params.pageNumber
)
# Create request dict and remove empty string fields to avoid API issues
payload_dict = data_for_payload.model_dump(exclude_defaults=False, exclude_none=False)
# Remove empty string fields that might cause API issues
cleaned_payload = {k: v for k, v in payload_dict.items() if v != ""}
final_payload = {"data": cleaned_payload}
logger.info(f"DanistayApiClient: Performing DETAILED search via {self.DETAILED_SEARCH_ENDPOINT} with payload: {final_payload}")
return await self._execute_api_search(self.DETAILED_SEARCH_ENDPOINT, final_payload)
async def _execute_api_search(self, endpoint: str, payload: Dict) -> DanistayApiResponse:
try:
response = await self.http_client.post(endpoint, json=payload)
response.raise_for_status()
response_json_data = response.json()
logger.debug(f"DanistayApiClient: Raw API response from {endpoint}: {response_json_data}")
api_response_parsed = DanistayApiResponse(**response_json_data)
if api_response_parsed.data and api_response_parsed.data.data:
for decision_item in api_response_parsed.data.data:
if decision_item.id:
decision_item.document_url = f"{self.BASE_URL}{self.DOCUMENT_ENDPOINT}?id={decision_item.id}"
return api_response_parsed
except httpx.RequestError as e:
logger.error(f"DanistayApiClient: HTTP request error during search to {endpoint}: {e}")
raise
except Exception as e:
logger.error(f"DanistayApiClient: Error processing or validating search response from {endpoint}: {e}")
raise
def _convert_html_to_markdown_danistay(self, direct_html_content: str) -> Optional[str]:
"""
Converts direct HTML content (assumed from Danıştay /getDokuman) to Markdown.
"""
if not direct_html_content:
return None
# Basic HTML unescaping and fixing common escaped characters
# This step might be less critical if MarkItDown handles them, but good for pre-cleaning.
processed_html = html.unescape(direct_html_content)
processed_html = processed_html.replace('\\"', '"') # If any such JS-escaped strings exist
# Danistay HTML doesn't seem to have \\r\\n etc. from the example, but keeping for robustness
processed_html = processed_html.replace('\\r\\n', '\n').replace('\\n', '\n').replace('\\t', '\t')
# For simplicity and to leverage MarkItDown's capability to handle full docs,
# we pass the pre-processed full HTML.
html_input_for_markdown = processed_html
markdown_text = None
try:
# Convert HTML string to bytes and create BytesIO stream
html_bytes = html_input_for_markdown.encode('utf-8')
html_stream = io.BytesIO(html_bytes)
# Pass BytesIO stream to MarkItDown to avoid temp file creation
md_converter = MarkItDown()
conversion_result = md_converter.convert(html_stream)
markdown_text = conversion_result.text_content
logger.info("DanistayApiClient: HTML to Markdown conversion successful.")
except Exception as e:
logger.error(f"DanistayApiClient: Error during MarkItDown HTML to Markdown conversion: {e}")
return markdown_text
async def get_decision_document_as_markdown(self, id: str) -> DanistayDocumentMarkdown:
"""
Retrieves a specific Danıştay decision by ID and returns its content as Markdown.
The /getDokuman endpoint for Danıştay requires arananKelime parameter.
"""
# Add required arananKelime parameter - using empty string as minimum requirement
document_api_url = f"{self.DOCUMENT_ENDPOINT}?id={id}&arananKelime="
source_url = f"{self.BASE_URL}{document_api_url}"
logger.info(f"DanistayApiClient: Fetching Danistay document for Markdown (ID: {id}) from {source_url}")
try:
# For direct HTML response, we might want different headers if the API is sensitive,
# but httpx usually handles basic GET requests well.
response = await self.http_client.get(document_api_url)
response.raise_for_status()
# Danıştay /getDokuman directly returns HTML text
html_content_from_api = response.text
if not isinstance(html_content_from_api, str) or not html_content_from_api.strip():
logger.warning(f"DanistayApiClient: Received empty or non-string HTML content for ID {id}.")
# Return with None markdown_content if HTML is effectively empty
return DanistayDocumentMarkdown(
id=id,
markdown_content=None,
source_url=source_url
)
markdown_content = self._convert_html_to_markdown_danistay(html_content_from_api)
return DanistayDocumentMarkdown(
id=id,
markdown_content=markdown_content,
source_url=source_url
)
except httpx.RequestError as e:
logger.error(f"DanistayApiClient: HTTP error fetching Danistay document (ID: {id}): {e}")
raise
# Removed ValueError for JSON as Danistay /getDokuman returns direct HTML
except Exception as e: # Catches other errors like MarkItDown issues if they propagate
logger.error(f"DanistayApiClient: General error processing Danistay document (ID: {id}): {e}")
raise
async def close_client_session(self):
"""Closes the HTTPX client session."""
if self.http_client and not self.http_client.is_closed:
await self.http_client.aclose()
logger.info("DanistayApiClient: HTTP client session closed.")
```
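For reference, a usage sketch of the keyword search flow. The request fields shown are the attributes the client reads above; whether any of them are optional with defaults is defined in `danistay_mcp_module/models.py`, not shown here.
```python
# Usage sketch for DanistayApiClient's keyword search. The field names
# below are the attributes the client reads (andKelimeler, orKelimeler,
# notAndKelimeler, notOrKelimeler, pageSize, pageNumber); defaults and
# validation live in models.py.
import asyncio

from danistay_mcp_module.client import DanistayApiClient
from danistay_mcp_module.models import DanistayKeywordSearchRequest

async def main() -> None:
    client = DanistayApiClient()
    try:
        params = DanistayKeywordSearchRequest(
            andKelimeler=["imar"],  # every keyword must appear
            orKelimeler=[],         # any of these may appear
            notAndKelimeler=[],
            notOrKelimeler=[],
            pageSize=10,
            pageNumber=1,
        )
        response = await client.search_keyword_decisions(params)
        if response.data and response.data.data:
            doc = await client.get_decision_document_as_markdown(response.data.data[0].id)
            print(doc.source_url)
    finally:
        await client.close_client_session()

asyncio.run(main())
```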
--------------------------------------------------------------------------------
/bddk_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# bddk_mcp_module/client.py
import httpx
from typing import List, Optional, Dict, Any
import logging
import os
import re
import io
import math
from urllib.parse import urlparse
from markitdown import MarkItDown
from .models import (
BddkSearchRequest,
BddkDecisionSummary,
BddkSearchResult,
BddkDocumentMarkdown
)
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
class BddkApiClient:
"""
    API client for searching and retrieving BDDK (Banking Regulation and Supervision Agency) decisions,
    using the Tavily Search API for discovery and direct HTTP requests for content retrieval.
"""
TAVILY_API_URL = "https://api.tavily.com/search"
BDDK_BASE_URL = "https://www.bddk.org.tr"
DOCUMENT_URL_TEMPLATE = "https://www.bddk.org.tr/Mevzuat/DokumanGetir/{document_id}"
DOCUMENT_MARKDOWN_CHUNK_SIZE = 5000 # Character limit per page
def __init__(self, request_timeout: float = 60.0):
"""Initialize the BDDK API client."""
self.tavily_api_key = os.getenv("TAVILY_API_KEY")
if not self.tavily_api_key:
# Fallback to development token
self.tavily_api_key = "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"
logger.info("Using fallback Tavily API token (development token)")
else:
logger.info("Using Tavily API key from environment variable")
self.http_client = httpx.AsyncClient(
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
},
timeout=httpx.Timeout(request_timeout)
)
self.markitdown = MarkItDown()
async def close_client_session(self):
"""Close the HTTP client session."""
await self.http_client.aclose()
logger.info("BddkApiClient: HTTP client session closed.")
def _extract_document_id(self, url: str) -> Optional[str]:
"""Extract document ID from BDDK URL."""
# Primary pattern: https://www.bddk.org.tr/Mevzuat/DokumanGetir/310
match = re.search(r'/DokumanGetir/(\d+)', url)
if match:
return match.group(1)
# Alternative patterns for different BDDK URL formats
# Pattern: /Liste/55 -> use as document ID
match = re.search(r'/Liste/(\d+)', url)
if match:
return match.group(1)
# Pattern: /EkGetir/13?ekId=381 -> use ekId as document ID
match = re.search(r'ekId=(\d+)', url)
if match:
return match.group(1)
return None
async def search_decisions(
self,
request: BddkSearchRequest
) -> BddkSearchResult:
"""
Search for BDDK decisions using Tavily API.
Args:
request: Search request parameters
Returns:
BddkSearchResult with matching decisions
"""
try:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.tavily_api_key}"
}
# Tavily API request - enhanced for BDDK decision documents
query = f"{request.keywords} \"Karar Sayısı\""
payload = {
"query": query,
"country": "turkey",
"include_domains": ["https://www.bddk.org.tr/Mevzuat/DokumanGetir"],
"max_results": request.pageSize,
"search_depth": "advanced"
}
            # Tavily has no offset-based pagination; pages beyond the first cannot
            # be fetched, so warn and fall through with the first page of results.
            if request.page > 1:
                logger.warning(f"Tavily API doesn't support pagination. Page {request.page} requested.")
response = await self.http_client.post(
self.TAVILY_API_URL,
json=payload,
headers=headers
)
response.raise_for_status()
data = response.json()
# Log raw Tavily response for debugging
logger.info(f"Tavily returned {len(data.get('results', []))} results")
# Convert Tavily results to our format
decisions = []
for result in data.get("results", []):
# Extract document ID from URL
url = result.get("url", "")
logger.debug(f"Processing URL: {url}")
doc_id = self._extract_document_id(url)
if doc_id:
decision = BddkDecisionSummary(
title=result.get("title", "").replace("[PDF] ", "").strip(),
document_id=doc_id,
content=result.get("content", "")[:500] # Limit content length
)
decisions.append(decision)
logger.debug(f"Added decision: {decision.title} (ID: {doc_id})")
else:
logger.warning(f"Could not extract document ID from URL: {url}")
return BddkSearchResult(
decisions=decisions,
total_results=len(data.get("results", [])),
page=request.page,
pageSize=request.pageSize
)
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error searching BDDK decisions: {e}")
if e.response.status_code == 401:
raise Exception("Tavily API authentication failed. Check API key.")
raise Exception(f"Failed to search BDDK decisions: {str(e)}")
except Exception as e:
logger.error(f"Error searching BDDK decisions: {e}")
raise Exception(f"Failed to search BDDK decisions: {str(e)}")
async def get_document_markdown(
self,
document_id: str,
page_number: int = 1
) -> BddkDocumentMarkdown:
"""
Retrieve a BDDK document and convert it to Markdown format.
Args:
document_id: BDDK document ID (e.g., '310')
page_number: Page number for paginated content (1-indexed)
Returns:
BddkDocumentMarkdown with paginated content
"""
try:
# Try different URL patterns for BDDK documents
potential_urls = [
f"https://www.bddk.org.tr/Mevzuat/DokumanGetir/{document_id}",
f"https://www.bddk.org.tr/Mevzuat/Liste/{document_id}",
f"https://www.bddk.org.tr/KurumHakkinda/EkGetir/13?ekId={document_id}",
f"https://www.bddk.org.tr/KurumHakkinda/EkGetir/5?ekId={document_id}"
]
document_url = None
response = None
# Try each URL pattern until one works
for url in potential_urls:
try:
logger.info(f"Trying BDDK document URL: {url}")
response = await self.http_client.get(
url,
follow_redirects=True
)
response.raise_for_status()
document_url = url
break
except httpx.HTTPStatusError:
continue
if not response or not document_url:
raise Exception(f"Could not find document with ID {document_id}")
logger.info(f"Successfully fetched BDDK document from: {document_url}")
# Determine content type
content_type = response.headers.get("content-type", "").lower()
# Convert to Markdown based on content type
if "pdf" in content_type:
# Handle PDF documents
pdf_stream = io.BytesIO(response.content)
result = self.markitdown.convert_stream(pdf_stream, file_extension=".pdf")
markdown_content = result.text_content
else:
# Handle HTML documents
html_stream = io.BytesIO(response.content)
result = self.markitdown.convert_stream(html_stream, file_extension=".html")
markdown_content = result.text_content
# Clean up the markdown content
markdown_content = markdown_content.strip()
# Calculate pagination
total_length = len(markdown_content)
total_pages = math.ceil(total_length / self.DOCUMENT_MARKDOWN_CHUNK_SIZE)
# Extract the requested page
start_idx = (page_number - 1) * self.DOCUMENT_MARKDOWN_CHUNK_SIZE
end_idx = start_idx + self.DOCUMENT_MARKDOWN_CHUNK_SIZE
page_content = markdown_content[start_idx:end_idx]
return BddkDocumentMarkdown(
document_id=document_id,
markdown_content=page_content,
page_number=page_number,
total_pages=total_pages
)
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error fetching BDDK document {document_id}: {e}")
raise Exception(f"Failed to fetch BDDK document: {str(e)}")
except Exception as e:
logger.error(f"Error processing BDDK document {document_id}: {e}")
raise Exception(f"Failed to process BDDK document: {str(e)}")
```
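A usage sketch of the two-step BDDK flow (Tavily discovery, then paginated Markdown retrieval). The `BddkSearchRequest` fields shown match the attributes `search_decisions` reads above.
```python
# Usage sketch of the two-step BDDK flow. BddkSearchRequest field names
# (keywords, page, pageSize) match the attributes search_decisions reads.
import asyncio

from bddk_mcp_module.client import BddkApiClient
from bddk_mcp_module.models import BddkSearchRequest

async def main() -> None:
    # Uses TAVILY_API_KEY from the environment, else the bundled dev token.
    client = BddkApiClient()
    try:
        result = await client.search_decisions(
            BddkSearchRequest(keywords="kredi riski", page=1, pageSize=5)
        )
        for decision in result.decisions:
            print(decision.document_id, decision.title)
        if result.decisions:
            # Content is served in 5000-character pages (DOCUMENT_MARKDOWN_CHUNK_SIZE).
            doc = await client.get_document_markdown(result.decisions[0].document_id, page_number=1)
            print(f"page {doc.page_number}/{doc.total_pages}")
    finally:
        await client.close_client_session()

asyncio.run(main())
```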
--------------------------------------------------------------------------------
/analyze_kik_hash_generation.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Analyze KİK v2 hash generation by examining JavaScript code patterns
and trying to reverse engineer the hash generation logic.
"""
import asyncio
import json
import hashlib
import hmac
import base64
from fastmcp import Client
from mcp_server_main import app
def analyze_webpack_hash_patterns():
"""
Analyze the webpack JavaScript code you provided to find hash generation patterns
"""
print("🔍 Analyzing webpack hash generation patterns...")
# From the JavaScript code, I can see several hash/ID generation patterns:
hash_patterns = {
# Webpack chunk system hashes (from the JS code)
"webpack_chunks": {
315: "d9a9486a4f5ba326",
531: "cd8fb385c88033ae",
671: "04c48b287646627a",
856: "682c9a7b87351f90",
1017: "9de022378fc275f6",
# ... many more from the __webpack_require__.u function
},
# Symbol generation from Zone.js
"zone_symbols": [
"__zone_symbol__",
"__Zone_symbol_prefix",
"Zone.__symbol__"
],
# Angular module federation patterns
"module_federation": [
"__webpack_modules__",
"__webpack_module_cache__",
"__webpack_require__"
]
}
# The target hash format
target_hash = "42f9bcd59e0dfbca36dec9accf5686c7a92aa97724cd8fc3550beb84b80409da"
print(f"🎯 Target hash: {target_hash}")
print(f" Length: {len(target_hash)} characters")
print(f" Format: {'SHA256' if len(target_hash) == 64 else 'Other'} (64 chars = SHA256)")
return hash_patterns
def test_webpack_style_hashing(data_dict):
"""Test webpack-style hash generation methods"""
hashes = {}
for key, value in data_dict.items():
test_string = str(value)
# Try various webpack-style hash methods
hashes[f"webpack_md5_{key}"] = hashlib.md5(test_string.encode()).hexdigest()
hashes[f"webpack_sha1_{key}"] = hashlib.sha1(test_string.encode()).hexdigest()
hashes[f"webpack_sha256_{key}"] = hashlib.sha256(test_string.encode()).hexdigest()
# Try with various prefixes/suffixes (common in webpack)
prefixed = f"__webpack__{test_string}"
hashes[f"webpack_prefixed_sha256_{key}"] = hashlib.sha256(prefixed.encode()).hexdigest()
# Try with module federation style
module_style = f"shell:{test_string}"
hashes[f"module_fed_sha256_{key}"] = hashlib.sha256(module_style.encode()).hexdigest()
# Try JSON stringified
json_style = json.dumps({"id": value, "type": "decision"}, separators=(',', ':'))
hashes[f"json_sha256_{key}"] = hashlib.sha256(json_style.encode()).hexdigest()
# Try with timestamp or sequence
with_seq = f"{test_string}_0"
hashes[f"seq_sha256_{key}"] = hashlib.sha256(with_seq.encode()).hexdigest()
return hashes
def test_angular_routing_hashes(data_dict):
"""Test Angular routing/state management hash generation"""
hashes = {}
for key, value in data_dict.items():
# Angular often uses route parameters for hash generation
route_style = f"/kurul-kararlari/{value}"
hashes[f"route_sha256_{key}"] = hashlib.sha256(route_style.encode()).hexdigest()
# Component state style
state_style = f"KurulKararGoster_{value}"
hashes[f"state_sha256_{key}"] = hashlib.sha256(state_style.encode()).hexdigest()
# Angular module style
module_style = f"kik.kurul.karar.{value}"
hashes[f"module_sha256_{key}"] = hashlib.sha256(module_style.encode()).hexdigest()
return hashes
def test_base64_encoding_variants(data_dict):
"""Test various base64 and encoding variants"""
hashes = {}
for key, value in data_dict.items():
test_string = str(value)
# Try base64 encoding then hashing
b64_encoded = base64.b64encode(test_string.encode()).decode()
hashes[f"b64_sha256_{key}"] = hashlib.sha256(b64_encoded.encode()).hexdigest()
# Try URL-safe base64
b64_url = base64.urlsafe_b64encode(test_string.encode()).decode()
hashes[f"b64url_sha256_{key}"] = hashlib.sha256(b64_url.encode()).hexdigest()
# Try hex encoding
hex_encoded = test_string.encode().hex()
hashes[f"hex_sha256_{key}"] = hashlib.sha256(hex_encoded.encode()).hexdigest()
return hashes
async def test_hash_generation_comprehensive():
print("🔐 Comprehensive KİK document hash generation analysis...")
print("=" * 70)
# First analyze the webpack patterns
webpack_patterns = analyze_webpack_hash_patterns()
client = Client(app)
async with client:
print("✅ MCP client connected")
# Get sample decisions
print("\n📊 Getting sample decisions for hash analysis...")
search_result = await client.call_tool("search_kik_v2_decisions", {
"decision_type": "uyusmazlik",
"karar_metni": "2024"
})
if hasattr(search_result, 'content') and search_result.content:
search_data = json.loads(search_result.content[0].text)
decisions = search_data.get('decisions', [])
if decisions:
print(f"✅ Found {len(decisions)} decisions")
# Test with first decision
sample_decision = decisions[0]
print(f"\n📋 Sample decision for hash analysis:")
for key, value in sample_decision.items():
print(f" {key}: {value}")
target_hash = "42f9bcd59e0dfbca36dec9accf5686c7a92aa97724cd8fc3550beb84b80409da"
print(f"\n🎯 Target hash to match: {target_hash}")
all_hashes = {}
# Test different hash generation methods
print(f"\n🔨 Testing webpack-style hashing...")
webpack_hashes = test_webpack_style_hashing(sample_decision)
all_hashes.update(webpack_hashes)
print(f"🔨 Testing Angular routing hashes...")
angular_hashes = test_angular_routing_hashes(sample_decision)
all_hashes.update(angular_hashes)
print(f"🔨 Testing base64 encoding variants...")
b64_hashes = test_base64_encoding_variants(sample_decision)
all_hashes.update(b64_hashes)
# Check for matches
print(f"\n🎯 Checking for hash matches...")
matches_found = []
partial_matches = []
for hash_name, hash_value in all_hashes.items():
if hash_value == target_hash:
matches_found.append((hash_name, hash_value))
print(f" 🎉 EXACT MATCH FOUND: {hash_name}")
elif hash_value[:8] == target_hash[:8]: # First 8 chars match
partial_matches.append((hash_name, hash_value))
print(f" 🔍 Partial match (first 8): {hash_name} -> {hash_value[:16]}...")
elif hash_value[-8:] == target_hash[-8:]: # Last 8 chars match
partial_matches.append((hash_name, hash_value))
print(f" 🔍 Partial match (last 8): {hash_name} -> ...{hash_value[-16:]}")
if not matches_found and not partial_matches:
print(f" ❌ No matches found")
print(f"\n📝 Sample generated hashes (first 10):")
for i, (hash_name, hash_value) in enumerate(list(all_hashes.items())[:10]):
print(f" {hash_name}: {hash_value}")
# Try combinations with other decisions
print(f"\n🔄 Testing hash combinations with multiple decisions...")
if len(decisions) > 1:
for i, decision in enumerate(decisions[1:3]): # Test 2 more
print(f"\n Testing decision {i+2}: {decision.get('kararNo')}")
decision_hashes = test_webpack_style_hashing(decision)
for hash_name, hash_value in decision_hashes.items():
if hash_value == target_hash:
print(f" 🎉 MATCH FOUND in decision {i+2}: {hash_name}")
matches_found.append((f"decision_{i+2}_{hash_name}", hash_value))
# Try composite hashes (combining multiple fields)
print(f"\n🔗 Testing composite hash generation...")
composite_tests = [
f"{sample_decision.get('gundemMaddesiId')}_{sample_decision.get('kararNo')}",
f"{sample_decision.get('kararNo')}_{sample_decision.get('kararTarihi')}",
f"uyusmazlik_{sample_decision.get('gundemMaddesiId')}_{sample_decision.get('kararTarihi')}",
json.dumps(sample_decision, separators=(',', ':'), sort_keys=True),
f"{sample_decision.get('basvuran')}_{sample_decision.get('gundemMaddesiId')}",
]
for i, composite_str in enumerate(composite_tests):
composite_hash = hashlib.sha256(composite_str.encode()).hexdigest()
if composite_hash == target_hash:
print(f" 🎉 COMPOSITE MATCH FOUND: test_{i} -> {composite_str[:50]}...")
matches_found.append((f"composite_{i}", composite_hash))
print(f"\n🎯 Hash analysis completed!")
print(f" Total matches found: {len(matches_found)}")
print(f" Partial matches: {len(partial_matches)}")
else:
print("❌ No decisions found")
else:
print("❌ Search failed")
print("=" * 70)
if __name__ == "__main__":
asyncio.run(test_hash_generation_comprehensive())
```
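The script boils down to one pattern: derive candidate strings from a decision record, hash each with SHA-256, and compare against the captured target. A self-contained sketch of that loop, with a made-up record:
```python
# The core loop the script applies, reduced to a self-contained sketch:
# derive candidate strings from a decision record, hash each with SHA-256,
# and compare against the captured target. The record values are made up.
import hashlib
import json

TARGET = "42f9bcd59e0dfbca36dec9accf5686c7a92aa97724cd8fc3550beb84b80409da"

record = {"gundemMaddesiId": 12345, "kararNo": "2024/UH.II-100"}  # hypothetical

candidates = {
    "id_only": str(record["gundemMaddesiId"]),
    "id_and_no": f"{record['gundemMaddesiId']}_{record['kararNo']}",
    "sorted_json": json.dumps(record, separators=(",", ":"), sort_keys=True),
}

for name, text in candidates.items():
    digest = hashlib.sha256(text.encode()).hexdigest()
    status = "MATCH" if digest == TARGET else "no match"
    print(f"{name}: {digest[:16]}... ({status})")
```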
--------------------------------------------------------------------------------
/mcp_auth/oauth.py:
--------------------------------------------------------------------------------
```python
"""
OAuth 2.1 + PKCE implementation for MCP servers with Clerk integration
"""
import base64
import hashlib
import secrets
import time
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Optional
from urllib.parse import urlencode
import httpx
import jwt
from jwt.exceptions import PyJWTError, InvalidTokenError
from .storage import PersistentStorage
# Try to import Clerk SDK
try:
from clerk_backend_api import Clerk
CLERK_AVAILABLE = True
except ImportError:
CLERK_AVAILABLE = False
Clerk = None
logger = logging.getLogger(__name__)
@dataclass
class OAuthConfig:
"""OAuth provider configuration for Clerk"""
client_id: str
client_secret: str
authorization_endpoint: str
token_endpoint: str
jwks_uri: str | None = None
issuer: str = "mcp-auth"
    scopes: list[str] | None = None
def __post_init__(self):
if self.scopes is None:
self.scopes = ["mcp:tools:read", "mcp:tools:write"]
class PKCEChallenge:
"""PKCE challenge/verifier pair for OAuth 2.1"""
def __init__(self):
self.verifier = (
base64.urlsafe_b64encode(secrets.token_bytes(32))
.decode("utf-8")
.rstrip("=")
)
challenge_bytes = hashlib.sha256(self.verifier.encode("utf-8")).digest()
self.challenge = (
base64.urlsafe_b64encode(challenge_bytes).decode("utf-8").rstrip("=")
)
class OAuthProvider:
"""OAuth 2.1 provider with PKCE support and Clerk integration"""
def __init__(self, config: OAuthConfig, jwt_secret: str):
self.config = config
self.jwt_secret = jwt_secret
# Use persistent storage instead of memory
self.storage = PersistentStorage()
# Initialize Clerk SDK if available
self.clerk = None
if CLERK_AVAILABLE and config.client_secret:
try:
self.clerk = Clerk(bearer_auth=config.client_secret)
logger.info("Clerk SDK initialized successfully")
except Exception as e:
logger.warning(f"Failed to initialize Clerk SDK: {e}")
logger.info("OAuth provider initialized with persistent storage")
def generate_authorization_url(
self,
redirect_uri: str,
state: str | None = None,
scopes: list[str] | None = None,
) -> tuple[str, PKCEChallenge]:
"""Generate OAuth authorization URL with PKCE for Clerk"""
pkce = PKCEChallenge()
session_id = secrets.token_urlsafe(32)
if state is None:
state = secrets.token_urlsafe(16)
if scopes is None:
scopes = self.config.scopes
# Store session data with expiration
session_data = {
"pkce_verifier": pkce.verifier,
"state": state,
"redirect_uri": redirect_uri,
"scopes": scopes,
"created_at": time.time(),
"expires_at": (datetime.utcnow() + timedelta(minutes=10)).timestamp(),
}
self.storage.set_session(session_id, session_data)
# Build Clerk OAuth URL
# Check if this is a custom domain (sign-in endpoint)
if self.config.authorization_endpoint.endswith('/sign-in'):
# For custom domains, Clerk expects redirect_url parameter
params = {
"redirect_url": redirect_uri,
"state": f"{state}:{session_id}",
}
auth_url = f"{self.config.authorization_endpoint}?{urlencode(params)}"
else:
# Standard OAuth flow with PKCE
params = {
"response_type": "code",
"client_id": self.config.client_id,
"redirect_uri": redirect_uri,
"scope": " ".join(scopes),
"state": f"{state}:{session_id}", # Combine state with session ID
"code_challenge": pkce.challenge,
"code_challenge_method": "S256",
}
auth_url = f"{self.config.authorization_endpoint}?{urlencode(params)}"
logger.info(f"Generated OAuth URL with session {session_id[:8]}...")
logger.debug(f"Auth URL: {auth_url}")
return auth_url, pkce
async def exchange_code_for_token(
self, code: str, state: str, redirect_uri: str
) -> dict[str, Any]:
"""Exchange authorization code for access token with Clerk"""
try:
original_state, session_id = state.split(":", 1)
except ValueError as e:
logger.error(f"Invalid state format: {state}")
raise ValueError("Invalid state format") from e
session = self.storage.get_session(session_id)
if not session:
logger.error(f"Session {session_id} not found")
raise ValueError("Invalid session")
# Check session expiration
if datetime.utcnow().timestamp() > session.get("expires_at", 0):
self.storage.delete_session(session_id)
logger.error(f"Session {session_id} expired")
raise ValueError("Session expired")
if session["state"] != original_state:
logger.error(f"State mismatch: expected {session['state']}, got {original_state}")
raise ValueError("State mismatch")
if session["redirect_uri"] != redirect_uri:
logger.error(f"Redirect URI mismatch: expected {session['redirect_uri']}, got {redirect_uri}")
raise ValueError("Redirect URI mismatch")
# Prepare token exchange request for Clerk
token_data = {
"grant_type": "authorization_code",
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
"code": code,
"redirect_uri": redirect_uri,
"code_verifier": session["pkce_verifier"],
}
logger.info(f"Exchanging code with Clerk for session {session_id[:8]}...")
async with httpx.AsyncClient() as client:
response = await client.post(
self.config.token_endpoint,
data=token_data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=30.0,
)
if response.status_code != 200:
logger.error(f"Clerk token exchange failed: {response.status_code} - {response.text}")
raise ValueError(f"Token exchange failed: {response.text}")
token_response = response.json()
logger.info("Successfully exchanged code for Clerk token")
# Create MCP-scoped JWT token
access_token = self._create_mcp_token(
session["scopes"], token_response.get("access_token"), session_id
)
# Store token for introspection
token_id = secrets.token_urlsafe(16)
token_data = {
"access_token": access_token,
"scopes": session["scopes"],
"created_at": time.time(),
"expires_at": (datetime.utcnow() + timedelta(hours=1)).timestamp(),
"session_id": session_id,
"clerk_token": token_response.get("access_token"),
}
self.storage.set_token(token_id, token_data)
# Clean up session
self.storage.delete_session(session_id)
return {
"access_token": access_token,
"token_type": "bearer",
"expires_in": 3600,
"scope": " ".join(session["scopes"]),
}
def validate_pkce(self, code_verifier: str, code_challenge: str) -> bool:
"""Validate PKCE code challenge (RFC 7636)"""
# S256 method
verifier_hash = hashlib.sha256(code_verifier.encode()).digest()
expected_challenge = base64.urlsafe_b64encode(verifier_hash).decode().rstrip('=')
return expected_challenge == code_challenge
def _create_mcp_token(
self, scopes: list[str], upstream_token: str, session_id: str
) -> str:
"""Create MCP-scoped JWT token with Clerk token embedded"""
now = int(time.time())
payload = {
"iss": self.config.issuer,
"sub": session_id,
"aud": "mcp-server",
"iat": now,
"exp": now + 3600, # 1 hour expiration
"mcp_tool_scopes": scopes,
"upstream_token": upstream_token,
"clerk_integration": True,
}
return jwt.encode(payload, self.jwt_secret, algorithm="HS256")
def introspect_token(self, token: str) -> dict[str, Any]:
"""Introspect and validate MCP token"""
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
# Check if token is expired
if payload.get("exp", 0) < time.time():
return {"active": False, "error": "token_expired"}
return {
"active": True,
"sub": payload.get("sub"),
"aud": payload.get("aud"),
"iss": payload.get("iss"),
"exp": payload.get("exp"),
"iat": payload.get("iat"),
"mcp_tool_scopes": payload.get("mcp_tool_scopes", []),
"upstream_token": payload.get("upstream_token"),
"clerk_integration": payload.get("clerk_integration", False),
}
except PyJWTError as e:
logger.warning(f"Token validation failed: {e}")
return {"active": False, "error": "invalid_token"}
def revoke_token(self, token: str) -> bool:
"""Revoke a token"""
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
session_id = payload.get("sub")
# Remove all tokens associated with this session
all_tokens = self.storage.get_tokens()
tokens_to_remove = [
token_id
for token_id, token_data in all_tokens.items()
if token_data.get("session_id") == session_id
]
for token_id in tokens_to_remove:
self.storage.delete_token(token_id)
logger.info(f"Revoked {len(tokens_to_remove)} tokens for session {session_id}")
return True
except InvalidTokenError as e:
logger.warning(f"Token revocation failed: {e}")
return False
def cleanup_expired_sessions(self):
"""Clean up expired sessions and tokens"""
# This is now handled automatically by persistent storage
self.storage.cleanup_expired_sessions()
logger.debug("Cleanup completed via persistent storage")
```
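A quick, self-contained sanity check of the S256 relation implemented by `PKCEChallenge` and `validate_pkce` (RFC 7636: the challenge is the unpadded URL-safe Base64 of the verifier's SHA-256 digest):
```python
# Sanity check of the S256 relation used by PKCEChallenge and
# OAuthProvider.validate_pkce: challenge == BASE64URL(SHA256(verifier))
# with padding stripped (RFC 7636). Assumes the repo's dependencies
# (PyJWT, httpx) are installed, since importing mcp_auth.oauth pulls them in.
import base64
import hashlib

from mcp_auth.oauth import PKCEChallenge

pkce = PKCEChallenge()
expected = (
    base64.urlsafe_b64encode(hashlib.sha256(pkce.verifier.encode("utf-8")).digest())
    .decode("utf-8")
    .rstrip("=")
)
assert expected == pkce.challenge
print("S256 challenge verified:", pkce.challenge)
```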
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/mcp_auth/oauth.py:
--------------------------------------------------------------------------------
```python
"""
OAuth 2.1 + PKCE implementation for MCP servers with Clerk integration
"""
import base64
import hashlib
import secrets
import time
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Optional
from urllib.parse import urlencode
import httpx
import jwt
from jwt.exceptions import PyJWTError, InvalidTokenError
from .storage import PersistentStorage
# Try to import Clerk SDK
try:
from clerk_backend_api import Clerk
CLERK_AVAILABLE = True
except ImportError:
CLERK_AVAILABLE = False
Clerk = None
logger = logging.getLogger(__name__)
@dataclass
class OAuthConfig:
"""OAuth provider configuration for Clerk"""
client_id: str
client_secret: str
authorization_endpoint: str
token_endpoint: str
jwks_uri: str | None = None
issuer: str = "mcp-auth"
scopes: list[str] = None
def __post_init__(self):
if self.scopes is None:
self.scopes = ["mcp:tools:read", "mcp:tools:write"]
class PKCEChallenge:
"""PKCE challenge/verifier pair for OAuth 2.1"""
def __init__(self):
self.verifier = (
base64.urlsafe_b64encode(secrets.token_bytes(32))
.decode("utf-8")
.rstrip("=")
)
challenge_bytes = hashlib.sha256(self.verifier.encode("utf-8")).digest()
self.challenge = (
base64.urlsafe_b64encode(challenge_bytes).decode("utf-8").rstrip("=")
)
class OAuthProvider:
"""OAuth 2.1 provider with PKCE support and Clerk integration"""
def __init__(self, config: OAuthConfig, jwt_secret: str):
self.config = config
self.jwt_secret = jwt_secret
# Use persistent storage instead of memory
self.storage = PersistentStorage()
# Initialize Clerk SDK if available
self.clerk = None
if CLERK_AVAILABLE and config.client_secret:
try:
self.clerk = Clerk(bearer_auth=config.client_secret)
logger.info("Clerk SDK initialized successfully")
except Exception as e:
logger.warning(f"Failed to initialize Clerk SDK: {e}")
logger.info("OAuth provider initialized with persistent storage")
def generate_authorization_url(
self,
redirect_uri: str,
state: str | None = None,
scopes: list[str] | None = None,
) -> tuple[str, PKCEChallenge]:
"""Generate OAuth authorization URL with PKCE for Clerk"""
pkce = PKCEChallenge()
session_id = secrets.token_urlsafe(32)
if state is None:
state = secrets.token_urlsafe(16)
if scopes is None:
scopes = self.config.scopes
# Store session data with expiration
session_data = {
"pkce_verifier": pkce.verifier,
"state": state,
"redirect_uri": redirect_uri,
"scopes": scopes,
"created_at": time.time(),
"expires_at": (datetime.utcnow() + timedelta(minutes=10)).timestamp(),
}
self.storage.set_session(session_id, session_data)
# Build Clerk OAuth URL
# Check if this is a custom domain (sign-in endpoint)
if self.config.authorization_endpoint.endswith('/sign-in'):
# For custom domains, Clerk expects redirect_url parameter
params = {
"redirect_url": redirect_uri,
"state": f"{state}:{session_id}",
}
auth_url = f"{self.config.authorization_endpoint}?{urlencode(params)}"
else:
# Standard OAuth flow with PKCE
params = {
"response_type": "code",
"client_id": self.config.client_id,
"redirect_uri": redirect_uri,
"scope": " ".join(scopes),
"state": f"{state}:{session_id}", # Combine state with session ID
"code_challenge": pkce.challenge,
"code_challenge_method": "S256",
}
auth_url = f"{self.config.authorization_endpoint}?{urlencode(params)}"
logger.info(f"Generated OAuth URL with session {session_id[:8]}...")
logger.debug(f"Auth URL: {auth_url}")
return auth_url, pkce
async def exchange_code_for_token(
self, code: str, state: str, redirect_uri: str
) -> dict[str, Any]:
"""Exchange authorization code for access token with Clerk"""
try:
original_state, session_id = state.split(":", 1)
except ValueError as e:
logger.error(f"Invalid state format: {state}")
raise ValueError("Invalid state format") from e
session = self.storage.get_session(session_id)
if not session:
logger.error(f"Session {session_id} not found")
raise ValueError("Invalid session")
# Check session expiration
if datetime.utcnow().timestamp() > session.get("expires_at", 0):
self.storage.delete_session(session_id)
logger.error(f"Session {session_id} expired")
raise ValueError("Session expired")
if session["state"] != original_state:
logger.error(f"State mismatch: expected {session['state']}, got {original_state}")
raise ValueError("State mismatch")
if session["redirect_uri"] != redirect_uri:
logger.error(f"Redirect URI mismatch: expected {session['redirect_uri']}, got {redirect_uri}")
raise ValueError("Redirect URI mismatch")
# Prepare token exchange request for Clerk
token_data = {
"grant_type": "authorization_code",
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
"code": code,
"redirect_uri": redirect_uri,
"code_verifier": session["pkce_verifier"],
}
logger.info(f"Exchanging code with Clerk for session {session_id[:8]}...")
async with httpx.AsyncClient() as client:
response = await client.post(
self.config.token_endpoint,
data=token_data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=30.0,
)
if response.status_code != 200:
logger.error(f"Clerk token exchange failed: {response.status_code} - {response.text}")
raise ValueError(f"Token exchange failed: {response.text}")
token_response = response.json()
logger.info("Successfully exchanged code for Clerk token")
# Create MCP-scoped JWT token
access_token = self._create_mcp_token(
session["scopes"], token_response.get("access_token"), session_id
)
# Store token for introspection
        token_id = secrets.token_urlsafe(16)
        token_record = {  # distinct name so the exchange payload above is not shadowed
            "access_token": access_token,
            "scopes": session["scopes"],
            "created_at": time.time(),
            "expires_at": time.time() + 3600,  # 1 hour, matching the JWT exp
            "session_id": session_id,
            "clerk_token": token_response.get("access_token"),
        }
        self.storage.set_token(token_id, token_record)
# Clean up session
self.storage.delete_session(session_id)
return {
"access_token": access_token,
"token_type": "bearer",
"expires_in": 3600,
"scope": " ".join(session["scopes"]),
}
def validate_pkce(self, code_verifier: str, code_challenge: str) -> bool:
"""Validate PKCE code challenge (RFC 7636)"""
# S256 method
verifier_hash = hashlib.sha256(code_verifier.encode()).digest()
expected_challenge = base64.urlsafe_b64encode(verifier_hash).decode().rstrip('=')
return expected_challenge == code_challenge
def _create_mcp_token(
self, scopes: list[str], upstream_token: str, session_id: str
) -> str:
"""Create MCP-scoped JWT token with Clerk token embedded"""
now = int(time.time())
payload = {
"iss": self.config.issuer,
"sub": session_id,
"aud": "mcp-server",
"iat": now,
"exp": now + 3600, # 1 hour expiration
"mcp_tool_scopes": scopes,
"upstream_token": upstream_token,
"clerk_integration": True,
}
return jwt.encode(payload, self.jwt_secret, algorithm="HS256")
def introspect_token(self, token: str) -> dict[str, Any]:
"""Introspect and validate MCP token"""
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
# Check if token is expired
if payload.get("exp", 0) < time.time():
return {"active": False, "error": "token_expired"}
return {
"active": True,
"sub": payload.get("sub"),
"aud": payload.get("aud"),
"iss": payload.get("iss"),
"exp": payload.get("exp"),
"iat": payload.get("iat"),
"mcp_tool_scopes": payload.get("mcp_tool_scopes", []),
"upstream_token": payload.get("upstream_token"),
"clerk_integration": payload.get("clerk_integration", False),
}
except PyJWTError as e:
logger.warning(f"Token validation failed: {e}")
return {"active": False, "error": "invalid_token"}
def revoke_token(self, token: str) -> bool:
"""Revoke a token"""
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
session_id = payload.get("sub")
# Remove all tokens associated with this session
all_tokens = self.storage.get_tokens()
tokens_to_remove = [
token_id
for token_id, token_data in all_tokens.items()
if token_data.get("session_id") == session_id
]
for token_id in tokens_to_remove:
self.storage.delete_token(token_id)
logger.info(f"Revoked {len(tokens_to_remove)} tokens for session {session_id}")
return True
except InvalidTokenError as e:
logger.warning(f"Token revocation failed: {e}")
return False
def cleanup_expired_sessions(self):
"""Clean up expired sessions and tokens"""
# This is now handled automatically by persistent storage
self.storage.cleanup_expired_sessions()
logger.debug("Cleanup completed via persistent storage")
```
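The S256 check in `validate_pkce` above is the standard RFC 7636 transform: the challenge is the base64url-encoded SHA-256 digest of the verifier, with padding stripped. A minimal, standard-library-only sketch of the round trip (names here are illustrative, not part of the module):

```python
import base64
import hashlib
import secrets

def make_pkce_pair() -> tuple[str, str]:
    """Generate an RFC 7636 verifier/challenge pair using the S256 method."""
    verifier = secrets.token_urlsafe(64)  # unreserved characters, within the 43-128 char limit
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    challenge = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
    return verifier, challenge

verifier, challenge = make_pkce_pair()
# The same recomputation validate_pkce() performs server-side:
recomputed = base64.urlsafe_b64encode(
    hashlib.sha256(verifier.encode("ascii")).digest()
).decode("ascii").rstrip("=")
assert recomputed == challenge
```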
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/sayistay_mcp_module/models.py:
--------------------------------------------------------------------------------
```python
# sayistay_mcp_module/models.py
from pydantic import BaseModel, Field
from typing import Optional, List, Union, Dict, Any, Literal
from enum import Enum
from .enums import DaireEnum, KamuIdaresiTuruEnum, WebKararKonusuEnum
# --- Unified Enums ---
class SayistayDecisionTypeEnum(str, Enum):
GENEL_KURUL = "genel_kurul"
TEMYIZ_KURULU = "temyiz_kurulu"
DAIRE = "daire"
# ============================================================================
# Genel Kurul (General Assembly) Models
# ============================================================================
class GenelKurulSearchRequest(BaseModel):
"""
Search request for Sayıştay Genel Kurul (General Assembly) decisions.
Genel Kurul decisions are precedent-setting rulings made by the full assembly
of the Turkish Court of Accounts, typically addressing interpretation of
audit and accountability regulations.
"""
    karar_no: str = Field("", description="Decision number (e.g., '5415/1')")
    karar_ek: str = Field("", description="Decision appendix number")
    karar_tarih_baslangic: str = Field("", description="Start date (DD.MM.YYYY format)")
    karar_tarih_bitis: str = Field("", description="End date (DD.MM.YYYY format)")
    karar_tamami: str = Field("", description="Full text search within decision content")
# DataTables pagination
start: int = Field(0, description="Starting record for pagination (0-based)")
length: int = Field(10, description="Number of records per page (1-10)")
class GenelKurulDecision(BaseModel):
"""Single Genel Kurul decision entry from search results."""
id: int = Field(..., description="Unique decision ID")
karar_no: str = Field(..., description="Decision number (e.g., '5415/1')")
karar_tarih: str = Field(..., description="Decision date in DD.MM.YYYY format")
karar_ozeti: str = Field(..., description="Decision summary/abstract")
class GenelKurulSearchResponse(BaseModel):
"""Response from Genel Kurul search endpoint."""
decisions: List[GenelKurulDecision] = Field(default_factory=list, description="List of matching decisions")
total_records: int = Field(0, description="Total number of matching records")
total_filtered: int = Field(0, description="Number of records after filtering")
draw: int = Field(1, description="DataTables draw counter")
# ============================================================================
# Temyiz Kurulu (Appeals Board) Models
# ============================================================================
class TemyizKuruluSearchRequest(BaseModel):
"""
Search request for Sayıştay Temyiz Kurulu (Appeals Board) decisions.
Temyiz Kurulu reviews appeals against audit chamber decisions,
providing higher-level review of audit findings and sanctions.
"""
    ilam_dairesi: DaireEnum = Field("ALL", description="Audit chamber selection (1-8 or ALL)")
    yili: str = Field("", description="Year (YYYY format)")
    karar_tarih_baslangic: str = Field("", description="Start date (DD.MM.YYYY format)")
    karar_tarih_bitis: str = Field("", description="End date (DD.MM.YYYY format)")
    kamu_idaresi_turu: KamuIdaresiTuruEnum = Field("ALL", description="Public administration type filter")
    ilam_no: str = Field("", description="Audit report number (İlam No, max 50 chars)")
    dosya_no: str = Field("", description="File number for the case")
    temyiz_tutanak_no: str = Field("", description="Appeals board meeting minutes number")
    temyiz_karar: str = Field("", description="Appeals decision text search")
    web_karar_konusu: WebKararKonusuEnum = Field("ALL", description="Decision subject category filter")
# DataTables pagination
start: int = Field(0, description="Starting record for pagination (0-based)")
length: int = Field(10, description="Number of records per page (1-10)")
class TemyizKuruluDecision(BaseModel):
"""Single Temyiz Kurulu decision entry from search results."""
id: int = Field(..., description="Unique decision ID")
temyiz_tutanak_tarihi: str = Field(..., description="Appeals board meeting date in DD.MM.YYYY format")
ilam_dairesi: int = Field(..., description="Chamber number (1-8)")
temyiz_karar: str = Field(..., description="Appeals decision summary and reasoning")
class TemyizKuruluSearchResponse(BaseModel):
"""Response from Temyiz Kurulu search endpoint."""
decisions: List[TemyizKuruluDecision] = Field(default_factory=list, description="List of matching appeals decisions")
total_records: int = Field(0, description="Total number of matching records")
total_filtered: int = Field(0, description="Number of records after filtering")
draw: int = Field(1, description="DataTables draw counter")
# ============================================================================
# Daire (Chamber) Models
# ============================================================================
class DaireSearchRequest(BaseModel):
"""
Search request for Sayıştay Daire (Chamber) decisions.
Daire decisions are first-instance audit findings and sanctions
issued by individual audit chambers before potential appeals.
"""
    yargilama_dairesi: DaireEnum = Field("ALL", description="Chamber selection (1-8 or ALL)")
    karar_tarih_baslangic: str = Field("", description="Start date (DD.MM.YYYY format)")
    karar_tarih_bitis: str = Field("", description="End date (DD.MM.YYYY format)")
    ilam_no: str = Field("", description="Audit report number (İlam No, max 50 chars)")
    kamu_idaresi_turu: KamuIdaresiTuruEnum = Field("ALL", description="Public administration type filter")
    hesap_yili: str = Field("", description="Account year being audited")
    web_karar_konusu: WebKararKonusuEnum = Field("ALL", description="Decision subject category filter")
    web_karar_metni: str = Field("", description="Decision text search")
# DataTables pagination
start: int = Field(0, description="Starting record for pagination (0-based)")
length: int = Field(10, description="Number of records per page (1-10)")
class DaireDecision(BaseModel):
"""Single Daire decision entry from search results."""
id: int = Field(..., description="Unique decision ID")
yargilama_dairesi: int = Field(..., description="Chamber number (1-8)")
karar_tarih: str = Field(..., description="Decision date in DD.MM.YYYY format")
karar_no: str = Field(..., description="Decision number")
ilam_no: str = Field("", description="Audit report number (may be null)")
madde_no: int = Field(..., description="Article/item number within the decision")
kamu_idaresi_turu: str = Field(..., description="Public administration type")
hesap_yili: int = Field(..., description="Account year being audited")
web_karar_konusu: str = Field(..., description="Decision subject category")
web_karar_metni: str = Field(..., description="Decision text/summary")
class DaireSearchResponse(BaseModel):
"""Response from Daire search endpoint."""
decisions: List[DaireDecision] = Field(default_factory=list, description="List of matching chamber decisions")
total_records: int = Field(0, description="Total number of matching records")
total_filtered: int = Field(0, description="Number of records after filtering")
draw: int = Field(1, description="DataTables draw counter")
# ============================================================================
# Document Models
# ============================================================================
class SayistayDocumentMarkdown(BaseModel):
"""
Sayıştay decision document converted to Markdown format.
Used for retrieving full text of decisions from any of the three
decision types (Genel Kurul, Temyiz Kurulu, Daire).
"""
decision_id: str = Field(..., description="Unique decision identifier")
    decision_type: str = Field(..., description="Decision type: genel_kurul, temyiz_kurulu, or daire")
source_url: str = Field(..., description="Original URL where the document was retrieved")
markdown_content: Optional[str] = Field(None, description="Full decision text converted to Markdown format")
retrieval_date: Optional[str] = Field(None, description="Date when document was retrieved (ISO format)")
error_message: Optional[str] = Field(None, description="Error message if document retrieval failed")
# ============================================================================
# Unified Models
# ============================================================================
class SayistayUnifiedSearchRequest(BaseModel):
"""Unified search request for all Sayıştay decision types."""
decision_type: Literal["genel_kurul", "temyiz_kurulu", "daire"] = Field(..., description="Decision type: genel_kurul, temyiz_kurulu, or daire")
# Common pagination parameters
start: int = Field(0, ge=0, description="Starting record for pagination (0-based)")
length: int = Field(10, ge=1, le=100, description="Number of records per page (1-100)")
# Common search parameters
karar_tarih_baslangic: str = Field("", description="Start date (DD.MM.YYYY format)")
karar_tarih_bitis: str = Field("", description="End date (DD.MM.YYYY format)")
kamu_idaresi_turu: KamuIdaresiTuruEnum = Field("ALL", description="Public administration type filter")
ilam_no: str = Field("", description="Audit report number (İlam No, max 50 chars)")
web_karar_konusu: WebKararKonusuEnum = Field("ALL", description="Decision subject category filter")
# Genel Kurul specific parameters (ignored for other types)
karar_no: str = Field("", description="Decision number (genel_kurul only)")
karar_ek: str = Field("", description="Decision appendix number (genel_kurul only)")
karar_tamami: str = Field("", description="Full text search (genel_kurul only)")
# Temyiz Kurulu specific parameters (ignored for other types)
ilam_dairesi: DaireEnum = Field("ALL", description="Audit chamber selection (temyiz_kurulu only)")
yili: str = Field("", description="Year (YYYY format, temyiz_kurulu only)")
dosya_no: str = Field("", description="File number (temyiz_kurulu only)")
temyiz_tutanak_no: str = Field("", description="Appeals board meeting minutes number (temyiz_kurulu only)")
temyiz_karar: str = Field("", description="Appeals decision text search (temyiz_kurulu only)")
# Daire specific parameters (ignored for other types)
yargilama_dairesi: DaireEnum = Field("ALL", description="Chamber selection (daire only)")
hesap_yili: str = Field("", description="Account year (daire only)")
web_karar_metni: str = Field("", description="Decision text search (daire only)")
class SayistayUnifiedSearchResult(BaseModel):
"""Unified search result containing decisions from any Sayıştay decision type."""
decision_type: Literal["genel_kurul", "temyiz_kurulu", "daire"] = Field(..., description="Type of decisions returned")
decisions: List[Dict[str, Any]] = Field(default_factory=list, description="Decision list (structure varies by type)")
total_records: int = Field(0, description="Total number of records found")
total_filtered: int = Field(0, description="Number of records after filtering")
draw: int = Field(1, description="DataTables draw counter")
class SayistayUnifiedDocumentMarkdown(BaseModel):
"""Unified document model for all Sayıştay decision types."""
decision_type: Literal["genel_kurul", "temyiz_kurulu", "daire"] = Field(..., description="Type of document")
decision_id: str = Field(..., description="Decision ID")
source_url: str = Field(..., description="Source URL of the document")
document_data: Dict[str, Any] = Field(default_factory=dict, description="Document content and metadata")
markdown_content: Optional[str] = Field(None, description="Markdown content")
error_message: Optional[str] = Field(None, description="Error message if retrieval failed")
```
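The `ge`/`le` bounds on the unified request are enforced at construction time. A short sketch, assuming Pydantic v2 and that the package is importable (the import path mirrors the live module; the vendored copy behaves identically):

```python
from pydantic import ValidationError
from sayistay_mcp_module.models import SayistayUnifiedSearchRequest

req = SayistayUnifiedSearchRequest(decision_type="daire", start=0, length=10)
print(req.model_dump(exclude_defaults=True))  # {'decision_type': 'daire'}

try:
    SayistayUnifiedSearchRequest(decision_type="daire", length=500)  # violates le=100
except ValidationError as exc:
    print(exc.errors()[0]["type"])  # 'less_than_equal'
```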
--------------------------------------------------------------------------------
/sayistay_mcp_module/models.py:
--------------------------------------------------------------------------------
```python
# sayistay_mcp_module/models.py
from pydantic import BaseModel, Field
from typing import Optional, List, Union, Dict, Any, Literal
from enum import Enum
from .enums import DaireEnum, KamuIdaresiTuruEnum, WebKararKonusuEnum
# --- Unified Enums ---
class SayistayDecisionTypeEnum(str, Enum):
GENEL_KURUL = "genel_kurul"
TEMYIZ_KURULU = "temyiz_kurulu"
DAIRE = "daire"
# ============================================================================
# Genel Kurul (General Assembly) Models
# ============================================================================
class GenelKurulSearchRequest(BaseModel):
"""
Search request for Sayıştay Genel Kurul (General Assembly) decisions.
Genel Kurul decisions are precedent-setting rulings made by the full assembly
of the Turkish Court of Accounts, typically addressing interpretation of
audit and accountability regulations.
"""
    karar_no: str = Field("", description="Decision number (e.g., '5415/1')")
    karar_ek: str = Field("", description="Decision appendix number")
    karar_tarih_baslangic: str = Field("", description="Start date (DD.MM.YYYY format)")
    karar_tarih_bitis: str = Field("", description="End date (DD.MM.YYYY format)")
    karar_tamami: str = Field("", description="Full text search within decision content")
# DataTables pagination
start: int = Field(0, description="Starting record for pagination (0-based)")
length: int = Field(10, description="Number of records per page (1-10)")
class GenelKurulDecision(BaseModel):
"""Single Genel Kurul decision entry from search results."""
id: int = Field(..., description="Unique decision ID")
karar_no: str = Field(..., description="Decision number (e.g., '5415/1')")
karar_tarih: str = Field(..., description="Decision date in DD.MM.YYYY format")
karar_ozeti: str = Field(..., description="Decision summary/abstract")
class GenelKurulSearchResponse(BaseModel):
"""Response from Genel Kurul search endpoint."""
decisions: List[GenelKurulDecision] = Field(default_factory=list, description="List of matching decisions")
total_records: int = Field(0, description="Total number of matching records")
total_filtered: int = Field(0, description="Number of records after filtering")
draw: int = Field(1, description="DataTables draw counter")
# ============================================================================
# Temyiz Kurulu (Appeals Board) Models
# ============================================================================
class TemyizKuruluSearchRequest(BaseModel):
"""
Search request for Sayıştay Temyiz Kurulu (Appeals Board) decisions.
Temyiz Kurulu reviews appeals against audit chamber decisions,
providing higher-level review of audit findings and sanctions.
"""
    ilam_dairesi: DaireEnum = Field("ALL", description="Audit chamber selection (1-8 or ALL)")
    yili: str = Field("", description="Year (YYYY format)")
    karar_tarih_baslangic: str = Field("", description="Start date (DD.MM.YYYY format)")
    karar_tarih_bitis: str = Field("", description="End date (DD.MM.YYYY format)")
    kamu_idaresi_turu: KamuIdaresiTuruEnum = Field("ALL", description="Public administration type filter")
    ilam_no: str = Field("", description="Audit report number (İlam No, max 50 chars)")
    dosya_no: str = Field("", description="File number for the case")
    temyiz_tutanak_no: str = Field("", description="Appeals board meeting minutes number")
    temyiz_karar: str = Field("", description="Appeals decision text search")
    web_karar_konusu: WebKararKonusuEnum = Field("ALL", description="Decision subject category filter")
# DataTables pagination
start: int = Field(0, description="Starting record for pagination (0-based)")
length: int = Field(10, description="Number of records per page (1-10)")
class TemyizKuruluDecision(BaseModel):
"""Single Temyiz Kurulu decision entry from search results."""
id: int = Field(..., description="Unique decision ID")
temyiz_tutanak_tarihi: str = Field(..., description="Appeals board meeting date in DD.MM.YYYY format")
ilam_dairesi: int = Field(..., description="Chamber number (1-8)")
temyiz_karar: str = Field(..., description="Appeals decision summary and reasoning")
class TemyizKuruluSearchResponse(BaseModel):
"""Response from Temyiz Kurulu search endpoint."""
decisions: List[TemyizKuruluDecision] = Field(default_factory=list, description="List of matching appeals decisions")
total_records: int = Field(0, description="Total number of matching records")
total_filtered: int = Field(0, description="Number of records after filtering")
draw: int = Field(1, description="DataTables draw counter")
# ============================================================================
# Daire (Chamber) Models
# ============================================================================
class DaireSearchRequest(BaseModel):
"""
Search request for Sayıştay Daire (Chamber) decisions.
Daire decisions are first-instance audit findings and sanctions
issued by individual audit chambers before potential appeals.
"""
    yargilama_dairesi: DaireEnum = Field("ALL", description="Chamber selection (1-8 or ALL)")
    karar_tarih_baslangic: str = Field("", description="Start date (DD.MM.YYYY format)")
    karar_tarih_bitis: str = Field("", description="End date (DD.MM.YYYY format)")
    ilam_no: str = Field("", description="Audit report number (İlam No, max 50 chars)")
    kamu_idaresi_turu: KamuIdaresiTuruEnum = Field("ALL", description="Public administration type filter")
    hesap_yili: str = Field("", description="Account year being audited")
    web_karar_konusu: WebKararKonusuEnum = Field("ALL", description="Decision subject category filter")
    web_karar_metni: str = Field("", description="Decision text search")
# DataTables pagination
start: int = Field(0, description="Starting record for pagination (0-based)")
length: int = Field(10, description="Number of records per page (1-10)")
class DaireDecision(BaseModel):
"""Single Daire decision entry from search results."""
id: int = Field(..., description="Unique decision ID")
yargilama_dairesi: int = Field(..., description="Chamber number (1-8)")
karar_tarih: str = Field(..., description="Decision date in DD.MM.YYYY format")
karar_no: str = Field(..., description="Decision number")
ilam_no: str = Field("", description="Audit report number (may be null)")
madde_no: int = Field(..., description="Article/item number within the decision")
kamu_idaresi_turu: str = Field(..., description="Public administration type")
hesap_yili: int = Field(..., description="Account year being audited")
web_karar_konusu: str = Field(..., description="Decision subject category")
web_karar_metni: str = Field(..., description="Decision text/summary")
class DaireSearchResponse(BaseModel):
"""Response from Daire search endpoint."""
decisions: List[DaireDecision] = Field(default_factory=list, description="List of matching chamber decisions")
total_records: int = Field(0, description="Total number of matching records")
total_filtered: int = Field(0, description="Number of records after filtering")
draw: int = Field(1, description="DataTables draw counter")
# ============================================================================
# Document Models
# ============================================================================
class SayistayDocumentMarkdown(BaseModel):
"""
Sayıştay decision document converted to Markdown format.
Used for retrieving full text of decisions from any of the three
decision types (Genel Kurul, Temyiz Kurulu, Daire).
"""
decision_id: str = Field(..., description="Unique decision identifier")
    decision_type: str = Field(..., description="Decision type: genel_kurul, temyiz_kurulu, or daire")
source_url: str = Field(..., description="Original URL where the document was retrieved")
markdown_content: Optional[str] = Field(None, description="Full decision text converted to Markdown format")
retrieval_date: Optional[str] = Field(None, description="Date when document was retrieved (ISO format)")
error_message: Optional[str] = Field(None, description="Error message if document retrieval failed")
# ============================================================================
# Unified Models
# ============================================================================
class SayistayUnifiedSearchRequest(BaseModel):
"""Unified search request for all Sayıştay decision types."""
decision_type: Literal["genel_kurul", "temyiz_kurulu", "daire"] = Field(..., description="Decision type: genel_kurul, temyiz_kurulu, or daire")
# Common pagination parameters
start: int = Field(0, ge=0, description="Starting record for pagination (0-based)")
length: int = Field(10, ge=1, le=100, description="Number of records per page (1-100)")
# Common search parameters
karar_tarih_baslangic: str = Field("", description="Start date (DD.MM.YYYY format)")
karar_tarih_bitis: str = Field("", description="End date (DD.MM.YYYY format)")
kamu_idaresi_turu: KamuIdaresiTuruEnum = Field("ALL", description="Public administration type filter")
ilam_no: str = Field("", description="Audit report number (İlam No, max 50 chars)")
web_karar_konusu: WebKararKonusuEnum = Field("ALL", description="Decision subject category filter")
# Genel Kurul specific parameters (ignored for other types)
karar_no: str = Field("", description="Decision number (genel_kurul only)")
karar_ek: str = Field("", description="Decision appendix number (genel_kurul only)")
karar_tamami: str = Field("", description="Full text search (genel_kurul only)")
# Temyiz Kurulu specific parameters (ignored for other types)
ilam_dairesi: DaireEnum = Field("ALL", description="Audit chamber selection (temyiz_kurulu only)")
yili: str = Field("", description="Year (YYYY format, temyiz_kurulu only)")
dosya_no: str = Field("", description="File number (temyiz_kurulu only)")
temyiz_tutanak_no: str = Field("", description="Appeals board meeting minutes number (temyiz_kurulu only)")
temyiz_karar: str = Field("", description="Appeals decision text search (temyiz_kurulu only)")
# Daire specific parameters (ignored for other types)
yargilama_dairesi: DaireEnum = Field("ALL", description="Chamber selection (daire only)")
hesap_yili: str = Field("", description="Account year (daire only)")
web_karar_metni: str = Field("", description="Decision text search (daire only)")
class SayistayUnifiedSearchResult(BaseModel):
"""Unified search result containing decisions from any Sayıştay decision type."""
decision_type: Literal["genel_kurul", "temyiz_kurulu", "daire"] = Field(..., description="Type of decisions returned")
decisions: List[Dict[str, Any]] = Field(default_factory=list, description="Decision list (structure varies by type)")
total_records: int = Field(0, description="Total number of records found")
total_filtered: int = Field(0, description="Number of records after filtering")
draw: int = Field(1, description="DataTables draw counter")
class SayistayUnifiedDocumentMarkdown(BaseModel):
"""Unified document model for all Sayıştay decision types."""
decision_type: Literal["genel_kurul", "temyiz_kurulu", "daire"] = Field(..., description="Type of document")
decision_id: str = Field(..., description="Decision ID")
source_url: str = Field(..., description="Source URL of the document")
document_data: Dict[str, Any] = Field(default_factory=dict, description="Document content and metadata")
markdown_content: Optional[str] = Field(None, description="Markdown content")
error_message: Optional[str] = Field(None, description="Error message if retrieval failed")
```
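As a usage sketch, the per-type request models serialize directly into the DataTables-style form payload that the Sayıştay client posts. Field names come from the model above; the search values are illustrative and Pydantic v2 is assumed:

```python
from sayistay_mcp_module.models import GenelKurulSearchRequest

req = GenelKurulSearchRequest(
    karar_tamami="sorumluluk",            # full-text term
    karar_tarih_baslangic="01.01.2020",
    karar_tarih_bitis="31.12.2023",
    start=0,
    length=10,
)
payload = req.model_dump()  # dict ready to be form-encoded for the search POST
print(sorted(payload))
```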
--------------------------------------------------------------------------------
/mcp_auth/middleware.py:
--------------------------------------------------------------------------------
```python
"""
MCP server middleware for OAuth authentication and authorization
"""
import functools
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Optional
logger = logging.getLogger(__name__)
try:
from fastmcp import FastMCP
FASTMCP_AVAILABLE = True
except ImportError:
FASTMCP_AVAILABLE = False
FastMCP = None
logger.warning("FastMCP not available, some features will be disabled")
from .oauth import OAuthProvider
from .policy import PolicyEngine
@dataclass
class AuthContext:
"""Authentication context passed to MCP tools"""
user_id: str
scopes: list[str]
claims: dict[str, Any]
token: str
class MCPAuthMiddleware:
"""Authentication middleware for MCP servers"""
def __init__(self, oauth_provider: OAuthProvider, policy_engine: PolicyEngine):
self.oauth_provider = oauth_provider
self.policy_engine = policy_engine
def authenticate_request(self, authorization_header: str) -> AuthContext | None:
"""Extract and validate auth token from request"""
if not authorization_header:
logger.debug("No authorization header provided")
return None
if not authorization_header.startswith("Bearer "):
logger.debug("Authorization header does not start with 'Bearer '")
return None
token = authorization_header[7:] # Remove 'Bearer ' prefix
token_info = self.oauth_provider.introspect_token(token)
if not token_info.get("active"):
logger.warning("Token is not active")
return None
logger.debug(f"Authenticated user: {token_info.get('sub', 'unknown')}")
return AuthContext(
user_id=token_info.get("sub", "unknown"),
scopes=token_info.get("mcp_tool_scopes", []),
claims=token_info,
token=token,
)
def authorize_tool_call(
self, tool_name: str, auth_context: AuthContext
) -> tuple[bool, str | None]:
"""Check if user can call the specified tool"""
return self.policy_engine.authorize_tool_call(
tool_name=tool_name,
user_scopes=auth_context.scopes,
user_claims=auth_context.claims,
)
def auth_required(
oauth_provider: OAuthProvider,
policy_engine: PolicyEngine,
tool_name: str | None = None,
):
"""
Decorator to require authentication for MCP tool functions
Usage:
@auth_required(oauth_provider, policy_engine, "search_yargitay")
        async def my_tool_function(context: AuthContext, ...):
pass
"""
def decorator(func: Callable) -> Callable:
middleware = MCPAuthMiddleware(oauth_provider, policy_engine)
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Extract authorization header from kwargs
auth_header = kwargs.pop("authorization", None)
# Also check in args if it's a Request object
if not auth_header and args:
for arg in args:
if hasattr(arg, 'headers'):
auth_header = arg.headers.get("Authorization")
break
if not auth_header:
logger.warning(f"No authorization header for tool '{tool_name or func.__name__}'")
raise PermissionError("Authorization header required")
auth_context = middleware.authenticate_request(auth_header)
if not auth_context:
logger.warning(f"Authentication failed for tool '{tool_name or func.__name__}'")
raise PermissionError("Invalid or expired token")
actual_tool_name = tool_name or func.__name__
authorized, reason = middleware.authorize_tool_call(
actual_tool_name, auth_context
)
if not authorized:
logger.warning(f"Authorization failed for tool '{actual_tool_name}': {reason}")
raise PermissionError(f"Access denied: {reason}")
# Add auth context to function call
return await func(auth_context, *args, **kwargs)
return wrapper
return decorator
class FastMCPAuthWrapper:
"""Wrapper for FastMCP servers to add authentication"""
def __init__(
self,
mcp_server: "FastMCP",
oauth_provider: OAuthProvider,
policy_engine: PolicyEngine,
):
if not FASTMCP_AVAILABLE:
raise ImportError("FastMCP is required for FastMCPAuthWrapper")
self.mcp_server = mcp_server
self.middleware = MCPAuthMiddleware(oauth_provider, policy_engine)
self.oauth_provider = oauth_provider
logger.info("Initializing FastMCP authentication wrapper")
self._wrap_tools()
def _wrap_tools(self):
"""Wrap all existing tools with auth middleware"""
# Try different FastMCP tool storage locations
tool_registry = None
if hasattr(self.mcp_server, '_tools'):
tool_registry = self.mcp_server._tools
elif hasattr(self.mcp_server, 'tools'):
tool_registry = self.mcp_server.tools
elif hasattr(self.mcp_server, '_tool_registry'):
tool_registry = self.mcp_server._tool_registry
elif hasattr(self.mcp_server, '_handlers') and hasattr(self.mcp_server._handlers, 'tools'):
tool_registry = self.mcp_server._handlers.tools
if not tool_registry:
logger.warning("FastMCP server tool registry not found, tools will not be automatically wrapped")
logger.debug(f"Available server attributes: {dir(self.mcp_server)}")
return
logger.debug(f"Found tool registry with {len(tool_registry)} tools")
original_tools = dict(tool_registry)
wrapped_count = 0
for tool_name, tool_func in original_tools.items():
try:
wrapped_func = self._create_auth_wrapper(tool_name, tool_func)
tool_registry[tool_name] = wrapped_func
wrapped_count += 1
logger.debug(f"Wrapped tool: {tool_name}")
except Exception as e:
logger.error(f"Failed to wrap tool {tool_name}: {e}")
logger.info(f"Successfully wrapped {wrapped_count} tools with authentication")
def _create_auth_wrapper(self, tool_name: str, original_func: Callable) -> Callable:
"""Create auth wrapper for a specific tool"""
@functools.wraps(original_func)
async def auth_wrapper(*args, **kwargs):
            # Extract authorization: kwargs first, then a Request-like first argument
            auth_header = kwargs.pop("authorization", None)
# Check if first argument is a Request object
if not auth_header and args:
first_arg = args[0]
if hasattr(first_arg, 'headers'):
auth_header = first_arg.headers.get("Authorization")
if not auth_header:
logger.warning(f"No authorization header for tool '{tool_name}'")
raise PermissionError("Authorization required")
auth_context = self.middleware.authenticate_request(auth_header)
if not auth_context:
logger.warning(f"Authentication failed for tool '{tool_name}'")
raise PermissionError("Invalid token")
authorized, reason = self.middleware.authorize_tool_call(
tool_name, auth_context
)
if not authorized:
logger.warning(f"Authorization failed for tool '{tool_name}': {reason}")
raise PermissionError(f"Access denied: {reason}")
# Add auth context to kwargs
kwargs["auth_context"] = auth_context
logger.debug(f"Calling tool '{tool_name}' for user {auth_context.user_id}")
return await original_func(*args, **kwargs)
return auth_wrapper
def add_oauth_endpoints(self):
"""Add OAuth endpoints to the MCP server"""
@self.mcp_server.tool(
description="Initiate OAuth 2.1 authorization flow with PKCE",
annotations={"readOnlyHint": True, "idempotentHint": False}
)
async def oauth_authorize(redirect_uri: str, scopes: Optional[str] = None):
"""OAuth authorization endpoint"""
scope_list = scopes.split(" ") if scopes else None
auth_url, pkce = self.oauth_provider.generate_authorization_url(
redirect_uri=redirect_uri, scopes=scope_list
)
logger.info(f"Generated authorization URL for redirect_uri: {redirect_uri}")
return {
"authorization_url": auth_url,
"code_verifier": pkce.verifier, # For PKCE flow
"code_challenge": pkce.challenge,
"instructions": "Use the authorization_url to complete OAuth flow, then exchange the returned code using oauth_token tool"
}
@self.mcp_server.tool(
description="Exchange OAuth authorization code for access token",
annotations={"readOnlyHint": False, "idempotentHint": False}
)
async def oauth_token(
code: str,
state: str,
redirect_uri: str
):
"""OAuth token exchange endpoint"""
try:
result = await self.oauth_provider.exchange_code_for_token(
code=code, state=state, redirect_uri=redirect_uri
)
logger.info("Successfully exchanged authorization code for token")
return result
except Exception as e:
logger.error(f"Token exchange failed: {e}")
raise
@self.mcp_server.tool(
description="Validate and introspect OAuth access token",
annotations={"readOnlyHint": True, "idempotentHint": True}
)
async def oauth_introspect(token: str):
"""Token introspection endpoint"""
result = self.oauth_provider.introspect_token(token)
logger.debug(f"Token introspection: active={result.get('active', False)}")
return result
@self.mcp_server.tool(
description="Revoke OAuth access token",
annotations={"readOnlyHint": False, "idempotentHint": False}
)
async def oauth_revoke(token: str):
"""Token revocation endpoint"""
success = self.oauth_provider.revoke_token(token)
logger.info(f"Token revocation: success={success}")
return {"revoked": success}
@self.mcp_server.tool(
description="Get list of tools available to authenticated user",
annotations={"readOnlyHint": True, "idempotentHint": True}
)
async def oauth_user_tools(authorization: str):
"""Get user's allowed tools based on scopes"""
auth_context = self.middleware.authenticate_request(authorization)
if not auth_context:
raise PermissionError("Invalid token")
allowed_patterns = self.middleware.policy_engine.get_allowed_tools(auth_context.scopes)
return {
"user_id": auth_context.user_id,
"scopes": auth_context.scopes,
"allowed_tool_patterns": allowed_patterns,
"message": "Use these patterns to determine which tools you can access"
}
logger.info("Added OAuth endpoints: oauth_authorize, oauth_token, oauth_introspect, oauth_revoke, oauth_user_tools")
```
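A minimal sketch of the header-parsing contract in `MCPAuthMiddleware`, with duck-typed stubs standing in for the real `OAuthProvider` and `PolicyEngine` (only `introspect_token` and `authorize_tool_call` are exercised; the stubs are illustrative):

```python
from mcp_auth.middleware import MCPAuthMiddleware

class StubProvider:
    def introspect_token(self, token):
        # Pretend every presented token is active and scoped to search tools.
        return {"active": True, "sub": "user-1", "mcp_tool_scopes": ["search.*"]}

class StubPolicy:
    def authorize_tool_call(self, tool_name, user_scopes, user_claims):
        return (tool_name.startswith("search"), None)

mw = MCPAuthMiddleware(StubProvider(), StubPolicy())
ctx = mw.authenticate_request("Bearer dummy-token")
assert ctx is not None and ctx.user_id == "user-1"
assert mw.authorize_tool_call("search_yargitay", ctx) == (True, None)
assert mw.authenticate_request("Basic abc") is None  # only Bearer tokens are accepted
```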
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/mcp_auth/middleware.py:
--------------------------------------------------------------------------------
```python
"""
MCP server middleware for OAuth authentication and authorization
"""
import functools
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Optional
logger = logging.getLogger(__name__)
try:
from fastmcp import FastMCP
FASTMCP_AVAILABLE = True
except ImportError:
FASTMCP_AVAILABLE = False
FastMCP = None
logger.warning("FastMCP not available, some features will be disabled")
from .oauth import OAuthProvider
from .policy import PolicyEngine
@dataclass
class AuthContext:
"""Authentication context passed to MCP tools"""
user_id: str
scopes: list[str]
claims: dict[str, Any]
token: str
class MCPAuthMiddleware:
"""Authentication middleware for MCP servers"""
def __init__(self, oauth_provider: OAuthProvider, policy_engine: PolicyEngine):
self.oauth_provider = oauth_provider
self.policy_engine = policy_engine
def authenticate_request(self, authorization_header: str) -> AuthContext | None:
"""Extract and validate auth token from request"""
if not authorization_header:
logger.debug("No authorization header provided")
return None
if not authorization_header.startswith("Bearer "):
logger.debug("Authorization header does not start with 'Bearer '")
return None
token = authorization_header[7:] # Remove 'Bearer ' prefix
token_info = self.oauth_provider.introspect_token(token)
if not token_info.get("active"):
logger.warning("Token is not active")
return None
logger.debug(f"Authenticated user: {token_info.get('sub', 'unknown')}")
return AuthContext(
user_id=token_info.get("sub", "unknown"),
scopes=token_info.get("mcp_tool_scopes", []),
claims=token_info,
token=token,
)
def authorize_tool_call(
self, tool_name: str, auth_context: AuthContext
) -> tuple[bool, str | None]:
"""Check if user can call the specified tool"""
return self.policy_engine.authorize_tool_call(
tool_name=tool_name,
user_scopes=auth_context.scopes,
user_claims=auth_context.claims,
)
def auth_required(
oauth_provider: OAuthProvider,
policy_engine: PolicyEngine,
tool_name: str | None = None,
):
"""
Decorator to require authentication for MCP tool functions
Usage:
@auth_required(oauth_provider, policy_engine, "search_yargitay")
        async def my_tool_function(context: AuthContext, ...):
pass
"""
def decorator(func: Callable) -> Callable:
middleware = MCPAuthMiddleware(oauth_provider, policy_engine)
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Extract authorization header from kwargs
auth_header = kwargs.pop("authorization", None)
# Also check in args if it's a Request object
if not auth_header and args:
for arg in args:
if hasattr(arg, 'headers'):
auth_header = arg.headers.get("Authorization")
break
if not auth_header:
logger.warning(f"No authorization header for tool '{tool_name or func.__name__}'")
raise PermissionError("Authorization header required")
auth_context = middleware.authenticate_request(auth_header)
if not auth_context:
logger.warning(f"Authentication failed for tool '{tool_name or func.__name__}'")
raise PermissionError("Invalid or expired token")
actual_tool_name = tool_name or func.__name__
authorized, reason = middleware.authorize_tool_call(
actual_tool_name, auth_context
)
if not authorized:
logger.warning(f"Authorization failed for tool '{actual_tool_name}': {reason}")
raise PermissionError(f"Access denied: {reason}")
# Add auth context to function call
return await func(auth_context, *args, **kwargs)
return wrapper
return decorator
class FastMCPAuthWrapper:
"""Wrapper for FastMCP servers to add authentication"""
def __init__(
self,
mcp_server: "FastMCP",
oauth_provider: OAuthProvider,
policy_engine: PolicyEngine,
):
if not FASTMCP_AVAILABLE:
raise ImportError("FastMCP is required for FastMCPAuthWrapper")
self.mcp_server = mcp_server
self.middleware = MCPAuthMiddleware(oauth_provider, policy_engine)
self.oauth_provider = oauth_provider
logger.info("Initializing FastMCP authentication wrapper")
self._wrap_tools()
def _wrap_tools(self):
"""Wrap all existing tools with auth middleware"""
# Try different FastMCP tool storage locations
tool_registry = None
if hasattr(self.mcp_server, '_tools'):
tool_registry = self.mcp_server._tools
elif hasattr(self.mcp_server, 'tools'):
tool_registry = self.mcp_server.tools
elif hasattr(self.mcp_server, '_tool_registry'):
tool_registry = self.mcp_server._tool_registry
elif hasattr(self.mcp_server, '_handlers') and hasattr(self.mcp_server._handlers, 'tools'):
tool_registry = self.mcp_server._handlers.tools
if not tool_registry:
logger.warning("FastMCP server tool registry not found, tools will not be automatically wrapped")
logger.debug(f"Available server attributes: {dir(self.mcp_server)}")
return
logger.debug(f"Found tool registry with {len(tool_registry)} tools")
original_tools = dict(tool_registry)
wrapped_count = 0
for tool_name, tool_func in original_tools.items():
try:
wrapped_func = self._create_auth_wrapper(tool_name, tool_func)
tool_registry[tool_name] = wrapped_func
wrapped_count += 1
logger.debug(f"Wrapped tool: {tool_name}")
except Exception as e:
logger.error(f"Failed to wrap tool {tool_name}: {e}")
logger.info(f"Successfully wrapped {wrapped_count} tools with authentication")
def _create_auth_wrapper(self, tool_name: str, original_func: Callable) -> Callable:
"""Create auth wrapper for a specific tool"""
@functools.wraps(original_func)
async def auth_wrapper(*args, **kwargs):
            # Extract authorization: kwargs first, then a Request-like first argument
            auth_header = kwargs.pop("authorization", None)
# Check if first argument is a Request object
if not auth_header and args:
first_arg = args[0]
if hasattr(first_arg, 'headers'):
auth_header = first_arg.headers.get("Authorization")
if not auth_header:
logger.warning(f"No authorization header for tool '{tool_name}'")
raise PermissionError("Authorization required")
auth_context = self.middleware.authenticate_request(auth_header)
if not auth_context:
logger.warning(f"Authentication failed for tool '{tool_name}'")
raise PermissionError("Invalid token")
authorized, reason = self.middleware.authorize_tool_call(
tool_name, auth_context
)
if not authorized:
logger.warning(f"Authorization failed for tool '{tool_name}': {reason}")
raise PermissionError(f"Access denied: {reason}")
# Add auth context to kwargs
kwargs["auth_context"] = auth_context
logger.debug(f"Calling tool '{tool_name}' for user {auth_context.user_id}")
return await original_func(*args, **kwargs)
return auth_wrapper
def add_oauth_endpoints(self):
"""Add OAuth endpoints to the MCP server"""
@self.mcp_server.tool(
description="Initiate OAuth 2.1 authorization flow with PKCE",
annotations={"readOnlyHint": True, "idempotentHint": False}
)
async def oauth_authorize(redirect_uri: str, scopes: Optional[str] = None):
"""OAuth authorization endpoint"""
scope_list = scopes.split(" ") if scopes else None
auth_url, pkce = self.oauth_provider.generate_authorization_url(
redirect_uri=redirect_uri, scopes=scope_list
)
logger.info(f"Generated authorization URL for redirect_uri: {redirect_uri}")
return {
"authorization_url": auth_url,
"code_verifier": pkce.verifier, # For PKCE flow
"code_challenge": pkce.challenge,
"instructions": "Use the authorization_url to complete OAuth flow, then exchange the returned code using oauth_token tool"
}
@self.mcp_server.tool(
description="Exchange OAuth authorization code for access token",
annotations={"readOnlyHint": False, "idempotentHint": False}
)
async def oauth_token(
code: str,
state: str,
redirect_uri: str
):
"""OAuth token exchange endpoint"""
try:
result = await self.oauth_provider.exchange_code_for_token(
code=code, state=state, redirect_uri=redirect_uri
)
logger.info("Successfully exchanged authorization code for token")
return result
except Exception as e:
logger.error(f"Token exchange failed: {e}")
raise
@self.mcp_server.tool(
description="Validate and introspect OAuth access token",
annotations={"readOnlyHint": True, "idempotentHint": True}
)
async def oauth_introspect(token: str):
"""Token introspection endpoint"""
result = self.oauth_provider.introspect_token(token)
logger.debug(f"Token introspection: active={result.get('active', False)}")
return result
@self.mcp_server.tool(
description="Revoke OAuth access token",
annotations={"readOnlyHint": False, "idempotentHint": False}
)
async def oauth_revoke(token: str):
"""Token revocation endpoint"""
success = self.oauth_provider.revoke_token(token)
logger.info(f"Token revocation: success={success}")
return {"revoked": success}
@self.mcp_server.tool(
description="Get list of tools available to authenticated user",
annotations={"readOnlyHint": True, "idempotentHint": True}
)
async def oauth_user_tools(authorization: str):
"""Get user's allowed tools based on scopes"""
auth_context = self.middleware.authenticate_request(authorization)
if not auth_context:
raise PermissionError("Invalid token")
allowed_patterns = self.middleware.policy_engine.get_allowed_tools(auth_context.scopes)
return {
"user_id": auth_context.user_id,
"scopes": auth_context.scopes,
"allowed_tool_patterns": allowed_patterns,
"message": "Use these patterns to determine which tools you can access"
}
logger.info("Added OAuth endpoints: oauth_authorize, oauth_token, oauth_introspect, oauth_revoke, oauth_user_tools")
```
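For the decorator path, a hedged sketch of `auth_required` from the module above; the provider and policy stubs are placeholders for the objects this repo builds in `mcp_auth_factory`:

```python
import asyncio
from mcp_auth.middleware import AuthContext, auth_required

class StubProvider:
    def introspect_token(self, token):
        return {"active": True, "sub": "user-1", "mcp_tool_scopes": ["search.*"]}

class StubPolicy:
    def authorize_tool_call(self, tool_name, user_scopes, user_claims):
        return (True, None)

@auth_required(StubProvider(), StubPolicy(), tool_name="search_yargitay")
async def search_yargitay(context: AuthContext, query: str) -> dict:
    # The wrapper authenticates, authorizes, then prepends the AuthContext.
    return {"user": context.user_id, "query": query}

result = asyncio.run(search_yargitay("tazminat", authorization="Bearer dummy"))
print(result)  # {'user': 'user-1', 'query': 'tazminat'}
```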
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/uyusmazlik_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# uyusmazlik_mcp_module/client.py
import httpx
import aiohttp
from bs4 import BeautifulSoup
from typing import Dict, Any, List, Optional, Union, Tuple
import logging
import html
import re
import io
from markitdown import MarkItDown
from urllib.parse import urljoin, urlencode # urlencode for aiohttp form data
from .models import (
UyusmazlikSearchRequest,
UyusmazlikApiDecisionEntry,
UyusmazlikSearchResponse,
UyusmazlikDocumentMarkdown,
UyusmazlikBolumEnum,
UyusmazlikTuruEnum,
UyusmazlikKararSonucuEnum
)
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# --- Mappings from user-friendly Enum values to API IDs ---
BOLUM_ENUM_TO_ID_MAP = {
UyusmazlikBolumEnum.CEZA_BOLUMU: "f6b74320-f2d7-4209-ad6e-c6df180d4e7c",
UyusmazlikBolumEnum.GENEL_KURUL_KARARLARI: "e4ca658d-a75a-4719-b866-b2d2f1c3b1d9",
UyusmazlikBolumEnum.HUKUK_BOLUMU: "96b26fc4-ef8e-4a4f-a9cc-a3de89952aa1",
UyusmazlikBolumEnum.TUMU: "", # Represents "...Seçiniz..." or all - empty string for API
"ALL": "" # Also map the new "ALL" literal to empty string for backward compatibility
}
UYUSMAZLIK_TURU_ENUM_TO_ID_MAP = {
UyusmazlikTuruEnum.GOREV_UYUSMAZLIGI: "7b1e2cd3-8f09-418a-921c-bbe501e1740c",
UyusmazlikTuruEnum.HUKUM_UYUSMAZLIGI: "19b88402-172b-4c1d-8339-595c942a89f5",
UyusmazlikTuruEnum.TUMU: "", # Represents "...Seçiniz..." or all - empty string for API
"ALL": "" # Also map the new "ALL" literal to empty string for backward compatibility
}
KARAR_SONUCU_ENUM_TO_ID_MAP = {
# These IDs are from the form HTML provided by the user
UyusmazlikKararSonucuEnum.HUKUM_UYUSMAZLIGI_OLMADIGINA_DAIR: "6f47d87f-dcb5-412e-9878-000385dba1d9",
UyusmazlikKararSonucuEnum.HUKUM_UYUSMAZLIGI_OLDUGUNA_DAIR: "5a01742a-c440-4c4a-ba1f-da20837cffed",
# Add all other 'Karar Sonucu' enum members and their corresponding GUIDs
# by inspecting the 'KararSonucuList' checkboxes in the provided form HTML.
}
# --- End Mappings ---
class UyusmazlikApiClient:
BASE_URL = "https://kararlar.uyusmazlik.gov.tr"
SEARCH_ENDPOINT = "/Arama/Search"
# Individual documents are fetched by their full URLs obtained from search results.
def __init__(self, request_timeout: float = 30.0):
self.request_timeout = request_timeout # Store timeout for aiohttp and httpx
# Headers for aiohttp search. httpx for docs will create its own.
self.default_aiohttp_search_headers = {
"Accept": "*/*", # Mimicking browser headers provided by user
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
"X-Requested-With": "XMLHttpRequest",
"Origin": self.BASE_URL,
"Referer": self.BASE_URL + "/",
}
async def search_decisions(
self,
params: UyusmazlikSearchRequest
) -> UyusmazlikSearchResponse:
bolum_id_for_api = BOLUM_ENUM_TO_ID_MAP.get(params.bolum, "")
uyusmazlik_id_for_api = UYUSMAZLIK_TURU_ENUM_TO_ID_MAP.get(params.uyusmazlik_turu, "")
form_data_list: List[Tuple[str, str]] = []
def add_to_form_data(key: str, value: Optional[str]):
# API expects empty strings for omitted optional fields based on user payload example
form_data_list.append((key, value or ""))
add_to_form_data("BolumId", bolum_id_for_api)
add_to_form_data("UyusmazlikId", uyusmazlik_id_for_api)
if params.karar_sonuclari:
for enum_member in params.karar_sonuclari:
api_id = KARAR_SONUCU_ENUM_TO_ID_MAP.get(enum_member)
if api_id: # Only add if a valid ID is found
form_data_list.append(('KararSonucuList', api_id))
add_to_form_data("EsasYil", params.esas_yil)
add_to_form_data("EsasSayisi", params.esas_sayisi)
add_to_form_data("KararYil", params.karar_yil)
add_to_form_data("KararSayisi", params.karar_sayisi)
add_to_form_data("KanunNo", params.kanun_no)
add_to_form_data("KararDateBegin", params.karar_date_begin)
add_to_form_data("KararDateEnd", params.karar_date_end)
add_to_form_data("ResmiGazeteSayi", params.resmi_gazete_sayi)
add_to_form_data("ResmiGazeteDate", params.resmi_gazete_date)
add_to_form_data("Icerik", params.icerik)
add_to_form_data("Tumce", params.tumce)
add_to_form_data("WildCard", params.wild_card)
add_to_form_data("Hepsi", params.hepsi)
add_to_form_data("Herhangibirisi", params.herhangi_birisi)
add_to_form_data("NotHepsi", params.not_hepsi)
# X-Requested-With is handled by default_aiohttp_search_headers
search_url = urljoin(self.BASE_URL, self.SEARCH_ENDPOINT)
# For aiohttp, data for application/x-www-form-urlencoded should be a dict or str.
# Using urlencode for list of tuples.
encoded_form_payload = urlencode(form_data_list, encoding='UTF-8')
logger.info(f"UyusmazlikApiClient (aiohttp): Performing search to {search_url} with form_data: {encoded_form_payload}")
html_content = ""
aiohttp_headers = self.default_aiohttp_search_headers.copy()
aiohttp_headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8"
try:
# Create a new session for each call for simplicity with aiohttp here
async with aiohttp.ClientSession(headers=aiohttp_headers) as session:
async with session.post(search_url, data=encoded_form_payload, timeout=self.request_timeout) as response:
response.raise_for_status() # Raises ClientResponseError for 400-599
html_content = await response.text(encoding='utf-8') # Ensure correct encoding
logger.debug("UyusmazlikApiClient (aiohttp): Received HTML response for search.")
except aiohttp.ClientError as e:
logger.error(f"UyusmazlikApiClient (aiohttp): HTTP client error during search: {e}")
raise # Re-raise to be handled by the MCP tool
except Exception as e:
logger.error(f"UyusmazlikApiClient (aiohttp): Error processing search request: {e}")
raise
# --- HTML Parsing (remains the same as previous version) ---
soup = BeautifulSoup(html_content, 'html.parser')
total_records_text_div = soup.find("div", class_="pull-right label label-important")
total_records = None
if total_records_text_div:
match_records = re.search(r'(\d+)\s*adet kayıt bulundu', total_records_text_div.get_text(strip=True))
if match_records:
total_records = int(match_records.group(1))
result_table = soup.find("table", class_="table-hover")
processed_decisions: List[UyusmazlikApiDecisionEntry] = []
if result_table:
rows = result_table.find_all("tr")
if len(rows) > 1: # Skip header row
for row in rows[1:]:
cols = row.find_all('td')
if len(cols) >= 5:
try:
popover_div = cols[0].find("div", attrs={"data-rel": "popover"})
popover_content_raw = popover_div["data-content"] if popover_div and popover_div.has_attr("data-content") else None
link_tag = cols[0].find('a')
doc_relative_url = link_tag['href'] if link_tag and link_tag.has_attr('href') else None
if not doc_relative_url: continue
document_url_str = urljoin(self.BASE_URL, doc_relative_url)
pdf_link_tag = cols[5].find('a', href=re.compile(r'\.pdf$', re.IGNORECASE)) if len(cols) > 5 else None
pdf_url_str = urljoin(self.BASE_URL, pdf_link_tag['href']) if pdf_link_tag and pdf_link_tag.has_attr('href') else None
decision_data_parsed = {
"karar_sayisi": cols[0].get_text(strip=True),
"esas_sayisi": cols[1].get_text(strip=True),
"bolum": cols[2].get_text(strip=True),
"uyusmazlik_konusu": cols[3].get_text(strip=True),
"karar_sonucu": cols[4].get_text(strip=True),
"popover_content": html.unescape(popover_content_raw) if popover_content_raw else None,
"document_url": document_url_str,
"pdf_url": pdf_url_str
}
decision_model = UyusmazlikApiDecisionEntry(**decision_data_parsed)
processed_decisions.append(decision_model)
except Exception as e:
logger.warning(f"UyusmazlikApiClient: Could not parse decision row. Row content: {row.get_text(strip=True, separator=' | ')}, Error: {e}")
return UyusmazlikSearchResponse(
decisions=processed_decisions,
total_records_found=total_records
)
def _convert_html_to_markdown_uyusmazlik(self, full_decision_html_content: str) -> Optional[str]:
"""Converts direct HTML content (from an Uyuşmazlık decision page) to Markdown."""
if not full_decision_html_content:
return None
processed_html = html.unescape(full_decision_html_content)
# As per user request, pass the full (unescaped) HTML to MarkItDown
html_input_for_markdown = processed_html
markdown_text = None
try:
# Convert HTML string to bytes and create BytesIO stream
html_bytes = html_input_for_markdown.encode('utf-8')
html_stream = io.BytesIO(html_bytes)
# Pass BytesIO stream to MarkItDown to avoid temp file creation
md_converter = MarkItDown()
conversion_result = md_converter.convert(html_stream)
markdown_text = conversion_result.text_content
logger.info("UyusmazlikApiClient: HTML to Markdown conversion successful.")
except Exception as e:
logger.error(f"UyusmazlikApiClient: Error during MarkItDown HTML to Markdown conversion: {e}")
return markdown_text
async def get_decision_document_as_markdown(self, document_url: str) -> UyusmazlikDocumentMarkdown:
"""
Retrieves a specific Uyuşmazlık decision from its full URL and returns content as Markdown.
"""
logger.info(f"UyusmazlikApiClient (httpx for docs): Fetching Uyuşmazlık document for Markdown from URL: {document_url}")
try:
# Using a new httpx.AsyncClient instance for this GET request for simplicity
async with httpx.AsyncClient(verify=False, timeout=self.request_timeout) as doc_fetch_client:
get_response = await doc_fetch_client.get(document_url, headers={"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"})
get_response.raise_for_status()
html_content_from_api = get_response.text
if not isinstance(html_content_from_api, str) or not html_content_from_api.strip():
logger.warning(f"UyusmazlikApiClient: Received empty or non-string HTML from URL {document_url}.")
return UyusmazlikDocumentMarkdown(source_url=document_url, markdown_content=None)
markdown_content = self._convert_html_to_markdown_uyusmazlik(html_content_from_api)
return UyusmazlikDocumentMarkdown(source_url=document_url, markdown_content=markdown_content)
except httpx.RequestError as e:
logger.error(f"UyusmazlikApiClient (httpx for docs): HTTP error fetching Uyuşmazlık document from {document_url}: {e}")
raise
except Exception as e:
logger.error(f"UyusmazlikApiClient (httpx for docs): General error processing Uyuşmazlık document from {document_url}: {e}")
raise
async def close_client_session(self):
logger.info("UyusmazlikApiClient: No persistent client session from __init__ to close.")
```
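A hedged end-to-end sketch of this client (field values are illustrative, and `UyusmazlikSearchRequest` is assumed to be constructible from just the fields shown; the vendored copy and the top-level `uyusmazlik_mcp_module` expose the same API):

```python
import asyncio
from uyusmazlik_mcp_module.client import UyusmazlikApiClient
from uyusmazlik_mcp_module.models import UyusmazlikBolumEnum, UyusmazlikSearchRequest

async def main():
    client = UyusmazlikApiClient(request_timeout=30.0)
    req = UyusmazlikSearchRequest(
        bolum=UyusmazlikBolumEnum.HUKUK_BOLUMU,
        icerik="görev uyuşmazlığı",  # free-text content search
    )
    result = await client.search_decisions(req)
    print(f"records found: {result.total_records_found}")
    if result.decisions:
        doc = await client.get_decision_document_as_markdown(
            str(result.decisions[0].document_url)
        )
        print((doc.markdown_content or "")[:200])

asyncio.run(main())
```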
--------------------------------------------------------------------------------
/uyusmazlik_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# uyusmazlik_mcp_module/client.py
import httpx
from bs4 import BeautifulSoup
from typing import Dict, Any, List, Optional, Union, Tuple
import logging
import html
import re
import io
from markitdown import MarkItDown
from urllib.parse import urljoin
from .models import (
UyusmazlikSearchRequest,
UyusmazlikApiDecisionEntry,
UyusmazlikSearchResponse,
UyusmazlikDocumentMarkdown,
UyusmazlikBolumEnum,
UyusmazlikTuruEnum,
UyusmazlikKararSonucuEnum
)
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# --- Mappings from user-friendly Enum values to API IDs ---
BOLUM_ENUM_TO_ID_MAP = {
UyusmazlikBolumEnum.CEZA_BOLUMU: "f6b74320-f2d7-4209-ad6e-c6df180d4e7c",
UyusmazlikBolumEnum.GENEL_KURUL_KARARLARI: "e4ca658d-a75a-4719-b866-b2d2f1c3b1d9",
UyusmazlikBolumEnum.HUKUK_BOLUMU: "96b26fc4-ef8e-4a4f-a9cc-a3de89952aa1",
UyusmazlikBolumEnum.TUMU: "", # Represents "...Seçiniz..." or all - empty string for API
"ALL": "" # Also map the new "ALL" literal to empty string for backward compatibility
}
UYUSMAZLIK_TURU_ENUM_TO_ID_MAP = {
UyusmazlikTuruEnum.GOREV_UYUSMAZLIGI: "7b1e2cd3-8f09-418a-921c-bbe501e1740c",
UyusmazlikTuruEnum.HUKUM_UYUSMAZLIGI: "19b88402-172b-4c1d-8339-595c942a89f5",
UyusmazlikTuruEnum.TUMU: "", # Represents "...Seçiniz..." or all - empty string for API
"ALL": "" # Also map the new "ALL" literal to empty string for backward compatibility
}
KARAR_SONUCU_ENUM_TO_ID_MAP = {
# These IDs are from the form HTML provided by the user
UyusmazlikKararSonucuEnum.HUKUM_UYUSMAZLIGI_OLMADIGINA_DAIR: "6f47d87f-dcb5-412e-9878-000385dba1d9",
UyusmazlikKararSonucuEnum.HUKUM_UYUSMAZLIGI_OLDUGUNA_DAIR: "5a01742a-c440-4c4a-ba1f-da20837cffed",
# Add all other 'Karar Sonucu' enum members and their corresponding GUIDs
# by inspecting the 'KararSonucuList' checkboxes in the provided form HTML.
}
# --- End Mappings ---
class UyusmazlikApiClient:
BASE_URL = "https://kararlar.uyusmazlik.gov.tr"
SEARCH_ENDPOINT = "/Arama/Search"
# Individual documents are fetched by their full URLs obtained from search results.
def __init__(self, request_timeout: float = 30.0):
self.request_timeout = request_timeout
# Create shared httpx client for all requests
self.http_client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers={
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
"X-Requested-With": "XMLHttpRequest",
"Origin": self.BASE_URL,
"Referer": self.BASE_URL + "/",
},
timeout=request_timeout,
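            # NOTE: verify=False disables TLS certificate verification for this host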
verify=False
)
async def search_decisions(
self,
params: UyusmazlikSearchRequest
) -> UyusmazlikSearchResponse:
bolum_id_for_api = BOLUM_ENUM_TO_ID_MAP.get(params.bolum, "")
uyusmazlik_id_for_api = UYUSMAZLIK_TURU_ENUM_TO_ID_MAP.get(params.uyusmazlik_turu, "")
form_data_list: List[Tuple[str, str]] = []
def add_to_form_data(key: str, value: Optional[str]):
# API expects empty strings for omitted optional fields based on user payload example
form_data_list.append((key, value or ""))
add_to_form_data("BolumId", bolum_id_for_api)
add_to_form_data("UyusmazlikId", uyusmazlik_id_for_api)
if params.karar_sonuclari:
for enum_member in params.karar_sonuclari:
api_id = KARAR_SONUCU_ENUM_TO_ID_MAP.get(enum_member)
if api_id: # Only add if a valid ID is found
form_data_list.append(('KararSonucuList', api_id))
add_to_form_data("EsasYil", params.esas_yil)
add_to_form_data("EsasSayisi", params.esas_sayisi)
add_to_form_data("KararYil", params.karar_yil)
add_to_form_data("KararSayisi", params.karar_sayisi)
add_to_form_data("KanunNo", params.kanun_no)
add_to_form_data("KararDateBegin", params.karar_date_begin)
add_to_form_data("KararDateEnd", params.karar_date_end)
add_to_form_data("ResmiGazeteSayi", params.resmi_gazete_sayi)
add_to_form_data("ResmiGazeteDate", params.resmi_gazete_date)
add_to_form_data("Icerik", params.icerik)
add_to_form_data("Tumce", params.tumce)
add_to_form_data("WildCard", params.wild_card)
add_to_form_data("Hepsi", params.hepsi)
add_to_form_data("Herhangibirisi", params.herhangi_birisi)
add_to_form_data("NotHepsi", params.not_hepsi)
        # Collapse the (key, value) pairs into a dict for httpx; repeated keys
        # (e.g. KararSonucuList) become lists, which httpx encodes as repeated form fields
form_data_dict = {}
for key, value in form_data_list:
if key in form_data_dict:
# Handle multiple values (like KararSonucuList)
if not isinstance(form_data_dict[key], list):
form_data_dict[key] = [form_data_dict[key]]
form_data_dict[key].append(value)
else:
form_data_dict[key] = value
logger.info(f"UyusmazlikApiClient (httpx): Performing search to {self.SEARCH_ENDPOINT} with form_data: {form_data_dict}")
try:
# Use shared httpx client
response = await self.http_client.post(
self.SEARCH_ENDPOINT,
data=form_data_dict,
headers={"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
)
response.raise_for_status()
html_content = response.text
logger.debug("UyusmazlikApiClient (httpx): Received HTML response for search.")
except httpx.HTTPError as e:
logger.error(f"UyusmazlikApiClient (httpx): HTTP client error during search: {e}")
raise # Re-raise to be handled by the MCP tool
except Exception as e:
logger.error(f"UyusmazlikApiClient (httpx): Error processing search request: {e}")
raise
# --- HTML Parsing (remains the same as previous version) ---
soup = BeautifulSoup(html_content, 'html.parser')
total_records_text_div = soup.find("div", class_="pull-right label label-important")
total_records = None
if total_records_text_div:
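            # The result banner reads "N adet kayıt bulundu" ("N records found")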
match_records = re.search(r'(\d+)\s*adet kayıt bulundu', total_records_text_div.get_text(strip=True))
if match_records:
total_records = int(match_records.group(1))
result_table = soup.find("table", class_="table-hover")
processed_decisions: List[UyusmazlikApiDecisionEntry] = []
if result_table:
rows = result_table.find_all("tr")
if len(rows) > 1: # Skip header row
for row in rows[1:]:
cols = row.find_all('td')
if len(cols) >= 5:
try:
popover_div = cols[0].find("div", attrs={"data-rel": "popover"})
popover_content_raw = popover_div["data-content"] if popover_div and popover_div.has_attr("data-content") else None
link_tag = cols[0].find('a')
doc_relative_url = link_tag['href'] if link_tag and link_tag.has_attr('href') else None
if not doc_relative_url: continue
document_url_str = urljoin(self.BASE_URL, doc_relative_url)
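                            # A sixth column, when present, links to the decision PDF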
pdf_link_tag = cols[5].find('a', href=re.compile(r'\.pdf$', re.IGNORECASE)) if len(cols) > 5 else None
pdf_url_str = urljoin(self.BASE_URL, pdf_link_tag['href']) if pdf_link_tag and pdf_link_tag.has_attr('href') else None
decision_data_parsed = {
"karar_sayisi": cols[0].get_text(strip=True),
"esas_sayisi": cols[1].get_text(strip=True),
"bolum": cols[2].get_text(strip=True),
"uyusmazlik_konusu": cols[3].get_text(strip=True),
"karar_sonucu": cols[4].get_text(strip=True),
"popover_content": html.unescape(popover_content_raw) if popover_content_raw else None,
"document_url": document_url_str,
"pdf_url": pdf_url_str
}
decision_model = UyusmazlikApiDecisionEntry(**decision_data_parsed)
processed_decisions.append(decision_model)
except Exception as e:
logger.warning(f"UyusmazlikApiClient: Could not parse decision row. Row content: {row.get_text(strip=True, separator=' | ')}, Error: {e}")
return UyusmazlikSearchResponse(
decisions=processed_decisions,
total_records_found=total_records
)
def _convert_html_to_markdown_uyusmazlik(self, full_decision_html_content: str) -> Optional[str]:
"""Converts direct HTML content (from an Uyuşmazlık decision page) to Markdown."""
if not full_decision_html_content:
return None
processed_html = html.unescape(full_decision_html_content)
# As per user request, pass the full (unescaped) HTML to MarkItDown
html_input_for_markdown = processed_html
markdown_text = None
try:
# Convert HTML string to bytes and create BytesIO stream
html_bytes = html_input_for_markdown.encode('utf-8')
html_stream = io.BytesIO(html_bytes)
# Pass BytesIO stream to MarkItDown to avoid temp file creation
md_converter = MarkItDown()
conversion_result = md_converter.convert(html_stream)
markdown_text = conversion_result.text_content
logger.info("UyusmazlikApiClient: HTML to Markdown conversion successful.")
except Exception as e:
logger.error(f"UyusmazlikApiClient: Error during MarkItDown HTML to Markdown conversion: {e}")
return markdown_text
async def get_decision_document_as_markdown(self, document_url: str) -> UyusmazlikDocumentMarkdown:
"""
Retrieves a specific Uyuşmazlık decision from its full URL and returns content as Markdown.
"""
logger.info(f"UyusmazlikApiClient (httpx for docs): Fetching Uyuşmazlık document for Markdown from URL: {document_url}")
try:
# Using a new httpx.AsyncClient instance for this GET request for simplicity
async with httpx.AsyncClient(verify=False, timeout=self.request_timeout) as doc_fetch_client:
get_response = await doc_fetch_client.get(document_url, headers={"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"})
get_response.raise_for_status()
html_content_from_api = get_response.text
if not isinstance(html_content_from_api, str) or not html_content_from_api.strip():
logger.warning(f"UyusmazlikApiClient: Received empty or non-string HTML from URL {document_url}.")
return UyusmazlikDocumentMarkdown(source_url=document_url, markdown_content=None)
markdown_content = self._convert_html_to_markdown_uyusmazlik(html_content_from_api)
return UyusmazlikDocumentMarkdown(source_url=document_url, markdown_content=markdown_content)
        except httpx.HTTPError as e:  # covers transport errors and raise_for_status() failures
logger.error(f"UyusmazlikApiClient (httpx for docs): HTTP error fetching Uyuşmazlık document from {document_url}: {e}")
raise
except Exception as e:
logger.error(f"UyusmazlikApiClient (httpx for docs): General error processing Uyuşmazlık document from {document_url}: {e}")
raise
async def close_client_session(self):
"""Close the shared httpx client session."""
if hasattr(self, 'http_client') and self.http_client:
await self.http_client.aclose()
logger.info("UyusmazlikApiClient: HTTP client session closed.")
else:
logger.info("UyusmazlikApiClient: No persistent client session from __init__ to close.")
```
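For orientation, here is a minimal usage sketch of the client above. The `UyusmazlikSearchRequest` keyword fields are inferred from the `add_to_form_data` calls in `search_decisions`, and the unspecified fields are assumed optional, so treat this as an illustration rather than the module's documented API:

```python
# Hypothetical usage sketch (not part of the repository).
import asyncio

from uyusmazlik_mcp_module.client import UyusmazlikApiClient
from uyusmazlik_mcp_module.models import (
    UyusmazlikBolumEnum,
    UyusmazlikSearchRequest,
    UyusmazlikTuruEnum,
)

async def main() -> None:
    client = UyusmazlikApiClient(request_timeout=30.0)
    try:
        # Field names mirror the params.* attributes read by search_decisions().
        request = UyusmazlikSearchRequest(
            bolum=UyusmazlikBolumEnum.HUKUK_BOLUMU,
            uyusmazlik_turu=UyusmazlikTuruEnum.GOREV_UYUSMAZLIGI,
            icerik="görev",  # free-text content search
        )
        response = await client.search_decisions(request)
        print(f"Records found: {response.total_records_found}")
        for decision in response.decisions[:3]:
            doc = await client.get_decision_document_as_markdown(decision.document_url)
            print(doc.source_url, (doc.markdown_content or "")[:200])
    finally:
        # Unlike the vendored snapshot, this client keeps a shared
        # httpx.AsyncClient, so it must be closed explicitly.
        await client.close_client_session()

if __name__ == "__main__":
    asyncio.run(main())
```

Keeping one shared `AsyncClient` lets searches reuse connections and the base-URL headers set in `__init__`, at the cost of requiring the explicit `close_client_session()` call shown above.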