diff --git a/.semversioner/next-release/patch-20240916191422408337.json b/.semversioner/next-release/patch-20240916191422408337.json new file mode 100644 index 00000000..d6b4b4d3 --- /dev/null +++ b/.semversioner/next-release/patch-20240916191422408337.json @@ -0,0 +1,4 @@ +{ + "type": "patch", + "description": "Covariate verb collapse." +} diff --git a/graphrag/index/workflows/v1/join_text_units_to_covariate_ids.py b/graphrag/index/workflows/v1/join_text_units_to_covariate_ids.py index be6bddf1..eefa0eb7 100644 --- a/graphrag/index/workflows/v1/join_text_units_to_covariate_ids.py +++ b/graphrag/index/workflows/v1/join_text_units_to_covariate_ids.py @@ -19,15 +19,11 @@ def build_steps( """ return [ { - "verb": "select", - "args": {"columns": ["id", "text_unit_id"]}, - "input": {"source": "workflow:create_final_covariates"}, - }, - { - "verb": "aggregate_override", + "verb": "join_text_units_to_covariate_ids", "args": { - "groupby": ["text_unit_id"], - "aggregations": [ + "select_columns": ["id", "text_unit_id"], + "aggregate_groupby": ["text_unit_id"], + "aggregate_aggregations": [ { "column": "id", "operation": "array_agg_distinct", @@ -40,5 +36,6 @@ def build_steps( }, ], }, + "input": {"source": "workflow:create_final_covariates"}, }, ] diff --git a/graphrag/index/workflows/v1/join_text_units_to_entity_ids.py b/graphrag/index/workflows/v1/join_text_units_to_entity_ids.py index 3cde93d2..6d1fdf18 100644 --- a/graphrag/index/workflows/v1/join_text_units_to_entity_ids.py +++ b/graphrag/index/workflows/v1/join_text_units_to_entity_ids.py @@ -19,7 +19,7 @@ def build_steps( """ return [ { - "verb": "join_text_units", + "verb": "join_text_units_to_entity_ids", "args": { "select_columns": ["id", "text_unit_ids"], "unroll_column": "text_unit_ids", diff --git a/graphrag/index/workflows/v1/join_text_units_to_relationship_ids.py b/graphrag/index/workflows/v1/join_text_units_to_relationship_ids.py index 2ffde036..b0b7d8b1 100644 --- 
a/graphrag/index/workflows/v1/join_text_units_to_relationship_ids.py +++ b/graphrag/index/workflows/v1/join_text_units_to_relationship_ids.py @@ -19,7 +19,7 @@ def build_steps( """ return [ { - "verb": "join_text_units", + "verb": "join_text_units_to_relationship_ids", "args": { "select_columns": ["id", "text_unit_ids"], "unroll_column": "text_unit_ids", diff --git a/graphrag/index/workflows/v1/subflows/__init__.py b/graphrag/index/workflows/v1/subflows/__init__.py index f823079b..a7ba94cc 100644 --- a/graphrag/index/workflows/v1/subflows/__init__.py +++ b/graphrag/index/workflows/v1/subflows/__init__.py @@ -3,8 +3,12 @@ """The Indexing Engine workflows -> subflows package root.""" -from .join_text_units import join_text_units +from .join_text_units_to_covariate_ids import join_text_units_to_covariate_ids +from .join_text_units_to_entity_ids import join_text_units_to_entity_ids +from .join_text_units_to_relationship_ids import join_text_units_to_relationship_ids __all__ = [ - "join_text_units", + "join_text_units_to_covariate_ids", + "join_text_units_to_entity_ids", + "join_text_units_to_relationship_ids", ] diff --git a/graphrag/index/workflows/v1/subflows/join_text_units_to_covariate_ids.py b/graphrag/index/workflows/v1/subflows/join_text_units_to_covariate_ids.py new file mode 100644 index 00000000..e7724c33 --- /dev/null +++ b/graphrag/index/workflows/v1/subflows/join_text_units_to_covariate_ids.py @@ -0,0 +1,27 @@ +# Copyright (c) 2024 Microsoft Corporation. 
@verb(name="join_text_units_to_covariate_ids", treats_input_tables_as_immutable=True)
def join_text_units_to_covariate_ids(
    input: VerbInput,
    select_columns: list[str],
    aggregate_aggregations: list[dict[str, Any]],
    aggregate_groupby: list[str] | None = None,
    **_kwargs: dict,
) -> VerbResult:
    """Select columns from the input table and aggregate the selection.

    Args:
        input: Verb input whose primary table is read via ``get_input()``.
        select_columns: Column names kept before aggregating.
        aggregate_aggregations: Aggregation specs handed unchanged to
            ``aggregate_df``.
        aggregate_groupby: Group-by columns handed unchanged to
            ``aggregate_df``; ``None`` is forwarded as-is.
        **_kwargs: Extra workflow arguments; accepted and ignored.

    Returns:
        A verb result wrapping the table returned by ``aggregate_df``.
    """
    # Narrow the table to the requested columns first so the aggregation only
    # sees what the workflow asked for.
    projected = cast(Table, input.get_input()[select_columns])
    return create_verb_result(
        aggregate_df(projected, aggregate_aggregations, aggregate_groupby)
    )
@verb(name="join_text_units_to_entity_ids", treats_input_tables_as_immutable=True)
def join_text_units_to_entity_ids(
    input: VerbInput,
    select_columns: list[str],
    unroll_column: str,
    aggregate_aggregations: list[dict[str, Any]],
    aggregate_groupby: list[str] | None = None,
    **_kwargs: dict,
) -> VerbResult:
    """Select columns, unroll one list-valued column, then aggregate.

    Args:
        input: Verb input whose primary table is read via ``get_input()``.
        select_columns: Column names kept before unrolling.
        unroll_column: Column passed to ``explode`` so each element gets its
            own row.
        aggregate_aggregations: Aggregation specs handed unchanged to
            ``aggregate_df``.
        aggregate_groupby: Group-by columns handed unchanged to
            ``aggregate_df``; ``None`` is forwarded as-is.
        **_kwargs: Extra workflow arguments; accepted and ignored.

    Returns:
        A verb result wrapping the table returned by ``aggregate_df``.
    """
    source = input.get_input()
    narrowed = cast(Table, source[select_columns])
    # explode() repeats index labels for rows it expands; reset to a fresh
    # 0..n-1 index before aggregating.
    exploded = narrowed.explode(unroll_column)
    exploded = exploded.reset_index(drop=True)
    result = aggregate_df(exploded, aggregate_aggregations, aggregate_groupby)
    return create_verb_result(result)
@verb(name="join_text_units_to_relationship_ids", treats_input_tables_as_immutable=True)
def join_text_units_to_relationship_ids(
    input: VerbInput,
    select_columns: list[str],
    unroll_column: str,
    aggregate_aggregations: list[dict[str, Any]],
    aggregate_groupby: list[str] | None = None,
    final_select_columns: list[str] | None = None,
    **_kwargs: dict,
) -> VerbResult:
    """Select columns, unroll one list-valued column, aggregate, then project.

    Args:
        input: Verb input whose primary table is read via ``get_input()``.
        select_columns: Column names kept before unrolling.
        unroll_column: Column passed to ``explode`` so each element gets its
            own row.
        aggregate_aggregations: Aggregation specs handed unchanged to
            ``aggregate_df``.
        aggregate_groupby: Group-by columns handed unchanged to
            ``aggregate_df``; ``None`` is forwarded as-is.
        final_select_columns: Columns to keep from the aggregated table. When
            ``None`` (the default) the aggregated table is returned whole.
        **_kwargs: Extra workflow arguments; accepted and ignored.

    Returns:
        A verb result wrapping the aggregated (and optionally projected) table.
    """
    table = input.get_input()
    selected = cast(Table, table[select_columns])
    # explode() repeats index labels for rows it expands; reset to a fresh
    # 0..n-1 index before aggregating.
    unrolled = selected.explode(unroll_column).reset_index(drop=True)
    aggregated = aggregate_df(unrolled, aggregate_aggregations, aggregate_groupby)
    # The parameter defaults to None, and indexing the frame with None raises
    # KeyError, so only apply the final projection when columns were given.
    if final_select_columns is not None:
        aggregated = cast(Table, aggregated[final_select_columns])
    return create_verb_result(aggregated)
b/tests/verbs/data/create_base_extracted_entities.parquet differ diff --git a/tests/verbs/data/create_final_communities.parquet b/tests/verbs/data/create_final_communities.parquet index 15fca400..201f4457 100644 Binary files a/tests/verbs/data/create_final_communities.parquet and b/tests/verbs/data/create_final_communities.parquet differ diff --git a/tests/verbs/data/create_final_community_reports.parquet b/tests/verbs/data/create_final_community_reports.parquet index 623e7945..fbdc87c0 100644 Binary files a/tests/verbs/data/create_final_community_reports.parquet and b/tests/verbs/data/create_final_community_reports.parquet differ diff --git a/tests/verbs/data/create_final_covariates.parquet b/tests/verbs/data/create_final_covariates.parquet new file mode 100644 index 00000000..6dce9b65 Binary files /dev/null and b/tests/verbs/data/create_final_covariates.parquet differ diff --git a/tests/verbs/data/create_final_entities.parquet b/tests/verbs/data/create_final_entities.parquet index ce044844..caa2d1ac 100644 Binary files a/tests/verbs/data/create_final_entities.parquet and b/tests/verbs/data/create_final_entities.parquet differ diff --git a/tests/verbs/data/create_final_nodes.parquet b/tests/verbs/data/create_final_nodes.parquet index 5722af14..97e3b0f6 100644 Binary files a/tests/verbs/data/create_final_nodes.parquet and b/tests/verbs/data/create_final_nodes.parquet differ diff --git a/tests/verbs/data/create_final_relationships.parquet b/tests/verbs/data/create_final_relationships.parquet index 9d1d5370..2e610dfb 100644 Binary files a/tests/verbs/data/create_final_relationships.parquet and b/tests/verbs/data/create_final_relationships.parquet differ diff --git a/tests/verbs/data/create_final_text_units.parquet b/tests/verbs/data/create_final_text_units.parquet index 36650599..e74bc0b6 100644 Binary files a/tests/verbs/data/create_final_text_units.parquet and b/tests/verbs/data/create_final_text_units.parquet differ diff --git 
a/tests/verbs/data/create_summarized_entities.parquet b/tests/verbs/data/create_summarized_entities.parquet index e8655428..80e377c4 100644 Binary files a/tests/verbs/data/create_summarized_entities.parquet and b/tests/verbs/data/create_summarized_entities.parquet differ diff --git a/tests/verbs/data/join_text_units_to_covariate_ids.parquet b/tests/verbs/data/join_text_units_to_covariate_ids.parquet new file mode 100644 index 00000000..538fd424 Binary files /dev/null and b/tests/verbs/data/join_text_units_to_covariate_ids.parquet differ diff --git a/tests/verbs/data/join_text_units_to_entity_ids.parquet b/tests/verbs/data/join_text_units_to_entity_ids.parquet index 4aef6a9b..40e63412 100644 Binary files a/tests/verbs/data/join_text_units_to_entity_ids.parquet and b/tests/verbs/data/join_text_units_to_entity_ids.parquet differ diff --git a/tests/verbs/data/join_text_units_to_relationship_ids.parquet b/tests/verbs/data/join_text_units_to_relationship_ids.parquet index 7ab5781c..c5a069f6 100644 Binary files a/tests/verbs/data/join_text_units_to_relationship_ids.parquet and b/tests/verbs/data/join_text_units_to_relationship_ids.parquet differ diff --git a/tests/verbs/test_join_text_units_to_covariate_ids.py b/tests/verbs/test_join_text_units_to_covariate_ids.py new file mode 100644 index 00000000..12ca4d18 --- /dev/null +++ b/tests/verbs/test_join_text_units_to_covariate_ids.py @@ -0,0 +1,22 @@ +# Copyright (c) 2024 Microsoft Corporation. 
async def test_join_text_units_to_covariate_ids():
    """Run the covariate-id join workflow and compare it with the stored expected output."""
    tables = load_input_tables(["workflow:create_final_covariates"])
    expected_output = load_expected("join_text_units_to_covariate_ids")

    # Build the workflow from the default (empty) config.
    workflow = {"steps": build_steps({})}
    actual_output = await get_workflow_output(tables, workflow)

    compare_outputs(actual_output, expected_output)