业务逻辑漏洞:越权、遍历与条件竞争

作者:Yolo 发布时间: 2026-06-11 阅读量:7

业务逻辑漏洞:越权、遍历与条件竞争

网络安全系列 · 前端与浏览器安全篇

一、什么是业务逻辑漏洞?

业务逻辑漏洞(Business Logic Vulnerability)是指应用程序在实现业务功能时,由于设计或编码缺陷导致的安全问题。与SQL注入、XSS等技术漏洞不同,业务逻辑漏洞不依赖于特定的输入格式或语法错误,而是利用业务流程本身的缺陷进行攻击。

核心特点:


  • 难以自动化检测:传统扫描器无法识别业务逻辑问题

  • 高度依赖上下文:同样的代码在不同业务场景下风险不同

  • 危害直接:往往直接导致数据泄露或财产损失

  • 修复成本高:通常需要重新设计业务流程


二、越权访问:身份与权限的边界

2.1 越权访问的类型

水平越权(Horizontal Privilege Escalation)

攻击者访问同级权限的其他用户资源:

# 正常请求
GET /api/orders/12345 HTTP/1.1
Authorization: Bearer token_user_A

# 水平越权 - 修改ID访问其他用户订单
GET /api/orders/12346 HTTP/1.1
Authorization: Bearer token_user_A

垂直越权(Vertical Privilege Escalation)

普通用户获得管理员权限

# 普通用户尝试访问管理接口
POST /api/admin/users/delete HTTP/1.1
Authorization: Bearer token_normal_user
Content-Type: application/json

{"user_id": "12345"}

2.2 常见攻击场景

场景一:订单信息泄露

# 漏洞代码示例
@app.route('/order/<order_id>')
def view_order(order_id):
    # 只查询订单,不验证归属
    order = db.query(f"SELECT * FROM orders WHERE id = {order_id}")
    return jsonify(order)

# 修复方案
@app.route('/order/<order_id>')
@login_required
def view_order_fixed(order_id):
    user_id = current_user.id
    # 同时验证订单归属
    order = Order.query.filter_by(
        id=order_id, 
        user_id=user_id  # 关键:添加归属验证
    ).first()
    
    if not order:
        return jsonify({"error": "Order not found"}), 404
    return jsonify(order.to_dict())

场景二:管理员功能未授权访问

// 前端"隐藏"管理按钮(不可靠)
if (user.role !== 'admin') {
    hideAdminPanel();  // 仅前端隐藏,接口仍可访问
}

// 后端必须验证权限
app.post('/api/admin/delete-user', (req, res) => {
    // 漏洞:未验证管理员身份
    const { userId } = req.body;
    await User.delete(userId);
});

// 修复:严格的权限中间件
const requireAdmin = (req, res, next) => {
    if (req.user.role !== 'admin') {
        return res.status(403).json({ error: 'Admin access required' });
    }
    next();
};

app.post('/api/admin/delete-user', requireAdmin, async (req, res) => {
    // 安全:只有管理员能执行
    const { userId } = req.body;
    await User.delete(userId);
});

2.3 防御策略

防御层措施实现要点
认证层强制登录验证所有敏感接口必须验证身份
授权层资源归属校验查询时添加 user_id = ? 条件
角色层RBAC权限模型基于角色的访问控制
审计层操作日志记录记录谁、何时、做了什么

三、遍历攻击:数据的批量获取

3.1 IDOR(不安全的直接对象引用)

IDOR(Insecure Direct Object Reference) 是最常见的遍历漏洞:

# 获取用户资料
GET /api/user/1001/profile

# 遍历ID获取所有用户资料
GET /api/user/1002/profile
GET /api/user/1003/profile
GET /api/user/1004/profile
...

3.2 批量遍历技术

顺序遍历

# 使用curl批量遍历
for i in $(seq 1 10000); do
    curl -s "https://api.example.com/order/$i" \
         -H "Authorization: Bearer $TOKEN" \
         | jq '.order_number, .total'
done

参数篡改

# 原始请求
POST /api/export HTTP/1.1
Content-Type: application/json

{"user_ids": [1001]}

# 篡改后批量导出
POST /api/export HTTP/1.1
Content-Type: application/json

{"user_ids": [1,2,3,4,5,...,9999]}  # 遍历所有用户

3.3 防御方案

方案一:使用不可预测标识符

import uuid
import hashlib

# 漏洞:使用自增ID
order_id = 12345  # 可预测

# 修复:使用UUID或加密ID
order_id = str(uuid.uuid4())  # "550e8400-e29b-41d4-a716-446655440000"

# 或加密ID
encrypted_id = hashlib.sha256(f"order_{internal_id}_{secret}".encode()).hexdigest()[:16]

方案二:速率限制与异常检测

from flask_limiter import Limiter

limiter = Limiter(app, key_func=get_remote_address)

@app.route('/api/user/<user_id>')
@limiter.limit("10 per minute")  # 单IP限流
def get_user(user_id):
    # 同时检测异常模式
    if detect_scanning_pattern(request):
        return jsonify({"error": "Suspicious activity detected"}), 429
    
    # 验证用户是否有权访问此资源
    if not authorize_resource_access(current_user, user_id):
        return jsonify({"error": "Unauthorized"}), 403
    
    return jsonify(get_user_data(user_id))

方案三:批量接口的严格限制

@app.route('/api/bulk-export', methods=['POST'])
def bulk_export():
    data = request.get_json()
    requested_ids = data.get('user_ids', [])
    
    # 限制批量数量
    if len(requested_ids) > 100:
        return jsonify({"error": "Max 100 items per request"}), 400
    
    # 验证所有ID的访问权限
    authorized_ids = []
    for uid in requested_ids:
        if authorize_resource_access(current_user, uid):
            authorized_ids.append(uid)
    
    # 只返回有权限的数据
    return jsonify(export_users(authorized_ids))

四、条件竞争:时间窗口的攻击

4.1 什么是条件竞争?

条件竞争(Race Condition)发生在多个操作并发执行时,由于执行顺序不确定导致的安全问题。攻击者利用极短的时间窗口,在系统状态检查与状态变更之间插入恶意操作。

4.2 典型攻击场景

场景一:优惠券重复使用

# 漏洞代码:领取优惠券
@app.route('/api/coupon/claim', methods=['POST'])
def claim_coupon():
    coupon_id = request.json.get('coupon_id')
    user_id = current_user.id
    
    # 检查是否已领取
    if UserCoupon.query.filter_by(user_id=user_id, coupon_id=coupon_id).first():
        return jsonify({"error": "Already claimed"}), 400
    
    # 检查优惠券是否还有剩余
    coupon = Coupon.query.get(coupon_id)
    if coupon.remaining <= 0:
        return jsonify({"error": "Coupon exhausted"}), 400
    
    # 创建用户优惠券记录
    user_coupon = UserCoupon(user_id=user_id, coupon_id=coupon_id)
    db.session.add(user_coupon)
    
    # 减少剩余数量
    coupon.remaining -= 1
    db.session.commit()
    
    return jsonify({"success": True})

攻击方式:并发发送多个请求,利用检查与更新之间的时间差:

# 使用并发工具同时发送多个请求
curl -X POST /api/coupon/claim -d '{"coupon_id": 123}' &
curl -X POST /api/coupon/claim -d '{"coupon_id": 123}' &
curl -X POST /api/coupon/claim -d '{"coupon_id": 123}' &
curl -X POST /api/coupon/claim -d '{"coupon_id": 123}' &
wait

场景二:账户余额透支

# 漏洞:转账操作
@app.route('/api/transfer', methods=['POST'])
def transfer():
    data = request.json
    from_account = Account.query.get(data['from_account'])
    amount = data['amount']
    
    # 检查余额
    if from_account.balance < amount:
        return jsonify({"error": "Insufficient balance"}), 400
    
    # 模拟处理延迟
    time.sleep(0.1)
    
    # 扣减余额
    from_account.balance -= amount
    db.session.commit()
    
    return jsonify({"success": True})

攻击方式:同时发起多笔转账,余额检查通过后,多笔操作同时扣减:

# 余额100元,同时发起10笔50元转账
for i in {1..10}; do
    curl -X POST /api/transfer -d '{"from_account": 123, "amount": 50}' &
done
wait
# 结果:可能成功转账多笔,余额变为负数

4.3 防御方案

方案一:数据库事务与锁

from sqlalchemy import select, update
from sqlalchemy.orm import sessionmaker

# 使用悲观锁(SELECT FOR UPDATE)
@app.route('/api/coupon/claim', methods=['POST'])
def claim_coupon_fixed():
    coupon_id = request.json.get('coupon_id')
    user_id = current_user.id
    
    try:
        # 开启事务,锁定优惠券记录
        with db.session.begin():
            # SELECT FOR UPDATE 锁定行,防止并发修改
            coupon = db.session.execute(
                select(Coupon)
                .where(Coupon.id == coupon_id)
                .with_for_update()  # 悲观锁
            ).scalar_one()
            
            # 检查是否已领取(在事务内再次检查)
            existing = db.session.execute(
                select(UserCoupon)
                .where(
                    UserCoupon.user_id == user_id,
                    UserCoupon.coupon_id == coupon_id
                )
            ).scalar_one_or_none()
            
            if existing:
                return jsonify({"error": "Already claimed"}), 400
            
            # 检查剩余数量
            if coupon.remaining <= 0:
                return jsonify({"error": "Coupon exhausted"}), 400
            
            # 创建记录并扣减(原子操作)
            user_coupon = UserCoupon(user_id=user_id, coupon_id=coupon_id)
            db.session.add(user_coupon)
            coupon.remaining -= 1
            
        # 事务自动提交
        return jsonify({"success": True})
        
    except Exception as e:
        db.session.rollback()
        return jsonify({"error": str(e)}), 500

方案二:唯一约束

-- 数据库层面防止重复领取
CREATE TABLE user_coupons (
    user_id INT NOT NULL,
    coupon_id INT NOT NULL,
    claimed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, coupon_id),  -- 唯一约束
    FOREIGN KEY (user_id) REFERENCES users(id),
    FOREIGN KEY (coupon_id) REFERENCES coupons(id)
);

方案三:乐观锁

# 使用版本号实现乐观锁
class Account(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    balance = db.Column(db.Numeric(10, 2))
    version = db.Column(db.Integer, default=0)  # 版本号
    
@app.route('/api/transfer', methods=['POST'])
def transfer_fixed():
    data = request.json
    amount = Decimal(data['amount'])
    
    max_retries = 3
    for attempt in range(max_retries):
        try:
            with db.session.begin():
                # 读取账户和版本号
                account = db.session.execute(
                    select(Account).where(Account.id == data['from_account'])
                ).scalar_one()
                
                current_version = account.version
                
                if account.balance < amount:
                    return jsonify({"error": "Insufficient balance"}), 400
                
                # 更新时检查版本号(乐观锁)
                result = db.session.execute(
                    update(Account)
                    .where(
                        Account.id == data['from_account'],
                        Account.version == current_version  # 版本号匹配才更新
                    )
                    .values(
                        balance=Account.balance - amount,
                        version=Account.version + 1
                    )
                )
                
                # 如果更新行数为0,说明版本冲突
                if result.rowcount == 0:
                    db.session.rollback()
                    if attempt < max_retries - 1:
                        continue  # 重试
                    return jsonify({"error": "Concurrent modification"}), 409
                
                # 记录转账
                db.session.add(Transaction(
                    from_account=data['from_account'],
                    amount=amount
                ))
                
            return jsonify({"success": True})
            
        except Exception as e:
            db.session.rollback()
            if attempt == max_retries - 1:
                return jsonify({"error": "Transfer failed"}), 500
            
    return jsonify({"error": "Max retries exceeded"}), 500

五、综合防御框架

5.1 安全开发清单

□ 每个接口都验证用户身份
□ 敏感操作验证资源归属
□ 使用不可预测的资源标识符
□ 批量操作限制数量并验证权限
□ 并发场景使用数据库锁或乐观锁
□ 关键操作添加操作日志
□ 定期审计权限配置

5.2 检测方法

手动测试

# 1. 水平越权测试
# 用户A登录,尝试访问用户B的资源
curl -H "Authorization: Bearer TOKEN_A" /api/user/1002/orders

# 2. IDOR测试
# 遍历ID序列
for id in $(seq 1 100); do
    curl -s -o /dev/null -w "%{http_code}" \
         /api/document/$id | grep -v 404
done

# 3. 条件竞争测试
# 并发发送请求
for i in {1..10}; do
    curl -X POST /api/coupon/claim -d '{"id":1}' &
done
wait

自动化检测思路

# 使用参数变异检测IDOR
def test_idor(endpoint, param_name, baseline_value):
    """检测是否存在IDOR漏洞"""
    # 基线请求(合法访问)
    baseline = request(endpoint, {param_name: baseline_value})
    
    # 测试相邻值
    for test_value in [baseline_value - 1, baseline_value + 1]:
        response = request(endpoint, {param_name: test_value})
        
        # 如果返回200且数据结构与基线相似,可能存在IDOR
        if response.status == 200 and similar_structure(baseline, response):
            report_vulnerability(f"Possible IDOR at {endpoint}")

六、总结

业务逻辑漏洞是Web应用中最常见但也最容易被忽视的安全问题。核心防御要点:

  1. 永远不要信任前端:所有权限验证必须在后端完成
  2. 资源访问必验证:查询时始终添加归属条件
  3. 标识符要不可预测:使用UUID替代自增ID
  4. 并发操作要加锁:关键业务流程使用数据库锁
  5. 日志记录要完整:便于事后审计和异常检测
记住:业务逻辑漏洞没有通用的"补丁",需要深入理解业务场景,在每个关键环节实施适当的控制。

本文是网络安全系列文章的一部分,更多内容请关注 yolonote.cc