先来说下这个分布式上传功能的背景
来公司之后很长一段时间都在优化改造审核系统,现在审核系统稳定了,而且也可以快速接需求了。这时候被借到学院这边来做一些优化改造,其中视频分布式上传这个功能存在很多问题,急需改造处理
学院视频上传功能简介
学院这边上传视频的大致步骤如下
- 老师在教师管理后台上传视频
- 教师管理后台再把视频上传到阿里云的点播平台
- 审核人员去审核上传的视频资源
- 将审核通过的视频进行转码处理
- 将转码后生成的视频加密的播放地址绑定到课时当中
- 学院通过加密的播放地址播放课程进行学习
因此上传视频的功能是非常重要的一环,它的服务对象是老师,在未改造之前的上传功能存在如下问题:
1. 由于历史原因,老的上传功能是 php 实现的、而现有的团队都是 java 程序员,这就导致出现问题无法进行维护
2. 老师上传的文件会出现花屏、丢帧等一些质量问题, 导致学院学生观看视频的时候客诉率很高
3. 老师上传的文件的原文件播放时长与学生看到播放时长存在不一致情况
4. 老师在管理后台上传的视频是成功状态,但是后端实际存储的状态是未上传成功的状态
5. 老师上传视频完成之后无法覆盖之前的视频
所以改造视频上传这块的功能就成了一个迫在眉睫的一个事情
改造上传功能的限制条件
- 前端限制条件
因为项目的历史背景原因很重,前端也缺乏人员支持,前期我们的打算是,前端代码尽量不动,只优化后端,等前端那边人员支持充足,再进一步优化,下面是先有前端进行分片上传的过程
前端会把一个大的视频文件,切分成一小片一小片的,然后挨个调用后端接口,将分片数据上传上去,如上图,假设我有一个测试视频文件,大小是 43760633 个字节,我把它拆分成每个分片为 5000000 个字节的字节块,总共会切分成 9块,先从块1开始上传,块1上传完成成功之后,再接着上传块2,直到最后一个块上传完成。
注: 这里的前端有个重要的约束条件,就是分片上传是一块一块上传的,且有先后顺序
- 后端限制条件
后端点播是采购的阿里云,因此得用阿里云的sdk 将视频上传到阿里云的点播平台
阿里云sdk 提供如下一些上传方法
- 提供本地视频文件
- 提供本地文件流或网络流
- 提供视频网络地址
下面是阿里云官网的上传例子
// 视频文件上传
// 视频标题(必选)
String title = "测试标题";
// 1.本地文件上传和文件流上传时,文件名称为上传文件绝对路径,如:/User/sample/文件名称.mp4 (必选)
// 2.网络流上传时,文件名称为源文件名,如文件名称.mp4(必选)。
// 3.流式上传时,文件名称为源文件名,如文件名称.mp4(必选)。
// 任何上传方式文件名必须包含扩展名
String fileName = "/Users/test/video/test.mp4";
// 本地文件上传
testUploadVideo(accessKeyId, accessKeySecret, title, fileName);
// 待上传视频的网络流地址
String url = "http://test.aliyun.com/video/test.mp4";
// 2.网络流上传
// 文件扩展名,当url中不包含扩展名时,需要设置该参数
String fileExtension = "mp4";
testUploadURLStream(accessKeyId, accessKeySecret, title, url, fileExtension);
// 3.文件流上传
testUploadFileStream(accessKeyId, accessKeySecret, title, fileName);
// 4.流式上传,如文件流和网络流
InputStream inputStream = null;
// 4.1 文件流
try {
inputStream = new FileInputStream(fileName);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
// 4.2 网络流
try {
inputStream = new URL(url).openStream();
} catch (IOException e) {
e.printStackTrace();
}
testUploadStream(accessKeyId, accessKeySecret, title, fileName, inputStream);
阿里云提供的 sdk 必需满足一个条件,就是上传的文件或者视频它是本地完整的文件或者流
这里是sdk使用方法:https://help.aliyun.com/document_detail/53406.html
在改造这个上传功能之前的几种实现方案
方案一,通过代理服务器,将同一个课时的视频定向传输到一台服务器
举个例子:
- 前端把视频文件分片上传到后端服务器
- nginx 会反向代理这个上传请求(我们可以在nginx 上面配置一些定向规则),比如我这里的视频对应的课时 id 为 589776 经过gninx 定向规则,定向到了服务 1 上面
- 因为每个分片数据上传的时候都会带上课时 id,所以所有的分片数据最终都会存储在服务 1上面,我们在服务1上面进行分片数据合并,然后发送到阿里云点播平台即可
- 这个方案的缺点:
1. 需要额外的反向代理服务配置
2. 无法进行动态扩容缩容,尤其不适用容器的部署方式
3. 每台服务器都需要分配足够的磁盘空间大小,需要更多的磁盘资源
4. 很容器出现服务器倾斜的问题
5. 业务处理服务与上传存储服务耦合
1. 业务处理需要把上传文件的中间态保存下来
2. 业务端上传成功,实际并未真正上传成功
6. 需要保留很多文件中间状态,代码实现很繁杂
7. 上传失败之后无法提前告知用户
- 优点
实现起来很直接,也很容易想到这种方案
因为方案 1缺点太多了, 我们就自然而然想到下面的方案二,它能解决方案 一中的大部分问题
方案二,每台服务服务都挂载一个共享磁盘
方案二的实现方式,比较简单,就是每个服务挂载一个共享磁盘,所有上传的分片文件都存储到这个共享磁盘当中,当所有分片上传完成之后,组合共享文件磁盘当中的分片为一个完整的文件,再上传到阿里云的点播平台
方案二解决了方案一中 1、2、3、4 点的问题
- 但方案二也有如下缺点
1. 需要配置额外的共享磁盘
2. 无论业务方分片上传,还是将文件发送到点播云,都需要经过从共享磁盘到业务服务器之间的网络传输,上传性能更差,更多的io意味着更多的不稳定性
3. 业务处理服务与上传存储服务耦合
1. 业务处理需要把上传文件的中间态保存下来
2. 业务端上传成功,实际并未真正上传成功
4. 也需要保留很多文件的中间状态,代码实现也会相对复杂
5. 上传失败之后无法提前告知用户
- 它的优点
实现相对更简单一些,因为少 nginx 配置, 所以更加容易扩展
但是方案二依然存在方案1中的第5,6,7点的问题
于是我们就想到了下面的方案三,方案三的话就是我们实现一个代理的点播平台,这个代理的点播平台无论是我们后面实现自己的点播平台,还是采购阿里云的点播平台或者百度的,对于我们业务方来说,无需关心,代理的点播平台支持接收分片数据,同时由业务方来告诉代理点播平台哪些分片数据是一个完整的文件,代理点播平台收到组装文件请求之后,完成后续一些列的操作处理
注意:
- 分片处理服务,需要把所有的分片信息,存储起来
- 分片数据上传是否完整,由业务方告知,通过校验分片数据确认无误,再由代理点播平台上传至阿里云点播平台
- 如果要切换别的点播平台或者说自己实现一个点播平台,那么业务方那边不用做任何修改
- 它的优点很明显
1. 解决了方案1,2中的所有问题
2. 扩展性非常好,后期切换到其他云点播服务平台会很简单
3. 业务方无需再关系上传等问题,由代理点播平台来进行保障处理,业务方只需要进行分片转发即可,业务方代码会非常简单
- 但它也有缺点
1 需要实现独立的代理服务,实现成本相对比较高
2 工期时间上来不急,因为改造这个上传功能迫在眉睫
3 需要额外的服务器成本,因为要独立出来服务,就不能跟业务方的服务器放到一起了
有了方案三之后,我们找到了一条明确的道路,也就说尽可能让业务方实现分片上传简单,最好就是业务方只做分片转发处理,通知合并分片文件即可,于是就想到了方案四
方案4,扩展阿里云点播平台自带的 sdk,使其支持分布式分片上传
有了方案3之后,就会想到为啥要自己实现一个代理的点播服务了,如果说阿里云点播平台自身支持按分片上传,那是不是可以改造和扩展 api,来实现分布式分片上传
通过查看其官方的sdk 例子,发现两个重点的配置参数
private static void testUploadVideo(String accessKeyId, String accessKeySecret, String title, String fileName) {
UploadVideoRequest request = new UploadVideoRequest(accessKeyId, accessKeySecret, title, fileName);
/* 重点:可指定分片上传时每个分片的大小,默认为2M字节 */
request.setPartSize(2 * 1024 * 1024L);
/* 重点:可指定分片上传时的并发线程数,默认为1,(注:该配置会占用服务器CPU资源,需根据服务器情况指定)*/
request.setTaskNum(1); //
UploadVideoImpl uploader = new UploadVideoImpl();
UploadVideoResponse response = uploader.uploadVideo(request);
System.out.print("RequestId=" + response.getRequestId() + "\n"); //请求视频点播服务的请求ID
if (response.isSuccess()) {
System.out.print("VideoId=" + response.getVideoId() + "\n");
}
...
}
重点参数:
-
partSize: 分片大小,这不就是前端分片后的分片大小
-
taskNum: 一个线程其实可以想象为一个后台服务
那我们可以来分析下阿里云的分片上传 sdk 是如何做的,这样就可以对它进行一些扩展了
下面阿里云分片上传的一个简化流程,这里忽略了一些 checkpoint 等的一些处理,主要关注主流程
下面是分析源代码的过程
- 第一步通过请求参数配置,获取点播平台上传 token
try {
DefaultAcsClient vodClient = this.initVodClient(request.getApiRegionId(), request.getAccessKeyId(), request.getAccessKeySecret(), request.getSecurityToken());
createUploadVideoResponse = (CreateUploadVideoResponse)vodClient.getAcsResponse(createUploadVideoRequest);
} catch (ClientException var14) {
response.setCode(var14.getErrCode());
response.setMessage(var14.getErrMsg());
response.setRequestId(createUploadVideoResponse.getRequestId());
return;
}
首先通过 ak, sk 等阿里云配置信息,获取到上传文件时会用到的 token 信息
- 第二步 根据获得的 token 生成 ossClient
ossClient = this.initOSSClient(uploadTokenDTO.getEndpoint(), uploadTokenDTO.getAccessKeyId(), uploadTokenDTO.getAccessKeySecret(), uploadTokenDTO.getSecurityToken(), request.getCrcCheckEnabled(), request.getOssConfig());
- 第三步 将要上传的文件,按设置的分片大小进行分片处理
private ArrayList<OSSUploadOperation.UploadPart> splitFile(long fileSize, long partSize) {
ArrayList<OSSUploadOperation.UploadPart> parts = new ArrayList();
long partNum = fileSize / partSize;
if (partNum >= 10000L) {
partSize = fileSize / 9999L;
partNum = fileSize / partSize;
}
for(long i = 0L; i < partNum; ++i) {
OSSUploadOperation.UploadPart part = new OSSUploadOperation.UploadPart();
part.number = (int)(i + 1L);
part.offset = i * partSize;
part.size = partSize;
part.isCompleted = false;
parts.add(part);
}
if (fileSize % partSize > 0L) {
OSSUploadOperation.UploadPart part = new OSSUploadOperation.UploadPart();
part.number = parts.size() + 1;
part.offset = (long)parts.size() * partSize;
part.size = fileSize % partSize;
part.isCompleted = false;
parts.add(part);
}
return parts;
}
- 第四步 获取一个 uploadId(每个上传的视频文件都有一个唯一区分 uploadId)
InitiateMultipartUploadRequest initiateUploadRequest = new InitiateMultipartUploadRequest(uploadFileRequest.getBucketName(), uploadFileRequest.getKey(), metadata);
InitiateMultipartUploadResult initiateUploadResult = this.multipartOperation.initiateMultipartUpload(initiateUploadRequest);
- 第五步 通过配置的线程数,采用线程池的方式上传切好的分片数据, 并获得每个分片上传的结果
private ArrayList<OSSUploadOperation.PartResult> upload(OSSUploadOperation.UploadCheckPoint uploadCheckPoint, VoDUploadFileRequest uploadFileRequest) throws Throwable {
ArrayList<OSSUploadOperation.PartResult> taskResults = new ArrayList();
ExecutorService service = Executors.newFixedThreadPool(uploadFileRequest.getTaskNum());
ArrayList<Future<OSSUploadOperation.PartResult>> futures = new ArrayList();
ProgressListener listener = uploadFileRequest.getProgressListener();
long contentLength = 0L;
int i;
for(i = 0; i < uploadCheckPoint.uploadParts.size(); ++i) {
if (!((OSSUploadOperation.UploadPart)uploadCheckPoint.uploadParts.get(i)).isCompleted) {
contentLength += ((OSSUploadOperation.UploadPart)uploadCheckPoint.uploadParts.get(i)).size;
}
}
...
for(i = 0; i < uploadCheckPoint.uploadParts.size(); ++i) {
if (!((OSSUploadOperation.UploadPart)uploadCheckPoint.uploadParts.get(i)).isCompleted) {
futures.add(service.submit(new OSSUploadOperation.Task(i, "upload-" + i, uploadCheckPoint, i, uploadFileRequest, this.multipartOperation, listener)));
...
taskResults.add(new OSSUploadOperation.PartResult(i + 1, ((OSSUploadOperation.UploadPart)uploadCheckPoint.uploadParts.get(i)).offset, ((OSSUploadOperation.UploadPart)uploadCheckPoint.uploadParts.get(i)).size));
}
}
...
return taskResults;
}
}
有了前面建立 ossClient 连接以及 uploadId, 就可以上传分片数据了, 由于每个分片有分片编号,因此可以用多线程的方式进行分片上传,加快上传速度
- 第六步 发送阿里云文件上传完成请求,进行分片合并生成完整文件(需要将之前所有分片的上传结果,都发给阿里云, 以此来确定上传文件的完整性)
private CompleteMultipartUploadResult complete(OSSUploadOperation.UploadCheckPoint uploadCheckPoint, UploadFileRequest uploadFileRequest) {
Collections.sort(uploadCheckPoint.partETags, new Comparator<PartETag>() {
public int compare(PartETag p1, PartETag p2) {
return p1.getPartNumber() - p2.getPartNumber();
}
});
CompleteMultipartUploadRequest completeUploadRequest = new CompleteMultipartUploadRequest(uploadFileRequest.getBucketName(), uploadFileRequest.getKey(), uploadCheckPoint.uploadID, uploadCheckPoint.partETags);
...
return this.multipartOperation.completeMultipartUpload(completeUploadRequest);
}
有了前面分析阿里云分片上传的流程,我们就可以有下面的方案四
方案4 扩展现有点播平台 sdk,实现分布式分片上传
教师管理后端,将分片数据通过扩展的 sdk 转发到阿里云点播平台, 最后所有分片上传完成之后,告诉阿里云点播平台进行分片合并处理,分片合并完成将上传成功的消息通过 mq 发送出去
然后经过不同的消费组去做不同的处理,比如业务方需要根据上传的信息去设置上传 video id, 生成转码,加密播放链接等,而审核系统负责将上传的视频数据推送给审核人员进行审核处理
由于之前在分析阿里云 sdk 时,发现阿里云都是在本地处理,但是我们的服务是多个的,这时候需要考虑到上传的性能问题,比如 ossClient 已经创建成功,那么一个服务的同一个视频的不同分片就不需要再进行重复建立 oss 链接了,我们可以把建立的链接缓存起来,其他的 uploadId, token 等信息,可以供多个服务共享,因此可以把它们缓存到 redis 中
下面是整个分片上传的过程图
需要注意的点
- 获取的 token 有过期时间,默认是 1 个小时,因此在缓存 token 的时候,会缓存一个token 生成的时间,这样可以提前预判 token 是否过期
- 除 ossClient 缓存本地内存 , token数据、upload_id、分片上传结果都需要缓存到 redis 中
- 在出现异常情况时,要考虑 ossClient 等缓存数据的清理
- 另外我们需要采用 lru 缓存淘汰策略,当连接池里拿不到 ossClient 时,能够剔除掉之前不再用到的 ossClient 连接, ossClient 的链接池的线程数是有限制的,默认是 1024,所以缓存 ossClient 的大小不要超过这个数量
方案4的缺点:
- 只适用于阿里云点播平台,如果要改为其他平台,需要重写,但公司的视频资源都在阿里云上面,这种换平台的几率非常小
优点:
- 实现相对简单
- 不会出现方案一,二中的问题
经过四个方案的对比,最终实现的方案采用方案四
总结:
- 改造服务的实现尽量给出多种方案,然后根据各自的优缺点,选择最合适的
- 如果出现各种库里面各种状态的保存,转换比较多的情况下,通常都是设计实现上存在问题,需要重新思考设计方案是否合理
需要看源代码的同志可以在评论里留言!!!