最近,NotSoSecure 有机会探索作为项目一部分的监控和警报系统的工作。在这篇博文中,Anand Tiwari将谈论他在建立这样一个监控和警报系统时所面临的经验和挑战。
2017 年,OWASP 引入了一个新的风险“日志记录和监控不足”,作为其 Web 应用程序风险前 10 名列表三年更新的一部分。虽然不是直接漏洞,但 OWASP 将 Insufficient Logging & Monitoring 列为有效的 Logging & Monitoring 是一项基本防御措施。通过持续监控日志文件来快速检测异常可以帮助公司快速识别和响应攻击,从而潜在地阻止它们。
OWASP 建议:
不管我们是小型组织还是企业级组织,我们必须考虑的一件事是监控对我们的应用程序和网络的攻击。我们需要有一个实时监控系统来保护我们的应用程序,如果有人试图攻击,我们可以识别攻击并阻止它或采取必要的行动。
为了实现这一点,我们需要一个集中式系统,可以持续监控日志,在仪表板中可视化数据,并拥有一个可以通知攻击的通知系统。
在这篇博客中,我们将讨论如何在应用程序前面将 ModSecurity 设置为 Web 应用程序防火墙 (WAF),它将其日志发送到 ELK(Elasticsearch、Logstash、Kibana)堆栈以进行监控,并将 ElastAlert 用于警报。这可用于输入现有的 SIEM(安全事件和事件监控)解决方案,或作为使用开源解决方案的独立主动监控系统。
使用 ModSecurity 和 ELK 的持续监控和警报系统的高级工作流程可以描述如下:
在网络中使用 ModSecurity 和 ELK 进行监控和警报的示意图如下所示:
图中编号实体的工作/角色如下:
现在让我们详细讨论每个实体。
ModSecurity 是一个 WAF(Web 应用程序防火墙),一个开源工具包,它为 Web 应用程序防御者提供对 HTTP 流量的可见性和针对攻击的高级保护。
将 ModSecurity WAF 设置为 Nginx 反向代理的良好参考:
安装后,ModSecurity 将生成一个日志文件,其中包含所有被阻止的请求。基本上有三个日志文件将被配置到 Nginx 和 ModSecurity 配置文件中:
在服务器上遇到错误或任何恶意尝试时会生成错误日志。由于我们已经使用 Nginx 配置了我们的设置,所有错误日志(包括 Nginx 错误)都生成在同一个文件“error.log”中,默认情况下位于以下路径:
/var/log/nginx/
调试日志用于调试目的,对故障排除很有用。它可以通过“modsecurity.conf”文件启用。
审计日志包含有关 ModSecurity 检测到恶意事件时生成的日志的详细信息,并包含有关系统客户端请求的有用信息,包括客户端标头和数据负载,默认情况下未启用,可以通过“modsecurity. conf”配置文件。
在这里,我们将只关注“error.log”并解析这些信息以供我们分析。
让我们了解一下 Filebeat 和 ELK 的作用:
Filebeat – Filebeat 负责将所有日志转发到 Logstash,后者可以进一步将其传递到管道中。它是轻量级的,支持 SSL 和 TLS 加密并且非常可靠。
Logstash – Logstash 是一种用于解析日志并将其发送到 Elasticsearch 的工具。它功能强大,可以创建管道和索引事件或日志。它可以在 Elasticsearch 生态系统中使用。
Elasticsearch – 它是一个高度可扩展的开源分析引擎。它使我们能够快速存储、搜索和分析数据。当我们处理复杂的搜索功能和要求时,它通常很有用。它还能够在 Lucene 标准分析器之上提供一个分布式系统来进行索引。
Kibana – 这是一个与 Elasticsearch 集群交互并可视化 Elasticsearch 数据的 UI 工具。
现在让我们分析日志并了解对创建监控可视化有用的所有参数。
示例攻击错误日志如下图所示。
上面屏幕截图中的每个编号部分解释如下:
/usr/local/owasp-modsecurity-crs/rules/REQUEST-941-APPLICATION-ATTACK-XSS.conf
您可以参考Rohit Salecha写的博文,在您的系统中配置 Filebeat、Elasticsearch、Logstash 和Kibana :
实用 DevOps - 使用 Elasticsearch Logstash Kibana Filebeat 进行持续监控
安装 Filebeat 后,我们需要在 Filebeat 配置文件中提供日志,以便它可以将日志发送到 Logstash。此外,Logstash 会将它们发送到 Elasticsearch。
Filebeat.yml – 配置文件:
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
filebeat.modules:
- module: logstash
log:
enabled: true
filebeat.inputs:
- type: log
enabled: true
paths:
/etc/log/nginx/error.log
output.logstash:
enabled: true
hosts: logstash_server
ssl.enabled: false
Logstash 配置文件采用 JSON 格式,位于“/etc/logstash/conf.d”。配置文件由三个部分组成:输入、过滤器和输出。
我们创建了一个配置文件“beats-input.conf”,然后我们设置了“Filebeat”输入:
beats-input.conf:
input {
beats {
port => 5044
codec => "json"
}
}
Filter {
if [message] =~ "\A\{.+\}\z" {
json {
source => "message"
target => "httpRequest"
}
mutate {
remove_field => [ "json", "message" ]
}
}
mutate {
remove_field => [ "json", "agent" ]
remove_field => [ "json", "tags" ]
remove_field => [ "json", "thread_name" ]
}
}
output {
elasticsearch {
hosts => [“elasticsearch_server:9200”]
manage_template => false
index => "logstash-%{+YYYY.MM.dd}"
}
}
一切设置好后,数据被解析并发送到 Elasticsearch 服务器,该服务器将快速索引和分析数据。接下来是使用 Kibana 设置 Elasticsearch 以进行可视化。
为了从 Elasticsearch 中获取数据,我们首先需要在 Kibana 中创建一个“索引模式”,然后按照如下图所示的步骤操作:
步骤 1:通过在索引模式字段中将索引模式定义为 logstash-* 来创建索引模式。
第2步:接下来,在时间过滤字段中提供@timestamp,这将确保按时间过滤您的数据。
第 3 步:单击“发现”图标以查看您的日志。
您应该会在消息字段中看到反映的所有 WAF 错误日志。
在 Elasticsearch 中输入日志后,我们会将个人信息(如下所述)隔离为索引,以便我们可以在仪表板上可视化所需的信息。
我们需要仪表板上的日志中的以下信息:
当日志从 Logstash 发送到 Elasticsearch 并在 Kibana 中呈现时,数据在“消息”字段中以非结构化方式发送。在这种情况下查询有意义的信息会很麻烦,因为所有日志数据都存储在 onekey 下。日志消息应该组织得更好。因此我们使用 Grok。
Grok 是 Logstash 中的一个过滤器插件,它将非结构化数据解析为结构化和可查询的数据。它使用文本模式来匹配日志文件中的行。
如果您仔细查看原始数据,您会发现它实际上是由不同的部分组成的,每个部分由一个空格分隔。
让我们利用 Logstash Grok 过滤器并使用 Grok 过滤器模式创建结构化数据。Logstash Grok 过滤器带有100 多种内置模式,用于结构化非结构化数据。
由于我们的 modsecurity “error.log” 数据的内置模式没有运气,我们使用名为Grok 调试器的在线工具和一些有用的 Grok 模式构建了自定义 Grok模式。
Grok 支持正则表达式。Grok 使用的正则表达式库是 Oniguruma ,更多细节可以访问 Grok 过滤器插件站点。
Oniguruma的使用正则表达式将让您匹配一段文本并将其保存为字段,语法:
(?the pattern here)
首先让我们使用以下语法从消息数据中过滤时间戳:
(?%{YEAR}[./]%{MONTHNUM}[./]%{MONTHDAY} %{TIME})
现在我们将使用语法 %{GREEDYDATA:field_name} 作为攻击字段来分离我们可以过滤的无格式数据。
GREEDYDATA 表示“.*”。它们根据周围的限制扩展到可能的最多字符。
我们使用 Grok 过滤器 %{IP:client} 过滤了客户端 IP,它基本上会从日志数据中过滤 IP 地址。
以下是上述案例的 Grok 片段,它解释了将未格式化的数据隔离为攻击字段并删除消息字段。
grok {
match => { "message" => "(?%{YEAR}[./]%{MONTHNUM}[./]%{MONTHDAY} %{TIME}) %{GREEDYDATA:attack}, client: %{IP:client}, server: %{GREEDYDATA:server}"}
remove_field => ["message"]
}
grok 的输出——
[error] 34#34: *215 [client 192.168.33.1] ModSecurity: Access denied with code 403 (phase 2). detected XSS using libinjection. [file "/usr/local/owasp-modsecurity-crs/rules/REQUEST-941-APPLICATION-ATTACK-XSS.conf"] [line "37"] [id "941100"] [rev ""] [msg "XSS Attack Detected via libinjection"] [data "Matched Data: XSS data found within ARGS:email: ">"] [severity "2"] [ver "OWASP_CRS/3.2.0"] [maturity "0"] [accuracy "0"] [tag "application-multi"] [tag "language-multi"] [tag "platform-multi"] [tag "attack-xss"] [tag "OWASP_CRS"] [tag "OWASP_CRS/WEB_ATTACK/XSS"] [tag "WASCTC/WASC-8"] [tag "WASCTC/WASC-22"] [tag "OWASP_TOP_10/A3"] [tag "OWASP_AppSensor/IE1"] [tag "CAPEC-242"] [hostname "172.18.0.2"] [uri "/login.action"] [unique_id "158625916198.227197"] [ref "v661,27t:utf8toUnicode,t:urlDecodeUni,t:htmlEntityDecode,t:jsDecode,t:cssDecode,t:removeNulls"]
现在我们需要从攻击字段数据中过滤出未格式化的值:
攻击名称:
由于我们没有其他无格式值的 Grok 模式,我们可以使用正则表达式来查找无格式值。下面我们使用正则表达式来查找单个攻击名称。
您可以使用此网站进行在线正则表达式创建、测试和调试 - https://regex101.com/
如下所示在 Grok 调试器中,我们提取路径值,然后从/usr/local/owasp-modsecurity-crs/rules/REQUEST-941-APPLICATION-ATTACK-XSS.conf路径值中剥离名称为REQUEST-941-APPLICATION-ATTACK -XSS
grok {
match => {"attack" => "(?\[file ".+\/(.*?).conf"\])"}
}
grok {
match => {"attack_file" => "(?[A-Z][^.]+)"}
remove_field => ["attack_file"]
}
其他价值观
同样,我们从攻击字段数据中剥离了其他值,并创建了一个包含所有隔离值的完整 Logstash 配置文件。
完整的 Logstash 配置
input
{
beats
{
ssl => false
port => 5000
codec => "json"
}
}
filter {
grok {
match => { "message" => "(?%{YEAR}[./]%{MONTHNUM}[./]%{MONTHDAY} %{TIME}) \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER:threadid}\: \*%{NUMBER:connectionid} %{GREEDYDATA:attack}, client: %{IP:client}, server: %{GREEDYDATA:server}"}
remove_field => ["message"]
}
grok {
match => {"attack" => "(?\[file ".+\/(.*?).conf"\])"}
}
grok {
match => {"attack_file" => "(?[A-Z][^.]+)"}
remove_field => ["attack_file"]
}
grok {
match => {"attack" => "(?\[msg "(.*?)"\])"}
}
grok {
match => {"attack" => "(?\[data "(.*?)"\])"}
}
grok {
match => {"attack" => "(?\[uri "(.*?)"\])"}
remove_field => ["attack"]
}
grok {
match => {"attack_uri" => "(?[/].+")"}
}
if [message] =~ "\A\{.+\}\z" {
json {
source => "message"
target => "httpRequest"
}
mutate {
remove_field => [ "json", "message" ]
}
}
mutate {
remove_field => [ "json", "agent" ]
remove_field => [ "json", "tags" ]
remove_field => [ "json", "thread_name" ]
}
}
output {
elasticsearch {
hosts => ["{{elasticsearch_server}}"]
manage_template => false
index => "logstash-%{+YYYY.MM.dd}"
}
}
如您所见,现在 Elasticsearch 索引中有多个字段,它可以过滤单个值。
攻击仪表板
现在让我们创建一个包含所有攻击计数和模式的仪表板。我们还可以根据我们的要求将其可视化为饼图或任何内容。
这总结了我们如何使用 ModSecurity 框架建立一个持续的安全监控和警报系统,并有额外的微调空间。可以实现数据的可视化,并且可以根据需要定制仪表板。
如果您想亲身体验,我们会在AppSecOps和DevSecOps课程中介绍此类主题以及其他防御策略。如果您想参加这些培训课程,请与我们联系。我们还帮助企业实施 DevSecOps 实践。
参考: