Source code for spokestack.asr.spokestack.cloud_client

"""
This module contains the websocket logic used to communicate with
Spokestack's cloud-based ASR service.
"""
import base64
import hashlib
import hmac
import json
from typing import Any, Dict, List, Union

import numpy as np
from websocket import WebSocket


class CloudClient:
    """Spokestack client for cloud based speech to text

    Args:
        key_id (str): identity from spokestack api credentials
        key_secret (str): secret key from spokestack api credentials
        socket_url (str): url for socket connection
        audio_format (str): format of input audio
        sample_rate (int): audio sample rate in Hz (defaults to 16000)
        language (str): language for recognition
        limit (int): Limit of messages per api response
        idle_timeout (Any): Time before client timeout. Defaults to None
    """

    def __init__(
        self,
        key_id: str,
        key_secret: str,
        socket_url: str = "wss://api.spokestack.io",
        audio_format: str = "PCM16LE",
        sample_rate: int = 16000,
        language: str = "en",
        limit: int = 10,
        idle_timeout: Union[float, None] = None,
    ) -> None:
        # The request body is serialized exactly once: the very same string
        # is signed below and later sent by ``initialize``, so the signature
        # always matches the transmitted bytes.
        self._body: str = json.dumps(
            {
                "format": audio_format,
                "rate": sample_rate,
                "language": language,
                "limit": limit,
            }
        )
        self._socket_url: str = socket_url
        self._key_id: str = key_id
        self._key: bytes = key_secret.encode("utf-8")

        # HMAC-SHA256 of the body keyed with the api secret, base64 encoded.
        signature = hmac.new(
            self._key, self._body.encode("utf-8"), hashlib.sha256
        ).digest()
        self._signature = base64.b64encode(signature).decode("utf-8")

        self._socket: Any = None
        # Placeholder response used until the service answers.
        self._response: Dict[str, Any] = {
            "error": None,
            "final": True,
            "hypotheses": [],
            "status": None,
        }
        self._sample_rate: int = sample_rate
        self._idle_timeout = idle_timeout
        self._idle_count: int = 0

    def __call__(self, audio: Union[bytes, np.ndarray], limit: int = 1) -> List[str]:
        """Audio to text interface for the cloud client

        Args:
            audio (bytes|np.ndarray): input audio can be in the form of
                bytes or a floating point / np.int16 array, with
                conversions handled. Other types will produce a TypeError.
            limit (int): number of predictions to return

        Returns:
            list: at most ``limit`` entries of the "hypotheses" field of the
            last response received from the service

        Raises:
            TypeError: if ``audio`` cannot be interpreted as PCM-16 audio
            APIError: if the service rejects the initial request
        """
        # Validate/convert before opening a connection so bad input never
        # touches the network.
        samples = self._to_pcm16(audio)
        # One chunk holds ``sample_rate`` samples.
        chunk_size = self._sample_rate

        self.connect()
        try:
            self.initialize()
            for start in range(0, len(samples), chunk_size):
                self.send(samples[start : start + chunk_size])
                self.receive()
            self.end()
            # NOTE(review): ``receive`` polls with a zero timeout and ignores
            # failures, so this loop spins until a response marked final
            # arrives and never terminates if the service does not send one.
            while not self._response["final"]:
                self.receive()
        finally:
            # Always release the socket, even when a step above raised;
            # otherwise the stale socket would be reused by the next call.
            self.disconnect()

        hypotheses = self._response.get("hypotheses", [])
        return hypotheses[:limit]

    @staticmethod
    def _to_pcm16(audio: Any) -> np.ndarray:
        """Convert supported audio inputs to an np.int16 sample array.

        Args:
            audio: raw bytes-like PCM-16 data, or a numpy array with a
                floating point or np.int16 dtype

        Returns:
            np.ndarray: the samples as np.int16

        Raises:
            TypeError: for any other input type or array dtype
        """
        if isinstance(audio, (bytes, bytearray, memoryview)):
            return np.frombuffer(audio, np.int16)
        if not isinstance(audio, np.ndarray):
            raise TypeError("invalid_audio")
        if np.issubdtype(audio.dtype, np.floating):
            # convert and rescale to PCM-16
            return (audio * (2 ** 15 - 1)).astype(np.int16)
        if not np.issubdtype(audio.dtype, np.int16):
            raise TypeError("invalid_audio")
        return audio

    @property
    def is_connected(self) -> bool:
        """ status of the socket connection """
        return bool(self._socket)

    def connect(self) -> None:
        """ connects to websocket """
        if self._socket is None:
            socket = WebSocket()
            socket.connect(f"{self._socket_url}/v1/asr/websocket")
            # Only remember the socket once the connection succeeded, so a
            # failed attempt does not leave the client looking connected.
            self._socket = socket

    def initialize(self) -> None:
        """sends/receives the initial api request

        Raises:
            ConnectionError: if the client is not connected
            APIError: if the response status is not "ok"
        """
        if not self._socket:
            raise ConnectionError("Not Connected")

        message = {
            "keyId": self._key_id,
            "signature": self._signature,
            "body": self._body,
        }
        self._socket.send(json.dumps(message))
        self._response = json.loads(self._socket.recv())
        if self._response["status"] != "ok":
            raise APIError(self._response)

    def disconnect(self) -> None:
        """ disconnects client socket connection """
        if self._socket:
            self._socket.close()
            self._socket = None

    def send(self, frame: np.ndarray) -> None:
        """sends a single frame of audio

        Args:
            frame (np.ndarray): segment of PCM-16 encoded audio

        Raises:
            ConnectionError: if the client is not connected
        """
        if not self._socket:
            raise ConnectionError("Not Connected")
        self._socket.send_binary(frame.tobytes())

    def end(self) -> None:
        """sends empty string in binary to indicate last frame

        Raises:
            ConnectionError: if the client is not connected
        """
        if not self._socket:
            raise ConnectionError("Not Connected")
        self._socket.send_binary(b"")

    def receive(self) -> None:
        """receives the api response

        The read is attempted with the socket timeout temporarily set to
        zero. Any failure (presumably including "nothing to read yet") is
        ignored and the previous response is kept.

        Raises:
            ConnectionError: if the client is not connected
        """
        if not self._socket:
            raise ConnectionError("Not Connected")

        timeout = self._socket.timeout
        try:
            self._socket.timeout = 0
            response = self._socket.recv()
            self._response = json.loads(response)
        except Exception:
            # Deliberate best effort: keep the last response when nothing
            # usable could be read.
            pass
        finally:
            # Restore the caller's timeout no matter how the read ended.
            self._socket.timeout = timeout

    @property
    def response(self) -> dict:
        """ current response message"""
        return self._response

    @property
    def is_final(self) -> bool:
        """ status of most recent server response """
        return self._response["final"]

    @property
    def idle_timeout(self) -> Any:
        """ property for maximum idle time """
        return self._idle_timeout

    @property
    def idle_count(self) -> int:
        """ current counter of idle time """
        return self._idle_count

    @idle_count.setter
    def idle_count(self, value: int) -> None:
        """ sets the idle counter"""
        self._idle_count = value
class APIError(Exception):
    """Spokestack api error pass through

    The exception message is the "error" entry of the response. When the
    response carries no such entry, the whole response is used instead so
    the failure details are not lost.

    Args:
        response (dict): message from the api service

    Attributes:
        response (dict): the response the error was built from
    """

    def __init__(self, response: dict) -> None:
        # Fall back to the raw response rather than raising KeyError while
        # constructing the exception, which would mask the api failure.
        super().__init__(response.get("error", response))
        self.response = response