# Kafka Testcase

This directory provides an end-to-end Kafka validation case that verifies the unified Kafka Source/Sink connectors work as expected.

- Producer: `wpgen` writes sample data to Kafka (input topic, default `wp.testcase.events.raw`)
- Engine: `wparse` reads from the Kafka input topic, parses the records, and routes them to multiple sinks (including the Kafka output topic, default `wp.testcase.events.parsed`, and a file sink)
- Optional verification: use `wpkit kafka consume` to inspect messages on the Kafka output topic
## Data Flow

The diagram below shows the data flow and key components (`wp.testcase.events.raw` / `wp.testcase.events.parsed`).

```mermaid
flowchart LR
    subgraph Producer
        WPGEN["wpgen sample<br/>(writes to Kafka per wpgen.toml)"]
    end
    subgraph Kafka
        KAFKA_IN[(KAFKA_INPUT_TOPIC)]
        KAFKA_OUT[(KAFKA_OUTPUT_TOPIC)]
    end
    subgraph Engine
        WPARSE["wparse batch<br/>(-n caps the record count, then auto-exits)"]
        SINKS{{"Sink Group<br/>(models/sink)"}}
        OML["OML mapping/masking"]
    end
    subgraph Verifier
        FILE["file sink: events.parsed.prototext"]
        CONSUME["wpkit kafka consume<br/>(optional verification)"]
    end
    WPGEN -- produce --> KAFKA_IN
    WPARSE -- consume --> KAFKA_IN
    WPARSE -- route --> OML --> SINKS
    SINKS -- write --> FILE
    SINKS -- produce --> KAFKA_OUT
    CONSUME -- verify --> KAFKA_OUT
```

If Mermaid rendering is not supported, refer to the ASCII version:

```
wpgen(sample) --> Kafka(KAFKA_INPUT_TOPIC) --> wparse(batch) --> [OML/route] --> sinks{file,kafka}
sinks --> file: data/out_dat/events.parsed.prototext
sinks --> Kafka(KAFKA_OUTPUT_TOPIC) --> (optional) wpkit kafka consume verification
```
## Directory Structure

- `conf/wparse.toml`: Engine main config (directories/concurrency/logging, etc.)
- `wpgen.toml`: Data generator config (points to the Kafka sink and overrides the input topic)
- `topology/source/wpsrc.toml`: Source routing (contains two `[[sources]]` entries: `kafka_input` subscribes to the input topic; `kafka_output_tap` subscribes to the output topic for self-testing/demo and can be disabled as needed)
- `topology/sink/business.d/example.toml`: Business sink routing (contains a file sink and a Kafka sink)
- `models/oml/...`: OML models (result field mapping/masking)
- `case_verify.sh`: One-click verification script (starts `wparse` → `wpgen` sends → verification)

Note: Source and Sink connector IDs reference definitions in the repository root `connectors/` directory:

- `connectors/source.d/30-kafka.toml`: `id = kafka_src` (allows overriding `topic`/`group_id`/`config`)
- `connectors/sink.d/30-kafka.toml`: `id = kafka_sink` (allows overriding `topic`/`config`/`num_partitions`/`replication`/`brokers`/`fmt`)
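For illustration, a `[[sources]]` entry that overrides the connector defaults might look like the sketch below. This is a hypothetical fragment: apart from the documented `topic`/`group_id` overrides and the `kafka_src` id, the key names and values are assumptions, not taken from the repository.

```toml
# Hypothetical sketch only: key names other than the documented
# topic/group_id overrides (connectors/source.d/30-kafka.toml) are assumed.
[[sources]]
name = "kafka_input"                 # assumed key for the source name
connector = "kafka_src"              # id defined in connectors/source.d/30-kafka.toml
topic = "wp.testcase.events.raw"     # documented override: input topic
group_id = "wp-testcase"             # documented override: consumer group (value assumed)
```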
## Prerequisites

- Kafka running locally, default address `localhost:9092` (or override via environment variables, see below)
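Before running the case, you can sanity-check that a broker is actually listening on the bootstrap address. A minimal sketch, assuming `bash` with `/dev/tcp` support and the `timeout` utility:

```shell
#!/usr/bin/env bash
# Parse host:port from KAFKA_BOOTSTRAP_SERVERS (default localhost:9092)
# and probe it with bash's built-in /dev/tcp redirection.
BOOTSTRAP="${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}"
HOST="${BOOTSTRAP%%:*}"
PORT="${BOOTSTRAP##*:}"

if timeout 2 bash -c "exec 3<>/dev/tcp/${HOST}/${PORT}" 2>/dev/null; then
  echo "broker reachable at ${BOOTSTRAP}"
else
  echo "no broker at ${BOOTSTRAP}; start Kafka or override KAFKA_BOOTSTRAP_SERVERS" >&2
fi
```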
## Quick Start

Enter the case directory and run the script (default profile: debug):

```shell
cd extensions/wp-connectors/testcase
./case_verify.sh            # or: ./case_verify.sh release
```
Main script steps:

- Clean the run directory (preserving `conf/` templates), build binaries to `target/<profile>`, and add them to `PATH`
- Run `wpkit conf check` for a config self-check; clean the data directory
- Start `wparse` in the background (`-n` limits the processing count; it auto-exits on completion)
- Run `wpgen sample` to generate sample data and write it to the Kafka input topic
- Wait for `wparse` to exit, then perform the file sink verification (optional)
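The steps above can be sketched as a dry-run, where each command is echoed rather than executed. The exact flags of `wpgen`/`wparse`/`wpkit` here are assumptions based on this README, not the real script:

```shell
#!/usr/bin/env bash
# Dry-run sketch: `run` only prints the command it would execute.
set -u
run() { echo "+ $*"; }

LINE_CNT="${LINE_CNT:-3000}"

run wpkit conf check                 # config self-check
run wparse -n "$LINE_CNT"            # the real script backgrounds this; -n makes it auto-exit
run wpgen sample                     # produce sample data to the input topic
run wpkit stat file                  # file sink statistics
run wpkit validate sink-file -v      # file sink verification
```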
## Parameters

The script supports the following optional environment variables:

- `PROFILE`: Build and run profile (`debug`|`release`), default `debug`
- `LINE_CNT`: Number of sample records to generate/process, default `3000`
- `STAT_SEC`: Statistics print interval (seconds), default `3`
- `KAFKA_BOOTSTRAP_SERVERS`: Kafka address, default `localhost:9092`
- `KAFKA_INPUT_TOPIC`: Input topic (`wpgen` writes, `wparse` consumes), default `wp.testcase.events.raw`
- `KAFKA_OUTPUT_TOPIC`: Output topic (the `wparse` Kafka sink writes), default `wp.testcase.events.parsed`

Example:

```shell
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 KAFKA_INPUT_TOPIC=my_in KAFKA_OUTPUT_TOPIC=my_out ./case_verify.sh
```
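These defaults follow standard shell `${VAR:-default}` expansion: an exported variable wins, otherwise the default applies. A minimal sketch:

```shell
#!/usr/bin/env bash
# ${VAR:-default} picks the environment value when set, else the default.
unset KAFKA_INPUT_TOPIC
IN_TOPIC="${KAFKA_INPUT_TOPIC:-wp.testcase.events.raw}"
echo "$IN_TOPIC"     # -> wp.testcase.events.raw

KAFKA_OUTPUT_TOPIC=my_out
OUT_TOPIC="${KAFKA_OUTPUT_TOPIC:-wp.testcase.events.parsed}"
echo "$OUT_TOPIC"    # -> my_out
```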
## Result Verification

- File sink: the script runs `wpkit stat file` and `wpkit validate sink-file -v`; the output is available in `data/out_dat/` as `events.parsed.prototext` (per the file sink config in `models/sink/business.d/example.toml`)
- Kafka output: optionally run the following command to view the output topic (a fresh consumer group is recommended so messages are not consumed by other consumers):

```shell
wpkit kafka consume --brokers "${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}" \
  --group "wpkit-consume-$$" \
  --topic "${KAFKA_OUTPUT_TOPIC:-wp.testcase.events.parsed}"
```