mtmd: refactor video subproc handling (#24316)

* mtmd: refactor video subproc handling

* Update tools/mtmd/mtmd-helper.cpp

Co-authored-by: Mikko Juola <mikjuo@gmail.com>

---------

Co-authored-by: Mikko Juola <mikjuo@gmail.com>
This commit is contained in:
Xuan-Son Nguyen
2026-06-09 12:15:12 +02:00
committed by GitHub
parent 1e912561dd
commit 9682e351b8
+66 -47
View File
@@ -617,10 +617,52 @@ struct mtmd_helper_video {
float fps_target = 0.0f;
mtmd_helper_video_info info = {};
struct subprocess_s proc = {};
bool proc_alive = false;
// RAII wrapper for managing subprocess
//
// Bundles a subprocess_s with the optional thread that feeds its stdin. The
// destructor calls stop(), so letting a handle go out of scope tears down both.
// `alive` is set by the owner once subprocess_create() has succeeded on `proc`;
// stop() only terminates/destroys the process while `alive` is true.
// NOTE(review): if a caller clears `alive` without calling stop() first, stop()
// still joins the feeder but skips subprocess_terminate/subprocess_destroy.
// Confirm that this is the intended behavior.
struct subprocess_handle {
    struct subprocess_s proc = {};
    bool alive = false;
    std::thread feeder;

    subprocess_handle() = default;
    // non-copyable: the feeder thread captures `this`, so the object must stay put
    subprocess_handle(const subprocess_handle &) = delete;
    subprocess_handle & operator=(const subprocess_handle &) = delete;

    ~subprocess_handle() { stop(); }

    // Terminate the process (if alive), wait for the feeder, then release the
    // process resources. Calling it again afterwards only re-checks the flags.
    void stop() {
        if (alive) {
            subprocess_terminate(&proc);
        }
        // join before destroy: feeder holds a FILE* from subprocess_stdin;
        // subprocess_destroy closes it, so the thread must finish first
        join_feeder();
        if (alive) {
            subprocess_destroy(&proc);
            alive = false;
        }
    }

    // Read end of the child's stdout, as returned by subprocess_stdout().
    FILE * stdout_pipe() {
        return subprocess_stdout(&proc);
    }

    // Start a thread that writes all of `buf` to the child's stdin and then
    // closes the pipe. Writing happens on a separate thread so it can run
    // alongside stdout reading and avoid a pipe deadlock.
    //
    // buf is tied to lifetime of mtmd_helper_video, so it's guaranteed to outlive the feeder thread
    void start_feeder(const std::vector<uint8_t> & buf) {
        // assigning to a std::thread that is still joinable calls std::terminate,
        // so wait for any earlier feeder before starting a new one
        join_feeder();
        feeder = std::thread([this, &buf]() {
            FILE * f = subprocess_stdin(&proc);
            if (!f) {
                return;
            }
            fwrite(buf.data(), 1, buf.size(), f);
            fclose(f);
            proc.stdin_file = nullptr; // prevent double-close in subprocess_destroy
        });
    }

    // Wait for the feeder thread to finish, if one was started.
    void join_feeder() {
        if (feeder.joinable()) {
            feeder.join();
        }
    }
};
subprocess_handle sp;
int32_t current_frame = 0;
std::thread feeder_thread;
std::string prompt_start = "Video:";
int32_t timestamp_interval_ms = 5000; // emit a timestamp text every N ms (0 = disabled)
@@ -630,19 +672,8 @@ struct mtmd_helper_video {
std::string pending_text; // text queued to be returned before the next frame
bool start_emitted = false;
bool is_buf_input() const { return !input_buf.empty(); }
// must run in a separate thread alongside stdout reading to avoid pipe deadlock
void feed_stdin(struct subprocess_s * sp) {
FILE * f = subprocess_stdin(sp);
if (!f) {
LOG_DBG("%s: subprocess has no stdin pipe\n", __func__);
return;
}
LOG_DBG("%s: feeding %zu bytes to stdin\n", __func__, input_buf.size());
size_t written = fwrite(input_buf.data(), 1, input_buf.size(), f);
LOG_DBG("%s: wrote %zu bytes, closing stdin\n", __func__, written);
fclose(f);
bool is_buf_input() const {
return !input_buf.empty();
}
bool probe(float fps_target_arg) {
@@ -661,17 +692,17 @@ struct mtmd_helper_video {
for (size_t i = 0; cmd[i]; i++) { LOG_DBG(" %s", cmd[i]); }
LOG_DBG("\n");
struct subprocess_s fprobe;
subprocess_handle probe_sp;
if (subprocess_create(cmd,
subprocess_option_search_user_path | subprocess_option_inherit_environment,
&fprobe) != 0) {
&probe_sp.proc) != 0) {
LOG_ERR("%s: failed to launch ffprobe\n", __func__);
return false;
}
probe_sp.alive = true;
std::thread probe_feeder;
if (is_buf_input()) {
probe_feeder = std::thread([this, &fprobe]() { feed_stdin(&fprobe); });
probe_sp.start_feeder(input_buf);
}
uint32_t width = 0;
@@ -680,7 +711,7 @@ struct mtmd_helper_video {
float duration = -1.0f;
int32_t n_frames_orig = -1;
char line[256];
FILE * fp = subprocess_stdout(&fprobe);
FILE * fp = probe_sp.stdout_pipe();
while (fgets(line, sizeof(line), fp)) {
char * eq = strchr(line, '=');
@@ -704,13 +735,7 @@ struct mtmd_helper_video {
}
}
if (probe_feeder.joinable()) {
probe_feeder.join();
}
int ret_code;
subprocess_join(&fprobe, &ret_code);
subprocess_destroy(&fprobe);
probe_sp.stop();
if (width == 0 || height == 0 || orig_fps <= 0.0f) {
return false;
@@ -745,6 +770,7 @@ struct mtmd_helper_video {
cmd.push_back(seek_buf);
}
cmd.push_back("-nostdin");
cmd.push_back("-i");
// cache:pipe:0 wraps stdin with a seekable in-memory cache, letting ffmpeg seek
// backwards for container headers (e.g. MP4 moov atom at end of file)
@@ -781,34 +807,27 @@ struct mtmd_helper_video {
int ret = subprocess_create(
cmd.data(),
subprocess_option_search_user_path | subprocess_option_inherit_environment,
&proc);
&sp.proc);
proc_alive = (ret == 0);
LOG_DBG("%s: subprocess_create ret=%d proc_alive=%d\n", __func__, ret, (int)proc_alive);
sp.alive = (ret == 0);
LOG_DBG("%s: subprocess_create ret=%d proc_alive=%d\n", __func__, ret, (int)sp.alive);
if (proc_alive && is_buf_input()) {
if (sp.alive && is_buf_input()) {
LOG_DBG("%s: starting feeder thread for %zu-byte buffer\n", __func__, input_buf.size());
feeder_thread = std::thread([this]() { feed_stdin(&proc); });
sp.start_feeder(input_buf);
}
return proc_alive;
return sp.alive;
}
void stop_ffmpeg() {
if (proc_alive) {
subprocess_terminate(&proc);
subprocess_destroy(&proc);
proc_alive = false;
}
if (feeder_thread.joinable()) {
feeder_thread.join();
}
sp.stop();
}
mtmd_bitmap * read_next_frame() {
if (!proc_alive) return nullptr;
if (!sp.alive) return nullptr;
FILE * fp = subprocess_stdout(&proc);
FILE * fp = sp.stdout_pipe();
const size_t frame_size = (size_t)info.width * info.height * 3;
LOG_DBG("%s: reading frame %d, expecting %zu bytes (%ux%u)\n",
__func__, current_frame, frame_size, info.width, info.height);
@@ -820,7 +839,7 @@ struct mtmd_helper_video {
// clean EOF only if no bytes read yet; partial frame is an error
LOG_DBG("%s: fread returned 0 after %zu/%zu bytes (ferror=%d)\n",
__func__, total_read, frame_size, ferror(fp));
proc_alive = false;
sp.alive = false;
return nullptr;
}
total_read += n;
@@ -842,9 +861,9 @@ struct mtmd_helper_video {
}
LOG_DBG("%s: proc_alive=%d start_emitted=%d current_frame=%d\n",
__func__, (int)proc_alive, (int)start_emitted, current_frame);
__func__, (int)sp.alive, (int)start_emitted, current_frame);
if (!proc_alive) {
if (!sp.alive) {
return (current_frame == 0) ? -2 : -1;
}