feat(chaos): 实现视频分片上传和后台处理功能- 新增视频上传相关控制器、服务接口和实现类
- 实现了视频分片上传、合并和后台处理的逻辑 - 添加了 RabbitMQ 消息队列配置和消息转换器 -优化了 JWT 认证过滤器和日志记录 - 新增了跨域配置
This commit is contained in:
@@ -36,6 +36,19 @@
|
||||
<artifactId>chaos_api_common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-web</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,13 @@
|
||||
package cn.nopj.chaos_api.config;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
@Configuration
|
||||
public class AppConfig {
|
||||
@Bean
|
||||
public RestTemplate restTemplate() {
|
||||
return new RestTemplate ();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
package cn.nopj.chaos_api.config;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONReader;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.support.converter.MessageConversionException;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* 自定义使用 Fastjson2 的消息转换器
|
||||
*/
|
||||
public class Fastjson2MessageConverter implements MessageConverter {
|
||||
|
||||
public static final String DEFAULT_CHARSET = StandardCharsets.UTF_8.name();
|
||||
|
||||
/**
|
||||
* 将Java对象转换为AMQP消息
|
||||
* 在这个方法中,我们将对象序列化为JSON字节数组,并在消息头中添加类型信息。
|
||||
*/
|
||||
@Override
|
||||
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
|
||||
if (messageProperties == null) {
|
||||
messageProperties = new MessageProperties();
|
||||
}
|
||||
// 设置内容类型为JSON
|
||||
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
|
||||
messageProperties.setContentEncoding(DEFAULT_CHARSET);
|
||||
|
||||
// 【核心】在消息头中添加完整的类名作为类型标识
|
||||
messageProperties.setHeader("__TypeId__", object.getClass().getName());
|
||||
|
||||
byte[] bytes = JSON.toJSONBytes(object);
|
||||
messageProperties.setContentLength(bytes.length);
|
||||
|
||||
return new Message(bytes, messageProperties);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将AMQP消息转换为Java对象
|
||||
* 在这个方法中,我们从消息头中获取类型信息,然后用Fastjson2将消息体反序列化为指定类型的对象。
|
||||
*/
|
||||
@Override
|
||||
public Object fromMessage(Message message) throws MessageConversionException {
|
||||
MessageProperties properties = message.getMessageProperties();
|
||||
String typeId = properties.getHeader("__TypeId__");
|
||||
|
||||
if (typeId == null) {
|
||||
throw new MessageConversionException("无法转换消息:未找到 __TypeId__ 消息头");
|
||||
}
|
||||
|
||||
try {
|
||||
// 根据类型标识找到对应的类
|
||||
Class<?> targetClass = ClassUtils.forName(typeId, ClassUtils.getDefaultClassLoader());
|
||||
// 使用Fastjson2进行反序列化
|
||||
return JSON.parseObject(message.getBody(), targetClass, JSONReader.Feature.SupportSmartMatch);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new MessageConversionException("无法解析类型 " + typeId, e);
|
||||
} catch (Exception e) {
|
||||
throw new MessageConversionException("使用Fastjson2转换消息失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package cn.nopj.chaos_api.config;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class RabbitMQConfig {
|
||||
public static final String QUEUE_NAME = "video.processing.queue";
|
||||
public static final String EXCHANGE_NAME = "video.direct.exchange";
|
||||
public static final String ROUTING_KEY = "video.processing.key";
|
||||
|
||||
@Bean
|
||||
public Queue videoQueue() {
|
||||
// durable: true, 队列持久化
|
||||
return new Queue(QUEUE_NAME, true);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DirectExchange videoExchange() {
|
||||
return new DirectExchange(EXCHANGE_NAME);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding binding(Queue queue, DirectExchange exchange) {
|
||||
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MessageConverter jsonMessageConverter() {
|
||||
return new Fastjson2MessageConverter();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
package cn.nopj.chaos_api.service.impl;
|
||||
|
||||
import cn.nopj.chaos_api.config.RabbitMQConfig;
|
||||
import cn.nopj.chaos_api.dto.VideoTaskPayload;
|
||||
import cn.nopj.chaos_api.service.VideoFileUploadService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class VideoFileUploadServiceImpl implements VideoFileUploadService {
|
||||
|
||||
@Value("${file.upload.temp-dir}")
|
||||
private String tempDir;
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate; // 注入RabbitMQ模板
|
||||
|
||||
private static final long CHUNK_SIZE = 5 * 1024 * 1024; // 5MB
|
||||
|
||||
public Map<String, Object> initUpload(String fileName, long totalSize) {
|
||||
String uploadId = UUID.randomUUID().toString();
|
||||
Path uploadDir = Paths.get(tempDir, uploadId);
|
||||
try {
|
||||
Files.createDirectories(uploadDir);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("无法创建临时上传目录", e);
|
||||
}
|
||||
|
||||
return Map.of("uploadId", uploadId, "chunkSize", CHUNK_SIZE);
|
||||
}
|
||||
|
||||
public void uploadChunk(String uploadId, int chunkIndex, byte[] bytes) throws IOException {
|
||||
Path chunkPath = Paths.get(tempDir, uploadId, String.valueOf(chunkIndex));
|
||||
Files.write(chunkPath, bytes);
|
||||
}
|
||||
|
||||
public void mergeAndProcess(String uploadId, String fileName) throws IOException {
|
||||
Path uploadDir = Paths.get(tempDir, uploadId);
|
||||
Path mergedFilePath = Paths.get(tempDir, fileName);
|
||||
|
||||
// 合并文件
|
||||
try (var destChannel = Files.newByteChannel(mergedFilePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
|
||||
for (int i = 0; ; i++) {
|
||||
Path chunkPath = uploadDir.resolve(String.valueOf(i));
|
||||
if (!Files.exists(chunkPath)) break;
|
||||
try (FileInputStream fis = new FileInputStream(chunkPath.toFile());
|
||||
FileChannel sourceChannel = fis.getChannel()) {
|
||||
sourceChannel.transferTo(0, sourceChannel.size(), destChannel);
|
||||
}
|
||||
Files.delete(chunkPath);
|
||||
}
|
||||
}
|
||||
Files.delete(uploadDir);
|
||||
|
||||
// 发送消息到RabbitMQ,而不是直接调用Service
|
||||
pushResultToMQ(mergedFilePath.toString(), uploadId);
|
||||
|
||||
}
|
||||
public void pushResultToMQ(String filePath,String uploadId) {
|
||||
VideoTaskPayload payload = new VideoTaskPayload(filePath, uploadId);
|
||||
rabbitTemplate.convertAndSend(
|
||||
RabbitMQConfig.EXCHANGE_NAME,
|
||||
RabbitMQConfig.ROUTING_KEY,
|
||||
payload
|
||||
);
|
||||
log.info("已发送视频处理结果到消息队列: {}" , payload);
|
||||
}
|
||||
public void pushResultToMQ(String filePath) {
|
||||
VideoTaskPayload payload = new VideoTaskPayload(filePath, UUID.randomUUID().toString());
|
||||
rabbitTemplate.convertAndSend(
|
||||
RabbitMQConfig.EXCHANGE_NAME,
|
||||
RabbitMQConfig.ROUTING_KEY,
|
||||
payload
|
||||
);
|
||||
log.info("已发送视频处理结果到消息队列: {}" , payload);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,181 @@
|
||||
package cn.nopj.chaos_api.service.impl;
|
||||
|
||||
import cn.nopj.chaos_api.config.RabbitMQConfig;
|
||||
import cn.nopj.chaos_api.config.AppConfig;
|
||||
import cn.nopj.chaos_api.dto.VideoTaskPayload;
|
||||
import cn.nopj.chaos_api.service.VideoProcessingService;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.core.io.FileSystemResource;
|
||||
import org.springframework.http.*;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class VideoProcessingServiceImpl implements VideoProcessingService {
|
||||
|
||||
@Value("${file.upload.ffmpeg-path}")
|
||||
private String ffmpegPath;
|
||||
@Value("${file.upload.temp-dir}")
|
||||
private String tempDir;
|
||||
|
||||
|
||||
@Autowired
|
||||
private AppConfig restTemplate;
|
||||
|
||||
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
|
||||
public void listenForVideoTasks(VideoTaskPayload payload) {
|
||||
System.out.println("收到视频处理任务: " + payload);
|
||||
processVideo(payload.getSourceFilePath(), payload.getUploadId());
|
||||
}
|
||||
|
||||
public void processVideo(String sourceFilePath, String uploadId) {
|
||||
Path sourcePath = Paths.get(sourceFilePath);
|
||||
String tempOutputDirName = "hls_temp_" + uploadId;
|
||||
Path tempOutputDirPath = Paths.get(tempDir, tempOutputDirName);
|
||||
|
||||
try {
|
||||
// 1. 创建临时HLS输出目录
|
||||
Files.createDirectories(tempOutputDirPath);
|
||||
String localM3u8Path = tempOutputDirPath.resolve("playlist.m3u8").toString();
|
||||
|
||||
|
||||
String segmentFilename = tempOutputDirPath.resolve("segment%05d.jpg").toString();
|
||||
|
||||
// 2. 执行FFmpeg命令进行切片
|
||||
ProcessBuilder processBuilder = new ProcessBuilder(
|
||||
ffmpegPath, "-i", sourceFilePath, "-c:v", "libx264", "-c:a", "aac",
|
||||
"-hls_time", "10", "-hls_list_size", "0", "-f", "hls", "-hls_segment_filename", segmentFilename, localM3u8Path
|
||||
);
|
||||
// ... (FFmpeg执行逻辑不变) ...
|
||||
processBuilder.redirectErrorStream(true);
|
||||
Process process = processBuilder.start();
|
||||
try (var reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
||||
String line; while ((line = reader.readLine()) != null) { System.out.println("FFmpeg: " + line); }
|
||||
}
|
||||
if (process.waitFor() != 0) { throw new RuntimeException("FFmpeg切片失败"); }
|
||||
|
||||
// 3. 【核心】上传所有.ts文件到外部接口,并收集返回的URL
|
||||
Map<String, String> segmentUrlMap = new HashMap<>();
|
||||
|
||||
File[] segmentFiles = tempOutputDirPath.toFile().listFiles((dir, name) -> name.endsWith(".jpg"));
|
||||
if (segmentFiles == null) { throw new RuntimeException("找不到生成的.jpg切片文件"); }
|
||||
|
||||
|
||||
|
||||
for (File segmentFile : segmentFiles) {
|
||||
String returnedUrl = uploadFileToExternalServer(segmentFile);
|
||||
if (returnedUrl == null) {
|
||||
throw new RuntimeException("上传文件 " + segmentFile.getName() + " 失败");
|
||||
}
|
||||
segmentUrlMap.put(segmentFile.getName(), returnedUrl);
|
||||
log.info("上传 {} 成功, 地址: {}", segmentFile.getName(), returnedUrl);
|
||||
}
|
||||
|
||||
// 4. 【核心】根据返回的URL生成新的m3u8内容
|
||||
String finalM3u8Content = createFinalM3u8Content(localM3u8Path, segmentUrlMap);
|
||||
|
||||
log.info("m3u8:{}",finalM3u8Content);
|
||||
// 5. 【核心】将最终的m3u8内容也上传到外部接口
|
||||
Path finalM3u8File = tempOutputDirPath.resolve("final_playlist.m3u8");
|
||||
Files.writeString(finalM3u8File, finalM3u8Content);
|
||||
String finalM3u8Url = uploadFileToExternalServer(finalM3u8File.toFile());
|
||||
|
||||
log.info("视频处理完成!最终M3U8访问地址:{}",finalM3u8Url);
|
||||
// 在这里,你可以将 finalM3u8Url 保存到数据库
|
||||
|
||||
} catch (IOException | InterruptedException e) {
|
||||
System.err.println("视频处理失败: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
// 6. 清理所有本地临时文件
|
||||
try {
|
||||
if (Files.exists(sourcePath)) { Files.delete(sourcePath); }
|
||||
if (Files.exists(tempOutputDirPath)) {
|
||||
Files.walk(tempOutputDirPath)
|
||||
.sorted(java.util.Comparator.reverseOrder())
|
||||
.map(Path::toFile)
|
||||
.forEach(File::delete);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 调用外部接口上传单个文件
|
||||
* @param file 要上传的文件
|
||||
* @return 外部接口返回的文件URL
|
||||
*/
|
||||
private String uploadFileToExternalServer(File file) {
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
|
||||
headers.add("x-auth-token","47880955-1882-44ec-a250-a97a8f31a4eb");
|
||||
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
|
||||
body.add("file", new FileSystemResource(file)); // "file"是常见的表单字段名,请根据您的接口修改
|
||||
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);
|
||||
|
||||
try {
|
||||
String requestUrl = "https://pinyp.vspjc.com:59789/melody/api/v1/oss/upload";
|
||||
|
||||
ResponseEntity<String> entity = restTemplate.restTemplate().postForEntity(requestUrl, requestEntity, String.class);
|
||||
if (entity.getStatusCode() != HttpStatus.OK){
|
||||
log.error("请求失败: {}", entity);
|
||||
throw new RuntimeException("请求失败");
|
||||
}
|
||||
|
||||
JSONObject jsonObject = JSONObject.parse(entity.getBody());
|
||||
|
||||
if (Objects.requireNonNull(jsonObject).getInteger("code") != 12200){
|
||||
log.error("上传失败: {}", jsonObject);
|
||||
throw new RuntimeException("上传失败,请检查接口是否失效");
|
||||
}
|
||||
|
||||
String data = jsonObject.getString("data");
|
||||
//data = https://qny-imimg.uuvuem.cn/df53/wx/20250719/8cc4f34394fb49ec90b1316ca9e26b86.jpg@,@image/jpeg@,@qiniu
|
||||
String[] split = data.split("@");
|
||||
String url = split[0];
|
||||
log.info("上传成功: {}", url);
|
||||
return url;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("上传文件失败: {}", e.getMessage());
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
/**
|
||||
* 读取本地m3u8文件,并用远程URL替换ts文件名
|
||||
*/
|
||||
private String createFinalM3u8Content(String localM3u8Path, Map<String, String> tsUrlMap) throws IOException {
|
||||
StringBuilder newContent = new StringBuilder();
|
||||
try (BufferedReader reader = new BufferedReader(new FileReader(localM3u8Path))) {
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
if (line.endsWith(".jpg")) {
|
||||
String remoteUrl = tsUrlMap.get(line.trim());
|
||||
if (remoteUrl != null) {
|
||||
newContent.append(remoteUrl).append("\n");
|
||||
}
|
||||
} else {
|
||||
newContent.append(line).append("\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
return newContent.toString();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user