转角遇见DataX
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
阿里datax网址(本文摘自阿里datax介绍):https://github.com/alibaba/DataX
Features
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
Support Data Channels
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
---|---|---|---|---|
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
Oracle | √ | √ | 读 、写 | |
SQLServer | √ | √ | 读 、写 | |
PostgreSQL | √ | √ | 读 、写 | |
DRDS | √ | √ | 读 、写 | |
通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 | |
阿里云数仓数据存储 | ODPS | √ | √ | 读 、写 |
ADS | √ | 写 | ||
OSS | √ | √ | 读 、写 | |
OCS | √ | √ | 读 、写 | |
NoSQL数据存储 | OTS | √ | √ | 读 、写 |
Hbase0.94 | √ | √ | 读 、写 | |
Hbase1.1 | √ | √ | 读 、写 | |
Phoenix4.x | √ | √ | 读 、写 | |
Phoenix5.x | √ | √ | 读 、写 | |
MongoDB | √ | √ | 读 、写 | |
Hive | √ | √ | 读 、写 | |
无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
FTP | √ | √ | 读 、写 | |
HDFS | √ | √ | 读 、写 | |
Elasticsearch | √ | 写 | ||
时间序列数据库 | OpenTSDB | √ | 读 | |
TSDB | √ | 写 |
mysql的读操作
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print":true
}
}
}
]
}
}
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"querySql": [
"select db_id,on_line_flag from db_info where db_id < 10;"
],
"jdbcUrl": [
"jdbc:mysql://bad_ip:3306/database",
"jdbc:mysql://127.0.0.1:bad_port/database",
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false,
"encoding": "UTF-8"
}
}
}
]
}
}
splitPk
-
- 描述:MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DATAX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。 目前splitPk仅支持整形数据切分,`不支持浮点、字符串、日期等其他类型`。如果用户指定其他非支持类型,MysqlReader将报错!
如果splitPk不填写,包括不提供splitPk或者splitPk值为空,DataX视作使用单通道同步该表数据。
- 必选:否
- 默认值:空
querySQL所
描述:在有些业务场景下,在这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL当用户配置了这一项之后,数据X系统就会忽略表,列这些配置类型,直接使用此配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用从table_a中选择a,b加入table_a.id = table_b.id上的table_b
当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySQL所优先级大于表,列,其中选项。
必选:否
默认值:无
类型转换
目前MysqlReader支持大部分Mysql类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
下面列出MysqlReader针对Mysql类型转换列表:
DataX 内部类型 | Mysql 数据类型 |
---|---|
Long | int, tinyint, smallint, mediumint, int, bigint |
Double | float, double, decimal |
String | varchar, char, tinytext, text, mediumtext, longtext, year |
Date | date, datetime, timestamp, time |
Boolean | bit, bool |
Bytes | tinyblob, mediumblob, blob, longblob, varbinary |
请注意:
- 除上述罗列字段类型外,其他类型均不支持`。
- `tinyint(1) DataX视作为整形`。
- `year DataX视作为字符串类型`
- `bit DataX属于未定义行为`。
mysql的写操作
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 1000
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from test"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
"table": [
"test"
]
}
]
}
}
}
]
}
}
column
-
描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: “column”: [“id”,“name”,“age”]。如果要依次写入全部列,使用表示, 例如: “column”: [""]。
**column配置项必须指定,不能留空!** 注意:1、我们强烈不推荐你这样配置,因为当你目的表字段个数、类型等有改动时,你的任务可能运行不正确或者失败 2、 column 不能配置任何常量值
-
必选:是
-
默认值:否
类似 MysqlReader ,目前 MysqlWriter 支持大部分 Mysql 类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
下面列出 MysqlWriter 针对 Mysql 类型转换列表:
DataX 内部类型 | Mysql 数据类型 |
---|---|
Long | int, tinyint, smallint, mediumint, int, bigint, year |
Double | float, double, decimal |
String | varchar, char, tinytext, text, mediumtext, longtext |
Date | date, datetime, timestamp, time |
Boolean | bit, bool |
Bytes | tinyblob, mediumblob, blob, longblob, varbinary |
bit类型目前是未定义类型转换
DataX HdfsReader 插件文档
1 快速介绍
HdfsReader提供了读取分布式文件系统数据存储的能力。在底层实现上,HdfsReader获取分布式文件系统上文件的数据,并转换为DataX传输协议传递给Writer。
目前HdfsReader支持的文件格式有textfile(text)、orcfile(orc)、rcfile(rc)、sequence file(seq)和普通逻辑二维表(csv)类型格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表。
HdfsReader需要Jdk1.7及以上版本的支持。
2功能和限制
HdfsReader实现了从Hadoop分布式文件系统Hdfs中读取文件数据并转为DataX协议的功能。textfile是Hive建表时默认使用的存储格式,数据不做压缩,本质上textfile就是以文本的形式将数据存放在hdfs中,对于DataX而言,HdfsReader实现上类比TxtFileReader,有诸多相似之处。orcfile,它的全名是Optimized Row Columnar file,是对RCFile做了优化。据官方文档介绍,这种文件格式可以提供一种高效的方法来存储Hive数据。HdfsReader利用Hive提供的OrcSerde类,读取解析orcfile文件的数据。目前HdfsReader支持的功能如下:
- 支持textfile、orcfile、rcfile、sequence file和csv格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。
- 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量
- 支持递归读取、支持正则表达式("*“和”?")。
- 支持orcfile数据压缩,目前支持SNAPPY,ZLIB两种压缩方式。
- 多个File可以支持并发读取。
- 支持sequence file数据压缩,目前支持lzo压缩方式。
- csv类型支持压缩格式有:gzip、bz2、zip、lzo、lzo_deflate、snappy。
- 目前插件中Hive版本为1.1.1,Hadoop版本为2.7.1(Apache[为适配JDK1.7],在Hadoop 2.5.0, Hadoop 2.6.0 和Hive 1.2.0测试环境中写入正常;其它版本需后期进一步测试;
- 支持kerberos认证(注意:如果用户需要进行kerberos认证,那么用户使用的Hadoop集群版本需要和hdfsreader的Hadoop版本保持一致,如果高于hdfsreader的Hadoop版本,不保证kerberos认证有效)
我们暂时不能做到:
- 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。二期考虑支持。
- 目前还不支持hdfs HA;
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/mytable01/*",
"defaultFS": "hdfs://xxx:port",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "boolean"
},
{
"type": "string",
"value": "hello"
},
{
"index": 2,
"type": "double"
}
],
"fileType": "orc",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
-
path
-
描述:要读取的文件路径,如果要读取多个文件,可以使用正则表达式"*",注意这里可以支持填写多个路径。。
当指定单个Hdfs文件,HdfsReader暂时只能使用单线程进行数据抽取。二期考虑在非压缩文件情况下针对单个File可以进行多线程并发读取。
当指定多个Hdfs文件,HdfsReader支持使用多线程进行数据抽取。线程并发数通过通道数指定。
当指定通配符,HdfsReader尝试遍历出多个文件信息。例如: 指定/代表读取/目录下所有的文件,指定/bazhen/*代表读取bazhen目录下游所有的文件。HdfsReader目前只支持"“和”?"作为文件通配符。
特别需要注意的是,DataX会将一个作业下同步的所有的文件视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。并且提供给DataX权限可读。
-
必选:是
-
默认值:无
-
-
defaultFS
-
描述:Hadoop hdfs文件系统namenode节点地址。
目前HdfsReader已经支持Kerberos认证,如果需要权限认证,则需要用户配置kerberos参数,见下面
-
必选:是
-
默认值:无
-
-
fileType
-
描述:文件的类型,目前只支持用户配置为"text"、“orc”、“rc”、“seq”、“csv”。
text表示textfile文件格式
orc表示orcfile文件格式
rc表示rcfile文件格式
seq表示sequence file文件格式
csv表示普通hdfs文件格式(逻辑二维表)
特别需要注意的是,HdfsReader能够自动识别文件是orcfile、textfile或者还是其它类型的文件,但该项是必填项,HdfsReader则会只读取用户配置的类型的文件,忽略路径下其他格式的文件
另外需要注意的是,由于textfile和orcfile是两种完全不同的文件格式,所以HdfsReader对这两种文件的解析方式也存在差异,这种差异导致hive支持的复杂复合类型(比如map,array,struct,union)在转换为DataX支持的String类型时,转换的结果格式略有差异,比如以map类型为例:
orcfile map类型经hdfsreader解析转换成datax支持的string类型后,结果为"{job=80, team=60, person=70}"
textfile map类型经hdfsreader解析转换成datax支持的string类型后,结果为"job:80,team:60,person:70"
从上面的转换结果可以看出,数据本身没有变化,但是表示的格式略有差异,所以如果用户配置的文件路径中要同步的字段在Hive中是复合类型的话,建议配置统一的文件格式。
如果需要统一复合类型解析出来的格式,我们建议用户在hive客户端将textfile格式的表导成orcfile格式的表
-
必选:是
-
默认值:无
-
-
column
-
描述:读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。
默认情况下,用户可以全部按照String类型读取数据,配置如下:
"column": ["*"]
用户可以指定Column字段信息,配置如下:
-
{ “type”: “long”, “index”: 0 //从本地文件文本第一列获取int字段 }, { “type”: “string”, “value”: “alibaba” //HdfsReader内部生成alibaba的字符串字段作为当前字段 } ```
对于用户指定Column信息,type必须填写,index/value必须选择其一。
* 必选:是 <br />
* 默认值:全部按照string类型读取 <br />
-
fieldDelimiter
- 描述:读取的字段分隔符
另外需要注意的是,HdfsReader在读取textfile数据时,需要指定字段分割符,如果不指定默认为’,’,HdfsReader在读取orcfile时,用户无需指定字段分割符
- 必选:否
- 默认值:,
-
encoding
- 描述:读取文件的编码配置。
- 必选:否
- 默认值:utf-8
-
nullFormat
-
描述:文本文件中无法使用标准字符串定义null(空指针),DataX提供nullFormat定义哪些字符串可以表示为null。
例如如果用户配置: nullFormat:"\N",那么如果源头数据是"\N",DataX视作null字段。
-
必选:否
-
默认值:无
compress
- 描述:当fileType(文件类型)为csv下的文件压缩方式,目前仅支持 gzip、bz2、zip、lzo、lzo_deflate、hadoop-snappy、framing-snappy压缩;值得注意的是,lzo存在两种压缩格式:lzo和lzo_deflate,用户在配置的时候需要留心,不要配错了;另外,由于snappy目前没有统一的stream format,datax目前只支持最主流的两种:hadoop-snappy(hadoop上的snappy stream format)和framing-snappy(google建议的snappy stream format);orc文件类型下无需填写。
- 必选:否
- 默认值:无
-
hadoopConfig
-
描述:hadoopConfig里可以配置与Hadoop相关的一些高级参数,比如HA的配置。
"hadoopConfig":{ "dfs.nameservices": "testDfs", "dfs.ha.namenodes.testDfs": "namenode1,namenode2", "dfs.namenode.rpc-address.aliDfs.namenode1": "", "dfs.namenode.rpc-address.aliDfs.namenode2": "", "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }
3.4 按分区读取
Hive在建表的时候,可以指定分区partition,例如创建分区partition(day=“20150820”,hour=“09”),对应的hdfs文件系统中,相应的表的目录下则会多出/20150820和/09两个目录,且/20150820是/09的父目录。了解了分区都会列成相应的目录结构,在按照某个分区读取某个表所有数据时,则只需配置好json中path的值即可。
比如需要读取表名叫mytable01下分区day为20150820这一天的所有数据,则配置如下:
"path": "/user/hive/warehouse/mytable01/20150820/*"
DataX HdfsWriter 插件文档
-
(1)、目前HdfsWriter仅支持textfile和orcfile两种格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表;
-
(2)、由于HDFS是文件系统,不存在schema的概念,因此不支持对部分列写入;
-
(3)、目前仅支持与以下Hive数据类型: 数值型:TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE 字符串类型:STRING,VARCHAR,CHAR 布尔类型:BOOLEAN 时间类型:DATE,TIMESTAMP 目前不支持:decimal、binary、arrays、maps、structs、union类型;
-
(4)、对于Hive分区表目前仅支持一次写入单个分区;
-
(5)、对于textfile需用户保证写入hdfs文件的分隔符与在Hive上创建表时的分隔符一致,从而实现写入hdfs数据与Hive表字段关联;
-
(6)、HdfsWriter实现过程是:首先根据用户指定的path,创建一个hdfs文件系统上不存在的临时目录,创建规则:path_随机;然后将读取的文件写入这个临时目录;全部写入后再将这个临时目录下的文件移动到用户指定目录(在创建文件时保证文件名不重复); 最后删除临时目录。如果在中间过程发生网络中断等情况造成无法与hdfs建立连接,需要用户手动删除已经写入的文件和临时目录。
-
(7)、目前插件中Hive版本为1.1.1,Hadoop版本为2.7.1(Apache[为适配JDK1.7],在Hadoop 2.5.0, Hadoop 2.6.0 和Hive 1.2.0测试环境中写入正常;其它版本需后期进一步测试;
-
(8)、目前HdfsWriter支持Kerberos认证(注意:如果用户需要进行kerberos认证,那么用户使用的Hadoop集群版本需要和hdfsreader的Hadoop版本保持一致,如果高于hdfsreader的Hadoop版本,不保证kerberos认证有效)
官网案例
{
"setting": {},
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"],
"encoding": "UTF-8",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "long"
},
{
"index": 3,
"type": "long"
},
{
"index": 4,
"type": "DOUBLE"
},
{
"index": 5,
"type": "DOUBLE"
},
{
"index": 6,
"type": "STRING"
},
{
"index": 7,
"type": "STRING"
},
{
"index": 8,
"type": "STRING"
},
{
"index": 9,
"type": "BOOLEAN"
},
{
"index": 10,
"type": "date"
},
{
"index": 11,
"type": "date"
}
],
"fieldDelimiter": "\t"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://xxx:port",
"fileType": "orc",
"path": "/user/hive/warehouse/writerorc.db/orcfull",
"fileName": "xxxx",
"column": [
{
"name": "col1",
"type": "TINYINT"
},
{
"name": "col2",
"type": "SMALLINT"
},
{
"name": "col3",
"type": "INT"
},
{
"name": "col4",
"type": "BIGINT"
},
{
"name": "col5",
"type": "FLOAT"
},
{
"name": "col6",
"type": "DOUBLE"
},
{
"name": "col7",
"type": "STRING"
},
{
"name": "col8",
"type": "VARCHAR"
},
{
"name": "col9",
"type": "CHAR"
},
{
"name": "col10",
"type": "BOOLEAN"
},
{
"name": "col11",
"type": "date"
},
{
"name": "col12",
"type": "TIMESTAMP"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"compress":"NONE"
}
}
}
]
}
}
-
defaultFS
- 描述:Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:端口;例如:hdfs://127.0.0.1:9000
- 必选:是
- 默认值:无
-
fileType
-
描述:文件的类型,目前只支持用户配置为"text"或"orc"。
text表示textfile文件格式
orc表示orcfile文件格式
-
必选:是
-
默认值:无
-
-
path
- 描述:存储到Hadoop hdfs文件系统的路径信息,HdfsWriter会根据并发配置在Path目录下写入多个文件。为与hive表关联,请填写hive表在hdfs上的存储路径。例:Hive上设置的数据仓库的存储路径为:/user/hive/warehouse/ ,已建立数据库:test,表:hello;则对应的存储路径为:/user/hive/warehouse/test.db/hello
- 必选:是
- 默认值:无
-
fileName
- 描述:HdfsWriter写入时的文件名,实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。
- 必选:是
- 默认值:无
-
column
-
描述:写入数据的字段,不支持对部分列写入。为与hive中表关联,需要指定表中所有字段名和字段类型,其中:name指定字段名,type指定字段类型。
用户可以指定Column字段信息,配置如下:
"column": [ { "name": "userName", "type": "string" }, { "name": "age", "type": "long" } ]
-
必选:是
-
默认值:无
-
-
writeMode
- 描述:hdfswriter写入前数据清理处理模式:
- append,写入前不做任何处理,DataX hdfswriter直接使用filename写入,并保证文件名不冲突。
- nonConflict,如果目录下有fileName前缀的文件,直接报错。
- 必选:是
- 默认值:无
- 描述:hdfswriter写入前数据清理处理模式:
-
fieldDelimiter
- 描述:hdfswriter写入时的字段分隔符,需要用户保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据
- 必选:是
- 默认值:无
-
compress
- 描述:hdfs文件压缩类型,默认不填写意味着没有压缩。其中:text类型文件支持压缩类型有gzip、bzip2;orc类型文件支持的压缩类型有NONE、SNAPPY(需要用户安装SnappyCodec)。
- 必选:否
- 默认值:无压缩
-
hadoopConfig
-
描述:hadoopConfig里可以配置与Hadoop相关的一些高级参数,比如HA的配置。
"hadoopConfig":{ "dfs.nameservices": "testDfs", "dfs.ha.namenodes.testDfs": "namenode1,namenode2", "dfs.namenode.rpc-address.aliDfs.namenode1": "", "dfs.namenode.rpc-address.aliDfs.namenode2": "", "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }
-
必选:否
-
默认值:无
-
-
encoding
- 描述:写文件的编码配置。
- 必选:否
- 默认值:utf-8,慎重修改
类型转换
目前 HdfsWriter 支持大部分 Hive 类型,请注意检查你的类型。
下面列出 HdfsWriter 针对 Hive 数据类型转换列表:
DataX 内部类型 | HIVE 数据类型 |
---|---|
Long | TINYINT,SMALLINT,INT,BIGINT |
Double | FLOAT,DOUBLE |
String | STRING,VARCHAR,CHAR |
Boolean | BOOLEAN |
Date | DATE,TIMESTAMP |
自己的模板案例
mysql2hdfs普通格式
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"name",
"age"
],
"splitPk": "age",
"connection": [
{
"table": [
"USER"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-02:3306/test"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop-01:9000",
"fileType": "text",
"path": "/input/mysql2hive",
"fileName": "1.dat",
"column": [
{
"name": "name",
"type": "STRING"
},
{
"name": "name",
"type": "int"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
}
}
}
]
}
}
mysql2hdfs(orc)加密版
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"name",
"age"
],
"splitPk": "age",
"connection": [
{
"table": [
"USER"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-02:3306/test"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop-01:9000",
"fileType": "orc",
"path": "/input/mysql2hive",
"fileName": "mysql2hdfs",
"column": [
{
"name": "name",
"type": "STRING"
},
{
"name": "name",
"type": "int"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
}
}
}
]
}
}
将mysql数据导入到hive表(orc格式)中
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"name",
"age"
],
"splitPk": "age",
"connection": [
{
"table": [
"USER"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-02:3306/test"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop-01:9000",
"fileType": "orc",
"path": "/user/hive/workmouse/ali_test.db/part5",
"fileName": "part5",
"column": [
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "int"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
}
}
}
]
}
}
执行语句
python /usr/local/datax/bin/datax.py ./mysql2hdfs.json
建表语句
create table if not exists part5(
name string,
age int
)
row format delimited fields terminated by '\t'
stored as orc
;
HA导入
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"name",
"age"
],
"splitPk": "age",
"connection": [
{
"table": [
"USER"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-02:3306/test"
]
}
],
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://zbs",
"fileType": "orc",
"path": "/user/hive/workmouse/qf_test.db/part5",
"fileName": "part5",
"column": [
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "int"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"hadoopConfig": {
"dfs.nameservices": "zbs",
"dfs.ha.namenodes.zbs": "nn1,nn2",
"dfs.namenode.rpc-address.zbs.nn1": "hadoop-01:9000",
"dfs.namenode.rpc-address.zbs.nn2": "hadoop-02:9000",
"dfs.client.failover.proxy.provider.zbs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
}
}
}
]
}
}
hive到mysql
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"name",
"age"
],
"connection": [
{
"querySql": [
"select name,age from USER"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-02:3306/test"
]
}
],
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop-01:9000",
"fileType": "text",
"path": "/user/hive/workmouse/qf_test.db/part6",
"fileName": "part6",
"column": [
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "int"
}
],
"writeMode": "append",
"encoding": "UTF-8",
"fieldDelimiter": "\t",
}
}
}
]
}
}
hdfs导入mysql
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop-01:9000",
"fileType": "text",
"path": "/user/hive/workmouse/ali_test.db/part6",
"column": [
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "int"
}
],
"fileType":"text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"name",
"age"
],
"connection": [
{
"table":[
"USER"
],
"jdbcUrl": [
"jdbc:mysql://hadoop-02:3306/test"
]
}
],
}
}
}
]
}
}
“name”,
“type”: “STRING”
},
{
“name”: “age”,
“type”: “int”
}
],
“fileType”:“text”,
“encoding”: “UTF-8”,
“fieldDelimiter”: “,”
}
},
“writer”: {
“name”: “mysqlreader”,
“parameter”: {
“username”: “root”,
“password”: “123456”,
“column”: [
“name”,
“age”
],
“connection”: [
{
“table”:[
“USER”
],
"jdbcUrl": [
"jdbc:mysql://hadoop-02:3306/test"
]
}
],
}
}
}
]
}
}