mac m1 中go使用kafka

dandan2年前程序开发2836

当前docker-comose:

version: '3'
services:
  Etcd:
    container_name: etcd3
    image: bitnami/etcd:${ETCD_VERSION}
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
    privileged: true
    volumes:
      - ${ETCD_DIR}/data:/bitnami/etcd/data
    ports:
      - ${ETCD_PORT}:2379
      - 2380:2380
    networks:
      - dandan_net
  redis:
    container_name: redis
    image: redis:7.0
    ports:
      - 6379:6379
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    volumes:
      # 数据文件 - data files
      - ./data/redis/data:/data:rw
    command: "redis-server --requirepass G62m50oigInC30sf  --appendonly yes"
    privileged: true
    restart: always
    networks:
      - dandan_net
  #zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafka
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.8.1
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    restart: always
    ports:
      - 2181:2181
    networks:
      - dandan_net

  #消息队列 - Message queue
  kafka:
    container_name: kafka
    image: bitnami/kafka:3.4.0
    ports:
      - 9092:9092
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
      - TZ=Asia/Shanghai
    restart: always
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - dandan_net
    depends_on:
    - zookeeper
networks:
  dandan_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

运行后,进入容器

 docker exec -it 容器id /bin/bash

进入kafka目录

cd /opt/bitnami/kafka/

创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic dandan-log

go示例

package main

import (
   "fmt"
   "github.com/Shopify/sarama"
)

func main() {
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
   config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
   config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

   // 构造一个消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "normal_log"
   msg.Value = sarama.StringEncoder("this is a test log")
   // 连接kafka
   client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
   if err != nil {
      fmt.Println("producer closed, err:", err)
      return
   }
   defer client.Close()
   // 发送消息
   pid, offset, err := client.SendMessage(msg)
   if err != nil {
      fmt.Println("send msg failed, err:", err)
      return
   }
   fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行会报错,找不到hosts,需要将容器id添加到etc/hosts中

sudo vim /etc/hosts

然后增加一行127.0.0.1 容器id  ,然后保存


#在kafka容器里,bin目录下执行

#查看是否有未被消费的消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 主题名称 --from-beginning --max-messages 1

#删除topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 主题名称


标签: gokafka

相关文章

go-zero学习

goland创建新项目,需要在设置->go->go模块里启用go模块集成,不然下载的包无法正常引入根据api文件内容生成文件 goctl api go&nbs...

go-zero中使用jaeger链路追踪

go-zero中使用jaeger链路追踪

链路追踪分两块:(1)框架内置的,通过配置实现,一般都在api、rpc等中间件中,粗浅理解是请求级;(2)自定义的,可以定义到函数里,粗浅理解可以自己写代码级的,也就是可以跟踪每个函数方法的执行时间;...

mac m1 pro 解决微信公众号本地调试问题(未成功)

最终未成功,买natapp先凑合用了,临时记录下nginx安装1、微信公众号后台,公众号设置,功能设置,网页授权域名,添加好该域名。(需要上传文件到根目录)2、natapp上购买vip隧道,9元/月;...

Docker使用篇

镜像:image容器:container运行镜像(如果本地有则会运行本地,本地没有会去下载镜像仓库)docker run 镜像运行镜像内系统的命令窗口(如果要退出,输入exit)d...

宝塔ftp连不上

1、先检查端口是否有开放,涉及宝塔端口 和 云服务器的安全组;2、对Pure-ftpd的配置文件中,大概180来行的ForcePassiveIP  开放,并且把ip改成服务器的外网ip;...

宝塔nginx配置允许各种文件的下载

location ~ .*\.(gif|jpg|jpeg|png|bmp|swf|pdf|doc|docx|xls|xlsx|rar|zip|gz|7z|ppt|pptx|mp3|...

评论列表

test
2023-06-16 16:12:11

支持蛋总

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。