10.1 스레드(Thread)의 개념 이해
물리적인 스레드: CPU 코어를 논리적으로 나누어 놓은 스레드
논리적인 스레드: 소프트웨어적으로 생성된 스레드
물리적인 스레드는 진짜로 동시에 돌아가고 - 병렬성,
논리적인 스레드는 여러 물리적 스레드를 빠른 속도로 번갈아 가며 실행되기 때문에 동시에 돌아가는 것 처럼 보인다 - 동시성
10.3 Scheduler를 위한 전용 Operator
1) subscribeOn()
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.subscribe(data -> log.info("# onNext: {}", data));
최초로 구독이 시작되는 시점 이후에 동작들이 어떠한 스레드에서 처리될 것인지를 지정
위 코드에서
- [main] 최초 구독 시점에 main 스레드에서 #doOnSubscribe 출력
- [boundElastic-1] 구독 이후 boundElastic-1 스레드에서 1, 3, 5, 7 emit 및 #doOnNext 출력
- [boundElastic-1] #onNext 출력도 해당 스레드에서 이루어짐
2) publishOn()
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
해당 publishOn()을 기준으로 DownStream으로 넘어갈 때 실행 스레드 변경
위 코드에서
- [main] #doOnSubscribe
- [main] #doOnNext 출력
- [parallel-1] main과 다른 스레드에서 #onNext 출력
3) parallel()
물리적 코어 내 논리적 코어(물리적 스레드)를 지정해서 병렬적으로 실행
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
parallel()은 논리적 코어 몇 개에 데이터들을 할당할 것인지를 지정(parallel(4): 1부터 19까지의 데이터를 4개의 물리적 스레드에 공평하게 나누겠다 - 데이터 묶음 rail)
runOn(): 병렬 작업을 수행할 스레드 지정
10.4 publishOn()과 subscribeOn()의 동작 이해(publishOn, subscribeOn의 혼용 사용)
subscribeOn(): 사용한 위치에 상관 없이 구독한 시점 이후 emit 부터 실행할 새로운 스레드를 지정
publishOn(): 위치에 따라 Downstream 실행할 새로운 스레드 지정
1) case1: publishOn(), subscribeOn() 둘 다 사용하지 않았을 경우
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
- [main] 1, 3, 5, 7 을 emit 하면서 #doOnNext fromArray 출력
- [main] filter 수행 후 #doOnNext filter 출력
- [main] map 수행 후 #doOnNext map 출력
- [main] subscriber 등록 후 # onNext 출력
Emit, filter, map, subscribe 연산이 모두 동일한 스레드 main에서 수행
2) case2: publishOn()만 사용했을 경우
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
- [main] emit, # doOnNext fromArray 출력
- [parallel-2] filter, # doOnNext filter 출력
- [parallel-1] map, # doOnNext map 출력
- [parallel-1] subscriber 등록 후 # onNext 출력
publishOn() 연산을 기준으로 다른 스레드에서 명령 수행
3) case3: subscribeOn(), publishOn() 을 동시에 사용하는 경우
Flux
.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
.filter(data -> data > 3)
.doOnNext(data -> log.info("# doOnNext filter: {}", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> log.info("# doOnNext map: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
- [boundedElastic-1] 1, 3, 5, 7 emit, # doOnNext fromArray 출력
- [boundedElastic-1] filter, # doOnNext filter 출력
- [parallel-1] map, # doOnNext map 출력
- [parallel-1] # onNext 출력
10.5 Scheduler의 종류
1) Schedulers.immediate()
현재의 스레드에서 계속 수행
특정 API의 파라미터로 Scheduler를 전달해야 하는데 특정 상황에서는 현재 스레드에서 계속 수행하고 싶을 때 사용
2) Schedulers.single()
하나의 스레드를 만들어 계속 사용
3) Schedulers.newSingle(thread-name, true/false)
함수 등을 호출할 때마다 새로운 스레드를 하나 생성하여 작업 처리
true/false: 데몬 스레드 여부(주 스레드가 종료될 때 해당 스레드도 종료할 것인지)
4) Scheduler.boundedElastic()
ExcecutorService 기반의 스레드 풀을 생성한 후 정해진 수만큼의 스레드를 사용하고 종료된 스레드는 반납하는 방식
Blocking I/O 에 최적화
5) Scheduler.parallel()
CPU 코어 개수만큼 스레드를 생성하여 사용하는 방식
Non-Blocking I/O에 최적화
'ㄴ Spring Webflux > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
Ch09 Sinks (0) | 2024.11.03 |
---|---|
Ch08 Backpressure (0) | 2024.10.22 |
Ch07 Cold Sequence 와 Hot Sequence (0) | 2024.10.02 |
Ch06 마블 다이어그램 (0) | 2024.09.19 |
Ch05 Reactor 개요 (0) | 2024.09.18 |
댓글