공부일기/Java 8

Java 8 - 7. Callable과 Future

Youngbin Kim 2020. 12. 25. 16:18

Callable

  • Runnable과 유사하지만 return을 통해 작업의 결과를 받아볼 수 있다.

 

 

Future

 

 

결과를 가져오기 get()

  • 블록킹 콜이다. 타임아웃(최대한으로 기다릴 시간)을 설정할 수 있다.
  • 아래 예제 같은 경우 started! 출력 후 2초를 기다린다.
  • 그 후 Hello를 꺼내고 출력한 뒤 End!를 출력, 그리고 쓰레드를 종료한다.
package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "Hello";
        };
        Future<String> stringFuture = executorService.submit(hello);

        System.out.println("Started!");

        System.out.println(stringFuture.get()); //블록킹 콜, 작업이 완료될 때 까지 기다린다.

        System.out.println("End!");
        executorService.shutdown();
    }
}

 

 

 

작업 상태 확인하기 isDone()

  • 완료 했으면 true 아니면 false를 리턴한다.

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "Hello";
        };

        Future<String> stringFuture = executorService.submit(hello);
        System.out.println(stringFuture.isDone());
        System.out.println("Started!");

        stringFuture.get();

        System.out.println(stringFuture.isDone());
        System.out.println("End!");
        executorService.shutdown();
    }
}

 

작업을 완료하기 전엔 false를 반환했다가 작업이 끝나면 true를 반환하는 것을 볼 수 있다.

 

 

 

작업 취소하기 cancel()

  • 취소 했으면 true 못했으면 false를 리턴한다.

  • parameter로 true를 전달하면 현재 진행중인 쓰레드를 interrupt하고 그러지 않으면 현재 진행중인 작업이 끝날때까지 기다린다.

  • cancel 했을 경우 interrupt 안하고 cancel하더라도 값을 가져올 수 없다. 작업이 완료된 채 종료된게 아니기 때문이다.
package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "Hello";
        };

        Future<String> stringFuture = executorService.submit(hello);
        System.out.println(stringFuture.isDone());
        System.out.println("Started!");

        stringFuture.cancel(false);
        System.out.println(stringFuture.isDone());

        stringFuture.get();


        System.out.println("End!");
        executorService.shutdown();
    }
}

 

 

작업이 완료되지 않고 cancel에 의해 작업이 취소되어 종료된 것이기 때문에, 작업이 isDone()인 것은 true지만, get()을 통해 완료된 작업을 가져올 순 없다. 따라서 작업을 가져오려하면 CancellationException이 발생한다.

 

 

여러 작업 동시에 실행하기 invokeAll()

  • 동시에 실행한 작업 중에 제일 오래 걸리는 작업 만큼 시간이 걸린다.

package Java8;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(1000);
            return "Hello";
        };

        Callable<String> merry = () -> {
            Thread.sleep(2000);
            return "merry";
        };

        Callable<String> xMas = () -> {
            Thread.sleep(3000L);
            return "X-mas";
        };
        
        // 한꺼번에 보내기
        List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, merry, xMas));
        for (Future<String> f : futures) {
            System.out.println(f.get());
        }

        executorService.shutdown();
    }
}

 

 

 

 

여러 작업 중에 하나라도 먼저 응답이 오면 끝내기 invokeAny()

  • 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.

  • 블록킹 콜이다.

package Java8;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(1000);
            return "Hello";
        };

        Callable<String> merry = () -> {
            Thread.sleep(2000);
            return "merry";
        };

        Callable<String> xMas = () -> {
            Thread.sleep(3000L);
            return "X-mas";
        };

        // 먼저 처리 된 것 확인, 블럭킹 콜
        String first = executorService.invokeAny(Arrays.asList(hello, merry, xMas));
        System.out.println(first);
        executorService.shutdown();
    }
}

 

 

 

CompletableFuture

자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스.

  • Future를 사용해서도 어느정도 가능했지만 하기 힘들 일들이 많았다.

 

 

 

Future로는 하기 어렵던 작업들

  • Future를 외부에서 완료 시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.

  • 블로킹 코드(get())를 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.

  • 여러 Future를 조합할 수 없다, 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기

  • 예외 처리용 API를 제공하지 않는다.

 

CompletableFuture

  • Implements Future

  • Implements CompletionStage

  • 외부에서 complete 시킬 수 있음. 예) 몇 초 이내에 응답이 안 오면 기본값으로 리턴해라.
package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("Hello");

        System.out.println(future.get());

        CompletableFuture<String> future2 = CompletableFuture.completedFuture("Hello");
        System.out.println(future2.get());
    }
}

새로 객체를 만들어도 되고, static 메서드를 사용해 만들 수도 있다.

 

 

 

비동기로 작업 실행하기

  • runAsync() : 리턴 값이 없는 경우

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
        });
        // 여기까진 아무일 없음
        future.get(); // get을 통해 가져오면 실행.
    }
}

 

리턴 타입없이 void 형태의 CompletableFuture가 실행된다.

 

 

  • supplyAsync() : 리턴 값이 있는 경우

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });
        // 여기까진 아무일 없음
        System.out.println(future.get()); // get을 할 때 작업이 실행됨.
    }
}

 

 

  • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (내부적으로 기본은 ForkJoinPool.commonPool())

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }, executorService).thenRun( () -> {
            System.out.println(Thread.currentThread().getName());
        });

        future.get();
        executorService.shutdown();
    }
}

밑의 thenRun 코드와 동일하나 supplyAsync에 ExecutorService 변수를 하나 더 넣어주니 실행되는 쓰레드 이름이 변경된 것을 확인할 수 있다.

 

 

콜백 제공하기

  • thenApply(Function) : 리턴값을 받아서 다른 값으로 바꾸는 콜백

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).thenApply(s-> {
            System.out.println(Thread.currentThread().getName());
            return s.toUpperCase();
        });
        System.out.println(future.get());
    }
}

 

리턴값을 받은 후 해당값을 함수처리하듯 처리한다.

 

 

 

  • thenAccept(Consumer) : 리턴값을 받지 않고, 다른 작업을 처리하는 콜백

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).thenAccept(s-> {
            System.out.println(Thread.currentThread().getName());
            System.out.println(s.toUpperCase());
        });

        future.get();
    }
}

리턴 값을 받지 않고 

 

 

  • thenRun(Runnable) : 리턴값 받지 않고 다른 작업을 처리하는 콜백. Runnable이 온다고 생각하면 됨. 결과 값을 참고하지 않고 바로 실행.

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).thenRun( () -> {
            System.out.println(Thread.currentThread().getName());
        });

        future.get();
    }
}

 

 

 

 

  • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }, executorService).thenRun( () -> {
            System.out.println(Thread.currentThread().getName());
        });

        future.get();
        executorService.shutdown();
    }
}

 

위에서 봤던 예제와 동일하다.

 

 

 

 

조합하기

  • thenCompose() : 두 작업이 서로 이어서 실행하도록 조합

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });
        
        // Future 간에 연관 관계가 있는 경우
        CompletableFuture<String> future = hello.thenCompose(Main::getWorld);
        System.out.println(future.get());
    }

    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}

 

 

 

  • thenCombine() : 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
        System.out.println(future.get());
    }
}

위 예제와 결과는 같지만 실행과정은 다르다. thenCompose의 경우 hello future를 먼저 실행해 얻어진 String 값을 매개변수로 getWorld future를 실행하지만 thenCombine은 두 개를 각각 실행해 얻어지는 결과로 리턴 값을 만든다. 

 

 

 

  • allOf() : 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행

package Java8;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });

        List<CompletableFuture> futures = Arrays.asList(hello, world);

        CompletableFuture<Object> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .thenApply(v -> {
                    return futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());
                });
        System.out.println(future.get());
    }
}

 

 

 

  • anyOf() : 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
        future.get();
    }
}

랜덤하게 먼저 끝난 쓰레드가 표시된다.

 

 

 

예외처리

  • exeptionally(Function) :

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        boolean throwError = true;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalArgumentException();
            }

            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).exceptionally(ex -> {
            return "Error!";
        });

        System.out.println(hello.get());
    }
}

 

 

 

  • handle(BiFunction)

에러와 에러가 아닌 두 경우 모두 처리가 가능하다. 

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        boolean throwError = true;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalArgumentException();
            }

            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println(ex);
                return "ERROR!";
            }
            return result;
        });

        System.out.println(hello.get());
    }
}

에러인 경우의 출력

 

package Java8;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        boolean throwError = false;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if (throwError) {
                throw new IllegalArgumentException();
            }

            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).handle((result, ex) -> {
            if (ex != null) {
                System.out.println(ex);
                return "ERROR!";
            }
            return result;
        });

        System.out.println(hello.get());
    }
}

에러가 아닌 경우 정상적으로 Hello가 출력된다.