背景
只要存在数据库,就会有后台批量处理数据的需求,比如数据表备份、定期清理、数据替换、数据迁移,对于批量处理来说,往往会涉及大量的查询、过滤、归类、聚合计算,在批量脚本中直接查询数据库往往性能太低,甚至会因为一个大型的SQL导致数据库锁表出现线上事故,因此一般采用先导出到文件,在文件上计算然后再导入,比如:
1、使用mysql -e “select * from table” > output.txt的方式,执行SQL,将结果导出到文件中;
2、针对文件,使用各种方式进行聚合、过滤、替换等计算,最后产出成需要使用的格式;
3、发布产出的文件,或者使用load data命令导入到数据库;
由于只是一次性的批量查询数据库导出到文件,然后针对文件进行计算,而不是每次都查询数据库,大量节省了网络的IO耗费,从而提升处理的速度。
然而得到了导出的文件之后,如果文件过大,或者计算逻辑复杂比如大量的调用了耗费CPU的正则匹配、聚合计算,那么单线程的处理会耗费大量的时间,这时候就可以引入并发处理,使得机器的CPU、内存、IO、网络等资源全部充分利用起来,大幅度降低处理时间。
引入多线程,拆分输入文件为多个,每个小文件启动一个处理线程
HADOOP的MAP-REDUCE的做法,是先将文件split成小分片文件,然后针对每个分片做计算,最后将每个分片的结果聚合在一起,然而由于HADOOP的调度、集群稳定性等各种原因,对于MB大小级别的文件处理,会发现速度非常慢,有时候甚至比单机单线程处理速度还慢,将单机单线程改成多线程,往往会发现令人惊讶的效果提升。
直观的做法,是使用主线程读取输入的单个大文件,然后将读取的结果分配给子线程处理,然后主线程做整合,这种方式因为多线程共用了单个文件的IO,需要加入对文件的同步机制,最后会发现性能瓶颈在这单个文件的读取同步之上。
可以将大文件分片成小文件,然后每个文件分配给单个线程单独处理,避免线程间的资源同步,每个线程会享用单独的CPU核、内存单元、文件句柄,处理速度能达到最快。
使用这种方式,可以用以下的步骤进行:
1、使用SHELL,将输入文件拆分成预定线程数目的份数,存放到一个目录中;
2、以输入文件的目录路径作为参数,编程语言JAVA/PYTHON读取该目录的所有文件,对于每个文件启动一个处理线程,进行处理;
3、SHELL将输出目录的所有文件,使用cat file* > output_file的方式,得到最终的计算结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
#
# 将输入文件拆分成多个小文件,启动多线程进行处理,输出结果文件
#
function
run_multi_task
(
)
{
# 开启多个异步线程
SPLITS_COUNT
=
20
# 输入文件总数
source_file_lines_count
=
`
cat
$
{
input_file
}
|
wc
-
l
`
# 计算出拆分的文件个数
split_file_lines_count
=
$
(
(
$source_file_lines_count
/
$SPLITS_COUNT
)
)
# 进行文件拆分
split
-
l
$split_file_lines_count
-
a
3
-
d
$
{
input_file
}
$
{
input_dir
}
/
inputFile_
# 执行JAVA程序
$JAVA_CMD
-
classpath
$jar_path
"net.crazyant.BackTaskMain"
"${input_dir}"
"${output_dir}"
"${output_err_dir}"
# 合并文件
cat
$
{
output_dir
}
/
*
>
$
{
output_file
}
}
run_multi_task
|
这里注意,拆分文件的时候,不能使用split按照大小进行拆分,因为会把输入文件中的行截断;
对应的JAVA程序,则是通过读取文件夹中文件列表的方法,每个文件单独启动一个线程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
public
class
BackTaskMain
{
public
static
void
main
(
String
[
]
args
)
{
String
inputDataDir
=
args
[
1
]
;
String
outputDataDir
=
args
[
2
]
;
String
errDataDir
=
args
[
3
]
;
File
inputDir
=
new
File
(
inputDataDir
)
;
File
[
]
inputFiles
=
inputDir
.
listFiles
(
)
;
// 记录开启的线程
List
<Thread>
threads
=
new
ArrayList
<Thread>
(
)
;
for
(
File
inputFile
:
inputFiles
)
{
if
(
inputFile
.
getName
(
)
.
equals
(
"."
)
||
inputFile
.
getName
(
)
.
equals
(
".."
)
)
{
continue
;
}
// 针对每个inputFile,生成对应的outputFile和errFile
String
outputSrcLiceFpath
=
outputDataDir
+
"/"
+
inputFile
.
getName
(
)
+
".out"
;
String
errorOutputFpath
=
errDataDir
+
"/"
+
inputFile
.
getName
(
)
+
".err"
;
// 创建Runnable
BackRzInterface
backRzInterface
=
new
BackRzInterface
(
)
;
backRzInterface
.
setInputFilePath
(
inputFile
.
getAbsolutePath
(
)
)
;
backRzInterface
.
setOutputFilePath
(
outputSrcLiceFpath
)
;
backRzInterface
.
setErrorOutputFpath
(
errorOutputFpath
)
;
// 创建Thread,启动线程
Thread
singleRunThread
=
new
Thread
(
backRzInterface
)
;
threads
.
add
(
singleRunThread
)
;
singleRunThread
.
start
(
)
;
}
for
(
Thread
thread
:
threads
)
{
try
{
// 使用thread.join(),等待所有的线程执行完毕
thread
.
join
(
)
;
System
.
out
.
println
(
thread
.
getName
(
)
+
" has over"
)
;
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
)
;
}
}
System
.
out
.
println
(
"proccess all over"
)
;
}
}
|
通过这种方式,将大文件拆分成小文件,启动多个线程,每个线程处理一个小文件,最终将每个小文件的结果聚合,就得到了最终产出,性能上却大幅提升。
若有依赖的资源,可以按线程先复制、拆分、克隆,防止依赖的资源成为性能瓶颈
在上面的代码中,BackRzInterface是每个线程启动时要使用的Runnable对象,可以看到用的是每次new的方式创建:
// 创建Runnable
BackRzInterface backRzInterface = new BackRzInterface();
这样每个处理线程依赖的BackRzInterface都是独立的,对这个Runnable代码的使用不会存在同步问题。
如果多线程处理中需要使用外部资源,最好想办法使得每个线程单独使用自己独占的资源,相互之间若不会存在冲突,可以实现最大化并发处理。
其他一些例子,比如:
- 多线程用到了字典文件,那么方法是首先将字典文件复制多份,每个线程使用自己独占的字典,避免并发同步访问字典;
- 多线程若需要统一ID发号,可以提前计算出每个输入文件的行数,然后依次生成第一个线程需要的ID范围、第二个线程需要的ID范围,这些不同的ID范围也可以分别生成不同的文件,这样每个线程会使用各自独立的ID资源,避免了多个线程单时刻访问单个ID发号服务,使得发号成为性能瓶颈的可能;
- 多线程如果依赖相同的Service,如果可以每次new对象就每次new,如果Bean都是在Spring中管理,则将Service加上@Scope(“prototype”),或者将对象每次clone一下得到一个新对象,保证最终每个线程使用自己独占的对象。
- 尽量使用函数式编程的思想,每个函数都不要产生副作用,不要修改入参,结果只能通过return返回,避免增加代码同步冲突的可能;
通过以上这些类似的方法,每次将可能需要同步访问的共享资源,通过复制、分片等手段得到不同份,每个线程单独访问自己那一份,避免同步访问,最终实现性能最优。
避免同步的终极方法:使用多进程进行实现资源隔离
如果将文件拆分成了多份,依赖的ID、词典等资源也相应提供了多份,但是发现代码中存在无法解决的代码级别同步,该怎么办呢?
相对于想尽办法解决代码中的同步问题来说,多线程和多进程之间的性能差别微乎其微,我们都知道线程会使用进程的资源,所以导致了线程之间存在竞争进程资源,但是对于进程来说,CPU、内存等硬件资源是完全隔离的,这时候将程序运行在多进程而不是多线程,反而能更好的提升性能。
对于一些支持多线程不好的语言,比如PHP,直接用这种多进程计算的方法,速度并不比支持多线程的JAVA、PYTHON语言差:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# 要拆分的文件数,也就是要启动的多进程数
SPLITS_COUNT
=
20
input_splits_dir
=
"${input_dir}_splits"
output_splits_dir
=
"${output_dir}_splits"
# 输入文件行数
source_file_lines_count
=
`
cat
$
{
input_file
}
|
wc
-
l
`
# 每个文件拆分的行数=总行数除以要拆分的文件个数(也就是对应进程的个数)
split_file_lines_count
=
$
(
(
$source_file_lines_count
/
$
{
SPLITS_COUNT
}
)
)
# 执行拆分,注意这里使用-l进行行级别拆分更好
split
-
l
$split_file_lines_count
-
a
3
-
d
$
{
input_file
}
$
{
input_splits_dir
}
/
inputfile_
process_idx
=
1
for
fname
in
$
(
ls
$
{
input_splits_dir
}
)
;
do
input_fpath
=
$
{
input_splits_dir
}
/
$fname
ouput_fpath
=
$
{
output_splits_dir
}
/
$fname
# 后台执行所有进程
php
"/php/main.php"
"${input_fpath}"
"${ouput_fpath}"
&
(
(
process_idx
++
)
)
done
# 等待所有后台进程执行结束
wait
# 合并文件
cat
$output_splits_dir
/
*
>
$
{
output_file
}
|
上述代码中,使用shell的&符号,可以在后台同时启动多个进程,使用wait语法,可以实现多线程的Thread.join特性,等待所有的进程执行结束。
总结
对于输入文件的大小、计算的复杂度处于单机和集群计算之间的数据处理,使用并发处理最为合适,但是并发的同步处理却会降低多线程的性能,这时可以借助于输入文件复制拆分、依赖资源复制拆分切片等方法,实现每个线程处理自己的独占资源,从而最大化提升计算速度。而对于一些无法避免的代码同步冲突逻辑,可以退化为多进程处理数据,借助于SHELL的后台进程支持,实现进程级别的资源独占,最终大幅提升处理性能。