Java 8 - 7. Callable과 Future
Callable
-
Runnable과 유사하지만 return을 통해 작업의 결과를 받아볼 수 있다.
Future
-
비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.
-
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html
결과를 가져오기 get()
- 블록킹 콜이다. 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있다.
- 아래 예제 같은 경우 started! 출력 후 2초를 기다린다.
- 그 후 Hello를 꺼내고 출력한 뒤 End!를 출력, 그리고 쓰레드를 종료한다.
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Future<String> stringFuture = executorService.submit(hello);
System.out.println("Started!");
System.out.println(stringFuture.get()); //블록킹 콜, 작업이 완료될 때 까지 기다린다.
System.out.println("End!");
executorService.shutdown();
}
}
작업 상태 확인하기 isDone()
-
완료 했으면 true 아니면 false를 리턴한다.
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Future<String> stringFuture = executorService.submit(hello);
System.out.println(stringFuture.isDone());
System.out.println("Started!");
stringFuture.get();
System.out.println(stringFuture.isDone());
System.out.println("End!");
executorService.shutdown();
}
}
작업을 완료하기 전엔 false를 반환했다가 작업이 끝나면 true를 반환하는 것을 볼 수 있다.
작업 취소하기 cancel()
-
취소 했으면 true 못했으면 false를 리턴한다.
-
parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝날때까지 기다린다.
- cancel 했을 경우 interrupt 안하고 cancel하더라도 값을 가져올 수 없다. 작업이 완료된 채 종료된게 아니기 때문이다.
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Future<String> stringFuture = executorService.submit(hello);
System.out.println(stringFuture.isDone());
System.out.println("Started!");
stringFuture.cancel(false);
System.out.println(stringFuture.isDone());
stringFuture.get();
System.out.println("End!");
executorService.shutdown();
}
}
작업이 완료되지 않고 cancel에 의해 작업이 취소되어 종료된 것이기 때문에, 작업이 isDone()인 것은 true지만, get()을 통해 완료된 작업을 가져올 순 없다. 따라서 작업을 가져오려하면 CancellationException이 발생한다.
여러 작업 동시에 실행하기 invokeAll()
-
동시에 실행한 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다.
package Java8;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(1000);
return "Hello";
};
Callable<String> merry = () -> {
Thread.sleep(2000);
return "merry";
};
Callable<String> xMas = () -> {
Thread.sleep(3000L);
return "X-mas";
};
// 한꺼번에 보내기
List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, merry, xMas));
for (Future<String> f : futures) {
System.out.println(f.get());
}
executorService.shutdown();
}
}
여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()
-
동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.
-
블록킹 콜이다.
package Java8;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(1000);
return "Hello";
};
Callable<String> merry = () -> {
Thread.sleep(2000);
return "merry";
};
Callable<String> xMas = () -> {
Thread.sleep(3000L);
return "X-mas";
};
// 먼저 처리 된 것 확인, 블럭킹 콜
String first = executorService.invokeAny(Arrays.asList(hello, merry, xMas));
System.out.println(first);
executorService.shutdown();
}
}
CompletableFuture
자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스.
-
Future를 사용해서도 어느정도 가능했지만 하기 힘들 일들이 많았다.
Future로는 하기 어렵던 작업들
-
Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
-
블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
-
여러 Future를 조합할 수 없다, 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
-
예외 처리용 API를 제공하지 않는다.
CompletableFuture
-
Implements Future
-
Implements CompletionStage
- 외부에서 complete 시킬 수 있음. 예) 몇 초 이내에 응답이 안 오면 기본값으로 리턴해라.
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("Hello");
System.out.println(future.get());
CompletableFuture<String> future2 = CompletableFuture.completedFuture("Hello");
System.out.println(future2.get());
}
}
새로 객체를 만들어도 되고, static 메서드를 사용해 만들 수도 있다.
비동기로 작업 실행하기
-
runAsync() : 리턴 값이 없는 경우
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
});
// 여기까진 아무일 없음
future.get(); // get을 통해 가져오면 실행.
}
}
리턴 타입없이 void 형태의 CompletableFuture가 실행된다.
-
supplyAsync() : 리턴 값이 있는 경우
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
// 여기까진 아무일 없음
System.out.println(future.get()); // get을 할 때 작업이 실행됨.
}
}
-
원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (내부적으로 기본은 ForkJoinPool.commonPool())
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}, executorService).thenRun( () -> {
System.out.println(Thread.currentThread().getName());
});
future.get();
executorService.shutdown();
}
}
콜백 제공하기
-
thenApply(Function) : 리턴값을 받아서 다른 값으로 바꾸는 콜백
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}).thenApply(s-> {
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
});
System.out.println(future.get());
}
}
리턴값을 받은 후 해당값을 함수처리하듯 처리한다.
-
thenAccept(Consumer) : 리턴값을 받지 않고, 다른 작업을 처리하는 콜백
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}).thenAccept(s-> {
System.out.println(Thread.currentThread().getName());
System.out.println(s.toUpperCase());
});
future.get();
}
}
리턴 값을 받지 않고
-
thenRun(Runnable) : 리턴값 받지 않고 다른 작업을 처리하는 콜백. Runnable이 온다고 생각하면 됨. 결과 값을 참고하지 않고 바로 실행.
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}).thenRun( () -> {
System.out.println(Thread.currentThread().getName());
});
future.get();
}
}
-
콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}, executorService).thenRun( () -> {
System.out.println(Thread.currentThread().getName());
});
future.get();
executorService.shutdown();
}
}
위에서 봤던 예제와 동일하다.
조합하기
-
thenCompose() : 두 작업이 서로 이어서 실행하도록 조합
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
// Future 간에 연관 관계가 있는 경우
CompletableFuture<String> future = hello.thenCompose(Main::getWorld);
System.out.println(future.get());
}
private static CompletableFuture<String> getWorld(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return message + " World";
});
}
}
-
thenCombine() : 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return "World";
});
CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
System.out.println(future.get());
}
}
-
allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
package Java8;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return "World";
});
List<CompletableFuture> futures = Arrays.asList(hello, world);
CompletableFuture<Object> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
System.out.println(future.get());
}
}
-
anyOf() : 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return "World";
});
CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
future.get();
}
}
랜덤하게 먼저 끝난 쓰레드가 표시된다.
예외처리
-
exeptionally(Function) :
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
boolean throwError = true;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}).exceptionally(ex -> {
return "Error!";
});
System.out.println(hello.get());
}
}
-
handle(BiFunction) :
에러와 에러가 아닌 두 경우 모두 처리가 가능하다.
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
boolean throwError = true;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}).handle((result, ex) -> {
if (ex != null) {
System.out.println(ex);
return "ERROR!";
}
return result;
});
System.out.println(hello.get());
}
}
package Java8;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
boolean throwError = false;
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}).handle((result, ex) -> {
if (ex != null) {
System.out.println(ex);
return "ERROR!";
}
return result;
});
System.out.println(hello.get());
}
}