如何解决与JMS的Spring Batch集成
当前正在从事Spring Batch项目,我们在应用程序启动时启动工作。我的春季批处理工作(多线程步骤)包括:
- 使用JmsItemReader读取jms消息
- 处理消息,将其转换为模型对象。
- 在提交数据库之前先在writer中调用一些rest服务。
但是,在研究表明最佳的解决方案是使用spring集成ServiceActivator来启动作业之后,我们现在希望在将消息添加到队列时就开始作业。我面临的问题是,一旦启动作业并执行了Jms项目读取器,消息就不再在队列中,因为它们已被ChannelPublishingJmsMessageListener占用。
我的代码:
@Bean
public JmsItemReader<Message> reader() {
JmsItemReader<Message> itemReader = new JmsItemReader<>();
itemReader.setItemType(Message.class);
itemReader.setJmsTemplate(jmsTemplate());
return itemReader;
}
// Jobs et Steps
@Bean
Step stepDetectionIncoherencesLiq(@Autowired StepBuilderFactory steps) {
int threadSize = Integer.parseInt(env.getProperty(PropertyConstant.THREAD_POOL_SIZE));
return steps.get("stepDetectionIncoherencesLiq").<Message,DetectionIncoherenceLiqJmsOut>chunk(1)
.reader(reader())
.processor(processor())
.writer(writer()).readerIsTransactionalQueue().faultTolerant()
.taskExecutor(taskExecutor()).throttleLimit(threadSize)
.listener(stepListener()).build();
}
@Bean
// @DependsOn({"getJobRepository"})
Job job(@Autowired JobBuilderFactory jobs,@Qualifier("stepDetectionIncoherencesLiq") Step stepDetectionIncoherencesLiq) {
LOGGER.error("Creation bean job ");
return jobs.get("job")
.incrementer(new RunIdIncrementer())
.start(stepDetectionIncoherencesLiq).build();
}
具有spring集成实现:
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer defmessageListenerContainer = new SimpleMessageListenerContainer();
defmessageListenerContainer.setConnectionFactory((ConnectionFactory) queueConnectionFactory().getObject());
defmessageListenerContainer.setDestination((Destination) jmsQueue().getObject());
// defmessageListenerContainer.setSessionTransacted(true);
return defmessageListenerContainer;
}
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel outputChannel() {
return new DirectChannel();
}
@Bean
public JmsMessageDrivenEndpoint jmsMessageDrivenEndpoilnt() {
ChannelPublishingJmsMessageListener channelPublishingJml = new ChannelPublishingJmsMessageListener();
channelPublishingJml.setRequestChannel(inputChannel());
// channelPublishingJml.setReplyChannel(outputChannel());
channelPublishingJml.setMessageConverter(new JmsMessageConverter());
return new JmsMessageDrivenEndpoint(messageListenerContainer(),channelPublishingJml);
}
@Bean
public IntegrationFlow myFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from("outputChannel").handle(jobLaunchingGateway) .handle(logger()).get();
}
@Bean
JobLaunchingGateway jobLaunchingGateway(SimpleJobLauncher jobLauncher) {
return new JobLaunchingGateway(jobLauncher);
}
和服务激活器:
@ServiceActivator(inputChannel = "inputChannel",outputChannel = "outputChannel")
public JobLaunchRequest process(DetectionIncoherenceLiqJmsOut jmsOut) {
log.info("Starting Job");
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
return new JobLaunchRequest(job,jobParametersBuilder.toJobParameters());
}
我是Spring集成的新手,这很困难,我有以下问题:
- 如何在不使用消息的情况下触发工作?
- 任何人都可以提供频道项目阅读器或Tasklet的代码吗?我可以从通道而不是队列中读取消息吗?
- 将jms消息转换为模型对象并将其传递给作业参数是否很好? 感谢您的帮助。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。