Migration notebook (#1492)

* Add migration notebook

* Update migration instructions

* Semver

* Rename item in relationships table

* Remove indexing vector store shim

* Remove query shims

* Remove columns from migrated data

* Format

* Add community parents
Nathan Evans 2024-12-10 12:23:26 -08:00 committed by GitHub
parent 1a13e0fd93
commit 61816e076f
21 changed files with 310 additions and 324 deletions

View File

@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Add migration notebook."
}

View File

@ -171,9 +171,6 @@
" read_indexer_reports,\n",
" read_indexer_text_units,\n",
")\n",
"from graphrag.query.input.loaders.dfs import (\n",
" store_entity_semantic_embeddings,\n",
")\n",
"from graphrag.query.llm.oai.chat_openai import ChatOpenAI\n",
"from graphrag.query.llm.oai.embedding import OpenAIEmbedding\n",
"from graphrag.query.llm.oai.typing import OpenaiApiType\n",
@ -207,9 +204,6 @@
" collection_name=\"default-entity-description\",\n",
")\n",
"description_embedding_store.connect(db_uri=LANCEDB_URI)\n",
"entity_description_embeddings = store_entity_semantic_embeddings(\n",
" entities=entities, vectorstore=description_embedding_store\n",
")\n",
"\n",
"print(f\"Entity count: {len(entity_df)}\")\n",
"entity_df.head()\n",
@ -270,37 +264,16 @@
}
],
"source": [
"def embed_community_reports(\n",
"def read_community_reports(\n",
" input_dir: str,\n",
" embedder: OpenAIEmbedding,\n",
" community_report_table: str = COMMUNITY_REPORT_TABLE,\n",
"):\n",
" \"\"\"Embeds the full content of the community reports and saves the DataFrame with embeddings to the output path.\"\"\"\n",
" input_path = Path(input_dir) / f\"{community_report_table}.parquet\"\n",
" output_path = Path(input_dir) / f\"{community_report_table}_with_embeddings.parquet\"\n",
"\n",
" if not Path(output_path).exists():\n",
" print(\"Embedding file not found. Computing community report embeddings...\")\n",
"\n",
" report_df = pd.read_parquet(input_path)\n",
"\n",
" if \"full_content\" not in report_df.columns:\n",
" error_msg = f\"'full_content' column not found in {input_path}\"\n",
" raise ValueError(error_msg)\n",
"\n",
" report_df[\"full_content_embeddings\"] = report_df.loc[:, \"full_content\"].apply(\n",
" lambda x: embedder.embed(x)\n",
" )\n",
"\n",
" # Save the DataFrame with embeddings to the output path\n",
" report_df.to_parquet(output_path)\n",
" print(f\"Embeddings saved to {output_path}\")\n",
" return report_df\n",
" print(f\"Embeddings file already exists at {output_path}\")\n",
" return pd.read_parquet(output_path)\n",
" return pd.read_parquet(input_path)\n",
"\n",
"\n",
"report_df = embed_community_reports(INPUT_DIR, text_embedder)\n",
"report_df = read_community_reports(INPUT_DIR)\n",
"reports = read_indexer_reports(\n",
" report_df,\n",
" entity_df,\n",
@ -321,7 +294,7 @@
" entities=entities,\n",
" relationships=relationships,\n",
" reports=reports,\n",
" entity_text_embeddings=entity_description_embeddings,\n",
" entity_text_embeddings=description_embedding_store,\n",
" text_units=text_units,\n",
")\n",
"\n",
@ -3172,7 +3145,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "graphrag-ta_-cxM1-py3.10",
"display_name": ".venv",
"language": "python",
"name": "python3"
},
@ -3186,7 +3159,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.11.9"
}
},
"nbformat": 4,

View File

@ -0,0 +1,263 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Copyright (c) 2024 Microsoft Corporation.\n",
"# Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Index Migration\n",
"\n",
"This notebook is used to maintain data model parity with older indexes for the latest versions of GraphRAG. If you have a pre-1.0 index and need to migrate without re-running the entire pipeline, you can use this notebook to only update the pieces necessary for alignment.\n",
"\n",
"NOTE: we recommend regenerating your settings.yml with the latest version of GraphRAG using `graphrag init`. Copy your LLM settings into it before running this notebook. This ensures your config is aligned with the latest version for the migration. This also ensures that you have default vector store config, which is now required or indexing will fail.\n",
"\n",
"WARNING: This will overwrite your parquet files, you may want to make a backup!"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# This is the directory that has your settings.yml\n",
"# NOTE: much older indexes may have been output with a timestamped directory\n",
"# if this is the case, you will need to make sure the storage.base_dir in settings.yml points to it correctly\n",
"PROJECT_DIRECTORY = \"<your project directory>\""
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"\n",
"from graphrag.config.load_config import load_config\n",
"from graphrag.config.resolve_path import resolve_paths\n",
"from graphrag.index.create_pipeline_config import create_pipeline_config\n",
"from graphrag.storage.factory import create_storage\n",
"\n",
"# This first block does some config loading, path resolution, and translation that is normally done by the CLI/API when running a full workflow\n",
"config = load_config(Path(PROJECT_DIRECTORY))\n",
"resolve_paths(config)\n",
"pipeline_config = create_pipeline_config(config)\n",
"storage = create_storage(pipeline_config.storage)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def remove_columns(df, columns):\n",
" \"\"\"Remove columns from a DataFrame, suppressing errors.\"\"\"\n",
" df.drop(labels=columns, axis=1, errors=\"ignore\", inplace=True)"
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [],
"source": [
"def get_community_parent(nodes):\n",
" \"\"\"Compute the parent community using the node membership as a lookup.\"\"\"\n",
" parent_mapping = nodes.loc[:, [\"level\", \"community\", \"title\"]]\n",
" nodes = nodes.loc[:, [\"level\", \"community\", \"title\"]]\n",
"\n",
" # Create a parent mapping by adding 1 to the level column\n",
" parent_mapping[\"level\"] += 1 # Shift levels for parent relationship\n",
" parent_mapping.rename(columns={\"community\": \"parent\"}, inplace=True)\n",
"\n",
" # Merge the parent information back into the base DataFrame\n",
" nodes = nodes.merge(parent_mapping, on=[\"level\", \"title\"], how=\"left\")\n",
"\n",
" # Fill missing parents with -1 (default value)\n",
" nodes[\"parent\"] = nodes[\"parent\"].fillna(-1).astype(int)\n",
"\n",
" join = (\n",
" nodes.groupby([\"community\", \"level\", \"parent\"])\n",
" .agg({\"title\": list})\n",
" .reset_index()\n",
" )\n",
" return join[join[\"community\"] > -1].loc[:, [\"community\", \"parent\"]]"
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [],
"source": [
"from uuid import uuid4\n",
"\n",
"from graphrag.utils.storage import load_table_from_storage, write_table_to_storage\n",
"\n",
"# First we'll go through any parquet files that had model changes and update them\n",
"# The new data model may have removed excess columns as well, but we will only make the minimal changes required for compatibility\n",
"\n",
"final_documents = await load_table_from_storage(\n",
" \"create_final_documents.parquet\", storage\n",
")\n",
"final_text_units = await load_table_from_storage(\n",
" \"create_final_text_units.parquet\", storage\n",
")\n",
"final_entities = await load_table_from_storage(\"create_final_entities.parquet\", storage)\n",
"final_nodes = await load_table_from_storage(\"create_final_nodes.parquet\", storage)\n",
"final_relationships = await load_table_from_storage(\n",
" \"create_final_relationships.parquet\", storage\n",
")\n",
"final_communities = await load_table_from_storage(\n",
" \"create_final_communities.parquet\", storage\n",
")\n",
"final_community_reports = await load_table_from_storage(\n",
" \"create_final_community_reports.parquet\", storage\n",
")\n",
"\n",
"\n",
"# Documents renames raw_content for consistency\n",
"if \"raw_content\" in final_documents.columns:\n",
" final_documents.rename(columns={\"raw_content\": \"text\"}, inplace=True)\n",
"final_documents[\"human_readable_id\"] = final_documents.index + 1\n",
"\n",
"# Text units just get a human_readable_id or consistency\n",
"final_text_units[\"human_readable_id\"] = final_text_units.index + 1\n",
"\n",
"# We renamed \"name\" to \"title\" for consistency with the rest of the tables\n",
"if \"name\" in final_entities.columns:\n",
" final_entities.rename(columns={\"name\": \"title\"}, inplace=True)\n",
"remove_columns(\n",
" final_entities, [\"mname_embedding\", \"graph_embedding\", \"description_embedding\"]\n",
")\n",
"\n",
"# Final nodes uses community for joins, which is now an int everywhere\n",
"final_nodes[\"community\"] = final_nodes[\"community\"].fillna(-1)\n",
"final_nodes[\"community\"] = final_nodes[\"community\"].astype(int)\n",
"remove_columns(\n",
" final_nodes,\n",
" [\n",
" \"type\",\n",
" \"description\",\n",
" \"source_id\",\n",
" \"graph_embedding\",\n",
" \"entity_type\",\n",
" \"top_level_node_id\",\n",
" \"size\",\n",
" ],\n",
")\n",
"\n",
"# Relationships renames \"rank\" to \"combined_degree\" to be clear what the default ranking is\n",
"if \"rank\" in final_relationships.columns:\n",
" final_relationships.rename(columns={\"rank\": \"combined_degree\"}, inplace=True)\n",
"\n",
"\n",
"# Compute the parents for each community, to add to communities and reports\n",
"parent_df = get_community_parent(final_nodes)\n",
"\n",
"# Communities previously used the \"id\" field for the Leiden id, but we've moved this to the community field and use a uuid for id like the others\n",
"if \"community\" not in final_communities.columns:\n",
" final_communities[\"community\"] = final_communities[\"id\"].astype(int)\n",
" final_communities[\"human_readable_id\"] = final_communities[\"community\"]\n",
" final_communities[\"id\"] = [str(uuid4()) for _ in range(len(final_communities))]\n",
"if \"parent\" not in final_communities.columns:\n",
" final_communities = final_communities.merge(parent_df, on=\"community\", how=\"left\")\n",
"remove_columns(final_communities, [\"raw_community\"])\n",
"\n",
"# We need int for community and the human_readable_id copy for consistency\n",
"final_community_reports[\"community\"] = final_community_reports[\"community\"].astype(int)\n",
"final_community_reports[\"human_readable_id\"] = final_community_reports[\"community\"]\n",
"if \"parent\" not in final_community_reports.columns:\n",
" final_community_reports = final_community_reports.merge(\n",
" parent_df, on=\"community\", how=\"left\"\n",
" )\n",
"\n",
"await write_table_to_storage(final_documents, \"create_final_documents.parquet\", storage)\n",
"await write_table_to_storage(\n",
" final_text_units, \"create_final_text_units.parquet\", storage\n",
")\n",
"await write_table_to_storage(final_entities, \"create_final_entities.parquet\", storage)\n",
"await write_table_to_storage(final_nodes, \"create_final_nodes.parquet\", storage)\n",
"await write_table_to_storage(\n",
" final_relationships, \"create_final_relationships.parquet\", storage\n",
")\n",
"await write_table_to_storage(\n",
" final_communities, \"create_final_communities.parquet\", storage\n",
")\n",
"await write_table_to_storage(\n",
" final_community_reports, \"create_final_community_reports.parquet\", storage\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from datashaper import NoopVerbCallbacks\n",
"\n",
"from graphrag.cache.factory import create_cache\n",
"from graphrag.index.flows.generate_text_embeddings import generate_text_embeddings\n",
"\n",
"# We only need to re-run the embeddings workflow, to ensure that embeddings for all required search fields are in place\n",
"# We'll construct the context and run this function flow directly to avoid everything else\n",
"\n",
"workflow = next(\n",
" (x for x in pipeline_config.workflows if x.name == \"generate_text_embeddings\"), None\n",
")\n",
"config = workflow.config\n",
"text_embed = config.get(\"text_embed\", {})\n",
"embedded_fields = config.get(\"embedded_fields\", {})\n",
"callbacks = NoopVerbCallbacks()\n",
"cache = create_cache(pipeline_config.cache, PROJECT_DIRECTORY)\n",
"\n",
"await generate_text_embeddings(\n",
" final_documents=None,\n",
" final_relationships=None,\n",
" final_text_units=final_text_units,\n",
" final_entities=final_entities,\n",
" final_community_reports=final_community_reports,\n",
" callbacks=callbacks,\n",
" cache=cache,\n",
" storage=storage,\n",
" text_embed_config=text_embed,\n",
" embedded_fields=embedded_fields,\n",
" snapshot_embeddings_enabled=False,\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
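
A quick, hedged sanity check of `get_community_parent` with a hypothetical toy frame: node "a" belongs to community 0 at level 0 and community 1 at level 1, so community 1's parent resolves to 0, while the top-level community falls back to -1.

```python
import pandas as pd

# Toy membership table (hypothetical data): two nodes across two community levels.
nodes = pd.DataFrame({
    "level":     [0, 0, 1, 1],
    "community": [0, 0, 1, 2],
    "title":     ["a", "b", "a", "b"],
})

print(get_community_parent(nodes))
#    community  parent
# 0          0      -1
# 1          1       0
# 2          2       0
```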

View File

@ -29,9 +29,6 @@
" read_indexer_reports,\n",
" read_indexer_text_units,\n",
")\n",
"from graphrag.query.input.loaders.dfs import (\n",
" store_entity_semantic_embeddings,\n",
")\n",
"from graphrag.query.llm.oai.chat_openai import ChatOpenAI\n",
"from graphrag.query.llm.oai.embedding import OpenAIEmbedding\n",
"from graphrag.query.llm.oai.typing import OpenaiApiType\n",
@ -287,9 +284,6 @@
" collection_name=\"default-entity-description\",\n",
")\n",
"description_embedding_store.connect(db_uri=LANCEDB_URI)\n",
"entity_description_embeddings = store_entity_semantic_embeddings(\n",
" entities=entities, vectorstore=description_embedding_store\n",
")\n",
"\n",
"print(f\"Entity count: {len(entity_df)}\")\n",
"entity_df.head()"

View File

@ -38,9 +38,6 @@
" read_indexer_reports,\n",
" read_indexer_text_units,\n",
")\n",
"from graphrag.query.input.loaders.dfs import (\n",
" store_entity_semantic_embeddings,\n",
")\n",
"from graphrag.query.llm.oai.chat_openai import ChatOpenAI\n",
"from graphrag.query.llm.oai.embedding import OpenAIEmbedding\n",
"from graphrag.query.llm.oai.typing import OpenaiApiType\n",
@ -302,9 +299,6 @@
" collection_name=\"default-entity-description\",\n",
")\n",
"description_embedding_store.connect(db_uri=LANCEDB_URI)\n",
"entity_description_embeddings = store_entity_semantic_embeddings(\n",
" entities=entities, vectorstore=description_embedding_store\n",
")\n",
"covariate_df = pd.read_parquet(f\"{INPUT_DIR}/{COVARIATE_TABLE}.parquet\")\n",
"claims = read_indexer_covariates(covariate_df)\n",
"covariates = {\"claims\": claims}\n",

View File

@ -8,8 +8,6 @@ WARNING: This API is under development and may undergo changes in future release
Backwards compatibility is not guaranteed at this time.
"""
from pathlib import Path
from datashaper import WorkflowCallbacks
from graphrag.cache.noop_pipeline_cache import NoopPipelineCache
@ -20,7 +18,6 @@ from graphrag.index.create_pipeline_config import create_pipeline_config
from graphrag.index.run import run_pipeline_with_config
from graphrag.index.typing import PipelineRunResult
from graphrag.logging.base import ProgressReporter
from graphrag.vector_stores.factory import VectorStoreType
async def build_index(
@ -59,8 +56,6 @@ async def build_index(
msg = "Cannot resume and update a run at the same time."
raise ValueError(msg)
config = _patch_vector_config(config)
pipeline_config = create_pipeline_config(config)
pipeline_cache = (
NoopPipelineCache() if config.cache.type == CacheType.none else None
@ -88,22 +83,3 @@ async def build_index(
progress_reporter.success(output.workflow)
progress_reporter.info(str(output.result))
return outputs
def _patch_vector_config(config: GraphRagConfig):
"""Back-compat patch to ensure a default vector store configuration."""
if not config.embeddings.vector_store:
config.embeddings.vector_store = {
"type": "lancedb",
"db_uri": "output/lancedb",
"container_name": "default",
"overwrite": True,
}
# TODO: must update filepath of lancedb (if used) until the new config engine has been implemented
# TODO: remove the type ignore annotations below once the new config engine has been refactored
vector_store_type = config.embeddings.vector_store["type"] # type: ignore
if vector_store_type == VectorStoreType.LanceDB:
db_uri = config.embeddings.vector_store["db_uri"] # type: ignore
lancedb_dir = Path(config.root_dir).resolve() / db_uri
config.embeddings.vector_store["db_uri"] = str(lancedb_dir) # type: ignore
return config

View File

@ -48,7 +48,7 @@ from graphrag.query.structured_search.base import SearchResult # noqa: TC001
from graphrag.utils.cli import redact
from graphrag.utils.embeddings import create_collection_name
from graphrag.vector_stores.base import BaseVectorStore
from graphrag.vector_stores.factory import VectorStoreFactory, VectorStoreType
from graphrag.vector_stores.factory import VectorStoreFactory
reporter = PrintProgressReporter("")
@ -240,17 +240,7 @@ async def local_search(
------
TODO: Document any exceptions to expect.
"""
config = _patch_vector_store(config, nodes, entities, community_level)
# TODO: update filepath of lancedb (if used) until the new config engine has been implemented
# TODO: remove the type ignore annotations below once the new config engine has been refactored
vector_store_type = config.embeddings.vector_store.get("type") # type: ignore
vector_store_args = config.embeddings.vector_store
if vector_store_type == VectorStoreType.LanceDB:
db_uri = config.embeddings.vector_store["db_uri"] # type: ignore
lancedb_dir = Path(config.root_dir).resolve() / db_uri
vector_store_args["db_uri"] = str(lancedb_dir) # type: ignore
reporter.info(f"Vector Store Args: {redact(vector_store_args)}") # type: ignore
description_embedding_store = _get_embedding_store(
@ -316,17 +306,7 @@ async def local_search_streaming(
------
TODO: Document any exceptions to expect.
"""
config = _patch_vector_store(config, nodes, entities, community_level)
# TODO: must update filepath of lancedb (if used) until the new config engine has been implemented
# TODO: remove the type ignore annotations below once the new config engine has been refactored
vector_store_type = config.embeddings.vector_store.get("type") # type: ignore
vector_store_args = config.embeddings.vector_store
if vector_store_type == VectorStoreType.LanceDB:
db_uri = config.embeddings.vector_store["db_uri"] # type: ignore
lancedb_dir = Path(config.root_dir).resolve() / db_uri
vector_store_args["db_uri"] = str(lancedb_dir) # type: ignore
reporter.info(f"Vector Store Args: {redact(vector_store_args)}") # type: ignore
description_embedding_store = _get_embedding_store(
@ -399,19 +379,7 @@ async def drift_search(
------
TODO: Document any exceptions to expect.
"""
config = _patch_vector_store(
config, nodes, entities, community_level, with_reports=community_reports
)
# TODO: update filepath of lancedb (if used) until the new config engine has been implemented
# TODO: remove the type ignore annotations below once the new config engine has been refactored
vector_store_type = config.embeddings.vector_store.get("type") # type: ignore
vector_store_args = config.embeddings.vector_store
if vector_store_type == VectorStoreType.LanceDB:
db_uri = config.embeddings.vector_store["db_uri"] # type: ignore
lancedb_dir = Path(config.root_dir).resolve() / db_uri
vector_store_args["db_uri"] = str(lancedb_dir) # type: ignore
reporter.info(f"Vector Store Args: {redact(vector_store_args)}") # type: ignore
description_embedding_store = _get_embedding_store(
@ -453,85 +421,6 @@ async def drift_search(
return response, context_data
def _patch_vector_store(
config: GraphRagConfig,
nodes: pd.DataFrame,
entities: pd.DataFrame,
community_level: int,
with_reports: pd.DataFrame | None = None,
) -> GraphRagConfig:
# TODO: remove the following patch that checks for a vector_store prior to v1 release
# TODO: this is a backwards compatibility patch that injects the default vector_store settings into the config if it is not present
# Only applicable in situations involving a local vector_store (lancedb). The general idea:
# if vector_store not in config:
# 1. assume user is running local if vector_store is not in config
# 2. insert default vector_store in config
# 3. create lancedb vector_store instance
# 4. upload vector embeddings from the input dataframes to the vector_store
if not config.embeddings.vector_store:
from graphrag.query.input.loaders.dfs import (
store_entity_semantic_embeddings,
)
from graphrag.vector_stores.lancedb import LanceDBVectorStore
config.embeddings.vector_store = {
"type": "lancedb",
"db_uri": f"{Path(config.storage.base_dir)}/lancedb",
"container_name": "default",
"overwrite": True,
}
description_embedding_store = LanceDBVectorStore(
db_uri=config.embeddings.vector_store["db_uri"],
collection_name=create_collection_name(
config.embeddings.vector_store["container_name"],
entity_description_embedding,
),
overwrite=config.embeddings.vector_store["overwrite"],
)
description_embedding_store.connect(
db_uri=config.embeddings.vector_store["db_uri"]
)
# dump embeddings from the entities list to the description_embedding_store
entities_ = read_indexer_entities(nodes, entities, community_level)
store_entity_semantic_embeddings(
entities=entities_, vectorstore=description_embedding_store
)
if with_reports is not None:
from graphrag.query.input.loaders.dfs import (
store_reports_semantic_embeddings,
)
from graphrag.vector_stores.lancedb import LanceDBVectorStore
community_reports = with_reports
container_name = config.embeddings.vector_store["container_name"]
# Store report embeddings
reports = read_indexer_reports(
community_reports,
nodes,
community_level,
content_embedding_col="full_content_embedding",
config=config,
)
full_content_embedding_store = LanceDBVectorStore(
db_uri=config.embeddings.vector_store["db_uri"],
collection_name=create_collection_name(
container_name, community_full_content_embedding
),
overwrite=config.embeddings.vector_store["overwrite"],
)
full_content_embedding_store.connect(
db_uri=config.embeddings.vector_store["db_uri"]
)
# dump embeddings from the reports list to the full_content_embedding_store
store_reports_semantic_embeddings(
reports=reports, vectorstore=full_content_embedding_store
)
return config
def _get_embedding_store(
config_args: dict,
embedding_name: str,

View File

@ -16,7 +16,7 @@ from graphrag.config.resolve_path import resolve_paths
from graphrag.index.create_pipeline_config import create_pipeline_config
from graphrag.logging.print_progress import PrintProgressReporter
from graphrag.storage.factory import create_storage
from graphrag.utils.storage import _load_table_from_storage
from graphrag.utils.storage import load_table_from_storage
reporter = PrintProgressReporter("")
@ -124,7 +124,6 @@ def run_local_search(
config.storage.base_dir = str(data_dir) if data_dir else config.storage.base_dir
resolve_paths(config)
# TODO remove optional create_final_entities_description_embeddings.parquet to delete backwards compatibility
dataframe_dict = _resolve_parquet_files(
config=config,
parquet_list=[
@ -270,7 +269,7 @@ def _resolve_parquet_files(
for parquet_file in parquet_list:
df_key = parquet_file.split(".")[0]
df_value = asyncio.run(
_load_table_from_storage(name=parquet_file, storage=storage_obj)
load_table_from_storage(name=parquet_file, storage=storage_obj)
)
dataframe_dict[df_key] = df_value
@ -281,7 +280,7 @@ def _resolve_parquet_files(
df_key = optional_file.split(".")[0]
if file_exists:
df_value = asyncio.run(
_load_table_from_storage(name=optional_file, storage=storage_obj)
load_table_from_storage(name=optional_file, storage=storage_obj)
)
dataframe_dict[df_key] = df_value
else:

View File

@ -24,9 +24,6 @@ class LLMParameters(BaseModel):
description="The encoding model to use", default=defs.ENCODING_MODEL
)
model: str = Field(description="The LLM model to use.", default=defs.LLM_MODEL)
embeddings_model: str | None = Field(
description="The embeddings model to use.", default=defs.EMBEDDING_MODEL
)
max_tokens: int | None = Field(
description="The maximum number of tokens to generate.",
default=defs.LLM_MAX_TOKENS,

View File

@ -9,6 +9,7 @@ from string import Template
from graphrag.config.enums import ReportingType, StorageType
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.vector_stores.factory import VectorStoreType
def _resolve_timestamp_path_with_value(path: str | Path, timestamp_value: str) -> Path:
@ -203,3 +204,11 @@ def resolve_paths(
pattern_or_timestamp_value,
)
)
# TODO: must update filepath of lancedb (if used) until the new config engine has been implemented
# TODO: remove the type ignore annotations below once the new config engine has been refactored
vector_store_type = config.embeddings.vector_store["type"] # type: ignore
if vector_store_type == VectorStoreType.LanceDB:
db_uri = config.embeddings.vector_store["db_uri"] # type: ignore
lancedb_dir = Path(config.root_dir).resolve() / db_uri
config.embeddings.vector_store["db_uri"] = str(lancedb_dir) # type: ignore
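
For illustration, the same resolution the block above performs, with hypothetical paths:

```python
from pathlib import Path

root_dir = "/home/me/proj"   # config.root_dir (hypothetical)
db_uri = "output/lancedb"    # default relative db_uri
print(Path(root_dir).resolve() / db_uri)  # -> /home/me/proj/output/lancedb
```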

View File

@ -39,10 +39,6 @@ async def create_input(
log.info("loading input from root_dir=%s", config.base_dir)
progress_reporter = progress_reporter or NullProgressReporter()
if config is None:
msg = "No input specified!"
raise ValueError(msg)
match config.type:
case InputType.blob:
log.info("using blob storage input")

View File

@ -126,10 +126,6 @@ async def run_pipeline_with_config(
)
workflows = workflows or config.workflows
if dataset is None:
msg = "No dataset provided!"
raise ValueError(msg)
if is_update_run and update_index_storage:
delta_dataset = await get_delta_docs(dataset, storage)

View File

@ -23,7 +23,7 @@ from graphrag.index.run.profiling import _write_workflow_stats
from graphrag.index.typing import PipelineRunResult
from graphrag.logging.base import ProgressReporter
from graphrag.storage.pipeline_storage import PipelineStorage
from graphrag.utils.storage import _load_table_from_storage
from graphrag.utils.storage import load_table_from_storage
log = logging.getLogger(__name__)
@ -41,7 +41,7 @@ async def _inject_workflow_data_dependencies(
for id in deps:
workflow_id = f"workflow:{id}"
try:
table = await _load_table_from_storage(f"{id}.parquet", storage)
table = await load_table_from_storage(f"{id}.parquet", storage)
except ValueError:
# our workflows allow for transient tables, and we avoid putting those in storage
# however, we need to keep the table in the dependency list for proper execution order.

View File

@ -25,7 +25,7 @@ from graphrag.index.update.entities import (
from graphrag.index.update.relationships import _update_and_merge_relationships
from graphrag.logging.print_progress import ProgressReporter
from graphrag.storage.pipeline_storage import PipelineStorage
from graphrag.utils.storage import _load_table_from_storage
from graphrag.utils.storage import load_table_from_storage
@dataclass
@ -61,7 +61,7 @@ async def get_delta_docs(
InputDelta
The input delta. With new inputs and deleted inputs.
"""
final_docs = await _load_table_from_storage(
final_docs = await load_table_from_storage(
"create_final_documents.parquet", storage
)
@ -171,7 +171,7 @@ async def _update_community_reports(
dataframe_dict, storage, update_storage, community_id_mapping
):
"""Update the community reports output."""
old_community_reports = await _load_table_from_storage(
old_community_reports = await load_table_from_storage(
"create_final_community_reports.parquet", storage
)
delta_community_reports = dataframe_dict["create_final_community_reports"]
@ -192,7 +192,7 @@ async def _update_communities(
dataframe_dict, storage, update_storage, community_id_mapping
):
"""Update the communities output."""
old_communities = await _load_table_from_storage(
old_communities = await load_table_from_storage(
"create_final_communities.parquet", storage
)
delta_communities = dataframe_dict["create_final_communities"]
@ -207,7 +207,7 @@ async def _update_communities(
async def _update_nodes(dataframe_dict, storage, update_storage, merged_entities_df):
"""Update the nodes output."""
old_nodes = await _load_table_from_storage("create_final_nodes.parquet", storage)
old_nodes = await load_table_from_storage("create_final_nodes.parquet", storage)
delta_nodes = dataframe_dict["create_final_nodes"]
merged_nodes, community_id_mapping = _merge_and_resolve_nodes(
@ -220,7 +220,7 @@ async def _update_nodes(dataframe_dict, storage, update_storage, merged_entities
async def _update_covariates(dataframe_dict, storage, update_storage):
"""Update the covariates output."""
old_covariates = await _load_table_from_storage(
old_covariates = await load_table_from_storage(
"create_final_covariates.parquet", storage
)
delta_covariates = dataframe_dict["create_final_covariates"]
@ -235,7 +235,7 @@ async def _update_text_units(
dataframe_dict, storage, update_storage, entity_id_mapping
):
"""Update the text units output."""
old_text_units = await _load_table_from_storage(
old_text_units = await load_table_from_storage(
"create_final_text_units.parquet", storage
)
delta_text_units = dataframe_dict["create_final_text_units"]
@ -253,7 +253,7 @@ async def _update_text_units(
async def _update_relationships(dataframe_dict, storage, update_storage):
"""Update the relationships output."""
old_relationships = await _load_table_from_storage(
old_relationships = await load_table_from_storage(
"create_final_relationships.parquet", storage
)
delta_relationships = dataframe_dict["create_final_relationships"]
@ -273,7 +273,7 @@ async def _update_entities(
dataframe_dict, storage, update_storage, config, cache, callbacks
):
"""Update Final Entities output."""
old_entities = await _load_table_from_storage(
old_entities = await load_table_from_storage(
"create_final_entities.parquet", storage
)
delta_entities = dataframe_dict["create_final_entities"]
@ -310,7 +310,7 @@ async def _concat_dataframes(name, dataframe_dict, storage, update_storage):
storage : PipelineStorage
The storage used to store the dataframes.
"""
old_df = await _load_table_from_storage(f"{name}.parquet", storage)
old_df = await load_table_from_storage(f"{name}.parquet", storage)
delta_df = dataframe_dict[name]
# Merge the final documents

View File

@ -25,9 +25,6 @@ class CommunityReport(Named):
rank: float | None = 1.0
"""Rank of the report, used for sorting (optional). Higher means more important"""
summary_embedding: list[float] | None = None
"""The semantic (i.e. text) embedding of the report summary (optional)."""
full_content_embedding: list[float] | None = None
"""The semantic (i.e. text) embedding of the full report content (optional)."""
@ -51,8 +48,6 @@ class CommunityReport(Named):
summary_key: str = "summary",
full_content_key: str = "full_content",
rank_key: str = "rank",
summary_embedding_key: str = "summary_embedding",
full_content_embedding_key: str = "full_content_embedding",
attributes_key: str = "attributes",
size_key: str = "size",
period_key: str = "period",
@ -66,8 +61,6 @@ class CommunityReport(Named):
summary=d[summary_key],
full_content=d[full_content_key],
rank=d[rank_key],
summary_embedding=d.get(summary_embedding_key),
full_content_embedding=d.get(full_content_embedding_key),
attributes=d.get(attributes_key),
size=d.get(size_key),
period=d.get(period_key),

View File

@ -22,9 +22,6 @@ class Document(Named):
text: str = ""
"""The raw text content of the document."""
text_embedding: list[float] | None = None
"""The semantic embedding for the document raw content (optional)."""
attributes: dict[str, Any] | None = None
"""A dictionary of structured attributes such as author, etc (optional)."""
@ -37,7 +34,6 @@ class Document(Named):
title_key: str = "title",
type_key: str = "type",
text_key: str = "text",
text_embedding_key: str = "text_embedding",
text_units_key: str = "text_units",
attributes_key: str = "attributes",
) -> "Document":
@ -48,7 +44,6 @@ class Document(Named):
title=d[title_key],
type=d.get(type_key, "text"),
text=d[text_key],
text_embedding=d.get(text_embedding_key),
text_unit_ids=d.get(text_units_key, []),
attributes=d.get(attributes_key),
)

View File

@ -16,9 +16,6 @@ class TextUnit(Identified):
text: str
"""The text of the unit."""
text_embedding: list[float] | None = None
"""The text embedding for the text unit (optional)."""
entity_ids: list[str] | None = None
"""List of entity IDs related to the text unit (optional)."""
@ -44,7 +41,6 @@ class TextUnit(Identified):
id_key: str = "id",
short_id_key: str = "human_readable_id",
text_key: str = "text",
text_embedding_key: str = "text_embedding",
entities_key: str = "entity_ids",
relationships_key: str = "relationship_ids",
covariates_key: str = "covariate_ids",
@ -57,7 +53,6 @@ class TextUnit(Identified):
id=d[id_key],
short_id=d.get(short_id_key),
text=d[text_key],
text_embedding=d.get(text_embedding_key),
entity_ids=d.get(entities_key),
relationship_ids=d.get(relationships_key),
covariate_ids=d.get(covariates_key),

View File

@ -63,17 +63,10 @@ def read_indexer_covariates(final_covariates: pd.DataFrame) -> list[Covariate]:
def read_indexer_relationships(final_relationships: pd.DataFrame) -> list[Relationship]:
"""Read in the Relationships from the raw indexing outputs."""
# rank is for back-compat with older indexes
# TODO: remove for 1.0
rank_col = (
"combined_degree"
if "combined_degree" in final_relationships.columns
else "rank"
)
return read_relationships(
df=final_relationships,
short_id_col="human_readable_id",
rank_col=rank_col,
rank_col="combined_degree",
description_embedding_col=None,
attributes_cols=None,
)
@ -106,10 +99,6 @@ def read_indexer_reports(
nodes_df = nodes_df.groupby(["title"]).agg({"community": "max"}).reset_index()
filtered_community_df = nodes_df["community"].drop_duplicates()
# todo: pre 1.0 back-compat where community was a string
reports_df.loc[:, "community"] = reports_df["community"].fillna(-1)
reports_df.loc[:, "community"] = reports_df["community"].astype(int)
reports_df = reports_df.merge(
filtered_community_df, on="community", how="inner"
)
@ -127,7 +116,6 @@ def read_indexer_reports(
df=reports_df,
id_col="id",
short_id_col="community",
summary_embedding_col=None,
content_embedding_col=content_embedding_col,
)
@ -155,10 +143,6 @@ def read_indexer_entities(
nodes_df = cast("pd.DataFrame", nodes_df[["id", "degree", "community"]])
nodes_df["community"] = nodes_df["community"].fillna(-1)
nodes_df["community"] = nodes_df["community"].astype(int)
nodes_df["degree"] = nodes_df["degree"].astype(int)
# group entities by id and degree and remove duplicated community IDs
nodes_df = nodes_df.groupby(["id", "degree"]).agg({"community": set}).reset_index()
nodes_df["community"] = nodes_df["community"].apply(lambda x: [str(i) for i in x])
@ -166,10 +150,6 @@ def read_indexer_entities(
subset=["id"]
)
# todo: pre 1.0 back-compat where title was name
if "title" not in final_df.columns:
final_df["title"] = final_df["name"]
# read entity dataframe to knowledge model objects
return read_entities(
df=final_df,
@ -199,10 +179,6 @@ def read_indexer_communities(
nodes_df = final_nodes
reports_df = final_community_reports
# todo: pre 1.0 back-compat!
if "community" not in communities_df.columns:
communities_df["community"] = communities_df["id"]
# ensure communities matches community reports
missing_reports = communities_df[
~communities_df.community.isin(reports_df.community.unique())

View File

@ -19,7 +19,6 @@ from graphrag.query.input.loaders.utils import (
to_optional_str,
to_str,
)
from graphrag.vector_stores.base import BaseVectorStore, VectorStoreDocument
def read_entities(
@ -62,50 +61,6 @@ def read_entities(
return entities
def store_entity_semantic_embeddings(
entities: list[Entity],
vectorstore: BaseVectorStore,
) -> BaseVectorStore:
"""Store entity semantic embeddings in a vectorstore."""
documents = [
VectorStoreDocument(
id=entity.id,
text=entity.description,
vector=entity.description_embedding,
attributes=(
{"title": entity.title, **entity.attributes}
if entity.attributes
else {"title": entity.title}
),
)
for entity in entities
]
vectorstore.load_documents(documents=documents)
return vectorstore
def store_reports_semantic_embeddings(
reports: list[CommunityReport],
vectorstore: BaseVectorStore,
) -> BaseVectorStore:
"""Store entity semantic embeddings in a vectorstore."""
documents = [
VectorStoreDocument(
id=report.id,
text=report.full_content,
vector=report.full_content_embedding,
attributes=(
{"title": report.title, **report.attributes}
if report.attributes
else {"title": report.title}
),
)
for report in reports
]
vectorstore.load_documents(documents=documents)
return vectorstore
def read_relationships(
df: pd.DataFrame,
id_col: str = "id",
@ -219,7 +174,6 @@ def read_community_reports(
summary_col: str = "summary",
content_col: str = "full_content",
rank_col: str | None = "rank",
summary_embedding_col: str | None = "summary_embedding",
content_embedding_col: str | None = "full_content_embedding",
attributes_cols: list[str] | None = None,
) -> list[CommunityReport]:
@ -234,9 +188,6 @@ def read_community_reports(
summary=to_str(row, summary_col),
full_content=to_str(row, content_col),
rank=to_optional_float(row, rank_col),
summary_embedding=to_optional_list(
row, summary_embedding_col, item_type=float
),
full_content_embedding=to_optional_list(
row, content_embedding_col, item_type=float
),
@ -259,7 +210,6 @@ def read_text_units(
covariates_col: str | None = "covariate_ids",
tokens_col: str | None = "n_tokens",
document_ids_col: str | None = "document_ids",
embedding_col: str | None = "text_embedding",
attributes_cols: list[str] | None = None,
) -> list[TextUnit]:
"""Read text units from a dataframe."""
@ -274,7 +224,6 @@ def read_text_units(
covariate_ids=to_optional_dict(
row, covariates_col, key_type=str, value_type=str
),
text_embedding=to_optional_list(row, embedding_col, item_type=float), # type: ignore
n_tokens=to_optional_int(row, tokens_col),
document_ids=to_optional_list(row, document_ids_col, item_type=str),
attributes=(

View File

@ -13,7 +13,8 @@ from graphrag.storage.pipeline_storage import PipelineStorage
log = logging.getLogger(__name__)
async def _load_table_from_storage(name: str, storage: PipelineStorage) -> pd.DataFrame:
async def load_table_from_storage(name: str, storage: PipelineStorage) -> pd.DataFrame:
"""Load a parquet from the storage instance."""
if not await storage.has(name):
msg = f"Could not find {name} in storage!"
raise ValueError(msg)
@ -23,3 +24,10 @@ async def _load_table_from_storage(name: str, storage: PipelineStorage) -> pd.Da
except Exception:
log.exception("error loading table from storage: %s", name)
raise
async def write_table_to_storage(
table: pd.DataFrame, name: str, storage: PipelineStorage
) -> None:
"""Write a table to storage."""
await storage.set(name, table.to_parquet())
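
A hedged usage sketch of the now-public pair, mirroring the migration notebook (assumes a `storage` instance from `create_storage` and an async context such as a notebook cell):

```python
# Round-trip a table through storage: load, patch, write back.
table = await load_table_from_storage("create_final_entities.parquet", storage)
table = table.rename(columns={"name": "title"})  # example fix from the migration
await write_table_to_storage(table, "create_final_entities.parquet", storage)
```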

View File

@ -1,36 +1,16 @@
# GraphRAG Data Model and Config Breaking Changes
As we worked toward a cleaner codebase, data model, and configuration for the v1 release, we made a few changes that can break older indexes. During the development process we left shims in place to account for these changes, so that all old indexes will work up until v1.0. However, with the release of 1.0 we are removing these shims to allow the codebase to move forward without the legacy code elements. This should be a fairly painless process for most users: because we aggressively use a cache for LLM calls, re-running an index over the top of a previous one should be very low (or no) cost. Therefore, our standard migration recommendation is as follows:
As we worked toward a cleaner codebase, data model, and configuration for the v1 release, we made a few changes that can break older indexes. During the development process we left shims in place to account for these changes, so that all old indexes will work up until v1.0. However, with the release of 1.0 we are removing these shims to allow the codebase to move forward without the legacy code elements. We are providing a migration notebook so this process should be fairly painless for most users:
1. Rename or move your settings.yml file to back it up.
2. Re-run `graphrag init` to generate a new default settings.yml.
3. Open your old settings.yml and copy any critical settings that you changed. For most people this is likely only the LLM and embedding config.
4. Re-run `graphrag index`. This will re-execute the standard pipeline, using the cache for any LLM calls that it can. The output parquet tables will be in the latest format.
4. Run the notebook here: [docs/examples_notebooks/index_migration.ipynb](./docs/examples_notebooks/index_migration.ipynb)
Note that one of the new requirements is that we write embeddings to a vector store during indexing. By default, this uses a local lancedb instance. When you re-generate the default config, a block will be added to reflect this. If you need to write to Azure AI Search instead, we recommend updating these settings before you index, so you don't need to do a separate vector ingest.
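
To confirm the embeddings landed after indexing, one hedged check against the default local store (the path and collection names are illustrative):

```python
import lancedb

# List the collections GraphRAG wrote during indexing (default lancedb settings assumed).
db = lancedb.connect("output/lancedb")
print(db.table_names())  # expect entries like "default-entity-description"
```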
All of the breaking changes listed below are accounted for in the four steps above.
## What if I don't have a cache available?
If you no longer have your original GraphRAG cache, you can manually update your index. The most important aspect is ensuring that you have the required embeddings stored. If you already have your embeddings in a vector store, much of this can be avoided.
Parquet changes:
- The `create_final_entities.name` field has been renamed to `create_final_entities.title` for consistency with the other tables. Use your parquet editor of choice to fix this.
- The `create_final_communities.id` field has been renamed to `create_final_communities.community` so that `id` can be repurposed as a UUID like the other tables. Copy the column under the new name so the `id` field stays in place, or use a tool such as pandas to give each community a fresh UUID in the `id` field, as sketched below (we join on the `community` field internally, so `id` can effectively be ignored).
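
A hedged pandas sketch of both fixes, mirroring what the migration notebook does (paths are illustrative; back up the files first):

```python
from uuid import uuid4

import pandas as pd

# Rename entities "name" -> "title".
entities = pd.read_parquet("output/create_final_entities.parquet")
entities = entities.rename(columns={"name": "title"})
entities.to_parquet("output/create_final_entities.parquet")

# Move the old Leiden id into "community" and mint fresh UUIDs for "id".
communities = pd.read_parquet("output/create_final_communities.parquet")
communities["community"] = communities["id"].astype(int)
communities["human_readable_id"] = communities["community"]
communities["id"] = [str(uuid4()) for _ in range(len(communities))]
communities.to_parquet("output/create_final_communities.parquet")
```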
Embeddings changes:
- For Local Search, you need to have the entity.description embeddings in a vector store
- For DRIFT Search, you need the community.full_content embeddings in a vector store
- If you are only using Global search, you do not need any embeddings
The easiest way to get both of those is to run the pipeline with all workflows skipped except for `generate_embeddings`, which will embed those fields and write them to a vector store directly. Using a newer config file that has the embeddings.vector_store block:
- Set the `skip_workflows` value to [create_base_entity_graph, create_base_text_units, create_final_text_units, create_final_community_reports, create_final_nodes, create_final_relationships, create_final_documents, create_final_covariates, create_final_entities, create_final_communities]
- Re-run `graphrag index`
This runs the pipeline but skips all of the usual artifact generation; the only workflow that executes is the one that generates the default (or otherwise configured) embeddings.
## Updated data model
- We have streamlined the data model of the index in a few small ways to align tables more consistently and remove redundant content. Notably: