spring异步service中处理线程数限制详解

情况简介

spring项目,controller异步调用service的方法,产生大量并发。

具体业务:

前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。

处理方式:

controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。

本文主要知识点:

多线程同时(异步)调用方法后,开启新线程,并限制线程数量。

代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
@Service
public class LgtsAsyncServiceImpl {
  /** logger日志. */
  public static final Logger LOGGER = Logger.getLogger(LgtsAsyncServiceImpl2. class );
 
  private final BlockingQueue<Lgts> que = new LinkedBlockingQueue<>(); // 待翻译的队列
  private final AtomicInteger threadCnt = new AtomicInteger( 0 ); // 当前翻译中的线程数
  private final Vector<String> existsKey = new Vector<>(); // 保存已入队列的数据
  private final int maxThreadCnt = 2 ; // 允许同时执行的翻译线程数
  private static final int NUM_OF_EVERY_TIME = 50 ; // 每次提交的翻译条数
  private static final String translationFrom = "zh" ;
 
  @Async
  public void saveAsync(Lgts t) {
   if (Objects.isNull(t) || StringUtils.isAnyBlank(t.getGco(), t.getCode())) {
    return ;
   }
   offer(t);
   save();
   return ;
  }
 
  private boolean offer(Lgts t) {
   String key = t.getGco() + "-" + t.getCode();
   if (!existsKey.contains(key)) {
    existsKey.add(key);
    boolean result = que.offer(t);
    // LOGGER.trace("待翻译文字[" + t.getGco() + ":" + t.getCode() + "]加入队列结果[" + result
    // + "],队列中数据总个数:" + que.size());
    return result;
   }
   return false ;
  }
 
  @Autowired
  private LgtsService lgtsService;
 
  private void save() {
   int cnt = threadCnt.incrementAndGet(); // 当前线程数+1
   if (cnt > maxThreadCnt) {
    // 已启动的线程大于设置的最大线程数直接丢弃
    threadCnt.decrementAndGet(); // +1的线程数再-回去
    return ;
   }
   GwallUser user = UserUtils.getUser();
   Thread thr = new Thread() {
    public void run() {
     long sleepTime = 30000l;
     UserUtils.setUser(user);
     boolean continueFlag = true ;
     int maxContinueCnt = 5 ; // 最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁
     int continueCnt = 0 ; // 连续休眠次数
 
     while (continueFlag) { // 队列不为空时执行
      if (Objects.isNull(que.peek())) {
       try {
        if (continueCnt > maxContinueCnt) {
         // 连续休眠次数达到最大连续休眠次数,当前线程将销毁。
         continueFlag = false ;
         continue ;
        }
        // 队列为空,准备休眠
        Thread.sleep(sleepTime);
        continueCnt++;
        continue ;
       } catch (InterruptedException e) {
        // 休眠失败,无需处理
        e.printStackTrace();
       }
      }
      continueCnt = 0 ; // 重置连续休眠次数为0
 
      List<Lgts> params = new ArrayList<>();
      int totalCnt = que.size();
      que.drainTo(params, NUM_OF_EVERY_TIME);
      StringBuilder utf8q = new StringBuilder();
      String code = "" ;
      List<Lgts> needRemove = new ArrayList<>();
      for (Lgts lgts : params) {
       if (StringUtils.isAnyBlank(code)) {
        code = lgts.getCode();
       }
       // 移除existsKey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去
       String key = lgts.getGco() + "-" + lgts.getCode();
       existsKey.remove(key);
 
       if (!code.equalsIgnoreCase(lgts.getCode())) { // 要翻译的目标语言与当前列表中的第一个不一致
        offer(lgts); // 重新将待翻译的语言放回队列
        needRemove.add(lgts);
        continue ;
       }
       utf8q.append(lgts.getGco()).append( "\n" );
      }
      params.removeAll(needRemove);
      LOGGER.debug( "队列中共" + totalCnt + " 个,获取" + params.size() + " 个符合条件的待翻译内容,编码:" + code);
      String to = "en" ;
      if (StringUtils.isAnyBlank(utf8q, to)) {
       LOGGER.warn( "调用翻译出错,未找到[" + code + "]对应的百度编码。" );
       continue ;
      }
      Map<String, String> result = getBaiduTranslation(utf8q.toString(), translationFrom, to);
      if (Objects.isNull(result) || result.isEmpty()) { // 把没有获取到翻译结果的重新放回队列
       for (Lgts lgts : params) {
        offer(lgts);
       }
       LOGGER.debug( "本次翻译结果为空。" );
       continue ;
      }
      int sucessCnt = 0 , ignoreCnt = 0 ;
      for (Lgts lgts : params) {
       lgts.setBdcode(to);
       String gna = result.get(lgts.getGco());
       if (StringUtils.isAnyBlank(gna)) {
        offer(lgts); // 重新将待翻译的语言放回队列
        continue ;
       }
       lgts.setStat( 1 );
       lgts.setGna(gna);
       int saveResult = lgtsService.saveIgnore(lgts);
       if ( 0 == saveResult) {
        ignoreCnt++;
       } else {
        sucessCnt++;
       }
      }
      LOGGER.debug( "待翻译个数:" + params.size() + ",翻译成功个数:" + sucessCnt + ",已存在并忽略个数:" + ignoreCnt);
     }
     threadCnt.decrementAndGet(); // 运行中的线程数-1
     distory(); // 清理数据,必须放在方法最后,否则distory中的判断需要修改
    }
 
    /**
     * 如果是最后一个线程,清空队列和existsKey中的数据
     */
    private void distory() {
     if ( 0 == threadCnt.get()) {
      // 最后一个线程退出时,执行清理操作
      existsKey.clear();
      que.clear();
     }
    }
   };
   thr.setDaemon( true ); // 守护线程,如果主线程执行完毕,则此线程会自动销毁
   thr.setName( "baidufanyi-" + RandomUtils.nextInt( 1000 , 9999 ));
   thr.start(); // 启动插入线程
  }
 
  /**
   * 百度翻译
   *
   * @param utf8q
   *   待翻译的字符串,需要utf8格式的
   * @param from
   *   百度翻译语言列表中的代码
   *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
   * @param to
   *   百度翻译语言列表中的代码
   *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
   * @return 翻译结果
   */
  private Map<String, String> getBaiduTranslation(String utf8q, String from, String to) {
   Map<String, String> result = new HashMap<>();
   if (StringUtils.isAnyBlank(baiduurlStr)) {
    LOGGER.warn( "百度翻译API接口URL相关参数为空!" );
    return result;
   }
   Map<String, String> params = buildParams(utf8q, from, to);
   if (params.isEmpty()) {
    return result;
   }
 
   String sendUrl = getUrlWithQueryString(baiduurlStr, params);
   try {
    HttpClient httpClient = new HttpClient();
    httpClient.setMethod( "GET" );
    String remoteResult = httpClient.pub(sendUrl, "" );
    result = convertRemote(remoteResult);
   } catch (Exception e) {
    LOGGER.info( "百度翻译API返回结果异常!" , e);
   }
   return result;
  }
 
  private Map<String, String> convertRemote(String remoteResult) {
   Map<String, String> result = new HashMap<>();
   if (StringUtils.isBlank(remoteResult)) {
    return result;
   }
   JSONObject jsonObject = JSONObject.parseObject(remoteResult);
   JSONArray trans_result = jsonObject.getJSONArray( "trans_result" );
   if (Objects.isNull(trans_result) || trans_result.isEmpty()) {
    return result;
   }
   for (Object object : trans_result) {
    JSONObject trans = (JSONObject) object;
    result.put(trans.getString( "src" ), trans.getString( "dst" ));
   }
   return result;
  }
 
  private Map<String, String> buildParams(String utf8q, String from, String to) {
   if (StringUtils.isBlank(from)) {
    from = "auto" ;
   }
   Map<String, String> params = new HashMap<String, String>();
   String skStr = "sk" ;
   String appidStr = "appid" ;
   if (StringUtils.isAnyBlank(skStr, appidStr)) {
    LOGGER.warn( "百度翻译API接口相关参数为空!" );
    return params;
   }
 
   params.put( "q" , utf8q);
   params.put( "from" , from);
   params.put( "to" , to);
 
   params.put( "appid" , appidStr);
 
   // 随机数
   String salt = String.valueOf(System.currentTimeMillis());
   params.put( "salt" , salt);
 
   // 签名
   String src = appidStr + utf8q + salt + skStr; // 加密前的原文
   params.put( "sign" , MD5Util.md5Encrypt(src).toLowerCase());
   return params;
  }
 
  public static String getUrlWithQueryString(String url, Map<String, String> params) {
   if (params == null ) {
    return url;
   }
 
   StringBuilder builder = new StringBuilder(url);
   if (url.contains( "?" )) {
    builder.append( "&" );
   } else {
    builder.append( "?" );
   }
 
   int i = 0 ;
   for (String key : params.keySet()) {
    String value = params.get(key);
    if (value == null ) { // 过滤空的key
     continue ;
    }
 
    if (i != 0 ) {
     builder.append( '&' );
    }
 
    builder.append(key);
    builder.append( '=' );
    builder.append(encode(value));
 
    i++;
   }
 
   return builder.toString();
  }
 
  /**
   * 对输入的字符串进行URL编码, 即转换为%20这种形式
   *
   * @param input
   *   原文
   * @return URL编码. 如果编码失败, 则返回原文
   */
  public static String encode(String input) {
   if (input == null ) {
    return "" ;
   }
 
   try {
    return URLEncoder.encode(input, "utf-8" );
   } catch (UnsupportedEncodingException e) {
    e.printStackTrace();
   }
 
   return input;
  }
}

猜你喜欢

转载自www.cnblogs.com/xwc245ag/p/11469903.html