版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/namelessfighter/article/details/80547347
实时消费LogHub与实时查询LogSearch区别:
功能 | 日志查询(LogSearch) | 日志收集与消费(LogHub) |
---|---|---|
关键词查找 | 支持 | 不支持 |
小量数据读取 | 快 | 快 |
全量数据读取 | 慢(100条日志100ms,不建议这样使用) | 快 (1MB日志10ms,推荐方式) |
读取是否区分topic | 区分 | 不区分,只以shard作为标识 |
读取是否区分shard | 不区分,查询所有shard | 区分,单次读取需要指定shard |
费用 | 较高 | 低 |
适用场景 | 监控、问题调查等需要过滤数据的场景 | 流式计算、批量处理等全量处理场景 |
注意:一条日志数据,只能被消费一次
---------Maven依赖---------
<!-- 日志实时消费 -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.5</version>
</dependency>
---------LogHub日志处理
---------
// consumer group 的名字,不能为空,支持 [a-z][0-9] 和'_','-',长度在
// [3-63]字符,只能以小写字母和数字开头结尾
String consumerGroupName = "chisal";
// consumer 的名字,必须确保同一个 consumer group 下面的各个 consumer 不重名
String workerInstanceName = "tony";
String loghubEndPoint = S_AliyunAccount.LogEndpoint;
String project = S_AliyunAccount.LogProject;
String logStore = S_AliyunAccount.LogStore;
String accessId = S_AliyunAccount.AccessKeyId;
String accessKey = S_AliyunAccount.AccessSecretKey;
// 用于指出在服务端没有记录 shard 的 checkpoint 的情况下应该从什么位置消费 shard
// 如果服务端保存了有效的 checkpoint信息,那么这些取值不起任何作用,
// mCursorPosition 取值可以是 [BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一个,
// BEGIN_CURSOR 表示从 shard 中的第一条数据开始消费
// END_CURSOR 表示从 shard 中的当前时刻的最后一条数据开始消费
// SPECIAL_TIMER_CURSOR 和下面的 mLoghubCursorStartTime 配对使用,表示从特定的时刻开始消费数据。
LogHubCursorPosition cursorPosition = LogHubCursorPosition.BEGIN_CURSOR;
// LogHubCursorPosition cursorPosition =
// LogHubCursorPosition.SPECIAL_TIMER_CURSOR;
// int mLoghubCursorStartTime = 0;
// 轮询获取 loghub 数据的时间间隔,间隔越小,抓取越快,单位是毫秒,默认是
// DEFAULT_DATA_FETCH_INTERVAL_MS(200)
// 建议时间间隔 200ms 以上。
long mDataFetchIntervalMillis = 200;
// worker 向服务端汇报心跳的时间间隔,单位是毫秒,建议取值 10000ms。
long heartBeatIntervalMillis = 10000;
// 是否按序消费
boolean consumeInOrder = false;
LogHubConfig config = new LogHubConfig(consumerGroupName, workerInstanceName, loghubEndPoint, project,
logStore, accessId, accessKey, cursorPosition, heartBeatIntervalMillis, consumeInOrder);
// LogHubConfig config = new LogHubConfig(consumerGroupName, workerInstanceName, loghubEndPoint, project, logStore, accessId,
// accessKey, mLoghubCursorStartTime, heartBeatIntervalMillis, consumeInOrder);
ClientWorker worker = new ClientWorker(new LogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// thread 运行之后,client worker 会自动运行,ClientWorker 扩展了Runnable 接口。
thread.start();
Thread.sleep(60 * 60 * 1000);
// 调用 worker 的 shutdown 函数,退出消费实例,关联的线程也会自动停止。
worker.shutdown();
// ClientWorker 运行过程中会生成多个异步的 Task,shutdown 之后最好等待还在执行的 Task 安全退出,建议30s。
Thread.sleep(30 * 1000);
---------LogHub日志处理工厂 类 ---------
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
public class LogHubProcessorFactory implements ILogHubProcessorFactory
{
public ILogHubProcessor generatorProcessor()
{
// 生成一个消费实例
return new LogHubProcessor();
}
}
---------LogHub实际日志处理类 ---------
import java.util.List;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Logs.Log;
import com.aliyun.openservices.log.common.Logs.Log.Content;
import com.aliyun.openservices.log.common.Logs.LogGroup;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
public class LogHubProcessor implements ILogHubProcessor
{
private int mShardId;
// 记录上次持久化 check point 的时间
private long mLastCheckTime = 0;
public void initialize(int shardId)
{
mShardId = shardId;
}
// 消费数据的主逻辑
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker)
{
for (LogGroupData logGroup : logGroups)
{
LogGroup lg = logGroup.GetLogGroup();
System.out.println("mShardId:" + mShardId);
System.out.println("source:" + lg.getSource());
System.out.println("topic:" + lg.getTopic());
for (Log log : lg.getLogsList())
{
StringBuilder content = new StringBuilder();
content.append(log.getTime() + "\t");
for (Content cont : log.getContentsList())
{
content.append(cont.getKey() + "=" + cont.getValue() + "\t");
}
System.out.println(content.toString());
}
}
long curTime = System.currentTimeMillis();
// 每隔 60 秒,写一次 check point 到服务端,如果 60 秒内,worker crash,
// 新启动的 worker 会从上一个 checkpoint 其消费数据,有可能有重复数据
if (curTime - mLastCheckTime > 60 * 1000)
{
try
{
// 参数 true 表示立即将 checkpoint 更新到服务端,为 false 会将 checkpoint
// 缓存在本地,默认隔 60s
// 后台会将 checkpoint 刷新到服务端。
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
mLastCheckTime = curTime;
}
else
{
try
{
checkPointTracker.saveCheckPoint(false);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
}
// 返回空表示正常处理数据, 如果需要回滚到上个 check point 的点进行重试的话,可以 return
// checkPointTracker.getCheckpoint()
return null;
}
// 当 worker 退出的时候,会调用该函数,用户可以在此处做些清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker)
{
// 将消费断点保存到服务端。
try
{
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
}
}
---------LogHubConfig参数说明
---------
//worker 默认的拉取数据的时间间隔
public static final long DEFAULT_DATA_FETCH_INTERVAL_MS = 200;
//consumer group 的名字,不能为空,支持 [a-z][0-9] 和'_','-',长度在 [3-63]字符,只能以小写字母和数字开头结尾
private String mConsumerGroupName;
//consumer 的名字,必须确保同一个 consumer group 下面的各个 consumer 不重名
private String mWorkerInstanceName;
//loghub 数据接口地址
private String mLogHubEndPoint;
//项目名称
private String mProject;
//日志库名称
private String mLogStore;
//云账号的 access key id
private String mAccessId;
//云账号的 access key
private String mAccessKey;
//用于指出在服务端没有记录 shard 的 checkpoint 的情况下应该从什么位置消费 shard,如果服务端保存了有效的 checkpoint 信息,那么这些取值不起任何作用, mCursorPosition 取值可以是 [BEGIN_CURSOR, END_CURSOR, SPECIAL_TIMER_CURSOR]中的一个,BEGIN_CURSOR 表示从 shard 中的第一条数据开始消费,END_CURSOR 表示从 shard 中的当前时刻的最后一条数据开始消费,SPECIAL_TIMER_CURSOR 和下面的 mLoghubCursorStartTime 配对使用,表示从特定的时刻开始消费数据。
private LogHubCursorPosition mCursorPosition;
//当 mCursorPosition 取值为 SPECIAL_TIMER_CURSOR 时,指定消费时间,单位是秒。
private int mLoghubCursorStartTime = 0;
// 轮询获取 loghub 数据的时间间隔,间隔越小,抓取越快,单位是毫秒,默认是 DEFAULT_DATA_FETCH_INTERVAL_MS,建议时间间隔 200ms 以上。
private long mDataFetchIntervalMillis;
// worker 向服务端汇报心跳的时间间隔,单位是毫秒,建议取值 10000ms。
private long mHeartBeatIntervalMillis;
//是否按序消费
private boolean mConsumeInOrder;