前言:
当我们在部署web服务的时候,web容器通常都会记录来自客户端的访问日志。而当我们使用Thrift RPC服务的时候,Thrift服务则不会给我们自动记录客户端的访问日志。
通过这篇文章,你可以学习到如何使用在Thrift服务器端添加客户端的访问日志。
面临的问题:
要在Thrift服务器端添加客户端的访问日志,我们需要解决两个问题:
- 找到合适的拦截点记录信息
- 收集访问日志中需要的信息
寻找合适的拦截点:
我们都知道,Thrift协议为我们提供了thrift文件向各种编程语言转换的程序。通过观察,我们会发现Thrift将IDL中定义的每个方法抽象为一个类,即ProcessFunction类。
该类负责从输入中读取参数,调用用户编写的服务将响应写回到输出中。该类是如何发挥作用的,下面这张类图可以比较清晰地说明。

当我们使用Thrift.exe可执行程序处理IDL文件的时候,Processor会被自动创建出来。它负责把实际的方法实现和方法的key关联起来,放到Map中维护。
以TMultiplexedProcessor为例,TMultiplexedProcessor会将所有注册的Processor都存储到SERVICE_PROCESSOR_MAP中。
| public boolean process(TProtocol iprot, TProtocol oprot) throws TException { |
| |
| |
| |
| TMessage message = iprot.readMessageBegin(); |
| if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) { |
| |
| |
| throw new TException("This should not have happened!?"); |
| } |
| |
| int index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR); |
| if (index < 0) { |
| throw new TException("Service name not found in message name: " + message.name + ". Did you " + |
| "forget to use a TMultiplexProtocol in your client?"); |
| } |
| |
| String serviceName = message.name.substring(0, index); |
| TProcessor actualProcessor = SERVICE_PROCESSOR_MAP.get(serviceName); |
| if (actualProcessor == null) { |
| throw new TException("Service name not found: " + serviceName + ". Did you forget " + |
| "to call registerProcessor()?"); |
| } |
| |
| TMessage standardMessage = new TMessage( |
| message.name.substring(serviceName.length()+TMultiplexedProtocol.SEPARATOR.length()), |
| message.type, |
| message.seqid |
| ); |
| |
| return actualProcessor.process(new StoredMessageProtocol(iprot, standardMessage), oprot); |
| } |
actualProcessor的process过程如下,其具体的实现逻辑在TBaseProcessor中。
| public boolean process(TProtocol in, TProtocol out) throws TException { |
| |
| TMessage msg = in.readMessageBegin(); |
| |
| ProcessFunction fn = processMap.get(msg.name); |
| if (fn == null) { |
| TProtocolUtil.skip(in, TType.STRUCT); |
| in.readMessageEnd(); |
| TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'"); |
| out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); |
| x.write(out); |
| out.writeMessageEnd(); |
| out.getTransport().flush(); |
| return true; |
| } |
| |
| |
| fn.process(msg.seqid, in, out, iface); |
| return true; |
| } |
通过上面的分析,我们可以在ProcessFunction中添加有关的access日志。但是这其中有一个问题,就是经过ThriftServer对thrift请求的解析以及消息内容处理,在到达ProcessFunction::process方法的时候,我们已经无法获取到客户端的远程IP地址了。
接下来,我们就要考虑如何收集访问日志需要的信息了。
如何收集访问日志需要的信息:
从上面ProcessFunction中的process方法中,我们可以看出将客户端的IP地址保存到iprot中,是一个不错的选择。
那么,接下来我们需要找到iprot这个对象参数是在什么地方被创建的,以及在合适的地方将客户端的IP地址写入到这个对象中。
经过分析,我们会发现TNonblockingServer是NIO服务器的实现,它通过Selector来检查IO就绪状态,进而调用相关的Channel。
就方法调用而言,它处理的是读事件,用AbstractNonblockingServer的handelRead()来进一步处理。
| protected void handleRead(SelectionKey key) throws IOException { |
| FrameBuffer buffer = (FrameBuffer) key.attachment(); |
| if (!buffer.read()) { |
| cleanupSelectionKey(key); |
| return; |
| } |
| |
| if (buffer.isFrameFullyRead()) { |
| if (!requestInvoke(buffer)) { |
| cleanupSelectionKey(key); |
| } |
| } |
| } |
SelectionKey中有客户端的IP地址,FrameBuffer则是处理方法调用的缓冲区对象,其内部的invoke方法会对Processor中的方法进行实际调用。
因此,在handleRead方法中添加两行代码将客户端的IP地址写入inProt_中就可以带入ProcessFunction中了。
| SocketChannel socketChannel = (SocketChannel)key.channel(); |
| buffer.inProt_.setClientAddr(socketChannel.getRemoteAddress().toString()); |