本文主要介绍 parse 模块(基于 Apache Beam 的 CSV 过滤解析器)的实现代码,对大家解决编程问题具有一定的参考价值,需要的程序员们可以随小编一起学习!
package com.hsbc.risk.frtbsa.parse.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.beam.sdk.values.PCollection;
import java.io.Serializable;
/**
 * Describes one filter rule that is applied to CSV lines.
 *
 * <p>Getters, setters, equals/hashCode/toString, both constructors and a builder are
 * generated by the Lombok annotations below. The class is {@link Serializable}; the parser
 * classes reference instances of it from inside Beam filter lambdas.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ParseParam implements Serializable {
// Name of the CSV column under test; parsers resolve it to a column position via their index map.
private String field;
// Operation to apply. NOTE(review): presumably one of the OperateEnum names/symbols - confirm against the caller.
private String operate;
// Either a literal value to compare against, or the name of another CSV column, depending on expectedFrom.
private String expectedValue;
// Where expectedValue comes from; parsers compare this with the ValueFromEnum constant names (DB_FIELD / CSV_FIELD).
private String expectedFrom;
// Type of the expected value; parsers compare this with the ValueTypeEnum constant names (STRING).
private String expectedValueType;
// NOTE(review): looks like the position of this rule within a sequence of rules - not used in the visible code, confirm.
private int parseOrder;
// NOTE(review): the three "combine" fields are not read anywhere in the visible code; their meaning
// (path / order / type of a file to combine with) is inferred from the names only - confirm.
private String combineFilePath;
private String combineParseOrder;
private String combineFileType;
}
package com.hsbc.risk.frtbsa.parse.enumType;
import org.apache.commons.lang3.StringUtils;
import java.util.Objects;
/**
 * Filter operations that a parse rule can request.
 *
 * <p>Each constant carries the textual token ({@link #getOperate()}) by which the operation
 * is identified, e.g. {@code "<>"} for {@link #NOT_EQUAL}.
 */
public enum OperateEnum {
    NOT_EQUAL("<>"),
    EQUAL("="),
    IS_NUMERIC("is_numeric"),
    IS_NOT_NUMERIC("is_not_numeric"),
    LENGTH_EQUAL("length_equal"),
    LENGTH_NOT_EQUAL("length_not_equal"),
    IN_RANGE("in_range"),
    DATE_GREATER_THAN("date_greater_than"),
    DATE_LESS_THAN("date_less_than");

    /** Textual token of the operation; immutable so enum constants cannot be altered at runtime. */
    private final String operate;

    OperateEnum(String operate) {
        this.operate = operate;
    }

    /**
     * @return the textual token of this operation (for example {@code "="})
     */
    public String getOperate() {
        return operate;
    }

    /**
     * Looks up a constant by its exact, case-sensitive enum name.
     *
     * @param enumName constant name such as {@code "EQUAL"}; may be {@code null}
     * @return the matching constant, or {@code null} when the name is {@code null}, empty or unknown
     */
    public static OperateEnum getOperateTypeByEnumName(String enumName) {
        if (enumName == null || enumName.isEmpty()) {
            return null;
        }
        for (OperateEnum candidate : values()) {
            if (candidate.name().equals(enumName)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Looks up a constant by its exact, case-sensitive operation token.
     *
     * @param operate token such as {@code "<>"} or {@code "is_numeric"}; may be {@code null}
     * @return the matching constant, or {@code null} when the token is {@code null}, empty or unknown
     */
    public static OperateEnum getOperateType(String operate) {
        if (operate == null || operate.isEmpty()) {
            return null;
        }
        for (OperateEnum candidate : values()) {
            if (candidate.operate.equals(operate)) {
                return candidate;
            }
        }
        return null;
    }
}
package com.hsbc.risk.frtbsa.parse.enumType;
/**
 * Origin of a rule's expected value. The parser classes compare the constant names
 * with {@code ParseParam.getExpectedFrom()}.
 */
public enum ValueFromEnum {
// The expected value names another column of the same CSV line (parsers look it up in the index map).
CSV_FIELD,
// The expected value is used as a literal taken from the rule itself.
// NOTE(review): the name suggests the rule is stored in a database - confirm.
DB_FIELD;
}
package com.hsbc.risk.frtbsa.parse.enumType;
/**
 * Supported types of a rule's expected value. The parser classes compare the constant
 * names with {@code ParseParam.getExpectedValueType()}.
 */
public enum ValueTypeEnum {
// The only supported type: the expected value is handled as text.
STRING;
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.enumType.ValueFromEnum;
import com.hsbc.risk.frtbsa.parse.enumType.ValueTypeEnum;
import com.hsbc.risk.frtbsa.utils.DateFormatUtil;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
/**
 * Filters CSV lines by comparing a date column against an expected date using
 * {@code DateFormatUtil.dateGreaterThan(actual, expected, YMD_BAR)}.
 *
 * <p>NOTE(review): judging by its name the helper is true when the first date is later
 * than the second - confirm against DateFormatUtil.
 */
public class DateGreaterThanParser implements IParser<String> {

    /**
     * Builds the filtered collection for one rule.
     *
     * @param csv      CSV lines to filter
     * @param param    rule; {@code field} names the column under test, {@code expectedValue} is a
     *                 literal date (DB_FIELD) or the name of another column (CSV_FIELD)
     * @param indexMap column name to position within a split line
     * @return the filtered lines, or {@code null} when the value type is not STRING or the
     *         value source is neither DB_FIELD nor CSV_FIELD
     */
    @Override
    public PCollection<String> filterCsv(PCollection<String> csv, ParseParam param, HashMap<String, Integer> indexMap) {
        if (!ValueTypeEnum.STRING.name().equals(param.getExpectedValueType())) {
            return null;
        }

        PCollection<String> result = null;
        if (ValueFromEnum.DB_FIELD.name().equals(param.getExpectedFrom())) {
            // Expected date is the literal stored in the rule.
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual)
                        && DateFormatUtil.dateGreaterThan(actual, param.getExpectedValue(), DateFormatUtil.YMD_BAR);
            }));
        }
        if (ValueFromEnum.CSV_FIELD.name().equals(param.getExpectedFrom())) {
            // Expected date is read from another column of the same line; that column is
            // only looked up when the column under test is non-empty.
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual)
                        && DateFormatUtil.dateGreaterThan(
                                actual, cells[indexMap.get(param.getExpectedValue())], DateFormatUtil.YMD_BAR);
            }));
        }
        return result;
    }
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.enumType.ValueFromEnum;
import com.hsbc.risk.frtbsa.parse.enumType.ValueTypeEnum;
import com.hsbc.risk.frtbsa.utils.DateFormatUtil;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
/**
 * Filters CSV lines by comparing a date column against an expected date. The comparison
 * calls {@code DateFormatUtil.dateGreaterThan(expected, actual, YMD_BAR)}, i.e. the
 * greater-than helper with the operands swapped.
 *
 * <p>NOTE(review): judging by its name the helper is true when the first date is later
 * than the second, which makes this a "column is earlier than expected" test - confirm.
 */
public class DateLessThanParser implements IParser<String> {

    /**
     * Builds the filtered collection for one rule.
     *
     * @param csv      CSV lines to filter
     * @param param    rule; {@code field} names the column under test, {@code expectedValue} is a
     *                 literal date (DB_FIELD) or the name of another column (CSV_FIELD)
     * @param indexMap column name to position within a split line
     * @return the filtered lines, or {@code null} when the value type is not STRING or the
     *         value source is neither DB_FIELD nor CSV_FIELD
     */
    @Override
    public PCollection<String> filterCsv(PCollection<String> csv, ParseParam param, HashMap<String, Integer> indexMap) {
        if (!ValueTypeEnum.STRING.name().equals(param.getExpectedValueType())) {
            return null;
        }

        PCollection<String> result = null;
        if (ValueFromEnum.DB_FIELD.name().equals(param.getExpectedFrom())) {
            // Expected date is the literal stored in the rule.
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual)
                        && DateFormatUtil.dateGreaterThan(param.getExpectedValue(), actual, DateFormatUtil.YMD_BAR);
            }));
        }
        if (ValueFromEnum.CSV_FIELD.name().equals(param.getExpectedFrom())) {
            // Expected date is read from another column of the same line; that column is
            // only looked up when the column under test is non-empty.
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual)
                        && DateFormatUtil.dateGreaterThan(
                                cells[indexMap.get(param.getExpectedValue())], actual, DateFormatUtil.YMD_BAR);
            }));
        }
        return result;
    }
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.enumType.ValueFromEnum;
import com.hsbc.risk.frtbsa.parse.enumType.ValueTypeEnum;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
/**
 * Keeps CSV lines whose configured column equals an expected value.
 *
 * <p>With a DB_FIELD source the expected value is the literal stored in the rule; the literal
 * {@code "NULL"} (any case) means "column is empty". With a CSV_FIELD source the expected value
 * is read from another column of the same line.
 */
public class EqualParser implements IParser<String> {

    /**
     * Builds the filtered collection for one rule.
     *
     * @param csv      CSV lines to filter
     * @param param    rule; {@code field} names the column under test
     * @param indexMap column name to position within a split line
     * @return the filtered lines, or {@code null} when the value type is not STRING or the
     *         value source is neither DB_FIELD nor CSV_FIELD
     */
    @Override
    public PCollection<String> filterCsv(PCollection<String> csv, ParseParam param, HashMap<String, Integer> indexMap) {
        if (!ValueTypeEnum.STRING.name().equals(param.getExpectedValueType())) {
            return null;
        }

        PCollection<String> result = null;
        if (ValueFromEnum.DB_FIELD.name().equals(param.getExpectedFrom())) {
            if (StringUtils.equalsIgnoreCase("NULL", param.getExpectedValue())) {
                // "= NULL" selects lines whose column is empty.
                result = csv.apply(Filter.by(line -> {
                    String[] cells = SplitUtil.split(line);
                    return StringUtils.isEmpty(cells[indexMap.get(param.getField())]);
                }));
            } else {
                result = csv.apply(Filter.by(line -> {
                    String[] cells = SplitUtil.split(line);
                    String actual = cells[indexMap.get(param.getField())];
                    return StringUtils.isNotEmpty(actual)
                            && StringUtils.equals(actual, param.getExpectedValue());
                }));
            }
        }
        if (ValueFromEnum.CSV_FIELD.name().equals(param.getExpectedFrom())) {
            // The comparison column is only looked up when the column under test is non-empty.
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual)
                        && StringUtils.equals(actual, cells[indexMap.get(param.getExpectedValue())]);
            }));
        }
        return result;
    }
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.enumType.ValueFromEnum;
import com.hsbc.risk.frtbsa.parse.enumType.ValueTypeEnum;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.util.HashMap;
/**
 * Keeps CSV lines whose configured column equals at least one value of a candidate list.
 *
 * <p>With a DB_FIELD source the candidate list is obtained once by splitting the rule's expected
 * value; with a CSV_FIELD source it is obtained per line by splitting the content of another
 * column of that line.
 */
public class InParser implements IParser<String>, Serializable {

    /**
     * Builds the filtered collection for one rule.
     *
     * @param csv      CSV lines to filter
     * @param param    rule; {@code field} names the column under test
     * @param indexMap column name to position within a split line
     * @return the filtered lines, or {@code null} when the value type is not STRING or the
     *         value source is neither DB_FIELD nor CSV_FIELD
     */
    @Override
    public PCollection<String> filterCsv(PCollection<String> csv, ParseParam param, HashMap<String, Integer> indexMap) {
        if (!ValueTypeEnum.STRING.name().equals(param.getExpectedValueType())) {
            return null;
        }

        if (ValueFromEnum.DB_FIELD.name().equals(param.getExpectedFrom())) {
            // The candidate list is fixed for the whole rule, so split it once up front.
            String[] candidates = SplitUtil.split(param.getExpectedValue());
            return csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual) && containsValue(candidates, actual);
            }));
        }
        if (ValueFromEnum.CSV_FIELD.name().equals(param.getExpectedFrom())) {
            return csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String[] candidates = SplitUtil.split(cells[indexMap.get(param.getExpectedValue())]);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual) && containsValue(candidates, actual);
            }));
        }
        return null;
    }

    /**
     * Tells whether {@code actual} equals any element of {@code candidates}.
     *
     * <p>Iterative and static on purpose: no recursion depth proportional to the list size, no
     * scratch array per line, and the filter lambdas do not need to capture the parser instance.
     * An empty or {@code null} candidate list simply matches nothing.
     */
    private static boolean containsValue(String[] candidates, String actual) {
        if (candidates == null) {
            return false;
        }
        for (String candidate : candidates) {
            if (StringUtils.equals(candidate, actual)) {
                return true;
            }
        }
        return false;
    }
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import org.apache.beam.sdk.values.PCollection;
import java.util.HashMap;
/**
 * A single filter step over a Beam collection of CSV records.
 *
 * @param <T> element type of the collection being filtered
 */
public interface IParser<T> {
/**
 * Applies the rule described by {@code param} to {@code csv}.
 *
 * @param csv      records to filter
 * @param param    rule to apply (column under test, expected value and where it comes from)
 * @param indexMap column name to column position within a record
 * @return the filtered collection; several implementations return {@code null} when the rule's
 *         value type or value source is not one they support
 */
PCollection<T> filterCsv(PCollection<T> csv, ParseParam param, HashMap<String, Integer> indexMap);
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.HashMap;
/**
 * Keeps CSV lines whose configured column is non-empty and is not accepted by
 * {@code NumberUtils.isCreatable}.
 */
public class IsNotNumericParser implements IParser<String> {

    /**
     * Builds the filtered collection for one rule.
     *
     * @param csv      CSV lines to filter
     * @param param    rule; only {@code field} (the column under test) is read
     * @param indexMap column name to position within a split line
     * @return the lines whose column is non-empty and not a creatable number
     */
    @Override
    public PCollection<String> filterCsv(PCollection<String> csv, ParseParam param, HashMap<String, Integer> indexMap) {
        return csv.apply(Filter.by(line -> {
            String cell = SplitUtil.split(line)[indexMap.get(param.getField())];
            return StringUtils.isNotEmpty(cell) && !NumberUtils.isCreatable(cell);
        }));
    }
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.HashMap;
/**
 * Keeps CSV lines whose configured column is non-empty and is accepted by
 * {@code NumberUtils.isCreatable}.
 */
public class IsNumericParser implements IParser<String> {

    /**
     * Builds the filtered collection for one rule.
     *
     * @param csv      CSV lines to filter
     * @param param    rule; only {@code field} (the column under test) is read
     * @param indexMap column name to position within a split line
     * @return the lines whose column is non-empty and a creatable number
     */
    @Override
    public PCollection<String> filterCsv(PCollection<String> csv, ParseParam param, HashMap<String, Integer> indexMap) {
        return csv.apply(Filter.by(line -> {
            String cell = SplitUtil.split(line)[indexMap.get(param.getField())];
            return StringUtils.isNotEmpty(cell) && NumberUtils.isCreatable(cell);
        }));
    }
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.enumType.ValueFromEnum;
import com.hsbc.risk.frtbsa.parse.enumType.ValueTypeEnum;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
/**
 * Keeps CSV lines whose configured column is non-empty and whose {@code String.length()}
 * equals an expected length.
 *
 * <p>With a DB_FIELD source the expected length is the rule's expected value parsed as an
 * integer; with a CSV_FIELD source it is parsed per line from another column.
 */
public class LengthEqualParser implements IParser<String> {

    /**
     * Builds the filtered collection for one rule.
     *
     * @param csv      CSV lines to filter
     * @param param    rule; {@code field} names the column under test
     * @param indexMap column name to position within a split line
     * @return the filtered lines, or {@code null} when the value type is not STRING or the
     *         value source is neither DB_FIELD nor CSV_FIELD
     * @throws NumberFormatException for a DB_FIELD rule whose expected value is not an integer
     */
    @Override
    public PCollection<String> filterCsv(PCollection<String> csv, ParseParam param, HashMap<String, Integer> indexMap) {
        if (!ValueTypeEnum.STRING.name().equals(param.getExpectedValueType())) {
            return null;
        }

        PCollection<String> result = null;
        if (ValueFromEnum.DB_FIELD.name().equals(param.getExpectedFrom())) {
            // Parsed once while the pipeline is built; only the int is captured by the lambda.
            int expectedLength = Integer.parseInt(param.getExpectedValue());
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual) && actual.length() == expectedLength;
            }));
        }
        if (ValueFromEnum.CSV_FIELD.name().equals(param.getExpectedFrom())) {
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                // Parsed for every line, before the emptiness check, so a non-integer
                // length column fails the element.
                int expectedLength = Integer.parseInt(cells[indexMap.get(param.getExpectedValue())]);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual) && actual.length() == expectedLength;
            }));
        }
        return result;
    }
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.enumType.ValueFromEnum;
import com.hsbc.risk.frtbsa.parse.enumType.ValueTypeEnum;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
/**
 * Keeps CSV lines whose configured column is non-empty and whose {@code String.length()}
 * differs from an expected length.
 *
 * <p>With a DB_FIELD source the expected length is the rule's expected value parsed as an
 * integer; with a CSV_FIELD source it is parsed per line from another column.
 */
public class LengthNotEqualParser implements IParser<String> {

    /**
     * Builds the filtered collection for one rule.
     *
     * @param csv      CSV lines to filter
     * @param param    rule; {@code field} names the column under test
     * @param indexMap column name to position within a split line
     * @return the filtered lines, or {@code null} when the value type is not STRING or the
     *         value source is neither DB_FIELD nor CSV_FIELD
     * @throws NumberFormatException for a DB_FIELD rule whose expected value is not an integer
     */
    @Override
    public PCollection<String> filterCsv(PCollection<String> csv, ParseParam param, HashMap<String, Integer> indexMap) {
        if (!ValueTypeEnum.STRING.name().equals(param.getExpectedValueType())) {
            return null;
        }

        PCollection<String> result = null;
        if (ValueFromEnum.DB_FIELD.name().equals(param.getExpectedFrom())) {
            // Parsed once while the pipeline is built; only the int is captured by the lambda.
            int expectedLength = Integer.parseInt(param.getExpectedValue());
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual) && actual.length() != expectedLength;
            }));
        }
        if (ValueFromEnum.CSV_FIELD.name().equals(param.getExpectedFrom())) {
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                // Parsed for every line, before the emptiness check, so a non-integer
                // length column fails the element.
                int expectedLength = Integer.parseInt(cells[indexMap.get(param.getExpectedValue())]);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual) && actual.length() != expectedLength;
            }));
        }
        return result;
    }
}
package com.hsbc.risk.frtbsa.parse.parser;
import com.hsbc.risk.frtbsa.parse.dto.ParseParam;
import com.hsbc.risk.frtbsa.parse.enumType.ValueFromEnum;
import com.hsbc.risk.frtbsa.parse.enumType.ValueTypeEnum;
import com.hsbc.risk.frtbsa.utils.SplitUtil;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
/**
 * Keeps CSV lines whose configured column differs from an expected value.
 *
 * <p>With a DB_FIELD source the expected value is the literal stored in the rule; the literal
 * {@code "NULL"} (any case) means "column is not empty". With a CSV_FIELD source the expected
 * value is read from another column of the same line. In every case lines whose column under
 * test is empty are dropped.
 */
public class NotEqualParser implements IParser<String> {

    /**
     * Builds the filtered collection for one rule.
     *
     * @param csv      CSV lines to filter
     * @param param    rule; {@code field} names the column under test
     * @param indexMap column name to position within a split line
     * @return the filtered lines, or {@code null} when the value type is not STRING or the
     *         value source is neither DB_FIELD nor CSV_FIELD
     */
    @Override
    public PCollection<String> filterCsv(PCollection<String> csv, ParseParam param, HashMap<String, Integer> indexMap) {
        if (!ValueTypeEnum.STRING.name().equals(param.getExpectedValueType())) {
            return null;
        }

        PCollection<String> result = null;
        if (ValueFromEnum.DB_FIELD.name().equals(param.getExpectedFrom())) {
            if (StringUtils.equalsIgnoreCase("NULL", param.getExpectedValue())) {
                // "<> NULL" selects lines whose column has content.
                result = csv.apply(Filter.by(line -> {
                    String[] cells = SplitUtil.split(line);
                    return StringUtils.isNotEmpty(cells[indexMap.get(param.getField())]);
                }));
            } else {
                result = csv.apply(Filter.by(line -> {
                    String[] cells = SplitUtil.split(line);
                    String actual = cells[indexMap.get(param.getField())];
                    return StringUtils.isNotEmpty(actual)
                            && !StringUtils.equals(actual, param.getExpectedValue());
                }));
            }
        }
        if (ValueFromEnum.CSV_FIELD.name().equals(param.getExpectedFrom())) {
            // The comparison must be negated here as well; without the negation this branch
            // would select the lines that ARE equal, i.e. behave like an equality filter.
            result = csv.apply(Filter.by(line -> {
                String[] cells = SplitUtil.split(line);
                String actual = cells[indexMap.get(param.getField())];
                return StringUtils.isNotEmpty(actual)
                        && !StringUtils.equals(actual, cells[indexMap.get(param.getExpectedValue())]);
            }));
        }
        return result;
    }
}
package com.hsbc.risk.frtbsa.parse.transform;
import com.hsbc.risk.frtbsa.domain.bean.BaseBean;
import org.apache.beam.sdk.transforms.DoFn;
/**
 * Turns a {@code BaseBean} back into a comma-separated line built from the leading elements of
 * its object array.
 */
public class BaseBeanToCsvFn extends DoFn<BaseBean, String> {

    /**
     * Number of trailing elements of the bean's object array that are left out of the line.
     * NOTE(review): presumably these are bookkeeping columns appended during processing - confirm.
     */
    private static final int TRAILING_FIELDS_EXCLUDED = 7;

    /**
     * Emits one line per bean: elements {@code 0 .. length - 8} joined by {@code ","}. Elements are
     * appended via their string form, so a {@code null} element is written as {@code "null"}. A bean
     * with seven or fewer elements yields an empty line.
     *
     * @param context Beam process context supplying the bean and receiving the line
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        Object[] objects = context.element().getObjects();
        int columnCount = objects.length - TRAILING_FIELDS_EXCLUDED;

        // The buffer never leaves this method, so the unsynchronized builder is sufficient.
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < columnCount; i++) {
            if (i > 0) {
                line.append(',');
            }
            line.append(objects[i]);
        }
        context.output(line.toString());
    }
}
package com.hsbc.risk.frtbsa.parse.transform;
import com.hsbc.risk.frtbsa.domain.bean.BaseBean;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
 * Helper for keying {@code BaseBean} collections.
 */
public class CombineBaseBeanFn {

    /**
     * Pairs every bean with its UUID.
     *
     * @param pBean beans to key
     * @return a collection of {@code KV(bean.getUuid(), bean)} entries
     */
    public static PCollection<KV<String, BaseBean>> getKvBean(PCollection<BaseBean> pBean) {
        SimpleFunction<BaseBean, KV<String, BaseBean>> keyByUuid =
                new SimpleFunction<BaseBean, KV<String, BaseBean>>() {
                    @Override
                    public KV<String, BaseBean> apply(BaseBean bean) {
                        String key = bean.getUuid();
                        return KV.of(key, bean);
                    }
                };
        return pBean.apply(MapElements.via(keyByUuid));
    }
}
package com.hsbc.risk.frtbsa.parse.transform;
import com.hsbc.risk.frtbsa.domain.bean.BaseBean;
import com.hsbc.risk.frtbsa.utils.LineConvert;
import org.apache.beam.sdk.transforms.DoFn;
import java.util.HashMap;
import java.util.UUID;
/**
 * Converts each CSV line into a {@code BaseBean} carrying the converted values and a freshly
 * generated random UUID.
 */
public class CsvToBaseBeanFn extends DoFn<String, BaseBean> {

    /** Column index map; only its size is used, as the count passed to the converter. */
    private HashMap<String, Integer> cdeIndexMap;

    /** Converter that turns a line into an object array. */
    private LineConvert convert;

    /**
     * @param convert     converter used for every line
     * @param cdeIndexMap column index map whose size is handed to the converter
     */
    public CsvToBaseBeanFn(LineConvert convert, HashMap<String, Integer> cdeIndexMap) {
        this.cdeIndexMap = cdeIndexMap;
        this.convert = convert;
    }

    /**
     * Converts the current line and emits the resulting bean.
     *
     * @param context Beam process context supplying the line and receiving the bean
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        String line = context.element();
        int columnCount = cdeIndexMap.size();
        Object[] values = convert.convertLine2Arr(line, columnCount);

        BaseBean result = new BaseBean();
        result.setObjects(values);
        result.setUuid(UUID.randomUUID().toString());
        context.output(result);
    }
}
package com.hsbc.risk.frtbsa.parse.transform;
import com.hsbc.risk.frtbsa.domain.bean.BaseBean;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
/**
 * Flattens a co-group-by-key result back into individual beans by emitting every bean found
 * under the tags {@code "kv0"}, {@code "kv1"}, ... {@code "kv" + (kvLength - 1)}.
 */
public class KVToBaseBeanFn extends DoFn<KV<String, CoGbkResult>, BaseBean> {

    /** Number of tagged inputs to read from each result. */
    private int kvLength;

    /**
     * @param kvLength number of {@code "kv<i>"} tags to read from each grouped result
     */
    public KVToBaseBeanFn(int kvLength) {
        this.kvLength = kvLength;
    }

    /**
     * Emits all beans of the current grouped result, tag by tag in ascending tag number.
     *
     * @param context Beam process context supplying the grouped entry and receiving the beans
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
        CoGbkResult grouped = context.element().getValue();
        for (int tagNo = 0; tagNo < kvLength; tagNo++) {
            Iterable<BaseBean> beans = grouped.getAll("kv" + tagNo);
            if (!beans.iterator().hasNext()) {
                continue;
            }
            beans.forEach(context::output);
        }
    }
}
关于 parse 的介绍就到这里,希望我们推荐的这篇文章对大家有所帮助,也希望大家多多支持为之网!