Combine structured data extraction

Nathan Evans 2026-01-06 13:03:04 -08:00
parent b265612828
commit f066080ef0
3 changed files with 65 additions and 19 deletions

View File

@@ -8,13 +8,12 @@ from io import BytesIO
 import pandas as pd
-from graphrag.index.input.input_reader import InputReader
-from graphrag.index.input.util import process_data_columns
+from graphrag.index.input.structured_file_reader import StructuredFileReader
 logger = logging.getLogger(__name__)
-class CSVFileReader(InputReader):
+class CSVFileReader(StructuredFileReader):
     """Reader implementation for csv files."""
     async def read_file(self, path: str) -> pd.DataFrame:
@@ -29,9 +28,4 @@ class CSVFileReader(InputReader):
         """
         buffer = BytesIO(await self._storage.get(path, as_bytes=True))
         data = pd.read_csv(buffer, encoding=self._encoding)
-        data = process_data_columns(
-            data, path, self._id_column, self._title_column, self._text_column
-        )
-        creation_date = await self._storage.get_creation_date(path)
-        data["creation_date"] = data.apply(lambda _: creation_date, axis=1)
-        return data
+        return await self.process_data_columns(data, path)
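The six removed lines are not lost: the column mapping and creation-date stamping move into the shared base class added below. For the part the reader keeps, a minimal sketch of the BytesIO pattern, with sample bytes invented for illustration:

from io import BytesIO

import pandas as pd

# read_file parses bytes returned by the storage layer rather than opening a
# filesystem path, so storage backends that hand back raw bytes work unchanged.
raw = b"title,text\nDoc 1,hello world\n"
data = pd.read_csv(BytesIO(raw), encoding="utf-8")
print(data)
#    title         text
# 0  Doc 1  hello world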

View File

@@ -8,13 +8,12 @@ import logging
 import pandas as pd
-from graphrag.index.input.input_reader import InputReader
-from graphrag.index.input.util import process_data_columns
+from graphrag.index.input.structured_file_reader import StructuredFileReader
 logger = logging.getLogger(__name__)
-class JSONFileReader(InputReader):
+class JSONFileReader(StructuredFileReader):
     """Reader implementation for json files."""
     async def read_file(self, path: str) -> pd.DataFrame:
@@ -32,10 +31,4 @@ class JSONFileReader(InputReader):
         # json file could just be a single object, or an array of objects
         rows = as_json if isinstance(as_json, list) else [as_json]
         data = pd.DataFrame(rows)
-        data = process_data_columns(
-            data, path, self._id_column, self._title_column, self._text_column
-        )
-        creation_date = await self._storage.get_creation_date(path)
-        data["creation_date"] = data.apply(lambda _: creation_date, axis=1)
-        return data
+        return await self.process_data_columns(data, path)
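The single-object-versus-array branch is now the only JSON-specific logic left in this reader. A small self-contained sketch of how both payload shapes normalize to the same DataFrame, with sample documents invented for illustration:

import json

import pandas as pd

single = json.loads('{"title": "a", "text": "hello"}')
array = json.loads('[{"title": "a", "text": "hello"}, {"title": "b", "text": "world"}]')

for as_json in (single, array):
    # a lone object becomes a one-element list; an array passes through as-is
    rows = as_json if isinstance(as_json, list) else [as_json]
    print(pd.DataFrame(rows))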

View File

@@ -0,0 +1,59 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""A module containing 'StructuredFileReader' model."""
+
+import logging
+
+import pandas as pd
+
+from graphrag.index.input.input_reader import InputReader
+from graphrag.index.utils.hashing import gen_sha512_hash
+
+logger = logging.getLogger(__name__)
+
+
+class StructuredFileReader(InputReader):
+    """Base reader implementation for structured files such as csv and json."""
+
+    def __init__(
+        self,
+        id_column: str | None = None,
+        title_column: str | None = None,
+        text_column: str = "text",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self._id_column = id_column
+        self._title_column = title_column
+        self._text_column = text_column
+
+    async def process_data_columns(
+        self,
+        documents: pd.DataFrame,
+        path: str,
+    ) -> pd.DataFrame:
+        """Process configured data columns of a DataFrame."""
+        # id is optional - harvest from df or generate a hash from the row
+        if self._id_column is not None:
+            documents["id"] = documents.apply(lambda x: x[self._id_column], axis=1)
+        else:
+            documents["id"] = documents.apply(
+                lambda x: gen_sha512_hash(x, x.keys()), axis=1
+            )
+
+        # title is optional - harvest from df or use filename
+        if self._title_column is not None:
+            documents["title"] = documents.apply(
+                lambda x: x[self._title_column], axis=1
+            )
+        else:
+            documents["title"] = documents.apply(lambda _: path, axis=1)
+
+        # text column is required - harvest from df
+        documents["text"] = documents.apply(lambda x: x[self._text_column], axis=1)
+
+        creation_date = await self._storage.get_creation_date(path)
+        documents["creation_date"] = documents.apply(lambda _: creation_date, axis=1)
+        return documents
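With the shared base class in place, supporting another structured format reduces to parsing into a DataFrame and delegating. A hypothetical tab-separated reader, sketched on the assumption that _storage and _encoding behave as in the CSV reader above (TSVFileReader is not part of this commit):

from io import BytesIO

import pandas as pd

from graphrag.index.input.structured_file_reader import StructuredFileReader


class TSVFileReader(StructuredFileReader):
    """Hypothetical reader for tab-separated files, shown for illustration only."""

    async def read_file(self, path: str) -> pd.DataFrame:
        buffer = BytesIO(await self._storage.get(path, as_bytes=True))
        data = pd.read_csv(buffer, sep="\t", encoding=self._encoding)
        # the base class fills in the id/title/text/creation_date columns
        return await self.process_data_columns(data, path)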