1.模块介绍
目前媒资管理的主要管理对象是视频、图片、文档等,包括:媒资文件的查询、文件上传、视频处理等。
媒资查询:系统可以查询自己所拥有的媒资信息。
文件上传:包括上传图片、上传文档、上传视频。
视频处理:视频上传成功,系统自动对视频进行编码处理。
文件删除:教学机构删除自己上传的媒资文件。
2.业务流程
主要分为:上传图片,上传视频,视频处理
2.1上传图片流程
1、前端进入上传图片界面
2、上传图片,请求媒资管理服务。
3、媒资管理服务将图片文件存储在MinIO。
4、媒资管理记录文件信息到数据库。
5、前端请求内容管理服务保存课程信息,在内容管理数据库保存图片地址。
2.2上传视频流程
1、前端对文件进行分块。
2、前端上传分块文件前请求媒资服务检查文件是否存在,如果已经存在则不再上传。
3、如果分块文件不存在则前端开始上传
4、前端请求媒资服务上传分块。
5、媒资服务将分块上传至MinIO。
6、前端将分块上传完毕请求媒资服务合并分块。
7、媒资服务判断分块上传完成则请求MinIO合并文件。
8、合并完成校验合并后的文件是否完整,如果完整则上传完成,否则删除文件。
2.3视频处理流程
(这里用的视频转码工具是FFmpeg)
1、任务调度中心广播作业分片。
2、执行器收到广播作业分片,从数据库读取待处理任务,读取未处理及处理失败的任务。
3、执行器更新任务为处理中,根据任务内容从MinIO下载要处理的文件。
4、执行器启动多线程去处理任务。
5、任务处理完成,上传处理后的视频到MinIO。
6、将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。
3.数据模型
1.3 数据模型
本模块媒资文件相关的数据表如下:
媒资文件表:存储文件信息,包括图片、视频、文档等。
media_process: 待处理视频表,为要视频转码的视频数据存储(如,.avi->mp4)。
media_process_history: 视频处理历史表,记录已经处理成功的视频信息。
4.采用的分布式技术粗讲
本次采用的分布式技术分别是MinIo与XXL-Job
4.1分布式文件系统(本次采用的是MinIo)
什么是文件系统呢?
文件系统是负责管理和存储文件的系统软件,操作系统通过文件系统提供的接口去存取文件,用户通过操作系统访问磁盘上的文件,常见的文件系统:FAT16/FAT32、NTFS、HFS、UFS、APFS、XFS、Ext4等 。
而当要存储海量的文件时,应该如何存储呢?
分布式文件系统就是海量用户查阅海量文件的解决方案。
分布式文件系统是一个计算机无法存储海量的文件,通过网络将若干计算机组织起来共同去存储海量的文件,去接收海量用户的请求,这些组织起来的计算机通过网络进行通信。
既然是分布式处理,那么它的好处便是:
1、一台计算机的文件系统处理能力扩充到多台计算机同时处理。
2、一台计算机挂了还有另外副本计算机提供数据。
3、每台计算机可以放在不同的地域,这样用户就可以就近访问,提高访问速度。
4.2 分布式任务调度系统(本次采用的是XXL-JOB)
对一个视频的转码可以理解为一个任务的执行,如果视频的数量比较多(海量的任务),如何去高效处理一批任务呢?
1、多线程
多线程是充分利用单机的资源。
2、分布式加多线程
充分利用多台计算机,每台计算机使用多线程处理。
方案2可扩展性更强。
方案2是一种分布式任务调度的处理方案。
什么是任务调度?
任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。
什么是分布式任务调度?
通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度
优点:
采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:
1、并行任务调度
并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
2、高可用
若某一个实例宕机,不影响其他实例来执行任务。
3、弹性扩容
当集群中增加实例就可以提高并执行任务的处理效率。
4、任务管理与监测
对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
5、避免任务重复执行
当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。
5.MinIo精讲
5.1MinIo介绍
MinIO 是一个非常轻量的服务,可以很简单的和其他应用的结合使用,它兼容亚马逊 S3 云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。
它一大特点就是轻量,使用简单,功能强大,支持各种平台,单个文件最大5TB,兼容 Amazon S3接口,提供了 Java、Python、GO等多版本SDK支持。
官网:https://min.io
中文:https://www.minio.org.cn/,http://docs.minio.org.cn/docs/
MinIO集群采用去中心化共享架构,每个结点是对等关系,通过Nginx可对MinIO进行负载均衡访问。
去中心化有什么好处?
在大数据领域,通常的设计理念都是无中心和分布式。Minio分布式模式可以帮助你搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置。它将分布在不同服务器上的多块硬盘组成一个对象存储服务。由于硬盘分布在不同的节点上,分布式Minio避免了单点故障。
Minio使用纠删码技术来保护数据,它是一种恢复丢失和损坏数据的数学算法,它将数据分块冗余的分散存储在各各节点的磁盘上,所有的可用磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验块会分散的存储在这8块硬盘上。
使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍然可以恢复数据。 比如上边集合中有4个以内的硬盘损害仍可保证数据恢复,不影响上传和下载,如果多于一半的硬盘坏了则无法恢复。
5.2MinIo使用
安装,并且登入后界面长这样,界面很简单,界面的使用自己可以摸索就会了。
bucket,桶,它相当于存储文件的目录,可以创建若干的桶
5.2.1maven依赖如下:
最低需求Java 1.8或更高版本:
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.4.3</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.8.1</version>
</dependency>
5.2.2配置
在配置文件中根据自己的minIo信息去配置
上面的桶名对应我下面的桶名
需要三个参数才能连接到minio服务。
Endpoint 对象存储服务的URL
Access Key Access key 就像用户ID,可以唯一标识你的账户。
Secret Key Secret key 是你账户的密码。
minio:
endpoint: http://192.168.101.65:9000
accessKey: minioadmin
secretKey: minioadmin
bucket:
files: mediafiles
videofiles: video
如果不是测试,可以写一个MinIo的配置类,方便使用,如:
package com.xuecheng.media.config;
import io.minio.MinioClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @description minio配置类
*/
@Configuration
public class MinioConfig {
@Value("${minio.endpoint}")
private String endpoint;
@Value("${minio.accessKey}")
private String accessKey;
@Value("${minio.secretKey}")
private String secretKey;
@Bean
public MinioClient minioClient() {
MinioClient minioClient =
MinioClient.builder()
.endpoint("http://192.168.101.65:9000")
.credentials(accessKey, secretKey)
.build();
return minioClient;
}
}
5.2.3 断点续传技术(上传视频需要了解一下)
什么是断点续传
通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。
什么是断点续传:
引用百度百科:断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。
断点续传流程如下图:
流程如下:
1、前端上传前先把文件分成块
2、一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传
3、各分块上传完成最后在服务端合并文件
5.2.4minIo的示例测试代码
这只是演示基本的使用,自己用还是得修改的
import com.j256.simplemagic.ContentInfo;
import com.j256.simplemagic.ContentInfoUtil;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.DeleteError;
import io.minio.messages.DeleteObject;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.compress.utils.IOUtils;
import org.junit.jupiter.api.Test;
import org.springframework.http.MediaType;
import java.io.*;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class MinioTest {
MinioClient minioClient =
MinioClient.builder()
.endpoint("http://192.168.101.65:9000")
.credentials("minioadmin", "minioadmin")
.build();
@Test
public void test_upload() throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
//根据扩展名取出媒体资源类型mimeType
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(".mp4");
String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;//通用mimeType,字节流
if (extensionMatch != null) {
String mimeType1 = extensionMatch.getMimeType();
}
//上传文件的参数信息
UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder()
.bucket("testbuket")//桶
.filename("C:\\Users\\a2262\\Pictures\\blog\\Blue Whale.png")//指定本地文件路径
// .object("blog.png")//对象名,文件存放路径相对于桶名
.object("test/01/blog.png") // 在testbuket/test/01下存放文件,命名为blog.png
// .contentType("image/png") //设置媒体文件类型
.contentType(mimeType)
.build();
//上传文件
minioClient.uploadObject(uploadObjectArgs);
}
@Test
public void test_delete() throws Exception {
//删除文件的参数信息
RemoveObjectArgs removeObjectArgs = RemoveObjectArgs.builder()
.bucket("testbuket")
.object("blog.png")
.build();
//删除文件
minioClient.removeObject(removeObjectArgs);
}
//查询文件,从minio下载
@Test
public void getFile() throws IOException {
GetObjectArgs getObjectArgs = GetObjectArgs
.builder()
.bucket("testbuket")
.object("test/01/blog.png")
.build();
try (
//这是远程流,不稳定
FilterInputStream inputStream = minioClient.getObject(getObjectArgs);
FileOutputStream outputStream = new FileOutputStream(new File("D:\\MinIo\\blog.png"));
) {
IOUtils.copy(inputStream, outputStream);
} catch (Exception e) {
e.printStackTrace();
}
//校验文件的完整性对文件的内容进行md5
FileInputStream fileInputStream1
= new FileInputStream(new File("C:\\Users\\a2262\\Pictures\\blog\\Blue Whale.png"));
String source_md5 = DigestUtils.md5Hex(fileInputStream1);
FileInputStream fileInputStream
= new FileInputStream(new File("D:\\MinIo\\blog.png"));
String local_md5 = DigestUtils.md5Hex(fileInputStream);
if (source_md5.equals(local_md5)) {
System.out.println("下载成功");
}
}
//将分块文件上传到minio
@Test
public void uploadChunk() throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
for (int i = 0; i < 2; i++) {
//上传文件的参数信息
UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder()
.bucket("testbuket")//桶
.filename("D:\\MinIo\\upload\\chunk\\" + i)//指定本地文件路径
.object("chunk/" + i)
.build();
minioClient.uploadObject(uploadObjectArgs);
System.out.println("上传分块" + i + "成功");
}
}
//调用minio接口合并分块
@Test
public void uploadMerge() throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
//指定分块文件信息
List<ComposeSource> source = Stream.iterate(0, i -> ++i).limit(2).map(
i -> ComposeSource
.builder()
.bucket("testbuket")
.object("chunk/" + i)
.build()
)
.collect(Collectors.toList());
//指定分块后文件信息
ComposeObjectArgs testbuket = ComposeObjectArgs.builder()
.bucket("testbuket")
.object("merge01.mp4")
.sources(source)
.build();
//合并文件
minioClient.composeObject(testbuket);
}
//批量清理分块
public void test_removeObjects() {
//合并分块完成将分块文件清除
List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i)
.limit(2)
.map(i -> new DeleteObject("chunk/".concat(Integer.toString(i))))
.collect(Collectors.toList());
RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder()
.bucket("testbucket")
.objects(deleteObjects)
.build();
Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);
results.forEach(r -> {
DeleteError deleteError = null;
try {
deleteError = r.get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
//分块测试
@Test
public void testChunk() throws IOException {
//源文件
File sourceFile = new File("C:\\Users\\a2262\\Videos\\视频\\3db161-170d56d7335.mp4");
//存储文件路径
String chunkFilePath="D:\\MinIo\\upload\\chunk\\";
File mk_file = new File(chunkFilePath);
if(!mk_file.exists()){
mk_file.mkdirs();
}
//分块大小,默认最小5mb,否则要改源码
int chunkSize=1024*1024*5;
//文件分块数,向上取整
int chunkNum= (int) Math.ceil(sourceFile.length()*1.0/chunkSize);
//用流读数据
RandomAccessFile r = new RandomAccessFile(sourceFile, "r");
//缓存区
byte[] bytes = new byte[1024];
for (int i = 0; i < chunkNum; i++) {
File chunkFile = new File(chunkFilePath + i);
if(chunkFile.exists()){
chunkFile.delete();
}
RandomAccessFile rw = new RandomAccessFile(chunkFile, "rw");
int len=-1;
while((len=r.read(bytes))!=-1){
rw.write(bytes,0,len);
if(chunkFile.length()>=chunkSize){
break;
}
}
rw.close();
}
r.close();
}
//将分块合并
@Test
public void testMerge() throws IOException {
//块文件目录
File chunkFolder=new File("D:\\MinIo\\upload\\chunk");
//源文件
File sourceFile = new File("C:\\Users\\a2262\\Videos\\视频\\3db161-170d56d7335.mp4");
File mk_file = new File("D:\\MinIo\\merge\\chunk");
if(!mk_file.exists()){
mk_file.mkdirs();
}
//合并后的文件
File mergeFile = new File("D:\\MinIo\\merge\\chunk\\3db161-170d56d7335.mp4");
//取出所有的分块文件
File[] files = chunkFolder.listFiles();
//将数组转化为list
List<File> filesList = Arrays.asList(files);
Collections.sort(filesList, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
return Integer.parseInt(o1.getName())-Integer.parseInt(o2.getName());
}
});
//向合并文件写的流
RandomAccessFile rw = new RandomAccessFile(mergeFile, "rw");
//缓冲区
byte[] bytes =new byte[1024];
for (File file:filesList){
//读分块的流
RandomAccessFile r = new RandomAccessFile(file, "r");
int len=-1;
while((len=r.read(bytes))!=-1){
rw.write(bytes,0,len);
}
r.close();
}
rw.close();
//合并文件完成进行md5校验
FileInputStream merge_f = new FileInputStream(mergeFile);
FileInputStream chunk_f = new FileInputStream(sourceFile);
String s1 = DigestUtils.md5Hex(merge_f);
String s2 = DigestUtils.md5Hex(chunk_f);
if(s1.equals(s2)){
System.out.println("合并完成");
}
}
}
6.XXL-JOB精讲
6.1介绍
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
官网:https://www.xuxueli.com/xxl-job/
XXL-JOB主要有调度中心、执行器、任务:
调度中心:
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;
主要职责为执行器管理、任务管理、监控运维、日志管理等
任务执行器:
负责接收调度请求并执行任务逻辑;
只要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
任务:负责执行具体的业务处理。
调度中心与执行器之间的工作流程如下:
执行流程:
1.任务执行器根据配置的调度中心的地址,自动注册到调度中心
2.达到任务触发条件,调度中心下发任务
3.执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
4.执行器消费内存队列中的执行结果,主动上报给调度中心
5.当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并 返回日志详情
6.2使用
6.2.1首先下载XXL-JOB
GitHub:https://github.com/xuxueli/xxl-job
码云:https://gitee.com/xuxueli0323/xxl-job
下载完解压后,使用IDEA打开解压后的目录
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
doc :文档资料,包含数据库脚本
6.2.2创建doc里的数据库脚本
创建完后如下图所示
6.2.3访问XXL-JOB
http://你的ip地址:8088/xxl-job-admin/
界面大概长这样
6.3 执行器
执行器负责与调度中心通信接收调度中心发起的任务调度请求
配置执行器示例:
点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。
保存
6.4配置
Maven依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>
配置文件
xxl:
job:
admin:
addresses: http://你的ip地址:8088/xxl-job-admin
executor:
appname: media-process-service #执行器名字
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token
注意配置中的appname这是执行器的应用名,port是执行器启动的端口,如果本地启动多个执行器注意端口不能重复。
配置类
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
重新启动该项目后,查看XXL-Job界面
在线机器地址处已显示1个执行器。或者启动后观察日志,出现下边的日志
表示执行器在调度中心注册成功
6.5执行任务
参考示例代码
如:
Java
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @description 测试执行器
* @author Mr.M
* @date 2022/9/13 20:32
* @version 1.0
*/
@Component
@Slf4j
public class SampleJob {
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("testJob")
public void testJob() throws Exception {
log.info("开始执行.....");
}
}
下边在调度中心添加任务,进入任务管理
点击新增,填写任务信息
调度类型:
固定速度指按固定的间隔定时调度。
Cron,通过Cron表达式实现更丰富的定时调度策略。
Cron表达式是一个字符串,通过它可以定义调度策略,格式如下:
{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
xxl-job提供图形界面去配置:
一些例子如下:
30 10 1 * * ? 每天1点10分30秒触发
0/30 * * * * ? 每30秒触发一次
- 0/10 * * * ? 每10分钟触发一次
运行模式:
有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。
JobHandler:
即任务方法名,填写任务方法上边@XxlJob注解中的名称。
路由策略:
当执行器集群部署时,调度中心向哪个执行器下发任务,这里选择第一个表示只向第一个执行器下发任务,路由策略的其它选项稍后在分片广播章节详细解释。
高级配置的其它配置项稍后在分片广播章节详细解释。
添加成功,启动任务
停止任务要在调度执行操作
还可以进行日志清除
6.6分片广播的路由策略
6.6.1有哪些路由策略呢
高级配置:
- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
- 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。
- 调度过期策略:
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
- 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
- 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
6.6.2重点再说分片广播策略
重点说的是分片广播策略,分片是指是调度中心以执行器为维度进行分片,将集群中的执行器标上序号:0,1,2,3…,广播是指每次调度会向集群中的所有执行器发送任务调度,请求中携带分片参数,获取分片参数进行分片业务处理。
每个执行器收到调度请求同时接收分片参数。
xxl-job支持动态扩容执行器集群从而动态增加分片数量,当有任务量增加可以部署更多的执行器到集群中,调度中心会动态修改分片的数量。
作业分片适用哪些场景呢?
•分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
•广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
6.6.3分片广播使用示例
代码
Java
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("开始执行第"+shardIndex+"批任务");
}
在调度中心添加任务
添加成功,然后启动任务:
观察日志
6.7如何保证多个执行器不会查询到重复的任务呢
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
1 % 2 = 1 执行器2执行
2 % 2 = 0 执行器1执行
3 % 2 = 1 执行器2执行
以此类推.
6.8如何保证任务不重复执行
通过作业分片方案保证了执行器之间查询到不重复的任务,如果一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
首先配置调度过期策略:
查看文档如下:
- 调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等;
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
这里我们选择忽略,如果立即执行一次就可能重复执行相同的任务。
只做这些配置可以保证任务不会重复执行吗?
做不到,还需要保证任务处理的幂等性
什么是任务的幂等性?
任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。在本项目中要实现的是不论多少次任务调度同一个视频只执行一次成功的转码。
什么是幂等性?
它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用的方案:
1)数据库约束,比如:唯一索引,主键。
2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
上面的方式属于用分布式锁
什么是分布式锁?
它并不是类似于synchronized同步锁的方式,synchronized只能保证同一个虚拟机中多个线程去争抢锁。虚拟机都去抢占同一个锁,锁是一个单独的程序提供加锁、解锁服务。该锁已不属于某个虚拟机,而是分布式部署,由多个虚拟机所共享,这种锁叫分布式锁。
7.补充FFmpeg的使用
下载:FFmpeg https://www.ffmpeg.org/download.html#build-windows
请从常用工具软件目录找到ffmpeg.exe,并将ffmpeg.exe加入环境变量path中。
测试是否正常:cmd运行 ffmpeg -version
7.1示例代码
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class Mp4VideoUtil extends VideoUtil {
String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安装位置
String video_path = "D:\\BaiduNetdiskDownload\\test1.avi";
String mp4_name = "test1.mp4";
String mp4folder_path = "D:/BaiduNetdiskDownload/Movies/test1/";
public Mp4VideoUtil(String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path){
super(ffmpeg_path);
this.ffmpeg_path = ffmpeg_path;
this.video_path = video_path;
this.mp4_name = mp4_name;
this.mp4folder_path = mp4folder_path;
}
//清除已生成的mp4
private void clear_mp4(String mp4_path){
//删除原来已经生成的m3u8及ts文件
File mp4File = new File(mp4_path);
if(mp4File.exists() && mp4File.isFile()){
mp4File.delete();
}
}
/**
* 视频编码,生成mp4文件
* @return 成功返回success,失败返回控制台日志
*/
public String generateMp4(){
//清除已生成的mp4
// clear_mp4(mp4folder_path+mp4_name);
clear_mp4(mp4folder_path);
/*
ffmpeg.exe -i lucene.avi -c:v libx264 -s 1280x720 -pix_fmt yuv420p -b:a 63k -b:v 753k -r 18 .\lucene.mp4
*/
List<String> commend = new ArrayList<String>();
//commend.add("D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe");
commend.add(ffmpeg_path);
commend.add("-i");
// commend.add("D:\\BaiduNetdiskDownload\\test1.avi");
commend.add(video_path);
commend.add("-c:v");
commend.add("libx264");
commend.add("-y");//覆盖输出文件
commend.add("-s");
commend.add("1280x720");
commend.add("-pix_fmt");
commend.add("yuv420p");
commend.add("-b:a");
commend.add("63k");
commend.add("-b:v");
commend.add("753k");
commend.add("-r");
commend.add("18");
// commend.add(mp4folder_path + mp4_name );
commend.add(mp4folder_path );
String outstring = null;
try {
ProcessBuilder builder = new ProcessBuilder();
builder.command(commend);
//将标准输入流和错误输入流合并,通过标准输入流程读取信息
builder.redirectErrorStream(true);
Process p = builder.start();
outstring = waitFor(p);
} catch (Exception ex) {
ex.printStackTrace();
}
// Boolean check_video_time = this.check_video_time(video_path, mp4folder_path + mp4_name);
Boolean check_video_time = this.check_video_time(video_path, mp4folder_path);
if(!check_video_time){
return outstring;
}else{
return "success";
}
}
public static void main(String[] args) throws IOException {
//ffmpeg的路径
String ffmpeg_path = "D:\\Baidu Netdisk\\item1\\day1\\常用软件\\ffmpeg\\ffmpeg.exe";//ffmpeg的安装位置
//源avi视频的路径
String video_path = "D:\\develop\\bigfile_test\\nacos01.avi";
//转换后mp4文件的名称
String mp4_name = "nacos01.mp4";
//转换后mp4文件的路径
String mp4_path = "D:\\MinIo\\nacos01.mp4";
//创建工具类对象
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);
//开始视频转换,成功将返回success
String s = videoUtil.generateMp4();
System.out.println(s);
}
}