通过前面几小节的学习,我们已经知道,发送和接收消息时用到的重要的类有ConnectionFactory、Connection、Channel和QueueingConsumer。
ConnectionFactory类用于方便地创建与AMQP代理相关联的Connection;下面来看看ConnectionFactory是如何创建一个Connection的。
首先通过new ConnectionFactory()创建一个ConnectionFactory,并将此连接工厂的主机设置为localhost;然后通过ConnectionFactory的newConnection()方法创建一个Connection。newConnection()方法先根据连接工厂的主机地址及端口号构造一个Address,再通过createFrameHandler()方法(其中会创建并配置Socket、连接到该地址)得到FrameHandler,最后通过new AMQConnection(this, frameHandler)得到Connection并调用start()启动它。如代码清单7-1所示。
代码清单7-1 创建Connection的源码(ConnectionFactory.java中)
- 1. protected FrameHandler createFrameHandler(Address addr)
- 2. throws IOException {
- 3. // Creates a socket, configures it, connects it to the host/port of addr, and wraps it in a FrameHandler.
- 4. String hostName = addr.getHost();
- 5. int portNumber = portOrDefault(addr.getPort());// get the port number (portOrDefault presumably substitutes a default when none is set — confirm)
- 6. Socket socket = factory.createSocket();
- 7. configureSocket(socket);// apply socket options before connecting; the default configureSocket enables TCP_NODELAY
- 8. socket.connect(new InetSocketAddress(hostName, portNumber), connectionTimeout); // NOTE(review): if this or configureSocket throws, the socket created above is not closed in this method
- 9. return createFrameHandler(socket);
- 10. }
- 11.
- 12. protected FrameHandler createFrameHandler(Socket sock)
- 13. throws IOException
- 14. { // Wraps the given socket in a new SocketFrameHandler and returns it.
- 15. return new SocketFrameHandler(sock);
- 16. }
- 17.
- 18. /**
- 19. * Provides a hook to insert custom configuration of the sockets
- 20. * used to connect to an AMQP server before they connect.
- 21. *
- 22. * The default behaviour of this method is to disable Nagle's
- 23. * algorithm to get more consistently low latency. However it
- 24. * may be overridden freely and there is no requirement to retain
- 25. * this behaviour.
- 26. *
- 27. * @param socket The socket that is to be used for the Connection
- 28. * @throws IOException if the socket option cannot be applied */
- 29. protected void configureSocket(Socket socket) throws IOException{
- 30. // disable Nagle's algorithm (TCP_NODELAY = true), for more consistently low latency
- 31. socket.setTcpNoDelay(true);
- 32. }
- 33.
- 34. /**
- 35. * Create a new broker connection, trying each address in turn until one connects and starts
- 36. * @param addrs an array of known broker addresses (hostname/port pairs) to try in order
- 37. * @return an interface to the connection (the first one that was created and started successfully)
- 38. * @throws IOException the last IOException caught while trying the addresses, or "failed to connect" if addrs is empty
- 39. */
- 40. public Connection newConnection(Address[] addrs)
- 41. throws IOException
- 42. {
- 43. IOException lastException = null; // most recent failure, rethrown if no address works
- 44. for (Address addr : addrs) {
- 45. try {
- 46. FrameHandler frameHandler = createFrameHandler(addr);
- 47. AMQConnection conn = new AMQConnection(this,
- 48. frameHandler);
- 49. conn.start(); // NOTE(review): frameHandler is not closed in this method if start() throws — confirm start() cleans up
- 50. return conn;
- 51. } catch (IOException e) {
- 52. lastException = e; // remember the failure and fall through to the next address
- 53. }
- 54. }
- 55. // Only reached when no address produced a started connection.
- 56. if (lastException == null) { // no IOException was recorded, i.e. the loop body never ran (empty addrs)
- 57. throw new IOException("failed to connect");
- 58. } else {
- 59. throw lastException;
- 60. }
- 61. }
- 62.
- 63. /**
- 64. * Create a new broker connection to the single address built from getHost() and getPort()
- 65. * @return an interface to the connection
- 66. * @throws IOException if it encounters a problem
- 67. */
- 68. public Connection newConnection() throws IOException {
- 69. return newConnection(new Address[] { // delegate to the array overload with exactly one address
- 70. new Address(getHost(), getPort())});
- 71. }
代码清单7-2 连接启动的源码(即代码清单7-1中conn.start()所调用的start()方法)
- /**
- * Start up the connection, including the MainLoop thread.
- * Sends the protocol
- * version negotiation header, and runs through
- * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
- * calls Connection.Open and waits for the OpenOk. Sets heartbeat
- * and frame max values after tuning has taken place.
- *
- * Steps, in the order the code performs them:
- * 1. enqueue a blocking continuation on channel 0, set the handshake
- *    timeout and send the protocol header;
- * 2. start the MainLoop thread and wait for Connection.Start;
- * 3. check the server's protocol version against clientVersion;
- * 4. pick a SASL mechanism from those the server offered and exchange
- *    StartOk / SecureOk until the server answers with Connection.Tune;
- * 5. negotiate channel-max, frame-max and heartbeat, send TuneOk;
- * 6. send Connection.Open for the configured virtual host.
- *
- * @throws java.io.IOException if an error is encountered; IOException
- * subtypes {@link ProtocolVersionMismatchException} and
- * {@link PossibleAuthenticationFailureException} will be thrown in the
- * corresponding circumstances. A plain IOException is also thrown when
- * no compatible authentication mechanism is found.
- */
- public void start()
- throws IOException
- {
- // Make sure that the first thing we do is to send the header,
- // which should cause any socket errors to show up for us, rather
- // than risking them pop out in the MainLoop
- AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
- new AMQChannel.SimpleBlockingRpcContinuation();
- // We enqueue an RPC continuation here without sending an RPC
- // request, since the protocol specifies that after sending
- // the version negotiation header, the client (connection
- // initiator) is to wait for a connection.start method to
- // arrive.
- _channel0.enqueueRpc(connStartBlocker);
- // The following two lines are akin to AMQChannel's
- // transmit() method for this pseudo-RPC.
- _frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
- _frameHandler.sendHeader();
- // start the main loop going
- Thread ml = new MainLoop();
- ml.setName("AMQP Connection " + getHostAddress() + ":" + getPort());
- ml.start();
- AMQP.Connection.Start connStart = null;
- try {
- connStart =
- (AMQP.Connection.Start) connStartBlocker.getReply().getMethod(); // reply delivered to the continuation enqueued above
- _serverProperties = connStart.getServerProperties();
- Version serverVersion =
- new Version(connStart.getVersionMajor(),
- connStart.getVersionMinor());
- if (!Version.checkVersion(clientVersion, serverVersion)) { // versions incompatible: abort the handshake
- _frameHandler.close(); //this will cause mainLoop to terminate
- throw new ProtocolVersionMismatchException(clientVersion,
- serverVersion);
- }
- } catch (ShutdownSignalException sse) {
- throw AMQChannel.wrap(sse); // rethrown via AMQChannel.wrap, in keeping with this method's IOException contract
- }
- String[] mechanisms = connStart.getMechanisms().toString().split(" "); // mechanisms offered by the server, split on single spaces
- SaslMechanism sm = _factory.getSaslConfig().getSaslMechanism(mechanisms);
- if (sm == null) { // the SASL config found nothing usable among the offered mechanisms
- throw new IOException("No compatible authentication mechanism found - " +
- "server offered [" + connStart.getMechanisms() + "]");
- }
- LongString challenge = null;
- LongString response = sm.handleChallenge(null, _factory); // initial response, computed before any server challenge
- AMQP.Connection.Tune connTune = null;
- do { // repeat until the server answers with Connection.Tune
- Method method = (challenge == null) // no challenge yet -> StartOk; otherwise answer the challenge with SecureOk
- ? new AMQImpl.Connection.StartOk(_clientProperties,
- sm.getName(),
- response, "en_US")
- : new AMQImpl.Connection.SecureOk(response);
- try {
- Method serverResponse = _channel0.rpc(method).getMethod();
- if (serverResponse instanceof AMQP.Connection.Tune) {
- connTune = (AMQP.Connection.Tune) serverResponse; // ends the loop
- } else {
- challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge(); // anything other than Tune is cast to Secure
- response = sm.handleChallenge(challenge, _factory);
- }
- } catch (ShutdownSignalException e) {
- throw new PossibleAuthenticationFailureException(e); // a shutdown during this exchange is reported as a possible authentication failure
- }
- } while (connTune == null);
- int channelMax =
- negotiatedMaxValue(_factory.getRequestedChannelMax(), // client-requested vs. server-proposed value; see negotiatedMaxValue for the exact rule
- connTune.getChannelMax());
- _channelManager = new ChannelManager(channelMax);
- int frameMax =
- negotiatedMaxValue(_factory.getRequestedFrameMax(),
- connTune.getFrameMax());
- setFrameMax(frameMax);
- int heartbeat =
- negotiatedMaxValue(_factory.getRequestedHeartbeat(),
- connTune.getHeartbeat());
- setHeartbeat(heartbeat);
- _channel0.transmit(new AMQImpl.Connection.TuneOk(channelMax, // sent with transmit(), not rpc(): no reply is read for TuneOk here
- frameMax,
- heartbeat));
- // 0.9.1: insist [on not being redirected] is deprecated, but
- // still in generated code; just pass a dummy value here
- _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
- "",
- false)).getMethod(); // the returned method is not used
- return;
- }