mirror of https://github.com/microsoft/graphrag.git
synced 2026-01-14 00:57:23 +08:00

Remove pandas from input loading

This commit is contained in:
parent f066080ef0
commit 2b83d661f9
@@ -3,12 +3,11 @@
 """A module containing 'CSVFileReader' model."""
 
+import csv
 import logging
-from io import BytesIO
-
-import pandas as pd
 
 from graphrag.index.input.structured_file_reader import StructuredFileReader
+from graphrag.index.input.text_document import TextDocument
 
 logger = logging.getLogger(__name__)
@@ -16,7 +15,7 @@ logger = logging.getLogger(__name__)
 class CSVFileReader(StructuredFileReader):
     """Reader implementation for csv files."""
 
-    async def read_file(self, path: str) -> pd.DataFrame:
+    async def read_file(self, path: str) -> list[TextDocument]:
         """Read a csv file into a DataFrame of documents.
 
         Args:
@@ -26,6 +25,7 @@ class CSVFileReader(StructuredFileReader):
         -------
         - output - DataFrame with a row for each document in the file.
         """
-        buffer = BytesIO(await self._storage.get(path, as_bytes=True))
-        data = pd.read_csv(buffer, encoding=self._encoding)
-        return await self.process_data_columns(data, path)
+        file = await self._storage.get(path)
+
+        reader = csv.DictReader(file.splitlines())
+        return await self.process_data_columns(list(reader), path)
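Note: the new CSV path leans on csv.DictReader, which accepts any iterable of lines, so the string returned by storage can be parsed in memory without pandas. A minimal, self-contained sketch (CSV content invented for illustration):

import csv

# Content as it might come back from storage.get(path)
content = "title,text\nDoc A,first row\nDoc B,second row\n"

# One dict per row, keyed by the header line
rows = list(csv.DictReader(content.splitlines()))
print(rows[0]["title"])  # -> "Doc A"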
@@ -8,13 +8,14 @@ from __future__ import annotations
 import logging
 import re
 from abc import ABCMeta, abstractmethod
+from dataclasses import asdict
 from typing import TYPE_CHECKING
 
-import pandas as pd
-
 if TYPE_CHECKING:
     from graphrag_storage import Storage
 
+    from graphrag.index.input.text_document import TextDocument
 
 logger = logging.getLogger(__name__)
@@ -51,7 +52,7 @@ class InputReader(metaclass=ABCMeta):
 
         self._file_pattern = pattern
 
-    async def read_files(self) -> pd.DataFrame:
+    async def read_files(self) -> list[TextDocument]:
         """Load files from storage and apply a loader function based on file type. Process metadata on the results if needed."""
         files = list(self._storage.find(re.compile(self._file_pattern)))
         if len(files) == 0:
@@ -59,11 +60,22 @@
             logger.warning(msg)
             files = []
 
-        files_loaded = []
+        documents: list[TextDocument] = []
 
         for file in files:
             try:
-                files_loaded.append(await self.read_file(file))
+                file_documents = await self.read_file(file)
+
+                if self._metadata:
+                    for document in file_documents:
+                        # Collapse the metadata columns into a single JSON object column
+                        document.metadata = {
+                            k: v
+                            for k, v in asdict(document).items()
+                            if k in self._metadata
+                        }
+
+                documents.extend(file_documents)
             except Exception as e:  # noqa: BLE001 (catching Exception is fine here)
                 logger.warning("Warning! Error loading file %s. Skipping...", file)
                 logger.warning("Error: %s", e)
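Note: the metadata collapse now happens per document with dataclasses.asdict instead of a DataFrame apply. A standalone sketch of the same filtering (field values invented; the dataclass mirrors the shape of the new TextDocument):

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class Doc:
    id: str
    text: str
    title: str
    creation_date: str
    metadata: dict[str, Any] | None = None


doc = Doc(id="1", text="hello", title="Doc A", creation_date="2024-01-01")
metadata_columns = ["title", "creation_date"]

# Keep only the configured fields, as the loop above does for each loaded document
doc.metadata = {k: v for k, v in asdict(doc).items() if k in metadata_columns}
print(doc.metadata)  # -> {'title': 'Doc A', 'creation_date': '2024-01-01'}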
@@ -72,38 +84,23 @@
             "Found %d %s files, loading %d",
             len(files),
             self._file_type,
-            len(files_loaded),
+            len(documents),
         )
-        result = pd.concat(files_loaded)
         total_files_log = (
-            f"Total number of unfiltered {self._file_type} rows: {len(result)}"
+            f"Total number of unfiltered {self._file_type} rows: {len(documents)}"
         )
         logger.info(total_files_log)
-        # Convert metadata columns to strings and collapse them into a JSON object
-        if self._metadata:
-            if all(col in result.columns for col in self._metadata):
-                # Collapse the metadata columns into a single JSON object column
-                result["metadata"] = result[self._metadata].apply(
-                    lambda row: row.to_dict(), axis=1
-                )
-            else:
-                value_error_msg = (
-                    "One or more metadata columns not found in the DataFrame."
-                )
-                raise ValueError(value_error_msg)
-
-            result[self._metadata] = result[self._metadata].astype(str)
-
-        return result
+        return documents
 
     @abstractmethod
-    async def read_file(self, path: str) -> pd.DataFrame:
-        """Read a file into a DataFrame of documents.
+    async def read_file(self, path: str) -> list[TextDocument]:
+        """Read a file into a list of documents.
 
         Args:
         - path - The path to read the file from.
 
         Returns
         -------
-        - output - DataFrame with a row for each document in the file.
+        - output - List with an entry for each document in the file.
         """
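Note: the abstract contract is now a list of TextDocument rather than a DataFrame. A hypothetical subclass, for illustration only (not part of this commit; it assumes the base class exposes self._storage and self._encoding the way the shipped readers do):

from graphrag.index.input.input_reader import InputReader
from graphrag.index.input.text_document import TextDocument
from graphrag.index.utils.hashing import gen_sha512_hash


class ExampleFileReader(InputReader):
    """Illustrative reader that returns one document per file."""

    async def read_file(self, path: str) -> list[TextDocument]:
        text = await self._storage.get(path, encoding=self._encoding)
        return [
            TextDocument(
                id=gen_sha512_hash({"text": text}, ["text"]),
                title=path,
                text=text,
                creation_date=await self._storage.get_creation_date(path),
            )
        ]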
@@ -6,9 +6,8 @@
 import json
 import logging
 
-import pandas as pd
-
 from graphrag.index.input.structured_file_reader import StructuredFileReader
+from graphrag.index.input.text_document import TextDocument
 
 logger = logging.getLogger(__name__)
@@ -16,7 +15,7 @@ logger = logging.getLogger(__name__)
 class JSONFileReader(StructuredFileReader):
     """Reader implementation for json files."""
 
-    async def read_file(self, path: str) -> pd.DataFrame:
+    async def read_file(self, path: str) -> list[TextDocument]:
         """Read a JSON file into a DataFrame of documents.
 
         Args:
@@ -30,5 +29,4 @@ class JSONFileReader(StructuredFileReader):
         as_json = json.loads(text)
         # json file could just be a single object, or an array of objects
         rows = as_json if isinstance(as_json, list) else [as_json]
-        data = pd.DataFrame(rows)
-        return await self.process_data_columns(data, path)
+        return await self.process_data_columns(rows, path)
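Note: the JSON reader now just normalizes the parsed payload to a list of dicts instead of building a DataFrame. The normalization in isolation (payloads invented):

import json

single = json.loads('{"title": "Doc A", "text": "hello"}')
many = json.loads('[{"title": "A", "text": "hello"}, {"title": "B", "text": "world"}]')

# A file may contain one object or an array of objects; both become a list of rows
rows_single = single if isinstance(single, list) else [single]
rows_many = many if isinstance(many, list) else [many]
print(len(rows_single), len(rows_many))  # -> 1 2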
@@ -4,10 +4,10 @@
 """A module containing 'CSVFileReader' model."""
 
 import logging
-
-import pandas as pd
+from typing import Any
 
 from graphrag.index.input.input_reader import InputReader
+from graphrag.index.input.text_document import TextDocument
 from graphrag.index.utils.hashing import gen_sha512_hash
 
 logger = logging.getLogger(__name__)
@@ -30,30 +30,29 @@ class StructuredFileReader(InputReader):
 
     async def process_data_columns(
         self,
-        documents: pd.DataFrame,
+        rows: list[dict[str, Any]],
         path: str,
-    ) -> pd.DataFrame:
-        """Process configured data columns of a DataFrame."""
-        # id is optional - generate from harvest from df or hash from text
-        if self._id_column is not None:
-            documents["id"] = documents.apply(lambda x: x[self._id_column], axis=1)
-        else:
-            documents["id"] = documents.apply(
-                lambda x: gen_sha512_hash(x, x.keys()), axis=1
+    ) -> list[TextDocument]:
+        """Process configured data columns from a list of loaded dicts."""
+        documents = []
+        for row in rows:
+            # text column is required - harvest from dict
+            text = row[self._text_column]
+            # id is optional - generate from harvest from dict or hash from text
+            id = (
+                row[self._id_column]
+                if self._id_column
+                else gen_sha512_hash({"text": text}, ["text"])
             )
 
-        # title is optional - harvest from df or use filename
-        if self._title_column is not None:
-            documents["title"] = documents.apply(
-                lambda x: x[self._title_column], axis=1
-            )
-        else:
-            documents["title"] = documents.apply(lambda _: path, axis=1)
-
-        # text column is required - harvest from df
-        documents["text"] = documents.apply(lambda x: x[self._text_column], axis=1)
-
-        creation_date = await self._storage.get_creation_date(path)
-        documents["creation_date"] = documents.apply(lambda _: creation_date, axis=1)
+            # title is optional - harvest from dict or use filename
+            title = row[self._title_column] if self._title_column else str(path)
+            creation_date = await self._storage.get_creation_date(path)
+            documents.append(
+                TextDocument(
+                    id=id,
+                    title=title,
+                    text=text,
+                    creation_date=creation_date,
+                )
+            )
 
         return documents
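Note: when no id column is configured, the id falls back to a hash of the text via gen_sha512_hash from graphrag.index.utils.hashing. A rough standalone approximation of that kind of content hash (my own sketch, not the repo's implementation):

import hashlib


def content_hash(item: dict[str, str], keys: list[str]) -> str:
    # Join the selected values and hash them to get a stable content-derived id
    joined = "".join(str(item[k]) for k in keys)
    return hashlib.sha512(joined.encode("utf-8")).hexdigest()


print(content_hash({"text": "hello world"}, ["text"])[:16])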
@@ -6,9 +6,8 @@
 import logging
 from pathlib import Path
 
-import pandas as pd
-
 from graphrag.index.input.input_reader import InputReader
+from graphrag.index.input.text_document import TextDocument
 from graphrag.index.utils.hashing import gen_sha512_hash
 
 logger = logging.getLogger(__name__)
@@ -17,7 +16,7 @@ logger = logging.getLogger(__name__)
 class TextFileReader(InputReader):
     """Reader implementation for text files."""
 
-    async def read_file(self, path: str) -> pd.DataFrame:
+    async def read_file(self, path: str) -> list[TextDocument]:
         """Read a text file into a DataFrame of documents.
 
         Args:
@@ -28,8 +27,10 @@ class TextFileReader(InputReader):
         - output - DataFrame with a row for each document in the file.
         """
         text = await self._storage.get(path, encoding=self._encoding)
-        new_item = {"text": text}
-        new_item["id"] = gen_sha512_hash(new_item, new_item.keys())
-        new_item["title"] = str(Path(path).name)
-        new_item["creation_date"] = await self._storage.get_creation_date(path)
-        return pd.DataFrame([new_item])
+        document = TextDocument(
+            id=gen_sha512_hash({"text": text}, ["text"]),
+            title=str(Path(path).name),
+            text=text,
+            creation_date=await self._storage.get_creation_date(path),
+        )
+        return [document]
23  packages/graphrag/graphrag/index/input/text_document.py  Normal file
@@ -0,0 +1,23 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""TextDocument dataclass."""
+
+from dataclasses import dataclass
+from typing import Any
+
+
+@dataclass
+class TextDocument:
+    """The TextDocument holds relevant content for GraphRAG indexing."""
+
+    id: str
+    """Unique identifier for the document."""
+    text: str
+    """The main text content of the document."""
+    title: str
+    """The title of the document."""
+    creation_date: str
+    """The creation date of the document, ISO-8601 format."""
+    metadata: dict[str, Any] | None = None
+    """Additional metadata associated with the document."""
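Note: a quick usage sketch of the new dataclass (values invented); metadata stays None until read_files collapses configured fields into it:

from graphrag.index.input.text_document import TextDocument

doc = TextDocument(
    id="abc123",
    text="The quick brown fox.",
    title="example.txt",
    creation_date="2024-01-01T00:00:00Z",
)
print(doc.title)     # -> "example.txt"
print(doc.metadata)  # -> None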
@@ -38,4 +38,4 @@ async def run_workflow(
 
 async def load_input_documents(input_reader: InputReader) -> pd.DataFrame:
     """Load and parse input documents into a standard format."""
-    return await input_reader.read_files()
+    return pd.DataFrame(await input_reader.read_files())
@@ -47,7 +47,7 @@ async def load_update_documents(
     previous_storage: Storage,
 ) -> pd.DataFrame:
     """Load and parse update-only input documents into a standard format."""
-    input_documents = await input_reader.read_files()
+    input_documents = pd.DataFrame(await input_reader.read_files())
     # previous storage is the output of the previous run
     # we'll use this to diff the input from the prior
     delta_documents = await get_delta_docs(input_documents, previous_storage)
@@ -68,7 +68,7 @@ async def load_docs_in_chunks(
     dataset = await input_reader.read_files()
     chunk_config = config.chunks
     chunks_df = create_base_text_units(
-        documents=dataset,
+        documents=pd.DataFrame(dataset),
        callbacks=NoopWorkflowCallbacks(),
         size=chunk_size,
         overlap=overlap,
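Note: downstream workflows still expect a DataFrame, so the conversion now happens once at the boundary. pandas can build a frame directly from a list of dataclass instances, which is what pd.DataFrame(await input_reader.read_files()) relies on. A minimal sketch (documents invented):

import pandas as pd

from graphrag.index.input.text_document import TextDocument

docs = [
    TextDocument(id="1", text="first", title="a.txt", creation_date="2024-01-01"),
    TextDocument(id="2", text="second", title="b.txt", creation_date="2024-01-02"),
]

# One row per document, one column per dataclass field
df = pd.DataFrame(docs)
print(list(df.columns))  # -> ['id', 'text', 'title', 'creation_date', 'metadata']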
@@ -20,8 +20,9 @@ async def test_csv_loader_one_file():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (2, 4)
-    assert documents["title"].iloc[0] == "input.csv"
+    assert len(documents) == 2
+    assert documents[0].title == "input.csv"
+    assert documents[0].metadata is None
 
 
 async def test_csv_loader_one_file_with_title():
@@ -36,8 +37,8 @@ async def test_csv_loader_one_file_with_title():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (2, 4)
-    assert documents["title"].iloc[0] == "Hello"
+    assert len(documents) == 2
+    assert documents[0].title == "Hello"
 
 
 async def test_csv_loader_one_file_with_metadata():
@@ -53,8 +54,8 @@ async def test_csv_loader_one_file_with_metadata():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (2, 5)
-    assert documents["metadata"][0] == {"title": "Hello"}
+    assert len(documents) == 2
+    assert documents[0].metadata == {"title": "Hello"}
 
 
 async def test_csv_loader_multiple_files():
@@ -68,4 +69,4 @@ async def test_csv_loader_multiple_files():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (4, 4)
+    assert len(documents) == 4
@@ -18,8 +18,9 @@ async def test_json_loader_one_file_one_object():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (1, 4)
-    assert documents["title"].iloc[0] == "input.json"
+    assert len(documents) == 1
+    assert documents[0].title == "input.json"
+    assert documents[0].metadata is None
 
 
 async def test_json_loader_one_file_multiple_objects():
@@ -33,9 +34,8 @@ async def test_json_loader_one_file_multiple_objects():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    print(documents)
-    assert documents.shape == (3, 4)
-    assert documents["title"].iloc[0] == "input.json"
+    assert len(documents) == 3
+    assert documents[0].title == "input.json"
 
 
 async def test_json_loader_one_file_with_title():
@@ -50,8 +50,8 @@ async def test_json_loader_one_file_with_title():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (1, 4)
-    assert documents["title"].iloc[0] == "Hello"
+    assert len(documents) == 1
+    assert documents[0].title == "Hello"
 
 
 async def test_json_loader_one_file_with_metadata():
@@ -67,8 +67,8 @@ async def test_json_loader_one_file_with_metadata():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (1, 5)
-    assert documents["metadata"][0] == {"title": "Hello"}
+    assert len(documents) == 1
+    assert documents[0].metadata == {"title": "Hello"}
 
 
 async def test_json_loader_multiple_files():
@@ -82,4 +82,4 @@ async def test_json_loader_multiple_files():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (4, 4)
+    assert len(documents) == 4
@@ -18,8 +18,9 @@ async def test_txt_loader_one_file():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (1, 4)
-    assert documents["title"].iloc[0] == "input.txt"
+    assert len(documents) == 1
+    assert documents[0].title == "input.txt"
+    assert documents[0].metadata is None
 
 
 async def test_txt_loader_one_file_with_metadata():
@@ -34,9 +35,9 @@ async def test_txt_loader_one_file_with_metadata():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (1, 5)
+    assert len(documents) == 1
     # unlike csv, we cannot set the title to anything other than the filename
-    assert documents["metadata"][0] == {"title": "input.txt"}
+    assert documents[0].metadata == {"title": "input.txt"}
 
 
 async def test_txt_loader_multiple_files():
@@ -50,4 +51,4 @@ async def test_txt_loader_multiple_files():
     storage = create_storage(config.storage)
     reader = create_input_reader(config, storage)
     documents = await reader.read_files()
-    assert documents.shape == (2, 4)
+    assert len(documents) == 2