You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
OneAuth/tests/wildcard_perm_test.go

186 lines
5.2 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package tests
import (
"testing"
)
// TestWildcardPermission 测试通配符权限 (*:* 和 resource:*)
func TestWildcardPermission(t *testing.T) {
ensureUsers(t)
// Admin 拥有 *:* 权限,应该能访问所有资源
t.Run("Admin with wildcard can access any resource", func(t *testing.T) {
// 尝试访问各种管理员端点
resp := doRequest(t, "GET", "/api/users", nil, AdminToken)
assertStatus(t, resp, 200)
resp = doRequest(t, "GET", "/api/roles", nil, AdminToken)
assertStatus(t, resp, 200)
resp = doRequest(t, "GET", "/api/orgs", nil, AdminToken)
assertStatus(t, resp, 200)
})
// 普通用户没有 user:admin 权限,不能访问用户管理
t.Run("Regular user cannot access admin endpoints", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/users", nil, User1Token)
if resp.Code == 200 {
t.Errorf("Expected regular user to be denied access to /api/users, got 200")
}
})
}
// TestPermissionHierarchy 测试权限层级 (resource:action vs resource:*)
func TestPermissionHierarchy(t *testing.T) {
ensureUsers(t)
// 创建自定义角色,拥有 org:* 权限(所有 org 操作)
t.Run("Create role with org:* permission", func(t *testing.T) {
// 先创建角色
resp := doRequest(t, "POST", "/api/roles", map[string]string{
"code": "org_manager",
"name": "Org Manager",
"description": "Can manage all org operations",
}, AdminToken)
assertStatus(t, resp, 200)
var data struct {
ID string `json:"id"`
}
decodeResponse(t, resp, &data)
// 获取所有权限
resp = doRequest(t, "GET", "/api/roles", nil, AdminToken)
assertStatus(t, resp, 200)
// 更新角色权限 - 赋予 vb:org:* 权限
resp = doRequest(t, "PUT", "/api/roles/"+data.ID+"/permissions", map[string]interface{}{
"permission_ids": []string{"vb:org:*"},
}, AdminToken)
assertStatus(t, resp, 200)
// 清理:删除测试角色
resp = doRequest(t, "DELETE", "/api/roles/"+data.ID, nil, AdminToken)
assertStatus(t, resp, 200)
})
}
// TestPermAnyAll 测试 PermAny 和 PermAll 中间件
func TestPermAnyAll(t *testing.T) {
ensureUsers(t)
// Admin 拥有所有权限,应该能通过 PermAny 和 PermAll
t.Run("Admin passes PermAny and PermAll", func(t *testing.T) {
// 这些端点内部可能使用 PermAny 或 PermAll
resp := doRequest(t, "GET", "/api/users", nil, AdminToken)
assertStatus(t, resp, 200)
})
// 普通用户只有特定权限
t.Run("Regular user with limited permissions", func(t *testing.T) {
// User1 可以创建 org
resp := doRequest(t, "POST", "/api/orgs", map[string]string{
"code": "test_perm_any_" + User1ID[:8],
"name": "Test Perm Any",
}, User1Token)
// 如果组织已存在,可能会返回 400但不应该是 403
if resp.Code == 403 {
t.Errorf("User should have org:create permission, got 403")
}
})
}
// TestResourceLevelPermission 测试资源级别权限控制
func TestResourceLevelPermission(t *testing.T) {
ensureUsers(t)
var orgID string
// User1 创建组织
t.Run("User1 creates org", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/orgs", map[string]string{
"code": "resource_perm_test",
"name": "Resource Perm Test",
}, User1Token)
if resp.Code == 200 {
var data struct {
ID string `json:"id"`
}
decodeResponse(t, resp, &data)
orgID = data.ID
}
})
if orgID == "" {
t.Skip("Failed to create org, skipping resource permission tests")
}
// User2 不应该能访问 User1 的组织
t.Run("User2 cannot access User1's org", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/orgs/"+orgID, nil, User2Token)
if resp.Code == 200 {
t.Errorf("User2 should not be able to access User1's org, got 200")
}
})
// User1 可以访问自己的组织
t.Run("User1 can access own org", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/orgs/"+orgID, nil, User1Token)
assertStatus(t, resp, 200)
})
// Admin 可以访问任何组织
t.Run("Admin can access any org", func(t *testing.T) {
resp := doRequest(t, "GET", "/api/orgs/"+orgID, nil, AdminToken)
assertStatus(t, resp, 200)
})
// 清理
t.Run("Cleanup org", func(t *testing.T) {
resp := doRequest(t, "DELETE", "/api/orgs/"+orgID, nil, User1Token)
assertStatus(t, resp, 200)
})
}
// TestPermissionCache 测试权限缓存失效
func TestPermissionCache(t *testing.T) {
ensureUsers(t)
// 创建临时角色并赋予权限
var roleID string
t.Run("Create temporary role", func(t *testing.T) {
resp := doRequest(t, "POST", "/api/roles", map[string]string{
"code": "temp_role_" + User2ID[:8],
"name": "Temp Role",
"description": "Temporary role for cache test",
}, AdminToken)
assertStatus(t, resp, 200)
var data struct {
ID string `json:"id"`
}
decodeResponse(t, resp, &data)
roleID = data.ID
})
if roleID == "" {
t.Skip("Failed to create role")
}
// 赋予角色权限
t.Run("Assign permissions to role", func(t *testing.T) {
resp := doRequest(t, "PUT", "/api/roles/"+roleID+"/permissions", map[string]interface{}{
"permission_ids": []string{"vb:org:read"},
}, AdminToken)
assertStatus(t, resp, 200)
})
// 清理
t.Run("Delete temporary role", func(t *testing.T) {
resp := doRequest(t, "DELETE", "/api/roles/"+roleID, nil, AdminToken)
assertStatus(t, resp, 200)
})
}