C/C++教程

service

本文主要介绍 service 层的实现代码,对大家解决编程问题具有一定的参考价值,需要的程序员可以跟随本文一起学习。
package com.hsbc.risk.frtbsa.service.cde.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.hsbc.risk.frtbsa.base.AbstractBeamCheckService;
import com.hsbc.risk.frtbsa.coder.CommonBeamCoder;
import com.hsbc.risk.frtbsa.constant.CdeConstants;
import com.hsbc.risk.frtbsa.domain.bean.BaseBean;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.bean.CommonBean;
import com.hsbc.risk.frtbsa.domain.enumType.CsvCountEnum;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.domain.enumType.RunBeamType;
import com.hsbc.risk.frtbsa.dto.BeamParam;
import com.hsbc.risk.frtbsa.dto.CsvFile;
import com.hsbc.risk.frtbsa.dto.RunStatusLog;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessConfig;
import com.hsbc.risk.frtbsa.entity.CdeException;
import com.hsbc.risk.frtbsa.entity.CdeExceptionSummary;
import com.hsbc.risk.frtbsa.option.CommonOption;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.transform.BaseBeanToCsvFn;
import com.hsbc.risk.frtbsa.parse.transform.CombineBaseBeanFn;
import com.hsbc.risk.frtbsa.parse.transform.CsvToBaseBeanFn;
import com.hsbc.risk.frtbsa.parse.transform.KVToBaseBeanFn;
import com.hsbc.risk.frtbsa.service.cde.CompletenessBeamService;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionService;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionSummaryService;
import com.hsbc.risk.frtbsa.transform.*;
import com.hsbc.risk.frtbsa.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.*;

@Service
@Slf4j
public class CompletenessBeamServiceImpl extends AbstractBeamCheckService implements CompletenessBeamService {

    // Line converter shared by the CSV transform functions created in this class.
    private final static LineConvert convert = new ConvertLineToObject();

    // Persistence services for per-record exceptions and the per-run summary.
    @Autowired
    CdeExceptionService cdeExceptionService;
    @Autowired
    CdeExceptionSummaryService cdeExceptionSummaryService;

    // Output directory injected from the "output.dir" property; copied onto the CDE config in
    // beforeRunBeam and used as the target of every file written by runBeam.
    @Value("${output.dir}")
    public String outputDir;

    // ---- Per-run state, assigned in beforeRunBeam(BeamParam) ----
    // NOTE(review): this is mutable instance state on a @Service bean; presumably runs are
    // serialized by the caller - confirm before invoking this service concurrently.
    private CdeCompletenessConfig cdeConfig;

    private BeamParam beamParam;

    private String site;
    private String preAggGroup;
    private String cdeName;
    private String fileType;
    private Long cdeId;
    // Assigned per failing row in setFilterDataInDB and read again in finish().
    private String legalEntity;

    // Enum form of fileType, resolved through FileTypeEnum.getFileType.
    private FileTypeEnum fileTypeEnum;

    // Input files of the run, looked up by keys such as "SBM_Delta_Sensi" or "DRC".
    private Map<String, List<CsvFile>> runFileMap;

    // Merged CSV collection and column-name-to-position map per file type; filled by
    // setPCollectionMap and setIndexMap at the start of runBeam.
    private HashMap<FileTypeEnum, PCollection<String>> pCollectionMap = Maps.newHashMap();
    private HashMap<FileTypeEnum, HashMap<String, Integer>> indexMap = Maps.newHashMap();

    // Entries of the two maps above that belong to the configured file type.
    private HashMap<String, Integer> cdeIndexMap;
    private PCollection<String> cdePCollection;

    // Set to true by runBeam when no collection exists for the configured file type.
    private boolean notExist;

    private void setPCollectionMap(Pipeline pipeline) {
        pCollectionMap.put(FileTypeEnum.DELTA, getCombinePCollection("SBM_Delta_Sensi", runFileMap, pipeline, indexMap.get(FileTypeEnum.DELTA)));
        pCollectionMap.put(FileTypeEnum.TRADE_ATTRIBUTE, getCombinePCollection("Trade_Attributes", runFileMap, pipeline, indexMap.get(FileTypeEnum.TRADE_ATTRIBUTE)));
        pCollectionMap.put(FileTypeEnum.VEGA, getCombinePCollection("SBM_Vega_Sensi", runFileMap, pipeline, indexMap.get(FileTypeEnum.VEGA)));
        pCollectionMap.put(FileTypeEnum.CURVATURE, getCombinePCollection("SBM_Curvature_Sensi", runFileMap, pipeline, indexMap.get(FileTypeEnum.CURVATURE)));
        pCollectionMap.put(FileTypeEnum.DRC, getCombinePCollection("DRC", runFileMap, pipeline, indexMap.get(FileTypeEnum.DRC)));
    }

    private void setIndexMap(CdeConstants cdeConstants) {
        indexMap.put(FileTypeEnum.DELTA, cdeConstants.cdeIndexMap.get(FileTypeEnum.DELTA));
        indexMap.put(FileTypeEnum.TRADE_ATTRIBUTE, cdeConstants.cdeIndexMap.get(FileTypeEnum.TRADE_ATTRIBUTE));
        indexMap.put(FileTypeEnum.VEGA, cdeConstants.cdeIndexMap.get(FileTypeEnum.VEGA));
        indexMap.put(FileTypeEnum.CURVATURE, cdeConstants.cdeIndexMap.get(FileTypeEnum.CURVATURE));
        indexMap.put(FileTypeEnum.DRC, cdeConstants.cdeIndexMap.get(FileTypeEnum.DRC));
    }

    private PCollection<String> getCombinePCollection(String key, Map<String, List<CsvFile>> runFileMap, Pipeline pipeline, HashMap<String, Integer> indexMap) {
        PCollection<String> pResult = null;
        if (StringUtils.isEmpty(key) || !runFileMap.containsKey(key)) {
            return pResult;
        } else {
            List<CsvFile> csvFiles = runFileMap.get(key);
            List<PCollection<BaseBean>> beans = Lists.newArrayList();
            List<PCollection<KV<String, BaseBean>>> kvBeans = Lists.newArrayList();
            for (CsvFile csvFile : csvFiles) {
                PCollection<String> sPCollection = pipeline.apply(TextIO.read().from(csvFile.getAfterBatchFilePath()))
                        .apply(Filter.by(s -> !s.startsWith("ASOFDATE")));
                beans.add(sPCollection.apply(ParDo.of(new CsvToBaseBeanFn(convert, indexMap))));
            }
            for (PCollection<BaseBean> pBean : beans) {
                kvBeans.add(CombineBaseBeanFn.getKvBean(pBean));
            }
            KeyedPCollectionTuple<String> of = null;
            for (int i = 0; i < kvBeans.size(); i++) {
                PCollection<KV<String, BaseBean>> kvBean = kvBeans.get(i);
                if (i == 0) {
                    of = KeyedPCollectionTuple.of("kv" + i, kvBean);
                } else {
                    of = of.and("kv" + i, kvBean);
                }
            }
            PCollection<KV<String, CoGbkResult>> coGbkResultCollection = of.apply(CoGroupByKey.<String>create());
            PCollection<BaseBean> mixBean = coGbkResultCollection.apply(ParDo.of(new KVToBaseBeanFn(kvBeans.size())));
            pResult = mixBean.apply(ParDo.of(new BaseBeanToCsvFn()));
        }
        return pResult;
    }

    @Override
    public void beforeRunBeam(BeamParam beamParam) {
        this.beamParam = beamParam;

        this.runFileMap = beamParam.getRunFileMap();

        //to create a new cde config
        cdeConfig = beamParam.getCdeConfig();
        cdeConfig.setOutputDir(outputDir);
        cdeName = cdeConfig.getCdeName();
        fileType = cdeConfig.getFileType();
        cdeId = cdeConfig.getId();

        fileTypeEnum = FileTypeEnum.getFileType(cdeConfig.getFileType());

        site = beamParam.getSite();
        preAggGroup = beamParam.getPreAggGroup();

        notExist = false;
    }

    @Override
    public void runBeam() {
        try {
            CommonOption option = createOption(CommonOption.class, generateCompletenessCdeConfig(cdeConfig));
            initOption(option);

            Pipeline pipeline = Pipeline.create(option);
            pipeline.getCoderRegistry().registerCoderForClass(CommonBean.class, new CommonBeamCoder());

            CdeConstants cdeConstants = option.getCdeConstants();

            setIndexMap(cdeConstants);
            setPCollectionMap(pipeline);

            cdeIndexMap = indexMap.get(fileTypeEnum);
            cdePCollection = pCollectionMap.get(fileTypeEnum);

            if (cdePCollection == null) {
                notExist = true;
                return;
            }

            PCollection<String> totalCount = cdePCollection.apply(Count.globally()).apply(ParDo.of(new CsvCountFn()));

            PCollection<String> filterCsv = null;

            //parse to map,key is order
            Map<Integer, List<ParseParam>> orderParseMap = CdeMapUtil.getOrderParseMap(this.beamParam.getParseParamList());

            //parse to map,key is fileType
            Map<String, Map<Integer, List<ParseParam>>> combineOrderParseMap = CdeMapUtil.getCombineOrderParseMap(this.beamParam.getCombineParamMap());

            //filter own file first
            filterCsv = filterCsv(cdePCollection, orderParseMap, 0, null, cdeIndexMap, convert);

            //filter combine file array secondly if combineOrderParseMap.size > 0
            if (combineOrderParseMap.size() > 0) {
                for (Map.Entry<String, Map<Integer, List<ParseParam>>> entry : combineOrderParseMap.entrySet()) {
                    FileTypeEnum fileType = FileTypeEnum.getFileType(entry.getKey());
                    HashMap<String, Integer> indexMap = cdeConstants.cdeIndexMap.get(fileType);
                    Map<Integer, List<ParseParam>> value = entry.getValue();

                    PCollection<String> csv = pCollectionMap.get(fileType);

                    PCollection<String> combineCsv = filterCsv(csv, value, 0, null, indexMap, convert);

                    PCollection<CombineBean> sourceBean = combineCsv.apply(ParDo.of(new CsvToComplexBeanFn(fileType, indexMap, convert)));

                    //now combine own file and combine files
                    filterCsv = getRuleFilterCombineCsv(filterCsv, sourceBean);
                }
            }

            PCollection<String> failCount = filterCsv.apply(Count.globally()).apply(ParDo.of(new CsvCountFn()));

            PCollection<String> analysisFilterCsv;

            //add delta logical
            if (Objects.equals(fileTypeEnum, FileTypeEnum.DELTA)) {
                analysisFilterCsv = filterCsv.apply(ParDo.of(new DeltaCsvToCsvFn(indexMap.get(FileTypeEnum.DELTA))));
            } else {
                PCollection<CombineBean> deltaBean = pCollectionMap.get(FileTypeEnum.DELTA)
                        .apply(Filter.by(s -> !s.startsWith("ASOFDATE")))
                        .apply(ParDo.of(new DeltaCsvToBeanFn(indexMap.get(FileTypeEnum.DELTA))));

                analysisFilterCsv = getCombineDeltaCsv(filterCsv, deltaBean);
            }

            //add trade logical
            //as long as it is not a trade attribute, it must be executed
            if (!Objects.equals(fileTypeEnum, FileTypeEnum.TRADE_ATTRIBUTE)) {
                PCollection<CombineBean> tradeBean = pCollectionMap.get(FileTypeEnum.TRADE_ATTRIBUTE)
                        .apply(Filter.by(s -> !s.startsWith("ASOFDATE")))
                        .apply(ParDo.of(new TradeCsvToBeanFn(indexMap.get(FileTypeEnum.TRADE_ATTRIBUTE))));

                analysisFilterCsv = getCombineTradeCsv(analysisFilterCsv, tradeBean);
            }

            //output to directory
            analysisFilterCsv.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum, RunBeamType.COMPLETENESS, cdeName))
                    .withNumShards(1).withSuffix(".csv"));

            totalCount.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum,
                    cdeName, CsvCountEnum.TOTAL_COUNT, RunBeamType.COMPLETENESS))
                    .withNumShards(1).withSuffix(".csv"));

            failCount.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum,
                    cdeName, CsvCountEnum.FAIL_COUNT, RunBeamType.COMPLETENESS))
                    .withNumShards(1).withSuffix(".csv"));

            pipeline.run().waitUntilFinish();

        } catch (Exception e) {
            log.error(e.getLocalizedMessage(), e);
        }
    }

    private PCollection<String> getRuleFilterCombineCsv(PCollection<String> target, PCollection<CombineBean> sourceBean) {
        PCollection<CombineBean> targetBean = target.apply(ParDo.of(new CsvToComplexBeanFn(fileTypeEnum, cdeIndexMap, convert)));

        PCollection<CombineBean> otherBean = CombineBeanFn.getCombineComplexBean(targetBean, sourceBean);

        PCollection<CombineBean> mixBean = PCollectionList.of(targetBean).and(otherBean).apply(Flatten.<CombineBean>pCollections());

        PCollection<KV<String, CombineBean>> mixBeanKv = CombineBeanFn.getComplexBeanKv(mixBean);

        PCollection<KV<String, Iterable<CombineBean>>> mixBeanPc = mixBeanKv.apply(GroupByKey.create());

        PCollection<CombineBean> filterMixBeanPC = mixBeanPc.apply(ParDo.of(new FilterMixComplexBeanFn(fileTypeEnum)));

        return filterMixBeanPC.apply(ParDo.of(new BeanToCsvFn()));
    }

    private PCollection<String> getCombineDeltaCsv(PCollection<String> filterCsv, PCollection<CombineBean> deltaBean) {
        PCollection<CombineBean> filterBean = filterCsv.apply(ParDo.of(new CombineDeltaCsvToBeanFn(fileTypeEnum, cdeIndexMap, convert)));

        PCollection<CombineBean> otherBean = CombineBeanFn.getCombineBean(filterBean, deltaBean);

        PCollection<CombineBean> mixBean = PCollectionList.of(filterBean).and(otherBean).apply(Flatten.<CombineBean>pCollections());

        PCollection<KV<String, CombineBean>> mixBeanKv = CombineBeanFn.getMixBeanKv(mixBean);

        PCollection<KV<String, Iterable<CombineBean>>> mixBeanPc = mixBeanKv.apply(GroupByKey.create());

        PCollection<CombineBean> filterMixBeanPC = mixBeanPc.apply(ParDo.of(new FilterMixDeltaFn()));

        return filterMixBeanPC.apply(ParDo.of(new BeanToCsvFn()));
    }

    private PCollection<String> getCombineTradeCsv(PCollection<String> filterCsv, PCollection<CombineBean> tradeBean) {
        PCollection<CombineBean> filterBean = filterCsv.apply(ParDo.of(new CombineTradeCsvToBeanFn(fileTypeEnum, cdeIndexMap, convert)));

        PCollection<CombineBean> otherBean = CombineBeanFn.getCombineBean(filterBean, tradeBean);

        PCollection<CombineBean> mixBean = PCollectionList.of(filterBean).and(otherBean).apply(Flatten.<CombineBean>pCollections());

        PCollection<KV<String, CombineBean>> mixBeanKv = CombineBeanFn.getMixBeanKv(mixBean);

        PCollection<KV<String, Iterable<CombineBean>>> mixBeanPc = mixBeanKv.apply(GroupByKey.create());

        PCollection<CombineBean> filterMixBeanPC = mixBeanPc.apply(ParDo.of(new FilterMixTradeFn()));

        return filterMixBeanPC.apply(ParDo.of(new BeanToCsvFn()));
    }

    @Override
    public void afterRunBeam() {
        if (!notExist) {
            ArrayList<String> totalCountResult = FileHelper.readCsv(FileHelper.findCsv(outputDir,
                    fileTypeEnum, cdeName, CsvCountEnum.TOTAL_COUNT, RunBeamType.COMPLETENESS));
            ArrayList<String> failCountResult = FileHelper.readCsv(FileHelper.findCsv(outputDir,
                    fileTypeEnum, cdeName, CsvCountEnum.FAIL_COUNT, RunBeamType.COMPLETENESS));

            Long totalCount = Long.parseLong(totalCountResult.get(0));
            Long failCount = Long.parseLong(failCountResult.get(0));

            if (totalCountResult.size() == 1) beamParam.setTotalCount(totalCount);
            if (failCountResult.size() == 1) beamParam.setFailCount(failCount);
            if (totalCount != null && failCount != null) beamParam.setPassCount(totalCount - failCount);

            ArrayList<String> csv = FileHelper.readCsv(FileHelper.findCsv(outputDir, fileTypeEnum, cdeName, RunBeamType.COMPLETENESS));
            beamParam.setCsv(csv);
        }
    }

    @Override
    public void setFilterDataInDB() {
        CdeConstants cdeConstants = CdeConstants.instance(cdeConfig.getVersion());

        String batchNum = cdeExceptionService.getBatchNum(fileTypeEnum);
        beamParam.setBatchNum(batchNum);

        ArrayList<String> csvs = beamParam.getCsv();

        HashMap<String, Integer> indexMap = cdeConstants.cdeIndexMap.get(this.fileTypeEnum);

        if (notExist) {
            CdeException cdeException = CdeException.builder()
                    .dqId(cdeConfig.getDqId())
                    .siteCode(site)
                    .preAggGroup(preAggGroup)
                    .tableName("cde_exception")
                    .dataFieldValue("dataFieldValue")
                    .logicalOrBusinessNameCde(cdeName)
                    .sourceSystemName("SUBMIT")
                    .producer("N")
                    .errorMessage("csv file is not exist!")
                    .dimension(RunBeamType.COMPLETENESS.name())
                    .controlRunTimestamp(String.valueOf(System.currentTimeMillis()))
                    .cdeExceptionRecordType("PLATO")
                    .bookIdentifier("bookIdentifier")
                    .createDate(new Date())
                    .createBy("SYSTEM")
                    .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                    .fileType(fileType)
                    .cdeName(cdeName)
                    .cdeConfigId(cdeId)
                    .batchNum(batchNum)
                    .build();

            cdeExceptionService.save(cdeException);

        } else {
            List<CdeException> cdeExceptions = Lists.newArrayList();

            csvs.forEach(csv -> {
                String[] arr = SplitUtil.split(csv);

                String tradeId = arr[indexMap.get("TRADEID")];
                String legId = arr[indexMap.get("LEG_ID")];
                String dqId = cdeConfig.getDqId();
                String recordId = "";

                CdeException cdeException = CdeException.builder()
                        .dqId(dqId)
                        .siteCode(site)
                        .preAggGroup(preAggGroup)
                        .tableName("cde_exception")
                        .dataFieldValue("dataFieldValue")
                        .logicalOrBusinessNameCde(cdeName)
                        .sourceSystemName("SUBMIT")
                        .producer("N")
                        .errorMessage(cdeConfig.getErrorMessage())
                        .dimension(RunBeamType.COMPLETENESS.name())
                        .controlRunTimestamp(String.valueOf(System.currentTimeMillis()))
                        .cdeExceptionRecordType("PLATO")
                        .bookIdentifier("bookIdentifier")
                        .tradeId(tradeId)
                        .legId(legId)
                        .asOfDate(arr[indexMap.get("ASOFDATE")])
                        .createDate(new Date())
                        .createBy("SYSTEM")
                        .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                        .fileType(fileType)
                        .cdeName(cdeName)
                        .cdeConfigId(cdeId)
                        .batchNum(batchNum)
                        .fxDelta(StringUtils.equals("null", arr[indexMap.get("FXDELTA")]) ? null : new Double(arr[indexMap.get("FXDELTA")]))
                        .cmDelta(StringUtils.equals("null", arr[indexMap.get("CMDELTA")]) ? null : new Double(arr[indexMap.get("CMDELTA")]))
                        .irDelta(StringUtils.equals("null", arr[indexMap.get("IRDELTA")]) ? null : new Double(arr[indexMap.get("IRDELTA")]))
                        .eqDelta(StringUtils.equals("null", arr[indexMap.get("EQDELTA")]) ? null : new Double(arr[indexMap.get("EQDELTA")]))
                        .build();

                //logic of record_id
                //DQID, Source System,Book, TradeID, LegID,Riskfactor
                if (!Objects.equals(FileTypeEnum.TRADE_ATTRIBUTE, fileTypeEnum)) {
                    legalEntity = arr[indexMap.get("TRADE_LEGAL_ENTITY")];
                    cdeException.setLegalEntity(legalEntity);
                    recordId = String.format("%s-%s-%s-%s-%s-%s", dqId, arr[indexMap.get("TRADE_SOURCE_SYSTEM")],
                            arr[indexMap.get("TRADE_BOOK")], tradeId, legId,
                            indexMap.get("RISKFACTOR") == null ? "no RiskFactor" : arr[indexMap.get("RISKFACTOR")]);
                } else {
                    legalEntity = arr[indexMap.get("LEGAL_ENTITY")];
                    cdeException.setLegalEntity(legalEntity);
                    recordId = String.format("%s-%s-%s-%s-%s-%s", dqId, arr[indexMap.get("SOURCE_SYSTEM")],
                            arr[indexMap.get("BOOK")], tradeId, legId, "no RiskFactor");
                }
                cdeException.setRecordId(recordId);

                //delta
                if (Objects.equals(FileTypeEnum.DELTA, fileTypeEnum)) {
                    cdeException.setRiskClass(arr[indexMap.get("RISKCLASS")]);
                    cdeException.setRiskFactor(arr[indexMap.get("RISKFACTOR")]);
                    cdeException.setSequence(arr[indexMap.get("SEQUENCE")]);
                    cdeException.setAdjustmentFlag(arr[indexMap.get("ADJUSTMENTFLAG")]);
                }
                //vega curvature
                if (Objects.equals(FileTypeEnum.VEGA, fileTypeEnum) || Objects.equals(FileTypeEnum.CURVATURE, fileTypeEnum)) {
                    cdeException.setRiskClass(arr[indexMap.get("RISKCLASS")]);
                    cdeException.setRiskFactor(arr[indexMap.get("RISKFACTOR")]);
                    cdeException.setAdjustmentFlag(arr[indexMap.get("ADJUSTMENTFLAG")]);
                }
                //drc
                if (Objects.equals(FileTypeEnum.DRC, fileTypeEnum)) {
                    cdeException.setRiskClass(arr[indexMap.get("RISKCLASS")]);
                    cdeException.setObligorId(arr[indexMap.get("OBLIGORID")]);
                    cdeException.setAdjustmentFlag(arr[indexMap.get("ADJUSTMENTFLAG")]);
                    cdeException.setDrcUnderlyingref(arr[indexMap.get("DRC_UNDERLYINGREF")]);
                }

                cdeExceptions.add(cdeException);
            });

            cdeExceptionService.batchSave(cdeExceptions);
        }
    }

    @Override
    public RunStatusLog finish(long costTime) {
        CdeExceptionSummary summary = null;
        if (notExist) {
            summary = CdeExceptionSummary.builder()
                    .siteCode(site)
                    .dqId(cdeConfig.getDqId())
                    .recordsFailed(0l)
                    .recordsPassed(0l)
                    .recordsTested(0l)
                    .createDate(new Date())
                    .createBy("SYSTEM")
                    .batchNum(beamParam.getBatchNum())
                    .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                    .fileType(fileType)
                    .cdeName(cdeName)
                    .cdeConfigId(cdeId)
                    .runCostTimeCount(TimeUtil.formatMinSec(costTime))
                    .dimension(RunBeamType.COMPLETENESS.name())
                    .tableName("cde_exception_summary")
                    .build();
        } else {
            summary = CdeExceptionSummary.builder()
                    .siteCode(site)
                    .dqId(cdeConfig.getDqId())
                    .recordsFailed(beamParam.getFailCount())
                    .recordsPassed(beamParam.getPassCount())
                    .recordsTested(beamParam.getTotalCount())
                    .createDate(new Date())
                    .createBy("SYSTEM")
                    .batchNum(beamParam.getBatchNum())
                    .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                    .fileType(fileType)
                    .cdeName(cdeName)
                    .cdeConfigId(cdeId)
                    .runCostTimeCount(TimeUtil.formatMinSec(costTime))
                    .dimension(RunBeamType.COMPLETENESS.name())
                    .tableName("cde_exception_summary")
                    .legalEntity(legalEntity)
                    .build();
        }
        cdeExceptionSummaryService.save(summary);

        RunStatusLog log = new RunStatusLog();
        BeanUtils.copyProperties(summary, log);
        return log;
    }
}
package com.hsbc.risk.frtbsa.service.cde.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.hsbc.risk.frtbsa.base.AbstractBeamCheckService;
import com.hsbc.risk.frtbsa.coder.CommonBeamCoder;
import com.hsbc.risk.frtbsa.constant.CdeConstants;
import com.hsbc.risk.frtbsa.domain.bean.BaseBean;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.bean.CommonBean;
import com.hsbc.risk.frtbsa.domain.enumType.CsvCountEnum;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.domain.enumType.RunBeamType;
import com.hsbc.risk.frtbsa.dto.BeamParam;
import com.hsbc.risk.frtbsa.dto.CsvFile;
import com.hsbc.risk.frtbsa.dto.RunStatusLog;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyConfig;
import com.hsbc.risk.frtbsa.entity.CdeException;
import com.hsbc.risk.frtbsa.entity.CdeExceptionSummary;
import com.hsbc.risk.frtbsa.option.CommonOption;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.transform.BaseBeanToCsvFn;
import com.hsbc.risk.frtbsa.parse.transform.CombineBaseBeanFn;
import com.hsbc.risk.frtbsa.parse.transform.CsvToBaseBeanFn;
import com.hsbc.risk.frtbsa.parse.transform.KVToBaseBeanFn;
import com.hsbc.risk.frtbsa.service.cde.ConsistencyBeamService;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionService;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionSummaryService;
import com.hsbc.risk.frtbsa.transform.*;
import com.hsbc.risk.frtbsa.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.*;

@Service
@Slf4j
public class ConsistencyBeamServiceImpl extends AbstractBeamCheckService implements ConsistencyBeamService {

    // Shared line converter handed to the CSV-parsing DoFns used by this service.
    private final static LineConvert convert = new ConvertLineToObject();

    // Persistence services for the per-row exceptions and the per-run summary.
    @Autowired
    CdeExceptionService cdeExceptionService;
    @Autowired
    CdeExceptionSummaryService cdeExceptionSummaryService;

    // Injected from the "output.dir" property; result and count CSV files are written below it.
    @Value("${output.dir}")
    public String outputDir;

    // Consistency configuration of the current run; assigned in beforeRunBeam().
    private CdeConsistencyConfig cdeConfig;

    // Input files of the current run, grouped by a file key such as "SBM_Delta_Sensi" or "DRC".
    private Map<String, List<CsvFile>> runFileMap;

    // Parameters of the current run; also receives the counts, result rows and batch number.
    private BeamParam beamParam;

    // Values copied from cdeConfig in beforeRunBeam().
    private String cdeName;
    private Long cdeId;
    private String fileType;
    // Written while exception rows are built in setFilterDataInDB() and read again by finish().
    private String legalEntity;

    // Values copied from beamParam in beforeRunBeam().
    private String site;
    private String preAggGroup;

    // Enum form of fileType, resolved through FileTypeEnum.getFileType.
    private FileTypeEnum fileTypeEnum;

    // True when runBeam() found no input collection for the current file type.
    private boolean notExist;

    // Per-file-type line collections and column-index maps, refilled at the start of runBeam().
    private HashMap<FileTypeEnum, PCollection<String>> afterControlPCollectionMap = Maps.newHashMap();
    private HashMap<FileTypeEnum, PCollection<String>> rawPCollectionMap = Maps.newHashMap();
    private HashMap<FileTypeEnum, HashMap<String, Integer>> indexMap = Maps.newHashMap();

    // Entries of the three maps above that belong to the current file type.
    private HashMap<String, Integer> cdeIndexMap;
    private PCollection<String> afterControlPCollection;
    private PCollection<String> rawPCollection;

    private void setAfterControlPCollectionMap(Pipeline pipeline) {
        afterControlPCollectionMap.put(FileTypeEnum.DELTA, getCombinePCollection("SBM_Delta_Sensi", runFileMap, pipeline, indexMap.get(FileTypeEnum.DELTA), "afterControl"));
        afterControlPCollectionMap.put(FileTypeEnum.TRADE_ATTRIBUTE, getCombinePCollection("Trade_Attributes", runFileMap, pipeline, indexMap.get(FileTypeEnum.TRADE_ATTRIBUTE), "afterControl"));
        afterControlPCollectionMap.put(FileTypeEnum.VEGA, getCombinePCollection("SBM_Vega_Sensi", runFileMap, pipeline, indexMap.get(FileTypeEnum.VEGA), "afterControl"));
        afterControlPCollectionMap.put(FileTypeEnum.CURVATURE, getCombinePCollection("SBM_Curvature_Sensi", runFileMap, pipeline, indexMap.get(FileTypeEnum.CURVATURE), "afterControl"));
        afterControlPCollectionMap.put(FileTypeEnum.DRC, getCombinePCollection("DRC", runFileMap, pipeline, indexMap.get(FileTypeEnum.DRC), "afterControl"));
    }

    private void setRawPCollectionMap(Pipeline pipeline) {
        rawPCollectionMap.put(FileTypeEnum.DELTA, getCombinePCollection("SBM_Delta_Sensi", runFileMap, pipeline, indexMap.get(FileTypeEnum.DELTA), "raw"));
        rawPCollectionMap.put(FileTypeEnum.TRADE_ATTRIBUTE, getCombinePCollection("Trade_Attributes", runFileMap, pipeline, indexMap.get(FileTypeEnum.TRADE_ATTRIBUTE), "raw"));
        rawPCollectionMap.put(FileTypeEnum.VEGA, getCombinePCollection("SBM_Vega_Sensi", runFileMap, pipeline, indexMap.get(FileTypeEnum.VEGA), "raw"));
        rawPCollectionMap.put(FileTypeEnum.CURVATURE, getCombinePCollection("SBM_Curvature_Sensi", runFileMap, pipeline, indexMap.get(FileTypeEnum.CURVATURE), "raw"));
        rawPCollectionMap.put(FileTypeEnum.DRC, getCombinePCollection("DRC", runFileMap, pipeline, indexMap.get(FileTypeEnum.DRC), "raw"));
    }

    private void setIndexMap(CdeConstants cdeConstants) {
        indexMap.put(FileTypeEnum.DELTA, cdeConstants.cdeIndexMap.get(FileTypeEnum.DELTA));
        indexMap.put(FileTypeEnum.TRADE_ATTRIBUTE, cdeConstants.cdeIndexMap.get(FileTypeEnum.TRADE_ATTRIBUTE));
        indexMap.put(FileTypeEnum.VEGA, cdeConstants.cdeIndexMap.get(FileTypeEnum.VEGA));
        indexMap.put(FileTypeEnum.CURVATURE, cdeConstants.cdeIndexMap.get(FileTypeEnum.CURVATURE));
        indexMap.put(FileTypeEnum.DRC, cdeConstants.cdeIndexMap.get(FileTypeEnum.DRC));
    }

    private PCollection<String> getCombinePCollection(String key, Map<String, List<CsvFile>> runFileMap, Pipeline pipeline, HashMap<String, Integer> indexMap, String type) {
        PCollection<String> pResult = null;
        if (StringUtils.isEmpty(key) || !runFileMap.containsKey(key)) {
            return pResult;
        } else {
            List<CsvFile> csvFiles = runFileMap.get(key);
            List<PCollection<BaseBean>> beans = Lists.newArrayList();
            List<PCollection<KV<String, BaseBean>>> kvBeans = Lists.newArrayList();
            for (CsvFile csvFile : csvFiles) {
                String readPath = "";
                if (StringUtils.equals("raw", type)) {
                    readPath = csvFile.getRawFilePath();
                } else if (StringUtils.equals("afterControl", type)) {
                    readPath = csvFile.getAfterBatchFilePath();
                }
                PCollection<String> sPCollection = pipeline.apply(TextIO.read().from(readPath))
                        .apply(Filter.by(s -> !s.startsWith("ASOFDATE")));
                beans.add(sPCollection.apply(ParDo.of(new CsvToBaseBeanFn(convert, indexMap))));
            }
            for (PCollection<BaseBean> pBean : beans) {
                kvBeans.add(CombineBaseBeanFn.getKvBean(pBean));
            }
            KeyedPCollectionTuple<String> of = null;
            for (int i = 0; i < kvBeans.size(); i++) {
                PCollection<KV<String, BaseBean>> kvBean = kvBeans.get(i);
                if (i == 0) {
                    of = KeyedPCollectionTuple.of("kv" + i, kvBean);
                } else {
                    of = of.and("kv" + i, kvBean);
                }
            }
            PCollection<KV<String, CoGbkResult>> coGbkResultCollection = of.apply(CoGroupByKey.<String>create());
            PCollection<BaseBean> mixBean = coGbkResultCollection.apply(ParDo.of(new KVToBaseBeanFn(kvBeans.size())));
            pResult = mixBean.apply(ParDo.of(new BaseBeanToCsvFn()));
        }
        return pResult;
    }

    @Override
    public void beforeRunBeam(BeamParam beamParam) {
        this.beamParam = beamParam;
        //to create a new cde config
        cdeConfig = beamParam.getConsistencyConfig();

        cdeConfig.setOutputDir(outputDir);

        this.runFileMap = beamParam.getRunFileMap();

        cdeName = cdeConfig.getCdeName();
        fileType = cdeConfig.getFileType();
        cdeId = cdeConfig.getId();

        fileTypeEnum = FileTypeEnum.getFileType(fileType);

        site = beamParam.getSite();
        preAggGroup = beamParam.getPreAggGroup();

        notExist = false;
    }

    @Override
    public void runBeam() {
        try {
            CommonOption option = createOption(CommonOption.class, generateConsistencyCdeConfig(cdeConfig));
            initOption(option);

            Pipeline pipeline = Pipeline.create(option);
            pipeline.getCoderRegistry().registerCoderForClass(CommonBean.class, new CommonBeamCoder());

            CdeConstants cdeConstants = option.getCdeConstants();

            setIndexMap(cdeConstants);
            setAfterControlPCollectionMap(pipeline);
            setRawPCollectionMap(pipeline);

            cdeIndexMap = indexMap.get(fileTypeEnum);
            afterControlPCollection = afterControlPCollectionMap.get(fileTypeEnum);
            rawPCollection = rawPCollectionMap.get(fileTypeEnum);

            if (afterControlPCollection == null || rawPCollection == null) {
                notExist = true;
                return;
            }

            //parse to map,key is order
            Map<Integer, List<ParseParam>> orderParseMap = CdeMapUtil.getOrderParseMap(this.beamParam.getParseParamList());

            if (orderParseMap.size() > 0) {
                afterControlPCollection = filterCsv(afterControlPCollection, orderParseMap, 0, null, cdeIndexMap, convert);
                rawPCollection = filterCsv(rawPCollection, orderParseMap, 0, null, cdeIndexMap, convert);
            }

            PCollection<String> afterBatchTotalCount = afterControlPCollection.apply(Count.globally()).apply(ParDo.of(new CsvCountFn()));

            PCollection<CombineBean> afterBatchBean = afterControlPCollection.apply(ParDo.of(new CsvToComplexBeanFn(fileTypeEnum, cdeIndexMap, convert, cdeConfig.getCompareField(), "afterBatch")));
            PCollection<CombineBean> rawBean = rawPCollection.apply(ParDo.of(new CsvToComplexBeanFn(fileTypeEnum, cdeIndexMap, convert, cdeConfig.getCompareField(), "raw")));

            PCollection<CombineBean> otherBean = CombineBeanFn.getCombineComplexBean(afterBatchBean, rawBean);

            PCollection<CombineBean> mixBean = PCollectionList.of(afterBatchBean).and(otherBean).apply(Flatten.<CombineBean>pCollections());

            PCollection<KV<String, CombineBean>> mixBeanKv = CombineBeanFn.getComplexBeanKv(mixBean);

            PCollection<KV<String, Iterable<CombineBean>>> mixBeanPc = mixBeanKv.apply(GroupByKey.create());

            PCollection<CombineBean> filterMixBeanPC = mixBeanPc.apply(ParDo.of(new FilterConsistencyBeanFn()));

            PCollection<String> afterFilterCsv = filterMixBeanPC.apply(ParDo.of(new BeanToCsvFn()));

            //add trade logical
            //as long as it is not a trade attribute, it must be executed
            if (!Objects.equals(fileTypeEnum, FileTypeEnum.TRADE_ATTRIBUTE)) {
                //to be continue...
                PCollection<CombineBean> tradeBean = afterControlPCollectionMap.get(FileTypeEnum.TRADE_ATTRIBUTE)
                        .apply(Filter.by(s -> !s.startsWith("ASOFDATE")))
                        .apply(ParDo.of(new TradeCsvToBeanFn(indexMap.get(FileTypeEnum.TRADE_ATTRIBUTE))));

                afterFilterCsv = getCombineTradeCsv(afterFilterCsv, tradeBean);
            }

            //fail record
            PCollection<String> failCount = afterFilterCsv.apply(Count.globally()).apply(ParDo.of(new CsvCountFn()));

            //output to directory
            afterFilterCsv.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum, RunBeamType.CONSISTENCY, cdeName))
                    .withNumShards(1).withSuffix(".csv"));

            afterBatchTotalCount.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum,
                    cdeName, CsvCountEnum.TOTAL_COUNT, RunBeamType.CONSISTENCY))
                    .withNumShards(1).withSuffix(".csv"));

            failCount.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum,
                    cdeName, CsvCountEnum.FAIL_COUNT, RunBeamType.CONSISTENCY))
                    .withNumShards(1).withSuffix(".csv"));

            pipeline.run().waitUntilFinish();

        } catch (Exception e) {
            log.error(e.getLocalizedMessage(), e);
        }
    }

    private PCollection<String> getCombineTradeCsv(PCollection<String> filterCsv, PCollection<CombineBean> tradeBean) {
        PCollection<CombineBean> filterBean = filterCsv.apply(ParDo.of(new CombineDeltaCsvToBeanFn(fileTypeEnum, cdeIndexMap, convert)));

        PCollection<CombineBean> otherBean = CombineBeanFn.getCombineBean(filterBean, tradeBean);

        PCollection<CombineBean> mixBean = PCollectionList.of(filterBean).and(otherBean).apply(Flatten.<CombineBean>pCollections());

        PCollection<KV<String, CombineBean>> mixBeanKv = CombineBeanFn.getMixBeanKv(mixBean);

        PCollection<KV<String, Iterable<CombineBean>>> mixBeanPc = mixBeanKv.apply(GroupByKey.create());

        PCollection<CombineBean> filterMixBeanPC = mixBeanPc.apply(ParDo.of(new FilterMixTradeFn()));

        return filterMixBeanPC.apply(ParDo.of(new BeanToCsvFn()));
    }

    @Override
    public void afterRunBeam() {
        if (!notExist) {
            ArrayList<String> totalCountResult = FileHelper.readCsv(FileHelper.findCsv(outputDir,
                    fileTypeEnum, cdeName, CsvCountEnum.TOTAL_COUNT, RunBeamType.CONSISTENCY));
            ArrayList<String> failCountResult = FileHelper.readCsv(FileHelper.findCsv(outputDir,
                    fileTypeEnum, cdeName, CsvCountEnum.FAIL_COUNT, RunBeamType.CONSISTENCY));

            Long totalCount = Long.parseLong(totalCountResult.get(0));
            Long failCount = Long.parseLong(failCountResult.get(0));

            if (totalCountResult.size() == 1) beamParam.setTotalCount(totalCount);
            if (failCountResult.size() == 1) beamParam.setFailCount(failCount);
            if (totalCount != null && failCount != null) beamParam.setPassCount(totalCount - failCount);

            ArrayList<String> csv = FileHelper.readCsv(FileHelper.findCsv(outputDir, fileTypeEnum, cdeName, RunBeamType.CONSISTENCY));
            beamParam.setCsv(csv);
        }
    }

    @Override
    public void setFilterDataInDB() {
        CdeConstants cdeConstants = CdeConstants.instance(cdeConfig.getVersion());

        String batchNum = cdeExceptionService.getBatchNum(fileTypeEnum);
        beamParam.setBatchNum(batchNum);

        ArrayList<String> csvs = beamParam.getCsv();

        HashMap<String, Integer> indexMap = cdeConstants.cdeIndexMap.get(this.fileTypeEnum);

        if (notExist) {
            CdeException cdeException = CdeException.builder()
                    .dqId(cdeConfig.getDqId())
                    .siteCode(site)
                    .preAggGroup(preAggGroup)
                    .tableName("cde_exception")
                    .dataFieldValue("dataFieldValue")
                    .logicalOrBusinessNameCde(cdeName)
                    .sourceSystemName("SUBMIT")
                    .producer("N")
                    .errorMessage("csv file is not exist!")
                    .dimension(RunBeamType.CONSISTENCY.name())
                    .controlRunTimestamp(String.valueOf(System.currentTimeMillis()))
                    .cdeExceptionRecordType("PLATO")
                    .bookIdentifier("bookIdentifier")
                    .createDate(new Date())
                    .createBy("SYSTEM")
                    .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                    .fileType(fileType)
                    .cdeName(cdeName)
                    .cdeConfigId(cdeId)
                    .batchNum(batchNum)
                    .build();

            cdeExceptionService.save(cdeException);
        } else {
            List<CdeException> cdeExceptions = Lists.newArrayList();

            csvs.forEach(csv -> {
                String[] arr = SplitUtil.split(csv);

                String tradeId = arr[indexMap.get("TRADEID")];
                String legId = arr[indexMap.get("LEG_ID")];
                String dqId = cdeConfig.getDqId();
                String recordId = "";

                CdeException cdeException = CdeException.builder()
                        .dqId(dqId)
                        .siteCode(site)
                        .preAggGroup(preAggGroup)
                        .tableName("cde_exception")
                        .dataFieldValue("dataFieldValue")
                        .logicalOrBusinessNameCde(cdeName)
                        .sourceSystemName("SUBMIT")
                        .producer("N")
                        .errorMessage(cdeConfig.getErrorMessage())
                        .dimension(RunBeamType.CONSISTENCY.name())
                        .controlRunTimestamp(String.valueOf(System.currentTimeMillis()))
                        .cdeExceptionRecordType("PLATO")
                        .bookIdentifier("bookIdentifier")
                        .tradeId(tradeId)
                        .legId(legId)
                        .asOfDate(arr[indexMap.get("ASOFDATE")])
                        .createDate(new Date())
                        .createBy("SYSTEM")
                        .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                        .fileType(fileType)
                        .cdeName(cdeName)
                        .cdeConfigId(cdeId)
                        .batchNum(batchNum)
                        .fxDelta(StringUtils.equals("null", arr[indexMap.get("FXDELTA")]) ? null : new Double(arr[indexMap.get("FXDELTA")]))
                        .cmDelta(StringUtils.equals("null", arr[indexMap.get("CMDELTA")]) ? null : new Double(arr[indexMap.get("CMDELTA")]))
                        .irDelta(StringUtils.equals("null", arr[indexMap.get("IRDELTA")]) ? null : new Double(arr[indexMap.get("IRDELTA")]))
                        .eqDelta(StringUtils.equals("null", arr[indexMap.get("EQDELTA")]) ? null : new Double(arr[indexMap.get("EQDELTA")]))
                        .build();

                //logic of record_id
                //DQID, Source System,Book, TradeID, LegID,Riskfactor
                if (!Objects.equals(FileTypeEnum.TRADE_ATTRIBUTE, fileTypeEnum)) {
                    legalEntity = arr[indexMap.get("TRADE_LEGAL_ENTITY")];
                    cdeException.setLegalEntity(legalEntity);
                    recordId = String.format("%s-%s-%s-%s-%s-%s", dqId, arr[indexMap.get("TRADE_SOURCE_SYSTEM")],
                            arr[indexMap.get("TRADE_BOOK")], tradeId, legId,
                            indexMap.get("RISKFACTOR") == null ? "no RiskFactor" : arr[indexMap.get("RISKFACTOR")]);
                } else {
                    legalEntity = arr[indexMap.get("LEGAL_ENTITY")];
                    cdeException.setLegalEntity(legalEntity);
                    recordId = String.format("%s-%s-%s-%s-%s-%s", dqId, arr[indexMap.get("SOURCE_SYSTEM")],
                            arr[indexMap.get("BOOK")], tradeId, legId, "no RiskFactor");
                }
                cdeException.setRecordId(recordId);

                //delta
                if (Objects.equals(FileTypeEnum.DELTA, fileTypeEnum)) {
                    cdeException.setRiskClass(arr[indexMap.get("RISKCLASS")]);
                    cdeException.setRiskFactor(arr[indexMap.get("RISKFACTOR")]);
                    cdeException.setSequence(arr[indexMap.get("SEQUENCE")]);
                    cdeException.setAdjustmentFlag(arr[indexMap.get("ADJUSTMENTFLAG")]);
                }
                //vega curvature
                if (Objects.equals(FileTypeEnum.VEGA, fileTypeEnum) || Objects.equals(FileTypeEnum.CURVATURE, fileTypeEnum)) {
                    cdeException.setRiskClass(arr[indexMap.get("RISKCLASS")]);
                    cdeException.setRiskFactor(arr[indexMap.get("RISKFACTOR")]);
                    cdeException.setAdjustmentFlag(arr[indexMap.get("ADJUSTMENTFLAG")]);
                }
                //drc
                if (Objects.equals(FileTypeEnum.DRC, fileTypeEnum)) {
                    cdeException.setRiskClass(arr[indexMap.get("RISKCLASS")]);
                    cdeException.setObligorId(arr[indexMap.get("OBLIGORID")]);
                    cdeException.setAdjustmentFlag(arr[indexMap.get("ADJUSTMENTFLAG")]);
                    cdeException.setDrcUnderlyingref(arr[indexMap.get("DRC_UNDERLYINGREF")]);
                }

                cdeExceptions.add(cdeException);
            });

            cdeExceptionService.batchSave(cdeExceptions);
        }
    }

    @Override
    public RunStatusLog finish(long costTime) {
        CdeExceptionSummary summary = null;
        if (notExist) {
            summary = CdeExceptionSummary.builder()
                    .siteCode(site)
                    .dqId(cdeConfig.getDqId())
                    .recordsFailed(0l)
                    .recordsPassed(0l)
                    .recordsTested(0l)
                    .createDate(new Date())
                    .createBy("SYSTEM")
                    .batchNum(beamParam.getBatchNum())
                    .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                    .fileType(fileType)
                    .cdeName(cdeName)
                    .cdeConfigId(cdeId)
                    .runCostTimeCount(TimeUtil.formatMinSec(costTime))
                    .dimension(RunBeamType.CONSISTENCY.name())
                    .tableName("cde_exception_summary")
                    .build();
        }else{
            summary = CdeExceptionSummary.builder()
                    .siteCode(site)
                    .dqId(cdeConfig.getDqId())
                    .recordsFailed(beamParam.getFailCount())
                    .recordsPassed(beamParam.getPassCount())
                    .recordsTested(beamParam.getTotalCount())
                    .createDate(new Date())
                    .createBy("SYSTEM")
                    .batchNum(beamParam.getBatchNum())
                    .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                    .fileType(fileType)
                    .cdeName(cdeName)
                    .cdeConfigId(cdeId)
                    .runCostTimeCount(TimeUtil.formatMinSec(costTime))
                    .dimension(RunBeamType.CONSISTENCY.name())
                    .legalEntity(legalEntity)
                    .tableName("cde_exception_summary")
                    .build();
        }

        cdeExceptionSummaryService.save(summary);

        RunStatusLog log = new RunStatusLog();
        BeanUtils.copyProperties(summary, log);
        return log;
    }
}
package com.hsbc.risk.frtbsa.service.cde;

import com.hsbc.risk.frtbsa.base.GenericBeamCheckService;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;

import java.util.List;
import java.util.Map;

/**
 * Service type for the completeness check. Declares no members of its own; everything is
 * inherited from {@link GenericBeamCheckService}.
 */
public interface CompletenessBeamService extends GenericBeamCheckService {

}
package com.hsbc.risk.frtbsa.service.cde;

import com.hsbc.risk.frtbsa.base.GenericBeamCheckService;

/**
 * Service type for the consistency check. Declares no members of its own; everything is
 * inherited from {@link GenericBeamCheckService}.
 */
public interface ConsistencyBeamService extends GenericBeamCheckService {

}
package com.hsbc.risk.frtbsa.service.entity.impl;

import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeCompletenessConfigDao;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessConfig;
import com.hsbc.risk.frtbsa.service.entity.CdeCompletenessConfigService;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class CdeCompletenessConfigServiceImpl extends AbstractGenericService<CdeCompletenessConfig, CdeCompletenessConfigDao>
        implements CdeCompletenessConfigService {

    @Override
    public List<CdeCompletenessConfig> getActiveCdes() {
        return this.repository.getActiveCdes();
    }
}
package com.hsbc.risk.frtbsa.service.entity.impl;

import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeCompletenessRuleDao;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessRule;
import com.hsbc.risk.frtbsa.service.entity.CdeCompletenessRuleService;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class CdeCompletenessRuleServiceImpl extends AbstractGenericService<CdeCompletenessRule, CdeCompletenessRuleDao>
        implements CdeCompletenessRuleService {

    @Override
    public List<CdeCompletenessRule> findRulesByCdeConfigId(Long cdeConfigId) {
        return this.repository.findRulesByCdeConfigId(cdeConfigId);
    }

    @Override
    public List<CdeCompletenessRule> findRulesByCdeConfigIdAndCombineFileType(Long cdeConfigId, String combineFileType) {
        return this.repository.findRulesByCdeConfigIdAndCombineFileType(cdeConfigId, combineFileType);
    }

    @Override
    public List<String> findDistinctCombineFileTypeByCdeConfigId(Long cdeConfigId) {
        return this.repository.findDistinctCombineFileTypeByCdeConfigId(cdeConfigId);
    }
}
package com.hsbc.risk.frtbsa.service.entity.impl;

import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeConsistencyConfigDao;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyConfig;
import com.hsbc.risk.frtbsa.service.entity.CdeConsistencyConfigService;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class CdeConsistencyConfigServiceImpl extends AbstractGenericService<CdeConsistencyConfig, CdeConsistencyConfigDao>
        implements CdeConsistencyConfigService {
    @Override
    public List<CdeConsistencyConfig> getActiveCdes() {
        return this.repository.getActiveCdes();
    }
}
package com.hsbc.risk.frtbsa.service.entity.impl;

import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeConsistencyRuleDao;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyRule;
import com.hsbc.risk.frtbsa.service.entity.CdeConsistencyRuleService;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class CdeConsistencyRuleServiceImpl extends AbstractGenericService<CdeConsistencyRule, CdeConsistencyRuleDao>
        implements CdeConsistencyRuleService {
    @Override
    public List<CdeConsistencyRule> findRulesByCdeConfigId(Long cdeConfigId) {
        return this.repository.findRulesByCdeConfigId(cdeConfigId);
    }
}
package com.hsbc.risk.frtbsa.service.entity.impl;

import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeExceptionDao;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.entity.CdeException;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionService;
import com.hsbc.risk.frtbsa.utils.DateFormatUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.persistence.EntityManager;
import java.util.List;
import java.util.Locale;
import java.util.UUID;

@Service
@Slf4j
public class CdeExceptionServiceImpl extends AbstractGenericService<CdeException, CdeExceptionDao>
        implements CdeExceptionService {

    @Override
    public String getBatchNum(FileTypeEnum fileTypeEnum) {
        String currentDate = DateFormatUtil.getCurrentDate(DateFormatUtil.YYYYMMDD);
        return currentDate + "_region_" + fileTypeEnum.getRiskMeasure() + "_"
                + UUID.randomUUID().toString().toLowerCase(Locale.ROOT);
    }

    @Override
    @Modifying
    @Transactional
    public void batchSave(List<CdeException> entities) {
        log.info("CdeException batch insert begin!");
        if (CollectionUtils.isEmpty(entities)) {
            return;
        }
        EntityManager em = this.repository.getEntityManager();
        try {
            for (CdeException entity : entities) {
                em.persist(entity);
            }
            em.flush();
            em.clear();
            log.info("CdeException batch insert successful,total records:{}", entities.size());
        } catch (Exception e) {
            e.printStackTrace();
            log.error("CdeException batch insert fail!");
        }
    }

    @Override
    public List<CdeException> queryByBatchNum(String batchNum) {
        return this.repository.queryByBatchNum(batchNum);
    }

    @Override
    public List<CdeException> queryByCdeConfigIdAndFileTypeAndGeneratedDate(Long cdeName, String fileType, String generatedDate) {
        return this.repository.queryByCdeConfigIdAndFileTypeAndGeneratedDate(cdeName, fileType, generatedDate);
    }
}
package com.hsbc.risk.frtbsa.service.entity.impl;

import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeExceptionSummaryDao;
import com.hsbc.risk.frtbsa.entity.CdeExceptionSummary;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionSummaryService;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class CdeExceptionSummaryServiceImpl extends AbstractGenericService<CdeExceptionSummary, CdeExceptionSummaryDao>
        implements CdeExceptionSummaryService {
    @Override
    public CdeExceptionSummary queryByBatchNum(String batchNum) {
        return this.repository.queryByBatchNum(batchNum);
    }

    @Override
    public List<String> queryGeneratedDateList(Long cdeConfigId) {
        return this.repository.queryGeneratedDateList(cdeConfigId);
    }

    @Override
    public List<CdeExceptionSummary> queryByCdeConfigIdAndFileTypeAndGeneratedDate(Long cdeName, String fileType, String generatedDate) {
        return this.repository.queryByCdeConfigIdAndFileTypeAndGeneratedDate(cdeName, fileType, generatedDate);
    }
}
package com.hsbc.risk.frtbsa.service.entity;

import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeCompletenessConfigDao;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessConfig;

import java.util.List;

/**
 * Service contract for {@link CdeCompletenessConfig} entities on top of the generic service.
 */
public interface CdeCompletenessConfigService extends GenericService<CdeCompletenessConfig, CdeCompletenessConfigDao> {

    /**
     * @return the completeness configurations reported as active; the selection criteria are
     *         defined by the DAO query, which is not visible here
     */
    List<CdeCompletenessConfig> getActiveCdes();
}
package com.hsbc.risk.frtbsa.service.entity;

import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeCompletenessRuleDao;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessRule;

import java.util.List;

/**
 * Service contract for {@link CdeCompletenessRule} entities on top of the generic service.
 */
public interface CdeCompletenessRuleService extends GenericService<CdeCompletenessRule, CdeCompletenessRuleDao> {

    /**
     * @param cdeConfigId id of the configuration whose rules are requested
     * @return the rules found for that configuration id
     */
    List<CdeCompletenessRule> findRulesByCdeConfigId(Long cdeConfigId);

    /**
     * @param cdeConfigId     id of the configuration whose rules are requested
     * @param combineFileType combine file type the rules are restricted to
     * @return the rules found for that configuration id and combine file type
     */
    List<CdeCompletenessRule> findRulesByCdeConfigIdAndCombineFileType(Long cdeConfigId, String combineFileType);

    /**
     * @param cdeConfigId id of the configuration whose combine file types are requested
     * @return the combine file types found for that configuration id (distinct, per the method
     *         name)
     */
    List<String> findDistinctCombineFileTypeByCdeConfigId(Long cdeConfigId);
}
package com.hsbc.risk.frtbsa.service.entity;

import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeConsistencyConfigDao;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyConfig;

import java.util.List;

/**
 * Service contract for {@link CdeConsistencyConfig} entities on top of the generic service.
 */
public interface CdeConsistencyConfigService extends GenericService<CdeConsistencyConfig, CdeConsistencyConfigDao> {

    /**
     * @return the consistency configurations reported as active; the selection criteria are
     *         defined by the DAO query, which is not visible here
     */
    List<CdeConsistencyConfig> getActiveCdes();
}
package com.hsbc.risk.frtbsa.service.entity;

import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeConsistencyRuleDao;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyRule;

import java.util.List;

/**
 * Service contract for {@link CdeConsistencyRule} entities on top of the generic service.
 */
public interface CdeConsistencyRuleService extends GenericService<CdeConsistencyRule, CdeConsistencyRuleDao> {

    /**
     * @param cdeConfigId id of the configuration whose rules are requested
     * @return the rules found for that configuration id
     */
    List<CdeConsistencyRule> findRulesByCdeConfigId(Long cdeConfigId);
}
package com.hsbc.risk.frtbsa.service.entity;

import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeExceptionDao;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.entity.CdeException;

import java.util.List;

/**
 * Service contract for {@link CdeException} entities on top of the generic service.
 */
public interface CdeExceptionService extends GenericService<CdeException, CdeExceptionDao> {

    /**
     * @param fileTypeEnum file type the batch number is generated for
     * @return a batch number for a new set of exception records
     */
    String getBatchNum(FileTypeEnum fileTypeEnum);

    /**
     * Inserts the given exception records as one batch.
     *
     * @param entities records to insert
     */
    void batchSave(List<CdeException> entities);

    /**
     * @param batchNum batch number to look up
     * @return the exception records found for that batch number
     */
    List<CdeException> queryByBatchNum(String batchNum);

    /**
     * @param cdeName       NOTE(review): named {@code cdeName} but typed {@code Long}; judging by
     *                      the method name this is the CDE config id - confirm
     * @param fileType      file type to filter by
     * @param generatedDate generated date to filter by
     * @return the exception records matching the three filters
     */
    List<CdeException> queryByCdeConfigIdAndFileTypeAndGeneratedDate(Long cdeName, String fileType, String generatedDate);
}
package com.hsbc.risk.frtbsa.service.entity;

import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeExceptionSummaryDao;
import com.hsbc.risk.frtbsa.entity.CdeExceptionSummary;

import java.util.List;

/**
 * Service contract for {@link CdeExceptionSummary} entities on top of the generic service.
 */
public interface CdeExceptionSummaryService extends GenericService<CdeExceptionSummary, CdeExceptionSummaryDao> {

    /**
     * @param batchNum batch number to look up
     * @return the summary found for that batch number
     */
    CdeExceptionSummary queryByBatchNum(String batchNum);

    /**
     * @param cdeConfigId id of the configuration whose generated dates are requested
     * @return the generated dates found for that configuration id
     */
    List<String> queryGeneratedDateList(Long cdeConfigId);

    /**
     * @param cdeName       NOTE(review): named {@code cdeName} but typed {@code Long}; judging by
     *                      the method name this is the CDE config id - confirm
     * @param fileType      file type to filter by
     * @param generatedDate generated date to filter by
     * @return the summaries matching the three filters
     */
    List<CdeExceptionSummary> queryByCdeConfigIdAndFileTypeAndGeneratedDate(Long cdeName, String fileType, String generatedDate);

}


这篇关于 service 的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!