Skip to content

Commit

Permalink
Merge pull request #31 from slackr31337/dev
Browse files Browse the repository at this point in the history
Update for stop()
  • Loading branch information
slackr31337 authored Aug 13, 2023
2 parents 76bacaa + 8f09683 commit 52ac3cb
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
"pvporcupine",
"pvrecorder",
"ratecv",
"signame",
"signum",
"SIGTERM",
"simpleaudio",
"tomono",
Expand Down
104 changes: 69 additions & 35 deletions voice_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
_LOGGER = logging.getLogger(__name__)

RESULT = "result"
ERROR = "error"
MESSAGE = "message"
EVENT = "event"
NAME = "name"
DATA = "data"
Expand Down Expand Up @@ -66,16 +68,18 @@ class PorcupinePipeline:
_sslcontext = None
_message_id = 1
_last_ping = 0
_recorder = None
_devices = {}
_conversation_id = None
_followup = False

##########################################
def __init__(self, args: argparse.Namespace):
"""Setup Websocket client and audio pipeline"""

signal.signal(signal.SIGINT, self.stop)
signal.signal(signal.SIGTERM, self.stop)

self._state = State(args=args)
self._state.running = False

Expand All @@ -97,6 +101,11 @@ def __init__(self, args: argparse.Namespace):
sys.exit(0)

self._porcupine = get_porcupine(self._state)
self._recorder = PvRecorder(
device_index=args.audio_device,
frame_length=self._porcupine.frame_length,
)

self._audio_thread = threading.Thread(
target=self.read_audio,
daemon=True,
Expand Down Expand Up @@ -135,22 +144,27 @@ def start(self) -> None:
self._event_loop.run_until_complete(self._start_audio_pipeline())

##########################################
def stop(self) -> None:
def stop(self, signum=0, frame=None) -> None:
"""Stop audio thread and loop"""

_LOGGER.info("Stopping")
if signum:
signame = signal.Signals(signum).name
_LOGGER.debug(signame)

self._state.recording = False
self._state.running = False
self._websocket = None

self._state.recording = False
self._recorder.stop()

self._audio_thread.join(1)

if hasattr(self._porcupine, "delete"):
self._porcupine.delete()

self._porcupine = None

self._websocket = None
sys.exit(0)

##########################################
async def _ping(self):
"""Send Ping to HA"""
Expand All @@ -163,10 +177,14 @@ async def _ping(self):
await asyncio.sleep(0.3)
return

await self._send_ws({TYPE: "ping"})
response = await self._websocket.receive_json(timeout=WEBSOCKET_TIMEOUT)
response = await self._send_ws({TYPE: "ping"})
if response.get(TYPE) == "pong":
self._state.connected = True

else:
self._state.connected = False
_LOGGER.error(response)

assert response[TYPE] == "pong", response
self._last_ping = int(time.time())

##########################################
Expand All @@ -193,6 +211,10 @@ async def _send_ws(self, message: dict) -> None:
await self._websocket.send_json(message)
self._message_id += 1

response = await self._websocket.receive_json(timeout=WEBSOCKET_TIMEOUT)
_LOGGER.debug("send_ws() response=%s", response)
return response

##########################################
async def _start_audio_pipeline(self):
"""Start HA audio pipeline"""
Expand Down Expand Up @@ -244,16 +266,15 @@ async def get_audio_pipeline(self) -> None:
)

# Get list of available pipelines and resolve name
await self._send_ws(
msg = await self._send_ws(
{
TYPE: "assist_pipeline/pipeline/list",
}
)
msg = await self._websocket.receive_json()
_LOGGER.debug(msg)
if RESULT not in msg:
_LOGGER.error("FAiled to get audio pipeline from HA")
_LOGGER.error("response=%s", msg)
_LOGGER.error(msg)
return

pipelines = msg[RESULT]["pipelines"]
Expand Down Expand Up @@ -295,9 +316,12 @@ async def _process_loop(self) -> None:
pipeline_args["pipeline"] = self._pipeline_id

# Send audio pipeline args to HA
await self._send_ws(pipeline_args)
msg = await self._websocket.receive_json()
assert msg["success"], "Pipeline failed to start"
msg = await self._send_ws(pipeline_args)
if not msg.get("success"):
_LOGGER.error(
msg.get(ERROR, {}).get(MESSAGE, "Pipeline failed to start")
)
return

_LOGGER.info(
"Listening and sending audio to voice pipeline %s", self._pipeline_id
Expand All @@ -320,11 +344,14 @@ async def stt_task(self) -> None:
[msg[EVENT][DATA]["runner_data"].get("stt_binary_handler_id")]
)

receive_event_task = asyncio.create_task(self._websocket.receive_json())
receive_event_task = asyncio.create_task(
self._websocket.receive_json(timeout=WEBSOCKET_TIMEOUT)
)

while self._state.connected:
audio_chunk = await self._state.audio_queue.get()
if not audio_chunk:
_LOGGER.error("No audio chunk in queue")
break

# Prefix binary message with handler id
send_audio_task = asyncio.create_task(
Expand Down Expand Up @@ -359,6 +386,10 @@ async def stt_task(self) -> None:
)
break

elif event_type == "intent-end":
intent = event_data.get("intent_output", {})
self._conversation_id = intent.get("conversation_id")

elif event_type == "stt-end":
# HA finished processing speech to text with result
speech = event_data["stt_output"].get("text")
Expand All @@ -377,10 +408,14 @@ async def stt_task(self) -> None:

else:
_LOGGER.debug("event_type=%s", event_type)
_LOGGER.debug("event_data=%s", event_data)
_LOGGER.debug("event=%s", event)
# _LOGGER.debug("event_data=%s", event_data)

receive_event_task = asyncio.create_task(self._websocket.receive_json())

if not self._state.running:
break

if send_audio_task not in done:
await send_audio_task

Expand All @@ -393,16 +428,11 @@ def read_audio(self) -> None:
ratecv_state = None

_LOGGER.debug("Reading audio")
recorder = PvRecorder(
device_index=args.audio_device,
frame_length=self._porcupine.frame_length,
)

recorder.start()
self._recorder.start()
self._state.recording = False
while self._state.running:
try:
pcm = recorder.read()
pcm = self._recorder.read()

except OSError as err:
_LOGGER.error("Exception: %s", err)
Expand Down Expand Up @@ -450,13 +480,21 @@ def read_audio(self) -> None:
async def _play_response(self, url: str) -> None:
"""Play response wav file from HA"""

request = requests.get(url, timeout=(10, 30))
if request.status_code > 299:
try:
audio_data = None
request = requests.get(url, timeout=(10, 15))
if request.status_code < 300:
audio_data = request.content

except (TimeoutError, ConnectionError) as err:
_LOGGER.error("Exception: %s", err)

if not audio_data:
_LOGGER.error("Failed to get audio file at %s", url)
return

audio = simpleaudio.play_buffer(
request.content,
audio_data,
self._state.args.channels,
self._state.args.width,
self._state.args.rate,
Expand Down Expand Up @@ -541,7 +579,6 @@ def get_porcupine(state: State) -> Porcupine:

##########################################
if __name__ == "__main__":

args = get_cli_args()
_LOGGER.setLevel(level=logging.DEBUG if args.debug else logging.INFO)
if args.debug:
Expand All @@ -558,6 +595,3 @@ def get_porcupine(state: State) -> Porcupine:
audio_pipeline = PorcupinePipeline(args)
with suppress(KeyboardInterrupt):
audio_pipeline.start()

audio_pipeline.stop()
sys.exit(0)

0 comments on commit 52ac3cb

Please sign in to comment.