设为首页 加入收藏

TOP

ScalaPB(5):用akka-stream实现reactive-gRPC(一)
2019-08-15 00:11:46 】 浏览:82
Tags:ScalaPB akka-stream 实现 reactive-gRPC

??在前面几篇讨论里我们介绍了scala-gRPC的基本功能和使用方法,我们基本确定了选择gRPC作为一种有效的内部系统集成工具,主要因为下面gRPC支持的几种服务模式:

1、Unary-Call:独立的一对client-request/server-response,是我们常用的http交互模式 2、Server-Streaming:client发出一个request后从server端接收一串多个response 3、Client-Streaming:client向server发送一串多个request后从server接收一个response 4、Bidirectional-Streaming:由client首先发送request启动连接,然后在这个连接上两端可以不断交互信息。

很明显,gRPC支持双向的streaming。那么如果能把gRPC中ListenableFuture和StreamObserver这两种类型转成akka-stream的基本类型应该就能够实现所谓的reactive-gRPC了。如果我们能用akka-streamyabo体育app下载方式实现gRPC服务调用的话,可能会遭遇下面的场景:在服务端我们只需要实现一种akka-stream的Flow把进来的request转化成出去的response,如下:

// Unary case
Flow[Request].map(computeResponse) // Server streaming
Flow[Request].flatMapConcat(computeResponses) // Client streaming
Flow[Request].fold(defaultResponse)(computeResponse) // Bidirectional streaming
Flow[Request].flatMapConcat(computeResponses)

当然,这是个akka-stream Flow,我们可以在这个Flow里调用任何akka-stream提供的功能,如:

Flow[Request] .throttle(1, 10.millis, 1, ThrottleMode.Shaping) .map(computeResponse)

在客户端我们可以直接经客户端stub调用Flow,如下:

Source
  .single(request)
  .via(stub.doSomething)
  .runForeach(println)

刚好,beyond-the-lines gRPCAkkaStream开源项目提供这么一种gRPC StreamObserver到aka-stream Flow转换桥梁。下面是gRPCAkkaStream的使用示范。先从Unary-Call开始:下面是.proto文件的IDL服务描述:

syntax = "proto3"; package learn.grpc.akka.stream.services; message NumPair { int32 num1 = 1; int32 num2 = 2; } message Num { int32 num = 1; } message SumResult { int32 result = 1; } service SumNumbers { rpc SumPair(NumPair) returns (SumResult) {} }

我们看看编译后自动产生的SumGrpcAkkaStream.scala文件中一些相关类型和函数:

服务界面描述:

trait SumNumbers extends AbstractService { override def serviceCompanion = SumNumbers def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] }

我们看到服务函数sumPair是一个akka-stream Fow[NumPair,SumResult,NotUsed]。下面是具体实现SumNumbers.sumPair代码:

class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers { val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName) override def sumPair: Flow[NumPair, SumResult, NotUsed] = { logger.info(s"*** calling sumPair ... ***") Flow[NumPair].map { case NumPair(a,b) => { logger.info(s"serving ${a} + ${b} = ???") SumResult(a + b) } } }

产生的客户端stub源代码如下:

 class SumNumbersStub( channel: Channel, options: CallOptions = CallOptions.DEFAULT ) extends AbstractStub[SumNumbersStub](channel, options) with SumNumbers { override def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] = Flow[learn.grpc.akka.stream.services.sum.NumPair].flatMapConcat(request => Source.fromFuture( Grpc.guavaFuture2ScalaFuture( ClientCalls.futureUnaryCall(channel.newCall(METHOD_SUM_PAIR, options), request) ) ) ) def stub(channel: Channel): SumNumbersStub = new SumNumbersStub(channel)

我们可以通过stub来调用sumPair方法,如下:

  val channel = ManagedChannelBuilder .forAddress(host,port) .usePlaintext(true) .build() val stub = SumGrpcAkkaStream.stub(channel) def addPair(num1: Int, num2: Int): Source[String,NotUsed] = { logger.info(s"Requesting to add $num1, $num2") Source .single(NumPair(num1,num2)) .via(stub.sumPair) .map(r => s"the result  
		
ScalaPB(5):用akka-stream实现reactive-gRPC(一) https://www.cppentry.com/bencandy.php?fid=90&id=229040

首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇重要的博客收集 下一篇ScalaPB(3): gRPC streaming

kafka-
kafka ? Partit
解决android studio
Kafka史上最详细原理
Error while fetchin
【Kafka】安装与快速
? ? &
flume读取日志数据写
Authentication plug
Flume 自定义source
flume ? 三大核
ICC副本>>>
愚公移山 ?
Hbase架构 ? Hb
5 hbase-shell + &
Hbase ? MapRed
MetaException(messa
Exception in thread
HIVE metastore Dupl
-->