fix: 拉取无量云2.0数据下载,数据丢失等问题解决,修改线程逻辑 #72

Merged
yaoshuli merged 1 commits from fix/pullWLYImgDataBug into test 1 year ago

@ -61,6 +61,8 @@ public interface OcrPictureService extends BaseService<OcrPicture> {
*/ */
int savePictures(List<OcrPicture> ocrPictureList); int savePictures(List<OcrPicture> ocrPictureList);
void savePicturesInfo(OcrPicture ocrPicture);
ResultVo createPackageTask(List<OcrPicture> pageList, String tenantId, UserToken userToken, String searchMonth, HttpServletRequest req, String buessinessno, Map<String, Object> paramMap); ResultVo createPackageTask(List<OcrPicture> pageList, String tenantId, UserToken userToken, String searchMonth, HttpServletRequest req, String buessinessno, Map<String, Object> paramMap);
Map<String,String> getOcrPictureClassifyAndHash(String img); Map<String,String> getOcrPictureClassifyAndHash(String img);

@ -106,6 +106,7 @@ public class OcrCheckDuplicateServiceImpl extends ServiceImpl<OcrCheckDuplicateM
//异步调用查重任务 //异步调用查重任务
checkDuplicateService.comparePictureResultSave(ocrPictureList, sysUser, id); checkDuplicateService.comparePictureResultSave(ocrPictureList, sysUser, id);
checkDuplicate.setStatus(2); checkDuplicate.setStatus(2);
checkDuplicate.setCompletionTime(System.currentTimeMillis());
} catch (Exception e) { } catch (Exception e) {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
@ -115,6 +116,7 @@ public class OcrCheckDuplicateServiceImpl extends ServiceImpl<OcrCheckDuplicateM
logger.error("异步执行图片比对异常:", e); logger.error("异步执行图片比对异常:", e);
logger.error("异步执行图片比对异常:", e.getMessage()); logger.error("异步执行图片比对异常:", e.getMessage());
checkDuplicate.setStatus(3); checkDuplicate.setStatus(3);
checkDuplicate.setCompletionTime(System.currentTimeMillis());
} finally { } finally {
checkDuplicateService.updateById(checkDuplicate); checkDuplicateService.updateById(checkDuplicate);
} }

@ -26,6 +26,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import liquibase.pro.packaged.E;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -38,7 +39,9 @@ import javax.imageio.ImageIO;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.awt.color.ColorSpace; import java.awt.color.ColorSpace;
import java.awt.image.BufferedImage; import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.net.URL; import java.net.URL;
import java.net.URLConnection; import java.net.URLConnection;
@ -665,7 +668,7 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
} }
@Override @Override
public int savePictures(List<OcrPicture> ocrPictureList) { public int savePictures(List<OcrPicture> ocrPictureList){
//1.字典数据 解析入库 //1.字典数据 解析入库
boolean b = dataDictionaryUtil.pictureDataDictionarySaveArray(ocrPictureList); boolean b = dataDictionaryUtil.pictureDataDictionarySaveArray(ocrPictureList);
@ -673,16 +676,26 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
//2.调整图片地址,补齐域名 //2.调整图片地址,补齐域名
for (OcrPicture picture : ocrPictureList) { for (OcrPicture picture : ocrPictureList) {
picture.setCreateTime(System.currentTimeMillis()); //获取创建时间戳 picture.setCreateTime(System.currentTimeMillis()); //获取创建时间戳
// picture.setImgurl(interfaceDomain + picture.getImgurl());
} }
boolean saveBatch = this.saveBatch(ocrPictureList); try {
//3.添加图片信息 boolean saveBatch = false;
if (saveBatch) { synchronized(lock) {
this.savePicturesInfo(ocrPictureList); // 同步的代码块
this.convertImage(ocrPictureList); saveBatch = this.saveBatch(ocrPictureList);
return ocrPictureList.size(); }
//3.添加图片信息
if (saveBatch) {
// this.savePicturesInfo(ocrPictureList);
this.convertImage(ocrPictureList);
return ocrPictureList.size();
}
log.error("存储图片失败:{}",JSONObject.toJSON(ocrPictureList));
}catch (Exception e){
log.error("存储图片异常:{}",e.getMessage());
} }
return 0; return 0;
} }
@ -690,7 +703,7 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
List<OcrPictureInfo> list = new ArrayList<>(); List<OcrPictureInfo> list = new ArrayList<>();
for (OcrPicture ocrPicture : ocrPictureList) { for (OcrPicture ocrPicture : ocrPictureList) {
if (StringUtils.isNotBlank(ocrPicture.getImgurl())) { if (StringUtils.isNotBlank(ocrPicture.getImgurl())) {
OcrPictureInfo imageInfo = getImageInfo(ocrPicture.getImgurl()); OcrPictureInfo imageInfo = getImageInfo(ocrPicture.getImgurl(),true);
if (ObjectUtil.isNotEmpty(imageInfo)) { if (ObjectUtil.isNotEmpty(imageInfo)) {
imageInfo.setPictureId(ocrPicture.getId()); imageInfo.setPictureId(ocrPicture.getId());
imageInfo.setImgName(ocrPicture.getImgname()); imageInfo.setImgName(ocrPicture.getImgname());
@ -701,18 +714,49 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
ocrPictureInfoService.saveBatch(list); ocrPictureInfoService.saveBatch(list);
} }
private OcrPictureInfo getImageInfo(String imageUrl) { /**
*
* @param ocrPicture
*/
@Override
public void savePicturesInfo(OcrPicture ocrPicture) {
boolean isConnection = true;
String url = ocrPicture.getImgurl();
if(!StringUtils.isEmpty(ocrPicture.getLocalpictrueurl())){
url = ocrPicture.getLocalpictrueurl();
isConnection = false;
}
OcrPictureInfo imageInfo = getImageInfo(url,isConnection);
imageInfo.setPictureId(ocrPicture.getId());
imageInfo.setImgName(ocrPicture.getImgname());
ocrPictureInfoService.save(imageInfo);
}
try { /**
// 创建URL对象 *
URL url = new URL(imageUrl); * @param imageUrl
// 打开连接 * @param isConnection url
URLConnection urlConnection = url.openConnection(); * @return
// 获取图片大小 */
Integer imageSize = urlConnection.getContentLength(); private OcrPictureInfo getImageInfo(String imageUrl,boolean isConnection) {
// 使用ImageIO读取图片 try {
BufferedImage image = ImageIO.read(url); BufferedImage image = null;
Integer imageSize = 0;
if(isConnection){
// 创建URL对象
URL url = new URL(imageUrl);
// 打开连接
URLConnection urlConnection = url.openConnection();
// 获取图片大小
imageSize = urlConnection.getContentLength();
// 使用ImageIO读取图片
image= ImageIO.read(url);
}else{
File file = new File(imageUrl);
image = ImageIO.read(file);
imageSize =(int)file.length();
}
if (image != null) { if (image != null) {
OcrPictureInfo info = new OcrPictureInfo(); OcrPictureInfo info = new OcrPictureInfo();
@ -751,12 +795,14 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
log.info("Cannot read image from URL."); log.info("Cannot read image from URL.");
return null; return null;
} }
} catch (IOException e) { } catch (Exception e) {
log.error("Error: " + e.getMessage()); log.error("Error: " + e.getMessage());
} }
return null; return null;
} }
private void convertImage(List<OcrPicture> list) { private void convertImage(List<OcrPicture> list) {
for (OcrPicture ocrPicture : list) { for (OcrPicture ocrPicture : list) {
PictureImgToLocalEntity entity = new PictureImgToLocalEntity(); PictureImgToLocalEntity entity = new PictureImgToLocalEntity();
@ -764,7 +810,22 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
entity.setImgUrl(ocrPicture.getImgurl()); entity.setImgUrl(ocrPicture.getImgurl());
String imgurl = ocrPicture.getImgurl(); String imgurl = ocrPicture.getImgurl();
entity.setLocalPath(localImagePath + imgurl.substring(imgurl.lastIndexOf("/"))); entity.setLocalPath(localImagePath + imgurl.substring(imgurl.lastIndexOf("/")));
TaskQueue.pictureImgToLocalPushData(entity);
boolean addedToQueue = false;
int count = 1;
while (!addedToQueue){
try {
addedToQueue = TaskQueue.pictureImgToLocalPushData(entity);
if(addedToQueue) {
break;
}
Thread.sleep(5000);
} catch (InterruptedException e) {
log.error("放入图片下载队列,等待休眠异常:{}",e.getMessage());
}
count++;
}
} }
} }

@ -21,6 +21,7 @@ public class TaskThreadPool {
private static Logger logger = LoggerFactory.getLogger(TaskQueue.class); private static Logger logger = LoggerFactory.getLogger(TaskQueue.class);
ExecutorService threadPool = null; ExecutorService threadPool = null;
ExecutorService threadPoolImg = null;
public TaskThreadPool() { public TaskThreadPool() {
if (threadPool == null) { if (threadPool == null) {
@ -32,6 +33,15 @@ public class TaskThreadPool {
new LinkedBlockingDeque<>(), new LinkedBlockingDeque<>(),
Executors.defaultThreadFactory(), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()); new ThreadPoolExecutor.DiscardOldestPolicy());
threadPoolImg = new ThreadPoolExecutor(
10,
40,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
} }
} }
@ -45,8 +55,8 @@ public class TaskThreadPool {
@SneakyThrows @SneakyThrows
@Override @Override
public void run() { public void run() {
logger.debug("无量云接口数据处理消费线程检测中...");
while (true) { while (true) {
logger.debug("无量云接口数据处理消费线程检测中...,队列数量:{}",TaskQueue.pictureDisposeQueue.size());
PictureSourceResult pictureSourceResult = TaskQueue.pictureDisposePullData(); PictureSourceResult pictureSourceResult = TaskQueue.pictureDisposePullData();
if (pictureSourceResult != null) { if (pictureSourceResult != null) {
threadPool.execute(new PictureDisposeTask(pictureSourceResult)); threadPool.execute(new PictureDisposeTask(pictureSourceResult));
@ -71,13 +81,12 @@ public class TaskThreadPool {
@SneakyThrows @SneakyThrows
@Override @Override
public void run() { public void run() {
logger.debug("图片下载消费线程检测中...");
while (true) { while (true) {
logger.debug("图片下载消费线程检测中...,队列数量:{}",TaskQueue.pictureImgToLocalQueue.size());
try { try {
PictureImgToLocalEntity pictureImgToLocalEntity = TaskQueue.pictureImgToLocalPullData(); PictureImgToLocalEntity pictureImgToLocalEntity = TaskQueue.pictureImgToLocalPullData();
if (pictureImgToLocalEntity != null) { if (pictureImgToLocalEntity != null) {
threadPool.execute(new PictureImgToLocalTask(pictureImgToLocalEntity)); threadPoolImg.execute(new PictureImgToLocalTask(pictureImgToLocalEntity));
} else { } else {
Thread.sleep(5000); Thread.sleep(5000);
} }

@ -80,6 +80,10 @@ public class PictureImgToLocalTask implements Runnable {
// picture.setPhotoDateTimestamp(timestamp); // picture.setPhotoDateTimestamp(timestamp);
// } // }
ocrPictureService.updateById(picture); ocrPictureService.updateById(picture);
logger.info("图片本地化,回写数据完成!");
logger.info("开启获取图片信息操作!");
ocrPictureService.savePicturesInfo(picture);
logger.info("获取图片信息操作,结束!");
} }
} }
} }

@ -5,6 +5,8 @@ import org.slf4j.LoggerFactory;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URL; import java.net.URL;
/** /**
@ -33,7 +35,11 @@ public class DownloadImgUtil {
out.write(buffer, 0, n); out.write(buffer, 0, n);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage()); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
String exceptionDetails = sw.toString(); // 包含堆栈跟踪信息的字符串
logger.error("图片本地化,转储图片异常:",exceptionDetails);
return false; return false;
} }
return true; return true;

Loading…
Cancel
Save