graphrag/tests/verbs/util.py
Nathan Evans c02ab0984a
Streamline workflows (#1674)
* Remove create_final_nodes

* Rename final entity output to "entities"

* Remove duplicate code from graph extraction

* Rename create_final_relationships output to "relationships"

* Rename create_final_communities output to "communities"

* Combine compute_communities and create_final_communities

* Rename create_final_covariates output to "covariates"

* Rename create_final_community_reports output to "community_reports"

* Rename create_final_text_units output to "text_units"

* Rename create_final_documents output to "documents"

* Remove transient snapshots config

* Move create_final_entities to finalize_entities operation

* Move create_final_relationships flow to finalize_relationships operation

* Reuse some community report functions

* Collapse most of graph and text unit-based report generation

* Unify schemas files

* Move community reports extractor

* Move NLP report prompt to prompts folder

* Fix a few pandas warnings

* Rename embeddings config to embed_text

* Rename claim_extraction config to extract_claims

* Remove nltk from standard graph extraction

* Fix verb tests

* Fix extract graph config naming

* Fix moved file reference

* Create v1-to-v2 migration notebook

* Semver

* Fix smoke test artifact count

* Raise tpm/rpm on smoke tests

* Update drift settings for smoke tests

* Reuse project directory var in api notebook

* Format

* Format
2025-02-07 11:11:03 -08:00


# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

import pandas as pd
from pandas.testing import assert_series_equal

import graphrag.config.defaults as defs
from graphrag.index.context import PipelineRunContext
from graphrag.index.run.utils import create_run_context
from graphrag.utils.storage import write_table_to_storage

pd.set_option("display.max_columns", None)

FAKE_API_KEY = "NOT_AN_API_KEY"
DEFAULT_CHAT_MODEL_CONFIG = {
    "api_key": FAKE_API_KEY,
    "type": defs.LLM_TYPE.value,
    "model": defs.LLM_MODEL,
}

DEFAULT_EMBEDDING_MODEL_CONFIG = {
    "api_key": FAKE_API_KEY,
    "type": defs.EMBEDDING_TYPE.value,
    "model": defs.EMBEDDING_MODEL,
}

DEFAULT_MODEL_CONFIG = {
    defs.DEFAULT_CHAT_MODEL_ID: DEFAULT_CHAT_MODEL_CONFIG,
    defs.DEFAULT_EMBEDDING_MODEL_ID: DEFAULT_EMBEDDING_MODEL_CONFIG,
}
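
# A minimal sketch of how these defaults are typically consumed (illustrative,
# not part of this file): pass DEFAULT_MODEL_CONFIG as the "models" section
# when building a config for the workflow under test. The assumption here is
# that create_graphrag_config accepts a partial settings dict:
#
#   from graphrag.config.create_graphrag_config import create_graphrag_config
#
#   config = create_graphrag_config({"models": DEFAULT_MODEL_CONFIG})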

async def create_test_context(storage: list[str] | None = None) -> PipelineRunContext:
    """Create a test context with tables loaded into storage."""
    context = create_run_context(None, None, None)

    # always set the input docs, but since our stored table is final, drop what wouldn't be in the original source input
    input = load_test_table("documents")
    input.drop(columns=["text_unit_ids"], inplace=True)
    await write_table_to_storage(input, "documents", context.storage)

    if storage:
        for name in storage:
            table = load_test_table(name)
            # normal storage interface insists on bytes
            await write_table_to_storage(table, name, context.storage)

    return context
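
# Sketch of typical usage in a verb test (the workflow runner and callbacks
# names are assumptions for illustration, not defined in this file):
#
#   context = await create_test_context(storage=["entities", "relationships"])
#   await run_workflow(config, context, NoopWorkflowCallbacks())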

def load_test_table(output: str) -> pd.DataFrame:
    """Load a stored test table; pass in the workflow output (generally the workflow name)."""
    return pd.read_parquet(f"tests/verbs/data/{output}.parquet")

def compare_outputs(
    actual: pd.DataFrame, expected: pd.DataFrame, columns: list[str] | None = None
) -> None:
    """Compare the actual and expected dataframes, optionally specifying columns to compare.

    This uses assert_series_equal since we are sometimes intentionally omitting columns from the actual output.
    """
    cols = expected.columns if columns is None else columns

    assert len(actual) == len(expected), (
        f"Expected: {len(expected)} rows, Actual: {len(actual)} rows"
    )

    for column in cols:
        assert column in actual.columns
        try:
            # dtypes can differ since the test data is read from parquet and our workflow runs in memory
            if column != "id":  # don't check uuids
                assert_series_equal(
                    actual[column],
                    expected[column],
                    check_dtype=False,
                    check_index=False,
                )
        except AssertionError:
            print("Expected:")
            print(expected[column])
            print("Actual:")
            print(actual[column])
            raise
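
# A hedged end-to-end sketch of the comparison helpers (load_table_from_storage
# is assumed to be the read counterpart of write_table_to_storage in
# graphrag.utils.storage; the table name and columns are illustrative):
#
#   from graphrag.utils.storage import load_table_from_storage
#
#   expected = load_test_table("entities")
#   actual = await load_table_from_storage("entities", context.storage)
#   compare_outputs(actual, expected, columns=["title", "type"])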