공부일기/Java 8

Java 8 - 6. Concurrent 프로그래밍과 Executors

Youngbin Kim 2020. 12. 24. 21:55

Concurrent 소프트웨어란?

  • 동시에 여러 작업을 할 수 있는 소프트웨어 예) 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑을 할 수 있다. 또는 화면녹화를 하면서 인텔리J로 코딩을 하고 워드에 적어둔 문서를 보거나 수정할 수 있다.

  • 위의 예시처럼 여러가지 프로그램을 사용하는 것도 맞고, 한 프로그램 내에서 여러가지 동시작업이 일어나는 것도 해당한다. 게임을 하는 경우 캐릭터가 이동하거나 시선을 바꿀 때 캐릭터를 이동시키면서 새로운 배경을 불러오는 것을 예로 들 수 있다.

 

 

자바에서 지원하는 컨커런트 프로그래밍

  • 멀티프로세싱 (ProcessBuilder)

  • 멀티쓰레드

 

 

자바 멀티쓰레드 프로그래밍

  • Thread / Runnable

 

 

Thread 상속

package Java8;

public class Main {
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();

        System.out.println("Hello: " + Thread.currentThread().getName());
    }

    static class MyThread extends Thread {
        public void run() {
            System.out.println("Thread: " + Thread.currentThread().getName());
        }
    }
}

 

 

Runnable 구현 또는 람다

자바 8부터는 람다를 표현해 보기 좋게 나타낼 수 있으므로 람다를 통해 구현했다.

package Java8;

public class Main {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            System.out.println("Thread: " + Thread.currentThread().getName());
        });
        thread.start();

        System.out.println("Hello: " + Thread.currentThread().getName());
    }
}

새로 만든 thread를 먼저 실행시켰지만, 메인 쓰레드가 먼저 출력된다. 실행순서는 랜덤이다.

 

 

 

쓰레드 주요 기능

 

  • sleep(현재 쓰레드 멈춰두기) : 다른 쓰레드가 처리할 수 있도록 우선권을 넘기고 기회를 주지만 그렇다고 락을 놔주진 않는다. (잘못하면 데드락 걸릴 수 있다.)

package Java8;

public class Main {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // 자는 동안 이 쓰레드를 깨우면 InterruptedException 발생
                e.printStackTrace();
            }
            System.out.println("Thread: " + Thread.currentThread().getName());

        });
        thread.start();

        System.out.println("Hello: " + Thread.currentThread().getName());
    }
}

메인 쓰레드가 실행되고 1초후에 다음 쓰레드가 실행된다.

thread를 만들자마자 sleep으로 1초간 멈춰놨기 때문에 1초 후에 쓰레드가 실행된다. 

 

 

 

 

  • interupt (다른 쓰레드 깨우기) : 다른 쓰레드를 깨워서 interruptedExeption을 발생 시킨다. 그 에러가 발생했을 때 할 일은 코딩하기 나름. 종료 시킬 수도 있고 계속 하던 일 할 수도 있고.

 

package Java8;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (true) {
                System.out.println("Thread: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000l);
                } catch (InterruptedException e) {
                    System.out.println("exit!");
                    return; // inturrupt 발생 시 종료 하기로 한다.
                }
            }
        });
        thread.start();

        System.out.println("Hello: " + Thread.currentThread().getName());
        Thread.sleep(3000L);
        thread.interrupt();
    }
}

 

메인과 새로 만든 쓰레드 두 개가 랜덤의 순서로 실행되고, 3초동안 쓰레드가 실행됐다 멈췄다를 반복한다. 그 후 interrupt를 받아 exit!을 출력하고 return을 통해 종료한다.

 

 

 

  • join(다른 쓰레드 기다리기) : 다른 쓰레드가 끝날 때까지 기다린다.

package Java8;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("Thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(3000l);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        });
        thread.start();

        System.out.println("Hello: " + Thread.currentThread().getName());
        thread.join(); // 3초간 기다린 후 다음 실행
        System.out.println(thread + "is finished");
    }
}

 

두 개의 쓰레드가 실행되고, thread는 3초동안 sleep하게 된다. join()을 통해 해당 thread를 기다리고, 그 다음 is finished를 출력한다. join도 마찬가지로 기다리는 동안 interrupt가 들어오게 되면 interruptedException을 발생시키기 때문에 사실상 프로그래머가 모든 쓰레드를 직접 관리한다는 것은 불가능에 가깝다.

 

 

 

 

Executors

 

고수준 (High-Level) Concurrency 프로그래밍

  • 쓰레드를 만들고 관리하는 작업을 애플리케이션에서 분리.

  • 그런 기능을 Executors에게 위임.

 

 

Executors가 하는 일

  • 쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어 관리한다.

  • 쓰레드 관리: 쓰레드 생명 주기를 관리한다.

  • 작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.

 

 

주요 인터페이스

  • Executor: execute(Runnable)

  • ExecutorService: Executor 상속 받은 인터페이스로, Callable도 실행할 수 있으며, Executor를 종료 시키거나, 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.

  • ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있다.

 

 

ExecutorService로 작업 실행하기

package Java8;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> System.out.println("Thread: " + Thread.currentThread().getName()));
    }
}

쓰레드가 계속 작동하고 있다.

submit을 통해 작업을 실행하면 작업이 완료된 후 쓰레드를 종료하지 않고 계속 켜두기 때문에 종료해주는 명령을 같이 사용해줘야 한다.

 

 

ExecutorService로 멈추기

  • executorService.shutdown(); // 처리중인 작업 기다렸다가 종료
package Java8;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> System.out.println("Thread: " + Thread.currentThread().getName()));

        executorService.shutdown(); // 현재 진행중인 작업을 끝까지 마치고 끝냄.
    }
}

작업이 완료되고 종료됐음을 확인할 수 있다.

 

 

  • executorService.shutdownNow(); // 당장 종료
package Java8;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> System.out.println("Thread: " + Thread.currentThread().getName()));

        executorService.shutdownNow(); // 바로 종료
    }
}

작업 중에도 강제로 종료할 수 있다. 위에서는 크게 작업중인 게 없었기 때문에 출력결과는 동일하게 나타났다.

 

 

 

ExecutorService의 threadpool 사용하기

package Java8;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(getRunnable("Hello"));
        executorService.submit(getRunnable("World"));
        executorService.submit(getRunnable("Have a Nice Day"));
        executorService.submit(getRunnable("and"));
        executorService.submit(getRunnable("Merry Christmas!"));


        executorService.shutdown(); // 현재 진행중인 작업을 끝까지 마치고 끝냄.
    }

    private static Runnable getRunnable(String message) {
        return () -> {
            System.out.println(message + Thread.currentThread().getName());
        };
    }
}

위와 같이 2개의 thread를 지닌 thread pool을 만들고 5개의 작업을 보내면 어떻게 될까. 두 개만 실행될 것 같지만 그렇지 않다. 5개 모두 실행하게 되는데 그 원리는 다음과 같다.

 

  • 1. Executor Secrvice 내에 thread pool이 있고, 처리할 작업들을 보낸다.
  • 2. thread pool 이 가진 tread 보다 많은 수의 작업이 들어오면 나머지 작업들을 Blocking Queue에 담는다.
  • 3. Blocking Queue에서 하나씩 꺼내서 놀고 있는 thread에게 일을 시킨다.
  • 4. 모든 작업을 처리하게 됐다.

다섯 개의 작업이 모두 완료된 것을 확인할 수 있다. 다만 shutdownNow를 사용하면 두 개만 실행된다.

 

 

 

ScheduledExecutorService 사용하기

package Java8;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.schedule(getRunnable("Hello"), 3, TimeUnit.SECONDS);

        executorService.shutdown();
    }

    private static Runnable getRunnable(String message) {
        return () -> {
            System.out.println(message + Thread.currentThread().getName());
        };
    }
}

ScheduledExecutoreService를 사용해 쓰레드를 3초 후에 실행시킬 수 있다.

일정한 시간 차를 두고 반복적인 작업을 진행하고 싶다면 scheduleAtFiexdRate 메서드를 사용하면 된다.

 

 

package Java8;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(getRunnable("Hello"), 1, 2, TimeUnit.SECONDS);


    }

    private static Runnable getRunnable(String message) {
        return () -> {
            System.out.println(message + Thread.currentThread().getName());
        };
    }
}

실행하고자 하는 메서드 뒤에 첫 번째 숫자는 언제 시작할 건지(1초 후 실행), 두 번째 숫자는 텀은 어떻게 둘 것인지(2초마다 실행)에 대한 것이다.

참고로 shutdown 두게 되면 아무것도 실행하지 않고 종료된다.

 

 


Fork/Join 프레임워크(멀티프로세싱)

  • ExecutorService의 구현체로 손쉽게 멀티 프로세서를 활용할 수 있게 도와준다.

 

 

 

참고