AWS Services from Beginner to Pro | Amazon Kinesis: Working with Firehose

Kinesis overview (what is Kinesis?)

Amazon Kinesis makes it easy to collect, process, and analyze video and data streams in real time.

  • 1. Use Kinesis Video Streams to capture, process, and store video streams for analytics and machine learning.
  • 2. Use Kinesis Data Streams to build custom applications that analyze data streams using popular stream processing frameworks.
  • 3. Use Kinesis Data Firehose to load data streams into AWS data stores.
  • 4. Use Kinesis Data Analytics to analyze data streams with SQL.

Kinesis console overview:

[Screenshot: Kinesis console dashboard]

As the console shows, Kinesis is made up of four components:

  • 1. Data Streams:
    With Amazon Kinesis Data Streams, you can build custom applications that process or analyze streaming data for specialized needs. You can configure tens of thousands of data producers to continuously put data into a Kinesis data stream, for example data from website clickstreams, application logs, and social media feeds. Your Amazon Kinesis applications can then read and process data from the stream in under a second. (A minimal producer sketch follows after this list.)
[Screenshot]
  • 2. Data Firehose:
    Amazon Kinesis Data Firehose is the easiest way to load streaming data into data stores and analytics tools. It is a fully managed service that makes it easy to capture, transform, and load massive volumes of streaming data from hundreds of thousands of sources into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, Kinesis Data Analytics, and Splunk, enabling near-real-time analytics and insights.

  • 3. Data Analytics:
    Amazon Kinesis Data Analytics is the easiest way to process and analyze streaming data in real time with ANSI standard SQL. It lets you read data from Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose and build streaming queries or entire applications in SQL that continuously filter, transform, and aggregate the data as it arrives. Amazon Kinesis Data Analytics automatically recognizes standard data formats, parses the data, and suggests a schema that you can refine with an interactive schema editor. It provides an interactive SQL editor and stream-processing templates so you can build sophisticated streaming applications in minutes. Amazon Kinesis Data Analytics runs your queries continuously and writes the results to output destinations such as Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose, which can deliver the data to Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service. It automatically provisions, deploys, and scales the resources needed to run your streaming applications.

  • 4. Video Streams:
    Amazon Kinesis Video Streams is a fully managed video ingestion and storage service. It lets you securely ingest, process, and store video at any scale for applications that power robotics, smart cities, industrial automation, security monitoring, machine learning (ML), and more. Kinesis Video Streams can also ingest other kinds of time-encoded data such as audio, RADAR, and LIDAR signals. It provides SDKs you can install on your devices to easily and securely stream video to AWS, and it automatically provisions and elastically scales all the infrastructure needed to ingest video streams from millions of devices. It durably stores, encrypts, and indexes the video streams and offers easy-to-use APIs so applications can access and retrieve indexed video segments by tag and timestamp. Kinesis Video Streams provides a library for integrating ML frameworks such as Apache MXNet, TensorFlow, and OpenCV with video streams to build machine-learning applications, and it integrates with Amazon Rekognition Video so you can build computer-vision applications that detect and recognize faces in streaming video.
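To make the contrast with Firehose concrete, here is a minimal producer sketch for Kinesis Data Streams using the v1 AWS SDK for Java (not part of the original article); the stream name and partition key are placeholders, and the region reuses the us-west-2 value used later for Firehose:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;

public class DataStreamsProducerSketch {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        // Placeholder stream name -- replace with your own.
        String streamName = "my-data-stream";

        AmazonKinesisClient kinesisClient =
                new AmazonKinesisClient(new ProfileCredentialsProvider().getCredentials());
        kinesisClient.setRegion(RegionUtils.getRegion("us-west-2"));

        // Each record needs a partition key, which determines the shard it lands on.
        PutRecordRequest request = new PutRecordRequest();
        request.setStreamName(streamName);
        request.setPartitionKey("user-42");
        request.setData(ByteBuffer.wrap("hello kinesis".getBytes(StandardCharsets.UTF_8)));

        PutRecordResult result = kinesisClient.putRecord(request);
        System.out.println("Stored in shard " + result.getShardId()
                + " with sequence number " + result.getSequenceNumber());
    }
}

Unlike Firehose, which delivers straight into the destination you configure, reading from a data stream is up to you, typically with the Kinesis Client Library or another stream-processing framework.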

Those are the four services that make up Kinesis; pick whichever matches your use case. The rest of this article focuses on working with Firehose.

Working with Firehose in the console:

  • 1. Open the Firehose console; any existing delivery streams are listed.


    [Screenshot]

  • 2. Click the Create delivery stream button; the following screen appears. (The same setup can also be done programmatically; see the sketch after this walkthrough.)

[Screenshot]
  • 3. Before the data is stored, you can optionally have it processed by a Lambda function; records that pass the transformation are then delivered. (A sketch of such a transformation function appears at the end of this article.)
[Screenshot]
  • 4. Choose the destination: Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, or Splunk.
[Screenshot]
  • 5. Configure the format of the stored files, the buffer size at which data is flushed, error logging, and the IAM role.
[Screenshot]
  • 6. Review the configuration:


    [Screenshot]
  • 7. Check the result:


    [Screenshot]
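For reference, everything configured in steps 2 through 6 can also be done from code. Below is a minimal sketch (not the article's original code) that creates an S3 delivery stream with the v1 AWS SDK for Java; the delivery stream name, bucket ARN, S3 prefix, and IAM role ARN are placeholders, and the buffering values stand in for what you would set in step 5:

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.BufferingHints;
import com.amazonaws.services.kinesisfirehose.model.CreateDeliveryStreamRequest;
import com.amazonaws.services.kinesisfirehose.model.S3DestinationConfiguration;

public class CreateDeliveryStreamSketch {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        AmazonKinesisFirehoseClient firehoseClient =
                new AmazonKinesisFirehoseClient(new ProfileCredentialsProvider().getCredentials());
        firehoseClient.setRegion(RegionUtils.getRegion("us-west-2"));

        // Step 5 in the console: flush to S3 every 5 MB or every 300 seconds,
        // whichever comes first.
        BufferingHints bufferingHints = new BufferingHints()
                .withSizeInMBs(5)
                .withIntervalInSeconds(300);

        // Placeholder bucket, prefix, and IAM role -- replace with your own.
        S3DestinationConfiguration s3Config = new S3DestinationConfiguration()
                .withBucketARN("arn:aws:s3:::my-firehose-bucket")
                .withPrefix("firehose-demo/")
                .withRoleARN("arn:aws:iam::123456789012:role/firehose-delivery-role")
                .withBufferingHints(bufferingHints)
                .withCompressionFormat("UNCOMPRESSED");

        CreateDeliveryStreamRequest request = new CreateDeliveryStreamRequest()
                .withDeliveryStreamName("my-delivery-stream")
                .withS3DestinationConfiguration(s3Config);

        firehoseClient.createDeliveryStream(request);
        // Creation is asynchronous; poll describeDeliveryStream until the
        // delivery stream status becomes ACTIVE before sending records.
    }
}

The other destinations from step 4 (Redshift, Elasticsearch Service, Splunk) are configured through their own destination-configuration objects on the same request; creation otherwise works the same way.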

Working with Firehose from code:

  • 1. Add the dependency to pom.xml:
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.7.5</version>
        </dependency>
  • 2. Initialize the Firehose client (a usage sketch combining these helpers follows after item 5):
    /**
     * Initialize the Firehose client.
     */
    @SuppressWarnings("deprecation")
    public static void initClients() {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. "
                            + "Please make sure that your credentials file is at the correct "
                            + "location (~/.aws/credentials), and is in valid format.",
                    e);
        }

        // Firehose client
        firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));

    }
  • 3. Write a single record:
    /**
     * Write a single record to the delivery stream.
     */
    public static void addFireHose(Record record, String deliveryStreamName) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordRequest.setRecord(record);
        firehoseClient.putRecord(putRecordRequest);
    }
  • 4. Write records in a batch:
    /**
     * Add a batch of records to the delivery stream.
     */
    public static void addBatchFireHose(List<Record> records,
            String deliveryStreamName) {
        try {
            putRecordBatch(records, deliveryStreamName);
        } catch (Exception e) {
            LOGGER.error("Failed to write records to Firehose", e);
        }
    }


    private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
            String deliveryStreamName) {
        PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
        putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordBatchRequest.setRecords(recordList);
        return firehoseClient.putRecordBatch(putRecordBatchRequest);
    }

  • 5. Update the delivery stream configuration:
    /**
     * Update the delivery stream configuration.
     *
     * @throws Exception
     */
    public static void updateDeliveryStream(String deliveryOpenStreamName,
            String s3DestinationUpdateName) throws Exception {
        DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
        LOGGER.info("Updating DeliveryStream Destination: "
                + deliveryOpenStreamName + " with new configuration options");
        // get(0) -> DeliveryStream currently supports only one destination per
        // DeliveryStream
        UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
                .withDeliveryStreamName(deliveryOpenStreamName)
                .withCurrentDeliveryStreamVersionId(
                        deliveryStreamDescription.getVersionId())
                .withDestinationId(
                        deliveryStreamDescription.getDestinations().get(0)
                                .getDestinationId());

        S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
        s3DestinationUpdate.withPrefix(s3DestinationUpdateName);

        updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);

        firehoseClient.updateDestination(updateDestinationRequest);
    }

    /**
     * Method to describe the delivery stream.
     *
     * @param deliveryStreamName
     *            the delivery stream
     * @return the delivery description
     */
    private static DeliveryStreamDescription describeDeliveryStream(
            String deliveryStreamName) {
        DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
        describeDeliveryStreamRequest
                .withDeliveryStreamName(deliveryStreamName);
        DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
                .describeDeliveryStream(describeDeliveryStreamRequest);
        return describeDeliveryStreamResponse.getDeliveryStreamDescription();
    }
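Putting the helpers above together, a minimal usage sketch might look like the following (the delivery stream name and S3 prefix are placeholders; createRecord is the string-to-Record helper shown in the complete class below, and the class is assumed to sit in the same package as FireHoseUtil):

package com.sdk.wifi.util.aws.firehose;

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.kinesisfirehose.model.Record;

public class FireHoseUtilUsage {

    public static void main(String[] args) throws Exception {
        // Create the client once before any put/update calls.
        FireHoseUtil.initClients();

        String deliveryStreamName = "my-delivery-stream"; // placeholder

        // Single record.
        FireHoseUtil.addFireHose(
                FireHoseUtil.createRecord("{\"event\":\"login\",\"user\":42}\n"),
                deliveryStreamName);

        // Batch of records (PutRecordBatch accepts up to 500 records per call).
        List<Record> batch = new ArrayList<Record>();
        for (int i = 0; i < 10; i++) {
            batch.add(FireHoseUtil.createRecord("{\"event\":\"click\",\"n\":" + i + "}\n"));
        }
        FireHoseUtil.addBatchFireHose(batch, deliveryStreamName);

        // Point the S3 destination at a new prefix.
        FireHoseUtil.updateDeliveryStream(deliveryStreamName, "firehose-demo/v2/");
    }
}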

Complete code:

package com.sdk.wifi.util.aws.firehose;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.DeliveryStreamDescription;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamRequest;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.kinesisfirehose.model.S3DestinationUpdate;
import com.amazonaws.services.kinesisfirehose.model.UpdateDestinationRequest;

public class FireHoseUtil {

    private static final Log LOGGER = LogFactory.getLog(FireHoseUtil.class);

    // DeliveryStream properties
    private static AmazonKinesisFirehoseClient firehoseClient;
    private static final String FIRE_HOSE_REGION = "us-west-2";

    /**
     * Add a batch of records to the delivery stream.
     */
    public static void addBatchFireHose(List<Record> records,
            String deliveryStreamName) {
        try {
            putRecordBatch(records, deliveryStreamName);
        } catch (Exception e) {
            LOGGER.error("Failed to write records to Firehose", e);
        }
    }

    /**
     * Write a single record to the delivery stream.
     */
    public static void addFireHose(Record record, String deliveryStreamName) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordRequest.setRecord(record);
        firehoseClient.putRecord(putRecordRequest);
    }

    /**
     * Convert a string into a Firehose Record.
     *
     * @param data
     * @return
     * @throws UnsupportedEncodingException
     */
    public static Record createRecord(String data) throws UnsupportedEncodingException {
        return new Record().withData(ByteBuffer.wrap(data.getBytes("UTF-8")));
    }

    /**
     * Initialize the Firehose client.
     */
    public static void initClients() {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. "
                            + "Please make sure that your credentials file is at the correct "
                            + "location (~/.aws/credentials), and is in valid format.",
                    e);
        }

        // Firehose client
        firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));

    }

    /**
     * Write a batch of records to the delivery stream.
     *
     * @param recordList
     * @return
     */
    private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
            String deliveryStreamName) {
        PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
        putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordBatchRequest.setRecords(recordList);
        return firehoseClient.putRecordBatch(putRecordBatchRequest);
    }

    /**
     * Update the delivery stream configuration.
     *
     * @throws Exception
     */
    public static void updateDeliveryStream(String deliveryOpenStreamName,
            String s3DestinationUpdateName) throws Exception {
        DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
        LOGGER.info("Updating DeliveryStream Destination: "
                + deliveryOpenStreamName + " with new configuration options");
        // get(0) -> DeliveryStream currently supports only one destination per
        // DeliveryStream
        UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
                .withDeliveryStreamName(deliveryOpenStreamName)
                .withCurrentDeliveryStreamVersionId(
                        deliveryStreamDescription.getVersionId())
                .withDestinationId(
                        deliveryStreamDescription.getDestinations().get(0)
                                .getDestinationId());

        S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
        s3DestinationUpdate.withPrefix(s3DestinationUpdateName);

        updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);

        firehoseClient.updateDestination(updateDestinationRequest);
    }

    /**
     * Method to describe the delivery stream.
     *
     * @param deliveryStreamName
     *            the delivery stream
     * @return the delivery description
     */
    private static DeliveryStreamDescription describeDeliveryStream(
            String deliveryStreamName) {
        DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
        describeDeliveryStreamRequest
                .withDeliveryStreamName(deliveryStreamName);
        DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
                .describeDeliveryStream(describeDeliveryStreamRequest);
        return describeDeliveryStreamResponse.getDeliveryStreamDescription();
    }

}
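Step 3 of the console walkthrough mentioned that records can optionally be transformed by a Lambda function before delivery. The sketch below is not part of the original article; it only illustrates the transformation contract Firehose expects: each incoming record carries a recordId and base64-encoded data, and the response must echo every recordId with a result of Ok, Dropped, or ProcessingFailed plus re-encoded base64 data. The event and response shapes are hand-rolled POJOs, the uppercasing is purely illustrative, and the handler assumes the aws-lambda-java-core dependency for the RequestHandler interface:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

/** Hypothetical Firehose transformation Lambda; the POJOs mirror the Firehose event/response JSON. */
public class FirehoseTransformSketch
        implements RequestHandler<FirehoseTransformSketch.TransformEvent, FirehoseTransformSketch.TransformResponse> {

    public static class InputRecord {
        public String recordId;
        public String data; // base64-encoded payload
    }

    public static class TransformEvent {
        public String invocationId;
        public String deliveryStreamArn;
        public String region;
        public List<InputRecord> records;
    }

    public static class OutputRecord {
        public String recordId;
        public String result; // "Ok", "Dropped", or "ProcessingFailed"
        public String data;   // base64-encoded transformed payload
    }

    public static class TransformResponse {
        public List<OutputRecord> records = new ArrayList<OutputRecord>();
    }

    @Override
    public TransformResponse handleRequest(TransformEvent event, Context context) {
        TransformResponse response = new TransformResponse();
        for (InputRecord in : event.records) {
            String payload = new String(Base64.getDecoder().decode(in.data), StandardCharsets.UTF_8);

            OutputRecord out = new OutputRecord();
            out.recordId = in.recordId; // every input recordId must appear in the response
            out.result = "Ok";
            // Illustrative transformation: uppercase the payload before delivery.
            out.data = Base64.getEncoder()
                    .encodeToString(payload.toUpperCase().getBytes(StandardCharsets.UTF_8));
            response.records.add(out);
        }
        return response;
    }
}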

Reposted from blog.csdn.net/weixin_34240657/article/details/87232069