业务逻辑漏洞:越权、遍历与条件竞争
网络安全系列 · 前端与浏览器安全篇
一、什么是业务逻辑漏洞?
业务逻辑漏洞(Business Logic Vulnerability)是指应用程序在实现业务功能时,由于设计或编码缺陷导致的安全问题。与SQL注入、XSS等技术漏洞不同,业务逻辑漏洞不依赖于特定的输入格式或语法错误,而是利用业务流程本身的缺陷进行攻击。
核心特点:
- 难以自动化检测:传统扫描器无法识别业务逻辑问题
- 高度依赖上下文:同样的代码在不同业务场景下风险不同
- 危害直接:往往直接导致数据泄露或财产损失
- 修复成本高:通常需要重新设计业务流程
二、越权访问:身份与权限的边界
2.1 越权访问的类型
水平越权(Horizontal Privilege Escalation)
攻击者访问同级权限的其他用户资源:
# 正常请求
GET /api/orders/12345 HTTP/1.1
Authorization: Bearer token_user_A
# 水平越权 - 修改ID访问其他用户订单
GET /api/orders/12346 HTTP/1.1
Authorization: Bearer token_user_A垂直越权(Vertical Privilege Escalation)
普通用户获得管理员权限:
# 普通用户尝试访问管理接口
POST /api/admin/users/delete HTTP/1.1
Authorization: Bearer token_normal_user
Content-Type: application/json
{"user_id": "12345"}2.2 常见攻击场景
场景一:订单信息泄露
# 漏洞代码示例
@app.route('/order/<order_id>')
def view_order(order_id):
# 只查询订单,不验证归属
order = db.query(f"SELECT * FROM orders WHERE id = {order_id}")
return jsonify(order)
# 修复方案
@app.route('/order/<order_id>')
@login_required
def view_order_fixed(order_id):
user_id = current_user.id
# 同时验证订单归属
order = Order.query.filter_by(
id=order_id,
user_id=user_id # 关键:添加归属验证
).first()
if not order:
return jsonify({"error": "Order not found"}), 404
return jsonify(order.to_dict())场景二:管理员功能未授权访问
// 前端"隐藏"管理按钮(不可靠)
if (user.role !== 'admin') {
hideAdminPanel(); // 仅前端隐藏,接口仍可访问
}
// 后端必须验证权限
app.post('/api/admin/delete-user', (req, res) => {
// 漏洞:未验证管理员身份
const { userId } = req.body;
await User.delete(userId);
});
// 修复:严格的权限中间件
const requireAdmin = (req, res, next) => {
if (req.user.role !== 'admin') {
return res.status(403).json({ error: 'Admin access required' });
}
next();
};
app.post('/api/admin/delete-user', requireAdmin, async (req, res) => {
// 安全:只有管理员能执行
const { userId } = req.body;
await User.delete(userId);
});2.3 防御策略
| 防御层 | 措施 | 实现要点 |
|---|---|---|
| 认证层 | 强制登录验证 | 所有敏感接口必须验证身份 |
| 授权层 | 资源归属校验 | 查询时添加 user_id = ? 条件 |
| 角色层 | RBAC权限模型 | 基于角色的访问控制 |
| 审计层 | 操作日志记录 | 记录谁、何时、做了什么 |
三、遍历攻击:数据的批量获取
3.1 IDOR(不安全的直接对象引用)
IDOR(Insecure Direct Object Reference) 是最常见的遍历漏洞:
# 获取用户资料
GET /api/user/1001/profile
# 遍历ID获取所有用户资料
GET /api/user/1002/profile
GET /api/user/1003/profile
GET /api/user/1004/profile
...3.2 批量遍历技术
顺序遍历
# 使用curl批量遍历
for i in $(seq 1 10000); do
curl -s "https://api.example.com/order/$i" \
-H "Authorization: Bearer $TOKEN" \
| jq '.order_number, .total'
done参数篡改
# 原始请求
POST /api/export HTTP/1.1
Content-Type: application/json
{"user_ids": [1001]}
# 篡改后批量导出
POST /api/export HTTP/1.1
Content-Type: application/json
{"user_ids": [1,2,3,4,5,...,9999]} # 遍历所有用户3.3 防御方案
方案一:使用不可预测标识符
import uuid
import hashlib
# 漏洞:使用自增ID
order_id = 12345 # 可预测
# 修复:使用UUID或加密ID
order_id = str(uuid.uuid4()) # "550e8400-e29b-41d4-a716-446655440000"
# 或加密ID
encrypted_id = hashlib.sha256(f"order_{internal_id}_{secret}".encode()).hexdigest()[:16]方案二:速率限制与异常检测
from flask_limiter import Limiter
limiter = Limiter(app, key_func=get_remote_address)
@app.route('/api/user/<user_id>')
@limiter.limit("10 per minute") # 单IP限流
def get_user(user_id):
# 同时检测异常模式
if detect_scanning_pattern(request):
return jsonify({"error": "Suspicious activity detected"}), 429
# 验证用户是否有权访问此资源
if not authorize_resource_access(current_user, user_id):
return jsonify({"error": "Unauthorized"}), 403
return jsonify(get_user_data(user_id))方案三:批量接口的严格限制
@app.route('/api/bulk-export', methods=['POST'])
def bulk_export():
data = request.get_json()
requested_ids = data.get('user_ids', [])
# 限制批量数量
if len(requested_ids) > 100:
return jsonify({"error": "Max 100 items per request"}), 400
# 验证所有ID的访问权限
authorized_ids = []
for uid in requested_ids:
if authorize_resource_access(current_user, uid):
authorized_ids.append(uid)
# 只返回有权限的数据
return jsonify(export_users(authorized_ids))四、条件竞争:时间窗口的攻击
4.1 什么是条件竞争?
条件竞争(Race Condition)发生在多个操作并发执行时,由于执行顺序不确定导致的安全问题。攻击者利用极短的时间窗口,在系统状态检查与状态变更之间插入恶意操作。
4.2 典型攻击场景
场景一:优惠券重复使用
# 漏洞代码:领取优惠券
@app.route('/api/coupon/claim', methods=['POST'])
def claim_coupon():
coupon_id = request.json.get('coupon_id')
user_id = current_user.id
# 检查是否已领取
if UserCoupon.query.filter_by(user_id=user_id, coupon_id=coupon_id).first():
return jsonify({"error": "Already claimed"}), 400
# 检查优惠券是否还有剩余
coupon = Coupon.query.get(coupon_id)
if coupon.remaining <= 0:
return jsonify({"error": "Coupon exhausted"}), 400
# 创建用户优惠券记录
user_coupon = UserCoupon(user_id=user_id, coupon_id=coupon_id)
db.session.add(user_coupon)
# 减少剩余数量
coupon.remaining -= 1
db.session.commit()
return jsonify({"success": True})攻击方式:并发发送多个请求,利用检查与更新之间的时间差:
# 使用并发工具同时发送多个请求
curl -X POST /api/coupon/claim -d '{"coupon_id": 123}' &
curl -X POST /api/coupon/claim -d '{"coupon_id": 123}' &
curl -X POST /api/coupon/claim -d '{"coupon_id": 123}' &
curl -X POST /api/coupon/claim -d '{"coupon_id": 123}' &
wait场景二:账户余额透支
# 漏洞:转账操作
@app.route('/api/transfer', methods=['POST'])
def transfer():
data = request.json
from_account = Account.query.get(data['from_account'])
amount = data['amount']
# 检查余额
if from_account.balance < amount:
return jsonify({"error": "Insufficient balance"}), 400
# 模拟处理延迟
time.sleep(0.1)
# 扣减余额
from_account.balance -= amount
db.session.commit()
return jsonify({"success": True})攻击方式:同时发起多笔转账,余额检查通过后,多笔操作同时扣减:
# 余额100元,同时发起10笔50元转账
for i in {1..10}; do
curl -X POST /api/transfer -d '{"from_account": 123, "amount": 50}' &
done
wait
# 结果:可能成功转账多笔,余额变为负数4.3 防御方案
方案一:数据库事务与锁
from sqlalchemy import select, update
from sqlalchemy.orm import sessionmaker
# 使用悲观锁(SELECT FOR UPDATE)
@app.route('/api/coupon/claim', methods=['POST'])
def claim_coupon_fixed():
coupon_id = request.json.get('coupon_id')
user_id = current_user.id
try:
# 开启事务,锁定优惠券记录
with db.session.begin():
# SELECT FOR UPDATE 锁定行,防止并发修改
coupon = db.session.execute(
select(Coupon)
.where(Coupon.id == coupon_id)
.with_for_update() # 悲观锁
).scalar_one()
# 检查是否已领取(在事务内再次检查)
existing = db.session.execute(
select(UserCoupon)
.where(
UserCoupon.user_id == user_id,
UserCoupon.coupon_id == coupon_id
)
).scalar_one_or_none()
if existing:
return jsonify({"error": "Already claimed"}), 400
# 检查剩余数量
if coupon.remaining <= 0:
return jsonify({"error": "Coupon exhausted"}), 400
# 创建记录并扣减(原子操作)
user_coupon = UserCoupon(user_id=user_id, coupon_id=coupon_id)
db.session.add(user_coupon)
coupon.remaining -= 1
# 事务自动提交
return jsonify({"success": True})
except Exception as e:
db.session.rollback()
return jsonify({"error": str(e)}), 500方案二:唯一约束
-- 数据库层面防止重复领取
CREATE TABLE user_coupons (
user_id INT NOT NULL,
coupon_id INT NOT NULL,
claimed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, coupon_id), -- 唯一约束
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (coupon_id) REFERENCES coupons(id)
);方案三:乐观锁
# 使用版本号实现乐观锁
class Account(db.Model):
id = db.Column(db.Integer, primary_key=True)
balance = db.Column(db.Numeric(10, 2))
version = db.Column(db.Integer, default=0) # 版本号
@app.route('/api/transfer', methods=['POST'])
def transfer_fixed():
data = request.json
amount = Decimal(data['amount'])
max_retries = 3
for attempt in range(max_retries):
try:
with db.session.begin():
# 读取账户和版本号
account = db.session.execute(
select(Account).where(Account.id == data['from_account'])
).scalar_one()
current_version = account.version
if account.balance < amount:
return jsonify({"error": "Insufficient balance"}), 400
# 更新时检查版本号(乐观锁)
result = db.session.execute(
update(Account)
.where(
Account.id == data['from_account'],
Account.version == current_version # 版本号匹配才更新
)
.values(
balance=Account.balance - amount,
version=Account.version + 1
)
)
# 如果更新行数为0,说明版本冲突
if result.rowcount == 0:
db.session.rollback()
if attempt < max_retries - 1:
continue # 重试
return jsonify({"error": "Concurrent modification"}), 409
# 记录转账
db.session.add(Transaction(
from_account=data['from_account'],
amount=amount
))
return jsonify({"success": True})
except Exception as e:
db.session.rollback()
if attempt == max_retries - 1:
return jsonify({"error": "Transfer failed"}), 500
return jsonify({"error": "Max retries exceeded"}), 500五、综合防御框架
5.1 安全开发清单
□ 每个接口都验证用户身份
□ 敏感操作验证资源归属
□ 使用不可预测的资源标识符
□ 批量操作限制数量并验证权限
□ 并发场景使用数据库锁或乐观锁
□ 关键操作添加操作日志
□ 定期审计权限配置5.2 检测方法
手动测试
# 1. 水平越权测试
# 用户A登录,尝试访问用户B的资源
curl -H "Authorization: Bearer TOKEN_A" /api/user/1002/orders
# 2. IDOR测试
# 遍历ID序列
for id in $(seq 1 100); do
curl -s -o /dev/null -w "%{http_code}" \
/api/document/$id | grep -v 404
done
# 3. 条件竞争测试
# 并发发送请求
for i in {1..10}; do
curl -X POST /api/coupon/claim -d '{"id":1}' &
done
wait自动化检测思路
# 使用参数变异检测IDOR
def test_idor(endpoint, param_name, baseline_value):
"""检测是否存在IDOR漏洞"""
# 基线请求(合法访问)
baseline = request(endpoint, {param_name: baseline_value})
# 测试相邻值
for test_value in [baseline_value - 1, baseline_value + 1]:
response = request(endpoint, {param_name: test_value})
# 如果返回200且数据结构与基线相似,可能存在IDOR
if response.status == 200 and similar_structure(baseline, response):
report_vulnerability(f"Possible IDOR at {endpoint}")六、总结
业务逻辑漏洞是Web应用中最常见但也最容易被忽视的安全问题。核心防御要点:
- 永远不要信任前端:所有权限验证必须在后端完成
- 资源访问必验证:查询时始终添加归属条件
- 标识符要不可预测:使用UUID替代自增ID
- 并发操作要加锁:关键业务流程使用数据库锁
- 日志记录要完整:便于事后审计和异常检测
记住:业务逻辑漏洞没有通用的"补丁",需要深入理解业务场景,在每个关键环节实施适当的控制。
本文是网络安全系列文章的一部分,更多内容请关注 yolonote.cc