Jan 23, 2014

Java ExecutorService for multi-threading -- coding question and tutorial

Q. Can you code in Java for the following scenario?

Write a multi-threaded SumEngine that takes a SumRequest with 2 operands (the input numbers to add), as shown below:

package com.mycompany.metrics;

import java.util.UUID;

public class SumRequest {
 
 private String id = UUID.randomUUID().toString();
 private int operand1;
 private int operand2;
 
 protected int getOperand1() {
  return operand1;
 }
 protected void setOperand1(int operand1) {
  this.operand1 = operand1;
 }
 protected int getOperand2() {
  return operand2;
 }
 protected void setOperand2(int operand2) {
  this.operand2 = operand2;
 }
 protected String getId() {
  return id;
 }
 
 @Override
 public String toString() {
  return "SumRequest [id=" + id + ", operand1=" + operand1 + ", operand2=" + operand2 + "]";
 } 
}

and returns a SumResponse with the result:

package com.mycompany.metrics;

public class SumResponse {
 
 private String requestId;
 private int result;
 
 protected String getRequestId() {
  return requestId;
 }
 protected void setRequestId(String requestId) {
  this.requestId = requestId;
 }
 protected int getResult() {
  return result;
 }
 protected void setResult(int result) {
  this.result = result;
 }
 
 @Override
 public String toString() {
  return "SumResponse [requestId=" + requestId + ", result=" + result + "]";
 }
}

A. Processing a request and returning a response is a very common programming task. Here is some basic sample code to get started. The SumProcessor interface defined in Step 1 is generic, so it can take any type of object as request and response.

Step 1: Define the interface that performs the sum operation. Note that generics are used, so the request and response types are chosen by the implementation.

package com.mycompany.metrics;

/**
 * R -- Generic request type, S -- Generic response type 
 */
public interface SumProcessor<R,S> {
 
    // interface methods are implicitly public and abstract
    S sum(R request);
}
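
To illustrate why the interface is generic, here is a minimal sketch that reuses the same contract with different request and response types. The names ArraySumProcessor and GenericSumDemo are hypothetical and not part of the original solution; the interface is repeated only so that the snippet compiles on its own.

```java
// Hypothetical demo class; not part of the original solution.
class GenericSumDemo {
    public static void main(String[] args) {
        // Same contract as before, but with int[] as request and Long as response.
        SumProcessor<int[], Long> processor = new ArraySumProcessor();
        System.out.println("Sum = " + processor.sum(new int[] {1, 2, 3}));
    }
}

// Repeated from Step 1 so that this sketch is self-contained.
interface SumProcessor<R, S> {
    S sum(R request);
}

// Hypothetical implementation that adds up every element of an int array.
class ArraySumProcessor implements SumProcessor<int[], Long> {
    @Override
    public Long sum(int[] request) {
        long total = 0;
        for (int value : request) {
            total += value;
        }
        return total;
    }
}
```

The calling code depends only on SumProcessor<R,S>, so swapping in a different implementation does not change how sum(...) is invoked.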

Step 2: Define the implementation of the above interface. It takes a SumRequest and returns a SumResponse.

package com.mycompany.metrics;

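// Note: the type parameters R and S declared on this class are not used;
// the class binds SumProcessor to SumRequest and SumResponse directly.
public class SumProcessorImpl<R,S> implements SumProcessor<SumRequest, SumResponse> {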

 @Override
 public SumResponse sum(SumRequest request) {
  System.out.println(Thread.currentThread().getName() + " processing request .... " + request);
  SumResponse resp= new SumResponse();
  resp.setRequestId(request.getId());
  resp.setResult(request.getOperand1() + request.getOperand2());
  return resp;
 }
}

Step 3: Write the multi-threaded SumEngine. The entry point is the public method execute(SumRequest... request), which takes one or more SumRequest objects as input via varargs. The ExecutorService is the thread pool, and each task is an anonymous implementation of the Callable interface that is submitted to the pool and executed by an available thread.


package com.mycompany.metrics;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class SumEngine {

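 //total number of requests submitted so far
 private final AtomicInteger requestsCount = new AtomicInteger();

 //thread pool, created on each call to execute(...)
 private ExecutorService executionService = null;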

 //executes requests to sum
 public void execute(SumRequest... request) {
  executionService = Executors.newFixedThreadPool(5); //create a thread pool
  try {
   List<Callable<SumResponse>> tasks = createExecuteTasks(request);
   List<Future<SumResponse>> results = execute(tasks);
   for (Future<SumResponse> result : results) {
    try {
     //get() returns the task's result, or throws ExecutionException if the task failed
     System.out.println(Thread.currentThread().getName() + ": Response = " + result.get());
    } catch (InterruptedException e) {
     Thread.currentThread().interrupt(); //restore the interrupt flag and stop waiting
     return;
    } catch (ExecutionException e) {
     e.printStackTrace();
    }
   }
  } finally {
   //initiates an orderly shutdown of the thread pool, even if something above failed
   executionService.shutdown();
  }
 }

 //create tasks
 private List<Callable<SumResponse>> createExecuteTasks(SumRequest[] requests) {
  List<Callable<SumResponse>> tasks = new LinkedList<Callable<SumResponse>>();
  executingRequests(requests.length);
  for (SumRequest req : requests) {
   Callable<SumResponse> task = createTask(req);
   tasks.add(task);
  }

  return tasks;
 }

 //increment the requests counter
 private void executingRequests(int count) {
  requestsCount.addAndGet(count);
 }

 //creates callable (i.e executable or runnable tasks) 
 private Callable<SumResponse> createTask(final SumRequest request) {
  // anonymous implementation of Callable.
  // Pre Java 8's way of creating closures
  Callable<SumResponse> task = new Callable<SumResponse>() {

   @Override
   public SumResponse call() throws Exception {
    System.out.println(Thread.currentThread().getName() + ": Request = " + request);
    SumProcessor<SumRequest, SumResponse> processor = new SumProcessorImpl<>();
    SumResponse result = processor.sum(request);
    return result;
   }

  };

  return task;
 }

 //executes the tasks
 private <T> List<Future<T>> execute(List<Callable<T>> tasks) {
  try {
   //runs every task on the pool and waits for all of them to complete;
   //each task's call() invokes sum(sumRequest), see createTask
   return executionService.invokeAll(tasks);
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt(); //restore the interrupt flag
   return new LinkedList<Future<T>>(); //never return null to the caller
  }
 }
 
 public int getRequestsCount(){
  return requestsCount.get();
 }
}
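
The createTask method above uses an anonymous class, which its comment describes as the pre-Java 8 way of creating closures. As a rough sketch of the Java 8+ alternative, the same kind of task can be written as a lambda. To keep the snippet self-contained it adds two plain int operands instead of a SumRequest; the class LambdaTaskDemo and the method runOnPool are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the original solution.
class LambdaTaskDemo {

    // Java 8+ equivalent of createTask: the lambda captures both operands,
    // just as the anonymous class captures the final request parameter.
    static Callable<Integer> createTask(final int operand1, final int operand2) {
        return () -> operand1 + operand2;
    }

    // Submits one task to a small pool and waits for its result.
    static int runOnPool(int operand1, int operand2) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = pool.submit(createTask(operand1, operand2));
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // orderly shutdown, as in SumEngine
        }
    }

    public static void main(String[] args) {
        System.out.println("Result = " + runOnPool(1, 2));
    }
}
```

The lambda works here because Callable has a single abstract method, call(), so the compiler can infer the target type from the return type of createTask.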

Step 4: Write the SumEngineTest class to run the engine from its main method. It loops through the numbers 1 to 5 and adds each number to the next one: 1+2=3, 2+3=5, 3+4=7, 4+5=9, and 5+6=11.

package com.mycompany.metrics;

import java.util.ArrayList;
import java.util.List;

public class SumEngineTest {

 public static void main(String[] args) throws Exception {

  SumEngine se = new SumEngine();
  
  List<SumRequest> list = new ArrayList<>();

  // sums 1+2, 2+3, 3+4, etc
  for (int i = 1; i <= 5; i++) {
   SumRequest req = new SumRequest();
   req.setOperand1(i);
   req.setOperand2(i + 1);
   list.add(req);
  }

  // varargs accepts an array directly, so no cast is needed
  se.execute(list.toArray(new SumRequest[list.size()]));

 }
}

Sample output (the UUIDs and the interleaving of the pool threads vary from run to run):

pool-1-thread-2: Request = SumRequest [id=bca23e97-3a6f-4e42-aff4-5ed5f7de2783, operand1=2, operand2=3]
pool-1-thread-4: Request = SumRequest [id=36d95b35-09f0-4e93-99e4-715ea7cb33c9, operand1=4, operand2=5]
pool-1-thread-3: Request = SumRequest [id=31ccd137-349a-4b7a-93b1-e51f62c11ba9, operand1=3, operand2=4]
pool-1-thread-1: Request = SumRequest [id=4bfa782a-c695-4de6-9593-cbfd357c3535, operand1=1, operand2=2]
pool-1-thread-5: Request = SumRequest [id=c653f469-6a6f-45b6-99f2-ed58620fd144, operand1=5, operand2=6]
pool-1-thread-4 processing request .... SumRequest [id=36d95b35-09f0-4e93-99e4-715ea7cb33c9, operand1=4, operand2=5]
pool-1-thread-2 processing request .... SumRequest [id=bca23e97-3a6f-4e42-aff4-5ed5f7de2783, operand1=2, operand2=3]
pool-1-thread-1 processing request .... SumRequest [id=4bfa782a-c695-4de6-9593-cbfd357c3535, operand1=1, operand2=2]
pool-1-thread-3 processing request .... SumRequest [id=31ccd137-349a-4b7a-93b1-e51f62c11ba9, operand1=3, operand2=4]
pool-1-thread-5 processing request .... SumRequest [id=c653f469-6a6f-45b6-99f2-ed58620fd144, operand1=5, operand2=6]
main: Response = SumResponse [requestId=4bfa782a-c695-4de6-9593-cbfd357c3535, result=3]
main: Response = SumResponse [requestId=bca23e97-3a6f-4e42-aff4-5ed5f7de2783, result=5]
main: Response = SumResponse [requestId=31ccd137-349a-4b7a-93b1-e51f62c11ba9, result=7]
main: Response = SumResponse [requestId=36d95b35-09f0-4e93-99e4-715ea7cb33c9, result=9]
main: Response = SumResponse [requestId=c653f469-6a6f-45b6-99f2-ed58620fd144, result=11]
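
In the output above, the "Request" lines from the pool threads appear in arbitrary order, while the "Response" lines printed by main follow the order in which the requests were created. This is consistent with the documented behavior of ExecutorService.invokeAll, which returns its futures in the same order as the task list. The sketch below tries to make that visible by letting earlier tasks sleep longer; the class InvokeAllOrderDemo, the method sumsInSubmissionOrder, and the sleep times are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the original solution.
class InvokeAllOrderDemo {

    // Computes i + (i + 1) for i = 1..count on a thread pool and
    // collects the results in the order the tasks were listed.
    static List<Integer> sumsInSubmissionOrder(final int count) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            final int operand = i;
            tasks.add(() -> {
                // Earlier tasks sleep longer, so they tend to finish last.
                Thread.sleep((count - operand) * 20L);
                return operand + (operand + 1);
            });
        }

        ExecutorService pool = Executors.newFixedThreadPool(count);
        try {
            List<Integer> results = new ArrayList<>();
            // invokeAll waits for all tasks and keeps the task-list order.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumsInSubmissionOrder(5)); // prints [3, 5, 7, 9, 11]
    }
}
```

Even though the first task is likely to complete last, its result still comes first in the list, because the position of each Future is determined by the task list and not by completion time.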
