diff --git a/examples/bench.py b/examples/bench.py index 453c64cd..6c5e68ef 100644 --- a/examples/bench.py +++ b/examples/bench.py @@ -1,11 +1,10 @@ import infinicore -from transformers import AutoTokenizer from infinilm.modeling_utils import load_model_state_dict_by_file from infinilm.distributed import DistConfig from infinilm.infer_engine import GenerationConfig, InferEngine from infinilm.base_config import BaseConfig from infinilm.cache import StaticKVCacheConfig, PagedKVCacheConfig -import argparse +from infinilm.processors import AutoInfinilmProcessor import sys import time import os @@ -104,7 +103,6 @@ def repeat_prompt(input_ids: list[int], target_length: int): class TestModel: model: infinicore.nn.Module - tokenizer: AutoTokenizer input_ids_list: list[int] def __init__( @@ -140,39 +138,25 @@ def __init__( # ---------------------------------------------------------------------------- # # Create tokenizer # ---------------------------------------------------------------------------- # - tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) - - if tokenizer.pad_token is None: - if tokenizer.eos_token is not None: - tokenizer.pad_token = tokenizer.eos_token - tokenizer.pad_token_id = tokenizer.eos_token_id - else: - tokenizer.add_special_tokens({"pad_token": "[PAD]"}) + self.processor = AutoInfinilmProcessor.from_pretrained(model_path) + self.tokenizer = self.processor.get_tokenizer() # ---------------------------------------------------------------------------- # # Token encoding # ---------------------------------------------------------------------------- # - input_content = [ - tokenizer.apply_chat_template( - conversation=[{"role": "user", "content": prompt}], - add_generation_prompt=True, - tokenize=False, - ) - ] - - # print(input_content, end="", flush=True) - # Support Transformers >= 5.0 for batch_encode_plus deprecation - encoding = tokenizer( - input_content, - padding=True, - truncation=True, - max_length=8192, + input_content = self.processor.apply_chat_template( + conversation=[{"role": "user", "content": prompt}], + add_generation_prompt=True, + tokenize=False, ) - input_ids_list = encoding["input_ids"] + input_ids_list = [ + self.tokenizer.encode( + input_content, + ) + ] self.model = model - self.tokenizer = tokenizer self.input_ids_list = input_ids_list def run( diff --git a/examples/test_infer.py b/examples/test_infer.py index abec5d00..97bbdd3a 100644 --- a/examples/test_infer.py +++ b/examples/test_infer.py @@ -1,32 +1,14 @@ -import infinicore -import transformers -from transformers import AutoTokenizer -from tokenizers import decoders as _dec -from infinilm.modeling_utils import load_model_state_dict_by_file -from infinilm.distributed import DistConfig -from infinilm.infer_engine import GenerationConfig, InferEngine -import argparse -import sys import time import os -import numpy as np -from infinilm.cache import StaticKVCacheConfig, PagedKVCacheConfig -from packaging import version from infinilm.base_config import BaseConfig - -from PIL import Image -import torch - -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../python")) - -_PAGED_KV_BLOCK_SIZE = 256 +from infinilm.llm.llm import LLM def test( - prompts: str | list[str], + prompts: list[str], model_path, max_new_tokens=100, - infini_device=infinicore.device("cpu", 0), + device="cpu", tp=1, enable_paged_attn=False, enable_graph=False, @@ -35,6 +17,7 @@ def test( temperature=1.0, attn_backend="default", image_path=None, + skip_load=False, ): model_path =
os.path.expanduser(model_path) # ---------------------------------------------------------------------------- # @@ -43,226 +26,46 @@ def test( if enable_paged_attn and attn_backend == "default": attn_backend = "paged-attn" - model = InferEngine( - model_path, - device=infini_device, - distributed_config=DistConfig(tp), - enable_graph_compiling=enable_graph, - attention_backend=attn_backend, - kv_cache_dtype=cfg.kv_cache_dtype, + model = LLM( + model_path=model_path, + device=device, + tensor_parallel_size=tp, + cache_type="paged" if enable_paged_attn else "static", + max_batch_size=len(prompts), + max_tokens=max_new_tokens, + temperature=temperature, + top_k=top_k, + top_p=top_p, + enable_graph=enable_graph, + attn_backend=attn_backend, + skip_load=skip_load, ) - # ---------------------------------------------------------------------------- # - # Load Weights - # ---------------------------------------------------------------------------- # - load_model_state_dict_by_file(model, model_path, dtype=model.dtype) - # ---------------------------------------------------------------------------- # - # create tokenizer - # ---------------------------------------------------------------------------- # - tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) - - processor = None + conversations = [ + {"role": "user", "content": [{"type": "text", "text": prompt}]} + for prompt in prompts + ] if image_path is not None: - if model.model_type == "minicpmv": - from transformers import AutoProcessor - - processor = AutoProcessor.from_pretrained( - model_path, trust_remote_code=True - ) - tokenizer = processor.tokenizer - - if "llama" == model.model_type: - backend = getattr(tokenizer, "backend_tokenizer", None) - target = getattr(backend, "_tokenizer", backend) - norm = getattr(target, "normalizer", None) - dec = getattr(target, "decoder", None) - sn = repr(norm)[:800] if norm is not None else "" - sd = repr(dec)[:800] if dec is not None else "" - has_prepend = "Prepend" in sn - has_strip = "Strip" in sd - if has_prepend and has_strip: - target.decoder = _dec.Sequence( - [ - _dec.Replace("▁", " "), - _dec.ByteFallback(), - _dec.Fuse(), - ] - ) - - # ---------------------------------------------------------------------------- # - # tokenize - # ---------------------------------------------------------------------------- # - # prompt = "山东最高的山是?" 
- if isinstance(prompts, str): - prompts = [prompts] - - if image_path is not None: - updated_prompts = [] - for prompt in prompts: - if model.model_type == "minicpmv" and "<image>" not in prompt: - prompt = "(<image>./</image>)\n" + prompt - updated_prompts.append(prompt) - prompts = updated_prompts - - if hasattr(tokenizer, "chat_template") and tokenizer.chat_template is not None: - input_contents = [ - tokenizer.apply_chat_template( - conversation=[{"role": "user", "content": prompt}], - add_generation_prompt=True, - tokenize=False, - ) - for prompt in prompts - ] - else: - input_contents = prompts - - pixel_values = None - image_bound = None - tgt_sizes = None - if image_path is not None and processor is not None: - image = Image.open(image_path).convert("RGB") - if model.model_type == "minicpmv": - images = [[image] for _ in range(len(input_contents))] - else: - images = [image for _ in range(len(input_contents))] - if model.model_type == "minicpmv": - inputs = processor( - text=input_contents, - images=images, - return_tensors="pt", - ) - input_ids = inputs["input_ids"] - input_ids_list = input_ids.tolist() - pixel_values = inputs["pixel_values"] - tgt_sizes = inputs["tgt_sizes"] - image_bound = inputs["image_bound"] - else: - raise ValueError(f"Unsupported multimodal model_type: {model.model_type}") - else: - if hasattr(tokenizer, "batch_encode_plus"): - input_ids_list = tokenizer.batch_encode_plus(input_contents)["input_ids"] - elif hasattr(tokenizer, "_encode_plus"): - input_ids_list = tokenizer._encode_plus(input_contents)["input_ids"] - else: - input_ids_list = tokenizer(input_contents)[ - "input_ids" - ] # List: [[1, 1128, 526, 366, 29892]] - - # input_ids_list = tokenizer.batch_encode_plus(input_contents)[ - # "input_ids" - # ] # List: [[1, 1128, 526, 366, 29892]] if version.parse(transformers.__version__) < version.parse("5.0.0"): - # Ideally this is solved by upgrading transformers. However, doing so causes version mismatch between transformers and mlu pytorch on devices with Phytium CPU. So a branch is temporarily used.
- input_ids_list = [ - tokenizer.encode_plus( - text, truncation=True, max_length=2048, add_special_tokens=True - )["input_ids"] - for text in input_contents - ] - else: - input_ids_list = [ - tokenizer._encode_plus( - text, truncation=True, max_length=2048, add_special_tokens=True - )["input_ids"] - for text in input_contents - ] - - # ---------------------------------------------------------------------------- # - # Create KVCache - # ---------------------------------------------------------------------------- # - if enable_paged_attn: - batch_size = 1 if prompts is str else len(prompts) - max_total_tokens = max_new_tokens + len(input_ids_list[0]) - cache_config = PagedKVCacheConfig( - num_blocks=( - (max_total_tokens + (_PAGED_KV_BLOCK_SIZE - 1)) // _PAGED_KV_BLOCK_SIZE - ) - * batch_size, - block_size=_PAGED_KV_BLOCK_SIZE, - ) - else: - batch_size = 1 if prompts is str else len(prompts) - initial_capacity = max_new_tokens + len(input_ids_list[0]) - cache_config = StaticKVCacheConfig( - max_batch_size=batch_size, max_cache_len=initial_capacity - ) - - model.reset_cache(cache_config) - - # ---------------------------------------------------------------------------- # - # Generate - # ---------------------------------------------------------------------------- # - print(input_contents[0], end="", flush=True) - input_ids_infini = infinicore.from_list(input_ids_list) - - # Process multimodal inputs if needed - pixel_values_infini = None - image_bound_infini = None - tgt_sizes_infini = None - if image_path is not None and processor is not None: - # TODO: Factor out this part per future multimodal model support. - if model.model_type == "minicpmv": - torch_dtype = infinicore.utils.to_torch_dtype(model.dtype) - - # 1. Pixel values - all_pixel_values = [] - assert ( - len(pixel_values) == 1 - ), "Only batch_size=1 is supported yet for image inputs." - for pv in pixel_values: - all_pixel_values.extend( - [i.flatten(end_dim=1).permute(1, 0) for i in pv] - ) - - pixel_values_tensor = torch.nn.utils.rnn.pad_sequence( - all_pixel_values, batch_first=True, padding_value=0.0 - ).to(dtype=torch_dtype) - B, L, _ = pixel_values_tensor.shape - pixel_values_tensor = ( - pixel_values_tensor.permute(0, 2, 1).reshape(B, 3, -1, L).contiguous() - ) - pixel_values_infini = infinicore.from_torch(pixel_values_tensor) - - # 2. tgt_sizes - all_tgt_sizes = [ - tgt_size for tgt_size in tgt_sizes if isinstance(tgt_size, torch.Tensor) - ] - - tgt_sizes_tensor = torch.vstack(all_tgt_sizes).to(torch.int64) - - tgt_sizes_infini = infinicore.from_torch(tgt_sizes_tensor) - - # 3. 
image_bound - batch_size = len(image_bound) - max_ranges = max(len(b) for b in image_bound) - - bound_np = np.zeros((batch_size, max_ranges, 2), dtype=np.int64) - - for i, bnd in enumerate(image_bound): - if len(bnd) > 0: - bound_np[i, : len(bnd), :] = bnd.cpu().numpy() - - image_bound_infini = infinicore.from_numpy(bound_np) + for conversation in conversations: + conversation["content"].append( + {"type": "image_url", "image_url": {"url": image_path}} + ) t1 = time.time() print("=================== start generate ====================") - output_ids = model.generate( - input_ids_infini, - GenerationConfig( - max_new_tokens=max_new_tokens, - temperature=temperature, - top_k=top_k, - top_p=top_p, - ), - _measure_and_log_time=True, - pixel_values=pixel_values_infini, - image_bound=image_bound_infini, - tgt_sizes=tgt_sizes_infini, + + outputs = model.chat( + messages=conversations, ) t2 = time.time() - numpy_output_ids = np.array([output_id.to_numpy()[0] for output_id in output_ids]) - print(tokenizer.decode(numpy_output_ids, skip_special_tokens=True)) + for i, output in enumerate(outputs): + print(f"Request {i}:") + print("===Query===") + print(output.prompt) + print("===Response===") + print(output.outputs[0].text) + print("") print( f"total_time: {round((t2 - t1) * 1000, 2)} ms", @@ -276,30 +79,21 @@ def test( prompts = [cfg.prompt for _ in range(cfg.batch_size)] - _PAGED_KV_BLOCK_SIZE = cfg.block_size - model_path = cfg.model max_new_tokens = cfg.max_new_tokens - backend = cfg.backend - tp = cfg.tp enable_paged_attn = cfg.enable_paged_attn enable_graph = cfg.enable_graph - if backend != "cpp": - raise ValueError(f"Unsupported backend: {backend}.") - - infini_device = infinicore.device(device_str, 0) - test( prompts, model_path, max_new_tokens, - infini_device=infini_device, + device=device_str, tp=tp, enable_paged_attn=enable_paged_attn, enable_graph=enable_graph, @@ -308,4 +102,5 @@ def test( temperature=cfg.temperature, attn_backend=cfg.attn, image_path=cfg.image, + skip_load=cfg.skip_load, ) diff --git a/python/infinilm/llm/llm.py b/python/infinilm/llm/llm.py index b640cf20..cba3af83 100644 --- a/python/infinilm/llm/llm.py +++ b/python/infinilm/llm/llm.py @@ -14,9 +14,6 @@ from typing import List, Optional, Union, AsyncIterator from dataclasses import dataclass -from transformers import AutoTokenizer -from tokenizers import decoders as _dec - import infinicore from infinilm.llm.request import ( @@ -28,11 +25,12 @@ from infinilm.llm.sampling_params import SamplingParams from infinilm.llm.scheduler import Scheduler from infinilm.llm.static_scheduler import StaticScheduler - +from infinilm.processors import AutoInfinilmProcessor from infinilm.distributed import DistConfig from infinilm.infer_engine import InferEngine from infinilm.cache.cache import PagedKVCacheConfig, StaticKVCacheConfig from infinilm.modeling_utils import load_model_state_dict_by_file +from infinilm.multimodal.multimodal import resolve_multimodal_inputs logger = logging.getLogger(__name__) @@ -57,6 +55,7 @@ class EngineConfig: top_k: Default top-k sampling parameter. enable_graph: Whether to enable graph compiling. attn_backend: Attention backend to use ('default', 'flash-attn'). + skip_load: Whether to skip loading model weights (for testing).
""" model_path: str @@ -74,6 +73,7 @@ class EngineConfig: top_k: int = 1 enable_graph: bool = False attn_backend: str = "default" + skip_load: bool = False class LLMEngine: @@ -95,15 +95,14 @@ def __init__(self, config: EngineConfig): ) # Load model weights - load_model_state_dict_by_file( - self.model_engine, config.model_path, dtype=self.model_engine.dtype - ) + if not self.config.skip_load: + load_model_state_dict_by_file( + self.model_engine, config.model_path, dtype=self.model_engine.dtype + ) - # Initialize tokenizer - self.tokenizer = AutoTokenizer.from_pretrained( - config.model_path, trust_remote_code=True - ) - self._fix_tokenizer_decoder() + # Initialize processor/tokenizer + self.processor = AutoInfinilmProcessor.from_pretrained(config.model_path) + self.tokenizer = self.processor.get_tokenizer() # Initialize KV cache based on cache type if config.cache_type == "static": @@ -166,26 +165,6 @@ def _init_device(self): self.dtype = dtype_map[self.config.dtype] - def _fix_tokenizer_decoder(self): - """Fix tokenizer decoder for llama models.""" - if "llama" in self.model_engine.model_type.lower(): - backend = getattr(self.tokenizer, "backend_tokenizer", None) - target = getattr(backend, "_tokenizer", backend) - norm = getattr(target, "normalizer", None) - dec = getattr(target, "decoder", None) - sn = repr(norm)[:800] if norm is not None else "" - sd = repr(dec)[:800] if dec is not None else "" - has_prepend = "Prepend" in sn - has_strip = "Strip" in sd - if has_prepend and has_strip: - target.decoder = _dec.Sequence( - [ - _dec.Replace("▁", " "), - _dec.ByteFallback(), - _dec.Fuse(), - ] - ) - def add_request(self, request: InferenceRequest): """Add a request to the scheduler.""" self.scheduler.add_request(request) @@ -204,10 +183,12 @@ def step(self) -> tuple[list[InferenceRequest], list[tuple]]: return [], [] # Build model inputs - model_input_dict = scheduler_output.build_model_inputs( - self.config.temperature, self.config.top_p, self.config.top_k + model_input = self.processor.build_model_inputs( + scheduler_output, + self.config.temperature, + self.config.top_p, + self.config.top_k, ) - model_input = self._prepare_model_input(model_input_dict) # Run inference sampled_tokens = self.model_engine.forward(**model_input) @@ -222,28 +203,6 @@ def step(self) -> tuple[list[InferenceRequest], list[tuple]]: return scheduler_output.scheduled_requests, pending - def _prepare_model_input(self, model_input_dict: dict) -> dict: - """Convert model input dict to infinicore tensors.""" - model_input = {} - for key, value in model_input_dict.items(): - if value is None: - # Skip None values (block_tables/slot_mapping for static cache) - model_input[key] = None - elif key in ["input_ids", "position_ids", "slot_mapping"]: - model_input[key] = infinicore.from_list(value, dtype=infinicore.int64) - elif key in [ - "past_kv_lengths", - "total_kv_lengths", - "input_offsets", - "cu_seqlens", - "block_tables", - ]: - model_input[key] = infinicore.from_list(value, dtype=infinicore.int32) - else: - # temperature, top_k, top_p, etc. 
- model_input[key] = value - return model_input - def _update_requests( self, is_prefill: bool, @@ -361,6 +320,12 @@ def detokenize(self, token_ids: List[int]) -> str: """Detokenize token IDs to text.""" return self.tokenizer.decode(token_ids) + def process(self, prompt, images, videos, audios, **kwargs) -> dict: + """Process the input prompt and media into final model inputs.""" + return self.processor( + prompt, images=images, videos=videos, audios=audios, **kwargs + ) + def apply_chat_template( self, messages: List[dict], @@ -369,7 +334,7 @@ def apply_chat_template( ) -> str: """Apply chat template to messages.""" chat_template_kwargs = chat_template_kwargs or {} - return self.tokenizer.apply_chat_template( + return self.processor.apply_chat_template( conversation=messages, add_generation_prompt=add_generation_prompt, tokenize=False, @@ -397,6 +362,7 @@ def __init__( top_k: int = 1, enable_graph: bool = False, attn_backend: str = "default", + skip_load: bool = False, ): """Initialize LLM. @@ -433,13 +399,15 @@ def __init__( top_k=top_k, enable_graph=enable_graph, attn_backend=attn_backend, + skip_load=skip_load, ) self.engine = LLMEngine(config) self.config = config def generate( self, - prompts: Union[str, List[str]], + prompts: Union[str, List[str]] = None, + messages: Union[List[dict], List[List[dict]]] = None, sampling_params: Optional[SamplingParams] = None, use_tqdm: bool = True, ) -> List[RequestOutput]: @@ -455,6 +423,14 @@ """ if isinstance(prompts, str): prompts = [prompts] + if isinstance(messages, list) and isinstance(messages[0], dict): + messages = [messages] + + contents = prompts + apply_chat_template = False + if messages: + contents = messages + apply_chat_template = True if sampling_params is None: sampling_params = SamplingParams(max_tokens=self.config.max_tokens) @@ -463,13 +439,29 @@ sampling_params.max_tokens = self.config.max_tokens requests = [] - for prompt in prompts: + for content in contents: request_id = f"cmpl-{uuid.uuid4().hex}" - token_ids = self.engine.tokenize(prompt) + processed_inputs = None + if apply_chat_template: + prompt = self.engine.apply_chat_template( + content, add_generation_prompt=True + ) + + images, videos, audios = resolve_multimodal_inputs(content) + processed_inputs = self.engine.process( + prompt, images, videos, audios, return_tensors="pt" + ) + + prompt_token_ids = processed_inputs.get("input_ids").flatten().tolist() + else: + prompt = content + prompt_token_ids = self.engine.tokenize(prompt) + req = InferenceRequest( request_id=request_id, prompt=prompt, - prompt_token_ids=token_ids, + prompt_token_ids=prompt_token_ids, + processed_inputs=processed_inputs, sampling_params=sampling_params, eos_token_ids=self.engine.eos_token_ids, ) @@ -523,14 +515,9 @@ def chat( if messages and isinstance(messages[0], dict): messages = [messages] - prompts = [] - for conversation in messages: - prompt = self.engine.apply_chat_template( - conversation, add_generation_prompt=True - ) - prompts.append(prompt) - - return self.generate(prompts, sampling_params, use_tqdm) + return self.generate( + messages=messages, sampling_params=sampling_params, use_tqdm=use_tqdm + ) class AsyncLLMEngine: @@ -654,6 +641,9 @@ def _batch_put(pending): def add_request( self, + messages: Optional[List[dict]] = None, + apply_chat_template: bool = True, + add_generation_prompt: bool = True, prompt: Optional[str] = None, prompt_token_ids: Optional[List[int]] = None, sampling_params: Optional[SamplingParams] = None, @@ -665,8 +655,28 @@
"""Add a request to the engine. Args: - prompt: Text prompt for generation. - prompt_token_ids: Pre-tokenized prompt. + messages: List of message dicts (chat conversation). Following this format: + [ + { + "role": "user", + "content": [ + { + "type": "text", + "text": "xxxxxxxxx" + }, + { + "type": "image_url", + "image_url": { + "url": "xxx.jpg" + } + }, + ] + }, + ] + apply_chat_template: Whether to apply the chat template. + add_generation_prompt: Whether to add a generation prompt. + prompt: Text prompt for generation. If provided, it will be used directly after encoded by tokenizer, ignoring messages. + prompt_token_ids: Pre-tokenized prompt. If provided, it will be used directly as input. sampling_params: Sampling parameters. request_id: Optional request ID. request_data: Optional request data dict (for server use). @@ -678,8 +688,32 @@ def add_request( if request_id is None: request_id = f"cmpl-{uuid.uuid4().hex}" - if prompt_token_ids is None and prompt is not None: + images, videos, audios = None, None, None + processed_inputs = None + + if prompt_token_ids is not None: + prompt = self.engine.detokenize(prompt_token_ids) + elif prompt is not None: prompt_token_ids = self.engine.tokenize(prompt) + else: + assert messages is not None, ( + "Either messages or prompt/prompt_token_ids must be provided" + ) + + assert apply_chat_template, ( + "apply_chat_template needs to be true for multi-role conversation" + ) + + prompt = self.engine.apply_chat_template( + messages, add_generation_prompt=add_generation_prompt + ) + + images, videos, audios = resolve_multimodal_inputs(messages) + processed_inputs = self.engine.process( + prompt, images, videos, audios, return_tensors="pt" + ) + + prompt_token_ids = processed_inputs.get("input_ids").flatten().tolist() if sampling_params is None: sampling_params = SamplingParams(max_tokens=self.config.max_tokens) @@ -691,6 +725,7 @@ def add_request( request_id=request_id, prompt=prompt, prompt_token_ids=prompt_token_ids, + processed_inputs=processed_inputs, sampling_params=sampling_params, eos_token_ids=self.engine.eos_token_ids, request_data=request_data, @@ -711,7 +746,7 @@ def add_chat_request( request_data: Optional[dict] = None, http_request: Optional[any] = None, add_generation_prompt: bool = True, - chat_template_kwargs: Optional[dict] = None, + **kwargs, ) -> InferenceRequest: """Add a chat request to the engine. @@ -725,13 +760,11 @@ def add_chat_request( Returns: The created InferenceRequest object. 
""" - prompt = self.engine.apply_chat_template( - messages, - add_generation_prompt=add_generation_prompt, - chat_template_kwargs=chat_template_kwargs, - ) + return self.add_request( - prompt=prompt, + messages=messages, + apply_chat_template=True, + add_generation_prompt=add_generation_prompt, sampling_params=sampling_params, request_id=request_id, request_data=request_data, diff --git a/python/infinilm/llm/request.py b/python/infinilm/llm/request.py index 1c96fc23..15bcf69f 100644 --- a/python/infinilm/llm/request.py +++ b/python/infinilm/llm/request.py @@ -105,6 +105,7 @@ def __init__( request_id: str, prompt: Optional[str] = None, prompt_token_ids: Optional[List[int]] = None, + processed_inputs: Optional[dict] = None, sampling_params: Optional[SamplingParams] = None, eos_token_ids: Optional[List[int]] = None, arrival_time: Optional[float] = None, @@ -112,13 +113,15 @@ def __init__( request_data: Optional[dict] = None, http_request: Optional[Any] = None, ): + self.arrival_time: float = arrival_time or time.time() + self.finished_time: Optional[float] = None + # Request metadata self.request_id: str = request_id self.prompt: Optional[str] = prompt self.prompt_token_ids: List[int] = prompt_token_ids or [] self.prompt_length: int = len(self.prompt_token_ids) - self.arrival_time: float = arrival_time or time.time() - self.finished_time: Optional[float] = None + self.processed_inputs: Optional[dict] = processed_inputs # Sampling parameters self.sampling_params: SamplingParams = sampling_params or SamplingParams() diff --git a/python/infinilm/llm/scheduler.py b/python/infinilm/llm/scheduler.py index 92691416..f9c11635 100644 --- a/python/infinilm/llm/scheduler.py +++ b/python/infinilm/llm/scheduler.py @@ -24,102 +24,6 @@ def __init__( self.num_requests = len(scheduled_requests) self.is_prefill = is_prefill - def build_model_inputs( - self, temperature: float = 1.0, top_p: float = 0.8, top_k: int = 1 - ): - """Construct model inputs for prefill or decode phase. 
- - Prefill phase: - - input_ids: Flattened token list (excluding cached tokens) - - position_ids: Position IDs for new tokens in complete sequence - - past_kv_lengths: Number of cached tokens per request - - total_kv_lengths: Total tokens (cached + new) per request - - input_offsets: Start position of each request in flattened array - - block_tables: Padded block_table for each request - - slot_mapping: Token to slot mappings - - Decode phase: - - input_ids: Only last generated token per request - - position_ids: Position of last token in complete sequence - - past_kv_lengths: Number of cached tokens per request - - total_kv_lengths: Total sequence length per request - - input_offsets: Offsets for each request - - block_tables: Padded block_table for each request - - slot_mapping: Single slot per request - """ - if not self.scheduled_requests: - raise RuntimeError( - "build_model_inputs called with empty scheduled_requests" - ) - - tokens = [] - seq_lens = [] - seq_offsets = [0] - block_tables = [] - slot_mapping = [] - cached_lens = [] - position_ids = [] - cu_seqlens = [0] - - max_block_table_len = max( - len(req.block_table) for req in self.scheduled_requests - ) - current_offset = 0 - - for req in self.scheduled_requests: - num_cached = req.num_cached_tokens - if self.is_prefill: - # Prefill phase - req_tokens = req.get_input_tokens() - tokens_to_compute = req_tokens[num_cached:] - tokens.extend(tokens_to_compute) - - compute_len = len(tokens_to_compute) - seq_len = len(req_tokens) - seq_lens.append(seq_len) - - current_offset += compute_len - seq_offsets.append(current_offset) - - slot_mapping.extend(req.slot_mapping) - cached_lens.append(num_cached) - position_ids.extend(range(num_cached, num_cached + compute_len)) - - else: - # Decode phase - seq_len = req.get_total_length() - last_token = req.generated_token_ids[-1] - tokens.append(last_token) - seq_lens.append(seq_len) - - current_offset += 1 - seq_offsets.append(current_offset) - - slot_mapping.extend(req.slot_mapping) - cached_lens.append(num_cached) - position_ids.append(seq_len - 1) - - # Pad block_table to same length - padded_block_table = req.block_table + [-1] * ( - max_block_table_len - len(req.block_table) - ) - block_tables.append(padded_block_table) - cu_seqlens.append(cu_seqlens[-1] + seq_len) - - return { - "input_ids": [tokens], - "position_ids": position_ids, - "past_kv_lengths": cached_lens, - "total_kv_lengths": seq_lens, - "input_offsets": seq_offsets, - "cu_seqlens": cu_seqlens, - "block_tables": block_tables, - "slot_mapping": slot_mapping, - "temperature": temperature, - "top_k": top_k, - "top_p": top_p, - } - class Scheduler: """Request scheduler with integrated BlockManager for KV cache management. diff --git a/python/infinilm/llm/static_scheduler.py b/python/infinilm/llm/static_scheduler.py index de4d9d35..4b9f1853 100644 --- a/python/infinilm/llm/static_scheduler.py +++ b/python/infinilm/llm/static_scheduler.py @@ -34,61 +34,6 @@ def __init__( self.is_prefill = is_prefill self.prefix_hit_len = prefix_hit_len - def build_model_inputs( - self, temperature: float = 1.0, top_p: float = 0.8, top_k: int = 1 - ): - """Construct model inputs for prefill or decode phase. 
- - Static cache model inputs: - - Prefill phase (with prefix cache reuse): - - input_ids: Tokens after the cached prefix [1, prompt_length - prefix_hit_len] - - position_ids: [prefix_hit_len, ..., prompt_length-1] - - past_kv_lengths: [prefix_hit_len] (reuse cached prefix) - - total_kv_lengths: [prompt_length] - - Decode phase: - - input_ids: Only the last generated token [1, 1] - - position_ids: [current_position] (position in full sequence) - - past_kv_lengths: [num_cached_tokens] - - total_kv_lengths: [total_tokens] - """ - req = self.scheduled_requests[0] - - if self.is_prefill: - # Prefill: only send tokens not already in cache - tokens = req.get_input_tokens() - prefix_hit_len = self.prefix_hit_len - input_tokens = tokens[prefix_hit_len:] - input_ids = [input_tokens] - position_ids = [list(range(prefix_hit_len, len(tokens)))] - past_kv_len = prefix_hit_len - total_kv_len = len(tokens) - input_offsets = [0, len(input_tokens)] - else: - # Decode: send only the last generated token - last_token = req.generated_token_ids[-1] - current_position = req.get_total_length() - 1 - input_ids = [[last_token]] - position_ids = [[current_position]] - past_kv_len = current_position - total_kv_len = req.get_total_length() - input_offsets = [0, 1] - - return { - "input_ids": input_ids, - "position_ids": position_ids, - "past_kv_lengths": [past_kv_len], - "total_kv_lengths": [total_kv_len], - "input_offsets": input_offsets, - "cu_seqlens": [0, total_kv_len], - "block_tables": None, - "slot_mapping": None, - "temperature": temperature, - "top_k": top_k, - "top_p": top_p, - } - class StaticScheduler: """Request scheduler for Static KV Cache with batch_size=1. diff --git a/python/infinilm/multimodal/multimodal.py b/python/infinilm/multimodal/multimodal.py new file mode 100644 index 00000000..7b067734 --- /dev/null +++ b/python/infinilm/multimodal/multimodal.py @@ -0,0 +1,29 @@ +from typing import List, Union +from PIL import Image + + +def resolve_multimodal_inputs(messages: Union[List[dict], dict]): + """Get images, videos, audios from the messages.""" + if isinstance(messages, dict): + messages = [messages] + + images = [] + videos = [] + audios = [] + + for msg in messages: + content = msg.get("content", []) + if not isinstance(content, list): + continue + + for item in content: + if item.get("type") == "text": + pass + elif item.get("type") == "image_url": + # TODO support other image url formats + images.append(Image.open(item["image_url"]["url"])) + + else: # TODO support video/audio + raise NotImplementedError("Only image input is supported for now") + + return images, videos, audios diff --git a/python/infinilm/processors/__init__.py b/python/infinilm/processors/__init__.py new file mode 100644 index 00000000..61adff6d --- /dev/null +++ b/python/infinilm/processors/__init__.py @@ -0,0 +1,18 @@ +from .processor import InfinilmProcessor +from .basic_llm_processor import BasicLLMProcessor +from .llama_processor import LlamaProcessor + +from transformers import AutoConfig + + +class AutoInfinilmProcessor: + @classmethod + def from_pretrained(cls, model_dir_path: str, **kwargs) -> InfinilmProcessor: + """Factory method to get the appropriate processor based on model config.""" + config = AutoConfig.from_pretrained(model_dir_path, trust_remote_code=True) + model_type = config.model_type.lower() + + if model_type in ["llama"]: + return LlamaProcessor(model_dir_path) + else: + return BasicLLMProcessor(model_dir_path) diff --git a/python/infinilm/processors/basic_llm_processor.py
b/python/infinilm/processors/basic_llm_processor.py new file mode 100644 index 00000000..070a4062 --- /dev/null +++ b/python/infinilm/processors/basic_llm_processor.py @@ -0,0 +1,240 @@ +from .processor import InfinilmProcessor +from transformers import AutoTokenizer +from ..llm.static_scheduler import StaticSchedulerOutput +from ..llm.scheduler import SchedulerOutput + + +class BasicLLMProcessor(InfinilmProcessor): + def __init__(self, model_dir_path: str): + self.tokenizer = AutoTokenizer.from_pretrained( + model_dir_path, trust_remote_code=True + ) + + def __call__(self, prompt: str, return_tensors: str = None, **kwargs) -> dict: + if return_tensors is None: + return self.tokenizer(prompt) + elif return_tensors == "infini": + import infinicore + + result = {} + for key, tensor in self.tokenizer(prompt, return_tensors="pt").items(): + result[key] = infinicore.from_torch(tensor) + return result + + # "pt", "np", or "tf": delegate to the tokenizer. + return self.tokenizer(prompt, return_tensors=return_tensors) + + def apply_chat_template( + self, + conversation, + add_generation_prompt: bool = False, + tokenize: bool = True, + **kwargs, + ): + normalized_conversation = [] + for message in conversation: + if isinstance(message["content"], list): + assert len(message["content"]) == 1, "Only one content item supported in list" + content_item = message["content"][0] + assert "type" in content_item and "text" in content_item, "Content dict must have 'type' and 'text' keys" + normalized_conversation.append( + {"role": message["role"], "content": content_item["text"]} + ) + else: + normalized_conversation.append(message) + return self.tokenizer.apply_chat_template( + conversation=normalized_conversation, + add_generation_prompt=add_generation_prompt, + tokenize=tokenize, + **kwargs, + ) + + def build_model_inputs( + self, + scheduler_output: SchedulerOutput | StaticSchedulerOutput, + temperature: float = 1.0, + top_p: float = 0.8, + top_k: int = 1, + ) -> dict: + """Process a batch of data and return a dictionary of model inputs.""" + if isinstance(scheduler_output, StaticSchedulerOutput): + return self._build_model_input_from_static_scheduler_output( + scheduler_output, temperature, top_p, top_k + ) + elif isinstance(scheduler_output, SchedulerOutput): + return self._build_model_input_from_batch_scheduler_output( + scheduler_output, temperature, top_p, top_k + ) + else: + raise ValueError( + "scheduler_output must be an instance of SchedulerOutput or StaticSchedulerOutput" + ) + + def _build_model_input_from_static_scheduler_output( + self, scheduler_output: StaticSchedulerOutput, temperature, top_p, top_k + ) -> dict: + """Construct model inputs for prefill or decode phase.
+ + Static cache model inputs: + + Prefill phase (with prefix cache reuse): + - input_ids: Tokens after the cached prefix [1, prompt_length - prefix_hit_len] + - position_ids: [prefix_hit_len, ..., prompt_length-1] + - past_kv_lengths: [prefix_hit_len] (reuse cached prefix) + - total_kv_lengths: [prompt_length] + + Decode phase: + - input_ids: Only the last generated token [1, 1] + - position_ids: [current_position] (position in full sequence) + - past_kv_lengths: [num_cached_tokens] + - total_kv_lengths: [total_tokens] + """ + import infinicore + + req = scheduler_output.scheduled_requests[0] + + if scheduler_output.is_prefill: + # Prefill: only send tokens not already in cache + tokens = req.get_input_tokens() + prefix_hit_len = scheduler_output.prefix_hit_len + input_tokens = tokens[prefix_hit_len:] + input_ids = [input_tokens] + position_ids = [list(range(prefix_hit_len, len(tokens)))] + past_kv_len = prefix_hit_len + total_kv_len = len(tokens) + input_offsets = [0, len(input_tokens)] + else: + # Decode: send only the last generated token + last_token = req.generated_token_ids[-1] + current_position = req.get_total_length() - 1 + input_ids = [[last_token]] + position_ids = [[current_position]] + past_kv_len = current_position + total_kv_len = req.get_total_length() + input_offsets = [0, 1] + + return { + "input_ids": infinicore.from_list(input_ids, dtype=infinicore.int64), + "position_ids": infinicore.from_list(position_ids, dtype=infinicore.int64), + "past_kv_lengths": infinicore.from_list( + [past_kv_len], dtype=infinicore.int32 + ), + "total_kv_lengths": infinicore.from_list( + [total_kv_len], dtype=infinicore.int32 + ), + "input_offsets": infinicore.from_list( + input_offsets, dtype=infinicore.int32 + ), + "cu_seqlens": infinicore.from_list( + [0, total_kv_len], dtype=infinicore.int32 + ), + "block_tables": None, + "slot_mapping": None, + "temperature": temperature, + "top_k": top_k, + "top_p": top_p, + } + + def _build_model_input_from_batch_scheduler_output( + self, scheduler_output: SchedulerOutput, temperature, top_p, top_k + ) -> dict: + """Construct model inputs for prefill or decode phase.
+ + Prefill phase: + - input_ids: Flattened token list (excluding cached tokens) + - position_ids: Position IDs for new tokens in complete sequence + - past_kv_lengths: Number of cached tokens per request + - total_kv_lengths: Total tokens (cached + new) per request + - input_offsets: Start position of each request in flattened array + - block_tables: Padded block_table for each request + - slot_mapping: Token to slot mappings + + Decode phase: + - input_ids: Only last generated token per request + - position_ids: Position of last token in complete sequence + - past_kv_lengths: Number of cached tokens per request + - total_kv_lengths: Total sequence length per request + - input_offsets: Offsets for each request + - block_tables: Padded block_table for each request + - slot_mapping: Single slot per request + """ + import infinicore + + if not scheduler_output.scheduled_requests: + raise RuntimeError( + "build_model_inputs called with empty scheduled_requests" + ) + + tokens = [] + seq_lens = [] + seq_offsets = [0] + block_tables = [] + slot_mapping = [] + cached_lens = [] + position_ids = [] + cu_seqlens = [0] + + max_block_table_len = max( + len(req.block_table) for req in scheduler_output.scheduled_requests + ) + current_offset = 0 + + for req in scheduler_output.scheduled_requests: + num_cached = req.num_cached_tokens + if scheduler_output.is_prefill: + # Prefill phase + req_tokens = req.get_input_tokens() + tokens_to_compute = req_tokens[num_cached:] + tokens.extend(tokens_to_compute) + + compute_len = len(tokens_to_compute) + seq_len = len(req_tokens) + seq_lens.append(seq_len) + + current_offset += compute_len + seq_offsets.append(current_offset) + + slot_mapping.extend(req.slot_mapping) + cached_lens.append(num_cached) + position_ids.extend(range(num_cached, num_cached + compute_len)) + + else: + # Decode phase + seq_len = req.get_total_length() + last_token = req.generated_token_ids[-1] + tokens.append(last_token) + seq_lens.append(seq_len) + + current_offset += 1 + seq_offsets.append(current_offset) + + slot_mapping.extend(req.slot_mapping) + cached_lens.append(num_cached) + position_ids.append(seq_len - 1) + + # Pad block_table to same length + padded_block_table = req.block_table + [-1] * ( + max_block_table_len - len(req.block_table) + ) + block_tables.append(padded_block_table) + cu_seqlens.append(cu_seqlens[-1] + seq_len) + + return { + "input_ids": infinicore.from_list([tokens], dtype=infinicore.int64), + "position_ids": infinicore.from_list(position_ids, dtype=infinicore.int64), + "past_kv_lengths": infinicore.from_list( + cached_lens, dtype=infinicore.int32 + ), + "total_kv_lengths": infinicore.from_list(seq_lens, dtype=infinicore.int32), + "input_offsets": infinicore.from_list(seq_offsets, dtype=infinicore.int32), + "cu_seqlens": infinicore.from_list(cu_seqlens, dtype=infinicore.int32), + "block_tables": infinicore.from_list(block_tables, dtype=infinicore.int32), + "slot_mapping": infinicore.from_list(slot_mapping, dtype=infinicore.int64), + "temperature": temperature, + "top_k": top_k, + "top_p": top_p, + } + + def get_tokenizer(self): + return self.tokenizer diff --git a/python/infinilm/processors/llama_processor.py b/python/infinilm/processors/llama_processor.py new file mode 100644 index 00000000..84875200 --- /dev/null +++ b/python/infinilm/processors/llama_processor.py @@ -0,0 +1,28 @@ +from .basic_llm_processor import BasicLLMProcessor +from tokenizers import decoders as _dec + + +class LlamaProcessor(BasicLLMProcessor): + def __init__(self, model_dir_path: str): + 
super().__init__(model_dir_path) + self._fix_tokenizer_decoder(self.tokenizer) + + @staticmethod + def _fix_tokenizer_decoder(tokenizer): + """Fix tokenizer decoder for llama models.""" + backend = getattr(tokenizer, "backend_tokenizer", None) + target = getattr(backend, "_tokenizer", backend) + norm = getattr(target, "normalizer", None) + dec = getattr(target, "decoder", None) + sn = repr(norm)[:800] if norm is not None else "" + sd = repr(dec)[:800] if dec is not None else "" + has_prepend = "Prepend" in sn + has_strip = "Strip" in sd + if has_prepend and has_strip: + target.decoder = _dec.Sequence( + [ + _dec.Replace("▁", " "), + _dec.ByteFallback(), + _dec.Fuse(), + ] + ) diff --git a/python/infinilm/processors/processor.py b/python/infinilm/processors/processor.py new file mode 100644 index 00000000..fd8353fe --- /dev/null +++ b/python/infinilm/processors/processor.py @@ -0,0 +1,34 @@ +class InfinilmProcessor: + def __init__(self, model_dir_path: str): + """Initialize the processor with the model directory path.""" + raise NotImplementedError("InfinilmProcessor is not implemented yet") + + def __call__( + self, + prompt, + images=None, + videos=None, + audios=None, + return_tensors: str = None, + **kwargs, + ) -> dict: + """Process the input prompt and media into final inputs.""" + raise NotImplementedError("InfinilmProcessor is not implemented yet") + + def apply_chat_template( + self, + messages, + add_generation_prompt: bool = False, + tokenize: bool = True, + **kwargs, + ): + """Apply the chat template to the given input messages.""" + raise NotImplementedError("InfinilmProcessor is not implemented yet") + + def build_model_inputs(self, scheduler_output, **kwargs) -> dict: + """Build batched infinilm model inputs from the scheduler output.""" + raise NotImplementedError("InfinilmProcessor is not implemented yet") + + def get_tokenizer(self): + """Return the text tokenizer associated with this processor.""" + raise NotImplementedError("InfinilmProcessor is not implemented yet") diff --git a/test/bench/test_benchmark.py b/test/bench/test_benchmark.py index a3adf487..c15c950f 100644 --- a/test/bench/test_benchmark.py +++ b/test/bench/test_benchmark.py @@ -9,6 +9,7 @@ from datasets import load_dataset, Dataset from abc import ABC, abstractmethod from infinilm.base_config import BaseConfig +from infinilm.processors import AutoInfinilmProcessor TOTAL_TOKENS = 0 TOTAL_TIME = 0.0 @@ -90,29 +91,8 @@ def __init__( with open(os.path.join(model_dir_path, "config.json"), "r") as f: self.config_dict = json.load(f) - # Align tokenizer initialization with jiuge backend (010) - # Match the exact same initialization logic based on model type - model_type = self.config_dict.get("model_type", "") - if model_type == "llama": - # For llama models: no trust_remote_code (matches jiuge line 465) - self.tokenizer = transformers.AutoTokenizer.from_pretrained( - model_dir_path, trust_remote_code=True - ) - elif model_type in ["fm9g", "minicpm", "fm9g7b"]: - # For fm9g/minicpm/fm9g7b models: use trust_remote_code=True (matches jiuge lines 493-495, 518-520) - self.tokenizer = transformers.AutoTokenizer.from_pretrained( - model_dir_path, trust_remote_code=True - ) - elif model_type in ["qwen2", "qwen3"]: - # For qwen2/qwen3 models: no trust_remote_code (matches jiuge line 534-536) - self.tokenizer = transformers.AutoTokenizer.from_pretrained( - model_dir_path, trust_remote_code=True - ) - else: - # Default: use trust_remote_code=True for other models - self.tokenizer = transformers.AutoTokenizer.from_pretrained( -
model_dir_path, trust_remote_code=True - ) + self.processor = AutoInfinilmProcessor.from_pretrained(model_dir_path) + self.tokenizer = self.processor.get_tokenizer() eos_token_id = self.config_dict.get("eos_token_id") self.eos_token_id = ( @@ -159,9 +139,9 @@ def max_context_len(self): def render_input_content(self, *args, **kwargs): """Render input content based on benchmark type""" if self.benchmark == "ceval": - return render_ceval(self.tokenizer, *args, **kwargs) + return render_ceval(self.processor, *args, **kwargs) elif self.benchmark == "mmlu": - return render_mmlu(self.tokenizer, *args, **kwargs) + return render_mmlu(self.processor, *args, **kwargs) else: raise ValueError(f"Unknown benchmark: {self.benchmark}")
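
Usage sketch for the new `LLM` entry point above. The model directory is a hypothetical placeholder and the remaining engine options keep their defaults; since `BasicLLMProcessor` normalizes only single text content items, the sketch stays text-only:

from infinilm.llm.llm import LLM
from infinilm.llm.sampling_params import SamplingParams

# Hypothetical model directory; device, cache_type, etc. keep their defaults.
llm = LLM(model_path="/models/my-model", device="cpu", max_tokens=64)

messages = [
    {
        "role": "user",
        "content": [{"type": "text", "text": "How are you?"}],
    }
]

# chat() wraps a single conversation, applies the chat template through the
# processor, and routes generation through the scheduler.
outputs = llm.chat(messages=messages, sampling_params=SamplingParams(max_tokens=64))
print(outputs[0].prompt)
print(outputs[0].outputs[0].text)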
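The processor layer can also be used standalone; this sketch mirrors how bench.py and test_benchmark.py now obtain their tokenizer (the model path is again a placeholder):

from infinilm.processors import AutoInfinilmProcessor

# from_pretrained() dispatches on config.model_type: "llama" returns
# LlamaProcessor (which patches the tokenizers decoder); everything else
# falls back to BasicLLMProcessor.
processor = AutoInfinilmProcessor.from_pretrained("/models/my-model")
tokenizer = processor.get_tokenizer()

prompt = processor.apply_chat_template(
    conversation=[{"role": "user", "content": "Hello!"}],
    add_generation_prompt=True,
    tokenize=False,
)
input_ids = tokenizer.encode(prompt)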
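For the multimodal path, `resolve_multimodal_inputs` consumes the message schema documented in the `add_request` docstring; `example.jpg` stands in for a local image file:

from infinilm.multimodal.multimodal import resolve_multimodal_inputs

messages = [
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "What is in this image?"},
            {"type": "image_url", "image_url": {"url": "example.jpg"}},
        ],
    }
]

# Text items are skipped; image_url items are opened as PIL images.
# Video and audio content types still raise NotImplementedError.
images, videos, audios = resolve_multimodal_inputs(messages)
assert len(images) == 1 and videos == [] and audios == []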