Spring-batch (ItemProcessor) 数据处理过程

Spring-batch学习总结(五)
学习目标:掌握ItemProcessor
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等
3.Spring-batch为我们提供ItemProcessor<I,O>这个接口,它包含一个方法O process(I item
4.我们用代码进行演示:
例:我们读取数据库表person_buf中的数据,将其id为奇数的数据剔除,将读出name进行字母大写转换
首先观察数据库表数据结构:

Spring-batch (ItemProcessor) 数据处理过程

代码:
Person

package com.dhcc.batch.batchDemo.processor;

import java.util.Date;

public class Person {
    private Integer id;
    private String name;
    private String perDesc;
    private Date createTime;
    private Date updateTime;
    private String sex;
    private Float score;
    private Double price;

    public Person() {
        super();
    }

    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,
            Double price) {
        super();
        this.id = id;
        this.name = name;
        this.perDesc = perDesc;
        this.createTime = createTime;
        this.updateTime = updateTime;
        this.sex = sex;
        this.score = score;
        this.price = price;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public String getPerDesc() {
        return perDesc;
    }

    public void setPerDesc(String perDesc) {
        this.perDesc = perDesc;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public Float getScore() {
        return score;
    }

    public void setScore(Float score) {
        this.score = score;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime="
                + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]";
    }

}

PersonLineAggregator

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PersonLineAggregator implements LineAggregator<Person> {
    //JSON
    private ObjectMapper mapper=new ObjectMapper();

    @Override
    public String aggregate(Person person) {
        try {
            return mapper.writeValueAsString(person);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("unable to writer...",e);
        }
    }

}

PersonRowMapper

package com.dhcc.batch.batchDemo.processor;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;
/**
 * 实现将数据库中的每条数据映射到Person对象中
 * @author Administrator
 *
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * rs一条结果集,rowNum代表当前行
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new Person(rs.getInt("id")
                ,rs.getString("name")
                ,rs.getString("per_desc")
                ,rs.getDate("create_time")
                ,rs.getDate("update_time")
                ,rs.getString("sex")
                ,rs.getFloat("score")
                ,rs.getDouble("price"));
    }

}

ProcessorFileApplication

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class ProcessorFileApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProcessorFileApplication.class, args);

    }
}

ProcessorFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.processor;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class ProcessorFileOutputFromDBConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private ItemProcessor<Person, Person> fristNameUpperCaseProcessor;

    @Autowired
    private ItemProcessor<Person, Person> idFilterProcessor;

    @Bean
    public Job ProcessorFileOutputFromDBJob() {
        return jobBuilderFactory.get("ProcessorFileOutputFromDBJob")
                .start(ProcessorFileOutputFromDBStep())
                .build();

    }

    @Bean
    public Step ProcessorFileOutputFromDBStep() {
        return stepBuilderFactory.get("ProcessorFileOutputFromDBStep")
                .<Person, Person>chunk(100)
                .reader(ProcessorFileOutputFromItemWriter())
                .processor(personDataProcessor())
                .writer(ProcessorFileOutputFromItemReader())
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Person> ProcessorFileOutputFromItemWriter() {
        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource); // 设置数据源
        reader.setFetchSize(100); // 设置一次最大读取条数
        reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列
        queryProvider.setFromClause("from person_buf"); // 设置要查询的表
        Map<String, Order> sortKeys = new HashMap<String, Order>();// 定义一个集合用于存放排序列
        sortKeys.put("id", Order.ASCENDING);// 按照升序排序
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);// 设置排序列
        return reader;
    }

    @Bean
    public CompositeItemProcessor<Person, Person> personDataProcessor(){
        CompositeItemProcessor<Person, Person> processor=new CompositeItemProcessor<>();
        List<ItemProcessor<Person, Person>> listProcessor=new ArrayList<>();
        listProcessor.add(fristNameUpperCaseProcessor);
        listProcessor.add(idFilterProcessor);
        processor.setDelegates(listProcessor);
        return processor;

    }

    @Bean
    @StepScope
    public FlatFileItemWriter<Person> ProcessorFileOutputFromItemReader() {
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<Person>();
        try {
            File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile();
            System.out.println("file is create in :" + path);
            writer.setResource(new FileSystemResource(path));
            writer.setLineAggregator(new PersonLineAggregator());
            writer.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return writer;
    }

}

FristNameUpperCaseProcessor

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class FristNameUpperCaseProcessor implements ItemProcessor<Person, Person> {

    @Override
    public Person process(Person item) throws Exception {
        return new Person(item.getId(), item.getName().toUpperCase(), item.getPerDesc(), item.getCreateTime(),
                item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice());
    }

}

IdFilterProcessor

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class IdFilterProcessor implements ItemProcessor<Person, Person> {

    @Override
    public Person process(Person item) throws Exception {
        if (item.getId() % 2 == 0) {
            return item;
        } else {
            return null;
        }
    }

}

运行结果:

Spring-batch (ItemProcessor) 数据处理过程


观察写入完成后的文件:

Spring-batch (ItemProcessor) 数据处理过程


可以看出我们已经完成了我们的目标

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


这篇文章主要介绍了spring的事务传播属性REQUIRED_NESTED的原理介绍,具有一定借鉴价值,需要的朋友可以参考下。下面就和我一起来看看吧。传统事务中回滚点的使...
今天小编给大家分享的是一文解析spring中事务的传播机制,相信很多人都不太了解,为了让大家更加了解,所以给大家总结了以下内容,一起往下看吧。一定会有所收获...
这篇文章主要介绍了SpringCloudAlibaba和SpringCloud有什么区别,具有一定借鉴价值,需要的朋友可以参考下。下面就和我一起来看看吧。Spring Cloud Netfli...
本篇文章和大家了解一下SpringCloud整合XXL-Job的几个步骤。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。第一步:整合pom文件,在S...
本篇文章和大家了解一下Spring延迟初始化会遇到什么问题。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。List 坑列表 = new ArrayList(2);...
这篇文章主要介绍了怎么使用Spring提供的不同缓存注解实现缓存的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇...
本篇内容主要讲解“Spring中的@Autowired和@Resource注解怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学...
今天小编给大家分享一下SpringSecurity怎么定义多个过滤器链的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家
这篇文章主要介绍“Spring的@Conditional注解怎么使用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Spring的@Con...
这篇文章主要介绍了SpringCloudGateway的熔断限流怎么配置的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringCloud&nb...
今天小编给大家分享一下怎么使用Spring解决循环依赖问题的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考
这篇文章主要介绍“Spring事务及传播机制的原理及应用方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Sp...
这篇“SpringCloudAlibaba框架实例应用分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价
本篇内容主要讲解“SpringBoot中怎么使用SpringMVC”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习...
这篇文章主要介绍“SpringMVC适配器模式作用范围是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“SpringMVC
这篇“导入SpringCloud依赖失败如何解决”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家...
这篇文章主要讲解了“SpringMVC核心DispatcherServlet处理流程是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来
今天小编给大家分享一下SpringMVCHttpMessageConverter消息转换器怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以...
这篇文章主要介绍“Spring框架实现依赖注入的原理是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Spring框架...
本篇内容介绍了“Spring单元测试控制Bean注入的方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下