springboot activemq FFmpeg视频转码h264
服务思路
1.获取待转码视频url
2.将该视频url作为生产者发送消息到队列
eg:wkvideo 代表主题 encodeFileUrl 为消息 QUEUE代表队列类型
根据业务自行管理:实例如下
public void joinTransFormatQue(String fileUrl, boolean is ){
String suffix = fileUrl.substring(fileUrl.lastIndexOf(".") + 1, fileUrl.length());
String toSuffix =null;
String videoFormat = "mp4,mp3,wav,wma,wmv,mid,avi,mpg,asf,rm,rmvb";
String officeFormat = "doc,docx,ppt,xls,pdf,xlsx,pptx";
if(!CommUtils.isNull(suffix)){
toSuffix =suffix.toLowerCase();
}
if (videoFormat.contains(toSuffix) && is ) {
new FileConvertUtils(fileUrl,3).retrieveVideoThumbnailPic(fileUrl);//创建视频封面
String encodeFileUrl = java.net.URLEncoder.encode(fileUrl);
//发送 转码主题到jms服务器
jmsService.sendMessage1("wkvideo", encodeFileUrl, "QUEUE");
}else if (officeFormat.contains(toSuffix) && is ) {
String encodeFileUrl = java.net.URLEncoder.encode(fileUrl);
//发送 转码主题到jms服务器
jmsService.sendMessage1("office2swf", encodeFileUrl, "QUEUE");
}
}
3.消费者监听服务,只需启动即可,先注册主题为wkvideo
@SpringBootApplication
public class DemoApplication {
@Bean
public ActiveMQQueue queue() {
return new ActiveMQQueue("wkvideo");
}
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
开始监听消息并获取@JmsListener(destination = "wkvideo")
@Component
public class PromoteActConsumer {
/**
* 客户端消费
* @param message
*/
@JmsListener(destination = "wkvideo")
public void receiveQueue(Message message) {//consumer:视频url
//执行转码功能
try {
String msg = ((TextMessage)message).getText();
String videoUrl = java.net.URLDecoder.decode( msg,"UTF-8");
String chmod = "/usr/local/bin/video2H264.sh";
String shell = "/usr/local/bin/video2H264.sh /mnt/nfs/mnt/nfs" + videoUrl;
System.out.println("videochs**************shell:"+shell);
CommandResult result = CommandUtils.exec(shell,chmod);
} catch (JMSException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
shell 为转码脚本路径,后面为服务器转码路径
video2H264.sh 脚本如下 需注意不同系统的空格处理
#!/bin/sh
echo "ffmpegmp4"
ffmpegmp4(){
if [ -f "$1" -a -s "$1" ];then
if [ "${1##*.}" = "mp4" -o "${1##*.}" = "flv" -o "${1##*.}" = "MP4" -o "${1##*.}" = "FLV" ];then
original=$1
echo "$original"
echo ">>target<<"
target=${original%.*}.${original##*.}
echo "$target"
tmp=${original%.*}`date +%N`.mp4 # 文件的临时名字
echo ">>picpath<<"
picpath=${original%.*}_pic.jpg
echo ">>picpath<<"
ffmpeg -i $original -s 480x320 -vcodec h264 -preset fast -b 800000 $tmp
# ffmpeg -i $original -vcodec h264 -preset fast -b:v 1000k $tmp
# ffmpeg -i $original -threads:1 4 -ab 56 -ar 22050 -qscale 8 -codec:v libx264 -c:a copy -movflags faststart -r 15 -s 480*320 -#y $tmp
#ffmpeg -loglevel quiet -i $original -c:v libx264 -strict -2 $tmp
#ffmpeg -ss 0 -i $original -y -f image2 -t 0.001 -s 238*140 $picpath
ffmpeg -i $original -y -f image2 -ss 00:00:10 -vframes 1 $picpath
mv $tmp $target
fi
fi
}
ffmpegmp4 $1
echo ">>>>>>>>>>>>>>>ffmpegmp4 over<<<<<<<<<<<<<<<"
生成者根据业务自行管理,实例如下
@RestController
@Component
@EnableScheduling
@RequestMapping("/send")
public class PromoteActProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
//@Scheduled(fixedDelay = 2000) // 每2s执行1次
@RequestMapping("/msg")
public void send() {
this.jmsMessagingTemplate.convertAndSend(this.queue, "快点转码");
}
}
配置文件信息
##对于activeMQ的支持
#spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=false
spring.activemq.userName=admin
spring.activemq.password=admin
#连接池最大连接数
spring.activemq.pool.max-connections=10
#空闲的连接过期时间,默认为30秒
spring.activemq.pool.idle-timeout=30000
#强制的连接过期时间,与idleTimeout的区别在于:idleTimeout是在连接空闲一段时间失效,而expiryTimeout不管当前连接的情况,只要达到指定时间就失效。默认为0,never
spring.jms.pub-sub-domain=false
spring.activemq.pool.time-between-expiration-check=10000
最后两个工具类
public class CommandResult {
public static final int EXIT_VALUE_TIMEOUT=-1;
public static final int EXIT_VALUE_UNEXCEPTED = 2;
private String output;
void setOutput(String error) {
output=error;
}
String getOutput(){
return output;
}
int exitValue;
void setExitValue(int value) {
exitValue=value;
}
int getExitValue(){
return exitValue;
}
private String error;
/**
* @return the error
*/
public String getError() {
return error;
}
/**
* @param error the error to set
*/
public void setError(String error) {
this.error = error;
}
}
package ActiveMQ.common;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Date;
public class CommandUtils {
public static void exec(String command,String chmod) {
try {
System.out.println(">>>>>>>>>>赋予脚本转码操作权限<<<<<<<<<<");
ProcessBuilder builder = new ProcessBuilder("chmod", "777",chmod);
Process process1 = builder.start();
process1.waitFor();
System.out.println(">>>>>>>>>>赋予权限成功<<<<<<<<<<");
System.out.println(">>>>>>>>>>开始执行转码脚本<<<<<<<<<<"+new Date());
Runtime runtime = Runtime.getRuntime();
final Process pro = runtime.exec(command);
new Thread()
{
@Override
public void run()
{
BufferedReader in = new BufferedReader(new InputStreamReader(pro.getInputStream()));
String line = null;
try
{
while((line = in.readLine()) != null)
{
System.out.println("output: " + line);
}
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
try
{
in.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}.start();
new Thread()
{
@Override
public void run()
{
BufferedReader err = new BufferedReader(new InputStreamReader(pro.getErrorStream()));
String line = null;
try
{
while((line = err.readLine()) != null)
{
System.out.println("err: " + line);
}
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
try
{
err.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}.start();
pro.waitFor();
}
catch (Exception e)
{
e.printStackTrace();
}
}
// //default time out, in millseconds
// public static final Long DEFAULT_TIMEOUT = 5 * 100 * 10000L;
// public static final int DEFAULT_INTERVAL = 10000;
//
// /**
// * Executes the specified command in a separate process. The method then blocks until the process returned.
// * If an error arises during the execution or if the exeecuted process returned an non-null return code,
// * the content of the process' stderr is returned to the caller. If the execution is fine, null is returned.
// *
// * @param command String
// * @return CommandResult
// */
// public static CommandResult exec(String command,String chmod) {
// long start = System.currentTimeMillis();
// long last;
//
// CommandResult commandResult = new CommandResult();
//
// try {
// ProcessBuilder builder = new ProcessBuilder("chmod", "777",chmod);
// Process process1 = builder.start();
// process1.waitFor();
// Process process = Runtime.getRuntime().exec(command);
//
//// int status = process.waitFor();
//// if(status !=0){
//// System.out.println("调用shell命令失败");
//// }else{
//// System.out.println("调用shell命令成功");
//// }
// process(process, commandResult);
// if (process != null) {
// process.destroy();
// }
//
// last = (System.currentTimeMillis() - start) / 1000;
//
// } catch (Exception e) {
// last = (System.currentTimeMillis() - start) / 1000;
// String error = "Execute command [" + command + "] last [" + last + "] s, failed [" + e.getMessage() + "]";
//
// commandResult.setExitValue(CommandResult.EXIT_VALUE_UNEXCEPTED);
// commandResult.setError(error);
// }
//
// return commandResult;
// }
//
// private static void process(Process process, CommandResult commandResult) {
// BufferedReader errorReader = null;
// BufferedReader inputReader = null;
//
// try {
// errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
// inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
//
// //timeout control
// long start = System.currentTimeMillis();
// boolean processFinished = false;
//
// while (System.currentTimeMillis() - start < DEFAULT_TIMEOUT.intValue() && !processFinished) {
// processFinished = true;
// try {
// process.exitValue();
// } catch (IllegalThreadStateException e) {
// // process hasn't finished yet
// processFinished = false;
// continue;
//// try {
//// Thread.sleep(DEFAULT_INTERVAL);
//// } catch (InterruptedException e1) {
//// }
// }
// }
//
// if (!processFinished) {
// commandResult.setExitValue(CommandResult.EXIT_VALUE_TIMEOUT);
// commandResult.setError("Command process timeout");
// return;
// }
//
// commandResult.setExitValue(process.waitFor());
//
// StringBuffer sb;
// String line;
//
// //parse error info
// if (errorReader.ready()) {
// sb = new StringBuffer();
// while ((line = errorReader.readLine()) != null) {
// sb.append(line);
// }
// commandResult.setError(sb.toString());
// }
//
// //parse info
// if (inputReader.ready()) {
// sb = new StringBuffer();
// while ((line = inputReader.readLine()) != null) {
// sb.append(line);
// }
// commandResult.setOutput(sb.toString());
// }
//
// } catch (Exception e) {
// String error = "Command process, failed [" + e.getMessage() + "]";
//
// commandResult.setExitValue(CommandResult.EXIT_VALUE_UNEXCEPTED);
// commandResult.setError(error);
//
// } finally {
// if (errorReader != null) {
// try {
// errorReader.close();
// } catch (IOException e) {
// System.out.println("Close BufferedReader, failed [" + e.getMessage() + "]" );
// }
// }
//
// if (inputReader != null) {
// try {
// inputReader.close();
// } catch (IOException e) {
// System.out.println("Close BufferedReader, failed [" + e.getMessage() + "]" );
// }
// }
// }
// }
}