Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

OML Examples

This example provides comprehensive OML (Object Modeling Language) transformation demonstrations covering data transformation, field mapping, conditional matching, and knowledge base queries.

Purpose

Validate the ability to:

  • Transform parsed data using OML models
  • Perform conditional matching with match expressions
  • Query knowledge bases with SQL-like syntax
  • Create nested objects and collect fields dynamically

Features Validated

FeatureDescription
Conditional Matchingmatch ... {} with pattern matching
Range Matchingin(digit(1), digit(3)) for value ranges
Tuple Matching(read(city), read(count)) for multi-field conditions
Negation Matching!chars(warp) for exclusion patterns
Optional Fieldstake(option:[severity]) for optional handling
Knowledge Base Queryselect ... from ... where SQL-like queries
Wildcard Collection* = take() for passthrough fields
Object Creationobject { ... } for nested structures
Pipeline Processingpipe ... | base64_en for transformations

OML Models Included

ModelDescription
csv_example.omlCSV data processing with conditional matching
skyeye_stat.omlSystem monitoring data transformation
work_case.omlBusiness case data processing

Quick Start

cd core/oml_examples
./run.sh

oml_examples (中文)

本用例演示“OML(Object Modeling Language)转换“的多种场景:通过丰富的 OML 示例展示数据转换、字段映射、条件匹配、知识库查询等高级特性。适用于学习 OML 语法与最佳实践。

目录结构

core/oml_examples/
├── README.md                    # 本说明文档
├── run.sh                       # 一键运行脚本
├── conf/                        # 配置文件目录
│   ├── wparse.toml             # WarpParse 主配置
│   └── wpgen.toml              # 数据生成器配置
├── models/                      # 规则与模型目录
│   ├── oml/                    # OML 转换模型
│   │   ├── csv_example.oml     # CSV 数据处理与条件匹配
│   │   ├── skyeye_stat.oml     # 系统监控数据转换
│   │   └── work_case.oml       # 工作案例数据处理
│   ├── knowledge/              # 知识库数据
│   │   ├── knowdb.toml         # 知识库主配置
│   │   ├── address/            # 地址信息知识库
│   │   │   ├── create.sql      # 建表语句
│   │   │   ├── data.csv        # 地址数据
│   │   │   └── insert.sql      # 插入语句
│   │   ├── example/            # 示例数据知识库
│   │   │   ├── create.sql      # 建表语句
│   │   │   ├── data.csv        # 示例数据
│   │   │   └── insert.sql      # 插入语句
│   │   └── example_score/      # 分数数据知识库
│   │       ├── create.sql      # 建表语句
│   │       ├── data.csv        # 分数数据
│   │       └── insert.sql      # 插入语句
│   ├── sinks/                  # 数据汇配置
│   │   ├── defaults.toml       # 默认配置
│   │   ├── business.d/         # 业务路由配置
│   │   │   ├── csv_example.toml # CSV 示例输出
│   │   │   ├── skyeye_stat.toml # 系统监控输出
│   │   │   └── work_case.toml   # 工作案例输出
│   │   └── infra.d/            # 基础设施配置
│   │       ├── default.toml    # 默认数据汇
│   │       ├── error.toml      # 错误数据处理
│   │       ├── miss.toml       # 缺失数据处理
│   │       ├── monitor.toml    # 监控数据处理
│   │       └── residue.toml    # 残留数据处理
│   ├── sources/                # 数据源配置(空)
│   └── wpl/                    # WPL 解析规则(空)
├── data/                        # 运行数据目录
│   ├── in_dat/                  # 输入数据目录
│   ├── out_dat/                 # 输出数据目录
│   │   ├── csv_example.dat     # CSV 处理结果
│   │   ├── skyeye_adm.json     # SkyEye ADM 输出
│   │   ├── skyeye_pdm.json     # SkyEye PDM 输出
│   │   └── work_case.json      # 工作案例输出
│   └── logs/                    # 日志文件目录
│       ├── gen.dat             # 生成的样本数据
│       └── *.log               # 各类日志文件
└── .run/                        # 运行时数据目录

快速开始

运行环境要求

  • WarpParse 引擎(需在系统 PATH 中)
  • Bash shell 环境

运行命令

# 进入用例目录
cd core/oml_examples

# 运行完整流程(默认生成 3000 条测试数据)
./run.sh

执行逻辑

流程概览

run.sh 脚本执行以下主要步骤:

  1. 环境初始化

    • 保留必要的配置文件(wparse.toml, wpgen.toml)
    • 清理历史运行数据
    • 创建必要的目录结构
    • 初始化知识库数据
  2. 服务检查

    • 使用 wproj check 验证配置和模型
    • 确保所有依赖项正常
  3. 数据清理

    • 清空输入输出数据目录
    • 重置日志文件
    • 清理生成器缓存
  4. 生成样本数据

    • 使用 wpgen sample 生成测试数据
    • 数据包含 CSV、JSON 等多种格式
    • 模拟真实的业务场景数据
  5. 执行数据解析

    • 启动 WarpParse 批处理模式
    • 加载 OML 模型进行数据转换
    • 应用知识库查询增强数据
  6. 验证输出结果

    • 统计各输出文件的数据量
    • 验证数据转换的正确性
    • 检查知识库查询结果

数据流向

生成数据 (data/logs/gen.dat)
    ↓
WarpParse OML 引擎
    ↓
┌─────────────┬─────────────┬─────────────┐
│ csv_example │ skyeye_stat │ work_case   │
│    处理     │    转换     │    解析     │
└─────────────┴─────────────┴─────────────┘
    ↓             ↓             ↓
┌─────────────┬─────────────┬─────────────┐
│csv_example  │skyeye_adm   │work_case    │
│    .dat     │  .json      │   .json     │
└─────────────┴─────────────┴─────────────┘
              └─────────────┘
                skyeye_pdm
                  .json

OML 示例详解

1. csv_example.oml - CSV 数据处理与条件匹配

name: csv_example
rule : csv_example/*
---
occur_time  =  Time::now() ;
year  = take();
sid   = digit(10);

quart   = match  read(month) {
    in ( digit(1) , digit(3) )    => chars(Q1);
    in ( digit(4) , digit(6) )    => chars(Q2);
    in ( digit(7) , digit(9) )    => chars(Q3);
    in ( digit(10) , digit(12) )  => chars(Q4);
    _ => chars(Q5);
};

level  = match  ( read(city) , read(count) ) {
   ( chars(cs) , in ( digit(81) ,  digit(200) ) )    => chars(GOOD);
   ( chars(cs) , in ( digit(0) ,   digit(80) )  )   => chars(BAD);
   ( chars(bj) , in ( digit(101) , digit(200) ) )   => chars(GOOD);
   ( chars(bj) , in ( digit(0) ,   digit(100) ) )   => chars(BAD);
    _ => chars(NOR);
};

vender = match  read(product)  {
    chars(warp)   =>   chars(warp-rd)
    !chars(warp)   =>  chars(other)
};

severity: chars =  match take(option:[severity]) {
    digit(0) => chars(未知);
    digit(1) => chars(信息);
    digit(2) => chars(低危);
    digit(3) => chars(高危(漏洞));
};

math_score = select math  from example_score where id = read(sid) ;
*  = take() ;

特性演示

  • 时间戳生成Time::now() 获取当前时间
  • 范围匹配in (digit(1), digit(3)) 匹配数值范围
  • 元组匹配(read(city), read(count)) 多字段联合匹配
  • 否定匹配!chars(warp) 排除特定值
  • 可选字段take(option:[severity]) 处理可选字段
  • 知识库查询select ... from ... where 动态查询数据
  • 透传字段* = take() 保留所有未处理字段

2. skyeye_stat.oml - 系统监控数据转换

name : skyeye_stat
rule : skyeye_stat/*
---
vendor      = chars(wparse) ;
v_ip        = ip(127.0.0.1) ;
recv_time   = Time::now() ;
cust_tag    = fmt("[{pos_sn}-{sip}]", @pos_sn, @sip ) ;

value  = object {
  process,memory_free : float =  take() ;
  cpu_free,cpu_used_by_one_min, cpu_used_by_fifty_min : float =  take() ;
  disk_free,disk_used,disk_used_by_one_min, disk_used_by_fify_min : float =  take() ;
} ;

time_all  = collect  take( keys : [ *time* ] ) ;
raw_msg  =  pipe take() | base64_en ;

特性演示

  • 常量赋值chars(wparse)ip(127.0.0.1) 固定值
  • 格式化字符串fmt() 模板字符串
  • 嵌套对象object { ... } 创建结构化数据
  • 字段收集collect take(keys: [*time*]) 动态收集字段
  • 管道处理pipe ... | base64_en 数据流处理

3. work_case.oml - 工作案例数据处理

name : work_case
rule : work_case/*
---
agent_id   = take() ;
symbol     = take() ;
botnet     = take() ;

# 条件分支处理
symbol: chars = match read(symbol) {
    chars(web_cve) => chars(Web漏洞);
    chars(os_cve)  => chars(系统漏洞);
    _              => read(symbol);
};

# 动态路径收集
path_list = collect read(keys: [details*path]) ;

# 透传剩余字段
* = take() ;

特性演示

  • 简单字段提取:基础的数据提取
  • 条件替换:基于值的字段映射
  • 通配符收集[details*path] 模糊匹配收集
  • 剩余字段处理:保留所有未定义字段

配置说明

主配置文件 (conf/wparse.toml)

version = "1.0"
robust = "normal"

[models]
wpl = "./models/wpl"
oml = "./models/oml"
sources = "./models/sources"
sinks = "./models/sinks"

[performance]
rate_limit_rps = 500000   # 高速率限制,适合批处理
parse_workers = 2         # 解析工作线程数

[rescue]
path = "./data/rescue"

[log_conf]
level = "warn,ctrl=info,launch=info,source=info,sink=info,stat=info,runtime=warn,oml=warn,wpl=warn,klib=warn,orion_error=error,orion_sens=warn"
output = "File"

[stat]
window_sec = 60

数据生成器配置 (conf/wpgen.toml)

[generator]
mode = "sample"          # 使用预定义样本
count = 1000            # 生成数据条数
speed = 1000            # 生成速度(条/秒)
parallel = 1            # 并发数

[output]
connect = "file_raw_sink"

[output.params]
base = "data/in_dat"
file = "gen.dat"

[log_conf]
level = "info"
output = "File"

知识库配置 (models/knowledge/knowdb.toml)

version = "2.0"

[default]
transaction = true       # 启用事务
batch_size = 2000       # 批处理大小

[csv]
has_header = true       # CSV 包含表头
delimiter = ","         # 分隔符
encoding = "utf-8"      # 编码

[table.example_score]
mapping = "header"      # 按表头映射
range = "5,110"         # 数据行范围

[table.address]
mapping = "index"       # 按索引映射
range = "5,110"         # 数据行范围

业务 Sink 配置示例 (models/sinks/business.d/csv_example.toml)

version = "2.0"

[sink]
name = "csv_example"
type = "file_csv_sink"
connect = "csv_sink"

[sink.params]
base = "data/out_dat"
file = "csv_example.dat"

[sink.expect]
basis = "total_input"
ratio = 0.7              # 期望 70% 的数据进入此 sink
deviation = 0.01         # 允许 1% 的偏差

验证

运行成功验证

  1. 输出文件统计

    wproj data stat
    
  2. 验证数据完整性

    wproj data validate
    
  3. 查看输出样例

    # CSV 输出
    head -20 data/out_dat/csv_example.dat
    
    # JSON 输出
    jq . data/out_dat/skyeye_adm.json | head -40
    

使用不同的输出格式

配置不同的 Sink 类型:

  • JSON 输出type = "file_json_sink"
  • CSV 输出type = "file_csv_sink"
  • KV 输出type = "file_kv_sink"
  • 原始输出type = "file_raw_sink"

本文档最后更新时间:2025-12-16