论文链接:https://arxiv.org/abs/1804.10070
论文大意:
本文讨论的是声音事件的追踪问题(sound event detection)。其背景是给定一段音频,判断这段音频的类别。下面以URBANSED 数据集(http://urbansed.weebly.com/)为例进行介绍。(模型使用的也是这个数据)
给定一段音频,这段音频中可能发生下述声音事件中的一个——air_conditioner, car_horn, children_playing, dog_bark, drilling, engine_idling, gun_shot, jackhammer, siren, street_music。判断其属于的类别。
一种直观的感觉是把这个问题看成简单的分类问题,音频进行采样后生成相应的时间序列,直接对生成的序列进行分类。这种观点对于统计特征具有很强区分度的情况是可以的,但对于上述情况我们需要换一个观点来看。
在我们进行采样的过程中,具备分类属性的特征的发生可能仅仅是一瞬——如一段音频长8s,这段音频是记录狗叫(dog bark)的音频,而狗叫在整个音频的持续时间仅有不到2s,这时dog bark的统计特征就被淹没在“杂波”的汪洋大海中了。这实际对应到图像中就是缺少图像的Crop过程。但有些数据就是这样的,精确地detection来进行标签化的代价太大,只知道统计特征在里面,而并不知道具体是那一段为“精确”的统计特征,这一类统计数据称为弱标签(weak label)数据。
一般的,解决弱标签问题对应的机器学习方法可以考虑多示例学习(Multiple instance learning),我们以上述分类问题为例讨论该学习方法。
既然我们无法在整个特征维度detect到显著性强的统计特征就可以考虑在小的区间段来抽象特征(这里是小的音频段,图像则对应一些grid片段),可以考虑在分段抽象特征后进行“聚合”(aggregation),假定我们分段提取的特征是有效的,则下一个问题就是选取一个有效的聚合手段。映射到分类问题,称截取的每一段音频为一个示例,对于每一个示例有一个分类分布判定(如softmax),寻找一种“加权”方案得到整个音频段的分布,如果做过文本,一种直观的方法会想到attention。这种想法相当于对于不同的实例再做一个softmax加权,见下图:
上图有三种聚合方式,直观上可以窥见其差异,max的方法只用到某个示例的特征,mean兼具多示例特征,softmax以置信度的方式兼具多示例特征。并不能说哪一个一定比另一个好,这好比充分统计量与有效统计量的区别(就看你如何看待方差了,这里max可以看作有效、后两者与充分相关)。机器学习的观点仍是数据分布决定方差,而不是模型。
本文更多的是面向充分性的,考虑充分统计量数据分布的观点是有效示例较多次地出现(为了更多地使得单个示例成为一个置信度高的示例会考虑更密集的采样,故多次出现),这种情形的难度在于直接使用max的观点效果可能不如后两者,但是我们希望出现的情况是使用后两者的方式能够找到一种聚合方式使得统计量能够达到max的效果。(有效的效果)
由于mean可以看成softmax的一个特例,所以我们后来仅讨论softmax。
作者要说明的就是对于多示例学习问题,softmax这种充分统计量在一些情形下并不是有效统计量的问题,并给出一个充分有效统计量。
下面是softmax估计的公式:
下面作者对于其在示例个数过多的情况下不是有效的进行了证明:
利用如下性质:
该性质要确定softmax运算的界,只要利用exp的单调性立即得到。
a,b 作用于0,1区间得到如下表示:
由于这个softmax加权是一个有界量,当每一个待加权示例概率比较小时限制了其有效性。
学过数学分析的人都知道,当研究区间收敛性质时,有一种方法是用闭区间的收敛性来逼近开区间的收敛性,即所谓内闭一致收敛。作者也要考虑的是用一种变换来拓广上面的dominant,寻找一个内闭dominant
故作者提出如下加权方案:(Auto-pooling)
类似与Lp空间对l2空间的范数推广,作者推广了softmax。其中alpha为待学习参数。alpha=0对应mean, alpha=1对应softmax, alpha -> 正无穷对应max。
这样定义的Auto-pooling应用上面性质有如下不等式关系:
当alpha->负无穷时就得到了(0,1)开区间的一个内闭dominant。为加权后统计量的有效性提供了必要条件。
在很多时候我们考虑神经网络往往是从必要性的方向来考虑问题的,这个不等式也不例外,我们的模型设计满足的都是必要性,而优化方法(如梯度下降)给我们提供了充分性,我们是黑箱,因为我们只提供必要性;而概率、数学的观点更多的往往是充分性,是从问题到结果的思维方式,神经网络倒向思考问题,我们构建特征网络使得token满足必要性、使得特征点满足必要性。
在应用上会考虑到控制Auto-pooling的加权的下界(或多或少是对于充分性的保证),即
这些会对参数学习加一些限制。类似的,还有alpha的正则化问题,具体参见原文。
上面的内容都是围绕聚合方法进行的,对于网络构建与特征抽取可以用下图概括:
在实现时会对音频进行采样与变换,抽象成图片后放入神经网络,具体实现见下面代码。
下面对于URBANSED 数据集,这里使用的是UrbanSound8K数据集给出实验过程,实验分两步进行,首先不使用多示例的方法看效果如何,之后再给出mean 及Auto-pooling两种结果
数据集链接:https://urbansounddataset.weebly.com/urbansound8k.html
下载解压数据集,并使用如下数据处理:
import pylab
import librosa
import librosa.display
import numpy as np
import pandas as pd
import glob
from functools import reduce
import pickle
from sklearn.model_selection import train_test_split
from collections import defaultdict
from PIL import Image
from random import choice
from copy import deepcopy
from joblib import Parallel, delayed
from sklearn.utils import shuffle
import pause
# clip every wav file to length of 0.5s ndarray list without padding
# and save pics
def serlize_single_wav(file_path, min_time_duration = 8.0):
sig, fs = librosa.load(file_path, sr=int(44.1 * 1000))
time_duration = librosa.core.get_duration(sig)
print("time duration : {} s".format(time_duration))
if time_duration < min_time_duration:
return
split_length = int(fs / 3.99)
for i in range(int(len(sig) / split_length) + 1):
if (i+1) * split_length <= len(sig):
sub_sig = sig[i * split_length: (i+1) * split_length]
file_tail_format = file_path.split("\\")[-1].split(".")[0] + "_{}.png"
# make pictures name
save_path = r'C:\Coding\Python\UrbanSound8K.tar\UrbanSound8K\audio_pics_32' + "\\" + \
"{}".format(file_tail_format.format(i))
pylab.axis('off') # no axis
pylab.axes([0., 0., 1., 1.], frameon=False, xticks=[], yticks=[]) # Remove the white edge
S = librosa.feature.melspectrogram(y=sub_sig, sr=fs,
n_fft=2048, hop_length=1024, n_mels = 128)
librosa.display.specshow(librosa.power_to_db(S, ref=np.max),
y_axis="log")
pylab.savefig(save_path, bbox_inches=None, pad_inches=0)
pylab.close()
def process_all_audios():
Parallel(n_jobs=11)(delayed(serlize_single_wav)(file,) for f_idx, file in enumerate(filter(lambda f: f.endswith("wav"),reduce(lambda a, b: a + b,map(lambda x: list(glob.glob(r"C:\Coding\Python\UrbanSound8K.tar\UrbanSound8K\audio\fold{}\*".format(x))), range(10))))))
def csv_reader(num = None):
df = pd.DataFrame.from_csv(r"C:\Coding\Python\UrbanSound8K.tar\UrbanSound8K\metadata\UrbanSound8K.csv", encoding="utf-8",
header=0, index_col=None)
df["slice_file_name"] = df["slice_file_name"].apply(lambda x: x.replace(".wav", ""))
req_df = df[["classID", "slice_file_name"]]
if num:
req_df = req_df.head(num)
req_dict = req_df.to_dict()
req_dict["slice_file_name_list"] = dict()
for idx ,ele in req_dict["slice_file_name"].items():
glob_format = r"C:\Coding\Python\UrbanSound8K.tar\UrbanSound8K\audio_pics_32\{}*".format(ele)
req_dict["slice_file_name_list"][idx] = list(glob.glob(glob_format))
if idx % 1000 == 0:
print(idx)
with open("all_csv_pd_32.pkl", "wb") as f:
pickle.dump(pd.DataFrame.from_dict(req_dict), f)
# retrieve train dataloader and valid dataloader Simultaneously
def batch_data_loader(valid_size=0.2, batch_num = 4, m_size = 8):
with open(r"C:\Coding\Python\AutoPooling\data_preprocess\all_csv_pd_32.pkl", "rb") as f:
req_df = pickle.load(f)
values_ndarray = req_df.values
columns = req_df.columns
train_ndarray, valid_ndarray = train_test_split(values_ndarray, test_size=valid_size, random_state=0,
shuffle=True)
train_df = pd.DataFrame(train_ndarray, columns=columns)
valid_df = pd.DataFrame(valid_ndarray, columns=columns)
def data_generator(type = "train", batch_num = batch_num, m_size = m_size):
# m_size indicate bag size of Multiple instance learning
# so in this part it can be looked as batch_size
assert type in ["train", "valid"]
# flatten list pngs
def flatten_df_to_ndarray(input_df):
req_dict = defaultdict(list)
for idx, r in input_df.iterrows():
for png_path in r["slice_file_name_list"]:
req_dict["png_path"].append(png_path)
req_dict["classID"].append(r["classID"])
req_pd = pd.DataFrame.from_dict(req_dict)
return np.concatenate([np.array(req_pd["classID"].tolist())[:, np.newaxis],
np.array(req_pd["png_path"].tolist())[:, np.newaxis]], axis=-1)
def flatten_df(input_df):
req_dict = defaultdict(list)
for idx, r in input_df.iterrows():
if len(r["slice_file_name_list"]) >= m_size:
req_dict["slice_file_name_list"].append(r["slice_file_name_list"][:m_size])
req_dict["classID"].append(r["classID"])
req_df = pd.DataFrame.from_dict(req_dict)
return req_df
def read_img(path):
return np.array(Image.open(path))[...,:3]
req_df = flatten_df(train_df) if type == "train" else flatten_df(valid_df)
use_df = deepcopy(req_df)
height = 480
width = 640
img_bag = np.zeros(shape=[batch_num ,m_size, height, width, 3])
img_bag_label = np.zeros([batch_num]).astype(np.int32)
start_num = 0
while True:
for idx ,r in use_df.iterrows():
img_bag[start_num] = np.array(list(map(read_img, r["slice_file_name_list"])))
#print(img_bag[start_num].shape)
img_bag_label[start_num] = r["classID"]
start_num += 1
if start_num == batch_num:
img_bag, img_bag_label = shuffle(img_bag, img_bag_label)
yield (img_bag, img_bag_label)
img_bag = np.zeros(shape=[batch_num ,m_size, height, width, 3])
img_bag_label = np.zeros([batch_num]).astype(np.int32)
start_num = 0
use_df = deepcopy(req_df)
img_bag = np.zeros(shape=[batch_num ,m_size, height, width, 3])
img_bag_label = np.zeros([batch_num]).astype(np.int32)
start_num = 0
return (data_generator("train"), data_generator("valid"))
if __name__ == "__main__":
#process_all_audios()
#csv_reader()
pass
首先我们抽取的特征是面向长度为8s的音频的,少于8s会被过滤掉,对于每一个长度为8s的音频进行32份采样,相当于每一个样本有32个示例之后进网络。(其它采样参数大抵与论文要求相同)
可以看一个抽象出的单个示例波谱图片:(0.25s)
32个示例组合(8s)
下面给出网络模型:
import tensorflow as tf
import keras.backend as K
from functools import reduce
from data_preprocess.data_transformer_v0 import batch_data_loader
from sklearn.metrics import f1_score
import numpy as np
from autopool import AutoPool1D
import os
sess = tf.Session()
K.set_session(sess)
def conv2d(input, name = None, filters = 16, is_training = tf.constant(True),
keep_prob = tf.constant(1.0)):
with tf.variable_scope(name):
conv1_output = tf.layers.conv2d(input, filters=filters, strides=2, kernel_size=3,
name="conv_1_{}".format(name), padding="same",
activation=tf.nn.relu)
conv1_output = tf.layers.batch_normalization(conv1_output, training=is_training,
name="norm_1_{}".format(name))
conv2_output = tf.layers.conv2d(conv1_output, filters=filters, strides=2, kernel_size=3,
name="conv_2_{}".format(name), padding="same",
activation=tf.nn.relu)
conv2_output = tf.layers.batch_normalization(conv2_output, training=is_training,
name="norm_2_{}".format(name))
output = tf.layers.max_pooling2d(conv2_output, pool_size=2, strides=2,
name="max_pool_{}".format(name), padding="same")
output = tf.nn.dropout(output, keep_prob=keep_prob)
return output
class SoundEventClassifier(object):
def __init__(self, img_height = 480,
img_width = 640, num_channels = 3, class_num = 10,
batch_num = 4, m_size = 8, pool_name = "avg"):
assert pool_name in ["max", "avg", "auto", None]
self.class_num = class_num
self.batch_num = batch_num
self.m_size = m_size
self.pool_name = pool_name
self.img_input = tf.placeholder(tf.float32, shape=[None, m_size, img_height, img_width, num_channels])
self.img_label_input = tf.placeholder(tf.int32, shape=[None])
self.is_training = tf.placeholder(tf.bool, [])
self.keep_prob = tf.placeholder(tf.float32, [])
input = self.model_construct()
self.opt_construct(input)
def model_construct(self):
img_input = tf.reshape(self.img_input, [-1] + list(map(int ,self.img_input.get_shape()[2:])))
input = tf.layers.batch_normalization(img_input, training=self.is_training,
name="input_bn")
filters_list = [16, 32, 64, 128]
for i in range(4):
input = conv2d(input=input, filters=filters_list[i], name="layer_{}".format(i), is_training=self.is_training,
keep_prob = self.keep_prob)
input = tf.layers.conv2d(input, filters=256, strides=2, kernel_size=(8, 1),
name="before_output", padding="same",
activation=tf.nn.relu)
input = tf.nn.dropout(input, keep_prob=self.keep_prob)
input = tf.layers.batch_normalization(input, training=self.is_training,
name="norm_output")
flatten_input = tf.reshape(input, [-1, reduce(lambda a, b: a * b, map(int ,input.get_shape()[1:]))])
dense_before_agg = tf.layers.dense(flatten_input, units=self.class_num, name="dense_before_agg")
# [batch ,1, class_num]
dense_before_agg = tf.reshape(dense_before_agg, [-1, self.m_size, self.class_num])
if self.pool_name == "auto":
pooling_output = AutoPool1D(axis=1)(dense_before_agg)
elif self.pool_name == "avg":
pooling_output = tf.layers.average_pooling1d(dense_before_agg, pool_size=self.m_size, strides=1)
pooling_output = tf.squeeze(pooling_output, [1])
elif self.pool_name == "max":
pooling_output = tf.layers.max_pooling1d(dense_before_agg, pool_size=self.m_size, strides=1)
pooling_output = tf.squeeze(pooling_output, [1])
else:
dense_before_agg = tf.reshape(dense_before_agg, [-1, self.m_size * self.class_num])
pooling_output = tf.layers.dense(dense_before_agg, units=self.class_num, name="pooling_output")
return pooling_output
def opt_construct(self, input):
# input [1, class_num]
self.prediction = tf.cast(tf.arg_max(tf.nn.softmax(input, axis=-1), dimension=-1), tf.int32)
self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=input, labels=tf.one_hot(self.img_label_input,
depth=self.class_num)))
self.accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.squeeze(self.prediction), tf.squeeze(self.img_label_input)), tf.float32))
self.train_op = tf.train.AdamOptimizer(0.0001).minimize(self.loss)
@staticmethod
def train(sess):
batch_num = 3
m_size = 32
train_data_loader, valid_data_loader = batch_data_loader(batch_num=batch_num,
m_size = m_size)
ext = SoundEventClassifier(batch_num=batch_num, m_size=m_size, pool_name="avg")
print("model construct end")
saver = tf.train.Saver()
step = 0
epoch = 0
train_acc_list = []
valid_acc_list = []
valid_f1_list = []
with sess:
if os.path.exists(r"C:\Coding\Python\AutoPooling\model_v0_auto.meta"):
saver.restore(sess, save_path=r"C:\Coding\Python\AutoPooling\model_v0_auto")
print("restore exists")
else:
sess.run(tf.global_variables_initializer())
print("init new")
while True:
train_data = train_data_loader.__next__()
if train_data is None:
train_data_loader, _ = batch_data_loader(batch_num=batch_num,
m_size=m_size)
print("one epoch end")
epoch += 1
continue
img_bag, img_bag_label = train_data
_, loss, acc = sess.run([ext.train_op, ext.loss, ext.accuracy],
feed_dict={
ext.img_input: img_bag,
ext.img_label_input: img_bag_label,
ext.is_training: True,
ext.keep_prob: 0.5
})
train_acc_list.append(acc)
step += 1
if step % 5 == 0:
valid_data = valid_data_loader.__next__()
if valid_data is None:
_, valid_data_loader = batch_data_loader(batch_num=batch_num, m_size=m_size)
#print("valid data load end, re init")
continue
img_bag, img_bag_label = valid_data
loss, acc, pred = sess.run([ext.loss, ext.accuracy, ext.prediction],
feed_dict={
ext.img_input: img_bag,
ext.img_label_input: img_bag_label,
ext.is_training: True,
ext.keep_prob: 1.0
})
pred = np.squeeze(pred)
img_bag_label = np.squeeze(img_bag_label)
valid_acc_list.append(acc)
valid_f1_list.append(f1_score(y_true=img_bag_label, y_pred=pred, average="micro"))
if step % 50 == 0:
print("step : {} valid loss : {}".format(step,loss))
print("step : {} train acc : {}".format(step,np.mean(train_acc_list)))
print("step : {} valid acc : {}".format(step,np.mean(valid_acc_list)))
print("step : {} valid f1 : {}".format(step,np.mean(valid_f1_list)))
print("-" * 100)
train_acc_list = []
valid_acc_list = []
valid_f1_list = []
saver.save(sess, save_path=r"C:\Coding\Python\AutoPooling\model_v0_auto")
if __name__ == "__main__":
SoundEventClassifier.train(sess)
pass
通过配置参数pool_name可以指定使用的聚合方法。avg对应mean,None对应不使用多示例学习的方法。
多示例学习情况在autopooling 的f1 最高为0.733 (这个效果并不是太差,作者的结果最高大致0.76)
meanpooling 为0.6左右
非多示例(期望对训练集有过拟合效果,或者由于噪声大不收敛,事实效果也是不收敛的) f1最高不操过0.4,且在训练前阶段的收敛是很慢的。
充分性在文中两次出现,并非歧义。