Writing data to Druid with Tranquility

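Requirements

  • Start a single-node Druid instance
  • Write some data into it directly from a client, without going through Kafka, batch ingestion, or anything similar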

The approach I settled on

Druid single-node setup

Tranquility

  • Stop Druid with Ctrl+C

  • Download Tranquility; run the following from the Druid root directory:

curl http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.2.tgz -o tranquility-distribution-0.8.2.tgz
tar -xzf tranquility-distribution-0.8.2.tgz
mv tranquility-distribution-0.8.2 tranquility
  • Edit quickstart/tutorial/conf/tutorial-cluster.conf and remove the # in front of the tranquility-server line, so that it reads:
# Uncomment to use Tranquility Server
!p95 tranquility-server tranquility/bin/tranquility server -configFile quickstart/tutorial/conf/tranquility/wikipedia-server.json -Ddruid.extensions.loadList=[]
  • Start Druid together with Tranquility:
bin/supervise -c quickstart/tutorial/conf/tutorial-cluster.conf
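
Once the supervisor is running, one quick way to confirm that Tranquility Server accepts events might be to POST a single JSON event to it. The sketch below is only an illustration under assumptions that do not come from this post: that the server listens on port 8200 and exposes /v1/post/<dataSource> (the values used by the Druid quickstart's wikipedia-server.json), and the class and method names are made up for the example.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Instant;

final class TranquilityServerPing {
  // Assumption: endpoint layout taken from the Druid quickstart, not from this post.
  static String endpointFor(String host, int port, String dataSource) {
    return "http://" + host + ":" + port + "/v1/post/" + dataSource;
  }

  // Builds a tiny event with the same fields the Java example in this post sends.
  // No JSON escaping is done, so only pass plain page names.
  static String sampleEvent(Instant timestamp, String page, int added) {
    return "{\"timestamp\":\"" + timestamp + "\",\"page\":\"" + page + "\",\"added\":" + added + "}";
  }

  // Sends the JSON body with an HTTP POST and returns the response status code.
  static int post(String endpoint, String json) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setRequestProperty("Content-Type", "application/json");
    try (OutputStream out = conn.getOutputStream()) {
      out.write(json.getBytes(StandardCharsets.UTF_8));
    }
    try {
      return conn.getResponseCode();
    } finally {
      conn.disconnect();
    }
  }

  public static void main(String[] args) {
    String endpoint = endpointFor("localhost", 8200, "wikipedia");
    // Use the current time so the event is not rejected for being too old.
    String event = sampleEvent(Instant.now(), "foo", 1);
    try {
      System.out.println("HTTP status: " + post(endpoint, event));
    } catch (IOException e) {
      System.out.println("Could not reach " + endpoint + ": " + e.getMessage());
    }
  }
}
```

A connection error here would suggest the tranquility-server line is still commented out or the supervisor has not finished starting.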

Example Maven project

  • Project code: https://github.com/qiaojialin/TranquilityExample

  • Create a resources folder

  • Inside it, create a file named example.json:

    {
      "dataSources": [
        {
          "spec": {
            "dataSchema": {
              "dataSource": "wikipedia",
              "parser": {
                "type": "string",
                "parseSpec": {
                  "format": "json",
                  "timestampSpec": {
                    "column": "timestamp",
                    "format": "auto"
                  },
                  "dimensionsSpec": {
                    "dimensions": [
                      "page",
                      "language",
                      "user",
                      "unpatrolled",
                      "newPage",
                      "robot",
                      "anonymous",
                      "namespace",
                      "continent",
                      "country",
                      "region",
                      "city"
                    ],
                    "dimensionExclusions": [],
                    "spatialDimensions": []
                  }
                }
              },
              "metricsSpec": [
                {
                  "type": "count",
                  "name": "count"
                },
                {
                  "type": "doubleSum",
                  "name": "added",
                  "fieldName": "added"
                },
                {
                  "type": "doubleSum",
                  "name": "deleted",
                  "fieldName": "deleted"
                },
                {
                  "type": "doubleSum",
                  "name": "delta",
                  "fieldName": "delta"
                }
              ],
              "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "DAY",
                "queryGranularity": "NONE"
              }
            },
            "tuningConfig": {
              "type": "realtime",
              "maxRowsInMemory": 100000,
              "intermediatePersistPeriod": "PT10m",
              "windowPeriod": "PT10M"
            }
          }
        }
      ],
      "properties": {
        "zookeeper.connect": "localhost:2181",
        "druid.selectors.indexing.serviceName": "druid/overlord",
        "druid.discovery.curator.path": "/druid/discovery",
        "druidBeam.taskLocator": "overlord",
        "druidBeam.overlordPollPeriod": "PT5S"
      }
    }
    
    
  • Example code

    	
    import com.google.common.collect.ImmutableMap;
    import com.metamx.common.logger.Logger;
    import com.metamx.tranquility.config.DataSourceConfig;
    import com.metamx.tranquility.config.PropertiesBasedConfig;
    import com.metamx.tranquility.config.TranquilityConfig;
    import com.metamx.tranquility.druid.DruidBeams;
    import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    import com.metamx.tranquility.tranquilizer.Tranquilizer;
    import com.twitter.util.FutureEventListener;
    import org.joda.time.DateTime;
    import scala.runtime.BoxedUnit;
    
    import java.io.InputStream;
    import java.util.Map;
    
    public class JavaExample
    {
      private static final Logger log = new Logger(JavaExample.class);
    
      public static void main(String[] args)
      {
        // Read config from "example.json" on the classpath.
        final InputStream configStream = JavaExample.class.getClassLoader().getResourceAsStream("example.json");
        final TranquilityConfig<PropertiesBasedConfig> config = TranquilityConfig.read(configStream);
        final DataSourceConfig<PropertiesBasedConfig> wikipediaConfig = config.getDataSource("wikipedia");
        final Tranquilizer<Map<String, Object>> sender = DruidBeams.fromConfig(wikipediaConfig)
            .buildTranquilizer(wikipediaConfig.tranquilizerBuilder());
    
        sender.start();
    
        try {
          // Send 10000 objects
    
          for (int i = 0; i < 10000; i++) {
            // Build a sample event to send; make sure we use a current date
            final Map<String, Object> obj = ImmutableMap.<String, Object>of(
                "timestamp", new DateTime().toString(),
                "page", "foo",
                "added", i
            );
    
            // Asynchronously send event to Druid:
            sender.send(obj).addEventListener(
                new FutureEventListener<BoxedUnit>()
                {
                  @Override
                  public void onSuccess(BoxedUnit value)
                  {
                    log.info("Sent message: %s", obj);
                  }
    
                  @Override
                  public void onFailure(Throwable e)
                  {
                    if (e instanceof MessageDroppedException) {
                      log.warn(e, "Dropped message: %s", obj);
                    } else {
                      log.error(e, "Failed to send message: %s", obj);
                    }
                  }
                }
            );
          }
        }
        finally {
          sender.flush();
          sender.stop();
        }
      }
    }
    
    
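  • Add the dependencies (this step was the most painful)

    • Make sure the jackson Scala module and tranquility are both built for Scala 2.11 (the _2.11 artifact suffix)
    • jackson 2.9.2 works and 2.9.8 does not; I did not try other versions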
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.9.2</version>
    </dependency>
    <dependency>
      <groupId>com.metamx</groupId>
      <artifactId>java-util</artifactId>
      <version>0.27.9</version>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.4</version>
    </dependency>
    <dependency>
      <groupId>io.druid</groupId>
      <artifactId>tranquility-core_2.11</artifactId>
      <version>0.8.2</version>
    </dependency>
    
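  • The setup above was tested in a clean project. In another project of mine something conflicted again and methods could no longer be found at runtime, so I had to add three more dependencies before it worked (the first entry below repeats jackson-module-scala from above; the other three are new).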

    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.9.2</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.9.2</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.2</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.9.2</version>
    </dependency>
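
When a "method not found" error shows up after adding these, it can help to see which jar each class is actually loaded from. The following is a minimal sketch using only the JDK; the class name ClasspathProbe and its output strings are made up for illustration, and the probed class names are the usual entry points of the libraries listed above.

```java
import java.security.CodeSource;

final class ClasspathProbe {
  /** Describes where a class was loaded from, or reports that it is missing. */
  static String locate(String className) {
    try {
      // initialize=false: only look the class up, do not run its static initializers.
      Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
      CodeSource source = cls.getProtectionDomain().getCodeSource();
      if (source == null || source.getLocation() == null) {
        return "JDK runtime (no code source)";
      }
      return source.getLocation().toString();
    } catch (ClassNotFoundException | LinkageError e) {
      return "not on classpath";
    }
  }

  public static void main(String[] args) {
    String[] names = {
      "com.fasterxml.jackson.databind.ObjectMapper",
      "com.fasterxml.jackson.module.scala.DefaultScalaModule",
      "com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule",
      "com.metamx.tranquility.druid.DruidBeams"
    };
    for (String name : names) {
      System.out.println(name + " -> " + locate(name));
    }
  }
}
```

Run inside the conflicting project, the printed jar paths include the version in the file name, which shows whether another dependency is pulling in a jackson version other than 2.9.2. Running mvn dependency:tree gives the same information from the build side.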
    

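Other pitfalls

The dependencies were not even the worst part. Yesterday I could still write data, but today nothing gets in: every send ends in a MessageDroppedException. Posts online attribute this to the time window, but adjusting the timestamps did not help.

The real deal-breaker is that Tranquility only accepts events whose timestamps fall within a time window around the current time. That does not fit what I need, so I am giving up on it and moving to the Kafka indexing service.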
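
To make the window restriction concrete, here is a simplified model of it, assuming the rule described in the Tranquility documentation: an event is dropped when its timestamp is more than windowPeriod in the past or in the future. The real behavior also depends on segmentGranularity and on whether indexing tasks can be created, which this sketch does not model; the class and method names are made up for the example.

```java
import java.time.Duration;
import java.time.Instant;

final class WindowCheck {
  // True when the event time is at most windowPeriod away from "now", in either direction.
  static boolean isWithinWindow(Instant eventTime, Instant now, Duration windowPeriod) {
    Duration distance = Duration.between(eventTime, now).abs();
    return distance.compareTo(windowPeriod) <= 0;
  }

  public static void main(String[] args) {
    // Same value as "windowPeriod" in example.json.
    Duration window = Duration.parse("PT10M");
    Instant now = Instant.now();

    // Five minutes old: inside the window.
    System.out.println(isWithinWindow(now.minus(Duration.ofMinutes(5)), now, window)); // prints true
    // One day old: outside the window, so such an event would be dropped.
    System.out.println(isWithinWindow(now.minus(Duration.ofDays(1)), now, window));    // prints false
  }
}
```

Under this model, backfilling historical data through Tranquility is not possible, which matches the reason given above for switching to Kafka-based ingestion.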


Reposted from blog.csdn.net/qiaojialin/article/details/88663054