记录一次线上关于socket超时问题的定位

版权声明:原创文章,未经允许不得转载.Tips:传统电商火热的时代已经成为过去 , 下一个阶段属于大数据 人工智能 , 服务、便捷、安全、效率、创新成为下一个阶段互联网时代的新词汇,而IT技术也随着行业的变化发展而不断更迭。对于码农的出路总结一句话:追技术不如追领域。[基础][设计][能力] https://blog.csdn.net/shengqianfeng/article/details/84395305

现象:应用程序就是简单的spring+cxf组成的系统,系统上线运行后发现运行一段时间之后就发现请求可以进来却得不到处理,cxf的处理过程是创建一个线程,并提交到线程池去执行.。


import java.io.PrintWriter;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.RandomUtils;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.PhaseInterceptorChain;
import org.apache.cxf.transport.http.AbstractHTTPDestination;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import com.yzx.rest.util.SysConfig;
import com.yzx.rest.util.web.HttpUtils;

@Service("callbackService")
public class CallbackServiceImpl implements CallbackService {

	private static Logger logger = LogManager.getLogger(CallbackServiceImpl.class);
	
	@Autowired
	private ThreadPoolTaskExecutor poolTaskExecutor;
	
	
	@Override
	public String callback(String msg) {
		logger.info("=================================模拟AS客户端接收到消息,msg="+msg);
		return "000000";
	}
	@Override
	public void test(String msg) {
		final Message messageContext = PhaseInterceptorChain.getCurrentMessage();
		final HttpServletRequest request = (HttpServletRequest) messageContext
				.get(AbstractHTTPDestination.HTTP_REQUEST);
		final HttpServletResponse response = (HttpServletResponse) messageContext
				.get(AbstractHTTPDestination.HTTP_RESPONSE);
		int size  = poolTaskExecutor.getThreadPoolExecutor().getQueue().size();
		logger.info("线程ID:{},推送前置机测试信息.....当前活跃线程数:{},线程池核心线程数:{},线程池最大线程数:{},线程池队列大小:{}",
				Thread.currentThread().getId(),
				poolTaskExecutor.getActiveCount(),
				poolTaskExecutor.getCorePoolSize(),
				poolTaskExecutor.getMaxPoolSize(),
				size);
		if (request.isAsyncSupported()) {
			// 清除cxf的OutInterceptors,防止cxf将响应返回给客户端
			messageContext.getExchange().getEndpoint().getOutInterceptors().clear();
			messageContext.getExchange().getBinding().getOutInterceptors().clear();
			request.startAsync(request, response);
			if (request.isAsyncStarted()) {
				final AsyncContext asyncContext = request.getAsyncContext();
				asyncContext.setTimeout(SysConfig.getInstance().getPropertyInt("async_timeout"));
				asyncContext.start(
				// 开启http线程
						new Runnable() {
							@Override
							public void run() {
								poolTaskExecutor.execute(// 由http线程交于work线程
										new Runnable() {
											@Override
											public void run() {
												PrintWriter printWriter = null;
												try {
													String responseMsgContent = "";
													String ws_callback = SysConfig.getInstance().getProperty("ws_callback");
													String result = HttpUtils.postXml(ws_callback, "desc="+System.currentTimeMillis()+"-"+RandomUtils.nextInt(0, 9999));
													logger.info("线程ID:{},result:{}",Thread.currentThread().getId(),result);
													response.setCharacterEncoding("UTF-8");
													response.setHeader("Content-type","application/json;charset=UTF-8");
													printWriter = response.getWriter();
													printWriter.write(responseMsgContent);
													logger.info(responseMsgContent);
												} catch (Exception e) {
													e.printStackTrace();
												} finally {
													if (printWriter != null) {
														printWriter.flush();
														printWriter.close();
													}
													asyncContext.complete();
												}
											}
										});
							}
						});
			}
		} else { // 不支持异步
		}
	
	}

}

我在cxf逻辑的外边打印了当前系统的线程池大小,队列信息.spring的配置文件中配置了线程池配置:

    <!-- 异步线程池 -->  
  	<bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">  
	    <!-- 核心线程数  -->  
	    <property name="corePoolSize" value="500" />  
	    <!-- 最大线程数 -->  
	    <property name="maxPoolSize" value="800" />  
	    <!-- 队列最大长度 >=mainExecutor.maxSize -->  
	    <property name="queueCapacity" value="1000" />  
	    <!-- 线程池维护线程所允许的空闲时间 -->  
	    <property name="keepAliveSeconds" value="30" />  
	    <!-- 线程池对拒绝任务(无线程可用)的处理策略 -->  
	    <property name="rejectedExecutionHandler">  
	    <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />  
   		</property>  
  	</bean>

连续观察了两三天发现异常是时打印的活跃线程数总是500,然后就再也没有给响应,于是推测是不是线程没有回收,所以调整了线程池的keepAliveSeconds参数,从300调小到了30s,继续观察,结果确实是比之前出异常的时间晚了那么一点点,就可以忽略不计了.

于是安装了个probe对tomcat进行监控.并在测试环境进行压测,开始的时候线程状态是

sun.misc.Unsafe.park ( native code )

解读一下:

sun.misc.Unsafe.park ( native code )

java.util.concurrent.locks.LockSupport.park ( LockSupport.java:186 )

java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await ( AbstractQueuedSynchronizer.java:2043 )

java.util.concurrent.LinkedBlockingQueue.take ( LinkedBlockingQueue.java:442 )

org.apache.tomcat.util.threads.TaskQueue.take ( TaskQueue.java:104 )

org.apache.tomcat.util.threads.TaskQueue.take ( TaskQueue.java:32 )

java.util.concurrent.ThreadPoolExecutor.getTask ( ThreadPoolExecutor.java:1068 )

java.util.concurrent.ThreadPoolExecutor.runWorker ( ThreadPoolExecutor.java:1130 )

java.util.concurrent.ThreadPoolExecutor$Worker.run ( ThreadPoolExecutor.java:615 )

org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run ( TaskThread.java:61 )

java.lang.Thread.run ( Thread.java:744 )

正常的状态:线程处于WAITING状态,阻塞在试图从任务队列中取任务(LinkedBlockingQueue.take),这个任务队列指的是ThreadPoolExecutor的线程池启动的线程任务队列;

也就是说,这些线程都是空闲状态,在等着任务的到来呢!

并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()take(),前者将一个对象放到队列尾部,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。

随着压测的进行,发现几接口发起方的Linux机器中出现大量的CLOSE_WAIT连接

扫描二维码关注公众号,回复: 4464331 查看本文章

代码中使用HttpClient, 查了下:

        按apache httpclient的设计理念, 当http client 处于高并发时, 默认机制导致的CLOSE_WAIT会影响服务的可用性.

产生的原因:

服务器A会去请求服务器B上面的apache获取文件资源,正常情况下,如果请求成功,那么在抓取完资源后服务器A会主动发出关闭连接的请求,这个时候就是主动关闭连接,连接状态我们可以看到是TIME_WAIT。如果一旦发生异常呢?假设请求的资源服务器B上并不存在,那么这个时候就会由服务器B发出关闭连接的请求,服务器A就是被动的关闭了连接,如果服务器A被动关闭连接之后自己并没有释放连接,那就会造成CLOSE_WAIT的状态了。

那么服务器A为什么没有主动释放连接,而服务器B作为被调用放释放了连接,在这里被调用方我是使用Netty实现的Http Server.观察了probe发现一个现象,运行一段时间后所有的Thread都达到了一个相同的状态:

java.net.SocketInputStream.socketRead0 ( native code )

所有的请求竟然都卡在了逻辑中的发送Http请求的地方,好像是在等待结果响应,问题是都等了半个小时了还是这个状态,迟迟不释放,难道是不会自动超时吗???于是检查了下代码,发现这个POST请求真的没有设置socket_timeout超时时间。

/**
	 * 发送post请求(xml字符串)
	 * 
	 * @param url
	 * @param data
	 * @return
	 */
	public static String postXml(String url, String data) {
		String result = null;
		Request request = Request.Post(url);
		request.setHeader("Accept", "application/xml");
		request.setHeader("Content-Type", "text/xml;charset=utf-8");
		request.setHeader("Connection", "close");
		Integer so_timeout = SysConfig.getInstance().getPropertyInt("req_so_timeout",5000);
		Integer connenctTimeOut = SysConfig.getInstance().getPropertyInt("req_connect_timeout",5000);//设置连接超时时间,单位毫秒。
		request.connectTimeout(connenctTimeOut);
		request.socketTimeout(so_timeout);
		request.bodyString(data, ContentType.create("text/xml", Consts.UTF_8));
		try {
			logger.debug(Thread.currentThread().getName()+"【发送post请求(xml字符串)】开始:url=" + url + ", data=" + data);
			result = request.execute().returnContent().asString();
			logger.debug("【发送post请求(xml字符串)】成功:url=" + url + ", data=" + data
					+ ", result=" + result);
		} catch (Throwable e) {
			logger.error("【发送post请求(xml字符串)】失败:url=" + url + ", data=" + data
					+ ", result=" + result, e);
		}
		return result;
	}

果然,重新压测后发现连接都在超时后释放了,线程数也恢复正常了,我顶!

但是后来一个新问题出现了,就是为什么发送的http请求会超时呢?Netty服务端这个对应接口的逻辑也非常之简单没有任何IO操作就返回了!

在请求方机器上抓包:

tcpdump host 115.218.666.146 -s0  -w qianfenghaha.pcap

根据条件过滤wireshark:(参数中包含某个字符串)

             http.file_data  contains 1542790732067-5152

其实Netty服务端好像就没有收到请求,日志中也查不到,抓包显示是发送了,只不过没有响应罢了,一度怀疑是对方Linux文件句柄连接数满了,可惜对方也没怎么配合,到现在也没有找到问题的根本原因。

大家有什么怀疑的点?

 

----后来发现问题真够奇葩的,原来是服务端出现了线程安全问题,这个就不言而明了吧!

猜你喜欢

转载自blog.csdn.net/shengqianfeng/article/details/84395305
今日推荐