google-code-prettify

2015年1月23日 星期五

簡易限制同時多執行緒數量Simple Limited Concurrent Multi-thread

因為工作常需要處理較大數量的資料,
使用單執行緒耗時較久,
所以就想寫一個多執行緒的程序來做,
但不能無限制的往Server丟,
Server恐怕也會吃不消,
所以必須控制同時多執行緒執行的數量。
另外,也為了要考量到原本已寫好的Service必須可以直接使用,
套用多執行緒後不用改Code,
保持需要時,原本只有主執行緒運算的狀況下仍可進行,
所以就利用時間寫了以下這樣一個簡易可以指定同時間多執行緒數量的程式,
並且這個控制同時執行緒數量的框架架構可以在後來其他新的程式工作直接套用即可,
不需再寫產生多執行緒、控制Concurrent Multi-thread的程式。
 

public interface BatchService {
 public Collection<!----> retrieveSource() throws Exception;
 public void doJob( Object obj ) throws Exception;
}


 
public interface Job extends Runnable {
 public void setData(Object obj);
 public void setService(BatchService service);
}

 
public class JobImpl implements Job {

 //private static Logger log = Logger.getLogger(JobImpl.class);
 
 private Object data;
 private BatchService service;
 
 @Override
 public void run() {
  Thread.currentThread().setName(Thread.currentThread().getId()+"");
  if (data == null || service == null) return;
  
  try {
   service.doJob(data);
   this.setData(null);
   this.setService(null);
   
   
  } catch (Exception e) {
   e.printStackTrace();
   //System.out.println(e.getMessage());
   //log.error(e.getMessage());
   
  }finally{
   BatchInit.infoJobFinishCount(this);
  }
 }

 @Override
 public void setData(Object obj) {
  data = obj;
  
 }

 @Override
 public void setService(BatchService service) {
  this.service = service;
 }

}

 

public class BatchInit {
 //private static Logger log = LogManager.getLogger("BatchInit");
 public static Stack<job> jobStack;

 private static int JobFinishCount = 0;

 
 public void batchJob(BatchService batchService, Stack<job> jobs) throws Exception{
  List<!----> source = (List<!---->)batchService.retrieveSource() ;
  int JobAmount = source.size();
  jobStack = jobs;
  int JobNow = 0;
  //log.info("JobAmount::"+JobAmount);
  
  while(true){
   if (JobAmount==JobFinishCount){
    //System.out.println("做完了::"+JobFinishCount);
    //log.info("做完了::"+JobFinishCount);
    break;
   }
   Job job = null;
   try{
    job = jobStack.pop();
    
   }catch(java.util.EmptyStackException ex){
  
   }catch(Exception ex){
    ex.printStackTrace();
    //log.error(ex.getMessage());
   }
   
   //log.debug("#######job ::"+job);
   if (job != null && JobNow < JobAmount ){
    loga(JobFinishCount, "");
    Object obj = source.get(JobNow);
    ++JobNow;
    job.setData(obj);
    job.setService(batchService);
    Thread t = new Thread(job, job.toString());
    t.start();
    
   }
   
  }
  
 }

 public synchronized static void infoJobFinishCount( Job job){
  ++JobFinishCount;
  jobStack.push(job);
  
 }
 
 private void loga(int i, String message){
  if (i % 500 == 0){
   System.out.println("--doing :" +i+", "+message);
   //log.info("--doing :" +i+", "+message);

  }
 }
}


上面的程式是控制同一時間執行緒的最大數量,之後不用在改(除非有Bug,或為了提升效率),
以後新的需要多執行緒進行的程式可以直接引用。
而下面的兩隻程式必須要寫,
一個是實作每一個Batch Job Service的實際工作必須implements BatchService這個interface,
另一個則是啟動程式和同一時間處理Job數量(執行緒數量)設定
 

public class MyBatchJobService implements BatchService {

 @Override
 public Collection<!----> retrieveSource() throws Exception {
  //TODO write your code
  ArrayList<String> source = new ArrayList<String>();
  source.add("Hello ");
  source.add("Ryan! ");
  source.add("How ");
  source.add("are ");
  source.add("you? ");
  
  for(int i = 0 ; i < 10 ; i++){
   source.add(""+i);
  }
  return source;
 }

 @Override
 public void doJob(Object obj) throws Exception {
  //TODO write your code 
  String data = (String) obj;
  System.out.println("ThreadId:"+ Thread.currentThread().getId() +"::"+data);
 }

}


執行BatchStart啟動程式。
 
public class BatchStart {

 private static BatchInit batchInit = new BatchInit();
 
 public static void main(String[] args) throws Exception {
  BatchService service = new MyBatchJobService();
  batchJob(service, 2);
  
 }

 private static void batchJob(BatchService service, int amountThread)
   throws Exception {
  Stack jobs = new Stack();
  
  for(int i =0; i < amountThread; i++){  //TODO Setting the amount of concurrent thread
   jobs.push(new JobImpl());
   
  }
  
  batchInit.batchJob(service, jobs);
 }
}

開放原始碼下載:GitHub

沒有留言:

張貼留言