本文主要介绍基于 Apache Beam 的 transform(DoFn)实现示例,对大家解决编程问题具有一定的参考价值,需要的程序员们可以跟随小编一起学习!
package com.hsbc.risk.frtbsa.transform;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import org.apache.beam.sdk.transforms.DoFn;
/**
 * Serialises a {@link CombineBean} into one comma-separated line.
 *
 * <p>The line consists of every entry of {@code bean.getObjects()} except the last
 * {@link #TRAILING_COLUMNS}, followed by the bean's fxDelta, cmDelta, irDelta, eqDelta, book,
 * legalEntity and sourceSystem values, in that order. A {@code null} input element produces no
 * output.
 */
public class BeanToCsvFn extends DoFn<CombineBean, String> {

    /** Number of trailing entries of {@code getObjects()} that are replaced by bean fields. */
    private static final int TRAILING_COLUMNS = 7;

    /**
     * Builds the CSV line for the current element and emits it.
     *
     * @param context Beam process context supplying the bean and receiving the line
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        CombineBean bean = context.element();
        if (bean == null) {
            return;
        }
        Object[] objects = bean.getObjects();
        // The buffer never leaves this method, so the unsynchronised StringBuilder is sufficient.
        StringBuilder sb = new StringBuilder();
        int leadingColumns = objects.length - TRAILING_COLUMNS;
        for (int i = 0; i < leadingColumns; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(objects[i]);
        }
        // Null field values are written as the text "null" by StringBuilder.append(Object).
        sb.append(',').append(bean.getFxDelta());
        sb.append(',').append(bean.getCmDelta());
        sb.append(',').append(bean.getIrDelta());
        sb.append(',').append(bean.getEqDelta());
        sb.append(',').append(bean.getBook());
        sb.append(',').append(bean.getLegalEntity());
        sb.append(',').append(bean.getSourceSystem());
        context.output(sb.toString());
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.utils.MathUtil;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import java.util.List;
import java.util.Objects;
/**
 * Static helpers that aggregate delta sensitivities onto {@link CombineBean}s and join bean
 * collections by key.
 */
public class CombineBeanFn {

    /** Tag under which the "target" side is registered in the co-group join. */
    private static final String TARGET_TAG = "targetBeanKv";

    /** Tag under which the "source" side is registered in the co-group join. */
    private static final String SOURCE_TAG = "sourceBeanKv";

    /**
     * Separator placed between the fields of a composite join key. Without it, plain
     * concatenation lets different field tuples collapse into the same key
     * (e.g. legId "1" + sequence "11" versus legId "11" + sequence "1").
     */
    private static final char KEY_SEPARATOR = '\u0001';

    /**
     * Adds up all values of {@code valueArr}, starting from zero, using
     * {@code MathUtil.add(Double, String)}.
     *
     * @param valueArr textual values to accumulate
     * @return the accumulated total; zero for an empty array
     */
    public static Double calDouble(String[] valueArr) {
        Double result = 0.0d;
        for (String value : valueArr) {
            result = MathUtil.add(result, value);
        }
        return result;
    }

    /**
     * Accumulates the delta sensitivities of {@code deltaBeans} per risk class and stores the
     * totals on {@code resultBean}.
     *
     * <p>Risk class "GIRR" feeds irDelta, "EQUITY" eqDelta, "FX" fxDelta and "COMMODITY"
     * cmDelta (case-insensitive); other risk classes contribute nothing. A total for which no
     * bean contributed is passed to the setter as the initial {@code null}.
     *
     * @param resultBean bean that receives the four totals (mutated in place)
     * @param deltaBeans beans supplying a risk class and a sensitivities string
     * @return {@code resultBean}
     */
    public static CombineBean analysisBean(CombineBean resultBean, List<CombineBean> deltaBeans) {
        Double fxDelta = null;
        Double cmDelta = null;
        Double irDelta = null;
        Double eqDelta = null;
        for (CombineBean deltaBean : deltaBeans) {
            String sensitivities = deltaBean.getDeltaSensitivities();
            // Locale.ROOT keeps the comparison stable under locales with special casing rules.
            switch (deltaBean.getRiskClass().toUpperCase(java.util.Locale.ROOT)) {
                case "GIRR":
                    irDelta = addSensitivities(irDelta, sensitivities);
                    break;
                case "EQUITY":
                    eqDelta = addSensitivities(eqDelta, sensitivities);
                    break;
                case "FX":
                    fxDelta = addSensitivities(fxDelta, sensitivities);
                    break;
                case "COMMODITY":
                    cmDelta = addSensitivities(cmDelta, sensitivities);
                    break;
                default:
                    // Other risk classes are not aggregated.
                    break;
            }
        }
        resultBean.setCmDelta(cmDelta);
        resultBean.setEqDelta(eqDelta);
        resultBean.setFxDelta(fxDelta);
        resultBean.setIrDelta(irDelta);
        return resultBean;
    }

    /**
     * Adds one sensitivities string to a running total. A ';'-separated list is summed with
     * {@link #calDouble(String[])} first; a single value is handed to {@code MathUtil.add} as is.
     */
    private static Double addSensitivities(Double total, String sensitivities) {
        if (sensitivities.contains(";")) {
            return MathUtil.add(total, calDouble(sensitivities.split(";")));
        }
        return MathUtil.add(total, sensitivities);
    }

    /**
     * Keys every bean by its trade id.
     *
     * @param mixBean beans to key
     * @return the beans as key/value pairs keyed by {@code getTradeId()}
     */
    public static PCollection<KV<String, CombineBean>> getMixBeanKv(PCollection<CombineBean> mixBean) {
        return mixBean.apply(MapElements.via(new SimpleFunction<CombineBean, KV<String, CombineBean>>() {
            @Override
            public KV<String, CombineBean> apply(CombineBean bean) {
                return KV.of(bean.getTradeId(), bean);
            }
        }));
    }

    /**
     * Keys every bean by a composite key whose fields depend on the bean's file type.
     *
     * @param bean beans to key
     * @return the beans as key/value pairs keyed by the composite key
     */
    public static PCollection<KV<String, CombineBean>> getComplexBeanKv(PCollection<CombineBean> bean) {
        return bean.apply(MapElements.via(new SimpleFunction<CombineBean, KV<String, CombineBean>>() {
            @Override
            public KV<String, CombineBean> apply(CombineBean bean) {
                return KV.of(complexKey(bean), bean);
            }
        }));
    }

    /** Builds the file-type specific composite key of {@code bean}. */
    private static String complexKey(CombineBean bean) {
        //Curvature Vega
        if (Objects.equals(FileTypeEnum.CURVATURE, bean.getFileType()) || Objects.equals(FileTypeEnum.VEGA, bean.getFileType())) {
            return joinKey(bean.getAsOfDate(), bean.getTradeId(), bean.getRiskClass(), bean.getRiskFactor(),
                    bean.getLegId(), bean.getAdjustmentFlag());
        }
        //Delta
        if (Objects.equals(FileTypeEnum.DELTA, bean.getFileType())) {
            return joinKey(bean.getAsOfDate(), bean.getTradeId(), bean.getRiskClass(), bean.getRiskFactor(),
                    bean.getLegId(), bean.getAdjustmentFlag(), bean.getSequence());
        }
        //DRC
        if (Objects.equals(FileTypeEnum.DRC, bean.getFileType())) {
            return joinKey(bean.getAsOfDate(), bean.getTradeId(), bean.getRiskClass(), bean.getLegId(),
                    bean.getAdjustmentFlag(), bean.getObligorId(), bean.getDrcUnderlyingref());
        }
        //trade
        return joinKey(bean.getAsOfDate(), bean.getTradeId(), bean.getLegId());
    }

    /**
     * Joins key parts with {@link #KEY_SEPARATOR}. A {@code null} part is rendered as the text
     * "null", exactly as string concatenation would render it.
     */
    private static String joinKey(Object... parts) {
        StringBuilder key = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                key.append(KEY_SEPARATOR);
            }
            key.append(parts[i]);
        }
        return key.toString();
    }

    /**
     * Co-groups both keyed collections and emits every source bean whose key also occurs on the
     * target side. Target beans themselves are never emitted.
     *
     * @param targetBeanKv keyed beans that decide which keys are kept
     * @param sourceBeanKv keyed beans that are emitted for kept keys
     * @return the source beans whose key has at least one target bean
     */
    public static PCollection<CombineBean> getCombineMixedBean(PCollection<KV<String, CombineBean>> targetBeanKv, PCollection<KV<String, CombineBean>> sourceBeanKv) {
        final PCollection<KV<String, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
                .of(TARGET_TAG, targetBeanKv)
                .and(SOURCE_TAG, sourceBeanKv)
                .apply(CoGroupByKey.<String>create());
        return coGbkResultCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, CombineBean>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                CoGbkResult cgr = context.element().getValue();
                Iterable<CombineBean> targetItr = cgr.getAll(TARGET_TAG);
                Iterable<CombineBean> sourceItr = cgr.getAll(SOURCE_TAG);
                if (!targetItr.iterator().hasNext()) {
                    return;
                }
                // An empty source side simply yields no iterations; no separate check is needed.
                for (CombineBean sourceBean : sourceItr) {
                    context.output(sourceBean);
                }
            }
        }));
    }

    /**
     * Keys both collections with {@link #getComplexBeanKv(PCollection)} and joins them with
     * {@link #getCombineMixedBean(PCollection, PCollection)}.
     *
     * @param targetBean beans that decide which keys are kept
     * @param sourceBean beans that are emitted for kept keys
     * @return the source beans whose composite key has at least one target bean
     */
    public static PCollection<CombineBean> getCombineComplexBean(PCollection<CombineBean> targetBean, PCollection<CombineBean> sourceBean) {
        PCollection<KV<String, CombineBean>> targetBeanKv = getComplexBeanKv(targetBean);
        PCollection<KV<String, CombineBean>> sourceBeanKv = getComplexBeanKv(sourceBean);
        return getCombineMixedBean(targetBeanKv, sourceBeanKv);
    }

    /**
     * Keys both collections by trade id with {@link #getMixBeanKv(PCollection)} and joins them
     * with {@link #getCombineMixedBean(PCollection, PCollection)}.
     *
     * @param filterBean beans that decide which trade ids are kept
     * @param deltaBean beans that are emitted for kept trade ids
     * @return the {@code deltaBean} elements whose trade id occurs in {@code filterBean}
     */
    public static PCollection<CombineBean> getCombineBean(PCollection<CombineBean> filterBean, PCollection<CombineBean> deltaBean) {
        PCollection<KV<String, CombineBean>> filterBeanKv = getMixBeanKv(filterBean);
        PCollection<KV<String, CombineBean>> deltaBeanKv = getMixBeanKv(deltaBean);
        return getCombineMixedBean(filterBeanKv, deltaBeanKv);
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.utils.LineConvert;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
/**
 * Turns one input line into a {@link CombineBean} that carries the configured file type, the
 * value of the "TRADEID" column and the converted line as its object array.
 */
public class CombineDeltaCsvToBeanFn extends DoFn<String, CombineBean> {
    private FileTypeEnum fileTypeEnum;
    private HashMap<String, Integer> cdeIndexMap;
    private LineConvert convert;

    /**
     * @param fileTypeEnum file type stamped on every produced bean
     * @param cdeIndexMap column name to column index mapping; must contain "TRADEID"
     * @param convert converter producing the bean's object array from the raw line
     */
    public CombineDeltaCsvToBeanFn(FileTypeEnum fileTypeEnum, HashMap<String, Integer> cdeIndexMap, LineConvert convert) {
        this.fileTypeEnum = fileTypeEnum;
        this.cdeIndexMap = cdeIndexMap;
        this.convert = convert;
    }

    /**
     * Parses the current line and emits the resulting bean.
     *
     * @param context Beam process context supplying the line and receiving the bean
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        String line = context.element();
        String[] columns = SplitUtil.split(line);
        int tradeIdIndex = cdeIndexMap.get("TRADEID");
        CombineBean bean = CombineBean.builder()
                .fileType(fileTypeEnum)
                .tradeId(columns[tradeIdIndex])
                .build();
        // The converter is asked for as many entries as there are mapped columns.
        bean.setObjects(convert.convertLine2Arr(line, cdeIndexMap.size()));
        context.output(bean);
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.utils.LineConvert;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
/**
 * Turns one input line into a {@link CombineBean} that carries the configured file type, the
 * trade id, the four delta columns ("FXDELTA", "EQDELTA", "CMDELTA", "IRDELTA") and the converted
 * line as its object array.
 */
public class CombineTradeCsvToBeanFn extends DoFn<String, CombineBean> {
    private final FileTypeEnum fileTypeEnum;
    private final HashMap<String, Integer> cdeIndexMap;
    private final LineConvert convert;

    /**
     * @param fileTypeEnum file type stamped on every produced bean
     * @param cdeIndexMap column name to column index mapping; must contain "TRADEID" and the
     *     four delta column names
     * @param convert converter producing the bean's object array from the raw line
     */
    public CombineTradeCsvToBeanFn(FileTypeEnum fileTypeEnum, HashMap<String, Integer> cdeIndexMap, LineConvert convert) {
        this.fileTypeEnum = fileTypeEnum;
        this.cdeIndexMap = cdeIndexMap;
        this.convert = convert;
    }

    /**
     * Parses the current line and emits the resulting bean.
     *
     * @param context Beam process context supplying the line and receiving the bean
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        String line = context.element();
        String[] arr = SplitUtil.split(line);
        CombineBean combineBean = CombineBean.builder()
                .fileType(fileTypeEnum)
                .tradeId(column(arr, "TRADEID"))
                .fxDelta(getDeltaValue(column(arr, "FXDELTA")))
                .eqDelta(getDeltaValue(column(arr, "EQDELTA")))
                .cmDelta(getDeltaValue(column(arr, "CMDELTA")))
                .irDelta(getDeltaValue(column(arr, "IRDELTA")))
                .build();
        Object[] objects = convert.convertLine2Arr(line, cdeIndexMap.size());
        combineBean.setObjects(objects);
        context.output(combineBean);
    }

    /** Returns the value of the column registered under {@code name} in the index map. */
    private String column(String[] arr, String name) {
        return arr[cdeIndexMap.get(name)];
    }

    /**
     * Parses a delta column. The literal text "null" is read as zero; any other text is parsed
     * as a double.
     *
     * @throws NumberFormatException if {@code value} is neither "null" nor a parsable double
     */
    private Double getDeltaValue(String value) {
        if (StringUtils.equals("null", value)) {
            return 0.0d;
        }
        // Double.valueOf replaces the deprecated Double(String) constructor.
        return Double.valueOf(value);
    }
}
package com.hsbc.risk.frtbsa.transform;
import org.apache.beam.sdk.transforms.DoFn;
/** Emits the textual form of each incoming {@code Long}. */
public class CsvCountFn extends DoFn<Long, String> {

    /**
     * Converts the current element to a string and emits it.
     *
     * @param context Beam process context supplying the number and receiving its text
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        Long count = context.element();
        String text = String.valueOf(count);
        context.output(text);
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.utils.LineConvert;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Objects;
/**
 * Turns one input line into a {@link CombineBean} populated with the columns relevant to the
 * configured file type, plus the converted line as its object array. When a compare field is
 * configured, that column's value and the consistency type are stored on the bean as well.
 */
public class CsvToComplexBeanFn extends DoFn<String, CombineBean> {
    private final FileTypeEnum fileTypeEnum;
    private final HashMap<String, Integer> cdeIndexMap;
    private final LineConvert convert;
    private final String compareField;
    private final String consistencyType;

    /**
     * Creates a function without a compare field.
     *
     * @param fileTypeEnum file type stamped on every produced bean
     * @param cdeIndexMap column name to column index mapping
     * @param convert converter producing the bean's object array from the raw line
     */
    public CsvToComplexBeanFn(FileTypeEnum fileTypeEnum, HashMap<String, Integer> cdeIndexMap, LineConvert convert) {
        this(fileTypeEnum, cdeIndexMap, convert, null, null);
    }

    /**
     * Creates a function that additionally records a compare field.
     *
     * @param fileTypeEnum file type stamped on every produced bean
     * @param cdeIndexMap column name to column index mapping
     * @param convert converter producing the bean's object array from the raw line
     * @param compareField name of the column copied to the bean's compare field value; ignored
     *     when null or empty
     * @param consistencyType value copied to the bean's consistency type when a compare field
     *     is set
     */
    public CsvToComplexBeanFn(FileTypeEnum fileTypeEnum, HashMap<String, Integer> cdeIndexMap, LineConvert convert, String compareField, String consistencyType) {
        this.fileTypeEnum = fileTypeEnum;
        this.cdeIndexMap = cdeIndexMap;
        this.convert = convert;
        this.compareField = compareField;
        this.consistencyType = consistencyType;
    }

    /**
     * Parses the current line and emits the resulting bean.
     *
     * @param context Beam process context supplying the line and receiving the bean
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        String line = context.element();
        String[] arr = SplitUtil.split(line);
        CombineBean combineBean = CombineBean.builder()
                .fileType(fileTypeEnum)
                .asOfDate(arr[cdeIndexMap.get("ASOFDATE")])
                .tradeId(arr[cdeIndexMap.get("TRADEID")])
                .legId(getIfEmpty(arr[cdeIndexMap.get("LEG_ID")]))
                .build();
        // The file type is fixed per instance, so at most one of these branches applies.
        if (Objects.equals(FileTypeEnum.CURVATURE, fileTypeEnum) || Objects.equals(FileTypeEnum.VEGA, fileTypeEnum)) {
            //Curvature Vega
            combineBean.setAdjustmentFlag(getIfEmpty(arr[cdeIndexMap.get("ADJUSTMENTFLAG")]));
            combineBean.setRiskClass(arr[cdeIndexMap.get("RISKCLASS")]);
            combineBean.setRiskFactor(arr[cdeIndexMap.get("RISKFACTOR")]);
        } else if (Objects.equals(FileTypeEnum.DELTA, fileTypeEnum)) {
            //Delta
            combineBean.setRiskClass(arr[cdeIndexMap.get("RISKCLASS")]);
            combineBean.setRiskFactor(arr[cdeIndexMap.get("RISKFACTOR")]);
            combineBean.setSequence(getIfEmpty(arr[cdeIndexMap.get("SEQUENCE")]));
            combineBean.setAdjustmentFlag(getIfEmpty(arr[cdeIndexMap.get("ADJUSTMENTFLAG")]));
        } else if (Objects.equals(FileTypeEnum.DRC, fileTypeEnum)) {
            //DRC
            combineBean.setRiskClass(arr[cdeIndexMap.get("RISKCLASS")]);
            combineBean.setAdjustmentFlag(getIfEmpty(arr[cdeIndexMap.get("ADJUSTMENTFLAG")]));
            combineBean.setObligorId(getIfEmpty(arr[cdeIndexMap.get("OBLIGORID")]));
            combineBean.setDrcUnderlyingref(getIfEmpty(arr[cdeIndexMap.get("DRC_UNDERLYINGREF")]));
        }
        Object[] objects = convert.convertLine2Arr(line, cdeIndexMap.size());
        combineBean.setObjects(objects);
        //consistency need this field
        if (StringUtils.isNotEmpty(this.compareField)) {
            // Kept in a local: per-element data must not be parked on the reusable DoFn instance.
            String compareFieldValue = arr[cdeIndexMap.get(this.compareField)];
            combineBean.setCompareFieldValue(compareFieldValue);
            combineBean.setConsistencyType(this.consistencyType);
        }
        context.output(combineBean);
    }

    /** Returns {@code s}, or the empty string when {@code s} is null or empty. */
    private String getIfEmpty(String s) {
        return StringUtils.isEmpty(s) ? "" : s;
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.DoFn;
import java.util.HashMap;
/**
 * Turns one input line into a {@link CombineBean} of file type {@code DELTA} carrying the
 * "TRADEID", "DELTASENSITIVITIES", "SENSITIVITYDATES" and "RISKCLASS" columns.
 */
public class DeltaCsvToBeanFn extends DoFn<String, CombineBean> {
    private HashMap<String, Integer> deltaIndexMap;

    /**
     * @param deltaIndexMap column name to column index mapping; must contain the four column
     *     names read by this function
     */
    public DeltaCsvToBeanFn(HashMap<String, Integer> deltaIndexMap) {
        this.deltaIndexMap = deltaIndexMap;
    }

    /**
     * Parses the current line and emits the resulting bean.
     *
     * @param context Beam process context supplying the line and receiving the bean
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        String[] columns = SplitUtil.split(context.element());
        String tradeId = columns[deltaIndexMap.get("TRADEID")];
        String sensitivities = columns[deltaIndexMap.get("DELTASENSITIVITIES")];
        String sensitivityDates = columns[deltaIndexMap.get("SENSITIVITYDATES")];
        String riskClass = columns[deltaIndexMap.get("RISKCLASS")];
        context.output(CombineBean.builder()
                .fileType(FileTypeEnum.DELTA)
                .tradeId(tradeId)
                .deltaSensitivities(sensitivities)
                .deltaSensitivityDates(sensitivityDates)
                .riskClass(riskClass)
                .build());
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.google.common.collect.Lists;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.DoFn;
import java.util.HashMap;
/**
 * Appends four delta columns (fxDelta, cmDelta, irDelta, eqDelta, in that order) to each input
 * line. The values are computed by {@code CombineBeanFn.analysisBean} from the line's
 * "DELTASENSITIVITIES" and "RISKCLASS" columns.
 */
public class DeltaCsvToCsvFn extends DoFn<String, String> {
    private final HashMap<String, Integer> deltaIndexMap;

    /**
     * @param deltaIndexMap column name to column index mapping; must contain
     *     "DELTASENSITIVITIES" and "RISKCLASS"
     */
    public DeltaCsvToCsvFn(HashMap<String, Integer> deltaIndexMap) {
        this.deltaIndexMap = deltaIndexMap;
    }

    /**
     * Extends the current line with the four delta columns and emits it.
     *
     * @param context Beam process context supplying the line and receiving the extended line
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        String line = context.element();
        String[] arr = SplitUtil.split(line);
        String deltaValueStr = arr[deltaIndexMap.get("DELTASENSITIVITIES")];
        CombineBean deltaBean = CombineBean.builder()
                .deltaSensitivities(deltaValueStr)
                .riskClass(arr[deltaIndexMap.get("RISKCLASS")])
                .build();
        CombineBean analysisBean = CombineBeanFn.analysisBean(new CombineBean(), Lists.newArrayList(deltaBean));
        // The buffer never leaves this method, so the unsynchronised StringBuilder is sufficient.
        StringBuilder sb = new StringBuilder(line);
        // A delta that analysisBean left null is written as the text "null".
        sb.append(',').append(analysisBean.getFxDelta());
        sb.append(',').append(analysisBean.getCmDelta());
        sb.append(',').append(analysisBean.getIrDelta());
        sb.append(',').append(analysisBean.getEqDelta());
        context.output(sb.toString());
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.google.common.collect.Lists;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
/**
 * Compares the "afterBatch" and "raw" beans that share a key and emits only those that do not
 * agree.
 *
 * <ul>
 *   <li>If the group holds both an "afterBatch" and a "raw" bean and their compare field values
 *       are equal, nothing is emitted.</li>
 *   <li>If the values differ, the "afterBatch" bean is emitted.</li>
 *   <li>If the group lacks either side (including a group of one), every bean of the group is
 *       emitted unchanged, as there is nothing to compare it with.</li>
 * </ul>
 */
@Slf4j
public class FilterConsistencyBeanFn extends DoFn<KV<String, Iterable<CombineBean>>, CombineBean> {

    /**
     * Evaluates one key group.
     *
     * @param context Beam process context supplying the grouped beans and receiving the
     *     inconsistent ones
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        Iterable<CombineBean> valueItr = context.element().getValue();
        ArrayList<CombineBean> beans = Lists.newArrayList(valueItr);
        CombineBean afterBean = null;
        CombineBean rawBean = null;
        if (beans.size() > 1) {
            // When a type occurs more than once, the last occurrence wins.
            for (CombineBean bean : beans) {
                if (StringUtils.equals("afterBatch", bean.getConsistencyType())) {
                    afterBean = bean;
                } else if (StringUtils.equals("raw", bean.getConsistencyType())) {
                    rawBean = bean;
                }
            }
        }
        if (afterBean == null || rawBean == null) {
            //can not find match
            // Covers the single-bean group and groups missing one side; the latter used to
            // fail with a NullPointerException.
            for (CombineBean bean : beans) {
                context.output(bean);
            }
            return;
        }
        // Matching values are dropped by emitting nothing; a null element is never output.
        if (!StringUtils.equals(afterBean.getCompareFieldValue(), rawBean.getCompareFieldValue())) {
            context.output(afterBean);
        }
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.google.common.collect.Lists;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import java.util.ArrayList;
import java.util.Objects;
/**
 * From each key group holding more than one bean, emits the beans whose file type equals the
 * configured target file type. A group with a single bean emits nothing.
 */
public class FilterMixComplexBeanFn extends DoFn<KV<String, Iterable<CombineBean>>, CombineBean> {
    private final FileTypeEnum targetFileType;

    /**
     * @param targetFileType file type of the beans to keep
     */
    public FilterMixComplexBeanFn(FileTypeEnum targetFileType) {
        this.targetFileType = targetFileType;
    }

    /**
     * Evaluates one key group.
     *
     * @param context Beam process context supplying the grouped beans and receiving the kept
     *     ones
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        Iterable<CombineBean> valueItr = context.element().getValue();
        ArrayList<CombineBean> beans = Lists.newArrayList(valueItr);
        if (beans.size() == 1) {
            // A lone bean has no counterpart: drop it by emitting nothing rather than a null
            // element.
            return;
        }
        for (CombineBean bean : beans) {
            if (Objects.equals(targetFileType, bean.getFileType())) {
                context.output(bean);
            }
        }
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.google.common.collect.Lists;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
 * Within each key group, applies {@code CombineBeanFn.analysisBean} to every bean that is not of
 * file type {@code DELTA}, passing the group's {@code DELTA} beans, and emits the result. A
 * group consisting of a single bean emits that bean untouched.
 */
public class FilterMixDeltaFn extends DoFn<KV<String, Iterable<CombineBean>>, CombineBean> {

    /**
     * Evaluates one key group.
     *
     * @param context Beam process context supplying the grouped beans and receiving the
     *     processed ones
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        ArrayList<CombineBean> grouped = Lists.newArrayList(context.element().getValue());
        if (grouped.size() == 1) {
            context.output(grouped.get(0));
            return;
        }
        List<CombineBean> targets = Lists.newArrayList();
        List<CombineBean> deltas = Lists.newArrayList();
        // Split the group into DELTA beans and everything else, keeping encounter order.
        for (CombineBean candidate : grouped) {
            boolean isDelta = Objects.equals(FileTypeEnum.DELTA, candidate.getFileType());
            (isDelta ? deltas : targets).add(candidate);
        }
        for (CombineBean target : targets) {
            CombineBean analysed = CombineBeanFn.analysisBean(target, deltas);
            context.output(analysed);
        }
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.google.common.collect.Lists;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
 * Within each key group, copies book, legal entity and source system from the group's
 * {@code TRADE_ATTRIBUTE} bean onto every other bean and emits those other beans. A group
 * consisting of a single bean emits that bean untouched.
 */
@Slf4j
public class FilterMixTradeFn extends DoFn<KV<String, Iterable<CombineBean>>, CombineBean> {

    /**
     * Evaluates one key group.
     *
     * @param context Beam process context supplying the grouped beans and receiving the
     *     enriched ones
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        ArrayList<CombineBean> grouped = Lists.newArrayList(context.element().getValue());
        if (grouped.size() == 1) {
            context.output(grouped.get(0));
            return;
        }
        // Starts as an empty bean, which is what gets copied from if the group has no
        // TRADE_ATTRIBUTE bean.
        CombineBean trade = new CombineBean();
        List<CombineBean> nonTrade = Lists.newArrayList();
        //because the trade id will not be repeated in the trade file,
        //there will only be one associated trade
        for (CombineBean candidate : grouped) {
            if (!Objects.equals(FileTypeEnum.TRADE_ATTRIBUTE, candidate.getFileType())) {
                nonTrade.add(candidate);
                continue;
            }
            trade = candidate;
        }
        for (CombineBean enriched : nonTrade) {
            enriched.setBook(trade.getBook());
            enriched.setLegalEntity(trade.getLegalEntity());
            enriched.setSourceSystem(trade.getSourceSystem());
            context.output(enriched);
        }
    }
}
package com.hsbc.risk.frtbsa.transform;
import com.hsbc.risk.frtbsa.domain.bean.CombineBean;
import com.hsbc.risk.frtbsa.domain.enumType.FileTypeEnum;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.DoFn;
import java.util.HashMap;
/**
 * Turns one input line into a {@link CombineBean} of file type {@code TRADE_ATTRIBUTE} carrying
 * the "TRADEID", "BOOK", "LEGAL_ENTITY" and "SOURCE_SYSTEM" columns.
 */
public class TradeCsvToBeanFn extends DoFn<String, CombineBean> {
    private HashMap<String, Integer> tradeIndexMap;

    /**
     * @param tradeIndexMap column name to column index mapping; must contain the four column
     *     names read by this function
     */
    public TradeCsvToBeanFn(HashMap<String, Integer> tradeIndexMap) {
        this.tradeIndexMap = tradeIndexMap;
    }

    /**
     * Parses the current line and emits the resulting bean.
     *
     * @param context Beam process context supplying the line and receiving the bean
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        String[] columns = SplitUtil.split(context.element());
        String tradeId = columns[tradeIndexMap.get("TRADEID")];
        String book = columns[tradeIndexMap.get("BOOK")];
        String legalEntity = columns[tradeIndexMap.get("LEGAL_ENTITY")];
        String sourceSystem = columns[tradeIndexMap.get("SOURCE_SYSTEM")];
        context.output(CombineBean.builder()
                .fileType(FileTypeEnum.TRADE_ATTRIBUTE)
                .tradeId(tradeId)
                .book(book)
                .legalEntity(legalEntity)
                .sourceSystem(sourceSystem)
                .build());
    }
}
这篇关于 transform 的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!