본문 바로가기
Dev

Reactive Programming

by jyobi 2022. 4. 12.
반응형

1. Reactive System

- 요구사항의 변화

    - 한 개의 거대한 앱에서 -> 도메인 별로 나뉘어진 여러개의 마이크로 앱으로

    - 응답이 잘 되고 탄력적이며, 유연하고 메시지 기반으로 동작하는 시스템이 필요해짐

 

- 리액티브 선언문(https://www.reactivemanifesto.org/ko)

    - 응답성(Responsive)

    - 탄력성(Resilient)

    - 유연성(Elastic)

    - 메시지기반(Message Driven)

 

2. Reactive Programming

- 데이터 흐름과 변화 전파에 중점을 둔 프로그래밍 패러다임

- 프로그램 안에서 정적이거나 동적인 데이터 흐름이 표현되어야 함

- 데이터 흐름에 따라 하위 로직에 자동으로 변화를 전파할 수 있어야 함

    -> 그래서 필수적 요소로 함수형 프로그래밍을 활용

 

3. Blocking / Nonblocking, Synchronous/Asynchronous

- Blocking/Nonblocking

  : 호출하는 시점에 제어권을 넘겨주는지에 초점

- Synchronous/Asynchronous

  : 작업 완료 여부를 확인하는지에 초점

 

- 동기-블로킹, 동기-논블로킹, 비동기-블로킹, 비동기-논브로킹 구분

 

4. Reactive Stream과 Reactor

- Reactive Stream

  : Reactive Programming을 위한 Interface

  : Nonblocking BackPressure를 이용한 비동기 스트림

    (*BackPressure: Publisher에서 데이터를 Subscriber로 Push 하는 방식이 아니라, Pull 방식으로 Subscriber가 Publisher로
      처리할 수 있는 양의 크기만큼 데이터를 요청 함으로써 Subscriber의 장애를 방지하기 위함이다. (OutOfMemory 등)

      즉, 다이나믹 풀 방식의 데이터 요청을 통해서 Subscriber가 수용할 수 있는 만큼 데이터를 요청하는 방식이다.)

- Reactor

  : Reactor Stream의 구현체 중 하나

  : Spring5, Spring boot 2.0 이상부터 지원

 

5. Reactive Stream Interface

- 메이븐 링크(https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams)

  : 2019년으로 업데이트 끝남. 사실상 완료

- 4가지로 구성

    - Processor

    - Publisher

    - Subscriber

    - Subscription

Reactive Stream 동작 방식

6. Marble Diagram

- Observable 한 데이터의 Stream!

- https://reactivex.io/documentation/ko/observable.html

 

7. Reactive Core Feature

- Reactor Project의 주요 구현체 포함

    - Mono

    - Flux

    - Schedulers

    - Errors

    - Processors

    - Operations

    - Handling errors

        - Testing

 

8. Flux/Mono

- Publisher의 구현체

- Flux: 0~N 개의 결과을 가짐

- Mono: 0~1 개의 결과를 가짐

   

9. Sequence

- 변화 가능한 데이터의 흐름

- Publisher에 의해 발행(publish) 되고

   Subscriber에 의해 구독(Subscription) 된다.

 

10. Sequence 생성(Create)

- 문서 링크(https://reactivex.io/documentation/operators.html#creating)

    - Just: 

    - Empty: 

    - From: 

    - Defer: 

 

10. Scheduler

- 문서 링크(https://reactivex.io/documentation/ko/scheduler.html)

- Sequence를 처리할 쓰레드를 지정하는 것

- Reactor는 비동기 실행을 강제하지 않음

- publishOn

  : next, complete, error 신호처리 쓰레드 설정

  : 다음 publishOn을 만날 때까지 같은 쓰레드에서 동작

- subscribeOn

  : 시퀀스를 실행할 쓰레드를 설정

  : publishOn을 만날 때까지 같은 쓰레드에서 실행

  : publishOn이 신호를 처리할 쓰레드를 지정하므로

    publishOn 뒤에 오는 subscribeOn은 무시된다.

    (* 우선순위는 publishOn > subscribeOn)

 

11. Scheduler의 종류

- Immediate: 지금 실행 중인 쓰레드에서 실행

- Single: Runnable Executor 실행

- Parallel: Core 갯수만큼 쓰레드 생성

- Elastic: 쓰레드를 무한정 생성

- Bounded Elastic: Core*10 만큼 쓰레드 생성

 

12. Operations

- 문서 링크 (https://reactivex.io/documentation/operators.html)

- (자주 사용되는 것들만 정리)

- 변환

  : flatMap, groupBy, map

- 필터

  : distinct, elementAt, filter, ignoreElements, take

- 결합연산자

  : merge, startWith, then, zip

- 오류처리 연산자

  : onErrorResume, retry

- 유틸리티 연산자

  : delay, do*, serialize, subscribe, timestamp

- 조건 연산자

  : all, contains, defaultEmpty, skipWhile, takeWhile

- 집계 연산자

  : average*, count, max, min, reduce, sum

 

13. Handling Errors

- Publiser가 아이템 처리 시 마다 onNext() 이벤트를 발생시키다가,

  error를 만날 경우 onError()를 발생시키고, sequnece 중지

- onErrorReturn() 이나 onErrorResume() 등으로 에러 발생 시 로직 작성

 

14. Testing

- reactor-tesing dependency 추가

- 테스트할 모듈이 flux나 mono를 반환하도록 하여 검사

- 기대하는 동작을 StepVerifier API로 작성

 

  1. StepVerifier를 생성하여 테스트할 Sequence 입력

  2. expect*에 기대값 표현

  3. 일부 시퀀스를 통과하고 싶을 때는 consumeNextWith()

  4. 종료 이벤트를 기대할 경우 expectComplete()

      또는 expectError() 

  5. 마지막으로 verify()를 호출하여 검증단계 trigger

 

   * 단계 중 기대한 값과 다른 데이터를 subscribe하면 즉시 AssertionError를 발생하여 테스트 종료

 

ex) StepVerifier.create(

 testFunction(source)

    .expectNext("s1")

    .expectNext("s2")

    .expectComplete()

    .verify()

)

반응형

'Dev' 카테고리의 다른 글

2024년 말에 다시 시작하는 사이드프로젝트 세팅  (0) 2024.10.07
WebFlux  (0) 2022.04.19
Event-driven Architecture & Programming  (0) 2022.04.11