8.1 Backpressure
Publisher 가 데이터를 emit 하는 속도가 Subscriber 가 처리하는 속도보다 훨씬 더 빠를 때, 오버플로우나 시스템 다운 발생 가능
이러한 문제를 해결하기 위한 수단이 BackPressure
8.2 Reactor에서의 Backpressure 처리 방식
1) 데이터 개수 제어
Subscriber 가 처리할 수 있는 수준의 데이터 개수를 Publisher 에게 지정(전달) 하는 방법
@Slf4j
public class Example8_1 {
public static void main(String[] args) {
Flux.range(1, 5)
.doOnRequest(data -> log.info("# doOnRequest: {}", data))
.subscribe(new BaseSubscriber<Integer>() { // Subscriber 로 BaseSubscriber의 구현 객체 전달
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 최초로 Publisher 구독시 요청할 데이터 개수를 request 함수로 지정
request(1);
}
@SneakyThrows
@Override
// Publisher가 emit 한 데이터를 처리할 때 사용하는 함수로, value를 출력한 후 다음 데이터 1개를 request
protected void hookOnNext(Integer value) {
Thread.sleep(2000L);
log.info("# hookOnNext: {}", value);
request(1);
log.warn("\n");
}
});
}
}
2) Backpressure 전략 사용
IGNORE 전략
: Backpressure 을 사용하지 않는 전략(IllegalStateException 발생 가능)
ERROR 전략
: 데이터를 emit 하는 속도가 처리하는 속도보다 빠를 때 IllegalStateException을 발생시키는 전략
@Slf4j
public class Example8_2 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(1L)) // 0부터 1씩 증가하는 수를 0.001 초마다 emit
.onBackpressureError() // ERROR 전략을 사용
.doOnNext(data -> log.info("# doOnNext: {}", data))
//.publishOn(Schedulers.parallel())
.subscribe(data -> {
try {
// emit 된 데이터를 0.005 초마다 처리 (emit 속도보다 느림)
Thread.sleep(5L);
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
}
}
주석 부분이 없어도 emit 은 0.001 초마다 한 번씩, subscribe 는 0.005 초마다 한 번씩 처리하기 때문에 에러가 발생할 줄 알았는데 아니었다 (왜?) 그래서 찾아보니 publishOn() 메서드를 사용하면 데이터를 emit 하는 동작과 데이터를 처리하는 동작이 다른 스레드에서 실행되고, publishOn()을 지우면 메인스레드 하나에서 데이터 emit, 데이터 처리 동작이 같이 돌아간다고 한다. 후자의 경우 데이터 emit 동작이 잠시 멈춰 있는 동안 데이터 처리가 일어나기 때문에 에러가 안 생길 수도 있다는 것…! 전자의 경우 별도의 스레드에서 데이터가 빠르게 emit 되고 있으므로 무조건 에러가 발생한다.
DROP 전략
: Publisher 가 DownStream 에 전달할 데이터가 버퍼에 가득찼을 때 가장 먼저 emit 된 순서대로 데이터를 drop 하는 전략
LATEST 전략
: (버퍼가 가득찼다가) 새로 데이터를 버퍼에 넣으려고 하는 시점에 가장 최근에 emit 된 데이터를 제외하고 모두 discard 하는 전략
DROP/LATEST 모두 가장 먼저 emit 된 데이터를 제거하는 느낌은 비슷하지만 DROP은 들어가는 하나씩 제거되는 느낌이고 LATEST 는 새로운 걸 넣어야 할 때 최신 데이터만 빼고 앞에 데이터는 다 없어지는 느낌 인 것 같다.
BUFFER 전략
: 버퍼가 가득차면 에러를 낼 수도, 버퍼가 가득찼을 때 버퍼 내 데이터를 제거할 수도 있는 전략
⓵ BUFFER_DROP_LATEST 전략
버퍼가 가득찼을 때 새로운 데이터가 다시 emit 되면(오버플로우 상황) 가장 최근에 emit 된 데이터를 제거하는 방식
⓶ BUFFER_DROP_OLDEST 전략
버퍼가 가득찼을 때 가장 오래 전에 버퍼에 들어온 데이터를 제거하는 방식
@Slf4j
public class Example8_5 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(300L)) // 0.3초마다 데이터 emit
.doOnNext(data -> log.info("# emitted by original Flux: {}", data))
.onBackpressureBuffer(2,
dropped -> log.info("** Overflow & Dropped: {} **", dropped),
BufferOverflowStrategy.DROP_LATEST) // BufferOverflowStrategy.DROP_OLDEST
.publishOn(Schedulers.parallel(), false, 1)
.subscribe(data -> {
try {
Thread.sleep(1000L); // 1초마다 데이터 처리
} catch (InterruptedException e) {}
log.info("# onNext: {}", data);
},
error -> log.error("# onError", error));
Thread.sleep(2500L);
}
}
그동안 doOnNext() 동작을 제일 마지막 .subscribe() 에서 데이터 처리 동작이 수행될 때 같이 수행된다고 착각을 하고 있었다. doOnNext() 는 하위 Consumer Publisher 에 데이터를 emit 할 때 사용하고 따라서 위의 코드에서는 최초로 데이터가 생성될 때 emit 하는 상황을 로깅한다.
'2023~ > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
Ch10 Scheduler (0) | 2024.11.05 |
---|---|
Ch09 Sinks (0) | 2024.11.03 |
Ch07 Cold Sequence 와 Hot Sequence (0) | 2024.10.02 |
Ch06 마블 다이어그램 (0) | 2024.09.19 |
Ch05 Reactor 개요 (0) | 2024.09.18 |
댓글