mirror of
https://github.com/microsoft/graphrag.git
synced 2026-02-11 21:43:56 +08:00
* Load default config in verb tests
* Load proper workflow config
* Collapse text unit pre-embedding steps
* Format
* Update smoke tests
* Semver
* Format
* Merge join* subflows into create_final_text_units
* Remove join_text_units_to_covariate_ids
* Format
* Remove join_text_units_to_entity_ids
* Remove join_text_units_to_relationship_ids
* Clean up merges and aggregations
* Remove unnecessary cast
75 lines
2.5 KiB
Python
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

from typing import cast

import pandas as pd
from datashaper import Workflow

from graphrag.config import create_graphrag_config
from graphrag.index import (
    PipelineWorkflowConfig,
    PipelineWorkflowStep,
    create_pipeline_config,
)


def load_input_tables(inputs: list[str]) -> dict[str, pd.DataFrame]:
    """Harvest all the referenced input IDs from the workflow being tested and pass them here."""
    # put every input in a map - Workflow looks them up by name
    input_tables: dict[str, pd.DataFrame] = {}
    for input_name in inputs:
        # strip the "workflow:" prefix if present; it is not part of the table filename
        filename = input_name.replace("workflow:", "")
        input_tables[input_name] = pd.read_parquet(f"tests/verbs/data/{filename}.parquet")
    return input_tables
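
# A hypothetical usage sketch (the workflow name here is assumed for
# illustration only):
#
#     tables = load_input_tables(["workflow:create_base_text_units"])
#     # keys keep the "workflow:" prefix; the parquet filename does not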


def load_expected(output: str) -> pd.DataFrame:
    """Pass in the workflow output (generally the workflow name)."""
    return pd.read_parquet(f"tests/verbs/data/{output}.parquet")


def get_config_for_workflow(name: str) -> PipelineWorkflowConfig:
    """Instantiate the bare minimum config to get a default workflow config for testing."""
    config = create_graphrag_config()
    pipeline_config = create_pipeline_config(config)
    print(pipeline_config.workflows)
    result = next(conf for conf in pipeline_config.workflows if conf.name == name)
    return cast(PipelineWorkflowConfig, result.config)
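
# For example (workflow name taken from this change's commit messages; the
# exact config keys may differ):
#
#     config = get_config_for_workflow("create_final_text_units")
#     steps = remove_disabled_steps(config["steps"])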


async def get_workflow_output(
    input_tables: dict[str, pd.DataFrame], schema: dict
) -> pd.DataFrame:
    """Pass in the input tables and the workflow schema."""
    # the bare minimum workflow is the pipeline schema and table context
    workflow = Workflow(
        schema=schema,
        input_tables=input_tables,
    )

    await workflow.run()

    # if there's only one output, it is the default here, no name required
    return cast(pd.DataFrame, workflow.output())


def compare_outputs(actual: pd.DataFrame, expected: pd.DataFrame) -> None:
    """Assert that the actual output matches the expected shape and column names."""
    try:
        assert actual.shape == expected.shape
        assert (actual.columns == expected.columns).all()
    except AssertionError:
        print("Expected:")
        print(expected.head())
        print("Actual:")
        print(actual.head())
        raise AssertionError from None
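

# Note: compare_outputs checks shape and column names, not cell values.
# A stricter, value-level variant could lean on pandas' own comparator;
# a sketch for illustration, not used by the tests in this suite:
def _compare_outputs_strict(actual: pd.DataFrame, expected: pd.DataFrame) -> None:
    # raises a detailed AssertionError on any dtype, index, or value mismatch
    pd.testing.assert_frame_equal(actual, expected)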


def remove_disabled_steps(
    steps: list[PipelineWorkflowStep],
) -> list[PipelineWorkflowStep]:
    """Filter out any workflow steps marked enabled=False."""
    return [step for step in steps if step.get("enabled", True)]
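

# How these helpers typically compose in a verb test - a minimal sketch.
# The workflow name comes from this change's commit messages; the input
# table list and the exact schema keys a real test needs are assumptions.
async def _example_create_final_text_units_test() -> None:
    input_tables = load_input_tables([
        "workflow:create_base_text_units",
    ])
    expected = load_expected("create_final_text_units")

    config = get_config_for_workflow("create_final_text_units")
    steps = remove_disabled_steps(config["steps"])

    actual = await get_workflow_output(input_tables, {"steps": steps})
    compare_outputs(actual, expected)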