You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
OneAuth/tests/edge_case_test.go

321 lines
9.1 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package tests
import (
"strings"
"testing"
)
// TestSQLInjection 测试 SQL 注入防护
func TestSQLInjection(t *testing.T) {
ensureUsers(t)
// 测试用户名中的 SQL 注入
t.Run("SQL Injection in username", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/auth/register", map[string]string{
"username": "admin' OR '1'='1",
"password": "password123",
"email": "sql@test.com",
}, "")
// 应该正常处理,不应该崩溃或返回异常
if resp.Code == 500 {
t.Errorf("SQL injection caused server error: %s", resp.Body.String())
}
})
// 测试组织代码中的 SQL 注入
t.Run("SQL Injection in org code", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/orgs", map[string]string{
"code": "test' OR '1'='1",
"name": "SQL Test Org",
}, User1Token)
// 应该正常处理或返回业务错误,而不是 SQL 错误
if resp.Code == 500 {
t.Errorf("SQL injection in org code caused server error: %s", resp.Body.String())
}
})
// 测试搜索中的 SQL 注入
t.Run("SQL Injection in search", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/auth/users?keyword=admin' OR '1'='1", nil, AdminToken)
// 应该正常处理
if resp.Code == 500 {
t.Errorf("SQL injection in search caused server error: %s", resp.Body.String())
}
})
}
// TestXSSPrevention 测试 XSS 防护
func TestXSSPrevention(t *testing.T) {
ensureUsers(t)
xssPayload := "<script>alert('xss')</script>"
// 测试昵称中的 XSS
t.Run("XSS in nickname", func(t *testing.T) {
resp := doRequest(t, "PATCH", "/api/auth/me", map[string]string{
"nickname": xssPayload,
}, User1Token)
if resp.Code == 200 {
// 检查返回的数据是否被转义
if strings.Contains(resp.Body.String(), "<script>") {
t.Logf("Warning: XSS payload not escaped in response: %s", resp.Body.String())
}
}
})
// 测试组织名称中的 XSS
t.Run("XSS in org name", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/orgs", map[string]string{
"code": "xss_test_org",
"name": xssPayload,
}, User1Token)
if resp.Code == 200 {
if strings.Contains(resp.Body.String(), "<script>") {
t.Logf("Warning: XSS payload not escaped in org response: %s", resp.Body.String())
}
}
})
}
// TestInputValidation 测试输入验证
func TestInputValidation(t *testing.T) {
ensureUsers(t)
// 测试空用户名
t.Run("Empty username", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/auth/register", map[string]string{
"username": "",
"password": "password123",
"email": "empty@test.com",
}, "")
if resp.Code == 200 {
t.Errorf("Empty username should be rejected, got 200")
}
})
// 测试空密码
t.Run("Empty password", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/auth/register", map[string]string{
"username": "testuser_empty_pass",
"password": "",
"email": "emptypass@test.com",
}, "")
if resp.Code == 200 {
t.Errorf("Empty password should be rejected, got 200")
}
})
// 测试超长输入
t.Run("Overlong username", func(t *testing.T) {
longUsername := strings.Repeat("a", 300)
resp := doRequest(t, "POST", "/api/auth/register", map[string]string{
"username": longUsername,
"password": "password123",
"email": "long@test.com",
}, "")
if resp.Code == 200 {
t.Logf("Warning: Overlong username was accepted")
}
})
// 测试无效邮箱格式
t.Run("Invalid email format", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/auth/register", map[string]string{
"username": "testuser_invalid_email",
"password": "password123",
"email": "not-an-email",
}, "")
if resp.Code == 200 {
t.Logf("Warning: Invalid email format was accepted")
}
})
}
// TestIDOR 测试不安全的直接对象引用 (IDOR)
func TestIDOR(t *testing.T) {
ensureUsers(t)
// 测试User1 尝试访问 User2 的敏感信息
t.Run("User1 tries to access User2's data", func(t *testing.T) {
// 尝试通过 /api/users/{id} 访问其他用户信息
resp := doRequest(t, "GET", "/api/users/"+User2ID, nil, User1Token)
if resp.Code == 200 {
// 检查是否返回了敏感信息
t.Logf("Warning: User1 can access User2's data: %s", resp.Body.String())
}
})
// 测试User1 尝试修改 User2 的信息
t.Run("User1 tries to modify User2's data", func(t *testing.T) {
resp := doRequest(t, "PATCH", "/api/users/"+User2ID, map[string]string{
"nickname": "Hacked by User1",
}, User1Token)
if resp.Code == 200 {
t.Errorf("User1 should not be able to modify User2's data, got 200")
}
})
}
// TestAuthorizationBypass 测试授权绕过
func TestAuthorizationBypass(t *testing.T) {
ensureUsers(t)
// 测试:尝试访问需要认证但没有提供 token 的端点
t.Run("Access protected endpoint without token", func(t *testing.T) {
endpoints := []struct {
method string
path string
}{
{"GET", "/api/users"},
{"GET", "/api/orgs"},
{"GET", "/api/roles"},
{"GET", "/api/auth/me"},
}
for _, ep := range endpoints {
resp := doRequest(t, ep.method, ep.path, nil, "")
if resp.Code == 200 {
t.Errorf("%s %s should require authentication, got 200", ep.method, ep.path)
}
}
})
// 测试:使用无效 token
t.Run("Access with invalid token", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/auth/me", nil, "invalid_token_here")
if resp.Code == 200 {
t.Errorf("Invalid token should be rejected, got 200")
}
})
// 测试:使用过期 token如果有办法生成
t.Run("Access with malformed token", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/auth/me", nil, "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.invalid")
if resp.Code == 200 {
t.Errorf("Malformed token should be rejected, got 200")
}
})
}
// TestPrivilegeEscalation 测试权限提升
func TestPrivilegeEscalation(t *testing.T) {
ensureUsers(t)
// 测试:普通用户尝试赋予自己 admin 角色
t.Run("User tries to assign admin role to self", func(t *testing.T) {
resp := doRequest(t, "PUT", "/api/users/"+User1ID+"/roles", map[string]any{
"role_ids": []string{"admin"},
}, User1Token)
if resp.Code == 200 {
t.Errorf("User should not be able to assign admin role to self, got 200")
}
})
// 测试:普通用户尝试修改自己的权限
t.Run("User tries to modify own permissions", func(t *testing.T) {
resp := doRequest(t, "PUT", "/api/users/"+User1ID+"/permissions", map[string]any{
"permission_ids": []string{"*:*"},
}, User1Token)
if resp.Code == 200 {
t.Errorf("User should not be able to modify own permissions, got 200")
}
})
// 测试:普通用户尝试创建系统角色
t.Run("User tries to create system role", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/roles", map[string]string{
"code": "super_admin",
"name": "Super Admin",
"description": "Trying to escalate privileges",
}, User1Token)
if resp.Code == 200 {
t.Errorf("User should not be able to create roles, got 200")
}
})
}
// TestRateLimiting 测试速率限制(如果实现了)
func TestRateLimiting(t *testing.T) {
ensureUsers(t)
// 快速发送多个请求
t.Run("Rapid requests", func(t *testing.T) {
for i := 0; i < 10; i++ {
resp := doRequest(t, "GET", "/api/auth/me", nil, User1Token)
if resp.Code == 429 {
t.Logf("Rate limiting detected after %d requests", i+1)
return
}
}
t.Logf("No rate limiting detected")
})
}
// TestSpecialCharacters 测试特殊字符处理
func TestSpecialCharacters(t *testing.T) {
ensureUsers(t)
specialChars := []string{
"test\x00null", // null byte
"test\nnewline", // newline
"test\rcarriage", // carriage return
"test\ttab", // tab
"test<script>", // HTML tag
"test../../etc/passwd", // path traversal
"test%00", // URL encoded null
"test%2e%2e%2f", // URL encoded path traversal
}
for _, char := range specialChars {
safeChar := strings.ReplaceAll(char, " ", "_")
if len(safeChar) > 4 {
safeChar = safeChar[:4]
}
codeSuffix := safeChar
if len(codeSuffix) > 8 {
codeSuffix = codeSuffix[:8]
}
t.Run("Special char: "+safeChar+"...", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/orgs", map[string]string{
"code": "test_" + codeSuffix,
"name": char,
}, User1Token)
if resp.Code == 500 {
t.Errorf("Special character caused server error: %s", resp.Body.String())
}
})
}
}
// TestUUIDManipulation 测试 UUID 操作
func TestUUIDManipulation(t *testing.T) {
ensureUsers(t)
// 测试无效 UUID 格式
t.Run("Invalid UUID format", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/users/invalid-uuid", nil, AdminToken)
if resp.Code == 200 {
t.Errorf("Invalid UUID should be rejected, got 200")
}
})
// 测试不存在的 UUID
t.Run("Non-existent UUID", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/users/12345678-1234-1234-1234-123456789abc", nil, AdminToken)
if resp.Code == 200 {
t.Errorf("Non-existent UUID should return 404, got 200")
}
})
// 测试 SQL 注入风格的 UUID
t.Run("SQL injection in UUID", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/users/' OR '1'='1", nil, AdminToken)
if resp.Code == 200 {
t.Errorf("SQL injection in UUID should be rejected, got 200")
}
})
}