对于一个框架来说,通常具备有所谓的中间件,有时候也可以说是拦截器,其实和钩子差不多的概念。
那grpc也不例外。但是使用python如何应用到我们的拦截器的呐? 拦截器又可以做哪些事情呢?
1:grpc的拦截器可以做啥?
本身拦截器的概念和我们的中间件类似,所以类似fastapi中我们的中间件能做,拦截器都可以做:
- 身份验证
- 日志请求记录
- 全局上下文的信息处理等
- 多个拦截器和多个中间件遵循的请求规则都是洋葱模型
- 拦截器必须有返回值,返回是响应报文体
PS:而且相对GRPC来说不止于我们的服务端有钩子,客户端也有钩子(拦截器),和我们的httpx库提供的类似的钩子函数差不多!
PS:拦截器可以作用再客户端和服务端:客户端拦截器和服务端拦截器
2:grpc的拦截器分类
- 一元拦截器(UnaryServerInterceptor)-客户端中
- 流式拦截器(StreamClientInterceptor)- 客户端中
- python中的服务端是实现ServerInterceptor
3:在python实现grpc拦截器
查看服务传递的拦截器参数说明:
3.1 服务端的自带拦截器
主要注意点:
- 拦截器传入是一个实例化的对象
- 拦截器列表的传入,可以是元组也可以是列表
- 多拦截器的形式遵循洋葱模型
服务端拦截器需要实现拦截器的抽象方法:
完整服务端示例代码:
from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc
import signal
# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
# 实现 proto 文件中定义的 rpc 调用
def SayHello(self, request, context):
# 返回是我们的定义的响应体的对象
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
def SayHelloAgain(self, request, context):
# 返回是我们的定义的响应体的对象
# # 设置异常状态码
# context.set_code(grpc.StatusCode.PERMISSION_DENIED)
# context.set_details("你没有这个访问的权限")
# raise context
# 接收请求头的信息
print("接收到的请求头元数据信息", context.invocation_metadata())
# 设置响应报文头信息
context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 局部的数据进行压缩
context.set_compression(grpc.Compression.Gzip)
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
class MyUnaryServerInterceptor1(grpc.ServerInterceptor):
def intercept_service(self,continuation, handler_call_details):
print("我是拦截器1号:开始----1")
respn = continuation(handler_call_details)
print("我是拦截器1号:结束----2",respn)
return respn
class MyUnaryServerInterceptor2(grpc.ServerInterceptor):
def intercept_service(self,continuation, handler_call_details):
print("我是拦截器2号:开始----1")
respn = continuation(handler_call_details)
print("我是拦截器2号:结束----2",respn)
return respn
def serve():
# 实例化一个rpc服务,使用线程池的方式启动我们的服务
# 服务一些参数信息的配置
options = [
('grpc.max_send_message_length', 60 * 1024 * 1024), # 限制发送的最大的数据大小
('grpc.max_receive_message_length', 60 * 1024 * 1024), # 限制接收的最大的数据的大小
]
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服务启动全局的数据传输的压缩机制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=options,
compression=compression,
interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 添加我们服务
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置启动的端口
server.add_insecure_port('[::]:50051')
# 开始启动的服务
server.start()
def stop_serve(signum, frame):
print("进程结束了!!!!")
# sys.exit(0)
raise KeyboardInterrupt
# 注销相关的信号
# SIGINT 对应windos下的 ctrl+c的命令
# SIGTERM 对应的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)
# wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
server.wait_for_termination()
if __name__ == '__main__':
serve()
复制代码
关键的配置地方是:
此时使用我们的客户端请求服务端,服务端会输出一下的信息:
我是拦截器1号:开始----1
我是拦截器2号:开始----1
我是拦截器2号:结束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=<built-in method FromString of GeneratedProtocolMessageType object at 0x00000175D2988558>, response_serializer=<method 'SerializeToString' of 'google.protobuf.pyext._message.CMessage' objects>, unary_unary=<bound method Greeter.SayHelloAgain of <__main__.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None)
我是拦截器1号:结束----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=<built-in method FromString of GeneratedProtocolMessageType object at 0x00000175D2988558>, response_serializer=<method 'SerializeToString' of 'google.protobuf.pyext._message.CMessage' objects>, unary_unary=<bound method Greeter.SayHelloAgain of <__main__.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None)
接收到的请求头元数据信息 (_Metadatum(key='mesasge', value='1010'), _Metadatum(key='error', value='No Error'), _Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (windows; chttp2)'))
复制代码
3.2 客户端的自带拦截器
客户端拦截器的需要实现类和服务端的不一样:
且当我们的使用客户端拦截器的时候,主要链接到我们的RPC的时候的方式也有所改变:
完整客户端示例代码:
import grpc
import hello_pb2
import hello_pb2_grpc
class ClientServerInterceptor1(grpc.UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
print("客户端的拦截器1:---开始1")
resp = continuation(client_call_details, request)
print("客户端的拦截器1:---结束2", resp)
return resp
class ClientServerInterceptor2(grpc.UnaryUnaryClientInterceptor):
def intercept_unary_unary(self, continuation, client_call_details, request):
print("客户端的拦截器2:---开始1")
resp = continuation(client_call_details, request)
print("客户端的拦截器2:---结束2", resp)
return resp
def run():
# 连接 rpc 服务器
options = [
('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024),
('grpc.enable_retries', 1),
('grpc.service_config',
'{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }')
]
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服务启动全局的数据传输的压缩机制
compression = grpc.Compression.Gzip
# with grpc.insecure_channel(target='localhost:50051',
# options=options,
# compression=compression
# ) as channel:
with grpc.insecure_channel(target='localhost:50051',
options=options,
compression=compression
) as channel:
# 通过通道服务一个服务intercept_channel
interceptor_channel = grpc.intercept_channel(channel, ClientServerInterceptor1(),ClientServerInterceptor2())
stub = hello_pb2_grpc.GreeterStub(interceptor_channel)
# 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
try:
reest_header = (
('mesasge', '1010'),
('error', 'No Error')
)
response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='欢迎下次光临'),
# 设置请求的超时处理
timeout=5,
# 设置请求的头的信息
metadata=reest_header,
)
print("SayHelloAgain函数调用结果的返回: " + response.message)
print("SayHelloAgain函数调用结果的返回---响应报文头信息: ", callbask.trailing_metadata())
except grpc._channel._InactiveRpcError as e:
print(e.code())
print(e.details())
if __name__ == '__main__':
run()
复制代码
4:grpc拦截器上下文传递
我们的都知道作为中间件的话,一般某些业务场景下是有些使用承载请求上下文的传递的任务滴,然是自带的拦截器,似乎完全没有对应的
request, context
复制代码
相关的引入传递,如果我们的需要传递上下文的时候呢?这就无法实现了!!!!
要实现具有上下文的传递拦截器的话使用第三方库来实现:
pip install grpc-interceptor
复制代码
这个库还字典的相关的测试:
$ pip install grpc-interceptor[testing]
复制代码
4.1 改造服务端拦截器
该用 第三方库后的完整服务端改造示例:
from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc
import signal
from typing import Any,Callable
# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
# 实现 proto 文件中定义的 rpc 调用
def SayHello(self, request, context):
# 返回是我们的定义的响应体的对象
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
def SayHelloAgain(self, request, context):
# 返回是我们的定义的响应体的对象
# # 设置异常状态码
# context.set_code(grpc.StatusCode.PERMISSION_DENIED)
# context.set_details("你没有这个访问的权限")
# raise context
# 接收请求头的信息
print("接收到的请求头元数据信息", context.invocation_metadata())
# 设置响应报文头信息
context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 局部的数据进行压缩
context.set_compression(grpc.Compression.Gzip)
return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException
from grpc_interceptor.exceptions import NotFound
class MyUnaryServerInterceptor1(ServerInterceptor):
def intercept(
self,
method: Callable,
request: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
rsep = None
try:
print("我是拦截器1号:开始----1")
rsep= method(request, context)
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
finally:
print("我是拦截器1号:结束----2",rsep)
return rsep
class MyUnaryServerInterceptor2(ServerInterceptor):
def intercept(
self,
method: Callable,
request: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
rsep = None
try:
print("我是拦截器2号:开始----1")
rsep= method(request, context)
except GrpcException as e:
context.set_code(e.status_code)
context.set_details(e.details)
raise
finally:
print("我是拦截器2号:结束----2",rsep)
return rsep
def serve():
# 实例化一个rpc服务,使用线程池的方式启动我们的服务
# 服务一些参数信息的配置
options = [
('grpc.max_send_message_length', 60 * 1024 * 1024), # 限制发送的最大的数据大小
('grpc.max_receive_message_length', 60 * 1024 * 1024), # 限制接收的最大的数据的大小
]
# 三种的压缩机制处理
# NoCompression = _compression.NoCompression
# Deflate = _compression.Deflate
# Gzip = _compression.Gzip
# 配置服务启动全局的数据传输的压缩机制
compression = grpc.Compression.Gzip
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=options,
compression=compression,
interceptors=[MyUnaryServerInterceptor1(),MyUnaryServerInterceptor2()])
# 添加我们服务
hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
# 配置启动的端口
server.add_insecure_port('[::]:50051')
# 开始启动的服务
server.start()
def stop_serve(signum, frame):
print("进程结束了!!!!")
# sys.exit(0)
raise KeyboardInterrupt
# 注销相关的信号
# SIGINT 对应windos下的 ctrl+c的命令
# SIGTERM 对应的linux下的kill命令
signal.signal(signal.SIGINT, stop_serve)
# signal.signal(signal.SIGTERM, stop_serve)
# wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
server.wait_for_termination()
if __name__ == '__main__':
serve()
复制代码
通过上面的方式,我们就可以对应我们的上下文请求做相关的处理了!这个和我们的web框架的中间件几乎是接近类似了!
4.2 简单分析第三方库简单源码
进入这个第三方库的源码内部的,其实发现它自己也是实现了
grpc.ServerInterceptor
复制代码
然后对它进一步进行了抽象一层
- 第一步其实和我们自带的实现一样,先是获取返回的下一个带处理的handle
next_handler = continuation(handler_call_details)
复制代码
然后对返回这个next_handler进行是那种类型的的拦截器:
- unary_unary
- unary_stream
- stream_unary
- stream_stream
复制代码
- 判断完成是哪里蕾西的拦截器之后返回
handler_factory, next_handler_method
复制代码
然后调用的是最终返回是handler_factory的对象
-
handler_factory的对象需要的参数有:
- invoke_intercept_method 拦截器的方法
- request_deserializer 请求的系列化
- response_serializer 响应的系列化
-
而我们的invoke_intercept_method 拦截器的方法获取则需要
- 传入定义的一个
request: Any, context: grpc.ServicerContext, 复制代码
-
然后返回是我们的最终需要实现的方法!
我去晕了~
4.3 补充说明handler_call_details
如果我们的单纯只是需要获取到RPC请求里面的提交请求头元数据的,我们可以使用它读取:
print("handler call details: ", handler_call_details.invocation_metadata)
复制代码
它本身是一个:
grpc._server._HandlerCallDetails的类型
复制代码
总结
以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!
结尾
END
简书:www.jianshu.com/u/d6960089b…
公众号:微信搜【小儿来一壶枸杞酒泡茶】
小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822