From f6045fac0903382c2e4c93a2c76efa7aa59312bd Mon Sep 17 00:00:00 2001 From: fredricz-20070104 <226039983+fredricz-20070104@users.noreply.github.com> Date: Sat, 10 Jan 2026 20:51:18 +0800 Subject: [PATCH] [None][chore] Fix Gitlab CI termination issues (#10576) Signed-off-by: FredricZ-2007 <226039983+fredricz-20070104@users.noreply.github.com> Signed-off-by: yufeiwu-nv <230315618+yufeiwu-nv@users.noreply.github.com> Co-authored-by: yufeiwu-nv <230315618+yufeiwu-nv@users.noreply.github.com> --- .../defs/perf/disagg/cleanup_jobs.sh | 108 ++++++++++++++++++ .../integration/defs/perf/disagg/conftest.py | 44 +++++-- .../defs/perf/disagg/execution/executor.py | 9 +- .../disagg/reporting/accuracy_validator.py | 15 +++ .../defs/perf/disagg/session_collect.sh | 42 ++----- ...pp8_gen1_dep16_bs8_eplb0_mtp0-Default.yaml | 4 +- .../defs/perf/disagg/test_disagg.py | 72 ++++++------ .../defs/perf/disagg/utils/job_tracker.py | 61 ++++++++++ .../defs/perf/disagg/utils/trackers.py | 5 + 9 files changed, 278 insertions(+), 82 deletions(-) create mode 100644 tests/integration/defs/perf/disagg/cleanup_jobs.sh create mode 100644 tests/integration/defs/perf/disagg/utils/job_tracker.py diff --git a/tests/integration/defs/perf/disagg/cleanup_jobs.sh b/tests/integration/defs/perf/disagg/cleanup_jobs.sh new file mode 100644 index 0000000000..61ad4b160e --- /dev/null +++ b/tests/integration/defs/perf/disagg/cleanup_jobs.sh @@ -0,0 +1,108 @@ +#!/bin/bash +# cleanup_jobs.sh - Cancel all SLURM jobs tracked in jobs.txt +# +# This script is designed to run in GitLab CI after_script to ensure +# all SLURM jobs are cancelled when the pipeline is interrupted, cancelled, +# or times out. +# +# Usage: +# bash cleanup_jobs.sh +# +# Environment variables: +# OUTPUT_PATH: Directory containing jobs.txt and pytest.pid + +set -e + +OUTPUT_PATH="${OUTPUT_PATH:-/tmp}" +JOBS_FILE="${OUTPUT_PATH}/jobs.txt" +PID_FILE="${OUTPUT_PATH}/pytest.pid" + +echo "==========================================" +echo "SLURM Job Cleanup Script" +echo "==========================================" +echo "Output path: $OUTPUT_PATH" +echo "" + +# Show pytest PID if available (for debugging) +if [ -f "$PID_FILE" ]; then + PYTEST_PID=$(cat "$PID_FILE" | tr -d '\n') + echo "Pytest PID: $PYTEST_PID" + + # Check if pytest is still running + if kill -0 "$PYTEST_PID" 2>/dev/null; then + echo "Status: Still running" + else + echo "Status: Already terminated" + fi + echo "" +else + echo "No pytest.pid found (test may not have started)" + echo "" +fi + +# Check if jobs.txt exists +if [ ! -f "$JOBS_FILE" ]; then + echo "[WARN] No jobs.txt found" + echo " Nothing to cancel" + echo "==========================================" + exit 0 +fi + +echo "[INFO] Reading jobs from: $JOBS_FILE" + +# Read, deduplicate, and filter empty lines +JOBS=$(sort -u "$JOBS_FILE" | grep -v '^$' || true) + +if [ -z "$JOBS" ]; then + echo "[WARN] jobs.txt is empty" + echo " Nothing to cancel" + echo "==========================================" + exit 0 +fi + +JOB_COUNT=$(echo "$JOBS" | wc -l) +echo "Found $JOB_COUNT job(s) to cancel" +echo "" + +# Cancel each job +CANCELLED=0 +ALREADY_DONE=0 +FAILED=0 + +echo "Cancelling jobs..." +while IFS= read -r job_id; do + if [ -n "$job_id" ]; then + printf " %-12s ... 
" "$job_id" + + # Try to cancel the job + if scancel "$job_id" 2>/dev/null; then + echo "[OK] Cancelled" + CANCELLED=$((CANCELLED + 1)) + else + # Check if job exists in squeue + if squeue -j "$job_id" -h 2>/dev/null | grep -q "$job_id"; then + echo "[FAIL] Failed to cancel" + FAILED=$((FAILED + 1)) + else + echo "[SKIP] Already finished" + ALREADY_DONE=$((ALREADY_DONE + 1)) + fi + fi + fi +done <<< "$JOBS" + +echo "" +echo "==========================================" +echo "[DONE] Cleanup completed" +echo " Total: $JOB_COUNT" +echo " Cancelled: $CANCELLED" +echo " Already done: $ALREADY_DONE" +echo " Failed: $FAILED" +echo "==========================================" + +# Exit with error if any cancellation actually failed +if [ $FAILED -gt 0 ]; then + exit 1 +fi + +exit 0 diff --git a/tests/integration/defs/perf/disagg/conftest.py b/tests/integration/defs/perf/disagg/conftest.py index a4b88542df..6b13f92919 100644 --- a/tests/integration/defs/perf/disagg/conftest.py +++ b/tests/integration/defs/perf/disagg/conftest.py @@ -151,6 +151,7 @@ class BatchManager: self.submitted_batches = set() # Track which batch numbers have been submitted self.job_mapping = {} # Map test_id -> SLURM job_id + self.submit_errors = {} # Map test_id -> error message (validation/submission failures) self.all_configs = [] # Ordered list of all test configs logger.info(f"\n{'=' * 70}") @@ -214,6 +215,8 @@ class BatchManager: batch_num: Batch number to submit (0-indexed) """ from execution.executor import JobManager + from utils.config_validator import ConfigValidator + from utils.job_tracker import JobTracker # Calculate batch range if self.batch_size: @@ -230,33 +233,56 @@ class BatchManager: logger.info(f"Range: [{start_idx}:{end_idx}] ({len(batch_configs)} jobs)") logger.info(f"{'=' * 70}\n") - # Submit all jobs in this batch + # Pre-validate all configs before submission + logger.info("Pre-validating configurations...") + valid_configs = [] + for config in batch_configs: + try: + ConfigValidator.validate_test_config(config) + valid_configs.append(config) + except Exception as e: + # Validation failed - mark as None and record error + self.job_mapping[config.test_id] = None + self.submit_errors[config.test_id] = f"Validation failed: {str(e)}" + logger.error(f" [FAILED] Validation failed: {config.test_id}") + logger.error(f" Error: {str(e)[:100]}") + + logger.info( + f"Validation complete: {len(valid_configs)}/{len(batch_configs)} configs valid\n" + ) + + # Submit only valid configs success_count = 0 - for i, config in enumerate(batch_configs, 1): + for i, config in enumerate(valid_configs, 1): try: success, job_id = JobManager.submit_test_job(config) if success and job_id: self.job_mapping[config.test_id] = job_id + JobTracker.record_job(job_id) # Record job ID for cleanup success_count += 1 - # Truncate test_id for display - display_id = ( - config.test_id[:60] + "..." 
if len(config.test_id) > 60 else config.test_id + logger.success( + f" [{i:3d}/{len(valid_configs)}] Job {job_id} <- {config.test_id}" ) - logger.success(f" [{i:3d}/{len(batch_configs)}] Job {job_id} <- {display_id}") else: + # Submission failed - mark as None and record error self.job_mapping[config.test_id] = None - logger.error(f" [{i:3d}/{len(batch_configs)}] Failed: {config.test_id[:50]}") + self.submit_errors[config.test_id] = f"Job submission failed: {job_id}" + logger.error(f" [{i:3d}/{len(valid_configs)}] Failed: {config.test_id}") except Exception as e: + # Submission exception - mark as None and record error self.job_mapping[config.test_id] = None - logger.error(f" [{i:3d}/{len(batch_configs)}] Error: {e}") + self.submit_errors[config.test_id] = f"Submission exception: {str(e)}" + logger.error(f" [{i:3d}/{len(valid_configs)}] Error: {e}") # Mark batch as submitted self.submitted_batches.add(batch_num) logger.info(f"\n{'=' * 70}") logger.success( - f"Batch {batch_num} Complete: {success_count}/{len(batch_configs)} succeeded" + f"Batch {batch_num} Complete: {success_count}/{len(valid_configs)} submitted successfully" ) + if len(valid_configs) < len(batch_configs): + logger.warning(f"Skipped {len(batch_configs) - len(valid_configs)} invalid config(s)") logger.info(f"{'=' * 70}\n") diff --git a/tests/integration/defs/perf/disagg/execution/executor.py b/tests/integration/defs/perf/disagg/execution/executor.py index 547b63aa8c..8dec350f77 100644 --- a/tests/integration/defs/perf/disagg/execution/executor.py +++ b/tests/integration/defs/perf/disagg/execution/executor.py @@ -271,7 +271,7 @@ class JobManager: @staticmethod def backup_logs( - job_id: str, + job_id: Optional[str], test_config, result_dir: str, is_passed: bool, @@ -279,13 +279,18 @@ class JobManager: """Backup logs and config files to test_id directory. 
Args: - job_id: SLURM job ID + job_id: SLURM job ID (None if submission failed) test_config: TestConfig object result_dir: Result directory path (already named as test_id) is_passed: Whether the job passed Returns: Final directory path if successful, None otherwise """ + if job_id is None: + logger.warning(f"Job submission failed for {test_config.test_id}") + else: + logger.info(f"Backing up logs for job {job_id} ({test_config.test_id})") + if not os.path.exists(result_dir): logger.warning(f"Result directory does not exist yet: {result_dir}") return None diff --git a/tests/integration/defs/perf/disagg/reporting/accuracy_validator.py b/tests/integration/defs/perf/disagg/reporting/accuracy_validator.py index 70cd6e7514..14e0a1cdfb 100644 --- a/tests/integration/defs/perf/disagg/reporting/accuracy_validator.py +++ b/tests/integration/defs/perf/disagg/reporting/accuracy_validator.py @@ -92,6 +92,13 @@ class HypothesisTestingParams: # Dataset default parameters for hypothesis testing # Extracted from accuracy_core.py AccuracyTask subclasses DATASET_DEFAULTS = { + "aime25": { + "alpha": 0.05, + "beta": 0.2, + "sigma": 50, + "num_samples": 30, # AIME 2025 full sample size + "higher_is_better": True, + }, "gsm8k": { "alpha": 0.05, "beta": 0.2, @@ -127,6 +134,14 @@ DATASET_DEFAULTS = { "num_samples": 198, "higher_is_better": True, }, + # Alias for gpqa_diamond (same task, different naming convention) + "gpqa_diamond_cot_zeroshot": { + "alpha": 0.05, + "beta": 0.2, + "sigma": 50, + "num_samples": 198, + "higher_is_better": True, + }, "json_mode_eval": { "alpha": 0.05, "beta": 0.2, diff --git a/tests/integration/defs/perf/disagg/session_collect.sh b/tests/integration/defs/perf/disagg/session_collect.sh index 30cd3c4c1d..f21e90887d 100644 --- a/tests/integration/defs/perf/disagg/session_collect.sh +++ b/tests/integration/defs/perf/disagg/session_collect.sh @@ -22,44 +22,18 @@ cd "$WORK_DIR" python3 "$WORK_DIR/simple_collect.py" "$OUTPUT_PATH" 2>&1 echo "System information collection completed" -# Step 2: Handle different installation modes -echo "" -echo "Step 2: Installing TensorRT-LLM..." +# Step 2: Collect TensorRT-LLM version information (only for none mode) if [ "$INSTALL_MODE" = "none" ]; then - echo "Using built-in TensorRT-LLM, skipping installation" - -elif [ "$INSTALL_MODE" = "wheel" ]; then - echo "Installing TensorRT-LLM wheel..." - echo "Wheel path pattern: $WHEEL_PATH" - - # Expand wildcard and install - for wheel_file in $WHEEL_PATH; do - if [ -f "$wheel_file" ]; then - echo "Found wheel: $wheel_file" - pip3 install "$wheel_file" 2>&1 || echo "Wheel install failed, continuing..." - break - fi - done - echo "Wheel installation completed" - -elif [ "$INSTALL_MODE" = "source" ]; then - echo "Installing TensorRT-LLM from source..." - cd "$REPO_DIR" - pip3 install -e . 2>&1 || echo "Source install failed, continuing..." - echo "Source installation completed" - + echo "" + echo "Step 2: Collecting TensorRT-LLM version information..." 
+ VERSION_FILE="$OUTPUT_PATH/trtllm_version.txt" + python3 -c "import tensorrt_llm; print(f'[TensorRT-LLM] TensorRT-LLM version: {tensorrt_llm.__version__}')" > "$VERSION_FILE" 2>&1 || echo "[TensorRT-LLM] TensorRT-LLM version: unknown" > "$VERSION_FILE" + echo "TensorRT-LLM version written to: $VERSION_FILE" else - echo "ERROR: Invalid install mode: $INSTALL_MODE" - exit 1 + echo "" + echo "Step 2: Skipping TensorRT-LLM version collection (install_mode=$INSTALL_MODE)" fi -# Step 3: Collect TensorRT-LLM version information -echo "" -echo "Step 3: Collecting TensorRT-LLM version information..." -VERSION_FILE="$OUTPUT_PATH/trtllm_version.txt" -python3 -c "import tensorrt_llm; print(f'[TensorRT-LLM] TensorRT-LLM version: {tensorrt_llm.__version__}')" > "$VERSION_FILE" 2>&1 || echo "[TensorRT-LLM] TensorRT-LLM version: unknown" > "$VERSION_FILE" -echo "TensorRT-LLM version written to: $VERSION_FILE" - echo "" echo "==========================================" echo "Session Collect Job Completed" diff --git a/tests/integration/defs/perf/disagg/test_configs/disagg/perf/deepseek-r1-fp4_128k8k_ctx2_pp8_gen1_dep16_bs8_eplb0_mtp0-Default.yaml b/tests/integration/defs/perf/disagg/test_configs/disagg/perf/deepseek-r1-fp4_128k8k_ctx2_pp8_gen1_dep16_bs8_eplb0_mtp0-Default.yaml index c9e4c90564..ff9a9e62cf 100644 --- a/tests/integration/defs/perf/disagg/test_configs/disagg/perf/deepseek-r1-fp4_128k8k_ctx2_pp8_gen1_dep16_bs8_eplb0_mtp0-Default.yaml +++ b/tests/integration/defs/perf/disagg/test_configs/disagg/perf/deepseek-r1-fp4_128k8k_ctx2_pp8_gen1_dep16_bs8_eplb0_mtp0-Default.yaml @@ -77,12 +77,12 @@ worker_config: stream_interval: 20 num_postprocess_workers: 4 ctx: - max_batch_size: 8 + max_batch_size: 1 max_num_tokens: 131104 max_seq_len: 131104 tensor_parallel_size: 1 moe_expert_parallel_size: 1 - enable_attention_dp: true + enable_attention_dp: false pipeline_parallel_size: 8 print_iter_log: true cuda_graph_config: null diff --git a/tests/integration/defs/perf/disagg/test_disagg.py b/tests/integration/defs/perf/disagg/test_disagg.py index b60ba85196..36e786949d 100644 --- a/tests/integration/defs/perf/disagg/test_disagg.py +++ b/tests/integration/defs/perf/disagg/test_disagg.py @@ -47,6 +47,11 @@ else: @pytest.fixture(scope="session", autouse=True) def session_lifecycle(): """Session lifecycle management.""" + from utils.job_tracker import JobTracker + + # Record pytest main process PID for GitLab CI cleanup + JobTracker.record_pid() + session_tracker.start() try: yield @@ -66,11 +71,8 @@ class TestDisaggBenchmark: """Performance benchmark test for YAML configurations.""" full_test_name = request.node.name - # Validate configuration first (before any other operations) - try: - ConfigValidator.validate_test_config(test_config) - except Exception as e: - pytest.fail(f"Configuration validation failed: {e}") + # Note: Configuration validation is done during batch submission (in conftest.py) + # If validation failed, job_id will be None and the assert below will fail # Create test case tracker test_tracker = TestCaseTracker() @@ -104,8 +106,11 @@ class TestDisaggBenchmark: # Get job_id from batch manager (auto-submits batch if needed) job_id = batch_manager.get_job_id(test_config) - # Validate submission result - assert job_id, f"Failed to get job_id for {test_config.test_id}" + # Validate submission result (will be None if validation/submission failed) + error_msg = batch_manager.submit_errors.get( + test_config.test_id, "Check batch submission logs for details" + ) + assert job_id, f"Failed to submit 
job for {test_config.test_id}\n{error_msg}" # Wait for completion (timeout: 10 hours = 36000 seconds) JobManager.wait_for_completion(job_id, 36000, test_config, check_early_failure=True) @@ -125,13 +130,12 @@ class TestDisaggBenchmark: raise e finally: # Always backup logs, regardless of success or failure - if job_id: - result_dir = JobManager.get_result_dir(test_config) - is_passed = result.get("success", False) if result else False - try: - JobManager.backup_logs(job_id, test_config, result_dir, is_passed) - except Exception as backup_error: - logger.error(f"Failed to backup logs: {backup_error}") + result_dir = JobManager.get_result_dir(test_config) + is_passed = result.get("success", False) if result else False + try: + JobManager.backup_logs(job_id, test_config, result_dir, is_passed) + except Exception as backup_error: + logger.error(f"Failed to backup logs: {backup_error}") @pytest.mark.accuracy @pytest.mark.parametrize("test_config", ACCURACY_TEST_CASES) @@ -204,13 +208,12 @@ class TestDisaggBenchmark: raise e finally: # Always backup logs, regardless of success or failure - if job_id: - result_dir = JobManager.get_result_dir(test_config) - is_passed = result.get("success", False) if result else False - try: - JobManager.backup_logs(job_id, test_config, result_dir, is_passed) - except Exception as backup_error: - logger.error(f"Failed to backup logs: {backup_error}") + result_dir = JobManager.get_result_dir(test_config) + is_passed = result.get("success", False) if result else False + try: + JobManager.backup_logs(job_id, test_config, result_dir, is_passed) + except Exception as backup_error: + logger.error(f"Failed to backup logs: {backup_error}") @pytest.mark.stress @pytest.mark.parametrize("test_config", STRESS_TEST_CASES) @@ -222,11 +225,8 @@ class TestDisaggBenchmark: """ full_test_name = request.node.name - # Validate configuration first (before any other operations) - try: - ConfigValidator.validate_test_config(test_config) - except Exception as e: - pytest.fail(f"Configuration validation failed: {e}") + # Note: Configuration validation is done during batch submission (in conftest.py) + # If validation failed, job_id will be None and the assert below will fail # Create test case tracker test_tracker = TestCaseTracker() @@ -266,8 +266,11 @@ class TestDisaggBenchmark: # Get job_id from batch manager (auto-submits batch if needed) job_id = batch_manager.get_job_id(test_config) - # Validate submission result - assert job_id, f"Failed to get job_id for {test_config.test_id}" + # Validate submission result (will be None if validation/submission failed) + error_msg = batch_manager.submit_errors.get( + test_config.test_id, "Check batch submission logs for details" + ) + assert job_id, f"Failed to submit job for {test_config.test_id}\n{error_msg}" # Wait for completion (timeout: 10 hours = 36000 seconds) JobManager.wait_for_completion(job_id, 36000, test_config, check_early_failure=True) @@ -287,13 +290,12 @@ class TestDisaggBenchmark: raise e finally: # Always backup logs, regardless of success or failure - if job_id: - result_dir = JobManager.get_result_dir(test_config) - is_passed = result.get("success", False) if result else False - try: - JobManager.backup_logs(job_id, test_config, result_dir, is_passed) - except Exception as backup_error: - logger.error(f"Failed to backup logs: {backup_error}") + result_dir = JobManager.get_result_dir(test_config) + is_passed = result.get("success", False) if result else False + try: + JobManager.backup_logs(job_id, test_config, result_dir, 
is_passed) + except Exception as backup_error: + logger.error(f"Failed to backup logs: {backup_error}") if __name__ == "__main__": diff --git a/tests/integration/defs/perf/disagg/utils/job_tracker.py b/tests/integration/defs/perf/disagg/utils/job_tracker.py new file mode 100644 index 0000000000..0935839efa --- /dev/null +++ b/tests/integration/defs/perf/disagg/utils/job_tracker.py @@ -0,0 +1,61 @@ +"""Simple job and process tracker for GitLab CI cleanup.""" + +import os + +from utils.common import EnvManager +from utils.logger import logger + + +class JobTracker: + """Track SLURM job IDs and pytest PID for GitLab CI cleanup.""" + + @staticmethod + def get_jobs_file() -> str: + """Get jobs.txt file path in output_path.""" + output_path = EnvManager.get_output_path() + return os.path.join(output_path, "jobs.txt") + + @staticmethod + def get_pid_file() -> str: + """Get pytest.pid file path in output_path.""" + output_path = EnvManager.get_output_path() + return os.path.join(output_path, "pytest.pid") + + @staticmethod + def record_pid(): + """Record pytest main process PID to pytest.pid file.""" + pid = os.getpid() + pid_file = JobTracker.get_pid_file() + try: + # Ensure output directory exists + os.makedirs(os.path.dirname(pid_file), exist_ok=True) + + # Write PID + with open(pid_file, "w") as f: + f.write(f"{pid}\n") + f.flush() + + logger.info(f"Recorded pytest PID: {pid} -> {pid_file}") + except Exception as e: + logger.warning(f"Failed to record PID: {e}") + + @staticmethod + def record_job(job_id: str): + """Append SLURM job ID to jobs.txt file. + + Args: + job_id: SLURM job ID to record + """ + jobs_file = JobTracker.get_jobs_file() + try: + # Ensure output directory exists + os.makedirs(os.path.dirname(jobs_file), exist_ok=True) + + # Append job ID + with open(jobs_file, "a") as f: + f.write(f"{job_id}\n") + f.flush() + + logger.debug(f"Recorded SLURM job: {job_id}") + except Exception as e: + logger.warning(f"Failed to record job ID {job_id}: {e}") diff --git a/tests/integration/defs/perf/disagg/utils/trackers.py b/tests/integration/defs/perf/disagg/utils/trackers.py index acee8d7fd6..45aefc7f81 100644 --- a/tests/integration/defs/perf/disagg/utils/trackers.py +++ b/tests/integration/defs/perf/disagg/utils/trackers.py @@ -79,6 +79,8 @@ class SessionTracker: Uses the new sbatch-based approach for non-blocking execution. Submits the job and waits for completion using JobManager. """ + from utils.job_tracker import JobTracker + self.end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info(f"Session ended: {self.end_time}") @@ -89,6 +91,9 @@ class SessionTracker: logger.error(f"Failed to submit session collect job: {job_id}") return False + # Record session collect job ID for cleanup + JobTracker.record_job(job_id) + # Wait for job completion (reuses wait_for_completion method) logger.info(f"Waiting for session collect job {job_id} to complete...") JobManager.wait_for_completion(
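
Note (illustrative, not part of the patch): a minimal sketch of how cleanup_jobs.sh could be hooked into a GitLab CI job's after_script, assuming a hypothetical job name and stage; only OUTPUT_PATH, the script path, and the test path come from this change, everything else is an assumption:

    disagg_perf:                      # hypothetical job name
      stage: test                     # hypothetical stage
      variables:
        OUTPUT_PATH: "$CI_PROJECT_DIR/disagg_output"   # directory holding jobs.txt and pytest.pid
      script:
        - pytest tests/integration/defs/perf/disagg/test_disagg.py
      after_script:
        # Per the cleanup_jobs.sh header, this is intended to run when the job is
        # interrupted, cancelled, or times out, cancelling every SLURM job recorded
        # in $OUTPUT_PATH/jobs.txt via scancel.
        - bash tests/integration/defs/perf/disagg/cleanup_jobs.sh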