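This post looks mainly at the Java implementation of Thrift.
Running the Thrift compiler produces one very long generated class. It is used by both the server and the client, so it contains the code for both sides.
Let's start with the client, given the following service definition and client program: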
service HelloService{
string helloString(1:string para, 2:i64 id)
}
public class Client {
public static void main(String[] args) {
System.out.println("Client starting....");
TTransport transport = null;
try {
transport = new TSocket("localhost", 50005);
TProtocol protocol = new TBinaryProtocol(transport);
HelloService.Client client = new HelloService.Client(protocol);
transport.open();
String result = client.helloString("liyao",1);
System.out.println(result);
} catch (TTransportException e) {
e.printStackTrace();
} catch (TException e) {
e.printStackTrace();
} finally {
if (null != transport) {
transport.close();
}
}
}
}
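The client's main method calls a local stub method from the Thrift-generated class; the real implementation runs in the server process. Let's look at how that stub is implemented inside the generated Client.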
public interface Iface {
String helloString(String var1, long var2) throws TException;
}
This is the interface generated from the service we defined earlier.
public static class Client extends TServiceClient implements HelloService.Iface {
public Client(TProtocol prot) {
super(prot, prot);
}
public Client(TProtocol iprot, TProtocol oprot) {
super(iprot, oprot);
}
public String helloString(String para, long id) throws TException {
this.send_helloString(para, id);
return this.recv_helloString();
}
public void send_helloString(String para, long id) throws TException {
HelloService.helloString_args args = new HelloService.helloString_args();
args.setPara(para);
args.setId(id);
this.sendBase("helloString", args);
}
public String recv_helloString() throws TException {
HelloService.helloString_result result = new HelloService.helloString_result();
this.receiveBase(result, "helloString");
if (result.isSetSuccess()) {
return result.success;
} else {
throw new TApplicationException(5, "helloString failed: unknown result");
}
}
... // rest omitted
}
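This is the stub implementation. The interface method calls send_xxx to send the request and its arguments, then calls recv_xxx to receive the return value.
Inside send_xxx, the arguments object is built first, and then sendBase is called to send it.
We take these two parts in turn:
1. The argument class. Thrift generates an inner class that holds the arguments of each method, and another inner class that holds each method's return value. They are named <method>_args and <method>_result respectively.
Here is the generated argument class: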
public static class helloString_args implements TBase<HelloService.helloString_args, HelloService.helloString_args._Fields>, Serializable, Cloneable, Comparable<HelloService.helloString_args> {
private static final TStruct STRUCT_DESC = new TStruct("helloString_args");
private static final TField PARA_FIELD_DESC = new TField("para", (byte)11, (short)1);
private static final TField ID_FIELD_DESC = new TField("id", (byte)10, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap();
public String para;
public long id;
private static final int __ID_ISSET_ID = 0;
private byte __isset_bitfield;
public static final Map<HelloService.helloString_args._Fields, FieldMetaData> metaDataMap;
It declares the method's parameters as fields. Its most important methods are read and write:
public void read(TProtocol iprot) throws TException {
((SchemeFactory)schemes.get(iprot.getScheme())).getScheme().read(iprot, this);
}
public void write(TProtocol oprot) throws TException {
((SchemeFactory)schemes.get(oprot.getScheme())).getScheme().write(oprot, this);
}
These end up calling the protocol's read and write methods.
2. The sendBase method:
protected void sendBase(String methodName, TBase<?,?> args) throws TException {
sendBase(methodName, args, TMessageType.CALL);
}
private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
args.write(oprot_);
oprot_.writeMessageEnd();
oprot_.getTransport().flush();
}
This method lives in TServiceClient, the parent class of Client. It ultimately sends through the protocol layer's methods.
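To make the framing in sendBase more concrete, here is a small sketch of the bytes a binary message header might carry for a CALL. It uses only java.nio and is not Thrift code: the class EnvelopeSketch and its method are hypothetical, and the layout (a version-and-type word, a length-prefixed method name, then the sequence id) reflects my understanding of TBinaryProtocol's strict write mode, so check it against the Thrift version you use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for writeMessageBegin(new TMessage(name, CALL, seqid)).
final class EnvelopeSketch {
    static final int VERSION_1 = 0x80010000; // strict-mode version marker (assumed)
    static final byte CALL = 1;              // value of TMessageType.CALL

    static byte[] encodeCall(String methodName, int seqid) {
        byte[] name = methodName.getBytes(StandardCharsets.UTF_8);
        // ByteBuffer is big-endian by default, matching network byte order.
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + name.length + 4);
        buf.putInt(VERSION_1 | CALL); // version and message type in one word
        buf.putInt(name.length);      // length prefix of the method name
        buf.put(name);                // method name as UTF-8
        buf.putInt(seqid);            // sequence id, echoed back by the server
        return buf.array();
    }

    public static void main(String[] args) {
        System.out.println(encodeCall("helloString", 1).length + " bytes");
    }
}
```

The serialized arguments would follow this header, and the flush at the end of sendBase is what pushes everything out through the transport.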
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.thrift.protocol;
import java.nio.ByteBuffer;
import org.apache.thrift.TException;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.transport.TTransport;
/**
* Protocol interface definition.
*
*/
public abstract class TProtocol {
/**
* Prevent direct instantiation
*/
@SuppressWarnings("unused")
private TProtocol() {}
/**
* Transport
*/
protected TTransport trans_;
/**
* Constructor
*/
protected TProtocol(TTransport trans) {
trans_ = trans;
}
/**
* Transport accessor
*/
public TTransport getTransport() {
return trans_;
}
/**
* Writing methods.
*/
public abstract void writeMessageBegin(TMessage message) throws TException;
public abstract void writeMessageEnd() throws TException;
public abstract void writeStructBegin(TStruct struct) throws TException;
public abstract void writeStructEnd() throws TException;
public abstract void writeFieldBegin(TField field) throws TException;
public abstract void writeFieldEnd() throws TException;
public abstract void writeFieldStop() throws TException;
public abstract void writeMapBegin(TMap map) throws TException;
public abstract void writeMapEnd() throws TException;
public abstract void writeListBegin(TList list) throws TException;
public abstract void writeListEnd() throws TException;
public abstract void writeSetBegin(TSet set) throws TException;
public abstract void writeSetEnd() throws TException;
public abstract void writeBool(boolean b) throws TException;
public abstract void writeByte(byte b) throws TException;
public abstract void writeI16(short i16) throws TException;
public abstract void writeI32(int i32) throws TException;
public abstract void writeI64(long i64) throws TException;
public abstract void writeDouble(double dub) throws TException;
public abstract void writeString(String str) throws TException;
public abstract void writeBinary(ByteBuffer buf) throws TException;
/**
* Reading methods.
*/
public abstract TMessage readMessageBegin() throws TException;
public abstract void readMessageEnd() throws TException;
public abstract TStruct readStructBegin() throws TException;
public abstract void readStructEnd() throws TException;
public abstract TField readFieldBegin() throws TException;
public abstract void readFieldEnd() throws TException;
public abstract TMap readMapBegin() throws TException;
public abstract void readMapEnd() throws TException;
public abstract TList readListBegin() throws TException;
public abstract void readListEnd() throws TException;
public abstract TSet readSetBegin() throws TException;
public abstract void readSetEnd() throws TException;
public abstract boolean readBool() throws TException;
public abstract byte readByte() throws TException;
public abstract short readI16() throws TException;
public abstract int readI32() throws TException;
public abstract long readI64() throws TException;
public abstract double readDouble() throws TException;
public abstract String readString() throws TException;
public abstract ByteBuffer readBinary() throws TException;
/**
* Reset any internal state back to a blank slate. This method only needs to
* be implemented for stateful protocols.
*/
public void reset() {}
/**
* Scheme accessor
*/
public Class<? extends IScheme> getScheme() {
return StandardScheme.class;
}
}
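This is the protocol layer's interface. The protocol layer's job is serialization: it converts what is being sent into a specific format. Put plainly, it turns Java in-memory data into a byte array with a particular layout, and each protocol produces a different layout.
The protocol package in the Thrift jar contains several implementations, with classes for JSON, base64, and binary; take a look at each if you are interested. At this point we have descended from the client layer to the protocol layer, and seen that the protocol layer's job is formatting, i.e. serialization: turning in-memory data into a byte array with a defined layout.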
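As a rough illustration of "different protocols, different bytes", the sketch below encodes the same int in two ways: as four big-endian bytes, the way a binary protocol would, and as decimal text, the way a text-based protocol might. FormatSketch and its methods are hypothetical names, and the text form is deliberately simplified; it is not the actual output of Thrift's JSON protocol.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical comparison of two wire formats for one value.
final class FormatSketch {
    // Fixed-width binary: always 4 bytes, most significant byte first.
    static byte[] asBinary(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    // Text form: one ASCII byte per decimal digit (plus a sign if negative).
    static byte[] asText(int v) {
        return Integer.toString(v).getBytes(StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(asBinary(50005).length + " vs " + asText(50005).length);
    }
}
```

The in-memory value is identical in both cases; only the chosen protocol decides what the receiver has to parse.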
Taking TBinaryProtocol as the example, here is its writeI32 method:
public void writeI32(int i32) throws TException {
i32out[0] = (byte)(0xff & (i32 >> 24));
i32out[1] = (byte)(0xff & (i32 >> 16));
i32out[2] = (byte)(0xff & (i32 >> 8));
i32out[3] = (byte)(0xff & (i32));
trans_.write(i32out, 0, 4);
}
It converts the value into a byte array and finally calls the transport layer's write method to send it.
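The shifting and masking in writeI32 is easier to follow with a worked value. The sketch below copies the same arithmetic into a standalone class and adds the inverse operation; I32Sketch and readI32 are hypothetical helpers written for this post, not part of Thrift.

```java
import java.util.Arrays;

// Standalone copy of the big-endian arithmetic used by writeI32.
final class I32Sketch {
    static byte[] writeI32(int i32) {
        byte[] out = new byte[4];
        out[0] = (byte) (0xff & (i32 >> 24)); // most significant byte first
        out[1] = (byte) (0xff & (i32 >> 16));
        out[2] = (byte) (0xff & (i32 >> 8));
        out[3] = (byte) (0xff & i32);
        return out;
    }

    // Inverse: mask each byte to undo sign extension, then shift back into place.
    static int readI32(byte[] b) {
        return ((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
             | ((b[2] & 0xff) << 8) | (b[3] & 0xff);
    }

    public static void main(String[] args) {
        // 50005 is 0x0000C355; 0xC3 shows up as -61 because Java bytes are signed.
        System.out.println(Arrays.toString(writeI32(50005))); // prints [0, 0, -61, 85]
        System.out.println(readI32(writeI32(50005)));         // prints 50005
    }
}
```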
package org.apache.thrift.transport;
import java.io.Closeable;
/**
* Generic class that encapsulates the I/O layer. This is basically a thin
* wrapper around the combined functionality of Java input/output streams.
*
*/
public abstract class TTransport implements Closeable {
/**
* Queries whether the transport is open.
*
* @return True if the transport is open.
*/
public abstract boolean isOpen();
/**
* Is there more data to be read?
*
* @return True if the remote side is still alive and feeding us
*/
public boolean peek() {
return isOpen();
}
/**
* Opens the transport for reading/writing.
*
* @throws TTransportException if the transport could not be opened
*/
public abstract void open()
throws TTransportException;
/**
* Closes the transport.
*/
public abstract void close();
/**
* Reads up to len bytes into buffer buf, starting at offset off.
*
* @param buf Array to read into
* @param off Index to start reading at
* @param len Maximum number of bytes to read
* @return The number of bytes actually read
* @throws TTransportException if there was an error reading data
*/
public abstract int read(byte[] buf, int off, int len)
throws TTransportException;
/**
* Guarantees that all of len bytes are actually read off the transport.
*
* @param buf Array to read into
* @param off Index to start reading at
* @param len Maximum number of bytes to read
* @return The number of bytes actually read, which must be equal to len
* @throws TTransportException if there was an error reading data
*/
public int readAll(byte[] buf, int off, int len)
throws TTransportException {
int got = 0;
int ret = 0;
while (got < len) {
ret = read(buf, off+got, len-got);
if (ret <= 0) {
throw new TTransportException(
"Cannot read. Remote side has closed. Tried to read "
+ len
+ " bytes, but only got "
+ got
+ " bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)");
}
got += ret;
}
return got;
}
/**
* Writes the buffer to the output
*
* @param buf The output data buffer
* @throws TTransportException if an error occurs writing data
*/
public void write(byte[] buf) throws TTransportException {
write(buf, 0, buf.length);
}
/**
* Writes up to len bytes from the buffer.
*
* @param buf The output data buffer
* @param off The offset to start writing from
* @param len The number of bytes to write
* @throws TTransportException if there was an error writing data
*/
public abstract void write(byte[] buf, int off, int len)
throws TTransportException;
/**
* Flush any pending data out of a transport buffer.
*
* @throws TTransportException if there was an error writing out data.
*/
public void flush()
throws TTransportException {}
/**
* Access the protocol's underlying buffer directly. If this is not a
* buffered transport, return null.
* @return protocol's Underlying buffer
*/
public byte[] getBuffer() {
return null;
}
/**
* Return the index within the underlying buffer that specifies the next spot
* that should be read from.
* @return index within the underlying buffer that specifies the next spot
* that should be read from
*/
public int getBufferPosition() {
return 0;
}
/**
* Get the number of bytes remaining in the underlying buffer. Returns -1 if
* this is a non-buffered transport.
* @return the number of bytes remaining in the underlying buffer. <br> Returns -1 if
* this is a non-buffered transport.
*/
public int getBytesRemainingInBuffer() {
return -1;
}
/**
* Consume len bytes from the underlying buffer.
* @param len
*/
public void consumeBuffer(int len) {}
}
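The transport layer encapsulates the underlying I/O operations.
Its implementations include blocking-socket and non-blocking-socket variants, among others. In short, the transport layer takes care of the low-level I/O.
The concrete protocol and I/O implementations are fairly complex, so we won't go further into them here.
The main goal is to show the flow of a Thrift call.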
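Without going into the real socket implementations, the contract of a transport can be illustrated with a toy in-memory version. LoopbackTransport below is a hypothetical class written for this post, not a Thrift class; it only mimics the shape of write, flush, and read, and assumes a buffered transport where written bytes become visible to the reader only after flush.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayDeque;
import java.util.Deque;

// Toy transport: bytes written are held back until flush() moves them to the "wire".
final class LoopbackTransport {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream(); // written, not flushed
    private final Deque<Byte> wire = new ArrayDeque<>();                        // flushed, readable

    void write(byte[] buf, int off, int len) {
        pending.write(buf, off, len);
    }

    void flush() {
        for (byte b : pending.toByteArray()) {
            wire.addLast(b);
        }
        pending.reset();
    }

    // Reads up to len bytes and returns how many were actually read (0 if none are available).
    int read(byte[] buf, int off, int len) {
        int n = 0;
        while (n < len && !wire.isEmpty()) {
            buf[off + n++] = wire.removeFirst();
        }
        return n;
    }
}
```

This is one way to see why sendBase ends with a flush: with a buffered transport, a request that is never flushed may never leave the client.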
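Back to the sendBase method in the client.
1. First it calls writeMessageBegin, which writes the method name together with the message type and the sequence id.
2. It calls write on args, the argument class. That method is generated, and it ends up calling the write method of the scheme: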
public void write(TProtocol oprot, HelloService.helloString_args struct) throws TException {
struct.validate();
oprot.writeStructBegin(HelloService.helloString_args.STRUCT_DESC);
if (struct.para != null) {
oprot.writeFieldBegin(HelloService.helloString_args.PARA_FIELD_DESC);
oprot.writeString(struct.para);
oprot.writeFieldEnd();
}
oprot.writeFieldBegin(HelloService.helloString_args.ID_FIELD_DESC);
oprot.writeI64(struct.id);
oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
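This writes a field header and a value for every argument, again through the protocol layer's write methods. (TBinaryProtocol puts the field type and id on the wire, not the field name.)
Why does the write logic live in the argument class? Because each method sends different content, the sending code has to be tailored to the method: the number, names, and types of the parameters all differ. Once the content is defined, the protocol layer can send it.
3. It calls writeMessageEnd, which is an empty method in TBinaryProtocol.
That is the whole sending process.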
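Step 2 above can be sketched as plain byte writing. The class below is a hypothetical, hand-written equivalent of what the generated write method produces for helloString_args under a binary protocol, assuming a layout of a one-byte type plus a two-byte field id per field header, a four-byte length prefix for strings, and a single zero byte as the stop marker. The type codes 11 and 10 are the ones visible in the field descriptors earlier.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical hand-rolled encoding of helloString_args(para, id).
final class ArgsEncodingSketch {
    static final byte T_STOP = 0, T_I64 = 10, T_STRING = 11;

    static byte[] encodeArgs(String para, long id) {
        byte[] utf8 = para == null ? null : para.getBytes(StandardCharsets.UTF_8);
        int size = (utf8 == null ? 0 : 1 + 2 + 4 + utf8.length) // optional string field
                 + (1 + 2 + 8)                                  // i64 field
                 + 1;                                           // stop marker
        ByteBuffer buf = ByteBuffer.allocate(size);
        if (utf8 != null) {                         // a null para is simply left out
            buf.put(T_STRING).putShort((short) 1);  // field header: type, id 1
            buf.putInt(utf8.length).put(utf8);      // length-prefixed UTF-8
        }
        buf.put(T_I64).putShort((short) 2).putLong(id); // field header and value, id 2
        buf.put(T_STOP);                                // end of struct
        return buf.array();
    }

    public static void main(String[] args) {
        System.out.println(encodeArgs("liyao", 1).length + " bytes");
    }
}
```

Every method has its own field ids and types, which is why this logic has to be generated per method rather than written once in a shared base class.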
Next, receiving the result:
public String recv_helloString() throws TException {
HelloService.helloString_result result = new HelloService.helloString_result();
this.receiveBase(result, "helloString");
if (result.isSetSuccess()) {
return result.success;
} else {
throw new TApplicationException(5, "helloString failed: unknown result");
}
}
It calls a method of the parent class:
protected void receiveBase(TBase<?,?> result, String methodName) throws TException {
TMessage msg = iprot_.readMessageBegin();
if (msg.type == TMessageType.EXCEPTION) {
TApplicationException x = TApplicationException.read(iprot_);
iprot_.readMessageEnd();
throw x;
}
if (msg.seqid != seqid_) {
throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
}
result.read(iprot_);
iprot_.readMessageEnd();
}
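The two checks in receiveBase (exception messages and sequence ids) can be reduced to a few lines. SeqidSketch is a hypothetical miniature written for this post; it only models the bookkeeping and returns a reason string where the real code throws a TApplicationException.

```java
// Hypothetical miniature of the sequence-id bookkeeping in a client.
final class SeqidSketch {
    static final byte REPLY = 2, EXCEPTION = 3; // values of TMessageType

    private int seqid = 0;

    // Called when sending: every request gets the next sequence id.
    int nextRequest() {
        return ++seqid;
    }

    // Called when a reply header arrives; returns null if the body may be decoded.
    String rejectReason(byte msgType, int msgSeqid) {
        if (msgType == EXCEPTION) {
            return "server sent an exception";
        }
        if (msgSeqid != seqid) {
            return "out of sequence response";
        }
        return null;
    }
}
```

Because the server echoes the request's sequence id, a mismatch suggests that the reply belongs to some other request on the same connection.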
As with args, the generated class contains a tailored class for each result type, and the call ends up in the generated read method for the result:
public void read(org.apache.thrift.protocol.TProtocol iprot, helloString_result struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 0: // SUCCESS
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.success = iprot.readString();
struct.setSuccessIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
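The loop above reads fields until it sees STOP and skips anything it does not recognize. A stripped-down version of that idea, working on a ByteBuffer instead of a TProtocol, might look like the sketch below. ResultDecodingSketch is hypothetical, it assumes the binary layout of a one-byte type and two-byte id per field, and its skip method only covers the two types used in this walkthrough.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical field loop: keep field 0 if it is a string, skip everything else.
final class ResultDecodingSketch {
    static final byte T_STOP = 0, T_I64 = 10, T_STRING = 11;

    // Returns the string stored in field id 0, or null if that field is absent.
    static String readSuccess(ByteBuffer in) {
        String success = null;
        while (true) {
            byte type = in.get();
            if (type == T_STOP) {
                break;
            }
            short id = in.getShort();
            if (id == 0 && type == T_STRING) {
                success = readString(in);
            } else {
                skip(in, type); // unknown id or unexpected type: consume and ignore
            }
        }
        return success;
    }

    static String readString(ByteBuffer in) {
        byte[] utf8 = new byte[in.getInt()];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    static void skip(ByteBuffer in, byte type) {
        switch (type) {
            case T_I64: in.getLong(); break;
            case T_STRING: readString(in); break;
            default: throw new IllegalArgumentException("type not covered by this sketch: " + type);
        }
    }
}
```

Skipping unknown fields is what lets an older client read a reply from a newer server that has added fields to the result.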
The result's read method in turn calls the protocol layer's readString method:
public String readString() throws TException {
int size = readI32();
checkStringReadLength(size);
if (stringLengthLimit_ > 0 && size > stringLengthLimit_) {
throw new TProtocolException(TProtocolException.SIZE_LIMIT,
"String field exceeded string size limit");
}
if (trans_.getBytesRemainingInBuffer() >= size) {
try {
String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
trans_.consumeBuffer(size);
return s;
} catch (UnsupportedEncodingException e) {
throw new TException("JVM DOES NOT SUPPORT UTF-8");
}
}
return readStringBody(size);
}
public String readStringBody(int size) throws TException {
try {
byte[] buf = new byte[size];
trans_.readAll(buf, 0, size);
return new String(buf, "UTF-8");
} catch (UnsupportedEncodingException uex) {
throw new TException("JVM DOES NOT SUPPORT UTF-8");
}
}
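One detail worth noticing in readString is that the length is validated before any buffer is allocated. The sketch below shows that ordering in isolation, on a ByteBuffer. StringLimitSketch is a hypothetical helper, and the exception type is chosen for brevity; the real code throws TProtocolException.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical length-prefixed string reader with a size limit.
final class StringLimitSketch {
    // limit <= 0 means "no limit", mirroring the stringLengthLimit_ check above.
    static String readString(ByteBuffer in, int limit) {
        int size = in.getInt(); // 4-byte big-endian length prefix
        if (size < 0) {
            throw new IllegalArgumentException("negative length: " + size);
        }
        if (limit > 0 && size > limit) {
            throw new IllegalArgumentException("string exceeds limit of " + limit);
        }
        if (size > in.remaining()) {
            throw new IllegalArgumentException("input is truncated");
        }
        byte[] utf8 = new byte[size]; // allocate only after the checks have passed
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```

Checking first means a corrupt or hostile length prefix cannot make the reader allocate an enormous array.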
Next it calls the transport layer's readAll method:
public int readAll(byte[] buf, int off, int len)
throws TTransportException {
int got = 0;
int ret = 0;
while (got < len) {
ret = read(buf, off+got, len-got);
if (ret <= 0) {
throw new TTransportException(
"Cannot read. Remote side has closed. Tried to read "
+ len
+ " bytes, but only got "
+ got
+ " bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)");
}
got += ret;
}
return got;
}
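The loop in readAll exists because a single read may return fewer bytes than requested. The sketch below reproduces the loop against a plain InputStream and pairs it with a stream that deliberately returns at most two bytes per call. ReadAllSketch and trickle are hypothetical names, and unchecked exceptions stand in for TTransportException.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical re-creation of the readAll loop on top of java.io.
final class ReadAllSketch {
    // Keeps calling read until len bytes have arrived, or fails if the stream ends early.
    static int readAll(InputStream in, byte[] buf, int off, int len) {
        int got = 0;
        while (got < len) {
            int ret;
            try {
                ret = in.read(buf, off + got, len - got);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (ret <= 0) {
                throw new IllegalStateException("stream ended after " + got + " of " + len + " bytes");
            }
            got += ret;
        }
        return got;
    }

    // A stream that hands out at most 2 bytes per read call, like a slow socket.
    static InputStream trickle(byte[] data) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, 2));
            }
        };
    }
}
```

Reading five bytes from trickle takes three underlying read calls, but the caller of readAll sees one complete result.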
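Finally it reaches the read method, which lives in the parent class of the transport (TIOStreamTransport, the superclass of TSocket):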
/**
* Reads from the underlying input stream if not null.
*/
public int read(byte[] buf, int off, int len) throws TTransportException {
if (inputStream_ == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");
}
int bytesRead;
try {
bytesRead = inputStream_.read(buf, off, len);
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
if (bytesRead < 0) {
throw new TTransportException(TTransportException.END_OF_FILE);
}
return bytesRead;
}
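That is the client side of one Thrift call.
To summarize, the call sequence is:
call the local stub -> call the generated class's methods (arguments or result) -> call the protocol layer -> call the transport layer.
The server-side logic is much the same.
Suppose we use the following server: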
public class Server {
public static void main(String args[]){
try {
System.out.println("Server starting....");
TProcessor tprocessor = new HelloService.Processor<HelloService.Iface>(new HelloServiceImpl());
TServerSocket serverTransport = new TServerSocket(50005);
TServer.Args tArgs = new TServer.Args(serverTransport);
tArgs.processor(tprocessor);
tArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TSimpleServer(tArgs);
server.serve();
}catch (TTransportException e) {
e.printStackTrace();
}
}
}
Here the constructor of the generated Processor is called with an instance of the implementation class.
Here is the Processor in the generated class:
public static class Processor<I extends HelloService.Iface> extends TBaseProcessor<I> implements TProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(HelloService.Processor.class.getName());
public Processor(I iface) {
super(iface, getProcessMap(new HashMap()));
}
protected Processor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
super(iface, getProcessMap(processMap));
}
private static <I extends HelloService.Iface> Map<String, ProcessFunction<I, ? extends TBase>> getProcessMap(Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
processMap.put("helloString", new HelloService.Processor.helloString());
return processMap;
}
public static class helloString<I extends HelloService.Iface> extends ProcessFunction<I, HelloService.helloString_args> {
public helloString() {
super("helloString");
}
public HelloService.helloString_args getEmptyArgsInstance() {
return new HelloService.helloString_args();
}
protected boolean isOneway() {
return false;
}
public HelloService.helloString_result getResult(I iface, HelloService.helloString_args args) throws TException {
HelloService.helloString_result result = new HelloService.helloString_result();
result.success = iface.helloString(args.para, args.id);
return result;
}
}
}
It calls the parent class's constructor:
public abstract class TBaseProcessor<I> implements TProcessor {
private final I iface;
private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;
protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
this.iface = iface;
this.processMap = processFunctionMap;
}
public Map<String,ProcessFunction<I, ? extends TBase>> getProcessMapView() {
return Collections.unmodifiableMap(processMap);
}
@Override
public boolean process(TProtocol in, TProtocol out) throws TException {
TMessage msg = in.readMessageBegin();
ProcessFunction fn = processMap.get(msg.name);
if (fn == null) {
TProtocolUtil.skip(in, TType.STRUCT);
in.readMessageEnd();
TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
x.write(out);
out.writeMessageEnd();
out.getTransport().flush();
return true;
}
fn.process(msg.seqid, in, out, iface);
return true;
}
}
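The parent class holds a reference to the iface implementation, which means every implementation instance has its own processor.
So a request ends up in the process method of a ProcessFunction instance: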
public abstract class ProcessFunction<I, T extends TBase> {
private final String methodName;
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessFunction.class.getName());
public ProcessFunction(String methodName) {
this.methodName = methodName;
}
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
T args = getEmptyArgsInstance();
try {
args.read(iprot);
} catch (TProtocolException e) {
iprot.readMessageEnd();
TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
x.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
return;
}
iprot.readMessageEnd();
TBase result = null;
try {
result = getResult(iface, args);
} catch(TException tex) {
LOGGER.error("Internal error processing " + getMethodName(), tex);
TApplicationException x = new TApplicationException(TApplicationException.INTERNAL_ERROR,
"Internal error processing " + getMethodName());
oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
x.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
return;
}
if(!isOneway()) {
oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));
result.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();
}
}
protected abstract boolean isOneway();
public abstract TBase getResult(I iface, T args) throws TException;
public abstract T getEmptyArgsInstance();
public String getMethodName() {
return methodName;
}
}
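This method is implemented in the parent class, while getResult is implemented in the generated class, as shown earlier. getResult calls the interface implementation, and process then writes the result back.
In summary, each interface implementation has a corresponding processor that holds a reference to the implementation instance. The processor acts as a proxy for the implementation: the generic logic lives in the parent class, and the implementation-specific logic lives in the generated Processor. The processor invokes the implementation through getResult and writes the result back, which completes the proxying.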
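The proxy idea can be condensed into a small sketch: a map from method name to an adapter that unpacks the arguments and calls the implementation. ProcessorSketch and its nested Iface are hypothetical and leave out all serialization; they only mirror the roles of processMap, ProcessFunction, and getResult described above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical miniature of a processor: name lookup, then delegation to the implementation.
final class ProcessorSketch {
    interface Iface {
        String helloString(String para, long id);
    }

    // Method name -> adapter that unpacks the arguments and calls the implementation.
    private final Map<String, BiFunction<Iface, Object[], Object>> processMap = new HashMap<>();
    private final Iface iface;

    ProcessorSketch(Iface iface) {
        this.iface = iface;
        // Plays the role of the generated helloString ProcessFunction and its getResult.
        processMap.put("helloString", (impl, a) -> impl.helloString((String) a[0], (Long) a[1]));
    }

    Object process(String methodName, Object... args) {
        BiFunction<Iface, Object[], Object> fn = processMap.get(methodName);
        if (fn == null) {
            throw new IllegalArgumentException("Invalid method name: '" + methodName + "'");
        }
        return fn.apply(iface, args);
    }
}
```

A call such as `new ProcessorSketch((para, id) -> "hello " + para).process("helloString", "liyao", 1L)` goes through the map lookup and then reaches the lambda, in the same way a decoded request reaches HelloServiceImpl.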