u1timate
发布于 2025-03-14 / 16 阅读
0
0

ES威胁狩猎规则编写

0x01 基于KQL编写规则

KQL:(Kibana Query Language )查询语法是Kibana为了简化ES查询设计的一套简单查询语法,Kibana支持索引字段和语法补全,可以非常方便的查询数据。如果关闭 KQL,Kibana 将使用 Lucene。

KQL仅过滤数据,在汇总,转换或分类数据中没有作用。

KQL不要与具有不同功能集的Lucene查询语言混淆。

常用的查询语句

#获取该字段存在的文档列表
http.request.method: * 

# 全文搜索
Hello

# 搜索指定字段的值
# 在查询keyword,数字,日期或布尔字段时,该值必须是确切的匹配,查询文本字段的时候就可以模糊搜索
http.request.body.content: "null pointer"   

# 范围搜索
http.response.bytes > 10000 and http.response.bytes <= 20000

@timestamp < now-2w  # 参见https://www.elastic.co/guide/en/elasticsearch/reference/8.17/common-options.html#date-math

# 通配符搜索,当前仅支持*
http.response.status_code: 4*  # 查找以4开头的文档

# 取反操作
NOT http.request.method: GET # 获取不是Get的文档

# 复杂查询
http.request.method: GET OR http.response.status_code: 400

(http.request.method: GET AND http.response.status_code: 200) OR
(http.request.method: POST AND http.response.status_code: 400)

http.request.method: (GET OR POST OR DELETE)  # 速记语法

# 匹配多个字段,前提是这些字段的类型是相同的,否则会报错
datastream.*: logs


# 查询数组,如user:[{first:"", last:""},{}],查询示例如下
user:{ first: "Alice" and last: "White" }

0x02 聚合查询

聚合查询结果以检测匹配数目何时超过阈值。

如当钉钉代离职人员操作文件数量大于10的时候,告警

0x03 EQL

Event Query Language (EQL) 是基于事件的时间序列数据的查询语言。在 Elastic Security 平台上,当输入有效的 EQL 时,查询会在数据节点上编译,执行查询并返回结果。这一切都快速、并行地发生,让用户立即看到结果。

EQL syntax reference | Elasticsearch Guide [8.15]

[警告]EQL搜索必须得字段如下:

时间戳@timestamp 字段

event.category 字段

没有这些字段,在XDR中编写EQL规则,会报错

语法简介

event_category where condition # event_category 默认使用ECS标准的event.category,如果event_category为any的情况下,表示匹配任意事件类型

如果需要替换需要在API中设置event_category_field的值,如下

GET /my-data-stream/_eql/search
{
  "timestamp_field": "file.accessed",
  "event_category_field": "file.type", # 这里就将类型字段修改file.type了
  "query": """
    file where (file.size > 1 and file.type == "file")
  """
}

查询示例

event.category为process, 且process.name为regsvr32.exe的文档

{
  "query": """
    process where process.name == "regsvr32.exe"
  """
}

默认情况下会返回10个文档,且根据时间升序排列。

{
  "query": """
    process where process.name == "regsvr32.exe"
  """,
  "size": 50  # 指定返回的文档数量
}

匹配任意类型的事件

any where process.name == "regsvr32.exe"

注意:如果字段包含-以数字开头包含空格的情况,需要包裹,如process name` == "regsvr32.exe"

匹配指定事件的任意条件

process where true

正则表达式

my_field like  "VALUE*"         // 大小写敏感的模糊匹配,以VALUE开头,如果是?表示只匹配一个
my_field like  "VALUE?"  //大小写敏感的模糊匹配,以VALUE开头,匹配长度为1,比如VALUES匹配成功,VALUESS匹配失败
my_field like~ "value*"         //大小写不敏感的模糊匹配

my_field regex  "VALUE[^Z].?"   // 大小写敏感的正则匹配
my_field regex~ "value[^z].?"   // 大小写不敏感的正则匹配

比较的局限性

EQL是不支持链式比较的,也不支持两个字段进行直接比较,比如

0<file.count<10

process where process.parent.name == "foo" and process.parent.name == process.name 

应该换成这样

file.count<10 and file.count>0

process where process.parent.name == "foo" and process.name == "foo"

查询操作符

my_field in ("Value-1", "VALUE2", "VAL3")                 // case-sensitive
my_field in~ ("value-1", "value2", "val3")                // case-insensitive

my_field not in ("Value-1", "VALUE2", "VAL3")             // case-sensitive
my_field not in~ ("value-1", "value2", "val3")            // case-insensitive

my_field : ("value-1", "value2", "val3")                  // case-insensitive,仅支持字符串比较

my_field like  ("Value-*", "VALUE2", "VAL?")              // case-sensitive
my_field like~ ("value-*", "value2", "val?")              // case-insensitive

my_field regex  ("[vV]alue-[0-9]", "VALUE[^2].?", "VAL3") // case-sensitive
my_field regex~  ("value-[0-9]", "value[^2].?", "val3")   // case-insensitive

字段是否存在

?my_field != null  # my_field这个字段存在
?my_field != null   # my_filed这个字段不存在

[旗子]Sequence时序

时序数据是按时间排序的、具有明确时间戳的事件记录,例如日志、监控数据和事务数据。可以做事件关联

时序匹配方式

基础查询: 匹配所有符合条件的单个事件。

process where process.name == "regsvr32.exe"

上述查询会筛选所有进程名为 regsvr32.exe 的事件。

事件序列匹配 (sequence): 按时间顺序寻找一组相关事件。

sequence
  [ process where process.name == "regsvr32.exe" ]
  [ file where file.name == "scrobj.dll" ]

匹配一个进程 regsvr32.exe 和文件 scrobj.dll 的创建,要求事件发生顺序符合书写顺序

时间约束 (maxspan): 为事件序列的发生时间添加约束,例如必须在 1 小时内完成:

sequence with maxspan=1h
  [ process where process.name == "regsvr32.exe" ]
  [ file where file.name == "scrobj.dll" ]

缺失事件 (!): 匹配不包含某些条件的操作,例如:

sequence
  [ process where process.name == "cmd.exe" ]
  ![ process where process.command_line contains "ocx" ]
  [ file where file.name == "important.log" ]

在执行 cmd.exe 后,没有执行包含 ocx 的命令行动作,却最后创建了 important.log 文件。

by关键字

by 关键字是用于 按字段值分组 的方式,确保序列中的事件是关联的,并且字段值是共享的。确保匹配结果属于同一个逻辑上下文(如用户、进程、主机等)。例如:

  1. 按相同的用户分组。

  2. 按同一进程分组。

  3. 按设备或 IP 地址分组。

sequence
  [ file where file.extension == "exe" ] by user.name, file.path
  [ process where true ] by user.name, process.executable

查询逻辑中要求事件按时间顺序出现:

1、第一个事件匹配 file.extension == "exe" 且分组字段值满足一致性(相同用户(user.name)且使用相同文件路径(file.path)的事件)。

2、与第一个事件相同用户(user.name)且运行相同可执行文件路径(process.executable)的所有process事件会被命中

sequence by process.pid
  [ process where process.name == "cmd.exe" ]
  [ file where file.name == "important.txt" ]

在事件流中,找到进程 ID (process.pid) 相同的事件,并且必须按顺序满足以下条件:

  1. 名为 cmd.exe 的进程。

  2. 该进程操作了名为 important.txt 的文件。

  • 典型场景的适用性

    • 文件-进程链式关联(恶意文件运行)。

    • 用户操作追踪(监控用户对敏感文件的交互)。

    • 功能扩展场景(加入 maxspan 限定时间范围等)。

until关键字

用于指定事件序列的终止条件。换句话说,当 sequence 查询匹配到 until 所定义的特定事件时,序列的追踪会提前终止,而不会继续寻找后续的事件。

基本语法

sequence
  [ event_type_1 where condition_1 ]
  [ event_type_2 where condition_2 ]
  ...
until
  [ event_type_termination where termination_condition ]

使用示例

sequence by process.pid
  [ process where process.name == "notepad.exe" ]
  [ file where file.extension == "txt" ]
until
  [ process where event.action == "termination" ]

查找名称为 notepad.exe 的进程。找到该进程后,如果它与 .txt 文件发生交互,事件符合序列。直到如果检测到此进程的终止事件(event.action == "termination"),查询将停止进一步追踪。

  • 适用的场景

  1. 进程行为终止:

    1. 只追踪进程在正常运行期间的事件,忽略进程终止后的事件。

    2. 示例:分析恶意软件运行前的行为链。

  2. 网络连接状态终止:

    1. 若网络连接被关闭,停止追踪连接的所有后续事件。

    2. 示例:追踪数据泄露或异常连接。

  3. 用户登录/登出行为链:

    1. 当用户退出时,不再追踪该用户的行为。

    2. 示例:分析用户的高危操作,直到用户主动退出为止。

  4. 对象操作终止:

    1. 文件、资源或设备被删除时停止追踪后续操作

    2. 示例:追踪被修改的文件,直到文件被删除为止。

with run 语法

用于将事件序列的匹配与 数据分组 次数统计 相结合。它可以帮助我们实现特定上下文下的批量匹配,从而有效地表达 重复行为 的查询逻辑。

将一个或多个源事件序列在分组基础上进行匹配,并验证其是否满足预期的重复次数。可结合统计行为场景,例如:某用户多次登录失败后,再查看关键数据文件。

sequence with runs=次数条件
  [ event_type_1 where condition_1 ]
  [ event_type_2 where condition_2 ]

runs:指定序列在分组条件(如按用户分组)下匹配的次数限制,例如 runs=5 代表该序列必须重复发生 5 次。

示例:

sequence with runs=5 by user.name
  [ authentication where event.type == "login" and status == "failed" ]

同一用户是否尝试登录超过了5次

sequence
  [ process where event.type == "creation" ] by process.executable
  [ library where process.name == "regsvr32.exe" ] by dll.path with runs=3

这段查询会查找以下两个逻辑上相关的事件序列:

事件 1:某个进程可执行文件(例如 malicious.exe)的 创建操作。

事件 2:与该进程相关联的 regsvr32.exe 连续加载3 次相同 DLL 文件 的行为。

使用场景

  • 入侵检测: 例如检测恶意软件活动:

sequence
  [ process where process.name == "powershell.exe" ]
  [ network where network.direction == "outbound" and network.name == "malicious_server.com" ]

此查询查找通过 PowerShell 到外部恶意服务器的网络连接。

  • 异常行为监控: 检测系统中的异常登录行为:

sequence with maxspan=10m
  [ authentication where user.name == "admin" and event.type == "successful" ]
  [ authentication where user.name == "admin" and event.type == "failed" ]

查询管理员在 10 分钟内成功登录后立即尝试失败登录。

  • 检测特定用户的行为链

sequence by user.name
  [ authentication where event.type == "login" and result == "success" ]
  [ process where process.name == "powershell.exe" ]
  [ network where network.direction == "outbound" and destination.ip == "192.168.0.100" ]

匹配同一个用户名 (user.name) 下按照时间顺序发生的事件链:

1、成功登录。

2、使用 PowerShell 运行命令。

3、执行了 outbound 网络连接到 IP 地址 192.168.0.100。

  • 限时重复行为

sequence with runs=5 by user.name with maxspan=30m
  [ authentication where event.type == "login" and status == "failed" ]

匹配在 30 分钟内,用户登录失败事件必须重复出现至少 5 次。

0x04 新字词

0x05 ES|QL

https://www.elastic.co/guide/en/elasticsearch/reference/8.15/esql-getting-started.html

ES|QL (Elasticsearch Query Language) 是一种新型的基于管道 (piped) 的查询语言,这种语言旨在简化对数据的查询、转换、聚合和可视化流程。它是一种高级查询语言,提供了比传统 Elasticsearch Query DSL 更加直观和功能丰富的写法,专用于复杂数据查询和探索。

  1. 核心功能

  • 查询数据:通过指定数据源 (from <index>) 快速查询所需数据。

  • 数据转换:支持基于字段的动态计算 (eval)、数据过滤 (where)、储存或处理表操作。

  • 数据聚合:支持统计、分组、排序等操作,类似 SQL 的操作方式。

  • 数据集关联:可以通过 enrich 功能在不同数据表之间建立关联,丰富数据内容。

  • 可视化:与 Elasticsearch 内的可视化工具(如 Kibana)高度集成,有助于用户对查询结果进行直观展示。

ES|QL 的语法采用管道 (|) 表示连续步骤,结合了 SQL 的部分结构化语言风格,适合数据流水线的逐步处理。示例如下:

from logs-*
| stats avg_bytes = avg(bytes) by user.name
| eval avg_kb = avg_bytes / 1024
| sort avg_kb desc
| limit 5

logs-* 索引中查询数据。然后计算每位用户的数据传输平均值并转换为 KB。最后按降序排列并仅显示前 5 条记录。

任何一条ES|QL语句都是从from这种源命令开始, 这种命令将es中的文档转成一个表格,表格的每一行都代表一个文档,如下

后续在针对这个表格通过管道符方式进行操作。示例如下:

from logs-siem.dingding.events-app-2025.03 | LIMIT  3  # 只获取表格的前三行

FROM logs-siem.dingding.events-app-2025.03 | SORT @timestamp DESC  # 安装指定的指定进行排序


FROM logs-siem.dingding.events-app-2025.03
| WHERE file.size > 5000000

链式命令处理

可以链式编写命令,并由管道字符|隔开。每个处理命令都可以在上一个命令的输出表上工作。查询的结果是最终处理命令产生的表。示例如下

FROM sample_data
| SORT @timestamp DESC
| LIMIT 3

eval 命令

主要用于动态计算字段值创建新字段。它允许我们对现有的数据进行转换、计算,甚至生成衍生的字段,而无需预先在索引映射中定义。

语法如下:

eval new_field = expression

expression:计算表达式,包含常规运算(加减乘除、比较等)、字符串函数、日期时间函数、类型转换函数,以及其他内置函数。

FROM sample_data
| EVAL duration_ms = ROUND(event_duration/1000000.0, 1)

stats 数据汇总

用于对数据进行聚合统计并按指定字段分组。它类似于 SQL 中的 GROUP BY 操作。基本语法如下

STATS [统计函数(field)] AS new_field [BY group_field, ...]
  • 统计函数:对数据字段执行聚合操作,例如 count(), sum(), avg(), max(), min() 等。

  • AS new_field:可以为统计结果指定一个输出字段名。

  • BY group_field:按一个或多个字段进行分组。

示例如下:

  1. 计算事件数量

FROM logs-siem.dingding.events-app-2025.03
| STATS login_count = COUNT() by user.full_name
| sort login_count DESC

user.full_name 分组。统计每位用户的事件数量,生成字段 login_count。最后按登录事件数量降序排序。

ENRICH命令

用于丰富(扩展)数据,通过将主数据集中的记录与外部数据源(索引)连接起来,动态地补充字段信息。这类似于 SQL 中的 JOIN,但更灵活,专门用于基于 Elasticsearch 索引进行跨数据集的关联查询。

语法如下:

ENRICH <target_index> ON <match_field> WITH <additional_fields>
  • <target_index>:需要 enrich 的目标索引。

  • <match_field>:主数据集字段与目标索引字段间的匹配条件。

  • <additional_fields>:从目标索引中提取的额外的字段列表,用于丰富主数据集。

注意:ENRICH 命令依赖 Elasticsearch 中提前配置的 enrich policy。使用如下接口查看现有的 enrich policies

GET /_enrich/policy

{
  "policies": []
}

如果没有enrich policy 需要手动配置

创建enrich policy策略

  • 创建策略

PUT /_enrich/policy/resigning-employee
{
  "match": {
    "indices": ["resigning-employee-*"],
    "match_field": "user.id",
    "enrich_fields": ["status"]
  }
}
  1. indices:指定目标索引(例如 resigning-employee-*)。

  2. match_field:设置匹配字段,例如 client_ip

  3. enrich_fields:设置需要添加到主数据集中的字段。

  • 执行策略生成enrich index

注意:每次向索引之后添加玩数据后都要运行下面的接口,因为Enrich Index 是根据 Enrich Policy 手动执行后一次性生成的,而不是动态同步的

POST /_enrich/policy/resigning-employee/_execute
  • 检查用户角色

用户运行 ENRICH 查询需要具备以下权限:

  • 读取政策索引的权限 (read index)。

  • enrich_user 角色权限:这是 Elasticsearch 内置角色,提供 enrich 相关操作的权限。

GET /_security/user
  • 删除策略

DELETE /_enrich/policy/resigning-employee

{
  "acknowledged": true
}

示例

FROM sample_data
| KEEP @timestamp, client_ip, event_duration 
| EVAL client_ip = TO_STRING(client_ip)  
| ENRICH clientip_policy ON client_ip WITH env
| STATS median_duration = MEDIAN(event_duration) BY env

分解:

KEEP @timestamp, client_ip, event_duration 保留字段 @timestamp, client_ip, 和 event_duration,丢弃其他不相关字段。

EVAL client_ip = TO_STRING(client_ip) 将字段 client_ip 转换为 字符串类型,确保其格式正确以用于后续 ENRICH 关联。

ENRICH clientip_policy ON client_ip WITH env使用主数据集的 client_ip 字段与 clientip_policy 索引中的 client_ip 字段匹配。匹配成功后,从 clientip_policy 提取字段 env

STATS median_duration = MEDIAN(event_duration) BY env 按 env 字段分组,即每个环境(例如 test, production, development 等)形成一个独立分组。在每个分组内,计算字段 event_duration 的中位值,生成新字段 median_duration。

查询结果示例如下:

//sample_data索引
 [
  { "@timestamp": "2023-03-14T10:00:00Z", "client_ip": "192.168.1.1", "event_duration": 200 },
  { "@timestamp": "2023-03-14T10:05:00Z", "client_ip": "192.168.1.1", "event_duration": 300 },
  { "@timestamp": "2023-03-14T10:10:00Z", "client_ip": "192.168.1.2", "event_duration": 150 },
  { "@timestamp": "2023-03-14T10:15:00Z", "client_ip": "192.168.1.3", "event_duration": 400 }
] 
//clientip_policy 索引
[
  { "client_ip": "192.168.1.1", "env": "production" },
  { "client_ip": "192.168.1.2", "env": "staging" },
  { "client_ip": "192.168.1.3", "env": "development" }
]

//查询结果
[
  { "env": "production", "median_duration": 250 },
  { "env": "staging", "median_duration": 150 },
  { "env": "development", "median_duration": 400 }
]

示例场景

检测暴力登录尝试

FROM logs-siem.mailsec.events-app-*
| where event.action == "smtp-login" and event.outcome=="failure"
| STATS fail_count=COUNT() by user.email
| where fail_count>5
| keep user.email, fail_count
  1. logs-siem.mailsec.events-app-* 中筛选出所有登录失败的记录。

  2. 按用户名 (user.email) 分组,统计每个用户的失败尝试次数。

  3. 筛选失败次数大于 5 的用户,称为异常行为。

  4. 返回触发条件的用户名和失败次数。

待离职用户下载文件

注意使用ENRICH的时候,要确定已经创建了resigning-employee策略

FROM logs-siem.dingding.events-app-*
| ENRICH resigning-employee ON user.id WITH status
| where status=="resigning"
| STATS download_count=COUNT() by user.id 
| where download_count>10 
| KEEP user.id, download_count,status
  1. 使用 ENRICH 命令,从名为 resigning-employee enrich policy 索引中补充员工信息。按 user.id 进行匹配,将 enrich policy 中的 status 字段动态附加到主数据集。

  2. 筛选出 status 为 "resigning" 的记录,仅关注即将离职的员工。

  3. 按员工 (user.id) 作为分组字段,对每位员工的日志事件进行统计。对于每个员工,统计其下载次数 (COUNT()),生成一个新字段download_count,表示员工下载文件的总次数。

  4. 将文件下载次数超过 10 次的离职员工记录筛选出来。这些员工可能存在异常数据访问行为。

  5. 仅保留用户 ID (user.id)、文件下载次数 (download_count)、员工状态 (status)。剔除其他无关字段。

检测恶意文件访问

FROM file-access-logs
| WHERE file.name == "config.json" OR file.name == "db_backup.sql"
| STATS access_count = COUNT() BY user.name, file.name
| WHERE access_count > 10
| KEEP user.name, file.name, access_count
  1. 筛选出访问目标文件(如 config.json 或 db_backup.sql)的所有记录。

  2. 按用户名 (user.name) 和文件名 (file.name) 分组。

  3. 统计用户对特定文件的访问次数 (COUNT)。

  4. 筛选访问次数超过 10 的记录,认为是异常行为。

  5. 返回用户名、文件名和访问次数。

结合威胁情报库

FROM network-logs
| ENRICH threat-intel-db ON source.ip WITH threat.level, threat.category
| WHERE threat.level >= 5
| STATS attack_count = COUNT() BY source.ip, threat.category
| KEEP source.ip, threat.category, attack_count
  1. 使用 ENRICH 将主日志的 IP (source.ip) 与威胁情报数据库 (threat-intel-db) 动态关联。

  2. 提取威胁等级 (threat.level) 和威胁类别 (threat.category)。

  3. 筛选高威胁等级的记录(threat.level >= 5)。

  4. 按来源 IP 和威胁类别分组,统计攻击次数。

0x06 ES|QL和EQL的区别

特性

EQL (Event Query Language)

ES|QL (Elasticsearch Query Language)

设计目标

安全分析和威胁检测

通用的数据查询和处理

应用场景

SIEM、EDR 等安全分析平台

Kibana、Elasticsearch API 等多种 Elasticsearch 相关工具和平台

核心特性

序列检测、时间窗口、关联分析、模式匹配

SQL-like 语法、管道操作、丰富的函数库、与 Elasticsearch 集成

示例

sequence with maxspan=1h [process where event.type  "start" and process.name  "powershell.exe"] [network where process.name  "powershell.exe" and destination.port  443]
FROM logs-siem.dingding.events-app-*
| ENRICH resigning-employee ON user.id WITH status
| where status=="resigning"
| STATS download_count=COUNT() by user.id 
| where download_count>10 
| KEEP user.id, download_count,status


评论