Compiling a hadoop-aliyun Jar for a Specific Hadoop Version to Access OSS
The Hadoop trunk branch already contains the hadoop-aliyun module, so upstream Hadoop supports Aliyun OSS access out of the box. Our company's Hadoop, however, is a CDH5 branch build, which has no OSS support by default.
Simply downloading the officially published hadoop-aliyun.jar and dropping it into Hadoop does not work: the newer hadoop-aliyun.jar depends on newer versions of httpclient.jar and httpcore.jar, and the new and old versions conflict at class-loading time.
The remaining option is to take the official source and build a hadoop-aliyun.jar that matches our Hadoop version.
Steps:
- Download the latest Hadoop source (commit id = fa15594ae60) and copy the hadoop-tools/hadoop-aliyun module into the current Hadoop project.
- Modify the hadoop-tools pom to add the hadoop-aliyun module.
- Modify the hadoop-aliyun module pom: change the version to match the target Hadoop, adjust the aliyun-sdk-oss, httpclient, and httpcore dependencies, and add the maven-shade-plugin to shade the newer httpclient.jar and httpcore.jar.
- Remove all code under the org.apache.hadoop.thirdparty. prefix.
- Change import org.apache.commons.lang3. to import org.apache.commons.lang.
- Copy the BlockingThreadPoolExecutorService and SemaphoredDelegatingExecutor classes from the hadoop-aws module into the org.apache.hadoop.util package.
- Build the module (a quick check of the shaded jar is sketched after this list):
mvn clean package -pl hadoop-tools/hadoop-aliyun
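After the build, it is worth confirming that the httpclient/httpcore classes were actually relocated. A minimal check, assuming the jar is produced under hadoop-tools/hadoop-aliyun/target:

jar tf hadoop-tools/hadoop-aliyun/target/hadoop-aliyun-2.6.0-cdh5.13.3.jar | grep 'com/xxx/thirdparty/org/apache/http' | head

If the relocated package shows up, the shaded classes no longer collide with the cluster's older httpclient.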
The modified hadoop-aliyun module pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-project</artifactId>
    <version>2.6.0-cdh5.13.3</version>
    <relativePath>../../hadoop-project</relativePath>
  </parent>
  <artifactId>hadoop-aliyun</artifactId>
  <name>Apache Hadoop Aliyun OSS support</name>
  <packaging>jar</packaging>
  <properties>
    <file.encoding>UTF-8</file.encoding>
    <downloadSources>true</downloadSources>
  </properties>
  <profiles>
    <profile>
      <id>tests-off</id>
      <activation>
        <file>
          <missing>src/test/resources/auth-keys.xml</missing>
        </file>
      </activation>
      <properties>
        <maven.test.skip>true</maven.test.skip>
      </properties>
    </profile>
    <profile>
      <id>tests-on</id>
      <activation>
        <file>
          <exists>src/test/resources/auth-keys.xml</exists>
        </file>
      </activation>
      <properties>
        <maven.test.skip>false</maven.test.skip>
      </properties>
    </profile>
  </profiles>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>findbugs-maven-plugin</artifactId>
        <configuration>
          <findbugsXmlOutput>true</findbugsXmlOutput>
          <xmlOutput>true</xmlOutput>
          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
          </excludeFilterFile>
          <effort>Max</effort>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>deplist</id>
            <phase>compile</phase>
            <goals>
              <goal>list</goal>
            </goals>
            <configuration>
              <!-- build a shellprofile -->
              <outputFile>
                ${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt
              </outputFile>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <id>shade-aliyun-sdk-oss</id>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <shadedArtifactAttached>false</shadedArtifactAttached>
              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
              <createDependencyReducedPom>true</createDependencyReducedPom>
              <createSourcesJar>true</createSourcesJar>
              <relocations>
                <relocation>
                  <pattern>org.apache.http</pattern>
                  <shadedPattern>com.xxx.thirdparty.org.apache.http</shadedPattern>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.aliyun.oss</groupId>
      <artifactId>aliyun-sdk-oss</artifactId>
      <version>3.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpcore</artifactId>
        </exclusion>
      </exclusions>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-distcp</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-distcp</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-tests</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-examples</artifactId>
      <scope>test</scope>
      <type>jar</type>
    </dependency>
  </dependencies>
</project>
Reading OSS Files from Hadoop and Spark
Reading OSS Files from HDFS
To access OSS from HDFS, modify core-site.xml to add the OSS access configuration, then place hadoop-aliyun.jar into the common jars directory on each Hadoop node.
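On a CDH cluster, for example, the copy might look like this (the parcel path is an assumption; adjust to your installation):

cp hadoop-aliyun-2.6.0-cdh5.13.3.jar /opt/cloudera/parcels/CDH/lib/hadoop/lib/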
core-site.xml configuration
<configuration>
  <property>
    <name>fs.oss.endpoint</name>
    <value>oss-cn-zhangjiakou.aliyuncs.com</value>
  </property>
  <property>
    <name>fs.oss.accessKeyId</name>
    <value>xxx</value>
  </property>
  <property>
    <name>fs.oss.accessKeySecret</name>
    <value>xxxx</value>
  </property>
  <property>
    <name>fs.oss.impl</name>
    <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
  </property>
  <property>
    <name>fs.oss.buffer.dir</name>
    <value>/tmp/oss</value>
  </property>
  <property>
    <name>fs.oss.connection.secure.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>fs.oss.connection.maximum</name>
    <value>2048</value>
  </property>
</configuration>
Test reading OSS files through HDFS:
bin/hdfs dfs -ls oss://bucket/OSS_FILES
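A read/write round-trip gives a fuller check (bucket and file paths here are placeholders):

bin/hdfs dfs -cat oss://bucket/OSS_FILES/part-00000 | head
bin/hdfs dfs -put /tmp/local.txt oss://bucket/tmp/
bin/hdfs dfs -ls oss://bucket/tmp/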
Reading OSS Files from Spark
Because Spark is quite extensible, OSS reads can be enabled without modifying any Hadoop configuration; set the following Spark properties:
spark.hadoop.fs.oss.endpoint=oss-cn-zhangjiakou.aliyuncs.com
spark.hadoop.fs.oss.accessKeyId=xxx
spark.hadoop.fs.oss.accessKeySecret=xxx
spark.hadoop.fs.oss.impl=org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
spark.hadoop.fs.oss.buffer.dir=/tmp/oss
spark.hadoop.fs.oss.connection.secure.enabled=false
spark.hadoop.fs.oss.connection.maximum=2048
Then place hadoop-aliyun.jar into Spark's jar dependency directory.
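For a one-off test, the same settings can instead be passed on the command line; a sketch (jar path and key values are placeholders):

spark-shell \
  --jars /path/to/hadoop-aliyun-2.6.0-cdh5.13.3.jar \
  --conf spark.hadoop.fs.oss.endpoint=oss-cn-zhangjiakou.aliyuncs.com \
  --conf spark.hadoop.fs.oss.accessKeyId=xxx \
  --conf spark.hadoop.fs.oss.accessKeySecret=xxx \
  --conf spark.hadoop.fs.oss.impl=org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem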
Test reading OSS files from Spark:
val df = spark.read.format("json").load("oss://bucket/OSS_FILES")
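If the connector is wired up correctly, the usual DataFrame operations work as a quick sanity check:

df.printSchema()
df.count()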
Reading OSS Files with Spark SQL
Spark SQL can read OSS files in two ways.
The first is to create an external view over the data. This mode has no catalog management and has the following characteristics:
- The view is dropped when the Spark session closes.
- A view can only query data under a single directory; there appears to be no support for partitioned data.
- Creating the view samples the data to infer a schema, which can be slow for large data sets.
CREATE TEMPORARY VIEW view_name
USING org.apache.spark.sql.json
OPTIONS (
path "oss://bucket/OSS_DIRECTORY"
);
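Once created, the view is queried like any table; for example:

SELECT * FROM view_name LIMIT 10;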
The second way is to create an external table directly over the OSS files.
To create a cataloged external table over OSS files, the Hive metastore must be able to read OSS, which in turn requires Hadoop-level OSS support (see "Reading OSS Files from HDFS" above). In addition, place hadoop-aliyun.jar into the dependency directory on the Hive nodes and restart the Hive Metastore service.
cp hadoop-aliyun-2.6.0-cdh5.13.3.jar /opt/cloudera/parcels/CDH/lib/hadoop-hdfs/
cp hadoop-aliyun-2.6.0-cdh5.13.3.jar /opt/cloudera/parcels/CDH/lib/hive/lib
Testing external table creation from Spark SQL:
CREATE TABLE app_log (
  `@timestamp` timestamp,
  `@version` string,
  appname string,
  containerMeta struct<appName:string,containerId:string,procName:string>,
  contextMap struct<aid:string,sid:string,spanId:string,storeId:string,traceId:string>,
  message string,
  year string,
  month string,
  day string,
  hour string
)
USING org.apache.spark.sql.json
PARTITIONED BY (year, month, day, hour)
OPTIONS (
path "oss://bucket/OSS_DIRECTORY"
);
-- Refresh the table's partition metadata
MSCK REPAIR TABLE app_log;
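After the repair, partition-pruned queries run against the table as usual (the partition values below are placeholders):

SELECT message, containerMeta.appName
FROM app_log
WHERE year = '2019' AND month = '06' AND day = '01'
LIMIT 10;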