hbase开发问题-hbase-0.94.0的ServerCallable callTimeout处理有问题

读hbase-0.94.0的ServerCallable时,发现callTimeout的处理貌似有问题。

在withRetries方法执行时,调用shouldRetry(t)时;afterCall()还未调用,因此,此时的endTime值为错误的。

另外,shouldRetry中,this.callTimeout = ((int) (this.endTime - this.startTime));这一行代码是在是没有明白要干什么。


查看了hbase-0.94.18的代码,改进了callTimeout的处理。





 
protected int callTimeout;
  protected long startTime, endTime;

 

 

  public void beforeCall() {
    HBaseRPC.setRpcTimeout(this.callTimeout);
    this.startTime = System.currentTimeMillis();
  }

  public void afterCall() {
    HBaseRPC.resetRpcTimeout();
    this.endTime = System.currentTimeMillis();
  }

 

 

  public void shouldRetry(Throwable throwable) throws IOException {
    if (this.callTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
      if (throwable instanceof SocketTimeoutException
          || (this.endTime - this.startTime > this.callTimeout)) {
        throw (SocketTimeoutException) (SocketTimeoutException) new SocketTimeoutException(
            "Call to access row '" + Bytes.toString(row) + "' on table '"
                + Bytes.toString(tableName)
                + "' failed on socket timeout exception: " + throwable)
            .initCause(throwable);
      } else {
        this.callTimeout = ((int) (this.endTime - this.startTime));
      }
  }

 

 

public T withRetries()
  throws IOException, RuntimeException {
    Configuration c = getConnection().getConfiguration();
    final long pause = c.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    final int numRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
    for (int tries = 0; tries < numRetries; tries++) {
      try {
        beforeCall();
        connect(tries != 0);
        return call();
      } catch (Throwable t) {
        shouldRetry(t);
        t = translateException(t);
        if (t instanceof SocketTimeoutException ||
            t instanceof ConnectException ||
            t instanceof RetriesExhaustedException) {
          // if thrown these exceptions, we clear all the cache entries that
          // map to that slow/dead server; otherwise, let cache miss and ask
          // .META. again to find the new location
          HRegionLocation hrl = location;
          if (hrl != null) {
            getConnection().clearCaches(hrl.getHostnamePort());
          }
        }
        RetriesExhaustedException.ThrowableWithExtraContext qt =
          new RetriesExhaustedException.ThrowableWithExtraContext(t,
            System.currentTimeMillis(), toString());
        exceptions.add(qt);
        if (tries == numRetries - 1) {
          throw new RetriesExhaustedException(tries, exceptions);
        }
      } finally {
        afterCall();
      }
      try {
        Thread.sleep(ConnectionUtils.getPauseTime(pause, tries));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Giving up after tries=" + tries, e);
      }
    }
    return null;
  }

猜你喜欢

转载自zhang-xzhi-xjtu.iteye.com/blog/2045959
今日推荐