1.直接上代码 输入https 链接即可
import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class HttpsTest { /** * 请求接口 */ public static final String DEFAULT_URL = ""; public static void main(String[] args) throws Exception { //显示head信息 // System.setProperty("javax.net.debug", "all"); StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); see.setParallelism(1); DataStream<String> text = see.addSource(new HttpsTestSource(DEFAULT_URL, 150)); text.print(); text.writeAsText("/root/packages/opfinal.txt", FileSystem.WriteMode.OVERWRITE); see.execute(); } }
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class HttpsTestSource extends RichSourceFunction<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpsTestSource.class);
private static final long DEFAULT_SLEEP_MS = 2000;
private final String url;
private final long sleepMs;
private volatile boolean isRunning;
public HttpsTestSource() {
this(HttpsTest.DEFAULT_URL, DEFAULT_SLEEP_MS);
}
public HttpsTestSource(String url, long sleepMs) {
this.isRunning = true;
this.url = url;
this.sleepMs = sleepMs;
}
@Override
public void run(SourceContext ctx) throws Exception {
while (this.isRunning) {
Thread.sleep(sleepMs);
ctx.collect(httpGet(url));
}
}
@Override
public void cancel() {
this.isRunning = false;
}
private static String httpGet(String getUrl) throws Exception {
HttpURLConnection con = null;
BufferedReader in = null;
StringBuilder inputString = new StringBuilder();
try {
URL url = new URL(getUrl);
LOGGER.info("Trying to get now");
con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("GET");
in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String inputLine;
double inputSize = 0;
while ((inputLine = in.readLine()) != null) {
inputString.append(inputLine);
inputSize += inputLine.length();
LOGGER.debug("read inputline - " + inputLine);
}
} catch (Exception e) {
LOGGER.warn("httpget threw: ", e);
} finally {
try {
if (in != null) {
in.close();
}
if (con != null) {
con.disconnect();
}
} catch (Exception e) {
LOGGER.warn("httpget finally block threw: ", e);
}
}
return inputString.toString();
}
}
POM 仅供参考,个人项目引入的其他依赖比较多,自行删除
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.han</groupId>
<artifactId>flink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.4.1</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<!--
Execute "mvn clean package -Pbuild-jar"
to build a jar file out of this project!
How to use the Flink Quickstart pom:
a) Adding new dependencies:
You can add dependencies to the list below.
b) Build a jar for running on the cluster:
"mvn clean package -Pbuild-jar"
This will create a fat-jar which contains all dependencies necessary for running the created jar in a cluster.
-->
<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<!-- This dependency is required to actually execute jobs. It is currently pulled in by
flink-streaming-java, but we explicitly depend on it to safeguard against future changes. -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- explicitly add a standard logging framework, as Flink does not have
a hard dependency on one specific framework by default -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>0.9.0-milestone-1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.json/javax.json-api -->
<dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
<profiles>
<profile>
<!-- Profile for packaging correct JAR files -->
<id>build-jar</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all dependencies
except flink and its transitive dependencies. The resulting fat-jar can be executed
on a cluster. Change the value of Program-Class if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!-- If you want to use ./bin/flink run <quickstart jar> uncomment the following lines.
This will add a Main-Class entry to the manifest file -->
<!--
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>wikiedits.StreamingJob</mainClass>
</transformer>
</transformers>
-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
<!--
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>0.21.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
-->
</build>
</project>
如有问题错误,留言即可~