This is page 5 of 8. Use http://codebase.md/saidsurucu/yargi-mcp?page={x} to view the full context.
# Directory Structure
```
├── __main__.py
├── .dockerignore
├── .env.example
├── .gitattributes
├── .github
│   └── workflows
│       └── publish.yml
├── .gitignore
├── .serena
│   ├── .gitignore
│   └── project.yml
├── 5ire-settings.png
├── analyze_kik_hash_generation.py
├── anayasa_mcp_module
│   ├── __init__.py
│   ├── bireysel_client.py
│   ├── client.py
│   ├── models.py
│   └── unified_client.py
├── asgi_app.py
├── bddk_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── bedesten_mcp_module
│   ├── __init__.py
│   ├── client.py
│   ├── enums.py
│   └── models.py
├── check_response_format.py
├── CLAUDE.md
├── danistay_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── docker-compose.yml
├── Dockerfile
├── docs
│   └── DEPLOYMENT.md
├── emsal_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── example_fastapi_app.py
├── fly-no-auth.toml
├── fly.toml
├── kik_mcp_module
│   ├── __init__.py
│   ├── client_v2.py
│   ├── client.py
│   ├── models_v2.py
│   └── models.py
├── kvkk_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── LICENSE
├── mcp_auth
│   ├── __init__.py
│   ├── clerk_config.py
│   ├── middleware.py
│   ├── oauth.py
│   ├── policy.py
│   └── storage.py
├── mcp_auth_factory.py
├── mcp_auth_http_adapter.py
├── mcp_auth_http_simple.py
├── mcp_server_main.py
├── nginx.conf
├── ornek.png
├── Procfile
├── pyproject.toml
├── railway.json
├── README.md
├── redis_session_store.py
├── rekabet_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
├── requirements.txt
├── run_asgi.py
├── saidsurucu-yargi-mcp-f5fa007
│   ├── __main__.py
│   ├── .dockerignore
│   ├── .env.example
│   ├── .gitattributes
│   ├── .github
│   │   └── workflows
│   │       └── publish.yml
│   ├── .gitignore
│   ├── 5ire-settings.png
│   ├── anayasa_mcp_module
│   │   ├── __init__.py
│   │   ├── bireysel_client.py
│   │   ├── client.py
│   │   ├── models.py
│   │   └── unified_client.py
│   ├── asgi_app.py
│   ├── bddk_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── bedesten_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   ├── enums.py
│   │   └── models.py
│   ├── check_response_format.py
│   ├── danistay_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── docker-compose.yml
│   ├── Dockerfile
│   ├── docs
│   │   └── DEPLOYMENT.md
│   ├── emsal_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── example_fastapi_app.py
│   ├── kik_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── kvkk_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── LICENSE
│   ├── mcp_auth
│   │   ├── __init__.py
│   │   ├── clerk_config.py
│   │   ├── middleware.py
│   │   ├── oauth.py
│   │   ├── policy.py
│   │   └── storage.py
│   ├── mcp_auth_factory.py
│   ├── mcp_auth_http_adapter.py
│   ├── mcp_auth_http_simple.py
│   ├── mcp_server_main.py
│   ├── nginx.conf
│   ├── ornek.png
│   ├── Procfile
│   ├── pyproject.toml
│   ├── railway.json
│   ├── README.md
│   ├── redis_session_store.py
│   ├── rekabet_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   ├── run_asgi.py
│   ├── sayistay_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   ├── enums.py
│   │   ├── models.py
│   │   └── unified_client.py
│   ├── starlette_app.py
│   ├── stripe_webhook.py
│   ├── uyusmazlik_mcp_module
│   │   ├── __init__.py
│   │   ├── client.py
│   │   └── models.py
│   └── yargitay_mcp_module
│       ├── __init__.py
│       ├── client.py
│       └── models.py
├── sayistay_mcp_module
│   ├── __init__.py
│   ├── client.py
│   ├── enums.py
│   ├── models.py
│   └── unified_client.py
├── starlette_app.py
├── stripe_webhook.py
├── uv.lock
├── uyusmazlik_mcp_module
│   ├── __init__.py
│   ├── client.py
│   └── models.py
└── yargitay_mcp_module
    ├── __init__.py
    ├── client.py
    └── models.py
```
# Files
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/anayasa_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# anayasa_mcp_module/client.py
# This client is for Norm Denetimi: https://normkararlarbilgibankasi.anayasa.gov.tr

import httpx
from bs4 import BeautifulSoup
from typing import Dict, Any, List, Optional, Tuple
import logging
import html
import re
import io
from urllib.parse import urlencode, urljoin, quote
from markitdown import MarkItDown
import math  # For math.ceil for pagination

from .models import (
    AnayasaNormDenetimiSearchRequest,
    AnayasaDecisionSummary,
    AnayasaReviewedNormInfo,
    AnayasaSearchResult,
    AnayasaDocumentMarkdown,  # Model for Norm Denetimi document
)

logger = logging.getLogger(__name__)
if not logger.hasHandlers():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

class AnayasaMahkemesiApiClient:
    BASE_URL = "https://normkararlarbilgibankasi.anayasa.gov.tr"
    SEARCH_PATH_SEGMENT = "Ara"
    DOCUMENT_MARKDOWN_CHUNK_SIZE = 5000  # Character limit per page

    def __init__(self, request_timeout: float = 60.0):
        self.http_client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            headers={
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            },
            timeout=request_timeout,
            verify=True,
            follow_redirects=True
        )

    def _build_search_query_params_for_aym(self, params: AnayasaNormDenetimiSearchRequest) -> List[Tuple[str, str]]:
        query_params: List[Tuple[str, str]] = []
        if params.keywords_all:
            for kw in params.keywords_all: query_params.append(("KelimeAra[]", kw))
        if params.keywords_any:
            for kw in params.keywords_any: query_params.append(("HerhangiBirKelimeAra[]", kw))
        if params.keywords_exclude:
            for kw in params.keywords_exclude: query_params.append(("BulunmayanKelimeAra[]", kw))
        if params.period and params.period != "ALL": query_params.append(("Donemler_id", params.period))
        if params.case_number_esas: query_params.append(("EsasNo", params.case_number_esas))
        if params.decision_number_karar: query_params.append(("KararNo", params.decision_number_karar))
        if params.first_review_date_start: query_params.append(("IlkIncelemeTarihiIlk", params.first_review_date_start))
        if params.first_review_date_end: query_params.append(("IlkIncelemeTarihiSon", params.first_review_date_end))
        if params.decision_date_start: query_params.append(("KararTarihiIlk", params.decision_date_start))
        if params.decision_date_end: query_params.append(("KararTarihiSon", params.decision_date_end))
        if params.application_type and params.application_type != "ALL": query_params.append(("BasvuruTurler_id", params.application_type))
        if params.applicant_general_name: query_params.append(("BasvuranGeneller_id", params.applicant_general_name))
        if params.applicant_specific_name: query_params.append(("BasvuranOzeller_id", params.applicant_specific_name))
        if params.attending_members_names:
            for name in params.attending_members_names: query_params.append(("Uyeler_id[]", name))
        if params.rapporteur_name: query_params.append(("Raportorler_id", params.rapporteur_name))
        if params.norm_type and params.norm_type != "ALL": query_params.append(("NormunTurler_id", params.norm_type))
        if params.norm_id_or_name: query_params.append(("NormunNumarasiAdlar_id", params.norm_id_or_name))
        if params.norm_article: query_params.append(("NormunMaddeNumarasi", params.norm_article))
        if params.review_outcomes:
            for outcome_val in params.review_outcomes:
                if outcome_val and outcome_val != "ALL": query_params.append(("IncelemeTuruKararSonuclar_id[]", outcome_val))
        if params.reason_for_final_outcome and params.reason_for_final_outcome != "ALL":
            query_params.append(("KararSonucununGerekcesi", params.reason_for_final_outcome))
        if params.basis_constitution_article_numbers:
            for article_no in params.basis_constitution_article_numbers: query_params.append(("DayanakHukmu[]", article_no))
        if params.official_gazette_date_start: query_params.append(("ResmiGazeteTarihiIlk", params.official_gazette_date_start))
        if params.official_gazette_date_end: query_params.append(("ResmiGazeteTarihiSon", params.official_gazette_date_end))
        if params.official_gazette_number_start: query_params.append(("ResmiGazeteSayisiIlk", params.official_gazette_number_start))
        if params.official_gazette_number_end: query_params.append(("ResmiGazeteSayisiSon", params.official_gazette_number_end))
        if params.has_press_release and params.has_press_release != "ALL": query_params.append(("BasinDuyurusu", params.has_press_release))
        if params.has_dissenting_opinion and params.has_dissenting_opinion != "ALL": query_params.append(("KarsiOy", params.has_dissenting_opinion))
        if params.has_different_reasoning and params.has_different_reasoning != "ALL": query_params.append(("FarkliGerekce", params.has_different_reasoning))
        # Add pagination and sorting parameters as query params instead of URL path
        if params.results_per_page and params.results_per_page != 10:
            query_params.append(("SatirSayisi", str(params.results_per_page)))
        if params.sort_by_criteria and params.sort_by_criteria != "KararTarihi":
            query_params.append(("Siralama", params.sort_by_criteria))
        if params.page_to_fetch and params.page_to_fetch > 1:
            query_params.append(("page", str(params.page_to_fetch)))
        return query_params

    async def search_norm_denetimi_decisions(
        self,
        params: AnayasaNormDenetimiSearchRequest
    ) -> AnayasaSearchResult:
        # Use the simple /Ara endpoint - the complex path structure seems to cause 404s
        request_path = f"/{self.SEARCH_PATH_SEGMENT}"
        final_query_params = self._build_search_query_params_for_aym(params)
        logger.info(f"AnayasaMahkemesiApiClient: Performing Norm Denetimi search. Path: {request_path}, Params: {final_query_params}")

        try:
            response = await self.http_client.get(request_path, params=final_query_params)
            response.raise_for_status()
            html_content = response.text
        except httpx.RequestError as e:
            logger.error(f"AnayasaMahkemesiApiClient: HTTP request error during Norm Denetimi search: {e}")
            raise
        except Exception as e:
            logger.error(f"AnayasaMahkemesiApiClient: Error processing Norm Denetimi search request: {e}")
            raise

        soup = BeautifulSoup(html_content, 'html.parser')

        total_records = None
        bulunan_karar_div = soup.find("div", class_="bulunankararsayisi")
        if not bulunan_karar_div:  # Fallback for mobile view
            bulunan_karar_div = soup.find("div", class_="bulunankararsayisiMobil")
        if bulunan_karar_div:
            match_records = re.search(r'(\d+)\s*Karar Bulundu', bulunan_karar_div.get_text(strip=True))
            if match_records:
                total_records = int(match_records.group(1))

        processed_decisions: List[AnayasaDecisionSummary] = []
        decision_divs = soup.find_all("div", class_="birkarar")

        for decision_div in decision_divs:
            link_tag = decision_div.find("a", href=True)
            doc_url_path = link_tag['href'] if link_tag else None
            decision_page_url_str = urljoin(self.BASE_URL, doc_url_path) if doc_url_path else None

            title_div = decision_div.find("div", class_="bkararbaslik")
            ek_no_text_raw = title_div.get_text(strip=True, separator=" ").replace('\xa0', ' ') if title_div else ""
            ek_no_match = re.search(r"(E\.\s*\d+/\d+\s*,\s*K\.\s*\d+/\d+)", ek_no_text_raw)
            ek_no_text = ek_no_match.group(1) if ek_no_match else ek_no_text_raw.split("Sayılı Karar")[0].strip()

            keyword_count_div = title_div.find("div", class_="BulunanKelimeSayisi") if title_div else None
            keyword_count_text = keyword_count_div.get_text(strip=True).replace("Bulunan Kelime Sayısı", "").strip() if keyword_count_div else None
            keyword_count = int(keyword_count_text) if keyword_count_text and keyword_count_text.isdigit() else None

            info_div = decision_div.find("div", class_="kararbilgileri")
            info_parts = [part.strip() for part in info_div.get_text(separator="|").split("|")] if info_div else []
            app_type_summary = info_parts[0] if len(info_parts) > 0 else None
            applicant_summary = info_parts[1] if len(info_parts) > 1 else None
            outcome_summary = info_parts[2] if len(info_parts) > 2 else None
            dec_date_raw = info_parts[3] if len(info_parts) > 3 else None
            decision_date_summary = dec_date_raw.replace("Karar Tarihi:", "").strip() if dec_date_raw else None

            reviewed_norms_list: List[AnayasaReviewedNormInfo] = []
            details_table_container = decision_div.find_next_sibling("div", class_=re.compile(r"col-sm-12"))  # The details table is in a sibling div
            if details_table_container:
                details_table = details_table_container.find("table", class_="table")
                if details_table and details_table.find("tbody"):
                    for row in details_table.find("tbody").find_all("tr"):
                        cells = row.find_all("td")
                        if len(cells) == 6:
                            reviewed_norms_list.append(AnayasaReviewedNormInfo(
                                norm_name_or_number=cells[0].get_text(strip=True) or None,
                                article_number=cells[1].get_text(strip=True) or None,
                                review_type_and_outcome=cells[2].get_text(strip=True) or None,
                                outcome_reason=cells[3].get_text(strip=True) or None,
                                basis_constitution_articles_cited=[a.strip() for a in cells[4].get_text(strip=True).split(',') if a.strip()] if cells[4].get_text(strip=True) else [],
                                postponement_period=cells[5].get_text(strip=True) or None
                            ))

            processed_decisions.append(AnayasaDecisionSummary(
                decision_reference_no=ek_no_text,
                decision_page_url=decision_page_url_str,
                keywords_found_count=keyword_count,
                application_type_summary=app_type_summary,
                applicant_summary=applicant_summary,
                decision_outcome_summary=outcome_summary,
                decision_date_summary=decision_date_summary,
                reviewed_norms=reviewed_norms_list
            ))

        return AnayasaSearchResult(
            decisions=processed_decisions,
            total_records_found=total_records,
            retrieved_page_number=params.page_to_fetch
        )

    def _convert_html_to_markdown_norm_denetimi(self, full_decision_html_content: str) -> Optional[str]:
        """Converts direct HTML content from an Anayasa Mahkemesi Norm Denetimi decision page to Markdown."""
        if not full_decision_html_content:
            return None

        processed_html = html.unescape(full_decision_html_content)
        soup = BeautifulSoup(processed_html, "html.parser")
        html_input_for_markdown = ""

        karar_tab_content = soup.find("div", id="Karar")  # "KARAR" tab content
        if karar_tab_content:
            karar_metni_div = karar_tab_content.find("div", class_="KararMetni")
            if karar_metni_div:
                # Remove scripts and styles
                for script_tag in karar_metni_div.find_all("script"): script_tag.decompose()
                for style_tag in karar_metni_div.find_all("style"): style_tag.decompose()
                # Remove the "Künye Kopyala" button and other non-content divs
                for item_div in karar_metni_div.find_all("div", class_="item col-sm-12"): item_div.decompose()
                for modal_div in karar_metni_div.find_all("div", class_="modal fade"): modal_div.decompose()  # If any modals
                word_section = karar_metni_div.find("div", class_="WordSection1")
                html_input_for_markdown = str(word_section) if word_section else str(karar_metni_div)
            else:
                html_input_for_markdown = str(karar_tab_content)
        else:
            # Fallback if the specific structure is not found
            word_section_fallback = soup.find("div", class_="WordSection1")
            if word_section_fallback:
                html_input_for_markdown = str(word_section_fallback)
            else:
                # Last resort: use the whole body or the raw HTML
                body_tag = soup.find("body")
                html_input_for_markdown = str(body_tag) if body_tag else processed_html

        markdown_text = None
        try:
            # Ensure the content is wrapped in a basic HTML structure if it's not already
            if not html_input_for_markdown.strip().lower().startswith(("<html", "<!doctype")):
                html_content = f"<html><head><meta charset=\"UTF-8\"></head><body>{html_input_for_markdown}</body></html>"
            else:
                html_content = html_input_for_markdown
            # Convert the HTML string to bytes and create a BytesIO stream
            html_bytes = html_content.encode('utf-8')
            html_stream = io.BytesIO(html_bytes)
            # Pass the BytesIO stream to MarkItDown to avoid temp file creation
            md_converter = MarkItDown()
            conversion_result = md_converter.convert(html_stream)
            markdown_text = conversion_result.text_content
        except Exception as e:
            logger.error(f"AnayasaMahkemesiApiClient: MarkItDown conversion error: {e}")
        return markdown_text

    async def get_decision_document_as_markdown(
        self,
        document_url: str,
        page_number: int = 1
    ) -> AnayasaDocumentMarkdown:
        """
        Retrieves a specific Anayasa Mahkemesi (Norm Denetimi) decision,
        converts its content to Markdown, and returns the requested page/chunk.
        """
        full_url = urljoin(self.BASE_URL, document_url) if not document_url.startswith("http") else document_url
        logger.info(f"AnayasaMahkemesiApiClient: Fetching Norm Denetimi document for Markdown (page {page_number}) from URL: {full_url}")

        decision_ek_no_from_page = None
        decision_date_from_page = None
        official_gazette_from_page = None

        try:
            # Use a new client instance for document fetching if headers/timeout need to be different,
            # or reuse self.http_client if settings are compatible. For now, self.http_client.
            get_response = await self.http_client.get(full_url, headers={"Accept": "text/html"})
            get_response.raise_for_status()
            html_content_from_api = get_response.text

            if not isinstance(html_content_from_api, str) or not html_content_from_api.strip():
                logger.warning(f"AnayasaMahkemesiApiClient: Received empty or non-string HTML from URL {full_url}.")
                return AnayasaDocumentMarkdown(
                    source_url=full_url, markdown_chunk=None, current_page=page_number, total_pages=0, is_paginated=False
                )

            # Extract metadata from the page content (E.K. No, Date, RG)
            soup = BeautifulSoup(html_content_from_api, "html.parser")
            karar_metni_div = soup.find("div", class_="KararMetni")  # Usually within div#Karar
            if not karar_metni_div:  # Fallback if not in KararMetni
                karar_metni_div = soup.find("div", class_="WordSection1")

            # Initialize with empty string defaults
            decision_ek_no_from_page = ""
            decision_date_from_page = ""
            official_gazette_from_page = ""

            if karar_metni_div:
                # Attempt to find the E.K. No (Esas No, Karar No).
                # Norm Denetimi pages often have this in bold <p> tags directly or in the WordSection1.
                # Look for patterns like "Esas No.: YYYY/NN" and "Karar No.: YYYY/NN".
                esas_no_tag = karar_metni_div.find(lambda tag: tag.name == "p" and tag.find("b") and "Esas No.:" in tag.find("b").get_text())
                karar_no_tag = karar_metni_div.find(lambda tag: tag.name == "p" and tag.find("b") and "Karar No.:" in tag.find("b").get_text())
                karar_tarihi_tag = karar_metni_div.find(lambda tag: tag.name == "p" and tag.find("b") and "Karar tarihi:" in tag.find("b").get_text())  # Less common on Norm pages
                resmi_gazete_tag = karar_metni_div.find(lambda tag: tag.name == "p" and ("Resmî Gazete tarih ve sayısı:" in tag.get_text() or "Resmi Gazete tarih/sayı:" in tag.get_text()))

                if esas_no_tag and esas_no_tag.find("b") and karar_no_tag and karar_no_tag.find("b"):
                    esas_str = esas_no_tag.find("b").get_text(strip=True).replace('Esas No.:', '').strip()
                    karar_str = karar_no_tag.find("b").get_text(strip=True).replace('Karar No.:', '').strip()
                    decision_ek_no_from_page = f"E.{esas_str}, K.{karar_str}"

                if karar_tarihi_tag and karar_tarihi_tag.find("b"):
                    decision_date_from_page = karar_tarihi_tag.find("b").get_text(strip=True).replace("Karar tarihi:", "").strip()
                elif karar_metni_div:  # Fallback for Karar Tarihi if not in a specific tag
                    date_match = re.search(r"Karar Tarihi\s*:\s*([\d\.]+)", karar_metni_div.get_text())  # Norm pages often use DD.MM.YYYY
                    if date_match: decision_date_from_page = date_match.group(1).strip()

                if resmi_gazete_tag:
                    # Try to get the bold part first if it exists
                    bold_rg_tag = resmi_gazete_tag.find("b")
                    rg_text_content = bold_rg_tag.get_text(strip=True) if bold_rg_tag else resmi_gazete_tag.get_text(strip=True)
                    official_gazette_from_page = rg_text_content.replace("Resmî Gazete tarih ve sayısı:", "").replace("Resmi Gazete tarih/sayı:", "").strip()

            full_markdown_content = self._convert_html_to_markdown_norm_denetimi(html_content_from_api)
            if not full_markdown_content:
                return AnayasaDocumentMarkdown(
                    source_url=full_url,
                    decision_reference_no_from_page=decision_ek_no_from_page,
                    decision_date_from_page=decision_date_from_page,
                    official_gazette_info_from_page=official_gazette_from_page,
                    markdown_chunk=None,
                    current_page=page_number,
                    total_pages=0,
                    is_paginated=False
                )

            content_length = len(full_markdown_content)
            total_pages = math.ceil(content_length / self.DOCUMENT_MARKDOWN_CHUNK_SIZE)
            if total_pages == 0: total_pages = 1
            current_page_clamped = max(1, min(page_number, total_pages))
            start_index = (current_page_clamped - 1) * self.DOCUMENT_MARKDOWN_CHUNK_SIZE
            end_index = start_index + self.DOCUMENT_MARKDOWN_CHUNK_SIZE
            markdown_chunk = full_markdown_content[start_index:end_index]

            return AnayasaDocumentMarkdown(
                source_url=full_url,
                decision_reference_no_from_page=decision_ek_no_from_page,
                decision_date_from_page=decision_date_from_page,
                official_gazette_info_from_page=official_gazette_from_page,
                markdown_chunk=markdown_chunk,
                current_page=current_page_clamped,
                total_pages=total_pages,
                is_paginated=(total_pages > 1)
            )
        except httpx.RequestError as e:
            logger.error(f"AnayasaMahkemesiApiClient: HTTP error fetching Norm Denetimi document from {full_url}: {e}")
            raise
        except Exception as e:
            logger.error(f"AnayasaMahkemesiApiClient: General error processing Norm Denetimi document from {full_url}: {e}")
            raise

    async def close_client_session(self):
        if hasattr(self, 'http_client') and self.http_client and not self.http_client.is_closed:
            await self.http_client.aclose()
            logger.info("AnayasaMahkemesiApiClient (Norm Denetimi): HTTP client session closed.")
```
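
Note how the two public methods compose: `search_norm_denetimi_decisions` returns summaries whose `decision_page_url` feeds directly into `get_decision_document_as_markdown`, which splits the converted Markdown into `DOCUMENT_MARKDOWN_CHUNK_SIZE` (5,000-character) pages. For example, a 12,400-character decision yields ceil(12400 / 5000) = 3 pages, with page 2 covering character indices 5000-9999. A minimal usage sketch follows; it assumes the remaining fields of `AnayasaNormDenetimiSearchRequest` are optional with defaults, and the search keyword is purely illustrative.

```python
import asyncio

from anayasa_mcp_module.client import AnayasaMahkemesiApiClient
from anayasa_mcp_module.models import AnayasaNormDenetimiSearchRequest

async def main():
    client = AnayasaMahkemesiApiClient()
    try:
        # Field names match those read by _build_search_query_params_for_aym above;
        # the keyword value is illustrative.
        result = await client.search_norm_denetimi_decisions(
            AnayasaNormDenetimiSearchRequest(keywords_all=["ifade özgürlüğü"], page_to_fetch=1)
        )
        print(f"{result.total_records_found} records found")
        if result.decisions and result.decisions[0].decision_page_url:
            # Long documents are chunked into 5,000-character pages, so walk them.
            doc = await client.get_decision_document_as_markdown(
                result.decisions[0].decision_page_url, page_number=1
            )
            print(doc.markdown_chunk)
            if doc.is_paginated:
                print(f"{doc.total_pages} chunks in total")
    finally:
        await client.close_client_session()

asyncio.run(main())
```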
--------------------------------------------------------------------------------
/mcp_auth_http_simple.py:
--------------------------------------------------------------------------------
```python
"""
Simplified MCP OAuth HTTP adapter - only Clerk JWT based authentication
Uses Redis for authorization code storage to support multi-machine deployment
"""
import os
import logging
from typing import Optional
from urllib.parse import urlencode, quote
from fastapi import APIRouter, Request, Query, HTTPException
from fastapi.responses import RedirectResponse, JSONResponse
# Import Redis session store
from redis_session_store import get_redis_store
# Try to import Clerk SDK
try:
from clerk_backend_api import Clerk
CLERK_AVAILABLE = True
except ImportError:
CLERK_AVAILABLE = False
Clerk = None
logger = logging.getLogger(__name__)
router = APIRouter()
# OAuth configuration
BASE_URL = os.getenv("BASE_URL", "https://api.yargimcp.com")
CLERK_DOMAIN = os.getenv("CLERK_DOMAIN", "accounts.yargimcp.com")
# Initialize Redis store
redis_store = None
def get_redis_session_store():
"""Get Redis store instance with lazy initialization."""
global redis_store
if redis_store is None:
try:
import concurrent.futures
import functools
# Use thread pool with timeout to prevent hanging
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(get_redis_store)
try:
# 5 second timeout for Redis initialization
redis_store = future.result(timeout=5.0)
if redis_store:
logger.info("Redis session store initialized for OAuth handler")
else:
logger.warning("Redis store initialization returned None")
except concurrent.futures.TimeoutError:
logger.error("Redis initialization timed out after 5 seconds")
redis_store = None
future.cancel() # Try to cancel the hanging operation
except Exception as e:
logger.error(f"Failed to initialize Redis store: {e}")
redis_store = None
if redis_store is None:
# Fall back to in-memory storage with warning
logger.warning("Falling back to in-memory storage - multi-machine deployment will not work")
return redis_store
@router.get("/.well-known/oauth-authorization-server")
async def get_oauth_metadata():
"""OAuth 2.0 Authorization Server Metadata (RFC 8414)"""
return JSONResponse({
"issuer": BASE_URL,
"authorization_endpoint": "https://yargimcp.com/mcp-callback",
"token_endpoint": f"{BASE_URL}/token",
"registration_endpoint": f"{BASE_URL}/register",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code"],
"code_challenge_methods_supported": ["S256"],
"token_endpoint_auth_methods_supported": ["none"],
"scopes_supported": ["read", "search", "openid", "profile", "email"],
"service_documentation": f"{BASE_URL}/mcp/"
})
@router.get("/auth/login")
async def oauth_authorize(
request: Request,
client_id: str = Query(...),
redirect_uri: str = Query(...),
response_type: str = Query("code"),
scope: Optional[str] = Query("read search"),
state: Optional[str] = Query(None),
code_challenge: Optional[str] = Query(None),
code_challenge_method: Optional[str] = Query(None)
):
"""OAuth 2.1 Authorization Endpoint - redirects to Clerk"""
logger.info(f"OAuth authorize request - client_id: {client_id}")
logger.info(f"Redirect URI: {redirect_uri}")
logger.info(f"State: {state}")
logger.info(f"PKCE Challenge: {bool(code_challenge)}")
try:
# Build callback URL with all necessary parameters
callback_url = f"{BASE_URL}/auth/callback"
callback_params = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"state": state or "",
"scope": scope or "read search"
}
# Add PKCE parameters if present
if code_challenge:
callback_params["code_challenge"] = code_challenge
callback_params["code_challenge_method"] = code_challenge_method or "S256"
# Encode callback URL as redirect_url for Clerk
callback_with_params = f"{callback_url}?{urlencode(callback_params)}"
# Build Clerk sign-in URL - use yargimcp.com frontend for JWT token generation
clerk_params = {
"redirect_url": callback_with_params
}
# Use frontend sign-in page that handles JWT token generation
clerk_signin_url = f"https://yargimcp.com/sign-in?{urlencode(clerk_params)}"
logger.info(f"Redirecting to Clerk: {clerk_signin_url}")
return RedirectResponse(url=clerk_signin_url)
except Exception as e:
logger.exception(f"Authorization failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/auth/callback")
async def oauth_callback(
request: Request,
client_id: str = Query(...),
redirect_uri: str = Query(...),
state: Optional[str] = Query(None),
scope: Optional[str] = Query("read search"),
code_challenge: Optional[str] = Query(None),
code_challenge_method: Optional[str] = Query(None),
clerk_token: Optional[str] = Query(None)
):
"""OAuth callback from Clerk - generates authorization code"""
logger.info(f"OAuth callback - client_id: {client_id}")
logger.info(f"Clerk token provided: {bool(clerk_token)}")
try:
# Validate user with Clerk and generate real JWT token
user_authenticated = False
user_id = None
session_id = None
real_jwt_token = None
if clerk_token and CLERK_AVAILABLE:
try:
# Extract user info from JWT token (no Clerk session verification needed)
import jwt
decoded_token = jwt.decode(clerk_token, options={"verify_signature": False})
user_id = decoded_token.get("user_id") or decoded_token.get("sub")
user_email = decoded_token.get("email")
token_scopes = decoded_token.get("scopes", ["read", "search"])
logger.info(f"JWT token claims - user_id: {user_id}, email: {user_email}, scopes: {token_scopes}")
if user_id and user_email:
# JWT token is already signed by Clerk and contains valid user info
user_authenticated = True
logger.info(f"User authenticated via JWT token - user_id: {user_id}")
# Use the JWT token directly as the real token (it's already from Clerk template)
real_jwt_token = clerk_token
logger.info("Using Clerk JWT token directly (already real token)")
else:
logger.error(f"Missing required fields in JWT token - user_id: {bool(user_id)}, email: {bool(user_email)}")
except Exception as e:
logger.error(f"JWT validation failed: {e}")
# Fallback to cookie validation
if not user_authenticated:
clerk_session = request.cookies.get("__session")
if clerk_session:
user_authenticated = True
logger.info("User authenticated via cookie")
# Try to get session from cookie and generate JWT
if CLERK_AVAILABLE:
try:
clerk = Clerk(bearer_auth=os.getenv("CLERK_SECRET_KEY"))
# Note: sessions.verify_session is deprecated, but we'll try
# In practice, you'd need to extract session_id from cookie
logger.info("Cookie authentication - JWT generation not implemented yet")
except Exception as e:
logger.warning(f"Failed to generate JWT from cookie: {e}")
# Only generate authorization code if we have a real JWT token
if user_authenticated and real_jwt_token:
# Generate authorization code
auth_code = f"clerk_auth_{os.urandom(16).hex()}"
# Prepare code data
import time
code_data = {
"user_id": user_id,
"session_id": session_id,
"real_jwt_token": real_jwt_token,
"user_authenticated": user_authenticated,
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": scope or "read search"
}
# Try to store in Redis, fall back to in-memory if Redis unavailable
store = get_redis_session_store()
if store:
# Store in Redis with automatic expiration
success = store.set_oauth_code(auth_code, code_data)
if success:
logger.info(f"Stored authorization code {auth_code[:10]}... in Redis with real JWT token")
else:
logger.error(f"Failed to store authorization code in Redis, falling back to in-memory")
# Fall back to in-memory storage
if not hasattr(oauth_callback, '_code_storage'):
oauth_callback._code_storage = {}
oauth_callback._code_storage[auth_code] = code_data
else:
# Fall back to in-memory storage
logger.warning("Redis not available, using in-memory storage")
if not hasattr(oauth_callback, '_code_storage'):
oauth_callback._code_storage = {}
oauth_callback._code_storage[auth_code] = code_data
logger.info(f"Stored authorization code in memory (fallback)")
# Redirect back to client with authorization code
redirect_params = {
"code": auth_code,
"state": state or ""
}
final_redirect_url = f"{redirect_uri}?{urlencode(redirect_params)}"
logger.info(f"Redirecting back to client: {final_redirect_url}")
return RedirectResponse(url=final_redirect_url)
else:
# No JWT token yet - redirect back to sign-in page to wait for authentication
logger.info("No JWT token provided - redirecting back to sign-in to complete authentication")
# Keep the same redirect URL so the flow continues
sign_in_params = {
"redirect_url": f"{request.url._url}" # Current callback URL with all params
}
sign_in_url = f"https://yargimcp.com/sign-in?{urlencode(sign_in_params)}"
logger.info(f"Redirecting back to sign-in: {sign_in_url}")
return RedirectResponse(url=sign_in_url)
except Exception as e:
logger.exception(f"Callback processing failed: {e}")
return JSONResponse(
status_code=500,
content={"error": "server_error", "error_description": str(e)}
)
@router.post("/auth/register")
async def register_client(request: Request):
"""Dynamic Client Registration (RFC 7591)"""
data = await request.json()
logger.info(f"Client registration request: {data}")
# Simple dynamic registration - accept any client
client_id = f"mcp-client-{os.urandom(8).hex()}"
return JSONResponse({
"client_id": client_id,
"client_secret": None, # Public client
"redirect_uris": data.get("redirect_uris", []),
"grant_types": ["authorization_code"],
"response_types": ["code"],
"client_name": data.get("client_name", "MCP Client"),
"token_endpoint_auth_method": "none"
})
@router.post("/auth/callback")
async def oauth_callback_post(request: Request):
"""OAuth callback POST endpoint for token exchange"""
# Parse form data (standard OAuth token exchange format)
form_data = await request.form()
grant_type = form_data.get("grant_type")
code = form_data.get("code")
redirect_uri = form_data.get("redirect_uri")
client_id = form_data.get("client_id")
code_verifier = form_data.get("code_verifier")
logger.info(f"OAuth callback POST - grant_type: {grant_type}")
logger.info(f"Code: {code[:20] if code else 'None'}...")
logger.info(f"Client ID: {client_id}")
logger.info(f"PKCE verifier: {bool(code_verifier)}")
if grant_type != "authorization_code":
return JSONResponse(
status_code=400,
content={"error": "unsupported_grant_type"}
)
if not code or not redirect_uri:
return JSONResponse(
status_code=400,
content={"error": "invalid_request", "error_description": "Missing code or redirect_uri"}
)
try:
# Validate authorization code
if not code.startswith("clerk_auth_"):
return JSONResponse(
status_code=400,
content={"error": "invalid_grant", "error_description": "Invalid authorization code"}
)
# Retrieve stored JWT token using authorization code from Redis or in-memory fallback
stored_code_data = None
# Try to get from Redis first, then fall back to in-memory
store = get_redis_session_store()
if store:
stored_code_data = store.get_oauth_code(code, delete_after_use=True)
if stored_code_data:
logger.info(f"Retrieved authorization code {code[:10]}... from Redis")
else:
logger.warning(f"Authorization code {code[:10]}... not found in Redis")
# Fall back to in-memory storage if Redis unavailable or code not found
if not stored_code_data and hasattr(oauth_callback, '_code_storage'):
stored_code_data = oauth_callback._code_storage.get(code)
if stored_code_data:
# Clean up in-memory storage
oauth_callback._code_storage.pop(code, None)
logger.info(f"Retrieved authorization code {code[:10]}... from in-memory storage")
if not stored_code_data:
logger.error(f"No stored data found for authorization code: {code}")
return JSONResponse(
status_code=400,
content={"error": "invalid_grant", "error_description": "Authorization code not found or expired"}
)
# Note: Redis TTL handles expiration automatically, but check for manual expiration for in-memory fallback
import time
expires_at = stored_code_data.get("expires_at", 0)
if expires_at and time.time() > expires_at:
logger.error(f"Authorization code expired: {code}")
return JSONResponse(
status_code=400,
content={"error": "invalid_grant", "error_description": "Authorization code expired"}
)
# Get the real JWT token
real_jwt_token = stored_code_data.get("real_jwt_token")
if real_jwt_token:
logger.info("Returning real Clerk JWT token")
# Note: Code already deleted from Redis, clean up in-memory fallback if used
if hasattr(oauth_callback, '_code_storage'):
oauth_callback._code_storage.pop(code, None)
return JSONResponse({
"access_token": real_jwt_token,
"token_type": "Bearer",
"expires_in": 3600,
"scope": "read search"
})
else:
logger.warning("No real JWT token found, generating mock token")
# Fallback to mock token for testing
mock_token = f"mock_clerk_jwt_{code}"
return JSONResponse({
"access_token": mock_token,
"token_type": "Bearer",
"expires_in": 3600,
"scope": "read search"
})
except Exception as e:
logger.exception(f"OAuth callback POST failed: {e}")
return JSONResponse(
status_code=500,
content={"error": "server_error", "error_description": str(e)}
)
@router.post("/register")
async def register_client(request: Request):
"""Dynamic Client Registration (RFC 7591)"""
data = await request.json()
logger.info(f"Client registration request: {data}")
# Simple dynamic registration - accept any client
client_id = f"mcp-client-{os.urandom(8).hex()}"
return JSONResponse({
"client_id": client_id,
"client_secret": None, # Public client
"redirect_uris": data.get("redirect_uris", []),
"grant_types": ["authorization_code"],
"response_types": ["code"],
"client_name": data.get("client_name", "MCP Client"),
"token_endpoint_auth_method": "none"
})
@router.post("/token")
async def token_endpoint(request: Request):
"""OAuth 2.1 Token Endpoint - exchanges code for Clerk JWT"""
# Parse form data
form_data = await request.form()
grant_type = form_data.get("grant_type")
code = form_data.get("code")
redirect_uri = form_data.get("redirect_uri")
client_id = form_data.get("client_id")
code_verifier = form_data.get("code_verifier")
logger.info(f"Token exchange - grant_type: {grant_type}")
logger.info(f"Code: {code[:20] if code else 'None'}...")
if grant_type != "authorization_code":
return JSONResponse(
status_code=400,
content={"error": "unsupported_grant_type"}
)
if not code or not redirect_uri:
return JSONResponse(
status_code=400,
content={"error": "invalid_request", "error_description": "Missing code or redirect_uri"}
)
try:
# Validate authorization code
if not code.startswith("clerk_auth_"):
return JSONResponse(
status_code=400,
content={"error": "invalid_grant", "error_description": "Invalid authorization code"}
)
# Retrieve stored JWT token using authorization code from Redis or in-memory fallback
stored_code_data = None
# Try to get from Redis first, then fall back to in-memory
store = get_redis_session_store()
if store:
stored_code_data = store.get_oauth_code(code, delete_after_use=True)
if stored_code_data:
logger.info(f"Retrieved authorization code {code[:10]}... from Redis (/token endpoint)")
else:
logger.warning(f"Authorization code {code[:10]}... not found in Redis (/token endpoint)")
# Fall back to in-memory storage if Redis unavailable or code not found
if not stored_code_data and hasattr(oauth_callback, '_code_storage'):
stored_code_data = oauth_callback._code_storage.get(code)
if stored_code_data:
# Clean up in-memory storage
oauth_callback._code_storage.pop(code, None)
logger.info(f"Retrieved authorization code {code[:10]}... from in-memory storage (/token endpoint)")
if not stored_code_data:
logger.error(f"No stored data found for authorization code: {code}")
return JSONResponse(
status_code=400,
content={"error": "invalid_grant", "error_description": "Authorization code not found or expired"}
)
# Note: Redis TTL handles expiration automatically, but check for manual expiration for in-memory fallback
import time
expires_at = stored_code_data.get("expires_at", 0)
if expires_at and time.time() > expires_at:
logger.error(f"Authorization code expired: {code}")
return JSONResponse(
status_code=400,
content={"error": "invalid_grant", "error_description": "Authorization code expired"}
)
# Get the real JWT token
real_jwt_token = stored_code_data.get("real_jwt_token")
if real_jwt_token:
logger.info("Returning real Clerk JWT token from /token endpoint")
# Note: Code already deleted from Redis, clean up in-memory fallback if used
if hasattr(oauth_callback, '_code_storage'):
oauth_callback._code_storage.pop(code, None)
return JSONResponse({
"access_token": real_jwt_token,
"token_type": "Bearer",
"expires_in": 3600,
"scope": "read search"
})
else:
logger.warning("No real JWT token found in /token endpoint, generating mock token")
# Fallback to mock token for testing
mock_token = f"mock_clerk_jwt_{code}"
return JSONResponse({
"access_token": mock_token,
"token_type": "Bearer",
"expires_in": 3600,
"scope": "read search"
})
except Exception as e:
logger.exception(f"Token exchange failed: {e}")
return JSONResponse(
status_code=500,
content={"error": "server_error", "error_description": str(e)}
)
```
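
Since the POST `/auth/callback` and `/token` handlers implement the same authorization-code exchange, a client completes the flow by POSTing the `clerk_auth_...` code it received on its redirect URI as ordinary form data. A minimal client-side sketch, assuming the default `BASE_URL` above; the code, redirect URI, and client ID values are hypothetical placeholders:

```python
import httpx

def exchange_code_for_token(code: str, redirect_uri: str, client_id: str) -> str:
    """POST the authorization code to /token and return the access token."""
    response = httpx.post(
        "https://api.yargimcp.com/token",  # default BASE_URL from the module above
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": redirect_uri,
            "client_id": client_id,
        },
    )
    response.raise_for_status()
    # Expected payload shape per the handler above:
    # {"access_token": "...", "token_type": "Bearer", "expires_in": 3600, "scope": "read search"}
    return response.json()["access_token"]

token = exchange_code_for_token(
    code="clerk_auth_0123456789abcdef",        # hypothetical; minted by /auth/callback
    redirect_uri="https://client.example/cb",  # hypothetical client redirect URI
    client_id="mcp-client-abc123",             # hypothetical; issued by /register
)
```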
--------------------------------------------------------------------------------
/asgi_app.py:
--------------------------------------------------------------------------------
```python
"""
ASGI application for Yargı MCP Server
This module provides ASGI/HTTP access to the Yargı MCP server,
allowing it to be deployed as a web service with FastAPI wrapper
for OAuth integration and proper middleware support.
Usage:
uvicorn asgi_app:app --host 0.0.0.0 --port 8000
"""
import os
import time
import logging
import json
from datetime import datetime, timedelta
from fastapi import FastAPI, Request, HTTPException, Query
from fastapi.responses import JSONResponse, HTMLResponse, Response
from fastapi.exception_handlers import http_exception_handler
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
# Import the proper create_app function that includes all middleware
from mcp_server_main import create_app
# Conditional auth-related imports (only if auth enabled)
_auth_check = os.getenv("ENABLE_AUTH", "false").lower() == "true"
if _auth_check:
# Import MCP Auth HTTP adapter (OAuth endpoints)
try:
from mcp_auth_http_simple import router as mcp_auth_router
except ImportError:
mcp_auth_router = None
# Import Stripe webhook router
try:
from stripe_webhook import router as stripe_router
except ImportError:
stripe_router = None
else:
mcp_auth_router = None
stripe_router = None
# OAuth configuration from environment variables
CLERK_ISSUER = os.getenv("CLERK_ISSUER", "https://clerk.yargimcp.com")
BASE_URL = os.getenv("BASE_URL", "https://api.yargimcp.com")
CLERK_SECRET_KEY = os.getenv("CLERK_SECRET_KEY")
CLERK_PUBLISHABLE_KEY = os.getenv("CLERK_PUBLISHABLE_KEY")
# Setup logging
logger = logging.getLogger(__name__)
# Configure CORS and Auth middleware
cors_origins = os.getenv("ALLOWED_ORIGINS", "*").split(",")
# Import FastMCP Bearer Auth Provider
from fastmcp.server.auth import BearerAuthProvider
from fastmcp.server.auth.providers.bearer import RSAKeyPair
# Import Clerk SDK at module level for performance
try:
from clerk_backend_api import Clerk
CLERK_SDK_AVAILABLE = True
except ImportError:
CLERK_SDK_AVAILABLE = False
logger.warning("Clerk SDK not available - falling back to development mode")
# Configure Bearer token authentication based on ENABLE_AUTH
auth_enabled = os.getenv("ENABLE_AUTH", "false").lower() == "true"
bearer_auth = None
if CLERK_SECRET_KEY and CLERK_ISSUER:
    # Production: Use Clerk JWKS endpoint for token validation
    bearer_auth = BearerAuthProvider(
        jwks_uri=f"{CLERK_ISSUER}/.well-known/jwks.json",
        issuer=None,  # Issuer validation disabled
        algorithm="RS256",
        audience=None,  # Audience validation disabled - Clerk uses a different audience format
        required_scopes=[]  # Scope validation disabled - Clerk JWTs carry ['read', 'search']
    )
else:
# Development: Generate RSA key pair for testing
dev_key_pair = RSAKeyPair.generate()
bearer_auth = BearerAuthProvider(
public_key=dev_key_pair.public_key,
issuer="https://dev.yargimcp.com",
audience="dev-mcp-server",
required_scopes=["yargi.read"]
)
# Create MCP app with Bearer authentication
mcp_server = create_app(auth=bearer_auth if auth_enabled else None)
# Create MCP Starlette sub-application with root path - mount will add /mcp prefix
mcp_app = mcp_server.http_app(path="/")
# Configure JSON encoder for proper Turkish character support
class UTF8JSONResponse(JSONResponse):
def __init__(self, content=None, status_code=200, headers=None, **kwargs):
if headers is None:
headers = {}
headers["Content-Type"] = "application/json; charset=utf-8"
super().__init__(content, status_code, headers, **kwargs)
def render(self, content) -> bytes:
return json.dumps(
content,
ensure_ascii=False,
allow_nan=False,
indent=None,
separators=(",", ":"),
).encode("utf-8")
custom_middleware = [
Middleware(
CORSMiddleware,
allow_origins=cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "OPTIONS", "DELETE"],
allow_headers=["Content-Type", "Authorization", "X-Request-ID", "X-Session-ID"],
),
]
# Create FastAPI wrapper application
app = FastAPI(
title="Yargı MCP Server",
description="MCP server for Turkish legal databases with OAuth authentication",
version="0.1.0",
middleware=custom_middleware,
default_response_class=UTF8JSONResponse, # Use UTF-8 JSON encoder
redirect_slashes=False # Disable to prevent 307 redirects on /mcp endpoint
)
# Add auth-related routers to FastAPI (only if available)
if stripe_router:
app.include_router(stripe_router, prefix="/api/stripe")
if mcp_auth_router:
app.include_router(mcp_auth_router)
# Custom 401 exception handler for MCP spec compliance
@app.exception_handler(401)
async def custom_401_handler(request: Request, exc: HTTPException):
"""Custom 401 handler that adds WWW-Authenticate header as required by MCP spec"""
response = await http_exception_handler(request, exc)
# Add WWW-Authenticate header pointing to protected resource metadata
# as required by RFC 9728 Section 5.1 and MCP Authorization spec
response.headers["WWW-Authenticate"] = (
'Bearer '
'error="invalid_token", '
'error_description="The access token is missing or invalid", '
f'resource="{BASE_URL}/.well-known/oauth-protected-resource"'
)
return response
# FastAPI health check endpoint - BEFORE mounting MCP app
@app.get("/health")
async def health_check():
"""Health check endpoint for monitoring"""
return {
"status": "healthy",
"service": "Yargı MCP Server",
"version": "0.1.0",
"tools_count": len(mcp_server._tool_manager._tools),
"auth_enabled": os.getenv("ENABLE_AUTH", "false").lower() == "true"
}
# Add explicit redirect for /mcp to /mcp/ with method preservation
@app.api_route("/mcp", methods=["GET", "POST", "HEAD", "OPTIONS"])
async def redirect_to_slash(request: Request):
"""Redirect /mcp to /mcp/ preserving HTTP method with 308"""
from fastapi.responses import RedirectResponse
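    # 308 Permanent Redirect preserves the HTTP method and request body,
    # unlike 301/302, so POSTed MCP messages survive the /mcp -> /mcp/ hop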
return RedirectResponse(url="/mcp/", status_code=308)
# MCP mount at /mcp handles path routing correctly
# IMPORTANT: Add FastAPI endpoints BEFORE mounting MCP app
# Otherwise mount at root will catch all requests
# Debug endpoint to test routing
@app.get("/debug/test")
async def debug_test():
"""Debug endpoint to test if FastAPI routes work"""
return {"message": "FastAPI routes working", "debug": True}
# Clerk CORS proxy endpoints
@app.api_route("/clerk-proxy/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"])
async def clerk_cors_proxy(request: Request, path: str):
"""
Proxy requests to Clerk to bypass CORS restrictions.
Forwards requests from Claude AI to clerk.yargimcp.com with proper CORS headers.
"""
import httpx
# Build target URL
clerk_url = f"https://clerk.yargimcp.com/{path}"
# Forward query parameters
if request.url.query:
clerk_url += f"?{request.url.query}"
# Copy headers (exclude host/origin)
headers = dict(request.headers)
headers.pop('host', None)
headers.pop('origin', None)
headers['origin'] = 'https://yargimcp.com' # Use our frontend domain
try:
async with httpx.AsyncClient() as client:
# Forward the request to Clerk
if request.method == "OPTIONS":
# Handle preflight
response = await client.request(
method=request.method,
url=clerk_url,
headers=headers
)
else:
# Forward body for POST/PUT requests
body = None
if request.method in ["POST", "PUT", "PATCH"]:
body = await request.body()
response = await client.request(
method=request.method,
url=clerk_url,
headers=headers,
content=body
)
# Create response with CORS headers
response_headers = dict(response.headers)
response_headers.update({
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization, Accept, Origin, X-Requested-With",
"Access-Control-Allow-Credentials": "true",
"Access-Control-Max-Age": "86400"
})
return Response(
content=response.content,
status_code=response.status_code,
headers=response_headers,
media_type=response.headers.get("content-type")
)
except Exception as e:
return JSONResponse(
{"error": "proxy_error", "message": str(e)},
status_code=500,
headers={"Access-Control-Allow-Origin": "*"}
)
# FastAPI root endpoint
@app.get("/")
async def root():
"""Root endpoint with service information"""
return {
"service": "Yargı MCP Server",
"description": "MCP server for Turkish legal databases with OAuth authentication",
"endpoints": {
"mcp": "/mcp",
"health": "/health",
"status": "/status",
"stripe_webhook": "/api/stripe/webhook",
"oauth_login": "/auth/login",
"oauth_callback": "/auth/callback",
"oauth_google": "/auth/google/login",
"user_info": "/auth/user"
},
"transports": {
"http": "/mcp"
},
"supported_databases": [
"Yargıtay (Court of Cassation)",
"Danıştay (Council of State)",
"Emsal (Precedent)",
"Uyuşmazlık Mahkemesi (Court of Jurisdictional Disputes)",
"Anayasa Mahkemesi (Constitutional Court)",
"Kamu İhale Kurulu (Public Procurement Authority)",
"Rekabet Kurumu (Competition Authority)",
"Sayıştay (Court of Accounts)",
"KVKK (Personal Data Protection Authority)",
"BDDK (Banking Regulation and Supervision Agency)",
"Bedesten API (Multiple courts)"
],
"authentication": {
"enabled": os.getenv("ENABLE_AUTH", "false").lower() == "true",
"type": "OAuth 2.0 via Clerk",
"issuer": CLERK_ISSUER,
"providers": ["google"],
"flow": "authorization_code"
}
}
# OAuth 2.0 Authorization Server Metadata - MCP standard location
@app.get("/.well-known/oauth-authorization-server")
async def oauth_authorization_server_root():
"""OAuth 2.0 Authorization Server Metadata - root level for compatibility"""
return {
"issuer": BASE_URL, # Use BASE_URL as issuer for MCP integration
"authorization_endpoint": f"{BASE_URL}/auth/login",
"token_endpoint": f"{BASE_URL}/token",
"jwks_uri": f"{CLERK_ISSUER}/.well-known/jwks.json",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code", "refresh_token"],
"token_endpoint_auth_methods_supported": ["client_secret_basic", "none"],
"scopes_supported": ["read", "search", "openid", "profile", "email"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
"claims_supported": ["sub", "iss", "aud", "exp", "iat", "email", "name"],
"code_challenge_methods_supported": ["S256"],
"service_documentation": f"{BASE_URL}/mcp",
"registration_endpoint": f"{BASE_URL}/register",
"resource_documentation": f"{BASE_URL}/mcp"
}
# Claude AI MCP specific endpoint format - suffix versions
@app.get("/.well-known/oauth-authorization-server/mcp")
async def oauth_authorization_server_mcp_suffix():
"""OAuth 2.0 Authorization Server Metadata - Claude AI MCP specific format"""
return {
"issuer": BASE_URL, # Use BASE_URL as issuer for MCP integration
"authorization_endpoint": f"{BASE_URL}/auth/login",
"token_endpoint": f"{BASE_URL}/token",
"jwks_uri": f"{CLERK_ISSUER}/.well-known/jwks.json",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code", "refresh_token"],
"token_endpoint_auth_methods_supported": ["client_secret_basic", "none"],
"scopes_supported": ["read", "search", "openid", "profile", "email"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
"claims_supported": ["sub", "iss", "aud", "exp", "iat", "email", "name"],
"code_challenge_methods_supported": ["S256"],
"service_documentation": f"{BASE_URL}/mcp",
"registration_endpoint": f"{BASE_URL}/register",
"resource_documentation": f"{BASE_URL}/mcp"
}
@app.get("/.well-known/oauth-protected-resource/mcp")
async def oauth_protected_resource_mcp_suffix():
"""OAuth 2.0 Protected Resource Metadata - Claude AI MCP specific format"""
return {
"resource": BASE_URL,
"authorization_servers": [
BASE_URL
],
"scopes_supported": ["read", "search"],
"bearer_methods_supported": ["header"],
"resource_documentation": f"{BASE_URL}/mcp",
"resource_policy_uri": f"{BASE_URL}/privacy"
}
# OAuth 2.0 Protected Resource Metadata (RFC 9728) - MCP Spec Required
@app.get("/.well-known/oauth-protected-resource")
async def oauth_protected_resource():
"""OAuth 2.0 Protected Resource Metadata as required by MCP spec"""
return {
"resource": BASE_URL,
"authorization_servers": [
BASE_URL
],
"scopes_supported": ["read", "search"],
"bearer_methods_supported": ["header"],
"resource_documentation": f"{BASE_URL}/mcp",
"resource_policy_uri": f"{BASE_URL}/privacy"
}
# Standard well-known discovery endpoint
@app.get("/.well-known/mcp")
async def well_known_mcp():
"""Standard MCP discovery endpoint"""
return {
"mcp_server": {
"name": "Yargı MCP Server",
"version": "0.1.0",
"endpoint": f"{BASE_URL}/mcp",
"authentication": {
"type": "oauth2",
"authorization_url": f"{BASE_URL}/auth/login",
"scopes": ["read", "search"]
},
"capabilities": ["tools", "resources"],
"tools_count": len(mcp_server._tool_manager._tools)
}
}
# MCP Discovery endpoint for ChatGPT integration
@app.get("/mcp/discovery")
async def mcp_discovery():
"""MCP Discovery endpoint for ChatGPT and other MCP clients"""
return {
"name": "Yargı MCP Server",
"description": "MCP server for Turkish legal databases",
"version": "0.1.0",
"protocol": "mcp",
"transport": "http",
"endpoint": "/mcp",
"authentication": {
"type": "oauth2",
"authorization_url": "/auth/login",
"token_url": "/token",
"scopes": ["read", "search"],
"provider": "clerk"
},
"capabilities": {
"tools": True,
"resources": True,
"prompts": False
},
"tools_count": len(mcp_server._tool_manager._tools),
"contact": {
"url": BASE_URL,
"email": "[email protected]"
}
}
# FastAPI status endpoint
@app.get("/status")
async def status():
"""Status endpoint with detailed information"""
tools = []
for tool in mcp_server._tool_manager._tools.values():
tools.append({
"name": tool.name,
"description": tool.description[:100] + "..." if len(tool.description) > 100 else tool.description
})
return {
"status": "operational",
"tools": tools,
"total_tools": len(tools),
"transport": "streamable_http",
"architecture": "FastAPI wrapper + MCP Starlette sub-app",
"auth_status": "enabled" if os.getenv("ENABLE_AUTH", "false").lower() == "true" else "disabled"
}
# Simplified OAuth session validation for callback endpoints only
async def validate_clerk_session_for_oauth(request: Request, clerk_token: str = None) -> str:
"""Validate Clerk session for OAuth callback endpoints only (not for MCP endpoints)"""
try:
# Use Clerk SDK if available
if not CLERK_SDK_AVAILABLE:
raise ImportError("Clerk SDK not available")
clerk = Clerk(bearer_auth=CLERK_SECRET_KEY)
        # Try JWT token first (from URL parameter). The token's presence is
        # accepted as-is here; full JWT validation is handled by the Bearer
        # auth provider on the MCP endpoints.
        if clerk_token:
            return "oauth_user_from_token"
# Fallback to cookie validation
clerk_session = request.cookies.get("__session")
if not clerk_session:
raise HTTPException(status_code=401, detail="No Clerk session found")
# Validate session with Clerk
session = clerk.sessions.verify_session(clerk_session)
return session.user_id
except ImportError:
return "dev_user_123"
except Exception as e:
raise HTTPException(status_code=401, detail=f"OAuth session validation failed: {str(e)}")
# MCP OAuth Callback Endpoint
@app.get("/auth/mcp-callback")
async def mcp_oauth_callback(request: Request, clerk_token: str = Query(None)):
"""Handle OAuth callback for MCP token generation"""
try:
# Validate Clerk session with JWT token support
user_id = await validate_clerk_session_for_oauth(request, clerk_token)
# Return success response
return HTMLResponse(f"""
<html>
<head>
<title>MCP Connection Successful</title>
<style>
body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; }}
.success {{ color: #28a745; }}
.token {{ background: #f8f9fa; padding: 15px; border-radius: 5px; margin: 20px 0; word-break: break-all; }}
</style>
</head>
<body>
<h1 class="success">✅ MCP Connection Successful!</h1>
<p>Your Yargı MCP integration is now active.</p>
<div class="token">
<strong>Authentication:</strong><br>
<code>Use your Clerk JWT token directly with Bearer authentication</code>
</div>
<p>You can now close this window and return to your MCP client.</p>
<script>
// Try to close the popup if opened as such
if (window.opener) {{
window.opener.postMessage({{
type: 'MCP_AUTH_SUCCESS',
token: 'use_clerk_jwt_token'
}}, '*');
setTimeout(() => window.close(), 3000);
}}
</script>
</body>
</html>
""")
except HTTPException as e:
return HTMLResponse(f"""
<html>
<head>
<title>MCP Connection Failed</title>
<style>
body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; }}
.error {{ color: #dc3545; }}
.debug {{ background: #f8f9fa; padding: 10px; margin: 20px 0; border-radius: 5px; font-family: monospace; }}
</style>
</head>
<body>
<h1 class="error">❌ MCP Connection Failed</h1>
<p>{e.detail}</p>
<div class="debug">
<strong>Debug Info:</strong><br>
Clerk Token: {'✅ Provided' if clerk_token else '❌ Missing'}<br>
Error: {e.detail}<br>
Status: {e.status_code}
</div>
<p>Please try again or contact support.</p>
<a href="https://yargimcp.com/sign-in">Return to Sign In</a>
</body>
</html>
""", status_code=e.status_code)
except Exception as e:
return HTMLResponse(f"""
<html>
<head>
<title>MCP Connection Error</title>
<style>
body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; }}
.error {{ color: #dc3545; }}
</style>
</head>
<body>
<h1 class="error">❌ Unexpected Error</h1>
<p>An unexpected error occurred during authentication.</p>
<p>Error: {str(e)}</p>
<a href="https://yargimcp.com/sign-in">Return to Sign In</a>
</body>
</html>
""", status_code=500)
# OAuth2 Token Endpoint - Now uses Clerk JWT tokens directly
@app.post("/auth/mcp-token")
async def mcp_token_endpoint(request: Request):
"""OAuth2 token endpoint for MCP clients - returns Clerk JWT token info"""
try:
# Validate Clerk session
user_id = await validate_clerk_session_for_oauth(request)
return {
"message": "Use your Clerk JWT token directly with Bearer authentication",
"token_type": "Bearer",
"scope": "yargi.read",
"user_id": user_id,
"instructions": "Include 'Authorization: Bearer YOUR_CLERK_JWT_TOKEN' in your requests"
}
except HTTPException as e:
return JSONResponse(
status_code=e.status_code,
content={"error": "invalid_request", "error_description": e.detail}
)
# Mount MCP app at /mcp/ with trailing slash
app.mount("/mcp/", mcp_app)
# Set the lifespan context after mounting - mounted sub-apps do not get their
# lifespan run automatically, so wire the MCP app's lifespan onto the wrapper
# router explicitly so the MCP session manager starts with the server
app.router.lifespan_context = mcp_app.lifespan
# Export for uvicorn
__all__ = ["app"]
```
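A small smoke-test sketch against the wrapper app above, assuming it is served locally with the `uvicorn asgi_app:app` command from the module docstring. The token is a placeholder for a real Clerk JWT, and a real MCP client would also perform the MCP initialize handshake before listing tools:
```python
import httpx

def smoke_test(token: str) -> None:
    with httpx.Client(base_url="http://localhost:8000", follow_redirects=True) as client:
        # /health is registered on the FastAPI wrapper before the MCP mount
        print(client.get("/health").json())
        # /mcp answers with a 308 to /mcp/; follow_redirects=True makes httpx
        # replay the request there with method and body preserved
        resp = client.post(
            "/mcp",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json, text/event-stream",
            },
            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
        )
        print(resp.status_code)
```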
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/asgi_app.py:
--------------------------------------------------------------------------------
```python
"""
ASGI application for Yargı MCP Server
This module provides ASGI/HTTP access to the Yargı MCP server,
allowing it to be deployed as a web service with FastAPI wrapper
for Stripe webhook integration.
Usage:
uvicorn asgi_app:app --host 0.0.0.0 --port 8000
"""
import os
import time
import logging
from datetime import datetime, timedelta
from fastapi import FastAPI, Request, HTTPException, Query
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.exception_handlers import http_exception_handler
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import Response
from starlette.requests import Request as StarletteRequest
# Import the MCP app creator function
from mcp_server_main import create_app
# Import Stripe webhook router
from stripe_webhook import router as stripe_router
# Import simplified MCP Auth HTTP adapter
from mcp_auth_http_simple import router as mcp_auth_router
# OAuth configuration from environment variables
CLERK_ISSUER = os.getenv("CLERK_ISSUER", "https://accounts.yargimcp.com")
BASE_URL = os.getenv("BASE_URL", "https://yargimcp.com")
# Setup logging
logger = logging.getLogger(__name__)
# Configure CORS and Auth middleware
cors_origins = os.getenv("ALLOWED_ORIGINS", "*").split(",")
# Import FastMCP Bearer Auth Provider
from fastmcp.server.auth import BearerAuthProvider
from fastmcp.server.auth.providers.bearer import RSAKeyPair
# Clerk JWT configuration for Bearer token validation
CLERK_SECRET_KEY = os.getenv("CLERK_SECRET_KEY")
CLERK_ISSUER = os.getenv("CLERK_ISSUER", "https://accounts.yargimcp.com")
CLERK_PUBLISHABLE_KEY = os.getenv("CLERK_PUBLISHABLE_KEY")
# Configure Bearer token authentication
bearer_auth = None
if CLERK_SECRET_KEY and CLERK_ISSUER:
# Production: Use Clerk JWKS endpoint for token validation
bearer_auth = BearerAuthProvider(
jwks_uri=f"{CLERK_ISSUER}/.well-known/jwks.json",
issuer=CLERK_ISSUER,
algorithm="RS256",
audience=None, # Disable audience validation - Clerk uses different audience format
required_scopes=[] # Disable scope validation - Clerk JWT has ['read', 'search']
)
logger.info(f"Bearer auth configured with Clerk JWKS: {CLERK_ISSUER}/.well-known/jwks.json")
else:
# Development: Generate RSA key pair for testing
logger.warning("No Clerk credentials found - using development RSA key pair")
dev_key_pair = RSAKeyPair.generate()
bearer_auth = BearerAuthProvider(
public_key=dev_key_pair.public_key,
issuer="https://dev.yargimcp.com",
audience="dev-mcp-server",
required_scopes=["yargi.read"]
)
# Generate a test token for development
dev_token = dev_key_pair.create_token(
subject="dev-user",
issuer="https://dev.yargimcp.com",
audience="dev-mcp-server",
scopes=["yargi.read", "yargi.search"],
expires_in_seconds=3600 * 24 # 24 hours for development
)
logger.info(f"Development Bearer token: {dev_token}")
custom_middleware = [
Middleware(
CORSMiddleware,
allow_origins=cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "OPTIONS", "DELETE"],
allow_headers=["Content-Type", "Authorization", "X-Request-ID", "X-Session-ID"],
),
]
# Create MCP app with Bearer authentication
mcp_server = create_app(auth=bearer_auth)
# Add Starlette middleware to FastAPI (not MCP)
# MCP already has Bearer auth, no need for additional middleware on MCP level
# Create MCP Starlette sub-application with root path - mount will add /mcp prefix
mcp_app = mcp_server.http_app(path="/")
# Configure JSON encoder for proper Turkish character support
import json
from fastapi.responses import JSONResponse
class UTF8JSONResponse(JSONResponse):
def __init__(self, content=None, status_code=200, headers=None, **kwargs):
if headers is None:
headers = {}
headers["Content-Type"] = "application/json; charset=utf-8"
super().__init__(content, status_code, headers, **kwargs)
def render(self, content) -> bytes:
return json.dumps(
content,
ensure_ascii=False,
allow_nan=False,
indent=None,
separators=(",", ":"),
).encode("utf-8")
# Create FastAPI wrapper application
app = FastAPI(
title="Yargı MCP Server",
description="MCP server for Turkish legal databases with OAuth authentication",
version="0.1.0",
middleware=custom_middleware,
default_response_class=UTF8JSONResponse # Use UTF-8 JSON encoder
)
# Add Stripe webhook router to FastAPI
app.include_router(stripe_router, prefix="/api")
# Add MCP Auth HTTP adapter to FastAPI (handles OAuth endpoints)
app.include_router(mcp_auth_router)
# Custom 401 exception handler for MCP spec compliance
@app.exception_handler(401)
async def custom_401_handler(request: Request, exc: HTTPException):
"""Custom 401 handler that adds WWW-Authenticate header as required by MCP spec"""
response = await http_exception_handler(request, exc)
# Add WWW-Authenticate header pointing to protected resource metadata
# as required by RFC 9728 Section 5.1 and MCP Authorization spec
response.headers["WWW-Authenticate"] = (
'Bearer '
'error="invalid_token", '
'error_description="The access token is missing or invalid", '
f'resource="{BASE_URL}/.well-known/oauth-protected-resource"'
)
return response
# FastAPI health check endpoint - BEFORE mounting MCP app
@app.get("/health")
async def health_check():
"""Health check endpoint for monitoring"""
return JSONResponse({
"status": "healthy",
"service": "Yargı MCP Server",
"version": "0.1.0",
"tools_count": len(mcp_server._tool_manager._tools),
"auth_enabled": os.getenv("ENABLE_AUTH", "false").lower() == "true"
})
# Add explicit redirect for /mcp to /mcp/ with method preservation
@app.api_route("/mcp", methods=["GET", "POST", "HEAD", "OPTIONS"])
async def redirect_to_slash(request: Request):
"""Redirect /mcp to /mcp/ preserving HTTP method with 308"""
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/mcp/", status_code=308)
# Mount MCP app at /mcp/ with trailing slash
app.mount("/mcp/", mcp_app)
# Set the lifespan context after mounting - mounted sub-apps do not get their
# lifespan run automatically, so wire the MCP app's lifespan onto the wrapper
# router explicitly so the MCP session manager starts with the server
app.router.lifespan_context = mcp_app.lifespan
# SSE transport deprecated - removed
# FastAPI root endpoint
@app.get("/")
async def root():
"""Root endpoint with service information"""
return JSONResponse({
"service": "Yargı MCP Server",
"description": "MCP server for Turkish legal databases with OAuth authentication",
"endpoints": {
"mcp": "/mcp",
"health": "/health",
"status": "/status",
"stripe_webhook": "/api/stripe/webhook",
"oauth_login": "/auth/login",
"oauth_callback": "/auth/callback",
"oauth_google": "/auth/google/login",
"user_info": "/auth/user"
},
"transports": {
"http": "/mcp"
},
"supported_databases": [
"Yargıtay (Court of Cassation)",
"Danıştay (Council of State)",
"Emsal (Precedent)",
"Uyuşmazlık Mahkemesi (Court of Jurisdictional Disputes)",
"Anayasa Mahkemesi (Constitutional Court)",
"Kamu İhale Kurulu (Public Procurement Authority)",
"Rekabet Kurumu (Competition Authority)",
"Sayıştay (Court of Accounts)",
"Bedesten API (Multiple courts)"
],
"authentication": {
"enabled": os.getenv("ENABLE_AUTH", "false").lower() == "true",
"type": "OAuth 2.0 via Clerk",
"issuer": os.getenv("CLERK_ISSUER", "https://clerk.accounts.dev"),
"providers": ["google"],
"flow": "authorization_code"
}
})
# OAuth 2.0 Authorization Server Metadata proxy (for MCP clients that can't reach Clerk directly)
# MCP Auth Toolkit expects this to be under /mcp/.well-known/oauth-authorization-server
@app.get("/mcp/.well-known/oauth-authorization-server")
async def oauth_authorization_server():
"""OAuth 2.0 Authorization Server Metadata proxy to Clerk - MCP Auth Toolkit standard location"""
return JSONResponse({
"issuer": BASE_URL,
"authorization_endpoint": "https://yargimcp.com/mcp-callback",
"token_endpoint": f"{BASE_URL}/token",
"jwks_uri": f"{CLERK_ISSUER}/.well-known/jwks.json",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code", "refresh_token"],
"token_endpoint_auth_methods_supported": ["client_secret_basic", "none"],
"scopes_supported": ["read", "search", "openid", "profile", "email"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
"claims_supported": ["sub", "iss", "aud", "exp", "iat", "email", "name"],
"code_challenge_methods_supported": ["S256"],
"service_documentation": f"{BASE_URL}/mcp",
"registration_endpoint": f"{BASE_URL}/register",
"resource_documentation": f"{BASE_URL}/mcp"
})
# Claude AI MCP specific endpoint format
@app.get("/.well-known/oauth-authorization-server/mcp")
async def oauth_authorization_server_mcp_suffix():
"""OAuth 2.0 Authorization Server Metadata - Claude AI MCP specific format"""
return JSONResponse({
"issuer": BASE_URL,
"authorization_endpoint": "https://yargimcp.com/mcp-callback",
"token_endpoint": f"{BASE_URL}/token",
"jwks_uri": f"{CLERK_ISSUER}/.well-known/jwks.json",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code", "refresh_token"],
"token_endpoint_auth_methods_supported": ["client_secret_basic", "none"],
"scopes_supported": ["read", "search", "openid", "profile", "email"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
"claims_supported": ["sub", "iss", "aud", "exp", "iat", "email", "name"],
"code_challenge_methods_supported": ["S256"],
"service_documentation": f"{BASE_URL}/mcp",
"registration_endpoint": f"{BASE_URL}/register",
"resource_documentation": f"{BASE_URL}/mcp"
})
@app.get("/.well-known/oauth-protected-resource/mcp")
async def oauth_protected_resource_mcp_suffix():
"""OAuth 2.0 Protected Resource Metadata - Claude AI MCP specific format"""
return JSONResponse({
"resource": BASE_URL,
"authorization_servers": [
BASE_URL
],
"scopes_supported": ["read", "search"],
"bearer_methods_supported": ["header"],
"resource_documentation": f"{BASE_URL}/mcp",
"resource_policy_uri": f"{BASE_URL}/privacy"
})
# Keep root level for compatibility with some MCP clients
@app.get("/.well-known/oauth-authorization-server")
async def oauth_authorization_server_root():
"""OAuth 2.0 Authorization Server Metadata proxy to Clerk - root level for compatibility"""
return JSONResponse({
"issuer": BASE_URL,
"authorization_endpoint": "https://yargimcp.com/mcp-callback",
"token_endpoint": f"{BASE_URL}/token",
"jwks_uri": f"{CLERK_ISSUER}/.well-known/jwks.json",
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code", "refresh_token"],
"token_endpoint_auth_methods_supported": ["client_secret_basic", "none"],
"scopes_supported": ["read", "search", "openid", "profile", "email"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["RS256"],
"claims_supported": ["sub", "iss", "aud", "exp", "iat", "email", "name"],
"code_challenge_methods_supported": ["S256"],
"service_documentation": f"{BASE_URL}/mcp",
"registration_endpoint": f"{BASE_URL}/register",
"resource_documentation": f"{BASE_URL}/mcp"
})
# Note: GET /mcp is handled by the mounted MCP app itself
# This prevents 405 Method Not Allowed errors on POST requests
# OAuth 2.0 Protected Resource Metadata (RFC 9728) - MCP Spec Required
@app.get("/.well-known/oauth-protected-resource")
async def oauth_protected_resource():
"""OAuth 2.0 Protected Resource Metadata as required by MCP spec"""
return JSONResponse({
"resource": BASE_URL,
"authorization_servers": [
BASE_URL
],
"scopes_supported": ["read", "search"],
"bearer_methods_supported": ["header"],
"resource_documentation": f"{BASE_URL}/mcp",
"resource_policy_uri": f"{BASE_URL}/privacy"
})
# Standard well-known discovery endpoint
@app.get("/.well-known/mcp")
async def well_known_mcp():
"""Standard MCP discovery endpoint"""
return JSONResponse({
"mcp_server": {
"name": "Yargı MCP Server",
"version": "0.1.0",
"endpoint": f"{BASE_URL}/mcp",
"authentication": {
"type": "oauth2",
"authorization_url": f"{BASE_URL}/auth/login",
"scopes": ["read", "search"]
},
"capabilities": ["tools", "resources"],
"tools_count": len(mcp_server._tool_manager._tools)
}
})
# MCP Discovery endpoint for ChatGPT integration
@app.get("/mcp/discovery")
async def mcp_discovery():
"""MCP Discovery endpoint for ChatGPT and other MCP clients"""
return JSONResponse({
"name": "Yargı MCP Server",
"description": "MCP server for Turkish legal databases",
"version": "0.1.0",
"protocol": "mcp",
"transport": "http",
"endpoint": "/mcp",
"authentication": {
"type": "oauth2",
"authorization_url": "/auth/login",
"token_url": "/auth/callback",
"scopes": ["read", "search"],
"provider": "clerk"
},
"capabilities": {
"tools": True,
"resources": True,
"prompts": False
},
"tools_count": len(mcp_server._tool_manager._tools),
"contact": {
"url": BASE_URL,
"email": "[email protected]"
}
})
# FastAPI status endpoint
@app.get("/status")
async def status():
"""Status endpoint with detailed information"""
tools = []
for tool in mcp_server._tool_manager._tools.values():
tools.append({
"name": tool.name,
"description": tool.description[:100] + "..." if len(tool.description) > 100 else tool.description
})
return JSONResponse({
"status": "operational",
"tools": tools,
"total_tools": len(tools),
"transport": "streamable_http",
"architecture": "FastAPI wrapper + MCP Starlette sub-app",
"auth_status": "enabled" if os.getenv("ENABLE_AUTH", "false").lower() == "true" else "disabled"
})
# Note: JWT token validation is now handled entirely by Clerk
# All authentication flows use Clerk JWT tokens directly
async def validate_clerk_session(request: Request, clerk_token: str = None) -> str:
"""Validate Clerk session from cookies or JWT token and return user_id"""
logger.info(f"Validating Clerk session - token provided: {bool(clerk_token)}")
try:
# Try to import Clerk SDK
from clerk_backend_api import Clerk
clerk = Clerk(bearer_auth=os.getenv("CLERK_SECRET_KEY"))
# Try JWT token first (from URL parameter)
if clerk_token:
logger.info("Validating Clerk JWT token from URL parameter")
try:
# Extract session_id from JWT token and verify with Clerk
import jwt
decoded_token = jwt.decode(clerk_token, options={"verify_signature": False})
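                # The signature is deliberately not verified at this step - the
                # token is only decoded locally to extract the session id, and
                # Clerk verifies the token server-side in sessions.verify() below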
session_id = decoded_token.get("sid") # Use standard JWT 'sid' claim
if session_id:
# Verify with Clerk using session_id
session = clerk.sessions.verify(session_id=session_id, token=clerk_token)
user_id = session.user_id if session else None
if user_id:
logger.info(f"JWT token validation successful - user_id: {user_id}")
return user_id
else:
logger.error("JWT token validation failed - no user_id in session")
else:
logger.error("No session_id found in JWT token")
except Exception as e:
logger.error(f"JWT token validation failed: {str(e)}")
# Fall through to cookie validation
# Fallback to cookie validation
logger.info("Attempting cookie-based session validation")
clerk_session = request.cookies.get("__session")
if not clerk_session:
logger.error("No Clerk session cookie found")
raise HTTPException(status_code=401, detail="No Clerk session found")
# Validate session with Clerk
session = clerk.sessions.verify_session(clerk_session)
logger.info(f"Cookie session validation successful - user_id: {session.user_id}")
return session.user_id
except ImportError:
# Fallback for development without Clerk SDK
logger.warning("Clerk SDK not available - using development fallback")
return "dev_user_123"
except Exception as e:
logger.error(f"Session validation failed: {str(e)}")
raise HTTPException(status_code=401, detail=f"Session validation failed: {str(e)}")
# MCP OAuth Callback Endpoint
@app.get("/auth/mcp-callback")
async def mcp_oauth_callback(request: Request, clerk_token: str = Query(None)):
"""Handle OAuth callback for MCP token generation"""
logger.info(f"MCP OAuth callback - clerk_token provided: {bool(clerk_token)}")
try:
# Validate Clerk session with JWT token support
user_id = await validate_clerk_session(request, clerk_token)
logger.info(f"User authenticated successfully - user_id: {user_id}")
# Use the Clerk JWT token directly (no need to generate custom token)
logger.info("User authenticated successfully via Clerk")
# Return success response
return HTMLResponse(f"""
<html>
<head>
<title>MCP Connection Successful</title>
<style>
body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; }}
.success {{ color: #28a745; }}
.token {{ background: #f8f9fa; padding: 15px; border-radius: 5px; margin: 20px 0; word-break: break-all; }}
</style>
</head>
<body>
<h1 class="success">✅ MCP Connection Successful!</h1>
<p>Your Yargı MCP integration is now active.</p>
<div class="token">
<strong>Authentication:</strong><br>
<code>Use your Clerk JWT token directly with Bearer authentication</code>
</div>
<p>You can now close this window and return to your MCP client.</p>
<script>
// Try to close the popup if opened as such
if (window.opener) {{
window.opener.postMessage({{
type: 'MCP_AUTH_SUCCESS',
token: 'use_clerk_jwt_token'
}}, '*');
setTimeout(() => window.close(), 3000);
}}
</script>
</body>
</html>
""")
except HTTPException as e:
logger.error(f"MCP OAuth callback failed: {e.detail}")
return HTMLResponse(f"""
<html>
<head>
<title>MCP Connection Failed</title>
<style>
body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; }}
.error {{ color: #dc3545; }}
.debug {{ background: #f8f9fa; padding: 10px; margin: 20px 0; border-radius: 5px; font-family: monospace; }}
</style>
</head>
<body>
<h1 class="error">❌ MCP Connection Failed</h1>
<p>{e.detail}</p>
<div class="debug">
<strong>Debug Info:</strong><br>
Clerk Token: {'✅ Provided' if clerk_token else '❌ Missing'}<br>
Error: {e.detail}<br>
Status: {e.status_code}
</div>
<p>Please try again or contact support.</p>
<a href="https://yargimcp.com/sign-in">Return to Sign In</a>
</body>
</html>
""", status_code=e.status_code)
except Exception as e:
logger.error(f"Unexpected error in MCP OAuth callback: {str(e)}")
return HTMLResponse(f"""
<html>
<head>
<title>MCP Connection Error</title>
<style>
body {{ font-family: Arial, sans-serif; text-align: center; padding: 50px; }}
.error {{ color: #dc3545; }}
</style>
</head>
<body>
<h1 class="error">❌ Unexpected Error</h1>
<p>An unexpected error occurred during authentication.</p>
<p>Error: {str(e)}</p>
<a href="https://yargimcp.com/sign-in">Return to Sign In</a>
</body>
</html>
""", status_code=500)
# OAuth2 Token Endpoint - Now uses Clerk JWT tokens directly
@app.post("/auth/mcp-token")
async def mcp_token_endpoint(request: Request):
"""OAuth2 token endpoint for MCP clients - returns Clerk JWT token info"""
try:
# Validate Clerk session
user_id = await validate_clerk_session(request)
return JSONResponse({
"message": "Use your Clerk JWT token directly with Bearer authentication",
"token_type": "Bearer",
"scope": "yargi.read",
"user_id": user_id,
"instructions": "Include 'Authorization: Bearer YOUR_CLERK_JWT_TOKEN' in your requests"
})
except HTTPException as e:
return JSONResponse(
status_code=e.status_code,
content={"error": "invalid_request", "error_description": e.detail}
)
# Note: Only HTTP transport supported - SSE transport deprecated
# Export for uvicorn
__all__ = ["app"]
```
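When no Clerk credentials are set, the fallback branch above mints and logs a development Bearer token. A minimal sketch of producing one the same way, mirroring the `create_token()` call in the file (the shorter expiry here is an arbitrary choice):
```python
from fastmcp.server.auth import BearerAuthProvider
from fastmcp.server.auth.providers.bearer import RSAKeyPair

key_pair = RSAKeyPair.generate()
# The matching provider a local server would be constructed with
auth = BearerAuthProvider(
    public_key=key_pair.public_key,
    issuer="https://dev.yargimcp.com",
    audience="dev-mcp-server",
    required_scopes=["yargi.read"],
)
token = key_pair.create_token(
    subject="dev-user",
    issuer="https://dev.yargimcp.com",
    audience="dev-mcp-server",
    scopes=["yargi.read", "yargi.search"],
    expires_in_seconds=3600,  # 1 hour instead of the file's 24 hours
)
print(f"Authorization: Bearer {token}")
```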
--------------------------------------------------------------------------------
/rekabet_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# rekabet_mcp_module/client.py
import httpx
from bs4 import BeautifulSoup
from typing import List, Optional, Tuple, Dict, Any
import logging
import html
import re
import io # For io.BytesIO
from urllib.parse import urlencode, urljoin, quote, parse_qs, urlparse
from markitdown import MarkItDown
import math
# pypdf for PDF processing (lighter alternative to PyMuPDF)
from pypdf import PdfReader, PdfWriter  # pypdf, the successor to PyPDF2
from .models import (
RekabetKurumuSearchRequest,
RekabetDecisionSummary,
RekabetSearchResult,
RekabetDocument,
RekabetKararTuruGuidEnum
)
from pydantic import HttpUrl # Ensure HttpUrl is imported from pydantic
logger = logging.getLogger(__name__)
if not logger.hasHandlers():  # pragma: no cover
    logging.basicConfig(
        level=logging.INFO,  # Default log level
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
# The level can be raised separately for more detailed logging in debug scripts.
class RekabetKurumuApiClient:
BASE_URL = "https://www.rekabet.gov.tr"
SEARCH_PATH = "/tr/Kararlar"
DECISION_LANDING_PATH_TEMPLATE = "/Karar"
    # Since Markdown is returned per PDF page, this constant is no longer used directly.
    # DOCUMENT_MARKDOWN_CHUNK_SIZE = 5000
def __init__(self, request_timeout: float = 60.0):
self.http_client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
},
timeout=request_timeout,
verify=True,
follow_redirects=True
)
def _build_search_query_params(self, params: RekabetKurumuSearchRequest) -> List[Tuple[str, str]]:
query_params: List[Tuple[str, str]] = []
query_params.append(("sayfaAdi", params.sayfaAdi if params.sayfaAdi is not None else ""))
query_params.append(("YayinlanmaTarihi", params.YayinlanmaTarihi if params.YayinlanmaTarihi is not None else ""))
query_params.append(("PdfText", params.PdfText if params.PdfText is not None else ""))
karar_turu_id_value = ""
if params.KararTuruID is not None:
karar_turu_id_value = params.KararTuruID.value if params.KararTuruID.value != "ALL" else ""
query_params.append(("KararTuruID", karar_turu_id_value))
query_params.append(("KararSayisi", params.KararSayisi if params.KararSayisi is not None else ""))
query_params.append(("KararTarihi", params.KararTarihi if params.KararTarihi is not None else ""))
if params.page and params.page > 1:
query_params.append(("page", str(params.page)))
return query_params
async def search_decisions(self, params: RekabetKurumuSearchRequest) -> RekabetSearchResult:
request_path = self.SEARCH_PATH
final_query_params = self._build_search_query_params(params)
logger.info(f"RekabetKurumuApiClient: Performing search. Path: {request_path}, Parameters: {final_query_params}")
try:
response = await self.http_client.get(request_path, params=final_query_params)
response.raise_for_status()
html_content = response.text
except httpx.RequestError as e:
logger.error(f"RekabetKurumuApiClient: HTTP request error during search: {e}")
raise
soup = BeautifulSoup(html_content, 'html.parser')
processed_decisions: List[RekabetDecisionSummary] = []
total_records: Optional[int] = None
total_pages: Optional[int] = None
pagination_div = soup.find("div", class_="yazi01")
if pagination_div:
text_content = pagination_div.get_text(separator=" ", strip=True)
total_match = re.search(r"Toplam\s*:\s*(\d+)", text_content)
if total_match:
try:
total_records = int(total_match.group(1))
logger.debug(f"Total records found from pagination: {total_records}")
except ValueError:
logger.warning(f"Could not convert 'Toplam' value to int: {total_match.group(1)}")
else:
logger.warning("'Toplam :' string not found in pagination section.")
results_per_page_assumed = 10
if total_records is not None:
calculated_total_pages = math.ceil(total_records / results_per_page_assumed)
total_pages = calculated_total_pages if calculated_total_pages > 0 else (1 if total_records > 0 else 0)
logger.debug(f"Calculated total pages: {total_pages}")
if total_pages is None: # Fallback if total_records couldn't be parsed
last_page_link = pagination_div.select_one("li.PagedList-skipToLast a")
if last_page_link and last_page_link.has_attr('href'):
qs = parse_qs(urlparse(last_page_link['href']).query)
if 'page' in qs and qs['page']:
try:
total_pages = int(qs['page'][0])
logger.debug(f"Total pages found from 'Last >>' link: {total_pages}")
except ValueError:
logger.warning(f"Could not convert page value from 'Last >>' link to int: {qs['page'][0]}")
                elif total_records == 0:  # No records means zero pages
                    total_pages = 0
                elif total_records is not None and total_records > 0:  # Records exist but no last-page link (e.g. a single page)
                    total_pages = 1
                else:
                    logger.warning("'Last >>' link not found in pagination section.")
decision_tables_container = soup.find("div", id="kararList")
if not decision_tables_container:
logger.warning("`div#kararList` (decision list container) not found. HTML structure might have changed or no decisions on this page.")
else:
decision_tables = decision_tables_container.find_all("table", class_="equalDivide")
logger.info(f"Found {len(decision_tables)} 'table' elements with class='equalDivide' for parsing.")
if not decision_tables and total_records is not None and total_records > 0 :
logger.warning(f"Page indicates {total_records} records but no decision tables found with class='equalDivide'.")
for idx, table in enumerate(decision_tables):
logger.debug(f"Processing table {idx + 1}...")
try:
rows = table.find_all("tr")
if len(rows) != 3:
logger.warning(f"Table {idx + 1} has an unexpected number of rows ({len(rows)} instead of 3). Skipping. HTML snippet:\n{table.prettify()[:500]}")
continue
# Row 1: Publication Date, Decision Number, Related Cases Link
td_elements_r1 = rows[0].find_all("td")
pub_date = td_elements_r1[0].get_text(strip=True) if len(td_elements_r1) > 0 else None
dec_num = td_elements_r1[1].get_text(strip=True) if len(td_elements_r1) > 1 else None
related_cases_link_tag = td_elements_r1[2].find("a", href=True) if len(td_elements_r1) > 2 else None
related_cases_url_str: Optional[str] = None
karar_id_from_related: Optional[str] = None
if related_cases_link_tag and related_cases_link_tag.has_attr('href'):
related_cases_url_str = urljoin(self.BASE_URL, related_cases_link_tag['href'])
qs_related = parse_qs(urlparse(related_cases_link_tag['href']).query)
if 'kararId' in qs_related and qs_related['kararId']:
karar_id_from_related = qs_related['kararId'][0]
# Row 2: Decision Date, Decision Type
td_elements_r2 = rows[1].find_all("td")
dec_date = td_elements_r2[0].get_text(strip=True) if len(td_elements_r2) > 0 else None
dec_type_text = td_elements_r2[1].get_text(strip=True) if len(td_elements_r2) > 1 else None
# Row 3: Title and Main Decision Link
title_cell = rows[2].find("td", colspan="5")
decision_link_tag = title_cell.find("a", href=True) if title_cell else None
title_text: Optional[str] = None
decision_landing_url_str: Optional[str] = None
karar_id_from_main_link: Optional[str] = None
if decision_link_tag and decision_link_tag.has_attr('href'):
title_text = decision_link_tag.get_text(strip=True)
href_val = decision_link_tag['href']
if href_val.startswith(self.DECISION_LANDING_PATH_TEMPLATE + "?kararId="): # Ensure it's a decision link
decision_landing_url_str = urljoin(self.BASE_URL, href_val)
qs_main = parse_qs(urlparse(href_val).query)
if 'kararId' in qs_main and qs_main['kararId']:
karar_id_from_main_link = qs_main['kararId'][0]
else:
logger.warning(f"Table {idx+1} decision link has unexpected format: {href_val}")
else:
logger.warning(f"Table {idx+1} could not find title/decision link tag.")
current_karar_id = karar_id_from_main_link or karar_id_from_related
if not current_karar_id:
logger.warning(f"Table {idx+1} Karar ID not found. Skipping. Title (if any): {title_text}")
continue
# Convert string URLs to HttpUrl for the model
final_decision_url = HttpUrl(decision_landing_url_str) if decision_landing_url_str else None
final_related_cases_url = HttpUrl(related_cases_url_str) if related_cases_url_str else None
processed_decisions.append(RekabetDecisionSummary(
publication_date=pub_date, decision_number=dec_num, decision_date=dec_date,
decision_type_text=dec_type_text, title=title_text,
decision_url=final_decision_url,
karar_id=current_karar_id,
related_cases_url=final_related_cases_url
))
logger.debug(f"Table {idx+1} parsed successfully: Karar ID '{current_karar_id}', Title '{title_text[:50] if title_text else 'N/A'}...'")
except Exception as e:
logger.warning(f"RekabetKurumuApiClient: Error parsing decision summary {idx+1}: {e}. Problematic Table HTML:\n{table.prettify()}", exc_info=True)
continue
return RekabetSearchResult(
decisions=processed_decisions, total_records_found=total_records,
retrieved_page_number=params.page, total_pages=total_pages if total_pages is not None else 0
)
    def _extract_pdf_url_and_landing_page_metadata(self, karar_id: str, landing_page_html: str, landing_page_url: str) -> Dict[str, Any]:
soup = BeautifulSoup(landing_page_html, 'html.parser')
data: Dict[str, Any] = {
"pdf_url": None,
"title_on_landing_page": soup.title.string.strip() if soup.title and soup.title.string else f"Rekabet Kurumu Kararı {karar_id}",
}
# This part needs to be robust and specific to Rekabet Kurumu's landing page structure.
# Look for common patterns: direct links, download buttons, embedded viewers.
pdf_anchor = soup.find("a", href=re.compile(r"\.pdf(\?|$)", re.IGNORECASE)) # Basic PDF link
if not pdf_anchor: # Try other common patterns if the basic one fails
# Example: Look for links with specific text or class
pdf_anchor = soup.find("a", string=re.compile(r"karar metni|pdf indir", re.IGNORECASE))
if pdf_anchor and pdf_anchor.has_attr('href'):
pdf_path = pdf_anchor['href']
data["pdf_url"] = urljoin(landing_page_url, pdf_path)
logger.info(f"PDF link found on landing page (<a>): {data['pdf_url']}")
else:
iframe_pdf = soup.find("iframe", src=re.compile(r"\.pdf(\?|$)", re.IGNORECASE))
if iframe_pdf and iframe_pdf.has_attr('src'):
pdf_path = iframe_pdf['src']
data["pdf_url"] = urljoin(landing_page_url, pdf_path)
logger.info(f"PDF link found on landing page (<iframe>): {data['pdf_url']}")
else:
embed_pdf = soup.find("embed", src=re.compile(r"\.pdf(\?|$)", re.IGNORECASE), type="application/pdf")
if embed_pdf and embed_pdf.has_attr('src'):
pdf_path = embed_pdf['src']
data["pdf_url"] = urljoin(landing_page_url, pdf_path)
logger.info(f"PDF link found on landing page (<embed>): {data['pdf_url']}")
else:
logger.warning(f"No PDF link found on landing page {landing_page_url} for kararId {karar_id} using common selectors.")
return data
async def _download_pdf_bytes(self, pdf_url: str) -> Optional[bytes]:
try:
url_to_fetch = pdf_url if pdf_url.startswith(('http://', 'https://')) else urljoin(self.BASE_URL, pdf_url)
logger.info(f"Downloading PDF from: {url_to_fetch}")
response = await self.http_client.get(url_to_fetch)
response.raise_for_status()
pdf_bytes = await response.aread()
logger.info(f"PDF content downloaded ({len(pdf_bytes)} bytes) from: {url_to_fetch}")
return pdf_bytes
except httpx.RequestError as e:
logger.error(f"HTTP error downloading PDF from {pdf_url}: {e}")
except Exception as e:
logger.error(f"General error downloading PDF from {pdf_url}: {e}")
return None
def _extract_single_pdf_page_as_pdf_bytes(self, original_pdf_bytes: bytes, page_number_to_extract: int) -> Tuple[Optional[bytes], int]:
total_pages_in_original_pdf = 0
single_page_pdf_bytes: Optional[bytes] = None
if not original_pdf_bytes:
logger.warning("No original PDF bytes provided for page extraction.")
return None, 0
try:
pdf_stream = io.BytesIO(original_pdf_bytes)
reader = PdfReader(pdf_stream)
total_pages_in_original_pdf = len(reader.pages)
if not (0 < page_number_to_extract <= total_pages_in_original_pdf):
logger.warning(f"Requested page number ({page_number_to_extract}) is out of PDF page range (1-{total_pages_in_original_pdf}).")
return None, total_pages_in_original_pdf
writer = PdfWriter()
writer.add_page(reader.pages[page_number_to_extract - 1]) # pypdf is 0-indexed
output_pdf_stream = io.BytesIO()
writer.write(output_pdf_stream)
single_page_pdf_bytes = output_pdf_stream.getvalue()
logger.debug(f"Page {page_number_to_extract} of original PDF (total {total_pages_in_original_pdf} pages) extracted as new PDF using pypdf.")
except Exception as e:
logger.error(f"Error extracting PDF page using pypdf: {e}", exc_info=True)
return None, total_pages_in_original_pdf
return single_page_pdf_bytes, total_pages_in_original_pdf
def _convert_pdf_bytes_to_markdown(self, pdf_bytes: bytes, source_url_for_logging: str) -> Optional[str]:
if not pdf_bytes:
logger.warning(f"No PDF bytes provided for Markdown conversion (source: {source_url_for_logging}).")
return None
pdf_stream = io.BytesIO(pdf_bytes)
try:
md_converter = MarkItDown(enable_plugins=False)
conversion_result = md_converter.convert(pdf_stream)
markdown_text = conversion_result.text_content
if not markdown_text:
logger.warning(f"MarkItDown returned empty content from PDF byte stream (source: {source_url_for_logging}). PDF page might be image-based or MarkItDown could not process the PDF stream.")
return markdown_text
except Exception as e:
logger.error(f"MarkItDown conversion error for PDF byte stream (source: {source_url_for_logging}): {e}", exc_info=True)
return None
async def get_decision_document(self, karar_id: str, page_number: int = 1) -> RekabetDocument:
if not karar_id:
return RekabetDocument(
source_landing_page_url=HttpUrl(f"{self.BASE_URL}"),
karar_id=karar_id or "UNKNOWN_KARAR_ID",
error_message="karar_id is required.",
current_page=1, total_pages=0, is_paginated=False )
decision_url_path = f"{self.DECISION_LANDING_PATH_TEMPLATE}?kararId={karar_id}"
full_landing_page_url = urljoin(self.BASE_URL, decision_url_path)
logger.info(f"RekabetKurumuApiClient: Getting decision document: {full_landing_page_url}, Requested PDF Page: {page_number}")
pdf_url_to_report: Optional[HttpUrl] = None
title_to_report: Optional[str] = f"Rekabet Kurumu Kararı {karar_id}" # Default
error_message: Optional[str] = None
markdown_for_requested_page: Optional[str] = None
total_pdf_pages: int = 0
try:
async with self.http_client.stream("GET", full_landing_page_url) as response:
response.raise_for_status()
content_type = response.headers.get("content-type", "").lower()
final_url_of_response = HttpUrl(str(response.url))
original_pdf_bytes: Optional[bytes] = None
if "application/pdf" in content_type:
logger.info(f"URL {final_url_of_response} is a direct PDF. Processing content.")
pdf_url_to_report = final_url_of_response
original_pdf_bytes = await response.aread()
elif "text/html" in content_type:
logger.info(f"URL {final_url_of_response} is an HTML landing page. Looking for PDF link.")
landing_page_html_bytes = await response.aread()
detected_charset = response.charset_encoding or 'utf-8'
try: landing_page_html = landing_page_html_bytes.decode(detected_charset)
except UnicodeDecodeError: landing_page_html = landing_page_html_bytes.decode('utf-8', errors='replace')
if landing_page_html.strip():
landing_page_data = self._extract_pdf_url_and_landing_page_metadata(karar_id, landing_page_html, str(final_url_of_response))
pdf_url_str_from_html = landing_page_data.get("pdf_url")
if landing_page_data.get("title_on_landing_page"): title_to_report = landing_page_data.get("title_on_landing_page")
if pdf_url_str_from_html:
pdf_url_to_report = HttpUrl(pdf_url_str_from_html)
original_pdf_bytes = await self._download_pdf_bytes(str(pdf_url_to_report))
else: error_message = (error_message or "") + " PDF URL not found on HTML landing page."
else: error_message = "Decision landing page content is empty."
else: error_message = f"Unexpected content type ({content_type}) for URL: {final_url_of_response}"
if original_pdf_bytes:
single_page_pdf_bytes, total_pdf_pages_from_extraction = self._extract_single_pdf_page_as_pdf_bytes(original_pdf_bytes, page_number)
total_pdf_pages = total_pdf_pages_from_extraction
if single_page_pdf_bytes:
markdown_for_requested_page = self._convert_pdf_bytes_to_markdown(single_page_pdf_bytes, str(pdf_url_to_report or full_landing_page_url))
if not markdown_for_requested_page:
error_message = (error_message or "") + f"; Could not convert page {page_number} of PDF to Markdown."
elif total_pdf_pages > 0 :
error_message = (error_message or "") + f"; Could not extract page {page_number} from PDF (page may be out of range or extraction failed)."
else:
error_message = (error_message or "") + "; PDF could not be processed or page count was zero (original PDF might be invalid)."
elif not error_message:
error_message = "PDF content could not be downloaded or identified."
is_paginated = total_pdf_pages > 1
current_page_final = page_number
if total_pdf_pages > 0:
current_page_final = max(1, min(page_number, total_pdf_pages))
elif markdown_for_requested_page is None:
current_page_final = 1
# If markdown is None but there was no specific error for markdown conversion (e.g. PDF not found first)
# make sure error_message reflects that.
if markdown_for_requested_page is None and pdf_url_to_report and not error_message:
error_message = (error_message or "") + "; Failed to produce Markdown from PDF page."
return RekabetDocument(
source_landing_page_url=full_landing_page_url, karar_id=karar_id,
title_on_landing_page=title_to_report, pdf_url=pdf_url_to_report,
markdown_chunk=markdown_for_requested_page, current_page=current_page_final,
total_pages=total_pdf_pages, is_paginated=is_paginated,
error_message=error_message.strip("; ") if error_message else None )
        except httpx.HTTPStatusError as e:
            error_msg_detail = f"HTTP Status error {e.response.status_code} while processing decision page."
            exc_info_flag = False
        except httpx.RequestError as e:
            error_msg_detail = f"HTTP Request error while processing decision page: {str(e)}"
            exc_info_flag = False
        except Exception as e:
            error_msg_detail = f"General error while processing decision: {str(e)}"
            exc_info_flag = True  # full traceback only for unexpected errors
        logger.error(f"RekabetKurumuApiClient: Error processing decision {karar_id} from {full_landing_page_url}: {error_msg_detail}", exc_info=exc_info_flag)
        error_message = (error_message + "; " if error_message else "") + error_msg_detail
return RekabetDocument(
source_landing_page_url=full_landing_page_url, karar_id=karar_id,
title_on_landing_page=title_to_report, pdf_url=pdf_url_to_report,
markdown_chunk=None, current_page=page_number, total_pages=0, is_paginated=False,
error_message=error_message.strip("; ") if error_message else "An unexpected error occurred." )
    async def close_client_session(self):  # pragma: no cover
if hasattr(self, 'http_client') and self.http_client and not self.http_client.is_closed:
await self.http_client.aclose()
logger.info("RekabetKurumuApiClient: HTTP client session closed.")
```
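A minimal usage sketch for the client above, assuming `RekabetKurumuSearchRequest` accepts the optional fields read in `_build_search_query_params` as keyword arguments:
```python
import asyncio

from rekabet_mcp_module.client import RekabetKurumuApiClient
from rekabet_mcp_module.models import RekabetKurumuSearchRequest

async def main() -> None:
    client = RekabetKurumuApiClient()
    try:
        # Full-text search across decision PDFs ("birleşme" = merger)
        result = await client.search_decisions(
            RekabetKurumuSearchRequest(PdfText="birleşme", page=1)
        )
        print(f"{result.total_records_found} records over {result.total_pages} pages")
        if result.decisions:
            # Fetch the first page of the first decision's PDF as Markdown
            doc = await client.get_decision_document(
                result.decisions[0].karar_id, page_number=1
            )
            print(doc.title_on_landing_page, doc.total_pages, bool(doc.markdown_chunk))
    finally:
        await client.close_client_session()

asyncio.run(main())
```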
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/rekabet_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# rekabet_mcp_module/client.py
import httpx
from bs4 import BeautifulSoup
from typing import List, Optional, Tuple, Dict, Any
import logging
import html
import re
import io # For io.BytesIO
from urllib.parse import urlencode, urljoin, quote, parse_qs, urlparse
from markitdown import MarkItDown
import math
# pypdf for PDF processing (lighter alternative to PyMuPDF)
from pypdf import PdfReader, PdfWriter  # pypdf, the successor to PyPDF2
from .models import (
RekabetKurumuSearchRequest,
RekabetDecisionSummary,
RekabetSearchResult,
RekabetDocument,
RekabetKararTuruGuidEnum
)
from pydantic import HttpUrl # Ensure HttpUrl is imported from pydantic
logger = logging.getLogger(__name__)
if not logger.hasHandlers():  # pragma: no cover
    logging.basicConfig(
        level=logging.INFO,  # Default log level
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
# The level can be raised separately for more detailed logging in debug scripts.
class RekabetKurumuApiClient:
BASE_URL = "https://www.rekabet.gov.tr"
SEARCH_PATH = "/tr/Kararlar"
DECISION_LANDING_PATH_TEMPLATE = "/Karar"
    # Since Markdown is returned per PDF page, this constant is no longer used directly.
    # DOCUMENT_MARKDOWN_CHUNK_SIZE = 5000
def __init__(self, request_timeout: float = 60.0):
self.http_client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
},
timeout=request_timeout,
verify=True,
follow_redirects=True
)
def _build_search_query_params(self, params: RekabetKurumuSearchRequest) -> List[Tuple[str, str]]:
query_params: List[Tuple[str, str]] = []
query_params.append(("sayfaAdi", params.sayfaAdi if params.sayfaAdi is not None else ""))
query_params.append(("YayinlanmaTarihi", params.YayinlanmaTarihi if params.YayinlanmaTarihi is not None else ""))
query_params.append(("PdfText", params.PdfText if params.PdfText is not None else ""))
karar_turu_id_value = ""
if params.KararTuruID is not None:
karar_turu_id_value = params.KararTuruID.value if params.KararTuruID.value != "ALL" else ""
query_params.append(("KararTuruID", karar_turu_id_value))
query_params.append(("KararSayisi", params.KararSayisi if params.KararSayisi is not None else ""))
query_params.append(("KararTarihi", params.KararTarihi if params.KararTarihi is not None else ""))
if params.page and params.page > 1:
query_params.append(("page", str(params.page)))
return query_params
async def search_decisions(self, params: RekabetKurumuSearchRequest) -> RekabetSearchResult:
request_path = self.SEARCH_PATH
final_query_params = self._build_search_query_params(params)
logger.info(f"RekabetKurumuApiClient: Performing search. Path: {request_path}, Parameters: {final_query_params}")
try:
response = await self.http_client.get(request_path, params=final_query_params)
response.raise_for_status()
html_content = response.text
except httpx.RequestError as e:
logger.error(f"RekabetKurumuApiClient: HTTP request error during search: {e}")
raise
soup = BeautifulSoup(html_content, 'html.parser')
processed_decisions: List[RekabetDecisionSummary] = []
total_records: Optional[int] = None
total_pages: Optional[int] = None
pagination_div = soup.find("div", class_="yazi01")
if pagination_div:
text_content = pagination_div.get_text(separator=" ", strip=True)
total_match = re.search(r"Toplam\s*:\s*(\d+)", text_content)
if total_match:
try:
total_records = int(total_match.group(1))
logger.debug(f"Total records found from pagination: {total_records}")
except ValueError:
logger.warning(f"Could not convert 'Toplam' value to int: {total_match.group(1)}")
else:
logger.warning("'Toplam :' string not found in pagination section.")
results_per_page_assumed = 10
if total_records is not None:
calculated_total_pages = math.ceil(total_records / results_per_page_assumed)
total_pages = calculated_total_pages if calculated_total_pages > 0 else (1 if total_records > 0 else 0)
logger.debug(f"Calculated total pages: {total_pages}")
if total_pages is None: # Fallback if total_records couldn't be parsed
last_page_link = pagination_div.select_one("li.PagedList-skipToLast a")
if last_page_link and last_page_link.has_attr('href'):
qs = parse_qs(urlparse(last_page_link['href']).query)
if 'page' in qs and qs['page']:
try:
total_pages = int(qs['page'][0])
logger.debug(f"Total pages found from 'Last >>' link: {total_pages}")
except ValueError:
logger.warning(f"Could not convert page value from 'Last >>' link to int: {qs['page'][0]}")
elif total_records == 0 : total_pages = 0 # If no records, 0 pages
elif total_records is not None and total_records > 0 : total_pages = 1 # If records exist but no last page link (e.g. single page)
else: logger.warning("'Last >>' link not found in pagination section.")
decision_tables_container = soup.find("div", id="kararList")
if not decision_tables_container:
logger.warning("`div#kararList` (decision list container) not found. HTML structure might have changed or no decisions on this page.")
else:
decision_tables = decision_tables_container.find_all("table", class_="equalDivide")
logger.info(f"Found {len(decision_tables)} 'table' elements with class='equalDivide' for parsing.")
if not decision_tables and total_records is not None and total_records > 0 :
logger.warning(f"Page indicates {total_records} records but no decision tables found with class='equalDivide'.")
for idx, table in enumerate(decision_tables):
logger.debug(f"Processing table {idx + 1}...")
try:
rows = table.find_all("tr")
if len(rows) != 3:
logger.warning(f"Table {idx + 1} has an unexpected number of rows ({len(rows)} instead of 3). Skipping. HTML snippet:\n{table.prettify()[:500]}")
continue
# Row 1: Publication Date, Decision Number, Related Cases Link
td_elements_r1 = rows[0].find_all("td")
pub_date = td_elements_r1[0].get_text(strip=True) if len(td_elements_r1) > 0 else None
dec_num = td_elements_r1[1].get_text(strip=True) if len(td_elements_r1) > 1 else None
related_cases_link_tag = td_elements_r1[2].find("a", href=True) if len(td_elements_r1) > 2 else None
related_cases_url_str: Optional[str] = None
karar_id_from_related: Optional[str] = None
if related_cases_link_tag and related_cases_link_tag.has_attr('href'):
related_cases_url_str = urljoin(self.BASE_URL, related_cases_link_tag['href'])
qs_related = parse_qs(urlparse(related_cases_link_tag['href']).query)
if 'kararId' in qs_related and qs_related['kararId']:
karar_id_from_related = qs_related['kararId'][0]
# Row 2: Decision Date, Decision Type
td_elements_r2 = rows[1].find_all("td")
dec_date = td_elements_r2[0].get_text(strip=True) if len(td_elements_r2) > 0 else None
dec_type_text = td_elements_r2[1].get_text(strip=True) if len(td_elements_r2) > 1 else None
# Row 3: Title and Main Decision Link
title_cell = rows[2].find("td", colspan="5")
decision_link_tag = title_cell.find("a", href=True) if title_cell else None
title_text: Optional[str] = None
decision_landing_url_str: Optional[str] = None
karar_id_from_main_link: Optional[str] = None
if decision_link_tag and decision_link_tag.has_attr('href'):
title_text = decision_link_tag.get_text(strip=True)
href_val = decision_link_tag['href']
if href_val.startswith(self.DECISION_LANDING_PATH_TEMPLATE + "?kararId="): # Ensure it's a decision link
decision_landing_url_str = urljoin(self.BASE_URL, href_val)
qs_main = parse_qs(urlparse(href_val).query)
if 'kararId' in qs_main and qs_main['kararId']:
karar_id_from_main_link = qs_main['kararId'][0]
else:
logger.warning(f"Table {idx+1} decision link has unexpected format: {href_val}")
else:
logger.warning(f"Table {idx+1} could not find title/decision link tag.")
current_karar_id = karar_id_from_main_link or karar_id_from_related
if not current_karar_id:
logger.warning(f"Table {idx+1} Karar ID not found. Skipping. Title (if any): {title_text}")
continue
# Convert string URLs to HttpUrl for the model
final_decision_url = HttpUrl(decision_landing_url_str) if decision_landing_url_str else None
final_related_cases_url = HttpUrl(related_cases_url_str) if related_cases_url_str else None
processed_decisions.append(RekabetDecisionSummary(
publication_date=pub_date, decision_number=dec_num, decision_date=dec_date,
decision_type_text=dec_type_text, title=title_text,
decision_url=final_decision_url,
karar_id=current_karar_id,
related_cases_url=final_related_cases_url
))
logger.debug(f"Table {idx+1} parsed successfully: Karar ID '{current_karar_id}', Title '{title_text[:50] if title_text else 'N/A'}...'")
except Exception as e:
logger.warning(f"RekabetKurumuApiClient: Error parsing decision summary {idx+1}: {e}. Problematic Table HTML:\n{table.prettify()}", exc_info=True)
continue
return RekabetSearchResult(
decisions=processed_decisions, total_records_found=total_records,
retrieved_page_number=params.page, total_pages=total_pages if total_pages is not None else 0
)
async def _extract_pdf_url_and_landing_page_metadata(self, karar_id: str, landing_page_html: str, landing_page_url: str) -> Dict[str, Any]:
soup = BeautifulSoup(landing_page_html, 'html.parser')
data: Dict[str, Any] = {
"pdf_url": None,
"title_on_landing_page": soup.title.string.strip() if soup.title and soup.title.string else f"Rekabet Kurumu Kararı {karar_id}",
}
# This part needs to be robust and specific to Rekabet Kurumu's landing page structure.
# Look for common patterns: direct links, download buttons, embedded viewers.
pdf_anchor = soup.find("a", href=re.compile(r"\.pdf(\?|$)", re.IGNORECASE)) # Basic PDF link
if not pdf_anchor: # Try other common patterns if the basic one fails
# Example: Look for links with specific text or class
pdf_anchor = soup.find("a", string=re.compile(r"karar metni|pdf indir", re.IGNORECASE))
if pdf_anchor and pdf_anchor.has_attr('href'):
pdf_path = pdf_anchor['href']
data["pdf_url"] = urljoin(landing_page_url, pdf_path)
logger.info(f"PDF link found on landing page (<a>): {data['pdf_url']}")
else:
iframe_pdf = soup.find("iframe", src=re.compile(r"\.pdf(\?|$)", re.IGNORECASE))
if iframe_pdf and iframe_pdf.has_attr('src'):
pdf_path = iframe_pdf['src']
data["pdf_url"] = urljoin(landing_page_url, pdf_path)
logger.info(f"PDF link found on landing page (<iframe>): {data['pdf_url']}")
else:
embed_pdf = soup.find("embed", src=re.compile(r"\.pdf(\?|$)", re.IGNORECASE), type="application/pdf")
if embed_pdf and embed_pdf.has_attr('src'):
pdf_path = embed_pdf['src']
data["pdf_url"] = urljoin(landing_page_url, pdf_path)
logger.info(f"PDF link found on landing page (<embed>): {data['pdf_url']}")
else:
logger.warning(f"No PDF link found on landing page {landing_page_url} for kararId {karar_id} using common selectors.")
return data
async def _download_pdf_bytes(self, pdf_url: str) -> Optional[bytes]:
try:
url_to_fetch = pdf_url if pdf_url.startswith(('http://', 'https://')) else urljoin(self.BASE_URL, pdf_url)
logger.info(f"Downloading PDF from: {url_to_fetch}")
response = await self.http_client.get(url_to_fetch)
response.raise_for_status()
pdf_bytes = await response.aread()
logger.info(f"PDF content downloaded ({len(pdf_bytes)} bytes) from: {url_to_fetch}")
return pdf_bytes
except httpx.RequestError as e:
logger.error(f"HTTP error downloading PDF from {pdf_url}: {e}")
except Exception as e:
logger.error(f"General error downloading PDF from {pdf_url}: {e}")
return None
def _extract_single_pdf_page_as_pdf_bytes(self, original_pdf_bytes: bytes, page_number_to_extract: int) -> Tuple[Optional[bytes], int]:
total_pages_in_original_pdf = 0
single_page_pdf_bytes: Optional[bytes] = None
if not original_pdf_bytes:
logger.warning("No original PDF bytes provided for page extraction.")
return None, 0
try:
pdf_stream = io.BytesIO(original_pdf_bytes)
reader = PdfReader(pdf_stream)
total_pages_in_original_pdf = len(reader.pages)
if not (0 < page_number_to_extract <= total_pages_in_original_pdf):
logger.warning(f"Requested page number ({page_number_to_extract}) is out of PDF page range (1-{total_pages_in_original_pdf}).")
return None, total_pages_in_original_pdf
writer = PdfWriter()
writer.add_page(reader.pages[page_number_to_extract - 1]) # pypdf is 0-indexed
output_pdf_stream = io.BytesIO()
writer.write(output_pdf_stream)
single_page_pdf_bytes = output_pdf_stream.getvalue()
logger.debug(f"Page {page_number_to_extract} of original PDF (total {total_pages_in_original_pdf} pages) extracted as new PDF using pypdf.")
except Exception as e:
logger.error(f"Error extracting PDF page using pypdf: {e}", exc_info=True)
return None, total_pages_in_original_pdf
return single_page_pdf_bytes, total_pages_in_original_pdf
def _convert_pdf_bytes_to_markdown(self, pdf_bytes: bytes, source_url_for_logging: str) -> Optional[str]:
if not pdf_bytes:
logger.warning(f"No PDF bytes provided for Markdown conversion (source: {source_url_for_logging}).")
return None
pdf_stream = io.BytesIO(pdf_bytes)
try:
md_converter = MarkItDown(enable_plugins=False)
conversion_result = md_converter.convert(pdf_stream)
markdown_text = conversion_result.text_content
if not markdown_text:
logger.warning(f"MarkItDown returned empty content from PDF byte stream (source: {source_url_for_logging}). PDF page might be image-based or MarkItDown could not process the PDF stream.")
return markdown_text
except Exception as e:
logger.error(f"MarkItDown conversion error for PDF byte stream (source: {source_url_for_logging}): {e}", exc_info=True)
return None
async def get_decision_document(self, karar_id: str, page_number: int = 1) -> RekabetDocument:
if not karar_id:
return RekabetDocument(
source_landing_page_url=HttpUrl(f"{self.BASE_URL}"),
karar_id=karar_id or "UNKNOWN_KARAR_ID",
error_message="karar_id is required.",
current_page=1, total_pages=0, is_paginated=False )
decision_url_path = f"{self.DECISION_LANDING_PATH_TEMPLATE}?kararId={karar_id}"
full_landing_page_url = urljoin(self.BASE_URL, decision_url_path)
logger.info(f"RekabetKurumuApiClient: Getting decision document: {full_landing_page_url}, Requested PDF Page: {page_number}")
pdf_url_to_report: Optional[HttpUrl] = None
title_to_report: Optional[str] = f"Rekabet Kurumu Kararı {karar_id}" # Default
error_message: Optional[str] = None
markdown_for_requested_page: Optional[str] = None
total_pdf_pages: int = 0
try:
async with self.http_client.stream("GET", full_landing_page_url) as response:
response.raise_for_status()
content_type = response.headers.get("content-type", "").lower()
final_url_of_response = HttpUrl(str(response.url))
original_pdf_bytes: Optional[bytes] = None
if "application/pdf" in content_type:
logger.info(f"URL {final_url_of_response} is a direct PDF. Processing content.")
pdf_url_to_report = final_url_of_response
original_pdf_bytes = await response.aread()
elif "text/html" in content_type:
logger.info(f"URL {final_url_of_response} is an HTML landing page. Looking for PDF link.")
landing_page_html_bytes = await response.aread()
detected_charset = response.charset_encoding or 'utf-8'
try: landing_page_html = landing_page_html_bytes.decode(detected_charset)
except UnicodeDecodeError: landing_page_html = landing_page_html_bytes.decode('utf-8', errors='replace')
if landing_page_html.strip():
landing_page_data = self._extract_pdf_url_and_landing_page_metadata(karar_id, landing_page_html, str(final_url_of_response))
pdf_url_str_from_html = landing_page_data.get("pdf_url")
if landing_page_data.get("title_on_landing_page"): title_to_report = landing_page_data.get("title_on_landing_page")
if pdf_url_str_from_html:
pdf_url_to_report = HttpUrl(pdf_url_str_from_html)
original_pdf_bytes = await self._download_pdf_bytes(str(pdf_url_to_report))
else: error_message = (error_message or "") + " PDF URL not found on HTML landing page."
else: error_message = "Decision landing page content is empty."
else: error_message = f"Unexpected content type ({content_type}) for URL: {final_url_of_response}"
if original_pdf_bytes:
single_page_pdf_bytes, total_pdf_pages_from_extraction = self._extract_single_pdf_page_as_pdf_bytes(original_pdf_bytes, page_number)
total_pdf_pages = total_pdf_pages_from_extraction
if single_page_pdf_bytes:
markdown_for_requested_page = self._convert_pdf_bytes_to_markdown(single_page_pdf_bytes, str(pdf_url_to_report or full_landing_page_url))
if not markdown_for_requested_page:
error_message = (error_message or "") + f"; Could not convert page {page_number} of PDF to Markdown."
elif total_pdf_pages > 0 :
error_message = (error_message or "") + f"; Could not extract page {page_number} from PDF (page may be out of range or extraction failed)."
else:
error_message = (error_message or "") + "; PDF could not be processed or page count was zero (original PDF might be invalid)."
elif not error_message:
error_message = "PDF content could not be downloaded or identified."
is_paginated = total_pdf_pages > 1
current_page_final = page_number
if total_pdf_pages > 0:
current_page_final = max(1, min(page_number, total_pdf_pages))
elif markdown_for_requested_page is None:
current_page_final = 1
# If markdown is None but there was no specific error for markdown conversion (e.g. PDF not found first)
# make sure error_message reflects that.
if markdown_for_requested_page is None and pdf_url_to_report and not error_message:
error_message = (error_message or "") + "; Failed to produce Markdown from PDF page."
return RekabetDocument(
source_landing_page_url=full_landing_page_url, karar_id=karar_id,
title_on_landing_page=title_to_report, pdf_url=pdf_url_to_report,
markdown_chunk=markdown_for_requested_page, current_page=current_page_final,
total_pages=total_pdf_pages, is_paginated=is_paginated,
error_message=error_message.strip("; ") if error_message else None )
except httpx.HTTPStatusError as e: error_msg_detail = f"HTTP Status error {e.response.status_code} while processing decision page."
except httpx.RequestError as e: error_msg_detail = f"HTTP Request error while processing decision page: {str(e)}"
except Exception as e: error_msg_detail = f"General error while processing decision: {str(e)}"
exc_info_flag = not isinstance(e, (httpx.HTTPStatusError, httpx.RequestError)) if 'e' in locals() else True
logger.error(f"RekabetKurumuApiClient: Error processing decision {karar_id} from {full_landing_page_url}: {error_msg_detail}", exc_info=exc_info_flag)
error_message = (error_message + "; " if error_message else "") + error_msg_detail
return RekabetDocument(
source_landing_page_url=full_landing_page_url, karar_id=karar_id,
title_on_landing_page=title_to_report, pdf_url=pdf_url_to_report,
markdown_chunk=None, current_page=page_number, total_pages=0, is_paginated=False,
error_message=error_message.strip("; ") if error_message else "An unexpected error occurred." )
async def close_client_session(self): # Pragma: no cover
if hasattr(self, 'http_client') and self.http_client and not self.http_client.is_closed:
await self.http_client.aclose()
logger.info("RekabetKurumuApiClient: HTTP client session closed.")
```
--------------------------------------------------------------------------------
/saidsurucu-yargi-mcp-f5fa007/sayistay_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# sayistay_mcp_module/client.py
import httpx
import re
from bs4 import BeautifulSoup
from typing import Dict, Any, List, Optional, Tuple
import logging
import html
import io
from urllib.parse import urlencode, urljoin
from markitdown import MarkItDown
from .models import (
GenelKurulSearchRequest, GenelKurulSearchResponse, GenelKurulDecision,
TemyizKuruluSearchRequest, TemyizKuruluSearchResponse, TemyizKuruluDecision,
DaireSearchRequest, DaireSearchResponse, DaireDecision,
SayistayDocumentMarkdown
)
from .enums import DaireEnum, KamuIdaresiTuruEnum, WebKararKonusuEnum, WEB_KARAR_KONUSU_MAPPING
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
class SayistayApiClient:
"""
API Client for Sayıştay (Turkish Court of Accounts) decision search system.
Handles three types of decisions:
- Genel Kurul (General Assembly): Precedent-setting interpretive decisions
- Temyiz Kurulu (Appeals Board): Appeals against chamber decisions
- Daire (Chamber): First-instance audit findings and sanctions
Features:
- ASP.NET WebForms session management with CSRF tokens
- DataTables-based pagination and filtering
- Automatic session refresh on expiration
- Document retrieval with Markdown conversion
"""
BASE_URL = "https://www.sayistay.gov.tr"
# Search endpoints for each decision type
GENEL_KURUL_ENDPOINT = "/KararlarGenelKurul/DataTablesList"
TEMYIZ_KURULU_ENDPOINT = "/KararlarTemyiz/DataTablesList"
DAIRE_ENDPOINT = "/KararlarDaire/DataTablesList"
# Page endpoints for session initialization and document access
GENEL_KURUL_PAGE = "/KararlarGenelKurul"
TEMYIZ_KURULU_PAGE = "/KararlarTemyiz"
DAIRE_PAGE = "/KararlarDaire"
def __init__(self, request_timeout: float = 60.0):
self.request_timeout = request_timeout
self.session_cookies: Dict[str, str] = {}
self.csrf_tokens: Dict[str, str] = {} # Store tokens for each endpoint
self.http_client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers={
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
"X-Requested-With": "XMLHttpRequest",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin"
},
timeout=request_timeout,
follow_redirects=True
)
async def _initialize_session_for_endpoint(self, endpoint_type: str) -> bool:
"""
Initialize session and obtain CSRF token for specific endpoint.
Args:
endpoint_type: One of 'genel_kurul', 'temyiz_kurulu', 'daire'
Returns:
True if session initialized successfully, False otherwise
"""
page_mapping = {
'genel_kurul': self.GENEL_KURUL_PAGE,
'temyiz_kurulu': self.TEMYIZ_KURULU_PAGE,
'daire': self.DAIRE_PAGE
}
if endpoint_type not in page_mapping:
logger.error(f"Invalid endpoint type: {endpoint_type}")
return False
page_url = page_mapping[endpoint_type]
logger.info(f"Initializing session for {endpoint_type} endpoint: {page_url}")
try:
response = await self.http_client.get(page_url)
response.raise_for_status()
# Extract session cookies
for cookie_name, cookie_value in response.cookies.items():
self.session_cookies[cookie_name] = cookie_value
logger.debug(f"Stored session cookie: {cookie_name}")
# Extract CSRF token from form
soup = BeautifulSoup(response.text, 'html.parser')
csrf_input = soup.find('input', {'name': '__RequestVerificationToken'})
if csrf_input and csrf_input.get('value'):
self.csrf_tokens[endpoint_type] = csrf_input['value']
logger.info(f"Extracted CSRF token for {endpoint_type}")
return True
else:
logger.warning(f"CSRF token not found in {endpoint_type} page")
return False
except httpx.RequestError as e:
logger.error(f"HTTP error during session initialization for {endpoint_type}: {e}")
return False
except Exception as e:
logger.error(f"Error initializing session for {endpoint_type}: {e}")
return False
def _enum_to_form_value(self, enum_value: str, enum_type: str) -> str:
"""Convert enum values to form values expected by the API."""
if enum_value == "ALL":
if enum_type == "daire":
return "Tüm Daireler"
elif enum_type == "kamu_idaresi":
return "Tüm Kurumlar"
elif enum_type == "web_karar_konusu":
return "Tüm Konular"
# Apply web_karar_konusu mapping
if enum_type == "web_karar_konusu":
return WEB_KARAR_KONUSU_MAPPING.get(enum_value, enum_value)
return enum_value
def _build_datatables_params(self, start: int, length: int, draw: int = 1) -> List[Tuple[str, str]]:
"""Build standard DataTables parameters for all endpoints."""
params = [
("draw", str(draw)),
("start", str(start)),
("length", str(length)),
("search[value]", ""),
("search[regex]", "false")
]
return params
def _build_genel_kurul_form_data(self, params: GenelKurulSearchRequest, draw: int = 1) -> List[Tuple[str, str]]:
"""Build form data for Genel Kurul search request."""
form_data = self._build_datatables_params(params.start, params.length, draw)
# Add DataTables column definitions (from actual request)
column_defs = [
("columns[0][data]", "KARARNO"),
("columns[0][name]", ""),
("columns[0][searchable]", "true"),
("columns[0][orderable]", "false"),
("columns[0][search][value]", ""),
("columns[0][search][regex]", "false"),
("columns[1][data]", "KARARNO"),
("columns[1][name]", ""),
("columns[1][searchable]", "true"),
("columns[1][orderable]", "true"),
("columns[1][search][value]", ""),
("columns[1][search][regex]", "false"),
("columns[2][data]", "KARARTARIH"),
("columns[2][name]", ""),
("columns[2][searchable]", "true"),
("columns[2][orderable]", "true"),
("columns[2][search][value]", ""),
("columns[2][search][regex]", "false"),
("columns[3][data]", "KARAROZETI"),
("columns[3][name]", ""),
("columns[3][searchable]", "true"),
("columns[3][orderable]", "false"),
("columns[3][search][value]", ""),
("columns[3][search][regex]", "false"),
("columns[4][data]", ""),
("columns[4][name]", ""),
("columns[4][searchable]", "true"),
("columns[4][orderable]", "false"),
("columns[4][search][value]", ""),
("columns[4][search][regex]", "false"),
("order[0][column]", "2"),
("order[0][dir]", "desc")
]
form_data.extend(column_defs)
# Add search parameters
form_data.extend([
("KararlarGenelKurulAra.KARARNO", params.karar_no or ""),
("__Invariant[]", "KararlarGenelKurulAra.KARARNO"),
("__Invariant[]", "KararlarGenelKurulAra.KARAREK"),
("KararlarGenelKurulAra.KARAREK", params.karar_ek or ""),
("KararlarGenelKurulAra.KARARTARIHBaslangic", params.karar_tarih_baslangic or "Başlangıç Tarihi"),
("KararlarGenelKurulAra.KARARTARIHBitis", params.karar_tarih_bitis or "Bitiş Tarihi"),
("KararlarGenelKurulAra.KARARTAMAMI", params.karar_tamami or ""),
("__RequestVerificationToken", self.csrf_tokens.get('genel_kurul', ''))
])
return form_data
def _build_temyiz_kurulu_form_data(self, params: TemyizKuruluSearchRequest, draw: int = 1) -> List[Tuple[str, str]]:
"""Build form data for Temyiz Kurulu search request."""
form_data = self._build_datatables_params(params.start, params.length, draw)
# Add DataTables column definitions (from actual request)
column_defs = [
("columns[0][data]", "TEMYIZTUTANAKTARIHI"),
("columns[0][name]", ""),
("columns[0][searchable]", "true"),
("columns[0][orderable]", "false"),
("columns[0][search][value]", ""),
("columns[0][search][regex]", "false"),
("columns[1][data]", "TEMYIZTUTANAKTARIHI"),
("columns[1][name]", ""),
("columns[1][searchable]", "true"),
("columns[1][orderable]", "true"),
("columns[1][search][value]", ""),
("columns[1][search][regex]", "false"),
("columns[2][data]", "ILAMDAIRESI"),
("columns[2][name]", ""),
("columns[2][searchable]", "true"),
("columns[2][orderable]", "true"),
("columns[2][search][value]", ""),
("columns[2][search][regex]", "false"),
("columns[3][data]", "TEMYIZKARAR"),
("columns[3][name]", ""),
("columns[3][searchable]", "true"),
("columns[3][orderable]", "false"),
("columns[3][search][value]", ""),
("columns[3][search][regex]", "false"),
("columns[4][data]", ""),
("columns[4][name]", ""),
("columns[4][searchable]", "true"),
("columns[4][orderable]", "false"),
("columns[4][search][value]", ""),
("columns[4][search][regex]", "false"),
("order[0][column]", "1"),
("order[0][dir]", "desc")
]
form_data.extend(column_defs)
# Add search parameters
daire_value = self._enum_to_form_value(params.ilam_dairesi, "daire")
kamu_idaresi_value = self._enum_to_form_value(params.kamu_idaresi_turu, "kamu_idaresi")
web_karar_konusu_value = self._enum_to_form_value(params.web_karar_konusu, "web_karar_konusu")
form_data.extend([
("KararlarTemyizAra.ILAMDAIRESI", daire_value),
("KararlarTemyizAra.YILI", params.yili or ""),
("KararlarTemyizAra.KARARTRHBaslangic", params.karar_tarih_baslangic or ""),
("KararlarTemyizAra.KARARTRHBitis", params.karar_tarih_bitis or ""),
("KararlarTemyizAra.KAMUIDARESITURU", kamu_idaresi_value if kamu_idaresi_value != "Tüm Kurumlar" else ""),
("KararlarTemyizAra.ILAMNO", params.ilam_no or ""),
("KararlarTemyizAra.DOSYANO", params.dosya_no or ""),
("KararlarTemyizAra.TEMYIZTUTANAKNO", params.temyiz_tutanak_no or ""),
("__Invariant", "KararlarTemyizAra.TEMYIZTUTANAKNO"),
("KararlarTemyizAra.TEMYIZKARAR", params.temyiz_karar or ""),
("KararlarTemyizAra.WEBKARARKONUSU", web_karar_konusu_value if web_karar_konusu_value != "Tüm Konular" else ""),
("__RequestVerificationToken", self.csrf_tokens.get('temyiz_kurulu', ''))
])
return form_data
def _build_daire_form_data(self, params: DaireSearchRequest, draw: int = 1) -> List[Tuple[str, str]]:
"""Build form data for Daire search request."""
form_data = self._build_datatables_params(params.start, params.length, draw)
# Add DataTables column definitions (from actual request)
column_defs = [
("columns[0][data]", "YARGILAMADAIRESI"),
("columns[0][name]", ""),
("columns[0][searchable]", "true"),
("columns[0][orderable]", "false"),
("columns[0][search][value]", ""),
("columns[0][search][regex]", "false"),
("columns[1][data]", "KARARTRH"),
("columns[1][name]", ""),
("columns[1][searchable]", "true"),
("columns[1][orderable]", "true"),
("columns[1][search][value]", ""),
("columns[1][search][regex]", "false"),
("columns[2][data]", "KARARNO"),
("columns[2][name]", ""),
("columns[2][searchable]", "true"),
("columns[2][orderable]", "true"),
("columns[2][search][value]", ""),
("columns[2][search][regex]", "false"),
("columns[3][data]", "YARGILAMADAIRESI"),
("columns[3][name]", ""),
("columns[3][searchable]", "true"),
("columns[3][orderable]", "true"),
("columns[3][search][value]", ""),
("columns[3][search][regex]", "false"),
("columns[4][data]", "WEBKARARMETNI"),
("columns[4][name]", ""),
("columns[4][searchable]", "true"),
("columns[4][orderable]", "false"),
("columns[4][search][value]", ""),
("columns[4][search][regex]", "false"),
("columns[5][data]", ""),
("columns[5][name]", ""),
("columns[5][searchable]", "true"),
("columns[5][orderable]", "false"),
("columns[5][search][value]", ""),
("columns[5][search][regex]", "false"),
("order[0][column]", "2"),
("order[0][dir]", "desc")
]
form_data.extend(column_defs)
# Add search parameters
daire_value = self._enum_to_form_value(params.yargilama_dairesi, "daire")
kamu_idaresi_value = self._enum_to_form_value(params.kamu_idaresi_turu, "kamu_idaresi")
web_karar_konusu_value = self._enum_to_form_value(params.web_karar_konusu, "web_karar_konusu")
form_data.extend([
("KararlarDaireAra.YARGILAMADAIRESI", daire_value),
("KararlarDaireAra.KARARTRHBaslangic", params.karar_tarih_baslangic or ""),
("KararlarDaireAra.KARARTRHBitis", params.karar_tarih_bitis or ""),
("KararlarDaireAra.ILAMNO", params.ilam_no or ""),
("KararlarDaireAra.KAMUIDARESITURU", kamu_idaresi_value if kamu_idaresi_value != "Tüm Kurumlar" else ""),
("KararlarDaireAra.HESAPYILI", params.hesap_yili or ""),
("KararlarDaireAra.WEBKARARKONUSU", web_karar_konusu_value if web_karar_konusu_value != "Tüm Konular" else ""),
("KararlarDaireAra.WEBKARARMETNI", params.web_karar_metni or ""),
("__RequestVerificationToken", self.csrf_tokens.get('daire', ''))
])
return form_data
async def search_genel_kurul_decisions(self, params: GenelKurulSearchRequest) -> GenelKurulSearchResponse:
"""
Search Sayıştay Genel Kurul (General Assembly) decisions.
Args:
params: Search parameters for Genel Kurul decisions
Returns:
GenelKurulSearchResponse with matching decisions
"""
# Initialize session if needed
if 'genel_kurul' not in self.csrf_tokens:
if not await self._initialize_session_for_endpoint('genel_kurul'):
raise Exception("Failed to initialize session for Genel Kurul endpoint")
form_data = self._build_genel_kurul_form_data(params)
encoded_data = urlencode(form_data, encoding='utf-8')
logger.info(f"Searching Genel Kurul decisions with parameters: {params.model_dump(exclude_none=True)}")
try:
# Update headers with cookies
headers = self.http_client.headers.copy()
if self.session_cookies:
cookie_header = "; ".join([f"{k}={v}" for k, v in self.session_cookies.items()])
headers["Cookie"] = cookie_header
response = await self.http_client.post(
self.GENEL_KURUL_ENDPOINT,
data=encoded_data,
headers=headers
)
response.raise_for_status()
response_json = response.json()
# Parse response
decisions = []
for item in response_json.get('data', []):
decisions.append(GenelKurulDecision(
id=item['Id'],
karar_no=item['KARARNO'],
karar_tarih=item['KARARTARIH'],
karar_ozeti=item['KARAROZETI']
))
return GenelKurulSearchResponse(
decisions=decisions,
total_records=response_json.get('recordsTotal', 0),
total_filtered=response_json.get('recordsFiltered', 0),
draw=response_json.get('draw', 1)
)
except httpx.RequestError as e:
logger.error(f"HTTP error during Genel Kurul search: {e}")
raise
except Exception as e:
logger.error(f"Error processing Genel Kurul search: {e}")
raise
async def search_temyiz_kurulu_decisions(self, params: TemyizKuruluSearchRequest) -> TemyizKuruluSearchResponse:
"""
Search Sayıştay Temyiz Kurulu (Appeals Board) decisions.
Args:
params: Search parameters for Temyiz Kurulu decisions
Returns:
TemyizKuruluSearchResponse with matching decisions
"""
# Initialize session if needed
if 'temyiz_kurulu' not in self.csrf_tokens:
if not await self._initialize_session_for_endpoint('temyiz_kurulu'):
raise Exception("Failed to initialize session for Temyiz Kurulu endpoint")
form_data = self._build_temyiz_kurulu_form_data(params)
encoded_data = urlencode(form_data, encoding='utf-8')
logger.info(f"Searching Temyiz Kurulu decisions with parameters: {params.model_dump(exclude_none=True)}")
try:
# Update headers with cookies
headers = self.http_client.headers.copy()
if self.session_cookies:
cookie_header = "; ".join([f"{k}={v}" for k, v in self.session_cookies.items()])
headers["Cookie"] = cookie_header
response = await self.http_client.post(
self.TEMYIZ_KURULU_ENDPOINT,
data=encoded_data,
headers=headers
)
response.raise_for_status()
response_json = response.json()
# Parse response
decisions = []
for item in response_json.get('data', []):
decisions.append(TemyizKuruluDecision(
id=item['Id'],
temyiz_tutanak_tarihi=item['TEMYIZTUTANAKTARIHI'],
ilam_dairesi=item['ILAMDAIRESI'],
temyiz_karar=item['TEMYIZKARAR']
))
return TemyizKuruluSearchResponse(
decisions=decisions,
total_records=response_json.get('recordsTotal', 0),
total_filtered=response_json.get('recordsFiltered', 0),
draw=response_json.get('draw', 1)
)
except httpx.RequestError as e:
logger.error(f"HTTP error during Temyiz Kurulu search: {e}")
raise
except Exception as e:
logger.error(f"Error processing Temyiz Kurulu search: {e}")
raise
async def search_daire_decisions(self, params: DaireSearchRequest) -> DaireSearchResponse:
"""
Search Sayıştay Daire (Chamber) decisions.
Args:
params: Search parameters for Daire decisions
Returns:
DaireSearchResponse with matching decisions
"""
# Initialize session if needed
if 'daire' not in self.csrf_tokens:
if not await self._initialize_session_for_endpoint('daire'):
raise Exception("Failed to initialize session for Daire endpoint")
form_data = self._build_daire_form_data(params)
encoded_data = urlencode(form_data, encoding='utf-8')
logger.info(f"Searching Daire decisions with parameters: {params.model_dump(exclude_none=True)}")
try:
# Update headers with cookies
headers = self.http_client.headers.copy()
if self.session_cookies:
cookie_header = "; ".join([f"{k}={v}" for k, v in self.session_cookies.items()])
headers["Cookie"] = cookie_header
response = await self.http_client.post(
self.DAIRE_ENDPOINT,
data=encoded_data,
headers=headers
)
response.raise_for_status()
response_json = response.json()
# Parse response
decisions = []
for item in response_json.get('data', []):
decisions.append(DaireDecision(
id=item['Id'],
yargilama_dairesi=item['YARGILAMADAIRESI'],
karar_tarih=item['KARARTRH'],
karar_no=item['KARARNO'],
ilam_no=item.get('ILAMNO'), # Use get() to handle None values
madde_no=item['MADDENO'],
kamu_idaresi_turu=item['KAMUIDARESITURU'],
hesap_yili=item['HESAPYILI'],
web_karar_konusu=item['WEBKARARKONUSU'],
web_karar_metni=item['WEBKARARMETNI']
))
return DaireSearchResponse(
decisions=decisions,
total_records=response_json.get('recordsTotal', 0),
total_filtered=response_json.get('recordsFiltered', 0),
draw=response_json.get('draw', 1)
)
except httpx.RequestError as e:
logger.error(f"HTTP error during Daire search: {e}")
raise
except Exception as e:
logger.error(f"Error processing Daire search: {e}")
raise
def _convert_html_to_markdown(self, html_content: str) -> Optional[str]:
"""Convert HTML content to Markdown using MarkItDown with BytesIO to avoid filename length issues."""
if not html_content:
return None
try:
# Convert HTML string to bytes and create BytesIO stream
html_bytes = html_content.encode('utf-8')
html_stream = io.BytesIO(html_bytes)
# Pass BytesIO stream to MarkItDown to avoid temp file creation
md_converter = MarkItDown()
result = md_converter.convert(html_stream)
markdown_content = result.text_content
logger.info("Successfully converted HTML to Markdown")
return markdown_content
except Exception as e:
logger.error(f"Error converting HTML to Markdown: {e}")
return f"Error converting HTML content: {str(e)}"
async def get_document_as_markdown(self, decision_id: str, decision_type: str) -> SayistayDocumentMarkdown:
"""
Retrieve full text of a Sayıştay decision and convert to Markdown.
Args:
decision_id: Unique decision identifier
decision_type: Type of decision ('genel_kurul', 'temyiz_kurulu', 'daire')
Returns:
SayistayDocumentMarkdown with converted content
"""
logger.info(f"Retrieving document for {decision_type} decision ID: {decision_id}")
# Validate decision_id
if not decision_id or not decision_id.strip():
return SayistayDocumentMarkdown(
decision_id=decision_id,
decision_type=decision_type,
source_url="",
markdown_content=None,
error_message="Decision ID cannot be empty"
)
# Map decision type to URL path
url_path_mapping = {
'genel_kurul': 'KararlarGenelKurul',
'temyiz_kurulu': 'KararlarTemyiz',
'daire': 'KararlarDaire'
}
if decision_type not in url_path_mapping:
return SayistayDocumentMarkdown(
decision_id=decision_id,
decision_type=decision_type,
source_url="",
markdown_content=None,
error_message=f"Invalid decision type: {decision_type}. Must be one of: {list(url_path_mapping.keys())}"
)
# Build document URL
url_path = url_path_mapping[decision_type]
document_url = f"{self.BASE_URL}/{url_path}/Detay/{decision_id}/"
try:
# Make HTTP GET request to document URL
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin"
}
# Include session cookies if available
if self.session_cookies:
cookie_header = "; ".join([f"{k}={v}" for k, v in self.session_cookies.items()])
headers["Cookie"] = cookie_header
response = await self.http_client.get(document_url, headers=headers)
response.raise_for_status()
html_content = response.text
if not html_content or not html_content.strip():
logger.warning(f"Received empty HTML content from {document_url}")
return SayistayDocumentMarkdown(
decision_id=decision_id,
decision_type=decision_type,
source_url=document_url,
markdown_content=None,
error_message="Document content is empty"
)
# Convert HTML to Markdown using existing method
markdown_content = self._convert_html_to_markdown(html_content)
if markdown_content and "Error converting HTML content" not in markdown_content:
logger.info(f"Successfully retrieved and converted document {decision_id} to Markdown")
return SayistayDocumentMarkdown(
decision_id=decision_id,
decision_type=decision_type,
source_url=document_url,
markdown_content=markdown_content,
retrieval_date=None # Could add datetime.now().isoformat() if needed
)
else:
return SayistayDocumentMarkdown(
decision_id=decision_id,
decision_type=decision_type,
source_url=document_url,
markdown_content=None,
error_message=f"Failed to convert HTML to Markdown: {markdown_content}"
)
except httpx.HTTPStatusError as e:
error_msg = f"HTTP error {e.response.status_code} when fetching document: {e}"
logger.error(f"HTTP error fetching document {decision_id}: {error_msg}")
return SayistayDocumentMarkdown(
decision_id=decision_id,
decision_type=decision_type,
source_url=document_url,
markdown_content=None,
error_message=error_msg
)
except httpx.RequestError as e:
error_msg = f"Network error when fetching document: {e}"
logger.error(f"Network error fetching document {decision_id}: {error_msg}")
return SayistayDocumentMarkdown(
decision_id=decision_id,
decision_type=decision_type,
source_url=document_url,
markdown_content=None,
error_message=error_msg
)
except Exception as e:
error_msg = f"Unexpected error when fetching document: {e}"
logger.error(f"Unexpected error fetching document {decision_id}: {error_msg}")
return SayistayDocumentMarkdown(
decision_id=decision_id,
decision_type=decision_type,
source_url=document_url,
markdown_content=None,
error_message=error_msg
)
async def close_client_session(self):
"""Close HTTP client session."""
if hasattr(self, 'http_client') and self.http_client and not self.http_client.is_closed:
await self.http_client.aclose()
logger.info("SayistayApiClient: HTTP client session closed.")
```
--------------------------------------------------------------------------------
/sayistay_mcp_module/client.py:
--------------------------------------------------------------------------------
```python
# sayistay_mcp_module/client.py
import httpx
import re
from bs4 import BeautifulSoup
from typing import Dict, Any, List, Optional, Tuple
import logging
import html
import io
from urllib.parse import urlencode, urljoin
from markitdown import MarkItDown
from .models import (
GenelKurulSearchRequest, GenelKurulSearchResponse, GenelKurulDecision,
TemyizKuruluSearchRequest, TemyizKuruluSearchResponse, TemyizKuruluDecision,
DaireSearchRequest, DaireSearchResponse, DaireDecision,
SayistayDocumentMarkdown
)
from .enums import DaireEnum, KamuIdaresiTuruEnum, WebKararKonusuEnum, WEB_KARAR_KONUSU_MAPPING
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
class SayistayApiClient:
"""
API Client for Sayıştay (Turkish Court of Accounts) decision search system.
Handles three types of decisions:
- Genel Kurul (General Assembly): Precedent-setting interpretive decisions
- Temyiz Kurulu (Appeals Board): Appeals against chamber decisions
- Daire (Chamber): First-instance audit findings and sanctions
Features:
- ASP.NET WebForms session management with CSRF tokens
- DataTables-based pagination and filtering
    - Per-endpoint session initialization (cookies and CSRF token fetched on first use)
- Document retrieval with Markdown conversion
"""
BASE_URL = "https://www.sayistay.gov.tr"
# Search endpoints for each decision type
GENEL_KURUL_ENDPOINT = "/KararlarGenelKurul/DataTablesList"
TEMYIZ_KURULU_ENDPOINT = "/KararlarTemyiz/DataTablesList"
DAIRE_ENDPOINT = "/KararlarDaire/DataTablesList"
# Page endpoints for session initialization and document access
GENEL_KURUL_PAGE = "/KararlarGenelKurul"
TEMYIZ_KURULU_PAGE = "/KararlarTemyiz"
DAIRE_PAGE = "/KararlarDaire"
def __init__(self, request_timeout: float = 60.0):
self.request_timeout = request_timeout
self.session_cookies: Dict[str, str] = {}
self.csrf_tokens: Dict[str, str] = {} # Store tokens for each endpoint
self.http_client = httpx.AsyncClient(
base_url=self.BASE_URL,
headers={
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
"X-Requested-With": "XMLHttpRequest",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin"
},
timeout=request_timeout,
follow_redirects=True
)
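    # Request flow, per endpoint: one GET to the public page collects the
    # session cookies and the __RequestVerificationToken; the search_* methods
    # then replay both on every POST to the matching /DataTablesList endpoint.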
async def _initialize_session_for_endpoint(self, endpoint_type: str) -> bool:
"""
Initialize session and obtain CSRF token for specific endpoint.
Args:
endpoint_type: One of 'genel_kurul', 'temyiz_kurulu', 'daire'
Returns:
True if session initialized successfully, False otherwise
"""
page_mapping = {
'genel_kurul': self.GENEL_KURUL_PAGE,
'temyiz_kurulu': self.TEMYIZ_KURULU_PAGE,
'daire': self.DAIRE_PAGE
}
if endpoint_type not in page_mapping:
logger.error(f"Invalid endpoint type: {endpoint_type}")
return False
page_url = page_mapping[endpoint_type]
logger.info(f"Initializing session for {endpoint_type} endpoint: {page_url}")
try:
response = await self.http_client.get(page_url)
response.raise_for_status()
# Extract session cookies
for cookie_name, cookie_value in response.cookies.items():
self.session_cookies[cookie_name] = cookie_value
logger.debug(f"Stored session cookie: {cookie_name}")
# Extract CSRF token from form
soup = BeautifulSoup(response.text, 'html.parser')
csrf_input = soup.find('input', {'name': '__RequestVerificationToken'})
if csrf_input and csrf_input.get('value'):
self.csrf_tokens[endpoint_type] = csrf_input['value']
logger.info(f"Extracted CSRF token for {endpoint_type}")
return True
else:
logger.warning(f"CSRF token not found in {endpoint_type} page")
return False
except httpx.RequestError as e:
logger.error(f"HTTP error during session initialization for {endpoint_type}: {e}")
return False
except Exception as e:
logger.error(f"Error initializing session for {endpoint_type}: {e}")
return False
def _enum_to_form_value(self, enum_value: str, enum_type: str) -> str:
"""Convert enum values to form values expected by the API."""
if enum_value == "ALL":
if enum_type == "daire":
return "Tüm Daireler"
elif enum_type == "kamu_idaresi":
return "Tüm Kurumlar"
elif enum_type == "web_karar_konusu":
return "Tüm Konular"
# Apply web_karar_konusu mapping
if enum_type == "web_karar_konusu":
return WEB_KARAR_KONUSU_MAPPING.get(enum_value, enum_value)
return enum_value
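    # Worked examples for the branches above; the "ALL" sentinel maps to the
    # site's Turkish "all" labels, while other web_karar_konusu values pass
    # through WEB_KARAR_KONUSU_MAPPING:
    #   _enum_to_form_value("ALL", "daire")            -> "Tüm Daireler"
    #   _enum_to_form_value("ALL", "kamu_idaresi")     -> "Tüm Kurumlar"
    #   _enum_to_form_value("ALL", "web_karar_konusu") -> "Tüm Konular"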
def _build_datatables_params(self, start: int, length: int, draw: int = 1) -> List[Tuple[str, str]]:
"""Build standard DataTables parameters for all endpoints."""
params = [
("draw", str(draw)),
("start", str(start)),
("length", str(length)),
("search[value]", ""),
("search[regex]", "false")
]
return params
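    # For example, _build_datatables_params(start=0, length=10) returns:
    #   [("draw", "1"), ("start", "0"), ("length", "10"),
    #    ("search[value]", ""), ("search[regex]", "false")]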
def _build_genel_kurul_form_data(self, params: GenelKurulSearchRequest, draw: int = 1) -> List[Tuple[str, str]]:
"""Build form data for Genel Kurul search request."""
form_data = self._build_datatables_params(params.start, params.length, draw)
# Add DataTables column definitions (from actual request)
column_defs = [
("columns[0][data]", "KARARNO"),
("columns[0][name]", ""),
("columns[0][searchable]", "true"),
("columns[0][orderable]", "false"),
("columns[0][search][value]", ""),
("columns[0][search][regex]", "false"),
("columns[1][data]", "KARARNO"),
("columns[1][name]", ""),
("columns[1][searchable]", "true"),
("columns[1][orderable]", "true"),
("columns[1][search][value]", ""),
("columns[1][search][regex]", "false"),
("columns[2][data]", "KARARTARIH"),
("columns[2][name]", ""),
("columns[2][searchable]", "true"),
("columns[2][orderable]", "true"),
("columns[2][search][value]", ""),
("columns[2][search][regex]", "false"),
("columns[3][data]", "KARAROZETI"),
("columns[3][name]", ""),
("columns[3][searchable]", "true"),
("columns[3][orderable]", "false"),
("columns[3][search][value]", ""),
("columns[3][search][regex]", "false"),
("columns[4][data]", ""),
("columns[4][name]", ""),
("columns[4][searchable]", "true"),
("columns[4][orderable]", "false"),
("columns[4][search][value]", ""),
("columns[4][search][regex]", "false"),
("order[0][column]", "2"),
("order[0][dir]", "desc")
]
form_data.extend(column_defs)
# Add search parameters
form_data.extend([
("KararlarGenelKurulAra.KARARNO", params.karar_no or ""),
("__Invariant[]", "KararlarGenelKurulAra.KARARNO"),
("__Invariant[]", "KararlarGenelKurulAra.KARAREK"),
("KararlarGenelKurulAra.KARAREK", params.karar_ek or ""),
("KararlarGenelKurulAra.KARARTARIHBaslangic", params.karar_tarih_baslangic or "Başlangıç Tarihi"),
("KararlarGenelKurulAra.KARARTARIHBitis", params.karar_tarih_bitis or "Bitiş Tarihi"),
("KararlarGenelKurulAra.KARARTAMAMI", params.karar_tamami or ""),
("__RequestVerificationToken", self.csrf_tokens.get('genel_kurul', ''))
])
return form_data
def _build_temyiz_kurulu_form_data(self, params: TemyizKuruluSearchRequest, draw: int = 1) -> List[Tuple[str, str]]:
"""Build form data for Temyiz Kurulu search request."""
form_data = self._build_datatables_params(params.start, params.length, draw)
# Add DataTables column definitions (from actual request)
column_defs = [
("columns[0][data]", "TEMYIZTUTANAKTARIHI"),
("columns[0][name]", ""),
("columns[0][searchable]", "true"),
("columns[0][orderable]", "false"),
("columns[0][search][value]", ""),
("columns[0][search][regex]", "false"),
("columns[1][data]", "TEMYIZTUTANAKTARIHI"),
("columns[1][name]", ""),
("columns[1][searchable]", "true"),
("columns[1][orderable]", "true"),
("columns[1][search][value]", ""),
("columns[1][search][regex]", "false"),
("columns[2][data]", "ILAMDAIRESI"),
("columns[2][name]", ""),
("columns[2][searchable]", "true"),
("columns[2][orderable]", "true"),
("columns[2][search][value]", ""),
("columns[2][search][regex]", "false"),
("columns[3][data]", "TEMYIZKARAR"),
("columns[3][name]", ""),
("columns[3][searchable]", "true"),
("columns[3][orderable]", "false"),
("columns[3][search][value]", ""),
("columns[3][search][regex]", "false"),
("columns[4][data]", ""),
("columns[4][name]", ""),
("columns[4][searchable]", "true"),
("columns[4][orderable]", "false"),
("columns[4][search][value]", ""),
("columns[4][search][regex]", "false"),
("order[0][column]", "1"),
("order[0][dir]", "desc")
]
form_data.extend(column_defs)
# Add search parameters
daire_value = self._enum_to_form_value(params.ilam_dairesi, "daire")
kamu_idaresi_value = self._enum_to_form_value(params.kamu_idaresi_turu, "kamu_idaresi")
web_karar_konusu_value = self._enum_to_form_value(params.web_karar_konusu, "web_karar_konusu")
form_data.extend([
("KararlarTemyizAra.ILAMDAIRESI", daire_value),
("KararlarTemyizAra.YILI", params.yili or ""),
("KararlarTemyizAra.KARARTRHBaslangic", params.karar_tarih_baslangic or ""),
("KararlarTemyizAra.KARARTRHBitis", params.karar_tarih_bitis or ""),
("KararlarTemyizAra.KAMUIDARESITURU", kamu_idaresi_value if kamu_idaresi_value != "Tüm Kurumlar" else ""),
("KararlarTemyizAra.ILAMNO", params.ilam_no or ""),
("KararlarTemyizAra.DOSYANO", params.dosya_no or ""),
("KararlarTemyizAra.TEMYIZTUTANAKNO", params.temyiz_tutanak_no or ""),
("__Invariant", "KararlarTemyizAra.TEMYIZTUTANAKNO"),
("KararlarTemyizAra.TEMYIZKARAR", params.temyiz_karar or ""),
("KararlarTemyizAra.WEBKARARKONUSU", web_karar_konusu_value if web_karar_konusu_value != "Tüm Konular" else ""),
("__RequestVerificationToken", self.csrf_tokens.get('temyiz_kurulu', ''))
])
return form_data
def _build_daire_form_data(self, params: DaireSearchRequest, draw: int = 1) -> List[Tuple[str, str]]:
"""Build form data for Daire search request."""
form_data = self._build_datatables_params(params.start, params.length, draw)
# Add DataTables column definitions (from actual request)
column_defs = [
("columns[0][data]", "YARGILAMADAIRESI"),
("columns[0][name]", ""),
("columns[0][searchable]", "true"),
("columns[0][orderable]", "false"),
("columns[0][search][value]", ""),
("columns[0][search][regex]", "false"),
("columns[1][data]", "KARARTRH"),
("columns[1][name]", ""),
("columns[1][searchable]", "true"),
("columns[1][orderable]", "true"),
("columns[1][search][value]", ""),
("columns[1][search][regex]", "false"),
("columns[2][data]", "KARARNO"),
("columns[2][name]", ""),
("columns[2][searchable]", "true"),
("columns[2][orderable]", "true"),
("columns[2][search][value]", ""),
("columns[2][search][regex]", "false"),
("columns[3][data]", "YARGILAMADAIRESI"),
("columns[3][name]", ""),
("columns[3][searchable]", "true"),
("columns[3][orderable]", "true"),
("columns[3][search][value]", ""),
("columns[3][search][regex]", "false"),
("columns[4][data]", "WEBKARARMETNI"),
("columns[4][name]", ""),
("columns[4][searchable]", "true"),
("columns[4][orderable]", "false"),
("columns[4][search][value]", ""),
("columns[4][search][regex]", "false"),
("columns[5][data]", ""),
("columns[5][name]", ""),
("columns[5][searchable]", "true"),
("columns[5][orderable]", "false"),
("columns[5][search][value]", ""),
("columns[5][search][regex]", "false"),
("order[0][column]", "2"),
("order[0][dir]", "desc")
]
form_data.extend(column_defs)
# Add search parameters
daire_value = self._enum_to_form_value(params.yargilama_dairesi, "daire")
kamu_idaresi_value = self._enum_to_form_value(params.kamu_idaresi_turu, "kamu_idaresi")
web_karar_konusu_value = self._enum_to_form_value(params.web_karar_konusu, "web_karar_konusu")
form_data.extend([
("KararlarDaireAra.YARGILAMADAIRESI", daire_value),
("KararlarDaireAra.KARARTRHBaslangic", params.karar_tarih_baslangic or ""),
("KararlarDaireAra.KARARTRHBitis", params.karar_tarih_bitis or ""),
("KararlarDaireAra.ILAMNO", params.ilam_no or ""),
("KararlarDaireAra.KAMUIDARESITURU", kamu_idaresi_value if kamu_idaresi_value != "Tüm Kurumlar" else ""),
("KararlarDaireAra.HESAPYILI", params.hesap_yili or ""),
("KararlarDaireAra.WEBKARARKONUSU", web_karar_konusu_value if web_karar_konusu_value != "Tüm Konular" else ""),
("KararlarDaireAra.WEBKARARMETNI", params.web_karar_metni or ""),
("__RequestVerificationToken", self.csrf_tokens.get('daire', ''))
])
return form_data
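    # Note: the three _build_*_form_data methods above share one skeleton --
    # DataTables paging params, per-endpoint column definitions, the
    # endpoint's "Kararlar*Ara.*" search fields, and its cached CSRF token.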
async def search_genel_kurul_decisions(self, params: GenelKurulSearchRequest) -> GenelKurulSearchResponse:
"""
Search Sayıştay Genel Kurul (General Assembly) decisions.
Args:
params: Search parameters for Genel Kurul decisions
Returns:
GenelKurulSearchResponse with matching decisions
"""
# Initialize session if needed
if 'genel_kurul' not in self.csrf_tokens:
if not await self._initialize_session_for_endpoint('genel_kurul'):
raise Exception("Failed to initialize session for Genel Kurul endpoint")
form_data = self._build_genel_kurul_form_data(params)
encoded_data = urlencode(form_data, encoding='utf-8')
logger.info(f"Searching Genel Kurul decisions with parameters: {params.model_dump(exclude_none=True)}")
try:
# Update headers with cookies
headers = self.http_client.headers.copy()
if self.session_cookies:
cookie_header = "; ".join([f"{k}={v}" for k, v in self.session_cookies.items()])
headers["Cookie"] = cookie_header
response = await self.http_client.post(
self.GENEL_KURUL_ENDPOINT,
data=encoded_data,
headers=headers
)
response.raise_for_status()
response_json = response.json()
# Parse response
decisions = []
for item in response_json.get('data', []):
decisions.append(GenelKurulDecision(
id=item['Id'],
karar_no=item['KARARNO'],
karar_tarih=item['KARARTARIH'],
karar_ozeti=item['KARAROZETI']
))
return GenelKurulSearchResponse(
decisions=decisions,
total_records=response_json.get('recordsTotal', 0),
total_filtered=response_json.get('recordsFiltered', 0),
draw=response_json.get('draw', 1)
)
except httpx.RequestError as e:
logger.error(f"HTTP error during Genel Kurul search: {e}")
raise
except Exception as e:
logger.error(f"Error processing Genel Kurul search: {e}")
raise
async def search_temyiz_kurulu_decisions(self, params: TemyizKuruluSearchRequest) -> TemyizKuruluSearchResponse:
"""
Search Sayıştay Temyiz Kurulu (Appeals Board) decisions.
Args:
params: Search parameters for Temyiz Kurulu decisions
Returns:
TemyizKuruluSearchResponse with matching decisions
"""
# Initialize session if needed
if 'temyiz_kurulu' not in self.csrf_tokens:
if not await self._initialize_session_for_endpoint('temyiz_kurulu'):
raise Exception("Failed to initialize session for Temyiz Kurulu endpoint")
form_data = self._build_temyiz_kurulu_form_data(params)
encoded_data = urlencode(form_data, encoding='utf-8')
logger.info(f"Searching Temyiz Kurulu decisions with parameters: {params.model_dump(exclude_none=True)}")
try:
# Update headers with cookies
headers = self.http_client.headers.copy()
if self.session_cookies:
cookie_header = "; ".join([f"{k}={v}" for k, v in self.session_cookies.items()])
headers["Cookie"] = cookie_header
response = await self.http_client.post(
self.TEMYIZ_KURULU_ENDPOINT,
data=encoded_data,
headers=headers
)
response.raise_for_status()
response_json = response.json()
# Parse response
decisions = []
for item in response_json.get('data', []):
decisions.append(TemyizKuruluDecision(
id=item['Id'],
temyiz_tutanak_tarihi=item['TEMYIZTUTANAKTARIHI'],
ilam_dairesi=item['ILAMDAIRESI'],
temyiz_karar=item['TEMYIZKARAR']
))
return TemyizKuruluSearchResponse(
decisions=decisions,
total_records=response_json.get('recordsTotal', 0),
total_filtered=response_json.get('recordsFiltered', 0),
draw=response_json.get('draw', 1)
)
except httpx.RequestError as e:
logger.error(f"HTTP error during Temyiz Kurulu search: {e}")
raise
except Exception as e:
logger.error(f"Error processing Temyiz Kurulu search: {e}")
raise

    async def search_daire_decisions(self, params: DaireSearchRequest) -> DaireSearchResponse:
        """
        Search Sayıştay Daire (Chamber) decisions.

        Args:
            params: Search parameters for Daire decisions

        Returns:
            DaireSearchResponse with matching decisions
        """
        # Initialize session if needed
        if 'daire' not in self.csrf_tokens:
            if not await self._initialize_session_for_endpoint('daire'):
                raise Exception("Failed to initialize session for Daire endpoint")

        form_data = self._build_daire_form_data(params)
        encoded_data = urlencode(form_data, encoding='utf-8')

        logger.info(f"Searching Daire decisions with parameters: {params.model_dump(exclude_none=True)}")

        try:
            # Update headers with cookies
            headers = self.http_client.headers.copy()
            if self.session_cookies:
                cookie_header = "; ".join([f"{k}={v}" for k, v in self.session_cookies.items()])
                headers["Cookie"] = cookie_header

            response = await self.http_client.post(
                self.DAIRE_ENDPOINT,
                data=encoded_data,
                headers=headers
            )
            response.raise_for_status()
            response_json = response.json()

            # Parse response
            decisions = []
            for item in response_json.get('data', []):
                decisions.append(DaireDecision(
                    id=item['Id'],
                    yargilama_dairesi=item['YARGILAMADAIRESI'],
                    karar_tarih=item['KARARTRH'],
                    karar_no=item['KARARNO'],
                    ilam_no=item.get('ILAMNO'),  # ILAMNO can be missing, so .get() defaults it to None
                    madde_no=item['MADDENO'],
                    kamu_idaresi_turu=item['KAMUIDARESITURU'],
                    hesap_yili=item['HESAPYILI'],
                    web_karar_konusu=item['WEBKARARKONUSU'],
                    web_karar_metni=item['WEBKARARMETNI']
                ))

            return DaireSearchResponse(
                decisions=decisions,
                total_records=response_json.get('recordsTotal', 0),
                total_filtered=response_json.get('recordsFiltered', 0),
                draw=response_json.get('draw', 1)
            )
        except httpx.RequestError as e:
            logger.error(f"HTTP error during Daire search: {e}")
            raise
        except Exception as e:
            logger.error(f"Error processing Daire search: {e}")
            raise
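
    # NOTE: the three search endpoints above all speak the DataTables
    # server-side protocol. A representative (hypothetical) response payload:
    #   {"draw": 1, "recordsTotal": 1523, "recordsFiltered": 42,
    #    "data": [{"Id": 12345, "KARARNO": "...", "KARARTARIH": "...", ...}]}
    # 'recordsTotal' is the unfiltered row count; 'recordsFiltered' is the
    # count after the submitted filters are applied.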

    def _convert_html_to_markdown(self, html_content: str) -> Optional[str]:
        """Convert HTML content to Markdown using MarkItDown with BytesIO to avoid filename length issues."""
        if not html_content:
            return None

        try:
            # Convert HTML string to bytes and create BytesIO stream
            html_bytes = html_content.encode('utf-8')
            html_stream = io.BytesIO(html_bytes)

            # Pass BytesIO stream to MarkItDown to avoid temp file creation
            md_converter = MarkItDown()
            result = md_converter.convert(html_stream)
            markdown_content = result.text_content

            logger.info("Successfully converted HTML to Markdown")
            return markdown_content
        except Exception as e:
            logger.error(f"Error converting HTML to Markdown: {e}")
            return f"Error converting HTML content: {str(e)}"

    async def get_document_as_markdown(self, decision_id: str, decision_type: str) -> SayistayDocumentMarkdown:
        """
        Retrieve full text of a Sayıştay decision and convert to Markdown.

        Args:
            decision_id: Unique decision identifier
            decision_type: Type of decision ('genel_kurul', 'temyiz_kurulu', 'daire')

        Returns:
            SayistayDocumentMarkdown with converted content
        """
        logger.info(f"Retrieving document for {decision_type} decision ID: {decision_id}")

        # Validate decision_id
        if not decision_id or not decision_id.strip():
            return SayistayDocumentMarkdown(
                decision_id=decision_id,
                decision_type=decision_type,
                source_url="",
                markdown_content=None,
                error_message="Decision ID cannot be empty"
            )

        # Map decision type to URL path
        url_path_mapping = {
            'genel_kurul': 'KararlarGenelKurul',
            'temyiz_kurulu': 'KararlarTemyiz',
            'daire': 'KararlarDaire'
        }

        if decision_type not in url_path_mapping:
            return SayistayDocumentMarkdown(
                decision_id=decision_id,
                decision_type=decision_type,
                source_url="",
                markdown_content=None,
                error_message=f"Invalid decision type: {decision_type}. Must be one of: {list(url_path_mapping.keys())}"
            )

        # Build document URL
        url_path = url_path_mapping[decision_type]
        document_url = f"{self.BASE_URL}/{url_path}/Detay/{decision_id}/"

        try:
            # Make HTTP GET request to document URL
            headers = {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7",
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "same-origin"
            }

            # Include session cookies if available
            if self.session_cookies:
                cookie_header = "; ".join([f"{k}={v}" for k, v in self.session_cookies.items()])
                headers["Cookie"] = cookie_header

            response = await self.http_client.get(document_url, headers=headers)
            response.raise_for_status()

            html_content = response.text
            if not html_content or not html_content.strip():
                logger.warning(f"Received empty HTML content from {document_url}")
                return SayistayDocumentMarkdown(
                    decision_id=decision_id,
                    decision_type=decision_type,
                    source_url=document_url,
                    markdown_content=None,
                    error_message="Document content is empty"
                )

            # Convert HTML to Markdown using existing method
            markdown_content = self._convert_html_to_markdown(html_content)

            if markdown_content and "Error converting HTML content" not in markdown_content:
                logger.info(f"Successfully retrieved and converted document {decision_id} to Markdown")
                return SayistayDocumentMarkdown(
                    decision_id=decision_id,
                    decision_type=decision_type,
                    source_url=document_url,
                    markdown_content=markdown_content,
                    retrieval_date=None  # Could add datetime.now().isoformat() if needed
                )
            else:
                return SayistayDocumentMarkdown(
                    decision_id=decision_id,
                    decision_type=decision_type,
                    source_url=document_url,
                    markdown_content=None,
                    error_message=f"Failed to convert HTML to Markdown: {markdown_content}"
                )
        except httpx.HTTPStatusError as e:
            error_msg = f"HTTP error {e.response.status_code} when fetching document: {e}"
            logger.error(f"HTTP error fetching document {decision_id}: {error_msg}")
            return SayistayDocumentMarkdown(
                decision_id=decision_id,
                decision_type=decision_type,
                source_url=document_url,
                markdown_content=None,
                error_message=error_msg
            )
        except httpx.RequestError as e:
            error_msg = f"Network error when fetching document: {e}"
            logger.error(f"Network error fetching document {decision_id}: {error_msg}")
            return SayistayDocumentMarkdown(
                decision_id=decision_id,
                decision_type=decision_type,
                source_url=document_url,
                markdown_content=None,
                error_message=error_msg
            )
        except Exception as e:
            error_msg = f"Unexpected error when fetching document: {e}"
            logger.error(f"Unexpected error fetching document {decision_id}: {error_msg}")
            return SayistayDocumentMarkdown(
                decision_id=decision_id,
                decision_type=decision_type,
                source_url=document_url,
                markdown_content=None,
                error_message=error_msg
            )

    async def close_client_session(self):
        """Close HTTP client session."""
        if hasattr(self, 'http_client') and self.http_client and not self.http_client.is_closed:
            await self.http_client.aclose()
            logger.info("SayistayApiClient: HTTP client session closed.")
```
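
For orientation, here is a minimal usage sketch for the client above. It is not part of the repository: the import paths follow the directory structure, and the no-argument constructor and default-constructed `TemyizKuruluSearchRequest` are assumptions — adjust to the actual signatures in `sayistay_mcp_module`.

```python
# Minimal usage sketch (assumed imports and defaults; not from the repo).
import asyncio

from sayistay_mcp_module.client import SayistayApiClient
from sayistay_mcp_module.models import TemyizKuruluSearchRequest


async def main() -> None:
    client = SayistayApiClient()
    try:
        # Session initialization and CSRF token handling happen inside the search call.
        results = await client.search_temyiz_kurulu_decisions(
            TemyizKuruluSearchRequest()  # assumes the model is valid with defaults
        )
        print(f"{results.total_filtered} of {results.total_records} decisions matched")

        if results.decisions:
            # Fetch the first hit as Markdown; the client builds
            # {BASE_URL}/KararlarTemyiz/Detay/{id}/ internally.
            doc = await client.get_document_as_markdown(
                decision_id=str(results.decisions[0].id),
                decision_type="temyiz_kurulu",
            )
            print(doc.markdown_content or doc.error_message)
    finally:
        await client.close_client_session()


asyncio.run(main())
```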