李宏毅深度学习2021春HW4实验四,Strong Baseline,使用Conformer_李宏毅老师深度学习语音预测hw如何change transformer to conformer-程序员宅基地

技术标签: python  深度学习  pytorch  神经网络  

如果您有更好的实现方式请留言告诉作者~~~

任务简介

对给定的一段语音数据预测说话者,就是一个分类问题,总共600个说话者。数据已经预处理好了,我们只需要读入即可。strong baseline的score在0.954


原始模型优化(Transformer)

在demo中,transformer使用head=2,双头,TransformerEncoderLayer中feedforward=256,只使用了一个Transformer层。原始score大概是0.81


优化思路

因为任务并没有这么复杂,pred_layer中并不需要进行进行两次线性变化,我们修改pred_layer只进行一次变换nn.Linear(d_model, n_spks)就可以有较大提升。

对于head,我们修改head=1效果比head=2

我们再将Transformer变成两层self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)

经过这些优化,我们在kaggle上的的score能够达到0.90左右
注意:并不是说只能做这些尝试,这是写者经常训练后得到的有提升的优化,许多优化并没有起到作用反而会降低score,这需要读者们自己进行测试,

你可能会有疑问为什么做这些优化,写者由于水平有限并不能完全解答,如果您有任何建议可以留言


Conformer简介

论文地址:Conformer: Convolution-augmented Transformer for Speech Recognition
强烈建议阅读下此篇论文,不要仅仅只会调库调参

Transformer模型擅长捕获基于内容的全局交互,而CNN则有效地利用了局部特征。通过研究如何结合卷积神经网络和Transformer到一个模型,以参数有效的方式对音频序列的局部和全局相依性进行建模,从而实现了两个方面的最佳。

名为卷积增强的transformer模型即conformer。总结: Transformer在提取长序列依赖的时候更有效,而卷积则是擅长提取局部特征。Conformer就是将将两者结合起来。

Conformer结构
Conformer结构
如图,文中称此为马卡龙结构,可以说是很形象了,使用两个half feed forward module在外面,中间夹着Attention层和Convolution Module(卷积层),具体每一层的结构大家可以去拜读下论文。看下来,就是多了个卷积层,正是多的这个卷积层增强了我们学习局部特征的能力。

Conformer模型,本文使用的模型地址为:Conformer,这个库提供了Conformer模块,我们直接引入修改参数就可以,当然,如果你感兴趣也可以自己实现一个Conformer模型。


Conformer使用:

对于本任务来说,我们不需要修改太多代码,只需要使用Conformer代替原来的Transformer,我们引入一个ConformerBlock

 self.conformer_block = ConformerBlock(
	 dim=d_model,
	 dim_head=256,
	 heads=1,  # set 1
	 ff_mult=4,
	 conv_expansion_factor=18,
	 conv_kernel_size=41,
	 attn_dropout=dropout,
	 ff_dropout=dropout,
	 conv_dropout=dropout
  )

然后在forward中使用self.conformer_block代替原来的encoder

Conformer参数优化

同样的,对于Transformer模块,我们使用基本和原来的基本一致的结构,heads=1m ff_mult=4,对于其中参数的作用需要你自己去阅读其源码,对于kenel_size,expansion_factor使用31和8效果较好,使用dropout=0.1,在调试过程中我尝试使用两层conformer发现模型效果变差。

在调参过程中,多次尝试甚至都不如原来transformer的优化,后来我观看输出,发现模型有些简单了,我们在train上的accuracy不是特别高,因此我提高了d_model,在提高到220时效果较好,再向上提升不大,能够达到strong baseline
在这里插入图片描述

完整代码

代码中有些注释供参考

import os
import json
import torch
import random
from pathlib import Path
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
import torch.nn.functional as F
import math
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR
from tqdm import tqdm
from torch.optim import AdamW
import csv
from conformer import ConformerBlock

#  600分类
class MyDataset(Dataset):
    def __init__(self, data_dir, segment_len=128):
        self.data_dir = data_dir
        self.segment_len = segment_len

        mapping_path = Path(data_dir) / "mapping.json"
        mapping = json.load(mapping_path.open())
        self.speaker2id = mapping['speaker2id']  # map 映射speakid到一个id(number)

        # load metadata of training data
        metadata_path = Path(data_dir) / 'metadata.json'
        metadata = json.load(open(metadata_path))['speakers']  # 映射某个speak
        # id的所有话数据(feature_path, mel_len)

        # get total number of speaker
        self.speaker_num = len(metadata.keys())
        # print('s num', self.speaker_num)
        self.data = []
        for speaker in metadata.keys():
            for utterances in metadata[speaker]:
                self.data.append([utterances['feature_path'], self.speaker2id[speaker]])  # [feature_path, id]

    def __getitem__(self, index):
        feat_path, speaker = self.data[index]
        # lead preprocessed mel-spectrogram
        mel = torch.load(os.path.join(self.data_dir, feat_path))

        # Sefment mel-spectrogram in to 'segment_len' frames.
        if len(mel) > self.segment_len:
            # randomly get the starting point of the segment
            start = random.randint(0, len(mel) - self.segment_len)
            mel = torch.FloatTensor(mel[start:start + self.segment_len])
        else:
            mel = torch.FloatTensor(mel)
        # turn the speaker id into long for computing loss later
        speaker = torch.FloatTensor([speaker]).long()
        return mel, speaker

    def __len__(self):
        return len(self.data)

    def get_speaker_number(self):
        return self.speaker_num


# metadata = json.load(open(Path('./data/Dataset') / 'metadata.json'))['speakers']
# print(metadata.keys())

def collate_batch(batch):
    # process features within a batch
    """
    collate a batch of data
    :param batch:
    :return:
    """
    mel, speaker = zip(*batch)
    # because we train the model batch by batch, we need to pad the features in the same batch to make their lengths the same
    mel = pad_sequence(mel, batch_first=True, padding_value=20)  # 使batch中每个句子长度相同,取最大的,用padding_value覆盖
    # mel: (batch size, length, 40)
    return mel, torch.FloatTensor(speaker).long()


def get_dataloader(data_dir, batch_size, n_workers):
    """Generate dataloader"""
    dataset = MyDataset(data_dir)
    speaker_num = dataset.get_speaker_number()
    # split dataset into training dataset and validation dataset
    trainlen = int(0.9 * len(dataset))
    lengths = [trainlen, len(dataset) - trainlen]
    trainset, validset = random_split(dataset, lengths)

    train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, drop_last=True, num_workers=n_workers,
                              pin_memory=True, collate_fn=collate_batch)
    valid_loader = DataLoader(validset, batch_size=batch_size, num_workers=n_workers, drop_last=True, pin_memory=True
                              , collate_fn=collate_batch)

    return train_loader, valid_loader, speaker_num


# Model: Transformer
class Classifier(nn.Module):
    def __init__(self, d_model=220, n_spks=600, dropout=0.1):
        super().__init__()
        # Project the dimension of features from that of input into d_model.
        self.prenet = nn.Linear(40, d_model)
        # TODO:
        #   Change Transformer to Conformer.
        #   https://arxiv.org/abs/2005.08100
        self.conformer_block = ConformerBlock(
            dim=d_model,
            dim_head=256,
            heads=1,  # set 1
            ff_mult=4,
            conv_expansion_factor=18,
            conv_kernel_size=41,
            attn_dropout=dropout,
            ff_dropout=dropout,
            conv_dropout=dropout
        )

        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, dim_feedforward=256, nhead=1
        )

        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)
        # Project the the dimension of features from d_model into speaker nums.
        self.pred_layer = nn.Sequential(
            # nn.Linear(d_model, d_model),
            # nn.ReLU(),
            nn.Linear(d_model, n_spks)
        )

    def forward(self, mels):
        """
        args:
          mels: (batch size, length, 40)
        return:
          out: (batch size, n_spks)
        """
        # out: (batch size, length, d_model)
        out = self.prenet(mels)
        # out: (length, batch size, d_model)
        out = out.permute(1, 0, 2)
        # The encoder layer expect features in the shape of (length, batch size, d_model).
        # out = self.encoder_layer(out)
        # out = self.encoder(out)
        # out: (batch size, length, d_model)
        # out = self.conformer_block(out)
        out = self.conformer_block(out)
        out = out.transpose(0, 1)
        # mean pooling
        stats = out.mean(dim=1)

        # out: (batch, n_spks)
        out = self.pred_layer(stats)
        return out


# Learning rate schedule
def get_cosine_shedule_with_warmup(
        optimizer: Optimizer,
        num_warmup_steps: int,
        num_training_steps: int,
        num_cycles: float = 0.5,
        last_epoch: int = -1
):
    def lr_lambda(current_step):
        # warmup
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        # decadence
        progress = float(current_step - num_warmup_steps) / float(max(1, num_training_steps - num_warmup_steps))
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress)))

    return LambdaLR(optimizer, lr_lambda, last_epoch)


# Model fuction
def model_fn(batch, model, criterion, device):
    """Forward a batch through the model"""
    mels, labels = batch
    mels = mels.to(device)
    labels = labels.to(device)

    outs = model(mels)

    loss = criterion(outs, labels)

    # get the speaker id with hightest probability
    preds = outs.argmax(1)
    # compute accuracy
    accuracy = torch.mean((preds == labels).float())
    return loss, accuracy


def valid(dataloader, model, criterion, device):
    """validate on validation set."""
    model.eval()
    running_loss = 0.0
    running_accuracy = 0.0
    pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc='Valid', unit=' uttr')

    for i, batch in enumerate(dataloader):
        with torch.no_grad():
            loss, accuracy = model_fn(batch, model, criterion, device)
            running_loss += loss.item()
            running_accuracy += accuracy.item()
        pbar.update(dataloader.batch_size)
        pbar.set_postfix(
            loss=f'{
      running_loss / (i + 1):.2f}',
            accuracy=f'{
      running_accuracy / (i + 1):.2f}'
        )
    pbar.close()
    model.train()
    return running_accuracy / len(dataloader)


def parse_args():
    """arguments"""
    config = {
    
        'data_dir': './Dataset',
        'save_path': 'model.ckpt',
        'batch_size': 32,
        'n_workers': 8,
        'valid_steps': 2000,
        'warmup_steps': 1000,
        'save_steps': 10000,
        'total_steps': 70000
    }
    return config


def main(data_dir, save_path, batch_size, n_workers, valid_steps, warmup_steps, total_steps, save_steps):
    """Main function"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'[Info]: Use {
      device} now!')

    train_loader, valid_loader, speaker_num = get_dataloader(data_dir, batch_size, n_workers)
    train_iterator = iter(train_loader)
    print(f"[Info]: Finish loading data!", flush=True)

    model = Classifier(n_spks=speaker_num).to(device)
    criterion = nn.CrossEntropyLoss()

    optimizer = AdamW(model.parameters(), lr=1e-3)
    scheduler = get_cosine_shedule_with_warmup(optimizer, warmup_steps, total_steps)
    print(f"[Info]: Finish creating model!", flush=True)

    best_accuracy = -1.0
    best_state_dict = None

    pbar = tqdm(total=valid_steps, ncols=0, desc='Train', unit=' step')

    for step in range(total_steps):
        # get data
        try:
            batch = next(train_iterator)
        except StopIteration:
            train_iterator = iter(train_loader)
            batch = next(train_iterator)

        loss, accuracy = model_fn(batch, model, criterion, device)
        batch_loss = loss.item()
        batch_accuracy = accuracy.item()

        # update the model
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()
        # Log
        pbar.update()
        pbar.set_postfix(
            loss=f"{
      batch_loss:.2f}",
            accuracy=f"{
      batch_accuracy:.2f}",
            step=step + 1,
        )
        # Do validation
        if (step + 1) % valid_steps == 0:
            pbar.close()

            valid_accuracy = valid(valid_loader, model, criterion, device)

            # keep the best model
            if valid_accuracy > best_accuracy:
                best_accuracy = valid_accuracy
                best_state_dict = model.state_dict()

            pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

        # Save the best model so far.
        if (step + 1) % save_steps == 0 and best_state_dict is not None:
            torch.save(best_state_dict, save_path)
            pbar.write(f"Step {
      step + 1}, best model saved. (accuracy={
      best_accuracy:.4f})")

    pbar.close()


class InferenceDataset(Dataset):
    def __init__(self, data_dir):
        testdata_path = Path(data_dir) / "testdata.json"
        metadata = json.load(testdata_path.open())
        self.data_dir = data_dir
        self.data = metadata['utterances']

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        utterance = self.data[index]
        feat_path = utterance['feature_path']
        mel = torch.load(os.path.join(self.data_dir, feat_path))
        return feat_path, mel


def inference_collate_batch(batch):
    """Collate a batch of data."""
    feat_paths, mels = zip(*batch)

    return feat_paths, torch.stack(mels)


def parse_args2():
    """test arguments"""
    config = {
    
        'data_dir': './Dataset',
        'model_path': './model.ckpt',
        'output_path': './output.csv'
    }
    return config

def main2(data_dir, model_path, output_path):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"[Info]: Use {
      device} now!")

    mapping_path = Path(data_dir) / "mapping.json"
    mapping = json.load(mapping_path.open())

    dataset = InferenceDataset(data_dir)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, drop_last=False, num_workers=8, collate_fn=inference_collate_batch)
    speaker_num = len(mapping["id2speaker"])
    model = Classifier(n_spks=speaker_num).to(device)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    print(f"[Info]: Finish creating model!", flush=True)
    results = [["Id", "Category"]]
    for feat_paths, mels in tqdm(dataloader):
        with torch.no_grad():
            mels = mels.to(device)
            outs = model(mels)
            preds = outs.argmax(1).cpu().numpy()
            for feat_path, pred in zip(feat_paths, preds):
                results.append([feat_path, mapping["id2speaker"][str(pred)]])

    with open(output_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerows(results)


if __name__ == '__main__':
    main(**parse_args())
    main2(**parse_args2())
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_43319080/article/details/120256218

智能推荐

攻防世界_难度8_happy_puzzle_攻防世界困难模式攻略图文-程序员宅基地

文章浏览阅读645次。这个肯定是末尾的IDAT了,因为IDAT必须要满了才会开始一下个IDAT,这个明显就是末尾的IDAT了。,对应下面的create_head()代码。,对应下面的create_tail()代码。不要考虑爆破,我已经试了一下,太多情况了。题目来源:UNCTF。_攻防世界困难模式攻略图文

达梦数据库的导出(备份)、导入_达梦数据库导入导出-程序员宅基地

文章浏览阅读2.9k次,点赞3次,收藏10次。偶尔会用到,记录、分享。1. 数据库导出1.1 切换到dmdba用户su - dmdba1.2 进入达梦数据库安装路径的bin目录,执行导库操作  导出语句:./dexp cwy_init/[email protected]:5236 file=cwy_init.dmp log=cwy_init_exp.log 注释:   cwy_init/init_123..._达梦数据库导入导出

js引入kindeditor富文本编辑器的使用_kindeditor.js-程序员宅基地

文章浏览阅读1.9k次。1. 在官网上下载KindEditor文件,可以删掉不需要要到的jsp,asp,asp.net和php文件夹。接着把文件夹放到项目文件目录下。2. 修改html文件,在页面引入js文件:<script type="text/javascript" src="./kindeditor/kindeditor-all.js"></script><script type="text/javascript" src="./kindeditor/lang/zh-CN.js"_kindeditor.js

STM32学习过程记录11——基于STM32G431CBU6硬件SPI+DMA的高效WS2812B控制方法-程序员宅基地

文章浏览阅读2.3k次,点赞6次,收藏14次。SPI的详情简介不必赘述。假设我们通过SPI发送0xAA,我们的数据线就会变为10101010,通过修改不同的内容,即可修改SPI中0和1的持续时间。比如0xF0即为前半周期为高电平,后半周期为低电平的状态。在SPI的通信模式中,CPHA配置会影响该实验,下图展示了不同采样位置的SPI时序图[1]。CPOL = 0,CPHA = 1:CLK空闲状态 = 低电平,数据在下降沿采样,并在上升沿移出CPOL = 0,CPHA = 0:CLK空闲状态 = 低电平,数据在上升沿采样,并在下降沿移出。_stm32g431cbu6

计算机网络-数据链路层_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输-程序员宅基地

文章浏览阅读1.2k次,点赞2次,收藏8次。数据链路层习题自测问题1.数据链路(即逻辑链路)与链路(即物理链路)有何区别?“电路接通了”与”数据链路接通了”的区别何在?2.数据链路层中的链路控制包括哪些功能?试讨论数据链路层做成可靠的链路层有哪些优点和缺点。3.网络适配器的作用是什么?网络适配器工作在哪一层?4.数据链路层的三个基本问题(帧定界、透明传输和差错检测)为什么都必须加以解决?5.如果在数据链路层不进行帧定界,会发生什么问题?6.PPP协议的主要特点是什么?为什么PPP不使用帧的编号?PPP适用于什么情况?为什么PPP协议不_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输

软件测试工程师移民加拿大_无证移民,未受过软件工程师的教育(第1部分)-程序员宅基地

文章浏览阅读587次。软件测试工程师移民加拿大 无证移民,未受过软件工程师的教育(第1部分) (Undocumented Immigrant With No Education to Software Engineer(Part 1))Before I start, I want you to please bear with me on the way I write, I have very little gen...

随便推点

Thinkpad X250 secure boot failed 启动失败问题解决_安装完系统提示secureboot failure-程序员宅基地

文章浏览阅读304次。Thinkpad X250笔记本电脑,装的是FreeBSD,进入BIOS修改虚拟化配置(其后可能是误设置了安全开机),保存退出后系统无法启动,显示:secure boot failed ,把自己惊出一身冷汗,因为这台笔记本刚好还没开始做备份.....根据错误提示,到bios里面去找相关配置,在Security里面找到了Secure Boot选项,发现果然被设置为Enabled,将其修改为Disabled ,再开机,终于正常启动了。_安装完系统提示secureboot failure

C++如何做字符串分割(5种方法)_c++ 字符串分割-程序员宅基地

文章浏览阅读10w+次,点赞93次,收藏352次。1、用strtok函数进行字符串分割原型: char *strtok(char *str, const char *delim);功能:分解字符串为一组字符串。参数说明:str为要分解的字符串,delim为分隔符字符串。返回值:从str开头开始的一个个被分割的串。当没有被分割的串时则返回NULL。其它:strtok函数线程不安全,可以使用strtok_r替代。示例://借助strtok实现split#include <string.h>#include <stdio.h&_c++ 字符串分割

2013第四届蓝桥杯 C/C++本科A组 真题答案解析_2013年第四届c a组蓝桥杯省赛真题解答-程序员宅基地

文章浏览阅读2.3k次。1 .高斯日记 大数学家高斯有个好习惯:无论如何都要记日记。他的日记有个与众不同的地方,他从不注明年月日,而是用一个整数代替,比如:4210后来人们知道,那个整数就是日期,它表示那一天是高斯出生后的第几天。这或许也是个好习惯,它时时刻刻提醒着主人:日子又过去一天,还有多少时光可以用于浪费呢?高斯出生于:1777年4月30日。在高斯发现的一个重要定理的日记_2013年第四届c a组蓝桥杯省赛真题解答

基于供需算法优化的核极限学习机(KELM)分类算法-程序员宅基地

文章浏览阅读851次,点赞17次,收藏22次。摘要:本文利用供需算法对核极限学习机(KELM)进行优化,并用于分类。

metasploitable2渗透测试_metasploitable2怎么进入-程序员宅基地

文章浏览阅读1.1k次。一、系统弱密码登录1、在kali上执行命令行telnet 192.168.26.1292、Login和password都输入msfadmin3、登录成功,进入系统4、测试如下:二、MySQL弱密码登录:1、在kali上执行mysql –h 192.168.26.129 –u root2、登录成功,进入MySQL系统3、测试效果:三、PostgreSQL弱密码登录1、在Kali上执行psql -h 192.168.26.129 –U post..._metasploitable2怎么进入

Python学习之路:从入门到精通的指南_python人工智能开发从入门到精通pdf-程序员宅基地

文章浏览阅读257次。本文将为初学者提供Python学习的详细指南,从Python的历史、基础语法和数据类型到面向对象编程、模块和库的使用。通过本文,您将能够掌握Python编程的核心概念,为今后的编程学习和实践打下坚实基础。_python人工智能开发从入门到精通pdf

推荐文章

热门文章

相关标签