diff --git a/nexa/gguf/nexa_inference_voice.py b/nexa/gguf/nexa_inference_voice.py
index b3659776..9be8f257 100644
--- a/nexa/gguf/nexa_inference_voice.py
+++ b/nexa/gguf/nexa_inference_voice.py
@@ -12,6 +12,7 @@
 from nexa.general import pull_model
 from nexa.utils import nexa_prompt, SpinningCursorAnimation
 from nexa.gguf.llama._utils_transformers import suppress_stdout_stderr
+import numpy as np
 
 logging.basicConfig(level=logging.INFO)
 
@@ -58,6 +59,12 @@ def __init__(self, model_path=None, local_path=None, **kwargs):
             exit(1)
 
         self.params.update(kwargs)
+
+        # for StreamASRProcessor
+        self.audio_buffer = np.array([], dtype=np.float32)
+        self.commited = []
+        self.buffer_time_offset = 0.0
+
         self.model = None
 
         if not kwargs.get("streamlit", False):
@@ -80,6 +87,50 @@ def _load_model(self):
             )
         logging.debug("Model loaded successfully")
 
+    # for StreamASRProcessor
+    def live_transcribe(self, audio, prompt=""):
+        segments, info = self.model.transcribe(
+            audio,
+            language=self.params["language"],
+            beam_size=self.params["beam_size"],
+            word_timestamps=True,
+            condition_on_previous_text=True,
+            initial_prompt=prompt,
+        )
+        return list(segments)
+
+    def ts_words(self, segments):
+        # return a list of (start, end, "word") for each word
+        words = []
+        for seg in segments:
+            if seg.no_speech_prob > 0.9:
+                continue
+            for w in seg.words:
+                words.append((w.start, w.end, w.word))
+        return words
+
+    def insert_audio_chunk(self, audio):
+        self.audio_buffer = np.append(self.audio_buffer, audio)
+
+    def process_iter(self):
+        # Transcribe the current buffer
+        if len(self.audio_buffer) == 0:
+            return (None, None, "")
+        res = self.live_transcribe(self.audio_buffer)
+        tsw = self.ts_words(res)
+        if len(tsw) == 0:
+            return (None, None, "")
+
+        # We'll consider all words as committed for simplicity
+        self.commited = tsw
+        # return the entire transcription so far
+        text = " ".join([w[2] for w in self.commited])
+        beg = self.commited[0][0] + self.buffer_time_offset
+        end = self.commited[-1][1] + self.buffer_time_offset
+        return (beg, end, text)
+
+    def finish(self):
+        # Final flush when done
+        if len(self.commited) == 0:
+            return (None, None, "")
+        text = " ".join([w[2] for w in self.commited])
+        beg = self.commited[0][0] + self.buffer_time_offset
+        end = self.commited[-1][1] + self.buffer_time_offset
+        return (beg, end, text)
+
     def run(self):
         from nexa.gguf.llama._utils_spinner import start_spinner, stop_spinner
 
@@ -180,6 +231,54 @@ def transcribe(self, audio, **kwargs):
             audio,
             **kwargs,
         )
+
+    def stream_transcription(self, audio_path, chunk_duration=1.0):
+        """
+        Simulate streaming by processing the audio in small increments of time.
+        Yields partial transcripts as they become available.
+        """
+        import librosa
+        SAMPLING_RATE = 16000
+        audio, sr = librosa.load(audio_path, sr=SAMPLING_RATE, dtype=np.float32)
+        duration = len(audio) / SAMPLING_RATE
+
+        start = time.time()
+        beg = 0.0
+        while beg < duration:
+            now = time.time() - start
+            # Simulate waiting for real-time
+            if now < beg + chunk_duration:
+                time.sleep((beg + chunk_duration) - now)
+
+            end = time.time() - start
+            if end > duration:
+                end = duration
+
+            chunk_samples = int((end - beg) * SAMPLING_RATE)
+            chunk_audio = audio[int(beg * SAMPLING_RATE):int(beg * SAMPLING_RATE) + chunk_samples]
+            beg = end
+
+            # Process incrementally
+            self.insert_audio_chunk(chunk_audio)
+            o = self.process_iter()
+            if o[0] is not None:
+                yield {
+                    "emission_time_ms": (time.time() - start) * 1000,
+                    "segment_start_ms": o[0] * 1000,
+                    "segment_end_ms": o[1] * 1000,
+                    "text": o[2]
+                }
+
+        # Final flush
+        o = self.finish()
+        if o[0] is not None:
+            yield {
+                "emission_time_ms": (time.time() - start) * 1000,
+                "segment_start_ms": o[0] * 1000,
+                "segment_end_ms": o[1] * 1000,
+                "text": o[2],
+                "final": True
+            }
 
     def _transcribe_audio(self, audio_path):
         logging.debug(f"Transcribing audio from: {audio_path}")
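Note that stream_transcription re-transcribes the growing buffer on every chunk and yields the cumulative transcript, so a consumer should replace, not append to, previously displayed text. A minimal consumer sketch (assuming this file's class is NexaVoiceInference, as elsewhere in nexa-sdk; the model name and WAV path are illustrative placeholders):

# Sketch: drive the new stream_transcription generator from user code.
# NexaVoiceInference, "faster-whisper-tiny", and "sample.wav" are assumptions.
from nexa.gguf.nexa_inference_voice import NexaVoiceInference

inference = NexaVoiceInference(model_path="faster-whisper-tiny")
for event in inference.stream_transcription("sample.wav", chunk_duration=1.0):
    tag = " [final]" if event.get("final") else ""
    # Each event carries the cumulative transcript plus timings in milliseconds.
    print(f"{event['segment_start_ms']:.0f}-{event['segment_end_ms']:.0f} ms{tag}: {event['text']}")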
+ """ + import librosa + SAMPLING_RATE = 16000 + audio, sr = librosa.load(audio_path, sr=SAMPLING_RATE, dtype=np.float32) + duration = len(audio) / SAMPLING_RATE + + start = time.time() + beg = 0.0 + while beg < duration: + now = time.time() - start + # Simulate waiting for real-time + if now < beg + chunk_duration: + time.sleep((beg + chunk_duration) - now) + + end = time.time() - start + if end > duration: + end = duration + + chunk_samples = int((end - beg)*SAMPLING_RATE) + chunk_audio = audio[int(beg*SAMPLING_RATE):int(beg*SAMPLING_RATE)+chunk_samples] + beg = end + + # Process incrementally + self.insert_audio_chunk(chunk_audio) + o = self.process_iter() + if o[0] is not None: + yield { + "emission_time_ms": (time.time()-start)*1000, + "segment_start_ms": o[0]*1000, + "segment_end_ms": o[1]*1000, + "text": o[2] + } + + # Final flush + o = self.finish() + if o[0] is not None: + yield { + "emission_time_ms": (time.time()-start)*1000, + "segment_start_ms": o[0]*1000, + "segment_end_ms": o[1]*1000, + "text": o[2], + "final": True + } def _transcribe_audio(self, audio_path): logging.debug(f"Transcribing audio from: {audio_path}") diff --git a/nexa/gguf/server/nexa_service.py b/nexa/gguf/server/nexa_service.py index 0b38fb79..9a79a6dc 100644 --- a/nexa/gguf/server/nexa_service.py +++ b/nexa/gguf/server/nexa_service.py @@ -40,7 +40,11 @@ from nexa.gguf.llama.llama import Llama from nexa.gguf.sd.stable_diffusion import StableDiffusion from faster_whisper import WhisperModel +import numpy as np import argparse +import soundfile as sf +import librosa +import io logging.basicConfig(level=logging.INFO) @@ -84,6 +88,7 @@ is_huggingface = False is_modelscope = False projector_path = None +SAMPLING_RATE = 16000 # Request Classes class GenerationRequest(BaseModel): prompt: str = "Tell me a story" @@ -214,6 +219,61 @@ class DownloadModelRequest(BaseModel): "protected_namespaces": () } +class StreamASRProcessor: + def __init__(self, asr, task, language): + self.asr = asr + self.task = task + self.language = None if language == "auto" else language + self.audio_buffer = np.array([], dtype=np.float32) + self.commited = [] + self.buffer_time_offset = 0.0 + + def insert_audio_chunk(self, audio): + self.audio_buffer = np.append(self.audio_buffer, audio) + + def process_iter(self): + if len(self.audio_buffer) == 0: + return (None, None, "") + res = self.transcribe(self.audio_buffer) + tsw = self.ts_words(res) + if len(tsw) == 0: + return (None, None, "") + + self.commited = tsw + text = " ".join([w[2] for w in self.commited]) + beg = self.commited[0][0] + self.buffer_time_offset + end = self.commited[-1][1] + self.buffer_time_offset + return (beg, end, text) + + def finish(self): + if len(self.commited) == 0: + return (None, None, "") + text = " ".join([w[2] for w in self.commited]) + beg = self.commited[0][0] + self.buffer_time_offset + end = self.commited[-1][1] + self.buffer_time_offset + return (beg, end, text) + + def transcribe(self, audio, prompt=""): + segments, info = self.asr.transcribe( + audio, + language=self.language, + task=self.task, + beam_size=5, + word_timestamps=True, + condition_on_previous_text=True, + initial_prompt=prompt + ) + return list(segments) + + def ts_words(self, segments): + words = [] + for seg in segments: + if seg.no_speech_prob > 0.9: + continue + for w in seg.words: + words.append((w.start, w.end, w.word)) + return words + # helper functions async def load_model(): global model, chat_format, completion_template, model_path, n_ctx, is_local_path, model_type, 
@@ -356,7 +416,7 @@ async def load_model():
             model = WhisperModel(
                 downloaded_path,
                 device="cpu",  # only support cpu for now because cuDNN needs to be installed on user's machine
-                compute_type="default"
+                compute_type="float16"
             )
             logging.info(f"model loaded as {model}")
         else:
@@ -545,6 +605,12 @@ def image_path_to_base64(file_path):
         return f"data:image/png;base64,{base64_data}"
     return None
 
+def load_audio_from_bytes(audio_bytes: bytes):
+    buffer = io.BytesIO(audio_bytes)
+    a, sr = sf.read(buffer, dtype='float32')
+    if a.ndim > 1:
+        a = a.mean(axis=1)  # downmix multi-channel audio to mono
+    if sr != SAMPLING_RATE:
+        a = librosa.resample(a, orig_sr=sr, target_sr=SAMPLING_RATE)
+    return a
 
 def run_nexa_ai_service(model_path_arg=None, is_local_path_arg=False, model_type_arg=None, huggingface=False, modelscope=False, projector_local_path_arg=None, **kwargs):
     global model_path, n_ctx, is_local_path, model_type, is_huggingface, is_modelscope, projector_path
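A quick sanity check for load_audio_from_bytes (a sketch, not part of the diff; assumes soundfile and librosa are installed and the helper above is in scope): write a one-second 44.1 kHz tone into an in-memory WAV and confirm it comes back as float32 mono at 16 kHz.

# Sketch: round-trip an in-memory WAV through load_audio_from_bytes.
import io
import numpy as np
import soundfile as sf

sr_in = 44100
t = np.arange(sr_in) / sr_in
tone = (0.1 * np.sin(2 * np.pi * 440 * t)).astype(np.float32)  # 1 s of A440

buf = io.BytesIO()
sf.write(buf, tone, sr_in, format="WAV")

audio = load_audio_from_bytes(buf.getvalue())
print(audio.dtype, audio.shape)  # float32, roughly (16000,) after resampling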
@@ -884,38 +950,119 @@ async def img2img(request: ImageGenerationRequest):
         logging.error(f"Error in img2img generation: {e}")
         raise HTTPException(status_code=500, detail=str(e))
 
-@app.post("/v1/audio/transcriptions", tags=["Audio"])
-async def transcribe_audio(
+@app.post("/v1/audio/processing", tags=["Audio"])
+async def process_audio(
     file: UploadFile = File(...),
-    beam_size: Optional[int] = Query(5, description="Beam size for transcription"),
-    language: Optional[str] = Query(None, description="Language code (e.g., 'en', 'fr')"),
-    temperature: Optional[float] = Query(0.0, description="Temperature for sampling"),
+    task: str = Query("transcribe",
+        description="Task to perform on the audio. Options are: 'transcribe' or 'translate'.",
+        regex="^(transcribe|translate)$"
+    ),
+    beam_size: Optional[int] = Query(5, description="Beam size for decoding."),
+    language: Optional[str] = Query(None, description="Language code (e.g. 'en', 'fr') for transcription."),
+    temperature: Optional[float] = Query(0.0, description="Temperature for sampling.")
 ):
-
     try:
         if model_type != "Audio":
             raise HTTPException(
                 status_code=400,
-                detail="The model that is loaded is not an Audio model. Please use an Audio model for audio transcription."
+                detail="The model that is loaded is not an Audio model. Please use an Audio model."
             )
+
         with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[1]) as temp_audio:
             temp_audio.write(await file.read())
             temp_audio_path = temp_audio.name
 
-        transcribe_params = {
+        # Set up parameters for Whisper or similar model
+        task_params = {
             "beam_size": beam_size,
-            "language": language,
-            "task": "transcribe",
             "temperature": temperature,
-            "vad_filter": True
+            "vad_filter": True,
+            "task": task
         }
-        segments, _ = model.transcribe(temp_audio_path, **transcribe_params)
-        transcription = "".join(segment.text for segment in segments)
-        return JSONResponse(content={"text": transcription})
+
+        # Only pass the language parameter when the task is "transcribe";
+        # for "translate" the output language is always English ("en").
+        if task == "transcribe" and language:
+            task_params["language"] = language
+
+        segments, _ = model.transcribe(temp_audio_path, **task_params)
+        result_text = "".join(segment.text for segment in segments)
+        return JSONResponse(content={"text": result_text})
+
     except Exception as e:
-        raise HTTPException(status_code=500, detail=f"Error during transcription: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"Error during {task}: {str(e)}")
     finally:
-        os.unlink(temp_audio_path)
+        if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path):
+            os.unlink(temp_audio_path)
+
+@app.post("/v1/audio/transcriptions_stream", tags=["Audio"])
+async def transcribe_stream_audio(
+    file: UploadFile = File(...),
+    task: str = Query("transcribe",
+        description="Task to perform on the audio. Options are: 'transcribe' or 'translate'.",
+        regex="^(transcribe|translate)$"
+    ),
+    language: Optional[str] = Query("auto", description="Language code (e.g., 'en', 'fr')"),
+    min_chunk: Optional[float] = Query(1.0, description="Minimum chunk duration for streaming"),
+):
+    # Read the entire file into memory
+    audio_bytes = await file.read()
+    a_full = load_audio_from_bytes(audio_bytes)
+    duration = len(a_full) / SAMPLING_RATE
+
+    # Pass an explicit language only when transcribing; for "translate"
+    # the output is always English, so let the model auto-detect the source.
+    if task == "transcribe" and language != "auto":
+        used_language = language
+    else:
+        used_language = None
+
+    # Warm up on the first second of audio; consume the lazy generator so
+    # the forward pass actually runs.
+    warmup_audio = a_full[:SAMPLING_RATE]
+    warmup_segments, _ = model.transcribe(warmup_audio)
+    list(warmup_segments)
+
+    streamer = StreamASRProcessor(model, task, used_language)
+
+    start = time.time()
+    beg = 0.0
+
+    def stream_generator():
+        nonlocal beg
+        while beg < duration:
+            now = time.time() - start
+            # Pace the loop so chunks arrive roughly in real time
+            if now < beg + min_chunk:
+                time.sleep((beg + min_chunk) - now)
+            end = time.time() - start
+            if end > duration:
+                end = duration
+
+            chunk_samples = int((end - beg) * SAMPLING_RATE)
+            chunk_audio = a_full[int(beg * SAMPLING_RATE):int(beg * SAMPLING_RATE) + chunk_samples]
+            beg = end
+
+            streamer.insert_audio_chunk(chunk_audio)
+            o = streamer.process_iter()
+            if o[0] is not None:
+                data = {
+                    "emission_time_ms": (time.time() - start) * 1000,
+                    "segment_start_ms": o[0] * 1000,
+                    "segment_end_ms": o[1] * 1000,
+                    "text": o[2]
+                }
+                yield f"data: {json.dumps(data)}\n\n".encode("utf-8")
+
+        # Final flush
+        o = streamer.finish()
+        if o[0] is not None:
+            data = {
+                "emission_time_ms": (time.time() - start) * 1000,
+                "segment_start_ms": o[0] * 1000,
+                "segment_end_ms": o[1] * 1000,
+                "text": o[2],
+                "final": True
+            }
+            yield f"data: {json.dumps(data)}\n\n".encode("utf-8")
+
+    # Events are SSE-framed ("data: ...\n\n"), so label the stream accordingly
+    return StreamingResponse(stream_generator(), media_type="text/event-stream")
 
 @app.post("/v1/audio/translations", tags=["Audio"])
 async def translate_audio(
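Finally, a possible client for the new streaming endpoint (a sketch using the requests library; the host, port, and sample.wav path are assumptions, so adjust as needed). Because every event carries the whole transcript so far, a client can simply overwrite its display on each event rather than concatenating:

# Sketch: consume /v1/audio/transcriptions_stream as a server-sent event stream.
import json
import requests

url = "http://localhost:8000/v1/audio/transcriptions_stream"  # placeholder host/port
with open("sample.wav", "rb") as f:
    resp = requests.post(
        url,
        params={"task": "transcribe", "language": "auto", "min_chunk": 1.0},
        files={"file": ("sample.wav", f, "audio/wav")},
        stream=True,
    )

for line in resp.iter_lines(decode_unicode=True):
    # Events are framed as "data: {...}" lines separated by blank lines.
    if line and line.startswith("data: "):
        event = json.loads(line[len("data: "):])
        print(("FINAL " if event.get("final") else "") + event["text"])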