| Added support for .mp3 and .flac loading...

commit b1e18443ba
parent 660b41aef8
2025-05-04 23:56:14 +03:00
3 changed files with 83 additions and 81 deletions
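An aside on the commit message (not part of the diff): torchaudio.load only decodes .mp3 and .flac when a suitable audio backend such as ffmpeg, sox, or soundfile is installed in the environment. A quick way to check which backends are available:

    # Environment check (illustrative, not project code): torchaudio decodes
    # .mp3/.flac through whichever backend is installed.
    import torchaudio
    print(torchaudio.list_audio_backends())  # e.g. ['ffmpeg', 'soundfile']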

data.py · 55 changed lines

@@ -5,41 +5,42 @@ import torchaudio
 import os
 import random
 import torchaudio.transforms as T
+import tqdm

 import AudioUtils

 class AudioDataset(Dataset):
     audio_sample_rates = [11025]
-    MAX_LENGTH = 44100 # Define your desired maximum length here

     def __init__(self, input_dir, device):
-        self.input_files = [os.path.join(root, f) for root, _, files in os.walk(input_dir) for f in files if f.endswith('.wav')]
         self.device = device
+        input_files = [os.path.join(root, f) for root, _, files in os.walk(input_dir) for f in files if f.endswith('.wav') or f.endswith('.mp3') or f.endswith('.flac')]
+
+        data = []
+        for audio_clip in tqdm.tqdm(input_files, desc=f"Processing {len(input_files)} audio file(s)"):
+            audio, original_sample_rate = torchaudio.load(audio_clip, normalize=True)
+            audio = AudioUtils.stereo_tensor_to_mono(audio)
+
+            # Generate low-quality audio with random downsampling
+            mangled_sample_rate = random.choice(self.audio_sample_rates)
+            resample_transform_low = torchaudio.transforms.Resample(original_sample_rate, mangled_sample_rate)
+            resample_transform_high = torchaudio.transforms.Resample(mangled_sample_rate, original_sample_rate)
+            low_audio = resample_transform_low(audio)
+            low_audio = resample_transform_high(low_audio)
+
+            splitted_high_quality_audio = AudioUtils.split_audio(audio, 128)
+            splitted_high_quality_audio[-1] = AudioUtils.pad_tensor(splitted_high_quality_audio[-1], 128)
+
+            splitted_low_quality_audio = AudioUtils.split_audio(low_audio, 128)
+            splitted_low_quality_audio[-1] = AudioUtils.pad_tensor(splitted_low_quality_audio[-1], 128)
+
+            for high_quality_sample, low_quality_sample in zip(splitted_high_quality_audio, splitted_low_quality_audio):
+                data.append(((high_quality_sample, low_quality_sample), (original_sample_rate, mangled_sample_rate)))
+
+        self.audio_data = data

     def __len__(self):
-        return len(self.input_files)
+        return len(self.audio_data)

     def __getitem__(self, idx):
-        # Load high-quality audio
-        high_quality_audio, original_sample_rate = torchaudio.load(self.input_files[idx], normalize=True)
-
-        # Change to mono
-        high_quality_audio = AudioUtils.stereo_tensor_to_mono(high_quality_audio)
-
-        # Generate low-quality audio with random downsampling
-        mangled_sample_rate = random.choice(self.audio_sample_rates)
-        resample_transform_low = torchaudio.transforms.Resample(original_sample_rate, mangled_sample_rate)
-        resample_transform_high = torchaudio.transforms.Resample(mangled_sample_rate, original_sample_rate)
-        low_quality_audio = resample_transform_low(high_quality_audio)
-        low_quality_audio = resample_transform_high(low_quality_audio)
-
-        splitted_high_quality_audio = AudioUtils.split_audio(high_quality_audio, 128)
-        splitted_high_quality_audio[-1] = AudioUtils.pad_tensor(splitted_high_quality_audio[-1], 128)
-        splitted_high_quality_audio = [tensor.to(self.device) for tensor in splitted_high_quality_audio]
-
-        splitted_low_quality_audio = AudioUtils.split_audio(low_quality_audio, 128)
-        splitted_low_quality_audio[-1] = AudioUtils.pad_tensor(splitted_low_quality_audio[-1], 128)
-        splitted_low_quality_audio = [tensor.to(self.device) for tensor in splitted_low_quality_audio]
-
-        return (splitted_high_quality_audio, original_sample_rate), (splitted_low_quality_audio, mangled_sample_rate)
+        return self.audio_data[idx]
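data.py calls into an AudioUtils module whose implementation is not part of this commit. The sketch below is a hypothetical reading of those helpers, inferred only from how they are called above; the names come from the diff, the bodies are assumptions:

    # Hypothetical AudioUtils sketch, inferred from the call sites in data.py;
    # the real implementations are not shown in this commit.
    import torch

    def stereo_tensor_to_mono(audio: torch.Tensor) -> torch.Tensor:
        # Average the channels, keeping shape (1, num_samples).
        return audio.mean(dim=0, keepdim=True)

    def split_audio(audio: torch.Tensor, chunk_size: int) -> list[torch.Tensor]:
        # Split along the sample dimension; the last chunk may be shorter.
        return list(torch.split(audio, chunk_size, dim=-1))

    def pad_tensor(chunk: torch.Tensor, target_len: int) -> torch.Tensor:
        # Zero-pad the (last) chunk up to target_len samples.
        pad = target_len - chunk.size(-1)
        return torch.nn.functional.pad(chunk, (0, pad)) if pad > 0 else chunk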

@@ -2,20 +2,22 @@ import json
 filepath = "my_data.json"

-def write_data(filepath, data):
+def write_data(filepath, data, debug=False):
     try:
         with open(filepath, 'w') as f:
             json.dump(data, f, indent=4) # Use indent for pretty formatting
-        print(f"Data written to '{filepath}'")
+        if debug:
+            print(f"Data written to '{filepath}'")
     except Exception as e:
         print(f"Error writing to file: {e}")

-def read_data(filepath):
+def read_data(filepath, debug=False):
     try:
         with open(filepath, 'r') as f:
             data = json.load(f)
-        print(f"Data read from '{filepath}'")
+        if debug:
+            print(f"Data read from '{filepath}'")
         return data
     except FileNotFoundError:
         print(f"File not found: {filepath}")

@@ -76,7 +76,7 @@ os.makedirs(audio_output_dir, exist_ok=True)
 # ========= SINGLE =========
-train_data_loader = DataLoader(dataset, batch_size=1, shuffle=True)
+train_data_loader = DataLoader(dataset, batch_size=256, shuffle=True)

 # ========= MODELS =========
@@ -122,70 +122,69 @@ def start_training():
         times_correct = 0

         # ========= TRAINING =========
-        for high_quality_data, low_quality_data in tqdm.tqdm(train_data_loader, desc=f"Training epoch {generator_epoch+1}/{generator_epochs}, Current epoch {epoch+1}"):
+        for training_data in tqdm.tqdm(train_data_loader, desc=f"Training epoch {generator_epoch+1}/{generator_epochs}, Current epoch {epoch+1}"):
             ## Data structure:
-            # [[float..., float..., float...], sample_rate]
+            # [[[float..., float..., float...], [float..., float..., float...]], [original_sample_rate, mangled_sample_rate]]

             # ========= LABELS =========
-            batch_size = high_quality_data[0][0].size(0)
+            good_quality_data = training_data[0][0].to(device)
+            bad_quality_data = training_data[0][1].to(device)
+            original_sample_rate = training_data[1][0]
+            mangled_sample_rate = training_data[1][1]
+            batch_size = good_quality_data.size(0)
             real_labels = torch.ones(batch_size, 1).to(device)
             fake_labels = torch.zeros(batch_size, 1).to(device)

-            high_quality_audio = high_quality_data
-            low_quality_audio = low_quality_data
-
-            ai_enhanced_outputs = []
-
-            for high_quality_sample, low_quality_sample in tqdm.tqdm(zip(high_quality_data[0], low_quality_data[0]), desc=f"Processing audio clip.. Length: {len(high_quality_data[0])}"):
-                # ========= DISCRIMINATOR =========
-                discriminator.train()
-                d_loss = discriminator_train(
-                    high_quality_sample,
-                    low_quality_sample,
-                    real_labels,
-                    fake_labels,
-                    discriminator,
-                    generator,
-                    criterion_d,
-                    optimizer_d
-                )
-
-                # ========= GENERATOR =========
-                generator.train()
-                generator_output, combined_loss, adversarial_loss, mel_l1_tensor, log_stft_l1_tensor, mfcc_l_tensor = generator_train(
-                    low_quality_sample,
-                    high_quality_sample,
-                    real_labels,
-                    generator,
-                    discriminator,
-                    criterion_d,
-                    optimizer_g,
-                    device,
-                    mel_transform,
-                    stft_transform,
-                    mfcc_transform
-                )
-
-                ai_enhanced_outputs.append(generator_output)
-
-                if debug:
-                    print(f"D_LOSS: {d_loss.item():.4f}, COMBINED_LOSS: {combined_loss.item():.4f}, ADVERSARIAL_LOSS: {adversarial_loss.item():.4f}, MEL_L1_LOSS: {mel_l1_tensor.item():.4f}, LOG_STFT_L1_LOSS: {log_stft_l1_tensor.item():.4f}, MFCC_LOSS: {mfcc_l_tensor.item():.4f}")
-                scheduler_d.step(d_loss.detach())
-                scheduler_g.step(adversarial_loss.detach())
+            high_quality_audio = (good_quality_data, original_sample_rate)
+            low_quality_audio = (bad_quality_data, mangled_sample_rate)
+
+            # ========= DISCRIMINATOR =========
+            discriminator.train()
+            d_loss = discriminator_train(
+                good_quality_data,
+                bad_quality_data,
+                real_labels,
+                fake_labels,
+                discriminator,
+                generator,
+                criterion_d,
+                optimizer_d
+            )
+
+            # ========= GENERATOR =========
+            generator.train()
+            generator_output, combined_loss, adversarial_loss, mel_l1_tensor, log_stft_l1_tensor, mfcc_l_tensor = generator_train(
+                bad_quality_data,
+                good_quality_data,
+                real_labels,
+                generator,
+                discriminator,
+                criterion_d,
+                optimizer_g,
+                device,
+                mel_transform,
+                stft_transform,
+                mfcc_transform
+            )
+
+            if debug:
+                print(f"D_LOSS: {d_loss.item():.4f}, COMBINED_LOSS: {combined_loss.item():.4f}, ADVERSARIAL_LOSS: {adversarial_loss.item():.4f}, MEL_L1_LOSS: {mel_l1_tensor.item():.4f}, LOG_STFT_L1_LOSS: {log_stft_l1_tensor.item():.4f}, MFCC_LOSS: {mfcc_l_tensor.item():.4f}")
+            scheduler_d.step(d_loss.detach())
+            scheduler_g.step(adversarial_loss.detach())

         # ========= SAVE LATEST AUDIO =========
-        high_quality_audio = (torch.cat(high_quality_data[0]), high_quality_data[1])
-        low_quality_audio = (torch.cat(low_quality_data[0]), low_quality_data[1])
-        ai_enhanced_audio = (torch.cat(ai_enhanced_outputs), high_quality_data[1])
+        high_quality_audio = (good_quality_data, original_sample_rate)
+        low_quality_audio = (bad_quality_data, original_sample_rate)
+        ai_enhanced_audio = (generator_output, original_sample_rate)

         new_epoch = generator_epoch+epoch
-        if generator_epoch % 25 == 0:
-            print(f"Saved epoch {new_epoch}!")
-            torchaudio.save(f"{audio_output_dir}/epoch-{new_epoch}-audio-crap.wav", low_quality_audio[0].cpu().detach(), high_quality_audio[1]) # <-- Because audio clip was resampled in data.py from original to crap and to original again.
-            torchaudio.save(f"{audio_output_dir}/epoch-{new_epoch}-audio-ai.wav", ai_enhanced_audio[0].cpu().detach(), ai_enhanced_audio[1])
-            torchaudio.save(f"{audio_output_dir}/epoch-{new_epoch}-audio-orig.wav", high_quality_audio[0].cpu().detach(), high_quality_audio[1])
+        # if generator_epoch % 25 == 0:
+        #     print(f"Saved epoch {new_epoch}!")
+        #     torchaudio.save(f"{audio_output_dir}/epoch-{new_epoch}-audio-orig.wav", high_quality_audio[0][-1].cpu().detach(), high_quality_audio[1][-1])
+        #     torchaudio.save(f"{audio_output_dir}/epoch-{new_epoch}-audio-crap.wav", low_quality_audio[0][-1].cpu().detach(), high_quality_audio[1][-1]) # <-- Because audio clip was resampled in data.py from original to crap and to original again.
+        #     torchaudio.save(f"{audio_output_dir}/epoch-{new_epoch}-audio-ai.wav", ai_enhanced_audio[0][-1].cpu().detach(), high_quality_audio[1][-1])

         #if debug:
         #    print(generator.state_dict().keys())
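The reworked loop indexes training_data[0][0], training_data[0][1], training_data[1][0], and training_data[1][1]; this follows from how the default DataLoader collate stacks the ((hq, lq), (orig_sr, mangled_sr)) items built in data.py. A minimal, self-contained sketch of that collation; FakeAudioDataset and its shapes are illustrative assumptions, not project code:

    # Sketch: how the default collate shapes one batch, assuming each chunk
    # from AudioUtils.split_audio is a mono tensor of shape (1, 128).
    import torch
    from torch.utils.data import DataLoader, Dataset

    class FakeAudioDataset(Dataset):  # stand-in for AudioDataset
        def __len__(self):
            return 1024

        def __getitem__(self, idx):
            hq = torch.randn(1, 128)  # high-quality chunk
            lq = torch.randn(1, 128)  # degraded chunk
            return (hq, lq), (44100, 11025)

    loader = DataLoader(FakeAudioDataset(), batch_size=256, shuffle=True)
    training_data = next(iter(loader))
    print(training_data[0][0].shape)  # torch.Size([256, 1, 128]) -> good_quality_data
    print(training_data[0][1].shape)  # torch.Size([256, 1, 128]) -> bad_quality_data
    print(training_data[1][0].shape)  # torch.Size([256]) -> per-item original_sample_rate

Note that collation turns the per-item sample rates into batched tensors, so original_sample_rate inside the loop is a tensor of shape (batch,), not a plain int.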