From 82cd93d232be36eb206a97dc10e7c1a14cf6c832 Mon Sep 17 00:00:00 2001 From: zhouwentao <1577701412@qq.com> Date: Fri, 4 Aug 2023 14:55:44 +0800 Subject: [PATCH] updates --- .../modules/api/controller/ApiController.java | 13 +- .../modules/ocr/init/HandleTransInit.java | 176 ++++++++++++++++++ .../service/impl/OcrIdentifyServiceImpl.java | 2 +- .../modules/ocr/service/impl/TaskService.java | 133 +++++++++++++ 4 files changed, 319 insertions(+), 5 deletions(-) create mode 100644 jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/ocr/init/HandleTransInit.java create mode 100644 jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/ocr/service/impl/TaskService.java diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/api/controller/ApiController.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/api/controller/ApiController.java index 5a8f46a..ec06a42 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/api/controller/ApiController.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/api/controller/ApiController.java @@ -18,15 +18,18 @@ import org.jeecg.modules.ocr.entity.OcrRuleCheck; import org.jeecg.modules.ocr.service.IOcrIdentifyDetailService; import org.jeecg.modules.ocr.service.IOcrIdentifyService; import org.jeecg.modules.ocr.service.IOcrRuleCheckService; +import org.jeecg.modules.ocr.service.impl.TaskService; import org.jeecg.modules.ocr.utils.FileOUtils; import org.jeecg.modules.ocr.vo.OcrRuleCheckVo; import org.jeecg.modules.system.service.ISysDictService; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -50,7 +53,8 @@ public class ApiController { private ISysDictService sysDictService; @Resource private RedisUtil redisUtil; - + @Resource + private TaskService taskService; ////===================================================================伪接口 @ApiOperation(value = "通用识别") @RequestMapping("/identify") @@ -206,6 +210,7 @@ public class ApiController { @ApiOperation(value = "刷新为完成的任务", notes = "任务完结通知") @GetMapping("/restartTask") + @Async public Result restartTask() throws InterruptedException { //获取未执行完的任务 List list = ocrIdentifyService.list(new LambdaQueryWrapper().eq(OcrIdentify::getStatus, "0")); @@ -214,9 +219,9 @@ public class ApiController { ocrIdentifyDetailService.remove(new LambdaQueryWrapper().in(OcrIdentifyDetail::getIdentifyId,identifyIdList)); for (OcrIdentify ocrIdentify : list) { log.info("打印ocrIdentifyId:"+ocrIdentify.getId()); - List identifyUrlList = FileOUtils.fileLists(null, ocrIdentify.getIdentifyUrl()); - Thread.sleep(1000L); - ocrIdentifyService.postSemantic(ocrIdentify,identifyUrlList); + //List identifyUrlList = FileOUtils.fileLists(null, ocrIdentify.getIdentifyUrl()); + List identifyUrlList = Arrays.asList("1","2"); + taskService.postSemantic(ocrIdentify,identifyUrlList); } } return Result.OK(list.size()+"个任务."); diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/ocr/init/HandleTransInit.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/ocr/init/HandleTransInit.java new file mode 100644 index 0000000..2a8922f --- /dev/null +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/ocr/init/HandleTransInit.java @@ -0,0 +1,176 @@ +package org.jeecg.modules.ocr.init; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.jeecg.common.constant.OcrConstant; +import org.jeecg.common.util.RedisUtil; +import org.jeecg.common.util.RestUtil; +import org.jeecg.modules.ocr.entity.OcrIdentify; +import org.jeecg.modules.ocr.model.TaskModel; +import org.jeecg.modules.ocr.service.IOcrIdentifyService; +import org.jeecg.modules.ocr.service.impl.TaskService; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Async; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @Description + * @Author ZhouWenTao + * @Date 2023/8/4 11:39 + */ +@Configuration +@Slf4j +public class HandleTransInit implements ApplicationRunner { + @Resource + RedisUtil redisUtil; + @Resource + TaskService taskService; + @Resource + IOcrIdentifyService ocrIdentifyService; + /** + * 处理转账数据 + * + * @param args + */ + @Override + @Async + public void run(ApplicationArguments args) { + try { + Thread.sleep(5000l); + } catch (InterruptedException e) { + e.printStackTrace(); + } + while (true) { + try { + Thread.sleep(1000l); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + log.info("---------检测是否有任务..."); + List taskList = taskService.getTaskList(); + if (taskList.size()==0) { + log.info("无任务."); + continue; + } + for (TaskModel taskModel : taskList) { + //刷新redis,执行中 + flushTask(taskModel.getTaskId(), 1); + //执行 + executeTaskp(taskModel); + //刷新待运行任务 + log.error("任务已执行:"+taskModel.getTaskId()); + } + } + } + + /** + * 刷新任务状态 + * + * @param taskId + * @param status 状态 1 运行中,2 已结束 + */ + public void flushTask(String taskId, int status) { + List taskList = taskService.getTaskList(); + if (taskList != null && taskList.size() > 0) { + TaskModel taskModel = taskList.stream().filter(t -> t.getTaskId().equals(taskId)).findFirst().get(); + if (taskModel != null) { + int taskLevel = taskModel.getTaskLevel(); + String task = (String) redisUtil.get("task_identify_" + taskLevel); + JSONArray taskJsonArray = JSONObject.parseArray(task); + taskList = new ArrayList<>(); + if (task != null) { + taskList.addAll(taskJsonArray.toJavaList(TaskModel.class)); + } + if (status == 1) { + //更新任务状态 + taskList.forEach(t -> { + if (t.getTaskId().equals(taskId)) { + t.setTaskStatus("1"); + } + }); + } else if (status == 2) { + //删除任务 + taskList = taskList.stream().filter(t -> !t.getTaskId().equals(taskId)).collect(Collectors.toList()); + } + String s = JSONObject.toJSONString(taskList); + //存入redis + redisUtil.set("task_identify_" + taskLevel, s); + } + } + } + + //执行任务 + public void executeTaskp(TaskModel taskModel) { + //睡眠 + try { + String parameter = taskModel.getParameter(); + // 任务类型 + if ("identify".equals(taskModel.getTaskType())) { + String[] parameters = parameter.split(","); + //通用识别 + String masterTaskId = parameters[0];//主任务redis + String task_id = parameters[1]; + String image = parameters[2]; + JSONObject requestBody = new JSONObject(); + requestBody.put("task_id", task_id); + requestBody.put("img_path", image); + //更新主任务,正在识别中. + ocrIdentifyService.updateMasterTaskStartTime(task_id.split("_")[0]); + JSONObject semanticResponseJson = RestUtil.post(OcrConstant.api_test2_identify_url, requestBody); + semanticResponseJson.put("identifyId", masterTaskId); + log.info("ocr识别返回数据:"); + log.info(semanticResponseJson.toJSONString()); + ocrIdentifyService.getSemanticInfo(semanticResponseJson); + //该子任务已执行,判断主任务是否残留 + String masterTask = (String) redisUtil.get("identify_" + masterTaskId); + if (StringUtils.isNotBlank(masterTask)) { + //主任务中排除当前任务 + String collect = Arrays.asList(masterTask.split(",")).stream().filter(t -> !t.equals(task_id)).collect(Collectors.joining(",")); + if (StringUtils.isBlank(collect)) { + //如果主任务下的子任务已清空,删除key + redisUtil.del("identify_" + masterTaskId); + //刷新Ocr识别任务状态 + ocrIdentifyService.updateOcrIdentifyStatus(masterTaskId, "1"); + log.error("更新任务状态,id:" + masterTaskId); + } else { + //主任务还存在,刷新主任务明细 + redisUtil.set("identify_" + masterTaskId, collect); + } + } + } else { + Thread.sleep(2000l); + } + + + } catch (InterruptedException e) { + e.printStackTrace(); + } + //模拟执行成功 + String overTaskIds = (String) redisUtil.get("over_task_"+taskModel.getTaskType()); + if (org.apache.commons.lang.StringUtils.isBlank(overTaskIds)) { + overTaskIds = taskModel.getTaskId(); + } else { + if (!Arrays.asList(overTaskIds.split(",")).contains(taskModel.getTaskId())) { + //该任务未执行过 + overTaskIds += "," + taskModel.getTaskId(); + } else { + //该任务已结束过 + } + } + //存入已执行 redis里 + redisUtil.set("over_task_"+taskModel.getTaskType(), overTaskIds); + log.error(taskModel.getTaskLevel() + "-级别," + taskModel.getTaskId() + "-已执行"); + //从3中任务集中,删除该任务 + flushTask(taskModel.getTaskId(), 2); + } +} diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/ocr/service/impl/OcrIdentifyServiceImpl.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/ocr/service/impl/OcrIdentifyServiceImpl.java index 2f8f85d..bae024f 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/ocr/service/impl/OcrIdentifyServiceImpl.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/ocr/service/impl/OcrIdentifyServiceImpl.java @@ -245,7 +245,7 @@ public class OcrIdentifyServiceImpl extends ServiceImpl identifyUrlList) { + String id = ocrIdentify.getId(); + String masterTask = (String) redisUtil.get("identify_" + id); + if (StringUtils.isNotBlank(masterTask)) { + log.error("任务已存在_" + ocrIdentify.getId()); + return; + } + String priority = ocrIdentify.getPriority();//加急状态;//任务优先级 + String identifyUrl = null; + String taskType = "identify"; + //拼接子任务id + String taskId = null; + StringBuffer task_master = new StringBuffer(); + for (int i = 0; i < identifyUrlList.size(); i++) { + taskId = String.format(id + "_" + (i + 1)); + task_master.append(taskId).append(","); + } + + //执行子任务. + JSONObject requestBody = new JSONObject(); + TaskModel taskModel = new TaskModel(); + if (identifyUrlList.size()>0) { + redisUtil.set("identify_" + id, task_master.toString()); + for (int i = 0; i < identifyUrlList.size(); i++) { + identifyUrl = identifyUrlList.get(i); + taskModel.setTaskId(taskId); + taskModel.setTaskType("identify"); + taskModel.setTaskLevel(Integer.valueOf(priority)); + taskModel.setTaskStatus("0"); + taskModel.setParameter(id + "," + taskId + "," + identifyUrl); + //保存子任务 + saveTask(taskModel); + } + } + + } + + public Result saveTask(TaskModel taskModel) { + String taskId = taskModel.getTaskId(); + String task_ = (String) redisUtil.get("task_" +taskModel.getTaskType()+"_"+taskModel.getTaskLevel()); + if (task_ == null || task_.equals("[]")) { + //无历史任务 + List jsonObjects = Arrays.asList(taskModel); + task_ = JSONObject.toJSONString(jsonObjects); + } else { + //有历史任务 + JSONArray jsonArray = JSONObject.parseArray(task_); + if (jsonArray != null) { + List taskList = jsonArray.toJavaList(TaskModel.class); + List staskList = taskList.stream().filter(t -> t.getTaskId().equals(taskId)).collect(Collectors.toList()); + TaskModel task = null; + if (staskList == null || staskList.size() == 0) { + //该任务不存在, 看看 库表里 该任务是否已执行成功了 + String overTask = (String) redisUtil.get("over_task_"+taskModel.getTaskType()); + if (org.apache.commons.lang.StringUtils.isNotBlank(overTask) && Arrays.asList(overTask.split(",")).contains(taskId)) { + //库表里已执行过 + return Result.OK("该任务已执行结束"); + } else { + //库表中未执行过,追加任务 + taskList.add(taskModel); + task_ = JSONObject.toJSONString(taskList); + } + } else if ("0".equals(staskList.get(0).getTaskStatus())) { + //该任务 待运行 + return Result.OK("该任务还处于排队中"); + } else if ("1".equals(staskList.get(0).getTaskStatus())) { + //该任务 运行中 + return Result.OK("该任务还处于运行中"); + } + } + } + //存入redis + redisUtil.set("task_" +taskModel.getTaskType()+"_"+taskModel.getTaskLevel(), task_); + //执行任务 + //executeTask(); + return Result.OK("已追加到任务"); + } + + //获取全部任务 + public List getTaskList() { + String task_0 = (String) redisUtil.get("task_identify_0"); + String task_1 = (String) redisUtil.get("task_identify_1"); + /*String task_2 = (String) redisUtil.get("task_2"); + String task_3 = (String) redisUtil.get("task_3");*/ + JSONArray task0JsonArray = JSONObject.parseArray(task_0); + JSONArray task1JsonArray = JSONObject.parseArray(task_1); + /*JSONArray task2JsonArray = JSONObject.parseArray(task_2); + JSONArray task3JsonArray = JSONObject.parseArray(task_3);*/ + List taskList = new ArrayList<>(); + if (task0JsonArray != null) { + taskList.addAll(task0JsonArray.toJavaList(TaskModel.class)); + } + if (task1JsonArray != null) { + taskList.addAll(task1JsonArray.toJavaList(TaskModel.class)); + } + /*if (task2JsonArray != null) { + taskList.addAll(task2JsonArray.toJavaList(TaskModel.class)); + } + if (task3JsonArray != null) { + taskList.addAll(task3JsonArray.toJavaList(TaskModel.class)); + }*/ + return taskList; + } +}