消息队列 #
应用场景 #
- 应用解耦 (进程间通信、各微服务间通信)
- 异步处理 (业务解耦,注册发送通知,发送邮件、发送短信)
- 流量削峰 (秒杀,大量并发密集型业务)
- 日志处理 (多日志收集,归并、时序存储、io密集型)
消息队列的模式 #
- 点对点模式
消息生产者发送消息到消息队列中,然后消息消费者从队列中取出并且消费消息,消息被消费后,队列中不在存储。所以消息消费者不可能消费到已经被消费的消息;队列支持存在多个消费者,但是对于一个消息而言,只会 有一个消费者可以消费;如果想发给多个消费者,则需要多次发送该条消息
- 发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
消息生产者将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息,和点对点的方式不同,发布到topic的消息会被所有的订阅者消费;但是数据保留是期限的,默认是7天,因为他不是存储系统;
kafka就是这种模式的;
一种是是消费者去主动去消费(拉取)消息,而不是生产者推送消息给消费者;
另外一种就是生产者主动推送消息给消费者,类似公众号
kafka学习 #
kafka通信说明 #
kafka说明视频 #
Kafka安装 #
-
安装jdk https://blog.csdn.net/zam183/article/details/105551399
-
下载kafka https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.12-3.1.0.tgz
修改kafka配置
#创建目录,在kafka安装包的根目录下
mkdir data
mkdir data/zookeeper
mkdir /data/kafka-logs
#修改zookeeper配置 config/zookeeper.properties文件
# 修改dataDir=/tmp/zookeeper
dataDir=d:/tool/kafka/data/zookeeper
#修改kafka配置 config/server.properties
log.dirs=d:/tool/kafka/data/kafka-logs
启动zookeeper和kafka
#win+r 进入cmd中,
> cd D:\tool\kafka\bin\windows #注意windows要在windows下执行bat文件
#启动zookeeper
zookeeper-server-start.bat ../../config/zookeeper.properties
#启动kafka
kafka-server-start.bat ../../config/server.properties
#启动侦听test主题的消费者
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
go代码实现 #
下载扩展包 #
go get -u github.com/Shopify/sarama
同步生产者 #
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
address := []string{"127.0.0.1:9092"}
topic := "test"
config := sarama.NewConfig()
//同步模式必须设置为true
config.Producer.Return.Successes = true
// (NoResponse不应答,WaitForLocal 仅Leader应答 WaitForAll等待Leader Follower所有应答)
// 前面视频有讲到,NoResponse在 producer.SendMessage(msg)不会返回分区和偏移量
config.Producer.RequiredAcks = sarama.WaitForAll
//生产者分区,NewManualPartitioner,在ProducerMessage中初始化的Partition加密命名的分区
//NewRandomPartitioner随机生成分区
//NewHashPartitioner,在ProducerMessage中的Key加密处理后命名的分区
config.Producer.Partitioner = sarama.NewManualPartitioner
producer, err := sarama.NewSyncProducer(address, config)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: topic,
Partition:0,
Key: sarama.StringEncoder("key"),
Value: sarama.StringEncoder("waitForAll NewManualPartitioner"),
}
paritition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Send Message Fail",err.Error())
}
fmt.Printf("Partion = %d, offset = %d\n", paritition, offset)
}
异步生产者 #
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
//异步发送消息
func main() {
address := []string{"127.0.0.1:9092"}
topic := "test"
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer(address, config)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("key"),
Value: sarama.StringEncoder("NewAsyncProducer NewRandomPartitioner"),
}
//消息写入chan中
producer.Input() <- msg
}
//同步发送消息
config := sarama.NewConfig()
config.Producer.Return.Successes = true //是否开启消息发送成功后通知 successes channel
...
select {
case msg := <-producer.Successes():
fmt.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
fmt.Println("Produced message failure: ", err)
default:
fmt.Println("Produced message default",)
}
//Successes的通道缓冲区溢出问题
config.Producer.Return.Successes = true
注释掉下面代码,chan一直写入,未接收
//case msg := <-producer.Successes():
// fmt.Printf("Produced message successes: [%s]\n",msg.Value)
生产者操作也可以通过 sarama.NewClient封装一客户端配置信息,通过NewSyncProducerFromClient,或NewAsyncProducerFromClient代理调用,和上面示例的同步异步相同
address := []string{"127.0.0.1:9092"}
topic := "test"
config := sarama.NewConfig() //实例化个sarama的Config
config.Producer.Return.Successes = true //是否开启消息发送成功后通知 successes channel
config.Producer.Partitioner = sarama.NewRandomPartitioner //随机分区器
client, err := sarama.NewClient(address config) //初始化客户端
defer client.Close()
if err != nil {
panic(err)
}
producer, err := sarama.NewSyncProducerFromClient(client) //代理NewSyncProducer
if err != nil {
panic(err)
}
msg := &sarama.ProducerMessage{
Topic: topic,
Partition:0,
Key: sarama.StringEncoder("key"),
Value: sarama.StringEncoder("waitForAll NewManualPartitioner"),
}
paritition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Send Message Fail",err.Error())
}
fmt.Printf("Partion = %d, offset = %d\n", paritition, offset)
普通消费者 #
package main
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
func main() {
address := []string{"127.0.0.1:9092"}
topic := "test"
wg := &sync.WaitGroup{}
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
consumer, err := sarama.NewConsumer(address, config)
if err != nil {
panic("消费者连接kafka" + err.Error())
}
partitionList, err := consumer.Partitions(topic)
fmt.Println(partitionList)
if err != nil {
panic(err)
}
for partition := range partitionList {
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
defer pc.AsyncClose()
if err != nil {
panic("消费者侦听分区和主题" + err.Error())
}
wg.Add(1)
go func(pc sarama.PartitionConsumer, wg *sync.WaitGroup) {
defer wg.Done()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc, wg)
wg.Wait()
_ = consumer.Close()
}
}
消费者组 #
//实现了sarama.ConsumerGroupHandler接口
type handler struct {
name string
}
func (h handler)Setup(_ sarama.ConsumerGroupSession) error {
return nil
}
func (h handler)Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}
func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
//消费组接收消息
for msg := range claim.Messages() {
fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n",
h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
sess.MarkMessage(msg, "success")
}
return nil
}
//创建一个消费者组,并接收消费
func NewGroup(wg *sync.WaitGroup,addr,topics []string,gname string, config *sarama.Config) {
g,err := sarama.NewConsumerGroup(addr,gname,config)
if err != nil {
panic(err)
}
h := handler{name:gname}
err = g.Consume(context.Background(),topics,h)
if err != nil {
fmt.Println(err)
}
defer func() {
_ = g.Close()
wg.Done()
}()
}
//主函数
func main() {
address := []string{"127.0.0.1:9092"}
topic := "test"
config := sarama.NewConfig()
wg := &sync.WaitGroup{}
wg.Add(3)
go NewGroup(wg,address,[]string{topic},"g1",config)
go NewGroup(wg,address,[]string{topic},"g2",config)
go NewGroup(wg,address,[]string{topic},"g3",config)
wg.Wait()
}