Spark Solr(3)Submit Spark Task in Java Code

The idea is to use the YARN client to send the task jar and its dependencies to the YARN cluster.
The pom.xml for this console project is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sillycat</groupId>
    <artifactId>sillycat-spark-console</artifactId>
    <version>1.0</version>
    <description>Fetch the Events from Kafka</description>
    <name>Spark Streaming System</name>
    <packaging>jar</packaging>

<properties>
    <spark.version>2.2.1</spark.version>
</properties>

<dependencies>
    <!-- spark framework -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-yarn_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <!-- JUNIT -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4.1</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.sillycat.sparkjava.SparkJavaApp</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>assemble-all</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
</project>
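Based on the artifactId, version, and the jar-with-dependencies descriptor above, running mvn clean package should produce target/sillycat-spark-console-1.0-jar-with-dependencies.jar with com.sillycat.sparkjava.SparkJavaApp as the entry point.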

A lot of values are hard-coded in SparkJavaApp.java, but it works with my local Hadoop configuration.
package com.sillycat.sparkjava;

import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class SparkJavaApp
{

  public static void main( String[] arguments ) throws Exception
  {

    String[] args = new String[]
    {
      "--class",
      "com.sillycat.sparkjava.SparkJavaApp",
      "--jar",
      "/home/ec2-user/users/carl/sillycat-spark-java/sillycat-spark-solr/target/sillycat-spark-solr-1.0-jar-with-dependencies.jar",
      "--arg",
      "com.sillycat.sparkjava.app.SeniorJavaFeedToXMLApp",
    };
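    // Submission settings: target YARN, do not wait for the application to
    // finish, and tag the application so it can be looked up later.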
    SparkConf sparkConf = new SparkConf();
    String applicationTag = "TestApp-" + new Date().getTime();
    sparkConf.setAppName( "SeniorJavaFeedToXMLApp" );
    sparkConf.set( "spark.yarn.submit.waitAppCompletion", "false" );
    sparkConf.set( "spark.yarn.tags", applicationTag );
    sparkConf.set( "spark.master", "yarn-client" );
    sparkConf.set( "spark.submit.deployMode", "cluster" );
    sparkConf.set( "spark.yarn.jars", "/opt/spark/jars/*.jar" );

    Configuration config = new Configuration();
    config.addResource( new Path( "/opt/hadoop/etc/hadoop/core-site.xml" ) );
    config.addResource( new Path( "/opt/hadoop/etc/hadoop/yarn-site.xml" ) );
    config.addResource( new Path( "/opt/hadoop/etc/hadoop/hdfs-site.xml" ) );
    config.set( "fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() );
    config.set( "fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName() );

    System.out.println( config.get( "yarn.resourcemanager.address" ) );

    System.setProperty( "SPARK_YARN_MODE", "true" );
    System.setProperty( "SPARK_HOME", "/opt/spark" );
    System.setProperty( "HADOOP_CONF_DIR", "/opt/hadoop/etc/hadoop" );

    ClientArguments cArgs = new ClientArguments( args );
    Client client = new Client( cArgs, config, sparkConf );
    client.run();

  }

}
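Since spark.yarn.submit.waitAppCompletion is false, client.run() returns right after the submission is accepted. A minimal sketch for checking on the submitted application afterwards, assuming the same Hadoop Configuration and the applicationTag set above (YARN normalizes tags to lower case), could use the YarnClient API; the class and method names here are just illustrative:

package com.sillycat.sparkjava;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class SparkAppStatusChecker
{

  // Hypothetical helper: find the YARN applications carrying the given tag
  // and print their ids and states.
  public static void printStatusByTag( Configuration config, String applicationTag ) throws Exception
  {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init( config );
    yarnClient.start();
    try
    {
      for ( ApplicationReport report : yarnClient.getApplications() )
      {
        // YARN stores application tags in lower case
        if ( report.getApplicationTags().contains( applicationTag.toLowerCase() ) )
        {
          System.out.println( report.getApplicationId() + " : " + report.getYarnApplicationState() );
        }
      }
    }
    finally
    {
      yarnClient.stop();
    }
  }

}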

I will do more refactoring later to turn this into a RESTful API.


References:
submit task
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/misc/how-to-submit-spark-job-to-yarn-from-java-code.md
https://stackoverflow.com/questions/44444215/submitting-spark-application-via-yarn-client
http://massapi.com/method/org/apache/hadoop/conf/Configuration.addResource.html
https://stackoverflow.com/questions/17265002/hadoop-no-filesystem-for-scheme-file

Reposted from sillycat.iteye.com/blog/2408977