원자로를 사용한 화재
나의 Spring boot 앱에 아래와 같은 방법이 있습니다.
public Flux<Data> search(SearchRequest request) {
Flux<Data> result = searchService.search(request);//this returns Flux<Data>
Mono<List<Data>> listOfData = result.collectList();
// doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
return result;
}
//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
//do some processing here
}
현재 사용 중입니다.@Async
주석이 달린 서비스 클래스doThisAsync
하지만 어떻게 통과하는지 모릅니다.List<Data>
전화하고 싶지 않기 때문에block
내가 가진건Mono<List<Data>>
.
나의 주된 문제는 이 모노를 어떻게 별도로 처리하느냐 하는 것입니다.search
메소드는 반환해야 합니다.Flux<Data>
.
1, Fire-and-Forget이 이미 비동기식으로 반환되는 경우Mono
/Flux
public Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> doThisAsync(data).subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public Mono<Void> doThisAsync(List<Data> data) {
//do some async/non-blocking processing here like calling WebClient
}
2, Fire-and-Forget이 I/O를 차단하는 경우
public Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
.subscribeOn(Schedulers.elastic()) // delegate to proper thread to not block main flow
.subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public void doThisAsync(List<Data> data) {
//do some blocking I/O on calling thread
}
위의 두 경우 모두 배압 지지대가 손실됩니다.만약에doAsyncThis
어떤 이유로 인해 속도가 느려지면 데이터 생산자는 신경 쓰지 않고 계속해서 항목을 생산할 것입니다.이것은 불과 안개 메커니즘의 자연스러운 결과입니다.
아래 예제와 같이 publishOn을 사용하여 별도의 스레드에서 처리를 실행하는 것을 고려해 본 적이 있습니까?이것은 정확히 당신이 요구하는 것이 아닐 수도 있지만, 플럭스의 결과 처리가 전용 스케줄러(FourThreadScheduler)에서 하나 이상의 스레드(예: 4개 스레드 스케줄러)에 의해 수행되는 동안 다른 문제를 계속 수행할 수 있습니다.
@Test
public void processingInSeparateThreadTest() {
final Scheduler theFourThreadScheduler = Schedulers.newParallel("FourThreads", 4);
final Flux<String> theResultFlux = Flux.just("one", "two", "three", "four", "five", "six", "seven", "eight");
theResultFlux.log()
.collectList()
.publishOn(theFourThreadScheduler)
.subscribe(theStringList -> {
doThisAsync(theStringList);
});
System.out.println("Subscribed to the result flux");
for (int i = 0; i < 20; i++) {
System.out.println("Waiting for completion: " + i);
try {
Thread.sleep(300);
} catch (final InterruptedException theException) {
}
}
}
private void doThisAsync(final List<String> inStringList) {
for (final String theString : inStringList) {
System.out.println("Processing in doThisAsync: " + theString);
try {
Thread.sleep(500);
} catch (final InterruptedException theException) {
}
}
}
예제를 실행하면 doThisAsync()에서 수행된 처리가 백그라운드에서 수행됨을 보여주는 다음과 같은 출력이 생성됩니다.
Subscribed to the result flux
Waiting for completion: 0
Processing in doThisAsync: one
Waiting for completion: 1
Processing in doThisAsync: two
Waiting for completion: 2
Waiting for completion: 3
Processing in doThisAsync: three
Waiting for completion: 4
Waiting for completion: 5
Processing in doThisAsync: four
Waiting for completion: 6
Processing in doThisAsync: five
Waiting for completion: 7
Waiting for completion: 8
Processing in doThisAsync: six
Waiting for completion: 9
Processing in doThisAsync: seven
Waiting for completion: 10
Waiting for completion: 11
Processing in doThisAsync: eight
Waiting for completion: 12
Waiting for completion: 13
Waiting for completion: 14
Waiting for completion: 15
Waiting for completion: 16
Waiting for completion: 17
Waiting for completion: 18
Waiting for completion: 19
참조:3번 원자로 참조: 스케줄러
2023/01/31 업데이트
사실 어쨌든 당신은 .subscribeOn()을 사용해야 합니다. 왜냐하면 당신이 당신의 fire-and-forget 함수를 호출하더라도 그것은 되돌아오기 때문입니다.Mono<Void>
반응형 체인 내에서 실행 스레드가 전환되거나 즉시 발생할 것이라는 보장은 없습니다(특히 체인에서 사용된 운영자는 해당 화재 및 폭발 기능 내부의 코드에 따라 다름).
따라서 이 함수를 호출한 동일한 스레드에서 fire-and-forget 함수가 실행되므로 이 함수가 완료될 때까지 메서드가 반환되지 않을 수 있습니다.
fire-and-forget 기능이 반환되는 경우Publisher<Void>
:
public Flux<Data> search(SearchRequest request) {
return searchService.search(request)
.collectList()
.doOnNext(data ->
// anyway call subscribeOn(...)
fireAndForgetOperation(data)
.subscribeOn(...)
.subscribe()
)
.flatMapMany(Flux::fromIterable);
}
public Mono<Void> fireAndForgetOperation(List<String> list) {
...
}
화재-잊음 기능이 일반적인 경우void
반환 방법:
public Flux<Data> search(SearchRequest request) {
return searchService.search(request)
.collectList()
.doOnNext(data ->
Mono.fromRunnable(() -> fireAndForgetOperation(data))
.subscribeOn(...)
.subscribe()
)
.flatMapMany(Flux::fromIterable);
}
public void fireAndForgetOperation(List<String> list) {
...
}
또한 당신은 무엇을 고려해야 합니다.Scheduler
소방 및 소방 기능의 특성에 따라 제공해야 합니다.
기본적으로 두 가지 시나리오가 있습니다.
방화벽 및 망각 기능이 CPU 바인딩을 수행하는 경우.그런 다음 다음 다음을 지정합니다.Schedulers.parallel()
東京의 subsribeOn()
I/O 기능이 작동하는 경우(실제로 이 경우에는 차단 또는 비차단 IO 여부와 상관 없음).그런 다음 다음 다음을 지정합니다.Schedulers.boundedElastic()
東京의 subsribeOn()
따라서 이 접근 방식을 사용하면 화재 예방 기능을 실행한 후 즉시 복귀할 수 있습니다.
언급URL : https://stackoverflow.com/questions/57566465/fire-and-forget-with-reactor
'programing' 카테고리의 다른 글
springdoc-openapi-webflux-ui를 사용하여 앱 API 문서를 표시하는 방법은 무엇입니까? (0) | 2023.07.20 |
---|---|
jpa와 함께 kafka를 사용할 때 모범 사례 (0) | 2023.07.20 |
hibernate.load로 loader 'app'의 이름 없는 모듈에 있는 "is"를 수정하는 방법 (0) | 2023.07.20 |
Spring Boot이 로그를 콘솔로 인쇄하지 못하도록 방지 (0) | 2023.07.20 |
컨트롤러 레벨에서 사용자 지정 예외 및 처리를 발생시키지 않고 서비스 계층에서 ResponseStatusException을 직접 발생시킬 수 있습니까? (0) | 2023.07.20 |