Using Kafka from Go on a Mac M1
Current docker-compose file:
version: '3'
services:
  Etcd:
    container_name: etcd3
    image: bitnami/etcd:${ETCD_VERSION}
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
    privileged: true
    volumes:
      - ${ETCD_DIR}/data:/bitnami/etcd/data
    ports:
      - ${ETCD_PORT}:2379
      - 2380:2380
    networks:
      - dandan_net
  redis:
    container_name: redis
    image: redis:7.0
    ports:
      - 6379:6379
    environment:
      # Time zone Shanghai (change if needed)
      TZ: Asia/Shanghai
    volumes:
      # Data files
      - ./data/redis/data:/data:rw
    command: "redis-server --requirepass G62m50oigInC30sf --appendonly yes"
    privileged: true
    restart: always
    networks:
      - dandan_net
  # Zookeeper is a dependency of Kafka
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.8.1
    environment:
      # Time zone Shanghai (change if needed)
      TZ: Asia/Shanghai
    restart: always
    ports:
      - 2181:2181
    networks:
      - dandan_net
  # Message queue
  kafka:
    container_name: kafka
    image: bitnami/kafka:3.4.0
    ports:
      - 9092:9092
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
      - TZ=Asia/Shanghai
    restart: always
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - dandan_net
    depends_on:
      - zookeeper
networks:
  dandan_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16
After the stack is up, exec into the Kafka container:
docker exec -it <container id> /bin/bash
Enter the Kafka directory:
cd /opt/bitnami/kafka/
Create a topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic dandan-log
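If you would rather create topics from Go than from the shell, sarama's ClusterAdmin can do the same thing. A minimal sketch, assuming the broker is reachable at 127.0.0.1:9092 and using the same topic name and partition count as the command above:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// ClusterAdmin needs a broker protocol version; any reasonably recent one works here.
	config.Version = sarama.V2_0_0_0

	admin, err := sarama.NewClusterAdmin([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("create cluster admin failed, err:", err)
		return
	}
	defer admin.Close()

	// Same settings as the kafka-topics.sh command: 4 partitions, replication factor 1.
	err = admin.CreateTopic("dandan-log", &sarama.TopicDetail{
		NumPartitions:     4,
		ReplicationFactor: 1,
	}, false)
	if err != nil {
		fmt.Println("create topic failed, err:", err)
		return
	}
	fmt.Println("topic dandan-log created")
}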
Go example (producer):
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait for the leader and all in-sync replicas to ack
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
	config.Producer.Return.Successes = true                   // successful deliveries are returned on the Successes channel (required by SyncProducer)

	// Build a message
	msg := &sarama.ProducerMessage{}
	msg.Topic = "normal_log" // note: topic auto-creation is disabled in the compose file, so the topic must already exist (created like dandan-log above)
	msg.Value = sarama.StringEncoder("this is a test log")

	// Connect to Kafka
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()

	// Send the message
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
Running this fails with a host-not-found error, because the broker advertises the container's hostname. You need to add the Kafka container id to /etc/hosts:
sudo vim /etc/hosts
Then add the line 127.0.0.1 <kafka container id> and save.
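Once /etc/hosts is updated, you can verify from Go that the broker is reachable before running the producer. A minimal sketch, assuming the same broker address; the topic list should include dandan-log:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// Connect with default settings just to probe the broker.
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, sarama.NewConfig())
	if err != nil {
		fmt.Println("connect to kafka failed, err:", err)
		return
	}
	defer client.Close()

	// List the topics the broker knows about; dandan-log should appear here.
	topics, err := client.Topics()
	if err != nil {
		fmt.Println("list topics failed, err:", err)
		return
	}
	fmt.Println("brokers:", len(client.Brokers()), "topics:", topics)
}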
# Run these inside the kafka container, from the bin directory
# Check whether there are unconsumed messages
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic name> --from-beginning --max-messages 1
# Delete a topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic <topic name>
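To do the same check from Go instead of the console consumer, sarama's partition consumer is enough for a quick look. A minimal sketch, assuming the dandan-log topic, reading partition 0 from the oldest offset and printing each message:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, sarama.NewConfig())
	if err != nil {
		fmt.Println("create consumer failed, err:", err)
		return
	}
	defer consumer.Close()

	// Read partition 0 of dandan-log from the beginning (like --from-beginning above).
	pc, err := consumer.ConsumePartition("dandan-log", 0, sarama.OffsetOldest)
	if err != nil {
		fmt.Println("consume partition failed, err:", err)
		return
	}
	defer pc.Close()

	// Print messages as they arrive; stop with Ctrl+C.
	for msg := range pc.Messages() {
		fmt.Printf("partition:%d offset:%d value:%s\n", msg.Partition, msg.Offset, string(msg.Value))
	}
}

Keep in mind the producer above uses a random partitioner across 4 partitions, so a given message may land on a partition other than 0.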
