Merge pull request #54 from beeldengeluid/no-dane

remove DANE

greenw0lf authored Sep 12, 2024
2 parents bc17666 + e5fecfb, commit 833ddfa
Showing 41 changed files with 16,294 additions and 1,761 deletions.
7 changes: 5 additions & 2 deletions .dockerignore
@@ -1,6 +1,6 @@
 /data
 /misc
-/model
+/models
 /config
 /tests
 .venv
@@ -12,4 +12,7 @@
 .coverage
 __pycache__
 s3-creds.env
-.vscode
+.vscode
+.env
+.env.override
+/data
31 changes: 31 additions & 0 deletions .env
@@ -0,0 +1,31 @@
+# passed to --input-uri (see main.py)
+INPUT_URI=http://model-hosting.beng.nl/kaldi-nl-test.mp3
+# uncomment if you have downloaded s3://x-omg-daan-av/dane-asr-worker-sample-data.tar.gz
+# INPUT_URI=http://fake-hosting.beng.nl/2101608150135908031__NOS_JOURNAAL_-WON01207359.mp4
+
+# passed to --output-uri (see main.py)
+OUTPUT_URI=s3://x-omg-daan-av/assets/2101608150135908031__NOS_JOURNAAL_-WON01207359/
+
+# make sure to mount this dir into the container (see docker-compose-dane-worker.yml)
+DATA_BASE_DIR=./data
+MODEL_BASE_DIR=./model
+
+# make sure to get a valid endpoint from a CODEOWNER
+S3_ENDPOINT_URL=https://some_url
+
+# default bucket and subdir
+S3_BUCKET=x-omg-daan-av
+S3_FOLDER_IN_BUCKET=assets
+
+# your AWS credentials for the S3 bucket in question
+AWS_ACCESS_KEY_ID=your-key-id
+AWS_SECRET_ACCESS_KEY=your-secret-access-key
+
+# Whisper related settings
+W_WORD_TIMESTAMPS=y # or n
+W_DEVICE=cuda # "cpu" to run on CPU, otherwise "cuda" to run on GPU
+W_VAD=y # whether to use voice activity detection (VAD) or not
+W_MODEL=large-v2 # check the README for available options
+W_BEAM_SIZE=5
+W_BEST_OF=5
+W_TEMPERATURE="(0.0,0.2,0.4,0.6,0.8,1.0)"
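For reference, a minimal sketch of how a worker might consume these variables, assuming python-dotenv; the variable names come from the .env above, but the loading code is illustrative and not necessarily the repository's actual config module:

    # Illustrative sketch only: reads the .env settings shown above.
    # Assumes python-dotenv is installed; the repo's real config handling may differ.
    import os

    from dotenv import load_dotenv

    load_dotenv(".env")  # an .env.override file could be layered on top, per .gitignore

    input_uri = os.environ["INPUT_URI"]
    output_uri = os.environ["OUTPUT_URI"]
    word_timestamps = os.environ.get("W_WORD_TIMESTAMPS", "n").startswith("y")
    # W_TEMPERATURE is a stringified tuple, e.g. "(0.0,0.2,0.4,0.6,0.8,1.0)"
    temperature = tuple(
        float(t) for t in os.environ["W_TEMPERATURE"].strip('"()').split(",")
    )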
12 changes: 10 additions & 2 deletions .gitignore
@@ -1,9 +1,17 @@
@@ -1,9 +1,17 @@
 /data/*
-model/*
 !/data/README.md
+!/data/input/
+/data/input/*
+!/data/input/testsource__testcarrier.wav
+!/data/output
+/data/output/*
+!/data/output/testsource__testcarrier
+!/data/output/testsource__testcarrier/*
+/model/*
 __pycache__
 .pytest_cache
 .coverage
 /config.yml
 s3-creds.env
-.vscode/*
+.vscode/*
+.env.override
3 changes: 2 additions & 1 deletion Dockerfile
@@ -8,9 +8,10 @@ FROM nvidia/cuda:12.2.2-cudnn8-runtime-ubuntu22.04
 RUN mkdir /root/.DANE /mnt/dane-fs /src /data /model
 
 RUN apt-get update && \
-    apt-get install -y python3-pip python3-dev python-is-python3 && \
+    apt-get install -y python3-pip python3.11-dev python-is-python3 ffmpeg && \
     rm -rf /var/lib/apt/lists/*
 
+
 WORKDIR /src
 
 ENV POETRY_NO_INTERACTION=1 \
211 changes: 43 additions & 168 deletions base_util.py
@@ -1,183 +1,58 @@
-from typing import Any, List
-from yacs.config import CfgNode
-import os
-from pathlib import Path
 import logging
+import ntpath
+import os
+import subprocess
+from typing import Tuple
+from config import data_base_dir
 
 
 LOG_FORMAT = "%(asctime)s|%(levelname)s|%(process)d|%(module)s|%(funcName)s|%(lineno)d|%(message)s"
 logger = logging.getLogger(__name__)
 
 
-def validate_config(config: CfgNode, validate_file_paths: bool = True) -> bool:
-    """Check the configuration (supplied by config.yml)
-    Most of the config is related to DANE and do not need to be altered when
-    developing locally, except the last part (settings for this worker specifically).
-    consult https://github.com/beeldengeluid/dane-example-worker/wiki/Config
-    """
-    try:
-        __validate_environment_variables()
-    except AssertionError as e:
-        print("Error malconfigured worker: env vars incomplete")
-        print(str(e))
-        return False
-
-    parent_dirs_to_check: List[str] = []  # parent dirs of file paths must exist
-    try:
-        # rabbitmq settings
-        assert config.RABBITMQ, "RABBITMQ"
-        assert check_setting(config.RABBITMQ.HOST, str), "RABBITMQ.HOST"
-        assert check_setting(config.RABBITMQ.PORT, int), "RABBITMQ.PORT"
-        assert check_setting(config.RABBITMQ.EXCHANGE, str), "RABBITMQ.EXCHANGE"
-        assert check_setting(
-            config.RABBITMQ.RESPONSE_QUEUE, str
-        ), "RABBITMQ.RESPONSE_QUEUE"
-        assert check_setting(config.RABBITMQ.USER, str), "RABBITMQ.USER"
-        assert check_setting(config.RABBITMQ.PASSWORD, str), "RABBITMQ.PASSWORD"
-
-        # Elasticsearch settings
-        assert config.ELASTICSEARCH, "ELASTICSEARCH"
-        assert check_setting(config.ELASTICSEARCH.HOST, list), "ELASTICSEARCH.HOST"
-        assert (
-            len(config.ELASTICSEARCH.HOST) == 1
-            and type(config.ELASTICSEARCH.HOST[0]) is str
-        ), "Invalid ELASTICSEARCH.HOST"
-
-        assert check_setting(config.ELASTICSEARCH.PORT, int), "ELASTICSEARCH.PORT"
-        assert check_setting(config.ELASTICSEARCH.USER, str, True), "ELASTICSEARCH.USER"
-        assert check_setting(
-            config.ELASTICSEARCH.PASSWORD, str, True
-        ), "ELASTICSEARCH.PASSWORD"
-        assert check_setting(config.ELASTICSEARCH.SCHEME, str), "ELASTICSEARCH.SCHEME"
-        assert check_setting(config.ELASTICSEARCH.INDEX, str), "ELASTICSEARCH.INDEX"
-
-        # DANE python lib settings
-        assert config.PATHS, "PATHS"
-        assert check_setting(config.PATHS.TEMP_FOLDER, str), "PATHS.TEMP_FOLDER"
-        assert check_setting(config.PATHS.OUT_FOLDER, str), "PATHS.OUT_FOLDER"
-
-        assert config.FILE_SYSTEM, "FILE_SYSTEM"
-        assert check_setting(
-            config.FILE_SYSTEM.BASE_MOUNT, str
-        ), "FILE_SYSTEM.BASE_MOUNT"
-        assert check_setting(config.FILE_SYSTEM.INPUT_DIR, str), "FILE_SYSTEM.INPUT_DIR"
-        assert check_setting(
-            config.FILE_SYSTEM.OUTPUT_DIR, str
-        ), "FILE_SYSTEM.OUTPUT_DIR"
-
-        # settings for input & output handling
-        assert config.INPUT, "INPUT"
-        assert check_setting(
-            config.INPUT.S3_ENDPOINT_URL, str, True
-        ), "INPUT.S3_ENDPOINT_URL"
-        assert check_setting(
-            config.INPUT.DELETE_ON_COMPLETION, bool
-        ), "INPUT.DELETE_ON_COMPLETION"
-
-        assert config.OUTPUT, "OUTPUT"
-        assert check_setting(
-            config.OUTPUT.DELETE_ON_COMPLETION, bool
-        ), "OUTPUT.DELETE_ON_COMPLETION"
-        assert check_setting(
-            config.OUTPUT.TRANSFER_ON_COMPLETION, bool
-        ), "OUTPUT.TRANSFER_ON_COMPLETION"
-        if config.OUTPUT.TRANSFER_ON_COMPLETION:
-            # required only in case output must be transferred
-            assert check_setting(
-                config.OUTPUT.S3_ENDPOINT_URL, str
-            ), "OUTPUT.S3_ENDPOINT_URL"
-            assert check_setting(config.OUTPUT.S3_BUCKET, str), "OUTPUT.S3_BUCKET"
-            assert check_setting(
-                config.OUTPUT.S3_FOLDER_IN_BUCKET, str
-            ), "OUTPUT.S3_FOLDER_IN_BUCKET"
-
-        # settings for this worker specifically
-        assert check_setting(
-            config.WHISPER_ASR_SETTINGS.WORD_TIMESTAMPS, bool
-        ), "WHISPER_ASR_SETTINGS.WORD_TIMESTAMPS"
-        assert check_setting(
-            config.WHISPER_ASR_SETTINGS.DEVICE, str
-        ), "WHISPER_ASR_SETTINGS.DEVICE"
-        assert check_setting(
-            config.WHISPER_ASR_SETTINGS.VAD, bool
-        ), "WHISPER_ASR_SETTINGS.VAD"
-        assert check_setting(
-            config.WHISPER_ASR_SETTINGS.MODEL, str
-        ), "WHISPER_ASR_SETTINGS.MODEL"
-        assert check_setting(
-            config.WHISPER_ASR_SETTINGS.BEAM_SIZE, int
-        ), "WHISPER_ASR_SETTINGS.BEAM_SIZE"
-        assert check_setting(
-            config.WHISPER_ASR_SETTINGS.BEST_OF, int
-        ), "WHISPER_ASR_SETTINGS.BEST_OF"
-        assert check_setting(
-            config.WHISPER_ASR_SETTINGS.TEMPERATURE, str
-        ), "WHISPER_ASR_SETTINGS.TEMPERATURE"
+# the file name without extension is used as asset ID
+def get_asset_info(input_file: str) -> Tuple[str, str]:
+    file_name = ntpath.basename(input_file)
+    asset_id, extension = os.path.splitext(file_name)
+    logger.info(f"working with this asset ID {asset_id}")
+    return asset_id, extension
 
-        assert __check_dane_dependencies(config.DANE_DEPENDENCIES), "DANE_DEPENDENCIES"
 
-        # validate file paths (not while unit testing)
-        if validate_file_paths:
-            __validate_parent_dirs(parent_dirs_to_check)
-            __validate_dane_paths(config.PATHS.TEMP_FOLDER, config.PATHS.OUT_FOLDER)
+# i.e. {output_base_dir}/output/{input_filename_without_extension}
+def asr_output_dir(input_path):
+    return os.path.join(data_base_dir, "output", get_asset_info(input_path)[0])
 
-    except AssertionError as e:
-        print(f"Configuration error: {str(e)}")
-        return False
 
-    return True
 
+def extension_to_mime_type(extension: str) -> str:
+    mime_dict = {
+        ".mov": "video/quicktime",
+        ".mp4": "video/mp4",
+        ".mp3": "audio/mpeg",
+        ".wav": "audio/wav",
+    }
 
-def __validate_environment_variables() -> None:
-    # self.UNIT_TESTING = os.getenv('DW_ASR_UNIT_TESTING', False)
-    try:
-        assert True  # TODO add secrets from the config.yml to the env
-    except AssertionError as e:
-        raise (e)
 
+    return mime_dict.get(extension, "application/octet-stream")
 
-def __validate_dane_paths(dane_temp_folder: str, dane_out_folder: str) -> None:
-    i_dir = Path(dane_temp_folder)
-    o_dir = Path(dane_out_folder)
 
+# used by asr.py and transcode.py
+def run_shell_command(cmd: str) -> bool:
+    logger.info(cmd)
     try:
-        assert os.path.exists(
-            i_dir.parent.absolute()
-        ), f"{i_dir.parent.absolute()} does not exist"
-        assert os.path.exists(
-            o_dir.parent.absolute()
-        ), f"{o_dir.parent.absolute()} does not exist"
-    except AssertionError as e:
-        raise (e)
-
-
-def check_setting(setting: Any, t: type, optional=False) -> bool:
-    return (type(setting) is t and optional is False) or (
-        optional and (setting is None or type(setting) is t)
-    )
-
-
-def __check_dane_dependencies(deps: Any) -> bool:
-    """The idea is that you specify a bit more strictly that your worker can only
-    work on the OUTPUT of another worker.
-    If you want to define a dependency, you should populate the deps_allowed list
-    in this function with valid keys, that other workers use to identify themselves
-    within DANE: just use the queue_name
-    (see e.g. https://github.com/beeldengeluid/dane-video-segmentation-worker/blob/main/worker.py#L34-L35)
-    Then also make sure you define a valid dependency in your worker here:
-    https://github.com/beeldengeluid/dane-video-segmentation-worker/blob/main/worker.py#L36-L38
-    (using another worker as an example)
-    """
-    deps_to_check: list = deps if type(deps) is list else []
-    deps_allowed: list = []
-    return all(dep in deps_allowed for dep in deps_to_check)
-
-
-def __validate_parent_dirs(paths: list) -> None:
-    try:
-        for p in paths:
-            assert os.path.exists(
-                Path(p).parent.absolute()
-            ), f"Parent dir of file does not exist: {p}"
-    except AssertionError as e:
-        raise (e)
+        process = subprocess.Popen(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            shell=True,  # needed to support file glob
+        )
+
+        stdout, stderr = process.communicate()
+        logger.info(stdout)
+        logger.error(stderr)
+        logger.info(f"Process is done: return code {process.returncode}")
+        return process.returncode == 0
+    except subprocess.CalledProcessError:
+        logger.exception("CalledProcessError")
+        return False
+    except Exception:
+        logger.exception("Exception")
+        return False
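For reference, a short usage sketch of the new helpers; the ffmpeg command line is an assumed example, while get_asset_info, asr_output_dir, extension_to_mime_type, and run_shell_command come from the diff above:

    # Illustrative sketch only: chaining the new base_util helpers.
    # The ffmpeg invocation is an assumption, not taken from this repository.
    from base_util import (
        asr_output_dir,
        extension_to_mime_type,
        get_asset_info,
        run_shell_command,
    )

    input_path = "/data/input/testsource__testcarrier.wav"

    asset_id, extension = get_asset_info(input_path)  # ("testsource__testcarrier", ".wav")
    mime_type = extension_to_mime_type(extension)  # "audio/wav"
    output_dir = asr_output_dir(input_path)  # {data_base_dir}/output/testsource__testcarrier

    # run_shell_command logs stdout/stderr and returns True on exit code 0;
    # shell=True in its implementation allows file globs in cmd
    ok = run_shell_command(f"ffmpeg -i {input_path} {output_dir}/audio.wav")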