Coverage for api/roles/controller.py: 98.88%

89 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-01-25 13:05 +0000

1from core.dependencies import get_db 

2from core.security import verify_token 

3from core.rbac import require_permission 

4from core.permissions import Permission 

5from sqlalchemy.ext.asyncio import AsyncSession 

6from utils.response import APIResponse, parse_responses, common_responses 

7from fastapi import APIRouter, Depends, HTTPException, Request, Path, Response 

8from utils.custom_exception import NotFoundException, ConflictException, ServerException 

9from .services import ( 

10 get_all_roles, create_role, update_role, delete_role, 

11 get_role_attribute_mapping, update_role_attribute_mapping, 

12 check_user_permissions 

13) 

14from .schema import ( 

15 RoleResponse, RoleCreate, RoleUpdate, RolesListResponse, 

16 RoleAttributesMapping, RoleAttributeMappingBatchResponse, 

17 RoleAttributesGroupedResponse, 

18 PermissionCheckResponse, 

19 role_attributes_success_response_example, role_attributes_partial_response_example, 

20 role_attributes_failed_response_example 

21) 

22 

# Router on which the role endpoints below are registered; tagged "Roles".
23router = APIRouter(tags=["Roles"]) 

24 

@router.get(
    "",
    response_model=APIResponse[RolesListResponse],
    response_model_exclude_none=True,
    summary="Get all custom roles",
    responses=parse_responses(
        {200: ("Successfully retrieved roles", RolesListResponse)},
        common_responses,
    ),
)
@require_permission([Permission.VIEW_ROLES, Permission.MANAGE_ROLES])
async def get_roles(
    request: Request,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Get all custom roles"""
    # Flow: await get_all_roles(db) and wrap the result in a code-200
    # APIResponse envelope.
    # Error mapping: any exception raised while fetching the roles or while
    # building the envelope is replaced by a bare HTTPException(500).
    try:
        role_listing = await get_all_roles(db)
        payload = APIResponse(
            code=200,
            message="Successfully retrieved roles",
            data=role_listing,
        )
    except Exception:
        raise HTTPException(status_code=500)
    return payload

46 

@router.post(
    "",
    response_model=APIResponse[RoleResponse],
    response_model_exclude_none=True,
    summary="Create new role",
    responses=parse_responses(
        {
            200: ("Role created successfully", RoleResponse),
            409: ("Role name already exists", None),
        },
        common_responses,
    ),
)
@require_permission([Permission.MANAGE_ROLES])
async def create_role_api(
    role_data: RoleCreate,
    request: Request,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Create a new role"""
    # Flow: await create_role(db, role_data) and return the created role in a
    # code-200 APIResponse envelope.
    # Error mapping:
    #   ConflictException -> 409 "Role name already exists"
    #   anything else     -> bare 500
    try:
        created_role = await create_role(db, role_data)
        payload = APIResponse(
            code=200,
            message="Role created successfully",
            data=created_role,
        )
    except ConflictException:
        raise HTTPException(status_code=409, detail="Role name already exists")
    except Exception:
        raise HTTPException(status_code=500)
    return payload

72 

@router.put(
    "/{role_id}",
    response_model=APIResponse[RoleResponse],
    response_model_exclude_none=True,
    summary="Update role info",
    responses=parse_responses({
        200: ("Role updated successfully", RoleResponse),
        404: ("Role not found", None),
        409: ("Role name already exists", None)
    }, common_responses)
)
@require_permission([Permission.MANAGE_ROLES])
async def update_role_api(
    role_id: str = Path(..., description="Role ID"),
    role_data: RoleUpdate = None,
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db)
):
    """Update role information"""
    # The ``= None`` default makes the JSON body optional as far as FastAPI is
    # concerned, so a request without a body reaches this point with
    # ``role_data is None``. Reject it explicitly instead of forwarding None
    # to the service layer. The check sits outside the ``try`` so it is never
    # rewritten by the handlers below.
    if role_data is None:
        raise HTTPException(status_code=422, detail="Request body is required")

    # Error mapping:
    #   NotFoundException -> 404 "Role not found"
    #   ConflictException -> 409 "Role name already exists"
    #   ServerException   -> bare 500
    # Other exception types are intentionally left to propagate, as before.
    # ``from exc`` keeps the original error attached as the cause.
    try:
        role = await update_role(db, role_id, role_data)
        return APIResponse(code=200, message="Role updated successfully", data=role)
    except NotFoundException as exc:
        raise HTTPException(status_code=404, detail="Role not found") from exc
    except ConflictException as exc:
        raise HTTPException(status_code=409, detail="Role name already exists") from exc
    except ServerException as exc:
        raise HTTPException(status_code=500) from exc

102 

@router.delete(
    "/{role_id}",
    response_model=APIResponse[dict],
    response_model_exclude_none=True,
    summary="Delete role",
    responses=parse_responses(
        {
            200: ("Role deleted successfully", None),
            404: ("Role not found", None),
            409: ("Cannot delete role that is assigned to users", None),
        },
        common_responses,
    ),
)
@require_permission([Permission.MANAGE_ROLES])
async def delete_role_api(
    role_id: str = Path(..., description="Role ID"),
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Delete a role"""
    # Flow: await delete_role(db, role_id); on success return a code-200
    # APIResponse envelope that carries a message only (no data).
    # Error mapping:
    #   NotFoundException -> 404 "Role not found"
    #   ConflictException -> 409 "Cannot delete role that is assigned to users"
    #   anything else     -> bare 500
    try:
        await delete_role(db, role_id)
        confirmation = APIResponse(code=200, message="Role deleted successfully")
    except NotFoundException:
        raise HTTPException(status_code=404, detail="Role not found")
    except ConflictException:
        raise HTTPException(
            status_code=409,
            detail="Cannot delete role that is assigned to users",
        )
    except Exception:
        raise HTTPException(status_code=500)
    return confirmation

131 

@router.get(
    "/{role_id}/attributes",
    response_model=APIResponse[RoleAttributesGroupedResponse],
    response_model_exclude_none=True,
    summary="Get role attributes mapping",
    responses=parse_responses(
        {
            200: (
                "Successfully retrieved role attributes mapping",
                RoleAttributesGroupedResponse,
                RoleAttributesGroupedResponse.get_example_response(),
            ),
            404: ("Role not found", None),
        },
        common_responses,
    ),
)
@require_permission([Permission.VIEW_ROLES, Permission.MANAGE_ROLES])
async def get_role_attribute_mapping_api(
    role_id: str = Path(..., description="Role ID"),
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Get role attributes mapping with all available attributes"""
    # Flow: await get_role_attribute_mapping(db, role_id) and return the
    # result in a code-200 APIResponse envelope.
    # Error mapping:
    #   NotFoundException -> 404 "Role not found"
    #   anything else     -> bare 500
    try:
        grouped_mapping = await get_role_attribute_mapping(db, role_id)
        payload = APIResponse(
            code=200,
            message="Successfully retrieved role attributes mapping",
            data=grouped_mapping,
        )
    except NotFoundException:
        raise HTTPException(status_code=404, detail="Role not found")
    except Exception:
        raise HTTPException(status_code=500)
    return payload

161 

@router.put(
    "/{role_id}/attributes",
    response_model=APIResponse[RoleAttributeMappingBatchResponse],
    response_model_exclude_none=True,
    summary="Update role attributes",
    responses=parse_responses({
        200: ("All role attributes processed successfully", RoleAttributeMappingBatchResponse, role_attributes_success_response_example),
        207: ("Role attributes processed with partial success", RoleAttributeMappingBatchResponse, role_attributes_partial_response_example),
        400: ("All role attributes failed to process", RoleAttributeMappingBatchResponse, role_attributes_failed_response_example)
    }, common_responses)
)
@require_permission([Permission.MANAGE_ROLES])
async def update_role_attribute_mapping_api(
    role_id: str = Path(..., description="Role ID"),
    attributes_data: RoleAttributesMapping = None,
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db)
):
    """Update role attributes with batch processing results"""
    # The ``= None`` default makes the JSON body optional as far as FastAPI is
    # concerned. Without this guard a body-less request would fail on
    # ``attributes_data.attributes`` with AttributeError and be reported as a
    # bare 500 by the catch-all below. The check sits outside the ``try`` so
    # the 422 is not rewritten into a 500.
    if attributes_data is None:
        raise HTTPException(status_code=422, detail="Request body is required")

    # Error mapping:
    #   NotFoundException -> 404 "Role not found"
    #   anything else     -> bare 500
    # ``from exc`` keeps the original error attached as the cause.
    try:
        batch_result = await update_role_attribute_mapping(db, role_id, attributes_data.attributes)

        # Nothing failed: return the envelope itself so the route's
        # response_model handling applies and the HTTP status stays 200.
        if batch_result.failed_count == 0:
            return APIResponse(
                code=200,
                message="All role attributes processed successfully",
                data=batch_result
            )

        # At least one failure: 400 when nothing succeeded, 207 otherwise.
        if batch_result.success_count == 0:
            status_code = 400
            message = "All role attributes failed to process"
        else:
            status_code = 207
            message = "Role attributes processed with partial success"

        envelope = APIResponse(code=status_code, message=message, data=batch_result)
        # A raw Response is returned so the HTTP status code can differ from
        # the route default; the body is the envelope serialized as JSON.
        return Response(
            content=envelope.model_dump_json(),
            status_code=status_code,
            media_type="application/json"
        )
    except NotFoundException as exc:
        raise HTTPException(status_code=404, detail="Role not found") from exc
    except Exception as exc:
        raise HTTPException(status_code=500) from exc

221 

@router.get(
    "/permissions",
    response_model=APIResponse[PermissionCheckResponse],
    response_model_exclude_none=True,
    summary="Get current user permissions",
    responses=parse_responses(
        {
            200: (
                "User permissions retrieved",
                PermissionCheckResponse,
                PermissionCheckResponse.get_example_response(),
            ),
        },
        common_responses,
    ),
)
async def get_user_permissions_api(
    request: Request = None,
    token: dict = Depends(verify_token),
    db: AsyncSession = Depends(get_db),
):
    """Get all permissions for the current user"""
    # Flow: read the "sub" entry of the verified token, pass it to
    # check_user_permissions and return the result in a code-200 envelope.
    # Error mapping: an HTTPException raised along the way is passed through
    # untouched; any other exception becomes a bare 500.
    try:
        subject_id = token.get("sub")
        # NOTE(review): the third argument is always None here; presumably it
        # means "no specific permission to check" - confirm against
        # check_user_permissions.
        permission_info = await check_user_permissions(db, subject_id, None)
        payload = APIResponse(
            code=200,
            message="User permissions retrieved",
            data=permission_info,
        )
        return payload
    except HTTPException:
        raise
    except Exception:
        raise HTTPException(status_code=500)