phoenix 已知 create table 两种方式: 列映射优化 与 关闭列映射 优化,第一种方式 不会 映射历史数据,第二种 会映射 历史数据 ,以下 四种方式都是以 关闭 列优化 方式建表为前提,补充历史数据可以分为两大类同步与异步:
异步方式(建议修改表名为大写)
a)phoenix sql 在线洗数据
适合小批量pheonix 内部导数据 (建议五十万以内)
b)hive sql 离线洗数据
c)spark + phoenix
如果洗数据 逻辑复杂,不能用sql 解决,使用该方式
同步方式(创建 pheonix 表 与 hbase 表名 一致,小写,不建议使用,部分pheonix官方插件使用受限,只能使用jdbc,主要针对 不能 修改表名的表)
d) phoenix 客户端sql 创建
在创建表过程中,需要同步数据,索引也需要同步创建,此方式 需要延长 hbase scanner rpc 超时时间,同步建立表或者索引( 扫描速度 140W/s,可设置 100 分钟)
如果 选择 列映射优化方式,所建 phoenix 表 都不能映射历史数据,直接选择异步方式 刷数据就可以
phoenix sql 离线洗数据
接下来创建表pheonix 表和 索引
create table f_fact_ad_keyword(
id varchar not null,
"cf"."flow_corp_name" varchar,
"cf"."keyword_url_remake" varchar
CONSTRAINT pk PRIMARY KEY (id)
)COLUMN_ENCODED_BYTES = 0;
CREATE LOCAL INDEX f_fact_ad_keyword_idx ON f_fact_ad_keyword ("cf"."keyword_url_remake");
因为表明一样,只是区分大小写,表名加上"" 再建一遍,就可以
数据插入sql
UPSERT INTO F_FACT_AD_KEYWORD(
ID,"flow_corp_name"
) SELECT
ID,"flow_corp_name"
FROM "f_fact_ad_keyword" where "business_date" < '2019-02-08';
hive sql 离线洗数据
通过自动化创建hbase数据源hive外表
首先查看 mysql 是否配置scheme
然后在 68 修改表名,hbase表名和 hive 外表 表名一致
运行脚本,创建外表
接下来创建表pheonix 表和 索引
phoenix sql 已有建表语句,可以参考
创建 pheonix 外表
create external table external_f_fact_ad_keyword (
ID string,account_budget_amount string,flow_corp_name string
)
STORED BY 'org.apache.phoenix.hive.PhoenixStorageHandler'
TBLPROPERTIES (
"phoenix.table.name" = "F_FACT_AD_KEYWORD",
"phoenix.zookeeper.quorum" = "eagle73",
"phoenix.zookeeper.znode.parent" = "/hbase",
"phoenix.zookeeper.client.port" = "2181",
"phoenix.rowkeys" = "ID",
"phoenix.column.mapping" = "id:ID, account_budget_amount:account_budget_amount,flow_corp_name:flow_corp_name"
);
刷入历史数据
insert into table external_f_fact_ad_keyword
(
id,
account_budget_amount ,
flow_corp_name
)
select
word.key id,
word.account_budget_amount account_budget_amount,
word.flow_corp_name flow_corp_name
from f_fact_ad_keyword word
where (word.flow_corp_name is not null or word.account_budget_amount is not null );
spark + phoenix方式
def main(args: Array[String]) {
Logger.getRootLogger.setLevel(Level.INFO)
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sparkSession = SparkSession.builder().master("local[*]").appName("phoenix-test").getOrCreate()
import sparkSession.implicits._
val configuration = new Configuration()
configuration.set("hbase.zookeeper.quorum", "172.16.116.80")
val df = sparkSession.sqlContext.phoenixTableAsDataFrame("\"f_fact_url_account_status\"", Array("ID", "cf.account_id", "cf.account_name","cf.keyword_url_remake"), conf = configuration)
case class f_fact_url_account_status(ID:String,account_id:String,account_name:String,keyword_url_remake:String)
val rdd= df.as[f_fact_url_account_status].map(x=> (x.ID, x.account_id, x.account_name,x.keyword_url_remake)).rdd
rdd.saveToPhoenix("f_fact_url_account_status", Seq("ID", "account_id", "account_name","keyword_url_remake"),zkUrl = Some("jdbc:phoenix:172.16.116.80"))
sparkSession.stop()
}
phoenix 客户端sql 创建
同步建表(不用改表名)
cd /data/soft/phoenix/bin/
vim hbase-site.xml
添加
<!-- add phoenix create table time begion muchaofeng -->
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>6000000</value>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>6000000</value>
</property>
<property>
<name>hbase.client.operation.timeout</name>
<value>6000000</value>
</property>
<property>
<name>hbase.regionserver.lease.period</name>
<value>6000000</value>
</property>
<property>
<name>hbase.client.scanner.caching</name>
<value>5000</value>
</property>
<property>
<name>phoenix.query.timeoutMs</name>
<value>6000000</value>
</property>
<property>
<name>phoenix.query.keepAliveMs</name>
<value>6000000</value>
</property>
<!-- add phoenix query table time end -->
export HBASE_CONF_PATH=/data/soft/phoenix/bin/
重启 phoenix 客户端,Hbase 不用重启
检表语句 末尾 必须加 COLUMN_ENCODED_BYTES = 0 ,如果使用列优化,该检表语句不可用,使用表明大写方式创建,hive 导入数据方式
CREATE TABLE "dra_oppo"(
id varchar not null,
"cf"."abandon_status" varchar ,
"cf"."valid_follow_counts" varchar ,
"cf"."valid_follow_time" varchar
CONSTRAINT pk PRIMARY KEY (id)
)COLUMN_ENCODED_BYTES = 0;
建表 scanner 速度 130万/s