mirror of
https://github.com/ggml-org/llama.cpp.git
synced 2026-06-26 14:20:21 +00:00
444 lines
20 KiB
Python
444 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import math
|
|
|
|
from typing import Callable, Iterable, TYPE_CHECKING
|
|
|
|
import numpy as np
|
|
import torch
|
|
|
|
if TYPE_CHECKING:
|
|
from torch import Tensor
|
|
|
|
from .base import ModelBase, TextModel, gguf, logger
|
|
|
|
|
|
@ModelBase.register(
    "LLaMAForCausalLM",
    "LlamaForCausalLM",
    "MistralForCausalLM",
    "MixtralForCausalLM",
    "VLlama3ForCausalLM",
    "LlavaForConditionalGeneration",
    "VoxtralForConditionalGeneration",
    "LlamaForCausalLMEagle3",
    "Eagle3Speculator",
    "Eagle3DraftModel",
    "IQuestCoderForCausalLM",
    "LlamaModel",
)
class LlamaModel(TextModel):
    """Converter for the Llama-family architectures registered above.

    A checkpoint whose hparams contain ``draft_vocab_size`` and report exactly
    one hidden layer is treated as an EAGLE-3 draft model: the architecture is
    switched from ``LLAMA`` to ``EAGLE3`` and extra metadata is read from the
    target model directory.
    """

    model_arch = gguf.MODEL_ARCH.LLAMA
    # When true, Q/K projection tensors go through `permute` before being written.
    undo_permute = True

    def __init__(self, *args, **kwargs):
        """Initialise the converter; switch to EAGLE-3 mode when the hparams call for it.

        Raises:
            ValueError: an EAGLE-3 draft was detected but ``target_model_dir`` is None.
        """
        super().__init__(*args, **kwargs)
        # fix for SmolVLM2, missing `num_attention_heads` in config.json
        if self.hf_arch == "VLlama3ForCausalLM":
            self.hparams.setdefault("num_attention_heads", 32)
        # Mistral consolidated format has no config.json; origin_hf_arch is HF-only.
        if self.is_mistral_format:
            self.origin_hf_arch = None
        else:
            hparams = ModelBase.load_hparams(self.dir_model, is_mistral_format=False)
            # `architectures` may be missing, null or empty; all three mean "unknown".
            self.origin_hf_arch = (hparams.get("architectures") or [None])[0]

        # Detect eagle3 draft checkpoint by hparams (some models don't use a distinct HF arch name)
        if self._has_eagle3_hparams():
            self._init_eagle3()

    def _has_eagle3_hparams(self) -> bool:
        """Return True when the hparams describe an EAGLE-3 draft (draft vocab + one layer)."""
        return "draft_vocab_size" in self.hparams and self.hparams["num_hidden_layers"] == 1

    def _init_eagle3(self) -> None:
        """Switch this converter to the EAGLE3 architecture and write its metadata.

        Raises:
            ValueError: ``target_model_dir`` is None.
        """
        self.is_eagle3 = True
        self.model_arch = gguf.MODEL_ARCH.EAGLE3
        logger.info("Detected EAGLE-3 draft model, switching to EAGLE3 architecture")
        # Re-initialize tensor_map with eagle3 architecture
        self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)
        # Update gguf_writer architecture
        self.gguf_writer.arch = gguf.MODEL_ARCH_NAMES[self.model_arch]
        self.gguf_writer.add_architecture()
        if self.target_model_dir is None:
            raise ValueError(
                "EAGLE-3 model requires --target-model-dir to be specified. "
                "Please provide the path to the target model directory to read config.json"
            )

        # Read both eagle3 raw config and target model config
        with open(self.dir_model / "config.json", 'r', encoding='utf-8') as f:
            eagle3_raw_config = json.load(f)
        with open(self.target_model_dir / "config.json", 'r', encoding='utf-8') as f:
            target_config = json.load(f)

        # Nested `text_config` entries override the top-level ones.
        if "text_config" in target_config:
            target_config = {**target_config, **target_config["text_config"]}
        self.target_vocab_size = target_config["vocab_size"]

        arch = self.gguf_writer.arch

        # target_layers: derived from target model layer count (low/mid/high)
        target_num_layers = target_config["num_hidden_layers"]
        target_layers = [2, target_num_layers // 2, target_num_layers - 3]
        logger.info(f"EAGLE-3: target_layers = {target_layers} (target model has {target_num_layers} layers)")
        self.gguf_writer.add_array(f"{arch}.target_layers", target_layers)

        # target_hidden_size: prefer eagle3 config, fallback to target config
        target_hidden_size = eagle3_raw_config.get("target_hidden_size")
        if target_hidden_size is not None:
            src = "EAGLE-3 config"
        else:
            target_hidden_size = target_config["hidden_size"]
            src = "target model config"
        logger.info(f"EAGLE-3: target_hidden_size = {target_hidden_size} (from {src})")
        self.gguf_writer.add_uint32(f"{arch}.target_hidden_size", target_hidden_size)

        # norm_before_residual (RedHat-style eagle3 specific)
        norm_before_residual = eagle3_raw_config.get("norm_before_residual", False)
        logger.info(f"EAGLE-3: norm_before_residual = {norm_before_residual}")
        self.gguf_writer.add_bool(f"{arch}.norm_before_residual", norm_before_residual)

    def set_vocab(self):
        """Write the vocabulary, reading it from the target model for EAGLE-3 drafts.

        For EAGLE-3 ``self.dir_model`` is pointed at ``target_model_dir`` while the
        vocabulary is loaded and is always restored afterwards, including on the
        early-return paths and when loading raises.
        """
        # eagle3: use tokenizer from target model if provided
        original_dir_model = None
        if getattr(self, 'is_eagle3', False):
            assert self.target_model_dir is not None
            logger.info(f"EAGLE-3: Using tokenizer from target model: {self.target_model_dir}")
            original_dir_model = self.dir_model
            self.dir_model = self.target_model_dir

        try:
            return self._set_vocab_from_dir()
        finally:
            # eagle3: Restore original dir_model
            if original_dir_model is not None:
                self.dir_model = original_dir_model

    def _set_vocab_from_dir(self):
        """Pick and run the vocabulary loader for whatever ``self.dir_model`` contains."""
        if self.origin_hf_arch == "GlmasrModel":
            return self._set_vocab_glmedge()

        if self.is_mistral_format:
            return self._set_vocab_mistral()

        path_tekken_json = self.dir_model / "tekken.json"
        path_tokenizer_json = self.dir_model / "tokenizer.json"
        if path_tekken_json.is_file() and not path_tokenizer_json.is_file():
            # NOTE(review): there is no early return here, so the loaders below
            # still run after this call; confirm that is intended.
            self._set_vocab_mistral()

        tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
        if tokenizer_config_file.is_file():
            with open(tokenizer_config_file, "r", encoding="utf-8") as f:
                tokenizer_config_json = json.load(f)
            if (add_prefix_space := tokenizer_config_json.get("add_prefix_space")) is not None:
                self.gguf_writer.add_add_space_prefix(add_prefix_space)
            if tokenizer_config_json.get("tokenizer_class") == "HybridDNATokenizer":
                return self._set_vocab_hybriddna()

        # Try the loaders in order, falling back when the expected files are absent.
        try:
            self._set_vocab_sentencepiece()
        except FileNotFoundError:
            try:
                self._set_vocab_llama_hf()
            except (FileNotFoundError, TypeError):
                # Llama 3
                self._set_vocab_gpt2()

        vocab_size = self.hparams.get("vocab_size", 32000)

        # Apply to CodeLlama only (and ignore for Llama 3 with a vocab size of 128256)
        if vocab_size == 32016:
            special_vocab = gguf.SpecialVocab(
                self.dir_model, load_merges=False,
                special_token_types=['prefix', 'suffix', 'middle', 'eot']
            )
            special_vocab._set_special_token("prefix", 32007)
            special_vocab._set_special_token("suffix", 32008)
            special_vocab._set_special_token("middle", 32009)
            special_vocab._set_special_token("eot", 32010)
            special_vocab.add_to_gguf(self.gguf_writer)

        # Apply to granite small models only
        if vocab_size == 49152:
            self.gguf_writer.add_add_bos_token(False)

    def set_gguf_parameters(self):
        """Write the base parameters plus the vocab size and RoPE dimension count.

        The vocab size is skipped for Mistral-format checkpoints. The RoPE
        dimension is ``head_dim`` when present, otherwise
        ``hidden_size // num_attention_heads``.
        """
        super().set_gguf_parameters()
        hparams = self.hparams

        if not self.is_mistral_format:
            self.gguf_writer.add_vocab_size(hparams["vocab_size"])

        if (rope_dim := hparams.get("head_dim")) is None:
            rope_dim = hparams["hidden_size"] // hparams["num_attention_heads"]
        self.gguf_writer.add_rope_dimension_count(rope_dim)

    @staticmethod
    def permute(weights: Tensor, n_head: int, n_head_kv: int | None):
        """Reorder the rows of *weights* head by head and return the same shape.

        The leading dimension is viewed as ``(heads, 2, rows_per_head // 2)``,
        the last two of those axes are swapped, and the original shape is
        restored. When *n_head_kv* is given and differs from *n_head*, it is
        used as the head count instead.
        """
        if n_head_kv is not None and n_head != n_head_kv:
            n_head = n_head_kv
        per_head = weights.reshape(n_head, 2, weights.shape[0] // n_head // 2, *weights.shape[1:])
        return per_head.swapaxes(1, 2).reshape(weights.shape)

    def _repack_nvfp4(self, name: str, weight: Tensor, scale: Tensor, scale2: Tensor, input_scale: Tensor):
        """Apply the Q/K permutation to NVFP4 weight and scale, then defer to the base class."""
        # Mirror the BF16 Q/K RoPE permutation site in modify_tensors; the NVFP4 path bypasses it.
        if self.undo_permute:
            n_head = self.find_hparam(["n_heads", "num_attention_heads"], optional=True)
            n_kv_head = self.find_hparam(["n_kv_heads", "num_key_value_heads"], optional=True)
            if n_head is not None:
                if name.endswith("q_proj.weight"):
                    weight = LlamaModel.permute(weight, n_head, n_head)
                    scale = LlamaModel.permute(scale, n_head, n_head)
                elif name.endswith("k_proj.weight"):
                    weight = LlamaModel.permute(weight, n_head, n_kv_head)
                    scale = LlamaModel.permute(scale, n_head, n_kv_head)
        super()._repack_nvfp4(name, weight, scale, scale2, input_scale)

    # Per-block buffers of MoE expert tensors waiting to be merged; created
    # lazily by modify_tensors and checked for leftovers in prepare_tensors.
    _experts: list[dict[str, Tensor]] | None = None

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        """Strip every ``text_model.`` from the tensor name, then apply the base filter."""
        name, gen = item
        name = name.replace("text_model.", "")  # for SmolVLM
        return super().filter_tensors((name, gen))

    def index_tensors(self, remote_hf_model_id: str | None = None) -> dict[str, Callable[[], Tensor]]:
        """Index the checkpoint tensors, renaming EAGLE-3 layer tensors to ``model.layers.0.*``.

        Also merges a nested ``transformer_layer_config`` into ``self.hparams``
        (nested values win) before the EAGLE-3 check is made.
        """
        tensors = super().index_tensors(remote_hf_model_id)

        # Handle Eagle3Speculator nested config
        if "transformer_layer_config" in self.hparams:
            self.hparams = {**self.hparams, **self.hparams["transformer_layer_config"]}

        # eagle3 detection
        if not self._has_eagle3_hparams():
            return tensors

        logger.info("EAGLE-3: renaming midlayer.* / layers.0.* to model.layers.0.*")

        def rename(name: str) -> str:
            if name.startswith("midlayer."):
                return "model.layers.0." + name[len("midlayer."):]
            if name.startswith("layers.0."):  # Eagle3Speculator format
                return "model." + name
            return name

        return {rename(name): gen for name, gen in tensors.items()}

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Yield the output tensors produced from one checkpoint tensor.

        Handles the EAGLE-3 special tensors, applies the Q/K permutation when
        ``undo_permute`` is set, and buffers ``block_sparse_moe.experts``
        tensors until a block is complete, at which point the experts are
        stacked into one tensor per weight id. Everything else is passed to the
        base implementation.
        """
        # eagle3: special tensors that bypass standard llama mapping
        if getattr(self, 'is_eagle3', False):
            if name == "fc.weight":
                yield (name, data_torch)
                return
            if name == "d2t":
                # store for manual int64 handling in prepare_tensors (avoid F32 conversion)
                if not hasattr(self, '_eagle3_int_tensors'):
                    self._eagle3_int_tensors = {}
                self._eagle3_int_tensors[name] = data_torch
                return
            if name == "t2d":
                # not used at runtime, skip
                return
            if name.endswith(".hidden_norm.weight"):
                yield (self.format_tensor_name(gguf.MODEL_TENSOR.ATTN_NORM_2, bid), data_torch)
                return

        n_head = self.find_hparam(["n_heads", "num_attention_heads"])
        n_kv_head = self.find_hparam(["n_kv_heads", "num_key_value_heads"])

        if self.hf_arch == "LlamaModel":
            name = "model." + name

        if self.undo_permute:
            if name.endswith(("q_proj.weight", "q_proj.bias")):
                data_torch = LlamaModel.permute(data_torch, n_head, n_head)
            if name.endswith(("k_proj.weight", "k_proj.bias")):
                data_torch = LlamaModel.permute(data_torch, n_head, n_kv_head)

        if "block_sparse_moe.experts" not in name:
            yield from super().modify_tensors(data_torch, name, bid)
            return

        # process the experts separately
        n_experts = self.hparams["num_local_experts"]

        assert bid is not None

        if self._experts is None:
            self._experts = [{} for _ in range(self.block_count)]

        pending = self._experts[bid]
        pending[name] = data_torch

        # Three weights (w1, w2, w3) per expert must be buffered before merging.
        if len(pending) < n_experts * 3:
            return

        # merge the experts into a single 3d tensor
        for wid in ("w1", "w2", "w3"):
            datas: list[Tensor] = [
                pending.pop(f"model.layers.{bid}.block_sparse_moe.experts.{xid}.{wid}.weight")
                for xid in range(n_experts)
            ]
            merged = torch.stack(datas, dim=0)
            merged_name = f"layers.{bid}.feed_forward.experts.{wid}.weight"
            yield from super().modify_tensors(merged, merged_name, bid)

    def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
        """Yield the per-frequency RoPE scaling factors when ``rope_type`` is ``llama3``.

        Yields nothing for any other rope type or when there are no rope parameters.
        """
        rope_params = self.rope_parameters.get("full_attention", self.rope_parameters)
        if not rope_params or rope_params.get("rope_type", '').lower() != "llama3":
            return

        base = rope_params.get("rope_theta", 10000.0)
        if (dim := self.hparams.get("head_dim")) is None:
            dim = self.hparams["hidden_size"] // self.hparams["num_attention_heads"]
        freqs = 1.0 / (base ** (torch.arange(0, dim, 2, dtype=torch.float32) / dim))

        factor = rope_params.get("factor", 8.0)
        low_freq_factor = rope_params.get("low_freq_factor", 1.0)
        high_freq_factor = rope_params.get("high_freq_factor", 4.0)
        old_context_len = rope_params.get("original_max_position_embeddings", 8192)

        low_freq_wavelen = old_context_len / low_freq_factor
        high_freq_wavelen = old_context_len / high_freq_factor
        # assert low_freq_wavelen != high_freq_wavelen # Errors for Llama4

        rope_factors = []
        for freq in freqs:
            wavelen = 2 * math.pi / freq
            if wavelen < high_freq_wavelen:
                # short wavelengths are left unscaled
                rope_factors.append(1)
            elif wavelen > low_freq_wavelen:
                # long wavelengths get the full factor
                rope_factors.append(factor)
            else:
                # in between: interpolate between the two regimes
                smooth = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor)
                rope_factors.append(1 / ((1 - smooth) / factor + smooth))

        yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), torch.tensor(rope_factors, dtype=torch.float32))

    def prepare_tensors(self):
        """Run the base tensor preparation, then write the EAGLE-3 integer tensors.

        ``d2t`` is stored as offsets; it is converted to absolute target token
        ids (offset + index), validated and written as I64.

        Raises:
            ValueError: ``d2t`` ids fall outside the target vocab or repeat, or
                some buffered MoE expert tensors were never merged.
        """
        is_eagle3 = getattr(self, 'is_eagle3', False)

        # eagle3: collect d2t original dtype before parent converts tensors to F32
        eagle3_original_dtypes = {}
        if is_eagle3:
            for name, data_torch in self.get_tensors():
                if name == "d2t":
                    eagle3_original_dtypes[name] = data_torch.dtype

        super().prepare_tensors()

        # eagle3: write d2t as absolute target token ids
        if is_eagle3 and hasattr(self, '_eagle3_int_tensors'):
            # Everything stored here is converted to int64 below, so the type is
            # the same for every entry and must be bound even for names other than d2t.
            data_qtype = gguf.GGMLQuantizationType.I64
            for name, data_torch in self._eagle3_int_tensors.items():
                old_dtype = eagle3_original_dtypes.get(name, data_torch.dtype)
                data = data_torch.to(torch.int64).cpu().numpy()
                if name == "d2t":
                    data = data.reshape(-1)
                    data = data + np.arange(data.size, dtype=np.int64)
                    if np.any((data < 0) | (data >= self.target_vocab_size)):
                        raise ValueError(f"EAGLE-3 d2t target ids out of range for target vocab size {self.target_vocab_size}")
                    if np.unique(data).size != data.size:
                        raise ValueError("EAGLE-3 d2t contains duplicate target ids")

                shape_str = f"{{{', '.join(str(n) for n in reversed(data.shape))}}}"
                logger.info(f"{name + ',':<30} {old_dtype} --> {data_qtype.name}, shape = {shape_str}")
                self.gguf_writer.add_tensor(name, data, raw_dtype=data_qtype)

        if self._experts is not None:
            # flatten `list[dict[str, Tensor]]` into `list[str]`
            experts = [k for d in self._experts for k in d.keys()]
            if len(experts) > 0:
                raise ValueError(f"Unprocessed experts: {experts}")
|
|
|
|
|
|
@ModelBase.register("ArceeForCausalLM")
class ArceeModel(LlamaModel):
    """Llama-based converter registered for ``ArceeForCausalLM``, written as ARCEE."""

    model_arch = gguf.MODEL_ARCH.ARCEE

    def set_gguf_parameters(self):
        """Write the inherited parameters, then call ``_try_set_pooling_type``."""
        super().set_gguf_parameters()
        self._try_set_pooling_type()
|
|
|
|
|
|
@ModelBase.register(
    "Llama4ForConditionalGeneration",
    "Llama4ForCausalLM",
)
class Llama4Model(LlamaModel):
    """Llama-based converter for the two Llama4 architectures registered above.

    Q/K permutation is disabled for this architecture (``undo_permute = False``).
    """

    model_arch = gguf.MODEL_ARCH.LLAMA4
    undo_permute = False

    def __init__(self, *args, **kwargs):
        """Initialise the base converter, then swap the intermediate-size hparams."""
        super().__init__(*args, **kwargs)
        hparams = self.hparams
        # IMPORTANT: the normal "intermediate_size" is renamed to "intermediate_size_mlp", we need to undo this
        # Keep the original value under the MoE key before overwriting it.
        hparams["intermediate_size_moe"] = hparams["intermediate_size"]
        hparams["intermediate_size"] = hparams["intermediate_size_mlp"]

    def set_vocab(self):
        """Always load the vocabulary through the GPT-2 style loader."""
        self._set_vocab_gpt2()

    def set_gguf_parameters(self):
        """Write the inherited parameters plus the Llama4 MoE settings.

        The sliding window is set to 0 when ``layer_types`` is present and
        every entry is ``"full_attention"``.
        """
        super().set_gguf_parameters()
        hparams = self.hparams
        writer = self.gguf_writer

        writer.add_interleave_moe_layer_step(hparams["interleave_moe_layer_step"])
        writer.add_expert_feed_forward_length(hparams["intermediate_size_moe"])

        only_full_attention = "layer_types" in hparams and all(
            layer_type == "full_attention" for layer_type in hparams["layer_types"]
        )
        if only_full_attention:
            # all layers are full attention (for MobileLLM), disable swa
            writer.add_sliding_window(0)

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None):
        """Yield the output tensors for one checkpoint tensor.

        A fused ``gate_up_proj`` tensor is transposed and split in half into
        separate gate and up weights. A name ending in ``down_proj`` gets a
        ``.weight`` suffix and a transposed tensor. Everything is then handed
        to the parent implementation.
        """
        if "gate_up_proj" not in name:
            if name.endswith("down_proj"):
                name += ".weight"
                data_torch = data_torch.transpose(-1, -2)

            yield from super().modify_tensors(data_torch, name, bid)
            return

        # split the gate_up into gate and up
        half = data_torch.shape[-1] // 2
        gate_part, up_part = data_torch.transpose(-1, -2).split(half, dim=-2)
        gate_name = name.replace("gate_up_proj", "gate_proj.weight")
        up_name = name.replace("gate_up_proj", "up_proj.weight")
        yield from super().modify_tensors(gate_part, gate_name, bid)
        yield from super().modify_tensors(up_part, up_name, bid)
|
|
|
|
|
|
@ModelBase.register("LlamaBidirectionalModel")
class LlamaEmbedNemotronModel(LlamaModel):
    """Llama-based converter registered for ``LlamaBidirectionalModel``, written as LLAMA_EMBED."""

    model_arch = gguf.MODEL_ARCH.LLAMA_EMBED
|
|
|
|
|
|
@ModelBase.register("SmolLM3ForCausalLM")
class SmolLM3Model(LlamaModel):
    """Llama-based converter registered for ``SmolLM3ForCausalLM``, written as SMOLLM3."""

    model_arch = gguf.MODEL_ARCH.SMOLLM3
|
|
|
|
|
|
@ModelBase.register("ApertusForCausalLM")
class ApertusModel(LlamaModel):
    """Llama-based converter registered for ``ApertusForCausalLM``, written as APERTUS.

    The per-layer xIELU activation scalars (``alpha_n``, ``alpha_p``, ``beta``,
    ``eps``) are not emitted as tensors. They are collected by block id and
    written as GGUF metadata arrays once a value for every layer has been seen.
    Q/K permutation is disabled (``undo_permute = False``).
    """

    model_arch = gguf.MODEL_ARCH.APERTUS
    undo_permute = False

    # Block id -> scalar value for each xIELU parameter. These are created per
    # instance in __init__; as class-level dicts they were shared between
    # instances, so a second converter would start with stale entries.
    _alpha_n: dict
    _alpha_p: dict
    _beta: dict
    _eps: dict

    def __init__(self, *args, **kwargs):
        """Create the per-instance xIELU accumulators, then initialise the base converter."""
        # Set these before the base initialiser runs so they already exist
        # should it call modify_tensors.
        self._alpha_n = {}
        self._alpha_p = {}
        self._beta = {}
        self._eps = {}
        super().__init__(*args, **kwargs)

    def modify_tensors(self, data_torch, name, bid):
        """Collect xIELU scalars as metadata; pass every other tensor to the parent.

        A tensor whose name ends in one of the ``.act_fn.*`` suffixes yields
        nothing. Its value is stored under *bid* and, once the number of stored
        values equals ``num_hidden_layers``, the values are written in block-id
        order.
        """
        # Handle xIELU activation parameters
        n_layers = self.hparams["num_hidden_layers"]
        xielu_params = (
            (".act_fn.alpha_n", self._alpha_n, self.gguf_writer.add_xielu_alpha_n),
            (".act_fn.alpha_p", self._alpha_p, self.gguf_writer.add_xielu_alpha_p),
            (".act_fn.beta", self._beta, self.gguf_writer.add_xielu_beta),
            (".act_fn.eps", self._eps, self.gguf_writer.add_xielu_eps),
        )
        for suffix, values, write_values in xielu_params:
            if not name.endswith(suffix):
                continue
            values[bid] = data_torch.to("cpu").float().item()
            if len(values) == n_layers:
                write_values([values[k] for k in sorted(values)])
            return

        yield from super().modify_tensors(data_torch, name, bid)
|