
Dec 20, 2012

Spring Batch advanced tutorial -- writing your own reader


My previous 3-part Spring Batch tutorial covered a high-level overview with examples. This tutorial demonstrates how to wrap Spring's FlatFileItemReader with your own reader that can peek at the data and group records the way you want. For example, suppose you have a CSV file like the one shown below, where

  1. The first line is the header with the portfolio name, transaction from date and transaction to date.
  2. The remaining rows are transaction detail records grouped by account code. The detail records contain portfolio code, account code, transaction type, and transaction amount.
  3. When the records are read, they need to be read by account grouping and processed as a group. In other words, there are 2 groups in the CSV feed shown below: records 2-5 are one group, and records 6-8 are another. Each group starts at the transaction type "OPENBAL" and ends at the record before the next "OPENBAL" (a sketch of the model classes that hold each group follows the sample feed).


"Portfolio1","29/02/2012","11/03/2012",
"Portfolio1","Account1","OPENBAL", 2000.00
"Portfolio1  ","Account1","PURCHASE",1000.00
"Portfolio1  ","Account1","EXPENSE",500.00
"Portfolio1  ","Account1","ADJUSTMENT ", 200.00
"Portfolio1","Account1","OPENBAL ", 12000.00
"Portfolio1  ","Account2","PURCHASE",1000.00
"Portfolio1  ","Account3","ADJUSTMENT",1000.00


So, we need to write a custom file reader that can peek at the next record before reading it.

Step 1: Snippets of the Spring Batch context configuration file, e.g. applicationContext-myapp.xml.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
 xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context"
 xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
 xmlns:int="http://www.springframework.org/schema/integration"
 xmlns:file="http://www.springframework.org/schema/integration/file"
 xmlns:util="http://www.springframework.org/schema/util"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
 http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
 http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
 http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
 http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd
 http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

     <!-- load properties file -->
 <context:property-placeholder location="classpath:myapp.properties" />
 <!-- pick up the @Component/@Resource annotated classes under com.myapp -->
 <context:component-scan base-package="com.myapp" />
 <!-- annotation driven transaction management -->
 <tx:annotation-driven />

 <!-- define the job that reads from a CSV file and writes to a database -->
 <job id="myAppJob" xmlns="http://www.springframework.org/schema/batch">
  <listeners>
   <listener ref="myAppJobExecutionListener" />
  </listeners>

  <step id="loadMyAppFeedData">
   <tasklet transaction-manager="transactionManager">
    <listeners>
     <listener ref="stepExecutionListener" />
    </listeners>

    <chunk reader="groupMyAppDetailsReader" writer="myAppFileItemWriter" commit-interval="10" />
   </tasklet>
  </step>
  
 </job>

 
    <!-- Spring-supplied FlatFileItemReader that reads the CSV file line by line -->
 <bean id="myAppFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
  <property name="resource" value="#{jobParameters['dataFileName']}" />
  <property name="lineMapper">
   <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    <property name="lineTokenizer">
     <bean
      class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
      <property name="names"
       value="portfolioCd,accountCd,transactionType, Amount" />
     </bean>
    </property>
    <property name="fieldSetMapper">
     <bean
      class="com.myapp.mapper.MyAppFieldSetMapper" />
    </property>
   </bean>
  </property>
  <property name="linesToSkip" value="1" />
  <property name="skippedLinesCallback" ref="myAppFileHeaderLineCallbackHandler" />
 </bean>

    <!-- My custom CSV file reader that groups the records; internally it delegates to Spring's FlatFileItemReader -->
 <bean id="groupMyAppDetailsReader" class="com.myapp.item.reader.MyAppFileItemReader">
  <property name="delegate" ref="myAppFileItemReader" />
 </bean>

 <!-- My custom file item writer -->
 <bean id="myAppFileItemWriter" class="com.myapp.item.writer.MyAppItemWriter" />

    <!-- The step execution context listener that can be injected to propagate step and job values -->
 <bean id="stepExecutionListener" class="com.myapp.util.StepExecutionListenerCtxInjecter" />


</beans>
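
Since the reader's resource comes from the jobParameters['dataFileName'] expression, the job has to be launched with that parameter. A minimal launch sketch (it assumes a jobLauncher and jobRepository are defined elsewhere in the context, and the file path is just an example):

package com.myapp;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MyAppJobMain {

 public static void main(String[] args) throws Exception {
  ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-myapp.xml");

  JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);
  Job job = ctx.getBean("myAppJob", Job.class);

  // "dataFileName" feeds the #{jobParameters['dataFileName']} expression on the reader;
  // "run.id" makes each run a distinct job instance
  JobParameters params = new JobParametersBuilder()
    .addString("dataFileName", "file:/data/myapp-feed.csv")
    .addLong("run.id", System.currentTimeMillis())
    .toJobParameters();

  JobExecution execution = jobLauncher.run(job, params);
  System.out.println("Exit status: " + execution.getStatus());
 }
}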


Step 2: The custom reader can be implemented as shown below. The key points are peeking at the next record to enable grouping, and using the Spring-provided FlatFileItemReader as a delegate to read each CSV line.

package com.myapp.item.reader;

import java.util.ArrayList;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppParent;

public class MyAppFileItemReader implements ItemStreamReader<MyAppParent> {

 private ItemStreamReader<MyAppDetail> delegate;
 private MyAppDetail curItem = null;

 @Override
 public MyAppParent read() throws Exception {
  MyAppParent parent = null;

  // the record that starts a group is the "OPENBAL" record
  if (curItem == null) {
   curItem = delegate.read();
  }

  if (curItem != null) {
   parent = new MyAppParent();
   parent.setBalanceDetail(curItem);
   curItem = null; // the balance record has been consumed

   // collect detail records until the next "OPENBAL" (the start of
   // the next group) or the end of the file is reached
   parent.setTxnDetails(new ArrayList<MyAppDetail>());
   MyAppDetail detail = peek();
   while (detail != null && !"OPENBAL".equalsIgnoreCase(detail.getTxnCd())) {
    parent.getTxnDetails().add(detail);
    curItem = null;  // mark the peeked record as consumed
    detail = peek(); // look ahead at the next record
   }
  }

  return parent;
 }

 /**
  * Returns the next record without consuming it; the record stays cached
  * in curItem until the caller resets curItem to null.
  */
 public MyAppDetail peek() throws Exception {
  if (curItem == null) {
   curItem = delegate.read();
  }
  return curItem;
 }

 @Override
 public void open(ExecutionContext executionContext) throws ItemStreamException {
  delegate.open(executionContext);
 }

 @Override
 public void update(ExecutionContext executionContext) throws ItemStreamException {
  delegate.update(executionContext);
 }

 @Override
 public void close() throws ItemStreamException {
  delegate.close();
 }

 public void setDelegate(ItemStreamReader<MyAppDetail> delegate) {
  this.delegate = delegate;
 }
}
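
To sanity-check the grouping logic, you can drive the reader with a stubbed delegate instead of a real file. A rough sketch (it assumes the hypothetical model classes sketched earlier):

package com.myapp.item.reader;

import java.util.Arrays;
import java.util.Iterator;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;

import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppParent;

public class MyAppFileItemReaderSmokeTest {

 // minimal stub delegate that serves records from a list
 private static ItemStreamReader<MyAppDetail> stub(final Iterator<MyAppDetail> it) {
  return new ItemStreamReader<MyAppDetail>() {
   public MyAppDetail read() { return it.hasNext() ? it.next() : null; }
   public void open(ExecutionContext ctx) { }
   public void update(ExecutionContext ctx) { }
   public void close() { }
  };
 }

 private static MyAppDetail detail(String account, String txnCd) {
  MyAppDetail d = new MyAppDetail();
  d.setAccountCd(account);
  d.setTxnCd(txnCd);
  return d;
 }

 public static void main(String[] args) throws Exception {
  MyAppFileItemReader reader = new MyAppFileItemReader();
  reader.setDelegate(stub(Arrays.asList(
    detail("Account1", "OPENBAL"),
    detail("Account1", "PURCHASE"),
    detail("Account1", "EXPENSE"),
    detail("Account1", "OPENBAL"), // starts the second group
    detail("Account2", "PURCHASE")).iterator()));

  MyAppParent group;
  while ((group = reader.read()) != null) {
   System.out.println("group with " + group.getTxnDetails().size() + " detail records");
  }
  // expected output: a group with 2 detail records, then a group with 1
 }
}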



Step 3: The utility class that can be used to inject the step and job execution contexts into your reader, processor, or writer classes.

package com.myapp.util;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;


public class StepExecutionListenerCtxInjecter
{
    private ExecutionContext stepExecutionCtx;
    
    private ExecutionContext jobExecutionCtx;
    
    @BeforeStep
    public void beforeStep(StepExecution stepExecution)
    {
        stepExecutionCtx = stepExecution.getExecutionContext();
        jobExecutionCtx = stepExecution.getJobExecution().getExecutionContext();
    }


    public ExecutionContext getStepExecutionCtx()
    {
        return stepExecutionCtx;
    }

    public ExecutionContext getJobExecutionCtx()
    {
        return jobExecutionCtx;
    }
}
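
Any other batch component can then have this listener injected to read values from the contexts; for example, a hypothetical item processor:

package com.myapp.processor;

import javax.annotation.Resource;

import org.springframework.batch.item.ItemProcessor;

import com.myapp.model.MyAppParent;
import com.myapp.util.StepExecutionListenerCtxInjecter;

// hypothetical processor showing how the injected listener exposes the contexts
public class MyAppItemProcessor implements ItemProcessor<MyAppParent, MyAppParent> {

 @Resource(name = "stepExecutionListener")
 private StepExecutionListenerCtxInjecter stepExecutionListener;

 @Override
 public MyAppParent process(MyAppParent item) {
  // read a value that an earlier component put into the job execution context
  Object headerData = stepExecutionListener.getJobExecutionCtx().get("feedHeaderData");
  // ... apply header-driven logic to the item here ...
  return item;
 }
}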


Step 4: As you can see in the Spring config file, we skip the first record, which is the header record, and define a LineCallbackHandler to process it. Here is the implementation of this handler.

package com.myapp.handler;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.LineCallbackHandler;
import org.springframework.stereotype.Component;

import com.myapp.dao.MyAppDao;
import com.myapp.model.MyAppMeta;
import com.myapp.util.MyAppUtil;
import com.myapp.util.StepExecutionListenerCtxInjecter;

@Component(value = "myAppFileHeaderLineCallbackHandler")
public class MyAppFileHeaderCallbackHandler implements LineCallbackHandler {

 private static final Logger LOGGER = LoggerFactory.getLogger(MyAppFileHeaderCallbackHandler.class);

 public static final String FEED_HEADER_DATA = "feedHeaderData";

 @Resource(name = "myappFeedDao")
 private MyAppDao myappDao;

 @Resource(name = "stepExecutionListener")
 private StepExecutionListenerCtxInjecter stepExecutionListener;

 @Override
 public void handleLine(String headerLine) {
  LOGGER.debug("header line: {}", headerLine);
  // convert the CSV header line into a meta object
  MyAppMeta cfMeta = MyAppUtil.getMyAppMetaFromHeader(headerLine, null);

  // logical delete current records
  int noOfRecordsLogicallyDeleted = myappDao.logicallyDelete(cfMeta);
  LOGGER.info("No of records logically deleted: " + noOfRecordsLogicallyDeleted);

  //save it in the job execution context
  stepExecutionListener.getJobExecutionCtx().put(FEED_HEADER_DATA, cfMeta);
 }
}
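
The MyAppUtil.getMyAppMetaFromHeader() helper is not shown in this post; a plausible sketch, assuming the header layout from the sample feed and a MyAppMeta model with matching setters (both hypothetical):

package com.myapp.util;

import java.text.ParseException;

import org.apache.commons.lang.time.DateUtils;

import com.myapp.model.MyAppMeta;

public final class MyAppUtil {

 private MyAppUtil() {
 }

 // parses a header line such as: "Portfolio1","29/02/2012","11/03/2012",
 public static MyAppMeta getMyAppMetaFromHeader(String headerLine, MyAppMeta meta) {
  if (meta == null) {
   meta = new MyAppMeta(); // assumed no-arg constructor
  }
  // strip the quotes and split on commas
  String[] fields = headerLine.replace("\"", "").split(",");
  try {
   meta.setPortfolioCd(fields[0].trim()); // assumed setter
   meta.setFromDate(DateUtils.parseDate(fields[1].trim(), new String[] { "dd/MM/yyyy" })); // assumed setter
   meta.setToDate(DateUtils.parseDate(fields[2].trim(), new String[] { "dd/MM/yyyy" }));   // assumed setter
  } catch (ParseException e) {
   throw new IllegalStateException("Invalid header date in: " + headerLine, e);
  }
  return meta;
 }
}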



Step 5: The FlatFileItemReader has a FieldSetMapper defined to map each row to an object. This mapper is invoked as each CSV line is read, converting the fields into an object as shown below.

package com.myapp.mapper;
import java.math.BigDecimal;
import java.text.ParseException;

import org.apache.commons.lang.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.myapp.model.MyAppDetail;


public class MyAppFieldSetMapper implements FieldSetMapper<MyAppDetail> {
 
 private final static Logger logger = LoggerFactory.getLogger(MyAppFieldSetMapper.class);
 
 
 @Override
 public MyAppDetail mapFieldSet(FieldSet fs) throws BindException {

  if (fs == null) {
   return null;
  }
  
  MyAppDetail detail = new MyAppDetail();
  detail.setPortfolioCd(fs.readString("portfolioCd"));
  detail.setAccountCd(fs.readString("accountCd"));
  detail.setTxnCd(fs.readString("txnCd"));
  
  BigDecimal cashValue = fs.readBigDecimal("cashValue");
  detail.setCashValue(cashValue != null ? cashValue : BigDecimal.ZERO);
  
  
  return detail;

 }
}
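
To exercise the mapper in isolation, you can construct a FieldSet by hand with Spring's DefaultFieldSet. A quick sketch (the sample values are made up):

package com.myapp.mapper;

import org.springframework.batch.item.file.transform.DefaultFieldSet;
import org.springframework.batch.item.file.transform.FieldSet;

import com.myapp.model.MyAppDetail;

public class MyAppFieldSetMapperSmokeTest {

 public static void main(String[] args) throws Exception {
  // the names must match the DelimitedLineTokenizer configuration in step 1
  FieldSet fs = new DefaultFieldSet(
    new String[] { "Portfolio1", "Account1", "PURCHASE", "1000.00" },
    new String[] { "portfolioCd", "accountCd", "txnCd", "cashValue" });

  MyAppDetail detail = new MyAppFieldSetMapper().mapFieldSet(fs);
  System.out.println(detail.getTxnCd() + " " + detail.getCashValue()); // prints: PURCHASE 1000.00
 }
}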



Step 6: The writer class is responsible for writing a group of items (i.e. a parent and its children records) to the database.

package com.myapp.item.writer;

import java.util.List;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;

import com.myapp.dao.MyAppDao;
import com.myapp.handler.MyAppFileHeaderCallbackHandler;
import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppMeta;
import com.myapp.model.MyAppParent;
import com.myapp.util.StepExecutionListenerCtxInjecter;

public class MyAppItemWriter implements ItemWriter<MyAppParent> {

 @Resource(name = "stepExecutionListener")
 private StepExecutionListenerCtxInjecter stepExecutionListener; // to get the step and job contexts

 @Resource(name = "myappFeedDao")
 private MyAppDao myappDao; // DAO class for saving records to the database

 private final static Logger logger = LoggerFactory.getLogger(MyAppItemWriter.class);

 @Override
 public void write(List<? extends MyAppParent> portfolioDetails) {

  // retrieve the header data previously stored in the job execution context
  MyAppMeta pfMeta = (MyAppMeta) stepExecutionListener.getJobExecutionCtx().get(
    MyAppFileHeaderCallbackHandler.FEED_HEADER_DATA);

  int batchJobId = -1;

  // retrieve the batch job id previously stored in the job execution context
  if (stepExecutionListener.getJobExecutionCtx().get("batchJobId") != null) {
   batchJobId = stepExecutionListener.getJobExecutionCtx().getInt("batchJobId");
  }
  pfMeta.setBatchJobId(batchJobId);

  try {
   for (MyAppParent cfp : portfolioDetails) {

    MyAppDetail bd = cfp.getBalanceDetail();

    // save the opening balance record
    int noOfRecords = myappDao.saveMyAppBalance(bd, pfMeta);
    logger.info("No of balance records inserted: {}", noOfRecords);

    // look up the synthetic key of the balance record just saved
    int syntheticId = myappDao.getMyAppId(bd, pfMeta);

    // save the transaction detail records under the balance record
    List<MyAppDetail> txnDetails = cfp.getTxnDetails();
    for (MyAppDetail txd : txnDetails) {
     myappDao.saveMyAppDetail(txd, syntheticId);
    }
   }

  } catch (Exception e) {
   logger.error("MyAppItemWriter error", e);
   throw new RuntimeException(e);
  }

  if (logger.isDebugEnabled()) {
   logger.debug("Committing chunk to the database ...");
  }
 }
}
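
The DAO itself is out of scope for this post; its contract might look roughly like this (a hypothetical interface, with method names inferred from the writer and the header handler above):

package com.myapp.dao;

import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppMeta;

// hypothetical DAO contract inferred from the writer and header handler
public interface MyAppDao {

 // logically delete existing records for the portfolio/date range in the header
 int logicallyDelete(MyAppMeta meta);

 // insert the "OPENBAL" balance record; returns the number of rows inserted
 int saveMyAppBalance(MyAppDetail balance, MyAppMeta meta);

 // look up the synthetic key of the balance record just inserted
 int getMyAppId(MyAppDetail balance, MyAppMeta meta);

 // insert one transaction detail record under the given balance record
 void saveMyAppDetail(MyAppDetail detail, int syntheticId);
}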




5 Comments:

Blogger A.K said...

Thanks a lot for posting it; the example is really very helpful.

9:01 PM, July 24, 2013  
Blogger ModernPhilosopher said...

Nicely written example.

9:11 AM, March 19, 2014  
Blogger Jay Singh said...

Hi,
I am reading data from a database and then writing it in a fixed-length file format (no commas), and I would like to include a header and a footer in the file.
My header format is prefixed with HD:
HD-RECORD-TYPE-IDENTIFIER(2)
HD-REC-SEQ-NUM(5) its line number
HD-SUBSCRIBER-ID(10)
HD-DATE-TIME-STAMP(10)
HD-REJECT-CODE(10)
FILLER
The body detail format is as below and is prefixed with DT:
HD-RECORD-TYPE-IDENTIFIER(2)
HD-REC-SEQ-NUM(5)
DT-CREDIT-CARD-NUMBER
DT-PAN-SEQ-NUMBER
DT-CUSTOMER-NUMBER
DT-EXPIRY-DATE
DT-SE-NUM
DT-DECISION-INDICATOR
Similarly, the footer format is prefixed with 'FT':
FOOTER FT-RECORD-TYPE-IDENTIFIER(2)
FOOTER FT-REC-SEQ-NUM(5)
FOOTER FT-NO-OF-RECORDS
HD0000100011000220111212022424
DT000020002376000390521009
FT000030034000000052

6:49 AM, April 05, 2014  
Blogger Unknown said...

Refer to the Spring Batch documentation, where the fixed-length file format is described.

9:58 PM, April 07, 2014  
Blogger Rajesh Kommuri said...

Can you post an example using ItemReader and HibernateItemWriter in Spring Batch?

Thanks in advance

9:55 PM, May 07, 2014  
