上篇文章我们介绍了[利用管道模式来进行业务编排]的2种实现方式,本文再来介绍其他的实现方式。
1、新建管道实体
/**
 * One configured pipeline: the consumer name it is registered under plus the handler
 * classes it is built from.
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class PipelineDefinition {

    /** Prefix prepended to {@link #comsumePipelineName} to form the pipeline's bean name. */
    public static final String PREFIX = "lybgeek_pipeline_";

    /** Consumer name of this pipeline; combined with {@link #PREFIX} when the bean is registered. */
    private String comsumePipelineName;

    /** Fully-qualified handler class names, in the order they are configured. */
    private List<String> pipelineClassNames;
}
/**
 * Configuration properties bound from the {@code lybgeek.pipeline} prefix.
 */
@ConfigurationProperties(prefix = PipelineDefinitionProperties.PREFIX)
@NoArgsConstructor
@AllArgsConstructor
@Data
public class PipelineDefinitionProperties {

    /** Property prefix under which the pipeline configuration lives. */
    public static final String PREFIX = "lybgeek.pipeline";

    /** Pipelines declared under {@code lybgeek.pipeline.chain}. */
    private List<PipelineDefinition> chain;
}
2、编写自动装配类
@Configuration @EnableConfigurationProperties(PipelineDefinitionProperties.class) public class PipelineAutoConfiguration implements BeanFactoryAware,InitializingBean, SmartInitializingSingleton { @Autowired private PipelineDefinitionProperties pipelineDefinitionProperties; private DefaultListableBeanFactory defaultListableBeanFactory; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { defaultListableBeanFactory = (DefaultListableBeanFactory)beanFactory; } private void registerPipeline(DefaultListableBeanFactory defaultListableBeanFactory, PipelineDefinition pipelineDefinition) { LinkedBlockingDeque linkedBlockingDeque = buildPipelineQuque(pipelineDefinition); GenericBeanDefinition beanDefinition = (GenericBeanDefinition) BeanDefinitionBuilder.genericBeanDefinition(ChannelPipeline.class).getBeanDefinition(); beanDefinition.getPropertyValues().addPropertyValue("channelHandlers",linkedBlockingDeque); defaultListableBeanFactory.registerBeanDefinition(PipelineDefinition.PREFIX + pipelineDefinition.getComsumePipelineName() ,beanDefinition); } @SneakyThrows private LinkedBlockingDeque buildPipelineQuque(PipelineDefinition pipelineDefinition) { List<String> pipelineClassNames = pipelineDefinition.getPipelineClassNames(); if(CollectionUtil.isEmpty(pipelineClassNames)){ throw new PipelineException("pipelineClassNames must config"); } LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque(); for (String pipelineClassName : pipelineClassNames) { Class<?> pipelineClassClass = Class.forName(pipelineClassName); if(!AbstactChannelHandler.class.isAssignableFrom(pipelineClassClass)){ throw new PipelineException("pipelineClassNames must be 【com.github.lybgeek.pipeline.handler.AbstactChannelHandler】 subclass"); } Object pipeline = pipelineClassClass.getDeclaredConstructor().newInstance(); linkedBlockingDeque.addLast(pipeline); } return linkedBlockingDeque; } @Override public void afterPropertiesSet() throws Exception { 
if(CollectionUtil.isNotEmpty(pipelineDefinitionProperties.getChain())){ for (PipelineDefinition pipelineDefinition : pipelineDefinitionProperties.getChain()) { registerPipeline(defaultListableBeanFactory, pipelineDefinition); } } } @Override public void afterSingletonsInstantiated() { Map<String, ChannelPipeline> pipelineBeanMap = defaultListableBeanFactory.getBeansOfType(ChannelPipeline.class); pipelineBeanMap.forEach((key,bean)->{ bean.setHandlerContext(ChannelHandlerContext.getCurrentContext()); }); } }
3、编写spring.factories
# Registers the pipeline auto-configuration class with Spring Boot.
# The last value must not end with a backslash: a trailing "\" continues onto the next line.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.lybgeek.pipeline.spring.autoconfigure.PipelineAutoConfiguration
示例:
1、创建管道执行器
@Slf4j public class UserCheckChannelHandler extends AbstactChannelHandler { @Override public boolean handler(ChannelHandlerContext chx) { ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest(); System.out.println("yml------------------------------------步骤一:用户数据校验【"+channelHandlerRequest.getRequestId()+"】"); Object params = channelHandlerRequest.getParams(); if(params instanceof User){ User user = (User)params; if(StringUtils.isBlank(user.getFullname())){ log.error("用户名不能为空"); return false; } return true; } return false; } }
/**
 * Step two of the yml-configured user pipeline: derives the username from the pinyin of
 * the user's full name and fills in the matching e-mail address.
 */
@Slf4j
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx context carrying the current request
     * @return {@code true} after username and e-mail were set on a {@code User} payload,
     *         {@code false} when the payload is not a {@code User}
     */
    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("yml------------------------------------步骤二:用户名以及邮箱填充【将汉语转成拼音填充】【"+request.getRequestId()+"】");

        Object payload = request.getParams();
        if (!(payload instanceof User)) {
            return false;
        }

        User target = (User) payload;
        // Pinyin is produced without tone marks.
        HanyuPinyinOutputFormat toneless = new HanyuPinyinOutputFormat();
        toneless.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
        String pinyinName = PinyinHelper.toHanYuPinyinString(target.getFullname(), toneless);

        target.setUsername(pinyinName);
        target.setEmail(pinyinName + "@qq.com");
        return true;
    }
}
……其他执行器的具体实现请查看链接中的代码
2、配置yml文件
# Pipelines registered by PipelineAutoConfiguration; each chain entry becomes one ChannelPipeline bean.
lybgeek:
  pipeline:
    chain:
      # The bean is registered as "lybgeek_pipeline_" + comsumePipelineName.
      - comsumePipelineName: userYmlService
        # Handlers are added to the pipeline in the order listed here.
        pipelineClassNames:
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserCheckChannelHandler
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserFillUsernameAndEmailChannelHandler
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserPwdEncryptChannelHandler
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserMockSaveChannelHandler
          - com.github.lybgeek.pipeline.spring.test.yml.handler.UserPrintChannleHandler
3、具体业务service引入管道bean
/**
 * {@link UserYmlService} implementation that delegates saving to the pipeline bean
 * registered for this service from the yml configuration.
 */
@Service
public class UserYmlServiceImpl implements UserYmlService {

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Runs the user through the {@code userYmlService} pipeline.
     *
     * @param user the user passed to the pipeline as request params
     * @return the result of {@code ChannelPipeline.start} for this request
     */
    @Override
    public boolean save(User user) {
        String pipelineBeanName =
                PipelineDefinition.PREFIX + StringUtils.uncapitalize(UserYmlService.class.getSimpleName());
        // Look up by bean name AND type. The getBean(Class, Object...) overload would treat the
        // name as a constructor argument and resolve the bean by type only.
        ChannelPipeline pipeline = applicationContext.getBean(pipelineBeanName, ChannelPipeline.class);
        return pipeline.start(ChannelHandlerRequest.builder().params(user).build());
    }
}
4、测试
/** Saving a user through the yml-configured pipeline is expected to report success. */
@Test
public void testPipelineYml() {
    Assert.assertTrue(userYmlService.save(user));
}
1、定义xsd约束文件pipeline.xsd
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!-- Schema of the custom "pipeline" namespace: a <pipeline> element holding one or more <pipelineHandler> children. -->
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            xmlns:beans="http://www.springframework.org/schema/beans"
            xmlns:tool="http://www.springframework.org/schema/tool"
            xmlns="http://lybgeek.github.com/schema/pipeline"
            targetNamespace="http://lybgeek.github.com/schema/pipeline">

    <xsd:import namespace="http://www.w3.org/XML/1998/namespace"/>
    <xsd:import namespace="http://www.springframework.org/schema/beans"
                schemaLocation="http://www.springframework.org/schema/beans/spring-beans.xsd"/>
    <xsd:import namespace="http://www.springframework.org/schema/tool"/>

    <xsd:annotation>
        <xsd:documentation><![CDATA[ Namespace support for pipeline services ]]></xsd:documentation>
    </xsd:annotation>

    <xsd:complexType name="pipelineType">
        <xsd:choice>
            <xsd:element ref="pipelineHandler" minOccurs="1" maxOccurs="unbounded"/>
        </xsd:choice>
        <xsd:attribute name="id" type="xsd:ID">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ The unique identifier for a bean. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="consumePipelinesServiceClassName" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ consumePipelinesService class name ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="consumePipelinesMethod" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ consumePipelinesMethod name ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="argsType" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ consumePipelinesMethod args type , multiple args types are separated by commas ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>

    <xsd:complexType name="pipelineHandlerType">
        <xsd:attribute name="className" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ pipelineHandler class name ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="order" type="xsd:string" use="required">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ pipelineHandler order , must be an integer ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>

    <xsd:element name="pipelineHandler" type="pipelineHandlerType">
        <xsd:annotation>
            <xsd:documentation><![CDATA[ The pipelineHandler config ]]></xsd:documentation>
        </xsd:annotation>
    </xsd:element>

    <xsd:element name="pipeline" type="pipelineType">
        <xsd:annotation>
            <xsd:documentation><![CDATA[ The pipeline config ]]></xsd:documentation>
        </xsd:annotation>
    </xsd:element>
</xsd:schema>
2、配置xsd约束文件
在classpath下的resources文件夹新建META-INF文件夹,再建立一个文件spring.schemas,内容如下
# Maps the pipeline.xsd schema location (left) to the XSD resource under META-INF (right).
http\://lybgeek.github.com/schema/pipeline/pipeline.xsd=META-INF/pipeline.xsd
3、定义解析自定义标签的类
/**
 * Namespace handler for the pipeline XML namespace: routes the {@code pipeline} element to
 * {@link PipelineBeanDefinitionParser}.
 */
public class PipelineNamespaceHandler extends NamespaceHandlerSupport {

    /** Name of the only element this handler registers a parser for. */
    private static final String PIPELINE_ELEMENT = "pipeline";

    /** Registers the parser for the {@code pipeline} element. */
    @Override
    public void init() {
        PipelineBeanDefinitionParser pipelineParser = new PipelineBeanDefinitionParser();
        registerBeanDefinitionParser(PIPELINE_ELEMENT, pipelineParser);
    }
}
public class PipelineBeanDefinitionParser implements BeanDefinitionParser { @Override public BeanDefinition parse(Element element, ParserContext parserContext) { PipelineConfig pipelineConfig = buildPipelineConfig(element); List<HandlerInvotation> handlerInvotations = this.buildHandlerInvotations(pipelineConfig); GenericBeanDefinition beanDefinition = getGenericBeanDefinition(element, parserContext, pipelineConfig, handlerInvotations); return beanDefinition; } private GenericBeanDefinition getGenericBeanDefinition(Element element, ParserContext parserContext, PipelineConfig pipelineConfig, List<HandlerInvotation> handlerInvotations) { GenericBeanDefinition beanDefinition = (GenericBeanDefinition) BeanDefinitionBuilder.genericBeanDefinition().getBeanDefinition(); beanDefinition.getPropertyValues().addPropertyValue("pipelineServiceClz",pipelineConfig.getConsumePipelinesService()); beanDefinition.getPropertyValues().addPropertyValue("handlerInvotations",handlerInvotations); beanDefinition.getPropertyValues().addPropertyValue("createByXml",true); beanDefinition.setBeanClass(ComsumePipelineFactoryBean.class); String beanName = BeanUtils.generateBeanName(element,"id",parserContext,pipelineConfig.getConsumePipelinesService().getSimpleName()); parserContext.getRegistry().registerBeanDefinition(beanName,beanDefinition); return beanDefinition; } @SneakyThrows private List<HandlerInvotation> buildHandlerInvotations(PipelineConfig pipelineConfig){ List<HandlerInvotation> handlerInvotations = new ArrayList<>(); for (PipelineHandlerConfig pipelineHandlerConfig : pipelineConfig.getPipelineChain()) { if(!AbstactChannelHandler.class.isAssignableFrom(pipelineHandlerConfig.getPipelineClass())){ throw new PipelineException("pipelineHandler className must be 【com.github.lybgeek.pipeline.handler.AbstactChannelHandler】 subclass"); } AbstactChannelHandler channelHandler = (AbstactChannelHandler) pipelineHandlerConfig.getPipelineClass().getDeclaredConstructor().newInstance(); 
HandlerInvotation invotation = HandlerInvotation.builder() .args(pipelineConfig.getArgs()) .handler(channelHandler) .order(pipelineHandlerConfig.getOrder()) .consumePipelinesMethod(pipelineConfig.getConsumePipelinesMethod()) .build(); handlerInvotations.add(invotation); } return handlerInvotations; } @SneakyThrows private PipelineConfig buildPipelineConfig(Element element){ String argsType = element.getAttribute("argsType"); String[] argsTypeArr = trimArrayElements(commaDelimitedListToStringArray(argsType)); String consumePipelinesMethod = element.getAttribute("consumePipelinesMethod"); String consumePipelinesServiceClassName = element.getAttribute("consumePipelinesServiceClassName"); Class[] args = null; if(ArrayUtil.isNotEmpty(argsTypeArr)){ args = new Class[argsTypeArr.length]; for (int i = 0; i < argsTypeArr.length; i++) { Class argType = Class.forName(argsTypeArr[i]); args[i] = argType; } } List<PipelineHandlerConfig> pipelineHandlerConfigs = buildPipelineHandlerConfig(element); return PipelineConfig.builder().args(args) .consumePipelinesMethod(consumePipelinesMethod) .consumePipelinesService(Class.forName(consumePipelinesServiceClassName)) .pipelineChain(pipelineHandlerConfigs) .build(); } @SneakyThrows private List<PipelineHandlerConfig> buildPipelineHandlerConfig(Element element){ NodeList nodeList = element.getChildNodes(); if (nodeList == null) { return Collections.emptyList(); } List<PipelineHandlerConfig> pipelineHandlerConfigs = new ArrayList<>(); for (int i = 0; i < nodeList.getLength(); i++) { if (!(nodeList.item(i) instanceof Element)) { continue; } Element childElement = (Element) nodeList.item(i); if ("pipelineHandler".equals(childElement.getNodeName()) || "pipelineHandler".equals(childElement.getLocalName())) { String pipelineHanlderClassName = childElement.getAttribute("className"); String pipelineHanlderOrder = childElement.getAttribute("order"); Class pipelineHanlderClass = Class.forName(pipelineHanlderClassName); PipelineHandlerConfig 
pipelineHandlerConfig = PipelineHandlerConfig.builder() .PipelineClass(pipelineHanlderClass) .order(Integer.valueOf(pipelineHanlderOrder)) .build(); pipelineHandlerConfigs.add(pipelineHandlerConfig); } } return pipelineHandlerConfigs; } }
4、注册解析类
在META-INF文件夹新建spring.handlers文件,内容如下
# Binds the pipeline XML namespace URI (left) to the NamespaceHandler class that parses it (right).
http\://lybgeek.github.com/schema/pipeline=com.github.lybgeek.pipeline.spring.shema.PipelineNamespaceHandler
示例:
1、创建管道执行器
@Slf4j public class UserCheckChannelHandler extends AbstactChannelHandler { @Override public boolean handler(ChannelHandlerContext chx) { ChannelHandlerRequest channelHandlerRequest = chx.getChannelHandlerRequest(); System.out.println("XML------------------------------------步骤一:用户数据校验【"+channelHandlerRequest.getRequestId()+"】"); String json = JSON.toJSONString(channelHandlerRequest.getParams()); List<User> users = JSON.parseArray(json,User.class); if(CollectionUtil.isEmpty(users) || StringUtils.isBlank(users.get(0).getFullname())){ log.error("用户名不能为空"); return false; } return true; } }
/**
 * Step two of the XML-configured user pipeline: derives a username from the pinyin of the
 * first user's full name and sets the matching e-mail address on that user object.
 */
@Slf4j
public class UserFillUsernameAndEmailChannelHandler extends AbstactChannelHandler {

    /**
     * @param chx context carrying the current request
     * @return {@code true} when a user could be read from the params and was filled in,
     *         {@code false} when no user was found
     */
    @SneakyThrows
    @Override
    public boolean handler(ChannelHandlerContext chx) {
        ChannelHandlerRequest request = chx.getChannelHandlerRequest();
        System.out.println("XML------------------------------------步骤二:用户名以及邮箱填充【将汉语转成拼音填充】【"+request.getRequestId()+"】");

        // The params are round-tripped through JSON and read back as a list of users.
        List<User> users = JSON.parseArray(JSON.toJSONString(request.getParams()), User.class);
        if (CollectionUtil.isEmpty(users)) {
            return false;
        }

        User first = users.get(0);
        // Pinyin is produced without tone marks.
        HanyuPinyinOutputFormat toneless = new HanyuPinyinOutputFormat();
        toneless.setToneType(HanyuPinyinToneType.WITHOUT_TONE);
        String pinyinName = PinyinHelper.toHanYuPinyinString(first.getFullname(), toneless);

        first.setUsername(pinyinName);
        first.setEmail(pinyinName + "@qq.com");
        return true;
    }
}
……其他执行器的具体实现请查看链接中的代码
2、定义管道xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Declares the pipeline that backs UserXmlService.save(User); handlers are listed with their order. -->
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:lybgeek="http://lybgeek.github.com/schema/pipeline"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://lybgeek.github.com/schema/pipeline http://lybgeek.github.com/schema/pipeline/pipeline.xsd">

    <lybgeek:pipeline consumePipelinesServiceClassName="com.github.lybgeek.pipeline.spring.test.xml.service.UserXmlService"
                      consumePipelinesMethod="save"
                      argsType="com.github.lybgeek.pipeline.spring.test.model.User">
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserCheckChannelHandler" order="1"/>
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserFillUsernameAndEmailChannelHandler" order="2"/>
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserPwdEncryptChannelHandler" order="3"/>
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserMockSaveChannelHandler" order="4"/>
        <lybgeek:pipelineHandler className="com.github.lybgeek.pipeline.spring.test.xml.handler.UserPrintChannleHandler" order="5"/>
    </lybgeek:pipeline>

</beans>
3、创建业务管道类
/**
 * Business-facing entry point of the XML-configured user pipeline. Only the interface is
 * declared here; the pipeline XML names it as the consumer service class.
 */
public interface UserXmlService {

    /**
     * Saves the given user.
     *
     * @param user the user to save
     * @return {@code true} when saving succeeded
     */
    boolean save(User user);
}
直接定义接口即可
4、项目启动类上加上@ImportResource("classpath:/pipeline.xml")
/**
 * Application entry point; additionally imports the pipeline definitions from
 * {@code classpath:/pipeline.xml}.
 */
@SpringBootApplication
@ImportResource("classpath:/pipeline.xml")
public class SpringPipelineApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Forward args so command-line arguments are not dropped.
        SpringApplication.run(SpringPipelineApplication.class, args);
    }
}
5、测试
/** Saving a user through the XML-configured pipeline is expected to report success. */
@Test
public void testPipelineXml() {
    Assert.assertTrue(userXmlService.save(user));
}
本文管道模式的核心逻辑和上篇文章是一样的,只是把管道执行器通过配置文件集中管理起来,这样后续维护起来也不容易出错。