Kafka

This directory provides an end-to-end Kafka validation case to verify the unified Kafka Source/Sink connectors work as expected.

  • Producer: wpgen writes sample data to Kafka (input topic, default wp.testcase.events.raw)
  • Engine: wparse reads from Kafka input, parses and routes to multiple sinks (including Kafka output topic, default wp.testcase.events.parsed, and a file sink)
  • Optional verification: Use wpkit kafka consume to verify messages on the Kafka output topic

Data Flow

The diagram below shows the data flow and key components (wp.testcase.events.raw/wp.testcase.events.parsed).

flowchart LR
    subgraph Producer
      WPGEN["wpgen sample\n(writes to Kafka per wpgen.toml)"]
    end
    subgraph Kafka
      KAFKA_IN[(KAFKA_INPUT_TOPIC)]
      KAFKA_OUT[(KAFKA_OUTPUT_TOPIC)]
    end
    subgraph Engine
      WPARSE["wparse batch\n(-n limits the record count, then exits)"]
      SINKS{{"Sink Group\n(models/sink)"}}
      OML["OML mapping/masking"]
    end
    subgraph Verifier
      FILE[file sink: events.parsed.prototext]
      CONSUME["wpkit kafka consume\n(optional verification)"]
    end

    WPGEN -- produce --> KAFKA_IN
    KAFKA_IN -- consume --> WPARSE
    WPARSE -- route --> OML --> SINKS
    SINKS -- write --> FILE
    SINKS -- produce --> KAFKA_OUT
    KAFKA_OUT -- verify --> CONSUME

If Mermaid is not supported, refer to the ASCII version:

wpgen(sample) --> Kafka(KAFKA_INPUT_TOPIC) --> wparse(batch) --> [OML/route] --> sinks{file,kafka}
    sinks --> file: data/out_dat/events.parsed.prototext
    sinks --> Kafka(KAFKA_OUTPUT_TOPIC) --> (optional) wpkit kafka consume verification

Directory Structure

  • conf/
    • wparse.toml: Engine main config (directories/concurrency/logging, etc.)
    • wpgen.toml: Data generator config (points to Kafka sink, overrides input topic)
  • topology/source/wpsrc.toml: Source routing (contains two [[sources]]: kafka_input subscribes to input topic; kafka_output_tap subscribes to output topic for self-testing/demo, can be disabled as needed)
  • topology/sink/business.d/example.toml: Business sink routing (contains a file sink and a Kafka sink)
  • models/oml/...: OML models (result field mapping/masking)
  • case_verify.sh: One-click verification script (starts wparse → wpgen sends → verification)

Note: Source and Sink connector IDs reference definitions in the repository root connectors/ directory:

  • connectors/source.d/30-kafka.toml: id=kafka_src (allows overriding topic/group_id/config)
  • connectors/sink.d/30-kafka.toml: id=kafka_sink (allows overriding topic/config/num_partitions/replication/brokers/fmt)

Prerequisites

  • Kafka running locally, default address localhost:9092 (or override via environment variables, see below)
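Before running the case, it can help to confirm that the broker address accepts connections. The following is a minimal sketch, not part of case_verify.sh: the helper names split_bootstrap and broker_reachable are hypothetical, and the probe assumes that bash (for /dev/tcp) and the coreutils timeout command are installed. It only checks that the TCP port is open, not that the listener is a working Kafka broker, and it does not handle IPv6 literals.

```shell
# Split the first "host:port" entry of a bootstrap list into "host port".
# The port falls back to 9092 (the documented default) when it is omitted.
split_bootstrap() {
  local addr host port
  addr="${1%%,*}"            # keep only the first comma-separated entry
  host="${addr%%:*}"
  port="${addr##*:}"
  if [ "$port" = "$addr" ]; then
    port=9092                # no ":" present, so no explicit port
  fi
  printf '%s %s\n' "$host" "$port"
}

# Return 0 if a TCP connection to the first bootstrap entry succeeds within 3 s.
# Assumption: bash and coreutils `timeout` are available on this machine.
broker_reachable() {
  # Unquoted on purpose: split "host port" into $1 and $2.
  set -- $(split_bootstrap "$1")
  timeout 3 bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}
```

A possible call site: broker_reachable "${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}" || echo "Kafka is not reachable" >&2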

Quick Start

Enter the case directory and run the script (the debug profile is the default):

cd extensions/wp-connectors/testcase
./case_verify.sh            # or ./case_verify.sh release

Main script steps:

  1. Clean the run directory (preserving the conf/ templates), build the binaries into target/<profile>, and add that directory to PATH
  2. Run wpkit conf check to self-check the configuration, then clean the data directory
  3. Start wparse in the background (-n limits the number of records processed; wparse exits automatically once the limit is reached)
  4. Run wpgen sample to generate sample data and write it to the Kafka input topic
  5. Wait for wparse to exit, then verify the file sink (optional)
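The ordering of steps 2 to 5 can be sketched as follows. This is an illustration under stated assumptions, not the contents of case_verify.sh: the run and verify_flow helpers and the DRY_RUN switch are hypothetical, the invocation wparse batch -n is inferred from the diagram label, and step 1 (cleaning and building) is left out because its commands are not described here. With DRY_RUN=1, the default in this sketch, the commands are only printed.

```shell
DRY_RUN="${DRY_RUN:-1}"        # hypothetical switch: 1 prints commands, 0 runs them
LINE_CNT="${LINE_CNT:-3000}"   # documented default record count

# Print the command in dry-run mode, otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

verify_flow() {
  run wpkit conf check                 # step 2: configuration self-check
  if [ "$DRY_RUN" = "1" ]; then
    run wparse batch -n "$LINE_CNT"    # step 3 (printed only)
  else
    wparse batch -n "$LINE_CNT" &      # step 3: background; exits after LINE_CNT records
    wparse_pid=$!
  fi
  run wpgen sample                     # step 4: write sample data to the input topic
  if [ "$DRY_RUN" != "1" ]; then
    wait "$wparse_pid"                 # step 5: wait for wparse to finish
  fi
  run wpkit stat file                  # step 5: file sink verification
  run wpkit validate sink-file -v
}
```

Starting the consumer before the producer, as the script does, means wparse is already subscribed when wpgen begins writing, and the -n limit gives the background process a defined point at which to exit.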

Parameters

The script supports the following optional environment variables:

  • PROFILE: Build and run profile (debug|release), default debug
  • LINE_CNT: Number of sample records to generate/process, default 3000
  • STAT_SEC: Statistics print interval (seconds), default 3
  • KAFKA_BOOTSTRAP_SERVERS: Kafka address, default localhost:9092
  • KAFKA_INPUT_TOPIC: Input topic (wpgen writes, wparse consumes), default wp.testcase.events.raw
  • KAFKA_OUTPUT_TOPIC: Output topic (wparse Kafka sink writes), default wp.testcase.events.parsed

Example:

KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 KAFKA_INPUT_TOPIC=my_in KAFKA_OUTPUT_TOPIC=my_out ./case_verify.sh
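One common way to implement such optional variables in a shell script is the ${VAR:-default} expansion, which keeps a value supplied by the caller and falls back to the default otherwise. The sketch below uses the variable names and defaults listed above; the function name resolve_settings and the PROFILE check are assumptions for illustration, and the real script may resolve its settings differently.

```shell
# Resolve each documented variable, keeping any value the caller exported.
resolve_settings() {
  PROFILE="${PROFILE:-debug}"
  LINE_CNT="${LINE_CNT:-3000}"
  STAT_SEC="${STAT_SEC:-3}"
  KAFKA_BOOTSTRAP_SERVERS="${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}"
  KAFKA_INPUT_TOPIC="${KAFKA_INPUT_TOPIC:-wp.testcase.events.raw}"
  KAFKA_OUTPUT_TOPIC="${KAFKA_OUTPUT_TOPIC:-wp.testcase.events.parsed}"

  # Assumption: only the two documented profiles are accepted.
  case "$PROFILE" in
    debug|release) ;;
    *) echo "unsupported PROFILE: $PROFILE" >&2; return 1 ;;
  esac
}
```

Calling resolve_settings with only KAFKA_INPUT_TOPIC=my_in set leaves the other five variables at their defaults.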

Result Verification

  • File Sink: The script runs wpkit stat file and wpkit validate sink-file -v; output is available in data/out_dat/ as events.parsed.prototext (per models/sink/business.d/example.toml file sink config)
  • Kafka Output: Optionally run the following command to view the output topic (a fresh consumer group is recommended so that messages already read by other consumers are still delivered):
wpkit kafka consume --brokers "${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}" \
  --group wpkit-consume-$$ \
  --topic "${KAFKA_OUTPUT_TOPIC:-wp.testcase.events.parsed}"
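In the command above, $$ expands to the PID of the current shell, which is what makes the group name different on each run. For the file sink, a quick manual check that the output file exists and is non-empty might look like the sketch below. The function name check_sink_file is hypothetical and this is not a substitute for wpkit validate sink-file; the reported line count is informational only, since one record may span several lines.

```shell
# Succeed only if the sink file exists and is non-empty, and report its line count.
check_sink_file() {
  local file lines
  file="${1:-data/out_dat/events.parsed.prototext}"
  if [ ! -s "$file" ]; then
    echo "missing or empty: $file" >&2
    return 1
  fi
  lines="$(wc -l < "$file" | tr -d ' ')"   # strip padding that some wc builds add
  echo "ok: $file ($lines lines)"
}
```

Run without arguments from the case directory, it checks the default path data/out_dat/events.parsed.prototext.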

Kafka (中文)

本目录提供一套基于 Kafka 的端到端校验用例,验证统一 Kafka Source/Sink 连接器是否按预期工作。

  • 发送端:wpgen 将样例数据写入 Kafka(输入 topic,默认 wp.testcase.events.raw)
  • 引擎端:wparse 读取 Kafka 输入,解析并路由到多个 Sink(其中包含 Kafka 输出 topic,默认 wp.testcase.events.parsed,以及一个文件型 Sink)
  • 可选校验:使用 wpkit kafka consume 验证 Kafka 输出 topic 的消息

数据流图

下图展示 testcase 的数据流与关键环节(wp.testcase.events.raw/wp.testcase.events.parsed)。

flowchart LR
    subgraph Producer
      WPGEN["wpgen sample\n(按 wpgen.toml 写 Kafka)"]
    end
    subgraph Kafka
      KAFKA_IN[(KAFKA_INPUT_TOPIC)]
      KAFKA_OUT[(KAFKA_OUTPUT_TOPIC)]
    end
    subgraph Engine
      WPARSE["wparse batch\n(-n 限制条数自动退出)"]
      SINKS{{"Sink Group\n(models/sink)"}}
      OML[OML 映射/脱敏]
    end
    subgraph Verifier
      FILE[file sink: events.parsed.prototext]
      CONSUME["wpkit kafka consume\n(可选验证)"]
    end

    WPGEN -- produce --> KAFKA_IN
    KAFKA_IN -- consume --> WPARSE
    WPARSE -- route --> OML --> SINKS
    SINKS -- write --> FILE
    SINKS -- produce --> KAFKA_OUT
    KAFKA_OUT -- verify --> CONSUME

如渲染不支持 Mermaid,可参考 ASCII 版:

wpgen(sample) --> Kafka(KAFKA_INPUT_TOPIC) --> wparse(batch) --> [OML/route] --> sinks{file,kafka}
    sinks --> file: data/out_dat/events.parsed.prototext
    sinks --> Kafka(KAFKA_OUTPUT_TOPIC) --> (optional) wpkit kafka consume 验证

目录结构

  • conf/
    • wparse.toml:引擎主配置(目录/并发/日志等)
    • wpgen.toml:数据生成器配置(已指向 Kafka sink,并覆写输入 topic)
  • topology/source/wpsrc.toml:Source 路由(包含两个 [[sources]]:kafka_input 订阅输入 topic;kafka_output_tap 订阅输出 topic,用于自测/演示,可按需关闭)
  • topology/sink/business.d/example.toml:业务 Sink 路由(包含一个文件型 sink 与一个 Kafka sink)
  • models/oml/...:OML 模型(结果字段映射/脱敏)
  • case_verify.sh:一键校验脚本(启动 wparse → wpgen 发送 → 校验)

说明:Source 与 Sink 连接器 id 引用仓库根目录 connectors/ 下的定义:

  • connectors/source.d/30-kafka.toml:id=kafka_src(允许覆写 topic/group_id/config)
  • connectors/sink.d/30-kafka.toml:id=kafka_sink(允许覆写 topic/config/num_partitions/replication/brokers/fmt)

前置要求

  • 本机已启动 Kafka,默认地址 localhost:9092(或通过环境变量覆盖,见下文)

快速开始

进入用例目录并运行脚本(默认 debug):

cd extensions/wp-connectors/testcase
./case_verify.sh            # 或 ./case_verify.sh release

脚本主要步骤:

  1. 清理运行目录(保留 conf/ 模板)并构建二进制到 target/<profile>,加入 PATH
  2. wpkit conf check 进行配置自检;清理数据目录
  3. 后台启动 wparse(-n 限制处理条数,完成后自动退出)
  4. 执行 wpgen sample 生成样例数据并写入 Kafka 输入 topic
  5. 等待 wparse 退出并进行文件型 sink 校验(可选)

运行参数

脚本支持以下可选环境变量:

  • PROFILE:构建与运行的 profile(debug|release),默认 debug
  • LINE_CNT:生成/处理的样例条数,默认 3000
  • STAT_SEC:统计打印间隔(秒),默认 3
  • KAFKA_BOOTSTRAP_SERVERS:Kafka 地址,默认 localhost:9092
  • KAFKA_INPUT_TOPIC:输入 topic(wpgen 写入、wparse 消费),默认 wp.testcase.events.raw
  • KAFKA_OUTPUT_TOPIC:输出 topic(wparse 的 Kafka sink 写入),默认 wp.testcase.events.parsed

示例:

KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 KAFKA_INPUT_TOPIC=my_in KAFKA_OUTPUT_TOPIC=my_out ./case_verify.sh

结果验证

  • 文件型 Sink:脚本会执行 wpkit stat file 与 wpkit validate sink-file -v,在 data/out_dat/ 下可见 events.parsed.prototext(按 models/sink/business.d/example.toml 的文件型 sink 配置)
  • Kafka 输出:可选执行以下命令查看输出 topic(建议使用全新 group,以免被其他消费者读走)
wpkit kafka consume --brokers ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092} \
  --group wpkit-consume-$$ \
  --topic "${KAFKA_OUTPUT_TOPIC:-wp.testcase.events.parsed}"