mirror of
https://github.com/ggml-org/llama.cpp.git
synced 2026-06-25 22:00:21 +00:00
338 lines
16 KiB
Python
338 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import math
|
|
import re
|
|
|
|
from typing import Callable, Iterable, TYPE_CHECKING
|
|
|
|
import torch
|
|
|
|
if TYPE_CHECKING:
|
|
from torch import Tensor
|
|
|
|
from .base import MmprojModel, ModelBase, TextModel, _MISTRAL_COMMON_DATASET_MEAN, _MISTRAL_COMMON_DATASET_STD, gguf
|
|
|
|
from .qwen import Qwen3Model
|
|
|
|
|
|
@ModelBase.register("StepVLForConditionalGeneration", "Step3p7ForConditionalGeneration")
class Step3VLVisionModel(MmprojModel):
    """Converter for the vision tower and projector tensors of Step3-VL checkpoints."""

    def __init__(self, *args, **kwargs):
        """Initialise the base converter, then fill in missing vision hyper-parameters.

        When the vision config has no (truthy) ``intermediate_size`` it is derived
        from the hidden size (``hidden_size`` or ``width``) and ``mlp_ratio``.
        Image mean/std defaults are installed only if the preprocessor config
        does not already provide them.
        """
        super().__init__(*args, **kwargs)
        assert self.hparams_vision is not None
        vision_cfg = self.hparams_vision

        if not vision_cfg.get("intermediate_size"):
            width = vision_cfg.get("hidden_size") or vision_cfg.get("width") or 0
            assert width > 0
            ratio = float(vision_cfg.get("mlp_ratio", 8960 / 1536))
            vision_cfg["intermediate_size"] = int(round(width * ratio))

        # setdefault keeps any value the checkpoint's preprocessor config already has.
        normalisation_defaults = (
            ("image_mean", _MISTRAL_COMMON_DATASET_MEAN),
            ("image_std", _MISTRAL_COMMON_DATASET_STD),
        )
        for key, fallback in normalisation_defaults:
            self.preprocessor_config.setdefault(key, list(fallback))

    def set_gguf_parameters(self):
        """Write the Step3-VL specific vision metadata after the base parameters.

        Asserts that the checkpoint matches the one configuration this path has
        been validated for (projector stride 2, image size 728, hidden size 1536,
        47 layers).
        """
        super().set_gguf_parameters()
        assert self.hparams_vision is not None
        vision_cfg = self.hparams_vision

        stride = int(self.global_config.get("understand_projector_stride", -1))
        width = int(vision_cfg.get("hidden_size", vision_cfg.get("width", -1)))
        depth = int(vision_cfg.get("num_hidden_layers", vision_cfg.get("layers", -1)))
        assert (stride, int(vision_cfg.get("image_size", -1)), width, depth) == (2, 728, 1536, 47), (
            "current Step3-VL conversion path is only validated for Step3-VL-10B"
        )

        writer = self.gguf_writer
        writer.add_clip_projector_type(gguf.VisionProjectorType.STEP3VL)
        layernorm_eps = float(vision_cfg.get("layer_norm_eps", 1e-5))
        writer.add_vision_attention_layernorm_eps(layernorm_eps)
        writer.add_vision_projector_scale_factor(stride ** 2)
        # 3024 max resize comes from step3-vl-10b processing_step3.py.
        writer.add_vision_preproc_image_size(3024)

    def tensor_force_quant(self, name, new_name, bid, n_dims):
        """Pin the storage type of position embeddings and ``mm.0``/``mm.1`` weights.

        Position embeddings are always F32. The ``mm.0.``/``mm.1.`` weights are
        F16 only when the output file type is MOSTLY_F16, otherwise F32. All
        other tensors defer to the base class.
        """
        if ".position_embd." in new_name:
            return gguf.GGMLQuantizationType.F32

        is_mm_weight = new_name.endswith(".weight") and any(
            tag in new_name for tag in ("mm.0.", "mm.1.")
        )
        if is_mm_weight:
            if self.ftype == gguf.LlamaFileType.MOSTLY_F16:
                return gguf.GGMLQuantizationType.F16
            return gguf.GGMLQuantizationType.F32

        return super().tensor_force_quant(name, new_name, bid, n_dims)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        """Drop tensors named ``model.*`` / ``lm_head.*``; defer the rest to the base filter."""
        tensor_name, _loader = item

        skipped_prefixes = ("model.", "lm_head.")
        if tensor_name.startswith(skipped_prefixes):
            return None

        return super().filter_tensors(item)

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
        """Rename Step3-VL vision/projector tensors before handing them to the base mapping.

        Raises:
            ValueError: if a ``vision_model.vit_downsampler*`` tensor does not
                match the expected ``<index>.(weight|bias)`` pattern.
        """
        if name.startswith("vision_model.vit_downsampler"):
            parsed = re.match(r"vision_model\.vit_downsampler(\d+)\.(weight|bias)", name)
            if parsed is None:
                raise ValueError(f"Unexpected Step3-VL projector tensor {name!r}")

            index_text, kind = parsed.groups()
            # Checkpoint index N is emitted as projector slot N - 1.
            target = self.format_tensor_name(
                gguf.MODEL_TENSOR.V_MMPROJ, int(index_text) - 1, suffix=f".{kind}"
            )
            yield (target, data_torch)
            return

        if name == "vit_large_projector.weight":
            yield (self.format_tensor_name(gguf.MODEL_TENSOR.V_MMPROJ_FC), data_torch)
            return

        if name.startswith("vision_model."):
            if name == "vision_model.positional_embedding":
                name = name + ".weight"
            elif ".ls_" in name and name.endswith(".gamma"):
                name = name.removesuffix(".gamma") + ".weight"

            # Split the fused attention parameter names into "<module>.<param>" form.
            renames = (
                ("attn.in_proj_weight", "attn.in_proj.weight"),
                ("attn.in_proj_bias", "attn.in_proj.bias"),
            )
            for old, new in renames:
                name = name.replace(old, new)

        yield from super().modify_tensors(data_torch, name, bid)
|
|
|
|
|
|
@ModelBase.register("StepVLForConditionalGeneration")
class Step3VLTextModel(Qwen3Model):
    """Text-side converter for StepVL checkpoints; reuses the Qwen3 conversion path."""

    model_arch = gguf.MODEL_ARCH.QWEN3
|
|
|
|
|
|
@ModelBase.register("Step3p5ForCausalLM", "Step3p7ForConditionalGeneration")
class Step35Model(TextModel):
    """Converter for Step3.5 text checkpoints, including their appended NextN/MTP layers."""

    model_arch = gguf.MODEL_ARCH.STEP35

    # The --mtp / --no-mtp toggles are ModelBase.mtp_only / no_mtp (set in
    # convert_hf_to_gguf.py main()). Unlike Qwen3.5, which stores MTP under a
    # `mtp.*` namespace, Step3.5 appends MTP layers at
    # `model.layers.{num_hidden_layers + i}`, so we filter them by layer index.
    # The trunk layer count is captured before indexing so the classmethod
    # filter_tensors can tell the appended MTP block(s) apart from the trunk.
    _n_main_layers: int | None = None

    @staticmethod
    def _pad_per_layer(values, n: int, default) -> list:
        """Return ``values`` as a list of exactly ``n`` items.

        Short inputs are right-padded with ``default``; long inputs are truncated.
        """
        padded = list(values)
        if len(padded) < n:
            padded = padded + [default] * (n - len(padded))
        return padded[:n]

    def __init__(self, *args, **kwargs):
        """Initialise the base converter and extend the tensor map over MTP layers."""
        super().__init__(*args, **kwargs)
        # NextN/MTP layers are appended past num_hidden_layers; extend the
        # tensor map to cover them so the MTP block's tensors get correctly
        # indexed names. When --no-mtp drops the MTP blocks, fall back to the
        # base num_hidden_layers so we don't reserve unused slots.
        n_nextn = int(self.hparams.get("num_nextn_predict_layers", 0))
        if n_nextn > 0 and not self.no_mtp:
            self.block_count += n_nextn
            self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)

    def index_tensors(self, remote_hf_model_id: str | None = None):
        """Record the trunk layer count on the class, then run the base indexing."""
        # filter_tensors is a classmethod and can't reach self.hparams; stash
        # the trunk layer count here (before indexing runs) so it can detect
        # the appended MTP layers by index.
        hparams = {**self.hparams, **self.hparams.get("text_config", {})}
        key = next((k for k in ["n_layers", "num_hidden_layers", "n_layer", "num_layers"] if k in hparams), None)
        type(self)._n_main_layers = hparams.get(key)
        return super().index_tensors(remote_hf_model_id=remote_hf_model_id)

    def set_gguf_parameters(self):
        """Write Step3.5 metadata: per-layer attention shapes, SWA, MoE and SwiGLU clamps."""
        # A list-valued rope_theta is split into the base value (index 0) and
        # the sliding-attention value (index 1) before the base class reads it.
        rope_theta = self.hparams.get("rope_theta")
        if isinstance(rope_theta, list):
            self.hparams["rope_theta"] = float(rope_theta[0])
            self.hparams["local_rope_theta"] = float(rope_theta[1])
            self.rope_parameters["rope_theta"] = self.hparams["rope_theta"]
            self.rope_parameters["sliding_attention"] = {"rope_theta": self.hparams["local_rope_theta"]}

        super().set_gguf_parameters()

        layer_types = self.hparams.get("layer_types") or []
        partial_rotary_factors = self.hparams.get("partial_rotary_factors") or []
        attn_other = self.hparams.get("attention_other_setting") or {}

        n_head_base = self.hparams["num_attention_heads"]
        n_kv_base = self.hparams["num_attention_groups"]

        n_head_swa = attn_other.get("num_attention_heads", n_head_base)
        n_kv_swa = attn_other.get("num_attention_groups", n_kv_base)

        n_nextn = int(self.hparams.get("num_nextn_predict_layers", 0))

        # The Step3p5 HF checkpoint stores layer_types/partial_rotary_factors
        # entries for the MTP blocks past num_hidden_layers; preserve them so
        # the MTP layer's attention shape, SWA flag, and partial RoPE dim are
        # set correctly. Pad with full-attention defaults if the checkpoint
        # truncated them.
        layer_types = self._pad_per_layer(layer_types, self.block_count, "full_attention")
        partial_rotary_factors = self._pad_per_layer(
            partial_rotary_factors,
            self.block_count,
            0.5,  # full_attention default for Step3p5
        )
        assert [1.0 if lt == "sliding_attention" else 0.5 for lt in layer_types] == partial_rotary_factors
        head_arr = [n_head_swa if lt == "sliding_attention" else n_head_base for lt in layer_types]
        kv_arr = [n_kv_swa if lt == "sliding_attention" else n_kv_base for lt in layer_types]
        swa_pat = [lt == "sliding_attention" for lt in layer_types]

        self.gguf_writer.add_head_count(head_arr)
        self.gguf_writer.add_head_count_kv(kv_arr)

        self.gguf_writer.add_sliding_window(self.hparams["sliding_window"])
        self.gguf_writer.add_sliding_window_pattern(swa_pat)

        self.gguf_writer.add_value_length(self.hparams["head_dim"])

        # MoE params
        self.gguf_writer.add_expert_count(self.hparams["moe_num_experts"])
        self.gguf_writer.add_expert_used_count(self.hparams["moe_top_k"])
        self.gguf_writer.add_expert_feed_forward_length(self.hparams["moe_intermediate_size"])
        self.gguf_writer.add_expert_shared_feed_forward_length(self.hparams["share_expert_dim"])

        if (moe_router_scaling_factor := self.hparams.get("moe_router_scaling_factor")) is not None:
            self.gguf_writer.add_expert_weights_scale(moe_router_scaling_factor)
        if (norm_expert_weight := self.hparams.get("norm_expert_weight")) is not None:
            self.gguf_writer.add_expert_weights_norm(norm_expert_weight)

        # leading dense blocks
        leading_dense = 0
        moe_layers_enum = self.hparams.get("moe_layers_enum")
        if isinstance(moe_layers_enum, str) and moe_layers_enum.strip():
            moe_layers = sorted(int(i) for i in moe_layers_enum.strip().split(","))
            if moe_layers:
                leading_dense = max(0, moe_layers[0])
        self.gguf_writer.add_leading_dense_block_count(leading_dense)
        self.gguf_writer.add_moe_every_n_layers(int(self.hparams.get("moe_every_n_layer", 1)))

        self.gguf_writer.add_layer_norm_rms_eps(self.hparams.get("rms_norm_eps", 1e-5))

        # Optional per-layer SwiGLU clamps. MTP layers default to no clamping (0.0).
        if (limits := self.hparams.get("swiglu_limits")) is not None:
            limits_f = self._pad_per_layer(
                [0.0 if v is None else float(v) for v in limits],
                self.block_count,
                0.0,
            )
            self.gguf_writer.add_swiglu_clamp_exp(limits_f)
        if (limits_shared := self.hparams.get("swiglu_limits_shared")) is not None:
            limits_shared_f = self._pad_per_layer(
                [0.0 if v is None else float(v) for v in limits_shared],
                self.block_count,
                0.0,
            )
            self.gguf_writer.add_swiglu_clamp_shexp(limits_shared_f)

        if n_nextn > 0 and not self.no_mtp:
            self.gguf_writer.add_nextn_predict_layers(n_nextn)

    @classmethod
    def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
        """Apply the base filter, then the --mtp / --no-mtp selection and MTP renames.

        Returns the (possibly renamed) ``(name, loader)`` pair, or ``None`` when
        the tensor is dropped.
        """
        if (titem := super().filter_tensors(item)) is None:
            return None
        name, gen = titem

        # Map router bias (expert selection bias) to a GGUF bias tensor
        if name.endswith(".moe.router_bias"):
            name += ".bias"

        # Step3.5 appends the MTP block(s) past num_hidden_layers.
        assert cls._n_main_layers is not None
        is_mtp = (m := re.match(r"model\.layers\.(\d+)\.", name)) is not None and int(m.group(1)) >= cls._n_main_layers

        # --no-mtp: drop the appended MTP block(s) entirely.
        if is_mtp and cls.no_mtp:
            return None
        # --mtp: keep ONLY MTP-block tensors plus the shared embeddings/norm/
        # lm_head (so the resulting GGUF carries just the draft head).
        if cls.mtp_only and not is_mtp and name not in (
            "model.embed_tokens.weight", "model.norm.weight", "lm_head.weight",
        ):
            return None

        # The checkpoint nests the per-MTP-layer shared head under
        # `model.layers.{N+i}.transformer.shared_head.{norm,output}.weight`;
        # strip the `transformer.` infix and rename `output` → `head` so the
        # existing NEXTN_SHARED_HEAD_{NORM,HEAD} tensor mapping picks them up.
        # Mirrors vllm's `_rewrite_spec_layer_name` (step3p5_mtp.py).
        if is_mtp:
            name = name.replace(".transformer.", ".")
            name = name.replace("shared_head.output", "shared_head.head")

        return name, gen

    def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None):
        """Adjust norm weights and squeeze selected projections before base mapping."""
        if name.endswith("norm.weight"):
            # Out-of-place add: an in-place `+=` would also modify the tensor
            # object the caller passed in.
            data_torch = data_torch + 1.0

        if name.endswith((".self_attn.g_proj.weight", ".moe.gate.weight", ".moe.up_proj.weight", ".moe.gate_proj.weight", ".moe.down_proj.weight")):
            data_torch = data_torch.squeeze().contiguous()

        yield from super().modify_tensors(data_torch, name, bid)

    def prepare_metadata(self, vocab_only: bool):
        """Run base metadata preparation and rename draft-only outputs written to a directory."""
        # Captured before the base call, which may change self.fname_out.
        from_dir = self.fname_out.is_dir()
        super().prepare_metadata(vocab_only=vocab_only)

        # Mirror Qwen3.5's behavior: when emitting a draft-only file into a
        # directory, prefix with "mtp-" so it doesn't collide with the trunk.
        if not self.mtp_only or not from_dir:
            return

        output_type: str = self.ftype.name.partition("_")[2]
        fname_default: str = gguf.naming_convention(
            self.metadata.name, self.metadata.basename, self.metadata.finetune,
            self.metadata.version, size_label=None, output_type=output_type, model_type=None)
        self.fname_out = self.fname_out.parent / f"mtp-{fname_default}.gguf"

    def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
        """Yield the ``rope_freqs`` factor tensor when llama3-style RoPE scaling is configured.

        Yields nothing unless the (full-attention) rope parameters have
        ``rope_type == "llama3"`` (case-insensitive).

        Raises:
            ValueError: if llama3 scaling is requested but no layer is of type
                ``full_attention`` after padding the per-layer lists.
        """
        # Step35 can optionally use Llama-3 style RoPE scaling (HF: rope_scaling.rope_type == "llama3").
        # llama.cpp represents this via a single extra tensor: "rope_freqs.weight" (aka MODEL_TENSOR.ROPE_FREQS).
        rope_params = self.rope_parameters.get("full_attention", self.rope_parameters)
        rope_type = rope_params.get("rope_type") or ""
        if rope_type.lower() != "llama3":
            return

        # Step35 configs can carry per-layer rope_theta as a list; for llama3 rope factors we use the base value.
        rope_theta = self.hparams.get("rope_theta", 10000.0)
        if isinstance(rope_theta, list):
            rope_theta = rope_theta[0]
        base = float(rope_theta)

        if (storage_dim := self.hparams.get("head_dim")) is None:
            storage_dim = self.hparams["hidden_size"] // self.hparams["num_attention_heads"]
        storage_dim = int(storage_dim)

        # Llama 3 factors apply only to the rotary dims used by full_attention layers
        # (partial_rotary_factor * head_dim). Remaining slots are padded with 1.0 so
        # sliding_attention layers remain unaffected.
        # Pad the per-layer lists with the same defaults set_gguf_parameters
        # uses, so a checkpoint with missing or truncated lists resolves to the
        # same layout in both methods.
        layer_types = self._pad_per_layer(
            self.hparams.get("layer_types") or [], self.block_count, "full_attention"
        )
        partial_rotary_factors = self._pad_per_layer(
            self.hparams.get("partial_rotary_factors") or [], self.block_count, 0.5
        )
        # An explicit default keeps StopIteration from escaping this generator,
        # where it would surface as an opaque RuntimeError.
        full_attention_factor = next(
            (float(f) for lt, f in zip(layer_types, partial_rotary_factors) if lt == "full_attention"),
            None,
        )
        if full_attention_factor is None:
            raise ValueError("Step35 llama3 RoPE scaling requires at least one full_attention layer")
        rotary_dim = int(storage_dim * full_attention_factor)

        freqs = 1.0 / (base ** (torch.arange(0, rotary_dim, 2, dtype=torch.float32) / rotary_dim))

        factor = float(rope_params.get("factor", 8.0))
        low_freq_factor = float(rope_params.get("low_freq_factor", 1.0))
        high_freq_factor = float(rope_params.get("high_freq_factor", 4.0))
        old_context_len = int(rope_params.get("original_max_position_embeddings", 8192))

        low_freq_wavelen = old_context_len / low_freq_factor
        high_freq_wavelen = old_context_len / high_freq_factor

        rope_factors: list[float] = []
        for freq in freqs:
            wavelen = 2 * math.pi / float(freq)
            if wavelen < high_freq_wavelen:
                rope_factors.append(1.0)
            elif wavelen > low_freq_wavelen:
                rope_factors.append(factor)
            else:
                # Interpolate between unscaled and fully scaled in the band
                # between the two wavelength thresholds.
                smooth = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor)
                rope_factors.append(1.0 / ((1.0 - smooth) / factor + smooth))

        # Pad to head_dim/2 with 1.0 so non-scaled layers remain neutral.
        if len(rope_factors) < storage_dim // 2:
            rope_factors.extend([1.0] * (storage_dim // 2 - len(rope_factors)))

        yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), torch.tensor(rope_factors, dtype=torch.float32))
|