-
Java 8 - 7. Callable과 Future공부일기/Java 8 2020. 12. 25. 16:18
Callable
-
Runnable과 유사하지만 return을 통해 작업의 결과를 받아볼 수 있다.
Future
-
비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.
-
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html
결과를 가져오기 get()
- 블록킹 콜이다. 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있다.
- 아래 예제 같은 경우 started! 출력 후 2초를 기다린다.
- 그 후 Hello를 꺼내고 출력한 뒤 End!를 출력, 그리고 쓰레드를 종료한다.
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () -> { Thread.sleep(2000L); return "Hello"; }; Future<String> stringFuture = executorService.submit(hello); System.out.println("Started!"); System.out.println(stringFuture.get()); //블록킹 콜, 작업이 완료될 때 까지 기다린다. System.out.println("End!"); executorService.shutdown(); } }
작업 상태 확인하기 isDone()
-
완료 했으면 true 아니면 false를 리턴한다.
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () -> { Thread.sleep(2000L); return "Hello"; }; Future<String> stringFuture = executorService.submit(hello); System.out.println(stringFuture.isDone()); System.out.println("Started!"); stringFuture.get(); System.out.println(stringFuture.isDone()); System.out.println("End!"); executorService.shutdown(); } }
작업을 완료하기 전엔 false를 반환했다가 작업이 끝나면 true를 반환하는 것을 볼 수 있다.
작업 취소하기 cancel()
-
취소 했으면 true 못했으면 false를 리턴한다.
-
parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝날때까지 기다린다.
- cancel 했을 경우 interrupt 안하고 cancel하더라도 값을 가져올 수 없다. 작업이 완료된 채 종료된게 아니기 때문이다.
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () -> { Thread.sleep(2000L); return "Hello"; }; Future<String> stringFuture = executorService.submit(hello); System.out.println(stringFuture.isDone()); System.out.println("Started!"); stringFuture.cancel(false); System.out.println(stringFuture.isDone()); stringFuture.get(); System.out.println("End!"); executorService.shutdown(); } }
작업이 완료되지 않고 cancel에 의해 작업이 취소되어 종료된 것이기 때문에, 작업이 isDone()인 것은 true지만, get()을 통해 완료된 작업을 가져올 순 없다. 따라서 작업을 가져오려하면 CancellationException이 발생한다.
여러 작업 동시에 실행하기 invokeAll()
-
동시에 실행한 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다.
package Java8; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () -> { Thread.sleep(1000); return "Hello"; }; Callable<String> merry = () -> { Thread.sleep(2000); return "merry"; }; Callable<String> xMas = () -> { Thread.sleep(3000L); return "X-mas"; }; // 한꺼번에 보내기 List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, merry, xMas)); for (Future<String> f : futures) { System.out.println(f.get()); } executorService.shutdown(); } }
여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()
-
동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.
-
블록킹 콜이다.
package Java8; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () -> { Thread.sleep(1000); return "Hello"; }; Callable<String> merry = () -> { Thread.sleep(2000); return "merry"; }; Callable<String> xMas = () -> { Thread.sleep(3000L); return "X-mas"; }; // 먼저 처리 된 것 확인, 블럭킹 콜 String first = executorService.invokeAny(Arrays.asList(hello, merry, xMas)); System.out.println(first); executorService.shutdown(); } }
CompletableFuture
자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스.
-
Future를 사용해서도 어느정도 가능했지만 하기 힘들 일들이 많았다.
Future로는 하기 어렵던 작업들
-
Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
-
블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
-
여러 Future를 조합할 수 없다, 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
-
예외 처리용 API를 제공하지 않는다.
CompletableFuture
-
Implements Future
-
Implements CompletionStage
- 외부에서 complete 시킬 수 있음. 예) 몇 초 이내에 응답이 안 오면 기본값으로 리턴해라.
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = new CompletableFuture<>(); future.complete("Hello"); System.out.println(future.get()); CompletableFuture<String> future2 = CompletableFuture.completedFuture("Hello"); System.out.println(future2.get()); } }
새로 객체를 만들어도 되고, static 메서드를 사용해 만들 수도 있다.
비동기로 작업 실행하기
-
runAsync() : 리턴 값이 없는 경우
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); }); // 여기까진 아무일 없음 future.get(); // get을 통해 가져오면 실행. } }
리턴 타입없이 void 형태의 CompletableFuture가 실행된다.
-
supplyAsync() : 리턴 값이 있는 경우
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }); // 여기까진 아무일 없음 System.out.println(future.get()); // get을 할 때 작업이 실행됨. } }
-
원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (내부적으로 기본은 ForkJoinPool.commonPool())
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(4); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }, executorService).thenRun( () -> { System.out.println(Thread.currentThread().getName()); }); future.get(); executorService.shutdown(); } }
밑의 thenRun 코드와 동일하나 supplyAsync에 ExecutorService 변수를 하나 더 넣어주니 실행되는 쓰레드 이름이 변경된 것을 확인할 수 있다.
콜백 제공하기
-
thenApply(Function) : 리턴값을 받아서 다른 값으로 바꾸는 콜백
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }).thenApply(s-> { System.out.println(Thread.currentThread().getName()); return s.toUpperCase(); }); System.out.println(future.get()); } }
리턴값을 받은 후 해당값을 함수처리하듯 처리한다.
-
thenAccept(Consumer) : 리턴값을 받지 않고, 다른 작업을 처리하는 콜백
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }).thenAccept(s-> { System.out.println(Thread.currentThread().getName()); System.out.println(s.toUpperCase()); }); future.get(); } }
리턴 값을 받지 않고
-
thenRun(Runnable) : 리턴값 받지 않고 다른 작업을 처리하는 콜백. Runnable이 온다고 생각하면 됨. 결과 값을 참고하지 않고 바로 실행.
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }).thenRun( () -> { System.out.println(Thread.currentThread().getName()); }); future.get(); } }
-
콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(4); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }, executorService).thenRun( () -> { System.out.println(Thread.currentThread().getName()); }); future.get(); executorService.shutdown(); } }
위에서 봤던 예제와 동일하다.
조합하기
-
thenCompose() : 두 작업이 서로 이어서 실행하도록 조합
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }); // Future 간에 연관 관계가 있는 경우 CompletableFuture<String> future = hello.thenCompose(Main::getWorld); System.out.println(future.get()); } private static CompletableFuture<String> getWorld(String message) { return CompletableFuture.supplyAsync(() -> { System.out.println("World : " + Thread.currentThread().getName()); return message + " World"; }); } }
-
thenCombine() : 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World : " + Thread.currentThread().getName()); return "World"; }); CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w); System.out.println(future.get()); } }
위 예제와 결과는 같지만 실행과정은 다르다. thenCompose의 경우 hello future를 먼저 실행해 얻어진 String 값을 매개변수로 getWorld future를 실행하지만 thenCombine은 두 개를 각각 실행해 얻어지는 결과로 리턴 값을 만든다.
-
allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
package Java8; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World : " + Thread.currentThread().getName()); return "World"; }); List<CompletableFuture> futures = Arrays.asList(hello, world); CompletableFuture<Object> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .thenApply(v -> { return futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); }); System.out.println(future.get()); } }
-
anyOf() : 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World : " + Thread.currentThread().getName()); return "World"; }); CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println); future.get(); } }
랜덤하게 먼저 끝난 쓰레드가 표시된다.
예외처리
-
exeptionally(Function) :
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { boolean throwError = true; CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { if (throwError) { throw new IllegalArgumentException(); } System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }).exceptionally(ex -> { return "Error!"; }); System.out.println(hello.get()); } }
-
handle(BiFunction) :
에러와 에러가 아닌 두 경우 모두 처리가 가능하다.
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { boolean throwError = true; CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { if (throwError) { throw new IllegalArgumentException(); } System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }).handle((result, ex) -> { if (ex != null) { System.out.println(ex); return "ERROR!"; } return result; }); System.out.println(hello.get()); } }
package Java8; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { boolean throwError = false; CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { if (throwError) { throw new IllegalArgumentException(); } System.out.println("Hello : " + Thread.currentThread().getName()); return "Hello"; }).handle((result, ex) -> { if (ex != null) { System.out.println(ex); return "ERROR!"; } return result; }); System.out.println(hello.get()); } }
'공부일기 > Java 8' 카테고리의 다른 글
Java 항해일지 - 13. I/O (0) 2021.02.28 Java 8 - 8. 애노테이션의 변화 (0) 2020.12.26 Java 8 - 6. Concurrent 프로그래밍과 Executors (0) 2020.12.24 Java 8 - 5. Date & Time (0) 2020.12.23 Java 8 - 4. Optional (0) 2020.12.22 -