MySchedule1.java 14.2 KB
//package org.yrhl.syncdata.task;
//
//import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.scheduling.annotation.EnableScheduling;
//import org.springframework.scheduling.annotation.Scheduled;
//import org.springframework.stereotype.Component;
//import org.yrhl.syncdata.domain.*;
//import org.yrhl.syncdata.mapper.local.CourseOneMapper;
//import org.yrhl.syncdata.mapper.remote.CourseTwoMapper;
//
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.List;
//import java.util.Map;
//import java.util.stream.Collectors;
//
//@EnableScheduling //开启定时
//@Component
//public class MySchedule1 {
//    private static final Logger log = LoggerFactory.getLogger(MySchedule1.class);
//    @Resource
//    private CourseOneMapper courseOneMapper;
//    @Resource
//    private CourseTwoMapper courseTwoMapper;
//    /**
//     * 每隔10秒执行一次  执行生产者 准备数据
//     */
//   @Scheduled(fixedDelay = 20000)
//    public void test(){
//        //先查询需要同步的表信息
//        List<SyncTable> syncTableList = courseOneMapper.getSyncTableList();
//        if(syncTableList.isEmpty()){
//            log.error("没有可用的同步表,请查看sync_table表是否有可用的数据");
//            return ;
//        }
//        for(SyncTable syncTable:syncTableList){
//            //查询需要待同步的表数据
//            if("allinonepushresults".equals(syncTable.getTableName())){
//               int page = 1;
//                while (true) {
//                    log.info("生产者:当前第{}页",page);
//                    //将数据插入到sync_result 中
//                    List<SyncResult> syncResultListForWait = courseOneMapper.getSyncResultListForWaitForAllinonepushresults(syncTable.getTableName(),(page-1)*100,100);
//                    if(CollectionUtils.isEmpty(syncResultListForWait)) {
//                        break;
//                    }
//                    page++;
//                    try {
//                        // 1.在中间表查询是否存在数据(根据业务主键):如果存在,忽略,如果不存在,插入
//                        List<SyncResult> res= new ArrayList<>();
//                        for(SyncResult syncResult:syncResultListForWait){
//                            // 遍历逐条分析:
//                            int exesit = courseOneMapper.isExesit(syncResult.getInfoId());
//                            if(0==exesit){
//                                res.add(syncResult);
//                            }
//                        }
//                        if(CollectionUtils.isNotEmpty(res)){
//                            courseOneMapper.insertSyncResult(res);
//                        }
//                    }catch (Exception e){
//                        log.error("插入数据失败", e);
//                    }
//                }
//            }
//            else{
//                 int page = 1;
//                 while (true) {
//                     log.info("生产者:表名{}第{}页",syncTable,page);
//                     List<SyncResult> syncResultListForWait = courseOneMapper.getSyncResultListForWait(syncTable.getTableName(),(page-1)*100,100);
//                     if(CollectionUtils.isEmpty(syncResultListForWait)) {
//                         break;
//                     }
//                     page++;
//                     try {
//                         // 1.在中间表查询是否存在数据(根据业务主键):如果存在,忽略,如果不存在,插入
//                         List<SyncResult> res = new ArrayList<>();
//                         for (SyncResult syncResult : syncResultListForWait) {
//                             // 遍历逐条分析:
//                             int exesit = courseOneMapper.isExesit(syncResult.getInfoId());
//                             if (0 == exesit) {
//                                 res.add(syncResult);
//                             }
//                         }
//                         if (CollectionUtils.isNotEmpty(res)) {
//                             courseOneMapper.insertSyncResult(res);
//                         }
//                         log.info("生产者:表名{}插入中间表结束",syncTable);
//                     } catch (Exception e) {
//                         log.error("插入数据失败", e);
//                     }
//                 }
//            }
//
//        }
//    }
//
//
//    @Scheduled(fixedDelay = 40000)
//    public void test1() {
//        //从中间表查询需要同步的数据
//        List<SyncTable> syncTableList = courseOneMapper.getSyncTableList();
//        for (SyncTable syncTable : syncTableList) {
//            if ("allinonepushresults".equals(syncTable.getTableName())) {
//                int page = 1;
//                while (true) {
//                    log.info("消费者:表名{}当前第{}页","allinonepushresults",page);
//                    List<AllinonepushresultsEntity> syncResultListForWait = courseOneMapper.getSyncResultListForWaitForConsumer(syncTable.getTableName(),(page-1)*100,100);
//                    List<Long> ids = null;
//                    if (CollectionUtils.isEmpty(syncResultListForWait)) {
//                        break;
//                    }
//                    page++;
//                    try {
//                        List<AllinonepushresultsEntity> resData = new ArrayList<>();
//                        for (AllinonepushresultsEntity allinonepushresultsEntity : syncResultListForWait) {
//                            int count = courseTwoMapper.isExist("allinonepushresults", allinonepushresultsEntity.getId());
//                            if (0 == count) {
//                                resData.add(allinonepushresultsEntity);
//                            }
//                        }
//                        if (CollectionUtils.isNotEmpty(resData)) {
//                            courseTwoMapper.insertAllinonepushresultsEntityBatch(resData);
//                        }
//                        ids = syncResultListForWait.stream()
//                                .map(AllinonepushresultsEntity::getId)
//                                .collect(Collectors.toList());
//                        for (Long id : ids) {
//                            courseOneMapper.updateSyncResult(id);
//                        }
//                    } catch (Exception e) {
//                        ids = syncResultListForWait.stream()
//                                .map(AllinonepushresultsEntity::getId)
//                                .collect(Collectors.toList());
//                        for (Long id : ids) {
//                            courseOneMapper.updateSyncResultFail(id);
//                        }
//                    }
//                }
//                log.info("消费者:表名{}同步完毕,一共{}页数据",syncTable,page);
//            }
//            else if ("block_new_source_data".equals(syncTable.getTableName())) {
//                int page = 1;
//                while (true) {
//                    log.info("消费者:表名{}当前第{}页","block_new_source_data",page);
//                    List<Long> ids = null;
//                    List<Map<String, Object>> list3 = courseOneMapper.getSyncResultListForWaitForOtherConsumer(syncTable.getTableName(),(page-1)*100,100);
//                    if (CollectionUtils.isEmpty(list3)) {
//                        break;
//                    }
//                    List<BlockNewSourceDataEntity> res1 = list3.stream()
//                            .map(BlockNewSourceDataEntity::fromMap)
//                            .collect(Collectors.toList());
//                    page++;
//                    try {
//                        List<BlockNewSourceDataEntity> resData = new ArrayList<>();
//                        for (BlockNewSourceDataEntity blockNewSourceDataEntity : res1) {
//                            int count = courseTwoMapper.isExist("block_new_source_data", blockNewSourceDataEntity.getId());
//                            if (0 == count) {
//                                resData.add(blockNewSourceDataEntity);
//                            }
//                        }
//                        if (CollectionUtils.isNotEmpty(resData)) {
//                            courseTwoMapper.insertBlockNewSourceDataEntityBatch(resData);
//                        }
//                        ids = res1.stream()
//                                .map(BlockNewSourceDataEntity::getId)
//                                .collect(Collectors.toList());
//                        for (Long id : ids) {
//                            courseOneMapper.updateSyncResult(id);
//                        }
//                    } catch (Exception e) {
//                        ids = res1.stream()
//                                .map(BlockNewSourceDataEntity::getId)
//                                .collect(Collectors.toList());
//                        for (Long id : ids) {
//                            courseOneMapper.updateSyncResultFail(id);
//                        }
//                    }
//                }
//                log.info("消费者:表名{}同步完毕,一共{}页数据",syncTable);
//            }
//            else if("block_source_data".equals(syncTable.getTableName())){
//                {
//                    int page = 1;
//                    while (true) {
//                        log.info("消费者:表名{}当前第{}页","block_source_data",page);
//                        List<Long> ids = null;
//                        List<Map<String, Object>> list3 = courseOneMapper.getSyncResultListForWaitForOtherConsumer(syncTable.getTableName(),(page-1)*100,100);
//                        if (CollectionUtils.isEmpty(list3)) {
//                            break;
//                        }
//                        List<BlockSourceDataEntity> res1 = list3.stream()
//                                .map(BlockSourceDataEntity::fromMap)
//                                .collect(Collectors.toList());
//                        page++;
//                        try {
//                            List<BlockSourceDataEntity> resData = new ArrayList<>();
//                            for (BlockSourceDataEntity blockSourceDataEntity : res1) {
//                                int count = courseTwoMapper.isExist("block_source_data", blockSourceDataEntity.getId());
//                                if (0 == count) {
//                                    resData.add(blockSourceDataEntity);
//                                }
//                            }
//                            if (CollectionUtils.isNotEmpty(resData)) {
//                                courseTwoMapper.insertBlockSourceDataEntityBatch(resData);
//                            }
//                            ids = res1.stream()
//                                    .map(BlockSourceDataEntity::getId)
//                                    .collect(Collectors.toList());
//                            for (Long id : ids) {
//                                courseOneMapper.updateSyncResult(id);
//                            }
//                        } catch (Exception e) {
//                            ids = res1.stream()
//                                    .map(BlockSourceDataEntity::getId)
//                                    .collect(Collectors.toList());
//                            for (Long id : ids) {
//                                courseOneMapper.updateSyncResultFail(id);
//                            }
//                        }
//                    }
//                    log.info("消费者:表名{}同步完毕,一共{}页数据",syncTable,page);
//                }
//
//            }
//            else if("business_data_results".equals(syncTable.getTableName())){
//                {
//                    int page = 1;
//                    while (true) {
//                        log.info("消费者:表名{}当前第{}页","business_data_results",page);
//                        List<Long> ids = null;
//                        List<Map<String, Object>> list3 = courseOneMapper.getSyncResultListForWaitForOtherConsumer(syncTable.getTableName(),(page-1)*100,100);
//                        if (CollectionUtils.isEmpty(list3)) {
//                            break;
//                        }
//                        List<BusinessDataResultsEntity> res1 = list3.stream()
//                                .map(BusinessDataResultsEntity::fromMap)
//                                .collect(Collectors.toList());
//                        page++;
//                        try {
//                            List<BusinessDataResultsEntity> resData = new ArrayList<>();
//                            for (BusinessDataResultsEntity businessDataResultsEntity : res1) {
//                                int count = courseTwoMapper.isExist("business_data_results", businessDataResultsEntity.getId());
//                                if (0 == count) {
//                                    resData.add(businessDataResultsEntity);
//                                }
//                            }
//                            if (CollectionUtils.isNotEmpty(resData)) {
//                                courseTwoMapper.insertBusinessDataResultsBatch(resData);
//                            }
//                            ids = res1.stream()
//                                    .map(BusinessDataResultsEntity::getId)
//                                    .collect(Collectors.toList());
//                            for (Long id : ids) {
//                                courseOneMapper.updateSyncResult(id);
//                            }
//                        } catch (Exception e) {
//                            ids = res1.stream()
//                                    .map(BusinessDataResultsEntity::getId)
//                                    .collect(Collectors.toList());
//                            for (Long id : ids) {
//                                courseOneMapper.updateSyncResultFail(id);
//                            }
//                        }
//                    }
//                    log.info("消费者:表名{}同步完毕,一共{}页数据",syncTable,page);
//                }
//            }
//
//        }
//
//    }
//}