Google

Dec 15, 2013

Spring batch working with the listeners

Q. Why do you have listeners in Spring batch?
A. Reader --> to read data and Writer --> to write chunked data and processor --> to filter out records before they are passed to the ItemWriter. Filtering is an action distinct from skipping; skipping indicates that a record is invalid whereas filtering simply indicates that a record should not be written.

Listeners are events that can be executed before or after a point in time like before or after executing a step, etc.


Here is an example that uses all the listeners except for the ItemProcessListener.


package com.mysample.job;

import java.util.List;

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public class SampleListener implements ItemWriteListener<List<String>>, StepExecutionListener, JobExecutionListener,
  ItemReadListener<String>, ChunkListener, SkipListener<String, String> {

 @Override
 public void afterRead(String arg0) {
 }
 
 @Override
 public void beforeRead() {
 }

 @Override
 public void onReadError(Exception arg0) {
 }

 @Override
 public void afterJob(JobExecution arg0) {
 }

 @Override
 public void beforeJob(JobExecution arg0) {
 }

 @Override
 public ExitStatus afterStep(StepExecution arg0) {
  return null;
 }

 @Override
 public void beforeStep(StepExecution arg0) {
 }

 @Override
 public void afterWrite(List<? extends List<String>> arg0) {
 }

 @Override
 public void beforeWrite(List<? extends List<String>> arg0) {
 }

 @Override
 public void onWriteError(Exception arg0, List<? extends List<String>> arg1) {
 }

 @Override
 public void onSkipInProcess(String arg0, Throwable arg1) {
 }

 @Override
 public void onSkipInRead(Throwable arg0) {
 }

 @Override
 public void onSkipInWrite(String arg0, Throwable arg1) {
 }

 @Override
 public void afterChunk() {
  
 }

 @Override
 public void beforeChunk() {
 }
}


Q. Where do you declare a listener?
A. In the Spring config file.

    <job id="sampleJob" restartable="false"
  xmlns="http://www.springframework.org/schema/batch">
  <step id="sampleStep">
   <tasklet task-executor="taskExecutor">
    <chunk reader="sampleReader" writer="sampleWriter"
     commit-interval="3">
    </chunk>
    <listeners>
     <listener ref="sampleListener" />
    </listeners>
   </tasklet>
  </step>
 </job>
 
 <bean id="sampleListener" scope="step" class="com.mysample.SampleListener">
  <property name="jobId" value="#{stepExecutionContext[JOB_ID]}" />
  <property name="runDate" value="#{jobParameters['RUN_DATE']}" />
 </bean>


Q. What do you understand by the terms StepExecution, JobExecution, JobExecutionContext, and StepExecutionContext?
A. You can store data between readers and writers using StepExecution or JobExecution. For example, you may want to keep track of the failed accounts by storing them in a job control table. These accounts can be read again from the job control table and rerun in the next job run.

stepExecution.getExecutionContext().put(ValidationJobMapKey.FAILED_ACCOUNTS.name(), failedItems);
jobExecution.getExecutionContext().put(ValidationJobMapKey.JOB_INFO.name(), info);

Here is a more comprehensive sample listener code.

package com.mysample.job;

//...

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import javax.annotation.Resource;

import org.joda.time.Instant;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
//....

/**
 * writes batch run data to the job control table after write and after step
 * failed accounts are stored in the stepExecution context
 */
public class SampleListener implements ItemWriteListener<TradeAccount>, StepExecutionListener {

 private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);

 @Resource(name = "sampleService")
 private SampleService sampleService;

 private Long jobId;
 private Date runDate;
 private Instant startTime;

 private StepExecution stepExecution;
 
 @Override
 public void beforeWrite(List<? extends TradeAccount> items) {
  startTime = new Instant();
 }

 @Override
 public void afterWrite(List<? extends TradeAccount> items) {
  Period period = new Interval(startTime, new Instant()).toPeriod();
  logger.info("Finish call service, duration: " + getDuration(period));
  try {
   JobControlMetaData meta = new JobControlMetaData();
   meta.setJobId(this.jobId);
   meta.setJobRunDate(runDate);
   //Spring batch gives you the ExitStatus
   meta.setJobStatus(ExitStatus.COMPLETED);
   sampleService.updateJobControlTable(meta, items);
  } catch (Exception e) {
   logger.error("Failed to update job control: ", e);
  }

  logger.info("Finish write, duration: " + getDuration(new Interval(startTime, new Instant()).toPeriod()));
 }

 @Override
 /**
   * Transactions are rolleback here.
   */
 public void onWriteError(Exception exception, List<? extends TradeAccount> items) {
  logger.error("Error to process accounts: " + items);
  handleFailedAccounts(exception, items);
 }
 
 /**
  * Store failed accounts in the step execution context
  */
 private synchronized void handleFailedAccounts(Exception exception, List<? extends TradeAccount> items){
  for (TradeAccount tradeAccount : items) {
   tradeAccount.setErrorMsg(exception.getMessage());
  }
  
  @SuppressWarnings("unchecked")
  List<TradeAccount> failedItems = (List<TradeAccount>) stepExecution.getExecutionContext().get(ConstantEnum.FAILED_ACCOUNTS.name());
  if(failedItems == null) {
   failedItems = new ArrayList<TradeAccount>();
  }
  
  failedItems.addAll(items);
  
  //ConstantEnum is enum class with constant values.
  stepExecution.getExecutionContext().put(ConstantEnum.FAILED_ACCOUNTS.name(), failedItems);
 }
 
 public Long getJobId() {
  return jobId;
 }

 public void setJobId(Long jobId) {
  this.jobId = jobId;
 }

 public Date getRunDate() {
  return runDate;
 }

 public void setRunDate(Date runDate) {
  this.runDate = runDate;
 }

 @Override
 /**
  *  Write failed accounts to the job control table
  **/
 public ExitStatus afterStep(StepExecution stepExecution) {
  @SuppressWarnings("unchecked")
  List<TradeAccount> failedItems = (List<TradeAccount>) stepExecution.getExecutionContext().get(ConstantEnum.FAILED_ACCOUNTS.name());
  if(failedItems != null && failedItems.size() > 0) {
   try {
    JobControlMetaData meta = new JobControlMetaData();
    meta.setJobId(this.jobId);
    meta.setJobRunDate(runDate);
    //Spring batch gives you the ExitStatus
    meta.setJobStatus(ExitStatus.FAILED);
    sampleService.updateJobControlTable(meta, failedItems);
   } catch (Exception e) {
    logger.error("Failed to update job control: ",e);
   }
  }
  return null;
 }

 @Override
 public void beforeStep(StepExecution stepExecution) {
  this.stepExecution = stepExecution;
  //Spring allocates job ids. so get it.
  jobId = stepExecution.getJobExecution().getJobId();
  stepExecution.getExecutionContext().put(ConstantEnum.JOB_ID.name(), jobId);
 }
 
 private static String getDuration(Period period) {
  return period.getHours() + ":" + period.getMinutes() + ":" + period.getSeconds() + "." + period.getMillis();
 }
}


Labels:

4 Comments:

Anonymous Anonymous said...

When an exception is thrown in listener. Does it stops the entire process? For Example I have a custom listener that implements ItemProcessListener and in the beforeProcess method there is an IOException. But this exception is being swallowed not sure how because I checked the source code they catch the exception and rethrow. Even the exception is thrown the Process method on the item processor is called. Can you please help me on this >>??

4:59 AM, March 21, 2014  
Blogger Unknown said...

You have methds like writeonerror

8:16 AM, March 21, 2014  
Blogger Unknown said...

Job and Step are not supposed to throw exceptions even if they fail. Your program should be able to retrieve the exceptions from a local execution via the JobExecution or StepExecution objects. Then it can re-throw.

10:13 AM, March 21, 2014  
Anonymous Anonymous said...

I am having the same issue as here - http://stackoverflow.com/questions/24253996/spring-batch-ignore-any-line-that-does-not-match-the-specified-pattern any help?

12:25 PM, June 17, 2014  

Post a Comment

Subscribe to Post Comments [Atom]

<< Home