Apache Kafka

分布式事件流平台 - 高吞吐发布订阅、持久化日志、实时数据管道

TL;DR

是什么:分布式事件流平台,用于高吞吐量数据管道。

为什么用:极致可扩展性、持久性、实时处理、精确一次语义。

Quick Start

使用 Docker 安装

# 使用 KRaft 启动 Kafka(无需 Zookeeper)
docker run -d --name kafka \
  -p 9092:9092 \
  -e KAFKA_CFG_NODE_ID=1 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  bitnami/kafka:latest

安装 CLI 工具

# macOS
brew install kafka

# 或从 https://kafka.apache.org/downloads 下载

创建主题并测试

# 创建主题
kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

# 生产消息
kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

# 消费消息(新终端)
kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

Cheatsheet

命令描述
kafka-topics.sh --list列出主题
kafka-topics.sh --create创建主题
kafka-topics.sh --describe描述主题
kafka-console-producer.sh生产消息
kafka-console-consumer.sh消费消息
kafka-consumer-groups.sh管理消费者组

Gotchas

Node.js 生产者

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()

async function produce() {
  await producer.connect()
  await producer.send({
    topic: 'test',
    messages: [
      { key: 'key1', value: 'Hello Kafka!' }
    ]
  })
  await producer.disconnect()
}

produce()

Node.js 消费者

const consumer = kafka.consumer({ groupId: 'my-group' })

async function consume() {
  await consumer.connect()
  await consumer.subscribe({ topic: 'test', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        key: message.key?.toString(),
        value: message.value.toString()
      })
    }
  })
}

consume()

Python 生产者/消费者

from kafka import KafkaProducer, KafkaConsumer

# 生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test', b'Hello Kafka!')
producer.flush()

# 消费者
consumer = KafkaConsumer(
    'test',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    group_id='my-group'
)

for message in consumer:
    print(f"{message.key}: {message.value}")

主题管理

# 创建带分区和副本的主题
kafka-topics.sh --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 1 \
  --bootstrap-server localhost:9092

# 描述主题
kafka-topics.sh --describe \
  --topic my-topic \
  --bootstrap-server localhost:9092

# 删除主题
kafka-topics.sh --delete \
  --topic my-topic \
  --bootstrap-server localhost:9092

Next Steps