// llama.cpp/tools/server/server-models.h
// (C++ header; source listing metadata: 312 lines, 12 KiB)

#pragma once
#include "common.h"
#include "download.h"
#include "preset.h"
#include "server-common.h"
#include "server-http.h"
#include "server-queue.h"
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <memory>
#include <set>
#include <unordered_map>
/**
 * state diagram:
 *
 * DOWNLOADING ──► DOWNLOADED ──► (replaced by new instance)
 *
 * UNLOADED ──► LOADING ──► LOADED ◄──── SLEEPING
 *  ▲            │            │            ▲
 *  └───failed───┘            │            │
 *  ▲                         └──sleeping──┘
 *  └────────unloaded─────────┘
 *
 * note: there is no dedicated "failed" status; a failed load goes back to
 *       UNLOADED (see server_model_meta::is_failed(), which additionally
 *       checks for a non-zero exit code)
 */
enum server_model_status {
    // download states (first row of the diagram)
    SERVER_MODEL_STATUS_DOWNLOADING,
    SERVER_MODEL_STATUS_DOWNLOADED,

    // instance states (second row of the diagram)
    SERVER_MODEL_STATUS_UNLOADED,
    SERVER_MODEL_STATUS_LOADING,
    SERVER_MODEL_STATUS_LOADED,
    SERVER_MODEL_STATUS_SLEEPING,
};
// Identifies which source a model entry was discovered from.
// The string form of each value is produced by server_model_source_to_string().
enum server_model_source {
    SERVER_MODEL_SOURCE_PRESET,     // reported as "preset"
    SERVER_MODEL_SOURCE_MODELS_DIR, // reported as "models_dir"
    SERVER_MODEL_SOURCE_CACHE,      // reported as "cache"
};
// Convert a server_model_status value to its lowercase string name.
//
// Returns "unknown" for any value that is not a declared enumerator.
//
// Declared `inline` rather than `static`: this definition lives in a header,
// and `static` would give every including translation unit its own private
// copy (plus unused-function warnings in the ones that never call it).
inline std::string server_model_status_to_string(server_model_status status) {
    // intentionally no `default:` label, so that the compiler's switch
    // diagnostics can flag an enumerator that was added but not handled here
    switch (status) {
        case SERVER_MODEL_STATUS_DOWNLOADING: return "downloading";
        case SERVER_MODEL_STATUS_DOWNLOADED:  return "downloaded";
        case SERVER_MODEL_STATUS_UNLOADED:    return "unloaded";
        case SERVER_MODEL_STATUS_LOADING:     return "loading";
        case SERVER_MODEL_STATUS_LOADED:      return "loaded";
        case SERVER_MODEL_STATUS_SLEEPING:    return "sleeping";
    }
    // reached only for out-of-range values
    return "unknown";
}
// Convert a server_model_source value to its lowercase string name.
//
// Returns "unknown" for any value that is not a declared enumerator.
//
// Declared `inline` rather than `static`: this definition lives in a header,
// and `static` would give every including translation unit its own private
// copy (plus unused-function warnings in the ones that never call it).
inline std::string server_model_source_to_string(server_model_source source) {
    // intentionally no `default:` label, so that the compiler's switch
    // diagnostics can flag an enumerator that was added but not handled here
    switch (source) {
        case SERVER_MODEL_SOURCE_PRESET:     return "preset";
        case SERVER_MODEL_SOURCE_MODELS_DIR: return "models_dir";
        case SERVER_MODEL_SOURCE_CACHE:      return "cache";
    }
    // reached only for out-of-range values
    return "unknown";
}
// Byte count per ggml backend buffer type.
// Used in this header both for the memory a model requires and for the memory available.
using buft_memory_map = std::map<ggml_backend_buffer_type_t, size_t>;
// Metadata and current state of a single model entry.
// Copies of this struct are handed out by server_models::get_meta() / get_all_meta().
struct server_model_meta {
    server_model_source      source = SERVER_MODEL_SOURCE_CACHE;
    common_preset            preset;
    std::string              name;
    std::set<std::string>    aliases;          // extra names that also resolve to this model
    std::set<std::string>    tags;             // informational only, not used for routing
    int                      port = 0;
    server_model_status      status = SERVER_MODEL_STATUS_UNLOADED;
    int64_t                  last_used = 0;    // for LRU unloading
    buft_memory_map          bmm_req;          // bytes required per buffer type
    std::vector<std::string> args;             // args passed to the model instance, will be populated by render_args()
    json                     loaded_info;      // info to be reflected via /v1/models endpoint ; if in DOWNLOADING state, it should contain download progress info
    int                      exit_code = 0;    // exit code of the model instance process; a non-zero value while UNLOADED means the instance failed (see is_failed())
    int                      stop_timeout = 0; // seconds to wait before force-killing the model instance during shutdown
    mtmd_caps                multimodal;       // multimodal capabilities
    // bool need_download = false; // whether the model needs to be downloaded before loading // TODO @ngxson: implement this

    // true only once the instance has reached the LOADED state
    bool is_ready() const {
        return SERVER_MODEL_STATUS_LOADED == status;
    }

    // true while the instance is in the LOADING, LOADED or SLEEPING state
    bool is_running() const {
        switch (status) {
            case SERVER_MODEL_STATUS_LOADING:
            case SERVER_MODEL_STATUS_LOADED:
            case SERVER_MODEL_STATUS_SLEEPING:
                return true;
            default:
                return false;
        }
    }

    // there is no dedicated "failed" status: a failure is an UNLOADED
    // instance that left behind a non-zero exit code
    bool is_failed() const {
        const bool exited_with_error = exit_code != 0;
        return exited_with_error && status == SERVER_MODEL_STATUS_UNLOADED;
    }

    void update_args(common_preset_context & ctx_presets, std::string bin_path);
    void update_caps();
};
// forward declaration: server_models befriends this struct, which is defined further down in this header
struct server_models_routes;
struct server_subproc; // defined in server-models.cpp
// Keeps the name -> model instance mapping and exposes the operations to
// list, load, unload, download, remove and proxy to those instances.
struct server_models {
    friend struct server_models_routes;

private:
    // value type of `mapping`
    struct instance_t {
        std::shared_ptr<server_subproc> subproc; // shared between main thread and monitoring thread
        std::thread                     th;
        server_model_meta               meta;
        FILE *                          stdin_file = nullptr;
    };

    std::mutex                        mutex;
    std::condition_variable           cv;
    std::map<std::string, instance_t> mapping;

    // used when stopping models
    std::condition_variable cv_stop;
    std::set<std::string>   stopping_models;

    // a background task of the download/estimate/load pipelines
    struct bg_task {
        std::thread       th;
        std::atomic<bool> done{false};
    };
    // background tasks, keyed by model name
    std::map<std::string, std::unique_ptr<bg_task>> bg_tasks;

    // true while load_models() is executing a reload; load() waits until it is cleared
    bool is_reloading = false;

    // when true, the next get_meta() triggers a reload of the model list
    bool need_reload = false;

    common_preset_context    ctx_preset;
    common_params            base_params;
    std::string              bin_path;
    std::vector<std::string> base_env;
    common_preset            base_preset; // base preset from llama-server CLI args

    // memory available per buffer type
    buft_memory_map bmm_available;

    // lookup from buft name to buft (host buffer types map to CPU buft)
    std::unordered_map<std::string, ggml_backend_buffer_type_t> buft_by_name;

    void update_meta(const std::string & name, const server_model_meta & meta);

    // unload the least recently used models if the limit is reached
    void unload_lru(const buft_memory_map & bmm_req);

    // NOT thread-safe: the caller must hold `mutex`
    void add_model(server_model_meta && meta);

    // notify SSE clients
    void notify_sse(const std::string & event, const std::string & model_id, const json & data = nullptr);

    // returns the number of buffer types whose memory limit would be exceeded,
    // i.e. 0 when the new model would fit
    // NOT thread-safe: the caller must hold `mutex`
    int can_fit(const buft_memory_map & bmm_req) const;

    // checks whether the active model count or the memory limits would be exceeded
    // NOT thread-safe: the caller must hold `mutex`
    bool limits_exceeded(const buft_memory_map & bmm_req) const;

    // estimates the model memory by spawning a child process with --measure-only
    // returns the buft memory map, or an empty map on failure
    // the caller must NOT hold `mutex`
    buft_memory_map estimate_model_memory(const std::string & name);

    // join and remove the background tasks that have completed
    void join_completed_bg_tasks();

public:
    server_models(const common_params & params, int argc, char ** argv);

    server_response sse; // for real-time updates via SSE endpoint

    // (re-)load the list of models from the various sources and prepare the metadata mapping
    // - first call: simply populate the metadata
    // - subsequent calls (e.g. when refreshing from disk):
    //     - a running model that was updated in, or removed from, its source is unloaded
    //     - a model that is not running is added or updated according to the source
    void load_models();

    // check if a model instance exists (thread-safe)
    bool has_model(const std::string & name);

    // return a copy of the model metadata (thread-safe)
    std::optional<server_model_meta> get_meta(const std::string & name);

    // return a copy of the metadata of all models (thread-safe)
    std::vector<server_model_meta> get_all_meta();

    // load and unload model instances
    // these functions are thread-safe
    void load(const std::string & name);
    void unload(const std::string & name);
    void unload_all();

    // download a new model; progress is reported via SSE
    // call unload() to stop the download
    void download(common_params_model && model, common_download_opts && opts);

    // update the status of a model instance (thread-safe)
    void update_status(const std::string & name, server_model_status status, int exit_code);
    void update_loaded_info(const std::string & name, std::string & raw_info);
    void update_download_progress(const std::string & name, const common_download_progress & progress, bool done, bool ok = true);

    // remove a cache model from disk and update the list (thread-safe)
    // note: only cache models can be removed; returns false if the model doesn't exist or is not a cache model
    bool remove(const std::string & name);

    // wait until the model instance is fully loaded (thread-safe)
    // note: the predicate is called while holding the lock
    // returns once the model is no longer in the "loading" state
    void wait(const std::string & name, std::function<bool(const server_model_meta &)> predicate);
    void wait(std::unique_lock<std::mutex> & lk, const std::string & name, std::function<bool(const server_model_meta &)> predicate);

    // ensure the model is in the ready state (thread-safe)
    // returns false if the model is already ready
    // otherwise loads the model, blocks until it is ready, then returns true (meta may need to be refreshed)
    bool ensure_model_ready(const std::string & name);

    // proxy an HTTP request to the model instance
    server_http_res_ptr proxy_request(const server_http_req & req, const std::string & method, const std::string & name, bool update_last_used);

    // return true if the current process is a child server instance
    static bool is_child_server();

    // notify the router server that a model instance is ready
    // returns the monitoring thread (to be joined by the caller)
    static std::thread setup_child_server(const std::function<void(int)> & shutdown_handler, const json & model_info);

    // notify the router server that the sleeping state has changed
    static void notify_router_sleeping_state(bool sleeping);
};
struct server_models_routes {
common_params params;
json ui_settings = json::object(); // Primary: new name
json webui_settings = json::object(); // Deprecated: use ui_settings (kept for compat)
std::atomic<bool> stopping = false; // for graceful disconnecting SSE clients during shutdown
server_models models;
server_models_routes(const common_params & params, int argc, char ** argv)
: params(params), models(params, argc, argv) {
// Support both new ui_config_json and deprecated webui_config_json
const std::string & cfg = !this->params.ui_config_json.empty()
? this->params.ui_config_json
: this->params.webui_config_json;
if (!cfg.empty()) {
try {
json json_settings = json::parse(cfg);
ui_settings = json_settings;
webui_settings = json_settings; // Deprecated: keep in sync
} catch (const std::exception & e) {
LOG_ERR("%s: failed to parse UI config: %s\n", __func__, e.what());
throw;
}
}
init_routes();
}
void init_routes();
// handlers using lambda function, so that they can capture `this` without `std::bind`
server_http_context::handler_t get_router_props;
server_http_context::handler_t proxy_get;
server_http_context::handler_t proxy_post;
server_http_context::handler_t get_router_models;
server_http_context::handler_t post_router_models_load;
server_http_context::handler_t post_router_models_unload;
// management API
server_http_context::handler_t get_router_models_sse;
server_http_context::handler_t post_router_models;
server_http_context::handler_t del_router_models;
};
/**
 * A simple HTTP proxy that forwards requests to another server
 * and relays the responses back.
 */
struct server_http_proxy : server_http_res {
    // optional hook, invoked by the destructor when set
    std::function<void()> cleanup = nullptr;

public:
    server_http_proxy(const std::string & method,
                      const std::string & scheme,
                      const std::string & host,
                      int port,
                      const std::string & path,
                      const std::map<std::string, std::string> & headers,
                      const std::string & body,
                      const std::map<std::string, uploaded_file> & files,
                      const std::function<bool()> should_stop,
                      int32_t timeout_read,
                      int32_t timeout_write);

    ~server_http_proxy() {
        if (!cleanup) {
            return; // nothing registered
        }
        cleanup();
    }

private:
    std::thread thread;

    struct msg_t {
        std::map<std::string, std::string> headers;
        int         status = 0;
        std::string data;
        std::string content_type;
    };
};