Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add randomized shift #244

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions audiomentations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
from .augmentations.add_short_noises import AddShortNoises
from .augmentations.air_absorption import AirAbsorption
from .augmentations.apply_impulse_response import ApplyImpulseResponse
from .augmentations.apply_mp3_codec import ApplyMP3Codec
from .augmentations.apply_ulaw_codec import ApplyULawCodec
from .augmentations.apply_vorbis_codec import ApplyVorbisCodec
from .augmentations.band_limit_with_two_phase_resample import BandLimitWithTwoPhaseResample
from .augmentations.band_pass_filter import BandPassFilter
from .augmentations.band_stop_filter import BandStopFilter
from .augmentations.clip import Clip
from .augmentations.clipping_distortion import ClippingDistortion
from .augmentations.compressor import Compressor
from .augmentations.destroy_levels import DestroyLevels
from .augmentations.gain import Gain
from .augmentations.gain_transition import GainTransition
from .augmentations.high_pass_filter import HighPassFilter
Expand All @@ -18,20 +24,29 @@
from .augmentations.low_pass_filter import LowPassFilter
from .augmentations.low_shelf_filter import LowShelfFilter
from .augmentations.mp3_compression import Mp3Compression
from .augmentations.noise_gate import NoiseGate
from .augmentations.normalize import Normalize
from .augmentations.overdrive import Overdrive
from .augmentations.padding import Padding
from .augmentations.peaking_filter import PeakingFilter
from .augmentations.phaser import Phaser
from .augmentations.pitch_shift import PitchShift
from .augmentations.polarity_inversion import PolarityInversion
from .augmentations.resample import Resample
from .augmentations.reverse import Reverse
from .augmentations.room_simulator import RoomSimulator
from .augmentations.seven_band_parametric_eq import SevenBandParametricEQ
from .augmentations.shift import Shift
from .augmentations.simple_compressor import SimpleCompressor
from .augmentations.short_delay import ShortDelay
from .augmentations.simple_expansor import SimpleExpansor
from .augmentations.tanh_distortion import TanhDistortion
from .augmentations.time_mask import TimeMask
from .augmentations.time_stretch import TimeStretch
from .augmentations.tremolo import Tremolo
from .augmentations.trim import Trim
from .augmentations.add_phase_randomization import AddRandomizedPhaseShiftNoise
from .augmentations.two_pole_all_pass_filter import TwoPoleAllPassFilter
from .core.composition import Compose, SpecCompose, OneOf, SomeOf
from .spec_augmentations.spec_channel_shuffle import SpecChannelShuffle
from .spec_augmentations.spec_frequency_mask import SpecFrequencyMask
Expand Down
29 changes: 29 additions & 0 deletions audiomentations/augmentations/add_phase_randomization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import random

import numpy as np
from audiomentations.core.transforms_interface import BaseWaveformTransform


class AddRandomizedPhaseShiftNoise(BaseWaveformTransform):
    """
    Add noise by randomly rotating the phase of each frequency bin.

    The signal is transformed with a real FFT, every bin is multiplied by
    ``exp(1j * theta)`` with ``theta`` drawn uniformly from ``[0, phase_shift]``,
    and the result is transformed back to the time domain. ``phase_shift`` is
    itself drawn uniformly between ``min_phase_shift`` and ``max_phase_shift``
    whenever the parameters are randomized.
    """

    def __init__(self, p=0.5, min_phase_shift=0, max_phase_shift=np.pi):
        """
        :param p: The probability of applying this transform
        :param min_phase_shift: Lower bound (radians) of the range from which the
            maximum per-bin phase rotation is drawn
        :param max_phase_shift: Upper bound (radians) of the range from which the
            maximum per-bin phase rotation is drawn
        """
        super().__init__(p)
        self.min_phase_shift = min_phase_shift
        self.max_phase_shift = max_phase_shift

    def randomize_parameters(self, samples, sample_rate):
        """Draw the maximum phase rotation used by the next call to ``apply``."""
        super().randomize_parameters(samples, sample_rate)
        if self.parameters["should_apply"]:
            self.parameters["phase_shift"] = random.uniform(
                self.min_phase_shift, self.max_phase_shift
            )

    def apply(self, samples, sample_rate=None):
        """
        Return ``samples`` with the phase of every frequency bin randomly rotated.

        :param samples: Audio samples; the FFT is taken along the last axis
        :param sample_rate: Unused. Accepted so the signature matches the other
            transforms' ``apply(samples, sample_rate)``; presumably the base
            class passes both arguments - TODO confirm
        :return: Array with the same shape as ``samples``
        """
        samples = np.asarray(samples)
        num_samples = samples.shape[-1]

        spectrum = np.fft.rfft(samples, axis=-1)
        # One random rotation per frequency bin; sized from the spectrum itself
        # so it always matches, whatever the input length or rank.
        rotations = np.exp(
            1.0j
            * np.random.uniform(
                0, self.parameters["phase_shift"], spectrum.shape[-1]
            )
        )

        # Without an explicit n, irfft assumes an even length and would return
        # one sample too few for odd-length input.
        shifted = np.fft.irfft(spectrum * rotations, n=num_samples, axis=-1)

        # irfft always yields float64; hand back the caller's float precision.
        if np.issubdtype(samples.dtype, np.floating):
            shifted = shifted.astype(samples.dtype, copy=False)
        return shifted
69 changes: 69 additions & 0 deletions audiomentations/augmentations/apply_mp3_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import random

import librosa
import torch
import torchaudio
import numpy as np

from audiomentations.core.utils import find_time_shift
from audiomentations.core.transforms_interface import BaseWaveformTransform


class ApplyMP3Codec(BaseWaveformTransform):
    """
    Apply MP3 Codec.
    Mp3 encode and decode the audio signal. May cause time shift issues.
    """

    supports_multichannel = True

    def __init__(self, min_bitrate=8, max_bitrate=320, p=0.5):
        """
        :param min_bitrate: int, minimum bitrate (in `kbps`)
        :param max_bitrate: int, maximum bitrate (in `kbps`)
        :param p: The probability of applying this transform
        :raises ValueError: if ``min_bitrate`` is greater than ``max_bitrate``
        """
        super().__init__(p)
        # Explicit raise instead of assert: asserts vanish under ``python -O``.
        if min_bitrate > max_bitrate:
            raise ValueError("min_bitrate must not be greater than max_bitrate")
        self.min_bitrate = min_bitrate
        self.max_bitrate = max_bitrate

    def randomize_parameters(self, samples, sample_rate):
        """Pick a random integer bitrate in ``[min_bitrate, max_bitrate]``."""
        super().randomize_parameters(samples, sample_rate)
        if self.parameters["should_apply"]:
            self.parameters['bitrate'] = random.randint(
                self.min_bitrate, self.max_bitrate
            )

    def apply(self, samples, sample_rate):
        """
        Encode ``samples`` as MP3, decode them again and realign the result.

        :param samples: numpy array of audio samples, 1-D or 2-D
        :param sample_rate: Sample rate handed to the codec
        :return: float32 numpy array with the same shape as ``samples``
        """
        samples_torch = torch.tensor(samples.astype(np.float32))

        is_mono = len(samples.shape) == 1
        if is_mono:
            # The codec call below is fed a 2-D tensor, so add a leading axis.
            samples_torch = samples_torch.unsqueeze(0)

        compressed_samples = torchaudio.functional.apply_codec(
            samples_torch,
            sample_rate,
            format='mp3',
            compression=self.parameters['bitrate']
        )

        # The decoded audio may have more samples than the original due to mp3
        # codec characteristics. To align the decoded audio with the original,
        # first estimate the time shift between the two signals.
        shift = find_time_shift(compressed_samples[0].numpy(), samples_torch[0].numpy())

        # A negative estimate would make the slice below keep only the tail of
        # the signal, so treat "no delay" as the minimum.
        shift = max(shift, 0)

        num_samples = samples_torch.shape[-1]
        compressed_samples = compressed_samples[:, shift:shift + num_samples]

        # If the decoder returned too few samples, zero-pad the end rather than
        # aborting the whole augmentation pipeline.
        missing = num_samples - compressed_samples.shape[-1]
        if missing > 0:
            compressed_samples = torch.nn.functional.pad(
                compressed_samples, (0, missing)
            )

        if is_mono:
            compressed_samples = compressed_samples[0]

        compressed_samples = compressed_samples.numpy()

        # Internal sanity check only; the steps above guarantee the shape.
        assert compressed_samples.shape == samples.shape
        return compressed_samples
47 changes: 47 additions & 0 deletions audiomentations/augmentations/apply_ulaw_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import random

import librosa
import numpy as np
import torch
import torchaudio

from audiomentations.core.transforms_interface import BaseWaveformTransform


class ApplyULawCodec(BaseWaveformTransform):
    """
    Round-trip the audio through the MU-Law (U-Law) codec.

    The signal is encoded as 8-bit ULAW WAV and decoded again via
    ``torchaudio.functional.apply_codec``.
    """

    supports_multichannel = True

    def __init__(self, p=0.5):
        """
        :param p: The probability of applying this transform
        """
        super().__init__(p)

    def apply(self, samples, sample_rate):
        """
        ULAW encode and decode ``samples``.

        :param samples: numpy array of audio samples, 1-D or 2-D
        :param sample_rate: Sample rate handed to the codec
        :return: Decoded numpy array with the same shape as ``samples``
        """
        is_mono = len(samples.shape) == 1
        waveform = torch.tensor(samples.astype(np.float32))
        if is_mono:
            # The codec call is fed a 2-D tensor, so give mono input a leading axis.
            waveform = waveform.unsqueeze(0)

        codec_options = {"format": 'wav', "encoding": 'ULAW', "bits_per_sample": 8}
        decoded = torchaudio.functional.apply_codec(
            waveform, sample_rate, **codec_options
        )

        if is_mono:
            # Drop the axis that was added for the codec call.
            decoded = decoded[0]
        decoded = decoded.numpy()

        assert decoded.shape == samples.shape
        return decoded
59 changes: 59 additions & 0 deletions audiomentations/augmentations/apply_vorbis_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import random

import librosa
import numpy as np
import torch
import torchaudio

from audiomentations.core.transforms_interface import BaseWaveformTransform


class ApplyVorbisCodec(BaseWaveformTransform):
    """
    Apply OGG/Vorbis Codec.
    OGG/Vorbis encode and decode the audio signal.
    """

    supports_multichannel = True

    def __init__(self, min_compression=-1, max_compression=10, p=0.5):
        """
        :param min_compression: int, minimum compression. This corresponds to ``-C`` option of ``sox`` command.
        :param max_compression: int, maximum compression. This corresponds to ``-C`` option of ``sox`` command.
        :param p: The probability of applying this transform
        :raises ValueError: if ``min_compression`` is greater than ``max_compression``
        """
        super().__init__(p)
        # Explicit raise instead of assert: asserts vanish under ``python -O``.
        # Equal bounds are allowed; random.randint handles a one-value range.
        if min_compression > max_compression:
            raise ValueError(
                "min_compression must not be greater than max_compression"
            )
        self.min_compression = min_compression
        self.max_compression = max_compression

    def randomize_parameters(self, samples, sample_rate):
        """Pick a random integer compression level within the configured range."""
        super().randomize_parameters(samples, sample_rate)
        if self.parameters["should_apply"]:
            self.parameters['compression'] = random.randint(
                self.min_compression, self.max_compression
            )

    def apply(self, samples, sample_rate):
        """
        OGG/Vorbis encode and decode ``samples``.

        :param samples: numpy array of audio samples, 1-D or 2-D
        :param sample_rate: Sample rate handed to the codec
        :return: Decoded numpy array with the same shape as ``samples``
        """
        samples_torch = torch.tensor(samples.astype(np.float32))

        is_mono = len(samples.shape) == 1
        if is_mono:
            # The codec call below is fed a 2-D tensor, so add a leading axis.
            samples_torch = samples_torch.unsqueeze(0)

        compressed_samples = torchaudio.functional.apply_codec(
            samples_torch,
            sample_rate,
            format='ogg',
            compression=self.parameters['compression']
        )

        if is_mono:
            compressed_samples = compressed_samples[0]

        compressed_samples = compressed_samples.numpy()

        # Sanity check that the codec round trip preserved the shape.
        assert compressed_samples.shape == samples.shape
        return compressed_samples
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import random

import librosa

from audiomentations.core.transforms_interface import BaseWaveformTransform


class BandLimitWithTwoPhaseResample(BaseWaveformTransform):
    """
    Band limit with two phase resample.
    Phase 1: Downsample to a random sample rate between min_sample_rate and max_sample_rate
    Phase 2: Upsample back to the original sample rate

    If the random sampling rate between min_sample_rate and max_sample_rate is greater than the original sample rate,
    the audio will be upsampled first and then downsampled.
    """

    RESAMPLE_TYPES = ["soxr_vhq", "soxr_hq", "soxr_mq", "soxr_lq", "soxr_qq",
                      "kaiser_best", "kaiser_fast", "fft", "polyphase", "linear", "zero_order_hold",
                      "sinc_best", "sinc_medium", "sinc_fastest"]

    supports_multichannel = True

    def __init__(self,
                 min_sample_rate=8000,
                 max_sample_rate=44100,
                 res_types=RESAMPLE_TYPES,
                 p=0.5):
        """
        :param min_sample_rate: int, Minimum sample rate
        :param max_sample_rate: int, Maximum sample rate
        :param res_types: [None, "all", a single resample type or a list of resample types],
            Resample types to use. Should be from librosa resample res_types.
            A falsy value (e.g. None) means librosa's own default is used.
        :param p: The probability of applying this transform
        :raises ValueError: if ``min_sample_rate`` is greater than ``max_sample_rate``
            or an unknown resample type is given
        """
        super().__init__(p)
        # Explicit raises instead of asserts: asserts vanish under ``python -O``.
        if min_sample_rate > max_sample_rate:
            raise ValueError(
                "min_sample_rate must not be greater than max_sample_rate"
            )
        self.min_sample_rate = min_sample_rate
        self.max_sample_rate = max_sample_rate

        if isinstance(res_types, str) and res_types:
            # "all" selects every known type; any other string is a single
            # type name and must not be iterated character by character.
            if res_types == 'all':
                res_types = list(self.RESAMPLE_TYPES)
            else:
                res_types = [res_types]
        elif res_types:
            # Copy so the instance never aliases the shared class-level default.
            res_types = list(res_types)

        unknown = [t for t in (res_types or []) if t not in self.RESAMPLE_TYPES]
        if unknown:
            raise ValueError(
                "Unknown resample type(s): {}".format(unknown)
            )
        self.res_types = res_types

    def randomize_parameters(self, samples, sample_rate):
        """Pick the intermediate sample rate and a resample type for each phase."""
        super().randomize_parameters(samples, sample_rate)
        if self.parameters["should_apply"]:
            self.parameters["target_sample_rate"] = random.randint(
                self.min_sample_rate, self.max_sample_rate
            )

            if self.res_types:
                self.parameters["res_type_down"] = random.choice(self.res_types)
                self.parameters["res_type_up"] = random.choice(self.res_types)
            else:
                # None means "let librosa pick its default" (see _resample).
                self.parameters["res_type_down"] = None
                self.parameters["res_type_up"] = None

    @staticmethod
    def _resample(samples, orig_sr, target_sr, res_type):
        """Resample with librosa, omitting ``res_type`` when it is None."""
        # librosa has no None mode for res_type; leaving the keyword out lets
        # it fall back to its own default instead of failing.
        extra = {} if res_type is None else {"res_type": res_type}
        return librosa.core.resample(
            samples, orig_sr=orig_sr, target_sr=target_sr, **extra
        )

    def apply(self, samples, sample_rate):
        """
        Resample to the chosen intermediate rate and back to ``sample_rate``.

        :param samples: numpy array of audio samples
        :param sample_rate: Original sample rate of ``samples``
        :return: Array with the same shape as ``samples``
        """
        intermediate_rate = self.parameters["target_sample_rate"]

        downsampled_samples = self._resample(
            samples, sample_rate, intermediate_rate,
            self.parameters["res_type_down"],
        )
        restored_samples = self._resample(
            downsampled_samples, intermediate_rate, sample_rate,
            self.parameters["res_type_up"],
        )

        # The round trip can change the length; pad or trim the last axis back.
        if samples.shape != restored_samples.shape:
            restored_samples = librosa.util.fix_length(restored_samples, size=samples.shape[-1])
        assert samples.shape == restored_samples.shape
        return restored_samples
Loading