TensorRT-LLMs/scripts/rename_docker_images.py
ixlmar 10e686466e
fix: use current_image_tags.properties in rename_docker_images.py (#5846)
Signed-off-by: ixlmar <206748156+ixlmar@users.noreply.github.com>
2025-07-09 17:07:52 +09:00

222 lines
7.7 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse as _ap
import datetime as _dt
import os
import pathlib as _pl
import subprocess as _sp
CURRENT_TAG_FILE = "current_image_tags.properties"
IMAGE_MAPPING = {
"LLM_DOCKER_IMAGE":
"urm.nvidia.com/sw-tensorrt-docker/tensorrt-llm-staging/__stage__:x86_64-__stage__-torch_skip",
"LLM_SBSA_DOCKER_IMAGE":
"urm.nvidia.com/sw-tensorrt-docker/tensorrt-llm-staging/__stage__:sbsa-__stage__-torch_skip",
"LLM_ROCKYLINUX8_PY310_DOCKER_IMAGE":
"urm.nvidia.com/sw-tensorrt-docker/tensorrt-llm-staging/__stage__:x86_64-rockylinux8-torch_skip-py310",
"LLM_ROCKYLINUX8_PY312_DOCKER_IMAGE":
"urm.nvidia.com/sw-tensorrt-docker/tensorrt-llm-staging/__stage__:x86_64-rockylinux8-torch_skip-py312",
}
def parse_arguments() -> _ap.Namespace:
parser = _ap.ArgumentParser(
description="Rename Docker images based on the given instructions.")
parser.add_argument(
'src_branch',
type=str,
help="The name of the source branch releasing the Docker image.")
parser.add_argument(
'src_build_id',
type=int,
help="The name of the source build id release the Docker image.")
parser.add_argument(
'dst_mr',
type=int,
help="The number of the merge request for the destination image.")
parser.add_argument(
'--dry-run',
action='store_true',
help="Simulate the rename process without making any changes.")
parser.add_argument(
"--timestamp",
type=str,
required=False,
help="The timestamp to use for the destination image name.")
parser.add_argument(
"--stage",
type=str,
required=False,
default="tritondevel",
help=
"The new stage part of the destination image name (default: tritondevel)."
)
return parser.parse_args()
def get_current_timestamp() -> str:
"""Get the current timestamp in YYYYMMDDhhmm format."""
return _dt.datetime.now(_dt.timezone.utc).strftime("%Y%m%d%H%M")
def run_shell_command(command: str, dry_run: bool) -> None:
"""Run a shell command and display its output.
Args:
command (str): The shell command to execute.
"""
print(command)
if not dry_run:
_sp.run(command, shell=True, check=True, text=True)
def find_script_directory() -> _pl.Path:
"""Find the directory containing this Python script.
Returns:
str: The absolute path of the script's directory.
"""
return _pl.Path(__file__).resolve().parent
def extract_line_after_prefix(file_path: _pl.Path, prefix: str) -> str | None:
"""
Extracts the line starting with a certain prefix from the given file.
Args:
file_path (Path): Path to the file.
prefix (str): Prefix to search for.
Returns:
str: The line starting with the prefix, or None if no such line exists.
"""
with open(file_path, 'r') as file:
for line in file:
if line.startswith(prefix):
return line[len(prefix):].strip()
return None
def image_prefix(full_image_name: str) -> str:
"""
Extracts the image prefix from a full image name.
Args:
full_image_name (str): Full image name.
Returns:
str: Image prefix.
"""
dash = '-'
last_index = full_image_name.rfind(dash) # Find the last occurrence
if last_index == -1:
raise ValueError("Invalid image name format")
second_last_index = full_image_name.rfind(
dash, 0, last_index) # Look for the next occurrence before the last
if second_last_index == -1:
raise ValueError("Invalid image name format")
return full_image_name[:second_last_index]
def replace_text_between_dashes(input_string: str, dash_idx: int,
replacement: str) -> str:
"""
Replace the text between the third and second '-' from the back in a string.
Args:
input_string (str): Original string.
dash_idx (int): Index of the dash to start replacing (from the back).
replacement (str): Replacement text.
Returns:
str: Updated string with the replacement applied.
Raises:
ValueError: If the input string does not have at least three dashes.
"""
dash_indices = [i for i, char in enumerate(input_string) if char == '-']
if len(dash_indices) < dash_idx:
raise ValueError(f"Input string must have at least {dash_idx} dashes.")
# Find the indices of third and second last dashes
start_dash_idx = dash_indices[-dash_idx]
stop_dash_idx = dash_indices[-dash_idx + 1]
# Replace the segment between the dashes
updated_string = (input_string[:start_dash_idx + 1] + replacement +
input_string[stop_dash_idx:])
return updated_string
def find_and_replace_in_files(directory, file_extension: str,
search_string: str, replace_string: str,
dry_run: bool) -> None:
"""
Perform find-and-replace in all files within a directory tree matching a specific extension.
Args:
directory (str or PathLike): Root directory of the search.
file_extension (str): File extension to filter (e.g., ".txt").
search_string (str): String to search for.
replace_string (str): String to replace the search string with.
dry_run (bool): Whether to perform the find-and-replace operation or not.
"""
for root, _, files in os.walk(directory):
for file in files:
if file.endswith(file_extension):
file_path = os.path.join(root, file)
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Replace strings in file content
updated_content = content.replace(search_string, replace_string)
if content != updated_content:
print(f"Updating {file_path}")
if not dry_run:
with open(file_path, "w", encoding="utf-8") as f:
f.write(updated_content)
def rename_images(*,
src_branch: str,
src_build_id: int,
dst_mr: int,
stage: str,
timestamp: str | None = None,
dry_run: bool = False) -> None:
print(
f"Renaming images for branch {src_branch} and build id {src_build_id} to {dst_mr}"
)
if dry_run:
print("Dry-run mode enabled. No actual changes will be made.")
else:
print("Renaming images...")
timestamp = timestamp or get_current_timestamp()
src_branch_sanitized = src_branch.replace("/", "_")
base_dir = find_script_directory().parent
current_tags_path = base_dir / "jenkins" / CURRENT_TAG_FILE
for dst_key, src_pattern in IMAGE_MAPPING.items():
print(f"Processing {dst_key} ...")
src_image = f"{src_pattern}-{src_branch_sanitized}-{src_build_id}".replace(
"__stage__", stage)
dst_image_old = extract_line_after_prefix(current_tags_path,
dst_key + "=").strip('"')
dst_image = replace_text_between_dashes(
f"{image_prefix(dst_image_old)}-{timestamp}-{dst_mr}", 3, stage)
run_shell_command(f"docker pull {src_image}", dry_run)
run_shell_command(f"docker tag {src_image} {dst_image}", dry_run)
run_shell_command(f"docker push {dst_image}", dry_run)
find_and_replace_in_files(current_tags_path.parent,
current_tags_path.name, dst_image_old,
dst_image, dry_run)
def main() -> None:
args = parse_arguments()
rename_images(**vars(args))
if __name__ == "__main__":
main()