본문 바로가기
ㄴ Spring Webflux/스프링으로 시작하는 리액티브 프로그래밍

Ch10 Scheduler

by 경아ㅏ 2024. 11. 5.

 

10.1 스레드(Thread)의 개념 이해

물리적인 스레드: CPU 코어를 논리적으로 나누어 놓은 스레드

논리적인 스레드: 소프트웨어적으로 생성된 스레드

물리적인 스레드는 진짜로 동시에 돌아가고 - 병렬성,
논리적인 스레드는 여러 물리적 스레드를 빠른 속도로 번갈아 가며 실행되기 때문에 동시에 돌아가는 것 처럼 보인다 - 동시성

 


10.3 Scheduler를 위한 전용 Operator

1) subscribeOn()

 

Flux.fromArray(new Integer[] {1, 3, 5, 7})
        .subscribeOn(Schedulers.boundedElastic())
        .doOnNext(data -> log.info("# doOnNext: {}", data))
        .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
        .subscribe(data -> log.info("# onNext: {}", data));


최초로 구독이 시작되는 시점 이후에 동작들이 어떠한 스레드에서 처리될 것인지를 지정
위 코드에서
- [main] 최초 구독 시점에 main 스레드에서 #doOnSubscribe 출력

- [boundElastic-1] 구독 이후 boundElastic-1 스레드에서 1, 3, 5, 7 emit 및 #doOnNext 출력

- [boundElastic-1] #onNext 출력도 해당 스레드에서 이루어짐

 

2) publishOn()

 

Flux.fromArray(new Integer[] {1, 3, 5, 7})
        .doOnNext(data -> log.info("# doOnNext: {}", data))
        .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
        .publishOn(Schedulers.parallel())
        .subscribe(data -> log.info("# onNext: {}", data));


해당 publishOn()을 기준으로 DownStream으로 넘어갈 때 실행 스레드 변경
위 코드에서
- [main] #doOnSubscribe

- [main] #doOnNext 출력
- [parallel-1] main과 다른 스레드에서 #onNext 출력


3) parallel()

물리적 코어 내 논리적 코어(물리적 스레드)를 지정해서 병렬적으로 실행

Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
        .parallel(4)
        .runOn(Schedulers.parallel())
        .subscribe(data -> log.info("# onNext: {}", data));


parallel()은 논리적 코어 몇 개에 데이터들을 할당할 것인지를 지정(parallel(4): 1부터 19까지의 데이터를 4개의 물리적 스레드에 공평하게 나누겠다 - 데이터 묶음 rail)
runOn(): 병렬 작업을 수행할 스레드 지정



10.4 publishOn()과 subscribeOn()의 동작 이해(publishOn, subscribeOn의 혼용 사용)

subscribeOn(): 사용한 위치에 상관 없이 구독한 시점 이후 emit 부터 실행할 새로운 스레드를 지정

publishOn(): 위치에 따라 Downstream 실행할 새로운 스레드 지정


1) case1: publishOn(), subscribeOn() 둘 다 사용하지 않았을 경우

 

Flux
    .fromArray(new Integer[] {1, 3, 5, 7})
    .doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
    .filter(data -> data > 3)
    .doOnNext(data -> log.info("# doOnNext filter: {}", data))
    .map(data -> data * 10)
    .doOnNext(data -> log.info("# doOnNext map: {}", data))
    .subscribe(data -> log.info("# onNext: {}", data));

 

- [main] 1, 3, 5, 7 을 emit 하면서 #doOnNext fromArray 출력

- [main] filter 수행 후 #doOnNext filter 출력

- [main] map 수행 후 #doOnNext map 출력

- [main] subscriber 등록 후 # onNext 출력

Emit, filter, map, subscribe 연산이 모두 동일한 스레드 main에서 수행



2) case2: publishOn()만 사용했을 경우

Flux
    .fromArray(new Integer[] {1, 3, 5, 7})
    .doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
    .publishOn(Schedulers.parallel())
    .filter(data -> data > 3)
    .doOnNext(data -> log.info("# doOnNext filter: {}", data))
    .publishOn(Schedulers.parallel())
    .map(data -> data * 10)
    .doOnNext(data -> log.info("# doOnNext map: {}", data))
    .subscribe(data -> log.info("# onNext: {}", data));


- [main] emit, # doOnNext fromArray 출력

- [parallel-2] filter, # doOnNext filter 출력
- [parallel-1] map, # doOnNext map 출력

- [parallel-1] subscriber 등록 후 # onNext 출력

publishOn() 연산을 기준으로 다른 스레드에서 명령 수행


3) case3: subscribeOn(), publishOn() 을 동시에 사용하는 경우

Flux
    .fromArray(new Integer[] {1, 3, 5, 7})
    .subscribeOn(Schedulers.boundedElastic())
    .doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
    .filter(data -> data > 3)
    .doOnNext(data -> log.info("# doOnNext filter: {}", data))
    .publishOn(Schedulers.parallel())
    .map(data -> data * 10)
    .doOnNext(data -> log.info("# doOnNext map: {}", data))
    .subscribe(data -> log.info("# onNext: {}", data));


- [boundedElastic-1] 1, 3, 5, 7 emit, # doOnNext fromArray 출력

- [boundedElastic-1] filter, # doOnNext filter 출력

- [parallel-1] map, # doOnNext map 출력

- [parallel-1] # onNext 출력

 

 

10.5 Scheduler의 종류

1) Schedulers.immediate()

 

현재의 스레드에서 계속 수행

특정 API의 파라미터로 Scheduler를 전달해야 하는데 특정 상황에서는 현재 스레드에서 계속 수행하고 싶을 때 사용


2) Schedulers.single()

 

하나의 스레드를 만들어 계속 사용


3) Schedulers.newSingle(thread-name, true/false)

 

함수 등을 호출할 때마다 새로운 스레드를 하나 생성하여 작업 처리
true/false: 데몬 스레드 여부(주 스레드가 종료될 때 해당 스레드도 종료할 것인지)

 

4) Scheduler.boundedElastic()

 

ExcecutorService 기반의 스레드 풀을 생성한 후 정해진 수만큼의 스레드를 사용하고 종료된 스레드는 반납하는 방식

Blocking I/O 에 최적화

 

5) Scheduler.parallel()

CPU 코어 개수만큼 스레드를 생성하여 사용하는 방식
Non-Blocking I/O 최적화

 

 

'ㄴ Spring Webflux > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글

Ch09 Sinks  (0) 2024.11.03
Ch08 Backpressure  (0) 2024.10.22
Ch07 Cold Sequence 와 Hot Sequence  (0) 2024.10.02
Ch06 마블 다이어그램  (0) 2024.09.19
Ch05 Reactor 개요  (0) 2024.09.18

댓글