[JAVA]병렬 스트림(parallel) vs CompletableFuture non-blocking 코드 만들기
2021.09.05 - [프로그래밍 노트/JAVA] - CompletableFutrue를 사용해보자
앞 전에 만들었던 동기 API(getPrice(String product))를 이용하여 최저가격 검색 애플리케이션을 만든다고 가정해보자.
앞에 게시글을 다시들어가기 귀찬으니 아래 코드를 참고하자.
getPrice는 1초 blocking되는 API이다.
public class Shop {
private final String name;
private final Random random;
public Shop(String name) {
this.name = name;
random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
}
public double getPrice(String product) {
return calculatePrice(product);
}
public double calculatePrice(String product) {
// 1초를 인위적으로 지연
delay();
return format(random.nextDouble() * product.charAt(0) + product.charAt(1));
}
...
}
다음과 같은 상점 리스트가 있다고 가정하고, 제품명을 입력하면 상점 이름과 제품가격 문자열 정보를 List로 반환하는 메서드를 구현해보자.
List<Shop> shopList = Arrays.asList(new Shop("판교역 상점"),
new Shop("정자역 상점"),
new Shop("강남역 상점"),
new Shop("광교역 상점"));
// 우리가 구현해야할 메서드
public List<String> findPrices(String product);
기본적으로 생각할 수 있는 로직은 순차적으로 정보를 요청하는 findPrices이다.
public List<String> findPrices(String product) {
return shopList.stream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
[판교역 상점 price is 135.87, 정자역 상점 price is 111.23, 강남역 상점 price is 97.26, 광교역 상점 price is 152.21]
Done in 4071msecs
위 코드의 문제점은 가격을 검색하는 동안 각각 1초의 대기시간이 있으므로 전체 가격 검색 결과는 4초정도 걸린다. 이제 이 코드를 개선해보자.
병렬 스트림으로 요청 병렬화하기
병렬 스트림을 이용하면 순차 계산을 병렬로 처리해서 성능을 개선할 수 있다.
public List<String> findPrices(String product) {
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
[판교역 상점 price is 135.87, 정자역 상점 price is 111.23, 강남역 상점 price is 97.26, 광교역 상점 price is 152.21]
Done in 1069msecs
성능이 확실히 향상된 것을 볼 수 있따. 4개의 상점에서 병렬로 검색이 진행되므로 1초정도의 시간이 걸렸다.
CompletableFuture를 사용해보기
public List<String> findPricesFuture(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
.collect(Collectors.toList());
List<String> prices = priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return prices;
}
[판교역 상점 price is 125.58, 정자역 상점 price is 99.06, 강남역 상점 price is 143.98, 광교역 상점 price is 92.72]
Done in 1009msecs
병렬 스트림과 비슷한 시간이 걸렸다.
일단 코드를 살펴보자join 메서드
- join 메서드는 Future 인터페이스의 get 메서드와 같은 의미를 갖는다. 유일한 차이점은 join은 내부에서 예외처리를 하기 때문에 아무 예외도 발생시키지 않는다는 점이다.
하나의 스트림 파이프라인으로 처리하지 않고 두 개의 스트림 파이프라인으로 처리한 이유
- 스트림 연산은 게으른 특성이 있으므로 하나의 파이프라인으로 연산을 처리했다면 모든 가격 정보 요청 동작이, 순차적으로 이루어지는 결과가 된다.
- 위 : 이전 요청의 처리가 완전히 끝난 다음에 새로 만든 CompletableFuture가 처리된다.
- 아래 : CompletableFuture를 리스트로 모은 다음에 다른 작업과는 독립적으로 각자의 작업을 수행하는 모습을 보여준다.
성능은 비슷했고, 코드는 병렬스트림을 사용하는 것이 CompletableFuture를 사용하는 것보다 훨씬 간단했다. 그럼에도 CompletableFuture를 사용하는 이유는 무엇일까?
병렬 스트림 vs CompletableFuture
위의 코드에서 검색하는 상점의 수(shopList)를 늘려보자. 두 가지 버전 모두 Runtime.getRuntime().availableProcessors()가 반환하는 스레드 수를 사용한다. (내 맥북에서는 8개의 쓰레드를 기본적으로 사용한다.) 상점의 수가 이 스레드 수가 넘어가면 어떻게될까?
검색하는 상점의 수를 9개로 설정하면, 결과값을 얻는데 까지 2초정도가 걸린다. 8개의 쓰레드가 동시작업을 하는동안 1개의 상점에서 가격을 찾지 못하기 때문이다.
여기까지 보면 비슷해보인다. 가장 큰 차이점은 CompletableFuture는 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다는 것이다. (병렬 스트림에서도 ForkJoinPool을 이용할 수 있는데 다음에 살펴보자.) 따라서 Executor로 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다.
private final Executor executor = Executors.newFixedThreadPool(Math.min(shopList.size(), 100), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
// Executor를 supplyAsync 두번째 인자로 전달할 수 있다.
CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)), executor);
위 처럼 상점 수 만큼 쓰레드 수를 갖는 풀을 생성할 수 있다. (상점 검색시 1초 남짓의 시간이 걸리게됨)
그럼 언제 사용하면 좋을까?
병렬스트림 - I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 구현하기 간단하며 효율적
CompletableFuture - I/O를 기다리는 작업을 병렬로 실행할때 많은 유연성을 제공. 대기/계산(W/C)의 비율에 적합한 스레드 수를 설정할 수 있음
출처 : 모던자바인액션