mac m1 中go使用kafka
当前docker-comose:
version: '3' services: Etcd: container_name: etcd3 image: bitnami/etcd:${ETCD_VERSION} deploy: replicas: 1 restart_policy: condition: on-failure environment: - ALLOW_NONE_AUTHENTICATION=yes privileged: true volumes: - ${ETCD_DIR}/data:/bitnami/etcd/data ports: - ${ETCD_PORT}:2379 - 2380:2380 networks: - dandan_net redis: container_name: redis image: redis:7.0 ports: - 6379:6379 environment: # 时区上海 - Time zone Shanghai (Change if needed) TZ: Asia/Shanghai volumes: # 数据文件 - data files - ./data/redis/data:/data:rw command: "redis-server --requirepass G62m50oigInC30sf --appendonly yes" privileged: true restart: always networks: - dandan_net #zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafka zookeeper: container_name: zookeeper image: zookeeper:3.8.1 environment: # 时区上海 - Time zone Shanghai (Change if needed) TZ: Asia/Shanghai restart: always ports: - 2181:2181 networks: - dandan_net #消息队列 - Message queue kafka: container_name: kafka image: bitnami/kafka:3.4.0 ports: - 9092:9092 environment: - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_ADVERTISED_HOST_NAME=kafka - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false - TZ=Asia/Shanghai restart: always volumes: - /var/run/docker.sock:/var/run/docker.sock networks: - dandan_net depends_on: - zookeeper networks: dandan_net: driver: bridge ipam: config: - subnet: 172.20.0.0/16
运行后,进入容器
docker exec -it 容器id /bin/bash
进入kafka目录
cd /opt/bitnami/kafka/
创建topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic dandan-log
go示例
package main import ( "fmt" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回 // 构造一个消息 msg := &sarama.ProducerMessage{} msg.Topic = "normal_log" msg.Value = sarama.StringEncoder("this is a test log") // 连接kafka client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close() // 发送消息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }
运行会报错,找不到hosts,需要将容器id添加到etc/hosts中
sudo vim /etc/hosts
然后增加一行127.0.0.1 容器id ,然后保存
#在kafka容器里,bin目录下执行 #查看是否有未被消费的消息 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 主题名称 --from-beginning --max-messages 1 #删除topic kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 主题名称