# Author: Hereux
import json
from threading import Thread
import colorlog
import elevenlabs
import pvporcupine
import pyaudio
import text2numde
from vosk import Model, SpkModel, KaldiRecognizer, SetLogLevel
from SpeechToText import TextToCommands
from SpeechToText.SpeechToText import SpeechToText
from TextToSpeech import TextToSpeech
from WakeWords.CheckForWakeWord import CheckForWakeWord
from bin import SocketServer, utils
from bin.rasa import SendCommand
with open("bin/settings.json", "r", encoding="utf-8") as settings_file:
    settings = json.load(settings_file)
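# A minimal sketch of the settings file this module expects. The keys are taken
# from the lookups in this file; the values are placeholders, not the author's
# real configuration:
# {
#     "porcupine_access_key": "<picovoice access key>",
#     "spk_model_path": "<path to the vosk speaker model>",
#     "vosk_model": "<path to the vosk speech model>",
#     "live_speaking": true,
#     "offline_mode": false,
#     "vosk_log_level": -1,
#     "using_rasa": false,
#     "elevenlabs_api_key": "<elevenlabs api key>",
#     "vrecog_activation_sound": "<path to activation sound>",
#     "vrecog_deactivation_sound": "<path to deactivation sound>"
# }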
access_key = settings["porcupine_access_key"]
spk_model_path = settings["spk_model_path"]
model_path = settings["vosk_model"]
live_speaking = bool(settings["live_speaking"])
using_internet = not bool(settings["offline_mode"])
vosk_log_level = int(settings["vosk_log_level"])
SetLogLevel(vosk_log_level)
logger = colorlog.getLogger("AIVoiceAssistant_HX")
logger.setLevel(colorlog.INFO)
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    "%(log_color)s%(levelname)-8s%(reset)s %(white)s%(message)s %(reset)s",
    datefmt=None,
    reset=True,
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'red',
    }
))
logger.addHandler(handler)
logger.info("Starting...")
BLUE = '\033[94m'
GREEN = '\033[92m'
RESET = '\033[0m'  # Reset to the default color
class HomeAssistant:
    """
    HomeAssistant is the main class of the program and is responsible for controlling it.
    The HomeAssistant listens for a wake word, recognizes the spoken text and then executes the matching command.
    The program is still in the middle of development and is being improved continuously.
    Powered by Hereux. All rights reserved!
    """

    def __init__(self):
        super().__init__()
        self.using_rasa = bool(settings["using_rasa"])
        self.is_running = True
        self.kaldi_recognizer = None
        self.audio_stream = None
        utils.__write_to_txt__("", reset=True)
        self.audio_file_gen_process = None
        self.memory = {}
        elevenlabs.set_api_key(settings["elevenlabs_api_key"])
        self.pa = pyaudio.PyAudio()
        self.porcupine = pvporcupine.create(
            access_key=access_key,
            keyword_paths=["WakeWords/CustomKeywords/Glados_de_windows_v3_0_0.ppn"],
            model_path="WakeWords/porcupine_params_de.pv",
            sensitivities=[1],  # in [0, 1]; 1 is the most sensitive (more false positives)
        )
        self.cww = CheckForWakeWord()
        self.stt = SpeechToText()
        self.ttc = TextToCommands.TextToCommands()
        self.tts = TextToSpeech.TextToSpeech(live_speaking=live_speaking, using_internet=using_internet)
        self.server = SocketServer.Server()
        self.server.start()
        s2t_model = Model(model_path)
        speaker_model = SpkModel(spk_model_path)
        self.kaldi_recognizer = KaldiRecognizer(s2t_model, 16000, speaker_model)
        self.speakers = self.__get_speakers__()
        utils.writetojson("settings", "speakers", self.speakers)

    def __get_speakers__(self):
        """
        Returns a filtered list of all output devices (speakers).
        :return: List[device_info.get("name")]
        """
        speakers = []
        for device_index in range(self.pa.get_device_count()):
            device_info = self.pa.get_device_info_by_index(device_index)
            # Filter unwanted audio drivers (host API index 2; which driver that is depends on the platform)
            if device_info.get("hostApi") != 2:
                continue
            # Filter out microphones (devices with input channels)
            if device_info.get("maxInputChannels") != 0:
                continue
            speakers.append(device_info.get("name"))
        return speakers
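
    # The host API filter above is index-based and therefore platform-dependent.
    # A sketch (using PyAudio's host API helpers) for checking which index maps
    # to which audio driver on a given machine:
    # for i in range(self.pa.get_host_api_count()):
    #     print(i, self.pa.get_host_api_info_by_index(i).get("name"))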

    def get_audio_stream(self):
        """
        Opens the Porcupine audio input stream and stores it in self.audio_stream.
        :return: None
        """
        self.audio_stream = self.pa.open(
            rate=self.porcupine.sample_rate,  # Porcupine expects 16 kHz
            channels=1,
            format=pyaudio.paInt16,
            input=True,
            frames_per_buffer=self.porcupine.frame_length,
            input_device_index=self.pa.get_default_input_device_info().get("index"),
        )

    def when_missing_audio_files(self):
        """
        Generates missing audio files in a background thread and replays the
        pending command once the generation thread has finished.
        """
        if self.audio_file_gen_process is None:
            logger.info("Starting audio file generation thread")
            missing_data = self.memory["missing_data"] = self.tts.missing_data
            self.audio_file_gen_process = Thread(target=self.tts.generate_audio_files, args=(missing_data,))
            self.audio_file_gen_process.start()
        elif not self.audio_file_gen_process.is_alive():
            logger.info("Audio file generation thread finished")
            self.tts.should_listen_after_playing = self.tts.should_listen_after_generating
            self.tts.should_listen_after_generating = False
            self.tts.is_missing_files = False
            self.audio_file_gen_process = None
            missing_memory = self.memory["missing_data"]
            _, exists = self.tts.get_command_path(missing_memory[0], missing_memory[2])
            logger.info(f"Missing data: {missing_memory}")
            if exists:
                self.tts.elevenlabs_module(command=missing_memory[0], entities=missing_memory[1],
                                           command_index=missing_memory[2])
            else:
                logger.warning("Error: File still missing.")
            self.memory.clear()

    def manual_ttc(self, sentence: str):
        """
        Finds a matching command and returns the command name, the entities and the response.
        :param sentence: The user input.
        :return: Tuple of (command, entities, response).
        """
        command, response, entities = self.ttc.manual_text_to_commands(sentence)
        utils.__write_to_txt__(f"{command}|{entities}|{response}")
        return command, entities, response

    @staticmethod
    def send_command_to_client(command: str, entities: list, response: str):
        """
        Sends the command to the client and returns the reply.
        :param command: The command name.
        :param entities: The command data.
        :param response: The response to the command.
        :return: The reply to the command, with filled-in values where applicable.
        """
        received_data = None
        if not entities:
            received_data = SendCommand.send_to_server(command=command)
        elif len(entities) == 1:
            received_data = SendCommand.send_to_server(command=command, slot1=entities[0])
        elif len(entities) == 2:
            received_data = SendCommand.send_to_server(command=command, slot1=entities[0],
                                                       slot2=entities[1])
        return received_data if received_data is not None else response
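
    # If commands ever need more than two entities, the dispatch above could be
    # generalized. A sketch, assuming SendCommand.send_to_server accepts keyword
    # slots slot1..slotN (an assumption, not verified against the actual API):
    # slots = {f"slot{i + 1}": entity for i, entity in enumerate(entities or [])}
    # received_data = SendCommand.send_to_server(command=command, **slots)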

    def text_to_speech(self, command: str, entities: list, response: str):
        """Speaks the response, either live via the local TTS engine or from generated ElevenLabs audio files."""
        if live_speaking:
            self.tts.pytts3_module(response)
        else:
            self.stt.is_listening = self.tts.elevenlabs_module(command=command, entities=entities)

    def start(self):
        logger.info("GladOS booted successfully.")
        entities = None
        command = None
        self.get_audio_stream()  # Start the audio stream
        while self.is_running:
            try:
                # Read from the audio stream
                pcm = self.audio_stream.read(self.porcupine.frame_length)
                self.cww.check_for_wakeword(porcupine=self.porcupine, pcm=pcm)
                if self.tts.is_missing_files:
                    self.when_missing_audio_files()
                if self.cww.wakeWordFound or self.tts.should_listen:
                    self.tts.play_sound(settings["vrecog_activation_sound"])
                    self.stt.is_listening = True
                if not self.stt.is_listening:
                    continue
                # Listen to the audio stream and convert speech to text
                self.tts.should_listen = False
                self.stt.listen(pcm=pcm, kaldi_recognizer=self.kaldi_recognizer)
                if self.stt.is_listening is not False:
                    continue
                # Once the recognizer is done, check whether any text was recognized
                sentence: str = self.stt.result_sentence
                if not sentence:
                    logger.info("No text recognized.")
                    self.tts.play_sound(settings["vrecog_deactivation_sound"])
                    continue
                logger.info("Recognized text: " + sentence)
                sentence = text2numde.sentence2num(sentence)
                logger.info("Converted speech to text; text: " + sentence)
                # COMMAND RECOGNITION
                if self.using_rasa:
                    response = self.ttc.get_rasa_response(sentence=sentence)
                    logger.info("RASA response: " + response)
                else:
                    command, entities, response = self.manual_ttc(sentence)
                # COMMAND EXECUTION
                command_response = self.send_command_to_client(command, entities, response)
                if command_response and command_response != "None":
                    command = command_response
                logger.info("Command: " + command)
                # SPEECH OUTPUT
                self.text_to_speech(command, entities, response)
                if command == "goodbye_yes":
                    self.stop()
            except (KeyboardInterrupt, SystemExit):
                logger.info("Home Assistant stopping...")
                self.stop()
                break

    def stop(self):
        self.server.stop_server()
        self.audio_stream.close()
        self.pa.terminate()
        self.porcupine.delete()
        logger.info("Home Assistant stopped")
        self.is_running = False

if __name__ == '__main__':
    home_assistant = HomeAssistant()
    home_assistant.start()

# Note: Audio output via Bluetooth is not possible