Files
Gabe Goodhart a3900a6694 model: Granite Speech Plus (#24818)
* feat: Add conversion support for Granite Speech Plus

Branch: GraniteSpeechPlus
AI-usage: full (Bob, OpenCode + Qwen3.6-35b)
Signed-off-by: Gabe Goodhart <ghart@us.ibm.com>

* feat: Extend granite_speech to support plus multi-layer concatenation

Branch: GraniteSpeechPlus
AI-usage: draft (Bob, OpenCode + Qwen3.6-35b)
Signed-off-by: Gabe Goodhart <ghart@us.ibm.com>

* fix(conversion): Fix plural naming for feature_layers for audio

Branch: GraniteSpeechPlus
AI-usage: none
Signed-off-by: Gabe Goodhart <ghart@us.ibm.com>

* fix(mtmd): Align feature_layer usage and naming everywhere

Branch: GraniteSpeechPlus
AI-usage: none
Signed-off-by: Gabe Goodhart <ghart@us.ibm.com>

* style: Use fstring for log

Signed-off-by: Gabe Goodhart <ghart@us.ibm.com>

Co-authored-by: Xuan-Son Nguyen <thichthat@gmail.com>

---------

Co-authored-by: Xuan-Son Nguyen <thichthat@gmail.com>
2026-06-23 12:03:31 +02:00

507 lines
24 KiB
Python

from __future__ import annotations
import re
from typing import Any, Callable, Iterable, TYPE_CHECKING
import torch
if TYPE_CHECKING:
from torch import Tensor
from .base import MmprojModel, ModelBase, gguf, logger
from .llama import LlamaModel
from .mamba import Mamba2Model
@ModelBase.register("GraniteForCausalLM")
class GraniteModel(LlamaModel):
"""Conversion for IBM's GraniteForCausalLM"""
model_arch = gguf.MODEL_ARCH.GRANITE
def set_gguf_parameters(self):
"""Granite uses standard llama parameters with the following differences:
- No head_dim support
- New multiplier params:
- attention_scale
- embedding_scale
- residual_scale
- logits_scaling
"""
if head_dim := self.hparams.pop("head_dim", None):
logger.warning("Ignoring head_dim (%s) from config for Granite", head_dim)
super().set_gguf_parameters()
# NOTE: Convert _multiplier params to _scale params for naming
# consistency
if attention_scale := self.hparams.get("attention_multiplier"):
self.gguf_writer.add_attention_scale(attention_scale)
logger.info("gguf: (granite) attention_scale = %s", attention_scale)
if embedding_scale := self.hparams.get("embedding_multiplier"):
self.gguf_writer.add_embedding_scale(embedding_scale)
logger.info("gguf: (granite) embedding_scale = %s", embedding_scale)
if residual_scale := self.hparams.get("residual_multiplier"):
self.gguf_writer.add_residual_scale(residual_scale)
logger.info("gguf: (granite) residual_scale = %s", residual_scale)
if logits_scale := self.hparams.get("logits_scaling"):
self.gguf_writer.add_logit_scale(logits_scale)
logger.info("gguf: (granite) logits_scale = %s", logits_scale)
# If being used as the base for Granite4 Vision, add deepstack_layer_arr
if self.hparams.get("spatial_target_layers") or self.hparams.get("deepstack_layer_map"):
normalized_projector_map = Granite4VisionMmprojModel.get_normalized_projector_map(self.hparams)
deepstack_mapping_arr = [-1 for _ in range(self.block_count)] # Populate with -1 sentinels
for proj_idx, (_, llm_layer, _, _) in enumerate(normalized_projector_map):
# Skip the first projector which is handled as the base embedding
# stream like normal
if proj_idx == 0:
continue
deepstack_mapping_arr[llm_layer] = proj_idx
self.gguf_writer.add_deepstack_mapping(deepstack_mapping_arr)
@classmethod
def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
name, gen = item
# Skip multimodal tensors
if (
name.startswith(("encoder."))
or "image_" in name
or "layerwise_projectors" in name
or "spatial_projectors" in name
):
return
return super().filter_tensors(item)
@ModelBase.register("GraniteMoeForCausalLM", "GraniteMoeSharedForCausalLM")
class GraniteMoeModel(GraniteModel):
"""Conversion for IBM's GraniteMoeForCausalLM"""
model_arch = gguf.MODEL_ARCH.GRANITE_MOE
def set_gguf_parameters(self):
"""GraniteMoeShared uses GraniteMoe parameters plus the following:
- shared_intermediate_size
"""
super().set_gguf_parameters()
if shared_feed_forward_length := self.hparams.get("shared_intermediate_size"):
self.gguf_writer.add_expert_shared_feed_forward_length(shared_feed_forward_length)
logger.info("gguf: (granitemoeshared) shared_feed_forward_length = %s", shared_feed_forward_length)
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
"""In modeling_granitemoe, the JetMoe implementation of parallel experts
is used. This essentially merges w1 and w3 into a single tensor with 2x
the hidden size that is then split during forward. To keep compatibility
with existing mixtral support, we pull them apart here.
"""
if name.endswith("block_sparse_moe.input_linear.weight"):
ffn_dim = self.hparams["intermediate_size"]
assert data_torch.shape[-2] == 2 * ffn_dim, "Merged FFN tensor size must be 2 * intermediate_size"
gate, up = data_torch.split(ffn_dim, dim=-2)
yield from ModelBase.modify_tensors(self, gate, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_GATE_EXP, bid), bid)
yield from ModelBase.modify_tensors(self, up, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_UP_EXP, bid), bid)
return
has_experts = bool(self.hparams.get('num_local_experts'))
if name.endswith("shared_mlp.input_linear.weight"):
ffn_dim = self.hparams["shared_intermediate_size"]
assert data_torch.shape[-2] == 2 * ffn_dim, "Merged FFN tensor size must be 2 * shared_intermediate_size"
gate, up = data_torch.split(ffn_dim, dim=-2)
if has_experts:
yield from ModelBase.modify_tensors(self, gate,self.format_tensor_name(gguf.MODEL_TENSOR.FFN_GATE_SHEXP, bid), bid)
yield from ModelBase.modify_tensors(self, up, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_UP_SHEXP, bid), bid)
return
yield from ModelBase.modify_tensors(self, gate, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_GATE, bid), bid)
yield from ModelBase.modify_tensors(self, up, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_UP, bid), bid)
return
if not has_experts and name.endswith("shared_mlp.output_linear.weight"):
yield from ModelBase.modify_tensors(self, data_torch, self.format_tensor_name(gguf.MODEL_TENSOR.FFN_DOWN, bid), bid)
return
yield from super().modify_tensors(data_torch, name, bid)
@ModelBase.register("GraniteMoeHybridForCausalLM", "BambaForCausalLM")
class GraniteHybridModel(Mamba2Model, GraniteMoeModel):
"""GraniteHybrid is a hybrid SSM + Attention model that uses Mamba2 SSM
layers and optionally uses MoE w/ a shared expert"""
model_arch = gguf.MODEL_ARCH.GRANITE_HYBRID
undo_permute = True
def __init__(self, *args, **kwargs):
# Hybrid mamba models use a prefix for the mamba-specific params.
# TODO: Extend this if the prefix(es) need to be configurable
self.hparam_prefixes = ["mamba"]
super().__init__(*args, **kwargs)
# Lists of which layers use ssm vs attention
self._attn_layers = self.get_attn_layers()
self._ssm_layers = [
i for i in range(self.block_count)
if i not in self._attn_layers
]
# There are some models in this family that are non-hybrid, but keep the
# same parent class by setting all layers to "attention." If this is the
# case, the model architecture needs to be updated to a standard
# "granite" or "granitemoe" model
if not self._ssm_layers:
has_experts = self.find_hparam(["num_experts_per_tok", "num_experts_per_token"], optional=True)
new_arch = (
gguf.MODEL_ARCH.GRANITE_MOE
if has_experts else
gguf.MODEL_ARCH.GRANITE
)
self.model_arch = new_arch
self.gguf_writer.arch = gguf.MODEL_ARCH_NAMES[new_arch]
self.gguf_writer.add_architecture()
# n_group and d_inner are used during reshape_tensors for mamba2
# NOTE: Explicitly include hparam prefix prefix for d_model to
# disambiguate with top-level head_dim
# NOTE 2: If needed for future models, this can be isolated in a method
# to separate the prefix setting and the keys used
self.d_model = self.find_hparam([f"{self.hparam_prefixes[0]}_head_dim", "hidden_size", "d_model"])
self.n_group = self.find_hparam(["n_groups", "num_groups"])
self.d_inner = self.find_hparam(["expand", "num_heads"]) * self.d_model
def get_attn_layers(self):
# Explicit list of layer type names
if layer_types := self.hparams.get("layer_types"):
return [
i for i, typ in enumerate(layer_types)
if typ == "attention"
]
# Layer types indicated by index or period
attn_layers = self.hparams.get("attn_layer_indices", [])
if not attn_layers:
attn_period = self.hparams.get("attn_layer_period")
assert attn_period, "Didn't find attn_layer_indices or attn_layer_period"
attn_offset = self.hparams.get("attn_layer_offset")
assert attn_offset is not None, "No attention layer offset set with attn_layer_period"
attn_layers = [
i for i in range(self.block_count)
if i % attn_period == attn_offset
]
return attn_layers
def find_hparam(self, keys: Iterable[str], *args, **kwargs) -> Any:
prefixed = []
for pfx in self.hparam_prefixes:
prefixed.extend(
"_".join([pfx, k])
for k in keys
)
keys = list(keys) + prefixed
return Mamba2Model.find_hparam(self, keys, *args, **kwargs)
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
if (
name.endswith("block_sparse_moe.input_linear.weight")
or "shared_mlp" in name
):
yield from GraniteMoeModel.modify_tensors(self, data_torch, name, bid)
return
# Determine whether this is a mamba layer or an attention layer
if bid in self._ssm_layers:
yield from Mamba2Model.modify_tensors(self, data_torch, name, bid)
return
elif bid in self._attn_layers:
yield from GraniteMoeModel.modify_tensors(self, data_torch, name, bid)
return
yield from ModelBase.modify_tensors(self, data_torch, name, bid)
def set_gguf_parameters(self):
"""This method merges params from both parents and some that are
specific to this model. The result is some duplication of how the params
get set. The following warnings are expected during conversion:
WARNING:Duplicated key name 'granitehybrid.attention.head_count_kv'
WARNING:Duplicated key name 'granitehybrid.context_length'
"""
GraniteMoeModel.set_gguf_parameters(self)
## Mamba mixer params ##
self.gguf_writer.add_ssm_conv_kernel(self.find_hparam(["conv_kernel", "d_conv"]))
self.gguf_writer.add_ssm_state_size(self.find_hparam(["state_size", "d_state", "state_dim", "ssm_state_size"]))
self.gguf_writer.add_ssm_group_count(self.n_group)
self.gguf_writer.add_ssm_inner_size(self.d_inner)
# NOTE: The mamba_dt_rank is _not_ the right field for how this is used
# in llama.cpp
self.gguf_writer.add_ssm_time_step_rank(self.find_hparam(["n_heads", "num_heads"]))
## Attention params ##
head_count_kv = self.find_hparam(["num_key_value_heads", "n_head_kv"])
head_count_kv_vec = [
head_count_kv if i in self._attn_layers else 0 for i in range(self.block_count)
]
if rope_dim := self.hparams.get("attn_rotary_emb"):
self.gguf_writer.add_rope_dimension_count(rope_dim)
self.gguf_writer.add_head_count_kv(head_count_kv_vec)
## If Bamba or non-hybrid, use rope, otherwise don't
use_rope = (
"BambaForCausalLM" in self.hparams["architectures"]
or not self._ssm_layers
)
self.gguf_writer.add_rope_scaling_finetuned(use_rope)
if not use_rope:
self.gguf_writer.add_context_length(2**20)
## Validation ##
d_head = self.find_hparam(["d_head"], optional=True) or 64
assert self.hparams.get("hidden_act") in [None, "silu"], "Only SILU activation supported"
assert self.d_inner % d_head == 0, f"SSM inner size {self.d_inner} not a multiple of head dim {d_head}"
def set_vocab(self):
# For models with no ssm layers, don't pad for mamba2
self.hparams["pad_vocab_size_multiple"] = 8 if self._ssm_layers else 1
Mamba2Model.set_vocab(self)
@ModelBase.register("GraniteSpeechForConditionalGeneration")
class GraniteSpeechMmprojModel(MmprojModel):
has_vision_encoder = False
has_audio_encoder = True
_batch_norm_tensors: list[dict[str, Tensor]] | None = None
def get_audio_config(self) -> dict[str, Any] | None:
return self.global_config.get("encoder_config")
def set_gguf_parameters(self):
assert self.hparams_audio is not None
a = self.hparams_audio
a["hidden_size"] = a["hidden_dim"]
a["intermediate_size"] = a["hidden_dim"] * a["feedforward_mult"]
a["num_attention_heads"] = a["num_heads"]
a["num_hidden_layers"] = a["num_layers"]
super().set_gguf_parameters()
self.gguf_writer.add_clip_projector_type(gguf.VisionProjectorType.GRANITE_SPEECH)
self.gguf_writer.add_audio_num_mel_bins(a["input_dim"])
self.gguf_writer.add_audio_attention_layernorm_eps(1e-5)
self.gguf_writer.add_audio_chunk_size(a["context_size"])
self.gguf_writer.add_audio_conv_kernel_size(a["conv_kernel_size"])
self.gguf_writer.add_audio_max_pos_emb(a["max_pos_emb"])
p = self.global_config
self.gguf_writer.add_audio_projector_window_size(p["window_size"])
self.gguf_writer.add_audio_projector_downsample_rate(p["downsample_rate"])
self.gguf_writer.add_audio_projector_head_count(p["projector_config"]["num_attention_heads"])
def tensor_force_quant(self, name, new_name, bid, n_dims):
if "encoder" in name or "projector" in name:
if ".conv" in name and ".weight" in name:
return gguf.GGMLQuantizationType.F32
return super().tensor_force_quant(name, new_name, bid, n_dims)
@classmethod
def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
name, gen = item
if "attention_dists" in name or "num_batches_tracked" in name:
return None
return super().filter_tensors(item)
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
# fold running_mean, running_var and eps into weight and bias for batch_norm
if "batch_norm" in name and "encoder.layers." in name:
if self._batch_norm_tensors is None:
self._batch_norm_tensors = [{} for _ in range(self.block_count)]
assert bid is not None
self._batch_norm_tensors[bid][name] = data_torch
if len(self._batch_norm_tensors[bid]) < 4:
return
prefix = f"encoder.layers.{bid}.conv.batch_norm"
weight = self._batch_norm_tensors[bid][f"{prefix}.weight"]
bias = self._batch_norm_tensors[bid][f"{prefix}.bias"]
running_mean = self._batch_norm_tensors[bid][f"{prefix}.running_mean"]
running_var = self._batch_norm_tensors[bid][f"{prefix}.running_var"]
eps = 1e-5
a = weight / torch.sqrt(running_var + eps)
b = bias - running_mean * a
yield from super().modify_tensors(a, f"encoder.layers.{bid}.conv.batch_norm.weight", bid)
yield from super().modify_tensors(b, f"encoder.layers.{bid}.conv.batch_norm.bias", bid)
return
if ".attn.to_kv.weight" in name:
k_weight, v_weight = data_torch.chunk(2, dim=0)
yield from super().modify_tensors(k_weight, name.replace("to_kv", "to_k"), bid)
yield from super().modify_tensors(v_weight, name.replace("to_kv", "to_v"), bid)
return
if ("up_conv" in name or "down_conv" in name) and name.endswith(".weight"):
if data_torch.ndim == 3 and data_torch.shape[2] == 1:
data_torch = data_torch.squeeze(2)
if "depth_conv" in name and name.endswith(".weight"):
if data_torch.ndim == 3 and data_torch.shape[1] == 1:
data_torch = data_torch.squeeze(1)
yield from super().modify_tensors(data_torch, name, bid)
@ModelBase.register("GraniteSpeechPlusForConditionalGeneration")
class GraniteSpeechPlusMmprojModel(GraniteSpeechMmprojModel):
"""Conversion for GraniteSpeechPlus - extends GraniteSpeech with feature layer concatenation"""
has_vision_encoder = False
has_audio_encoder = True
def set_gguf_parameters(self):
assert self.hparams_audio is not None
super().set_gguf_parameters()
# Add feature_layer if present in encoder config
if feature_layers := self.hparams_audio.get("cat_hidden_layers"):
self.gguf_writer.add_audio_feature_layers(feature_layers)
logger.info(f"gguf: audio feature_layers = {feature_layers}")
# Validate projector dimension matches concatenated encoder output
hidden_dim = self.hparams_audio["hidden_dim"]
expected_dim = hidden_dim * (len(feature_layers) + 1)
projector_dim = self.global_config["projector_config"]["encoder_hidden_size"]
if projector_dim != expected_dim:
raise ValueError(
f"Projector encoder_hidden_size ({projector_dim}) does not match "
f"expected concatenated dimension ({expected_dim}). "
f"Expected: hidden_dim ({hidden_dim}) * (len(feature_layers) + 1) = {expected_dim}"
)
@ModelBase.register("Granite4VisionForConditionalGeneration")
class Granite4VisionMmprojModel(MmprojModel):
has_vision_encoder = True
has_audio_encoder = False
@staticmethod
def get_normalized_projector_map(global_config: dict) -> list[tuple[int, int, str, int]]:
"""Normalize both deepstack and spatial projector maps to the form:
(vision_layer, llm_layer, <type>, type_index)
This is then used to populate the following mappings:
- vision_feature_layers (mmproj hparam): ordered list of all
vision_layer values where order corresponds with the order of the
stacked projector tensors
NOTE: Values may appear multiple times for spatial projectors
- tensor_prefix_map (mmproj tensors): mapping from tensor prefixes to
the index of the corresponding projector in the stacked tensors
- deepstack_layer_arr (llm hparam): per-text-layer array indicating
which input vision feature should be injected at that layer
(-1 if none)
Output: (vision_layer, llm_layer, <type>, type_index)
"""
deepstack_map = global_config.get("deepstack_layer_map", []) # [[vis_layer, llm_layer], ...]
spatial_layers = global_config.get("spatial_target_layers", []) # [llm_layer, ...]
n_text_layers = global_config["text_config"]["num_hidden_layers"]
n_vision_layers = global_config["vision_config"]["num_hidden_layers"]
normalized_projector_map = []
if deepstack_map:
for deepstack_idx, (vision_layer, llm_layer) in enumerate(sorted(deepstack_map)):
if vision_layer < 0:
vision_layer = n_vision_layers + vision_layer
if llm_layer < 0:
llm_layer = n_text_layers + llm_layer
normalized_projector_map.append((vision_layer, llm_layer, "layerwise", deepstack_idx))
if spatial_layers:
spatial_vision_layer = global_config.get("spatial_vision_layer", -1)
if spatial_vision_layer < 0:
spatial_vision_layer = n_vision_layers + spatial_vision_layer
for spatial_idx, llm_layer in enumerate(spatial_layers):
normalized_projector_map.append((spatial_vision_layer, llm_layer, "spatial", spatial_idx))
return list(sorted(normalized_projector_map, key=(lambda entry: entry[1])))
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
normalized_projector_map = self.get_normalized_projector_map(self.global_config)
self._n_proj = len(normalized_projector_map)
self._tensor_prefix_map = {
f"model.{proj_type}_projectors.{type_idx}": proj_idx
for proj_idx, (_, _, proj_type, type_idx) in enumerate(normalized_projector_map)
}
self._vision_feature_layers = [vision_layer for vision_layer, _, _, _ in normalized_projector_map]
self._spatial_offsets = [
type_idx if proj_type == "spatial" else -1
for _, _, proj_type, type_idx in normalized_projector_map
]
def set_gguf_parameters(self):
assert self.hparams_vision is not None
super().set_gguf_parameters()
self.gguf_writer.add_clip_projector_type(gguf.VisionProjectorType.GRANITE4_VISION)
# SigLIP encoder hparams
self.gguf_writer.add_vision_attention_layernorm_eps(self.hparams.get("layer_norm_eps", 1e-6))
self.gguf_writer.add_vision_use_gelu(True)
# Preprocessor
self.gguf_writer.add_vision_preproc_image_size(self.hparams.get("image_size", 384))
# QFormer projector config
ds_rate = self.global_config["downsample_rate"]
ds_parts = ds_rate.split("/")
assert len(ds_parts) == 2, f"Invalid 'downsample_rate' value: {ds_rate}"
query_side, window_side = [int(p) for p in ds_parts]
self.gguf_writer.add_vision_projector_query_side(query_side)
self.gguf_writer.add_vision_projector_window_side(window_side)
# Set vision feature layers
self.gguf_writer.add_vision_feature_layers(self._vision_feature_layers)
# Set the spatial offests per projector
self.gguf_writer.add_vision_spatial_offsets(self._spatial_offsets)
# Add flattened image grind pinpoints (resolution candidates internally)
if pinpoints := self.global_config.get("image_grid_pinpoints"):
# Flatten with h, w -> w, h inversion
pinpoints = [val for h, w in pinpoints for val in (w, h)]
self.gguf_writer.add_vision_image_grid_pinpoints(pinpoints)
@classmethod
def filter_tensors(cls, item: tuple[str, Callable[[], Tensor]]) -> tuple[str, Callable[[], Tensor]] | None:
name, _ = item
if ("vision_model.head" in name or name.startswith("lm_head")):
return None
return super().filter_tensors(item)
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
# Detect projector tensors and bin them
projector_idx = None
for prefix, proj_idx in self._tensor_prefix_map.items():
if name.startswith(prefix):
projector_idx = proj_idx
break
if projector_idx is not None:
# If this projector tensor has a block id within the projector,
# alias the bid to projector_idx
#
# TODO: currently, none of the Granite 4 Vision models have
# projectors with multiple QFormer layers, so the `layer.{}` index
# is always 0. This allows us to simply map to a single `bid` that
# matches the projector index. If this changes, we'll need a
# convention that merges the two IDs.
id_matches = list(re.finditer(r"\.([0-9]+)\.", name))
all_ids = [int(m.group(1)) for m in id_matches]
assert len(all_ids) >= 1 and len(all_ids) <= 2, "Must have at least 1 and at most 2 ids in tensor names"
# If not layer id, just use the projector index
new_bid = projector_idx
if len(all_ids) == 1:
new_name = name[:id_matches[0].span(1)[0]] + str(new_bid) + name[id_matches[0].span(1)[1]:]
else: # len(all_ids) == 2
new_bid = projector_idx # + all_ids[1]
new_name = name[:id_matches[0].span(0)[0]] + name[id_matches[0].span(1)[1]:id_matches[1].span(1)[0]] + str(new_bid) + name[id_matches[1].span(1)[1]:]
yield from super().modify_tensors(data_torch, new_name, new_bid)
return
yield from super().modify_tensors(data_torch, name, bid)