Go操作kafka----kafka安装启动以及使用

一.安装kafka
1.kafka运行需要依赖java的jdk,先安装好jdk。
地址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
安装完成后需要添加以下的环境变量(右键点击“我的电脑” -> “高级系统设置” -> “环境变量” ):

JAVA_HOME: C:\Program Files\Java\jdk1.8.0_171 (jdk的安装路径)
Path: 在现有的值后面添加"; %JAVA_HOME%\bin"

2.安装kafka
下载地址:http://kafka.apache.org/downloads.html
下载完成后解压到自定义路径下。
进入kafka文件夹的configs中,找到zookeeper,将dataDir数据保存路径改为你自己随意的路径。
在这里插入图片描述
运行kafka之前需要先运行zookeeper,kafka有自带的zookeeper,启动zookee即可。
在这里插入图片描述
打开cmd窗口,进入你自己的kafka目录,执行“bin\windows\zookeeper-server-start.bat config\zookeeper.properties”命令,便可以启动zookeeper。
在这里插入图片描述
如图启动成功。
注意:启动成功后不要关闭。

3.启动kafka
同样进入kafka的文件夹中,进入config中,找到server,将dataDir路径修改,一般改成与zookeeper中相同即可。
在这里插入图片描述
重新打开一个cmd,切换为kafka的路径中,执行“bin\windows\kafka-server-start.bat config\server.properties”命令,如下图启动成功。
在这里插入图片描述
4.tai读日志
介绍下tail,tail是一个读日志的第三方库。
安装 tail ,“go get github.com/hpcloud/tail”

package main

import (
	"fmt"
	"time"

	"github.com/hpcloud/tail"
)

//tail的用法用例
func main() {
	fileName := "./my.log"
	config := tail.Config{
		ReOpen:    true,                                 //重新打开
		Follow:    true,                                 //是否跟随
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件哪个地方开始读
		MustExist: false,                                //文件不存在不报错
		Poll:      true,
	}
	tails, err := tail.TailFile(fileName, config)
	if err != nil {
		fmt.Println("tail file failed,err:", err)
		return
	}
	var (
		line *tail.Line
		ok   bool
	)
	for {
		line, ok = <-tails.Lines
		if !ok {
			fmt.Printf("tail file close reopen,filename:%s\n", tails.Filename)
			time.Sleep(time.Second)
			continue
		}
		fmt.Println("line:", line.Text)
	}
}

运行结果
在这里插入图片描述
5.向kafka中写日志
需要使用sarama库
在这里插入图片描述
这里需要指定下载某个版本的第三方库:
进入teimal中,依次执行:“SET GO111MOUDLE=on”,“SET GOPROXY=https://goproxy.cn”,“go mod init”
执行完成后,会出现一个mod文件,在mod中写入
在这里插入图片描述
然后“go mod download”,就可以下载完成了。
执行代码

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	fmt.Println("连接kafka成功")
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行代码,执行成功后
在这里插入图片描述
temp文件下会多了一个分区
在这里插入图片描述

发布了38 篇原创文章 · 获赞 25 · 访问量 9723

猜你喜欢

转载自blog.csdn.net/weixin_44517681/article/details/104373518