9.1 Sinks 란?
- Publisher 와 Subscriber 의 역할을 모두 할 수 있는 리액티브 스트림즈 구성 요소
- Reactor 3.5 부터 Processor 가 없어지고 Sinks만 지원
- Sinks의 주 용도는 "signal을 프로그래밍 코드를 통해 명시적으로 전달하는 것"
- create(), generate() operator는 싱글 스레드 기반에서 signal 을 전송하는데 사용하고, Sinks는 멀티스레드 방식으로 signal 을 전송하는데 사용함(thread-safety)
1) create Operator를 사용하는 경우
public class Example9_1 {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Flux
.create((FluxSink<String> sink) -> {
IntStream
.range(1, tasks)
.forEach(n -> sink.next(doTask(n)));
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(n -> log.info("# create(): {}", n))
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
private static String doTask(int taskNumber) {
// now tasking.
// complete to task.
return "task " + taskNumber + " result";
}
}
위 코드에서 데이터 처리(emit) / 가공(map) / Subscriber에 전달 하는 단계는 모두 다른 스레드에서 실행
- [boundedElastic-1] / [parallel-2] / [parallel-1]
- 1~6(task)까지의 doTask 자체는 boundedElastic-1 스레드에서 싱글 스레드로 실행
2) Sinks 를 사용하는 경우
public class Example9_2 {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxView = unicastSink.asFlux();
IntStream
.range(1, tasks)
.forEach(n -> {
try {
new Thread(() -> {
unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
log.info("# emitted: {}", n);
}).start();
Thread.sleep(100L);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
});
fluxView
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(200L);
}
}
doTask() 데이터 처리과정이 Thread0-4까지 별도의 스레드에서 이루어지지만 스레드 안정성 보장
9.2 Sinks 종류 및 특징
1) Sinks.One
Sinks.One<String> sinksOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
하나의 데이터만 emit 하는 Sinks로, Sinks 클래스 내의 one() 함수를 이용하여 Sinks.One 객체 생성
Mono로 변환하여 여러개의 구독자가 구독을 실행할 수 있으나 여러개의 데이터를 emit 하는 것은 불가(drop)
2) Sinks.Many
여러 건의 데이터를 emit 하는 SInks
Sinks.many()는 ManySpec 인터페이스를 리턴
- ManySpec.unicast() : UnicastSpec 리턴
- ManySpec.multicast(): MulticastSpec 리턴
- ManySpec.replay(): MulticastReplaySpec 리턴
각각의 리턴 객체의 기능을 이용하여 Sinks.Many 객체 생성
1) unicast(): 하나의 Subscriber 만 구독 가능, 여러 개의 Subscriber 등록시 에러 발생
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();
2) multicast(): 여러 개의 Subscriber가 구독 가능
Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
multicastSink.emitNext(3, FAIL_FAST);
Sinks가 Publisher 역할을 수행할 경우 Hot Sequence-Warm up 으로 데이터를 emit
결과적으로 Subscriber1은 1,2,3 / Subscriber2는 3 만 전달
3) replay().limit(x): 여러 개의 Subscriber가 구독 가능하고, 가장 나중에 emit 된 데이터 중 x개를 emit
Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.emitNext(3, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
replaySink.emitNext(4, FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
Subscriber2의 경우 구독한 시점에 emit 된 데이터 (1, 2, 3, 4) 중 최신 2개 3, 4 전달
'2023~ > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
Ch10 Scheduler (0) | 2024.11.05 |
---|---|
Ch08 Backpressure (0) | 2024.10.22 |
Ch07 Cold Sequence 와 Hot Sequence (0) | 2024.10.02 |
Ch06 마블 다이어그램 (0) | 2024.09.19 |
Ch05 Reactor 개요 (0) | 2024.09.18 |
댓글