From c6d18d4979ad264c2eea7e170fdac648d4cc624c Mon Sep 17 00:00:00 2001 From: Chaos Date: Mon, 10 Nov 2025 19:34:14 +0800 Subject: [PATCH] =?UTF-8?q?refactor(video):=20=E7=A7=BB=E9=99=A4=E8=A7=86?= =?UTF-8?q?=E9=A2=91=E5=A4=84=E7=90=86=E7=9B=B8=E5=85=B3=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 删除 RabbitMQ 配置类及相关队列、交换机定义 - 移除视频控制器及其分片上传、合并处理接口 - 删除视频文件上传服务接口及实现类 - 移除视频处理服务接口及实现类 - 删除相关的 DTO 数据传输对象类 - 清理配置文件中 RabbitMQ 连接信息 --- .../nopj/chaos_api/dto/ProcessVideoPath.java | 8 - .../nopj/chaos_api/dto/VideoTaskPayload.java | 14 -- .../service/VideoFileUploadService.java | 45 ----- .../service/VideoProcessingService.java | 24 --- .../nopj/chaos_api/config/RabbitMQConfig.java | 37 ---- .../impl/VideoFileUploadServiceImpl.java | 92 --------- .../impl/VideoProcessingServiceImpl.java | 181 ------------------ .../chaos_api/controller/VideoController.java | 73 ------- .../src/main/resources/application.yaml | 5 - 9 files changed, 479 deletions(-) delete mode 100644 chaos_api_domain/src/main/java/cn/nopj/chaos_api/dto/ProcessVideoPath.java delete mode 100644 chaos_api_domain/src/main/java/cn/nopj/chaos_api/dto/VideoTaskPayload.java delete mode 100644 chaos_api_interface/src/main/java/cn/nopj/chaos_api/service/VideoFileUploadService.java delete mode 100644 chaos_api_interface/src/main/java/cn/nopj/chaos_api/service/VideoProcessingService.java delete mode 100644 chaos_api_service/src/main/java/cn/nopj/chaos_api/config/RabbitMQConfig.java delete mode 100644 chaos_api_service/src/main/java/cn/nopj/chaos_api/service/impl/VideoFileUploadServiceImpl.java delete mode 100644 chaos_api_service/src/main/java/cn/nopj/chaos_api/service/impl/VideoProcessingServiceImpl.java delete mode 100644 chaos_api_web/src/main/java/cn/nopj/chaos_api/controller/VideoController.java diff --git a/chaos_api_domain/src/main/java/cn/nopj/chaos_api/dto/ProcessVideoPath.java b/chaos_api_domain/src/main/java/cn/nopj/chaos_api/dto/ProcessVideoPath.java deleted file mode 100644 index 63cf2d3..0000000 --- a/chaos_api_domain/src/main/java/cn/nopj/chaos_api/dto/ProcessVideoPath.java +++ /dev/null @@ -1,8 +0,0 @@ -package cn.nopj.chaos_api.dto; - -import lombok.Data; - -@Data -public class ProcessVideoPath { - public String path; -} diff --git a/chaos_api_domain/src/main/java/cn/nopj/chaos_api/dto/VideoTaskPayload.java b/chaos_api_domain/src/main/java/cn/nopj/chaos_api/dto/VideoTaskPayload.java deleted file mode 100644 index c499f1c..0000000 --- a/chaos_api_domain/src/main/java/cn/nopj/chaos_api/dto/VideoTaskPayload.java +++ /dev/null @@ -1,14 +0,0 @@ -package cn.nopj.chaos_api.dto; - -import java.io.Serializable; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class VideoTaskPayload implements Serializable { - private String sourceFilePath; - private String uploadId; -} \ No newline at end of file diff --git a/chaos_api_interface/src/main/java/cn/nopj/chaos_api/service/VideoFileUploadService.java b/chaos_api_interface/src/main/java/cn/nopj/chaos_api/service/VideoFileUploadService.java deleted file mode 100644 index f122e47..0000000 --- a/chaos_api_interface/src/main/java/cn/nopj/chaos_api/service/VideoFileUploadService.java +++ /dev/null @@ -1,45 +0,0 @@ -package cn.nopj.chaos_api.service; - -import java.io.IOException; -import java.util.Map; - -public interface VideoFileUploadService { - - /** - * 初始化上传 - * @param fileName 文件名 - * @param totalSize 文件大小 - * @return 上传信息 - */ - Map initUpload(String fileName, long totalSize); - - /** - * 上传分片 - * @param uploadId 上传ID - * @param chunkIndex 分片索引 - * @param bytes 分片内容 - * @throws IOException IO 异常 - */ - void uploadChunk(String uploadId, int chunkIndex, byte[] bytes) throws IOException; - - /** - * 合并并处理上传的文件 - * @param uploadId 上传ID - * @param fileName 文件名 - * @throws IOException IO 异常 - */ - void mergeAndProcess(String uploadId, String fileName) throws IOException; - - /** - * 将处理结果推送到MQ - * @param filePath 文件路径 - * @param uploadId 上传ID - */ - void pushResultToMQ(String filePath,String uploadId); - - /** - * 将处理结果推送到MQ - * @param filePath 文件路径 - */ - void pushResultToMQ(String filePath); -} diff --git a/chaos_api_interface/src/main/java/cn/nopj/chaos_api/service/VideoProcessingService.java b/chaos_api_interface/src/main/java/cn/nopj/chaos_api/service/VideoProcessingService.java deleted file mode 100644 index 6e91a59..0000000 --- a/chaos_api_interface/src/main/java/cn/nopj/chaos_api/service/VideoProcessingService.java +++ /dev/null @@ -1,24 +0,0 @@ -package cn.nopj.chaos_api.service; - -import cn.nopj.chaos_api.dto.VideoTaskPayload; - -import java.io.IOException; - -public interface VideoProcessingService { - /** - * 监听视频处理任务 - * - * @param payload 视频处理任务负载 - */ - void listenForVideoTasks(VideoTaskPayload payload); - - - /** - * 处理视频 - * - * @param sourceFilePath 源视频文件路径 - * @param uploadId 上传ID - */ - void processVideo(String sourceFilePath, String uploadId); - -} \ No newline at end of file diff --git a/chaos_api_service/src/main/java/cn/nopj/chaos_api/config/RabbitMQConfig.java b/chaos_api_service/src/main/java/cn/nopj/chaos_api/config/RabbitMQConfig.java deleted file mode 100644 index 45ac9db..0000000 --- a/chaos_api_service/src/main/java/cn/nopj/chaos_api/config/RabbitMQConfig.java +++ /dev/null @@ -1,37 +0,0 @@ -package cn.nopj.chaos_api.config; - -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class RabbitMQConfig { - public static final String QUEUE_NAME = "video.processing.queue"; - public static final String EXCHANGE_NAME = "video.direct.exchange"; - public static final String ROUTING_KEY = "video.processing.key"; - - @Bean - public Queue videoQueue() { - // durable: true, 队列持久化 - return new Queue(QUEUE_NAME, true); - } - - @Bean - public DirectExchange videoExchange() { - return new DirectExchange(EXCHANGE_NAME); - } - - @Bean - public Binding binding(Queue queue, DirectExchange exchange) { - return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); - } - - @Bean - public MessageConverter jsonMessageConverter() { - return new Fastjson2MessageConverter(); - } -} diff --git a/chaos_api_service/src/main/java/cn/nopj/chaos_api/service/impl/VideoFileUploadServiceImpl.java b/chaos_api_service/src/main/java/cn/nopj/chaos_api/service/impl/VideoFileUploadServiceImpl.java deleted file mode 100644 index 19ec878..0000000 --- a/chaos_api_service/src/main/java/cn/nopj/chaos_api/service/impl/VideoFileUploadServiceImpl.java +++ /dev/null @@ -1,92 +0,0 @@ -package cn.nopj.chaos_api.service.impl; - -import cn.nopj.chaos_api.config.RabbitMQConfig; -import cn.nopj.chaos_api.dto.VideoTaskPayload; -import cn.nopj.chaos_api.service.VideoFileUploadService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; - -import java.io.FileInputStream; -import java.io.IOException; - -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.Map; -import java.util.UUID; - -@Service -@Slf4j -public class VideoFileUploadServiceImpl implements VideoFileUploadService { - - @Value("${file.upload.temp-dir}") - private String tempDir; - - @Autowired - private RabbitTemplate rabbitTemplate; // 注入RabbitMQ模板 - - private static final long CHUNK_SIZE = 5 * 1024 * 1024; // 5MB - - public Map initUpload(String fileName, long totalSize) { - String uploadId = UUID.randomUUID().toString(); - Path uploadDir = Paths.get(tempDir, uploadId); - try { - Files.createDirectories(uploadDir); - } catch (IOException e) { - throw new RuntimeException("无法创建临时上传目录", e); - } - - return Map.of("uploadId", uploadId, "chunkSize", CHUNK_SIZE); - } - - public void uploadChunk(String uploadId, int chunkIndex, byte[] bytes) throws IOException { - Path chunkPath = Paths.get(tempDir, uploadId, String.valueOf(chunkIndex)); - Files.write(chunkPath, bytes); - } - - public void mergeAndProcess(String uploadId, String fileName) throws IOException { - Path uploadDir = Paths.get(tempDir, uploadId); - Path mergedFilePath = Paths.get(tempDir, fileName); - - // 合并文件 - try (var destChannel = Files.newByteChannel(mergedFilePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { - for (int i = 0; ; i++) { - Path chunkPath = uploadDir.resolve(String.valueOf(i)); - if (!Files.exists(chunkPath)) break; - try (FileInputStream fis = new FileInputStream(chunkPath.toFile()); - FileChannel sourceChannel = fis.getChannel()) { - sourceChannel.transferTo(0, sourceChannel.size(), destChannel); - } - Files.delete(chunkPath); - } - } - Files.delete(uploadDir); - - // 发送消息到RabbitMQ,而不是直接调用Service - pushResultToMQ(mergedFilePath.toString(), uploadId); - - } - public void pushResultToMQ(String filePath,String uploadId) { - VideoTaskPayload payload = new VideoTaskPayload(filePath, uploadId); - rabbitTemplate.convertAndSend( - RabbitMQConfig.EXCHANGE_NAME, - RabbitMQConfig.ROUTING_KEY, - payload - ); - log.info("已发送视频处理结果到消息队列: {}" , payload); - } - public void pushResultToMQ(String filePath) { - VideoTaskPayload payload = new VideoTaskPayload(filePath, UUID.randomUUID().toString()); - rabbitTemplate.convertAndSend( - RabbitMQConfig.EXCHANGE_NAME, - RabbitMQConfig.ROUTING_KEY, - payload - ); - log.info("已发送视频处理结果到消息队列: {}" , payload); - } -} diff --git a/chaos_api_service/src/main/java/cn/nopj/chaos_api/service/impl/VideoProcessingServiceImpl.java b/chaos_api_service/src/main/java/cn/nopj/chaos_api/service/impl/VideoProcessingServiceImpl.java deleted file mode 100644 index cdde474..0000000 --- a/chaos_api_service/src/main/java/cn/nopj/chaos_api/service/impl/VideoProcessingServiceImpl.java +++ /dev/null @@ -1,181 +0,0 @@ -package cn.nopj.chaos_api.service.impl; - -import cn.nopj.chaos_api.config.RabbitMQConfig; -import cn.nopj.chaos_api.config.AppConfig; -import cn.nopj.chaos_api.dto.VideoTaskPayload; -import cn.nopj.chaos_api.service.VideoProcessingService; -import com.alibaba.fastjson2.JSONObject; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.io.FileSystemResource; -import org.springframework.http.*; -import org.springframework.stereotype.Service; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.io.*; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -@Service -@Slf4j -public class VideoProcessingServiceImpl implements VideoProcessingService { - - @Value("${file.upload.ffmpeg-path}") - private String ffmpegPath; - @Value("${file.upload.temp-dir}") - private String tempDir; - - - @Autowired - private AppConfig restTemplate; - - @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) - public void listenForVideoTasks(VideoTaskPayload payload) { - System.out.println("收到视频处理任务: " + payload); - processVideo(payload.getSourceFilePath(), payload.getUploadId()); - } - - public void processVideo(String sourceFilePath, String uploadId) { - Path sourcePath = Paths.get(sourceFilePath); - String tempOutputDirName = "hls_temp_" + uploadId; - Path tempOutputDirPath = Paths.get(tempDir, tempOutputDirName); - - try { - // 1. 创建临时HLS输出目录 - Files.createDirectories(tempOutputDirPath); - String localM3u8Path = tempOutputDirPath.resolve("playlist.m3u8").toString(); - - - String segmentFilename = tempOutputDirPath.resolve("segment%05d.jpg").toString(); - - // 2. 执行FFmpeg命令进行切片 - ProcessBuilder processBuilder = new ProcessBuilder( - ffmpegPath, "-i", sourceFilePath, "-c:v", "libx264", "-c:a", "aac", - "-hls_time", "10", "-hls_list_size", "0", "-f", "hls", "-hls_segment_filename", segmentFilename, localM3u8Path - ); - // ... (FFmpeg执行逻辑不变) ... - processBuilder.redirectErrorStream(true); - Process process = processBuilder.start(); - try (var reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { - String line; while ((line = reader.readLine()) != null) { System.out.println("FFmpeg: " + line); } - } - if (process.waitFor() != 0) { throw new RuntimeException("FFmpeg切片失败"); } - - // 3. 【核心】上传所有.ts文件到外部接口,并收集返回的URL - Map segmentUrlMap = new HashMap<>(); - - File[] segmentFiles = tempOutputDirPath.toFile().listFiles((dir, name) -> name.endsWith(".jpg")); - if (segmentFiles == null) { throw new RuntimeException("找不到生成的.jpg切片文件"); } - - - - for (File segmentFile : segmentFiles) { - String returnedUrl = uploadFileToExternalServer(segmentFile); - if (returnedUrl == null) { - throw new RuntimeException("上传文件 " + segmentFile.getName() + " 失败"); - } - segmentUrlMap.put(segmentFile.getName(), returnedUrl); - log.info("上传 {} 成功, 地址: {}", segmentFile.getName(), returnedUrl); - } - - // 4. 【核心】根据返回的URL生成新的m3u8内容 - String finalM3u8Content = createFinalM3u8Content(localM3u8Path, segmentUrlMap); - - log.info("m3u8:{}",finalM3u8Content); - // 5. 【核心】将最终的m3u8内容也上传到外部接口 - Path finalM3u8File = tempOutputDirPath.resolve("final_playlist.m3u8"); - Files.writeString(finalM3u8File, finalM3u8Content); - String finalM3u8Url = uploadFileToExternalServer(finalM3u8File.toFile()); - - log.info("视频处理完成!最终M3U8访问地址:{}",finalM3u8Url); - // 在这里,你可以将 finalM3u8Url 保存到数据库 - - } catch (IOException | InterruptedException e) { - System.err.println("视频处理失败: " + e.getMessage()); - e.printStackTrace(); - } finally { - // 6. 清理所有本地临时文件 - try { - if (Files.exists(sourcePath)) { Files.delete(sourcePath); } - if (Files.exists(tempOutputDirPath)) { - Files.walk(tempOutputDirPath) - .sorted(java.util.Comparator.reverseOrder()) - .map(Path::toFile) - .forEach(File::delete); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - /** - * 调用外部接口上传单个文件 - * @param file 要上传的文件 - * @return 外部接口返回的文件URL - */ - private String uploadFileToExternalServer(File file) { - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.MULTIPART_FORM_DATA); - headers.add("x-auth-token","47880955-1882-44ec-a250-a97a8f31a4eb"); - MultiValueMap body = new LinkedMultiValueMap<>(); - body.add("file", new FileSystemResource(file)); // "file"是常见的表单字段名,请根据您的接口修改 - HttpEntity> requestEntity = new HttpEntity<>(body, headers); - - try { - String requestUrl = "https://pinyp.vspjc.com:59789/melody/api/v1/oss/upload"; - - ResponseEntity entity = restTemplate.restTemplate().postForEntity(requestUrl, requestEntity, String.class); - if (entity.getStatusCode() != HttpStatus.OK){ - log.error("请求失败: {}", entity); - throw new RuntimeException("请求失败"); - } - - JSONObject jsonObject = JSONObject.parse(entity.getBody()); - - if (Objects.requireNonNull(jsonObject).getInteger("code") != 12200){ - log.error("上传失败: {}", jsonObject); - throw new RuntimeException("上传失败,请检查接口是否失效"); - } - - String data = jsonObject.getString("data"); - //data = https://qny-imimg.uuvuem.cn/df53/wx/20250719/8cc4f34394fb49ec90b1316ca9e26b86.jpg@,@image/jpeg@,@qiniu - String[] split = data.split("@"); - String url = split[0]; - log.info("上传成功: {}", url); - return url; - - } catch (Exception e) { - log.error("上传文件失败: {}", e.getMessage()); - return null; - } - - } - /** - * 读取本地m3u8文件,并用远程URL替换ts文件名 - */ - private String createFinalM3u8Content(String localM3u8Path, Map tsUrlMap) throws IOException { - StringBuilder newContent = new StringBuilder(); - try (BufferedReader reader = new BufferedReader(new FileReader(localM3u8Path))) { - String line; - while ((line = reader.readLine()) != null) { - if (line.endsWith(".jpg")) { - String remoteUrl = tsUrlMap.get(line.trim()); - if (remoteUrl != null) { - newContent.append(remoteUrl).append("\n"); - } - } else { - newContent.append(line).append("\n"); - } - } - } - return newContent.toString(); - } -} diff --git a/chaos_api_web/src/main/java/cn/nopj/chaos_api/controller/VideoController.java b/chaos_api_web/src/main/java/cn/nopj/chaos_api/controller/VideoController.java deleted file mode 100644 index 6967097..0000000 --- a/chaos_api_web/src/main/java/cn/nopj/chaos_api/controller/VideoController.java +++ /dev/null @@ -1,73 +0,0 @@ -package cn.nopj.chaos_api.controller; - - -import cn.nopj.chaos_api.dto.ProcessVideoPath; -import cn.nopj.chaos_api.model.ApiResult; -import cn.nopj.chaos_api.service.VideoFileUploadService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.security.access.prepost.PreAuthorize; -import org.springframework.web.bind.annotation.*; -import org.springframework.web.multipart.MultipartFile; - -import java.io.IOException; -import java.util.Map; - -@RestController -@RequestMapping("/api/video") -@Slf4j -public class VideoController { - - @Autowired - private VideoFileUploadService videoFileUploadService; - - /** - * 初始化分片上传任务 - * @param fileInfo 包含文件名(fileName)和总大小(totalSize) - * @return 返回 uploadId 和每个分片的大小 - */ - @PostMapping("/init") - public ApiResult initUpload(@RequestBody Map fileInfo) { - log.info("初始化上传任务: {}", fileInfo); - String fileName = fileInfo.get("fileName").toString(); - long totalSize = Long.parseLong(fileInfo.get("totalSize").toString()); - return ApiResult.success(videoFileUploadService.initUpload(fileName, totalSize)); - } - - /** - * 上传分片 - * @param chunk 分片文件 - * @param uploadId 上传任务ID - * @param chunkIndex 分片序号 (从0开始) - */ - @PostMapping("/chunk") - - public ApiResult uploadChunk(@RequestParam("chunk") MultipartFile chunk, - @RequestParam("uploadId") String uploadId, - @RequestParam("chunkIndex") int chunkIndex) throws IOException { - videoFileUploadService.uploadChunk(uploadId, chunkIndex, chunk.getBytes()); - return ApiResult.success("分片 " + chunkIndex + " 上传成功"); - } - - /** - * 合并文件并触发异步处理 - * @param params 包含 uploadId 和 fileName - */ - @PostMapping("/merge") - public ApiResult mergeAndProcess(@RequestBody Map params) throws IOException { - String uploadId = params.get("uploadId"); - String fileName = params.get("fileName"); - videoFileUploadService.mergeAndProcess(uploadId, fileName); - return ApiResult.success("文件上传成功,已加入后台处理队列。"); - } - - /** - * 处理视频 - * @param processVideoPath 待处理的视频路径 - */ - @PostMapping("/process") - public ApiResult processVideo(@RequestBody ProcessVideoPath processVideoPath) throws IOException{ - videoFileUploadService.pushResultToMQ(processVideoPath.getPath()); - return ApiResult.success("视频处理任务已加入队列"); - } -} diff --git a/chaos_api_web/src/main/resources/application.yaml b/chaos_api_web/src/main/resources/application.yaml index a10e2c0..8513ac6 100644 --- a/chaos_api_web/src/main/resources/application.yaml +++ b/chaos_api_web/src/main/resources/application.yaml @@ -15,11 +15,6 @@ spring: min-idle: 5 max-active: 20 max-wait: 60000 - rabbitmq: - host: 10.91.3.24 - port: 5672 - username: chaos - password: zx123456.. servlet: multipart: max-file-size: 100GB