之前简单了解了GeoSpark的一些基础知识,以及如何运行,但是仅仅在standalone模式运行测试通过,之后拿到集群上进行Spark yarn模式运行一直报错。记录一下填坑历史:
由于时间过去比较久,之前过程也一直没有记录,仅仅是回忆自己部分的调试过程。
jar包配置
其实到现在我还是不太清楚这部分是怎么更好的解决,希望在我叙述完,知道的同学可以帮我解惑。
因为考虑到项目的本地运行是需要很多的jar包依赖,但是在集群环境上并没有这些jar包,而在看了几篇博客进行打包只是将compile
的包保留上传,那么我在本地打包上传到集群时候,那些依赖的jar包该怎么解决呢?
最开始我直接按照最简单的上传一个jar包运行,会出现错误。这里错误的原因可能有俩处,一是本身jar包缺失,导致无法产生结果,另外可能的原因就是集群的运行命令有错。
接下来我先排除第二个原因,因为是第一次接触,所以可能有些集群运行的指令不正确,所以我直接到Spark官网,查看了运行的指令,因为Spark本身就提供了一些案例,比如像经典的WordCount
以及计算PI
。所以我按照这个来运行,结果是可以的,这就排除了我命令上的错误。
那么问题应该是出在依赖上,实际上当时也查看了很多的log,但是时间过去有点久,忘记了,而且实话说对于新手的我来说,当时查看日志是真的头大,根本找不到错误的位置。所以我就想最简单的办法应该就是将集群环境配置成本地环境,然后直接运行不就可以了?
这应该是最粗暴的办法,但应该是行的通的。然后我就直接将需要的jar包传到了远程集群上,然后将SVN
上的代码直接co
到服务器端,最后在服务器端直接使用maven
编译,命令如下:
mvn clean package -DskipTests
编译以后就可以在geospark/target/
目录下看到如下jar包:
也就是后面我们需要通过spark submit
来提交运行的
测试
我主要测试的是Geospark
其中的一个案例,因为其中涉及的比较多,所以单独写了一个GeoSparkAPP
来进行测试,代码如下:
package gtl.geospark.app;
import gtl.geospark.enums.FileDataSplitter;
import gtl.geospark.enums.IndexType;
import gtl.geospark.spatialOperator.RangeQuery;
import gtl.geospark.spatialRDD.PointRDD;
import gtl.geospark.spatialRDD.PolygonRDD;
import gtl.jts.geom.Envelope;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import java.io.Serializable;
public class GeoSparkApp
implements Serializable
{
private static final long serialVersionUID=1L;
private static JavaSparkContext sparkContext;
private static String pointRDDInputLocation;
private static FileDataSplitter pointRDDSplitter;
private static IndexType pointRDDIndexType;
private static Integer pointRDDOffset;
private static Integer eachQueryLoopTimes;
private static Envelope rangeQueryWindow;
private static PointRDD objectRDD;
private static PolygonRDD polObjectRDD;
private static String polygonRDDInputLocation;
private static FileDataSplitter polygonRDDSplitter;
private static IndexType polygonRDDIndexType;
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
// .master("local[*]")
.appName("GeoSparkApp")
.getOrCreate();
sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
// pointRDDInputLocation = "/home/hadoop/Documents/data/spatialhadoop/arealm-small/arealm-small.csv";
pointRDDSplitter = FileDataSplitter.CSV;
pointRDDIndexType = IndexType.RTREE;
pointRDDOffset = 0;
// polygonRDDInputLocation = "/home/hadoop/Documents/data/spatialhadoop/counties/county_small.tsv";
polygonRDDInputLocation = "hdfs://geosciences/data/spatialhadoop/counties/county_small.tsv";
polygonRDDSplitter = FileDataSplitter.TSV;
polygonRDDIndexType = IndexType.RTREE;
eachQueryLoopTimes = 10;
rangeQueryWindow = new Envelope(0, 0, 10, 10);
try {
testSpatialRangeQuery();
testSpatialRangeQueryUsingIndex();
testPolygonRangeQuery();
testPolygonRangeQueryUsingIndex();
} catch (Exception e) {
e.printStackTrace();
System.out.println("异常");
}
sparkContext.stop();
System.out.println("通过");
}
private static void testSpatialRangeQuery()
throws Exception {
objectRDD = new PointRDD(sparkContext, pointRDDInputLocation, pointRDDOffset, pointRDDSplitter, true, StorageLevel.MEMORY_ONLY());
objectRDD.rawSpatialRDD.persist(StorageLevel.MEMORY_ONLY());
for (int i = 0; i < eachQueryLoopTimes; i++) {
long resultSize = RangeQuery.SpatialRangeQuery(objectRDD, rangeQueryWindow, false, false).count();
assert resultSize > -1;
}
}
private static void testSpatialRangeQueryUsingIndex()
throws Exception {
objectRDD = new PointRDD(sparkContext, pointRDDInputLocation, pointRDDOffset, pointRDDSplitter, true, StorageLevel.MEMORY_ONLY());
objectRDD.buildIndex(pointRDDIndexType, false);
objectRDD.indexedRawRDD.persist(StorageLevel.MEMORY_ONLY());
for (int i = 0; i < eachQueryLoopTimes; i++) {
long resultSize = RangeQuery.SpatialRangeQuery(objectRDD, rangeQueryWindow, false, true).count();
assert resultSize > -1;
}
}
private static void testPolygonRangeQuery()
throws Exception {
polObjectRDD = new PolygonRDD(sparkContext, polygonRDDInputLocation, polygonRDDSplitter, true);
for (int i = 0; i < eachQueryLoopTimes; i++) {
long resultSize = RangeQuery.SpatialRangeQuery(polObjectRDD, rangeQueryWindow, false, false).count();
assert resultSize > -1;
}
}
private static void testPolygonRangeQueryUsingIndex()
throws Exception {
polObjectRDD = new PolygonRDD(sparkContext, polygonRDDInputLocation, polygonRDDSplitter, true);
polObjectRDD.buildIndex(polygonRDDIndexType, false);
for (int i = 0; i < eachQueryLoopTimes; i++) {
long resultSize = RangeQuery.SpatialRangeQuery(polObjectRDD, rangeQueryWindow, false, true).count();
assert resultSize > -1;
}
}
}
这里测试的功能主要是四个,分别是没有索引的空间三角形查询、使用R树索引的空间三角形查询、Polygon
三角形查询、使用R树索引的空间三角形查询。
其中注释掉的是最开始在本地的测试代码。
然后使用提交命令编译:
spark-submit --class gtl.geospark.app.GeoSparkApp --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 2g geospark-1.0-SNAPSHOT.jar hdfs://geosciences/data/spatialhadoop/counties/county_small.tsv 10
运行以后并没有看到能够读取HDFS上的数据,然后直接产生异常退出。
后面我又看了一些博客,说是可以在代码中将要传入的文件按照args
参数传入,将
polygonRDDInputLocation = "hdfs://geosciences/data/spatialhadoop/counties/county_small.tsv";
修改为
polygonRDDInputLocation = args[0];
然后再使用以上命令运行,成功运行: