Google

Jul 17, 2013

Spring Batch code snippets

This post complements some of the other Spring Batch tutorials: Spring Batch part 1, Spring Batch part 2, and Spring Batch advanced.



A sample spring context file (e.g. applicationContext-test.xml)

  
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring Batch test context: wires a job ("myappJob") with a grouping reader
     and writer, plus a no-op job repository for test runs.
     NOTE(review): several referenced beans are NOT defined in this file and are
     presumably contributed by another imported context — "dataSource",
     "txnManager", "myappJobExecutionListener" and
     "cfFileHeaderLineCallbackHandler"; verify they exist at runtime. -->
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
 xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context"
 xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
 xmlns:int="http://www.springframework.org/schema/integration"
 xmlns:p="http://www.springframework.org/schema/p"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
 http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
 http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
 http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
 
    <bean id="jdbcTestTemplate" class="org.springframework.jdbc.core.JdbcTemplate" p:dataSource-ref="dataSource" />
 <bean id="myappDao" class="myapp.dao.MyappDaoImpl" />
 <!-- NOTE(review): same implementation class as "myappDao" above — looks like a
      copy-paste; confirm whether MyappFeedDaoImpl was intended here. -->
 <bean id="myappFeedDao" class="myapp.dao.MyappDaoImpl" />
 
 
 <!-- Test-only repository; the class name suggests a stub that does not
      persist job metadata - TODO confirm against NoopJobRepository. -->
 <bean id="jobRepository" class="myapp.batch.repository.support.NoopJobRepository"/>
 
 <bean id="jobExecutorThreadPool"
  class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  <property name="corePoolSize" value="1" />
  <property name="threadNamePrefix" value="jobExecutorThread-" />
 </bean>
 
 <!-- taskExecutor is commented out below, so SimpleJobLauncher runs jobs
      synchronously on the calling thread (useful for tests). -->
 <bean id="jobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
  <property name="jobRepository" ref="jobRepository" />
  <!-- <property name="taskExecutor" ref="jobExecutorThreadPool" /> -->
 </bean>
 
 <!-- Single-step job: reads grouped account records and writes them in
      commit intervals of one group per transaction. -->
 <job id="myappJob" xmlns="http://www.springframework.org/schema/batch">
  <listeners>
   <listener ref="myappJobExecutionListener" />
  </listeners>
  <step id="loadMyappFeedData" >

   <!-- NOTE(review): transaction manager bean is named "txnManager" here but
        the test class configures "txManager" - confirm the names line up. -->
   <tasklet transaction-manager="txnManager">
    <listeners>
     <listener ref="stepExecutionListener" />
     <listener ref="myappFileImportListener" />
    </listeners>

    <chunk reader="groupCfDetailsReader" writer="myappFileItemWriter"
     commit-interval="1" />
   </tasklet>
  </step>
 </job>
 
 
    <!-- ============= readers, writers and listeners ============== -->
 <!-- Step-scoped so the resource can be resolved from the "dataFileName"
      job parameter supplied at launch time. -->
 <bean id="myappFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
  <property name="resource" value="#{jobParameters['dataFileName']}" />
   
  <property name="lineMapper">
   <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    <property name="lineTokenizer">
     <bean
      class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
      <!-- Field names consumed by MyappFieldSetMapper below. -->
      <property name="names"
       value="accountCd,currency,txnCd, amount" />
     </bean>
    </property>
    <property name="fieldSetMapper">
     <bean
      class="myapp.mapper.MyappFieldSetMapper" />
    </property>
   </bean>
  </property>
  <!-- Skip the header line; it is handed to the callback handler instead. -->
  <property name="linesToSkip" value="1" />
  <property name="skippedLinesCallback" ref="cfFileHeaderLineCallbackHandler" />

 </bean>

 <!-- Wraps the flat-file reader and groups records into MyappParent items
      (one OPENBAL balance record plus its following transactions). -->
 <bean id="groupCfDetailsReader"
  class="myapp.item.reader.MyappItemReader">
  <property name="delegate" ref="myappFileItemReader" />
 </bean>

 <bean id="myappFileItemWriter"
  class="myapp.item.writer.MyappItemWriter" />
  
  <bean id="myappMetaFileItemWriter"
  class="myapp.item.writer.MyappMetaFileItemWriter" />

 <bean id="stepExecutionListener"
  class="myapp.util.StepExecutionListenerCtxInjecter" />
  
    <bean id="myappFileImportListener"
  class="myapp.listener.MyappFileImportListener"
  scope="step" />
 
</beans>


Q. How do you kick off a batch job?
A. Here is an example of a test class kicking off the batch job. It also demonstrates how to add job parameters like "dataFileName". The "jobLauncher" and "myappJob" are injected from the above context file.

 
package myapp.batch.myapp;
//..
import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.annotation.Resource;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.test.StepScopeTestExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
import org.springframework.test.context.transaction.TransactionConfiguration;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;

@RunWith(SpringJUnit4ClassRunner.class)
@TestExecutionListeners(
{
    DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class
})
// NOTE(review): transaction manager is "txManager" here but the tasklet in the
// context file references "txnManager" - confirm the bean names line up.
@TransactionConfiguration(transactionManager = "txManager", defaultRollback = true)
@Transactional
@ContextConfiguration(locations =
{"/applicationContext-test.xml"
})
public class MyappBatchJobTest
{
    private static final Logger logger = LoggerFactory.getLogger(MyappBatchJobTest.class);

    private static final String BATCH_JOB_ID_FIELD_NAME = "batchJobIdFieldName";
    private static final String BATCH_JOB_ID_FIELD_VALUE = "batchJobId";
    private static final String BATCH_ID = "batchId";

    // The original snippet referenced the members below without declaring them,
    // so it did not compile. Declared here with sensible test defaults.
    private static final long PARENT_BATCHJOB_ID = 0L;
    private String metadataFileName = "metaData.dat";
    private boolean positiveTest = true;
    private final List<Integer> batchJobIds = new ArrayList<Integer>();

    /** Launcher injected from applicationContext-test.xml. */
    @Autowired
    private JobLauncher jobLauncher;

    /** The "myappJob" job defined in applicationContext-test.xml. */
    @Resource(name = "myappJob")
    private Job job;

    /**
     * Launches the batch job for the given data file and asserts its outcome.
     *
     * @param dataFileName resolved by the step-scoped reader via
     *        #{jobParameters['dataFileName']}
     * @return the batch job id published in the job's execution context
     * @throws JobExecutionAlreadyRunningException if the job is already running
     * @throws JobRestartException if the job cannot be restarted
     * @throws JobInstanceAlreadyCompleteException if the instance already completed
     * @throws JobParametersInvalidException if the parameters fail validation
     * @throws ParseException if test dates cannot be parsed
     */
    public int kickOffTheBatchJob(String dataFileName)
            throws JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, ParseException
    {
        // Job parameters are visible to step-scoped beans (e.g. the file reader).
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("dataFileName", dataFileName)
                .addString("metaDataFileName", metadataFileName)
                .addString(BATCH_JOB_ID_FIELD_NAME, BATCH_JOB_ID_FIELD_VALUE)
                .addString(BATCH_ID, getBatchId())
                .addDate("valuationDate", getValuationDate())
                .addLong("parentBatchJobId", PARENT_BATCHJOB_ID)
                .toJobParameters();

        // SimpleJobLauncher has no taskExecutor configured, so run(..) is
        // synchronous: the execution is already finished when it returns.
        JobExecution jobExecution = jobLauncher.run(job, jobParameters);

        int batchJobId = jobExecution.getExecutionContext().getInt("batchJobId");

        // Was "has started", but the synchronous launch has completed by now.
        logger.info("The batch job has finished; batchJobId={}", batchJobId);

        if (positiveTest)
        {
            Assert.assertEquals("job failed: ", "COMPLETED", jobExecution.getExitStatus().getExitCode());
        }
        else
        {
            Assert.assertEquals("job should have failed: ", "FAILED", jobExecution.getExitStatus().getExitCode());
        }
        Assert.assertTrue("Batch job Id is not created: ", batchJobId >= 1);

        batchJobIds.add(batchJobId);

        return batchJobId;
    }

    /** Supplies the "batchId" job parameter; undeclared in the original snippet. */
    private String getBatchId()
    {
        return String.valueOf(System.currentTimeMillis());
    }

    /** Supplies the "valuationDate" job parameter; undeclared in the original snippet. */
    private Date getValuationDate()
    {
        return new Date();
    }
}


Q. How will you write a custom reader that reads data in chunks as shown below?

Each account has an OPENBAL txnCd record followed by 0 or more transactional records for that account.


A. Here is the custom reader "groupCfDetailsReader", which groups records into parent items while delegating the reading of each individual record to "myappFileItemReader", as defined in the above context file.

1. Custom file handler that groups records

 
package myapp.item.reader;

import java.util.ArrayList;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import myapp.model.MyappDetail;
import myapp.model.MyappParent;

/**
 * Groups flat-file records into {@link MyappParent} items: one leading balance
 * record (txnCd "OPENBAL") plus all following transaction records up to the
 * next OPENBAL or end of file. Uses a one-item look-ahead over the delegate.
 */
public class MyappItemReader implements ItemStreamReader<MyappParent> {

	// Underlying per-record reader (the step-scoped FlatFileItemReader).
	private ItemStreamReader<MyappDetail> delegate;
	// Look-ahead buffer shared by read() and peek(); null means "not buffered".
	private MyappDetail curItem = null;

	/**
	 * Reads the next record group.
	 *
	 * @return the next parent group, or null at end of input
	 */
	@Override
	public MyappParent read() {
		MyappParent parent = null;
		try {

			if (curItem == null) {
				curItem = delegate.read();
			}

			if (curItem != null) {
				// NOTE(review): assumes the first record of a group is the
				// OPENBAL balance record - no explicit check is made here.
				parent = new MyappParent();
				parent.setBalanceDetail(curItem);
			}

			curItem = null;

			if (parent != null) {
				parent.setTxnDetails(new ArrayList<MyappDetail>());
				// Accumulate transaction records until the next OPENBAL record
				// (start of the next group) or end of file.
				MyappDetail detail = peek();
				while (detail != null && !"OPENBAL".equalsIgnoreCase(detail.getTxnCd())) {
					parent.getTxnDetails().add(detail);
					curItem = null; // consume the buffered record
					detail = peek();
				}
			}

		} catch (Exception e) {
			// Was e.printStackTrace(): swallowing the error returned a silently
			// truncated group. Fail the step instead, preserving the cause.
			throw new ItemStreamException("Failed to read next record group", e);
		}

		return parent;
	}

	/**
	 * Returns the next record without consuming it, buffering it in curItem.
	 */
	public MyappDetail peek() throws Exception, UnexpectedInputException, ParseException {
		if (curItem == null) {
			curItem = delegate.read();
		}

		return curItem;
	}

	@Override
	public void close() throws ItemStreamException {
		delegate.close();
	}

	@Override
	public void open(ExecutionContext arg0) throws ItemStreamException {
		delegate.open(arg0);
	}

	@Override
	public void update(ExecutionContext arg0) throws ItemStreamException {
		delegate.update(arg0);
	}

	public void setDelegate(ItemStreamReader<MyappDetail> delegate) {
		this.delegate = delegate;
	}
}


2. The model classes used to map data to Java POJOs. The container class.

 
package myapp.model;

import java.io.Serializable;
import java.util.List;

/**
 * Container item produced by MyappItemReader: one balance record plus the
 * list of transaction records belonging to the same account group.
 */
public class MyappParent implements Serializable {

	private static final long serialVersionUID = 1L;

	// Optional free-form metadata for the group.
	private String metaData;
	// The OPENBAL balance record that starts the group.
	private MyappDetail balanceDetail;
	// The transaction records following the balance record.
	private List<MyappDetail> txnDetails;

	public MyappDetail getBalanceDetail() {
		return balanceDetail;
	}
	public void setBalanceDetail(MyappDetail balanceDetail) {
		this.balanceDetail = balanceDetail;
	}
	public List<MyappDetail> getTxnDetails() {
		return txnDetails;
	}
	public void setTxnDetails(List<MyappDetail> txnDetails) {
		this.txnDetails = txnDetails;
	}

	public String getMetaData() {
		return metaData;
	}
	public void setMetaData(String metaData) {
		this.metaData = metaData;
	}

	@Override
	public String toString() {
		// Was "MyappHeader [...]" (copy-paste from another class) and omitted metaData.
		return "MyappParent [metaData=" + metaData + ", balanceDetail=" + balanceDetail
				+ ", txnDetails=" + txnDetails + "]";
	}
}

The detail class that represents each record.

 
package myapp.model;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;

/**
 * One delimited file record: account, currency, transaction code and amount.
 * The accessors below were elided in the original snippet ("//getters,
 * setters, etc") although MyappFieldSetMapper and MyappItemReader call them;
 * they are written out here so the class compiles as shown.
 */
public class MyappDetail implements Serializable
{
    private static final long serialVersionUID = 1L;

    private String accountCd;
    private String currencyCd;
    private String txnCd;
    private BigDecimal amount;

    public String getAccountCd()
    {
        return accountCd;
    }

    public void setAccountCd(String accountCd)
    {
        this.accountCd = accountCd;
    }

    public String getCurrencyCd()
    {
        return currencyCd;
    }

    public void setCurrencyCd(String currencyCd)
    {
        this.currencyCd = currencyCd;
    }

    public String getTxnCd()
    {
        return txnCd;
    }

    public void setTxnCd(String txnCd)
    {
        this.txnCd = txnCd;
    }

    public BigDecimal getAmount()
    {
        return amount;
    }

    public void setAmount(BigDecimal amount)
    {
        this.amount = amount;
    }
}

3. The mapper class that maps data to POJO model defined above.

 
package myapp.mapper;

import myapp.model.MyappDetail;

import java.math.BigDecimal;
import java.text.ParseException;

import org.apache.commons.lang.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

/**
 * Maps one tokenized file record onto a {@link MyappDetail} POJO. The field
 * names read here must match the DelimitedLineTokenizer "names" property
 * (accountCd, currency, txnCd, amount) configured in the context file.
 *
 * Removed the unused private members of the original snippet (an SLF4J logger
 * and a "dd/MM/yyyy" DATE_FORMAT constant that were never referenced).
 */
public class MyappFieldSetMapper implements FieldSetMapper<MyappDetail>
{
    /**
     * @param fs the tokenized record; may be null
     * @return the populated detail, or null when the field set is null
     * @throws BindException declared by the FieldSetMapper contract
     */
    @Override
    public MyappDetail mapFieldSet(FieldSet fs) throws BindException
    {
        if (fs == null)
        {
            return null;
        }

        MyappDetail appd = new MyappDetail();
        appd.setAccountCd(fs.readString("accountCd"));
        appd.setCurrencyCd(fs.readString("currency"));
        appd.setTxnCd(fs.readString("txnCd"));
        // Default a missing amount to zero rather than propagating null downstream.
        BigDecimal amount = fs.readBigDecimal("amount");
        appd.setAmount(amount != null ? amount : BigDecimal.ZERO);

        return appd;
    }
}


Labels:

0 Comments:

Post a Comment

Subscribe to Post Comments [Atom]

<< Home