gRPC

gRPC

gRPCλŠ” κ³ μ„±λŠ₯ μ˜€ν”ˆ μ†ŒμŠ€ λ²”μš© RPC ν”„λ ˆμž„ μ›Œν¬μž…λ‹ˆλ‹€.

Installation

μ‹œμž‘ν•˜κΈ° 전에 ν•„μš”ν•œ νŒ¨ν‚€μ§€λ₯Ό μ„€μΉ˜ν•΄μ•Όν•©λ‹ˆλ‹€.

$ npm i --save grpc @grpc/proto-loader

Transporter

gRPC μ „μ†‘κΈ°λ‘œ μ „ν™˜ν•˜λ €λ©΄ createMicroservice()λ©”μ†Œλ“œμ— μ „λ‹¬λœ μ˜΅μ…˜ 객체λ₯Ό μˆ˜μ •ν•΄μ•Ό ν•©λ‹ˆλ‹€.

@@filename(main)
const app = await NestFactory.createMicroservice(ApplicationModule, {
  transport: Transport.GRPC,
  options: {
    package: 'hero',
    protoPath: join(__dirname, 'hero/hero.proto'),
  },
});

info 힌트 join()ν•¨μˆ˜λŠ” path νŒ¨ν‚€μ§€μ—μ„œ κ°€μ Έμ˜€κ³  Transport μ—΄κ±°λŠ” @nestjs/microservicesμ—μ„œ 온 κ²ƒμž…λ‹ˆλ‹€.

Options

전솑기 λ™μž‘μ„ κ²°μ •ν•˜λŠ” μ‚¬μš© κ°€λŠ₯ν•œ μ˜΅μ…˜μ΄ 많이 μžˆμŠ΅λ‹ˆλ‹€.

Overview

일반적으둜 package 속성은 protobuf νŒ¨ν‚€μ§€ 이름을 μ„€μ •ν•˜κ³  protoPathλŠ”.proto μ •μ˜ 파일의 κ²½λ‘œμž…λ‹ˆλ‹€. hero.proto νŒŒμΌμ€ ν”„λ‘œν† μ½œ 버퍼 μ–Έμ–΄λ₯Ό μ‚¬μš©ν•˜μ—¬ κ΅¬μ„±λ©λ‹ˆλ‹€.

syntax = "proto3";

package hero;

service HeroService {
  rpc FindOne (HeroById) returns (Hero) {}
}

message HeroById {
  int32 id = 1;
}

message Hero {
  int32 id = 1;
  string name = 2;
}

μœ„μ˜ μ˜ˆμ—μ„œ, μš°λ¦¬λŠ” HeroByIdλ₯Ό μž…λ ₯으둜 μ˜ˆμƒν•˜κ³  Hero λ©”μ‹œμ§€λ₯Ό λ¦¬ν„΄ν•˜λŠ” FindOne() gRPC ν•Έλ“€λŸ¬λ₯Ό λ…ΈμΆœν•˜λŠ” HeroServiceλ₯Ό μ •μ˜ν–ˆμŠ΅λ‹ˆλ‹€. 이 ν”„λ‘œν†  νƒ€μž… μ •μ˜λ₯Ό μΆ©μ‘±μ‹œν‚€λŠ” ν•Έλ“€λŸ¬λ₯Ό μ •μ˜ν•˜λ €λ©΄ @GrpcMethod() λ°μ½”λ ˆμ΄ν„°λ₯Ό μ‚¬μš©ν•΄μ•Όν•©λ‹ˆλ‹€. 이전에 μ•Œλ €μ§„ @MessagePattern()은 더 이상 μœ μš©ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

@@filename(hero.controller)
@GrpcMethod('HeroService', 'FindOne')
findOne(data: HeroById, metadata: any): Hero {
  const items = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];
  return items.find(({ id }) => id === data.id);
}

info 힌트 @GrpcMethod()λ°μ½”λ ˆμ΄ν„°λŠ” @nestjs/microservices νŒ¨ν‚€μ§€μ—μ„œ κ°€μ Έμ˜΅λ‹ˆλ‹€.

HeroServiceλŠ” μ„œλΉ„μŠ€ 이름인 반면, FindOne은 FindOne() gRPC ν•Έλ“€λŸ¬λ₯Ό κ°€λ¦¬ν‚΅λ‹ˆλ‹€. ν•΄λ‹Ήν•˜λŠ” findOne() λ©”μ†Œλ“œλŠ” 두가지 인수, 즉 ν˜ΈμΆœμžλ‘œλΆ€ν„° μ „λ‹¬λœ data와 gRPC μš”μ²­μ˜ 메타 데이터λ₯Ό μ €μž₯ν•˜λŠ” metadataλ₯Ό μ·¨ν•©λ‹ˆλ‹€.

λ˜ν•œ, FindOne은 μ‹€μ œλ‘œ μ—¬κΈ°μ„œ μ€‘λ³΅λ©λ‹ˆλ‹€. @GrpcMethod()에 두 번째 인수λ₯Ό μ „λ‹¬ν•˜μ§€ μ•ŠμœΌλ©΄ NestλŠ” μžλ™μœΌλ‘œ 첫 번째 문자 (예 :findOne->FindOne)와 ν•¨κ»˜ λ©”μ†Œλ“œ 이름을 μ‚¬μš©ν•©λ‹ˆλ‹€.

@@filename(hero.controller)
@Controller()
export class HeroService {
  @GrpcMethod()
  findOne(data: HeroById, metadata: any): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }
}

λ§ˆμ°¬κ°€μ§€λ‘œ 인수λ₯Ό μ „λ‹¬ν•˜μ§€ λͺ»ν•  μˆ˜λ„ μžˆμŠ΅λ‹ˆλ‹€. 이 경우 NestλŠ” 클래슀 이름을 μ‚¬μš©ν•©λ‹ˆλ‹€.

@@filename(hero.controller)
@Controller()
export class HeroService {
  @GrpcMethod()
  findOne(data: HeroById, metadata: any): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }
}

Client

ν΄λΌμ΄μ–ΈνŠΈ μΈμŠ€ν„΄μŠ€λ₯Ό λ§Œλ“€λ €λ©΄ @Client()λ°μ½”λ ˆμ΄ν„°λ₯Ό μ‚¬μš©ν•΄μ•Ό ν•©λ‹ˆλ‹€.

@Client({
  transport: Transport.GRPC,
  options: {
    package: 'hero',
    protoPath: join(__dirname, 'hero/hero.proto'),
  },
})
client: ClientGrpc;

이전 μ˜ˆμ™€ λΉ„κ΅ν•˜μ—¬ μ•½κ°„μ˜ 차이가 μžˆμŠ΅λ‹ˆλ‹€. ClientProxy 클래슀 λŒ€μ‹ , getService() λ©”μ†Œλ“œλ₯Ό μ œκ³΅ν•˜λŠ” ClientGrpcλ₯Ό μ‚¬μš©ν•©λ‹ˆλ‹€. getService()μ œλ„€λ¦­ λ©”μ„œλ“œλŠ” μ„œλΉ„μŠ€ 이름을 인수둜 μ‚¬μš©ν•˜κ³  κ°€λŠ₯ν•œ 경우 ν•΄λ‹Ή μΈμŠ€ν„΄μŠ€λ₯Ό λ°˜ν™˜ν•©λ‹ˆλ‹€.

@@filename(hero.controller)
onModuleInit() {
  this.heroService = this.client.getService<HeroService>('HeroService');
}

heroService κ°μ²΄λŠ” .proto 파일 내에 μ •μ˜λœ 것과 λ™μΌν•œ λ©”μ†Œλ“œ μ„ΈνŠΈλ₯Ό λ…ΈμΆœν•©λ‹ˆλ‹€. λͺ¨λ“  κ·œμΉ™μ€ μ†Œλ¬Έμž (μžμ—° κ·œμΉ™μ„ λ”°λ₯΄κΈ° μœ„ν•΄)μž…λ‹ˆλ‹€. 기본적으둜 gRPC HeroService μ •μ˜μ—λŠ” FindOne()ν•¨μˆ˜κ°€ ν¬ν•¨λ˜μ–΄ μžˆμŠ΅λ‹ˆλ‹€. heroService μΈμŠ€ν„΄μŠ€λŠ” findOne()λ©”μ†Œλ“œλ₯Ό μ œκ³΅ν•œλ‹€λŠ” μ˜λ―Έμž…λ‹ˆλ‹€.

interface HeroService {
  findOne(data: { id: number }): Observable<any>;
}

λͺ¨λ“  μ„œλΉ„μŠ€ λ©”μ†Œλ“œλŠ” Observable을 λ¦¬ν„΄ν•©λ‹ˆλ‹€. NestλŠ” RxJS μŠ€νŠΈλ¦Όμ„ μ§€μ›ν•˜κ³  잘 μž‘λ™ν•˜λ―€λ‘œ HTTP 처리기 λ‚΄μ—μ„œλ„ λ°˜ν™˜ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

@@filename(hero.controller)
@Get()
call(): Observable<any> {
  return this.heroService.findOne({ id: 1 });
}

전체 μž‘μ—… μ˜ˆλŠ” μ—¬κΈ°μ—μ„œ 확인할 수 μžˆμŠ΅λ‹ˆλ‹€.

gRPC Streaming

GRPC μžμ²΄λŠ” 슀트림으둜 μ•Œλ €μ§„ λ‘±ν…€ 라이브 연결을 μ§€μ›ν•©λ‹ˆλ‹€. μŠ€νŠΈλ¦Όμ€ μ±„νŒ…, κ΄€μ°° λ˜λŠ” 청크 데이터 전솑과 같은 μ„œλΉ„μŠ€ 사둀에 맀우 μœ μš©ν•œ 도ꡬ가 될 수 μžˆμŠ΅λ‹ˆλ‹€. 곡식 λ¬Έμ„œ (μ—¬κΈ°)μ—μ„œ μžμ„Έν•œ λ‚΄μš©μ„ 확인할 수 μžˆμŠ΅λ‹ˆλ‹€.

NestλŠ” 두 가지 λ°©λ²•μœΌλ‘œ GRPC 슀트림 ν•Έλ“€λŸ¬λ₯Ό μ§€μ›ν•©λ‹ˆλ‹€.

  • RxJS Subject + Observable ν•Έλ“€λŸ¬: Controller λ©”μ†Œλ“œ λ‚΄μ—μ„œ λ°”λ‘œ 응닡을 μ“°κ±°λ‚˜ Subject/Observable μ†ŒλΉ„μžμ—κ²Œ μ „λ‹¬ν•˜λŠ” 데 유용 ν•  수 μžˆμŠ΅λ‹ˆλ‹€

  • 순수 GRPC 호좜 슀트림 ν•Έλ“€λŸ¬: λ…Έλ“œ ν‘œμ€€ Duplex 슀트림 ν•Έλ“€λŸ¬μ— λŒ€ν•œ λ‚˜λ¨Έμ§€ λ””μŠ€νŒ¨μΉ˜λ₯Ό μ²˜λ¦¬ν•˜λŠ” 일뢀 싀행기에 μ „λ‹¬ν•˜λŠ” 데 유용 ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

Subject strategy

@GrpcStreamMethod()λ°μ½”λ ˆμ΄ν„°λŠ” ν•¨μˆ˜ 맀개 λ³€μˆ˜λ₯Ό RxJS Observable둜 μ œκ³΅ν•©λ‹ˆλ‹€.

// Set decorator with selecting a Service definition from protobuf package
// the string is matching to: package proto_example.orders.OrdersService
@GrpcStreamMethod('orders.OrderService')
handleStream(messages: Observable<any>): Observable<any> {
  const subject = new Subject();
  messages.subscribe(message => {
    console.log(message);
    subject.next({
      shipmentType: {
        carrier: 'test-carrier',
      },
    });
  });
  return subject.asObservable();
}

@GrpcStreamMethod()λ°μ½”λ ˆμ΄ν„°μ™€μ˜ 전이쀑 μƒν˜Έ μž‘μš©μ„ μ§€μ›ν•˜λ €λ©΄ 컨트둀러 λ©”μ†Œλ“œμ—μ„œ RxJS 'Observable'을 λ°˜ν™˜ν•΄μ•Ό ν•©λ‹ˆλ‹€.

Pure GRPC call stream handler

@GrpcStreamCall()λ°μ½”λ ˆμ΄ν„°λŠ” grpc.ServerDuplexStreamκ³Ό 같은 ν•¨μˆ˜ 맀개 λ³€μˆ˜λ₯Ό μ œκ³΅ν•˜λ©° .on('data',callback), .write(message)λ˜λŠ” .cancel()κ³Ό 같은 ν‘œμ€€ λ©”μ†Œλ“œλ₯Ό μ§€μ›ν•©λ‹ˆλ‹€. μ‚¬μš© κ°€λŠ₯ν•œ 방법에 λŒ€ν•œ 전체 μ„€λͺ…μ„œλŠ” μ—¬κΈ°μ—μ„œ 찾을 수 μžˆμŠ΅λ‹ˆλ‹€.

// Set decorator with selecting a Service definition from protobuf package
// the string is matching to: package proto_example.orders.OrdersService
@GrpcStreamCall('orders.OrderService')
handleStream(stream: any) {
  stream.on('data', (msg: any) => {
    console.log(msg);
    // Answer here or anywhere else using stream reference
    stream.write({
      shipmentType: {
        carrier: 'test-carrier',
      },
    });
  });
}

이 λ°μ½”λ ˆμ΄ν„°μ—λŠ” νŠΉμ • 리턴 맀개 λ³€μˆ˜λ₯Ό μ œκ³΅ν•  ν•„μš”κ°€ μ—†μŠ΅λ‹ˆλ‹€. μŠ€νŠΈλ¦Όμ€ λ‹€λ₯Έ ν‘œμ€€ 슀트림 μœ ν˜•κ³Ό 같은 λ°©μ‹μœΌλ‘œ 처리될 κ²ƒμœΌλ‘œ μ˜ˆμƒλ©λ‹ˆλ‹€.

Last updated