Coverage for api/roles/controller.py: 98.88%
89 statements
« prev ^ index » next coverage.py v7.9.2, created at 2026-01-25 13:05 +0000
1from core.dependencies import get_db
2from core.security import verify_token
3from core.rbac import require_permission
4from core.permissions import Permission
5from sqlalchemy.ext.asyncio import AsyncSession
6from utils.response import APIResponse, parse_responses, common_responses
7from fastapi import APIRouter, Depends, HTTPException, Request, Path, Response
8from utils.custom_exception import NotFoundException, ConflictException, ServerException
9from .services import (
10 get_all_roles, create_role, update_role, delete_role,
11 get_role_attribute_mapping, update_role_attribute_mapping,
12 check_user_permissions
13)
14from .schema import (
15 RoleResponse, RoleCreate, RoleUpdate, RolesListResponse,
16 RoleAttributesMapping, RoleAttributeMappingBatchResponse,
17 RoleAttributesGroupedResponse,
18 PermissionCheckResponse,
19 role_attributes_success_response_example, role_attributes_partial_response_example,
20 role_attributes_failed_response_example
21)
# Router that the role endpoints in this module register on; created with the "Roles" tag.
23router = APIRouter(tags=["Roles"])
@router.get(
    "",
    summary="Get all custom roles",
    response_model=APIResponse[RolesListResponse],
    response_model_exclude_none=True,
    responses=parse_responses(
        {200: ("Successfully retrieved roles", RolesListResponse)},
        common_responses,
    ),
)
# NOTE(review): whether the permission list means any-of or all-of is decided
# inside require_permission, which is not visible from this module.
@require_permission([Permission.VIEW_ROLES, Permission.MANAGE_ROLES])
async def get_roles(
    request: Request,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Return every custom role wrapped in the standard API envelope.

    Any failure while loading the roles is reported as a bare HTTP 500.
    """
    try:
        role_listing = await get_all_roles(db)
        return APIResponse(
            code=200,
            message="Successfully retrieved roles",
            data=role_listing,
        )
    except Exception:
        # Deliberately opaque: no internal detail is leaked to the client.
        raise HTTPException(status_code=500)
@router.post(
    "",
    summary="Create new role",
    response_model=APIResponse[RoleResponse],
    response_model_exclude_none=True,
    responses=parse_responses(
        {
            200: ("Role created successfully", RoleResponse),
            409: ("Role name already exists", None),
        },
        common_responses,
    ),
)
@require_permission([Permission.MANAGE_ROLES])
async def create_role_api(
    role_data: RoleCreate,
    request: Request,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Create a role from the submitted payload.

    Responds with HTTP 409 when the service reports a conflict and with a
    bare HTTP 500 for any other failure.
    """
    try:
        created_role = await create_role(db, role_data)
        return APIResponse(
            code=200,
            message="Role created successfully",
            data=created_role,
        )
    except ConflictException:
        raise HTTPException(status_code=409, detail="Role name already exists")
    except Exception:
        raise HTTPException(status_code=500)
@router.put(
    "/{role_id}",
    response_model=APIResponse[RoleResponse],
    response_model_exclude_none=True,
    summary="Update role info",
    responses=parse_responses({
        200: ("Role updated successfully", RoleResponse),
        404: ("Role not found", None),
        409: ("Role name already exists", None)
    }, common_responses)
)
@require_permission([Permission.MANAGE_ROLES])
async def update_role_api(
    role_id: str = Path(..., description="Role ID"),
    role_data: RoleUpdate = None,
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db)
):
    """Update an existing role.

    Responds with HTTP 422 when no request body is sent, 404 when the role
    does not exist, 409 when the service reports a conflict, and 500 when the
    service raises ServerException.
    """
    # `role_data` only defaults to None because it follows a defaulted Path
    # parameter; that default makes the body optional for FastAPI, so reject a
    # missing body explicitly instead of handing None to the service layer.
    if role_data is None:
        raise HTTPException(status_code=422, detail="Request body is required")

    try:
        role = await update_role(db, role_id, role_data)
        return APIResponse(code=200, message="Role updated successfully", data=role)
    except NotFoundException as exc:
        raise HTTPException(status_code=404, detail="Role not found") from exc
    except ConflictException as exc:
        raise HTTPException(status_code=409, detail="Role name already exists") from exc
    except ServerException as exc:
        # Other exception types are intentionally left to propagate, as before.
        raise HTTPException(status_code=500) from exc
@router.delete(
    "/{role_id}",
    summary="Delete role",
    response_model=APIResponse[dict],
    response_model_exclude_none=True,
    responses=parse_responses(
        {
            200: ("Role deleted successfully", None),
            404: ("Role not found", None),
            409: ("Cannot delete role that is assigned to users", None),
        },
        common_responses,
    ),
)
@require_permission([Permission.MANAGE_ROLES])
async def delete_role_api(
    role_id: str = Path(..., description="Role ID"),
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Delete the role identified by the path parameter.

    Responds with HTTP 404 when the role does not exist, 409 when the service
    reports a conflict, and a bare HTTP 500 for any other failure.
    """
    try:
        await delete_role(db, role_id)
        # Success carries no data payload, only the code and message.
        return APIResponse(code=200, message="Role deleted successfully")
    except NotFoundException:
        raise HTTPException(status_code=404, detail="Role not found")
    except ConflictException:
        raise HTTPException(
            status_code=409,
            detail="Cannot delete role that is assigned to users",
        )
    except Exception:
        raise HTTPException(status_code=500)
@router.get(
    "/{role_id}/attributes",
    summary="Get role attributes mapping",
    response_model=APIResponse[RoleAttributesGroupedResponse],
    response_model_exclude_none=True,
    responses=parse_responses(
        {
            200: (
                "Successfully retrieved role attributes mapping",
                RoleAttributesGroupedResponse,
                RoleAttributesGroupedResponse.get_example_response(),
            ),
            404: ("Role not found", None),
        },
        common_responses,
    ),
)
@require_permission([Permission.VIEW_ROLES, Permission.MANAGE_ROLES])
async def get_role_attribute_mapping_api(
    role_id: str = Path(..., description="Role ID"),
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Return the attribute mapping of one role.

    Responds with HTTP 404 when the role does not exist and with a bare
    HTTP 500 for any other failure.
    """
    try:
        mapping = await get_role_attribute_mapping(db, role_id)
        envelope = APIResponse(
            code=200,
            message="Successfully retrieved role attributes mapping",
            data=mapping,
        )
        return envelope
    except NotFoundException:
        raise HTTPException(status_code=404, detail="Role not found")
    except Exception:
        raise HTTPException(status_code=500)
@router.put(
    "/{role_id}/attributes",
    response_model=APIResponse[RoleAttributeMappingBatchResponse],
    response_model_exclude_none=True,
    summary="Update role attributes",
    responses=parse_responses({
        200: ("All role attributes processed successfully", RoleAttributeMappingBatchResponse, role_attributes_success_response_example),
        207: ("Role attributes processed with partial success", RoleAttributeMappingBatchResponse, role_attributes_partial_response_example),
        400: ("All role attributes failed to process", RoleAttributeMappingBatchResponse, role_attributes_failed_response_example),
        # The handler raises 404 below; document it like the sibling endpoints do.
        404: ("Role not found", None)
    }, common_responses)
)
@require_permission([Permission.MANAGE_ROLES])
async def update_role_attribute_mapping_api(
    role_id: str = Path(..., description="Role ID"),
    attributes_data: RoleAttributesMapping = None,
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db)
):
    """Apply a batch of attribute changes to a role and report the outcome.

    The HTTP status reflects the batch result: 200 when nothing failed, 400
    when nothing succeeded, and 207 for a mix. Responds with HTTP 422 when no
    request body is sent, 404 when the role does not exist, and a bare
    HTTP 500 for any other failure.
    """
    # `attributes_data` only defaults to None because it follows a defaulted
    # Path parameter, which makes the body optional. Without this guard a
    # missing body raised AttributeError inside the try block and surfaced as
    # a 500. The guard sits outside the try so the catch-all cannot mask it.
    if attributes_data is None:
        raise HTTPException(status_code=422, detail="Request body is required")

    try:
        batch_result = await update_role_attribute_mapping(db, role_id, attributes_data.attributes)

        if batch_result.failed_count == 0:
            # All successful: returned as a model so FastAPI applies the
            # declared response_model serialization.
            return APIResponse(
                code=200,
                message="All role attributes processed successfully",
                data=batch_result
            )

        if batch_result.success_count == 0:
            status_code, message = 400, "All role attributes failed to process"
        else:
            status_code, message = 207, "Role attributes processed with partial success"

        envelope = APIResponse(code=status_code, message=message, data=batch_result)
        # A raw Response is needed to set a non-default status code.
        # NOTE(review): this path bypasses response_model_exclude_none, so
        # None-valued fields are serialized here but not in the 200 case.
        # Confirm whether clients rely on that before aligning the two.
        return Response(
            content=envelope.model_dump_json(),
            status_code=status_code,
            media_type="application/json"
        )
    except NotFoundException as exc:
        raise HTTPException(status_code=404, detail="Role not found") from exc
    except Exception as exc:
        raise HTTPException(status_code=500) from exc
@router.get(
    "/permissions",
    summary="Get current user permissions",
    response_model=APIResponse[PermissionCheckResponse],
    response_model_exclude_none=True,
    responses=parse_responses(
        {
            200: (
                "User permissions retrieved",
                PermissionCheckResponse,
                PermissionCheckResponse.get_example_response(),
            ),
        },
        common_responses,
    ),
)
async def get_user_permissions_api(
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Return the permissions of the user identified by the token.

    HTTP errors raised while resolving the permissions are passed through
    unchanged; any other failure becomes a bare HTTP 500.
    """
    try:
        # The user id is read from the token's "sub" entry (None if absent).
        current_user_id = token.get("sub")
        permission_result = await check_user_permissions(db, current_user_id, None)

        return APIResponse(
            code=200,
            message="User permissions retrieved",
            data=permission_result,
        )
    except HTTPException:
        raise
    except Exception:
        raise HTTPException(status_code=500)