ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Java 8 - 7. Callable과 Future
    공부일기/Java 8 2020. 12. 25. 16:18

    Callable

    • Runnable과 유사하지만 return을 통해 작업의 결과를 받아볼 수 있다.

     

     

    Future

     

     

    결과를 가져오기 get()

    • 블록킹 콜이다. 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있다.
    • 아래 예제 같은 경우 started! 출력 후 2초를 기다린다.
    • 그 후 Hello를 꺼내고 출력한 뒤 End!를 출력, 그리고 쓰레드를 종료한다.
    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            Callable<String> hello = () -> {
                Thread.sleep(2000L);
                return "Hello";
            };
            Future<String> stringFuture = executorService.submit(hello);
    
            System.out.println("Started!");
    
            System.out.println(stringFuture.get()); //블록킹 콜, 작업이 완료될 때 까지 기다린다.
    
            System.out.println("End!");
            executorService.shutdown();
        }
    }
    

     

     

     

    작업 상태 확인하기 isDone()

    • 완료 했으면 true 아니면 false를 리턴한다.

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            Callable<String> hello = () -> {
                Thread.sleep(2000L);
                return "Hello";
            };
    
            Future<String> stringFuture = executorService.submit(hello);
            System.out.println(stringFuture.isDone());
            System.out.println("Started!");
    
            stringFuture.get();
    
            System.out.println(stringFuture.isDone());
            System.out.println("End!");
            executorService.shutdown();
        }
    }
    

     

    작업을 완료하기 전엔 false를 반환했다가 작업이 끝나면 true를 반환하는 것을 볼 수 있다.

     

     

     

    작업 취소하기 cancel()

    • 취소 했으면 true 못했으면 false를 리턴한다.

    • parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝날때까지 기다린다.

    • cancel 했을 경우 interrupt 안하고 cancel하더라도 값을 가져올 수 없다. 작업이 완료된 채 종료된게 아니기 때문이다.
    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            Callable<String> hello = () -> {
                Thread.sleep(2000L);
                return "Hello";
            };
    
            Future<String> stringFuture = executorService.submit(hello);
            System.out.println(stringFuture.isDone());
            System.out.println("Started!");
    
            stringFuture.cancel(false);
            System.out.println(stringFuture.isDone());
    
            stringFuture.get();
    
    
            System.out.println("End!");
            executorService.shutdown();
        }
    }
    

     

     

    작업이 완료되지 않고 cancel에 의해 작업이 취소되어 종료된 것이기 때문에, 작업이 isDone()인 것은 true지만, get()을 통해 완료된 작업을 가져올 순 없다. 따라서 작업을 가져오려하면 CancellationException이 발생한다.

     

     

    여러 작업 동시에 실행하기 invokeAll()

    • 동시에 실행한 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다.

    package Java8;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            Callable<String> hello = () -> {
                Thread.sleep(1000);
                return "Hello";
            };
    
            Callable<String> merry = () -> {
                Thread.sleep(2000);
                return "merry";
            };
    
            Callable<String> xMas = () -> {
                Thread.sleep(3000L);
                return "X-mas";
            };
            
            // 한꺼번에 보내기
            List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, merry, xMas));
            for (Future<String> f : futures) {
                System.out.println(f.get());
            }
    
            executorService.shutdown();
        }
    }
    

     

     

     

     

    여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()

    • 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.

    • 블록킹 콜이다.

    package Java8;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
    
            Callable<String> hello = () -> {
                Thread.sleep(1000);
                return "Hello";
            };
    
            Callable<String> merry = () -> {
                Thread.sleep(2000);
                return "merry";
            };
    
            Callable<String> xMas = () -> {
                Thread.sleep(3000L);
                return "X-mas";
            };
    
            // 먼저 처리 된 것 확인, 블럭킹 콜
            String first = executorService.invokeAny(Arrays.asList(hello, merry, xMas));
            System.out.println(first);
            executorService.shutdown();
        }
    }
    

     

     

     

    CompletableFuture

    자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스.

    • Future를 사용해서도 어느정도 가능했지만 하기 힘들 일들이 많았다.

     

     

     

    Future로는 하기 어렵던 작업들

    • Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.

    • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.

    • 여러 Future를 조합할 수 없다, 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기

    • 예외 처리용 API를 제공하지 않는다.

     

    CompletableFuture

    • Implements Future

    • Implements CompletionStage

    • 외부에서 complete 시킬 수 있음. 예) 몇 초 이내에 응답이 안 오면 기본값으로 리턴해라.
    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = new CompletableFuture<>();
            future.complete("Hello");
    
            System.out.println(future.get());
    
            CompletableFuture<String> future2 = CompletableFuture.completedFuture("Hello");
            System.out.println(future2.get());
        }
    }
    

    새로 객체를 만들어도 되고, static 메서드를 사용해 만들 수도 있다.

     

     

     

    비동기로 작업 실행하기

    • runAsync() : 리턴 값이 없는 경우

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
            });
            // 여기까진 아무일 없음
            future.get(); // get을 통해 가져오면 실행.
        }
    }
    

     

    리턴 타입없이 void 형태의 CompletableFuture가 실행된다.

     

     

    • supplyAsync() : 리턴 값이 있는 경우

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            });
            // 여기까진 아무일 없음
            System.out.println(future.get()); // get을 할 때 작업이 실행됨.
        }
    }
    

     

     

    • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (내부적으로 기본은 ForkJoinPool.commonPool())

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(4);
    
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }, executorService).thenRun( () -> {
                System.out.println(Thread.currentThread().getName());
            });
    
            future.get();
            executorService.shutdown();
        }
    }

    밑의 thenRun 코드와 동일하나 supplyAsync에 ExecutorService 변수를 하나 더 넣어주니 실행되는 쓰레드 이름이 변경된 것을 확인할 수 있다.

     

     

    콜백 제공하기

    • thenApply(Function) : 리턴값을 받아서 다른 값으로 바꾸는 콜백

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }).thenApply(s-> {
                System.out.println(Thread.currentThread().getName());
                return s.toUpperCase();
            });
            System.out.println(future.get());
        }
    }

     

    리턴값을 받은 후 해당값을 함수처리하듯 처리한다.

     

     

     

    • thenAccept(Consumer) : 리턴값을 받지 않고, 다른 작업을 처리하는 콜백

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }).thenAccept(s-> {
                System.out.println(Thread.currentThread().getName());
                System.out.println(s.toUpperCase());
            });
    
            future.get();
        }
    }

    리턴 값을 받지 않고 

     

     

    • thenRun(Runnable) : 리턴값 받지 않고 다른 작업을 처리하는 콜백. Runnable이 온다고 생각하면 됨. 결과 값을 참고하지 않고 바로 실행.

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }).thenRun( () -> {
                System.out.println(Thread.currentThread().getName());
            });
    
            future.get();
        }
    }

     

     

     

     

    • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(4);
    
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }, executorService).thenRun( () -> {
                System.out.println(Thread.currentThread().getName());
            });
    
            future.get();
            executorService.shutdown();
        }
    }

     

    위에서 봤던 예제와 동일하다.

     

     

     

     

    조합하기

    • thenCompose() : 두 작업이 서로 이어서 실행하도록 조합

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            });
            
            // Future 간에 연관 관계가 있는 경우
            CompletableFuture<String> future = hello.thenCompose(Main::getWorld);
            System.out.println(future.get());
        }
    
        private static CompletableFuture<String> getWorld(String message) {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("World : " + Thread.currentThread().getName());
                return message + " World";
            });
        }
    }

     

     

     

    • thenCombine() : 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            });
    
            CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
                System.out.println("World : " + Thread.currentThread().getName());
                return "World";
            });
    
            CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
            System.out.println(future.get());
        }
    }

    위 예제와 결과는 같지만 실행과정은 다르다. thenCompose의 경우 hello future를 먼저 실행해 얻어진 String 값을 매개변수로 getWorld future를 실행하지만 thenCombine은 두 개를 각각 실행해 얻어지는 결과로 리턴 값을 만든다. 

     

     

     

    • allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행

    package Java8;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.stream.Collectors;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            });
    
            CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
                System.out.println("World : " + Thread.currentThread().getName());
                return "World";
            });
    
            List<CompletableFuture> futures = Arrays.asList(hello, world);
    
            CompletableFuture<Object> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                    .thenApply(v -> {
                        return futures.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList());
                    });
            System.out.println(future.get());
        }
    }

     

     

     

    • anyOf() : 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            });
    
            CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
                System.out.println("World : " + Thread.currentThread().getName());
                return "World";
            });
    
            CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
            future.get();
        }
    }

    랜덤하게 먼저 끝난 쓰레드가 표시된다.

     

     

     

    예외처리

    • exeptionally(Function) :

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            boolean throwError = true;
    
            CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
                if (throwError) {
                    throw new IllegalArgumentException();
                }
    
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }).exceptionally(ex -> {
                return "Error!";
            });
    
            System.out.println(hello.get());
        }
    }

     

     

     

    • handle(BiFunction)

    에러와 에러가 아닌 두 경우 모두 처리가 가능하다. 

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            boolean throwError = true;
    
            CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
                if (throwError) {
                    throw new IllegalArgumentException();
                }
    
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }).handle((result, ex) -> {
                if (ex != null) {
                    System.out.println(ex);
                    return "ERROR!";
                }
                return result;
            });
    
            System.out.println(hello.get());
        }
    }

    에러인 경우의 출력

     

    package Java8;
    
    import java.util.concurrent.*;
    
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            boolean throwError = false;
    
            CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
                if (throwError) {
                    throw new IllegalArgumentException();
                }
    
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }).handle((result, ex) -> {
                if (ex != null) {
                    System.out.println(ex);
                    return "ERROR!";
                }
                return result;
            });
    
            System.out.println(hello.get());
        }
    }

    에러가 아닌 경우 정상적으로 Hello가 출력된다.

     

     

     

     

    '공부일기 > Java 8' 카테고리의 다른 글

    Java 항해일지 - 13. I/O  (0) 2021.02.28
    Java 8 - 8. 애노테이션의 변화  (0) 2020.12.26
    Java 8 - 6. Concurrent 프로그래밍과 Executors  (0) 2020.12.24
    Java 8 - 5. Date & Time  (0) 2020.12.23
    Java 8 - 4. Optional  (0) 2020.12.22
Designed by Tistory.