Kafka
本目录提供一套基于 Kafka 的端到端校验用例,验证统一 Kafka Source/Sink 连接器是否按预期工作。
- 发送端:
wpgen将样例数据写入 Kafka(输入 topic,默认wp.testcase.events.raw) - 引擎端:
wparse读取 Kafka 输入,解析并路由到多个 Sink(其中包含 Kafka 输出 topic,默认wp.testcase.events.parsed,以及一个文件型 Sink) - 可选校验:使用
wpkit kafka consume验证 Kafka 输出 topic 的消息
数据流图
下图展示 testcase 的数据流与关键环节(wp.testcase.events.raw/wp.testcase.events.parsed)。
flowchart LR
subgraph Producer
WPGEN[wpgen sample]\n(按 wpgen.toml 写 Kafka)
end
subgraph Kafka
KAFKA_IN[(KAFKA_INPUT_TOPIC)]
KAFKA_OUT[(KAFKA_OUTPUT_TOPIC)]
end
subgraph Engine
WPARSE[wparse batch\n(-n 限制条数自动退出)]
SINKS{{Sink Group\n(models/sink)}}
OML[OML 映射/脱敏]
end
subgraph Verifier
FILE[file sink: events.parsed.prototext]
CONSUME[wpkit kafka consume\n(可选验证)]
end
WPGEN -- produce --> KAFKA_IN
WPARSE -- consume --> KAFKA_IN
WPARSE -- route --> OML --> SINKS
SINKS -- write --> FILE
SINKS -- produce --> KAFKA_OUT
CONSUME -- verify --> KAFKA_OUT
如渲染不支持 Mermaid,可参考 ASCII 版:
wpgen(sample) --> Kafka(KAFKA_INPUT_TOPIC) --> wparse(batch) --> [OML/route] --> sinks{file,kafka}
sinks --> file: data/out_dat/events.parsed.prototext
sinks --> Kafka(KAFKA_OUTPUT_TOPIC) --> (optional) wpkit kafka consume 验证
目录结构
conf/wparse.toml:引擎主配置(目录/并发/日志等)wpgen.toml:数据生成器配置(已指向 Kafka sink,并覆写输入 topic)
topology/source/wpsrc.toml:Source 路由(包含两个[[sources]]:kafka_input订阅输入 topic;kafka_output_tap订阅输出 topic,用于自测/演示,可按需关闭)topology/sink/business.d/example.toml:业务 Sink 路由(包含一个文件型 sink 与一个 Kafka sink)models/oml/...:OML 模型(结果字段映射/脱敏)case_verify.sh:一键校验脚本(启动wparse→wpgen发送 → 校验)
说明:Source 与 Sink 连接器 id 引用仓库根目录 connectors/ 下的定义:
connectors/source.d/30-kafka.toml:id=kafka_src(允许覆写topic/group_id/config)connectors/sink.d/30-kafka.toml:id=kafka_sink(允许覆写topic/config/num_partitions/replication/brokers/fmt)
前置要求
- 本机已启动 Kafka,默认地址
localhost:9092(或通过环境变量覆盖,见下文)
快速开始
进入用例目录并运行脚本(默认 debug):
cd extensions/wp-connectors/testcase
./case_verify.sh # 或 ./case_verify.sh release
脚本主要步骤:
- 清理运行目录(保留
conf/模板)并构建二进制到target/<profile>,加入PATH wpkit conf check进行配置自检;清理数据目录- 后台启动
wparse(-n限制处理条数,完成后自动退出) - 执行
wpgen sample生成样例数据并写入 Kafka 输入 topic - 等待
wparse退出并进行文件型 sink 校验(可选)
运行参数
脚本支持以下可选环境变量:
PROFILE:构建与运行的 profile(debug|release),默认debugLINE_CNT:生成/处理的样例条数,默认3000STAT_SEC:统计打印间隔(秒),默认3KAFKA_BOOTSTRAP_SERVERS:Kafka 地址,默认localhost:9092KAFKA_INPUT_TOPIC:输入 topic(wpgen写入、wparse消费),默认wp.testcase.events.rawKAFKA_OUTPUT_TOPIC:输出 topic(wparse的 Kafka sink 写入),默认wp.testcase.events.parsed
示例:
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 KAFKA_INPUT_TOPIC=my_in KAFKA_OUTPUT_TOPIC=my_out ./case_verify.sh
结果验证
- 文件型 Sink:脚本会执行
wpkit stat file与wpkit validate sink-file -v,在data/out_dat/下可见events.parsed.prototext(按models/sink/business.d/example.toml的文件型 sink 配置) - Kafka 输出:可选执行以下命令查看输出 topic(建议使用全新 group,以免被其他消费者读走)
wpkit kafka consume --brokers ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092} \
--group wpkit-consume-$$ \
--topic "${KAFKA_OUTPUT_TOPIC:-wp.testcase.events.parsed}"