Post

[Spring Batch 5] JdbcPagingItemReader/JdbcBatchItemWriter 구현

Spring Batch의 JdbcPagingItemReaderJdbcBatchItemWriter로 DB 데이터를 읽고 쓰는 방법을 알아보자.

1. JdbcPagingItemReader/JdbcBatchItemWriter 개요

1-1. JdbcPagingItemReader 개요

  • Spring Batch에서 제공하는 ItemReader 인터페이스를 구현하는 클래스이다.
  • 페이지 단위 데이터 읽기: 데이터베이스에서 페이지 단위로 데이터를 읽어오는 기능을 제공한다.
  • 대규모 데이터 처리 효율성: 지정된 크기 만큼만 읽어와 메모리 사용량을 최소화하고, Chunk 단위로 커밋 간격을 설정하여 대규모 데이터를 효율적으로 처리할 수 있다.
  • 페이지네이션 방식과 성능 최적화: 각 페이지를 독립적으로 조회하는 페이지네이션 방식을 사용하여 커서 유지 부담을 줄이고 대용량 데이터를 안정적으로 처리할 수 있다.
  • SQL 쿼리 정의: SQL 쿼리를 직접 작성하여 최적화된 데이터 읽기가 가능하다.
  • 커서 제어: 데이터베이스 커서를 사용하여 데이터 순회를 제어할 수 있다.

1-2. JdbcPagingItemReader 주요 구성 요소

  • DataSource: 데이터베이스 연결 정보를 설정한다.
  • SqlQuery: SelectClause, FromClause, WhereClause, SortKeys 구성요소를 통해 데이터를 읽을 SQL 쿼리를 설정한다.
  • RowMapper: SQL 쿼리의 각 결과 행을 Item으로 변환하는 역할을 한다.
  • PageSize: Chunk와는 별도로 페이지 크기를 설정한다.
  • SaveState: 기본적으로 상태 저장을 지원하며, 배치 잡이 중단되었을 때 마지막으로 읽은 위치를 저장하여 재시작 시 이어서 처리할 수 있도록 한다.
  • Exception Handling: 읽기 과정에서 발생하는 예외를 처리하기 위해 다양한 리스너(SkipListener, ReadListener)를 지원한다.

1-3. JdbcBatchItemWriter 개요

  • Spring Batch에서 제공하는 ItemWriter 인터페이스를 구현하는 클래스이다.
  • JDBC를 통한 대량 데이터 저장: JDBC를 통해 데이터를 저장하며, 대량 데이터 저장에 최적화되어 있다.
  • SQL 쿼리 정의: SQL 쿼리를 직접 작성하여 원하는 방식으로 데이터베이스에 저장할 수 있다.
  • 대용량 데이터 처리에 적합: 데이터를 저장할 때 Chunk 기반으로 데이터를 처리하며, 커밋 간격에 따라 데이터베이스에 저장하여 안정적이고 효율적으로 데이터를 처리 힌다.

1-4. JdbcBatchItemWriter 주요 구성 요소

  • DataSource: 데이터베이스 연결 정보를 설정한다.
  • SqlStatementCreator: INSERT, UPDATE, 또는 MERGE와 같은 쿼리를 생성하는 역할을 한다.
  • PreparedStatementSetter: INSERT 쿼리의 파라미터 값을 설정하는 역할을 한다.
  • ItemSqlParameterSourceProvider: Item 객체의 필드를 PreparedStatementSetter에 전달할 쿼리 파라미터 값을 생성하는 역할을 한다.



2. JdbcPagingItemReader 구현

JdbcPagingItemReader를 활용하여 db의 customer 테이블로 부터 데이터를 읽어들이고 flatfile(csv)로 저장하는 로직을 구현해보자.

2-1. 코드 구현

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@Slf4j
@Configuration
public class JdbcPagingReaderJobConfig {

  /**
   * CHUNK 크기를 지정한다.
   */
  public static final int CHUNK_SIZE = 2;
  public static final String ENCODING = "EUC-KR";
  public static final String JDBC_PAGING_CHUNK_JOB = "JDBC_PAGING_CHUNK_JOB";

  @Autowired
  DataSource dataSource;

  @Bean
  public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {

    Map<String, Object> parameterValue = new HashMap<>();
    parameterValue.put("age", 20);

    return new JdbcPagingItemReaderBuilder<Customer>()
      .name("jdbcPagingItemReader")
      .fetchSize(CHUNK_SIZE)
      .dataSource(dataSource)
      .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
      .queryProvider(queryProvider())
      .parameterValues(parameterValue)
      .build();
  }

  @Bean
  public PagingQueryProvider queryProvider() throws Exception {
    SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);  // DB 에 맞는 PagingQueryProvider 를 선택하기 위함
    queryProvider.setSelectClause("id, name, age, gender");
    queryProvider.setFromClause("from customer");
    queryProvider.setWhereClause("where age >= :age");

    Map<String, Order> sortKeys = new HashMap<>(1);
    sortKeys.put("id", Order.DESCENDING);

    queryProvider.setSortKeys(sortKeys);

    return queryProvider.getObject();
  }

  @Bean
  public FlatFileItemWriter<Customer> customerFlatFileItemWriter() {
    return new FlatFileItemWriterBuilder<Customer>()
      .name("customerFlatFileItemWriter")
      .resource(new FileSystemResource("./output/customer_new_v1.csv"))
      .encoding(ENCODING)
      .delimited().delimiter("\t")
      .names("Name", "Age", "Gender")
      .build();
  }

  @Bean
  public Step customerJdbcPagingStep(JobRepository jobRepository,
                                     PlatformTransactionManager transactionManager) throws Exception {
    log.info("------------------ Init customerJdbcPagingStep -----------------");

    return new StepBuilder("customerJdbcPagingStep", jobRepository)
      .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
      .reader(jdbcPagingItemReader())
      .writer(customerFlatFileItemWriter())
      .build();
  }

  @Bean
  public Job customerJdbcPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
    log.info("------------------ Init customerJdbcPagingJob -----------------");
    return new JobBuilder(JDBC_PAGING_CHUNK_JOB, jobRepository)
      .incrementer(new RunIdIncrementer())
      .start(customerJdbcPagingStep)
      .build();
  }
}
  • JdbcPagingItemReader 빈을 생성하고 dataSource, rowMapper, queryProvider 등을 설정한다.
  • SqlPagingQueryProviderFactoryBean로 데이터를 조회하기 위한 쿼리 정보를 설정할 수 있다.
    • SelectClause : select에서 프로젝션할 필드 이름을 지정한다.
    • FromClause : 조회할 테이블 이름을 지정한다.
    • WhereClause : 조건절을 지정한다.
    • SortKeys : 정렬 기준을 지정한다.
  • parameterValue에는 queryProvider에서 사용할 변수(:age) 값을 설정한다.
  • FlatFileItemWriter 빈을 등록하여 output 디렉토리에 customer_new_v1.csv 파일을 생성한다.
  • Job, Step 빈을 생성하고 chunk, reader, writer를 설정한다.

2-2. 실행 및 결과

img_1.png

일단 customer 테이블을 생성하고 위와 같이 데이터를 추가한 후에 배치를 실행하였다.

img.png

로그를 통해 JDBC_PAGING_CHUNK_JOB이 실행되어 customerJdbcPagingStep이 실행되는 것을 확인할 수 있었다.
읽어들인 데이터가 csv로 잘 저장되었는지 확인해보자.

img_2.png

customer_new_v1.csv 파일이 생성되었고, 데이터가 잘 저장된 것을 확인할 수 있다.



3. JdbcBatchItemWriter 구현

JdbcBatchItemWriter를 활용하여 flatfile에서 데이터를 읽어들인 후 customer2 테이블에 저장하는 로직을 구현해보자.

img_3.png

읽어들일 customer.csv 파일을 생성해준다.

1
2
3
4
5
6
7
create table study.customer2
(
  id     int auto_increment primary key,
  name   varchar(100) null,
  age    int null,
  gender varchar(10) null
);

읽어들인 데이터를 저장할 테이블도 생성한다.

3-1. 코드 구현

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Slf4j
@Configuration
public class JdbcBatchItemJobConfig {

  /**
   * CHUNK 크기를 지정한다.
   */
  public static final int CHUNK_SIZE = 100;
  public static final String ENCODING = "UTF-8";
  public static final String JDBC_BATCH_WRITER_CHUNK_JOB = "JDBC_BATCH_WRITER_CHUNK_JOB";

  @Autowired
  DataSource dataSource;

  @Bean
  public FlatFileItemReader<Customer> flatFileItemReader() {

    return new FlatFileItemReaderBuilder<Customer>()
      .name("FlatFileItemReader")
      .resource(new ClassPathResource("./customer.csv"))
      .encoding(ENCODING)
      .delimited().delimiter(",")
      .names("name", "age", "gender")
      .targetType(Customer.class)
      .build();
  }

  @Bean
  public JdbcBatchItemWriter<Customer> flatFileItemWriter() {

    return new JdbcBatchItemWriterBuilder<Customer>()
      .dataSource(dataSource)
      .sql("INSERT INTO customer2 (name, age, gender) VALUES (:name, :age, :gender)")
      .itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider())
      .build();
  }

  @Bean
  public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    log.info("------------------ Init flatFileStep -----------------");

    return new StepBuilder("flatFileStep", jobRepository)
      .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
      .reader(flatFileItemReader())
      .writer(flatFileItemWriter())
      .build();
  }

  @Bean
  public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
    log.info("------------------ Init flatFileJob -----------------");
    return new JobBuilder(JDBC_BATCH_WRITER_CHUNK_JOB, jobRepository)
      .incrementer(new RunIdIncrementer())
      .start(flatFileStep)
      .build();
  }
}

public class CustomerItemSqlParameterSourceProvider implements ItemSqlParameterSourceProvider<Customer> {
  @Override
  public SqlParameterSource createSqlParameterSource(Customer item) {
    return new BeanPropertySqlParameterSource(item);
  }
}
  • FlatFileItemReader 빈을 생성하여 읽어들일 파일 정보를 저장한다.
  • FlatFileItemWriter 빈을 생성하고 dataSource, sql, itemSqlParameterSourceProvider를 설정한다.
    • sql : 데이터를 저장할 쿼리를 작성한다.
    • itemSqlParameterSourceProvider : item으로 부터 sql에 전달할 파라미터 정보를 생성한다.
  • Job, Step 빈을 생성하고 chunk, reader, writer를 설정한다.
  • CustomerItemSqlParameterSourceProvider 클래스를 생성하여 itemSqlParameterSourceProvider를 구현한다.

3-1. 실행 및 결과

img_4.png

로그를 통해 JDBC_BATCH_WRITER_CHUNK_JOB이 실행되어 flatFileStep이 실행되는 것을 확인할 수 있었다.

1
2
select *
from customer2;

customer2 테이블에 데이터가 잘 들어갔는지 조회해보자.

img_5.png

정상적으로 데이터가 저장된 것을 확인할 수 있다.


참고: DEVOCEAN KIDO님 SpringBatch 연재 05


This post is licensed under CC BY 4.0 by the author.