Kafka learning notes

Message queues #

Use cases #

  1. Application decoupling (inter-process communication, communication between microservices)
  2. Asynchronous processing (decoupling business steps, e.g. after a user registers, send the notification, email, and SMS asynchronously)
  3. Traffic peak shaving (flash sales and other bursts of highly concurrent traffic)
  4. Log processing (collecting logs from many sources, merging them, storing them in time order; I/O-intensive)

Message queue models #

  • Point-to-point model

The producer sends a message to the queue, and a consumer takes it out of the queue and consumes it. Once consumed, the message is no longer stored in the queue, so a consumer can never receive a message that has already been consumed. A queue may have multiple consumers, but each message can be consumed by only one of them; to deliver the same message to several consumers, it has to be sent once per consumer.

  • Publish/subscribe model (one-to-many; messages are not removed after a consumer reads them)

The producer publishes messages to a topic and multiple consumers (subscribers) consume them. Unlike the point-to-point model, a message published to a topic is delivered to every subscriber. The data is only retained for a limited period, 7 days by default, because Kafka is not a storage system.

Kafka uses this model.

One variant is pull-based: consumers actively fetch (pull) messages rather than the producer pushing them to consumers; this is how Kafka works.

The other variant is push-based: the producer actively pushes messages to consumers, similar to a WeChat official account pushing posts to followers.
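The retention window mentioned above is a broker-side setting. For reference, a sketch of the relevant entries in config/server.properties (the values shown are the defaults: 7 days by time, no size limit):

# delete log segments older than 7 days (168 hours)
log.retention.hours=168
# no size-based retention limit per partition (-1 = unlimited)
log.retention.bytes=-1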

Learning Kafka #

Kafka communication overview #

Kafka explainer video #

Installing Kafka #

  1. Install the JDK: https://blog.csdn.net/zam183/article/details/105551399

  2. Download Kafka: https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.12-3.1.0.tgz

Modify the Kafka configuration

#create the data directories under the Kafka installation root
mkdir data
mkdir data/zookeeper
mkdir data/kafka-logs

#edit the ZooKeeper config file config/zookeeper.properties
#change dataDir=/tmp/zookeeper to:
dataDir=d:/tool/kafka/data/zookeeper

#edit the Kafka config file config/server.properties
log.dirs=d:/tool/kafka/data/kafka-logs

Start ZooKeeper and Kafka

#open a command prompt (Win+R, then cmd)
> cd D:\tool\kafka\bin\windows #on Windows the .bat scripts must be run from the windows directory

#start zookeeper
zookeeper-server-start.bat ../../config/zookeeper.properties
#start kafka
kafka-server-start.bat ../../config/server.properties

#start a consumer listening on the test topic
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
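Before producing from code it can also help to create the test topic explicitly and to send a few messages from a console producer; a sketch assuming the same windows scripts and the broker at 127.0.0.1:9092:

#create the test topic with 1 partition and replication factor 1
kafka-topics.bat --create --bootstrap-server 127.0.0.1:9092 --topic test --partitions 1 --replication-factor 1

#start a console producer; each line typed is sent to the test topic
kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic test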

Go implementation #

Install the client package #

go get -u github.com/Shopify/sarama

Synchronous producer #

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	address := []string{"127.0.0.1:9092"}
	topic := "test"

	config := sarama.NewConfig()
	//must be set to true for the synchronous producer
	config.Producer.Return.Successes = true
	//acknowledgement level: NoResponse = no acknowledgement, WaitForLocal = only the leader
	//acknowledges, WaitForAll = wait for the leader and all in-sync followers
	//as mentioned in the video above, with NoResponse producer.SendMessage(msg) does not
	//return a partition and offset
	config.Producer.RequiredAcks = sarama.WaitForAll
	//partitioner: NewManualPartitioner uses the Partition field set on the ProducerMessage,
	//NewRandomPartitioner picks a random partition,
	//NewHashPartitioner hashes the message Key to choose the partition
	config.Producer.Partitioner = sarama.NewManualPartitioner

	producer, err := sarama.NewSyncProducer(address, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic:     topic,
		Partition: 0,
		Key:       sarama.StringEncoder("key"),
		Value:     sarama.StringEncoder("waitForAll NewManualPartitioner"),
	}

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		fmt.Println("Send Message Fail", err.Error())
	}

	fmt.Printf("Partition = %d, offset = %d\n", partition, offset)
}
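The sync producer can also send several messages in one call with SendMessages, which returns only an error; a small sketch reusing the producer and topic above:

	//send a batch of messages in a single call; on success sarama should fill in
	//the Partition and Offset fields of each message
	batch := []*sarama.ProducerMessage{
		{Topic: topic, Partition: 0, Value: sarama.StringEncoder("batch message 1")},
		{Topic: topic, Partition: 0, Value: sarama.StringEncoder("batch message 2")},
	}
	if err := producer.SendMessages(batch); err != nil {
		fmt.Println("Send Messages Fail", err.Error())
	}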

Asynchronous producer #

package main

import (
	"github.com/Shopify/sarama"
)

//asynchronous message sending
func main() {
	address := []string{"127.0.0.1:9092"}
	topic := "test"

	config := sarama.NewConfig()
	producer, err := sarama.NewAsyncProducer(address, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder("key"),
		Value: sarama.StringEncoder("NewAsyncProducer NewRandomPartitioner"),
	}
	//write the message into the producer's input channel
	producer.Input() <- msg
}
//to confirm delivery from the asynchronous producer, enable success notifications
//and read from the Successes/Errors channels
config := sarama.NewConfig()
config.Producer.Return.Successes = true //notify on the successes channel after a message is sent
...

select {
case msg := <-producer.Successes():
	fmt.Printf("Produced message successes: [%s]\n", msg.Value)
case err := <-producer.Errors():
	fmt.Println("Produced message failure: ", err)
default:
	fmt.Println("Produced message default")
}

//buffer overflow on the Successes channel
config.Producer.Return.Successes = true

If the lines below are commented out, success notifications keep being written into the channel but are never read, so the channel eventually fills up and blocks the producer.
//case msg := <-producer.Successes():
//	fmt.Printf("Produced message successes: [%s]\n", msg.Value)
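A more robust pattern (a sketch, not part of the original notes; it assumes "sync" is imported and config.Producer.Return.Successes is true) is to drain both channels in dedicated goroutines so that neither can fill up, and to close the producer with AsyncClose so pending notifications are flushed before exiting:

var wg sync.WaitGroup
wg.Add(2)
go func() { //drain success notifications
	defer wg.Done()
	for msg := range producer.Successes() {
		fmt.Printf("Produced message successes: [%s]\n", msg.Value)
	}
}()
go func() { //drain error notifications
	defer wg.Done()
	for err := range producer.Errors() {
		fmt.Println("Produced message failure: ", err)
	}
}()

producer.Input() <- msg
producer.AsyncClose() //flush pending messages; Successes/Errors are closed when done
wg.Wait()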

A producer can also be built from a shared client: wrap the connection settings in sarama.NewClient and pass the client to NewSyncProducerFromClient or NewAsyncProducerFromClient. Apart from that, usage is the same as the synchronous and asynchronous examples above.

address := []string{"127.0.0.1:9092"}
topic := "test"

config := sarama.NewConfig()                               //create a sarama Config
config.Producer.Return.Successes = true                    //notify on the successes channel after a message is sent
config.Producer.Partitioner = sarama.NewRandomPartitioner  //random partitioner

client, err := sarama.NewClient(address, config) //create the client
if err != nil {
	panic(err)
}
defer client.Close()

producer, err := sarama.NewSyncProducerFromClient(client) //wraps NewSyncProducer
if err != nil {
	panic(err)
}

msg := &sarama.ProducerMessage{
	Topic: topic,
	Key:   sarama.StringEncoder("key"),
	Value: sarama.StringEncoder("NewSyncProducerFromClient NewRandomPartitioner"),
}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
	fmt.Println("Send Message Fail", err.Error())
}

fmt.Printf("Partition = %d, offset = %d\n", partition, offset)
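The asynchronous variant works the same way from the shared client; a minimal sketch reusing the client and msg defined above:

asyncProducer, err := sarama.NewAsyncProducerFromClient(client) //wraps NewAsyncProducer
if err != nil {
	panic(err)
}
defer asyncProducer.Close()
asyncProducer.Input() <- msg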

Basic consumer #

package main

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	address := []string{"127.0.0.1:9092"}
	topic := "test"
	wg := &sync.WaitGroup{}

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer(address, config)
	if err != nil {
		panic("consumer failed to connect to kafka: " + err.Error())
	}

	//list the partitions of the topic
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		panic(err)
	}
	fmt.Println(partitionList)

	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			panic("consumer failed to listen on topic/partition: " + err.Error())
		}
		defer pc.AsyncClose()

		//one goroutine per partition
		wg.Add(1)
		go func(pc sarama.PartitionConsumer, wg *sync.WaitGroup) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc, wg)
	}

	wg.Wait()
	_ = consumer.Close()
}
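The example above starts at sarama.OffsetNewest, so it only sees messages produced after the consumer starts. To replay a partition from the beginning (the rough equivalent of --from-beginning in the console consumer), sarama.OffsetOldest can be passed instead; a one-line sketch:

//start from the earliest retained offset instead of only new messages
pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)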

Consumer group #

package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

//handler implements the sarama.ConsumerGroupHandler interface

type handler struct {
	name string
}

func (h handler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

func (h handler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {
	//receive the messages assigned to this group member
	for msg := range claim.Messages() {
		fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n",
			h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		//mark the message as processed so its offset can be committed
		sess.MarkMessage(msg, "success")
	}
	return nil
}

//NewGroup creates a consumer group and starts consuming the given topics
func NewGroup(wg *sync.WaitGroup, addr, topics []string, gname string, config *sarama.Config) {
	defer wg.Done()

	g, err := sarama.NewConsumerGroup(addr, gname, config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = g.Close() }()

	h := handler{name: gname}

	//Consume joins the group and blocks until the session ends
	err = g.Consume(context.Background(), topics, h)
	if err != nil {
		fmt.Println(err)
	}
}

//main starts three consumer groups that all consume the same topic

func main() {
	address := []string{"127.0.0.1:9092"}
	topic := "test"

	config := sarama.NewConfig()
	wg := &sync.WaitGroup{}
	wg.Add(3)
	go NewGroup(wg, address, []string{topic}, "g1", config)
	go NewGroup(wg, address, []string{topic}, "g2", config)
	go NewGroup(wg, address, []string{topic}, "g3", config)
	wg.Wait()
}
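Note that ConsumerGroup.Consume returns whenever the session ends (for example on a group rebalance), so a long-running consumer normally calls it in a loop. A minimal sketch of how the Consume call in NewGroup above could be adapted, assuming a cancellable context:

//re-join the group after every rebalance until the context is cancelled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
	if err := g.Consume(ctx, topics, h); err != nil {
		fmt.Println(err)
		return
	}
	if ctx.Err() != nil { //the context was cancelled: stop consuming
		return
	}
}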