Java gRPC: an introduction to the four call modes

2024-09-06 11:44

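gRPC features

I. gRPC response modes

1. The four gRPC response modes

(1) UNARY (simple mode)
  • Also called simple RPC: the client sends one request, and the server processes it and returns one result to the client.
(2) SERVER_STREAMING (server-streaming mode)
  • The client sends one request, and the server can return a continuous stream of messages (for example, when results are returned in batches).
(3) CLIENT_STREAMING (client-streaming mode)
  • The reverse of server-streaming mode: the client keeps sending a stream of messages to the server, and once it has finished sending, the server returns a single response.
(4) BIDI_STREAMING (bidirectional-streaming mode)
  • Both the client and the server can send messages to, and receive messages from, the other side multiple times.
(5) The corresponding enum, io.grpc.MethodDescriptor.MethodType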
public enum MethodType {
    /**
     * One request message followed by one response message.
     */
    UNARY,

    /**
     * Zero or more request messages with one response message.
     */
    CLIENT_STREAMING,

    /**
     * One request message followed by zero or more response messages.
     */
    SERVER_STREAMING,

    /**
     * Zero or more request and response messages arbitrarily interleaved in time.
     */
    BIDI_STREAMING,

    /**
     * Cardinality and temporal relationships are not known. Implementations should not make
     * buffering assumptions and should largely treat the same as {@link #BIDI_STREAMING}.
     */
    UNKNOWN;

    /**
     * Returns {@code true} for {@code UNARY} and {@code SERVER_STREAMING}, which do not permit the
     * client to stream.
     *
     * @since 1.0.0
     */
    public final boolean clientSendsOneMessage() {
        return this == UNARY || this == SERVER_STREAMING;
    }

    /**
     * Returns {@code true} for {@code UNARY} and {@code CLIENT_STREAMING}, which do not permit the
     * server to stream.
     *
     * @since 1.0.0
     */
    public final boolean serverSendsOneMessage() {
        return this == UNARY || this == CLIENT_STREAMING;
    }
}
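The two predicates at the end of the enum are enough to tell the four modes apart. The following is a minimal sketch of that idea. It assumes a hypothetical `CallMode` enum that stands in for `io.grpc.MethodDescriptor.MethodType` (so the example runs without the gRPC jars) and mirrors only the two methods quoted above; the `describe` helper is also an illustration, not part of gRPC.

```java
class CallModeTable {

    // Hypothetical stand-in for io.grpc.MethodDescriptor.MethodType (UNKNOWN omitted).
    enum CallMode {
        UNARY, CLIENT_STREAMING, SERVER_STREAMING, BIDI_STREAMING;

        // Mirrors MethodType.clientSendsOneMessage()
        boolean clientSendsOneMessage() {
            return this == UNARY || this == SERVER_STREAMING;
        }

        // Mirrors MethodType.serverSendsOneMessage()
        boolean serverSendsOneMessage() {
            return this == UNARY || this == CLIENT_STREAMING;
        }
    }

    // Describes which side is allowed to stream in the given mode.
    static String describe(CallMode mode) {
        String client = mode.clientSendsOneMessage() ? "one request" : "request stream";
        String server = mode.serverSendsOneMessage() ? "one response" : "response stream";
        return mode + ": " + client + " -> " + server;
    }

    public static void main(String[] args) {
        for (CallMode mode : CallMode.values()) {
            System.out.println(describe(mode));
        }
    }
}
```

Each mode is one combination of "does the client stream?" and "does the server stream?", which is why two boolean predicates cover all four cases.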

2. proto file definition

// @link https://github.com/grpc/grpc-java/blob/master/examples/src/main/proto/grpc/examples/echo/echo.proto
syntax = "proto3";

import "google/protobuf/any.proto";

option java_multiple_files = true;
// Package of the generated Java code
option java_package = "com.zzc.rpc.grpc.entity";
// File (outer class) name of the generated JavaBean
//option java_outer_classname = "DemoProto";
// Enables generation of generic RPC service interfaces
//option java_generic_services = true;

// Declare a service
service DemoService {
  // unaryRequest is unary echo. Simple mode
  rpc unaryRequest (RequestGrpc) returns (ResponseGrpc) {}
  // serverStreamingRequest is server side streaming. Server-streaming mode
  rpc serverStreamingRequest (RequestGrpc) returns (stream ResponseGrpc) {}
  // clientStreamingRequest is client side streaming. Client-streaming mode
  rpc clientStreamingRequest (stream RequestGrpc) returns (ResponseGrpc) {}
  // bidirectionalStreamingRequest is bidi streaming. Bidirectional-streaming mode
  rpc bidirectionalStreamingRequest (stream RequestGrpc) returns (stream ResponseGrpc) {}
}

message MetadataGrpc {
  string type = 3;
  string clientIp = 8;
  map<string, string> headers = 7;
}

message RequestGrpc {
  MetadataGrpc metadata = 2;
  google.protobuf.Any body = 3;
}

message ResponseGrpc {
  int32 code = 5;
  string msg = 6;
  MetadataGrpc metadata = 2;
  google.protobuf.Any body = 3;
}

3. Code example

(1) Code structure

Code directory structure (figure)

(2) Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.zzc.rpc</groupId>
        <artifactId>rpc-design</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <groupId>com.zzc.rpc.grpc</groupId>
    <artifactId>grpc-demo</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <grpc.version>1.65.1</grpc.version>
        <protostuff.version>1.8.0</protostuff.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>${protostuff.version}</version>
        </dependency>
        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>${protostuff.version}</version>
        </dependency>
        <!-- log4j2 logging facade -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
        </dependency>
        <!-- log4j2 logging framework -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
        </dependency>
        <!-- slf4j logging facade -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <!-- log4j2 adapter that binds the logging framework to slf4j -->
        <!-- depends on org.slf4j:slf4j-api:1.7.25 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <!-- Resources for classes loaded through the SPI mechanism -->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
                <includes>
                    <include>*.*</include>
                    <include>META-INF/services/*</include>
                </includes>
            </resource>
        </resources>
        <!-- Generating Java files from the proto file -->
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.22.4:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
(3) Generate the gRPC Java files from the proto file

Generating the Java files (figure)

(4) Implementation
Entity objects
  • Payload interface; classes that implement it are loaded by ServiceLoader
package com.zzc.rpc.grpc.entity;

public interface Payload {
}
  • Request, the request object
package com.zzc.rpc.grpc.entity;

import java.util.Map;
import java.util.TreeMap;

public class Request implements Payload {

    /**
     * Header map; keys are matched and ordered case-insensitively.
     */
    private final Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

    private String requestId;

    public void putHeader(String key, String value) {
        headers.put(key, value);
    }

    public void putAllHeader(Map<String, String> headers) {
        if (headers == null || headers.isEmpty()) {
            return;
        }
        this.headers.putAll(headers);
    }

    public String getHeader(String key) {
        return headers.get(key);
    }

    public String getHeader(String key, String defaultValue) {
        String value = headers.get(key);
        return (value == null) ? defaultValue : value;
    }

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Map<String, String> getHeaders() {
        return headers;
    }

    public void clearHeaders() {
        this.headers.clear();
    }
}
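The header map in Request relies on `TreeMap` with `String.CASE_INSENSITIVE_ORDER`. A small sketch of what that choice means in practice follows; the class name `CaseInsensitiveHeaders` and the header names are assumptions made for illustration only.

```java
import java.util.Map;
import java.util.TreeMap;

class CaseInsensitiveHeaders {

    // Same construction as the headers field in Request.
    static Map<String, String> newHeaders() {
        return new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    }

    public static void main(String[] args) {
        Map<String, String> headers = newHeaders();
        headers.put("Content-Type", "json");
        // Same key under the comparator: the value is replaced, the first spelling of the key is kept.
        headers.put("content-type", "text");

        System.out.println(headers.get("CONTENT-TYPE")); // text
        System.out.println(headers.size());              // 1
        System.out.println(headers.keySet());            // [Content-Type]
    }
}
```

One consequence worth keeping in mind: two headers that differ only in case cannot coexist in this map, so the later `putHeader` call silently wins.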
  • Response, the response class
package com.zzc.rpc.grpc.entity;

public class Response implements Payload {

    private int resultCode = 0;
    private int code;
    private String msg;
    private String requestId;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public boolean isSuccess() {
        return this.resultCode == 0;
    }

    public int getResultCode() {
        return resultCode;
    }

    public void setResultCode(int resultCode) {
        this.resultCode = resultCode;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public void setErrorInfo(int errorCode, String errorMsg) {
        this.resultCode = errorCode;
        this.code = errorCode;
        this.msg = errorMsg;
    }

    public static Response build() {
        return new Response();
    }

    public Response code(int code) {
        this.code = code;
        return this;
    }

    public Response msg(String msg) {
        this.msg = msg;
        return this;
    }
}
  • RequestMeta, the request metadata
package com.zzc.rpc.grpc.entity;

public class RequestMeta {

    private String connectionId = "";
    private String clientIp = "";
    private String clientVersion = "";

    public String getClientVersion() {
        return clientVersion;
    }

    public void setClientVersion(String clientVersion) {
        this.clientVersion = clientVersion;
    }

    public String getConnectionId() {
        return connectionId;
    }

    public void setConnectionId(String connectionId) {
        this.connectionId = connectionId;
    }

    public String getClientIp() {
        return clientIp;
    }

    public void setClientIp(String clientIp) {
        this.clientIp = clientIp;
    }
}
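Add the SPI configuration under resources
  • Under resources, create the directory META-INF/services/, then create a file named after the fully qualified interface name, com.zzc.rpc.grpc.entity.Payload, and list in it the implementation classes we need:
com.zzc.rpc.grpc.entity.Request
com.zzc.rpc.grpc.entity.Response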
Add the utility classes
  • ByteBufferBackedInputStream, which reads from a ByteBuffer as an InputStream
package com.zzc.rpc.grpc.utils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

/**
 * InputStream view over a ByteBuffer.
 */
public class ByteBufferBackedInputStream extends InputStream {

    protected final ByteBuffer _b;

    public ByteBufferBackedInputStream(ByteBuffer buf) {
        _b = buf;
    }

    @Override
    public int available() {
        return _b.remaining();
    }

    @Override
    public int read() throws IOException {
        // Mask to 0..255 so that a data byte is never confused with the -1 end-of-stream marker
        return _b.hasRemaining() ? (_b.get() & 0xFF) : -1;
    }

    @Override
    public int read(byte[] bytes, int off, int len) throws IOException {
        if (!_b.hasRemaining()) return -1;
        len = Math.min(len, _b.remaining());
        _b.get(bytes, off, len);
        return len;
    }
}
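The `& 0xFF` in `read()` matters because `InputStream.read()` must return a value in 0..255, or -1 only at end of stream, while Java bytes are signed. A minimal sketch of the difference, using a hypothetical helper `readUnsigned` that repeats the same expression outside the stream class:

```java
import java.nio.ByteBuffer;

class UnsignedReadDemo {

    // Returns the next byte as 0..255, or -1 when the buffer is exhausted,
    // which is the contract InputStream.read() has to honor.
    static int readUnsigned(ByteBuffer buf) {
        return buf.hasRemaining() ? (buf.get() & 0xFF) : -1;
    }

    public static void main(String[] args) {
        // 0xFF as a signed byte is -1; without the mask it would look like end of stream.
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) 0xFF, 0x41}).asReadOnlyBuffer();

        System.out.println(readUnsigned(buf)); // 255
        System.out.println(readUnsigned(buf)); // 65
        System.out.println(readUnsigned(buf)); // -1
    }
}
```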
  • PayloadRegistry, which loads the classes that implement Payload. Request and Response could in fact be made abstract, so that any concrete class implementing Payload can be registered as a receivable type.
package com.zzc.rpc.grpc.utils;

import com.zzc.rpc.grpc.entity.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

/**
 * Loads the classes that implement Payload. Request and Response could in fact be made abstract,
 * so that any concrete class implementing Payload can be registered.
 */
public class PayloadRegistry {

    private static final Logger log = LoggerFactory.getLogger(PayloadRegistry.class);

    // Simple class name -> payload class
    private static final Map<String, Class<?>> REGISTRY_REQUEST = new HashMap<>();

    static boolean initialized = false;

    public static void init() {
        log.info("start init PayloadRegistry...");
        scan();
    }

    private static synchronized void scan() {
        if (initialized) {
            return;
        }
        log.info("start scan");
        // Discover the implementations listed in META-INF/services/com.zzc.rpc.grpc.entity.Payload
        ServiceLoader<Payload> payloads = ServiceLoader.load(Payload.class);
        for (Payload payload : payloads) {
            log.info("scan classname:{}, class:{}", payload.getClass().getSimpleName(), payload.getClass());
            register(payload.getClass().getSimpleName(), payload.getClass());
        }
        initialized = true;
    }

    static void register(String type, Class<?> clazz) {
        if (Modifier.isAbstract(clazz.getModifiers())) {
            // Abstract types are not registered
            return;
        }
        if (REGISTRY_REQUEST.containsKey(type)) {
            throw new RuntimeException(String.format("Fail to register, type:%s ,clazz:%s ", type, clazz.getName()));
        }
        log.info("register type:{}, class:{}", type, clazz);
        REGISTRY_REQUEST.put(type, clazz);
    }

    public static Class<?> getClassByType(String type) {
        return REGISTRY_REQUEST.get(type);
    }

    public static void main(String[] args) {
        init();
    }
}
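The registration rules of PayloadRegistry (key by simple class name, skip abstract types, reject duplicates) can be tried in isolation. The sketch below is a simplified, instance-based variant that leaves out ServiceLoader and logging; `TypeRegistryDemo`, `BaseRequest` and `PingRequest` are hypothetical names used only for this illustration.

```java
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

class TypeRegistryDemo {

    interface Payload {}
    abstract static class BaseRequest implements Payload {}
    static class PingRequest extends BaseRequest {}

    // Simple class name -> concrete payload class
    private final Map<String, Class<?>> registry = new HashMap<>();

    // Returns true if the class was registered, false if it was skipped as abstract.
    boolean register(Class<? extends Payload> clazz) {
        if (Modifier.isAbstract(clazz.getModifiers())) {
            return false; // abstract types cannot be instantiated by the JSON parser
        }
        String type = clazz.getSimpleName();
        if (registry.containsKey(type)) {
            throw new IllegalStateException("Duplicate payload type: " + type);
        }
        registry.put(type, clazz);
        return true;
    }

    Class<?> lookup(String type) {
        return registry.get(type);
    }

    public static void main(String[] args) {
        TypeRegistryDemo demo = new TypeRegistryDemo();
        System.out.println(demo.register(BaseRequest.class)); // false
        System.out.println(demo.register(PingRequest.class)); // true
        System.out.println(demo.lookup("PingRequest") == PingRequest.class); // true
        System.out.println(demo.lookup("Unknown")); // null
    }
}
```

Because the key is the simple name, two payload classes with the same name in different packages would collide; a fully qualified name would avoid that at the cost of a longer type tag on the wire.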
  • GRPCUtils, a utility class that converts between Request/Response and their gRPC message forms
package com.zzc.rpc.grpc.utils;

import com.alibaba.fastjson2.JSON;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.zzc.rpc.grpc.entity.MetadataGrpc;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.RequestGrpc;
import com.zzc.rpc.grpc.entity.RequestMeta;
import com.zzc.rpc.grpc.entity.Response;
import com.zzc.rpc.grpc.entity.ResponseGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class GRPCUtils {

    private static final Logger log = LoggerFactory.getLogger(GRPCUtils.class);

    public static RequestGrpc convert(Request request, RequestMeta meta) {
        RequestGrpc.Builder payloadBuilder = RequestGrpc.newBuilder();
        MetadataGrpc.Builder metadataBuilder = MetadataGrpc.newBuilder();
        // The type tag and headers come from the request itself, so set them even when meta is null;
        // without the type tag the receiver cannot look up the payload class.
        metadataBuilder.putAllHeaders(request.getHeaders()).setType(request.getClass().getSimpleName());
        // Use the client IP from meta when one is provided, otherwise the demo's loopback address
        String clientIp = (meta != null && !meta.getClientIp().isEmpty()) ? meta.getClientIp() : "127.0.0.1";
        metadataBuilder.setClientIp(clientIp);
        payloadBuilder.setMetadata(metadataBuilder.build());
        // Request body
        byte[] jsonBytes = convertRequestToByte(request);
        return payloadBuilder
                .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes)))
                .build();
    }

    public static RequestGrpc convert(Request request) {
        MetadataGrpc newMeta = MetadataGrpc.newBuilder()
                .setType(request.getClass().getSimpleName())
                .setClientIp("127.0.0.1")
                .putAllHeaders(request.getHeaders())
                .build();
        byte[] jsonBytes = convertRequestToByte(request);
        RequestGrpc.Builder builder = RequestGrpc.newBuilder();
        return builder
                .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes)))
                .setMetadata(newMeta)
                .build();
    }

    private static byte[] convertRequestToByte(Request request) {
        // Headers travel in MetadataGrpc, so strip them from the JSON body and restore them afterwards
        Map<String, String> requestHeaders = new HashMap<>(request.getHeaders());
        request.clearHeaders();
        byte[] jsonBytes = JSON.toJSONBytes(request);
        request.putAllHeader(requestHeaders);
        return jsonBytes;
    }

    public static ResponseGrpc convert(Response response) {
        byte[] jsonBytes = JSON.toJSONBytes(response);
        MetadataGrpc.Builder metaBuilder = MetadataGrpc.newBuilder()
                .setType(response.getClass().getSimpleName());
        return ResponseGrpc.newBuilder()
                .setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(jsonBytes)))
                .setMetadata(metaBuilder.build())
                .build();
    }

    public static <T> T parse(RequestGrpc payload) {
        // Resolve the target class from the type tag carried in the metadata
        Class<?> classType = PayloadRegistry.getClassByType(payload.getMetadata().getType());
        log.info("parse classType:{}", classType);
        if (classType != null) {
            ByteString byteString = payload.getBody().getValue();
            ByteBuffer byteBuffer = byteString.asReadOnlyByteBuffer();
            T obj = JSON.parseObject(new ByteBufferBackedInputStream(byteBuffer), classType);
            if (obj instanceof Request) {
                // Put the headers from the metadata back onto the request object
                ((Request) obj).putAllHeader(payload.getMetadata().getHeadersMap());
            }
            return obj;
        } else {
            throw new RuntimeException("Unknown payload type:" + payload.getMetadata().getType());
        }
    }

    public static <T> T parse(ResponseGrpc payload) {
        Class<?> classType = PayloadRegistry.getClassByType(payload.getMetadata().getType());
        log.info("parse classType:{}", classType);
        if (classType != null) {
            ByteString byteString = payload.getBody().getValue();
            ByteBuffer byteBuffer = byteString.asReadOnlyByteBuffer();
            return JSON.parseObject(new ByteBufferBackedInputStream(byteBuffer), classType);
        } else {
            throw new RuntimeException("Unknown payload type:" + payload.getMetadata().getType());
        }
    }
}
Application implementation
  • gRPC server implementation
package com.zzc.rpc.grpc;

import com.alibaba.fastjson2.JSON;
import com.zzc.rpc.grpc.entity.DemoServiceGrpc;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.RequestGrpc;
import com.zzc.rpc.grpc.entity.Response;
import com.zzc.rpc.grpc.entity.ResponseGrpc;
import com.zzc.rpc.grpc.utils.GRPCUtils;
import com.zzc.rpc.grpc.utils.PayloadRegistry;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GrpcServer {

    static {
        // Use SPI to load the classes that implement Payload
        PayloadRegistry.init();
    }

    private int port = 8001;
    private Server server;

    private void start() {
        try {
            server = ServerBuilder.forPort(port)
                    .addService(new GrpcServerImpl())
                    .build()
                    .start();
        } catch (Exception e) {
            log.error("start error.", e);
        }
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                GrpcServer.this.stop();
            }
        });
    }

    private void stop() {
        if (server != null) {
            // Initiate an orderly shutdown (isShutdown() would only query the state)
            server.shutdown();
        }
    }

    private void blockUntilShutdown() {
        if (server != null) {
            try {
                server.awaitTermination();
            } catch (InterruptedException e) {
                log.error("blockUntilShutdown error.", e);
            }
        }
    }

    class GrpcServerImpl extends DemoServiceGrpc.DemoServiceImplBase {

        /**
         * Simple mode
         */
        @Override
        public void unaryRequest(RequestGrpc requestGrpc, StreamObserver<ResponseGrpc> responseObserver) {
            Request request = GRPCUtils.parse(requestGrpc);
            log.info("receive unaryRequest data:{}", JSON.toJSONString(request));
            // Return the response
            Response response = Response.build().code(0).msg("UNARY request success.");
            responseObserver.onNext(GRPCUtils.convert(response));
            responseObserver.onCompleted();
        }

        /**
         * Server-streaming mode
         */
        @Override
        public void serverStreamingRequest(RequestGrpc requestGrpc, StreamObserver<ResponseGrpc> responseObserver) {
            Request request = GRPCUtils.parse(requestGrpc);
            log.info("receive serverStreamingRequest data:{}", JSON.toJSONString(request));
            // Return the response; onNext may be called several times here before onCompleted
            Response response = Response.build().code(0).msg("SERVER_STREAMING request success.");
            responseObserver.onNext(GRPCUtils.convert(response));
            responseObserver.onCompleted();
        }

        /**
         * Client-streaming mode: receives multiple requests from the client, then responds only once
         */
        @Override
        public StreamObserver<RequestGrpc> clientStreamingRequest(StreamObserver<ResponseGrpc> responseObserver) {
            return new StreamObserver<RequestGrpc>() {
                @Override
                public void onNext(RequestGrpc requestGrpc) {
                    // Called once for each message the client sends
                    Request request = GRPCUtils.parse(requestGrpc);
                    log.info("receive clientStreamingRequest data:{}", JSON.toJSONString(request));
                }

                @Override
                public void onError(Throwable throwable) {
                    responseObserver.onError(new StatusException(Status.INTERNAL));
                    log.error("clientStreamingRequest error.", throwable);
                }

                @Override
                public void onCompleted() {
                    // Then respond once
                    Response response = Response.build().code(0).msg("CLIENT_STREAMING request success.");
                    responseObserver.onNext(GRPCUtils.convert(response));
                    responseObserver.onCompleted();
                }
            };
        }

        /**
         * Bidirectional-streaming mode
         */
        @Override
        public StreamObserver<RequestGrpc> bidirectionalStreamingRequest(StreamObserver<ResponseGrpc> responseObserver) {
            StreamObserver<RequestGrpc> streamObserver = new StreamObserver<RequestGrpc>() {
                @Override
                public void onNext(RequestGrpc requestGrpc) {
                    Request request = GRPCUtils.parse(requestGrpc);
                    log.info("receive bidirectionalStreamingRequest data:{}", JSON.toJSONString(request));
                    // Reply to each incoming message while the stream is still open
                    Response response = Response.build().code(0).msg("BIDI_STREAMING message received.");
                    responseObserver.onNext(GRPCUtils.convert(response));
                }

                @Override
                public void onError(Throwable throwable) {
                    responseObserver.onError(new StatusException(Status.INTERNAL));
                    log.error("bidirectionalStreamingRequest error.", throwable);
                }

                @Override
                public void onCompleted() {
                    Response response = Response.build().code(0).msg("BIDI_STREAMING request success.");
                    responseObserver.onNext(GRPCUtils.convert(response));
                    responseObserver.onCompleted();
                }
            };
            return streamObserver;
        }
    }

    public static void main(String[] args) {
        GrpcServer grpcServer = new GrpcServer();
        grpcServer.start();
        grpcServer.blockUntilShutdown();
    }
}
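The client-streaming handler has the least obvious shape of the four: the method receives the response observer and returns a request observer. The sketch below replays that control flow in memory, without a network or the gRPC jars. It assumes a hypothetical `Observer<T>` interface standing in for `io.grpc.stub.StreamObserver`, and uses plain strings in place of RequestGrpc and ResponseGrpc.

```java
import java.util.ArrayList;
import java.util.List;

class ClientStreamingSketch {

    // Hypothetical stand-in for io.grpc.stub.StreamObserver.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // "Server" handler: consumes many requests, answers exactly once when the client is done.
    static Observer<String> clientStreaming(Observer<String> responseObserver) {
        return new Observer<String>() {
            private int received = 0;

            @Override
            public void onNext(String request) {
                received++; // called once per client message
            }

            @Override
            public void onError(Throwable t) {
                responseObserver.onError(t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext("received " + received + " requests");
                responseObserver.onCompleted();
            }
        };
    }

    // "Client": sends the given requests and records every callback it gets back.
    static List<String> run(String... requests) {
        List<String> events = new ArrayList<>();
        Observer<String> responses = new Observer<String>() {
            @Override
            public void onNext(String value) { events.add("onNext:" + value); }

            @Override
            public void onError(Throwable t) { events.add("onError"); }

            @Override
            public void onCompleted() { events.add("onCompleted"); }
        };
        Observer<String> requestObserver = clientStreaming(responses);
        for (String request : requests) {
            requestObserver.onNext(request);
        }
        requestObserver.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b")); // [onNext:received 2 requests, onCompleted]
    }
}
```

The bidirectional handler has the same signature; the only difference is that it also calls `responseObserver.onNext` from inside its own `onNext`.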
  • gRPC client implementation
package com.zzc.rpc.grpc;

import com.alibaba.fastjson2.JSON;
import com.zzc.rpc.grpc.entity.DemoServiceGrpc;
import com.zzc.rpc.grpc.entity.Request;
import com.zzc.rpc.grpc.entity.RequestGrpc;
import com.zzc.rpc.grpc.entity.Response;
import com.zzc.rpc.grpc.entity.ResponseGrpc;
import com.zzc.rpc.grpc.utils.GRPCUtils;
import com.zzc.rpc.grpc.utils.PayloadRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Slf4j
public class GrpcClient {

    static {
        PayloadRegistry.init();
    }

    private final ManagedChannel channel;
    private final DemoServiceGrpc.DemoServiceBlockingStub blockingStub;
    private final DemoServiceGrpc.DemoServiceStub stub;

    public GrpcClient(String host, int port) {
        this.channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
        this.blockingStub = DemoServiceGrpc.newBlockingStub(channel);
        this.stub = DemoServiceGrpc.newStub(channel);
    }

    public void shutdown() {
        try {
            channel.shutdown().awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("shutdown interrupted.", e);
        }
    }

    public void unarySend() {
        Request request = new Request();
        request.setRequestId("aaaaaaaaaaaaaaaaaaa");
        request.putHeader("Test", "teste-hader");
        RequestGrpc requestGrpc = GRPCUtils.convert(request);
        // Blocking call: one request, one response
        ResponseGrpc responseGrpc = blockingStub.unaryRequest(requestGrpc);
        Response rev = GRPCUtils.parse(responseGrpc);
        log.info("response msg:{}", JSON.toJSONString(rev));
    }

    public void clientStreamingSend() {
        Request request1 = new Request();
        request1.setRequestId("bbbbbbbbbbbbbbb");
        request1.putHeader("Test", "teste-hader");
        RequestGrpc requestGrpc1 = GRPCUtils.convert(request1);

        Request request2 = new Request();
        request2.setRequestId("bbbbbbbbbbbbbbb");
        request2.putHeader("Test", "teste-hader");
        RequestGrpc requestGrpc2 = GRPCUtils.convert(request2);

        StreamObserver<ResponseGrpc> streamObserver = new StreamObserver<ResponseGrpc>() {
            @Override
            public void onNext(ResponseGrpc responseGrpc) {
                Response response = GRPCUtils.parse(responseGrpc);
                log.info("onNext response msg:{}, response type:{}", JSON.toJSONString(response), responseGrpc.getMetadata().getType());
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("clientStreamingSend onError.", throwable);
            }

            @Override
            public void onCompleted() {
                // Handling once the server has finished responding; for example, a countDown()
                // for concurrent callers can be done here
                log.info("onCompleted clientStreamingSend.");
            }
        };

        StreamObserver<RequestGrpc> clientStreamingRequest = stub.clientStreamingRequest(streamObserver);
        try {
            clientStreamingRequest.onNext(requestGrpc1); // first message
            clientStreamingRequest.onNext(requestGrpc2); // second message
            clientStreamingRequest.onCompleted();
        } catch (Exception e) {
            clientStreamingRequest.onError(e);
            log.error("clientStreamingSend error.", e);
        }
    }

    public void serverStreamingSend() {
        Request request = new Request();
        request.setRequestId("ddddddddddddddddddddddddd");
        request.putHeader("Test", "teste-hader");
        RequestGrpc requestGrpc = GRPCUtils.convert(request);

        StreamObserver<ResponseGrpc> streamObserver = new StreamObserver<ResponseGrpc>() {
            @Override
            public void onNext(ResponseGrpc responseGrpc) {
                // Results returned by the server; they can be passed on to callers via events or callbacks
                Response response = GRPCUtils.parse(responseGrpc);
                log.info("serverStreamingSend response:{}", JSON.toJSONString(response));
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("serverStreamingSend onError.", throwable);
            }

            @Override
            public void onCompleted() {
                // Handling once the server has finished sending; for example, a countDown()
                // for concurrent callers can be done here
            }
        };
        stub.serverStreamingRequest(requestGrpc, streamObserver);

        // Synchronous, blocking alternative (needs import java.util.Iterator)
        /*
        Iterator<ResponseGrpc> responseGrpcs = blockingStub.serverStreamingRequest(requestGrpc);
        while (responseGrpcs.hasNext()) {
            ResponseGrpc responseGrpc = responseGrpcs.next();
            Response response = GRPCUtils.parse(responseGrpc);
            log.info("serverStreamingSend response:{}", JSON.toJSONString(response));
        }
        */
    }

    private void bidirectionalStreamingSend() {
        // Released once the server closes its side of the stream (normally or with an error)
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamObserver<ResponseGrpc> streamObserver = new StreamObserver<ResponseGrpc>() {
            @Override
            public void onNext(ResponseGrpc responseGrpc) {
                Response response = GRPCUtils.parse(responseGrpc);
                log.info("bidirectionalStreamingSend response:{}", JSON.toJSONString(response));
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("bidirectionalStreamingSend onError.", throwable);
                countDownLatch.countDown();
            }

            @Override
            public void onCompleted() {
                countDownLatch.countDown();
            }
        };

        StreamObserver<RequestGrpc> bidirectionalStreamingRequest = stub.bidirectionalStreamingRequest(streamObserver);
        for (int i = 0; i < 4; i++) {
            Request request = new Request();
            request.setRequestId("eeeeeeeeeeeeeeeeeeeeeeeee");
            request.putHeader("Test", "teste-hader");
            bidirectionalStreamingRequest.onNext(GRPCUtils.convert(request));
        }
        bidirectionalStreamingRequest.onCompleted();

        try {
            // Wait for the server to finish instead of relying only on a fixed sleep
            if (!countDownLatch.await(30, TimeUnit.SECONDS)) {
                log.warn("bidirectionalStreamingSend timed out waiting for completion.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("bidirectionalStreamingSend interrupted.", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GrpcClient client = new GrpcClient("127.0.0.1", 8001);
        // Simple mode
        client.unarySend();
        Thread.sleep(5000);
        // Client-streaming mode
        client.clientStreamingSend();
        Thread.sleep(5000);
        // Server-streaming mode
        client.serverStreamingSend();
        Thread.sleep(5000);
        // Bidirectional-streaming mode
        client.bidirectionalStreamingSend();
        Thread.sleep(5000);
        client.shutdown();
        Thread.sleep(5000); // wait for the channel to finish transferring messages
    }
}
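With the asynchronous stub, the response callbacks run on another thread, so the caller needs some way to know when the stream has finished. One common option is a `CountDownLatch` that is released from `onCompleted()` or `onError()`. The sketch below shows only that waiting pattern: a plain thread plays the role of the gRPC callback thread, and `AwaitCompletionSketch` and `callAndWait` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AwaitCompletionSketch {

    // Starts a simulated asynchronous call and waits until it signals completion.
    // Returns true if completion arrived within the timeout.
    static boolean callAndWait(long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);

        // Stand-in for the thread on which the stub would invoke the response observer.
        Thread callbackThread = new Thread(() -> {
            // ... onNext(...) callbacks would be handled here ...
            done.countDown(); // what onCompleted() or onError() would do
        });
        callbackThread.start();

        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(2000)); // true
    }
}
```

Compared with a fixed `Thread.sleep`, the latch returns as soon as the stream ends and reports a timeout explicitly when it does not.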
