본문 바로가기

Tech/gRPC

[gRPC] gRPC - Java

반응형

이전의 글들에서는 gRPC 의 기본 개념과 gRPC 에서 사용하는 proto buffer 와 이를 정의하는 proto file 등에 대해서 정리했다. 이번에는 gRPC 를 실제 프로그래밍 언어에서 사용하는 방법을 정리해보려 한다.

grpc
는 Java, Go, Kotlin, C++, Dart, Node, Python, Ruby 등등 다양한 언어를 지원한다. 그 중에서 Java 로 gRPC 를 사용하는 법, 그리고 Java 와 많이 사용되는 웹 프레임워크인 스프링에서 gRPC 서버를 같이 사용하는 법을 정리해본다.

1. 자바 프로젝트 설정

자바에서 gRPC 를 사용하기 위해서는 먼저 자바 프로젝트의 환경 설정부터 진행해줘야 한다. 자바 프로젝트는 gradle 기반으로 생성하였고 gRPC 를 사용하기 위해서 grpc-java 라이브러리를 사용했다. 자세한 설정 및 예제는 깃허브 레포지토리 (https://github.com/grpc/grpc-java) 에서 확인할 수 있다.

- dependencies 설정

grpc-java 라이브러리를 사용하기 위해서는 직접 jar 파일을 다운로드 받거나 maven 이나 gradle 등의 빌드 툴을 사용할 수 있다. gradle 을 사용하는 경우에는 아래와 같이 build.gradle 파일의 dependencies 를 설정해주면 된다.

 

runtimeOnly 'io.grpc:grpc-netty-shaded:1.64.0'
implementation 'io.grpc:grpc-protobuf:1.64.0'
implementation 'io.grpc:grpc-stub:1.64.0'
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+

 

각각의 설정은 gRPC 통신을 자바에서 사용하기 위해 네트워크 프레임워크인 netty Protocol buffer, stub 등을 사용하기 위한 라이브러리들을 다운로드 받기위한 것이다.

- protobuf 설정

자바에서 프로토콜 버퍼를 사용하기 위해서는 프로토 파일을 컴파일하여 자바 소스코드를 생성해주는 프로토버퍼 컴파일러 (protoc) 가 필요하다. 이를 위해서 protobuf-gradle-plugin (https://github.com/google/protobuf-gradle-plugin) 을 사용하고 이에 대한 설정이 필요하다.

 

plugins {
    id 'com.google.protobuf' version '0.9.4'
}

// protobuf configuration
protobuf {
    // configure the protoc executable
    protoc {
        // download from repositories
        artifact = "com.google.protobuf:protoc:3.25.1"
        // path = '/usr/local/bin/protoc'
    }

    // configure codegen plugins
    plugins {
        // Locate a plugin with name 'grpc'. This step is optional.
        // If you leave it empty, it uses the current directory.
        // If you don't specify it, protoc will try to use "protoc-gen-grpc" from
        // system search path.
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.64.0'
        }
    }

    // configure the tasks to apply the plugins
    generateProtoTasks {
        // all() returns the collection of all protoc tasks
        all()*.plugins {
            grpc {}
        }
    }
}

 

위의 예제는 build.gradle 에서 protobuf-gradle-plugins 사용을 위한 설정을 한 것이다. 먼저 plugins 에서 com.google.protobuf 를 추가해준다. 버전은 최신 버전인 0.9.4 로 설정했다.

 

protobuf 블럭에서는 프로토 파일 컴파일을 위해 protoc 와 관련된 설정이 위치한다. protoc 블럭은 프로토 파일의 컴파일에 사용할 protoc 에 대한 정보를 입력한다. 이 예제에서는 artifact 를 설정해주어서 protoc 실행파일을 다운로드 받을 레포지토리를 설정해주었다. 만약 로컬에 protoc 실행파일이 있다면 로컬 위치를 path 로 설정해주어서 해당 컴파일러를 사용하도록 할 수 있다.

 

protoc 를 실행할 때 codegen 플러그인과 함께 사용할 수 있다. protobuf 블럭 안의 plugins 에서 설정하여 사용할 수 있는데 protoc 의 위치를 설정해준 것과 같이 plugins 안의 grpc 블럭안에 artifact 또는 path 로 설정해줄 수 있다. 이 예제에서는 Java 소스파일을 생성하기 위해 protoc-gen-grpc-java 를 사용하도록 설정하였다. 그리고 generateProtoTasks 에서 작업들을 설정해주어 해당 플러그인을 반영해준다.

2. proto file 작성 및 빌드

- proto file 작성

gRPC 를 사용하기 위해서는 먼저 gRPC 통신에 사용할 서비스와 메시지 등등을 정의한 프로토 파일을 작성한다. proto-buf-gradle-plugin 플러그인은 'src/$sourceSetName/proto' 에 위치하고 있는 프로토 파일을 읽는다. 기본적으로는 'src/main/proto' 와 'src/test/proto' 아래에 프로토 파일을 위치시킨다.

 

syntax = "proto3";

// specify package for generated java classes
option java_package = "gradle.grpc";

service DemoTest {
  rpc Unary(DemoRequest) returns (DemoResponse) {}
  rpc ServerSideStreaming(DemoRequest) returns (stream DemoResponse) {}
  rpc ClientSideSteraming(stream DemoRequest) returns (DemoResponse) {}
  rpc BidirectionalStreaming(stream DemoRequest) returns (stream DemoResponse) {}
}

message DemoRequest {
  int32 a = 1;
  int32 b = 2;
}

message DemoResponse {
  int32 result = 1;
}

 

위의 예제는 proto3 버전으로 작성한 프로토 파일이다. java_package 옵션으로 생성될 파일들의 패키지 이름을 정의한다. 이 파일에는 DemoTest 라는 서비스와 DemoRequest, DemoResponse 라는 두가지 메시지 타입을 정의했다. DemoTest 는 총 4개의 함수를 가지고 있는데, 이는 각각 gRPC 에서 단일 매개변수 또는 stream 매개변수를 가질 수 있는 4가지 경우의 수를 예제로 만든 것이다. Request 로 입력받은 값의 합을 Response 로 출력하는 기능으로 예제를 구현할 것이다.

- Protocol buffer 빌드

프로토 파일에 정의한 서비스와 메시지를 사용하기 위해서는 빌드를 통해서 자바 클래스를 생성해주어야 한다. 예제 프로젝트는 gradle 기반이기 떄문에 'gradlew build' 명령어를 통해 빌드해준다. 빌드를 완료하면 '$buildDir/generated/source/proto/$sourceSet/$builtinPluginName' 경로에 소스코드들이 생성된다. 이 예제에서는 'app/build/generated/source/proto/main/' 아래에 지정된 패키지 경로에 따라서 서비스 클래스와 메시지 데이터 클래스가 생성된다.

3. 서버 구현

아래는 gRPC 서버를 구현한 예제로 빌드 과정으로 생성된 클래스인 DemoRequest, DemoResponse, DemoTestGrpc 등을 사용하고 있다.

 

package gradle.grpc;

import java.io.IOException;

import gradle.grpc.DemoTestOuterClass.DemoRequest;
import gradle.grpc.DemoTestOuterClass.DemoResponse;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

public class GrpcServer {
  private Server server;
  private int port;

  private static class DemoTestServer extends DemoTestGrpc.DemoTestImplBase {

    /**
     * StreamObserver: response observer, which is a special interface for server to call with its response
     */
    @Override
    public void unary(
      DemoRequest request,
      StreamObserver<DemoResponse> responseObserver
    ) {
      int a = request.getA();
      int b = request.getB();
      // return the value.
      responseObserver.onNext(generateDemoResponse(a + b));
      // specify that we finished dealing with the RPC
      responseObserver.onCompleted();
    }

    private DemoResponse generateDemoResponse(int result) {
      return DemoResponse.newBuilder()
        .setResult(result)
        .build();
    }

    @Override
    public void serverSideStreaming(
      DemoRequest request,
      StreamObserver<DemoResponse> responseObserver
    ) {
      int a = request.getA();
      int b = request.getB();
      int result = a + b;
      for (int i = 0; i < 10; i++) {
        responseObserver.onNext(generateDemoResponse(result));
        result += a + b;
      }
      responseObserver.onCompleted();
    }

    @Override
    public StreamObserver<DemoRequest> clientSideSteraming(
      StreamObserver<DemoResponse> responseOberver
    ) {
      return new StreamObserver<DemoRequest>() {
        int sum = 0;
        int count = 0;

        @Override
        public void onNext(DemoRequest value) {
          int a = value.getA();
          int b = value.getB();
          this.sum += a + b;
          this.count++
        }

        @Override
        public void onError(Throwable t) {
          System.out.println(t.getMessage());
        }

        @Override
        public void onCompleted() {
          System.out.println("count=" + this.count + " sum=" + this.sum);
          responseOberver.onNext(generateDemoResponse(sum));
          responseOberver.onCompleted();
        }
      };
    }

    @Override
    public StreamObserver<DemoRequest> bidirectionalStreaming(
      StreamObserver<DemoResponse> responseObserver
    ) {
      return new StreamObserver<DemoRequest>() {

        @Override
        public void onNext(DemoRequest value) {
          int a = value.getA();
          int b = value.getB();
          responseObserver.onNext(generateDemoResponse(a + b));
        }

        @Override
        public void onError(Throwable t) {
          System.out.println(t.getMessage());
        }

        @Override
        public void onCompleted() {
          responseObserver.onCompleted();
        }
      };
    }
  }

  public GrpcServer(int port) {
    this.server = ServerBuilder
      .forPort(port)
      .addService(new DemoTestServer())
      .build();
    this.port = port;
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    int port = 50051;
    GrpcServer server = new GrpcServer(port);
    server.start();
    server.blockUntilShutdown();
  }

  public void start() throws IOException {
    this.server.start();
    System.out.println("Server started. Listening on " + this.port);
  }

  /**
   * Await termination on the main thread since the grpc library uses daemon threads.
   */
  private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }
}

 

gRPC 서버 구현은 두가지 부분으로 나뉜다.

1) 서비스 구현 - 프로토 파일에 정의된 서비스에 의해 생성된 base class 를 상속받아 실제 동작으로 오버라이딩
2) gRPC 서버 실행 - gRPC 서버를 실행하여 클라이언트의 요청을 받고 응답을 반환

- 서비스 구현

DemoTestServer 클래스는 프로토 파일의 DemoTest 서비스에 의해 생성된 추상 클래스DemoTestGrpc.DemoTestImplBase 를 상속받는다. 상속받은 후에는 서비스에 정의된 4가지 함수인 unary(), serverSideStreaming(), clientSideStreaming(), bidirectionalStreaming() 을 구현한다.


함수를 구현하는 것을 보면 DemoRequest, DemoResponse, StreamObserver 등을 발견할 수 있다. DemoRequest, DemoResponse 는 프로토 파일에 정의된 메시지 타입으로 Builder 형식으로 객체를 생성할 수 있으며, getter 를 가지고 있다. StreamObserver 는 stream 타입의 메시지를 다루는 인터페이스이다. 스트림의 다음 값을 수신하는 onNext() 함수와 에러 발생시 호출되는 onError(), 그리고 스트림 수신 완료 알림을 받는 onCompleted() 함수를 가지고 있다. 이러한 함수들을 호출하여 스트림 타입의 입력을 다룰 수 있다.

 

clientSideStreaming() 과 bidirectionalStreaming() 함수는 StreamObserver 타입을 반환한다. 이들은 스트림 타입의 요청을 입력받기 때문에 입력된 스트림 타입을 어떻게 다룰지 구현한 StreamObserver 객체를 반환하여 서버에서 요청을 처리하도록 한다.

- gRPC 서버 실행

gRPC 서비스를 구현하였다면 이제 서버를 실행하여 클라이언트가 접근할 수 있도록 해야한다. gRPC 에서는 ServerBuilder 라는 클래스를 제공하여 서버를 생성할 수 있도록 해준다. ServerBuilder 로 서버객체를 생성할 때 gRPC 서비스를 구현한 DemoTestServer 객체를 서비스로 추가한 후 빌드해주어 서버 객체를 생성한다. 그렇게 생성된 서버 객체에 start() 함수를 호출하여 서버를 동작시킨다.

4. 클라이언트 구현

package gradle.grpc;

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import gradle.grpc.DemoTestGrpc.DemoTestBlockingStub;
import gradle.grpc.DemoTestGrpc.DemoTestStub;
import gradle.grpc.DemoTestOuterClass.DemoRequest;
import gradle.grpc.DemoTestOuterClass.DemoResponse;

import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

public class GrpcClient {
  private Channel channel;
  private DemoTestStub asyncStub;
  private DemoTestBlockingStub syncStub;

  public GrpcClient(String host, int port) {
    this.channel = ManagedChannelBuilder
      .forAddress(host, port)
      .usePlaintext()
      .build();
    this.asyncStub = DemoTestGrpc.newStub(channel);
    this.syncStub = DemoTestGrpc.newBlockingStub(channel);
  }

  public static void main(String[] args) throws InterruptedException {
    String host = "localhost";
    int port = 50051;
    if (0 < args.length) {
      port = Integer.parseInt(args[0]);
    }

    try {
      GrpcClient client = new GrpcClient(host, port);
      // unary
      client.callUnary(1, 2);
      // server streaming
      client.callServerSideStreaming(1, 2);
      // client streaming
      client.callClientSideStreaming(10);
      // bidirectional streaming
      client.callBidirectionalStreaming(10);
    } finally {
      // close channel?
    }
  }

  private void callUnary(int a, int b) {
    DemoRequest request = generateDemoRequest(a, b);

    try {
      DemoResponse response = syncStub.unary(request);
      System.out.println("**** unary ****");
      System.out.println("a + b = " + response.getResult());
    } catch (StatusRuntimeException e) {
      System.out.println("RPC failed: " + e.getStatus());
      return ;
    }
  }

  private DemoRequest generateDemoRequest(int a, int b) {
    return DemoRequest.newBuilder().setA(a).setB(b).build();
  }

  private void callServerSideStreaming(int a, int b) {
    DemoRequest request = generateDemoRequest(a, b);
    try {
      Iterator<DemoResponse> responses = syncStub.serverSideStreaming(request);
      System.out.println("**** server side streaming ****");
      System.out.println("a = " + a + ", b = " + b);
      while (responses.hasNext()) {
        System.out.println(responses.next().getResult());
      }
    } catch (StatusRuntimeException e) {
      System.out.println("RPC failed: " + e.getStatus());
      return ;
    }
  }

  private void callClientSideStreaming(int count) throws InterruptedException {
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<DemoResponse> responseObserver = new StreamObserver<DemoResponse>() {

      @Override
      public void onNext(DemoResponse value) {
        System.out.println(value.getResult());
      }

      @Override
      public void onError(Throwable t) {
        System.out.println(t.getMessage());
        finishLatch.countDown();
      }

      @Override
      public void onCompleted() {
        finishLatch.countDown();
      }
    };

    StreamObserver<DemoRequest> requestObserver =
      asyncStub.clientSideSteraming(responseObserver);
    try {
      Random rand = new Random();
      System.out.println("**** client side streaming ****");
      for (int i = 0; i < count; i++) {
        int a = rand.nextInt(100);
        int b = rand.nextInt(100);
        System.out.println("a = " + a + ", b = " + b);
        requestObserver.onNext(generateDemoRequest(a, b));
      }
    } catch (RuntimeException e) {
      requestObserver.onError(e);
      throw e;
    }
    requestObserver.onCompleted();
    finishLatch.await(1, TimeUnit.MINUTES);
  }

  private void callBidirectionalStreaming(int count) throws InterruptedException {
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<DemoResponse> responseObserver = new StreamObserver<DemoResponse>() {

      @Override
      public void onNext(DemoResponse value) {
        System.out.println(value.getResult());
      }

      @Override
      public void onError(Throwable t) {
        System.out.println(t.getMessage());
        finishLatch.countDown();
      }

      @Override
      public void onCompleted() {
        finishLatch.countDown();
      }
    };

    StreamObserver<DemoRequest> requestObserver
      = asyncStub.bidirectionalStreaming(responseObserver);
    try {
      Random rand = new Random();
      System.out.println("**** bidirectional streaming ****");

      for (int i = 0; i < count; i++) {
        int a = rand.nextInt(100);
        int b = rand.nextInt(100);
        System.out.println("a = " + a + ", b = " + b);
        requestObserver.onNext(generateDemoRequest(a, b));
        if (finishLatch.getCount() == 0) {
          return ;
        }
      }
    } catch (RuntimeException e) {
      requestObserver.onError(e);
      throw e;
    }

    requestObserver.onCompleted();
    finishLatch.await(1, TimeUnit.MINUTES);
  }
}

 

gRPC 클라이언트는 서버와 연결을 위하여 Channel Stub 을 생성해야 한다. Channel 은 서버와 클라이언트 간의 TCP 연결을 맺어주는 객체이다. Stub 은 channel 을 통해서 연결된 서버의 rpc 함수를 호출할 수 있도록 해주는 객체로 아래의 두가지 종류가 존재한다.

 

- blocking/synchronous stub: 서버에 요청을 보재고 응답을 반환받거나 에러가 발생할 때까지 대기한다.
- non-blocking/asynchronous stub: non-blocking 호출로 서버의 응답이 비동기적으로 반환된다. 스트림 호출에 대해서 사용할 수 있다.

 

GrpcClient 의 생성자에서 Channel syncStub, asyncStub 을 생성한다. 이때 Channel Server 에서와 유사하게 Builder 를 통해 객체를 생성한다. 이렇게 생성된 Stub 들은 클라이언트의 각 함수 안에서 서버의 함수들을 호출하는데 사용된다.

 

클라이언트에서 서버의 각 함수를 호출하는 것은 서버 예제와 유사한데, 단일 응담의 경우 DemoResponse 객체를 반환받고 스트림 타입의 경우 Iterator 타입으로 반환받는다. client side streaming bidirectional streaming 의 경우에는 StreamObserver 타입 객체를 사용하여 통신한다.

 

[Reference]

- https://grpc.io/docs/languages/java/basics/

- https://github.com/grpc/grpc-java/blob/master/README.md

- https://github.com/google/protobuf-gradle-plugin

반응형

'Tech > gRPC' 카테고리의 다른 글

[gRPC] Proto file (.proto) - Message, Service  (0) 2024.05.13
[gRPC] gRPC 와 Protocol Buffer  (0) 2024.05.09