mac m1 中go使用kafka

dandan2年前程序开发2325

当前docker-comose:

version: '3'
services:
  Etcd:
    container_name: etcd3
    image: bitnami/etcd:${ETCD_VERSION}
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
    privileged: true
    volumes:
      - ${ETCD_DIR}/data:/bitnami/etcd/data
    ports:
      - ${ETCD_PORT}:2379
      - 2380:2380
    networks:
      - dandan_net
  redis:
    container_name: redis
    image: redis:7.0
    ports:
      - 6379:6379
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    volumes:
      # 数据文件 - data files
      - ./data/redis/data:/data:rw
    command: "redis-server --requirepass G62m50oigInC30sf  --appendonly yes"
    privileged: true
    restart: always
    networks:
      - dandan_net
  #zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafka
  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.8.1
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    restart: always
    ports:
      - 2181:2181
    networks:
      - dandan_net

  #消息队列 - Message queue
  kafka:
    container_name: kafka
    image: bitnami/kafka:3.4.0
    ports:
      - 9092:9092
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
      - TZ=Asia/Shanghai
    restart: always
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - dandan_net
    depends_on:
    - zookeeper
networks:
  dandan_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16

运行后,进入容器

 docker exec -it 容器id /bin/bash

进入kafka目录

cd /opt/bitnami/kafka/

创建topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic dandan-log

go示例

package main

import (
   "fmt"
   "github.com/Shopify/sarama"
)

func main() {
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
   config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
   config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

   // 构造一个消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "normal_log"
   msg.Value = sarama.StringEncoder("this is a test log")
   // 连接kafka
   client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
   if err != nil {
      fmt.Println("producer closed, err:", err)
      return
   }
   defer client.Close()
   // 发送消息
   pid, offset, err := client.SendMessage(msg)
   if err != nil {
      fmt.Println("send msg failed, err:", err)
      return
   }
   fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行会报错,找不到hosts,需要将容器id添加到etc/hosts中

sudo vim /etc/hosts

然后增加一行127.0.0.1 容器id  ,然后保存


#在kafka容器里,bin目录下执行

#查看是否有未被消费的消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 主题名称 --from-beginning --max-messages 1

#删除topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 主题名称


标签: gokafka

相关文章

git操作

将本地现有项目放到远端上:添加远程仓库: git remote add [远端名称] [远端地址], 名称一般默认都是origin,地址就是项目地址git remote add&n...

mac m1使用docker mysql踩坑

1、etcd版本用3.4.242、mysql镜像要用:mysql/mysql-server:8.0.323、mysql因为是8的版本,启动后本地没权限连接,需要进容器创建用户和添加权限:# ...

Docker使用篇

镜像:image容器:container运行镜像(如果本地有则会运行本地,本地没有会去下载镜像仓库)docker run 镜像运行镜像内系统的命令窗口(如果要退出,输入exit)d...

mysql5.6导入mysql8的坑

1、虽然在mysql8里设置数据库的排序规则是utf8mb4_general_ci ,但是导入后,却都被变成了utf8mb4_0900_ai_ci解决:用navicae导出的,表语句中没有COLLAT...

win11 RTX4070Ti 部署langchain-chatchat

1、下载py的环境管理工具:Anaconda (等同于node环境的nvm工具)2、创建一个专属环境conda create -n langchain pyth...

camunda使用

1、 拉取镜像docker pull camunda/camunda-bpm-platform:7.17.02、配置并启动docker run -d ...

评论列表

test
2023-06-16 16:12:11

支持蛋总

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。