本文主要介绍 service，对大家解决编程问题具有一定的参考价值，需要的程序员们随着小编一起来学习吧！
package com.hsbc.risk.frtbsa.service.cde.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.hsbc.risk.frtbsa.base.AbstractBeamCheckService;
import com.hsbc.risk.frtbsa.coder.CommonBeamCoder;
import com.hsbc.risk.frtbsa.constant.CdeConstants;
import com.hsbc.risk.frtbsa.domain.bean.BaseBean;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.bean.CommonBean;
import com.hsbc.risk.frtbsa.domain.enumType.CsvCountEnum;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.domain.enumType.RunBeamType;
import com.hsbc.risk.frtbsa.dto.BeamParam;
import com.hsbc.risk.frtbsa.dto.CsvFile;
import com.hsbc.risk.frtbsa.dto.RunStatusLog;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessConfig;
import com.hsbc.risk.frtbsa.entity.CdeException;
import com.hsbc.risk.frtbsa.entity.CdeExceptionSummary;
import com.hsbc.risk.frtbsa.option.CommonOption;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.transform.BaseBeanToCsvFn;
import com.hsbc.risk.frtbsa.parse.transform.CombineBaseBeanFn;
import com.hsbc.risk.frtbsa.parse.transform.CsvToBaseBeanFn;
import com.hsbc.risk.frtbsa.parse.transform.KVToBaseBeanFn;
import com.hsbc.risk.frtbsa.service.cde.CompletenessBeamService;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionService;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionSummaryService;
import com.hsbc.risk.frtbsa.transform.*;
import com.hsbc.risk.frtbsa.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.*;
@Service
@Slf4j
public class CompletenessBeamServiceImpl extends AbstractBeamCheckService implements CompletenessBeamService {
// Line parser shared by the CSV-to-bean transforms this service constructs.
private final static LineConvert convert = new ConvertLineToObject();
// Used by setFilterDataInDB to obtain the batch number and to save exception rows.
@Autowired
CdeExceptionService cdeExceptionService;
// Used by finish to save the run summary.
@Autowired
CdeExceptionSummaryService cdeExceptionSummaryService;
// Directory injected from the "output.dir" property; pipeline results are written to it and read back from it.
@Value("${output.dir}")
public String outputDir;
// ---- Per-run state: everything below is (re)assigned in beforeRunBeam / runBeam. ----
// NOTE(review): this class is annotated @Service and keeps per-run state in fields; if the bean is
// shared between callers, overlapping runs would overwrite each other's state - confirm bean scope / call pattern.
// Completeness configuration taken from beamParam.getCdeConfig().
private CdeCompletenessConfig cdeConfig;
// Parameter object of the current run; also receives counts, csv rows and the batch number.
private BeamParam beamParam;
// Copied from beamParam.getSite().
private String site;
// Copied from beamParam.getPreAggGroup().
private String preAggGroup;
// Copied from cdeConfig.getCdeName().
private String cdeName;
// Copied from cdeConfig.getFileType().
private String fileType;
// Copied from cdeConfig.getId().
private Long cdeId;
// Assigned while exception rows are built in setFilterDataInDB and read again in finish.
private String legalEntity;
// Enum form of fileType, resolved through FileTypeEnum.getFileType.
private FileTypeEnum fileTypeEnum;
// Input files of the run, looked up by keys such as "SBM_Delta_Sensi" or "DRC".
private Map<String, List<CsvFile>> runFileMap;
// Merged CSV collection per file type; the value is null when no files are registered for that type.
private HashMap<FileTypeEnum, PCollection<String>> pCollectionMap = Maps.newHashMap();
// Column-name -> column-index map per file type, copied from CdeConstants.cdeIndexMap.
private HashMap<FileTypeEnum, HashMap<String, Integer>> indexMap = Maps.newHashMap();
// indexMap entry of the file type being checked.
private HashMap<String, Integer> cdeIndexMap;
// pCollectionMap entry of the file type being checked.
private PCollection<String> cdePCollection;
// Set to true by runBeam when the collection for the checked file type is missing.
private boolean notExist;
/**
 * Registers the merged CSV collection of every supported file type in {@code pCollectionMap}.
 * A type whose run-file key is absent is stored with a {@code null} value.
 *
 * @param pipeline pipeline the read transforms are attached to
 */
private void setPCollectionMap(Pipeline pipeline) {
    // Parallel arrays: types[i] is loaded from the run files registered under runFileKeys[i].
    final FileTypeEnum[] types = {
            FileTypeEnum.DELTA,
            FileTypeEnum.TRADE_ATTRIBUTE,
            FileTypeEnum.VEGA,
            FileTypeEnum.CURVATURE,
            FileTypeEnum.DRC
    };
    final String[] runFileKeys = {
            "SBM_Delta_Sensi",
            "Trade_Attributes",
            "SBM_Vega_Sensi",
            "SBM_Curvature_Sensi",
            "DRC"
    };
    for (int i = 0; i < types.length; i++) {
        HashMap<String, Integer> columns = indexMap.get(types[i]);
        PCollection<String> merged = getCombinePCollection(runFileKeys[i], runFileMap, pipeline, columns);
        pCollectionMap.put(types[i], merged);
    }
}
/**
 * Copies the column index map of every supported file type from the given constants
 * into this service's {@code indexMap}.
 *
 * @param cdeConstants source of the per-file-type column index maps
 */
private void setIndexMap(CdeConstants cdeConstants) {
    final FileTypeEnum[] supported = {
            FileTypeEnum.DELTA,
            FileTypeEnum.TRADE_ATTRIBUTE,
            FileTypeEnum.VEGA,
            FileTypeEnum.CURVATURE,
            FileTypeEnum.DRC
    };
    for (FileTypeEnum type : supported) {
        indexMap.put(type, cdeConstants.cdeIndexMap.get(type));
    }
}
/**
 * Reads every file registered under {@code key}, drops lines starting with "ASOFDATE",
 * converts the remaining lines to {@code BaseBean}s, co-groups the per-file collections by the
 * key produced by {@code CombineBaseBeanFn.getKvBean}, merges each group through
 * {@code KVToBaseBeanFn} and turns the result back into CSV lines.
 *
 * @param key        run-file key, e.g. "SBM_Delta_Sensi"
 * @param runFileMap run files grouped by key
 * @param pipeline   pipeline the transforms are attached to
 * @param indexMap   column-name to column-index map handed to the line parser
 * @return the merged CSV collection, or {@code null} when the key is empty or no file is
 *         registered for it (missing map, missing entry or empty list)
 */
private PCollection<String> getCombinePCollection(String key, Map<String, List<CsvFile>> runFileMap, Pipeline pipeline, HashMap<String, Integer> indexMap) {
    if (StringUtils.isEmpty(key) || runFileMap == null) {
        return null;
    }
    List<CsvFile> csvFiles = runFileMap.get(key);
    // An empty list used to leave the co-group tuple null and fail with a NullPointerException below;
    // treat it the same way as a missing key.
    if (csvFiles == null || csvFiles.isEmpty()) {
        return null;
    }

    List<PCollection<BaseBean>> beans = Lists.newArrayList();
    for (CsvFile csvFile : csvFiles) {
        PCollection<String> lines = pipeline.apply(TextIO.read().from(csvFile.getAfterBatchFilePath()))
                .apply(Filter.by(s -> !s.startsWith("ASOFDATE")));
        beans.add(lines.apply(ParDo.of(new CsvToBaseBeanFn(convert, indexMap))));
    }

    List<PCollection<KV<String, BaseBean>>> kvBeans = Lists.newArrayList();
    for (PCollection<BaseBean> pBean : beans) {
        kvBeans.add(CombineBaseBeanFn.getKvBean(pBean));
    }

    // The list is non-empty here, so the tuple can be seeded with the first collection.
    KeyedPCollectionTuple<String> tuple = KeyedPCollectionTuple.of("kv" + 0, kvBeans.get(0));
    for (int i = 1; i < kvBeans.size(); i++) {
        tuple = tuple.and("kv" + i, kvBeans.get(i));
    }

    PCollection<KV<String, CoGbkResult>> grouped = tuple.apply(CoGroupByKey.<String>create());
    PCollection<BaseBean> mixBean = grouped.apply(ParDo.of(new KVToBaseBeanFn(kvBeans.size())));
    return mixBean.apply(ParDo.of(new BaseBeanToCsvFn()));
}
/**
 * Captures the inputs of a run and resets the per-run state kept in this service.
 *
 * @param beamParam parameters of the run; supplies the run files, the completeness
 *                  configuration, the site and the pre-aggregation group
 */
@Override
public void beforeRunBeam(BeamParam beamParam) {
    this.beamParam = beamParam;
    this.runFileMap = beamParam.getRunFileMap();
    //to create a new cde config
    cdeConfig = beamParam.getCdeConfig();
    cdeConfig.setOutputDir(outputDir);
    cdeName = cdeConfig.getCdeName();
    fileType = cdeConfig.getFileType();
    cdeId = cdeConfig.getId();
    fileTypeEnum = FileTypeEnum.getFileType(fileType);
    site = beamParam.getSite();
    preAggGroup = beamParam.getPreAggGroup();
    notExist = false;
    // legalEntity is only assigned while exception rows are written; without this reset a run
    // that produces no rows would report the legal entity of the previous run in finish().
    legalEntity = null;
}
/**
 * Builds and runs the completeness pipeline for the configured file type.
 *
 * <p>Steps: load the merged CSV of every file type, apply the parse rules to the checked
 * file, narrow the result with every configured combine file, enrich the remaining rows with
 * delta data (or convert them directly when the checked file is the delta file) and with trade
 * attribute data (unless the checked file is the trade attribute file), then write the rows,
 * the total count and the fail count to the output directory.
 *
 * <p>When a CSV collection the pipeline needs is missing, {@code notExist} is set and the
 * pipeline is not run. Any exception is logged and not propagated.
 */
@Override
public void runBeam() {
    try {
        CommonOption option = createOption(CommonOption.class, generateCompletenessCdeConfig(cdeConfig));
        initOption(option);
        Pipeline pipeline = Pipeline.create(option);
        pipeline.getCoderRegistry().registerCoderForClass(CommonBean.class, new CommonBeamCoder());
        CdeConstants cdeConstants = option.getCdeConstants();
        setIndexMap(cdeConstants);
        setPCollectionMap(pipeline);
        cdeIndexMap = indexMap.get(fileTypeEnum);
        cdePCollection = pCollectionMap.get(fileTypeEnum);

        // The delta and trade-attribute collections are joined in further down; check them up
        // front instead of failing with a NullPointerException halfway through pipeline construction.
        boolean isDelta = Objects.equals(fileTypeEnum, FileTypeEnum.DELTA);
        boolean isTradeAttribute = Objects.equals(fileTypeEnum, FileTypeEnum.TRADE_ATTRIBUTE);
        PCollection<String> deltaCsv = pCollectionMap.get(FileTypeEnum.DELTA);
        PCollection<String> tradeCsv = pCollectionMap.get(FileTypeEnum.TRADE_ATTRIBUTE);
        if (cdePCollection == null || (!isDelta && deltaCsv == null) || (!isTradeAttribute && tradeCsv == null)) {
            log.warn("Completeness check for cde {} skipped: a required input csv is missing", cdeName);
            notExist = true;
            return;
        }

        PCollection<String> totalCount = cdePCollection.apply(Count.globally()).apply(ParDo.of(new CsvCountFn()));
        //parse to map,key is order
        Map<Integer, List<ParseParam>> orderParseMap = CdeMapUtil.getOrderParseMap(this.beamParam.getParseParamList());
        //parse to map,key is fileType
        Map<String, Map<Integer, List<ParseParam>>> combineOrderParseMap = CdeMapUtil.getCombineOrderParseMap(this.beamParam.getCombineParamMap());
        //filter own file first
        PCollection<String> failedRows = filterCsv(cdePCollection, orderParseMap, 0, null, cdeIndexMap, convert);
        //filter combine files secondly (no-op when there is no combine rule)
        for (Map.Entry<String, Map<Integer, List<ParseParam>>> entry : combineOrderParseMap.entrySet()) {
            FileTypeEnum combineType = FileTypeEnum.getFileType(entry.getKey());
            HashMap<String, Integer> combineIndexMap = cdeConstants.cdeIndexMap.get(combineType);
            PCollection<String> combineSource = pCollectionMap.get(combineType);
            if (combineSource == null) {
                log.warn("Completeness check for cde {} skipped: combine input csv {} is missing", cdeName, entry.getKey());
                notExist = true;
                return;
            }
            PCollection<String> combineCsv = filterCsv(combineSource, entry.getValue(), 0, null, combineIndexMap, convert);
            PCollection<CombineBean> sourceBean = combineCsv.apply(ParDo.of(new CsvToComplexBeanFn(combineType, combineIndexMap, convert)));
            //now combine own file and combine files
            failedRows = getRuleFilterCombineCsv(failedRows, sourceBean);
        }
        PCollection<String> failCount = failedRows.apply(Count.globally()).apply(ParDo.of(new CsvCountFn()));

        PCollection<String> analysisFilterCsv;
        //add delta logical
        if (isDelta) {
            analysisFilterCsv = failedRows.apply(ParDo.of(new DeltaCsvToCsvFn(indexMap.get(FileTypeEnum.DELTA))));
        } else {
            PCollection<CombineBean> deltaBean = deltaCsv
                    .apply(Filter.by(s -> !s.startsWith("ASOFDATE")))
                    .apply(ParDo.of(new DeltaCsvToBeanFn(indexMap.get(FileTypeEnum.DELTA))));
            analysisFilterCsv = getCombineDeltaCsv(failedRows, deltaBean);
        }
        //add trade logical
        //as long as it is not a trade attribute, it must be executed
        if (!isTradeAttribute) {
            PCollection<CombineBean> tradeBean = tradeCsv
                    .apply(Filter.by(s -> !s.startsWith("ASOFDATE")))
                    .apply(ParDo.of(new TradeCsvToBeanFn(indexMap.get(FileTypeEnum.TRADE_ATTRIBUTE))));
            analysisFilterCsv = getCombineTradeCsv(analysisFilterCsv, tradeBean);
        }

        //output to directory
        analysisFilterCsv.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum, RunBeamType.COMPLETENESS, cdeName))
                .withNumShards(1).withSuffix(".csv"));
        totalCount.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum,
                cdeName, CsvCountEnum.TOTAL_COUNT, RunBeamType.COMPLETENESS))
                .withNumShards(1).withSuffix(".csv"));
        failCount.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum,
                cdeName, CsvCountEnum.FAIL_COUNT, RunBeamType.COMPLETENESS))
                .withNumShards(1).withSuffix(".csv"));
        pipeline.run().waitUntilFinish();
    } catch (Exception e) {
        log.error(e.getLocalizedMessage(), e);
    }
}
/**
 * Narrows the rows of the checked file with the rows of one combine file.
 *
 * <p>The target rows are converted to {@code CombineBean}s, paired with the source beans via
 * {@code CombineBeanFn.getCombineComplexBean}, flattened together with the pairing result,
 * grouped by the key from {@code CombineBeanFn.getComplexBeanKv}, filtered per group by
 * {@code FilterMixComplexBeanFn} and written back as CSV lines.
 *
 * @param target     CSV rows of the checked file
 * @param sourceBean beans built from the combine file
 * @return the remaining rows as CSV lines
 */
private PCollection<String> getRuleFilterCombineCsv(PCollection<String> target, PCollection<CombineBean> sourceBean) {
    PCollection<CombineBean> ownBeans =
            target.apply(ParDo.of(new CsvToComplexBeanFn(fileTypeEnum, cdeIndexMap, convert)));
    PCollection<CombineBean> pairedBeans = CombineBeanFn.getCombineComplexBean(ownBeans, sourceBean);
    PCollection<CombineBean> allBeans = PCollectionList.of(ownBeans)
            .and(pairedBeans)
            .apply(Flatten.<CombineBean>pCollections());
    return CombineBeanFn.getComplexBeanKv(allBeans)
            .apply(GroupByKey.<String, CombineBean>create())
            .apply(ParDo.of(new FilterMixComplexBeanFn(fileTypeEnum)))
            .apply(ParDo.of(new BeanToCsvFn()));
}
/**
 * Joins the filtered rows with the delta beans.
 *
 * <p>The rows are converted with {@code CombineDeltaCsvToBeanFn}, paired with the delta beans
 * via {@code CombineBeanFn.getCombineBean}, flattened together with the pairing result, grouped
 * by the key from {@code CombineBeanFn.getMixBeanKv}, reduced per group by
 * {@code FilterMixDeltaFn} and written back as CSV lines.
 *
 * @param filterCsv CSV rows that failed the check
 * @param deltaBean beans built from the delta file
 * @return the joined rows as CSV lines
 */
private PCollection<String> getCombineDeltaCsv(PCollection<String> filterCsv, PCollection<CombineBean> deltaBean) {
    PCollection<CombineBean> rowBeans =
            filterCsv.apply(ParDo.of(new CombineDeltaCsvToBeanFn(fileTypeEnum, cdeIndexMap, convert)));
    PCollection<CombineBean> pairedBeans = CombineBeanFn.getCombineBean(rowBeans, deltaBean);
    PCollection<CombineBean> allBeans = PCollectionList.of(rowBeans)
            .and(pairedBeans)
            .apply(Flatten.<CombineBean>pCollections());
    return CombineBeanFn.getMixBeanKv(allBeans)
            .apply(GroupByKey.<String, CombineBean>create())
            .apply(ParDo.of(new FilterMixDeltaFn()))
            .apply(ParDo.of(new BeanToCsvFn()));
}
/**
 * Joins the filtered rows with the trade attribute beans.
 *
 * <p>The rows are converted with {@code CombineTradeCsvToBeanFn}, paired with the trade beans
 * via {@code CombineBeanFn.getCombineBean}, flattened together with the pairing result, grouped
 * by the key from {@code CombineBeanFn.getMixBeanKv}, reduced per group by
 * {@code FilterMixTradeFn} and written back as CSV lines.
 *
 * @param filterCsv CSV rows to enrich
 * @param tradeBean beans built from the trade attribute file
 * @return the joined rows as CSV lines
 */
private PCollection<String> getCombineTradeCsv(PCollection<String> filterCsv, PCollection<CombineBean> tradeBean) {
    PCollection<CombineBean> rowBeans =
            filterCsv.apply(ParDo.of(new CombineTradeCsvToBeanFn(fileTypeEnum, cdeIndexMap, convert)));
    PCollection<CombineBean> pairedBeans = CombineBeanFn.getCombineBean(rowBeans, tradeBean);
    PCollection<CombineBean> allBeans = PCollectionList.of(rowBeans)
            .and(pairedBeans)
            .apply(Flatten.<CombineBean>pCollections());
    return CombineBeanFn.getMixBeanKv(allBeans)
            .apply(GroupByKey.<String, CombineBean>create())
            .apply(ParDo.of(new FilterMixTradeFn()))
            .apply(ParDo.of(new BeanToCsvFn()));
}
/**
 * Reads the pipeline output back from the output directory and stores it on {@code beamParam}:
 * total count, fail count, pass count (total minus fail) and the failing CSV rows.
 * Does nothing when the input was flagged as missing.
 *
 * <p>A count is only parsed when its file has at least one line, so an empty or missing count
 * file no longer raises an {@code IndexOutOfBoundsException}; the corresponding values are
 * simply left unset.
 */
@Override
public void afterRunBeam() {
    if (notExist) {
        return;
    }
    List<String> totalCountResult = FileHelper.readCsv(FileHelper.findCsv(outputDir,
            fileTypeEnum, cdeName, CsvCountEnum.TOTAL_COUNT, RunBeamType.COMPLETENESS));
    List<String> failCountResult = FileHelper.readCsv(FileHelper.findCsv(outputDir,
            fileTypeEnum, cdeName, CsvCountEnum.FAIL_COUNT, RunBeamType.COMPLETENESS));
    Long totalCount = firstLineAsCount(totalCountResult);
    Long failCount = firstLineAsCount(failCountResult);
    if (totalCount == null || failCount == null) {
        log.warn("Count output for cde {} is empty; counts are left unset", cdeName);
    }
    // As before, a count is only published when its file holds exactly one line.
    if (totalCount != null && totalCountResult.size() == 1) {
        beamParam.setTotalCount(totalCount);
    }
    if (failCount != null && failCountResult.size() == 1) {
        beamParam.setFailCount(failCount);
    }
    if (totalCount != null && failCount != null) {
        beamParam.setPassCount(totalCount - failCount);
    }
    ArrayList<String> csv = FileHelper.readCsv(FileHelper.findCsv(outputDir, fileTypeEnum, cdeName, RunBeamType.COMPLETENESS));
    beamParam.setCsv(csv);
}

/** Parses the first line of a count file; returns {@code null} when there is no line. */
private static Long firstLineAsCount(List<String> lines) {
    if (lines == null || lines.isEmpty()) {
        return null;
    }
    return Long.parseLong(lines.get(0));
}
/**
 * Persists the outcome of the run as {@code CdeException} rows.
 *
 * <p>When the input was flagged as missing, a single row with the message
 * "csv file is not exist!" is saved. Otherwise one row is built per CSV line stored on
 * {@code beamParam} and the rows are saved in one batch; a missing CSV list is treated as empty.
 * The batch number obtained from the exception service is stored on {@code beamParam}.
 */
@Override
public void setFilterDataInDB() {
    CdeConstants cdeConstants = CdeConstants.instance(cdeConfig.getVersion());
    String batchNum = cdeExceptionService.getBatchNum(fileTypeEnum);
    beamParam.setBatchNum(batchNum);
    if (notExist) {
        CdeException cdeException = CdeException.builder()
                .dqId(cdeConfig.getDqId())
                .siteCode(site)
                .preAggGroup(preAggGroup)
                .tableName("cde_exception")
                .dataFieldValue("dataFieldValue")
                .logicalOrBusinessNameCde(cdeName)
                .sourceSystemName("SUBMIT")
                .producer("N")
                .errorMessage("csv file is not exist!")
                .dimension(RunBeamType.COMPLETENESS.name())
                .controlRunTimestamp(String.valueOf(System.currentTimeMillis()))
                .cdeExceptionRecordType("PLATO")
                .bookIdentifier("bookIdentifier")
                .createDate(new Date())
                .createBy("SYSTEM")
                .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                .fileType(fileType)
                .cdeName(cdeName)
                .cdeConfigId(cdeId)
                .batchNum(batchNum)
                .build();
        cdeExceptionService.save(cdeException);
        return;
    }

    HashMap<String, Integer> indexMap = cdeConstants.cdeIndexMap.get(this.fileTypeEnum);
    List<String> csvs = beamParam.getCsv();
    List<CdeException> cdeExceptions = Lists.newArrayList();
    // getCsv() is only populated by afterRunBeam; guard against it never having been set.
    if (csvs != null) {
        for (String csv : csvs) {
            cdeExceptions.add(buildRowException(csv, indexMap, batchNum));
        }
    }
    cdeExceptionService.batchSave(cdeExceptions);
}

/**
 * Maps one failing CSV line to a {@code CdeException}. Also records the legal entity of the
 * line in the {@code legalEntity} field, which {@code finish} reads afterwards.
 */
private CdeException buildRowException(String csv, HashMap<String, Integer> indexMap, String batchNum) {
    String[] arr = SplitUtil.split(csv);
    String tradeId = arr[indexMap.get("TRADEID")];
    String legId = arr[indexMap.get("LEG_ID")];
    String dqId = cdeConfig.getDqId();
    CdeException cdeException = CdeException.builder()
            .dqId(dqId)
            .siteCode(site)
            .preAggGroup(preAggGroup)
            .tableName("cde_exception")
            .dataFieldValue("dataFieldValue")
            .logicalOrBusinessNameCde(cdeName)
            .sourceSystemName("SUBMIT")
            .producer("N")
            .errorMessage(cdeConfig.getErrorMessage())
            .dimension(RunBeamType.COMPLETENESS.name())
            .controlRunTimestamp(String.valueOf(System.currentTimeMillis()))
            .cdeExceptionRecordType("PLATO")
            .bookIdentifier("bookIdentifier")
            .tradeId(tradeId)
            .legId(legId)
            .asOfDate(arr[indexMap.get("ASOFDATE")])
            .createDate(new Date())
            .createBy("SYSTEM")
            .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
            .fileType(fileType)
            .cdeName(cdeName)
            .cdeConfigId(cdeId)
            .batchNum(batchNum)
            .fxDelta(parseNullableDouble(arr[indexMap.get("FXDELTA")]))
            .cmDelta(parseNullableDouble(arr[indexMap.get("CMDELTA")]))
            .irDelta(parseNullableDouble(arr[indexMap.get("IRDELTA")]))
            .eqDelta(parseNullableDouble(arr[indexMap.get("EQDELTA")]))
            .build();

    //logic of record_id
    //DQID, Source System,Book, TradeID, LegID,Riskfactor
    String recordId;
    if (!Objects.equals(FileTypeEnum.TRADE_ATTRIBUTE, fileTypeEnum)) {
        legalEntity = arr[indexMap.get("TRADE_LEGAL_ENTITY")];
        cdeException.setLegalEntity(legalEntity);
        recordId = String.format("%s-%s-%s-%s-%s-%s", dqId, arr[indexMap.get("TRADE_SOURCE_SYSTEM")],
                arr[indexMap.get("TRADE_BOOK")], tradeId, legId,
                indexMap.get("RISKFACTOR") == null ? "no RiskFactor" : arr[indexMap.get("RISKFACTOR")]);
    } else {
        legalEntity = arr[indexMap.get("LEGAL_ENTITY")];
        cdeException.setLegalEntity(legalEntity);
        recordId = String.format("%s-%s-%s-%s-%s-%s", dqId, arr[indexMap.get("SOURCE_SYSTEM")],
                arr[indexMap.get("BOOK")], tradeId, legId, "no RiskFactor");
    }
    cdeException.setRecordId(recordId);

    //delta
    if (Objects.equals(FileTypeEnum.DELTA, fileTypeEnum)) {
        cdeException.setRiskClass(arr[indexMap.get("RISKCLASS")]);
        cdeException.setRiskFactor(arr[indexMap.get("RISKFACTOR")]);
        cdeException.setSequence(arr[indexMap.get("SEQUENCE")]);
        cdeException.setAdjustmentFlag(arr[indexMap.get("ADJUSTMENTFLAG")]);
    }
    //vega curvature
    if (Objects.equals(FileTypeEnum.VEGA, fileTypeEnum) || Objects.equals(FileTypeEnum.CURVATURE, fileTypeEnum)) {
        cdeException.setRiskClass(arr[indexMap.get("RISKCLASS")]);
        cdeException.setRiskFactor(arr[indexMap.get("RISKFACTOR")]);
        cdeException.setAdjustmentFlag(arr[indexMap.get("ADJUSTMENTFLAG")]);
    }
    //drc
    if (Objects.equals(FileTypeEnum.DRC, fileTypeEnum)) {
        cdeException.setRiskClass(arr[indexMap.get("RISKCLASS")]);
        cdeException.setObligorId(arr[indexMap.get("OBLIGORID")]);
        cdeException.setAdjustmentFlag(arr[indexMap.get("ADJUSTMENTFLAG")]);
        cdeException.setDrcUnderlyingref(arr[indexMap.get("DRC_UNDERLYINGREF")]);
    }
    return cdeException;
}

/** Converts a CSV cell to a Double; the literal text "null" maps to {@code null}. */
private static Double parseNullableDouble(String raw) {
    // Double.valueOf replaces the deprecated Double(String) constructor.
    return StringUtils.equals("null", raw) ? null : Double.valueOf(raw);
}
/**
 * Saves the summary of the run and returns it as a {@code RunStatusLog}.
 *
 * <p>When the input was flagged as missing, all record counts are zero and no legal entity is
 * set; otherwise the counts come from {@code beamParam} and the legal entity from the last
 * exception row that was built.
 *
 * @param costTime run duration, formatted through {@code TimeUtil.formatMinSec}
 * @return a status log populated from the saved summary via {@code BeanUtils.copyProperties}
 */
@Override
public RunStatusLog finish(long costTime) {
    CdeExceptionSummary summary;
    if (notExist) {
        summary = CdeExceptionSummary.builder()
                .siteCode(site)
                .dqId(cdeConfig.getDqId())
                .recordsFailed(0L)
                .recordsPassed(0L)
                .recordsTested(0L)
                .createDate(new Date())
                .createBy("SYSTEM")
                .batchNum(beamParam.getBatchNum())
                .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                .fileType(fileType)
                .cdeName(cdeName)
                .cdeConfigId(cdeId)
                .runCostTimeCount(TimeUtil.formatMinSec(costTime))
                .dimension(RunBeamType.COMPLETENESS.name())
                .tableName("cde_exception_summary")
                .build();
    } else {
        summary = CdeExceptionSummary.builder()
                .siteCode(site)
                .dqId(cdeConfig.getDqId())
                .recordsFailed(beamParam.getFailCount())
                .recordsPassed(beamParam.getPassCount())
                .recordsTested(beamParam.getTotalCount())
                .createDate(new Date())
                .createBy("SYSTEM")
                .batchNum(beamParam.getBatchNum())
                .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                .fileType(fileType)
                .cdeName(cdeName)
                .cdeConfigId(cdeId)
                .runCostTimeCount(TimeUtil.formatMinSec(costTime))
                .dimension(RunBeamType.COMPLETENESS.name())
                .tableName("cde_exception_summary")
                .legalEntity(legalEntity)
                .build();
    }
    cdeExceptionSummaryService.save(summary);
    // Named statusLog so it does not shadow the class-level logger generated by @Slf4j.
    RunStatusLog statusLog = new RunStatusLog();
    BeanUtils.copyProperties(summary, statusLog);
    return statusLog;
}
}
package com.hsbc.risk.frtbsa.service.cde.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.hsbc.risk.frtbsa.base.AbstractBeamCheckService;
import com.hsbc.risk.frtbsa.coder.CommonBeamCoder;
import com.hsbc.risk.frtbsa.constant.CdeConstants;
import com.hsbc.risk.frtbsa.domain.bean.BaseBean;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.bean.CommonBean;
import com.hsbc.risk.frtbsa.domain.enumType.CsvCountEnum;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.domain.enumType.RunBeamType;
import com.hsbc.risk.frtbsa.dto.BeamParam;
import com.hsbc.risk.frtbsa.dto.CsvFile;
import com.hsbc.risk.frtbsa.dto.RunStatusLog;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyConfig;
import com.hsbc.risk.frtbsa.entity.CdeException;
import com.hsbc.risk.frtbsa.entity.CdeExceptionSummary;
import com.hsbc.risk.frtbsa.option.CommonOption;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.transform.BaseBeanToCsvFn;
import com.hsbc.risk.frtbsa.parse.transform.CombineBaseBeanFn;
import com.hsbc.risk.frtbsa.parse.transform.CsvToBaseBeanFn;
import com.hsbc.risk.frtbsa.parse.transform.KVToBaseBeanFn;
import com.hsbc.risk.frtbsa.service.cde.ConsistencyBeamService;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionService;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionSummaryService;
import com.hsbc.risk.frtbsa.transform.*;
import com.hsbc.risk.frtbsa.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.*;
@Service
@Slf4j
public class ConsistencyBeamServiceImpl extends AbstractBeamCheckService implements ConsistencyBeamService {
// Line parser shared by the CSV-to-bean transforms this service constructs.
private final static LineConvert convert = new ConvertLineToObject();
// Used by setFilterDataInDB to obtain the batch number and to save exception rows.
@Autowired
CdeExceptionService cdeExceptionService;
// Injected summary service.
@Autowired
CdeExceptionSummaryService cdeExceptionSummaryService;
// Directory injected from the "output.dir" property; pipeline results are written to it and read back from it.
@Value("${output.dir}")
public String outputDir;
// ---- Per-run state: everything below is (re)assigned in beforeRunBeam / runBeam. ----
// NOTE(review): this class is annotated @Service and keeps per-run state in fields; if the bean is
// shared between callers, overlapping runs would overwrite each other's state - confirm bean scope / call pattern.
// Consistency configuration taken from beamParam.getConsistencyConfig().
private CdeConsistencyConfig cdeConfig;
// Input files of the run, looked up by keys such as "SBM_Delta_Sensi" or "DRC".
private Map<String, List<CsvFile>> runFileMap;
// Parameter object of the current run; also receives counts, csv rows and the batch number.
private BeamParam beamParam;
// Copied from cdeConfig.getCdeName().
private String cdeName;
// Copied from cdeConfig.getId().
private Long cdeId;
// Copied from cdeConfig.getFileType().
private String fileType;
private String legalEntity;
// Copied from beamParam.getSite().
private String site;
// Copied from beamParam.getPreAggGroup().
private String preAggGroup;
// Enum form of fileType, resolved through FileTypeEnum.getFileType.
private FileTypeEnum fileTypeEnum;
// Set to true by runBeam when the after-control or raw collection of the checked file type is missing.
private boolean notExist;
// Merged CSV per file type, read from each file's getAfterBatchFilePath(); null value when no files are registered.
private HashMap<FileTypeEnum, PCollection<String>> afterControlPCollectionMap = Maps.newHashMap();
// Merged CSV per file type, read from each file's getRawFilePath(); null value when no files are registered.
private HashMap<FileTypeEnum, PCollection<String>> rawPCollectionMap = Maps.newHashMap();
// Column-name -> column-index map per file type, copied from CdeConstants.cdeIndexMap.
private HashMap<FileTypeEnum, HashMap<String, Integer>> indexMap = Maps.newHashMap();
// indexMap entry of the file type being checked.
private HashMap<String, Integer> cdeIndexMap;
// afterControlPCollectionMap entry of the file type being checked (after optional filtering).
private PCollection<String> afterControlPCollection;
// rawPCollectionMap entry of the file type being checked (after optional filtering).
private PCollection<String> rawPCollection;
/**
 * Registers, for every supported file type, the merged CSV collection read with the
 * "afterControl" source in {@code afterControlPCollectionMap}. A type whose run-file key is
 * absent is stored with a {@code null} value.
 *
 * @param pipeline pipeline the read transforms are attached to
 */
private void setAfterControlPCollectionMap(Pipeline pipeline) {
    // Parallel arrays: types[i] is loaded from the run files registered under runFileKeys[i].
    final FileTypeEnum[] types = {
            FileTypeEnum.DELTA,
            FileTypeEnum.TRADE_ATTRIBUTE,
            FileTypeEnum.VEGA,
            FileTypeEnum.CURVATURE,
            FileTypeEnum.DRC
    };
    final String[] runFileKeys = {
            "SBM_Delta_Sensi",
            "Trade_Attributes",
            "SBM_Vega_Sensi",
            "SBM_Curvature_Sensi",
            "DRC"
    };
    for (int i = 0; i < types.length; i++) {
        HashMap<String, Integer> columns = indexMap.get(types[i]);
        PCollection<String> merged = getCombinePCollection(runFileKeys[i], runFileMap, pipeline, columns, "afterControl");
        afterControlPCollectionMap.put(types[i], merged);
    }
}
/**
 * Registers, for every supported file type, the merged CSV collection read with the "raw"
 * source in {@code rawPCollectionMap}. A type whose run-file key is absent is stored with a
 * {@code null} value.
 *
 * @param pipeline pipeline the read transforms are attached to
 */
private void setRawPCollectionMap(Pipeline pipeline) {
    // Parallel arrays: types[i] is loaded from the run files registered under runFileKeys[i].
    final FileTypeEnum[] types = {
            FileTypeEnum.DELTA,
            FileTypeEnum.TRADE_ATTRIBUTE,
            FileTypeEnum.VEGA,
            FileTypeEnum.CURVATURE,
            FileTypeEnum.DRC
    };
    final String[] runFileKeys = {
            "SBM_Delta_Sensi",
            "Trade_Attributes",
            "SBM_Vega_Sensi",
            "SBM_Curvature_Sensi",
            "DRC"
    };
    for (int i = 0; i < types.length; i++) {
        HashMap<String, Integer> columns = indexMap.get(types[i]);
        PCollection<String> merged = getCombinePCollection(runFileKeys[i], runFileMap, pipeline, columns, "raw");
        rawPCollectionMap.put(types[i], merged);
    }
}
/**
 * Copies the column index map of every supported file type from the given constants
 * into this service's {@code indexMap}.
 *
 * @param cdeConstants source of the per-file-type column index maps
 */
private void setIndexMap(CdeConstants cdeConstants) {
    final FileTypeEnum[] supported = {
            FileTypeEnum.DELTA,
            FileTypeEnum.TRADE_ATTRIBUTE,
            FileTypeEnum.VEGA,
            FileTypeEnum.CURVATURE,
            FileTypeEnum.DRC
    };
    for (FileTypeEnum type : supported) {
        indexMap.put(type, cdeConstants.cdeIndexMap.get(type));
    }
}
/**
 * Reads every file registered under {@code key} from the location selected by {@code type},
 * drops lines starting with "ASOFDATE", converts the remaining lines to {@code BaseBean}s,
 * co-groups the per-file collections by the key produced by
 * {@code CombineBaseBeanFn.getKvBean}, merges each group through {@code KVToBaseBeanFn} and
 * turns the result back into CSV lines.
 *
 * @param key        run-file key, e.g. "SBM_Delta_Sensi"
 * @param runFileMap run files grouped by key
 * @param pipeline   pipeline the transforms are attached to
 * @param indexMap   column-name to column-index map handed to the line parser
 * @param type       "raw" to read {@code getRawFilePath()}, "afterControl" to read
 *                   {@code getAfterBatchFilePath()}
 * @return the merged CSV collection, or {@code null} when the key is empty or no file is
 *         registered for it (missing map, missing entry or empty list)
 * @throws IllegalArgumentException when files exist but {@code type} is neither "raw" nor
 *                                  "afterControl" (previously an empty path was read)
 */
private PCollection<String> getCombinePCollection(String key, Map<String, List<CsvFile>> runFileMap, Pipeline pipeline, HashMap<String, Integer> indexMap, String type) {
    if (StringUtils.isEmpty(key) || runFileMap == null) {
        return null;
    }
    List<CsvFile> csvFiles = runFileMap.get(key);
    // An empty list used to leave the co-group tuple null and fail with a NullPointerException below;
    // treat it the same way as a missing key.
    if (csvFiles == null || csvFiles.isEmpty()) {
        return null;
    }
    // The source type does not change per file, so resolve and validate it once.
    final boolean readRaw = StringUtils.equals("raw", type);
    if (!readRaw && !StringUtils.equals("afterControl", type)) {
        throw new IllegalArgumentException("Unsupported csv source type: " + type);
    }

    List<PCollection<BaseBean>> beans = Lists.newArrayList();
    for (CsvFile csvFile : csvFiles) {
        String readPath = readRaw ? csvFile.getRawFilePath() : csvFile.getAfterBatchFilePath();
        PCollection<String> lines = pipeline.apply(TextIO.read().from(readPath))
                .apply(Filter.by(s -> !s.startsWith("ASOFDATE")));
        beans.add(lines.apply(ParDo.of(new CsvToBaseBeanFn(convert, indexMap))));
    }

    List<PCollection<KV<String, BaseBean>>> kvBeans = Lists.newArrayList();
    for (PCollection<BaseBean> pBean : beans) {
        kvBeans.add(CombineBaseBeanFn.getKvBean(pBean));
    }

    // The list is non-empty here, so the tuple can be seeded with the first collection.
    KeyedPCollectionTuple<String> tuple = KeyedPCollectionTuple.of("kv" + 0, kvBeans.get(0));
    for (int i = 1; i < kvBeans.size(); i++) {
        tuple = tuple.and("kv" + i, kvBeans.get(i));
    }

    PCollection<KV<String, CoGbkResult>> grouped = tuple.apply(CoGroupByKey.<String>create());
    PCollection<BaseBean> mixBean = grouped.apply(ParDo.of(new KVToBaseBeanFn(kvBeans.size())));
    return mixBean.apply(ParDo.of(new BaseBeanToCsvFn()));
}
/**
 * Captures the inputs of a run: the parameter object, its run files, the consistency
 * configuration (whose output directory is set here) and the values derived from them.
 * Clears the missing-input flag.
 *
 * @param beamParam parameters of the run
 */
@Override
public void beforeRunBeam(BeamParam beamParam) {
    this.beamParam = beamParam;
    this.runFileMap = beamParam.getRunFileMap();
    this.site = beamParam.getSite();
    this.preAggGroup = beamParam.getPreAggGroup();

    //to create a new cde config
    this.cdeConfig = beamParam.getConsistencyConfig();
    this.cdeConfig.setOutputDir(this.outputDir);
    this.cdeName = this.cdeConfig.getCdeName();
    this.cdeId = this.cdeConfig.getId();
    this.fileType = this.cdeConfig.getFileType();
    this.fileTypeEnum = FileTypeEnum.getFileType(this.fileType);

    this.notExist = false;
}
/**
 * Builds and runs the consistency pipeline for the configured file type.
 *
 * <p>Steps: load the after-control and raw CSV of every file type, optionally apply the parse
 * rules to both collections of the checked file, convert both to {@code CombineBean}s using
 * the configured compare field, pair/flatten/group them and filter each group with
 * {@code FilterConsistencyBeanFn}, enrich the remaining rows with trade attribute data (unless
 * the checked file is the trade attribute file), then write the rows, the total count and the
 * fail count to the output directory.
 *
 * <p>When a CSV collection the pipeline needs is missing, {@code notExist} is set and the
 * pipeline is not run. Any exception is logged and not propagated.
 */
@Override
public void runBeam() {
    try {
        CommonOption option = createOption(CommonOption.class, generateConsistencyCdeConfig(cdeConfig));
        initOption(option);
        Pipeline pipeline = Pipeline.create(option);
        pipeline.getCoderRegistry().registerCoderForClass(CommonBean.class, new CommonBeamCoder());
        CdeConstants cdeConstants = option.getCdeConstants();
        setIndexMap(cdeConstants);
        setAfterControlPCollectionMap(pipeline);
        setRawPCollectionMap(pipeline);
        cdeIndexMap = indexMap.get(fileTypeEnum);
        afterControlPCollection = afterControlPCollectionMap.get(fileTypeEnum);
        rawPCollection = rawPCollectionMap.get(fileTypeEnum);

        // The trade-attribute collection is joined in further down; check it up front instead
        // of failing with a NullPointerException halfway through pipeline construction.
        boolean needsTrade = !Objects.equals(fileTypeEnum, FileTypeEnum.TRADE_ATTRIBUTE);
        PCollection<String> tradeCsv = afterControlPCollectionMap.get(FileTypeEnum.TRADE_ATTRIBUTE);
        if (afterControlPCollection == null || rawPCollection == null || (needsTrade && tradeCsv == null)) {
            log.warn("Consistency check for cde {} skipped: a required input csv is missing", cdeName);
            notExist = true;
            return;
        }

        //parse to map,key is order
        Map<Integer, List<ParseParam>> orderParseMap = CdeMapUtil.getOrderParseMap(this.beamParam.getParseParamList());
        if (orderParseMap.size() > 0) {
            afterControlPCollection = filterCsv(afterControlPCollection, orderParseMap, 0, null, cdeIndexMap, convert);
            rawPCollection = filterCsv(rawPCollection, orderParseMap, 0, null, cdeIndexMap, convert);
        }
        PCollection<String> afterBatchTotalCount = afterControlPCollection.apply(Count.globally()).apply(ParDo.of(new CsvCountFn()));
        PCollection<CombineBean> afterBatchBean = afterControlPCollection.apply(ParDo.of(new CsvToComplexBeanFn(fileTypeEnum, cdeIndexMap, convert, cdeConfig.getCompareField(), "afterBatch")));
        PCollection<CombineBean> rawBean = rawPCollection.apply(ParDo.of(new CsvToComplexBeanFn(fileTypeEnum, cdeIndexMap, convert, cdeConfig.getCompareField(), "raw")));
        PCollection<CombineBean> otherBean = CombineBeanFn.getCombineComplexBean(afterBatchBean, rawBean);
        PCollection<CombineBean> mixBean = PCollectionList.of(afterBatchBean).and(otherBean).apply(Flatten.<CombineBean>pCollections());
        PCollection<KV<String, CombineBean>> mixBeanKv = CombineBeanFn.getComplexBeanKv(mixBean);
        PCollection<KV<String, Iterable<CombineBean>>> mixBeanPc = mixBeanKv.apply(GroupByKey.create());
        PCollection<CombineBean> filterMixBeanPC = mixBeanPc.apply(ParDo.of(new FilterConsistencyBeanFn()));
        PCollection<String> afterFilterCsv = filterMixBeanPC.apply(ParDo.of(new BeanToCsvFn()));
        //add trade logical
        //as long as it is not a trade attribute, it must be executed
        if (needsTrade) {
            PCollection<CombineBean> tradeBean = tradeCsv
                    .apply(Filter.by(s -> !s.startsWith("ASOFDATE")))
                    .apply(ParDo.of(new TradeCsvToBeanFn(indexMap.get(FileTypeEnum.TRADE_ATTRIBUTE))));
            afterFilterCsv = getCombineTradeCsv(afterFilterCsv, tradeBean);
        }
        //fail record
        PCollection<String> failCount = afterFilterCsv.apply(Count.globally()).apply(ParDo.of(new CsvCountFn()));
        //output to directory
        afterFilterCsv.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum, RunBeamType.CONSISTENCY, cdeName))
                .withNumShards(1).withSuffix(".csv"));
        afterBatchTotalCount.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum,
                cdeName, CsvCountEnum.TOTAL_COUNT, RunBeamType.CONSISTENCY))
                .withNumShards(1).withSuffix(".csv"));
        failCount.apply(TextIO.write().to(CsvNameUtil.getOutputCsvName(outputDir, fileTypeEnum,
                cdeName, CsvCountEnum.FAIL_COUNT, RunBeamType.CONSISTENCY))
                .withNumShards(1).withSuffix(".csv"));
        pipeline.run().waitUntilFinish();
    } catch (Exception e) {
        log.error(e.getLocalizedMessage(), e);
    }
}
/**
 * Joins the filtered rows with the trade attribute beans.
 *
 * <p>The rows are converted with {@code CombineDeltaCsvToBeanFn}, paired with the trade beans
 * via {@code CombineBeanFn.getCombineBean}, flattened together with the pairing result, grouped
 * by the key from {@code CombineBeanFn.getMixBeanKv}, reduced per group by
 * {@code FilterMixTradeFn} and written back as CSV lines.
 *
 * @param filterCsv CSV rows to enrich
 * @param tradeBean beans built from the trade attribute file
 * @return the joined rows as CSV lines
 */
private PCollection<String> getCombineTradeCsv(PCollection<String> filterCsv, PCollection<CombineBean> tradeBean) {
    // NOTE(review): the method of the same name in CompletenessBeamServiceImpl converts the rows with
    // CombineTradeCsvToBeanFn; confirm that CombineDeltaCsvToBeanFn is intended here.
    PCollection<CombineBean> rowBeans =
            filterCsv.apply(ParDo.of(new CombineDeltaCsvToBeanFn(fileTypeEnum, cdeIndexMap, convert)));
    PCollection<CombineBean> pairedBeans = CombineBeanFn.getCombineBean(rowBeans, tradeBean);
    PCollection<CombineBean> allBeans = PCollectionList.of(rowBeans)
            .and(pairedBeans)
            .apply(Flatten.<CombineBean>pCollections());
    return CombineBeanFn.getMixBeanKv(allBeans)
            .apply(GroupByKey.<String, CombineBean>create())
            .apply(ParDo.of(new FilterMixTradeFn()))
            .apply(ParDo.of(new BeanToCsvFn()));
}
/**
 * Reads the pipeline output back from the output directory and stores it on {@code beamParam}:
 * total count, fail count, pass count (total minus fail) and the failing CSV rows.
 * Does nothing when the input was flagged as missing.
 *
 * <p>A count is only parsed when its file has at least one line, so an empty or missing count
 * file no longer raises an {@code IndexOutOfBoundsException}; the corresponding values are
 * simply left unset.
 */
@Override
public void afterRunBeam() {
    if (notExist) {
        return;
    }
    List<String> totalCountResult = FileHelper.readCsv(FileHelper.findCsv(outputDir,
            fileTypeEnum, cdeName, CsvCountEnum.TOTAL_COUNT, RunBeamType.CONSISTENCY));
    List<String> failCountResult = FileHelper.readCsv(FileHelper.findCsv(outputDir,
            fileTypeEnum, cdeName, CsvCountEnum.FAIL_COUNT, RunBeamType.CONSISTENCY));
    Long totalCount = firstLineAsCount(totalCountResult);
    Long failCount = firstLineAsCount(failCountResult);
    if (totalCount == null || failCount == null) {
        log.warn("Count output for cde {} is empty; counts are left unset", cdeName);
    }
    // As before, a count is only published when its file holds exactly one line.
    if (totalCount != null && totalCountResult.size() == 1) {
        beamParam.setTotalCount(totalCount);
    }
    if (failCount != null && failCountResult.size() == 1) {
        beamParam.setFailCount(failCount);
    }
    if (totalCount != null && failCount != null) {
        beamParam.setPassCount(totalCount - failCount);
    }
    ArrayList<String> csv = FileHelper.readCsv(FileHelper.findCsv(outputDir, fileTypeEnum, cdeName, RunBeamType.CONSISTENCY));
    beamParam.setCsv(csv);
}

/** Parses the first line of a count file; returns {@code null} when there is no line. */
private static Long firstLineAsCount(List<String> lines) {
    if (lines == null || lines.isEmpty()) {
        return null;
    }
    return Long.parseLong(lines.get(0));
}
/**
 * Persists the outcome of the consistency run as {@code CdeException} records.
 *
 * <p>A fresh batch number is obtained from {@code cdeExceptionService} and stored on
 * {@code beamParam}. When {@code notExist} is set, a single record with the error message
 * {@code "csv file is not exist!"} is saved. Otherwise every line of
 * {@code beamParam.getCsv()} is converted into one record and the records are handed to
 * {@code batchSave} in one call.
 *
 * <p>As a side effect the {@code legalEntity} field is overwritten for each processed line,
 * so after the call it holds the value taken from the last line.
 */
@Override
public void setFilterDataInDB() {
    CdeConstants cdeConstants = CdeConstants.instance(cdeConfig.getVersion());
    String batchNum = cdeExceptionService.getBatchNum(fileTypeEnum);
    beamParam.setBatchNum(batchNum);
    ArrayList<String> csvs = beamParam.getCsv();
    HashMap<String, Integer> indexMap = cdeConstants.cdeIndexMap.get(this.fileTypeEnum);

    if (notExist) {
        cdeExceptionService.save(buildMissingFileException(batchNum));
        return;
    }

    List<CdeException> cdeExceptions = Lists.newArrayList();
    for (String csv : csvs) {
        cdeExceptions.add(buildRowException(csv, indexMap, batchNum));
    }
    cdeExceptionService.batchSave(cdeExceptions);
}

/** Builds the single record stored when {@code notExist} is set (no trade-level fields are filled). */
private CdeException buildMissingFileException(String batchNum) {
    return CdeException.builder()
            .dqId(cdeConfig.getDqId())
            .siteCode(site)
            .preAggGroup(preAggGroup)
            .tableName("cde_exception")
            .dataFieldValue("dataFieldValue")
            .logicalOrBusinessNameCde(cdeName)
            .sourceSystemName("SUBMIT")
            .producer("N")
            .errorMessage("csv file is not exist!")
            .dimension(RunBeamType.CONSISTENCY.name())
            .controlRunTimestamp(String.valueOf(System.currentTimeMillis()))
            .cdeExceptionRecordType("PLATO")
            .bookIdentifier("bookIdentifier")
            .createDate(new Date())
            .createBy("SYSTEM")
            .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
            .fileType(fileType)
            .cdeName(cdeName)
            .cdeConfigId(cdeId)
            .batchNum(batchNum)
            .build();
}

/** Converts one CSV line into a {@code CdeException}, reading columns through {@code indexMap}. */
private CdeException buildRowException(String csv, HashMap<String, Integer> indexMap, String batchNum) {
    String[] arr = SplitUtil.split(csv);
    String tradeId = columnValue(arr, indexMap, "TRADEID");
    String legId = columnValue(arr, indexMap, "LEG_ID");
    String dqId = cdeConfig.getDqId();

    CdeException cdeException = CdeException.builder()
            .dqId(dqId)
            .siteCode(site)
            .preAggGroup(preAggGroup)
            .tableName("cde_exception")
            .dataFieldValue("dataFieldValue")
            .logicalOrBusinessNameCde(cdeName)
            .sourceSystemName("SUBMIT")
            .producer("N")
            .errorMessage(cdeConfig.getErrorMessage())
            .dimension(RunBeamType.CONSISTENCY.name())
            .controlRunTimestamp(String.valueOf(System.currentTimeMillis()))
            .cdeExceptionRecordType("PLATO")
            .bookIdentifier("bookIdentifier")
            .tradeId(tradeId)
            .legId(legId)
            .asOfDate(columnValue(arr, indexMap, "ASOFDATE"))
            .createDate(new Date())
            .createBy("SYSTEM")
            .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
            .fileType(fileType)
            .cdeName(cdeName)
            .cdeConfigId(cdeId)
            .batchNum(batchNum)
            .fxDelta(parseNullableDouble(columnValue(arr, indexMap, "FXDELTA")))
            .cmDelta(parseNullableDouble(columnValue(arr, indexMap, "CMDELTA")))
            .irDelta(parseNullableDouble(columnValue(arr, indexMap, "IRDELTA")))
            .eqDelta(parseNullableDouble(columnValue(arr, indexMap, "EQDELTA")))
            .build();

    // record_id layout: DQID, source system, book, trade id, leg id, risk factor.
    String recordId;
    if (!Objects.equals(FileTypeEnum.TRADE_ATTRIBUTE, fileTypeEnum)) {
        legalEntity = columnValue(arr, indexMap, "TRADE_LEGAL_ENTITY");
        cdeException.setLegalEntity(legalEntity);
        recordId = String.format("%s-%s-%s-%s-%s-%s", dqId, columnValue(arr, indexMap, "TRADE_SOURCE_SYSTEM"),
                columnValue(arr, indexMap, "TRADE_BOOK"), tradeId, legId,
                indexMap.get("RISKFACTOR") == null ? "no RiskFactor" : columnValue(arr, indexMap, "RISKFACTOR"));
    } else {
        legalEntity = columnValue(arr, indexMap, "LEGAL_ENTITY");
        cdeException.setLegalEntity(legalEntity);
        recordId = String.format("%s-%s-%s-%s-%s-%s", dqId, columnValue(arr, indexMap, "SOURCE_SYSTEM"),
                columnValue(arr, indexMap, "BOOK"), tradeId, legId, "no RiskFactor");
    }
    cdeException.setRecordId(recordId);

    // delta
    if (Objects.equals(FileTypeEnum.DELTA, fileTypeEnum)) {
        cdeException.setRiskClass(columnValue(arr, indexMap, "RISKCLASS"));
        cdeException.setRiskFactor(columnValue(arr, indexMap, "RISKFACTOR"));
        cdeException.setSequence(columnValue(arr, indexMap, "SEQUENCE"));
        cdeException.setAdjustmentFlag(columnValue(arr, indexMap, "ADJUSTMENTFLAG"));
    }
    // vega / curvature
    if (Objects.equals(FileTypeEnum.VEGA, fileTypeEnum) || Objects.equals(FileTypeEnum.CURVATURE, fileTypeEnum)) {
        cdeException.setRiskClass(columnValue(arr, indexMap, "RISKCLASS"));
        cdeException.setRiskFactor(columnValue(arr, indexMap, "RISKFACTOR"));
        cdeException.setAdjustmentFlag(columnValue(arr, indexMap, "ADJUSTMENTFLAG"));
    }
    // drc
    if (Objects.equals(FileTypeEnum.DRC, fileTypeEnum)) {
        cdeException.setRiskClass(columnValue(arr, indexMap, "RISKCLASS"));
        cdeException.setObligorId(columnValue(arr, indexMap, "OBLIGORID"));
        cdeException.setAdjustmentFlag(columnValue(arr, indexMap, "ADJUSTMENTFLAG"));
        cdeException.setDrcUnderlyingref(columnValue(arr, indexMap, "DRC_UNDERLYINGREF"));
    }
    return cdeException;
}

/** Returns the cell of {@code arr} at the position registered for {@code column} in {@code indexMap}. */
private String columnValue(String[] arr, HashMap<String, Integer> indexMap, String column) {
    return arr[indexMap.get(column)];
}

/**
 * Parses a delta cell, mapping the literal text {@code "null"} to a {@code null} value.
 * Uses {@code Double.valueOf} instead of the deprecated {@code new Double(String)} constructor.
 */
private Double parseNullableDouble(String raw) {
    return StringUtils.equals("null", raw) ? null : Double.valueOf(raw);
}
/**
 * Saves the {@code CdeExceptionSummary} of this run and returns it as a {@code RunStatusLog}.
 *
 * <p>When {@code notExist} is set the failed/passed/tested counters are all stored as zero and
 * no legal entity is set; otherwise the counters come from {@code beamParam} and the current
 * {@code legalEntity} field is recorded as well.
 *
 * @param costTime run duration, rendered through {@code TimeUtil.formatMinSec}
 * @return a new {@code RunStatusLog} populated from the saved summary via {@code BeanUtils.copyProperties}
 */
@Override
public RunStatusLog finish(long costTime) {
    final CdeExceptionSummary summary;
    if (notExist) {
        summary = CdeExceptionSummary.builder()
                .siteCode(site)
                .dqId(cdeConfig.getDqId())
                .recordsFailed(0L)
                .recordsPassed(0L)
                .recordsTested(0L)
                .createDate(new Date())
                .createBy("SYSTEM")
                .batchNum(beamParam.getBatchNum())
                .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                .fileType(fileType)
                .cdeName(cdeName)
                .cdeConfigId(cdeId)
                .runCostTimeCount(TimeUtil.formatMinSec(costTime))
                .dimension(RunBeamType.CONSISTENCY.name())
                .tableName("cde_exception_summary")
                .build();
    } else {
        summary = CdeExceptionSummary.builder()
                .siteCode(site)
                .dqId(cdeConfig.getDqId())
                .recordsFailed(beamParam.getFailCount())
                .recordsPassed(beamParam.getPassCount())
                .recordsTested(beamParam.getTotalCount())
                .createDate(new Date())
                .createBy("SYSTEM")
                .batchNum(beamParam.getBatchNum())
                .generatedDate(DateFormatUtil.getCurrentDate(DateFormatUtil.YYYY_MM_DD))
                .fileType(fileType)
                .cdeName(cdeName)
                .cdeConfigId(cdeId)
                .runCostTimeCount(TimeUtil.formatMinSec(costTime))
                .dimension(RunBeamType.CONSISTENCY.name())
                .legalEntity(legalEntity)
                .tableName("cde_exception_summary")
                .build();
    }
    cdeExceptionSummaryService.save(summary);

    // Named runStatusLog rather than log so the class logger is not shadowed inside this method.
    RunStatusLog runStatusLog = new RunStatusLog();
    BeanUtils.copyProperties(summary, runStatusLog);
    return runStatusLog;
}
}
package com.hsbc.risk.frtbsa.service.cde;
import com.hsbc.risk.frtbsa.base.GenericBeamCheckService;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import java.util.List;
import java.util.Map;
/**
 * Service type for the completeness Beam check.
 *
 * <p>Declares no members of its own; the whole contract is inherited from
 * {@link GenericBeamCheckService}.
 */
public interface CompletenessBeamService extends GenericBeamCheckService {
}
package com.hsbc.risk.frtbsa.service.cde;
import com.hsbc.risk.frtbsa.base.GenericBeamCheckService;
/**
 * Service type for the consistency Beam check.
 *
 * <p>Declares no members of its own; the whole contract is inherited from
 * {@link GenericBeamCheckService}.
 */
public interface ConsistencyBeamService extends GenericBeamCheckService {
}
package com.hsbc.risk.frtbsa.service.entity.impl;
import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeCompletenessConfigDao;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessConfig;
import com.hsbc.risk.frtbsa.service.entity.CdeCompletenessConfigService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Spring service for {@code CdeCompletenessConfig} entities; every call is forwarded to the
 * {@code CdeCompletenessConfigDao} held by the generic base class.
 */
@Service
public class CdeCompletenessConfigServiceImpl
        extends AbstractGenericService<CdeCompletenessConfig, CdeCompletenessConfigDao>
        implements CdeCompletenessConfigService {

    /**
     * Forwards to the DAO's {@code getActiveCdes()} and returns its result unchanged.
     *
     * @return whatever list the DAO query yields
     */
    @Override
    public List<CdeCompletenessConfig> getActiveCdes() {
        return repository.getActiveCdes();
    }
}
package com.hsbc.risk.frtbsa.service.entity.impl;
import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeCompletenessRuleDao;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessRule;
import com.hsbc.risk.frtbsa.service.entity.CdeCompletenessRuleService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Spring service for {@code CdeCompletenessRule} entities; every call is forwarded to the
 * {@code CdeCompletenessRuleDao} held by the generic base class.
 */
@Service
public class CdeCompletenessRuleServiceImpl
        extends AbstractGenericService<CdeCompletenessRule, CdeCompletenessRuleDao>
        implements CdeCompletenessRuleService {

    /**
     * Forwards to the DAO query of the same name.
     *
     * @param cdeConfigId id passed through to the DAO
     * @return the DAO result, unchanged
     */
    @Override
    public List<CdeCompletenessRule> findRulesByCdeConfigId(Long cdeConfigId) {
        return repository.findRulesByCdeConfigId(cdeConfigId);
    }

    /**
     * Forwards to the DAO query of the same name.
     *
     * @param cdeConfigId     id passed through to the DAO
     * @param combineFileType combine file type passed through to the DAO
     * @return the DAO result, unchanged
     */
    @Override
    public List<CdeCompletenessRule> findRulesByCdeConfigIdAndCombineFileType(Long cdeConfigId,
                                                                              String combineFileType) {
        return repository.findRulesByCdeConfigIdAndCombineFileType(cdeConfigId, combineFileType);
    }

    /**
     * Forwards to the DAO query of the same name.
     *
     * @param cdeConfigId id passed through to the DAO
     * @return the DAO result, unchanged
     */
    @Override
    public List<String> findDistinctCombineFileTypeByCdeConfigId(Long cdeConfigId) {
        return repository.findDistinctCombineFileTypeByCdeConfigId(cdeConfigId);
    }
}
package com.hsbc.risk.frtbsa.service.entity.impl;
import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeConsistencyConfigDao;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyConfig;
import com.hsbc.risk.frtbsa.service.entity.CdeConsistencyConfigService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Spring service for {@code CdeConsistencyConfig} entities; every call is forwarded to the
 * {@code CdeConsistencyConfigDao} held by the generic base class.
 */
@Service
public class CdeConsistencyConfigServiceImpl
        extends AbstractGenericService<CdeConsistencyConfig, CdeConsistencyConfigDao>
        implements CdeConsistencyConfigService {

    /**
     * Forwards to the DAO's {@code getActiveCdes()} and returns its result unchanged.
     *
     * @return whatever list the DAO query yields
     */
    @Override
    public List<CdeConsistencyConfig> getActiveCdes() {
        return repository.getActiveCdes();
    }
}
package com.hsbc.risk.frtbsa.service.entity.impl;
import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeConsistencyRuleDao;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyRule;
import com.hsbc.risk.frtbsa.service.entity.CdeConsistencyRuleService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Spring service for {@code CdeConsistencyRule} entities; every call is forwarded to the
 * {@code CdeConsistencyRuleDao} held by the generic base class.
 */
@Service
public class CdeConsistencyRuleServiceImpl
        extends AbstractGenericService<CdeConsistencyRule, CdeConsistencyRuleDao>
        implements CdeConsistencyRuleService {

    /**
     * Forwards to the DAO query of the same name.
     *
     * @param cdeConfigId id passed through to the DAO
     * @return the DAO result, unchanged
     */
    @Override
    public List<CdeConsistencyRule> findRulesByCdeConfigId(Long cdeConfigId) {
        return repository.findRulesByCdeConfigId(cdeConfigId);
    }
}
package com.hsbc.risk.frtbsa.service.entity.impl;
import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeExceptionDao;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.entity.CdeException;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionService;
import com.hsbc.risk.frtbsa.utils.DateFormatUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.persistence.EntityManager;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
@Service
@Slf4j
public class CdeExceptionServiceImpl extends AbstractGenericService<CdeException, CdeExceptionDao>
implements CdeExceptionService {
@Override
public String getBatchNum(FileTypeEnum fileTypeEnum) {
String currentDate = DateFormatUtil.getCurrentDate(DateFormatUtil.YYYYMMDD);
return currentDate + "_region_" + fileTypeEnum.getRiskMeasure() + "_"
+ UUID.randomUUID().toString().toLowerCase(Locale.ROOT);
}
@Override
@Modifying
@Transactional
public void batchSave(List<CdeException> entities) {
log.info("CdeException batch insert begin!");
if (CollectionUtils.isEmpty(entities)) {
return;
}
EntityManager em = this.repository.getEntityManager();
try {
for (CdeException entity : entities) {
em.persist(entity);
}
em.flush();
em.clear();
log.info("CdeException batch insert successful,total records:{}", entities.size());
} catch (Exception e) {
e.printStackTrace();
log.error("CdeException batch insert fail!");
}
}
@Override
public List<CdeException> queryByBatchNum(String batchNum) {
return this.repository.queryByBatchNum(batchNum);
}
@Override
public List<CdeException> queryByCdeConfigIdAndFileTypeAndGeneratedDate(Long cdeName, String fileType, String generatedDate) {
return this.repository.queryByCdeConfigIdAndFileTypeAndGeneratedDate(cdeName, fileType, generatedDate);
}
}
package com.hsbc.risk.frtbsa.service.entity.impl;
import com.hsbc.risk.frtbsa.base.AbstractGenericService;
import com.hsbc.risk.frtbsa.dao.CdeExceptionSummaryDao;
import com.hsbc.risk.frtbsa.entity.CdeExceptionSummary;
import com.hsbc.risk.frtbsa.service.entity.CdeExceptionSummaryService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Spring service for {@code CdeExceptionSummary} entities; every call is forwarded to the
 * {@code CdeExceptionSummaryDao} held by the generic base class.
 */
@Service
public class CdeExceptionSummaryServiceImpl
        extends AbstractGenericService<CdeExceptionSummary, CdeExceptionSummaryDao>
        implements CdeExceptionSummaryService {

    /**
     * Forwards to the DAO query of the same name.
     *
     * @param batchNum batch number passed through to the DAO
     * @return the DAO result, unchanged
     */
    @Override
    public CdeExceptionSummary queryByBatchNum(String batchNum) {
        return repository.queryByBatchNum(batchNum);
    }

    /**
     * Forwards to the DAO query of the same name.
     *
     * @param cdeConfigId id passed through to the DAO
     * @return the DAO result, unchanged
     */
    @Override
    public List<String> queryGeneratedDateList(Long cdeConfigId) {
        return repository.queryGeneratedDateList(cdeConfigId);
    }

    /**
     * Forwards to the DAO query of the same name.
     *
     * @param cdeName       first argument of the DAO query; NOTE(review): the method name suggests
     *                      this is a cde config id despite the parameter name - confirm
     * @param fileType      file type passed through to the DAO
     * @param generatedDate generated date passed through to the DAO
     * @return the DAO result, unchanged
     */
    @Override
    public List<CdeExceptionSummary> queryByCdeConfigIdAndFileTypeAndGeneratedDate(Long cdeName,
                                                                                   String fileType,
                                                                                   String generatedDate) {
        return repository.queryByCdeConfigIdAndFileTypeAndGeneratedDate(cdeName, fileType, generatedDate);
    }
}
package com.hsbc.risk.frtbsa.service.entity;
import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeCompletenessConfigDao;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessConfig;
import java.util.List;
/**
 * Service contract for {@code CdeCompletenessConfig} entities, extending the generic service
 * typed on {@code CdeCompletenessConfigDao}.
 */
public interface CdeCompletenessConfigService extends GenericService<CdeCompletenessConfig, CdeCompletenessConfigDao> {
/**
 * Returns the completeness configurations regarded as active.
 * NOTE(review): what counts as "active" is decided by the DAO query, which is not visible here.
 */
List<CdeCompletenessConfig> getActiveCdes();
}
package com.hsbc.risk.frtbsa.service.entity;
import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeCompletenessRuleDao;
import com.hsbc.risk.frtbsa.entity.CdeCompletenessRule;
import java.util.List;
/**
 * Service contract for {@code CdeCompletenessRule} entities, extending the generic service
 * typed on {@code CdeCompletenessRuleDao}.
 */
public interface CdeCompletenessRuleService extends GenericService<CdeCompletenessRule, CdeCompletenessRuleDao> {
/** Looks up rules by the given cde config id. */
List<CdeCompletenessRule> findRulesByCdeConfigId(Long cdeConfigId);
/** Looks up rules by the given cde config id and combine file type. */
List<CdeCompletenessRule> findRulesByCdeConfigIdAndCombineFileType(Long cdeConfigId, String combineFileType);
/**
 * Looks up combine file type values by the given cde config id.
 * NOTE(review): de-duplication is implied by the method name only; confirm against the DAO query.
 */
List<String> findDistinctCombineFileTypeByCdeConfigId(Long cdeConfigId);
}
package com.hsbc.risk.frtbsa.service.entity;
import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeConsistencyConfigDao;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyConfig;
import java.util.List;
/**
 * Service contract for {@code CdeConsistencyConfig} entities, extending the generic service
 * typed on {@code CdeConsistencyConfigDao}.
 */
public interface CdeConsistencyConfigService extends GenericService<CdeConsistencyConfig, CdeConsistencyConfigDao> {
/**
 * Returns the consistency configurations regarded as active.
 * NOTE(review): what counts as "active" is decided by the DAO query, which is not visible here.
 */
List<CdeConsistencyConfig> getActiveCdes();
}
package com.hsbc.risk.frtbsa.service.entity;
import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeConsistencyRuleDao;
import com.hsbc.risk.frtbsa.entity.CdeConsistencyRule;
import java.util.List;
/**
 * Service contract for {@code CdeConsistencyRule} entities, extending the generic service
 * typed on {@code CdeConsistencyRuleDao}.
 */
public interface CdeConsistencyRuleService extends GenericService<CdeConsistencyRule, CdeConsistencyRuleDao> {
/** Looks up rules by the given cde config id. */
List<CdeConsistencyRule> findRulesByCdeConfigId(Long cdeConfigId);
}
package com.hsbc.risk.frtbsa.service.entity;
import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeExceptionDao;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.entity.CdeException;
import java.util.List;
/**
 * Service contract for {@code CdeException} entities, extending the generic service typed on
 * {@code CdeExceptionDao}.
 */
public interface CdeExceptionService extends GenericService<CdeException, CdeExceptionDao> {
/** Produces a batch number for the given file type. */
String getBatchNum(FileTypeEnum fileTypeEnum);
/** Saves all given entities in one call. */
void batchSave(List<CdeException> entities);
/** Looks up exception records by batch number. */
List<CdeException> queryByBatchNum(String batchNum);
/**
 * Looks up exception records by cde config id, file type and generated date.
 * NOTE(review): the first parameter is named cdeName although the method name refers to a
 * cde config id and the type is Long - confirm which value callers pass.
 */
List<CdeException> queryByCdeConfigIdAndFileTypeAndGeneratedDate(Long cdeName, String fileType, String generatedDate);
}
package com.hsbc.risk.frtbsa.service.entity;
import com.hsbc.risk.frtbsa.base.GenericService;
import com.hsbc.risk.frtbsa.dao.CdeExceptionSummaryDao;
import com.hsbc.risk.frtbsa.entity.CdeExceptionSummary;
import java.util.List;
/**
 * Service contract for {@code CdeExceptionSummary} entities, extending the generic service
 * typed on {@code CdeExceptionSummaryDao}.
 */
public interface CdeExceptionSummaryService extends GenericService<CdeExceptionSummary, CdeExceptionSummaryDao> {
/** Looks up the summary for the given batch number. */
CdeExceptionSummary queryByBatchNum(String batchNum);
/** Looks up generated-date values for the given cde config id. */
List<String> queryGeneratedDateList(Long cdeConfigId);
/**
 * Looks up summaries by cde config id, file type and generated date.
 * NOTE(review): the first parameter is named cdeName although the method name refers to a
 * cde config id and the type is Long - confirm which value callers pass.
 */
List<CdeExceptionSummary> queryByCdeConfigIdAndFileTypeAndGeneratedDate(Long cdeName, String fileType, String generatedDate);
}
这篇关于service的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!