1. Reactive System
- 요구사항의 변화
- 한 개의 거대한 앱에서 -> 도메인 별로 나뉘어진 여러개의 마이크로 앱으로
- 응답이 잘 되고 탄력적이며, 유연하고 메시지 기반으로 동작하는 시스템이 필요해짐
- 리액티브 선언문(https://www.reactivemanifesto.org/ko)
- 응답성(Responsive)
- 탄력성(Resilient)
- 유연성(Elastic)
- 메시지기반(Message Driven)
2. Reactive Programming
- 데이터 흐름과 변화 전파에 중점을 둔 프로그래밍 패러다임
- 프로그램 안에서 정적이거나 동적인 데이터 흐름이 표현되어야 함
- 데이터 흐름에 따라 하위 로직에 자동으로 변화를 전파할 수 있어야 함
-> 그래서 필수적 요소로 함수형 프로그래밍을 활용
3. Blocking / Nonblocking, Synchronous/Asynchronous
- Blocking/Nonblocking
: 호출하는 시점에 제어권을 넘겨주는지에 초점
- Synchronous/Asynchronous
: 작업 완료 여부를 확인하는지에 초점
- 동기-블로킹, 동기-논블로킹, 비동기-블로킹, 비동기-논브로킹 구분
4. Reactive Stream과 Reactor
- Reactive Stream
: Reactive Programming을 위한 Interface
: Nonblocking BackPressure를 이용한 비동기 스트림
(*BackPressure: Publisher에서 데이터를 Subscriber로 Push 하는 방식이 아니라, Pull 방식으로 Subscriber가 Publisher로
처리할 수 있는 양의 크기만큼 데이터를 요청 함으로써 Subscriber의 장애를 방지하기 위함이다. (OutOfMemory 등)
즉, 다이나믹 풀 방식의 데이터 요청을 통해서 Subscriber가 수용할 수 있는 만큼 데이터를 요청하는 방식이다.)
- Reactor
: Reactor Stream의 구현체 중 하나
: Spring5, Spring boot 2.0 이상부터 지원
5. Reactive Stream Interface
- 메이븐 링크(https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams)
: 2019년으로 업데이트 끝남. 사실상 완료
- 4가지로 구성
- Processor
- Publisher
- Subscriber
- Subscription
6. Marble Diagram
- Observable 한 데이터의 Stream!
- https://reactivex.io/documentation/ko/observable.html
7. Reactive Core Feature
- Reactor Project의 주요 구현체 포함
- Mono
- Flux
- Schedulers
- Errors
- Processors
- Operations
- Handling errors
- Testing
8. Flux/Mono
- Publisher의 구현체
- Flux: 0~N 개의 결과을 가짐
- Mono: 0~1 개의 결과를 가짐
9. Sequence
- 변화 가능한 데이터의 흐름
- Publisher에 의해 발행(publish) 되고
Subscriber에 의해 구독(Subscription) 된다.
10. Sequence 생성(Create)
- 문서 링크(https://reactivex.io/documentation/operators.html#creating)
- Just:
- Empty:
- From:
- Defer:
10. Scheduler
- 문서 링크(https://reactivex.io/documentation/ko/scheduler.html)
- Sequence를 처리할 쓰레드를 지정하는 것
- Reactor는 비동기 실행을 강제하지 않음
- publishOn
: next, complete, error 신호처리 쓰레드 설정
: 다음 publishOn을 만날 때까지 같은 쓰레드에서 동작
- subscribeOn
: 시퀀스를 실행할 쓰레드를 설정
: publishOn을 만날 때까지 같은 쓰레드에서 실행
: publishOn이 신호를 처리할 쓰레드를 지정하므로
publishOn 뒤에 오는 subscribeOn은 무시된다.
(* 우선순위는 publishOn > subscribeOn)
11. Scheduler의 종류
- Immediate: 지금 실행 중인 쓰레드에서 실행
- Single: Runnable Executor 실행
- Parallel: Core 갯수만큼 쓰레드 생성
- Elastic: 쓰레드를 무한정 생성
- Bounded Elastic: Core*10 만큼 쓰레드 생성
12. Operations
- 문서 링크 (https://reactivex.io/documentation/operators.html)
- (자주 사용되는 것들만 정리)
- 변환
: flatMap, groupBy, map
- 필터
: distinct, elementAt, filter, ignoreElements, take
- 결합연산자
: merge, startWith, then, zip
- 오류처리 연산자
: onErrorResume, retry
- 유틸리티 연산자
: delay, do*, serialize, subscribe, timestamp
- 조건 연산자
: all, contains, defaultEmpty, skipWhile, takeWhile
- 집계 연산자
: average*, count, max, min, reduce, sum
13. Handling Errors
- Publiser가 아이템 처리 시 마다 onNext() 이벤트를 발생시키다가,
error를 만날 경우 onError()를 발생시키고, sequnece 중지
- onErrorReturn() 이나 onErrorResume() 등으로 에러 발생 시 로직 작성
14. Testing
- reactor-tesing dependency 추가
- 테스트할 모듈이 flux나 mono를 반환하도록 하여 검사
- 기대하는 동작을 StepVerifier API로 작성
1. StepVerifier를 생성하여 테스트할 Sequence 입력
2. expect*에 기대값 표현
3. 일부 시퀀스를 통과하고 싶을 때는 consumeNextWith()
4. 종료 이벤트를 기대할 경우 expectComplete()
또는 expectError()
5. 마지막으로 verify()를 호출하여 검증단계 trigger
* 단계 중 기대한 값과 다른 데이터를 subscribe하면 즉시 AssertionError를 발생하여 테스트 종료
ex) StepVerifier.create(
testFunction(source)
.expectNext("s1")
.expectNext("s2")
.expectComplete()
.verify()
)
'Dev' 카테고리의 다른 글
2024년 말에 다시 시작하는 사이드프로젝트 세팅 (0) | 2024.10.07 |
---|---|
WebFlux (0) | 2022.04.19 |
Event-driven Architecture & Programming (0) | 2022.04.11 |