Logging improvements (#2030)

* Turn down blob/cosmos exception reporting to match file storage

* Restore indexing-engine.log

* Restore some basic console logging and progress for index CLI

* Semver

* Ignore small ruff complaints

* Fix CLI console printing
Nathan Evans 2025-08-25 14:56:43 -07:00 committed by GitHub
parent 469ee8568f
commit 77fb7d9d7d
12 changed files with 145 additions and 279 deletions

View File

@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Improve upon recent logging refactor"
}

View File

@ -2,10 +2,3 @@
# Licensed under the MIT License
"""The GraphRAG package."""
import logging
from graphrag.logger.standard_logging import init_console_logger
logger = logging.getLogger(__name__)
init_console_logger()

View File

@ -0,0 +1,46 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""A logger that emits updates from the indexing engine to the console."""
from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
from graphrag.index.typing.pipeline_run_result import PipelineRunResult
from graphrag.logger.progress import Progress
# ruff: noqa: T201
class ConsoleWorkflowCallbacks(NoopWorkflowCallbacks):
"""A logger that writes to a console."""
_verbose = False
def __init__(self, verbose=False):
self._verbose = verbose
def pipeline_start(self, names: list[str]) -> None:
"""Execute this callback to signal when the entire pipeline starts."""
print("Starting pipeline with workflows:", ", ".join(names))
def pipeline_end(self, results: list[PipelineRunResult]) -> None:
"""Execute this callback to signal when the entire pipeline ends."""
print("Pipeline complete")
def workflow_start(self, name: str, instance: object) -> None:
"""Execute this callback when a workflow starts."""
print(f"Starting workflow: {name}")
def workflow_end(self, name: str, instance: object) -> None:
"""Execute this callback when a workflow ends."""
print("") # account for potential return on prior progress
print(f"Workflow complete: {name}")
if self._verbose:
print(instance)
def progress(self, progress: Progress) -> None:
"""Handle when progress occurs."""
complete = progress.completed_items or 0
total = progress.total_items or 1
percent = round((complete / total) * 100)
start = f" {complete} / {total} "
print(f"{start:{'.'}<{percent}}", flush=True, end="\r")

View File

@ -10,9 +10,11 @@ import warnings
from pathlib import Path
import graphrag.api as api
from graphrag.callbacks.console_workflow_callbacks import ConsoleWorkflowCallbacks
from graphrag.config.enums import CacheType, IndexingMethod, ReportingType
from graphrag.config.load_config import load_config
from graphrag.index.validate_config import validate_config_names
from graphrag.logger.standard_logging import DEFAULT_LOG_FILENAME
from graphrag.utils.cli import redact
# Ignore warnings from numba
@ -115,7 +117,6 @@ def _run_index(
# Initialize loggers and reporting config
init_loggers(
config=config,
root_dir=str(config.root_dir) if config.root_dir else None,
verbose=verbose,
)
@ -124,8 +125,8 @@ def _run_index(
# Log the configuration details
if config.reporting.type == ReportingType.file:
log_dir = Path(config.root_dir or "") / (config.reporting.base_dir or "")
log_path = log_dir / "logs.txt"
log_dir = Path(config.root_dir) / config.reporting.base_dir
log_path = log_dir / DEFAULT_LOG_FILENAME
logger.info("Logging enabled at %s", log_path)
else:
logger.info(
@ -154,6 +155,7 @@ def _run_index(
method=method,
is_update_run=is_update_run,
memory_profile=memprofile,
callbacks=[ConsoleWorkflowCallbacks(verbose=verbose)],
)
)
encountered_errors = any(
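
The CLI now attaches a ConsoleWorkflowCallbacks instance to the indexing run. For library users calling the API directly, a minimal sketch of the same wiring, assuming api.build_index accepts a callbacks list as the CLI code above does, and that ./my-project is a hypothetical, already-initialized project root:

import asyncio
from pathlib import Path

import graphrag.api as api
from graphrag.callbacks.console_workflow_callbacks import ConsoleWorkflowCallbacks
from graphrag.config.load_config import load_config

config = load_config(Path("./my-project"))  # hypothetical project root
results = asyncio.run(
    api.build_index(
        config=config,
        callbacks=[ConsoleWorkflowCallbacks(verbose=True)],  # console progress + workflow summaries
    )
)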

View File

@ -9,6 +9,7 @@ from pathlib import Path
import graphrag.api as api
from graphrag.config.enums import ReportingType
from graphrag.config.load_config import load_config
from graphrag.logger.standard_logging import DEFAULT_LOG_FILENAME
from graphrag.prompt_tune.generator.community_report_summarization import (
COMMUNITY_SUMMARIZATION_FILENAME,
)
@ -75,14 +76,13 @@ async def prompt_tune(
# initialize loggers with config
init_loggers(
config=graph_config,
root_dir=str(root_path),
verbose=verbose,
)
# log the configuration details
if graph_config.reporting.type == ReportingType.file:
log_dir = Path(root_path) / (graph_config.reporting.base_dir or "")
log_path = log_dir / "logs.txt"
log_dir = Path(root_path) / graph_config.reporting.base_dir
log_path = log_dir / DEFAULT_LOG_FILENAME
logger.info("Logging enabled at %s", log_path)
else:
logger.info(

View File

@ -4,7 +4,6 @@
"""CLI implementation of the query subcommand."""
import asyncio
import logging
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any
@ -19,8 +18,7 @@ from graphrag.utils.storage import load_table_from_storage, storage_has_table
if TYPE_CHECKING:
import pandas as pd
# Initialize standard logger
logger = logging.getLogger(__name__)
# ruff: noqa: T201
def run_global_search(
@ -61,10 +59,6 @@ def run_global_search(
final_community_reports_list = dataframe_dict["community_reports"]
index_names = dataframe_dict["index_names"]
logger.info(
"Running multi-index global search on indexes: %s",
dataframe_dict["index_names"],
)
response, context_data = asyncio.run(
api.multi_index_global_search(
config=config,
@ -80,11 +74,7 @@ def run_global_search(
verbose=verbose,
)
)
# log the full response at INFO level for user visibility but at DEBUG level in the API layer
logger.info("Query Response:\n%s", response)
# NOTE: we return the response and context data here purely as a complete demonstration of the API.
# External users should use the API directly to get the response and context data.
print(response)
return response, context_data
# Otherwise, call the Single-Index Global Search API
@ -118,9 +108,9 @@ def run_global_search(
verbose=verbose,
):
full_response += stream_chunk
print(stream_chunk, end="") # noqa: T201
sys.stdout.flush() # flush output buffer to display text immediately
print() # noqa: T201
print(stream_chunk, end="")
sys.stdout.flush()
print()
return full_response, context_data
return asyncio.run(run_streaming_search())
@ -138,11 +128,8 @@ def run_global_search(
verbose=verbose,
)
)
# log the full response at INFO level for user visibility but at DEBUG level in the API layer
logger.info("Global Search Response:\n%s", response)
print(response)
# NOTE: we return the response and context data here purely as a complete demonstration of the API.
# External users should use the API directly to get the response and context data.
return response, context_data
@ -188,11 +175,6 @@ def run_local_search(
final_relationships_list = dataframe_dict["relationships"]
index_names = dataframe_dict["index_names"]
logger.info(
"Running multi-index local search on indexes: %s",
dataframe_dict["index_names"],
)
# If any covariates tables are missing from any index, set the covariates list to None
if len(dataframe_dict["covariates"]) != dataframe_dict["num_indexes"]:
final_covariates_list = None
@ -216,11 +198,8 @@ def run_local_search(
verbose=verbose,
)
)
# log the full response at INFO level for user visibility but at DEBUG level in the API layer
logger.info("Local Search Response:\n%s", response)
print(response)
# NOTE: we return the response and context data here purely as a complete demonstration of the API.
# External users should use the API directly to get the response and context data.
return response, context_data
# Otherwise, call the Single-Index Local Search API
@ -259,9 +238,9 @@ def run_local_search(
verbose=verbose,
):
full_response += stream_chunk
print(stream_chunk, end="") # noqa: T201
sys.stdout.flush() # flush output buffer to display text immediately
print() # noqa: T201
print(stream_chunk, end="")
sys.stdout.flush()
print()
return full_response, context_data
return asyncio.run(run_streaming_search())
@ -281,11 +260,8 @@ def run_local_search(
verbose=verbose,
)
)
# log the full response at INFO level for user visibility but at DEBUG level in the API layer
logger.info("Local Search Response:\n%s", response)
print(response)
# NOTE: we return the response and context data here purely as a complete demonstration of the API.
# External users should use the API directly to get the response and context data.
return response, context_data
@ -329,11 +305,6 @@ def run_drift_search(
final_relationships_list = dataframe_dict["relationships"]
index_names = dataframe_dict["index_names"]
logger.info(
"Running multi-index drift search on indexes: %s",
dataframe_dict["index_names"],
)
response, context_data = asyncio.run(
api.multi_index_drift_search(
config=config,
@ -350,11 +321,8 @@ def run_drift_search(
verbose=verbose,
)
)
# log the full response at INFO level for user visibility but at DEBUG level in the API layer
logger.info("DRIFT Search Response:\n%s", response)
print(response)
# NOTE: we return the response and context data here purely as a complete demonstration of the API.
# External users should use the API directly to get the response and context data.
return response, context_data
# Otherwise, call the Single-Index Drift Search API
@ -391,9 +359,9 @@ def run_drift_search(
verbose=verbose,
):
full_response += stream_chunk
print(stream_chunk, end="") # noqa: T201
sys.stdout.flush() # flush output buffer to display text immediately
print() # noqa: T201
print(stream_chunk, end="")
sys.stdout.flush()
print()
return full_response, context_data
return asyncio.run(run_streaming_search())
@ -413,11 +381,8 @@ def run_drift_search(
verbose=verbose,
)
)
# log the full response at INFO level for user visibility but at DEBUG level in the API layer
logger.info("DRIFT Search Response:\n%s", response)
print(response)
# NOTE: we return the response and context data here purely as a complete demonstration of the API.
# External users should use the API directly to get the response and context data.
return response, context_data
@ -451,11 +416,6 @@ def run_basic_search(
final_text_units_list = dataframe_dict["text_units"]
index_names = dataframe_dict["index_names"]
logger.info(
"Running multi-index basic search on indexes: %s",
dataframe_dict["index_names"],
)
response, context_data = asyncio.run(
api.multi_index_basic_search(
config=config,
@ -466,11 +426,8 @@ def run_basic_search(
verbose=verbose,
)
)
# log the full response at INFO level for user visibility but at DEBUG level in the API layer
logger.info("Basic Search Response:\n%s", response)
print(response)
# NOTE: we return the response and context data here purely as a complete demonstration of the API.
# External users should use the API directly to get the response and context data.
return response, context_data
# Otherwise, call the Single-Index Basic Search API
@ -497,9 +454,9 @@ def run_basic_search(
verbose=verbose,
):
full_response += stream_chunk
print(stream_chunk, end="") # noqa: T201
sys.stdout.flush() # flush output buffer to display text immediately
print() # noqa: T201
print(stream_chunk, end="")
sys.stdout.flush()
print()
return full_response, context_data
return asyncio.run(run_streaming_search())
@ -512,11 +469,8 @@ def run_basic_search(
verbose=verbose,
)
)
# log the full response at INFO level for user visibility but at DEBUG level in the API layer
logger.info("Basic Search Response:\n%s", response)
print(response)
# NOTE: we return the response and context data here purely as a complete demonstration of the API.
# External users should use the API directly to get the response and context data.
return response, context_data

View File

@ -17,7 +17,6 @@ from graphrag.index.operations.chunk_text.strategies import get_encoding_fn
from graphrag.index.typing.context import PipelineRunContext
from graphrag.index.typing.workflow import WorkflowFunctionOutput
from graphrag.index.utils.hashing import gen_sha512_hash
from graphrag.logger.progress import Progress
from graphrag.utils.storage import load_table_from_storage, write_table_to_storage
logger = logging.getLogger(__name__)
@ -69,8 +68,6 @@ def create_base_text_units(
zip(*[sort[col] for col in ["id", "text"]], strict=True)
)
callbacks.progress(Progress(percent=0))
agg_dict = {"text_with_ids": list}
if "metadata" in documents:
agg_dict["metadata"] = "first" # type: ignore

View File

@ -16,9 +16,6 @@ logger = logging.getLogger(__name__)
class Progress:
"""A class representing the progress of a task."""
percent: float | None = None
"""0 - 1 progress"""
description: str | None = None
"""Description of the progress"""

View File

@ -9,7 +9,7 @@ logging system for use within the graphrag package.
Usage:
# Configuration should be done once at the start of your application:
from graphrag.logger.standard_logging import init_loggers
init_loggers(log_file="/path/to/app.log")
init_loggers(config)
# Then throughout your code:
import logging
@ -33,22 +33,20 @@ Notes
"""
import logging
import sys
from pathlib import Path
from graphrag.config.enums import ReportingType
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.config.models.reporting_config import ReportingConfig
DEFAULT_LOG_FILENAME = "indexing-engine.log"
LOG_FORMAT = "%(asctime)s.%(msecs)04d - %(levelname)s - %(name)s - %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
def init_loggers(
config: GraphRagConfig | None = None,
root_dir: str | None = None,
config: GraphRagConfig,
verbose: bool = False,
log_file: str | Path | None = None,
filename: str = DEFAULT_LOG_FILENAME,
) -> None:
"""Initialize logging handlers for graphrag based on configuration.
@ -70,20 +68,7 @@ def init_loggers(
from graphrag.logger.blob_workflow_logger import BlobWorkflowLogger
# extract reporting config from GraphRagConfig if provided
reporting_config: ReportingConfig
if log_file:
# if log_file is provided directly, override config to use file-based logging
log_path = Path(log_file)
reporting_config = ReportingConfig(
type=ReportingType.file,
base_dir=str(log_path.parent),
)
elif config is not None:
# use the reporting configuration from GraphRagConfig
reporting_config = config.reporting
else:
# default to file-based logging if no config provided
reporting_config = ReportingConfig(base_dir="logs", type=ReportingType.file)
reporting_config = config.reporting
logger = logging.getLogger("graphrag")
log_level = logging.DEBUG if verbose else logging.INFO
@ -100,23 +85,15 @@ def init_loggers(
# create formatter with custom format
formatter = logging.Formatter(fmt=LOG_FORMAT, datefmt=DATE_FORMAT)
init_console_logger(verbose)
# add more handlers based on configuration
handler: logging.Handler
match reporting_config.type:
case ReportingType.file:
if log_file:
# use the specific log file provided
log_file_path = Path(log_file)
log_file_path.parent.mkdir(parents=True, exist_ok=True)
handler = logging.FileHandler(str(log_file_path), mode="a")
else:
# use the config-based file path
log_dir = Path(root_dir or "") / (reporting_config.base_dir or "")
log_dir.mkdir(parents=True, exist_ok=True)
log_file_path = log_dir / "logs.txt"
handler = logging.FileHandler(str(log_file_path), mode="a")
# use the config-based file path
log_dir = Path(config.root_dir) / (reporting_config.base_dir)
log_dir.mkdir(parents=True, exist_ok=True)
log_file_path = log_dir / filename
handler = logging.FileHandler(str(log_file_path), mode="a")
handler.setFormatter(formatter)
logger.addHandler(handler)
case ReportingType.blob:
@ -129,25 +106,3 @@ def init_loggers(
logger.addHandler(handler)
case _:
logger.error("Unknown reporting type '%s'.", reporting_config.type)
def init_console_logger(verbose: bool = False) -> None:
"""Initialize a console logger if not already present.
This function sets up a logger that outputs log messages to STDOUT.
Parameters
----------
verbose : bool, default=False
Whether to enable verbose (DEBUG) logging.
"""
logger = logging.getLogger("graphrag")
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
has_console_handler = any(
isinstance(h, logging.StreamHandler) for h in logger.handlers
)
if not has_console_handler:
console_handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(fmt=LOG_FORMAT, datefmt=DATE_FORMAT)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
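
With this change, init_loggers takes the GraphRagConfig directly and resolves the log file from config.root_dir and reporting.base_dir, defaulting to indexing-engine.log; console output comes from init_console_logger, which is invoked both here and from the package __init__. A minimal usage sketch following the Usage docstring (assuming an already-initialized project at a hypothetical ./my-project):

import logging
from pathlib import Path

from graphrag.config.load_config import load_config
from graphrag.logger.standard_logging import init_loggers

config = load_config(Path("./my-project"))  # hypothetical root dir
init_loggers(config=config, verbose=False)
# with file reporting (the default), messages land in <root_dir>/<reporting.base_dir>/indexing-engine.log

logger = logging.getLogger("graphrag.my_module")
logger.info("this propagates to the handlers registered on the 'graphrag' logger")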

View File

@ -61,7 +61,7 @@ class BlobPipelineStorage(PipelineStorage):
if storage_account_blob_url
else None
)
logger.info(
logger.debug(
"creating blob storage at container=%s, path=%s",
self._container_name,
self._path_prefix,
@ -162,14 +162,13 @@ class BlobPipelineStorage(PipelineStorage):
num_filtered,
num_total,
)
except Exception:
logger.exception(
except Exception: # noqa: BLE001
logger.warning(
"Error finding blobs: base_dir=%s, file_pattern=%s, file_filter=%s",
base_dir,
file_pattern,
file_filter,
)
raise
async def get(
self, key: str, as_bytes: bool | None = False, encoding: str | None = None
@ -185,8 +184,8 @@ class BlobPipelineStorage(PipelineStorage):
if not as_bytes:
coding = encoding or self._encoding
blob_data = blob_data.decode(coding)
except Exception:
logger.exception("Error getting key %s", key)
except Exception: # noqa: BLE001
logger.warning("Error getting key %s", key)
return None
else:
return blob_data
@ -303,8 +302,8 @@ class BlobPipelineStorage(PipelineStorage):
blob_client = container_client.get_blob_client(key)
timestamp = blob_client.download_blob().properties.creation_time
return get_timestamp_formatted_with_local_tz(timestamp)
except Exception:
logger.exception("Error getting key %s", key)
except Exception: # noqa: BLE001
logger.warning("Error getting key %s", key)
return ""

View File

@ -71,7 +71,7 @@ class CosmosDBPipelineStorage(PipelineStorage):
else None
)
self._no_id_prefixes = []
logger.info(
logger.debug(
"creating cosmosdb storage with account: %s and database: %s and container: %s",
self._cosmosdb_account_name,
self._database_name,
@ -192,8 +192,8 @@ class CosmosDBPipelineStorage(PipelineStorage):
progress_status.completed_items,
progress_status.total_items,
)
except Exception:
logger.exception(
except Exception: # noqa: BLE001
logger.warning(
"An error occurred while searching for documents in Cosmos DB."
)
@ -229,8 +229,8 @@ class CosmosDBPipelineStorage(PipelineStorage):
item = self._container_client.read_item(item=key, partition_key=key)
item_body = item.get("body")
return json.dumps(item_body)
except Exception:
logger.exception("Error reading item %s", key)
except Exception: # noqa: BLE001
logger.warning("Error reading item %s", key)
return None
async def set(self, key: str, value: Any, encoding: str | None = None) -> None:
@ -343,8 +343,8 @@ class CosmosDBPipelineStorage(PipelineStorage):
datetime.fromtimestamp(item["_ts"], tz=timezone.utc)
)
except Exception:
logger.exception("Error getting key %s", key)
except Exception: # noqa: BLE001
logger.warning("Error getting key %s", key)
return ""

View File

@ -7,9 +7,7 @@ import logging
import tempfile
from pathlib import Path
from graphrag.config.enums import ReportingType
from graphrag.config.models.reporting_config import ReportingConfig
from graphrag.logger.standard_logging import init_loggers
from graphrag.logger.standard_logging import DEFAULT_LOG_FILENAME, init_loggers
from tests.unit.config.utils import get_default_graphrag_config
@ -19,37 +17,11 @@ def test_standard_logging():
assert logger.name == "graphrag.test"
def test_file_logging():
"""Test that logging to a file works."""
with tempfile.TemporaryDirectory() as temp_dir:
log_file = Path(temp_dir) / "test.log"
# configure logging to file using init_loggers
init_loggers(log_file=log_file)
# get a logger and log some messages
logger = logging.getLogger("graphrag.test")
test_message = "Test file logging message"
logger.info(test_message)
# check that the log file exists and contains our message
assert log_file.exists()
with open(log_file) as f:
content = f.read()
assert test_message in content
# close all file handlers to ensure proper cleanup on Windows
graphrag_logger = logging.getLogger("graphrag")
for handler in graphrag_logger.handlers[:]:
if isinstance(handler, logging.FileHandler):
handler.close()
graphrag_logger.removeHandler(handler)
def test_logger_hierarchy():
"""Test that logger hierarchy works correctly."""
# reset logging to default state using init_loggers
init_loggers()
config = get_default_graphrag_config()
init_loggers(config)
root_logger = logging.getLogger("graphrag")
child_logger = logging.getLogger("graphrag.child")
@ -62,71 +34,13 @@ def test_logger_hierarchy():
root_logger.handlers.clear()
def test_init_loggers_console_enabled():
"""Test that init_loggers works with console handler."""
init_loggers()
logger = logging.getLogger("graphrag")
# should have both a console handler and a file handler (default config)
console_handlers = [
h
for h in logger.handlers
if isinstance(h, logging.StreamHandler)
and not isinstance(h, logging.FileHandler)
]
file_handlers = [h for h in logger.handlers if isinstance(h, logging.FileHandler)]
assert len(console_handlers) > 0
assert len(file_handlers) > 0 # Due to default file config
# clean up
for handler in logger.handlers[:]:
if isinstance(handler, logging.FileHandler):
handler.close()
logger.handlers.clear()
def test_init_loggers_default_config():
"""Test that init_loggers uses default file config when none provided."""
with tempfile.TemporaryDirectory() as temp_dir:
# call init_loggers with no config (should default to file logging)
init_loggers(root_dir=temp_dir)
logger = logging.getLogger("graphrag")
# Should have a file handler due to default config
file_handlers = [
h for h in logger.handlers if isinstance(h, logging.FileHandler)
]
assert len(file_handlers) > 0
# test that logging works
test_message = "Test default config message"
logger.info(test_message)
# check that the log file was created with default structure
log_file = Path(temp_dir) / "logs" / "logs.txt"
assert log_file.exists()
with open(log_file) as f:
content = f.read()
assert test_message in content
# clean up
for handler in logger.handlers[:]:
if isinstance(handler, logging.FileHandler):
handler.close()
logger.handlers.clear()
def test_init_loggers_file_config():
"""Test that init_loggers works with file configuration."""
with tempfile.TemporaryDirectory() as temp_dir:
config = get_default_graphrag_config(root_dir=temp_dir)
config.reporting = ReportingConfig(type=ReportingType.file, base_dir="logs")
# call init_loggers with file config
init_loggers(config=config, root_dir=temp_dir)
init_loggers(config=config)
logger = logging.getLogger("graphrag")
@ -141,7 +55,7 @@ def test_init_loggers_file_config():
logger.info(test_message)
# check that the log file was created
log_file = Path(temp_dir) / "logs" / "logs.txt"
log_file = Path(temp_dir) / "logs" / DEFAULT_LOG_FILENAME
assert log_file.exists()
with open(log_file) as f:
@ -155,45 +69,50 @@ def test_init_loggers_file_config():
logger.handlers.clear()
def test_init_loggers_console_config():
"""Test that init_loggers works with console configuration."""
config = get_default_graphrag_config()
def test_init_loggers_file_verbose():
"""Test that init_loggers works with verbose flag."""
with tempfile.TemporaryDirectory() as temp_dir:
config = get_default_graphrag_config(root_dir=temp_dir)
# call init_loggers with config
init_loggers(config=config)
# call init_loggers with file config
init_loggers(config=config, verbose=True)
logger = logging.getLogger("graphrag")
logger = logging.getLogger("graphrag")
# should have a console handler from the config
console_handlers = [
h
for h in logger.handlers
if isinstance(h, logging.StreamHandler)
and not isinstance(h, logging.FileHandler)
]
assert len(console_handlers) > 0
# test that logging works
test_message = "Test init_loggers file message"
logger.debug(test_message)
# clean up
logger.handlers.clear()
# check that the log file was created
log_file = Path(temp_dir) / "logs" / DEFAULT_LOG_FILENAME
with open(log_file) as f:
content = f.read()
assert test_message in content
# clean up
for handler in logger.handlers[:]:
if isinstance(handler, logging.FileHandler):
handler.close()
logger.handlers.clear()
def test_init_loggers_both_console():
"""Test that init_loggers doesn't duplicate console handlers."""
config = get_default_graphrag_config()
def test_init_loggers_custom_filename():
"""Test that init_loggers works with custom filename."""
with tempfile.TemporaryDirectory() as temp_dir:
config = get_default_graphrag_config(root_dir=temp_dir)
# call init_loggers with config
init_loggers(config=config)
# call init_loggers with file config
init_loggers(config=config, filename="custom-log.log")
logger = logging.getLogger("graphrag")
logger = logging.getLogger("graphrag")
# should have only one console handler (no duplicates)
console_handlers = [
h
for h in logger.handlers
if isinstance(h, logging.StreamHandler)
and not isinstance(h, logging.FileHandler)
]
assert len(console_handlers) == 1
# check that the log file was created
log_file = Path(temp_dir) / "logs" / "custom-log.log"
assert log_file.exists()
# clean up
logger.handlers.clear()
# clean up
for handler in logger.handlers[:]:
if isinstance(handler, logging.FileHandler):
handler.close()
logger.handlers.clear()