mac m1 中go使用kafka

dandan2年前程序开发2585

当前docker-comose:

version: '3'
services:
  Etcd:
    container_name: etcd3
    image: bitnami/etcd:${ETCD_VERSION}
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
    privileged: true
    volumes:
      - ${ETCD_DIR}/data:/bitnami/etcd/data
    ports:
      - ${ETCD_PORT}:2379
      - 2380:2380
    networks:
      - dandan_net
  redis:
    container_name: redis
    image: redis:7.0
    ports:
      - 6379:6379
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    volumes:
      # 数据文件 - data files
      - ./data/redis/data:/data:rw
    command: "redis-server --requirepass G62m50oigInC30sf  --appendonly yes"
    privileged: true
    restart: always
    networks:
      - dandan_net
  #zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafka
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.8.1
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    restart: always
    ports:
      - 2181:2181
    networks:
      - dandan_net

  #消息队列 - Message queue
  kafka:
    container_name: kafka
    image: bitnami/kafka:3.4.0
    ports:
      - 9092:9092
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
      - TZ=Asia/Shanghai
    restart: always
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - dandan_net
    depends_on:
    - zookeeper
networks:
  dandan_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

运行后,进入容器

 docker exec -it 容器id /bin/bash

进入kafka目录

cd /opt/bitnami/kafka/

创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic dandan-log

go示例

package main

import (
   "fmt"
   "github.com/Shopify/sarama"
)

func main() {
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
   config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
   config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

   // 构造一个消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "normal_log"
   msg.Value = sarama.StringEncoder("this is a test log")
   // 连接kafka
   client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
   if err != nil {
      fmt.Println("producer closed, err:", err)
      return
   }
   defer client.Close()
   // 发送消息
   pid, offset, err := client.SendMessage(msg)
   if err != nil {
      fmt.Println("send msg failed, err:", err)
      return
   }
   fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行会报错,找不到hosts,需要将容器id添加到etc/hosts中

sudo vim /etc/hosts

然后增加一行127.0.0.1 容器id  ,然后保存


#在kafka容器里,bin目录下执行

#查看是否有未被消费的消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 主题名称 --from-beginning --max-messages 1

#删除topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 主题名称


标签: gokafka

相关文章

通过frp进行内网穿透

因为ngrok会定义刷新域名,有点限制,如果没有服务器域名,用他合适,如果有自己的服务器、域名,则用frp:按照https://blog.csdn.net/mirage003/article/deta...

go-zero中使用jaeger链路追踪

go-zero中使用jaeger链路追踪

链路追踪分两块:(1)框架内置的,通过配置实现,一般都在api、rpc等中间件中,粗浅理解是请求级;(2)自定义的,可以定义到函数里,粗浅理解可以自己写代码级的,也就是可以跟踪每个函数方法的执行时间;...

go Imagick图片处理

使用前先需要安装ImageMagicMac环境brew install imagemagick #安装依赖(不确定装完上面会不会自动装) #尝试的时候发现依赖都装不上,把提示的...

wsl2安装docker+dify+xinference

1、wsl2安装ubuntu,这里指定版本安装一个新环境 wsl --install -d Ubuntu-22.042、安装完的ubuntu是在c盘的,放到其他...

linux安装go环境

1、在 https://go.dev/dl/  下载linux的包2、上传到服务器,解压,比如放到/opt后,执行下命令,解压到当前目录得到go文件夹tar -xzf&n...

centos后台运行程序

通过远程shell工具连上服务器,运行程序后,如果关闭工具,进程也会被结束。所以采用systemctl方式运行1、创建运行用户adduser newUserName2、编辑service脚本...

评论列表

test
2023-06-16 16:12:11

支持蛋总

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。