ExecutorService의 submit() 메소드는 매개값으로 준 Runnable 또는 Callable 작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴

Future 객체 : 지연 완료 객체

작업 결과가 아니라 작업이 완료될 때까지 기다렸다가(블로킹 되었다가) 최종 결과를 얻는데 사용

 

get() 메소드

: Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹 되었다가 작업을 완료하면 처리 결과를 리턴 (future.get())

 

① 리턴값이 없는 작업 완료 통보

: Runnable 객체로 생성

Runnable task = new Runnable() {
   @Override
   public void run() {
      //스레드가 처리할 작업 내용
   }
};
Future future = executorService.submit(task);
try {
   future.get();
} catch (InterruptedException e) {
   //작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
}

 

리턴값이 없는 작업 완료 통보

package sec09.exam02_blocking;

import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NoResultExample {
	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors() //CPU 성능에 맞게(코어 수 만큼) 최대 스레드수 설정 
				);
		System.out.println("[작업 처리 요청]");
		Runnable runnable = new Runnable() { //익명 객체
			@Override
			public void run() {
				int sum = 0;
				for(int i=1; i<=10; i++) {
					sum += i;
				}
				System.out.println("[처리 결과] " + sum);
			}			
		};
		
		Future future = executorService.submit(runnable);
		
		try {
			future.get(); //future.get() : 작업이 완료될 때 까지 블로킹(하위 코드 즉, 메인 메소드 진행하지 않고 대기) 되었다가 작업이 완료되면 리턴값 null 리턴 
			System.out.println("[작업 처리 완료");
		} catch(Exception e) {
			System.out.println("[실행 예외 발생함] " + e.getMessage());
		}
		
		executorService.shutdown();
	}
}
[작업 처리 요청]
[처리 결과] 55
[작업 처리 완료

 

② 리턴값이 있는 작업 완료 통보

: Callable 객체(제네릭 타입)로 생성

Callable<T> task = new Callable<T>() {
   @Override
   public T call() throws Exception {
      //스레드가 처리할 작업 내용
      return T;
   }
};
Future<T> future = executorService.submit(task);
try {
   T result = future.get();
} catch (InterruptedException e) {
   //작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
}

 

리턴값이 있는 작업 완료 통보

package sec09.exam02_blocking;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultByCallableExample {
	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
				);
		System.out.println("[작업 처리 요청]");
		Callable<Integer> task = new Callable<Integer>() { //<T> : 리턴값의 타입
			@Override
			public Integer call() throws Exception { //public T call() - T : 리턴값의 타입
				int sum = 0;
				for(int i=1; i<=10; i++) {
					sum += i;
				}
				return sum; //call() 메소드는 리턴값이 필요함
			}
		};
		
		Future<Integer> future = executorService.submit(task);
		
		try {
			int sum = future.get();
			System.out.println("[처리 결과] " + sum);
			System.out.println("[작업 처리 완료");
		} catch(Exception e) {
			System.out.println("[실행 예외 발생함] " + e.getMessage());
		}
		
		executorService.shutdown();
	}
}
[작업 처리 요청]
[처리 결과] 55
[작업 처리 완료

 

③ 작업 처리 결과를 외부 객체에 저장

스레드가 작업한 결과를 외부 객체에 저장해야 하는 경우

ExecutorService의 submit(Runnable task, V result) 사용

Result result = ...;
Runnable task = new Task(result);
Future<Result> future = executorService.submic(task, result);
result = future.get();

작업 객체는 Runnable 구현 클래스로 생성

외부  Result 객체를 사용해야 하므로 생성자를 통해 Result 객체를 주입받도록 해야함

class Task implements Runnable {
   Result result;
   Task(Result result) {
      this.result = result;
   }
   @Override
   public void run() {
      //작업 코드
      //처리 결과를 result에 저장
   }
}

 

작업 처리 결과를 외부 객체에 저장

package sec09.exam02_blocking;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultByRunnableExample {
	public static void main(String[] args) {
		//스레드 풀 생성
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
		
		System.out.println("[작업 처리 요청]");
		//작업 정의
		class Task implements Runnable {
			//외부 Result 객체를 필드에 저장
			Result result; //필드
			Task(Result result) { //생성자
				this.result  = result;
			}
			@Override
			public void run() {
				int sum = 0;
				for(int i=1; i<=10; i++) {
					sum += i;
				}
				result.addValue(sum); //Result 객체에 작업 결과 저장
			}
			
		}
		
		//공유 객체(외부 객체)
		Result result = new Result();
		//두개의 작업을 정의
		Runnable task1 = new Task(result);
		Runnable task2 = new Task(result);
		Future<Result> future1 = executorService.submit(task1, result);
		Future<Result> future2 = executorService.submit(task2, result);
		
		try {
			//두 가지 작업 결과를 취합(누적)
			result = future1.get();
			result = future2.get();
			System.out.println("[처리 결과] " + result.accumValue);
			System.out.println("[작업 처리 완료]");
		} catch (Exception e) {
			System.out.println("[실행 예외 발생함]" + e.getMessage());
		}
	}
}

//처리 결과를 저장하는 Result 클래스
class Result {
	int accumValue;
	synchronized void addValue(int value) { //동기화 메소드
		accumValue += value;
	}
}
[작업 처리 요청]
[처리 결과] 110
[작업 처리 완료]

 

④ 작업 완료 순으로 통보

작업 요청 순서대로 작업 처리가 완료되는 것은 아님

여러 개의 작업들이 순차적으로 처리될 필요성이 없고, 처리 결과도 순차적으로 이용할 필요가 없다면 작업 처리가 완료된 것부터 결과를 얻어 이용하면 됨

 

CompletionService의 poll(), take() 메소드 사용

ExecutorService executorService = Executors.newFixedThreadPool(
   Runtime.getRuntime().availableProcessor()
);

CompletionService<V> completionService = new ExecutorCompletionService<V>(executorService);

poll()과 take()를 이용해서 처리 완료된 작업의 Future를 얻으려면 CompletionService의 submit() 메소드로 작업 처리 요청을 해야함 (ExecutorService의 submit() 이 아님)

completionService.submit(Callable<V> task);
completionService.submit(Runnable task, V result);

 

작업 완료 순으로 통보받기

package sec09.exam02_blocking;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceExample {
	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
		//CompletionService 생성
		CompletionService<Integer> completionService = 
				new ExecutorCompletionService<Integer>(executorService);
		
		System.out.println("[작업 처리 요청]");
		for(int i=0; i<3; i++) {
			//스레드 풀 에게 작업 처리 요청
			completionService.submit(new Callable<Integer>() {
				@Override
				public Integer call() throws Exception {
					int sum = 0;
					for(int i=1; i<=10; i++) {
						sum += i;
					}
					return sum;
				}
			});
		}
		
		System.out.println("[처리 완료된 작업 확인]");
		//스레드풀의 스레드에서 실행하도록 함
		executorService.submit(new Runnable() {
			@Override
			public void run() {
				while(true) {
					try {
						Future<Integer> future = completionService.take(); //완료된 작업 가져오기
						int value = future.get();
						System.out.println("[처리 결과] " + value);
					} catch (Exception e) {
						break;
					}
				}
			}
		});
		
		//3초 후 스레드풀 종료 - 스레드가 작업할 시간을 줌
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {}
		
		executorService.shutdownNow();
	}
}
[작업 처리 요청]
[처리 완료된 작업 확인]
[처리 결과] 55
[처리 결과] 55
[처리 결과] 55

+ Recent posts