#
tokens: 43540/50000 5/236 files (page 7/10)
lines: off (toggle) GitHub
raw markdown copy
This is page 7 of 10. To view the other pages, open http://codebase.md/getzep/graphiti?lines=false&page={x}, replacing {x} with the page number (1-10).

# Directory Structure

```
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   └── bug_report.md
│   ├── pull_request_template.md
│   ├── secret_scanning.yml
│   └── workflows
│       ├── ai-moderator.yml
│       ├── cla.yml
│       ├── claude-code-review-manual.yml
│       ├── claude-code-review.yml
│       ├── claude.yml
│       ├── codeql.yml
│       ├── lint.yml
│       ├── release-graphiti-core.yml
│       ├── release-mcp-server.yml
│       ├── release-server-container.yml
│       ├── typecheck.yml
│       └── unit_tests.yml
├── .gitignore
├── AGENTS.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── conftest.py
├── CONTRIBUTING.md
├── depot.json
├── docker-compose.test.yml
├── docker-compose.yml
├── Dockerfile
├── ellipsis.yaml
├── examples
│   ├── azure-openai
│   │   ├── .env.example
│   │   ├── azure_openai_neo4j.py
│   │   └── README.md
│   ├── data
│   │   └── manybirds_products.json
│   ├── ecommerce
│   │   ├── runner.ipynb
│   │   └── runner.py
│   ├── langgraph-agent
│   │   ├── agent.ipynb
│   │   └── tinybirds-jess.png
│   ├── opentelemetry
│   │   ├── .env.example
│   │   ├── otel_stdout_example.py
│   │   ├── pyproject.toml
│   │   ├── README.md
│   │   └── uv.lock
│   ├── podcast
│   │   ├── podcast_runner.py
│   │   ├── podcast_transcript.txt
│   │   └── transcript_parser.py
│   ├── quickstart
│   │   ├── dense_vs_normal_ingestion.py
│   │   ├── quickstart_falkordb.py
│   │   ├── quickstart_neo4j.py
│   │   ├── quickstart_neptune.py
│   │   ├── README.md
│   │   └── requirements.txt
│   └── wizard_of_oz
│       ├── parser.py
│       ├── runner.py
│       └── woo.txt
├── graphiti_core
│   ├── __init__.py
│   ├── cross_encoder
│   │   ├── __init__.py
│   │   ├── bge_reranker_client.py
│   │   ├── client.py
│   │   ├── gemini_reranker_client.py
│   │   └── openai_reranker_client.py
│   ├── decorators.py
│   ├── driver
│   │   ├── __init__.py
│   │   ├── driver.py
│   │   ├── falkordb_driver.py
│   │   ├── graph_operations
│   │   │   └── graph_operations.py
│   │   ├── kuzu_driver.py
│   │   ├── neo4j_driver.py
│   │   ├── neptune_driver.py
│   │   └── search_interface
│   │       └── search_interface.py
│   ├── edges.py
│   ├── embedder
│   │   ├── __init__.py
│   │   ├── azure_openai.py
│   │   ├── client.py
│   │   ├── gemini.py
│   │   ├── openai.py
│   │   └── voyage.py
│   ├── errors.py
│   ├── graph_queries.py
│   ├── graphiti_types.py
│   ├── graphiti.py
│   ├── helpers.py
│   ├── llm_client
│   │   ├── __init__.py
│   │   ├── anthropic_client.py
│   │   ├── azure_openai_client.py
│   │   ├── client.py
│   │   ├── config.py
│   │   ├── errors.py
│   │   ├── gemini_client.py
│   │   ├── groq_client.py
│   │   ├── openai_base_client.py
│   │   ├── openai_client.py
│   │   ├── openai_generic_client.py
│   │   └── utils.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models
│   │   ├── __init__.py
│   │   ├── edges
│   │   │   ├── __init__.py
│   │   │   └── edge_db_queries.py
│   │   └── nodes
│   │       ├── __init__.py
│   │       └── node_db_queries.py
│   ├── nodes.py
│   ├── prompts
│   │   ├── __init__.py
│   │   ├── dedupe_edges.py
│   │   ├── dedupe_nodes.py
│   │   ├── eval.py
│   │   ├── extract_edge_dates.py
│   │   ├── extract_edges.py
│   │   ├── extract_nodes.py
│   │   ├── invalidate_edges.py
│   │   ├── lib.py
│   │   ├── models.py
│   │   ├── prompt_helpers.py
│   │   ├── snippets.py
│   │   └── summarize_nodes.py
│   ├── py.typed
│   ├── search
│   │   ├── __init__.py
│   │   ├── search_config_recipes.py
│   │   ├── search_config.py
│   │   ├── search_filters.py
│   │   ├── search_helpers.py
│   │   ├── search_utils.py
│   │   └── search.py
│   ├── telemetry
│   │   ├── __init__.py
│   │   └── telemetry.py
│   ├── tracer.py
│   └── utils
│       ├── __init__.py
│       ├── bulk_utils.py
│       ├── content_chunking.py
│       ├── datetime_utils.py
│       ├── maintenance
│       │   ├── __init__.py
│       │   ├── community_operations.py
│       │   ├── dedup_helpers.py
│       │   ├── edge_operations.py
│       │   ├── graph_data_operations.py
│       │   ├── node_operations.py
│       │   └── temporal_operations.py
│       ├── ontology_utils
│       │   └── entity_types_utils.py
│       └── text_utils.py
├── images
│   ├── arxiv-screenshot.png
│   ├── graphiti-graph-intro.gif
│   ├── graphiti-intro-slides-stock-2.gif
│   └── simple_graph.svg
├── LICENSE
├── Makefile
├── mcp_server
│   ├── .env.example
│   ├── .python-version
│   ├── config
│   │   ├── config-docker-falkordb-combined.yaml
│   │   ├── config-docker-falkordb.yaml
│   │   ├── config-docker-neo4j.yaml
│   │   ├── config.yaml
│   │   └── mcp_config_stdio_example.json
│   ├── docker
│   │   ├── build-standalone.sh
│   │   ├── build-with-version.sh
│   │   ├── docker-compose-falkordb.yml
│   │   ├── docker-compose-neo4j.yml
│   │   ├── docker-compose.yml
│   │   ├── Dockerfile
│   │   ├── Dockerfile.standalone
│   │   ├── github-actions-example.yml
│   │   ├── README-falkordb-combined.md
│   │   └── README.md
│   ├── docs
│   │   └── cursor_rules.md
│   ├── main.py
│   ├── pyproject.toml
│   ├── pytest.ini
│   ├── README.md
│   ├── src
│   │   ├── __init__.py
│   │   ├── config
│   │   │   ├── __init__.py
│   │   │   └── schema.py
│   │   ├── graphiti_mcp_server.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── entity_types.py
│   │   │   └── response_types.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   ├── factories.py
│   │   │   └── queue_service.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── formatting.py
│   │       └── utils.py
│   ├── tests
│   │   ├── __init__.py
│   │   ├── conftest.py
│   │   ├── pytest.ini
│   │   ├── README.md
│   │   ├── run_tests.py
│   │   ├── test_async_operations.py
│   │   ├── test_comprehensive_integration.py
│   │   ├── test_configuration.py
│   │   ├── test_falkordb_integration.py
│   │   ├── test_fixtures.py
│   │   ├── test_http_integration.py
│   │   ├── test_integration.py
│   │   ├── test_mcp_integration.py
│   │   ├── test_mcp_transports.py
│   │   ├── test_stdio_simple.py
│   │   └── test_stress_load.py
│   └── uv.lock
├── OTEL_TRACING.md
├── py.typed
├── pyproject.toml
├── pytest.ini
├── README.md
├── SECURITY.md
├── server
│   ├── .env.example
│   ├── graph_service
│   │   ├── __init__.py
│   │   ├── config.py
│   │   ├── dto
│   │   │   ├── __init__.py
│   │   │   ├── common.py
│   │   │   ├── ingest.py
│   │   │   └── retrieve.py
│   │   ├── main.py
│   │   ├── routers
│   │   │   ├── __init__.py
│   │   │   ├── ingest.py
│   │   │   └── retrieve.py
│   │   └── zep_graphiti.py
│   ├── Makefile
│   ├── pyproject.toml
│   ├── README.md
│   └── uv.lock
├── signatures
│   └── version1
│       └── cla.json
├── tests
│   ├── cross_encoder
│   │   ├── test_bge_reranker_client_int.py
│   │   └── test_gemini_reranker_client.py
│   ├── driver
│   │   ├── __init__.py
│   │   └── test_falkordb_driver.py
│   ├── embedder
│   │   ├── embedder_fixtures.py
│   │   ├── test_gemini.py
│   │   ├── test_openai.py
│   │   └── test_voyage.py
│   ├── evals
│   │   ├── data
│   │   │   └── longmemeval_data
│   │   │       ├── longmemeval_oracle.json
│   │   │       └── README.md
│   │   ├── eval_cli.py
│   │   ├── eval_e2e_graph_building.py
│   │   ├── pytest.ini
│   │   └── utils.py
│   ├── helpers_test.py
│   ├── llm_client
│   │   ├── test_anthropic_client_int.py
│   │   ├── test_anthropic_client.py
│   │   ├── test_azure_openai_client.py
│   │   ├── test_client.py
│   │   ├── test_errors.py
│   │   └── test_gemini_client.py
│   ├── test_edge_int.py
│   ├── test_entity_exclusion_int.py
│   ├── test_graphiti_int.py
│   ├── test_graphiti_mock.py
│   ├── test_node_int.py
│   ├── test_text_utils.py
│   └── utils
│       ├── maintenance
│       │   ├── test_bulk_utils.py
│       │   ├── test_edge_operations.py
│       │   ├── test_entity_extraction.py
│       │   ├── test_node_operations.py
│       │   └── test_temporal_operations_int.py
│       ├── search
│       │   └── search_utils_test.py
│       └── test_content_chunking.py
├── uv.lock
└── Zep-CLA.md
```

# Files

--------------------------------------------------------------------------------
/graphiti_core/utils/maintenance/edge_operations.py:
--------------------------------------------------------------------------------

```python
"""
Copyright 2024, Zep Software, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import logging
from datetime import datetime
from time import time

from pydantic import BaseModel
from typing_extensions import LiteralString

from graphiti_core.driver.driver import GraphDriver, GraphProvider
from graphiti_core.edges import (
    CommunityEdge,
    EntityEdge,
    EpisodicEdge,
    create_entity_edge_embeddings,
)
from graphiti_core.graphiti_types import GraphitiClients
from graphiti_core.helpers import semaphore_gather
from graphiti_core.llm_client import LLMClient
from graphiti_core.llm_client.config import ModelSize
from graphiti_core.nodes import CommunityNode, EntityNode, EpisodicNode
from graphiti_core.prompts import prompt_library
from graphiti_core.prompts.dedupe_edges import EdgeDuplicate
from graphiti_core.prompts.extract_edges import Edge as ExtractedEdge
from graphiti_core.prompts.extract_edges import ExtractedEdges
from graphiti_core.search.search import search
from graphiti_core.search.search_config import SearchResults
from graphiti_core.search.search_config_recipes import EDGE_HYBRID_SEARCH_RRF
from graphiti_core.search.search_filters import SearchFilters
from graphiti_core.utils.content_chunking import generate_covering_chunks
from graphiti_core.utils.datetime_utils import ensure_utc, utc_now
from graphiti_core.utils.maintenance.dedup_helpers import _normalize_string_exact

# Fallback relation label. resolve_extracted_edges resets an edge's name to this
# value when the edge carries a registered custom type name that is not permitted
# for its source/target label signature.
DEFAULT_EDGE_NAME = 'RELATES_TO'
# Passed to generate_covering_chunks in extract_edges as its second argument
# (presumably the maximum number of nodes per extraction chunk -- confirm against
# generate_covering_chunks).
MAX_NODES = 15

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


def build_episodic_edges(
    entity_nodes: list[EntityNode],
    episode_uuid: str,
    created_at: datetime,
) -> list[EpisodicEdge]:
    """Create one episodic edge from an episode to each of the given entity nodes.

    Parameters
    ----------
    entity_nodes : list[EntityNode]
        Nodes that become the targets of the new edges.
    episode_uuid : str
        UUID used as the source of every edge.
    created_at : datetime
        Creation timestamp stamped on every edge.

    Returns
    -------
    list[EpisodicEdge]
        One edge per entity node, in input order. Each edge takes its
        ``group_id`` from the node it points to.
    """
    episodic_edges: list[EpisodicEdge] = []
    for entity_node in entity_nodes:
        episodic_edges.append(
            EpisodicEdge(
                source_node_uuid=episode_uuid,
                target_node_uuid=entity_node.uuid,
                created_at=created_at,
                group_id=entity_node.group_id,
            )
        )

    logger.debug(f'Built episodic edges: {episodic_edges}')

    return episodic_edges


def build_community_edges(
    entity_nodes: list[EntityNode],
    community_node: CommunityNode,
    created_at: datetime,
) -> list[CommunityEdge]:
    """Create one community edge from a community node to each entity node.

    Parameters
    ----------
    entity_nodes : list[EntityNode]
        Nodes that become the targets of the new edges.
    community_node : CommunityNode
        Node used as the source of every edge; its ``group_id`` is copied
        onto every edge.
    created_at : datetime
        Creation timestamp stamped on every edge.

    Returns
    -------
    list[CommunityEdge]
        One edge per entity node, in input order.
    """
    community_uuid = community_node.uuid
    community_group_id = community_node.group_id

    community_edges: list[CommunityEdge] = []
    for member in entity_nodes:
        community_edges.append(
            CommunityEdge(
                source_node_uuid=community_uuid,
                target_node_uuid=member.uuid,
                created_at=created_at,
                group_id=community_group_id,
            )
        )

    return community_edges


def _parse_extracted_edge_datetime(raw_value: str | None, field_name: str) -> datetime | None:
    """Parse an LLM-supplied ISO 8601 timestamp string and pass it through ensure_utc.

    Returns None when ``raw_value`` is empty/None or cannot be parsed; parse
    failures are logged as warnings rather than raised. ``field_name`` is only
    used in the log message.
    """
    if not raw_value:
        return None

    try:
        # A trailing 'Z' is rewritten to an explicit UTC offset before parsing.
        return ensure_utc(datetime.fromisoformat(raw_value.replace('Z', '+00:00')))
    except ValueError as e:
        logger.warning(f'WARNING: Error parsing {field_name} date: {e}. Input: {raw_value}')
        return None


async def extract_edges(
    clients: GraphitiClients,
    episode: EpisodicNode,
    nodes: list[EntityNode],
    previous_episodes: list[EpisodicNode],
    edge_type_map: dict[tuple[str, str], list[str]],
    group_id: str = '',
    edge_types: dict[str, type[BaseModel]] | None = None,
    custom_extraction_instructions: str | None = None,
) -> list[EntityEdge]:
    """Extract entity edges (facts) between ``nodes`` from an episode using the LLM.

    The nodes are split into covering chunks of at most ``MAX_NODES`` entries
    (via ``generate_covering_chunks``). Every unordered node pair is assigned to
    the first chunk that contains it, and each chunk is sent to the LLM in
    parallel. Edges returned for a pair that is not assigned to the responding
    chunk, or with out-of-range node indices, are discarded.

    Parameters
    ----------
    clients : GraphitiClients
        Client bundle; only ``clients.llm_client`` is used here.
    episode : EpisodicNode
        Episode whose ``content`` is analysed. Its ``valid_at`` is passed to the
        prompt as the reference time and its ``uuid`` is recorded on every edge.
    nodes : list[EntityNode]
        Entity nodes the edges may connect.
    previous_episodes : list[EpisodicNode]
        Earlier episodes whose ``content`` is supplied to the prompt as context.
    edge_type_map : dict[tuple[str, str], list[str]]
        Maps a (source label, target label) signature to the edge type names
        registered for it. Used to annotate each custom type with a signature
        in the prompt context.
    group_id : str
        Group id assigned to every created edge and forwarded to the LLM call.
    edge_types : dict[str, type[BaseModel]] | None
        Custom edge type models keyed by name; each model's docstring is sent
        to the prompt as the type description.
    custom_extraction_instructions : str | None
        Extra prompt instructions; an empty string is sent when None.

    Returns
    -------
    list[EntityEdge]
        Newly built edges. Extracted items with a blank fact are dropped.
        ``valid_at`` / ``invalid_at`` are None when absent or unparsable.
    """
    start = time()

    extract_edges_max_tokens = 16384
    llm_client = clients.llm_client

    # Invert signature -> [type names] into type name -> signature. The loop
    # variables are deliberately named so they do not shadow the `edge_types`
    # parameter. If a type name is listed under several signatures, the last
    # one iterated wins.
    edge_type_signature_map: dict[str, tuple[str, str]] = {
        type_name: signature
        for signature, type_names in edge_type_map.items()
        for type_name in type_names
    }

    edge_types_context = (
        [
            {
                'fact_type_name': type_name,
                'fact_type_signature': edge_type_signature_map.get(type_name, ('Entity', 'Entity')),
                'fact_type_description': type_model.__doc__,
            }
            for type_name, type_model in edge_types.items()
        ]
        if edge_types is not None
        else []
    )

    # Generate covering chunks to ensure all node pairs are processed.
    # Uses a greedy approach based on the Handshake Flights Problem.
    covering_chunks = generate_covering_chunks(nodes, MAX_NODES)

    # Pre-assign pairs to chunks to avoid duplicate edge extraction.
    # Each pair is assigned to the first chunk that contains it.
    processed_pairs: set[frozenset[int]] = set()
    chunk_assigned_pairs: list[set[frozenset[int]]] = []

    for _, global_indices in covering_chunks:
        assigned_pairs: set[frozenset[int]] = set()
        for i, idx_i in enumerate(global_indices):
            for idx_j in global_indices[i + 1 :]:
                pair = frozenset([idx_i, idx_j])
                if pair not in processed_pairs:
                    processed_pairs.add(pair)
                    assigned_pairs.add(pair)
        chunk_assigned_pairs.append(assigned_pairs)

    async def extract_edges_for_chunk(
        chunk: list[EntityNode],
        global_indices: list[int],
        assigned_pairs: set[frozenset[int]],
    ) -> list[ExtractedEdge]:
        """Run edge extraction for one chunk and remap its indices to `nodes`."""
        # Skip chunks with no assigned pairs (all pairs already processed)
        if not assigned_pairs:
            return []

        # Prepare context for LLM. Node ids are chunk-local positions.
        context = {
            'episode_content': episode.content,
            'nodes': [
                {'id': idx, 'name': node.name, 'entity_types': node.labels}
                for idx, node in enumerate(chunk)
            ],
            'previous_episodes': [ep.content for ep in previous_episodes],
            'reference_time': episode.valid_at,
            'edge_types': edge_types_context,
            'custom_extraction_instructions': custom_extraction_instructions or '',
        }

        llm_response = await llm_client.generate_response(
            prompt_library.extract_edges.edge(context),
            response_model=ExtractedEdges,
            max_tokens=extract_edges_max_tokens,
            group_id=group_id,
            prompt_name='extract_edges.edge',
        )
        chunk_edges_data = ExtractedEdges(**llm_response).edges

        # Map chunk-local indices to global indices in the original nodes list
        # Note: global_indices are guaranteed valid by generate_covering_chunks,
        # but LLM-returned local indices need validation
        valid_edges: list[ExtractedEdge] = []
        chunk_size = len(global_indices)

        for edge_data in chunk_edges_data:
            source_local_idx = edge_data.source_entity_id
            target_local_idx = edge_data.target_entity_id

            # Validate LLM-returned indices are within chunk bounds
            if not (0 <= source_local_idx < chunk_size):
                logger.warning(
                    f'Source index {source_local_idx} out of bounds for chunk of size '
                    f'{chunk_size} in edge {edge_data.relation_type}'
                )
                continue

            if not (0 <= target_local_idx < chunk_size):
                logger.warning(
                    f'Target index {target_local_idx} out of bounds for chunk of size '
                    f'{chunk_size} in edge {edge_data.relation_type}'
                )
                continue

            # Map to global indices (guaranteed valid by generate_covering_chunks)
            mapped_source = global_indices[source_local_idx]
            mapped_target = global_indices[target_local_idx]
            edge_data.source_entity_id = mapped_source
            edge_data.target_entity_id = mapped_target

            # Only include edges for pairs assigned to this chunk. A self-edge
            # yields a one-element frozenset, which is never an assigned pair,
            # so it is dropped here as well.
            edge_pair = frozenset([mapped_source, mapped_target])
            if edge_pair in assigned_pairs:
                valid_edges.append(edge_data)

        return valid_edges

    # Extract edges from all chunks in parallel
    chunk_results: list[list[ExtractedEdge]] = list(
        await semaphore_gather(
            *[
                extract_edges_for_chunk(chunk, global_indices, assigned_pairs)
                for (chunk, global_indices), assigned_pairs in zip(
                    covering_chunks, chunk_assigned_pairs, strict=True
                )
            ]
        )
    )

    # Combine results from all chunks
    edges_data: list[ExtractedEdge] = []
    for chunk_edges in chunk_results:
        edges_data.extend(chunk_edges)

    end = time()
    logger.debug(f'Extracted new edges: {edges_data} in {(end - start) * 1000} ms')

    if len(edges_data) == 0:
        return []

    # Convert the extracted data into EntityEdge objects
    edges = []
    for edge_data in edges_data:
        # Filter out empty edges
        if not edge_data.fact.strip():
            continue

        # Indices already validated in extract_edges_for_chunk
        source_node_uuid = nodes[edge_data.source_entity_id].uuid
        target_node_uuid = nodes[edge_data.target_entity_id].uuid

        # Validate Edge Date information; unparsable values are logged and become None.
        valid_at_datetime = _parse_extracted_edge_datetime(edge_data.valid_at, 'valid_at')
        invalid_at_datetime = _parse_extracted_edge_datetime(edge_data.invalid_at, 'invalid_at')

        edge = EntityEdge(
            source_node_uuid=source_node_uuid,
            target_node_uuid=target_node_uuid,
            name=edge_data.relation_type,
            group_id=group_id,
            fact=edge_data.fact,
            episodes=[episode.uuid],
            created_at=utc_now(),
            valid_at=valid_at_datetime,
            invalid_at=invalid_at_datetime,
        )
        edges.append(edge)
        logger.debug(
            f'Created new edge: {edge.name} from (UUID: {edge.source_node_uuid}) to (UUID: {edge.target_node_uuid})'
        )

    logger.debug(f'Extracted edges: {[(e.name, e.uuid) for e in edges]}')

    return edges


async def resolve_extracted_edges(
    clients: GraphitiClients,
    extracted_edges: list[EntityEdge],
    episode: EpisodicNode,
    entities: list[EntityNode],
    edge_types: dict[str, type[BaseModel]],
    edge_type_map: dict[tuple[str, str], list[str]],
) -> tuple[list[EntityEdge], list[EntityEdge]]:
    """Resolve freshly extracted edges against edges already stored in the graph.

    Steps, in order:

    1. Drop exact duplicates within ``extracted_edges`` (same source, target and
       normalized fact), keeping the first occurrence.
    2. Create embeddings for the remaining edges.
    3. For each edge, search for related edges among those already connecting
       the same node pair, and separately search (unfiltered) for invalidation
       candidates.
    4. Work out which custom edge types are permitted for each edge's
       source/target label signature and reset disallowed custom names to
       ``DEFAULT_EDGE_NAME``.
    5. Run ``resolve_extracted_edge`` for every edge in parallel and create
       embeddings for the resolved and invalidated edges.

    Parameters
    ----------
    clients : GraphitiClients
        Client bundle providing ``driver``, ``llm_client`` and ``embedder``.
    extracted_edges : list[EntityEdge]
        Edges to resolve. Edge names may be mutated in place (step 4).
    episode : EpisodicNode
        Episode forwarded to ``resolve_extracted_edge``.
    entities : list[EntityNode]
        Known entity nodes; nodes referenced by an edge but missing from this
        list are fetched by uuid through the driver.
    edge_types : dict[str, type[BaseModel]]
        Registered custom edge type models keyed by name. ``None`` is tolerated
        and treated as "no custom types".
    edge_type_map : dict[tuple[str, str], list[str]]
        Maps a (source label, target label) signature to permitted type names.

    Returns
    -------
    tuple[list[EntityEdge], list[EntityEdge]]
        ``(resolved_edges, invalidated_edges)``.
    """
    # Nothing to resolve: skip the embedder, driver and search round-trips.
    if not extracted_edges:
        return [], []

    # Normalise once so every later lookup is safe. The original code guarded
    # only one of its two uses of `edge_types` against None.
    edge_type_models: dict[str, type[BaseModel]] = edge_types or {}

    # Fast path: deduplicate exact matches within the extracted edges before parallel processing
    seen_keys: set[tuple[str, str, str]] = set()
    deduplicated_edges: list[EntityEdge] = []

    for edge in extracted_edges:
        key = (
            edge.source_node_uuid,
            edge.target_node_uuid,
            _normalize_string_exact(edge.fact),
        )
        if key not in seen_keys:
            seen_keys.add(key)
            deduplicated_edges.append(edge)

    extracted_edges = deduplicated_edges

    driver = clients.driver
    llm_client = clients.llm_client
    embedder = clients.embedder
    await create_entity_edge_embeddings(embedder, extracted_edges)

    # Edges that already connect each extracted edge's source and target nodes.
    valid_edges_list: list[list[EntityEdge]] = await semaphore_gather(
        *[
            EntityEdge.get_between_nodes(driver, edge.source_node_uuid, edge.target_node_uuid)
            for edge in extracted_edges
        ]
    )

    # Duplicate candidates: search restricted to the edges between the same nodes.
    related_edges_results: list[SearchResults] = await semaphore_gather(
        *[
            search(
                clients,
                extracted_edge.fact,
                group_ids=[extracted_edge.group_id],
                config=EDGE_HYBRID_SEARCH_RRF,
                search_filter=SearchFilters(edge_uuids=[edge.uuid for edge in valid_edges]),
            )
            for extracted_edge, valid_edges in zip(extracted_edges, valid_edges_list, strict=True)
        ]
    )

    related_edges_lists: list[list[EntityEdge]] = [result.edges for result in related_edges_results]

    # Invalidation candidates: the same search without any edge-uuid restriction.
    edge_invalidation_candidate_results: list[SearchResults] = await semaphore_gather(
        *[
            search(
                clients,
                extracted_edge.fact,
                group_ids=[extracted_edge.group_id],
                config=EDGE_HYBRID_SEARCH_RRF,
                search_filter=SearchFilters(),
            )
            for extracted_edge in extracted_edges
        ]
    )

    edge_invalidation_candidates: list[list[EntityEdge]] = [
        result.edges for result in edge_invalidation_candidate_results
    ]

    logger.debug(
        f'Related edges lists: {[(e.name, e.uuid) for edges_lst in related_edges_lists for e in edges_lst]}'
    )

    # Build entity hash table
    uuid_entity_map: dict[str, EntityNode] = {entity.uuid: entity for entity in entities}

    # Collect all node UUIDs referenced by edges that are not in the entities list
    referenced_node_uuids = set()
    for extracted_edge in extracted_edges:
        if extracted_edge.source_node_uuid not in uuid_entity_map:
            referenced_node_uuids.add(extracted_edge.source_node_uuid)
        if extracted_edge.target_node_uuid not in uuid_entity_map:
            referenced_node_uuids.add(extracted_edge.target_node_uuid)

    # Fetch missing nodes from the database
    if referenced_node_uuids:
        missing_nodes = await EntityNode.get_by_uuids(driver, list(referenced_node_uuids))
        for node in missing_nodes:
            uuid_entity_map[node.uuid] = node

    # Determine which edge types are relevant for each edge.
    # `edge_types_lst` stores the subset of custom edge definitions whose
    # node signature matches each extracted edge. Anything outside this subset
    # should only stay on the edge if it is a non-custom (LLM generated) label.
    edge_types_lst: list[dict[str, type[BaseModel]]] = []
    custom_type_names = set(edge_type_models)
    for extracted_edge in extracted_edges:
        source_node = uuid_entity_map.get(extracted_edge.source_node_uuid)
        target_node = uuid_entity_map.get(extracted_edge.target_node_uuid)
        # Every node also counts as a generic 'Entity'; nodes that could not be
        # found are treated as having only that label.
        source_node_labels = (
            source_node.labels + ['Entity'] if source_node is not None else ['Entity']
        )
        target_node_labels = (
            target_node.labels + ['Entity'] if target_node is not None else ['Entity']
        )
        label_tuples = [
            (source_label, target_label)
            for source_label in source_node_labels
            for target_label in target_node_labels
        ]

        extracted_edge_types = {}
        for label_tuple in label_tuples:
            type_names = edge_type_map.get(label_tuple, [])
            for type_name in type_names:
                type_model = edge_type_models.get(type_name)
                if type_model is None:
                    # Name appears in the signature map but has no registered model.
                    continue

                extracted_edge_types[type_name] = type_model

        edge_types_lst.append(extracted_edge_types)

    for extracted_edge, extracted_edge_types in zip(extracted_edges, edge_types_lst, strict=True):
        allowed_type_names = set(extracted_edge_types)
        is_custom_name = extracted_edge.name in custom_type_names
        if not allowed_type_names:
            # No custom types are valid for this node pairing. Keep LLM generated
            # labels, but flip disallowed custom names back to the default.
            if is_custom_name and extracted_edge.name != DEFAULT_EDGE_NAME:
                extracted_edge.name = DEFAULT_EDGE_NAME
            continue
        if is_custom_name and extracted_edge.name not in allowed_type_names:
            # Custom name exists but it is not permitted for this source/target
            # signature, so fall back to the default edge label.
            extracted_edge.name = DEFAULT_EDGE_NAME

    # resolve edges with related edges in the graph and find invalidation candidates
    results: list[tuple[EntityEdge, list[EntityEdge], list[EntityEdge]]] = list(
        await semaphore_gather(
            *[
                resolve_extracted_edge(
                    llm_client,
                    extracted_edge,
                    related_edges,
                    existing_edges,
                    episode,
                    extracted_edge_types,
                    custom_type_names,
                )
                for extracted_edge, related_edges, existing_edges, extracted_edge_types in zip(
                    extracted_edges,
                    related_edges_lists,
                    edge_invalidation_candidates,
                    edge_types_lst,
                    strict=True,
                )
            ]
        )
    )

    resolved_edges: list[EntityEdge] = []
    invalidated_edges: list[EntityEdge] = []
    for result in results:
        resolved_edge = result[0]
        # NOTE(review): element 1 is consumed as the invalidated edges, while the
        # docstring of resolve_extracted_edge lists "duplicates" second and
        # "edges to invalidate" third -- confirm against its return statement.
        invalidated_edge_chunk = result[1]

        resolved_edges.append(resolved_edge)
        invalidated_edges.extend(invalidated_edge_chunk)

    logger.debug(f'Resolved edges: {[(e.name, e.uuid) for e in resolved_edges]}')

    await semaphore_gather(
        create_entity_edge_embeddings(embedder, resolved_edges),
        create_entity_edge_embeddings(embedder, invalidated_edges),
    )

    return resolved_edges, invalidated_edges


def resolve_edge_contradictions(
    resolved_edge: EntityEdge, invalidation_candidates: list[EntityEdge]
) -> list[EntityEdge]:
    """Pick the candidate edges that the newly resolved edge invalidates.

    A candidate is left untouched when its validity window does not overlap
    the new edge's window: it became invalid at or before the new edge became
    valid, or the new edge became invalid at or before the candidate became
    valid. Otherwise, a candidate that became valid strictly before the new
    edge is marked invalid as of the new edge's ``valid_at``; its
    ``expired_at`` is set to the current time unless already set.

    Candidates are mutated in place. The returned list holds only the
    candidates that were invalidated, in input order.
    """
    expired: list[EntityEdge] = []
    if not invalidation_candidates:
        return expired

    for candidate in invalidation_candidates:
        # Normalise all four timestamps through ensure_utc before comparing.
        candidate_invalid_at = ensure_utc(candidate.invalid_at)
        new_valid_at = ensure_utc(resolved_edge.valid_at)
        candidate_valid_at = ensure_utc(candidate.valid_at)
        new_invalid_at = ensure_utc(resolved_edge.invalid_at)

        # Candidate already ended at or before the new edge starts.
        if (
            candidate_invalid_at is not None
            and new_valid_at is not None
            and candidate_invalid_at <= new_valid_at
        ):
            continue

        # New edge ended at or before the candidate starts.
        if (
            candidate_valid_at is not None
            and new_invalid_at is not None
            and new_invalid_at <= candidate_valid_at
        ):
            continue

        # Only a candidate that started strictly earlier is invalidated.
        if candidate_valid_at is None or new_valid_at is None:
            continue
        if not candidate_valid_at < new_valid_at:
            continue

        candidate.invalid_at = resolved_edge.valid_at
        # Keep an existing expiry; otherwise expire the candidate now.
        candidate.expired_at = utc_now() if candidate.expired_at is None else candidate.expired_at
        expired.append(candidate)

    return expired


async def resolve_extracted_edge(
    llm_client: LLMClient,
    extracted_edge: EntityEdge,
    related_edges: list[EntityEdge],
    existing_edges: list[EntityEdge],
    episode: EpisodicNode,
    edge_type_candidates: dict[str, type[BaseModel]] | None = None,
    custom_edge_type_names: set[str] | None = None,
) -> tuple[EntityEdge, list[EntityEdge], list[EntityEdge]]:
    """Resolve an extracted edge against existing graph context.

    Parameters
    ----------
    llm_client : LLMClient
        Client used to invoke the LLM for deduplication and attribute extraction.
    extracted_edge : EntityEdge
        Newly extracted edge whose canonical representation is being resolved.
    related_edges : list[EntityEdge]
        Candidate edges with identical endpoints used for duplicate detection.
    existing_edges : list[EntityEdge]
        Broader set of edges evaluated for contradiction / invalidation.
    episode : EpisodicNode
        Episode providing content context when extracting edge attributes.
    edge_type_candidates : dict[str, type[BaseModel]] | None
        Custom edge types permitted for the current source/target signature.
    custom_edge_type_names : set[str] | None
        Full catalog of registered custom edge names. Used to distinguish
        between disallowed custom types (which fall back to the default label)
        and ad-hoc labels emitted by the LLM.

    Returns
    -------
    tuple[EntityEdge, list[EntityEdge], list[EntityEdge]]
        The resolved edge, any duplicates, and edges to invalidate.
    """
    if len(related_edges) == 0 and len(existing_edges) == 0:
        return extracted_edge, [], []

    # Fast path: if the fact text and endpoints already exist verbatim, reuse the matching edge.
    normalized_fact = _normalize_string_exact(extracted_edge.fact)
    for edge in related_edges:
        if (
            edge.source_node_uuid == extracted_edge.source_node_uuid
            and edge.target_node_uuid == extracted_edge.target_node_uuid
            and _normalize_string_exact(edge.fact) == normalized_fact
        ):
            resolved = edge
            if episode is not None and episode.uuid not in resolved.episodes:
                resolved.episodes.append(episode.uuid)
            return resolved, [], []

    start = time()

    # Prepare context for LLM
    related_edges_context = [{'idx': i, 'fact': edge.fact} for i, edge in enumerate(related_edges)]

    invalidation_edge_candidates_context = [
        {'idx': i, 'fact': existing_edge.fact} for i, existing_edge in enumerate(existing_edges)
    ]

    edge_types_context = (
        [
            {
                'fact_type_name': type_name,
                'fact_type_description': type_model.__doc__,
            }
            for type_name, type_model in edge_type_candidates.items()
        ]
        if edge_type_candidates is not None
        else []
    )

    context = {
        'existing_edges': related_edges_context,
        'new_edge': extracted_edge.fact,
        'edge_invalidation_candidates': invalidation_edge_candidates_context,
        'edge_types': edge_types_context,
    }

    if related_edges or existing_edges:
        logger.debug(
            'Resolving edge: sent %d EXISTING FACTS%s and %d INVALIDATION CANDIDATES%s',
            len(related_edges),
            f' (idx 0-{len(related_edges) - 1})' if related_edges else '',
            len(existing_edges),
            f' (idx 0-{len(existing_edges) - 1})' if existing_edges else '',
        )

    llm_response = await llm_client.generate_response(
        prompt_library.dedupe_edges.resolve_edge(context),
        response_model=EdgeDuplicate,
        model_size=ModelSize.small,
        prompt_name='dedupe_edges.resolve_edge',
    )
    response_object = EdgeDuplicate(**llm_response)
    duplicate_facts = response_object.duplicate_facts

    # Validate duplicate_facts are in valid range for EXISTING FACTS
    invalid_duplicates = [i for i in duplicate_facts if i < 0 or i >= len(related_edges)]
    if invalid_duplicates:
        logger.warning(
            'LLM returned invalid duplicate_facts idx values %s (valid range: 0-%d for EXISTING FACTS)',
            invalid_duplicates,
            len(related_edges) - 1,
        )

    duplicate_fact_ids: list[int] = [i for i in duplicate_facts if 0 <= i < len(related_edges)]

    resolved_edge = extracted_edge
    for duplicate_fact_id in duplicate_fact_ids:
        resolved_edge = related_edges[duplicate_fact_id]
        break

    if duplicate_fact_ids and episode is not None:
        resolved_edge.episodes.append(episode.uuid)

    contradicted_facts: list[int] = response_object.contradicted_facts

    # Validate contradicted_facts are in valid range for INVALIDATION CANDIDATES
    invalid_contradictions = [i for i in contradicted_facts if i < 0 or i >= len(existing_edges)]
    if invalid_contradictions:
        logger.warning(
            'LLM returned invalid contradicted_facts idx values %s (valid range: 0-%d for INVALIDATION CANDIDATES)',
            invalid_contradictions,
            len(existing_edges) - 1,
        )

    invalidation_candidates: list[EntityEdge] = [
        existing_edges[i] for i in contradicted_facts if 0 <= i < len(existing_edges)
    ]

    fact_type: str = response_object.fact_type
    candidate_type_names = set(edge_type_candidates or {})
    custom_type_names = custom_edge_type_names or set()

    is_default_type = fact_type.upper() == 'DEFAULT'
    is_custom_type = fact_type in custom_type_names
    is_allowed_custom_type = fact_type in candidate_type_names

    if is_allowed_custom_type:
        # The LLM selected a custom type that is allowed for the node pair.
        # Adopt the custom type and, if needed, extract its structured attributes.
        resolved_edge.name = fact_type

        edge_attributes_context = {
            'episode_content': episode.content,
            'reference_time': episode.valid_at,
            'fact': resolved_edge.fact,
        }

        edge_model = edge_type_candidates.get(fact_type) if edge_type_candidates else None
        if edge_model is not None and len(edge_model.model_fields) != 0:
            edge_attributes_response = await llm_client.generate_response(
                prompt_library.extract_edges.extract_attributes(edge_attributes_context),
                response_model=edge_model,  # type: ignore
                model_size=ModelSize.small,
                prompt_name='extract_edges.extract_attributes',
            )

            resolved_edge.attributes = edge_attributes_response
    elif not is_default_type and is_custom_type:
        # The LLM picked a custom type that is not allowed for this signature.
        # Reset to the default label and drop any structured attributes.
        resolved_edge.name = DEFAULT_EDGE_NAME
        resolved_edge.attributes = {}
    elif not is_default_type:
        # Non-custom labels are allowed to pass through so long as the LLM does
        # not return the sentinel DEFAULT value.
        resolved_edge.name = fact_type
        resolved_edge.attributes = {}

    end = time()
    logger.debug(
        f'Resolved Edge: {extracted_edge.name} is {resolved_edge.name}, in {(end - start) * 1000} ms'
    )

    now = utc_now()

    if resolved_edge.invalid_at and not resolved_edge.expired_at:
        resolved_edge.expired_at = now

    # Determine if the new_edge needs to be expired
    if resolved_edge.expired_at is None:
        invalidation_candidates.sort(key=lambda c: (c.valid_at is None, ensure_utc(c.valid_at)))
        for candidate in invalidation_candidates:
            candidate_valid_at_utc = ensure_utc(candidate.valid_at)
            resolved_edge_valid_at_utc = ensure_utc(resolved_edge.valid_at)
            if (
                candidate_valid_at_utc is not None
                and resolved_edge_valid_at_utc is not None
                and candidate_valid_at_utc > resolved_edge_valid_at_utc
            ):
                # Expire new edge since we have information about more recent events
                resolved_edge.invalid_at = candidate.valid_at
                resolved_edge.expired_at = now
                break

    # Determine which contradictory edges need to be expired
    invalidated_edges: list[EntityEdge] = resolve_edge_contradictions(
        resolved_edge, invalidation_candidates
    )
    duplicate_edges: list[EntityEdge] = [related_edges[idx] for idx in duplicate_fact_ids]

    return resolved_edge, invalidated_edges, duplicate_edges


async def filter_existing_duplicate_of_edges(
    driver: GraphDriver, duplicates_node_tuples: list[tuple[EntityNode, EntityNode]]
) -> list[tuple[EntityNode, EntityNode]]:
    """Drop node pairs that are already joined by an IS_DUPLICATE_OF edge.

    Args:
        driver: Graph driver used to run the read query. Its ``provider`` selects
            which query text and parameter shape are used (Neptune, Kuzu, or default).
        duplicates_node_tuples: Candidate ``(source, target)`` node pairs.

    Returns:
        The candidate pairs, de-duplicated by ``(source.uuid, target.uuid)``, with
        every pair returned by the query removed.
    """
    if not duplicates_node_tuples:
        return []

    # Key each candidate pair by its uuid pair; a repeated key keeps the last pair seen.
    pending = {}
    for source, target in duplicates_node_tuples:
        pending[(source.uuid, target.uuid)] = (source, target)

    provider = driver.provider
    query: LiteralString

    # Query text is kept verbatim (including its internal indentation) per provider.
    if provider == GraphProvider.NEPTUNE:
        query = """
            UNWIND $duplicate_node_uuids AS duplicate_tuple
            MATCH (n:Entity {uuid: duplicate_tuple.source})-[r:RELATES_TO {name: 'IS_DUPLICATE_OF'}]->(m:Entity {uuid: duplicate_tuple.target})
            RETURN DISTINCT
                n.uuid AS source_uuid,
                m.uuid AS target_uuid
        """
        # Neptune receives one map per input tuple (built from the raw input list).
        query_params = [
            {'source': source.uuid, 'target': target.uuid}
            for source, target in duplicates_node_tuples
        ]
    elif provider == GraphProvider.KUZU:
        query = """
                UNWIND $duplicate_node_uuids AS duplicate
                MATCH (n:Entity {uuid: duplicate.src})-[:RELATES_TO]->(e:RelatesToNode_ {name: 'IS_DUPLICATE_OF'})-[:RELATES_TO]->(m:Entity {uuid: duplicate.dst})
                RETURN DISTINCT
                    n.uuid AS source_uuid,
                    m.uuid AS target_uuid
            """
        query_params = [{'src': src, 'dst': dst} for src, dst in pending]
    else:
        query = """
                UNWIND $duplicate_node_uuids AS duplicate_tuple
                MATCH (n:Entity {uuid: duplicate_tuple[0]})-[r:RELATES_TO {name: 'IS_DUPLICATE_OF'}]->(m:Entity {uuid: duplicate_tuple[1]})
                RETURN DISTINCT
                    n.uuid AS source_uuid,
                    m.uuid AS target_uuid
            """
        query_params = list(pending)

    records, _, _ = await driver.execute_query(
        query,
        duplicate_node_uuids=query_params,
        routing_='r',
    )

    # Every returned (source_uuid, target_uuid) already has the edge: discard that candidate.
    for record in records:
        pending.pop((record.get('source_uuid'), record.get('target_uuid')), None)

    return list(pending.values())

```

--------------------------------------------------------------------------------
/tests/utils/test_content_chunking.py:
--------------------------------------------------------------------------------

```python
"""
Copyright 2024, Zep Software, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import json

from graphiti_core.nodes import EpisodeType
from graphiti_core.utils.content_chunking import (
    CHARS_PER_TOKEN,
    _count_json_keys,
    _json_likely_dense,
    _text_likely_dense,
    chunk_json_content,
    chunk_message_content,
    chunk_text_content,
    estimate_tokens,
    generate_covering_chunks,
    should_chunk,
)


class TestEstimateTokens:
    """Exercises ``estimate_tokens`` on empty, short, and long strings."""

    def test_empty_string(self):
        """An empty string yields an estimate of zero tokens."""
        estimate = estimate_tokens('')
        assert estimate == 0

    def test_short_string(self):
        """Four characters are estimated as one token, eight as two."""
        cases = (('abcd', 1), ('abcdefgh', 2))
        for sample, expected in cases:
            assert estimate_tokens(sample) == expected

    def test_long_string(self):
        """A 400-character string is estimated at 100 tokens."""
        sample = 'a' * 400
        estimate = estimate_tokens(sample)
        assert estimate == 100

    def test_uses_chars_per_token_constant(self):
        """A string of ``CHARS_PER_TOKEN * 10`` characters is estimated at ten tokens."""
        token_target = 10
        sample = 'x' * (CHARS_PER_TOKEN * token_target)
        assert estimate_tokens(sample) == token_target


class TestChunkJsonArray:
    """Behaviour of ``chunk_json_content`` when the payload is a top-level JSON array."""

    def test_small_array_no_chunking(self):
        """An array far below the token budget comes back as one equivalent chunk."""
        people = [{'name': 'Alice'}, {'name': 'Bob'}]
        pieces = chunk_json_content(json.dumps(people), chunk_size_tokens=1000)
        assert len(pieces) == 1
        assert json.loads(pieces[0]) == people

    def test_empty_array(self):
        """An empty array is returned unchanged as the only chunk."""
        pieces = chunk_json_content('[]', chunk_size_tokens=100)
        assert pieces == ['[]']

    def test_array_splits_at_element_boundaries(self):
        """Every chunk is a valid JSON list whose elements are complete objects."""
        # 20 elements of 100+ characters each, chunked with a small budget to force splitting.
        records = [{'id': n, 'data': 'x' * 100} for n in range(20)]
        serialized = json.dumps(records)

        pieces = chunk_json_content(serialized, chunk_size_tokens=100, overlap_tokens=20)

        for piece in pieces:
            decoded = json.loads(piece)
            assert isinstance(decoded, list)
            # An element cut in half would be missing one of its two keys.
            for element in decoded:
                assert 'id' in element
                assert 'data' in element

    def test_array_preserves_all_elements(self):
        """The union of ids over all chunks equals the ids of the input array."""
        records = [{'id': n} for n in range(10)]
        serialized = json.dumps(records)

        pieces = chunk_json_content(serialized, chunk_size_tokens=50, overlap_tokens=10)

        # A set absorbs elements repeated across chunks because of overlap.
        observed_ids = {element['id'] for piece in pieces for element in json.loads(piece)}

        assert observed_ids == set(range(10))


class TestChunkJsonObject:
    """Behaviour of ``chunk_json_content`` when the payload is a top-level JSON object."""

    def test_small_object_no_chunking(self):
        """An object far below the token budget comes back as one equivalent chunk."""
        person = {'name': 'Alice', 'age': 30}
        pieces = chunk_json_content(json.dumps(person), chunk_size_tokens=1000)
        assert len(pieces) == 1
        assert json.loads(pieces[0]) == person

    def test_empty_object(self):
        """An empty object is returned unchanged as the only chunk."""
        pieces = chunk_json_content('{}', chunk_size_tokens=100)
        assert pieces == ['{}']

    def test_object_splits_at_key_boundaries(self):
        """Every chunk is a valid JSON object containing only original keys."""
        # 20 keys with 100-character values, chunked with a small budget.
        mapping = {f'key_{n}': 'x' * 100 for n in range(20)}
        serialized = json.dumps(mapping)

        pieces = chunk_json_content(serialized, chunk_size_tokens=100, overlap_tokens=20)

        for piece in pieces:
            decoded = json.loads(piece)
            assert isinstance(decoded, dict)
            for name in decoded:
                assert name.startswith('key_')

    def test_object_preserves_all_keys(self):
        """The union of keys over all chunks equals the keys of the input object."""
        mapping = {f'key_{n}': f'value_{n}' for n in range(10)}
        serialized = json.dumps(mapping)

        pieces = chunk_json_content(serialized, chunk_size_tokens=50, overlap_tokens=10)

        observed_keys = {name for piece in pieces for name in json.loads(piece).keys()}

        assert observed_keys == {f'key_{n}' for n in range(10)}


class TestChunkJsonInvalid:
    """Behaviour of ``chunk_json_content`` for unparsable input and JSON scalars."""

    def test_invalid_json_falls_back_to_text(self):
        """Unparsable input still yields at least one chunk containing the raw text."""
        malformed = 'not valid json {'
        pieces = chunk_json_content(malformed, chunk_size_tokens=1000)
        # The test name attributes this to a text-chunking fallback.
        assert len(pieces) >= 1
        assert malformed in pieces[0]

    def test_scalar_value_returns_as_is(self):
        """A JSON scalar (string, number, boolean, null) is returned as a single chunk."""
        scalars = ('"string"', '123', 'true', 'null')
        for literal in scalars:
            pieces = chunk_json_content(literal, chunk_size_tokens=1000)
            assert pieces == [literal]


class TestChunkTextContent:
    """Tests for ``chunk_text_content`` on plain prose."""

    def test_small_text_no_chunking(self):
        """Text far below the token budget is returned as one unchanged chunk."""
        text = 'This is a short text.'
        chunks = chunk_text_content(text, chunk_size_tokens=1000)
        assert len(chunks) == 1
        assert chunks[0] == text

    def test_splits_at_paragraph_boundaries(self):
        """With a tiny budget, no chunk ends in a trailing space."""
        paragraphs = ['Paragraph one.', 'Paragraph two.', 'Paragraph three.']
        text = '\n\n'.join(paragraphs)

        # Use small chunk size to force splitting
        chunks = chunk_text_content(text, chunk_size_tokens=10, overlap_tokens=5)

        # A chunk ending in a space would indicate a cut in the middle of a paragraph.
        for chunk in chunks:
            assert not chunk.endswith(' ')

    def test_splits_at_sentence_boundaries_for_large_paragraphs(self):
        """A long single paragraph is split into several chunks, none of them empty."""
        # Create a single long paragraph with multiple sentences
        sentences = ['This is sentence number ' + str(i) + '.' for i in range(20)]
        long_paragraph = ' '.join(sentences)

        chunks = chunk_text_content(long_paragraph, chunk_size_tokens=50, overlap_tokens=10)

        # Should have multiple chunks
        assert len(chunks) > 1

        # The earlier check here was `chunk[-1] in '.!? ' or True`, which can never fail on
        # content; its only real effect was that indexing an empty chunk would raise.
        # Assert that requirement directly. Sentence-final punctuation is intentionally
        # not enforced, matching the original's stated flexibility.
        for chunk in chunks[:-1]:  # All except last
            assert chunk, 'non-final chunks must not be empty'

    def test_preserves_text_completeness(self):
        """Every word of the input appears in at least one chunk."""
        text = 'Alpha beta gamma delta epsilon zeta eta theta.'
        chunks = chunk_text_content(text, chunk_size_tokens=10, overlap_tokens=2)

        # Periods are stripped so the final word compares equal with or without punctuation.
        all_words = set(text.replace('.', '').split())
        found_words = set()
        for chunk in chunks:
            found_words.update(chunk.replace('.', '').split())

        assert all_words <= found_words


class TestChunkMessageContent:
    """Tests for ``chunk_message_content`` on speaker-prefixed and JSON message input."""

    def test_small_message_no_chunking(self):
        """A short two-line conversation is returned as one unchanged chunk."""
        conversation = 'Alice: Hello!\nBob: Hi there!'
        pieces = chunk_message_content(conversation, chunk_size_tokens=1000)
        assert len(pieces) == 1
        assert pieces[0] == conversation

    def test_preserves_speaker_message_format(self):
        """Every non-blank line of every chunk still contains a ``:`` separator."""
        conversation = '\n'.join(
            f'Speaker{n}: This is message number {n}.' for n in range(10)
        )

        pieces = chunk_message_content(conversation, chunk_size_tokens=50, overlap_tokens=10)

        for piece in pieces:
            for line in piece.split('\n'):
                if not line.strip():
                    # Blank lines carry no speaker and are not checked.
                    continue
                assert ':' in line

    def test_json_message_array_format(self):
        """A JSON list of messages is chunked into valid JSON lists of complete messages."""
        payload = json.dumps([{'role': 'user', 'content': f'Message {n}'} for n in range(10)])

        pieces = chunk_message_content(payload, chunk_size_tokens=50, overlap_tokens=10)

        for piece in pieces:
            decoded = json.loads(piece)
            assert isinstance(decoded, list)
            for message in decoded:
                assert 'role' in message
                assert 'content' in message


class TestChunkOverlap:
    """Checks on the content shared between adjacent chunks."""

    def test_json_array_overlap_captures_boundary_elements(self):
        """Adjacent JSON chunks parse and expose ``id`` on every element.

        The id intersection of neighbouring chunks is computed but deliberately not
        asserted on: this test only verifies structure, since overlap may be empty
        when elements are large.
        """
        entities = [{'id': n, 'name': f'Entity {n}'} for n in range(10)]
        serialized = json.dumps(entities)

        pieces = chunk_json_content(serialized, chunk_size_tokens=80, overlap_tokens=30)

        if len(pieces) <= 1:
            return

        for left_raw, right_raw in zip(pieces, pieces[1:]):
            left_items = json.loads(left_raw)
            right_items = json.loads(right_raw)

            left_ids = {entry['id'] for entry in left_items}
            right_ids = {entry['id'] for entry in right_items}

            # Shared ids would be the overlap; the amount is not checked.
            _ = left_ids & right_ids

    def test_text_overlap_captures_boundary_text(self):
        """Adjacent text chunks share at least one whitespace-delimited word."""
        text = '\n\n'.join(f'Paragraph {n} with some content here.' for n in range(10))

        pieces = chunk_text_content(text, chunk_size_tokens=50, overlap_tokens=20)

        if len(pieces) <= 1:
            return

        for earlier, later in zip(pieces, pieces[1:]):
            # Every paragraph repeats words such as 'Paragraph' and 'with', so the
            # intersection is expected to be non-empty.
            shared_words = set(earlier.split()) & set(later.split())
            assert len(shared_words) > 0


class TestEdgeCases:
    """Boundary inputs: an oversized single element, empty content, and whitespace."""

    def test_very_large_single_element(self):
        """One array element far larger than the chunk budget still yields a result."""
        oversized = json.dumps([{'content': 'x' * 10000}])

        pieces = chunk_json_content(oversized, chunk_size_tokens=100, overlap_tokens=10)

        # Only graceful handling is required; the number of chunks is not pinned.
        assert len(pieces) >= 1

    def test_empty_content(self):
        """Both the text and message chunkers return ``['']`` for an empty string."""
        for chunker in (chunk_text_content, chunk_message_content):
            assert chunker('', chunk_size_tokens=100) == ['']

    def test_whitespace_only(self):
        """Whitespace-only text produces at least one chunk."""
        pieces = chunk_text_content('   \n\n   ', chunk_size_tokens=100)
        assert len(pieces) >= 1


class TestShouldChunk:
    """Tests for the ``should_chunk`` decision under patched module thresholds."""

    @staticmethod
    def _patch_thresholds(monkeypatch, density, min_tokens):
        """Override the module's density and minimum-token thresholds for one test."""
        from graphiti_core.utils import content_chunking

        monkeypatch.setattr(content_chunking, 'CHUNK_DENSITY_THRESHOLD', density)
        monkeypatch.setattr(content_chunking, 'CHUNK_MIN_TOKENS', min_tokens)

    def test_empty_content_never_chunks(self):
        """Empty content should never chunk."""
        for episode_type in (EpisodeType.text, EpisodeType.json):
            assert not should_chunk('', episode_type)

    def test_short_content_never_chunks(self, monkeypatch):
        """Short content should never chunk regardless of density."""
        # A very low density threshold paired with a 1000-token minimum.
        self._patch_thresholds(monkeypatch, 0.001, 1000)

        # 50 tiny objects: many elements, but only a few hundred tokens in total,
        # which is below the patched 1000-token minimum.
        payload = json.dumps([{'name': f'Entity{n}'} for n in range(50)])

        assert not should_chunk(payload, EpisodeType.json)

    def test_high_density_large_json_chunks(self, monkeypatch):
        """Large high-density JSON should trigger chunking."""
        self._patch_thresholds(monkeypatch, 0.01, 500)

        # 200 small objects: many elements and enough text to exceed the patched minimum.
        payload = json.dumps([{'name': f'Entity{n}', 'desc': 'x' * 20} for n in range(200)])

        assert should_chunk(payload, EpisodeType.json)

    def test_low_density_text_no_chunk(self, monkeypatch):
        """Low-density prose should not trigger chunking."""
        self._patch_thresholds(monkeypatch, 0.05, 100)

        # All-lowercase narrative repeated 50 times.
        narrative = 'the quick brown fox jumps over the lazy dog. ' * 50

        assert not should_chunk(narrative, EpisodeType.text)

    def test_low_density_json_no_chunk(self, monkeypatch):
        """Low-density JSON (few elements, lots of content) should not chunk."""
        self._patch_thresholds(monkeypatch, 0.05, 100)

        # Two elements, each carrying 1000 characters of content.
        payload = json.dumps([{'content': 'x' * 1000}, {'content': 'y' * 1000}])

        assert not should_chunk(payload, EpisodeType.json)


class TestJsonDensityEstimation:
    """Tests for ``_json_likely_dense`` and the ``_count_json_keys`` helper."""

    def test_dense_array_detected(self, monkeypatch):
        """Arrays with many elements should be detected as dense."""
        from graphiti_core.utils import content_chunking

        monkeypatch.setattr(content_chunking, 'CHUNK_DENSITY_THRESHOLD', 0.01)

        # 100 one-key objects: a high element count relative to the estimated token count.
        payload = json.dumps([{'id': n} for n in range(100)])
        token_estimate = estimate_tokens(payload)

        assert _json_likely_dense(payload, token_estimate)

    def test_sparse_array_not_dense(self, monkeypatch):
        """Arrays with few elements should not be detected as dense."""
        from graphiti_core.utils import content_chunking

        monkeypatch.setattr(content_chunking, 'CHUNK_DENSITY_THRESHOLD', 0.05)

        # Only two elements, each carrying 1000 characters of content.
        payload = json.dumps([{'content': 'x' * 1000}, {'content': 'y' * 1000}])
        token_estimate = estimate_tokens(payload)

        assert not _json_likely_dense(payload, token_estimate)

    def test_dense_object_detected(self, monkeypatch):
        """Objects with many keys should be detected as dense."""
        from graphiti_core.utils import content_chunking

        monkeypatch.setattr(content_chunking, 'CHUNK_DENSITY_THRESHOLD', 0.01)

        # 50 short key/value pairs in a single object.
        payload = json.dumps({f'key_{n}': f'value_{n}' for n in range(50)})
        token_estimate = estimate_tokens(payload)

        assert _json_likely_dense(payload, token_estimate)

    def test_count_json_keys_shallow(self):
        """Key counting should work for nested structures."""
        nested = {
            'a': 1,
            'b': {'c': 2, 'd': 3},
            'e': [{'f': 4}, {'g': 5}],
        }
        # With max_depth=2 the expected keys are a, b, c, d, e, f, g.
        expected_count = 7
        assert _count_json_keys(nested, max_depth=2) == expected_count

    def test_count_json_keys_depth_limit(self):
        """Key counting should respect depth limit."""
        chain = {
            'a': {'b': {'c': {'d': 1}}},
        }
        # Depth 1 counts only 'a'; depth 2 counts 'a' and 'b'.
        for depth, expected_count in ((1, 1), (2, 2)):
            assert _count_json_keys(chain, max_depth=depth) == expected_count


class TestTextDensityEstimation:
    """Tests for ``_text_likely_dense`` on entity-rich and plain narrative text."""

    def test_entity_rich_text_detected(self, monkeypatch):
        """Text with many proper nouns should be detected as dense."""
        from graphiti_core.utils import content_chunking

        monkeypatch.setattr(content_chunking, 'CHUNK_DENSITY_THRESHOLD', 0.01)

        # Three sentences packed with capitalized names, repeated ten times.
        passage = (
            'Alice met Bob at Acme Corp. Then Carol and David joined them. '
            'Eve from Globex introduced Frank and Grace. '
            'Later Henry and Iris arrived from Initech. '
        )
        sample = passage * 10
        token_estimate = estimate_tokens(sample)

        assert _text_likely_dense(sample, token_estimate)

    def test_prose_not_dense(self, monkeypatch):
        """Narrative prose should not be detected as dense."""
        from graphiti_core.utils import content_chunking

        monkeypatch.setattr(content_chunking, 'CHUNK_DENSITY_THRESHOLD', 0.05)

        # All-lowercase narrative with no named entities.
        passage = """
        the sun was setting over the horizon as the old man walked slowly
        down the dusty road. he had been traveling for many days and his
        feet were tired. the journey had been long but he knew that soon
        he would reach his destination. the wind whispered through the trees
        and the birds sang their evening songs.
        """
        sample = passage * 10
        token_estimate = estimate_tokens(sample)

        assert not _text_likely_dense(sample, token_estimate)

    def test_sentence_starters_ignored(self, monkeypatch):
        """Capitalized words after periods should be ignored."""
        from graphiti_core.utils import content_chunking

        monkeypatch.setattr(content_chunking, 'CHUNK_DENSITY_THRESHOLD', 0.05)

        # The only capitals here are the first letters of sentences.
        passage = 'This is a sentence. Another one follows. Yet another here. '
        sample = passage * 50
        token_estimate = estimate_tokens(sample)

        assert not _text_likely_dense(sample, token_estimate)


class TestGenerateCoveringChunks:
    """Tests for the greedy covering chunks algorithm (Handshake Flights Problem)."""

    def test_empty_list(self):
        """No items with k=3 yields one chunk whose item and index lists are both empty."""
        chunks = generate_covering_chunks([], k=3)
        # n=0 does not exceed k=3, so a single (items, indices) pair is expected.
        expected = [([], [])]
        assert chunks == expected

    def test_single_item(self):
        """A one-element input yields exactly one chunk holding that element at index 0."""
        chunks = generate_covering_chunks(['A'], k=3)
        assert len(chunks) == 1
        only_chunk = chunks[0]
        assert only_chunk == (['A'], [0])

    def test_items_fit_in_single_chunk(self):
        """Three items with k=5 come back as one chunk in original order."""
        letters = ['A', 'B', 'C']
        chunks = generate_covering_chunks(letters, k=5)
        assert len(chunks) == 1
        members, positions = chunks[0]
        assert members == letters
        assert positions == [0, 1, 2]

    def test_items_equal_to_k(self):
        """Four items with k=4 come back as one chunk in original order."""
        letters = ['A', 'B', 'C', 'D']
        chunks = generate_covering_chunks(letters, k=4)
        assert len(chunks) == 1
        members, positions = chunks[0]
        assert members == letters
        assert positions == [0, 1, 2, 3]

    def test_all_pairs_covered_k2(self):
        """With k=2, every chunk is a pair and together the chunks cover all C(4,2) = 6 pairs.

        Pairs are collected into a set, so repeated pairs are not detected; only
        coverage is verified.
        """
        chunks = generate_covering_chunks(['A', 'B', 'C', 'D'], k=2)

        seen_pairs = set()
        for _, positions in chunks:
            assert len(positions) == 2
            seen_pairs.add(frozenset(positions))

        # Every unordered index pair drawn from 0..3.
        wanted_pairs = {frozenset([a, b]) for a in range(4) for b in range(a + 1, 4)}
        assert seen_pairs == wanted_pairs

    def test_all_pairs_covered_k3(self):
        """With n=6 and k=3, every chunk has three members and all 15 pairs are covered."""
        universe = list(range(6))  # 0, 1, 2, 3, 4, 5
        chunks = generate_covering_chunks(universe, k=3)

        seen_pairs: set[frozenset[int]] = set()
        for _, positions in chunks:
            assert len(positions) == 3
            size = len(positions)
            # A chunk of 3 contributes C(3,2) = 3 pairs.
            seen_pairs.update(
                frozenset([positions[a], positions[b]])
                for a in range(size)
                for b in range(a + 1, size)
            )

        # All C(6,2) = 15 unordered pairs.
        wanted_pairs = {frozenset([a, b]) for a in range(6) for b in range(a + 1, 6)}
        assert seen_pairs == wanted_pairs

    def test_all_pairs_covered_larger(self):
        """With n=10 and k=4, every chunk has four members and all 45 pairs are covered."""
        universe = list(range(10))
        chunks = generate_covering_chunks(universe, k=4)

        seen_pairs: set[frozenset[int]] = set()
        for _, positions in chunks:
            assert len(positions) == 4
            size = len(positions)
            seen_pairs.update(
                frozenset([positions[a], positions[b]])
                for a in range(size)
                for b in range(a + 1, size)
            )

        # All C(10,2) = 45 unordered pairs.
        wanted_pairs = {frozenset([a, b]) for a in range(10) for b in range(a + 1, 10)}
        assert seen_pairs == wanted_pairs

    def test_index_mapping_correctness(self):
        """Each chunk member equals the original item at its reported global index."""
        names = ['Alice', 'Bob', 'Carol', 'Dave', 'Eve']
        chunks = generate_covering_chunks(names, k=3)

        for members, positions in chunks:
            # The slot within the chunk pairs up with the global index in the same slot.
            for slot in range(len(positions)):
                assert members[slot] == names[positions[slot]]

    def test_greedy_minimizes_chunks(self):
        """Chunk count for n=6, k=3 meets the counting lower bound and all pairs are covered.

        Each chunk of 3 covers C(3,2) = 3 pairs and there are C(6,2) = 15 pairs,
        so at least ceil(15/3) = 5 chunks are needed.
        Schönheim bound = ceil(6/3 * ceil(5/2)) = ceil(2 * 3) = 6 chunks.

        Only the lower bound of 5 is asserted. Per the original note, when random
        sampling is used (large n, k) a fallback may add small chunks, so no upper
        bound on the chunk count is checked.
        """
        universe = list(range(6))
        chunks = generate_covering_chunks(universe, k=3)

        # Simple counting lower bound for this case.
        assert len(chunks) >= 5

        # Coverage of every pair is the primary guarantee.
        seen_pairs: set[frozenset[int]] = set()
        for _, positions in chunks:
            size = len(positions)
            seen_pairs.update(
                frozenset([positions[a], positions[b]])
                for a in range(size)
                for b in range(a + 1, size)
            )

        wanted_pairs = {frozenset([a, b]) for a in range(6) for b in range(a + 1, 6)}
        assert seen_pairs == wanted_pairs

    def test_works_with_custom_types(self):
        """Arbitrary objects pass through chunking as the same type, two per chunk for k=2."""

        class Entity:
            def __init__(self, name: str):
                self.name = name

        entities = [Entity(label) for label in ('A', 'B', 'C', 'D')]
        chunks = generate_covering_chunks(entities, k=2)

        assert len(chunks) > 0
        for members, positions in chunks:
            assert len(members) == 2
            assert len(positions) == 2
            # Members must still be Entity instances.
            for member in members:
                assert isinstance(member, Entity)

    def test_deterministic_output(self):
        """Two calls with the same input (n=8, k=3) return equal chunks in equal order."""
        universe = list(range(8))
        first_run = generate_covering_chunks(universe, k=3)
        second_run = generate_covering_chunks(universe, k=3)

        assert len(first_run) == len(second_run)
        for first, second in zip(first_run, second_run, strict=True):
            first_members, first_positions = first
            second_members, second_positions = second
            assert first_members == second_members
            assert first_positions == second_positions

    def test_all_pairs_covered_k15_n30(self):
        """Verify all pairs covered for n=30, k=15 (realistic edge extraction scenario).

        For n=30, k=15:
        - Total pairs = C(30,2) = 435
        - Pairs per chunk = C(15,2) = 105
        - Lower bound = ceil(435/105) = 5 chunks
        - Schönheim bound = ceil(30/15 * ceil(29/14)) = ceil(2 * 3) = 6 chunks

        Note: When random sampling is used, the fallback mechanism may create
        additional small chunks (size 2) to cover remaining pairs, so chunk
        sizes may vary and the upper bound on chunk count is not guaranteed.
        """
        n = 30
        k = 15
        items = list(range(n))
        result = generate_covering_chunks(items, k=k)

        # Verify chunk sizes are at most k (fallback chunks may be smaller)
        for _, indices in result:
            assert len(indices) <= k, f'Expected chunk size <= {k}, got {len(indices)}'

        # Collect all covered pairs
        covered_pairs: set[frozenset[int]] = set()
        for _, indices in result:
            for i in range(len(indices)):
                for j in range(i + 1, len(indices)):
                    covered_pairs.add(frozenset([indices[i], indices[j]]))

        # All C(30,2) = 435 pairs should be covered
        expected_pairs = {frozenset([i, j]) for i in range(n) for j in range(i + 1, n)}
        assert len(expected_pairs) == 435, f'Expected 435 pairs, got {len(expected_pairs)}'
        assert covered_pairs == expected_pairs, (
            f'Missing {len(expected_pairs - covered_pairs)} pairs: {expected_pairs - covered_pairs}'
        )

        # Verify chunk count is at least the simple counting lower bound
        assert len(result) >= 5, f'Expected at least 5 chunks, got {len(result)}'

    def test_all_pairs_covered_with_random_sampling(self):
        """Full pair coverage must hold on the random-sampling code path.

        The algorithm switches from exhaustive enumeration to random sampling
        once C(n,k) exceeds MAX_COMBINATIONS_TO_EVALUATE. Whatever the greedy
        sampling misses has to be picked up by the fallback logic.
        """
        import random

        # n=50, k=5 triggers sampling since C(50,5) = 2,118,760 > 1000
        n = 50
        k = 5
        items = list(range(n))

        # Repeat under several seeds so the check does not hinge on one lucky draw
        for seed in range(5):
            random.seed(seed)
            result = generate_covering_chunks(items, k=k)

            # Every unordered pair of indices that appears together in some chunk
            covered_pairs: set[frozenset[int]] = {
                frozenset([indices[a], indices[b]])
                for _, indices in result
                for a in range(len(indices))
                for b in range(a + 1, len(indices))
            }

            # The full set of C(50,2) = 1225 unordered pairs
            expected_pairs = {
                frozenset([low, high]) for low in range(n) for high in range(low + 1, n)
            }
            assert len(expected_pairs) == 1225
            missing_count = len(expected_pairs - covered_pairs)
            assert covered_pairs == expected_pairs, f'Seed {seed}: Missing {missing_count} pairs'

    def test_fallback_creates_pair_chunks_for_uncovered(self):
        """Pairs left uncovered by the greedy phase must still end up in some chunk.

        When the greedy algorithm breaks early (best_covered_count == 0), the
        fallback logic is expected to emit minimal chunks for the remaining
        pairs. This test only observes the end result: complete coverage.
        """
        import random

        # Large n with small k to stress the sampling
        n = 100
        k = 4
        items = list(range(n))

        random.seed(42)
        result = generate_covering_chunks(items, k=k)

        # Every unordered pair of indices that appears together in some chunk
        covered_pairs: set[frozenset[int]] = {
            frozenset([indices[a], indices[b]])
            for _, indices in result
            for a in range(len(indices))
            for b in range(a + 1, len(indices))
        }

        # The full set of C(100,2) = 4950 unordered pairs
        expected_pairs = {frozenset([low, high]) for low in range(n) for high in range(low + 1, n)}
        assert len(expected_pairs) == 4950
        missing_count = len(expected_pairs - covered_pairs)
        assert covered_pairs == expected_pairs, f'Missing {missing_count} pairs'

    def test_duplicate_sampling_safety(self):
        """The call must terminate and cover all pairs even if samples repeat.

        With k large relative to n there are fewer distinct combinations, so
        random sampling may draw many duplicates. The safety counter is meant
        to stop that from looping forever.
        """
        import random

        # n=20, k=10: C(20,10) = 184,756 > 1000 triggers sampling
        # With large k relative to n, duplicates are more likely
        n = 20
        k = 10
        items = list(range(n))

        random.seed(123)
        result = generate_covering_chunks(items, k=k)

        # Every unordered pair of indices that appears together in some chunk
        covered_pairs: set[frozenset[int]] = {
            frozenset([indices[a], indices[b]])
            for _, indices in result
            for a in range(len(indices))
            for b in range(a + 1, len(indices))
        }

        # The full set of C(20,2) = 190 unordered pairs
        expected_pairs = {frozenset([low, high]) for low in range(n) for high in range(low + 1, n)}
        assert len(expected_pairs) == 190
        assert covered_pairs == expected_pairs

    def test_stress_multiple_seeds(self):
        """Pair coverage must hold for each of ten different random seeds.

        Greedy sampling plus the fallback logic together should cover every
        pair no matter how the random generator is seeded.
        """
        import random

        n = 30
        k = 5
        items = list(range(n))
        expected_pairs = {frozenset([low, high]) for low in range(n) for high in range(low + 1, n)}

        for seed in range(10):
            random.seed(seed)
            result = generate_covering_chunks(items, k=k)

            # Every unordered pair of indices that appears together in some chunk
            covered_pairs: set[frozenset[int]] = {
                frozenset([indices[a], indices[b]])
                for _, indices in result
                for a in range(len(indices))
                for b in range(a + 1, len(indices))
            }

            assert covered_pairs == expected_pairs, f'Seed {seed} failed to cover all pairs'

```

--------------------------------------------------------------------------------
/graphiti_core/nodes.py:
--------------------------------------------------------------------------------

```python
"""
Copyright 2024, Zep Software, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from time import time
from typing import Any
from uuid import uuid4

from pydantic import BaseModel, Field
from typing_extensions import LiteralString

from graphiti_core.driver.driver import (
    GraphDriver,
    GraphProvider,
)
from graphiti_core.embedder import EmbedderClient
from graphiti_core.errors import NodeNotFoundError
from graphiti_core.helpers import parse_db_date
from graphiti_core.models.nodes.node_db_queries import (
    COMMUNITY_NODE_RETURN,
    COMMUNITY_NODE_RETURN_NEPTUNE,
    EPISODIC_NODE_RETURN,
    EPISODIC_NODE_RETURN_NEPTUNE,
    SAGA_NODE_RETURN,
    SAGA_NODE_RETURN_NEPTUNE,
    get_community_node_save_query,
    get_entity_node_return_query,
    get_entity_node_save_query,
    get_episode_node_save_query,
    get_saga_node_save_query,
)
from graphiti_core.utils.datetime_utils import utc_now

logger = logging.getLogger(__name__)


class EpisodeType(Enum):
    """
    Enumeration of different types of episodes that can be processed.

    This enum defines the various sources or formats of episodes that the system
    can handle. It's used to categorize and potentially handle different types
    of input data differently.

    Attributes:
    -----------
    message : str
        Represents a standard message-type episode. The content for this type
        should be formatted as "actor: content". For example, "user: Hello, how are you?"
        or "assistant: I'm doing well, thank you for asking."
    json : str
        Represents an episode containing a JSON string object with structured data.
    text : str
        Represents a plain text episode.
    """

    message = 'message'
    json = 'json'
    text = 'text'

    @staticmethod
    def from_str(episode_type: str):
        """Return the member whose value equals ``episode_type``.

        Parameters
        ----------
        episode_type : str
            One of the member values: 'message', 'json' or 'text'.

        Returns
        -------
        EpisodeType
            The matching member.

        Raises
        ------
        NotImplementedError
            If no member has that value. The failure is also logged at error
            level, and the exception message names the rejected value.
        """
        # Scan the members so that a newly added member is recognised without
        # having to extend a hand-written if-chain.
        for member in EpisodeType:
            if episode_type == member.value:
                return member

        error_message = f'Episode type: {episode_type} not implemented'
        logger.error(error_message)
        raise NotImplementedError(error_message)


# Abstract pydantic base shared by the node models in this module.
# It carries the common identity fields and the provider-specific delete logic.
# Equality and hashing are based on `uuid` only.
class Node(BaseModel, ABC):
    # Generated with uuid4 when the caller does not supply one.
    uuid: str = Field(default_factory=lambda: str(uuid4()))
    name: str = Field(description='name of the node')
    group_id: str = Field(description='partition of the graph')
    labels: list[str] = Field(default_factory=list)
    # Defaults to the value of utc_now() at construction time.
    created_at: datetime = Field(default_factory=lambda: utc_now())

    # Subclasses must implement persistence of the node through the given driver.
    @abstractmethod
    async def save(self, driver: GraphDriver): ...

    async def delete(self, driver: GraphDriver):
        """Delete the node identified by ``self.uuid`` through ``driver``.

        When the driver exposes a ``graph_operations_interface`` the whole
        operation is delegated to its ``node_delete`` and that result is
        returned. Otherwise a provider-specific sequence of Cypher statements
        is executed and nothing is returned.
        """
        if driver.graph_operations_interface:
            return await driver.graph_operations_interface.node_delete(self, driver)

        match driver.provider:
            case GraphProvider.NEO4J:
                # The query returns the uuids of the attached relationships, but
                # `records` is not read afterwards.
                records, _, _ = await driver.execute_query(
                    """
                    MATCH (n {uuid: $uuid})
                    WHERE n:Entity OR n:Episodic OR n:Community
                    OPTIONAL MATCH (n)-[r]-()
                    WITH collect(r.uuid) AS edge_uuids, n
                    DETACH DELETE n
                    RETURN edge_uuids
                    """,
                    uuid=self.uuid,
                )

            case GraphProvider.KUZU:
                # One statement per label; the Entity label is handled separately below.
                for label in ['Episodic', 'Community']:
                    await driver.execute_query(
                        f"""
                        MATCH (n:{label} {{uuid: $uuid}})
                        DETACH DELETE n
                        """,
                        uuid=self.uuid,
                    )
                # Entity edges are actually nodes in Kuzu, so simple `DETACH DELETE` will not work.
                # Explicitly delete the "edge" nodes first, then the entity node.
                # NOTE(review): only RelatesToNode_ nodes reached by an outgoing
                # RELATES_TO from this entity are matched — confirm that edge nodes
                # pointing at this entity are cleaned up elsewhere.
                await driver.execute_query(
                    """
                    MATCH (n:Entity {uuid: $uuid})-[:RELATES_TO]->(e:RelatesToNode_)
                    DETACH DELETE e
                    """,
                    uuid=self.uuid,
                )
                await driver.execute_query(
                    """
                    MATCH (n:Entity {uuid: $uuid})
                    DETACH DELETE n
                    """,
                    uuid=self.uuid,
                )
            case _:  # FalkorDB, Neptune
                # One DETACH DELETE per label.
                for label in ['Entity', 'Episodic', 'Community']:
                    await driver.execute_query(
                        f"""
                        MATCH (n:{label} {{uuid: $uuid}})
                        DETACH DELETE n
                        """,
                        uuid=self.uuid,
                    )

        # Only reached on the non-delegated path; the delegated path returns above.
        logger.debug(f'Deleted Node: {self.uuid}')

    def __hash__(self):
        """Hash on ``uuid`` so that hashing agrees with ``__eq__``."""
        return hash(self.uuid)

    def __eq__(self, other):
        """Two nodes are equal when their ``uuid`` values match; non-Node objects never compare equal."""
        if isinstance(other, Node):
            return self.uuid == other.uuid
        return False

    @classmethod
    async def delete_by_group_id(cls, driver: GraphDriver, group_id: str, batch_size: int = 100):
        """Delete all Entity, Episodic and Community nodes whose ``group_id`` matches.

        Delegates to ``graph_operations_interface.node_delete_by_group_id`` when
        the driver provides that interface. In the built-in branches
        ``batch_size`` is only used by the Neo4j query.
        """
        if driver.graph_operations_interface:
            return await driver.graph_operations_interface.node_delete_by_group_id(
                cls, driver, group_id, batch_size
            )

        match driver.provider:
            case GraphProvider.NEO4J:
                # Deletes in sub-transactions of `batch_size` rows.
                async with driver.session() as session:
                    await session.run(
                        """
                        MATCH (n:Entity|Episodic|Community {group_id: $group_id})
                        CALL (n) {
                            DETACH DELETE n
                        } IN TRANSACTIONS OF $batch_size ROWS
                        """,
                        group_id=group_id,
                        batch_size=batch_size,
                    )

            case GraphProvider.KUZU:
                for label in ['Episodic', 'Community']:
                    await driver.execute_query(
                        f"""
                        MATCH (n:{label} {{group_id: $group_id}})
                        DETACH DELETE n
                        """,
                        group_id=group_id,
                    )
                # Entity edges are actually nodes in Kuzu, so simple `DETACH DELETE` will not work.
                # Explicitly delete the "edge" nodes first, then the entity node.
                await driver.execute_query(
                    """
                    MATCH (n:Entity {group_id: $group_id})-[:RELATES_TO]->(e:RelatesToNode_)
                    DETACH DELETE e
                    """,
                    group_id=group_id,
                )
                await driver.execute_query(
                    """
                    MATCH (n:Entity {group_id: $group_id})
                    DETACH DELETE n
                    """,
                    group_id=group_id,
                )
            case _:  # FalkorDB, Neptune
                for label in ['Entity', 'Episodic', 'Community']:
                    await driver.execute_query(
                        f"""
                        MATCH (n:{label} {{group_id: $group_id}})
                        DETACH DELETE n
                        """,
                        group_id=group_id,
                    )

    @classmethod
    async def delete_by_uuids(cls, driver: GraphDriver, uuids: list[str], batch_size: int = 100):
        """Delete all Entity, Episodic and Community nodes whose uuid is in ``uuids``.

        Delegates to ``graph_operations_interface.node_delete_by_uuids`` (with
        ``group_id=None``) when the driver provides that interface. In the
        built-in branches ``batch_size`` is only used by the default
        (session-based) branch.
        """
        if driver.graph_operations_interface:
            return await driver.graph_operations_interface.node_delete_by_uuids(
                cls, driver, uuids, group_id=None, batch_size=batch_size
            )

        match driver.provider:
            case GraphProvider.FALKORDB:
                for label in ['Entity', 'Episodic', 'Community']:
                    await driver.execute_query(
                        f"""
                        MATCH (n:{label})
                        WHERE n.uuid IN $uuids
                        DETACH DELETE n
                        """,
                        uuids=uuids,
                    )
            case GraphProvider.KUZU:
                for label in ['Episodic', 'Community']:
                    await driver.execute_query(
                        f"""
                        MATCH (n:{label})
                        WHERE n.uuid IN $uuids
                        DETACH DELETE n
                        """,
                        uuids=uuids,
                    )
                # Entity edges are actually nodes in Kuzu, so simple `DETACH DELETE` will not work.
                # Explicitly delete the "edge" nodes first, then the entity node.
                await driver.execute_query(
                    """
                    MATCH (n:Entity)-[:RELATES_TO]->(e:RelatesToNode_)
                    WHERE n.uuid IN $uuids
                    DETACH DELETE e
                    """,
                    uuids=uuids,
                )
                await driver.execute_query(
                    """
                    MATCH (n:Entity)
                    WHERE n.uuid IN $uuids
                    DETACH DELETE n
                    """,
                    uuids=uuids,
                )
            case _:  # Neo4J, Neptune
                async with driver.session() as session:
                    # Collect all edge UUIDs before deleting nodes
                    # NOTE(review): the result of this query is not stored or read,
                    # so the collected edge uuids are currently discarded.
                    await session.run(
                        """
                        MATCH (n:Entity|Episodic|Community)
                        WHERE n.uuid IN $uuids
                        MATCH (n)-[r]-()
                        RETURN collect(r.uuid) AS edge_uuids
                        """,
                        uuids=uuids,
                    )

                    # Now delete the nodes in batches
                    await session.run(
                        """
                        MATCH (n:Entity|Episodic|Community)
                        WHERE n.uuid IN $uuids
                        CALL (n) {
                            DETACH DELETE n
                        } IN TRANSACTIONS OF $batch_size ROWS
                        """,
                        uuids=uuids,
                        batch_size=batch_size,
                    )

    # Placeholder lookup by a single uuid; the body is empty here and the
    # subclasses in this module provide the real implementation.
    @classmethod
    async def get_by_uuid(cls, driver: GraphDriver, uuid: str): ...

    # Placeholder lookup by several uuids; the body is empty here and the
    # subclasses in this module provide the real implementation.
    @classmethod
    async def get_by_uuids(cls, driver: GraphDriver, uuids: list[str]): ...


# Node model for one ingested episode (label `Episodic` in the queries below).
# Adds the raw content, its source type/description, the time the original
# document was created and the entity edges referenced by the episode.
class EpisodicNode(Node):
    source: EpisodeType = Field(description='source type')
    source_description: str = Field(description='description of the data source')
    content: str = Field(description='raw episode data')
    valid_at: datetime = Field(
        description='datetime of when the original document was created',
    )
    entity_edges: list[str] = Field(
        description='list of entity edges referenced in this episode',
        default_factory=list,
    )

    async def save(self, driver: GraphDriver):
        """Persist this episode and return the driver's query result.

        Delegates to ``graph_operations_interface.episodic_node_save`` when the
        driver provides that interface.
        """
        if driver.graph_operations_interface:
            return await driver.graph_operations_interface.episodic_node_save(self, driver)

        # Query parameters; `source` is stored as the enum's string value.
        episode_args = dict(
            uuid=self.uuid,
            name=self.name,
            group_id=self.group_id,
            source_description=self.source_description,
            content=self.content,
            entity_edges=self.entity_edges,
            created_at=self.created_at,
            valid_at=self.valid_at,
            source=self.source.value,
        )

        save_query = get_episode_node_save_query(driver.provider)
        outcome = await driver.execute_query(save_query, **episode_args)

        logger.debug(f'Saved Node to Graph: {self.uuid}')

        return outcome

    @classmethod
    async def get_by_uuid(cls, driver: GraphDriver, uuid: str):
        """Fetch the episode with the given uuid.

        Raises
        ------
        NodeNotFoundError
            If the query returns no record.
        """
        query = (
            """
            MATCH (e:Episodic {uuid: $uuid})
            RETURN
            """
            + (
                EPISODIC_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else EPISODIC_NODE_RETURN
            )
        )
        records, _, _ = await driver.execute_query(query, uuid=uuid, routing_='r')

        # Convert every record before the emptiness check.
        found = [get_episodic_node_from_record(row) for row in records]
        if not found:
            raise NodeNotFoundError(uuid)

        return found[0]

    @classmethod
    async def get_by_uuids(cls, driver: GraphDriver, uuids: list[str]):
        """Fetch all episodes whose uuid is in ``uuids``; returns a (possibly empty) list."""
        query = (
            """
            MATCH (e:Episodic)
            WHERE e.uuid IN $uuids
            RETURN DISTINCT
            """
            + (
                EPISODIC_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else EPISODIC_NODE_RETURN
            )
        )
        records, _, _ = await driver.execute_query(query, uuids=uuids, routing_='r')

        return [get_episodic_node_from_record(row) for row in records]

    @classmethod
    async def get_by_group_ids(
        cls,
        driver: GraphDriver,
        group_ids: list[str],
        limit: int | None = None,
        uuid_cursor: str | None = None,
    ):
        """Fetch episodes belonging to any of ``group_ids``, ordered by uuid descending.

        ``uuid_cursor`` (when truthy) restricts the result to uuids strictly
        below the cursor; ``limit`` (when not None) caps the number of rows.
        """
        # Optional query fragments; empty strings leave the query unchanged.
        cursor_query: LiteralString = 'AND e.uuid < $uuid' if uuid_cursor else ''
        limit_query: LiteralString = 'LIMIT $limit' if limit is not None else ''

        query = (
            """
            MATCH (e:Episodic)
            WHERE e.group_id IN $group_ids
            """
            + cursor_query
            + """
            RETURN DISTINCT
            """
            + (
                EPISODIC_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else EPISODIC_NODE_RETURN
            )
            + """
            ORDER BY uuid DESC
            """
            + limit_query
        )
        records, _, _ = await driver.execute_query(
            query,
            group_ids=group_ids,
            uuid=uuid_cursor,
            limit=limit,
            routing_='r',
        )

        return [get_episodic_node_from_record(row) for row in records]

    @classmethod
    async def get_by_entity_node_uuid(cls, driver: GraphDriver, entity_node_uuid: str):
        """Fetch the episodes that have a MENTIONS relationship to the given entity node."""
        query = (
            """
            MATCH (e:Episodic)-[r:MENTIONS]->(n:Entity {uuid: $entity_node_uuid})
            RETURN DISTINCT
            """
            + (
                EPISODIC_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else EPISODIC_NODE_RETURN
            )
        )
        records, _, _ = await driver.execute_query(
            query,
            entity_node_uuid=entity_node_uuid,
            routing_='r',
        )

        return [get_episodic_node_from_record(row) for row in records]


# Node model for an extracted entity (label `Entity` in the queries below).
# Adds an optional embedding of the name, a summary and free-form attributes.
class EntityNode(Node):
    name_embedding: list[float] | None = Field(default=None, description='embedding of the name')
    summary: str = Field(description='regional summary of surrounding edges', default_factory=str)
    attributes: dict[str, Any] = Field(
        default={}, description='Additional attributes of the node. Dependent on node labels'
    )

    async def generate_name_embedding(self, embedder: EmbedderClient):
        """Embed ``self.name`` with ``embedder``, store it on the node and return it.

        Newlines in the name are replaced by spaces before embedding. The
        elapsed time is logged at debug level in milliseconds.
        """
        start = time()
        text = self.name.replace('\n', ' ')
        self.name_embedding = await embedder.create(input_data=[text])
        # time() reports seconds; scale so the value matches the 'ms' label in the log line.
        elapsed_ms = (time() - start) * 1000
        logger.debug(f'embedded {text} in {elapsed_ms} ms')

        return self.name_embedding

    async def load_name_embedding(self, driver: GraphDriver):
        """Read the stored name embedding for this node into ``self.name_embedding``.

        Delegates to ``graph_operations_interface.node_load_embeddings`` when
        the driver provides that interface.

        Raises
        ------
        NodeNotFoundError
            If no Entity node with this uuid is returned.
        """
        if driver.graph_operations_interface:
            return await driver.graph_operations_interface.node_load_embeddings(self, driver)

        if driver.provider == GraphProvider.NEPTUNE:
            # The Neptune query splits the stored value on "," and converts each part to a float.
            query: LiteralString = """
                MATCH (n:Entity {uuid: $uuid})
                RETURN [x IN split(n.name_embedding, ",") | toFloat(x)] as name_embedding
            """

        else:
            query: LiteralString = """
                MATCH (n:Entity {uuid: $uuid})
                RETURN n.name_embedding AS name_embedding
            """
        records, _, _ = await driver.execute_query(
            query,
            uuid=self.uuid,
            routing_='r',
        )

        if len(records) == 0:
            raise NodeNotFoundError(self.uuid)

        self.name_embedding = records[0]['name_embedding']

    async def save(self, driver: GraphDriver):
        """Persist this entity and return the driver's query result.

        Delegates to ``graph_operations_interface.node_save`` when the driver
        provides that interface.
        """
        if driver.graph_operations_interface:
            return await driver.graph_operations_interface.node_save(self, driver)

        entity_data: dict[str, Any] = {
            'uuid': self.uuid,
            'name': self.name,
            'name_embedding': self.name_embedding,
            'group_id': self.group_id,
            'summary': self.summary,
            'created_at': self.created_at,
        }

        if driver.provider == GraphProvider.KUZU:
            # Kuzu: attributes are passed as one JSON string and labels as a
            # de-duplicated list that always contains 'Entity'.
            entity_data['attributes'] = json.dumps(self.attributes)
            entity_data['labels'] = list(set(self.labels + ['Entity']))
            result = await driver.execute_query(
                get_entity_node_save_query(driver.provider, labels=''),
                **entity_data,
            )
        else:
            # Other providers: attributes are flattened into the property map.
            # Because update() runs after the core keys are set, an attribute
            # whose key collides with a core key overwrites it.
            entity_data.update(self.attributes or {})
            # Labels are joined with ':' and handed to the query builder.
            labels = ':'.join(self.labels + ['Entity'])

            result = await driver.execute_query(
                get_entity_node_save_query(driver.provider, labels),
                entity_data=entity_data,
            )

        logger.debug(f'Saved Node to Graph: {self.uuid}')

        return result

    @classmethod
    async def get_by_uuid(cls, driver: GraphDriver, uuid: str):
        """Fetch the entity with the given uuid.

        Raises
        ------
        NodeNotFoundError
            If the query returns no record.
        """
        records, _, _ = await driver.execute_query(
            """
            MATCH (n:Entity {uuid: $uuid})
            RETURN
            """
            + get_entity_node_return_query(driver.provider),
            uuid=uuid,
            routing_='r',
        )

        nodes = [get_entity_node_from_record(record, driver.provider) for record in records]

        if len(nodes) == 0:
            raise NodeNotFoundError(uuid)

        return nodes[0]

    @classmethod
    async def get_by_uuids(cls, driver: GraphDriver, uuids: list[str]):
        """Fetch all entities whose uuid is in ``uuids``; returns a (possibly empty) list."""
        records, _, _ = await driver.execute_query(
            """
            MATCH (n:Entity)
            WHERE n.uuid IN $uuids
            RETURN
            """
            + get_entity_node_return_query(driver.provider),
            uuids=uuids,
            routing_='r',
        )

        nodes = [get_entity_node_from_record(record, driver.provider) for record in records]

        return nodes

    @classmethod
    async def get_by_group_ids(
        cls,
        driver: GraphDriver,
        group_ids: list[str],
        limit: int | None = None,
        uuid_cursor: str | None = None,
        with_embeddings: bool = False,
    ):
        """Fetch entities belonging to any of ``group_ids``, ordered by uuid descending.

        ``uuid_cursor`` (when truthy) restricts the result to uuids strictly
        below the cursor; ``limit`` (when not None) caps the number of rows;
        ``with_embeddings`` adds ``n.name_embedding`` to the returned columns.
        """
        # Optional query fragments; empty strings leave the query unchanged.
        cursor_query: LiteralString = 'AND n.uuid < $uuid' if uuid_cursor else ''
        limit_query: LiteralString = 'LIMIT $limit' if limit is not None else ''
        with_embeddings_query: LiteralString = (
            """,
            n.name_embedding AS name_embedding
            """
            if with_embeddings
            else ''
        )

        records, _, _ = await driver.execute_query(
            """
            MATCH (n:Entity)
            WHERE n.group_id IN $group_ids
            """
            + cursor_query
            + """
            RETURN
            """
            + get_entity_node_return_query(driver.provider)
            + with_embeddings_query
            + """
            ORDER BY n.uuid DESC
            """
            + limit_query,
            group_ids=group_ids,
            uuid=uuid_cursor,
            limit=limit,
            routing_='r',
        )

        nodes = [get_entity_node_from_record(record, driver.provider) for record in records]

        return nodes


# Node model for a community of nodes (label `Community` in the queries below).
# Adds an optional embedding of the name and a summary of the member nodes.
class CommunityNode(Node):
    name_embedding: list[float] | None = Field(default=None, description='embedding of the name')
    summary: str = Field(description='region summary of member nodes', default_factory=str)

    async def save(self, driver: GraphDriver):
        """Persist this community and return the driver's query result.

        On Neptune the name, uuid and group_id are additionally sent to
        ``driver.save_to_aoss`` under the 'communities' name before the graph
        query runs.
        """
        if driver.provider == GraphProvider.NEPTUNE:
            await driver.save_to_aoss(  # pyright: ignore reportAttributeAccessIssue
                'communities',
                [{'name': self.name, 'uuid': self.uuid, 'group_id': self.group_id}],
            )
        result = await driver.execute_query(
            get_community_node_save_query(driver.provider),  # type: ignore
            uuid=self.uuid,
            name=self.name,
            group_id=self.group_id,
            summary=self.summary,
            name_embedding=self.name_embedding,
            created_at=self.created_at,
        )

        logger.debug(f'Saved Node to Graph: {self.uuid}')

        return result

    async def generate_name_embedding(self, embedder: EmbedderClient):
        """Embed ``self.name`` with ``embedder``, store it on the node and return it.

        Newlines in the name are replaced by spaces before embedding. The
        elapsed time is logged at debug level in milliseconds.
        """
        start = time()
        text = self.name.replace('\n', ' ')
        self.name_embedding = await embedder.create(input_data=[text])
        # time() reports seconds; scale so the value matches the 'ms' label in the log line.
        elapsed_ms = (time() - start) * 1000
        logger.debug(f'embedded {text} in {elapsed_ms} ms')

        return self.name_embedding

    async def load_name_embedding(self, driver: GraphDriver):
        """Read the stored name embedding for this node into ``self.name_embedding``.

        Raises
        ------
        NodeNotFoundError
            If no Community node with this uuid is returned.
        """
        if driver.provider == GraphProvider.NEPTUNE:
            # The Neptune query splits the stored value on "," and converts each part to a float.
            query: LiteralString = """
                MATCH (c:Community {uuid: $uuid})
                RETURN [x IN split(c.name_embedding, ",") | toFloat(x)] as name_embedding
            """
        else:
            query: LiteralString = """
            MATCH (c:Community {uuid: $uuid})
            RETURN c.name_embedding AS name_embedding
            """

        records, _, _ = await driver.execute_query(
            query,
            uuid=self.uuid,
            routing_='r',
        )

        if len(records) == 0:
            raise NodeNotFoundError(self.uuid)

        self.name_embedding = records[0]['name_embedding']

    @classmethod
    async def get_by_uuid(cls, driver: GraphDriver, uuid: str):
        """Fetch the community with the given uuid.

        Raises
        ------
        NodeNotFoundError
            If the query returns no record.
        """
        records, _, _ = await driver.execute_query(
            """
            MATCH (c:Community {uuid: $uuid})
            RETURN
            """
            + (
                COMMUNITY_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else COMMUNITY_NODE_RETURN
            ),
            uuid=uuid,
            routing_='r',
        )

        nodes = [get_community_node_from_record(record) for record in records]

        if len(nodes) == 0:
            raise NodeNotFoundError(uuid)

        return nodes[0]

    @classmethod
    async def get_by_uuids(cls, driver: GraphDriver, uuids: list[str]):
        """Fetch all communities whose uuid is in ``uuids``; returns a (possibly empty) list."""
        records, _, _ = await driver.execute_query(
            """
            MATCH (c:Community)
            WHERE c.uuid IN $uuids
            RETURN
            """
            + (
                COMMUNITY_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else COMMUNITY_NODE_RETURN
            ),
            uuids=uuids,
            routing_='r',
        )

        communities = [get_community_node_from_record(record) for record in records]

        return communities

    @classmethod
    async def get_by_group_ids(
        cls,
        driver: GraphDriver,
        group_ids: list[str],
        limit: int | None = None,
        uuid_cursor: str | None = None,
    ):
        """Fetch communities belonging to any of ``group_ids``, ordered by uuid descending.

        ``uuid_cursor`` (when truthy) restricts the result to uuids strictly
        below the cursor; ``limit`` (when not None) caps the number of rows.
        """
        # Optional query fragments; empty strings leave the query unchanged.
        cursor_query: LiteralString = 'AND c.uuid < $uuid' if uuid_cursor else ''
        limit_query: LiteralString = 'LIMIT $limit' if limit is not None else ''

        records, _, _ = await driver.execute_query(
            """
            MATCH (c:Community)
            WHERE c.group_id IN $group_ids
            """
            + cursor_query
            + """
            RETURN
            """
            + (
                COMMUNITY_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else COMMUNITY_NODE_RETURN
            )
            + """
            ORDER BY c.uuid DESC
            """
            + limit_query,
            group_ids=group_ids,
            uuid=uuid_cursor,
            limit=limit,
            routing_='r',
        )

        communities = [get_community_node_from_record(record) for record in records]

        return communities


# Node model stored under the `Saga` label; it adds no fields beyond the
# common Node ones and overrides delete() with a Saga-specific query.
class SagaNode(Node):
    async def save(self, driver: GraphDriver):
        """Persist this saga node and return the driver's query result."""
        save_query = get_saga_node_save_query(driver.provider)
        outcome = await driver.execute_query(
            save_query,
            uuid=self.uuid,
            name=self.name,
            group_id=self.group_id,
            created_at=self.created_at,
        )

        logger.debug(f'Saved Node to Graph: {self.uuid}')

        return outcome

    async def delete(self, driver: GraphDriver):
        """Detach-delete the Saga node with this uuid; the same query is used for every provider."""
        delete_query = """
            MATCH (n:Saga {uuid: $uuid})
            DETACH DELETE n
            """
        await driver.execute_query(delete_query, uuid=self.uuid)

        logger.debug(f'Deleted Node: {self.uuid}')

    @classmethod
    async def get_by_uuid(cls, driver: GraphDriver, uuid: str):
        """Fetch the saga with the given uuid.

        Raises
        ------
        NodeNotFoundError
            If the query returns no record.
        """
        query = (
            """
            MATCH (s:Saga {uuid: $uuid})
            RETURN
            """
            + (
                SAGA_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else SAGA_NODE_RETURN
            )
        )
        records, _, _ = await driver.execute_query(query, uuid=uuid, routing_='r')

        # Convert every record before the emptiness check.
        found = [get_saga_node_from_record(row) for row in records]
        if not found:
            raise NodeNotFoundError(uuid)

        return found[0]

    @classmethod
    async def get_by_uuids(cls, driver: GraphDriver, uuids: list[str]):
        """Fetch all sagas whose uuid is in ``uuids``; returns a (possibly empty) list."""
        query = (
            """
            MATCH (s:Saga)
            WHERE s.uuid IN $uuids
            RETURN
            """
            + (
                SAGA_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else SAGA_NODE_RETURN
            )
        )
        records, _, _ = await driver.execute_query(query, uuids=uuids, routing_='r')

        return [get_saga_node_from_record(row) for row in records]

    @classmethod
    async def get_by_group_ids(
        cls,
        driver: GraphDriver,
        group_ids: list[str],
        limit: int | None = None,
        uuid_cursor: str | None = None,
    ):
        """Fetch sagas belonging to any of ``group_ids``, ordered by uuid descending.

        ``uuid_cursor`` (when truthy) restricts the result to uuids strictly
        below the cursor; ``limit`` (when not None) caps the number of rows.
        """
        # Optional query fragments; empty strings leave the query unchanged.
        cursor_query: LiteralString = 'AND s.uuid < $uuid' if uuid_cursor else ''
        limit_query: LiteralString = 'LIMIT $limit' if limit is not None else ''

        query = (
            """
            MATCH (s:Saga)
            WHERE s.group_id IN $group_ids
            """
            + cursor_query
            + """
            RETURN
            """
            + (
                SAGA_NODE_RETURN_NEPTUNE
                if driver.provider == GraphProvider.NEPTUNE
                else SAGA_NODE_RETURN
            )
            + """
            ORDER BY s.uuid DESC
            """
            + limit_query
        )
        records, _, _ = await driver.execute_query(
            query,
            group_ids=group_ids,
            uuid=uuid_cursor,
            limit=limit,
            routing_='r',
        )

        return [get_saga_node_from_record(row) for row in records]


# Node helpers
def get_episodic_node_from_record(record: Any) -> EpisodicNode:
    """Build an EpisodicNode from a database record.

    Both timestamps are parsed before either is validated.

    Raises:
        ValueError: if ``created_at`` or ``valid_at`` parses to None.
    """
    created = parse_db_date(record['created_at'])
    valid = parse_db_date(record['valid_at'])

    if created is None:
        raise ValueError(f'created_at cannot be None for episode {record.get("uuid", "unknown")}')
    if valid is None:
        raise ValueError(f'valid_at cannot be None for episode {record.get("uuid", "unknown")}')

    episode = EpisodicNode(
        content=record['content'],
        created_at=created,
        valid_at=valid,
        uuid=record['uuid'],
        group_id=record['group_id'],
        source=EpisodeType.from_str(record['source']),
        name=record['name'],
        source_description=record['source_description'],
        entity_edges=record['entity_edges'],
    )
    return episode


def get_entity_node_from_record(record: Any, provider: GraphProvider) -> EntityNode:
    """Build an EntityNode from a database record without mutating the record.

    Args:
        record: Mapping-like query result exposing ``uuid``, ``name``,
            ``created_at``, ``summary`` and ``attributes`` by key, and
            ``labels``, ``group_id`` and ``name_embedding`` via ``.get``.
        provider: Graph backend the record came from. For KUZU the
            ``attributes`` value is a JSON string; otherwise it is treated as a
            mapping that may also carry the node's first-class fields.

    Returns:
        The populated EntityNode. ``attributes`` excludes the first-class
        fields, and ``labels`` excludes the per-group ``Entity_<group_id>``
        label (group id with dashes removed) when present.
    """
    if provider == GraphProvider.KUZU:
        attributes = json.loads(record['attributes']) if record['attributes'] else {}
    else:
        # Work on a copy: popping keys from the record's own dict would corrupt
        # the record for any caller that reuses it. A null value becomes {}.
        attributes = dict(record['attributes'] or {})
        for reserved_key in (
            'uuid',
            'name',
            'group_id',
            'name_embedding',
            'summary',
            'created_at',
            'labels',
        ):
            attributes.pop(reserved_key, None)

    # Copy the labels as well so removing the group label does not touch the record.
    labels = list(record.get('labels') or [])
    group_id = record.get('group_id')
    if group_id:
        # Skipped when group_id is missing instead of failing on None.replace().
        group_label = 'Entity_' + group_id.replace('-', '')
        if group_label in labels:
            labels.remove(group_label)

    entity_node = EntityNode(
        uuid=record['uuid'],
        name=record['name'],
        name_embedding=record.get('name_embedding'),
        group_id=group_id,
        labels=labels,
        created_at=parse_db_date(record['created_at']),  # type: ignore
        summary=record['summary'],
        attributes=attributes,
    )

    return entity_node


def get_community_node_from_record(record: Any) -> CommunityNode:
    """Build a CommunityNode from a database record.

    All fields are read by key, so a record missing any of them raises the
    record's own lookup error.
    """
    fields: dict[str, Any] = {
        'uuid': record['uuid'],
        'name': record['name'],
        'group_id': record['group_id'],
        'name_embedding': record['name_embedding'],
        'created_at': parse_db_date(record['created_at']),
        'summary': record['summary'],
    }
    return CommunityNode(**fields)


def get_saga_node_from_record(record: Any) -> SagaNode:
    """Build a SagaNode from a database record (uuid, name, group_id, created_at)."""
    fields: dict[str, Any] = {
        'uuid': record['uuid'],
        'name': record['name'],
        'group_id': record['group_id'],
        'created_at': parse_db_date(record['created_at']),
    }
    return SagaNode(**fields)


async def create_entity_node_embeddings(embedder: EmbedderClient, nodes: list[EntityNode]):
    """Embed node names in one batch and store each vector on its node.

    Nodes with a falsy ``name`` are skipped. Nothing is sent to the embedder
    when no node qualifies. ``zip(..., strict=True)`` raises ValueError if the
    embedder returns a different number of vectors than names submitted.
    """
    named_nodes = list(filter(lambda candidate: candidate.name, nodes))

    if named_nodes:
        vectors = await embedder.create_batch([candidate.name for candidate in named_nodes])
        for target, vector in zip(named_nodes, vectors, strict=True):
            target.name_embedding = vector

```

--------------------------------------------------------------------------------
/mcp_server/src/graphiti_mcp_server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Graphiti MCP Server - Exposes Graphiti functionality through the Model Context Protocol (MCP)
"""

import argparse
import asyncio
import logging
import os
import sys
from pathlib import Path
from typing import Any, Optional

from dotenv import load_dotenv
from graphiti_core import Graphiti
from graphiti_core.edges import EntityEdge
from graphiti_core.nodes import EpisodeType, EpisodicNode
from graphiti_core.search.search_filters import SearchFilters
from graphiti_core.utils.maintenance.graph_data_operations import clear_data
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel
from starlette.responses import JSONResponse

from config.schema import GraphitiConfig, ServerConfig
from models.response_types import (
    EpisodeSearchResponse,
    ErrorResponse,
    FactSearchResponse,
    NodeResult,
    NodeSearchResponse,
    StatusResponse,
    SuccessResponse,
)
from services.factories import DatabaseDriverFactory, EmbedderFactory, LLMClientFactory
from services.queue_service import QueueService
from utils.formatting import format_fact_result

# Load .env file from mcp_server directory
# (two levels up from this file: <mcp_server>/src/<this file> -> <mcp_server>).
mcp_server_dir = Path(__file__).parent.parent
env_file = mcp_server_dir / '.env'
if env_file.exists():
    # Prefer the .env that sits next to the server sources.
    load_dotenv(env_file)
else:
    # Try current working directory as fallback
    # (load_dotenv() with no argument performs its own default lookup).
    load_dotenv()


# Semaphore limit for concurrent Graphiti operations.
#
# This controls how many episodes can be processed simultaneously. Each episode
# processing involves multiple LLM calls (entity extraction, deduplication, etc.),
# so the actual number of concurrent LLM requests will be higher.
#
# TUNING GUIDELINES:
#
# LLM Provider Rate Limits (requests per minute):
# - OpenAI Tier 1 (free):     3 RPM   -> SEMAPHORE_LIMIT=1-2
# - OpenAI Tier 2:            60 RPM   -> SEMAPHORE_LIMIT=5-8
# - OpenAI Tier 3:           500 RPM   -> SEMAPHORE_LIMIT=10-15
# - OpenAI Tier 4:         5,000 RPM   -> SEMAPHORE_LIMIT=20-50
# - Anthropic (default):     50 RPM   -> SEMAPHORE_LIMIT=5-8
# - Anthropic (high tier): 1,000 RPM   -> SEMAPHORE_LIMIT=15-30
# - Azure OpenAI (varies):  Consult your quota -> adjust accordingly
#
# SYMPTOMS:
# - Too high: 429 rate limit errors, increased costs from parallel processing
# - Too low: Slow throughput, underutilized API quota
#
# MONITORING:
# - Watch logs for rate limit errors (429)
# - Monitor episode processing times
# - Check LLM provider dashboard for actual request rates
#
# DEFAULT: 10 (suitable for OpenAI Tier 3, mid-tier Anthropic)
# Read once at import time; a value that is not an integer makes int() raise ValueError.
SEMAPHORE_LIMIT = int(os.getenv('SEMAPHORE_LIMIT', 10))


# Configure structured logging with timestamps
# Record layout: "<timestamp> - <logger name> - <level> - <message>".
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

# Root logger: INFO and above, written to stderr.
# NOTE(review): stderr is presumably chosen to keep stdout free for a stdio
# MCP transport - confirm.
logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
    datefmt=DATE_FORMAT,
    stream=sys.stderr,
)

# Configure specific loggers
logging.getLogger('uvicorn').setLevel(logging.INFO)
logging.getLogger('uvicorn.access').setLevel(logging.WARNING)  # Reduce access log noise
logging.getLogger('mcp.server.streamable_http_manager').setLevel(
    logging.WARNING
)  # Reduce MCP noise


# Patch uvicorn's logging config to use our format
def configure_uvicorn_logging():
    """Give the uvicorn loggers a single stderr handler using this module's log format.

    For each of ``uvicorn``, ``uvicorn.error`` and ``uvicorn.access`` the
    existing handlers are dropped, one StreamHandler on stderr formatted with
    LOG_FORMAT/DATE_FORMAT is attached, and propagation is turned off so
    records are not emitted a second time by ancestor loggers.
    """
    target_names = ('uvicorn', 'uvicorn.error', 'uvicorn.access')
    for target in map(logging.getLogger, target_names):
        # Drop whatever handlers were installed before attaching ours.
        target.handlers.clear()
        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT))
        target.addHandler(stderr_handler)
        target.propagate = False


# Module-level logger used by the service class and every tool below.
logger = logging.getLogger(__name__)

# Create global config instance - will be properly initialized later
# Annotation only: no value is bound here, so reading `config` before it has
# been assigned raises NameError. initialize_server() declares it `global`.
config: GraphitiConfig

# MCP server instructions
# Passed to FastMCP as the server's `instructions`. Tool names mentioned here
# must match the functions registered with @mcp.tool() in this module.
GRAPHITI_MCP_INSTRUCTIONS = """
Graphiti is a memory service for AI agents built on a knowledge graph. Graphiti performs well
with dynamic data such as user interactions, changing enterprise data, and external information.

Graphiti transforms information into a richly connected knowledge network, allowing you to 
capture relationships between concepts, entities, and information. The system organizes data as episodes 
(content snippets), nodes (entities), and facts (relationships between entities), creating a dynamic, 
queryable memory store that evolves with new information. Graphiti supports multiple data formats, including 
structured JSON data, enabling seamless integration with existing data pipelines and systems.

Facts contain temporal metadata, allowing you to track the time of creation and whether a fact is invalid 
(superseded by new information).

Key capabilities:
1. Add episodes (text, messages, or JSON) to the knowledge graph with the add_memory tool
2. Search for nodes (entities) in the graph using natural language queries with search_nodes
3. Find relevant facts (relationships between entities) with search_memory_facts
4. Retrieve specific entity edges or episodes by UUID
5. Manage the knowledge graph with tools like delete_episode, delete_entity_edge, and clear_graph

The server connects to a database for persistent storage and uses language models for certain operations. 
Each piece of information is organized by group_id, allowing you to maintain separate knowledge domains.

When adding information, provide descriptive names and detailed content to improve search quality. 
When searching, use specific queries and consider filtering by group_id for more relevant results.

For optimal performance, ensure the database is properly configured and accessible, and valid 
API keys are provided for any language model operations.
"""

# MCP server instance
# The @mcp.tool() and @mcp.custom_route() decorators below register on this object.
mcp = FastMCP(
    'Graphiti Agent Memory',
    instructions=GRAPHITI_MCP_INSTRUCTIONS,
)

# Global services
# Both start as None; every tool checks for None and returns an error response
# until they are set. initialize_server() declares them `global`.
graphiti_service: Optional['GraphitiService'] = None
queue_service: QueueService | None = None

# Global client for backward compatibility
graphiti_client: Graphiti | None = None
# Annotation only: `semaphore` has no value until it is assigned elsewhere.
semaphore: asyncio.Semaphore


class GraphitiService:
    """Builds and holds a Graphiti client from the unified configuration.

    The client is created on demand: ``get_client()`` runs ``initialize()``
    the first time it is called while ``self.client`` is still None.
    """

    def __init__(self, config: GraphitiConfig, semaphore_limit: int = 10):
        # semaphore_limit is also forwarded to Graphiti as max_coroutines.
        self.config = config
        self.semaphore_limit = semaphore_limit
        self.semaphore = asyncio.Semaphore(semaphore_limit)
        self.client: Graphiti | None = None
        self.entity_types = None

    def _create_llm_client(self):
        """Return the configured LLM client, or None (with a warning) if creation fails."""
        try:
            return LLMClientFactory.create(self.config.llm)
        except Exception as e:
            logger.warning(f'Failed to create LLM client: {e}')
            return None

    def _create_embedder_client(self):
        """Return the configured embedder client, or None (with a warning) if creation fails."""
        try:
            return EmbedderFactory.create(self.config.embedder)
        except Exception as e:
            logger.warning(f'Failed to create embedder client: {e}')
            return None

    def _build_entity_types(self):
        """Map each configured entity type name to a dynamically created Pydantic model.

        Returns None when no entity types are configured.
        """
        declared_types = self.config.graphiti.entity_types
        if not declared_types:
            return None

        # Note: Don't use 'name' as it's a protected Pydantic attribute
        return {
            declared.name: type(
                declared.name,
                (BaseModel,),
                {
                    '__doc__': declared.description,
                },
            )
            for declared in declared_types
        }

    def _create_client(self, db_config, llm_client, embedder_client) -> Graphiti:
        """Instantiate Graphiti for the configured database provider."""
        if self.config.database.provider.lower() == 'falkordb':
            # FalkorDB needs an explicit driver instance; import lazily so the
            # dependency is only required when this provider is selected.
            from graphiti_core.driver.falkordb_driver import FalkorDriver

            falkor_driver = FalkorDriver(
                host=db_config['host'],
                port=db_config['port'],
                password=db_config['password'],
                database=db_config['database'],
            )
            return Graphiti(
                graph_driver=falkor_driver,
                llm_client=llm_client,
                embedder=embedder_client,
                max_coroutines=self.semaphore_limit,
            )

        # Any other provider goes through the Neo4j-style uri/user/password path.
        return Graphiti(
            uri=db_config['uri'],
            user=db_config['user'],
            password=db_config['password'],
            llm_client=llm_client,
            embedder=embedder_client,
            max_coroutines=self.semaphore_limit,
        )

    def _connection_failure(self, db_error: Exception, db_config) -> RuntimeError | None:
        """Translate a connection-style failure into a RuntimeError with start-up hints.

        Returns None when the error text does not look like a connection
        failure, in which case the caller re-raises the original exception.
        """
        error_text = str(db_error).lower()
        if 'connection refused' not in error_text and 'could not connect' not in error_text:
            return None

        db_provider = self.config.database.provider
        rule = '=' * 70

        if db_provider.lower() == 'falkordb':
            return RuntimeError(
                f'\n{rule}\n'
                f'Database Connection Error: FalkorDB is not running\n'
                f'{rule}\n\n'
                f'FalkorDB at {db_config["host"]}:{db_config["port"]} is not accessible.\n\n'
                f'To start FalkorDB:\n'
                f'  - Using Docker Compose: cd mcp_server && docker compose up\n'
                f'  - Or run FalkorDB manually: docker run -p 6379:6379 falkordb/falkordb\n\n'
                f'{rule}\n'
            )
        if db_provider.lower() == 'neo4j':
            return RuntimeError(
                f'\n{rule}\n'
                f'Database Connection Error: Neo4j is not running\n'
                f'{rule}\n\n'
                f'Neo4j at {db_config.get("uri", "unknown")} is not accessible.\n\n'
                f'To start Neo4j:\n'
                f'  - Using Docker Compose: cd mcp_server && docker compose -f docker/docker-compose-neo4j.yml up\n'
                f'  - Or install Neo4j Desktop from: https://neo4j.com/download/\n'
                f'  - Or run Neo4j manually: docker run -p 7474:7474 -p 7687:7687 neo4j:latest\n\n'
                f'{rule}\n'
            )
        return RuntimeError(
            f'\n{rule}\n'
            f'Database Connection Error: {db_provider} is not running\n'
            f'{rule}\n\n'
            f'{db_provider} at {db_config.get("uri", "unknown")} is not accessible.\n\n'
            f'Please ensure {db_provider} is running and accessible.\n\n'
            f'{rule}\n'
        )

    def _log_configuration(self, llm_client, embedder_client) -> None:
        """Log which providers, entity types, database and group id are in effect."""
        if llm_client:
            logger.info(
                f'Using LLM provider: {self.config.llm.provider} / {self.config.llm.model}'
            )
        else:
            logger.info('No LLM client configured - entity extraction will be limited')

        if embedder_client:
            logger.info(f'Using Embedder provider: {self.config.embedder.provider}')
        else:
            logger.info('No Embedder client configured - search will be limited')

        if self.entity_types:
            entity_type_names = list(self.entity_types.keys())
            logger.info(f'Using custom entity types: {", ".join(entity_type_names)}')
        else:
            logger.info('Using default entity types')

        logger.info(f'Using database: {self.config.database.provider}')
        logger.info(f'Using group_id: {self.config.graphiti.group_id}')

    async def initialize(self) -> None:
        """Create the Graphiti client, build indices, and log the active configuration.

        LLM and embedder creation failures are downgraded to warnings (the
        client is then built with None for that component). Any other failure
        is logged and re-raised; connection-style database errors are re-raised
        as a RuntimeError carrying start-up instructions.
        """
        try:
            llm_client = self._create_llm_client()
            embedder_client = self._create_embedder_client()

            db_config = DatabaseDriverFactory.create_config(self.config.database)

            # Kept on the instance so tools can pass the types along later.
            self.entity_types = self._build_entity_types()

            try:
                self.client = self._create_client(db_config, llm_client, embedder_client)
            except Exception as db_error:
                friendly_error = self._connection_failure(db_error, db_config)
                if friendly_error is not None:
                    raise friendly_error from db_error
                # Not a connection problem: surface the original error untouched.
                raise

            await self.client.build_indices_and_constraints()

            logger.info('Successfully initialized Graphiti client')
            self._log_configuration(llm_client, embedder_client)

        except Exception as e:
            logger.error(f'Failed to initialize Graphiti client: {e}')
            raise

    async def get_client(self) -> Graphiti:
        """Return the Graphiti client, running ``initialize()`` first if none exists yet.

        Raises:
            RuntimeError: if the client is still None after initialization.
        """
        if self.client is None:
            await self.initialize()
        if self.client is None:
            raise RuntimeError('Failed to initialize Graphiti client')
        return self.client


@mcp.tool()
async def add_memory(
    name: str,
    episode_body: str,
    group_id: str | None = None,
    source: str = 'text',
    source_description: str = '',
    uuid: str | None = None,
) -> SuccessResponse | ErrorResponse:
    """Add an episode to memory. This is the primary way to add information to the graph.

    This function returns immediately and processes the episode addition in the background.
    Episodes for the same group_id are processed sequentially to avoid race conditions.

    Args:
        name (str): Name of the episode
        episode_body (str): The content of the episode to persist to memory. When source='json', this must be a
                           properly escaped JSON string, not a raw Python dictionary. The JSON data will be
                           automatically processed to extract entities and relationships.
        group_id (str, optional): A unique ID for this graph. If not provided, uses the default group_id from CLI
                                 or a generated one.
        source (str, optional): Source type, must be one of:
                               - 'text': For plain text content (default)
                               - 'json': For structured data
                               - 'message': For conversation-style content
        source_description (str, optional): Description of the source
        uuid (str, optional): Optional UUID for the episode

    Examples:
        # Adding plain text content
        add_memory(
            name="Company News",
            episode_body="Acme Corp announced a new product line today.",
            source="text",
            source_description="news article",
            group_id="some_arbitrary_string"
        )

        # Adding structured JSON data
        # NOTE: episode_body should be a JSON string (standard JSON escaping)
        add_memory(
            name="Customer Profile",
            episode_body='{"company": {"name": "Acme Technologies"}, "products": [{"id": "P001", "name": "CloudSync"}, {"id": "P002", "name": "DataMiner"}]}',
            source="json",
            source_description="CRM data"
        )
    """
    # Both services are needed: one supplies entity types, the other the queue.
    if graphiti_service is None or queue_service is None:
        return ErrorResponse(error='Services not initialized')

    try:
        # An empty or missing group_id falls back to the configured default.
        target_group_id = group_id if group_id else config.graphiti.group_id

        # Start from plain text and only override on a recognised source name.
        parsed_type = EpisodeType.text
        if source:
            try:
                parsed_type = EpisodeType[source.lower()]
            except (KeyError, AttributeError):
                logger.warning(f"Unknown source type '{source}', using 'text' as default")

        # Hand the episode to the queue; processing happens asynchronously.
        await queue_service.add_episode(
            group_id=target_group_id,
            name=name,
            content=episode_body,
            source_description=source_description,
            episode_type=parsed_type,
            entity_types=graphiti_service.entity_types,
            uuid=uuid if uuid else None,  # normalise '' to None
        )
    except Exception as e:
        failure = str(e)
        logger.error(f'Error queuing episode: {failure}')
        return ErrorResponse(error=f'Error queuing episode: {failure}')

    return SuccessResponse(
        message=f"Episode '{name}' queued for processing in group '{target_group_id}'"
    )


@mcp.tool()
async def search_nodes(
    query: str,
    group_ids: list[str] | None = None,
    max_nodes: int = 10,
    entity_types: list[str] | None = None,
) -> NodeSearchResponse | ErrorResponse:
    """Search for nodes in the graph memory.

    Args:
        query: The search query
        group_ids: Optional list of group IDs to filter results
        max_nodes: Maximum number of nodes to return (default: 10)
        entity_types: Optional list of entity type names to filter by
    """
    global graphiti_service

    if graphiti_service is None:
        return ErrorResponse(error='Graphiti service not initialized')

    try:
        # Validate max_nodes the same way search_memory_facts validates max_facts;
        # a zero or negative value would otherwise produce a surprising slice.
        if max_nodes <= 0:
            return ErrorResponse(error='max_nodes must be a positive integer')

        client = await graphiti_service.get_client()

        # Use the provided group_ids or fall back to the default from config if none provided
        effective_group_ids = (
            group_ids
            if group_ids is not None
            else [config.graphiti.group_id]
            if config.graphiti.group_id
            else []
        )

        # Create search filters
        search_filters = SearchFilters(
            node_labels=entity_types,
        )

        # Use the search_ method with node search config
        from graphiti_core.search.search_config_recipes import NODE_HYBRID_SEARCH_RRF

        # Work on a deep copy so the shared recipe object is never mutated, and
        # push max_nodes into the search itself: slicing alone cannot return
        # more results than the recipe's own limit allows.
        search_config = NODE_HYBRID_SEARCH_RRF.model_copy(deep=True)
        search_config.limit = max_nodes

        results = await client.search_(
            query=query,
            config=search_config,
            group_ids=effective_group_ids,
            search_filter=search_filters,
        )

        # Extract nodes from results (slice kept as a guard on the result size)
        nodes = results.nodes[:max_nodes] if results.nodes else []

        if not nodes:
            return NodeSearchResponse(message='No relevant nodes found', nodes=[])

        # Format the results
        node_results = []
        for node in nodes:
            # Get attributes and ensure no embeddings are included
            attrs = node.attributes if hasattr(node, 'attributes') else {}
            # Remove any embedding keys that might be in attributes
            attrs = {k: v for k, v in attrs.items() if 'embedding' not in k.lower()}

            node_results.append(
                NodeResult(
                    uuid=node.uuid,
                    name=node.name,
                    labels=node.labels if node.labels else [],
                    created_at=node.created_at.isoformat() if node.created_at else None,
                    summary=node.summary,
                    group_id=node.group_id,
                    attributes=attrs,
                )
            )

        return NodeSearchResponse(message='Nodes retrieved successfully', nodes=node_results)
    except Exception as e:
        error_msg = str(e)
        logger.error(f'Error searching nodes: {error_msg}')
        return ErrorResponse(error=f'Error searching nodes: {error_msg}')


@mcp.tool()
async def search_memory_facts(
    query: str,
    group_ids: list[str] | None = None,
    max_facts: int = 10,
    center_node_uuid: str | None = None,
) -> FactSearchResponse | ErrorResponse:
    """Search the graph memory for relevant facts.

    Args:
        query: The search query
        group_ids: Optional list of group IDs to filter results
        max_facts: Maximum number of facts to return (default: 10)
        center_node_uuid: Optional UUID of a node to center the search around
    """
    if graphiti_service is None:
        return ErrorResponse(error='Graphiti service not initialized')

    try:
        # Reject non-positive limits before touching the client.
        if max_facts <= 0:
            return ErrorResponse(error='max_facts must be a positive integer')

        client = await graphiti_service.get_client()

        # Explicit group_ids (even an empty list) win; otherwise use the
        # configured default group when there is one.
        if group_ids is not None:
            scoped_group_ids = group_ids
        elif config.graphiti.group_id:
            scoped_group_ids = [config.graphiti.group_id]
        else:
            scoped_group_ids = []

        matching_edges = await client.search(
            group_ids=scoped_group_ids,
            query=query,
            num_results=max_facts,
            center_node_uuid=center_node_uuid,
        )

        if not matching_edges:
            return FactSearchResponse(message='No relevant facts found', facts=[])

        return FactSearchResponse(
            message='Facts retrieved successfully',
            facts=[format_fact_result(edge) for edge in matching_edges],
        )
    except Exception as e:
        failure = str(e)
        logger.error(f'Error searching facts: {failure}')
        return ErrorResponse(error=f'Error searching facts: {failure}')


@mcp.tool()
async def delete_entity_edge(uuid: str) -> SuccessResponse | ErrorResponse:
    """Delete an entity edge from the graph memory.

    Args:
        uuid: UUID of the entity edge to delete
    """
    if graphiti_service is None:
        return ErrorResponse(error='Graphiti service not initialized')

    try:
        client = await graphiti_service.get_client()
        graph_driver = client.driver

        # Load the edge first, then remove it through its own delete method;
        # a failed lookup is reported through the except branch below.
        edge = await EntityEdge.get_by_uuid(graph_driver, uuid)
        await edge.delete(graph_driver)
    except Exception as e:
        failure = str(e)
        logger.error(f'Error deleting entity edge: {failure}')
        return ErrorResponse(error=f'Error deleting entity edge: {failure}')

    return SuccessResponse(message=f'Entity edge with UUID {uuid} deleted successfully')


@mcp.tool()
async def delete_episode(uuid: str) -> SuccessResponse | ErrorResponse:
    """Delete an episode from the graph memory.

    Args:
        uuid: UUID of the episode to delete
    """
    if graphiti_service is None:
        return ErrorResponse(error='Graphiti service not initialized')

    try:
        client = await graphiti_service.get_client()
        graph_driver = client.driver

        # Load the episode first, then remove it through its own delete method;
        # a failed lookup is reported through the except branch below.
        episode = await EpisodicNode.get_by_uuid(graph_driver, uuid)
        await episode.delete(graph_driver)
    except Exception as e:
        failure = str(e)
        logger.error(f'Error deleting episode: {failure}')
        return ErrorResponse(error=f'Error deleting episode: {failure}')

    return SuccessResponse(message=f'Episode with UUID {uuid} deleted successfully')


@mcp.tool()
async def get_entity_edge(uuid: str) -> dict[str, Any] | ErrorResponse:
    """Get an entity edge from the graph memory by its UUID.

    Args:
        uuid: UUID of the entity edge to retrieve
    """
    if graphiti_service is None:
        return ErrorResponse(error='Graphiti service not initialized')

    try:
        client = await graphiti_service.get_client()
        edge = await EntityEdge.get_by_uuid(client.driver, uuid)

        # format_fact_result turns the edge into a plain dict, which is
        # returned as-is and left to the MCP layer to serialize.
        serialized_edge = format_fact_result(edge)
        return serialized_edge
    except Exception as e:
        failure = str(e)
        logger.error(f'Error getting entity edge: {failure}')
        return ErrorResponse(error=f'Error getting entity edge: {failure}')


@mcp.tool()
async def get_episodes(
    group_ids: list[str] | None = None,
    max_episodes: int = 10,
) -> EpisodeSearchResponse | ErrorResponse:
    """Get episodes from the graph memory.

    Args:
        group_ids: Optional list of group IDs to filter results
        max_episodes: Maximum number of episodes to return (default: 10)
    """
    global graphiti_service

    if graphiti_service is None:
        return ErrorResponse(error='Graphiti service not initialized')

    try:
        # Validate max_episodes the same way search_memory_facts validates
        # max_facts, instead of forwarding a non-positive LIMIT to the database.
        if max_episodes <= 0:
            return ErrorResponse(error='max_episodes must be a positive integer')

        client = await graphiti_service.get_client()

        # Use the provided group_ids or fall back to the default from config if none provided
        effective_group_ids = (
            group_ids
            if group_ids is not None
            else [config.graphiti.group_id]
            if config.graphiti.group_id
            else []
        )

        # EpisodicNode comes from the module-level import; no local re-import needed.
        if effective_group_ids:
            episodes = await EpisodicNode.get_by_group_ids(
                client.driver, effective_group_ids, limit=max_episodes
            )
        else:
            # Without any group ID there is nothing to query by, so report no episodes.
            episodes = []

        if not episodes:
            return EpisodeSearchResponse(message='No episodes found', episodes=[])

        # Format the results as plain dicts
        episode_results = [
            {
                'uuid': episode.uuid,
                'name': episode.name,
                'content': episode.content,
                'created_at': episode.created_at.isoformat() if episode.created_at else None,
                # Prefer the enum's value; fall back to str() for non-enum sources.
                'source': episode.source.value
                if hasattr(episode.source, 'value')
                else str(episode.source),
                'source_description': episode.source_description,
                'group_id': episode.group_id,
            }
            for episode in episodes
        ]

        return EpisodeSearchResponse(
            message='Episodes retrieved successfully', episodes=episode_results
        )
    except Exception as e:
        error_msg = str(e)
        logger.error(f'Error getting episodes: {error_msg}')
        return ErrorResponse(error=f'Error getting episodes: {error_msg}')


@mcp.tool()
async def clear_graph(group_ids: list[str] | None = None) -> SuccessResponse | ErrorResponse:
    """Clear all data from the graph for specified group IDs.

    Args:
        group_ids: Optional list of group IDs to clear. If not provided, clears the default group.
    """
    global graphiti_service

    if graphiti_service is None:
        return ErrorResponse(error='Graphiti service not initialized')

    try:
        client = await graphiti_service.get_client()

        # Use the provided group_ids or fall back to the default from config if none provided.
        # The parentheses matter: a conditional expression binds more loosely than
        # `or`, so without them explicitly supplied group_ids were thrown away
        # whenever no default group_id was configured.
        default_group_id = config.graphiti.group_id
        effective_group_ids = group_ids or ([default_group_id] if default_group_id else [])

        if not effective_group_ids:
            return ErrorResponse(error='No group IDs specified for clearing')

        # Clear data for the specified group IDs
        await clear_data(client.driver, group_ids=effective_group_ids)

        return SuccessResponse(
            message=f'Graph data cleared successfully for group IDs: {", ".join(effective_group_ids)}'
        )
    except Exception as e:
        error_msg = str(e)
        logger.error(f'Error clearing graph: {error_msg}')
        return ErrorResponse(error=f'Error clearing graph: {error_msg}')


@mcp.tool()
async def get_status() -> StatusResponse:
    """Get the status of the Graphiti MCP server and database connection."""
    if graphiti_service is None:
        return StatusResponse(status='error', message='Graphiti service not initialized')

    try:
        client = await graphiti_service.get_client()

        # Probe the database with a trivial count query.
        async with client.driver.session() as session:
            probe = await session.run('MATCH (n) RETURN count(n) as count')
            if probe:
                # Drain the result so the query is actually executed.
                async for _record in probe:
                    pass

        # Report the provider from the service's own config rather than the global one.
        provider_name = graphiti_service.config.database.provider
    except Exception as e:
        failure = str(e)
        logger.error(f'Error checking database connection: {failure}')
        return StatusResponse(
            status='error',
            message=f'Graphiti MCP server is running but database connection failed: {failure}',
        )

    return StatusResponse(
        status='ok',
        message=f'Graphiti MCP server is running and connected to {provider_name} database',
    )


@mcp.custom_route('/health', methods=['GET'])
async def health_check(request) -> JSONResponse:
    """Answer GET /health with a static JSON payload marking the service healthy.

    The incoming request is not inspected; the same body is returned every time.
    """
    payload = {'status': 'healthy', 'service': 'graphiti-mcp'}
    return JSONResponse(payload)


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the Graphiti MCP server."""
    parser = argparse.ArgumentParser(
        description='Run the Graphiti MCP server with YAML configuration support'
    )

    # Configuration file argument
    # Default to config/config.yaml relative to the mcp_server directory
    default_config = Path(__file__).parent.parent / 'config' / 'config.yaml'
    parser.add_argument(
        '--config',
        type=Path,
        default=default_config,
        help='Path to YAML configuration file (default: config/config.yaml)',
    )

    # Transport arguments
    parser.add_argument(
        '--transport',
        choices=['sse', 'stdio', 'http'],
        help='Transport to use: http (recommended, default), stdio (standard I/O), or sse (deprecated)',
    )
    parser.add_argument(
        '--host',
        help='Host to bind the MCP server to',
    )
    parser.add_argument(
        '--port',
        type=int,
        help='Port to bind the MCP server to',
    )

    # Provider selection arguments
    parser.add_argument(
        '--llm-provider',
        choices=['openai', 'azure_openai', 'anthropic', 'gemini', 'groq'],
        help='LLM provider to use',
    )
    parser.add_argument(
        '--embedder-provider',
        choices=['openai', 'azure_openai', 'gemini', 'voyage'],
        help='Embedder provider to use',
    )
    parser.add_argument(
        '--database-provider',
        choices=['neo4j', 'falkordb'],
        help='Database provider to use',
    )

    # LLM configuration arguments
    parser.add_argument('--model', help='Model name to use with the LLM client')
    parser.add_argument('--small-model', help='Small model name to use with the LLM client')
    parser.add_argument(
        '--temperature', type=float, help='Temperature setting for the LLM (0.0-2.0)'
    )

    # Embedder configuration arguments
    parser.add_argument('--embedder-model', help='Model name to use with the embedder')

    # Graphiti-specific arguments
    parser.add_argument(
        '--group-id',
        help='Namespace for the graph. If not provided, uses config file or generates random UUID.',
    )
    parser.add_argument(
        '--user-id',
        help='User ID for tracking operations',
    )
    parser.add_argument(
        '--destroy-graph',
        action='store_true',
        help='Destroy all Graphiti graphs on startup',
    )

    return parser


def _resolve_graphiti_core_version() -> str | None:
    """Return the graphiti-core version string, or None if it cannot be determined.

    Sources are tried in order: the module's ``__version__`` attribute, the installed
    distribution metadata, and finally the version file at /app/.graphiti-core-version.
    Every step is best-effort because the result is only used for a startup log line.
    """
    try:
        import graphiti_core

        module_version = getattr(graphiti_core, '__version__', None)
        if module_version:
            return str(module_version)
    except Exception:
        # Import problems must not abort startup logging; fall through to the
        # other sources.
        pass

    try:
        from importlib.metadata import version as distribution_version

        # NOTE(review): assumes the distribution is published as 'graphiti-core'
        # (matches the version-file name below) - confirm against pyproject.
        return distribution_version('graphiti-core')
    except Exception:
        # Covers PackageNotFoundError and any metadata read failure.
        pass

    # Check for Docker-stored version file
    version_file = Path('/app/.graphiti-core-version')
    try:
        if version_file.exists():
            return version_file.read_text().strip() or None
    except OSError:
        # An unreadable version file is treated the same as a missing one.
        pass

    return None


async def initialize_server() -> ServerConfig:
    """Parse CLI arguments and initialize the Graphiti server configuration.

    Side effects:
        - Sets the CONFIG_PATH environment variable from ``--config``.
        - Assigns the module-level ``config``, ``graphiti_service``, ``queue_service``,
          ``graphiti_client`` and ``semaphore`` globals.
        - When graph destruction is requested, clears all graph data before the
          long-lived services are created.
        - Applies the configured host/port to ``mcp.settings``.

    Returns:
        The ``server`` section of the loaded configuration (``config.server``).
    """
    global config, graphiti_service, queue_service, graphiti_client, semaphore

    args = _build_arg_parser().parse_args()

    # Set config path in environment for the settings to pick up
    if args.config:
        os.environ['CONFIG_PATH'] = str(args.config)

    # Load configuration with environment variables and YAML
    config = GraphitiConfig()

    # Apply CLI overrides
    config.apply_cli_overrides(args)

    # Also apply legacy CLI args for backward compatibility
    if hasattr(args, 'destroy_graph'):
        config.destroy_graph = args.destroy_graph

    # Log configuration details
    logger.info('Using configuration:')
    logger.info(f'  - LLM: {config.llm.provider} / {config.llm.model}')
    logger.info(f'  - Embedder: {config.embedder.provider} / {config.embedder.model}')
    logger.info(f'  - Database: {config.database.provider}')
    logger.info(f'  - Group ID: {config.graphiti.group_id}')
    logger.info(f'  - Transport: {config.server.transport}')

    # Log graphiti-core version. Previously the version-file fallback was only
    # reached when the import itself failed, so a package without __version__
    # was always reported as 'unknown'.
    graphiti_version = _resolve_graphiti_core_version()
    if graphiti_version:
        logger.info(f'  - Graphiti Core: {graphiti_version}')
    else:
        logger.info('  - Graphiti Core: version unavailable')

    # Handle graph destruction if requested
    if hasattr(config, 'destroy_graph') and config.destroy_graph:
        logger.warning('Destroying all Graphiti graphs as requested...')
        # NOTE(review): temp_service is never explicitly closed; no shutdown
        # method is visible from here - confirm whether one should be called.
        temp_service = GraphitiService(config, SEMAPHORE_LIMIT)
        await temp_service.initialize()
        client = await temp_service.get_client()
        await clear_data(client.driver)
        logger.info('All graphs destroyed')

    # Initialize services
    graphiti_service = GraphitiService(config, SEMAPHORE_LIMIT)
    queue_service = QueueService()
    await graphiti_service.initialize()

    # Set global client for backward compatibility
    graphiti_client = await graphiti_service.get_client()
    semaphore = graphiti_service.semaphore

    # Initialize queue service with the client
    await queue_service.initialize(graphiti_client)

    # Set MCP server settings (only when a truthy value is configured)
    if config.server.host:
        mcp.settings.host = config.server.host
    if config.server.port:
        mcp.settings.port = config.server.port

    # Return MCP configuration for transport
    return config.server


async def run_mcp_server():
    """Initialize the server, then serve MCP over the configured transport.

    Raises:
        ValueError: If the configured transport is not "stdio", "sse", or "http".
    """
    server_config = await initialize_server()
    transport = server_config.transport

    logger.info(f'Starting MCP server with transport: {transport}')

    # stdio: nothing to announce, just serve.
    if transport == 'stdio':
        await mcp.run_stdio_async()
        return

    # sse: log the bind address and the SSE URL, then serve.
    if transport == 'sse':
        host, port = mcp.settings.host, mcp.settings.port
        logger.info(f'Running MCP server with SSE transport on {host}:{port}')
        logger.info(f'Access the server at: http://{host}:{port}/sse')
        await mcp.run_sse_async()
        return

    # Anything other than http at this point is unsupported.
    if transport != 'http':
        raise ValueError(f'Unsupported transport: {transport}. Use "sse", "stdio", or "http"')

    host, port = mcp.settings.host, mcp.settings.port
    # When bound to the wildcard address, show localhost in the printed URLs.
    shown_host = host if host != '0.0.0.0' else 'localhost'
    rule = '=' * 60

    logger.info(f'Running MCP server with streamable HTTP transport on {host}:{port}')
    logger.info(rule)
    logger.info('MCP Server Access Information:')
    logger.info(f'  Base URL: http://{shown_host}:{port}/')
    logger.info(f'  MCP Endpoint: http://{shown_host}:{port}/mcp/')
    logger.info('  Transport: HTTP (streamable)')

    # The browser UI hint is printed unless the BROWSER env var is set to
    # something other than '1' (an unset variable counts as '1').
    browser_flag = os.environ.get('BROWSER', '1')
    if browser_flag == '1':
        logger.info(f'  FalkorDB Browser UI: http://{shown_host}:3000/')

    logger.info(rule)
    logger.info('For MCP clients, connect to the /mcp/ endpoint above')

    # Align uvicorn's logging with this module's format before serving.
    configure_uvicorn_logging()

    await mcp.run_streamable_http_async()


def main():
    """Synchronous entry point: drive run_mcp_server() to completion with asyncio.

    A KeyboardInterrupt is logged and swallowed; any other exception is logged
    and then re-raised to the caller.
    """
    try:
        # Initialization and serving share one event loop.
        asyncio.run(run_mcp_server())
    except KeyboardInterrupt:
        logger.info('Server shutting down...')
    except Exception as exc:
        failure_text = 'Error initializing Graphiti MCP server: ' + str(exc)
        logger.error(failure_text)
        raise


# Start the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

```

--------------------------------------------------------------------------------
/examples/data/manybirds_products.json:
--------------------------------------------------------------------------------

```json
{
  "products": [
    {
      "id": 6785367965776,
      "title": "TinyBirds Wool Runners - Little Kids - Natural Black (Blizzard Sole)",
      "handle": "TinyBirds-wool-runners-little-kids",
      "body_html": "TinyBirds are eco-friendly and machine washable sneakers for kids. Super soft and cozy and made with comfortable, itch-free ZQ Merino Wool, they're the perfect pair for kids of all ages.",
      "published_at": "2024-08-21T10:07:25-07:00",
      "created_at": "2023-01-03T16:00:31-08:00",
      "updated_at": "2024-08-24T17:56:38-07:00",
      "vendor": "Manybirds",
      "product_type": "Shoes",
      "tags": [
        "Manybirds::carbon-score = 3.06",
        "Manybirds::cfId = color-TinyBirds-wool-runners-natural-black-blizzard-ne",
        "Manybirds::complete = true",
        "Manybirds::edition = classic",
        "Manybirds::gender = toddler",
        "Manybirds::hue = black",
        "Manybirds::master = TinyBirds-wool-runners-little-kids",
        "Manybirds::material = wool",
        "Manybirds::price-tier = tier-1",
        "Manybirds::silhouette = runner",
        "loop::returnable = true",
        "shoprunner",
        "YCRF_unisex-smallbird-shoes",
        "YGroup_ygroup_TinyBirds-wool-runners-little-kids"
      ],
      "variants": [
        {
          "id": 40015831531600,
          "title": "5T",
          "option1": "5T",
          "option2": null,
          "option3": null,
          "sku": "AB00DFT050",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": false,
          "price": "25.00",
          "grams": 290,
          "compare_at_price": "60.00",
          "position": 1,
          "product_id": 6785367965776,
          "created_at": "2023-01-03T16:00:32-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40015831564368,
          "title": "6T",
          "option1": "6T",
          "option2": null,
          "option3": null,
          "sku": "AB00DFT060",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": false,
          "price": "25.00",
          "grams": 310,
          "compare_at_price": "60.00",
          "position": 2,
          "product_id": 6785367965776,
          "created_at": "2023-01-03T16:00:32-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40015831597136,
          "title": "7T",
          "option1": "7T",
          "option2": null,
          "option3": null,
          "sku": "AB00DFT070",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": false,
          "price": "25.00",
          "grams": 320,
          "compare_at_price": "60.00",
          "position": 3,
          "product_id": 6785367965776,
          "created_at": "2023-01-03T16:00:32-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40015831629904,
          "title": "8T",
          "option1": "8T",
          "option2": null,
          "option3": null,
          "sku": "AB00DFT080",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": false,
          "price": "25.00",
          "grams": 340,
          "compare_at_price": "60.00",
          "position": 4,
          "product_id": 6785367965776,
          "created_at": "2023-01-03T16:00:32-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40015831662672,
          "title": "9T",
          "option1": "9T",
          "option2": null,
          "option3": null,
          "sku": "AB00DFT090",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": false,
          "price": "25.00",
          "grams": 350,
          "compare_at_price": "60.00",
          "position": 5,
          "product_id": 6785367965776,
          "created_at": "2023-01-03T16:00:32-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40015831695440,
          "title": "10T",
          "option1": "10T",
          "option2": null,
          "option3": null,
          "sku": "AB00DFT100",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": false,
          "price": "25.00",
          "grams": 360,
          "compare_at_price": "60.00",
          "position": 6,
          "product_id": 6785367965776,
          "created_at": "2023-01-03T16:00:32-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        }
      ],
      "images": [
        {
          "id": 30703127068752,
          "created_at": "2023-01-03T16:00:32-08:00",
          "position": 1,
          "updated_at": "2023-01-03T16:00:32-08:00",
          "product_id": 6785367965776,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/products\/AB008ET_Shoe_Angle_Global_Little_Kids_Wool_Runner_Natural_Black_Blizzard_d532e5f4-50f5-49af-964a-52906e1fd3d1.png?v=1672790432",
          "width": 1600,
          "height": 1600
        },
        {
          "id": 30703127101520,
          "created_at": "2023-01-03T16:00:32-08:00",
          "position": 2,
          "updated_at": "2023-01-03T16:00:32-08:00",
          "product_id": 6785367965776,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/products\/WR-PDP-Little_Kids_e389b4fb-5f67-4232-919b-5f18e95eb301.jpg?v=1672790432",
          "width": 1600,
          "height": 1600
        },
        {
          "id": 30703127134288,
          "created_at": "2023-01-03T16:00:32-08:00",
          "position": 3,
          "updated_at": "2023-01-03T16:00:32-08:00",
          "product_id": 6785367965776,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/products\/AB008ET_Shoe_Left_Global_Little_Kids_Wool_Runner_Natural_Black_Blizzard_76c2d640-e476-4fa5-985d-ddb48a20b6fb.png?v=1672790432",
          "width": 1110,
          "height": 1110
        },
        {
          "id": 30703127167056,
          "created_at": "2023-01-03T16:00:32-08:00",
          "position": 4,
          "updated_at": "2023-01-03T16:00:32-08:00",
          "product_id": 6785367965776,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/products\/AB008ET_Shoe_Back_Global_Little_Kids_Wool_Runner_Natural_Black_Blizzard_744e7e0f-10e7-4712-83d9-3a907f7ed1d9.png?v=1672790432",
          "width": 1600,
          "height": 1600
        },
        {
          "id": 30703127199824,
          "created_at": "2023-01-03T16:00:32-08:00",
          "position": 5,
          "updated_at": "2023-01-03T16:00:32-08:00",
          "product_id": 6785367965776,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/products\/AB008ET_Shoe_Top_Global_Little_Kids_Wool_Runner_Natural_Black_Blizzard_9075685f-39f3-454b-a19f-1c15f1c0ee5c.png?v=1672790432",
          "width": 1600,
          "height": 1600
        },
        {
          "id": 30703127232592,
          "created_at": "2023-01-03T16:00:32-08:00",
          "position": 6,
          "updated_at": "2023-01-03T16:00:32-08:00",
          "product_id": 6785367965776,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/products\/AB008ET_Shoe_Bottom_Global_Little_Kids_Wool_Runner_Natural_Black_Blizzard_ebe5612a-44e3-4e53-864c-a02899ad2ce6.png?v=1672790432",
          "width": 1600,
          "height": 1600
        }
      ],
      "options": [
        {
          "name": "Size",
          "position": 1,
          "values": [
            "5T",
            "6T",
            "7T",
            "8T",
            "9T",
            "10T"
          ]
        }
      ]
    },
    {
      "id": 6889961750608,
      "title": "Anytime No Show Sock - Rugged Beige",
      "handle": "anytime-no-show-sock-rugged-beige",
      "body_html": "Soft, breathable, and super durable, these lightweight socks are designed to stay put so no one will even know they\u2019re there\u2014unless you blow their cover.",
      "published_at": "2024-08-21T08:50:07-07:00",
      "created_at": "2023-10-30T20:22:43-07:00",
      "updated_at": "2024-08-24T17:56:38-07:00",
      "vendor": "Manybirds",
      "product_type": "Socks",
      "tags": [
        "Manybirds::carbon-score = 0.71",
        "Manybirds::cfId = color-anytime-no-show-sock-rugged-beige",
        "Manybirds::complete = true",
        "Manybirds::edition = limited",
        "Manybirds::gender = unisex",
        "Manybirds::hue = beige",
        "Manybirds::master = anytime-no-show-sock",
        "Manybirds::material = cotton",
        "Manybirds::price-tier = msrp",
        "Manybirds::silhouette = hider",
        "loop::returnable = true",
        "shoprunner",
        "YCRF_socks",
        "YGroup_ygroup_anytime-no-show-sock"
      ],
      "variants": [
        {
          "id": 40356479500368,
          "title": "S (W5-7)",
          "option1": "S (W5-7)",
          "option2": null,
          "option3": null,
          "sku": "A10849U001",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "14.00",
          "grams": 59,
          "compare_at_price": null,
          "position": 1,
          "product_id": 6889961750608,
          "created_at": "2023-10-30T20:22:43-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40356479533136,
          "title": "M (W8-10 \/ M8)",
          "option1": "M (W8-10 \/ M8)",
          "option2": null,
          "option3": null,
          "sku": "A10849U002",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "14.00",
          "grams": 56,
          "compare_at_price": null,
          "position": 2,
          "product_id": 6889961750608,
          "created_at": "2023-10-30T20:22:43-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40356479565904,
          "title": "L (W11 M9-12)",
          "option1": "L (W11 M9-12)",
          "option2": null,
          "option3": null,
          "sku": "A10849U003",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "14.00",
          "grams": 52,
          "compare_at_price": null,
          "position": 3,
          "product_id": 6889961750608,
          "created_at": "2023-10-30T20:22:43-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40356479598672,
          "title": "XL (M13-14)",
          "option1": "XL (M13-14)",
          "option2": null,
          "option3": null,
          "sku": "A10849U004",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "14.00",
          "grams": 50,
          "compare_at_price": null,
          "position": 4,
          "product_id": 6889961750608,
          "created_at": "2023-10-30T20:22:43-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        }
      ],
      "images": [
        {
          "id": 31822180155472,
          "created_at": "2024-04-05T14:20:41-07:00",
          "position": 1,
          "updated_at": "2024-04-05T14:20:41-07:00",
          "product_id": 6889961750608,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10849_S24Q1_Anytime_No_Show_Sock_Rugged_Beige_A-1400x1400.png?v=1712352041",
          "width": 1400,
          "height": 1400
        },
        {
          "id": 31822180188240,
          "created_at": "2024-04-05T14:20:41-07:00",
          "position": 2,
          "updated_at": "2024-04-05T14:20:41-07:00",
          "product_id": 6889961750608,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10849_S24Q1_Anytime_No_Show_Sock_Rugged_Beige_B-1400x1400.png?v=1712352041",
          "width": 1400,
          "height": 1400
        }
      ],
      "options": [
        {
          "name": "Size",
          "position": 1,
          "values": [
            "S (W5-7)",
            "M (W8-10 \/ M8)",
            "L (W11 M9-12)",
            "XL (M13-14)"
          ]
        }
      ]
    },
    {
      "id": 6919095189584,
      "title": "Men's Couriers - Natural Black\/Basin Blue (Blizzard Sole)",
      "handle": "mens-couriers-natural-black-basin-blue",
      "body_html": "Our nod to a vintage sneaker made with natural materials for a better future. The retro silhouette elevated with intricate details pairs with anything you have planned. Come for the throwback style, and stay for the cushy all-day-wearability.",
      "published_at": "2024-08-19T17:08:34-07:00",
      "created_at": "2024-01-10T21:53:11-08:00",
      "updated_at": "2024-08-24T17:56:38-07:00",
      "vendor": "Manybirds",
      "product_type": "Shoes",
      "tags": [
        "Manybirds::carbon-score = 5.51",
        "Manybirds::cfId = color-mens-couriers-ntl-blk-multi-blzz",
        "Manybirds::complete = true",
        "Manybirds::edition = limited",
        "Manybirds::gender = mens",
        "Manybirds::hue = black",
        "Manybirds::hue = blue",
        "Manybirds::master = mens-couriers",
        "Manybirds::material = cotton",
        "Manybirds::price-tier = msrp",
        "Manybirds::silhouette = runner",
        "loop::returnable = true",
        "shoprunner",
        "YCRF_mens-move-shoes",
        "YGroup_ygroup_mens-couriers"
      ],
      "variants": [
        {
          "id": 40444543696976,
          "title": "8",
          "option1": "8",
          "option2": null,
          "option3": null,
          "sku": "A10875M080",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "98.00",
          "grams": 860,
          "compare_at_price": null,
          "position": 1,
          "product_id": 6919095189584,
          "created_at": "2024-01-10T21:53:12-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40444543729744,
          "title": "9",
          "option1": "9",
          "option2": null,
          "option3": null,
          "sku": "A10875M090",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "98.00",
          "grams": 923,
          "compare_at_price": null,
          "position": 2,
          "product_id": 6919095189584,
          "created_at": "2024-01-10T21:53:12-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40444543762512,
          "title": "10",
          "option1": "10",
          "option2": null,
          "option3": null,
          "sku": "A10875M100",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "98.00",
          "grams": 965,
          "compare_at_price": null,
          "position": 3,
          "product_id": 6919095189584,
          "created_at": "2024-01-10T21:53:12-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40444543795280,
          "title": "11",
          "option1": "11",
          "option2": null,
          "option3": null,
          "sku": "A10875M110",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "98.00",
          "grams": 1027,
          "compare_at_price": null,
          "position": 4,
          "product_id": 6919095189584,
          "created_at": "2024-01-10T21:53:12-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40444543828048,
          "title": "12",
          "option1": "12",
          "option2": null,
          "option3": null,
          "sku": "A10875M120",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "98.00",
          "grams": 1076,
          "compare_at_price": null,
          "position": 5,
          "product_id": 6919095189584,
          "created_at": "2024-01-10T21:53:12-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40444543860816,
          "title": "13",
          "option1": "13",
          "option2": null,
          "option3": null,
          "sku": "A10875M130",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "98.00",
          "grams": 1137,
          "compare_at_price": null,
          "position": 6,
          "product_id": 6919095189584,
          "created_at": "2024-01-10T21:53:12-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40444543893584,
          "title": "14",
          "option1": "14",
          "option2": null,
          "option3": null,
          "sku": "A10875M140",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "98.00",
          "grams": 1185,
          "compare_at_price": null,
          "position": 7,
          "product_id": 6919095189584,
          "created_at": "2024-01-10T21:53:12-08:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        }
      ],
      "images": [
        {
          "id": 32177950490704,
          "created_at": "2024-07-05T15:28:37-07:00",
          "position": 1,
          "updated_at": "2024-07-05T15:28:37-07:00",
          "product_id": 6919095189584,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10875_24Q3_Courier_Natural_Black_Multi_Blizzard_PDP_SINGLE_3Q_3f10aae5-fb6e-4424-b6a9-a8e4134a9318.png?v=1720218517",
          "width": 4000,
          "height": 4000
        },
        {
          "id": 32177950523472,
          "created_at": "2024-07-05T15:28:37-07:00",
          "position": 2,
          "updated_at": "2024-07-05T15:28:37-07:00",
          "product_id": 6919095189584,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10875_24Q3_Courier_Natural_Black_Multi_Blizzard_PDP_LEFT_b55bab7e-0e85-40be-b457-761165491d76.png?v=1720218517",
          "width": 1110,
          "height": 1110
        },
        {
          "id": 32177950556240,
          "created_at": "2024-07-05T15:28:37-07:00",
          "position": 3,
          "updated_at": "2024-07-05T15:28:37-07:00",
          "product_id": 6919095189584,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10875_24Q3_Courier_Natural_Black_Multi_Blizzard_PDP_BACK_e6bb4a6b-5d6a-41f3-93ba-6e7a2a142796.png?v=1720218517",
          "width": 4000,
          "height": 4000
        },
        {
          "id": 32177950589008,
          "created_at": "2024-07-05T15:28:37-07:00",
          "position": 4,
          "updated_at": "2024-07-05T15:28:37-07:00",
          "product_id": 6919095189584,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10875_24Q3_Courier_Natural_Black_Multi_Blizzard_PDP_TD_8a2d64ab-f013-4683-85cd-7ce1daa19eae.png?v=1720218517",
          "width": 4000,
          "height": 4000
        },
        {
          "id": 32177950621776,
          "created_at": "2024-07-05T15:28:37-07:00",
          "position": 5,
          "updated_at": "2024-07-05T15:28:37-07:00",
          "product_id": 6919095189584,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10875_24Q3_Courier_Natural_Black_Multi_Blizzard_PDP_SOLE_44264878-bed1-4f02-b80b-1f15a7b941be.png?v=1720218517",
          "width": 4000,
          "height": 4000
        },
        {
          "id": 32177950654544,
          "created_at": "2024-07-05T15:28:37-07:00",
          "position": 6,
          "updated_at": "2024-07-05T15:28:37-07:00",
          "product_id": 6919095189584,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10875_24Q3_Courier_Natural_Black_Multi_Blizzard_PDP_PAIR_3Q_52f5f245-d1e6-4bb3-925c-863d70f1ead8.png?v=1720218517",
          "width": 4000,
          "height": 4000
        }
      ],
      "options": [
        {
          "name": "Size",
          "position": 1,
          "values": [
            "8",
            "9",
            "10",
            "11",
            "12",
            "13",
            "14"
          ]
        }
      ]
    },
    {
      "id": 6864490004560,
      "title": "Men's SuperLight Wool Runners - Dark Grey (Medium Grey Sole)",
      "handle": "mens-superlight-wool-runners-dark-grey",
      "body_html": "Lighter by nature. Meet the SuperLight Wool Runner \u2013 an everyday sneaker engineered with an ultralight upper and our new revolutionary SuperLight Foam technology for a barely-there feel, and light-as-air fit that\u2019s our lightest and lowest carbon footprint to date. And we\u2019re just getting started\u2026.",
      "published_at": "2024-08-19T15:15:23-07:00",
      "created_at": "2023-08-09T19:57:33-07:00",
      "updated_at": "2024-08-24T17:56:38-07:00",
      "vendor": "Manybirds",
      "product_type": "Shoes",
      "tags": [
        "Manybirds::carbon-score = 4.03",
        "Manybirds::cfId = color-mens-super-light-wool-runners-dark-grey-medium-grey",
        "Manybirds::complete = true",
        "Manybirds::edition = classic",
        "Manybirds::gender = mens",
        "Manybirds::hue = grey",
        "Manybirds::master = mens-superlight-wool-runners",
        "Manybirds::material = wool",
        "Manybirds::price-tier = msrp",
        "Manybirds::silhouette = runner",
        "loop::returnable = true",
        "shoprunner",
        "YCRF_mens-move-shoes",
        "YGroup_ygroup_mens-superlight-wool-runners"
      ],
      "variants": [
        {
          "id": 40260974084176,
          "title": "8",
          "option1": "8",
          "option2": null,
          "option3": null,
          "sku": "A10668M080",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "120.00",
          "grams": 498,
          "compare_at_price": null,
          "position": 1,
          "product_id": 6864490004560,
          "created_at": "2023-08-09T19:57:33-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40260974116944,
          "title": "9",
          "option1": "9",
          "option2": null,
          "option3": null,
          "sku": "A10668M090",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "120.00",
          "grams": 535,
          "compare_at_price": null,
          "position": 2,
          "product_id": 6864490004560,
          "created_at": "2023-08-09T19:57:33-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40260974149712,
          "title": "10",
          "option1": "10",
          "option2": null,
          "option3": null,
          "sku": "A10668M100",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "120.00",
          "grams": 560,
          "compare_at_price": null,
          "position": 3,
          "product_id": 6864490004560,
          "created_at": "2023-08-09T19:57:33-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40260974182480,
          "title": "11",
          "option1": "11",
          "option2": null,
          "option3": null,
          "sku": "A10668M110",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "120.00",
          "grams": 579,
          "compare_at_price": null,
          "position": 4,
          "product_id": 6864490004560,
          "created_at": "2023-08-09T19:57:33-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40260974215248,
          "title": "12",
          "option1": "12",
          "option2": null,
          "option3": null,
          "sku": "A10668M120",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "120.00",
          "grams": 642,
          "compare_at_price": null,
          "position": 5,
          "product_id": 6864490004560,
          "created_at": "2023-08-09T19:57:33-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40260974248016,
          "title": "13",
          "option1": "13",
          "option2": null,
          "option3": null,
          "sku": "A10668M130",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "120.00",
          "grams": 664,
          "compare_at_price": null,
          "position": 6,
          "product_id": 6864490004560,
          "created_at": "2023-08-09T19:57:33-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40260974280784,
          "title": "14",
          "option1": "14",
          "option2": null,
          "option3": null,
          "sku": "A10668M140",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "120.00",
          "grams": 678,
          "compare_at_price": null,
          "position": 7,
          "product_id": 6864490004560,
          "created_at": "2023-08-09T19:57:33-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        }
      ],
      "images": [
        {
          "id": 32365862060112,
          "created_at": "2024-08-13T11:59:28-07:00",
          "position": 1,
          "updated_at": "2024-08-13T11:59:28-07:00",
          "product_id": 6864490004560,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10669_24Q3_SuperLight_WR_Dark_Grey_Medium_Grey_PDP_SINGLE_3Q-2000x2000_f11911c8-d949-4291-9646-5dfa20506abe.png?v=1723575568",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32365862092880,
          "created_at": "2024-08-13T11:59:28-07:00",
          "position": 2,
          "updated_at": "2024-08-13T11:59:28-07:00",
          "product_id": 6864490004560,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10669_24Q3_SuperLight_WR_Dark_Grey_Medium_Grey_PDP_LEFT-2000x2000_51940ffa-25a8-4037-bfcf-359d1c6f9259.png?v=1723575568",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32365862125648,
          "created_at": "2024-08-13T11:59:28-07:00",
          "position": 3,
          "updated_at": "2024-08-13T11:59:28-07:00",
          "product_id": 6864490004560,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10669_24Q3_SuperLight_WR_Dark_Grey_Medium_Grey_PDP_BACK-2000x2000_811af23d-dca2-452a-9370-6eb8aa6847b2.png?v=1723575568",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32365862158416,
          "created_at": "2024-08-13T11:59:28-07:00",
          "position": 4,
          "updated_at": "2024-08-13T11:59:28-07:00",
          "product_id": 6864490004560,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10669_24Q3_SuperLight_WR_Dark_Grey_Medium_Grey_PDP_TD-2000x2000_f1643699-e8d8-4419-adc1-02701aa4e5bd.png?v=1723575568",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32365862191184,
          "created_at": "2024-08-13T11:59:28-07:00",
          "position": 5,
          "updated_at": "2024-08-13T11:59:28-07:00",
          "product_id": 6864490004560,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10669_24Q3_SuperLight_WR_Dark_Grey_Medium_Grey_PDP_SOLE-2000x2000_1dccbf00-9cc1-4223-81b3-6d15c697630e.png?v=1723575568",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32365862223952,
          "created_at": "2024-08-13T11:59:28-07:00",
          "position": 6,
          "updated_at": "2024-08-13T11:59:28-07:00",
          "product_id": 6864490004560,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10669_24Q3_SuperLight_WR_Dark_Grey_Medium_Grey_PDP_PAIR_3Q-2000x2000_529013c3-128b-4cf7-86c2-1ed204f8d3e2.png?v=1723575568",
          "width": 2000,
          "height": 2000
        }
      ],
      "options": [
        {
          "name": "Size",
          "position": 1,
          "values": [
            "8",
            "9",
            "10",
            "11",
            "12",
            "13",
            "14"
          ]
        }
      ]
    },
    {
      "id": 7082686742608,
      "title": "Women's Tree Breezers Knit - Rugged Beige (Hazy Beige Sole)",
      "handle": "womens-tree-breezers-rugged-beige-knit",
      "body_html": "Crafted with silky-smooth, breathable eucalyptus tree fiber and a secure fitted collar, the Tree Breezer is a versatile, lightweight, and comfortable ballet flat with no break-in necessary.",
      "published_at": "2024-08-19T15:15:22-07:00",
      "created_at": "2024-07-08T16:26:01-07:00",
      "updated_at": "2024-08-24T17:56:38-07:00",
      "vendor": "Manybirds",
      "product_type": "Shoes",
      "tags": [
        "Manybirds::carbon-score = 2.93",
        "Manybirds::cfId = color-womens-tree-breezers-rugged-beige-hazy-beige",
        "Manybirds::complete = true",
        "Manybirds::edition = limited",
        "Manybirds::gender = womens",
        "Manybirds::hue = beige",
        "Manybirds::master = womens-tree-breezers",
        "Manybirds::material = tree",
        "Manybirds::price-tier = msrp",
        "Manybirds::silhouette = breezer",
        "loop::returnable = true",
        "shoprunner",
        "YCRF_womens-move-shoes-half-sizes",
        "YGroup_ygroup_womens-tree-breezers"
      ],
      "variants": [
        {
          "id": 40832464322640,
          "title": "5",
          "option1": "5",
          "option2": null,
          "option3": null,
          "sku": "A10938W050",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 331,
          "compare_at_price": null,
          "position": 1,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464355408,
          "title": "5.5",
          "option1": "5.5",
          "option2": null,
          "option3": null,
          "sku": "A10938W055",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 341,
          "compare_at_price": null,
          "position": 2,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464388176,
          "title": "6",
          "option1": "6",
          "option2": null,
          "option3": null,
          "sku": "A10938W060",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 351,
          "compare_at_price": null,
          "position": 3,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464420944,
          "title": "6.5",
          "option1": "6.5",
          "option2": null,
          "option3": null,
          "sku": "A10938W065",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 361,
          "compare_at_price": null,
          "position": 4,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464453712,
          "title": "7",
          "option1": "7",
          "option2": null,
          "option3": null,
          "sku": "A10938W070",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 371,
          "compare_at_price": null,
          "position": 5,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464486480,
          "title": "7.5",
          "option1": "7.5",
          "option2": null,
          "option3": null,
          "sku": "A10938W075",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 381,
          "compare_at_price": null,
          "position": 6,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464519248,
          "title": "8",
          "option1": "8",
          "option2": null,
          "option3": null,
          "sku": "A10938W080",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 391,
          "compare_at_price": null,
          "position": 7,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464552016,
          "title": "8.5",
          "option1": "8.5",
          "option2": null,
          "option3": null,
          "sku": "A10938W085",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 401,
          "compare_at_price": null,
          "position": 8,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464584784,
          "title": "9",
          "option1": "9",
          "option2": null,
          "option3": null,
          "sku": "A10938W090",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 416,
          "compare_at_price": null,
          "position": 9,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464617552,
          "title": "9.5",
          "option1": "9.5",
          "option2": null,
          "option3": null,
          "sku": "A10938W095",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 426,
          "compare_at_price": null,
          "position": 10,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464650320,
          "title": "10",
          "option1": "10",
          "option2": null,
          "option3": null,
          "sku": "A10938W100",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 436,
          "compare_at_price": null,
          "position": 11,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464683088,
          "title": "10.5",
          "option1": "10.5",
          "option2": null,
          "option3": null,
          "sku": "A10938W105",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 446,
          "compare_at_price": null,
          "position": 12,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        },
        {
          "id": 40832464715856,
          "title": "11",
          "option1": "11",
          "option2": null,
          "option3": null,
          "sku": "A10938W110",
          "requires_shipping": true,
          "taxable": true,
          "featured_image": null,
          "available": true,
          "price": "100.00",
          "grams": 456,
          "compare_at_price": null,
          "position": 13,
          "product_id": 7082686742608,
          "created_at": "2024-07-08T16:26:01-07:00",
          "updated_at": "2024-08-24T17:56:38-07:00"
        }
      ],
      "images": [
        {
          "id": 32367931359312,
          "created_at": "2024-08-14T10:03:51-07:00",
          "position": 1,
          "updated_at": "2024-08-14T10:03:51-07:00",
          "product_id": 7082686742608,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10938_24Q3_Tree_Breezer_Knit_Pack_Rugged_Beige_Hazy_Beige_SINGLE_3Q-2000x2000.png?v=1723655031",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32367931392080,
          "created_at": "2024-08-14T10:03:51-07:00",
          "position": 2,
          "updated_at": "2024-08-14T10:03:51-07:00",
          "product_id": 7082686742608,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10938_24Q3_Tree_Breezer_Knit_Pack_Rugged_Beige_Hazy_Beige_LEFT-2000x2000.png?v=1723655031",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32367931424848,
          "created_at": "2024-08-14T10:03:51-07:00",
          "position": 3,
          "updated_at": "2024-08-14T10:03:51-07:00",
          "product_id": 7082686742608,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10938_24Q3_Tree_Breezer_Knit_Pack_Rugged_Beige_Hazy_Beige_BACK-2000x2000.png?v=1723655031",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32367931457616,
          "created_at": "2024-08-14T10:03:51-07:00",
          "position": 4,
          "updated_at": "2024-08-14T10:03:51-07:00",
          "product_id": 7082686742608,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10938_24Q3_Tree_Breezer_Knit_Pack_Rugged_Beige_Hazy_Beige_TD-2000x2000.png?v=1723655031",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32367931490384,
          "created_at": "2024-08-14T10:03:51-07:00",
          "position": 5,
          "updated_at": "2024-08-14T10:03:51-07:00",
          "product_id": 7082686742608,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10938_24Q3_Tree_Breezer_Knit_Pack_Rugged_Beige_Hazy_Beige_SOLE-2000x2000.png?v=1723655031",
          "width": 2000,
          "height": 2000
        },
        {
          "id": 32367931523152,
          "created_at": "2024-08-14T10:03:51-07:00",
          "position": 6,
          "updated_at": "2024-08-14T10:03:51-07:00",
          "product_id": 7082686742608,
          "variant_ids": [],
          "src": "https:\/\/cdn.shopify.com\/s\/files\/1\/1104\/4168\/files\/A10938_24Q3_Tree_Breezer_Knit_Pack_Rugged_Beige_Hazy_Beige_PAIR_3Q-2000x2000.png?v=1723655031",
          "width": 2000,
          "height": 2000
        }
      ],
      "options": [
        {
          "name": "Size",
          "position": 1,
          "values": [
            "5",
            "5.5",
            "6",
            "6.5",
            "7",
            "7.5",
            "8",
            "8.5",
            "9",
            "9.5",
            "10",
            "10.5",
            "11"
          ]
        }
      ]
    }
  ]
}
```
Page 7/10FirstPrevNextLast