mirror of https://github.com/microsoft/graphrag.git, synced 2026-01-14 00:57:23 +08:00
fix
parent 3dcdd3ce53
commit eff3588592
@@ -106,7 +106,7 @@ These settings control the data input used by the pipeline. Any settings with a
| `GRAPHRAG_INPUT_TYPE` | The input storage type to use when reading files. (`file` or `blob`) | `str` | optional | `file` |
| `GRAPHRAG_INPUT_FILE_PATTERN` | The file pattern regexp to use when reading input files from the input directory. | `str` | optional | `.*\.txt$` |
| `GRAPHRAG_INPUT_TEXT_COLUMN` | The 'text' column to use when reading CSV input files. | `str` | optional | `text` |
-| `GRAPHRAG_INPUT_DOCUMENT_METADATA` | A list of CSV columns, comma-separated, to incorporate as metadata with each chunk. | `str` | optional | `id` |
+| `GRAPHRAG_INPUT_DOCUMENT_METADATA` | A list of CSV columns, comma-separated, to incorporate as metadata with each chunk. | `str` | optional | `None` |
| `GRAPHRAG_INPUT_STORAGE_ACCOUNT_BLOB_URL` | The Azure Storage blob endpoint to use when in `blob` mode and using managed identity. Will have the format `https://<storage_account_name>.blob.core.windows.net` | `str` | optional | `None` |
| `GRAPHRAG_INPUT_CONNECTION_STRING` | The connection string to use when reading CSV input files from Azure Blob Storage. | `str` | optional | `None` |
| `GRAPHRAG_INPUT_CONTAINER_NAME` | The container name to use when reading CSV input files from Azure Blob Storage. | `str` | optional | `None` |
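The only change in this table is the documented default for `GRAPHRAG_INPUT_DOCUMENT_METADATA`, which moves from `id` to `None` to match the config default below. As a rough sketch only (the helper name is hypothetical, and this is not graphrag's actual settings loader), a comma-separated value of that variable could be parsed into an optional list like so:

```python
# Hypothetical sketch: parse a comma-separated GRAPHRAG_INPUT_DOCUMENT_METADATA
# value into an optional list of column names, defaulting to None when unset.
import os


def read_input_metadata() -> list[str] | None:
    raw = os.environ.get("GRAPHRAG_INPUT_DOCUMENT_METADATA")
    if raw is None or raw.strip() == "":
        return None
    return [column.strip() for column in raw.split(",")]


os.environ["GRAPHRAG_INPUT_DOCUMENT_METADATA"] = "title,author"
print(read_input_metadata())  # ['title', 'author']
```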
@@ -93,7 +93,7 @@ This is the base LLM configuration section. Other steps may override this config
- `file_pattern` **str** - A regex to match input files. Default is `.*\.csv$` if in csv mode and `.*\.txt$` if in text mode.
- `file_filter` **dict** - Key/value pairs to filter. Default is None.
- `text_column` **str** - (CSV Mode Only) The text column name.
-- `metadata` **list[str]** - (CSV Mode Only) The additional document attributes to include.
+- `metadata` **list[str]** - The additional document attributes to include when chunking.

### chunks
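The `metadata` bullet drops its "(CSV Mode Only)" qualifier and ties the attributes to chunking instead. For the surrounding settings, one reading of the documented `file_pattern`/`file_filter` semantics is to match a filename regex and then compare its named groups against the filter; the sketch below is an assumption about that behaviour, not graphrag's loader:

```python
# Illustrative only: select input files whose names match `file_pattern` and
# whose named regex groups satisfy every key/value pair in `file_filter`.
import re


def select_files(
    names: list[str], file_pattern: str, file_filter: dict[str, str] | None
) -> list[str]:
    pattern = re.compile(file_pattern)
    selected = []
    for name in names:
        match = pattern.search(name)
        if match is None:
            continue
        groups = match.groupdict()
        if file_filter and any(groups.get(k) != v for k, v in file_filter.items()):
            continue
        selected.append(name)
    return selected


print(select_files(["a.txt", "b.csv", "c.txt"], r".*\.txt$", None))  # ['a.txt', 'c.txt']
```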
@@ -43,6 +43,6 @@ class InputConfig(BaseModel):
    text_column: str = Field(
        description="The input text column to use.", default=defs.INPUT_TEXT_COLUMN
    )
-    metadata: list[str] = Field(
-        description="The document metadata to use with each chunk.", default=[]
+    metadata: list[str] | None = Field(
+        description="The document metadata to use with each chunk.", default=None
    )
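A minimal standalone sketch of the field change (assuming pydantic, which `InputConfig` is built on): `metadata` becomes optional and defaults to `None` rather than `[]`, so "not configured" is distinguishable from "configured with no columns". The class name here is a stand-in, not graphrag's actual model:

```python
# Stand-in model showing the new optional metadata field with a None default.
from pydantic import BaseModel, Field


class InputConfigSketch(BaseModel):
    text_column: str = Field(description="The input text column to use.", default="text")
    metadata: list[str] | None = Field(
        description="The document metadata to use with each chunk.", default=None
    )


print(InputConfigSketch().metadata)                    # None
print(InputConfigSketch(metadata=["title"]).metadata)  # ['title']
```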
@@ -59,7 +59,7 @@ def create_base_text_units(
        encoding_model=encoding_model,
        strategy=strategy,
        callbacks=callbacks,
-        metadata=metadata or [],
+        metadata=metadata,
        line_delimiter=line_delimiter,
    )
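The point of dropping `or []` is that `None` now flows through to the chunking step instead of being silently coerced into an empty list; the two values behave the same in a boolean test but can be told apart downstream. A tiny illustration:

```python
# `metadata or []` collapses None into [], while passing metadata through
# preserves the distinction between "unset" and "empty".
def describe(metadata: list[str] | None) -> str:
    coerced = metadata or []
    return f"raw={metadata!r} coerced={coerced!r}"


print(describe(None))        # raw=None coerced=[]
print(describe(["title"]))   # raw=['title'] coerced=['title']
```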
@@ -9,6 +9,7 @@ import pandas as pd
def create_final_documents(
    documents: pd.DataFrame,
    text_units: pd.DataFrame,
+    metadata: list[str] | None,
) -> pd.DataFrame:
    """All the steps to transform final documents."""
    exploded = (
@@ -53,6 +54,9 @@ def create_final_documents(
        "text",
        "text_unit_ids",
    ]
+    if metadata:
+        core_columns.extend(metadata)

    final_columns = [column for column in core_columns if column in rejoined.columns]

-    return rejoined.loc[:, final_columns]
+    return rejoined.loc[:, list(set(final_columns))]
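The column-selection logic above, restated as a self-contained sketch: the core columns are extended with any configured metadata columns, filtered down to those actually present in the joined frame, and deduplicated. Note that `list(set(...))` removes duplicates at the cost of a stable column order:

```python
# Sketch of the final column selection with an optional metadata list.
def select_columns(frame_columns: list[str], metadata: list[str] | None) -> list[str]:
    core_columns = ["id", "human_readable_id", "text", "text_unit_ids"]
    if metadata:
        core_columns.extend(metadata)
    final_columns = [column for column in core_columns if column in frame_columns]
    return list(set(final_columns))


print(sorted(select_columns(["id", "text", "text_unit_ids", "title"], ["title"])))
# ['id', 'text', 'text_unit_ids', 'title']
```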
@@ -50,6 +50,19 @@ async def load(
            )
        else:
            data["text"] = data.apply(lambda x: x[config.text_column], axis=1)

+        if config.metadata is not None:
+            for metadata in config.metadata:
+                if metadata not in data.columns:
+                    log.warning(
+                        "metadata column %s not found in csv file %s",
+                        metadata,
+                        path,
+                    )
+                else:
+                    data[metadata] = data.apply(
+                        lambda x, metadata=metadata: x[metadata], axis=1
+                    )
        return data

    file_pattern = (
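The added block copies each configured metadata column into the loaded frame and only warns when a column is missing rather than raising. A self-contained sketch of the same pattern (the function name and the demo frame are illustrative, not graphrag code):

```python
# Copy configured metadata columns through a CSV frame, warning on misses.
import logging

import pandas as pd

log = logging.getLogger(__name__)


def apply_metadata(
    data: pd.DataFrame, metadata: list[str] | None, path: str
) -> pd.DataFrame:
    if metadata is not None:
        for column in metadata:
            if column not in data.columns:
                log.warning("metadata column %s not found in csv file %s", column, path)
            else:
                data[column] = data.apply(lambda x, column=column: x[column], axis=1)
    return data


frame = apply_metadata(
    pd.DataFrame({"text": ["a"], "title": ["t"]}), ["title", "author"], "demo.csv"
)
print(frame.columns.tolist())  # ['text', 'title']
```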
@@ -5,7 +5,6 @@

import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any
@@ -146,12 +146,6 @@ async def _text_embed_with_vector_store(
    if embed_column not in input.columns:
        msg = f"Column {embed_column} not found in input dataframe with columns {input.columns}"
        raise ValueError(msg)
-    title = embed_column
-    if title not in input.columns:
-        msg = (
-            f"Column {title} not found in input dataframe with columns {input.columns}"
-        )
-        raise ValueError(msg)
    if id_column not in input.columns:
        msg = f"Column {id_column} not found in input dataframe with columns {input.columns}"
        raise ValueError(msg)
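The deleted lines re-checked `title`, which had just been set to `embed_column` and was therefore already validated. If the remaining repeated checks were ever folded into one place, a hypothetical helper (not part of this change) could look like:

```python
# Hypothetical helper: validate that a dataframe has all required columns.
import pandas as pd


def require_columns(frame: pd.DataFrame, columns: list[str]) -> None:
    for column in columns:
        if column not in frame.columns:
            msg = f"Column {column} not found in input dataframe with columns {list(frame.columns)}"
            raise ValueError(msg)


require_columns(pd.DataFrame({"text": [], "id": []}), ["text", "id"])  # passes
```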
@@ -27,8 +27,9 @@ async def run_workflow(
        "create_base_text_units", context.storage
    )

+    metadata = config.input.metadata
    input = config.input
-    output = create_final_documents(documents, text_units)
+    output = create_final_documents(documents, text_units, metadata)

    await write_table_to_storage(output, workflow_name, context.storage)
@@ -8,6 +8,7 @@ import os
import re
import shutil
from collections.abc import Iterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, cast
@@ -120,7 +121,7 @@ class FilePipelineStorage(PipelineStorage):

    def get_creation_date(self, key: str) -> str:
        """Get the creation date of a file."""
-        return str(Path(join_path(self._root_dir, key)).stat().st_ctime)
+        return str(get_creation_time_with_local_tz(self._root_dir, key))

    async def has(self, key: str) -> bool:
        """Has method definition."""
@@ -170,3 +171,16 @@ def _create_progress_status(
        completed_items=num_loaded + num_filtered,
        description=f"{num_loaded} files loaded ({num_filtered} filtered)",
    )
+
+
+def get_creation_time_with_local_tz(root_dir, key):
+    """Get the creation time of a file with the local timezone."""
+    file_path = Path(join_path(root_dir, key))
+
+    creation_timestamp = file_path.stat().st_ctime
+
+    creation_time_utc = datetime.fromtimestamp(creation_timestamp, tz=timezone.utc)
+
+    creation_time_local = creation_time_utc.astimezone()
+
+    return creation_time_local.strftime("%Y-%m-%d %H:%M:%S %Z")
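The new helper converts the `st_ctime` epoch timestamp into an aware UTC datetime and then renders it in the machine's local timezone, instead of returning the raw float as before. The conversion can be checked in isolation:

```python
# Interpret a POSIX timestamp as UTC, then render it in the local timezone.
from datetime import datetime, timezone

timestamp = 1_700_000_000.0  # example st_ctime value
utc_time = datetime.fromtimestamp(timestamp, tz=timezone.utc)
local_time = utc_time.astimezone()
print(local_time.strftime("%Y-%m-%d %H:%M:%S %Z"))
```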
@@ -44,8 +44,7 @@ async def test_create_final_documents_with_attribute_columns():
        storage=["create_base_text_units"],
    )

-    config = create_graphrag_config()
-    config.input.metadata = ["title"]
+    config = create_graphrag_config({"models": DEFAULT_MODEL_CONFIG})

    await run_workflow(
        config,
@@ -59,8 +58,10 @@ async def test_create_final_documents_with_attribute_columns():
    # our test dataframe does not have attributes, so we'll assert without it
    # and separately confirm it is in the output
    compare_outputs(
-        actual, expected, columns=["id", "human_readable_id", "text", "text_unit_ids"]
+        actual,
+        expected,
+        columns=["id", "human_readable_id", "text", "text_unit_ids", "title"],
    )
    assert len(actual.columns) == 5
-    assert "title" not in actual.columns
-    assert "attributes" in actual.columns
+    assert "title" in actual.columns
+    assert "attributes" not in actual.columns