Below is the relevant portion of the code for the reader, processor, writer and step of a batch job that I created.

I have a requirement to update a flag column in the table the data is read from (the source table), to mark that the data is being processed by this job so that other apps don't pick it up. Once processing of the read records is finished, I need to restore that column to its original value so that other apps can work on those records too.

I guess a listener is the approach to take (ItemReadListener?). A reader listener seems suitable only for the first part (i.e. updating the flag column), not for restoring it at the end of the chunk. The challenge seems to be making the read data available once the processor has finished.

Can anybody suggest possible approaches?

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
        ItemReader<RemittanceVO> reader, ItemWriter<RemittanceClaimVO> writer,
        ItemProcessor<RemittanceVO, RemittanceClaimVO> processor) {

    return stepBuilderFactory.get("step1")
            .<RemittanceVO, RemittanceClaimVO> chunk(Constants.SPRING_BATCH_CHUNK_SIZE)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .taskExecutor(simpleAsyntaskExecutor)
            .throttleLimit(Constants.THROTTLE_LIMIT)
            .build();
}

@Bean
public ItemReader<RemittanceVO> reader() {
    JdbcPagingItemReader<RemittanceVO> reader = new JdbcPagingItemReader<RemittanceVO>();
    reader.setDataSource(dataSource);
    reader.setRowMapper(new RemittanceRowMapper());
    reader.setQueryProvider(queryProvider);
    reader.setPageSize(Constants.SPRING_BATCH_READER_PAGE_SIZE);
    return reader;
}

@Bean
public ItemProcessor<RemittanceVO, RemittanceClaimVO> processor() {
    return new MatchClaimProcessor();
}

@Bean
public ItemWriter<RemittanceClaimVO> writer(DataSource dataSource) {
    return new MatchedClaimWriter();
}

I started with Spring Batch a few days ago, so I am not yet familiar with all the models and patterns it provides.

Sabir Khan

1 Answer

First, a small hint about using an async TaskExecutor: you have to synchronize the reader, otherwise you will run into concurrency problems. You can wrap it in a SynchronizedItemStreamReader to do this:

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
        ItemWriter<RemittanceClaimVO> writer,
        ItemProcessor<RemittanceVO, RemittanceClaimVO> processor) {

    return stepBuilderFactory.get("step1")
            .<RemittanceVO, RemittanceClaimVO> chunk(Constants.SPRING_BATCH_CHUNK_SIZE)
            // Use the synchronized decorator, not the raw paging reader.
            .reader(syncReader())
            .processor(processor)
            .writer(writer)
            .taskExecutor(simpleAsyntaskExecutor)
            .throttleLimit(Constants.THROTTLE_LIMIT)
            .build();
}


// Decorator that serializes read() calls across the worker threads.
@Bean
public SynchronizedItemStreamReader<RemittanceVO> syncReader() {
    SynchronizedItemStreamReader<RemittanceVO> syncReader = new SynchronizedItemStreamReader<>();

    syncReader.setDelegate(reader());

    return syncReader;
}


// Returns the concrete type because setDelegate() expects an ItemStreamReader.
@Bean
public JdbcPagingItemReader<RemittanceVO> reader() {
    JdbcPagingItemReader<RemittanceVO> reader = new JdbcPagingItemReader<RemittanceVO>();
    reader.setDataSource(dataSource);
    reader.setRowMapper(new RemittanceRowMapper());
    reader.setQueryProvider(queryProvider);
    reader.setPageSize(Constants.SPRING_BATCH_READER_PAGE_SIZE);
    return reader;
}

Secondly, a possible approach to your real question:

I would use a simple tasklet to "mark" the entries you want to process. You can do this with a single UPDATE statement, since you know your selection criteria. This way, you need only one call and therefore only one transaction.
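A minimal sketch of such a marking tasklet step could look like the following. The table name `remittance`, the column `process_flag`, the flag values `NEW` and `IN_PROGRESS`, and the bean and class names are all assumptions made up for illustration; it also assumes a `JdbcTemplate` bean is available and that all selected rows share the same original flag value.

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

@Configuration
public class MarkStepConfig {

    // Hypothetical table, column and flag values; substitute the real ones.
    private static final String MARK_SQL =
            "UPDATE remittance SET process_flag = ? WHERE process_flag = ?";

    @Bean
    public Step markStep(StepBuilderFactory stepBuilderFactory, JdbcTemplate jdbcTemplate) {
        return stepBuilderFactory.get("markStep")
                .tasklet((contribution, chunkContext) -> {
                    // A single UPDATE, executed inside the tasklet's transaction.
                    jdbcTemplate.update(MARK_SQL, "IN_PROGRESS", "NEW");
                    // FINISHED tells Spring Batch not to call the tasklet again.
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
```

A restoring step would mirror this one with the two flag values swapped. If the original flag values differ from row to row, a single UPDATE like this cannot bring them back, and the original value would have to be kept somewhere else.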

After that, I would implement a normal step with reader, processor and writer. The reader only has to read the marked entries, which also keeps your SELECT clause very simple.

To restore the flag, you could use a third step, implemented as a tasklet that runs an appropriate UPDATE statement (like the first step). To ensure that the flag is restored in case of an exception, configure your job flow so that step 3 is executed even if step 2 fails (see my answer to the question "Spring Batch Java Config: Skip step when exception and go to next steps").
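As a rough sketch of such a job flow, assuming three step beans exist under the hypothetical names `markStep` and `restoreStep` plus the chunk step `step1`, the flow builder's `on("*")` transition can route to the restore step whatever exit status `step1` ends with. The job name and class name are also made up for illustration.

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RemittanceJobConfig {

    // The Step parameters are resolved by name, so the bean names must match
    // (markStep and restoreStep are hypothetical names for the two tasklet steps).
    @Bean
    public Job remittanceJob(JobBuilderFactory jobBuilderFactory,
            Step markStep, Step step1, Step restoreStep) {
        return jobBuilderFactory.get("remittanceJob")
                .start(markStep)           // step 1: mark the rows
                .next(step1)               // step 2: read, process, write
                .on("*").to(restoreStep)   // step 3: runs for any exit status of step 2
                .end()
                .build();
    }
}
```

With this wiring it is worth checking which final status the job reports when `step1` fails but `restoreStep` succeeds, and adding explicit transitions if the job should still end as failed.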

Of course, you could also restore the flag when writing the chunk if you use a CompositeItemWriter. However, you then need a strategy for restoring the flag in case of an exception in step 2.

IMO, using a listener is not a good idea, since the transaction handling is different.

Hansjoerg Wingeier
  • Thanks for the heads-up about synchronizing the reader. Do you mean synchronizing the `reader` object in the `reader()` bean before returning it? For the time being, I solved the problem in the question by adding two listeners, a reader listener and a processor listener. I added the update logic in `afterRead` and the restore logic in `afterProcess`. – Sabir Khan Aug 29 '16 at 09:53
  • I added the code for using SynchronizedItemStreamReader to my answer. – Hansjoerg Wingeier Aug 29 '16 at 10:01
  • So it's just a decorator? – Sabir Khan Aug 29 '16 at 10:04
  • Yes. It is provided by Spring Batch. You do not have to write it yourself. – Hansjoerg Wingeier Aug 29 '16 at 10:05