引言:

而对于企业安全运营而言,单纯的资产发现已无法满足风险管理需求。
总的来说,典型挑战体现在三个维度:
- 威胁情报融合:需建立多源异构数据的关联分析机制
- 风险量化评估:构建包含CVSS评分、业务影响因子、修复优先级的动态评估模型
- 防御体系映射:通过攻击链逆向推导实现纵深防御体系的闭环验证
这篇文章所提出的解决方案,是基于AI大模型(以DeepSeek为例)构建智能信息处理引擎,结合Neo4j图数据库实现威胁情报的知识图谱化构建。主要分为两个层面的处理机制:
• 前端:通过AI大模型的上下文感知能力,自动提取与资产属性标签,根据专业经验预测可行的APT攻击模式特征
• 后端:基于Neo4j的图遍历算法生成攻击面拓扑视图,从而支持攻击路径模拟与防御策略推演。
接下来,我们逐步阐明思路和方法。
【注意】
本文所述技术方案均为网络安全领域的学术探讨,旨在促进防御体系的技术演进,严禁任何形式的非法利用。作者及研究团队:
3. 不公开任何可能降低攻击门槛的模型细节(如exploit生成模块的奖励函数设计)相关技术实施应严格遵守《网络安全法》《数据安全法》及所在国法律法规,建议在隔离测试环境中验证学术猜想。
4. 技术锋芒的指向应是加固系统而非突破防线——这是所有安全研究者不可逾越的伦理基线。
传统资产测绘的痛点
紧接上文,传统的资产测绘方法面临着诸多挑战,这些挑战严重影响了安全团队的工作效率和效果。具体来说,主要痛点包括:
- 多源数据孤岛难以形成攻击面全景视图:通过对企业内外部的资产进行勘测,多类工具、多种手段往往带来繁荣的数据;另一方面,企业内部通常存在多个不同的数据源,如网络设备日志、应用日志、安全设备日志等。以上的数据往往分散在不同的系统中,形成了数据孤岛,使得安全团队难以整合这些数据,构建一个全面的攻击面视图。
- 人工分析效率低下:在渗透测试项目中,安全团队需要处理大量的资产节点,人工分析不仅耗时费力,还容易出现遗漏和错误,降低了整体工作效率。
本文希望达成的测绘体系的核心价值
为了解决上述痛点,本文探索DeepSeek或AI加持下,帮助红队人员或企业安全团队实现更智能、直观的测绘能力。核心价值主要体现在以下几个方面:
- 红队作战视角:
- 攻击路径预构建:利用知识图谱技术,测绘体系能够自动推导攻击链可能性。这有助于红队提前识别潜在的攻击路径,制定更为有效的防御策略。
- 目标优先级判定:基于动态权重算法,识别出最关键的突破点。这种优先级判定机制使得红队能够集中资源和精力,对最具威胁的目标进行重点防护。
- 成果可视化呈现:系统自动生成符合MITRE ATT&CK框架的战术路径图,直观展示攻击路径和关键节点。这不仅提高了分析结果的可读性,还便于红队成员之间的沟通和协作。
- 成本效益分析:
- 扫描耗时降低:通过目标优先级划分与筛选,动态频率调整和协议识别技术,在保证扫描质量的前提下,大幅减少扫描时间。这对于快速响应和处置安全事件具有重要意义。
- 漏洞验证效率提升:借助AI辅助PoC生成技术,系统能够自动化生成漏洞验证代码,助力漏洞验证的效果提升。
- 报告编制辅助:通过资产测绘知识图谱的提供,使用代码或原生指令,快速生成详细的渗透测试逻辑路径,减少人工编写工作。
graph TD A[探测层] -->|存活资产| B(知识图谱构建) A -->|流量特征| C(动态资产捕获) B --> D{风险计算} C --> D D --> E[可视化平台]
在下面的实验环境中,环境:Python 3.11 + Neo4j 5.26.1
步骤一:资产扫描
简单来说,调用nmap库扫描IP资产即可,不管是直接运行nmap还是相关工具或代码,建议都将命令行切到root权限。
# nmap扫描模块 def nmap_scan(target): nm = nmap.PortScanner() # 采用混合扫描策略(SYN+ACK+UDP) nm.scan(hosts=target, arguments='-sS -sU -T4 --min-rate 100') assets = [] for host in nm.all_hosts(): asset = { "ip": host, "mac": nm[host]['addresses'].get('mac', 'Unknown'), "os": nm[host].get('osmatch', [{}])[0].get('name', 'Unknown'), "ports": [], "last_seen": datetime.now().isoformat() } for proto in nm[host].all_protocols(): ports = nm[host][proto].keys() for port in ports: service = nm[host][proto][port].get('name', 'unknown') asset["ports"].append({ "port": port, "protocol": proto, "service": service, "version": nm[host][proto][port].get('version', '') }) assets.append(asset) return assets
扫描结果:
当然,这里扫描的范围后续根据需要可以扩展到IPv6、域名、主域名等,需要类型判断和处理,这里简单提供示例代码:
import re import ipaddress from idna import encode as idna_encode from typing import Dict, Union classInvalidTargetException(ValueError): """用于标识非法目标格式 """ def__init__(self, message: str): super().__init__(f"Invalid target format: {message}") defvalidate_ipv4(addr: str) -> bool: """ 验证IPv4地址合法性(支持CIDR) """ try: ipaddress.IPv4Network(addr, strict=False) returnTrue except ValueError: returnFalse defvalidate_ipv6(addr: str) -> bool: """ 验证IPv6地址合法性(支持缩写格式) """ try: ipaddress.IPv6Network(addr, strict=False) returnTrue except ValueError: returnFalse defvalidate_domain(domain: str) -> bool: """ 验证域名合法性(支持国际化域名IDNA) 遵循RFC 5891规范: - 单个标签长度1~63字符 - 总长度不超过253字符 - 允许字母、数字、连字符(不能以连字符开头/结尾) """ try: # 转换为Punycode格式进行规范验证 encoded = idna_encode(domain).decode('ascii') except UnicodeError: returnFalse pattern = r"^([a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z0-9]{2,63}$" return re.fullmatch(pattern, encoded) isnotNone deftarget_adapter(target: str) -> Dict[str, Union[str, None]]: """ param target: 输入目标(IPv4/IPv6/域名) """ # 去除首尾空白及端口号(如果存在) target = target.strip().split(":", 1)[0] # 类型判断逻辑 if validate_ipv4(target): return { "type": "ipv4", "value": str(ipaddress.IPv4Network(target, strict=False).network_address) } elif validate_ipv6(target): # IPv6地址规范化(扩展缩写形式) normalized = ipaddress.IPv6Address(target).exploded return { "type": "ipv6", "value": normalized } elif validate_domain(target): # 国际化域名编码转换 try: encoded_domain = idna_encode(target).decode('ascii') return { "type": "domain", "value": encoded_domain.lower() # 域名统一小写处理 } except UnicodeError as e: raise InvalidTargetException(f"IDNA encoding failed: {e}") else: raise InvalidTargetException(f"Unsupported target type: {target}") # 测试用例 if __name__ == "__main__": test_cases = [ "192.168.1.1", "2001:db8::1", "example.com", "例子.中国", # 国际化域名 "invalid..domain", "123.456.789.000" ] for case in test_cases: try: print(f"Input: {case}\nOutput: {target_adapter(case)}\n") except InvalidTargetException as e: print(f"Input: {case}\nError: {e}\n")
步骤二:基于DeepSeek/AI做资产风险识别
基于步骤一结果,这边我们基于DeepSeek/AI做资产风险识别,该部分也是本文的核心。
那本部分的核心,就是prompt的设置了,规定数据的输入和输出格式,以便AI大模型更好的进行数据分析。这部分我写的细一些,把思考的过程展示一下。
初始的时候,只考虑了对资产进行分析和风险估算,因此主要内容为:
# 构建结构化提示词 prompt = f"""作为网络安全分析师,请基于以下扫描数据进行深度分析: ## 资产特征 - IP地址:{asset_data['ip']} - 操作系统:{asset_data['os']} - 开放端口数:{len(asset_data['ports'])} - 暴露服务:{', '.join(set(p['service'] for p in asset_data['ports']))} - 最后活动时间:{asset_data['last_seen']} ## 分析要求 1. 漏洞风险评估(CVSS评分+漏洞类型分类) 2. 建议的ATT&CK攻击路径(包含战术编号) 3. 资产关键性评分(1-10分制,需说明评分依据)"""
而在AI大模型输出结果后,发现格式不一,因为只给了分析要求,没给结果模板。于是通过反复调试和对接(过程中步骤三已经在尝试了),最后的prompt为:
# 结构化输出模板(强制API返回格式) ANALYSIS_TEMPLATE = """请严格按照以下Markdown模板生成分析报告: ### 网络安全分析报告 #### 一、漏洞风险评估 {% for vuln in vulnerabilities %} **{{ loop.index }}. {{ vuln.service }} (端口{{ vuln.port }})** - 风险等级: {{ vuln.risk_level }} - CVSS评分: {{ vuln.cvss }} ({{ vuln.vector }}) - 漏洞类型: {{ vuln.types|join('、') }} {% endfor %} #### 二、建议的ATT&CK攻击路径 {% for tactic in tactics %} **战术编号**: {{ tactic.id }} - {{ tactic.name }} - 描述: {{ tactic.description }} {% endfor %} #### 三、资产关键性评分 **评分**: {{ criticality }}/10 **评分依据**: {% for reason in criticality_reasons %} {{ loop.index }}. {{ reason }} {% endfor %}""" prompt = f"""作为网络安全分析师,请严格按模板分析以下数据: ### 输入数据 - IP:{asset_data['ip']} - 操作系统:{asset_data['os']} - 开放端口:{', '.join(f"{p['port']}/{p['protocol']}({p['service']})"for p in asset_data['ports'])} - 最后活动时间:{asset_data['last_seen']} ### 分析要求 {ANALYSIS_TEMPLATE}"""
调用DeepSeek的api,进行分析:
# DeepSeek连接 defdeepseek_analysis(asset_data) -> str: # 初始化官方SDK客户端 client = OpenAI( api_key=DEEPSEEK_API_KEY, base_url="https://api.deepseek.com/v1", timeout=30 ) # 构建结构化提示词 prompt = f"""作为网络安全分析师,请基于以下扫描数据进行深度分析: ## 资产特征 - IP地址:{asset_data['ip']} - 操作系统:{asset_data['os']} - 开放端口数:{len(asset_data['ports'])} - 暴露服务:{', '.join(set(p['service'] for p in asset_data['ports']))} - 最后活动时间:{asset_data['last_seen']} ## 分析要求 1. 漏洞风险评估(CVSS评分+漏洞类型分类) 2. 建议的ATT&CK攻击路径(包含战术编号) 3. 资产关键性评分(1-10分制,需说明评分依据)""" #print(prompt) try: response = client.chat.completions.create( model="deepseek-chat", messages=[ {"role": "system", "content": "你是一名经验丰富的网络安全分析师,擅长漏洞挖掘和攻击路径分析"}, {"role": "user", "content": prompt} ], temperature=0.2, max_tokens=1500, stream=False ) # 增强响应解析健壮性 if response.choices and response.choices[0].message: return response.choices[0].message.content else: raise ValueError("Empty response from API") except Exception as e: # 异常处理 error_msg = f"DeepSeek API调用失败: {str(e)}" print(error_msg) return error_msg
余额不足,报错:
暂时无法充值:
那么,这里转而使用腾讯混元大模型的api,用的是OpenAI的SDK:
defhunyuan_analysis(asset_data) -> str: # 初始化OpenAI兼容客户端 client = OpenAI( api_key=HUNYUAN_API_KEY, base_url="https://api.hunyuan.cloud.tencent.com/v1", # 官方API端点 timeout=30# 网络超时保护 ) # 结构化提示词 prompt = f"""作为网络安全分析师,请基于以下扫描数据进行深度分析: ## 资产特征 - IP地址:{asset_data['ip']} - 操作系统:{asset_data['os']} - 开放端口数:{len(asset_data['ports'])} - 暴露服务:{', '.join(set(p['service'] for p in asset_data['ports']))} - 最后活动时间:{asset_data['last_seen']} ## 分析要求 1. 漏洞风险评估(CVSS评分+漏洞类型分类) 2. 建议的ATT&CK攻击路径(包含战术编号) 3. 资产关键性评分(1-10分制,需说明评分依据)""" try: response = client.chat.completions.create( model="hunyuan-pro", # 付费模型(免费可用hunyuan-lite[1](@ref)) messages=[ {"role": "system", "content": "你是一名经验丰富的网络安全分析师,擅长漏洞挖掘和攻击路径分析"}, {"role": "user", "content": prompt} ], temperature=0.3, # 平衡严谨性与创造性 max_tokens=1200, extra_body={ # 腾讯增强参数 "enable_enhancement": True, # 启用知识检索增强 "enable_speed_search": False# 关闭极速模式保证准确性 } ) # 增强型响应解析 if response.choices and response.choices[0].message: return response.choices[0].message.content else: raise ValueError("API返回空响应") except Exception as e:
在使用最初的prompt时,得到如下响应:
分析报告:### 网络安全分析报告 #### 一、漏洞风险评估 **1.1 MySQL (开放端口)** - **风险等级**: 高 - **CVSS评分**: 8.0 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H) - **漏洞类型**: 未授权访问、SQL注入 **1.2 HTTPS (开放端口)** - **风险等级**: 中 - **CVSS评分**: 6.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:L/I:L/A:N) - **漏洞类型**: 中间人攻击(MITM)、弱加密 **1.3 HTTP (开放端口)** - **风险等级**: 高 - **CVSS评分**: 7.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:N) - **漏洞类型**: 跨站脚本攻击(XSS)、跨站请求伪造(CSRF) **1.4 Intermapper (开放端口)** - **风险等级**: 低 - **CVSS评分**: 4.0 (CVSS:3.0/AV:N/AC:M/PR:N/UI:N/S:U/C:N/I:N/A:N) - **漏洞类型**: 信息泄露 **1.5 HTTP-Proxy (开放端口)** - **风险等级**: 中 - **CVSS评分**: 6.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:L/I:L/A:N) - **漏洞类型**: 代理绕过、未授权访问 **1.6 FTP (开放端口)** - **风险等级**: 高 - **CVSS评分**: 8.5 (CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H) - **漏洞类型**: 未授权访问、弱密码 #### 二、建议的ATT&CK攻击路径 **T1190 - 利用目标共享** - 攻击者通过未授权访问FTP服务获取敏感文件。 **T1203 - Exploit Public-Facing Application** - 利用HTTP/HTTPS服务的漏洞(如XSS、CSRF)进行攻击。 **T1192 - 利用目标协议漏洞** - 利用Intermapper或HTTP-Proxy的协议漏洞进行信息收集或未授权访问。 **T1206 - Exploit SQL Database** - 通过MySQL的未授权访问或SQL注入漏洞获取数据库中的敏感数据。 **T1021.002 - 远程桌面协议(RDP)** - 虽然未直接提及RDP,但攻击者可能利用其他服务的漏洞获取RDP访问权限。 #### 三、资产关键性评分 **评分**: 8分 **评分依据**: 1. **开放端口多**: 共有6个开放端口,提供了多个潜在的攻击面。 2. **高风险服务多**: 包括MySQL、HTTP、FTP等高风险服务,这些服务历史上常见多种漏洞。 3. **敏感服务暴露**: HTTPS和HTTP服务的暴露增加了数据传输过程中的风险。 4. **未知操作系统**: 操作系统的不确定性增加了评估难度,可能隐藏更多未知风险。 5. **最后活动时间较近**: 资产在2025年仍有活动,表明其仍在使用中,攻击价值较高。 ### 总结 该资产存在多个高风险漏洞,建议立即进行详细的漏洞扫描和渗透测试,并采取相应的安全措施,如修补已知漏洞、加强访问控制、定期更新系统和软件等。同时,建议对资产的关键性进行持续监控和评估,确保网络安全。 sh-3.2#
获得响应后,对markdown文档进行解析,解析代码为:
def_parse_hunyuan_response(text: str) -> Dict: """解析结构化响应""" # 漏洞解析 vuln_pattern = ( r"\*\*(\d+)\.\s*(.+?)\s*$端口(\d+)$\*\*\s*\n"# 允许空格和换行差异 r"-+\s*风险等级\s*[:-]+\s*(\S+)\s*\n" # 兼容中英文冒号 r"-+\s*CVSS评分\s*[:-]+\s*([\d.]+)\s*$([^)]+)$\s*\n" r"-+\s*漏洞类型\s*[:-]+\s*(.+?)\s*(\n|$)" ) print("[DEBUG] 原始分析报告:\n", text) matches = re.findall(vuln_pattern, text) print("[DEBUG] 匹配到的漏洞条目:", matches) vulnerabilities = [ { "service": match[1], "port": int(match[2]), "risk_level": match[3], "cvss": float(match[4]), "vector": match[5], "types": match[6].split("、") } for match in re.findall(vuln_pattern, text) ] # 攻击路径解析 tactic_pattern = r"\*\*战术编号\*\*: (T\d+) - (.+?)\n- 描述: (.+)" tactics = [ { "id": match[0], "name": match[1], "description": match[2] } for match in re.findall(tactic_pattern, text) ] # 关键性评分解析 criticality_match = re.search(r"评分: (\d+)/10", text) criticality = int(criticality_match.group(1)) if criticality_match else5 return { "vulnerabilities": vulnerabilities, "tactics": tactics, "criticality": criticality }
最终得到结构化的分析数据:
{'vulnerabilities': [], 'tactics': [{'id': 'T1190', 'name': '利用FTP', 'description': '攻击者通过未授权访问或弱密码获取FTP服务器的控制权,进而进行数据窃取或上传恶意文件。'}, {'id': 'T1192', 'name': '利用HTTP', 'description': '攻击者通过SQL注入、XSS等漏洞获取敏感信息或执行恶意代码。'}, {'id': 'T1195', 'name': '利用HTTPS', 'description': '攻击者通过中间人攻击或证书伪造获取加密数据的明文内容。'}, {'id': 'T1203', 'name': '利用MySQL', 'description': '攻击者通过未授权访问或SQL注入获取数据库中的敏感信息。'}, {'id': 'T1190', 'name': '利用Intermapper', 'description': '攻击者通过信息泄露获取系统配置或网络拓扑信息,为进一步攻击做准备。'}], 'criticality': 5}
步骤三:数据存储与展示
当我们已经通过AI大模型得到分析后,即可根据现有结构化数据,做数据输入。
这里简单介绍Neo4j的使用,如果在本地实验,下载Neo4j Desktop,起一个本地数据库即可:
检查Neo4j连通性:
import nmap import requests from neo4j import GraphDatabase from datetime import datetime # 配置参数 #DEEPSEEK_API_KEY = "your_deepseek_api_key" NEO4J_URI = "bolt://localhost:7687" NEO4J_USER = "neo4j" NEO4J_PASSWORD = "your_password" # 测试neo4j连通性 deftest_neo4j_connection(): try: driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) with driver.session() as session: # 版本监测 version_data = session.execute_read( lambda tx: list(tx.run("CALL dbms.components() YIELD versions RETURN versions[0] AS version")) ) node_data = session.execute_read( lambda tx: list(tx.run("MATCH (n) RETURN count(n) AS count")) ) # 分析数据 db_version = version_data[0]["version"] node_count = node_data[0]["count"] print("\n=== Neo4j连接状态 ===") print(f"连接成功!服务状态:正常") print(f"数据库版本: {db_version}") print(f"当前节点总数: {node_count}") print("====================") except Exception as e: print(f"\n!!! 连接异常: {str(e)}") if"Unable to connect"in str(e): print("⇒ 检查Neo4j服务是否启动(neo4j.bat console)") elif"authentication failure"in str(e): print("⇒ 密码错误,尝试重置: ALTER USER neo4j SET PASSWORD 'newpassword'") test_neo4j_connection()
正常情况下,输出结果为:
接着我们只需要设计节点类型和关系类型,目前根据我们有的数据,可以创建ip(资产)、端口服务、漏洞、攻击策略等类型节点,响应就可创建“ip拥有端口服务”、“端口服务存在漏洞”和“针对该ip有哪些攻击策略”的关系。
坦言之,我好久没碰Neo4j和正则了,上文的代码和下面的代码我整了两天。。
我有点没眼看了,少侠们根据我的注释即可了解代码逻辑。
classThreatIntelligenceGraph: def__init__(self): self.driver = GraphDatabase.driver( NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD), encrypted=False, max_connection_pool_size=20 ) self._clean_legacy_indexes() self._init_constraints() def_clean_legacy_indexes(self): """清理与约束冲突的遗留索引""" with self.driver.session() as session: # 合规查询 result = session.run(""" SHOW INDEXES YIELD name, type, labelsOrTypes, properties WHERE labelsOrTypes IN ['Asset', 'Port', 'Vulnerability'] AND ( properties = ['ip'] OR properties = ['number', 'protocol'] OR properties = ['cve_id'] ) """) for record in result: print(f"删除遗留索引: {record['name']}") session.run(f"DROP INDEX {record['name']}") def_init_constraints(self): """创建带名称的约束""" with self.driver.session() as session: session.run(""" CREATE CONSTRAINT asset_ip_unique IF NOT EXISTS FOR (a:Asset) REQUIRE a.ip IS UNIQUE """) session.run(""" CREATE CONSTRAINT port_identifier IF NOT EXISTS FOR (p:Port) REQUIRE (p.number, p.protocol) IS UNIQUE """) session.run(""" CREATE CONSTRAINT vuln_cve_unique IF NOT EXISTS FOR (v:Vulnerability) REQUIRE v.cve_id IS UNIQUE """) session.run(""" CREATE CONSTRAINT tactic_id_unique IF NOT EXISTS FOR (t:AttackTactic) REQUIRE t.id IS UNIQUE """) defstore_analysis(self, asset: Dict, analysis: Dict): """全量存储入口""" with self.driver.session() as session: # 资产节点 session.execute_write( self._merge_asset, asset["ip"], asset["os"], analysis["criticality"], asset["last_seen"] ) # 端口及漏洞 for port in asset["ports"]: session.execute_write( self._link_port, asset["ip"], port["port"], port["protocol"], port["service"] ) for vuln in analysis.get("vulnerabilities", []): if vuln["port"] == port["port"]: session.execute_write( self._link_vulnerability, port["port"], port["protocol"], vuln ) for tactic in analysis.get("tactics", []): session.execute_write( self._link_tactic, asset['ip'], tactic ) # 攻击路径 for tactic in analysis.get("tactics", []): session.execute_write( self._link_attack_tactic, tactic, [v["types"] for v in analysis["vulnerabilities"]] ) @staticmethod def_merge_asset(tx, ip: str, os: str, criticality: int, last_seen: str): tx.run(""" MERGE (a:Asset {ip: $ip}) SET a.os = $os, a.criticality = $criticality, a.last_seen = datetime($last_seen), a.updated = datetime() """, ip=ip, os=os, criticality=criticality, last_seen=last_seen) @staticmethod def_link_port(tx, ip: str, port: int, protocol: str, service: str): tx.run(""" MATCH (a:Asset {ip: $ip}) MERGE (p:Port {number: $port, protocol: $protocol}) SET p.service = $service MERGE (a)-[r:HAS_PORT]->(p) """, ip=ip, port=port, protocol=protocol, service=service) @staticmethod def_link_vulnerability(tx, port: int, protocol: str, vuln: Dict): """漏洞关系创建""" try: # 设置ID cve_id = f"DFYX-VUL-{datetime.now().year}-{port}-{protocol.upper()}" tx.run(""" // 确保Port存在 MERGE (p:Port {number: $port, protocol: $protocol}) // 创建或更新漏洞 MERGE (v:Vulnerability {cve_id: $cve_id}) ON CREATE SET v.risk_level = $risk, v.cvss_score = $cvss, v.vector = $vector, v.types = $types ON MATCH SET v.last_updated = datetime() // 创建关系 MERGE (p)-[r:HAS_VULNERABILITY]->(v) """, port=port, protocol=protocol, cve_id=cve_id, risk=vuln["risk_level"], cvss=vuln["cvss"], vector=vuln["vector"], types=vuln["types"]) print(f"成功创建漏洞关系: {port}/{protocol} → {cve_id}") except Exception as e: print(f"关系创建失败: {str(e)}") raise @staticmethod def_link_tactic(tx, ip: int, tactic: Dict): """漏洞关系创建""" try: tx.run(""" // 确保ip存在 MERGE (a:Asset {ip: $ip}) // 创建或更新策略 MERGE (t:AttackTactic {id: $id}) ON CREATE SET t.name = $name, t.description = $description ON MATCH SET t.last_updated = datetime() // 创建关系 MERGE (a)-[r:HAS_TACTIC]->(t) """, ip=ip, id=tactic["id"], name=tactic["name"], description=tactic["description"] ) #print("成功创建TACTIC关系") except Exception as e: print(f"关系创建失败: {str(e)}") raise # 正则表达式 def_parse_hunyuan_response(text: str) -> Dict: # 漏洞解析 vuln_pattern = r"\*\*(\d+)\. (.+?) $端口(\d+)$\*\*\n- 风险等级: (.+?)\n- CVSS评分: (.+?) $(.+?)$\n- 漏洞类型: (.+)" vulnerabilities = [ { "service": match[1], "port": int(match[2]), "risk_level": match[3], "cvss": float(match[4]), "vector": match[5], "types": match[6].split("、") } for match in re.findall(vuln_pattern, text) ] @staticmethod def_link_attack_tactic(tx, tactic: Dict, vuln_types: List[List[str]]): tx.run(""" MERGE (t:AttackTactic {id: $id}) SET t.name = $name, t.description = $desc, t.last_observed = datetime() WITH t UNWIND $vuln_types AS types MATCH (v:Vulnerability) WHERE ANY(type IN v.types WHERE type IN types) MERGE (t)-[r:EXPLOITS]->(v) """, id=tactic["id"], name=tactic["name"], desc=tactic["description"], vuln_types=vuln_types)
效果咋样呢,请看:
从图中可以清晰查看当前ip开放了什么端口,ip有哪些攻击策略,因为此时没有漏洞数据所以没有出现漏洞类型节点,另外ip和端口的关系、ip和攻击策略之间的关系也很清晰,所有关系、节点都可以查看详细的属性值。
当然,攻击策略和端口关联会更好,我实在不想去调整prompt和正则了,太痛苦了。
另外,当前模型未建立”攻击战术→漏洞类型”的关系,后续可以补充CYPHER语句:
MATCH (t:AttackTactic), (v:Vulnerability) WHEREANY(type IN v.types WHERE type IN t.required_conditions) MERGE (t)-[r:EXPLOITS]->(v)
现有criticality评分未体现在节点属性,也可以在Neo4j中设置节点大小映射:
:paramcriticality_scale: {8: 2.0, 5: 1.5, 3: 1.0} MATCH (a:Asset) SET a.size = $criticality_scale[a.criticality]
步骤四:整体连通
那么整体效果咋样呢,拭目以待,我们清除一下测试数据,正式扫描个C端看看:
最终代码为:
import nmap import requests from neo4j import GraphDatabase from datetime import datetime from openai import OpenAI import os import re from typing import Dict, Any, List # 配置参数(建议使用环境变量) HUNYUAN_API_KEY = "API_KEY_value" NEO4J_URI = "bolt://localhost:7687" NEO4J_USER = "neo4j" NEO4J_PASSWORD = "PASSWORD_value" # nmap扫描模块 def nmap_scan(target): nm = nmap.PortScanner() # 采用混合扫描策略(SYN+ACK+UDP) nm.scan(hosts=target, arguments='-sS -sU -T4 --min-rate 100') assets = [] for host in nm.all_hosts(): asset = { "ip": host, "mac": nm[host]['addresses'].get('mac', 'Unknown'), "os": nm[host].get('osmatch', [{}])[0].get('name', 'Unknown'), "ports": [], "last_seen": datetime.now().isoformat() } for proto in nm[host].all_protocols(): ports = nm[host][proto].keys() for port in ports: service = nm[host][proto][port].get('name', 'unknown') asset["ports"].append({ "port": port, "protocol": proto, "service": service, "version": nm[host][proto][port].get('version', '') }) assets.append(asset) return assets # 结构化输出模板(强制API返回格式) ANALYSIS_TEMPLATE = """请严格按照以下Markdown模板生成分析报告: ### 网络安全分析报告 #### 一、漏洞风险评估 {% for vuln in vulnerabilities %} **{{ loop.index }}. {{ vuln.service }} (端口{{ vuln.port }})** - 风险等级: {{ vuln.risk_level }} - CVSS评分: {{ vuln.cvss }} ({{ vuln.vector }}) - 漏洞类型: {{ vuln.types|join('、') }} {% endfor %} #### 二、建议的ATT&CK攻击路径 {% for tactic in tactics %} **战术编号**: {{ tactic.id }} - {{ tactic.name }} - 描述: {{ tactic.description }} {% endfor %} #### 三、资产关键性评分 **评分**: {{ criticality }}/10 **评分依据**: {% for reason in criticality_reasons %} {{ loop.index }}. {{ reason }} {% endfor %}""" # 腾讯混元API增强调用 def hunyuan_analysis(asset_data: Dict) -> Dict: """结构化漏洞分析""" client = OpenAI( api_key=HUNYUAN_API_KEY, base_url="https://api.hunyuan.cloud.tencent.com/v1", timeout=30 ) # 强制结构化提示词 prompt = f"""作为网络安全分析师,请严格按模板分析以下数据: ### 输入数据 - IP:{asset_data['ip']} - 操作系统:{asset_data['os']} - 开放端口:{', '.join(f"{p['port']}/{p['protocol']}({p['service']})" for p in asset_data['ports'])} - 最后活动时间:{asset_data['last_seen']} ### 分析要求 {ANALYSIS_TEMPLATE}""" try: response = client.chat.completions.create( model="hunyuan-pro", messages=[ {"role": "system", "content": "你是一名严格遵循输出格式的网络安全专家"}, {"role": "user", "content": prompt} ], temperature=0.1, # 降低随机性 max_tokens=1500, extra_body={"enable_enhancement": True} ) return _parse_hunyuan_response(response.choices[0].message.content) except Exception as e: return {"error": str(e)} def _parse_hunyuan_response(text: str) -> Dict: """解析结构化响应""" # 漏洞解析 vuln_pattern = ( r"\*\*(\d+)\.\s*(.+?)\s*$端口(\d+)$\*\*\s*\n" # 允许空格和换行差异 r"-+\s*风险等级\s*[:-]+\s*(\S+)\s*\n" # 兼容中英文冒号 r"-+\s*CVSS评分\s*[:-]+\s*([\d.]+)\s*$([^)]+)$\s*\n" r"-+\s*漏洞类型\s*[:-]+\s*(.+?)\s*(\n|$)" ) #print("[DEBUG] 原始分析报告:\n", text) matches = re.findall(vuln_pattern, text) print("[DEBUG] 匹配到的漏洞条目:", matches) vulnerabilities = [ { "service": match[1], "port": int(match[2]), "risk_level": match[3], "cvss": float(match[4]), "vector": match[5], "types": match[6].split("、") } for match in re.findall(vuln_pattern, text) ] # 攻击路径解析 tactic_pattern = r"\*\*战术编号\*\*: (T\d+) - (.+?)\n- 描述: (.+)" tactics = [ { "id": match[0], "name": match[1], "description": match[2], } for match in re.findall(tactic_pattern, text) ] # 关键性评分解析 criticality_match = re.search(r"评分: (\d+)/10", text) criticality = int(criticality_match.group(1)) if criticality_match else 5 return { "vulnerabilities": vulnerabilities, "tactics": tactics, "criticality": criticality } # Neo4j增强存储模块 class ThreatIntelligenceGraph: def __init__(self): self.driver = GraphDatabase.driver( NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD), encrypted=False, max_connection_pool_size=20 ) self._clean_legacy_indexes() self._init_constraints() def _clean_legacy_indexes(self): """清理与约束冲突的遗留索引""" with self.driver.session() as session: # 合规查询 result = session.run(""" SHOW INDEXES YIELD name, type, labelsOrTypes, properties WHERE labelsOrTypes IN ['Asset', 'Port', 'Vulnerability'] AND ( properties = ['ip'] OR properties = ['number', 'protocol'] OR properties = ['cve_id'] ) """) for record in result: print(f"删除遗留索引: {record['name']}") session.run(f"DROP INDEX {record['name']}") def _init_constraints(self): """创建带名称的约束""" with self.driver.session() as session: session.run(""" CREATE CONSTRAINT asset_ip_unique IF NOT EXISTS FOR (a:Asset) REQUIRE a.ip IS UNIQUE """) session.run(""" CREATE CONSTRAINT port_identifier IF NOT EXISTS FOR (p:Port) REQUIRE (p.number, p.protocol) IS UNIQUE """) session.run(""" CREATE CONSTRAINT vuln_cve_unique IF NOT EXISTS FOR (v:Vulnerability) REQUIRE v.cve_id IS UNIQUE """) session.run(""" CREATE CONSTRAINT tactic_id_unique IF NOT EXISTS FOR (t:AttackTactic) REQUIRE t.id IS UNIQUE """) def store_analysis(self, asset: Dict, analysis: Dict): """全量存储入口""" with self.driver.session() as session: # 资产节点 session.execute_write( self._merge_asset, asset["ip"], asset["os"], analysis["criticality"], asset["last_seen"] ) # 端口及漏洞 for port in asset["ports"]: session.execute_write( self._link_port, asset["ip"], port["port"], port["protocol"], port["service"] ) for vuln in analysis.get("vulnerabilities", []): if vuln["port"] == port["port"]: session.execute_write( self._link_vulnerability, port["port"], port["protocol"], vuln ) for tactic in analysis.get("tactics", []): session.execute_write( self._link_tactic, asset['ip'], tactic ) # 攻击路径 for tactic in analysis.get("tactics", []): session.execute_write( self._link_attack_tactic, tactic, [v["types"] for v in analysis["vulnerabilities"]] ) @staticmethod def _merge_asset(tx, ip: str, os: str, criticality: int, last_seen: str): tx.run(""" MERGE (a:Asset {ip: $ip}) SET a.os = $os, a.criticality = $criticality, a.last_seen = datetime($last_seen), a.updated = datetime() """, ip=ip, os=os, criticality=criticality, last_seen=last_seen) @staticmethod def _link_port(tx, ip: str, port: int, protocol: str, service: str): tx.run(""" MATCH (a:Asset {ip: $ip}) MERGE (p:Port {number: $port, protocol: $protocol}) SET p.service = $service MERGE (a)-[r:HAS_PORT]->(p) """, ip=ip, port=port, protocol=protocol, service=service) @staticmethod def _link_vulnerability(tx, port: int, protocol: str, vuln: Dict): """漏洞关系创建""" try: # 生成更合理的ID(加入协议) cve_id = f"DFYX-VUL-{datetime.now().year}-{port}-{protocol.upper()}" tx.run(""" // 确保Port存在 MERGE (p:Port {number: $port, protocol: $protocol}) // 创建或更新漏洞 MERGE (v:Vulnerability {cve_id: $cve_id}) ON CREATE SET v.risk_level = $risk, v.cvss_score = $cvss, v.vector = $vector, v.types = $types ON MATCH SET v.last_updated = datetime() // 创建关系 MERGE (p)-[r:HAS_VULNERABILITY]->(v) """, port=port, protocol=protocol, cve_id=cve_id, risk=vuln["risk_level"], cvss=vuln["cvss"], vector=vuln["vector"], types=vuln["types"]) print(f"成功创建漏洞关系: {port}/{protocol} → {cve_id}") except Exception as e: print(f"关系创建失败: {str(e)}") raise @staticmethod def _link_tactic(tx, ip: int, tactic: Dict): """策略关系创建""" try: tx.run(""" // 确保ip存在 MERGE (a:Asset {ip: $ip}) // 创建或更新策略 MERGE (t:AttackTactic {id: $id}) ON CREATE SET t.name = $name, t.description = $description ON MATCH SET t.last_updated = datetime() // 创建关系 MERGE (a)-[r:HAS_TACTIC]->(t) """, ip=ip, id=tactic["id"], name=tactic["name"], description=tactic["description"] ) #print("成功创建TACTIC关系") except Exception as e: print(f"关系创建失败: {str(e)}") raise # 修正正则表达式 def _parse_hunyuan_response(text: str) -> Dict: # 漏洞解析(修正$符号问题) vuln_pattern = r"\*\*(\d+)\. (.+?) $端口(\d+)$\*\*\n- 风险等级: (.+?)\n- CVSS评分: (.+?) $(.+?)$\n- 漏洞类型: (.+)" vulnerabilities = [ { "service": match[1], "port": int(match[2]), "risk_level": match[3], "cvss": float(match[4]), "vector": match[5], "types": match[6].split("、") } for match in re.findall(vuln_pattern, text) ] @staticmethod def _link_attack_tactic(tx, tactic: Dict, vuln_types: List[List[str]]): tx.run(""" MERGE (t:AttackTactic {id: $id}) SET t.name = $name, t.description = $desc, t.last_observed = datetime() WITH t UNWIND $vuln_types AS types MATCH (v:Vulnerability) WHERE ANY(type IN v.types WHERE type IN types) MERGE (t)-[r:EXPLOITS]->(v) """, id=tactic["id"], name=tactic["name"], desc=tactic["description"], vuln_types=vuln_types) # 主流程 if __name__ == "__main__": # 模拟扫描数据 assets = nmap_scan("xx.xx.xx.xx/24") for asset in assets: # 获取分析结果 analysis_result = hunyuan_analysis(asset) print(analysis_result) if "error" not in analysis_result: # 存储到知识图谱 graph = ThreatIntelligenceGraph() graph.store_analysis(asset, analysis_result) print("数据存储成功!") else: print(f"分析失败:{analysis_result['error']}")
其实这里就是畅想应用场景,以此也能得出后续的针对性补强动作需求。
场景一:基于高危端口清单梳理当前资产暴露面
以22端口、3306端口为例,企业安全管理中一般禁止这样的高危服务端口对公网开放,容易遭受暴力破解、密码泄漏、数据泄露等攻击或损失,因此可以通过筛选条件,直接得到高危服务子图:
场景二:构建动态的渗透攻击序列模型
基于场景一的工作来做也可以,或者按照渗透的思路,比如我现在需要提取所有web资产,因为当前没有漏洞扫描的结果,那就下一步导出结果给nuclei进行扫描:
值得注意的是,具体ip或者端口与Tactic(攻击策略)也有关联,利用策略可以减少无谓的无用功,针对可行方向进行针对性漏洞扫描或者渗透测试,类似于根据指纹匹配对应的PoCs来进行测试。
漏洞扫描完成后,可再次利用AI大模型对识别出的脆弱点进行智能化的攻击路径编排。具体而言,将结合漏洞的CVSS评分、利用可行性、资产权重(如业务系统等级、数据敏感度)及网络拓扑关系,构建动态的渗透攻击序列模型。
通过集成MITRE ATT&CK知识库的攻击模式特征,采用相关机器学习算法对攻击路径进行概率推演,智能生成包含横向移动策略、权限提升路线和载荷注入节点的渗透方案。还可通过prompt指定匹配Metasploit、Cobalt Strike等工具链的攻击模块,预演漏洞组合利用的叠加效应,最终输出包含POC验证脚本、修复优先级建议和应急响应预案的渗透测试报告,实现从脆弱性识别到攻击模拟验证的闭环安全处置流程。
通过一些公开的渗透文章为例,可以有如下应用案例:
实例1:Web应用漏洞链式攻击
漏洞扫描结果发现电商系统存在:
- Apache Struts2 RCE(CVE-2023-XXXX,CVSS 9.8)
- 后台管理弱口令(admin/admin)
- PostgreSQL数据库未授权访问(CVE-2022-4567,CVSS 8.5)
智能化编排过程
- 优先级判定
- 根据CVSS评分和资产权重(电商系统属于核心业务),优先利用Struts2 RCE漏洞
- 结合MITRE ATT&CK T1190(漏洞利用)和T1059(命令执行)生成攻击链
- 攻击路径推演
# 计算攻击成功率 if 漏洞可利用性 > 90% and 目标在DMZ区: 生成Payload: msfvenom反向Shell → 植入Cobalt Strike Beacon if 获取Web服务器权限: 探测内网PostgreSQL数据库(T1046网络发现) 利用弱口令横向移动(T1078合法凭证)
-
<code data-syntax="python" data-theme="default">工具链自动化</code>
-
- Metasploit调用
exploit/multi/http/struts2_code_exec
模块 - 通过数据库未授权访问执行
pg_read_file('/etc/passwd')
提取敏感信息 - 输出POC验证脚本:自动生成Struts2漏洞的HTTP请求包和数据库爆破脚本
- Metasploit调用
-
实例2:内网横向移动渗透
初始入口通过边界打印机服务漏洞(CVE-2022-XXXX,CVSS 7.2)获取内网据点
智能化决策过程
- 网络拓扑分析
- 识别目标为研发部门子网(资产权重高)
- 检测到子网间存在防火墙隔离(仅开放SMB端口)
- 攻击链生成
<code data-syntax="text" data-theme="default">攻击序列: </code>
1. 打印机漏洞获取立足点(Initial Access) 2. 使用PsExec横向移动(T1570 Lateral Tool Transfer) 3. 窃取研发服务器RDP凭据(T1552 Credential Access) 4. 通过Mimikatz注入域控(T1484 Domain Policy Modification)
3.组合漏洞利用
-
- 结合打印机服务的低危漏洞(CVSS 7.2)与域控服务器的Kerberos协议漏洞(CVSS 8.8)
- 自动化生成Kerberoasting攻击脚本
实例3:供应链攻击模拟
漏洞环境某企业OA系统存在:
- 第三方组件漏洞(CVE-2023-XXXX,CVSS 9.0)
- 文件上传功能未鉴权(CVSS 8.0)
- 开发环境与生产环境未隔离
渗透方案生成
- ATT&CK战术映射
- 初始访问(T1195):通过供应链组件漏洞植入WebShell
- 权限提升(T1068):利用文件上传漏洞获取系统权限
- 横向移动(T1210):通过开发环境JumpServer跳板攻击代码仓库
- 自动化工具联动
- 调用Nuclei验证组件漏洞:
nuclei -t cve-2023-5678.yaml
- 使用Chisel建立穿透隧道绕过网络隔离
- 生成修复建议
- 调用Nuclei验证组件漏洞:
值得一提,企业的安全体系建设可以遵循”攻防同步进化,边作战边建设”的原则:在持续完善IT基础设施(如自动化资产发现平台、威胁情报管道)的同时,配套开展安全团队能力建设(包括ATT&CK框架适配训练、红蓝对抗演习机制),在实战化对抗中实现技术优势向防御效能的有效转化。
否则,业内的这项看家本领,还真可能被AI替代。
转自:https://mp.weixin.qq.com/s/gySjWc6T0xAGk0i82W6tVQ