|
|
//package org.yrhl.syncdata.task;
|
|
|
//
|
|
|
//import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
|
|
//import org.slf4j.Logger;
|
|
|
//import org.slf4j.LoggerFactory;
|
|
|
//import org.springframework.scheduling.annotation.EnableScheduling;
|
|
|
//import org.springframework.scheduling.annotation.Scheduled;
|
|
|
//import org.springframework.stereotype.Component;
|
|
|
//import org.yrhl.syncdata.domain.*;
|
|
|
//import org.yrhl.syncdata.mapper.local.CourseOneMapper;
|
|
|
//import org.yrhl.syncdata.mapper.remote.CourseTwoMapper;
|
|
|
//
|
|
|
//import javax.annotation.Resource;
|
|
|
//import java.util.ArrayList;
|
|
|
//import java.util.List;
|
|
|
//import java.util.Map;
|
|
|
//import java.util.stream.Collectors;
|
|
|
//
|
|
|
//@EnableScheduling //开启定时
|
|
|
//@Component
|
|
|
//public class MySchedule1 {
|
|
|
// private static final Logger log = LoggerFactory.getLogger(MySchedule1.class);
|
|
|
// @Resource
|
|
|
// private CourseOneMapper courseOneMapper;
|
|
|
// @Resource
|
|
|
// private CourseTwoMapper courseTwoMapper;
|
|
|
// /**
|
|
|
// * 每隔10秒执行一次 执行生产者 准备数据
|
|
|
// */
|
|
|
// @Scheduled(fixedDelay = 20000)
|
|
|
// public void test(){
|
|
|
// //先查询需要同步的表信息
|
|
|
// List<SyncTable> syncTableList = courseOneMapper.getSyncTableList();
|
|
|
// if(syncTableList.isEmpty()){
|
|
|
// log.error("没有可用的同步表,请查看sync_table表是否有可用的数据");
|
|
|
// return ;
|
|
|
// }
|
|
|
// for(SyncTable syncTable:syncTableList){
|
|
|
// //查询需要待同步的表数据
|
|
|
// if("allinonepushresults".equals(syncTable.getTableName())){
|
|
|
// int page = 1;
|
|
|
// while (true) {
|
|
|
// log.info("生产者:当前第{}页",page);
|
|
|
// //将数据插入到sync_result 中
|
|
|
// List<SyncResult> syncResultListForWait = courseOneMapper.getSyncResultListForWaitForAllinonepushresults(syncTable.getTableName(),(page-1)*100,100);
|
|
|
// if(CollectionUtils.isEmpty(syncResultListForWait)) {
|
|
|
// break;
|
|
|
// }
|
|
|
// page++;
|
|
|
// try {
|
|
|
// // 1.在中间表查询是否存在数据(根据业务主键):如果存在,忽略,如果不存在,插入
|
|
|
// List<SyncResult> res= new ArrayList<>();
|
|
|
// for(SyncResult syncResult:syncResultListForWait){
|
|
|
// // 遍历逐条分析:
|
|
|
// int exesit = courseOneMapper.isExesit(syncResult.getInfoId());
|
|
|
// if(0==exesit){
|
|
|
// res.add(syncResult);
|
|
|
// }
|
|
|
// }
|
|
|
// if(CollectionUtils.isNotEmpty(res)){
|
|
|
// courseOneMapper.insertSyncResult(res);
|
|
|
// }
|
|
|
// }catch (Exception e){
|
|
|
// log.error("插入数据失败", e);
|
|
|
// }
|
|
|
// }
|
|
|
// }
|
|
|
// else{
|
|
|
// int page = 1;
|
|
|
// while (true) {
|
|
|
// log.info("生产者:表名{}第{}页",syncTable,page);
|
|
|
// List<SyncResult> syncResultListForWait = courseOneMapper.getSyncResultListForWait(syncTable.getTableName(),(page-1)*100,100);
|
|
|
// if(CollectionUtils.isEmpty(syncResultListForWait)) {
|
|
|
// break;
|
|
|
// }
|
|
|
// page++;
|
|
|
// try {
|
|
|
// // 1.在中间表查询是否存在数据(根据业务主键):如果存在,忽略,如果不存在,插入
|
|
|
// List<SyncResult> res = new ArrayList<>();
|
|
|
// for (SyncResult syncResult : syncResultListForWait) {
|
|
|
// // 遍历逐条分析:
|
|
|
// int exesit = courseOneMapper.isExesit(syncResult.getInfoId());
|
|
|
// if (0 == exesit) {
|
|
|
// res.add(syncResult);
|
|
|
// }
|
|
|
// }
|
|
|
// if (CollectionUtils.isNotEmpty(res)) {
|
|
|
// courseOneMapper.insertSyncResult(res);
|
|
|
// }
|
|
|
// log.info("生产者:表名{}插入中间表结束",syncTable);
|
|
|
// } catch (Exception e) {
|
|
|
// log.error("插入数据失败", e);
|
|
|
// }
|
|
|
// }
|
|
|
// }
|
|
|
//
|
|
|
// }
|
|
|
// }
|
|
|
//
|
|
|
//
|
|
|
// @Scheduled(fixedDelay = 40000)
|
|
|
// public void test1() {
|
|
|
// //从中间表查询需要同步的数据
|
|
|
// List<SyncTable> syncTableList = courseOneMapper.getSyncTableList();
|
|
|
// for (SyncTable syncTable : syncTableList) {
|
|
|
// if ("allinonepushresults".equals(syncTable.getTableName())) {
|
|
|
// int page = 1;
|
|
|
// while (true) {
|
|
|
// log.info("消费者:表名{}当前第{}页","allinonepushresults",page);
|
|
|
// List<AllinonepushresultsEntity> syncResultListForWait = courseOneMapper.getSyncResultListForWaitForConsumer(syncTable.getTableName(),(page-1)*100,100);
|
|
|
// List<Long> ids = null;
|
|
|
// if (CollectionUtils.isEmpty(syncResultListForWait)) {
|
|
|
// break;
|
|
|
// }
|
|
|
// page++;
|
|
|
// try {
|
|
|
// List<AllinonepushresultsEntity> resData = new ArrayList<>();
|
|
|
// for (AllinonepushresultsEntity allinonepushresultsEntity : syncResultListForWait) {
|
|
|
// int count = courseTwoMapper.isExist("allinonepushresults", allinonepushresultsEntity.getId());
|
|
|
// if (0 == count) {
|
|
|
// resData.add(allinonepushresultsEntity);
|
|
|
// }
|
|
|
// }
|
|
|
// if (CollectionUtils.isNotEmpty(resData)) {
|
|
|
// courseTwoMapper.insertAllinonepushresultsEntityBatch(resData);
|
|
|
// }
|
|
|
// ids = syncResultListForWait.stream()
|
|
|
// .map(AllinonepushresultsEntity::getId)
|
|
|
// .collect(Collectors.toList());
|
|
|
// for (Long id : ids) {
|
|
|
// courseOneMapper.updateSyncResult(id);
|
|
|
// }
|
|
|
// } catch (Exception e) {
|
|
|
// ids = syncResultListForWait.stream()
|
|
|
// .map(AllinonepushresultsEntity::getId)
|
|
|
// .collect(Collectors.toList());
|
|
|
// for (Long id : ids) {
|
|
|
// courseOneMapper.updateSyncResultFail(id);
|
|
|
// }
|
|
|
// }
|
|
|
// }
|
|
|
// log.info("消费者:表名{}同步完毕,一共{}页数据",syncTable,page);
|
|
|
// }
|
|
|
// else if ("block_new_source_data".equals(syncTable.getTableName())) {
|
|
|
// int page = 1;
|
|
|
// while (true) {
|
|
|
// log.info("消费者:表名{}当前第{}页","block_new_source_data",page);
|
|
|
// List<Long> ids = null;
|
|
|
// List<Map<String, Object>> list3 = courseOneMapper.getSyncResultListForWaitForOtherConsumer(syncTable.getTableName(),(page-1)*100,100);
|
|
|
// if (CollectionUtils.isEmpty(list3)) {
|
|
|
// break;
|
|
|
// }
|
|
|
// List<BlockNewSourceDataEntity> res1 = list3.stream()
|
|
|
// .map(BlockNewSourceDataEntity::fromMap)
|
|
|
// .collect(Collectors.toList());
|
|
|
// page++;
|
|
|
// try {
|
|
|
// List<BlockNewSourceDataEntity> resData = new ArrayList<>();
|
|
|
// for (BlockNewSourceDataEntity blockNewSourceDataEntity : res1) {
|
|
|
// int count = courseTwoMapper.isExist("block_new_source_data", blockNewSourceDataEntity.getId());
|
|
|
// if (0 == count) {
|
|
|
// resData.add(blockNewSourceDataEntity);
|
|
|
// }
|
|
|
// }
|
|
|
// if (CollectionUtils.isNotEmpty(resData)) {
|
|
|
// courseTwoMapper.insertBlockNewSourceDataEntityBatch(resData);
|
|
|
// }
|
|
|
// ids = res1.stream()
|
|
|
// .map(BlockNewSourceDataEntity::getId)
|
|
|
// .collect(Collectors.toList());
|
|
|
// for (Long id : ids) {
|
|
|
// courseOneMapper.updateSyncResult(id);
|
|
|
// }
|
|
|
// } catch (Exception e) {
|
|
|
// ids = res1.stream()
|
|
|
// .map(BlockNewSourceDataEntity::getId)
|
|
|
// .collect(Collectors.toList());
|
|
|
// for (Long id : ids) {
|
|
|
// courseOneMapper.updateSyncResultFail(id);
|
|
|
// }
|
|
|
// }
|
|
|
// }
|
|
|
// log.info("消费者:表名{}同步完毕,一共{}页数据",syncTable);
|
|
|
// }
|
|
|
// else if("block_source_data".equals(syncTable.getTableName())){
|
|
|
// {
|
|
|
// int page = 1;
|
|
|
// while (true) {
|
|
|
// log.info("消费者:表名{}当前第{}页","block_source_data",page);
|
|
|
// List<Long> ids = null;
|
|
|
// List<Map<String, Object>> list3 = courseOneMapper.getSyncResultListForWaitForOtherConsumer(syncTable.getTableName(),(page-1)*100,100);
|
|
|
// if (CollectionUtils.isEmpty(list3)) {
|
|
|
// break;
|
|
|
// }
|
|
|
// List<BlockSourceDataEntity> res1 = list3.stream()
|
|
|
// .map(BlockSourceDataEntity::fromMap)
|
|
|
// .collect(Collectors.toList());
|
|
|
// page++;
|
|
|
// try {
|
|
|
// List<BlockSourceDataEntity> resData = new ArrayList<>();
|
|
|
// for (BlockSourceDataEntity blockSourceDataEntity : res1) {
|
|
|
// int count = courseTwoMapper.isExist("block_source_data", blockSourceDataEntity.getId());
|
|
|
// if (0 == count) {
|
|
|
// resData.add(blockSourceDataEntity);
|
|
|
// }
|
|
|
// }
|
|
|
// if (CollectionUtils.isNotEmpty(resData)) {
|
|
|
// courseTwoMapper.insertBlockSourceDataEntityBatch(resData);
|
|
|
// }
|
|
|
// ids = res1.stream()
|
|
|
// .map(BlockSourceDataEntity::getId)
|
|
|
// .collect(Collectors.toList());
|
|
|
// for (Long id : ids) {
|
|
|
// courseOneMapper.updateSyncResult(id);
|
|
|
// }
|
|
|
// } catch (Exception e) {
|
|
|
// ids = res1.stream()
|
|
|
// .map(BlockSourceDataEntity::getId)
|
|
|
// .collect(Collectors.toList());
|
|
|
// for (Long id : ids) {
|
|
|
// courseOneMapper.updateSyncResultFail(id);
|
|
|
// }
|
|
|
// }
|
|
|
// }
|
|
|
// log.info("消费者:表名{}同步完毕,一共{}页数据",syncTable,page);
|
|
|
// }
|
|
|
//
|
|
|
// }
|
|
|
// else if("business_data_results".equals(syncTable.getTableName())){
|
|
|
// {
|
|
|
// int page = 1;
|
|
|
// while (true) {
|
|
|
// log.info("消费者:表名{}当前第{}页","business_data_results",page);
|
|
|
// List<Long> ids = null;
|
|
|
// List<Map<String, Object>> list3 = courseOneMapper.getSyncResultListForWaitForOtherConsumer(syncTable.getTableName(),(page-1)*100,100);
|
|
|
// if (CollectionUtils.isEmpty(list3)) {
|
|
|
// break;
|
|
|
// }
|
|
|
// List<BusinessDataResultsEntity> res1 = list3.stream()
|
|
|
// .map(BusinessDataResultsEntity::fromMap)
|
|
|
// .collect(Collectors.toList());
|
|
|
// page++;
|
|
|
// try {
|
|
|
// List<BusinessDataResultsEntity> resData = new ArrayList<>();
|
|
|
// for (BusinessDataResultsEntity businessDataResultsEntity : res1) {
|
|
|
// int count = courseTwoMapper.isExist("business_data_results", businessDataResultsEntity.getId());
|
|
|
// if (0 == count) {
|
|
|
// resData.add(businessDataResultsEntity);
|
|
|
// }
|
|
|
// }
|
|
|
// if (CollectionUtils.isNotEmpty(resData)) {
|
|
|
// courseTwoMapper.insertBusinessDataResultsBatch(resData);
|
|
|
// }
|
|
|
// ids = res1.stream()
|
|
|
// .map(BusinessDataResultsEntity::getId)
|
|
|
// .collect(Collectors.toList());
|
|
|
// for (Long id : ids) {
|
|
|
// courseOneMapper.updateSyncResult(id);
|
|
|
// }
|
|
|
// } catch (Exception e) {
|
|
|
// ids = res1.stream()
|
|
|
// .map(BusinessDataResultsEntity::getId)
|
|
|
// .collect(Collectors.toList());
|
|
|
// for (Long id : ids) {
|
|
|
// courseOneMapper.updateSyncResultFail(id);
|
|
|
// }
|
|
|
// }
|
|
|
// }
|
|
|
// log.info("消费者:表名{}同步完毕,一共{}页数据",syncTable,page);
|
|
|
// }
|
|
|
// }
|
|
|
//
|
|
|
// }
|
|
|
//
|
|
|
// }
|
|
|
//} |
|
|
\ No newline at end of file |
...
|
...
|
|