본문 바로가기
2023~/스프링으로 시작하는 리액티브 프로그래밍

Ch09 Sinks

by 경아ㅏ 2024. 11. 3.

 

9.1 Sinks 란?

 

- Publisher 와 Subscriber 의 역할을 모두 할 수 있는 리액티브 스트림즈 구성 요소

- Reactor 3.5 부터 Processor 가 없어지고 Sinks만 지원

- Sinks의 주 용도는 "signal을 프로그래밍 코드를 통해 명시적으로 전달하는 것"

- create(), generate() operator는 싱글 스레드 기반에서 signal 을 전송하는데 사용하고, Sinks는 멀티스레드 방식으로 signal 을 전송하는데 사용함(thread-safety)


1) create Operator를 사용하는 경우

public class Example9_1 {
    public static void main(String[] args) throws InterruptedException {
        int tasks = 6;
        Flux
            .create((FluxSink<String> sink) -> {
                IntStream
                        .range(1, tasks)
                        .forEach(n -> sink.next(doTask(n)));
            })
            .subscribeOn(Schedulers.boundedElastic())
            .doOnNext(n -> log.info("# create(): {}", n))
            .publishOn(Schedulers.parallel())
            .map(result -> result + " success!")
            .doOnNext(n -> log.info("# map(): {}", n))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(500L);
    }

    private static String doTask(int taskNumber) {
        // now tasking.
        // complete to task.
        return "task " + taskNumber + " result";
    }
}


위 코드에서 데이터 처리(emit) / 가공(map) / Subscriber에 전달 하는 단계는 모두 다른 스레드에서 실행

- [boundedElastic-1] / [parallel-2] / [parallel-1]
- 1~6(task)까지의 doTask 자체는 boundedElastic-1 스레드에서 싱글 스레드로 실행


2) Sinks 를 사용하는 경우

 

public class Example9_2 {
    public static void main(String[] args) throws InterruptedException {
        int tasks = 6;

        Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
        Flux<String> fluxView = unicastSink.asFlux();
        IntStream
                .range(1, tasks)
                .forEach(n -> {
                    try {
                        new Thread(() -> {
                            unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
                            log.info("# emitted: {}", n);
                        }).start();
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage());
                    }
                });

        fluxView
                .publishOn(Schedulers.parallel())
                .map(result -> result + " success!")
                .doOnNext(n -> log.info("# map(): {}", n))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(200L);
    }
}

 

doTask() 데이터 처리과정이 Thread0-4까지 별도의 스레드에서 이루어지지만 스레드 안정성 보장


9.2 Sinks 종류 및 특징

1) Sinks.One

 

Sinks.One<String> sinksOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();


하나의 데이터만 emit 하는 Sinks로, Sinks 클래스 내의 one() 함수를 이용하여 Sinks.One 객체 생성
Mono로 변환하여 여러개의 구독자가 구독을 실행할 수 있으나 여러개의 데이터를 emit 하는 것은 불가(drop)

 

 

2) Sinks.Many

여러 건의 데이터를 emit 하는 SInks

Sinks.many()는 ManySpec 인터페이스를 리턴

- ManySpec.unicast() : UnicastSpec 리턴

- ManySpec.multicast(): MulticastSpec 리턴

- ManySpec.replay(): MulticastReplaySpec 리턴

각각의 리턴 객체의 기능을 이용하여 Sinks.Many 객체 생성

 


1) unicast(): 하나의 Subscriber 만 구독 가능, 여러 개의 Subscriber 등록시 에러 발생

 

Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();

 

 

2) multicast(): 여러 개의 Subscriber가 구독 가능

Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();

multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);

fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));

multicastSink.emitNext(3, FAIL_FAST);


Sinks가 Publisher 역할을 수행할 경우 Hot Sequence-Warm up 으로 데이터를 emit

결과적으로 Subscriber1은 1,2,3 / Subscriber2는 3 만 전달


3) replay().limit(x): 여러 개의 Subscriber가 구독 가능하고, 가장 나중에 emit 된 데이터 중 x개를 emit

Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();

replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);

fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));

replaySink.emitNext(4, FAIL_FAST);

fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));


Subscriber2의 경우 구독한 시점에 emit 된 데이터 (1, 2, 3, 4) 중 최신 2개 3, 4 전달

 

'2023~ > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글

Ch10 Scheduler  (0) 2024.11.05
Ch08 Backpressure  (0) 2024.10.22
Ch07 Cold Sequence 와 Hot Sequence  (0) 2024.10.02
Ch06 마블 다이어그램  (0) 2024.09.19
Ch05 Reactor 개요  (0) 2024.09.18

댓글