Update index API + a notebook that provides a general API overview (#1454)

* update index api to accept callbacks

* fix hardcoded folder name that was creating an empty folder

* add API notebook

* add semversioner file

* filename change

---------

Co-authored-by: Alonso Guevara <alonsog@microsoft.com>
Josh Bradley 2024-12-05 16:34:21 -05:00 committed by GitHub
parent 10f84c91eb
commit b00142260d
6 changed files with 237 additions and 21 deletions

View File

@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "update API and add a demonstration notebook"
}

View File

@@ -0,0 +1,209 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Copyright (c) 2024 Microsoft Corporation.\n",
"# Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## API Overview\n",
"\n",
"This notebook provides a demonstration of how to interact with graphrag as a library using the API as opposed to the CLI. Note that graphrag's CLI actually connects to the library through this API for all operations. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import graphrag.api as api\n",
"from graphrag.index.typing import PipelineRunResult"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisite\n",
"As a prerequisite to all API operations, a `GraphRagConfig` object is required. It is the primary means to control the behavior of graphrag and can be instantiated from a `settings.yaml` configuration file.\n",
"\n",
"Please refer to the [CLI docs](https://microsoft.github.io/graphrag/cli/#init) for more detailed information on how to generate the `settings.yaml` file.\n",
"\n",
"#### Load `settings.yaml` configuration"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import yaml\n",
"\n",
"settings = yaml.safe_load(open(\"<project_directory>/settings.yaml\")) # noqa: PTH123, SIM115"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"At this point, you can modify the imported settings to align with your application's requirements. For example, if building a UI application, the application might need to change the input and/or storage destinations dynamically in order to enable users to build and query different indexes."
]
},
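{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A minimal sketch of such a modification. The keys below follow the default\n",
"# settings.yaml layout (an assumption; adjust them to your own configuration).\n",
"settings[\"input\"][\"base_dir\"] = \"<project_directory>/input\"\n",
"settings[\"storage\"][\"base_dir\"] = \"<project_directory>/output\""
]
},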
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate a `GraphRagConfig` object"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from graphrag.config.create_graphrag_config import create_graphrag_config\n",
"\n",
"graphrag_config = create_graphrag_config(\n",
" values=settings, root_dir=\"<project_directory>\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Indexing API\n",
"\n",
"*Indexing* is the process of ingesting raw text data and constructing a knowledge graph. GraphRAG currently supports plaintext (`.txt`) and `.csv` file formats."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Build an index"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"index_result: list[PipelineRunResult] = await api.build_index(config=graphrag_config)\n",
"\n",
"# index_result is a list of workflows that make up the indexing pipeline that was run\n",
"for workflow_result in index_result:\n",
" status = f\"error\\n{workflow_result.errors}\" if workflow_result.errors else \"success\"\n",
" print(f\"Workflow Name: {workflow_result.workflow}\\tStatus: {status}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Query an index\n",
"\n",
"To query an index, several index files must first be read into memory and passed to the query API. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"final_nodes = pd.read_parquet(\"<project_directory>/output/create_final_nodes.parquet\")\n",
"final_entities = pd.read_parquet(\n",
" \"<project_directory>/output/create_final_entities.parquet\"\n",
")\n",
"final_communities = pd.read_parquet(\n",
" \"<project_directory>/output/create_final_communities.parquet\"\n",
")\n",
"final_community_reports = pd.read_parquet(\n",
" \"<project_directory>/output/create_final_community_reports.parquet\"\n",
")\n",
"\n",
"response, context = await api.global_search(\n",
" config=graphrag_config,\n",
" nodes=final_nodes,\n",
" entities=final_entities,\n",
" communities=final_communities,\n",
" community_reports=final_community_reports,\n",
" community_level=2,\n",
" dynamic_community_selection=False,\n",
" response_type=\"Multiple Paragraphs\",\n",
" query=\"Who is Scrooge and what are his main relationships?\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The response object is the official reponse from graphrag while the context object holds various metadata regarding the querying process used to obtain the final response."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(response)"
]
},
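{
"cell_type": "markdown",
"metadata": {},
"source": [
"A local search can be issued through the same API surface. The cell below is a sketch: `api.local_search` is part of the query API, but its exact parameters and the extra parquet files are assumptions patterned on the global search call above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"final_text_units = pd.read_parquet(\n",
"    \"<project_directory>/output/create_final_text_units.parquet\"\n",
")\n",
"final_relationships = pd.read_parquet(\n",
"    \"<project_directory>/output/create_final_relationships.parquet\"\n",
")\n",
"\n",
"response, context = await api.local_search(\n",
"    config=graphrag_config,\n",
"    nodes=final_nodes,\n",
"    entities=final_entities,\n",
"    community_reports=final_community_reports,\n",
"    text_units=final_text_units,\n",
"    relationships=final_relationships,\n",
"    covariates=None,\n",
"    community_level=2,\n",
"    response_type=\"Multiple Paragraphs\",\n",
"    query=\"Who is Scrooge and what are his main relationships?\",\n",
")\n",
"\n",
"print(response)"
]
},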
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Digging into the context a bit more provides users with extremely granular information such as what sources of data (down to the level of text chunks) were ultimately retrieved and used as part of the context sent to the LLM model)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pprint import pprint\n",
"\n",
"pprint(context) # noqa: T203"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "graphrag-venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.15"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -10,7 +10,10 @@ Backwards compatibility is not guaranteed at this time.
from pathlib import Path
from datashaper import WorkflowCallbacks
from graphrag.cache.noop_pipeline_cache import NoopPipelineCache
from graphrag.callbacks.factory import create_pipeline_reporter
from graphrag.config.enums import CacheType
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.index.create_pipeline_config import create_pipeline_config
@@ -25,6 +28,7 @@ async def build_index(
run_id: str = "",
is_resume_run: bool = False,
memory_profile: bool = False,
callbacks: list[WorkflowCallbacks] | None = None,
progress_reporter: ProgressReporter | None = None,
) -> list[PipelineRunResult]:
"""Run the pipeline with the given configuration.
@@ -37,10 +41,10 @@ async def build_index(
The run id. Creates an output directory with this name.
is_resume_run : bool default=False
Whether to resume a previous index run.
is_update_run : bool default=False
Whether to update a previous index run.
memory_profile : bool
Whether to enable memory profiling.
callbacks : list[WorkflowCallbacks] | None default=None
A list of callbacks to register.
progress_reporter : ProgressReporter | None default=None
The progress reporter.
@@ -61,12 +65,17 @@ async def build_index(
pipeline_cache = (
NoopPipelineCache() if config.cache.type == CacheType.none else None
)
# TODO: remove the type ignore once the new config engine has been refactored
callbacks = callbacks or []
if config.reporting:
    callbacks.append(create_pipeline_reporter(config.reporting, None))  # type: ignore
outputs: list[PipelineRunResult] = []
async for output in run_pipeline_with_config(
pipeline_config,
run_id=run_id,
memory_profile=memory_profile,
cache=pipeline_cache,
callbacks=callbacks,
progress_reporter=progress_reporter,
is_resume_run=is_resume_run,
is_update_run=is_update_run,
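
With callbacks now part of the signature, callers can observe an indexing run directly. A minimal sketch of the new usage (ConsoleWorkflowCallbacks is the console reporter referenced elsewhere in this commit; any WorkflowCallbacks implementation can be supplied):

import graphrag.api as api
from graphrag.callbacks.console_workflow_callbacks import ConsoleWorkflowCallbacks

# each entry in the list is registered with the pipeline's callback manager
results = await api.build_index(
    config=graphrag_config,
    callbacks=[ConsoleWorkflowCallbacks()],
)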

View File

@@ -17,7 +17,6 @@ from datashaper import NoopVerbCallbacks, WorkflowCallbacks
from graphrag.cache.factory import create_cache
from graphrag.cache.pipeline_cache import PipelineCache
from graphrag.callbacks.console_workflow_callbacks import ConsoleWorkflowCallbacks
from graphrag.callbacks.factory import create_pipeline_reporter
from graphrag.index.config.cache import PipelineMemoryCacheConfig
from graphrag.index.config.pipeline import (
PipelineConfig,
@@ -67,7 +66,7 @@ async def run_pipeline_with_config(
storage: PipelineStorage | None = None,
update_index_storage: PipelineStorage | None = None,
cache: PipelineCache | None = None,
callbacks: WorkflowCallbacks | None = None,
callbacks: list[WorkflowCallbacks] | None = None,
progress_reporter: ProgressReporter | None = None,
input_post_process_steps: list[PipelineWorkflowStep] | None = None,
additional_verbs: VerbDefinitions | None = None,
@@ -107,6 +106,7 @@ async def run_pipeline_with_config(
storage = storage or create_storage(config.storage)  # type: ignore
if is_update_run:
# TODO: remove the default choice (PipelineFileStorageConfig) once the new config system enforces a correct update-index-storage config when used.
update_index_storage = update_index_storage or create_storage(
config.update_index_storage
or PipelineFileStorageConfig(base_dir=str(Path(root_dir) / "output"))
@@ -114,11 +114,6 @@ async def run_pipeline_with_config(
# TODO: remove the default choice (PipelineMemoryCacheConfig) when the new config system guarantees the existence of a cache config
cache = cache or create_cache(config.cache or PipelineMemoryCacheConfig(), root_dir)
callbacks = (
create_pipeline_reporter(config.reporting, root_dir)
if config.reporting
else None
)
# TODO: remove the type ignore when the new config system guarantees the existence of an input config
dataset = (
dataset
@@ -195,7 +190,7 @@ async def run_pipeline(
dataset: pd.DataFrame,
storage: PipelineStorage | None = None,
cache: PipelineCache | None = None,
callbacks: WorkflowCallbacks | None = None,
callbacks: list[WorkflowCallbacks] | None = None,
progress_reporter: ProgressReporter | None = None,
input_post_process_steps: list[PipelineWorkflowStep] | None = None,
additional_verbs: VerbDefinitions | None = None,
@@ -226,13 +221,12 @@ async def run_pipeline(
start_time = time.time()
progress_reporter = progress_reporter or NullProgressReporter()
callbacks = callbacks or ConsoleWorkflowCallbacks()
callbacks = _create_callback_chain(callbacks, progress_reporter)
callbacks = callbacks or [ConsoleWorkflowCallbacks()]
callback_chain = _create_callback_chain(callbacks, progress_reporter)
context = create_run_context(storage=storage, cache=cache, stats=None)
exporter = ParquetExporter(
context.storage,
lambda e, s, d: cast(WorkflowCallbacks, callbacks).on_error(
lambda e, s, d: cast(WorkflowCallbacks, callback_chain).on_error(
"Error exporting table", e, s, d
),
)
@@ -246,7 +240,7 @@ async def run_pipeline(
workflows_to_run = loaded_workflows.workflows
workflow_dependencies = loaded_workflows.dependencies
dataset = await _run_post_process_steps(
input_post_process_steps, dataset, context, callbacks
input_post_process_steps, dataset, context, callback_chain
)
# ensure the incoming data is valid
@@ -266,7 +260,7 @@ async def run_pipeline(
result = await _process_workflow(
workflow_to_run.workflow,
context,
callbacks,
callback_chain,
exporter,
workflow_dependencies,
dataset,

View File

@@ -68,12 +68,12 @@ async def _export_workflow_output(
def _create_callback_chain(
callbacks: WorkflowCallbacks | None, progress: ProgressReporter | None
callbacks: list[WorkflowCallbacks] | None, progress: ProgressReporter | None
) -> WorkflowCallbacks:
"""Create a callbacks manager."""
"""Create a callback manager that encompasses multiple callbacks."""
manager = WorkflowCallbacksManager()
if callbacks is not None:
manager.register(callbacks)
for callback in callbacks or []:
manager.register(callback)
if progress is not None:
manager.register(ProgressWorkflowCallbacks(progress))
return manager
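
The manager now fans each event out to every registered callback, so several consumers can observe the same run. A rough sketch, as if called inside this module (my_callbacks is a hypothetical WorkflowCallbacks implementation; the on_error signature matches its use in run_pipeline above):

from graphrag.callbacks.console_workflow_callbacks import ConsoleWorkflowCallbacks

manager = _create_callback_chain([ConsoleWorkflowCallbacks(), my_callbacks], progress=None)
manager.on_error("Error exporting table", None, None, None)  # delegated to both callbacks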

View File

@@ -18,7 +18,7 @@ class MemoryPipelineStorage(FilePipelineStorage):
def __init__(self):
"""Init method definition."""
super().__init__(root_dir=".output")
super().__init__()
self._storage = {}
async def get(
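
With the hardcoded ".output" folder removed, this storage is purely in-memory and creates no directory as a side effect. A rough usage sketch (the async get/set signatures are assumed from the PipelineStorage interface):

storage = MemoryPipelineStorage()
await storage.set("stats.json", "{}")  # held in the internal dict
print(await storage.get("stats.json"))  # prints "{}"; nothing is written to disk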