重点技术-20170713-阿里云-日志服务-实时消费LogHub

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/namelessfighter/article/details/80547347

实时消费LogHub与实时查询LogSearch区别

功能 日志查询(LogSearch) 日志收集与消费(LogHub)
关键词查找 支持 不支持
小量数据读取
全量数据读取 慢(100条日志100ms,不建议这样使用) 快 (1MB日志10ms,推荐方式)
读取是否区分topic 区分 不区分,只以shard作为标识
读取是否区分shard 不区分,查询所有shard 区分,单次读取需要指定shard
费用 较高
适用场景 监控、问题调查等需要过滤数据的场景 流式计算、批量处理等全量处理场景

注意:一条日志数据,只能被消费一次

---------Maven依赖---------
<!-- 日志实时消费 -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.5</version>
</dependency>

---------LogHub日志处理 ---------
// consumer group 的名字,不能为空,支持 [a-z][0-9] 和'_','-',长度在
// [3-63]字符,只能以小写字母和数字开头结尾
String consumerGroupName = "chisal";
// consumer 的名字,必须确保同一个 consumer group 下面的各个 consumer 不重名
String workerInstanceName = "tony";

String loghubEndPoint = S_AliyunAccount.LogEndpoint;
String project = S_AliyunAccount.LogProject;
String logStore = S_AliyunAccount.LogStore;
String accessId = S_AliyunAccount.AccessKeyId;
String accessKey = S_AliyunAccount.AccessSecretKey;

// 用于指出在服务端没有记录 shard 的 checkpoint 的情况下应该从什么位置消费 shard
// 如果服务端保存了有效的 checkpoint信息,那么这些取值不起任何作用,
// mCursorPosition 取值可以是 [BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一个,
// BEGIN_CURSOR 表示从 shard 中的第一条数据开始消费
// END_CURSOR 表示从 shard 中的当前时刻的最后一条数据开始消费
// SPECIAL_TIMER_CURSOR 和下面的 mLoghubCursorStartTime 配对使用,表示从特定的时刻开始消费数据。
LogHubCursorPosition cursorPosition = LogHubCursorPosition.BEGIN_CURSOR;

// LogHubCursorPosition cursorPosition =
// LogHubCursorPosition.SPECIAL_TIMER_CURSOR;
// int mLoghubCursorStartTime = 0;

// 轮询获取 loghub 数据的时间间隔,间隔越小,抓取越快,单位是毫秒,默认是
// DEFAULT_DATA_FETCH_INTERVAL_MS(200)
// 建议时间间隔 200ms 以上。
long mDataFetchIntervalMillis = 200;

// worker 向服务端汇报心跳的时间间隔,单位是毫秒,建议取值 10000ms。
long heartBeatIntervalMillis = 10000;
// 是否按序消费
boolean consumeInOrder = false;

LogHubConfig config = new LogHubConfig(consumerGroupName, workerInstanceName, loghubEndPoint, project,
logStore, accessId, accessKey, cursorPosition, heartBeatIntervalMillis, consumeInOrder);
// LogHubConfig config = new LogHubConfig(consumerGroupName, workerInstanceName, loghubEndPoint, project, logStore, accessId,
// accessKey, mLoghubCursorStartTime, heartBeatIntervalMillis, consumeInOrder);

ClientWorker worker = new ClientWorker(new LogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// thread 运行之后,client worker 会自动运行,ClientWorker 扩展了Runnable 接口。
thread.start();
Thread.sleep(60 * 60 * 1000);
// 调用 worker 的 shutdown 函数,退出消费实例,关联的线程也会自动停止。
worker.shutdown();
// ClientWorker 运行过程中会生成多个异步的 Task,shutdown 之后最好等待还在执行的 Task 安全退出,建议30s。
Thread.sleep(30 * 1000);

---------LogHub日志处理工厂
---------
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

public class LogHubProcessorFactory implements ILogHubProcessorFactory 
{
  public ILogHubProcessor generatorProcessor()
  {   
      // 生成一个消费实例
      return new LogHubProcessor();
  }
}

---------LogHub实际日志处理类
---------
import java.util.List;

import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Logs.Log;
import com.aliyun.openservices.log.common.Logs.Log.Content;
import com.aliyun.openservices.log.common.Logs.LogGroup;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;

public class LogHubProcessor implements ILogHubProcessor
{
private int mShardId;
// 记录上次持久化 check point 的时间
private long mLastCheckTime = 0;

public void initialize(int shardId)
{
mShardId = shardId;
}

// 消费数据的主逻辑
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker)
{
for (LogGroupData logGroup : logGroups)
{
LogGroup lg = logGroup.GetLogGroup();
System.out.println("mShardId:" + mShardId);
System.out.println("source:" + lg.getSource());
System.out.println("topic:" + lg.getTopic());
for (Log log : lg.getLogsList())
{
StringBuilder content = new StringBuilder();
content.append(log.getTime() + "\t");
for (Content cont : log.getContentsList())
{
content.append(cont.getKey() + "=" + cont.getValue() + "\t");
}
System.out.println(content.toString());
}
}
long curTime = System.currentTimeMillis();
// 每隔 60 秒,写一次 check point 到服务端,如果 60 秒内,worker crash,
// 新启动的 worker 会从上一个 checkpoint 其消费数据,有可能有重复数据
if (curTime - mLastCheckTime > 60 * 1000)
{
try
{
// 参数 true 表示立即将 checkpoint 更新到服务端,为 false 会将 checkpoint
// 缓存在本地,默认隔 60s
// 后台会将 checkpoint 刷新到服务端。
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
mLastCheckTime = curTime;
}
else
{
try
{
checkPointTracker.saveCheckPoint(false);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
}
// 返回空表示正常处理数据, 如果需要回滚到上个 check point 的点进行重试的话,可以 return
// checkPointTracker.getCheckpoint()
return null;
}

// 当 worker 退出的时候,会调用该函数,用户可以在此处做些清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker)
{
// 将消费断点保存到服务端。
try
{
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
}
}

---------LogHubConfig参数说明 ---------
     //worker 默认的拉取数据的时间间隔
    public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
    //consumer group 的名字,不能为空,支持 [a-z][0-9] 和'_','-',长度在 [3-63]字符,只能以小写字母和数字开头结尾
    private String mConsumerGroupName;
    //consumer 的名字,必须确保同一个 consumer group 下面的各个 consumer 不重名
    private String mWorkerInstanceName;
    //loghub 数据接口地址
    private String mLogHubEndPoint;
    //项目名称
    private String mProject;
    //日志库名称
    private String mLogStore;
    //云账号的 access key id
    private String mAccessId;
    //云账号的 access key
    private String mAccessKey;
    //用于指出在服务端没有记录 shard 的 checkpoint 的情况下应该从什么位置消费 shard,如果服务端保存了有效的 checkpoint 信息,那么这些取值不起任何作用, mCursorPosition 取值可以是 [BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一个,BEGIN_CURSOR 表示从 shard 中的第一条数据开始消费,END_CURSOR 表示从 shard 中的当前时刻的最后一条数据开始消费,SPECIAL_TIMER_CURSOR 和下面的 mLoghubCursorStartTime 配对使用,表示从特定的时刻开始消费数据。
    private LogHubCursorPosition mCursorPosition;
    //当 mCursorPosition 取值为 SPECIAL_TIMER_CURSOR 时,指定消费时间,单位是秒。
    private int  mLoghubCursorStartTime = 0;
    // 轮询获取 loghub 数据的时间间隔,间隔越小,抓取越快,单位是毫秒,默认是 DEFAULT_DATA_FETCH_INTERVAL_MS,建议时间间隔 200ms 以上。
    private long mDataFetchIntervalMillis;
    // worker 向服务端汇报心跳的时间间隔,单位是毫秒,建议取值 10000ms。
    private long mHeartBeatIntervalMillis;
    //是否按序消费
    private boolean mConsumeInOrder; 

猜你喜欢

转载自blog.csdn.net/namelessfighter/article/details/80547347