fix: 解决拉取无量云2.0数据丢失,线程未进行等待,请求接口太频繁问题

fix/pullWLYDataBug
shuliYao 1 year ago
parent 32f5a5995b
commit 758a5ee8f5

@ -6,6 +6,7 @@ import cn.jyjz.xiaoyao.common.base.vo.ResultVo;
import cn.jyjz.xiaoyao.common.base.vo.UserToken; import cn.jyjz.xiaoyao.common.base.vo.UserToken;
import cn.jyjz.xiaoyao.common.mybatisplus.base.BaseService; import cn.jyjz.xiaoyao.common.mybatisplus.base.BaseService;
import cn.jyjz.xiaoyao.ocr.dataobject.OcrPicture; import cn.jyjz.xiaoyao.ocr.dataobject.OcrPicture;
import cn.jyjz.xiaoyao.ocr.dataobject.OcrPictureInfo;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.Wrapper; import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
@ -61,6 +62,12 @@ public interface OcrPictureService extends BaseService<OcrPicture> {
*/ */
int savePictures(List<OcrPicture> ocrPictureList); int savePictures(List<OcrPicture> ocrPictureList);
/**
*
* @param ocrPicture
*/
void savePicturesInfo(OcrPicture ocrPicture);
ResultVo createPackageTask(List<OcrPicture> pageList, String tenantId, UserToken userToken, String searchMonth, HttpServletRequest req, String buessinessno, Map<String, Object> paramMap); ResultVo createPackageTask(List<OcrPicture> pageList, String tenantId, UserToken userToken, String searchMonth, HttpServletRequest req, String buessinessno, Map<String, Object> paramMap);
Map<String,String> getOcrPictureClassifyAndHash(String img); Map<String,String> getOcrPictureClassifyAndHash(String img);

@ -26,6 +26,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import liquibase.pro.packaged.E;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -38,7 +39,9 @@ import javax.imageio.ImageIO;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.awt.color.ColorSpace; import java.awt.color.ColorSpace;
import java.awt.image.BufferedImage; import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.net.URL; import java.net.URL;
import java.net.URLConnection; import java.net.URLConnection;
@ -665,7 +668,7 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
} }
@Override @Override
public int savePictures(List<OcrPicture> ocrPictureList) { public int savePictures(List<OcrPicture> ocrPictureList){
//1.字典数据 解析入库 //1.字典数据 解析入库
boolean b = dataDictionaryUtil.pictureDataDictionarySaveArray(ocrPictureList); boolean b = dataDictionaryUtil.pictureDataDictionarySaveArray(ocrPictureList);
@ -673,16 +676,26 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
//2.调整图片地址,补齐域名 //2.调整图片地址,补齐域名
for (OcrPicture picture : ocrPictureList) { for (OcrPicture picture : ocrPictureList) {
picture.setCreateTime(System.currentTimeMillis()); //获取创建时间戳 picture.setCreateTime(System.currentTimeMillis()); //获取创建时间戳
// picture.setImgurl(interfaceDomain + picture.getImgurl());
} }
boolean saveBatch = this.saveBatch(ocrPictureList); try {
//3.添加图片信息 boolean saveBatch = false;
if (saveBatch) { synchronized(lock) {
this.savePicturesInfo(ocrPictureList); // 同步的代码块
this.convertImage(ocrPictureList); saveBatch = this.saveBatch(ocrPictureList);
return ocrPictureList.size(); }
//3.添加图片信息
if (saveBatch) {
// this.savePicturesInfo(ocrPictureList);
this.convertImage(ocrPictureList);
return ocrPictureList.size();
}
log.error("存储图片失败:{}",JSONObject.toJSON(ocrPictureList));
}catch (Exception e){
log.error("存储图片异常:{}",e.getMessage());
} }
return 0; return 0;
} }
@ -690,7 +703,7 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
List<OcrPictureInfo> list = new ArrayList<>(); List<OcrPictureInfo> list = new ArrayList<>();
for (OcrPicture ocrPicture : ocrPictureList) { for (OcrPicture ocrPicture : ocrPictureList) {
if (StringUtils.isNotBlank(ocrPicture.getImgurl())) { if (StringUtils.isNotBlank(ocrPicture.getImgurl())) {
OcrPictureInfo imageInfo = getImageInfo(ocrPicture.getImgurl()); OcrPictureInfo imageInfo = getImageInfo(ocrPicture.getImgurl(),true);
if (ObjectUtil.isNotEmpty(imageInfo)) { if (ObjectUtil.isNotEmpty(imageInfo)) {
imageInfo.setPictureId(ocrPicture.getId()); imageInfo.setPictureId(ocrPicture.getId());
imageInfo.setImgName(ocrPicture.getImgname()); imageInfo.setImgName(ocrPicture.getImgname());
@ -701,18 +714,49 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
ocrPictureInfoService.saveBatch(list); ocrPictureInfoService.saveBatch(list);
} }
private OcrPictureInfo getImageInfo(String imageUrl) { /**
*
* @param ocrPicture
*/
@Override
public void savePicturesInfo(OcrPicture ocrPicture) {
boolean isConnection = true;
String url = ocrPicture.getImgurl();
if(!StringUtils.isEmpty(ocrPicture.getLocalpictrueurl())){
url = ocrPicture.getLocalpictrueurl();
isConnection = false;
}
OcrPictureInfo imageInfo = getImageInfo(url,isConnection);
imageInfo.setPictureId(ocrPicture.getId());
imageInfo.setImgName(ocrPicture.getImgname());
ocrPictureInfoService.save(imageInfo);
}
try { /**
// 创建URL对象 *
URL url = new URL(imageUrl); * @param imageUrl
// 打开连接 * @param isConnection url
URLConnection urlConnection = url.openConnection(); * @return
// 获取图片大小 */
Integer imageSize = urlConnection.getContentLength(); private OcrPictureInfo getImageInfo(String imageUrl,boolean isConnection) {
// 使用ImageIO读取图片 try {
BufferedImage image = ImageIO.read(url); BufferedImage image = null;
Integer imageSize = 0;
if(isConnection){
// 创建URL对象
URL url = new URL(imageUrl);
// 打开连接
URLConnection urlConnection = url.openConnection();
// 获取图片大小
imageSize = urlConnection.getContentLength();
// 使用ImageIO读取图片
image= ImageIO.read(url);
}else{
File file = new File(imageUrl);
image = ImageIO.read(file);
imageSize =(int)file.length();
}
if (image != null) { if (image != null) {
OcrPictureInfo info = new OcrPictureInfo(); OcrPictureInfo info = new OcrPictureInfo();
@ -751,12 +795,14 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
log.info("Cannot read image from URL."); log.info("Cannot read image from URL.");
return null; return null;
} }
} catch (IOException e) { } catch (Exception e) {
log.error("Error: " + e.getMessage()); log.error("Error: " + e.getMessage());
} }
return null; return null;
} }
private void convertImage(List<OcrPicture> list) { private void convertImage(List<OcrPicture> list) {
for (OcrPicture ocrPicture : list) { for (OcrPicture ocrPicture : list) {
PictureImgToLocalEntity entity = new PictureImgToLocalEntity(); PictureImgToLocalEntity entity = new PictureImgToLocalEntity();
@ -764,7 +810,22 @@ public class OcrPictureServiceImpl extends BaseServiceImpl<OcrPictureMybatisDao,
entity.setImgUrl(ocrPicture.getImgurl()); entity.setImgUrl(ocrPicture.getImgurl());
String imgurl = ocrPicture.getImgurl(); String imgurl = ocrPicture.getImgurl();
entity.setLocalPath(localImagePath + imgurl.substring(imgurl.lastIndexOf("/"))); entity.setLocalPath(localImagePath + imgurl.substring(imgurl.lastIndexOf("/")));
TaskQueue.pictureImgToLocalPushData(entity);
boolean addedToQueue = false;
int count = 1;
while (!addedToQueue && count<=5){
try {
addedToQueue = TaskQueue.pictureImgToLocalPushData(entity);
if(addedToQueue) {
break;
}
Thread.sleep(30000);
} catch (InterruptedException e) {
log.error("放入图片下载队列,等待休眠异常:{}",e.getMessage());
}
count++;
}
} }
} }

@ -20,12 +20,12 @@ public class TaskQueue {
/** /**
* *
*/ */
public static LinkedBlockingQueue pictureDisposeQueue = new LinkedBlockingQueue(); public static LinkedBlockingQueue pictureDisposeQueue = new LinkedBlockingQueue(1000);
/** /**
* picture * picture
*/ */
public static LinkedBlockingQueue pictureImgToLocalQueue = new LinkedBlockingQueue(); public static LinkedBlockingQueue pictureImgToLocalQueue = new LinkedBlockingQueue(1000);
/** /**

@ -21,6 +21,7 @@ public class TaskThreadPool {
private static Logger logger = LoggerFactory.getLogger(TaskQueue.class); private static Logger logger = LoggerFactory.getLogger(TaskQueue.class);
ExecutorService threadPool = null; ExecutorService threadPool = null;
ExecutorService threadPoolImg = null;
public TaskThreadPool() { public TaskThreadPool() {
if (threadPool == null) { if (threadPool == null) {
@ -32,6 +33,15 @@ public class TaskThreadPool {
new LinkedBlockingDeque<>(), new LinkedBlockingDeque<>(),
Executors.defaultThreadFactory(), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()); new ThreadPoolExecutor.DiscardOldestPolicy());
threadPoolImg = new ThreadPoolExecutor(
10,
40,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
} }
} }
@ -45,8 +55,8 @@ public class TaskThreadPool {
@SneakyThrows @SneakyThrows
@Override @Override
public void run() { public void run() {
logger.debug("无量云接口数据处理消费线程检测中...");
while (true) { while (true) {
logger.debug("无量云接口数据处理消费线程检测中...,队列数量:{}",TaskQueue.pictureDisposeQueue.size());
PictureSourceResult pictureSourceResult = TaskQueue.pictureDisposePullData(); PictureSourceResult pictureSourceResult = TaskQueue.pictureDisposePullData();
if (pictureSourceResult != null) { if (pictureSourceResult != null) {
threadPool.execute(new PictureDisposeTask(pictureSourceResult)); threadPool.execute(new PictureDisposeTask(pictureSourceResult));
@ -71,13 +81,12 @@ public class TaskThreadPool {
@SneakyThrows @SneakyThrows
@Override @Override
public void run() { public void run() {
logger.debug("图片下载消费线程检测中...");
while (true) { while (true) {
logger.debug("图片下载消费线程检测中...,队列数量:{}",TaskQueue.pictureImgToLocalQueue.size());
try { try {
PictureImgToLocalEntity pictureImgToLocalEntity = TaskQueue.pictureImgToLocalPullData(); PictureImgToLocalEntity pictureImgToLocalEntity = TaskQueue.pictureImgToLocalPullData();
if (pictureImgToLocalEntity != null) { if (pictureImgToLocalEntity != null) {
threadPool.execute(new PictureImgToLocalTask(pictureImgToLocalEntity)); threadPoolImg.execute(new PictureImgToLocalTask(pictureImgToLocalEntity));
} else { } else {
Thread.sleep(5000); Thread.sleep(5000);
} }

@ -205,9 +205,9 @@ public class PictureDisposeTask implements Runnable{
picture.setSubmitDateTimestamp(pictureSourceResult.getSubmitTime()); picture.setSubmitDateTimestamp(pictureSourceResult.getSubmitTime());
// picture.setSubmitDateTimestamp(System.currentTimeMillis()); // picture.setSubmitDateTimestamp(System.currentTimeMillis());
picture.setPhotoDateTimestamp(System.currentTimeMillis()); picture.setPhotoDateTimestamp(System.currentTimeMillis());
//
System.out.println("SubmitDateTimestamp1=================>>>>>>>"+pictureSourceResult.getSubmitTime()); // System.out.println("SubmitDateTimestamp1=================>>>>>>>"+pictureSourceResult.getSubmitTime());
System.out.println("SubmitDateTimestamp2=================>>>>>>>"+picture.getSubmitDateTimestamp()); // System.out.println("SubmitDateTimestamp2=================>>>>>>>"+picture.getSubmitDateTimestamp());
//定位距离 //定位距离
picture.setField12(pictureSourceResult.getDistance()); picture.setField12(pictureSourceResult.getDistance());
@ -230,6 +230,7 @@ public class PictureDisposeTask implements Runnable{
//调用图片入库方法 //调用图片入库方法
int size = ocrPictureService.savePictures(ocrPictureList); int size = ocrPictureService.savePictures(ocrPictureList);
} }

@ -74,12 +74,17 @@ public class PictureImgToLocalTask implements Runnable {
picture.setImgHash(ocrPictureClassifyAndHash.get("hash")); picture.setImgHash(ocrPictureClassifyAndHash.get("hash"));
picture.setClassificationid(ocrPictureClassifyAndHash.get("classId")); picture.setClassificationid(ocrPictureClassifyAndHash.get("classId"));
} }
// 获取拍照时间 // 获取拍照时间
// Long timestamp = ocrPictureService.getPhotoDateTime(picture); // Long timestamp = ocrPictureService.getPhotoDateTime(picture);
// if(timestamp != 0L){ // if(timestamp != 0L){
// picture.setPhotoDateTimestamp(timestamp); // picture.setPhotoDateTimestamp(timestamp);
// } // }
ocrPictureService.updateById(picture); ocrPictureService.updateById(picture);
logger.info("图片本地化,回写数据完成!");
logger.info("开启获取图片信息操作!");
ocrPictureService.savePicturesInfo(picture);
logger.info("获取图片信息操作,结束!");
} }
} }
} }

@ -33,7 +33,7 @@ public class DownloadImgUtil {
out.write(buffer, 0, n); out.write(buffer, 0, n);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage()); logger.error("图片本地化,转储图片异常:",e.getMessage());
return false; return false;
} }
return true; return true;

Loading…
Cancel
Save