Merge pull request #319 from NexaAI/david-localui
asr server endpoint update
Davidqian123 authored Dec 17, 2024
2 parents fde94ff + 9458136 commit cdacf08
Showing 2 changed files with 263 additions and 17 deletions.
99 changes: 99 additions & 0 deletions nexa/gguf/nexa_inference_voice.py
@@ -12,6 +12,7 @@
from nexa.general import pull_model
from nexa.utils import nexa_prompt, SpinningCursorAnimation
from nexa.gguf.llama._utils_transformers import suppress_stdout_stderr
import numpy as np


logging.basicConfig(level=logging.INFO)
@@ -58,6 +59,12 @@ def __init__(self, model_path=None, local_path=None, **kwargs):
exit(1)

self.params.update(kwargs)

# for StreamASRProcessor
self.audio_buffer = np.array([], dtype=np.float32)
self.commited = []
self.buffer_time_offset = 0.0

self.model = None

if not kwargs.get("streamlit", False):
@@ -80,6 +87,50 @@ def _load_model(self):
)
logging.debug("Model loaded successfully")

# for StreamASRProcessor
def live_transcribe(self, audio, prompt=""):
segments, info = self.model.transcribe(audio, language=self.params["language"], beam_size=self.params["beam_size"], word_timestamps=True, condition_on_previous_text=True, initial_prompt=prompt)
return list(segments)

def ts_words(self, segments):
# return a list of (start, end, "word") for each word
words = []
for seg in segments:
if seg.no_speech_prob > 0.9:
continue
for w in seg.words:
words.append((w.start, w.end, w.word))
return words

def insert_audio_chunk(self, audio):
self.audio_buffer = np.append(self.audio_buffer, audio)

def process_iter(self):
# Transcribe the current buffer
if len(self.audio_buffer) == 0:
return (None, None, "")
res = self.live_transcribe(self.audio_buffer)
tsw = self.ts_words(res)
if len(tsw) == 0:
return (None, None, "")

# We'll consider all words as committed for simplicity
self.commited = tsw
# return the entire transcription so far
text = " ".join([w[2] for w in self.commited])
beg = self.commited[0][0] + self.buffer_time_offset
end = self.commited[-1][1] + self.buffer_time_offset
return (beg, end, text)

def finish(self):
# Final flush when done
if len(self.commited) == 0:
return (None, None, "")
text = " ".join([w[2] for w in self.commited])
beg = self.commited[0][0] + self.buffer_time_offset
end = self.commited[-1][1] + self.buffer_time_offset
return (beg, end, text)
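
For reference, a minimal driver for the incremental methods added above — a sketch only: it assumes an already-constructed inference object from this file (named `inference` here) and 16 kHz float32 mono audio.

import numpy as np

audio = np.zeros(16000 * 5, dtype=np.float32)  # placeholder: 5 s of silence
for i in range(5):
    chunk = audio[i * 16000:(i + 1) * 16000]   # feed 1 s at a time
    inference.insert_audio_chunk(chunk)        # grow the rolling buffer
    beg, end, text = inference.process_iter()  # re-transcribes the whole buffer
    if beg is not None:
        print(f"[{beg:.2f}s-{end:.2f}s] {text}")
beg, end, text = inference.finish()            # final flush of committed words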

def run(self):
from nexa.gguf.llama._utils_spinner import start_spinner, stop_spinner

@@ -180,6 +231,54 @@ def transcribe(self, audio, **kwargs):
audio,
**kwargs,
)

def stream_transcription(self, audio_path, chunk_duration=1.0):
"""
Simulate streaming by processing the audio in small increments of time.
Yields partial transcripts as they become available.
"""
import librosa
SAMPLING_RATE = 16000
audio, sr = librosa.load(audio_path, sr=SAMPLING_RATE, dtype=np.float32)
duration = len(audio) / SAMPLING_RATE

start = time.time()
beg = 0.0
while beg < duration:
now = time.time() - start
# Simulate waiting for real-time
if now < beg + chunk_duration:
time.sleep((beg + chunk_duration) - now)

end = time.time() - start
if end > duration:
end = duration

chunk_samples = int((end - beg)*SAMPLING_RATE)
chunk_audio = audio[int(beg*SAMPLING_RATE):int(beg*SAMPLING_RATE)+chunk_samples]
beg = end

# Process incrementally
self.insert_audio_chunk(chunk_audio)
o = self.process_iter()
if o[0] is not None:
yield {
"emission_time_ms": (time.time()-start)*1000,
"segment_start_ms": o[0]*1000,
"segment_end_ms": o[1]*1000,
"text": o[2]
}

# Final flush
o = self.finish()
if o[0] is not None:
yield {
"emission_time_ms": (time.time()-start)*1000,
"segment_start_ms": o[0]*1000,
"segment_end_ms": o[1]*1000,
"text": o[2],
"final": True
}

def _transcribe_audio(self, audio_path):
logging.debug(f"Transcribing audio from: {audio_path}")
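An illustrative way to consume the new stream_transcription generator above. The class name NexaVoiceInference and the model identifier are assumptions (neither is visible in this diff):

from nexa.gguf.nexa_inference_voice import NexaVoiceInference

inference = NexaVoiceInference(model_path="faster-whisper-tiny")  # placeholder model name
for event in inference.stream_transcription("sample.wav", chunk_duration=1.0):
    # Each event carries emission_time_ms, segment_start_ms, segment_end_ms,
    # and text; the last event from the final flush also sets "final": True.
    print(f"{event['emission_time_ms']:.0f} ms: {event['text']}")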
181 changes: 164 additions & 17 deletions nexa/gguf/server/nexa_service.py
@@ -40,7 +40,11 @@
from nexa.gguf.llama.llama import Llama
from nexa.gguf.sd.stable_diffusion import StableDiffusion
from faster_whisper import WhisperModel
import numpy as np
import argparse
import soundfile as sf
import librosa
import io

logging.basicConfig(level=logging.INFO)

@@ -84,6 +88,7 @@
is_huggingface = False
is_modelscope = False
projector_path = None
SAMPLING_RATE = 16000
# Request Classes
class GenerationRequest(BaseModel):
prompt: str = "Tell me a story"
Expand Down Expand Up @@ -214,6 +219,61 @@ class DownloadModelRequest(BaseModel):
"protected_namespaces": ()
}

class StreamASRProcessor:
def __init__(self, asr, task, language):
self.asr = asr
self.task = task
self.language = None if language == "auto" else language
self.audio_buffer = np.array([], dtype=np.float32)
self.commited = []
self.buffer_time_offset = 0.0

def insert_audio_chunk(self, audio):
self.audio_buffer = np.append(self.audio_buffer, audio)

def process_iter(self):
if len(self.audio_buffer) == 0:
return (None, None, "")
res = self.transcribe(self.audio_buffer)
tsw = self.ts_words(res)
if len(tsw) == 0:
return (None, None, "")

self.commited = tsw
text = " ".join([w[2] for w in self.commited])
beg = self.commited[0][0] + self.buffer_time_offset
end = self.commited[-1][1] + self.buffer_time_offset
return (beg, end, text)

def finish(self):
if len(self.commited) == 0:
return (None, None, "")
text = " ".join([w[2] for w in self.commited])
beg = self.commited[0][0] + self.buffer_time_offset
end = self.commited[-1][1] + self.buffer_time_offset
return (beg, end, text)

def transcribe(self, audio, prompt=""):
segments, info = self.asr.transcribe(
audio,
language=self.language,
task=self.task,
beam_size=5,
word_timestamps=True,
condition_on_previous_text=True,
initial_prompt=prompt
)
return list(segments)

def ts_words(self, segments):
words = []
for seg in segments:
if seg.no_speech_prob > 0.9:
continue
for w in seg.words:
words.append((w.start, w.end, w.word))
return words
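
A hedged sketch of using the class above on its own, outside the endpoint handlers; the model size and compute type here are placeholders, not the server's actual configuration:

from faster_whisper import WhisperModel
import numpy as np

asr = WhisperModel("tiny", device="cpu", compute_type="default")  # placeholder settings
proc = StreamASRProcessor(asr, task="transcribe", language="auto")
proc.insert_audio_chunk(np.zeros(SAMPLING_RATE, dtype=np.float32))  # 1 s of silence
beg, end, text = proc.process_iter()  # (None, None, "") while nothing is committed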

# helper functions
async def load_model():
global model, chat_format, completion_template, model_path, n_ctx, is_local_path, model_type, is_huggingface, is_modelscope, projector_path
@@ -356,7 +416,7 @@ async def load_model():
model = WhisperModel(
downloaded_path,
device="cpu", # only support cpu for now because cuDNN needs to be installed on user's machine
compute_type="default"
compute_type="float16"
)
logging.info(f"model loaded as {model}")
else:
@@ -545,6 +605,12 @@ def image_path_to_base64(file_path):
return f"data:image/png;base64,{base64_data}"
return None

def load_audio_from_bytes(audio_bytes: bytes):
buffer = io.BytesIO(audio_bytes)
a, sr = sf.read(buffer, dtype='float32')
if sr != SAMPLING_RATE:
a = librosa.resample(a, orig_sr=sr, target_sr=SAMPLING_RATE)
return a
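
A quick sketch of the helper in isolation (the file name is illustrative; mono input is assumed, since soundfile returns a multi-dimensional array for multi-channel files):

with open("sample.wav", "rb") as f:  # illustrative file name
    audio = load_audio_from_bytes(f.read())
print(f"{len(audio) / SAMPLING_RATE:.1f} s of 16 kHz audio")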

def run_nexa_ai_service(model_path_arg=None, is_local_path_arg=False, model_type_arg=None, huggingface=False, modelscope=False, projector_local_path_arg=None, **kwargs):
global model_path, n_ctx, is_local_path, model_type, is_huggingface, is_modelscope, projector_path
@@ -884,38 +950,119 @@ async def img2img(request: ImageGenerationRequest):
logging.error(f"Error in img2img generation: {e}")
raise HTTPException(status_code=500, detail=str(e))

@app.post("/v1/audio/transcriptions", tags=["Audio"])
async def transcribe_audio(
@app.post("/v1/audio/processing", tags=["Audio"])
async def process_audio(
file: UploadFile = File(...),
beam_size: Optional[int] = Query(5, description="Beam size for transcription"),
language: Optional[str] = Query(None, description="Language code (e.g., 'en', 'fr')"),
temperature: Optional[float] = Query(0.0, description="Temperature for sampling"),
task: str = Query("transcribe",
description="Task to perform on the audio. Options are: 'transcribe' or 'translate'.",
regex="^(transcribe|translate)$"
),
beam_size: Optional[int] = Query(5, description="Beam size for decoding."),
language: Optional[str] = Query(None, description="Language code (e.g. 'en', 'fr') for transcription."),
temperature: Optional[float] = Query(0.0, description="Temperature for sampling.")
):

try:
if model_type != "Audio":
raise HTTPException(
status_code=400,
detail="The model that is loaded is not an Audio model. Please use an Audio model for audio transcription."
detail="The model that is loaded is not an Audio model. Please use an Audio model."
)

with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[1]) as temp_audio:
temp_audio.write(await file.read())
temp_audio_path = temp_audio.name

-        transcribe_params = {
+        # Set up parameters for Whisper or similar model
+        task_params = {
"beam_size": beam_size,
"language": language,
"task": "transcribe",
"temperature": temperature,
"vad_filter": True
"vad_filter": True,
"task": task
}
-        segments, _ = model.transcribe(temp_audio_path, **transcribe_params)
-        transcription = "".join(segment.text for segment in segments)
-        return JSONResponse(content={"text": transcription})
+
+        # Only include language parameter if task is "transcribe"
+        # For "translate", the language is always defined as "en"
+        if task == "transcribe" and language:
+            task_params["language"] = language
+
+        segments, _ = model.transcribe(temp_audio_path, **task_params)
+        result_text = "".join(segment.text for segment in segments)
+        return JSONResponse(content={"text": result_text})

except Exception as e:
raise HTTPException(status_code=500, detail=f"Error during transcription: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error during {task}: {str(e)}")
finally:
-        os.unlink(temp_audio_path)
+        if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path):
+            os.unlink(temp_audio_path)

@app.post("/v1/audio/transcriptions_stream", tags=["Audio"])
async def transcribe_stream_audio(
file: UploadFile = File(...),
task: str = Query("transcribe",
description="Task to perform on the audio. Options are: 'transcribe' or 'translate'.",
regex="^(transcribe|translate)$"
),
language: Optional[str] = Query("auto", description="Language code (e.g., 'en', 'fr')"),
min_chunk: Optional[float] = Query(1.0, description="Minimum chunk duration for streaming"),
):
# Read the entire file into memory
audio_bytes = await file.read()
a_full = load_audio_from_bytes(audio_bytes)
duration = len(a_full) / SAMPLING_RATE

# Only include language parameter if task is "transcribe"
# For "translate", the language is always defined as "en"
if task == "transcribe" and language != "auto":
used_language = language
else:
used_language = None

warmup_audio = a_full[:SAMPLING_RATE] # first second
model.transcribe(warmup_audio)

streamer = StreamASRProcessor(model, task, used_language)

start = time.time()
beg = 0.0

def stream_generator():
nonlocal beg
while beg < duration:
now = time.time() - start
if now < beg + min_chunk:
time.sleep((beg + min_chunk) - now)
end = time.time() - start
if end > duration:
end = duration

chunk_samples = int((end - beg)*SAMPLING_RATE)
chunk_audio = a_full[int(beg*SAMPLING_RATE):int(beg*SAMPLING_RATE)+chunk_samples]
beg = end

streamer.insert_audio_chunk(chunk_audio)
o = streamer.process_iter()
if o[0] is not None:
data = {
"emission_time_ms": (time.time()-start)*1000,
"segment_start_ms": o[0]*1000,
"segment_end_ms": o[1]*1000,
"text": o[2]
}
yield f"data: {json.dumps(data)}\n\n".encode("utf-8")

# Final flush
o = streamer.finish()
if o[0] is not None:
data = {
"emission_time_ms": (time.time()-start)*1000,
"segment_start_ms": o[0]*1000,
"segment_end_ms": o[1]*1000,
"text": o[2],
"final": True
}
yield f"data: {json.dumps(data)}\n\n".encode("utf-8")

return StreamingResponse(stream_generator(), media_type="application/x-ndjson")
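
A sketch of a streaming client for the new endpoint, again assuming localhost:8000. Note that the handler emits SSE-style "data: ..." lines even though the declared media type is application/x-ndjson, so the client below parses the data: prefix:

import json
import requests

with open("sample.wav", "rb") as f:  # illustrative file name
    resp = requests.post(
        "http://localhost:8000/v1/audio/transcriptions_stream",  # assumed host/port
        params={"task": "transcribe", "language": "auto", "min_chunk": 1.0},
        files={"file": f},
        stream=True,
    )
    for line in resp.iter_lines():
        if line.startswith(b"data: "):
            event = json.loads(line[len(b"data: "):])
            marker = " [final]" if event.get("final") else ""
            print(f"{event['text']}{marker}")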

@app.post("/v1/audio/translations", tags=["Audio"])
async def translate_audio(
