How to fix Spring Batch remote partitioning worker steps not starting in parallel
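We are trying to move from local partitioning to remote partitioning. The problem we are facing is that the remote partitioned steps do not run in parallel.
Below is the current code, which runs successfully in parallel with local partitioning: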
@Bean
@Qualifier("masterStep")
public Step masterStep(final JpaRepositoryItemWriter<TutoPeople> writer) throws Exception {
    return stepBuilderFactory.get("masterStep")
            .partitioner("slaveStep", partitioner())
            .step(workerStep(writer))
            .gridSize(8)
            // Partitions run in parallel only when a TaskExecutor is supplied;
            // the bean below is assumed to be wired here.
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(30);
    taskExecutor.setCorePoolSize(20);
    //taskExecutor.setQueueCapacity(threadQueuePoolSize);
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}
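To illustrate why the local setup runs in parallel, here is a minimal, framework-free sketch of partitions being handed to a thread pool. It is not Spring Batch code: the class `LocalPartitionSketch` and the method `peakConcurrency` are hypothetical names introduced only for this illustration, and each "partition" is a trivial task that records how many partitions are active at the same moment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not part of the original post or of Spring Batch.
class LocalPartitionSketch {

    /** Runs gridSize simulated partitions on poolSize threads; returns the peak number active at once. */
    public static int peakConcurrency(int gridSize, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // Opens once as many partitions are active together as the pool can run at once.
        CountDownLatch barrier = new CountDownLatch(Math.min(gridSize, poolSize));
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < gridSize; i++) {
                futures.add(pool.submit(() -> {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    barrier.countDown();
                    try {
                        // Hold the partition open until its peers have started too.
                        barrier.await(2, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    active.decrementAndGet();
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for every partition to finish
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pool of 8: peak = " + peakConcurrency(8, 8));
        System.out.println("pool of 1: peak = " + peakConcurrency(8, 1));
    }
}
```

With eight partitions, a pool of eight threads reaches a peak of 8, while a single thread never exceeds 1. The same principle applies to the local master step: the grid size only says how many partitions exist, and the executor decides how many of them run at once.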
Below is the new code. The worker steps do not run in parallel; they execute sequentially.
Manager code (master)
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow() {
    // Publishes partition requests to the Kafka topic "batchtopic".
    final KafkaProducerMessageHandler kafkaMessageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
    kafkaMessageHandler.setTopicExpression(new LiteralExpression("batchtopic"));
    return IntegrationFlows
            .from(requests())
            .handle(kafkaMessageHandler)
            .get();
}

@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public ResourcePartitioner partitioner() throws IOException {
    return new ResourcePartitioner().setResources(getResources());
}

@Bean
public Job partitionerJob(final BatchJobExecutionListener listener, final Step managerStep) {
    return jobBuilderFactory.get("partitionerJob")
            .listener(listener)
            .start(managerStep)
            .build();
}

@Bean
public Step managerStep() throws Exception {
    // Splits the input into partitions and sends one request per partition to the workers.
    return remoteStepBuilderFactory.get("managerStep")
            .partitioner("workerStep", partitioner())
            .gridSize(8)
            .outputChannel(requests())
            .inputChannel(replies())
            .build();
}

// Lists every file in the input directory; each file becomes one partition.
private Resource[] getResources() throws IOException {
    final File inputDir = new FileSystemResource(this.dataPath).getFile();
    if (!inputDir.exists() || !inputDir.isDirectory())
        throw new IOException("Bad input configuration");
    return Arrays.stream(inputDir.listFiles()).map(FileSystemResource::new).toArray(FileSystemResource[]::new);
}
Worker code
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow() {
    // Consumes partition requests from the Kafka topic "batchtopic".
    final ContainerProperties containerProps = new ContainerProperties("batchtopic");
    final KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(kafkaFactory, containerProps);
    final KafkaMessageDrivenChannelAdapter kafkaMessageChannel = new KafkaMessageDrivenChannelAdapter(container);
    return IntegrationFlows
            .from(kafkaMessageChannel)
            .channel(requests())
            .get();
}

@Bean
@StepScope
public FlatFileItemReader<TutoPeople> reader(@Value("#{stepExecutionContext[filePath]}") String filePath) {
    return new FlatFileItemReaderBuilder<TutoPeople>()
            .name("personReader")
            .resource(new FileSystemResource(filePath))
            .delimited()
            .names(new String[]{"firstName", "lastName"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<TutoPeople>() {{
                setTargetType(TutoPeople.class);
            }})
            .build();
}

@Bean
@StepScope
public PersonItemProcessor processor() {
    return new PersonItemProcessor();
}

@Bean
@StepScope
public JpaRepositoryItemWriter<TutoPeople> writer(final TutoPeopleRepository repository) {
    return new JpaRepositoryItemWriter<>(repository);
}

@Bean
public Step workerStep(final JpaRepositoryItemWriter<TutoPeople> writer) {
    return remoteStepBuilderFactory.get("workerStep")
            .inputChannel(requests())
            // replies() is not defined in the worker code shown in this post.
            .outputChannel(replies())
            .startLimit(5)
            .<TutoPeople, TutoPeople>chunk(5)
            // null is a placeholder; the step-scoped proxy resolves filePath at runtime.
            .reader(reader(null))
            .processor(processor())
            .writer(writer)
            .build();
}
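One possible explanation, offered as an assumption rather than a confirmed diagnosis of the setup above: a `DirectChannel` invokes its subscriber on the sending thread, and a `KafkaMessageListenerContainer` consumes with a single thread, so a single worker instance would handle one partition request at a time. The framework-free sketch below models only that dispatch behaviour. The class `DispatchSketch`, the `OverlapProbe` handler, and the `peakOverlap` method are hypothetical names, and the 100 ms sleep stands in for the real step execution.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration only; it does not use Spring Integration or Kafka.
class DispatchSketch {

    /** Stand-in for the worker step: records how many invocations overlap in time. */
    static final class OverlapProbe implements Consumer<String> {
        final AtomicInteger active = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();

        @Override
        public void accept(String request) {
            peak.accumulateAndGet(active.incrementAndGet(), Math::max);
            try {
                Thread.sleep(100); // simulated work for one partition
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            active.decrementAndGet();
        }
    }

    /**
     * A single consumer thread receives every request.
     * handlerThreads == 0 models direct dispatch (handler runs on the consumer thread);
     * handlerThreads > 0 models handing each request to an executor.
     */
    public static int peakOverlap(List<String> requests, int handlerThreads) throws InterruptedException {
        OverlapProbe handler = new OverlapProbe();
        ExecutorService pool = handlerThreads > 0 ? Executors.newFixedThreadPool(handlerThreads) : null;
        Thread consumer = new Thread(() -> {
            for (String request : requests) {
                if (pool == null) {
                    handler.accept(request);                     // blocks the consumer until done
                } else {
                    pool.execute(() -> handler.accept(request)); // returns immediately
                }
            }
        });
        consumer.start();
        consumer.join();
        if (pool != null) {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return handler.peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> requests = Arrays.asList("p0", "p1", "p2", "p3");
        System.out.println("direct dispatch peak:   " + peakOverlap(requests, 0));
        System.out.println("executor dispatch peak: " + peakOverlap(requests, 4));
    }
}
```

In this model, direct dispatch never overlaps two requests, while the executor variant does. If the same reasoning holds for the real configuration, the options usually considered are running several worker instances against a topic with multiple partitions, using a `ConcurrentMessageListenerContainer` with a concurrency greater than one, or replacing the worker's `DirectChannel` with an `ExecutorChannel`. Which of these fits depends on details the post does not show, such as the topic's partition count and the number of worker instances.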