Coverage for api/users/services.py: 90.59%

202 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-01-25 13:05 +0000

1import redis 

2import logging 

3from models.users import Users 

4from models.roles import Roles 

5from models.role_mapper import RoleMapper 

6from models.login_logs import LoginLogs 

7from models.user_sessions import UserSessions 

8from models.password_reset_tokens import PasswordResetTokens 

9from typing import Optional, List 

10from sqlalchemy.ext.asyncio import AsyncSession 

11from sqlalchemy import select, func, or_, delete, case 

12from core.security import hash_password, clear_user_all_sessions 

13from .schema import UserResponse, UserPagination, UserCreate, UserUpdate, UserDeleteBatchResponse, UserDeleteResult 

14from utils.custom_exception import ServerException, ConflictException, NotFoundException 

15 

16logger = logging.getLogger(__name__) 

17 

def _apply_user_filters(stmt, keyword: Optional[str], status: Optional[str], role: Optional[str]):
    """Apply the keyword / status / role filters shared by the page and count queries."""
    if keyword:
        pattern = f"%{keyword}%"
        stmt = stmt.where(
            or_(
                Users.first_name.ilike(pattern),
                Users.last_name.ilike(pattern),
                Users.email.ilike(pattern),
            )
        )

    if status:
        # Comma-separated flags; only the literal "true" (case-insensitive) maps to True.
        status_list = [s.strip().lower() == 'true' for s in status.split(',')]
        if len(status_list) == 1:
            stmt = stmt.where(Users.status == status_list[0])
        else:
            stmt = stmt.where(Users.status.in_(status_list))

    if role:
        role_list = [r.strip() for r in role.split(',')]
        stmt = stmt.join(RoleMapper, Users.id == RoleMapper.user_id)
        stmt = stmt.join(Roles, RoleMapper.role_id == Roles.id)
        stmt = stmt.where(Roles.name.in_(role_list))

    return stmt


async def get_all_users(
    db: AsyncSession,
    keyword: Optional[str] = None,
    status: Optional[str] = None,
    role: Optional[str] = None,
    page: int = 1,
    per_page: int = 10,
    sort_by: Optional[str] = None,
    desc: bool = False
) -> UserPagination:
    """Return one page of users, optionally filtered and sorted.

    Args:
        db: Session used for every query.
        keyword: Substring matched case-insensitively against first name,
            last name and email.
        status: Comma-separated status flags; each token equal to "true"
            (case-insensitive) is treated as True, anything else as False.
        role: Comma-separated role names; only users mapped to one of them
            are returned.
        page: 1-based page number.
        per_page: Page size.
        sort_by: "role" to sort by role name, otherwise the name of a
            ``Users`` attribute. An unknown name falls back to newest-first
            by ``created_at``; no value sorts by ``Users.id`` ascending.
        desc: Sort descending when True.

    Returns:
        A ``UserPagination`` with the page of users and the total number of
        matching rows. The totals are reported even when the requested page
        is past the last one and therefore empty.

    Raises:
        ServerException: If anything fails while building or running the queries.
    """
    try:
        query = _apply_user_filters(select(Users), keyword, status, role)
        has_role_join = bool(role)

        if not sort_by:
            query = query.order_by(Users.id.asc())
        elif sort_by == "role":
            # Role sorting needs the role tables; LEFT JOIN so users without a
            # role are kept, unless the role filter already joined them.
            if not has_role_join:
                query = query.outerjoin(RoleMapper, Users.id == RoleMapper.user_id)
                query = query.outerjoin(Roles, RoleMapper.role_id == Roles.id)
            # Use distinct to limit duplicate rows when a user has multiple roles
            query = query.distinct()
            # Users without a role always sort last, whatever the direction.
            null_order = case((Roles.name.is_(None), 1), else_=0)
            role_order = Roles.name.desc() if desc else Roles.name.asc()
            query = query.order_by(null_order.asc(), role_order, Users.id)
        else:
            sort_column = getattr(Users, sort_by, None)
            # Compare against None explicitly: SQL expressions are not
            # guaranteed to support truth testing.
            if sort_column is not None:
                query = query.order_by(sort_column.desc() if desc else sort_column.asc())
            else:
                query = query.order_by(Users.created_at.desc())

        count_query = _apply_user_filters(
            select(func.count(Users.id)), keyword, status, role
        )
        total = (await db.execute(count_query)).scalar() or 0

        offset = (page - 1) * per_page
        result = await db.execute(query.offset(offset).limit(per_page))
        users = result.scalars().all()

        # Guard the division so a non-positive page size yields 0 pages
        # instead of a ZeroDivisionError.
        total_pages = (total + per_page - 1) // per_page if per_page > 0 else 0

        if not users:
            return UserPagination(
                users=[],
                total=total,
                page=page,
                per_page=per_page,
                total_pages=total_pages
            )

        # Fetch the roles for the whole page in one query instead of one per user.
        role_rows = await db.execute(
            select(RoleMapper.user_id, Roles.name)
            .join(Roles, Roles.id == RoleMapper.role_id)
            .where(RoleMapper.user_id.in_([user.id for user in users]))
        )
        role_by_user = {}
        for mapped_user_id, role_name in role_rows.all():
            # Keep the first role seen for each user; only one is reported.
            role_by_user.setdefault(mapped_user_id, role_name)

        user_responses = [
            UserResponse(
                id=user.id,
                email=user.email,
                first_name=user.first_name,
                last_name=user.last_name,
                phone=user.phone,
                status=user.status,
                created_at=user.created_at,
                role=role_by_user.get(user.id)
            )
            for user in users
        ]

        return UserPagination(
            users=user_responses,
            total=total,
            page=page,
            per_page=per_page,
            total_pages=total_pages
        )

    except Exception as e:
        raise ServerException(f"Failed to retrieve users: {str(e)}") from e

155 

async def create_user(db: AsyncSession, user_data: UserCreate) -> UserResponse:
    """Create a new user and, when requested, assign its role.

    The user row is flushed (not committed) before the role is assigned, so a
    failure while assigning the role rolls the new user back instead of
    leaving it behind without a role.

    Args:
        db: Session used for the queries and the insert.
        user_data: Fields of the user to create; ``role`` is optional.

    Returns:
        A ``UserResponse`` describing the stored user.

    Raises:
        ConflictException: If the email is already used as another user's
            ``email`` or ``pending_email``.
        ServerException: For any other failure, including an unknown role.
    """
    try:
        # Check if the email already exists. More than one row can match
        # (one via email, another via pending_email), so take the first row
        # rather than requiring at most one.
        result = await db.execute(
            select(Users).where(
                or_(
                    Users.email == user_data.email,
                    Users.pending_email == user_data.email
                )
            )
        )
        if result.scalars().first() is not None:
            raise ConflictException("Email already exists")

        user = Users(
            first_name=user_data.first_name,
            last_name=user_data.last_name,
            email=user_data.email,
            phone=user_data.phone,
            hash_password=await hash_password(user_data.password),
            status=user_data.status
        )

        db.add(user)
        # Flush so the row exists (and user.id is populated) for the role
        # mapping while staying inside the same transaction.
        await db.flush()

        user_role = None
        if user_data.role:
            await _assign_user_role(db, user.id, user_data.role)
            user_role = user_data.role

        await db.commit()
        # Reload the row so database-generated values are available.
        await db.refresh(user)

        return UserResponse(
            id=user.id,
            email=user.email,
            first_name=user.first_name,
            last_name=user.last_name,
            phone=user.phone,
            status=user.status,
            created_at=user.created_at,
            role=user_role
        )

    except ConflictException:
        raise
    except Exception as e:
        # Discard the pending insert so the session is left clean.
        await db.rollback()
        raise ServerException(f"Failed to create user: {str(e)}") from e

205 

async def update_user(db: AsyncSession, user_id: str, user_data: UserUpdate) -> UserResponse:
    """Update a user's fields and, if supplied, replace its role.

    Only fields explicitly set on ``user_data`` are applied. Field changes and
    the role change are committed together; on failure the pending changes are
    rolled back.

    Args:
        db: Session used for the queries and the update.
        user_id: Identifier of the user to update.
        user_data: Partial update payload; ``role`` is handled separately
            from the other fields.

    Returns:
        A ``UserResponse`` reflecting the stored user after the update.

    Raises:
        NotFoundException: If no user has ``user_id`` (also propagated if the
            role helper raises it).
        ConflictException: If the new email is already used as another user's
            ``email`` or ``pending_email``.
        ServerException: For any other failure.
    """
    try:
        result = await db.execute(
            select(Users).where(Users.id == user_id)
        )
        user = result.scalar_one_or_none()
        if not user:
            raise NotFoundException("User not found")

        # Check if the email is already used by another user. Several rows
        # may match, so take the first instead of requiring at most one.
        if user_data.email and user_data.email != user.email:
            conflict = await db.execute(
                select(Users).where(
                    or_(
                        Users.email == user_data.email,
                        Users.pending_email == user_data.email
                    ),
                    Users.id != user_id
                )
            )
            if conflict.scalars().first() is not None:
                raise ConflictException("Email already exists")

        supplied = user_data.model_dump(exclude_unset=True)
        role_supplied = 'role' in supplied

        for field, value in supplied.items():
            if field != 'role':
                setattr(user, field, value)

        # Send the field changes now so their errors surface here rather than
        # inside the role helper.
        await db.flush()

        if role_supplied:
            await _update_user_role(db, user_id, user_data.role)

        await db.commit()
        # Refresh after the last commit: the role helper commits too, and the
        # instance may be expired by then (depends on session configuration).
        await db.refresh(user)

        role_query = select(Roles.name).join(
            RoleMapper, Roles.id == RoleMapper.role_id
        ).where(RoleMapper.user_id == user_id).limit(1)
        user_role = (await db.execute(role_query)).scalar()

        return UserResponse(
            id=user.id,
            email=user.email,
            first_name=user.first_name,
            last_name=user.last_name,
            phone=user.phone,
            status=user.status,
            created_at=user.created_at,
            role=user_role
        )

    except (ConflictException, NotFoundException):
        # Nothing may be pending here, but rolling back keeps the session clean.
        await db.rollback()
        raise
    except Exception as e:
        await db.rollback()
        raise ServerException(f"Failed to update user: {str(e)}") from e

262 

async def delete_users(db: AsyncSession, redis_client: redis.Redis, user_ids: List[str], token: Optional[dict] = None) -> UserDeleteBatchResponse:
    """Delete several users and report the outcome for each one.

    Each user is deleted in its own unit of work: a success is committed
    immediately and a failure is rolled back. A failure therefore cannot leave
    half-deleted rows to be committed later, and it cannot undo deletions that
    were already reported as successful.

    Args:
        db: Session used for the queries and deletes.
        redis_client: Passed through to ``clear_user_all_sessions``.
        user_ids: Identifiers of the users to delete.
        token: Optional token payload; its ``"sub"`` entry is treated as the
            caller's own user id, which is never deleted.

    Returns:
        A ``UserDeleteBatchResponse`` with one result per requested id plus
        success and failure counts.

    Raises:
        ServerException: If the batch as a whole fails (for example the
            initial existence query or a rollback).
    """
    try:
        results = []
        success_count = 0
        failed_count = 0

        # Get current user ID from token
        current_user_id = token.get("sub") if token else None

        # Check which users exist
        result = await db.execute(
            select(Users.id).where(Users.id.in_(user_ids))
        )
        existing_ids = set(result.scalars().all())

        for user_id in user_ids:
            # Skip if trying to delete own account
            if current_user_id and user_id == current_user_id:
                results.append(UserDeleteResult(
                    user_id=user_id,
                    status="failed",
                    message="Cannot delete your own account"
                ))
                failed_count += 1
                continue

            if user_id not in existing_ids:
                results.append(UserDeleteResult(
                    user_id=user_id,
                    status="failed",
                    message="User not found"
                ))
                failed_count += 1
                continue

            try:
                # Clear user sessions and tokens before deletion
                await clear_user_all_sessions(db, redis_client, user_id)

                # Delete related records first to avoid foreign key constraints
                await _delete_user_related_records(db, user_id)

                await db.execute(
                    delete(Users).where(Users.id == user_id)
                )
                # Commit per user so a reported success is actually stored.
                await db.commit()
            except Exception as e:
                # Drop this user's partial deletes before moving on.
                await db.rollback()
                results.append(UserDeleteResult(
                    user_id=user_id,
                    status="failed",
                    message=f"Failed to delete user: {str(e)}"
                ))
                failed_count += 1
            else:
                # A repeated id later in the list is now genuinely gone.
                existing_ids.discard(user_id)
                results.append(UserDeleteResult(
                    user_id=user_id,
                    status="success",
                    message="User deleted successfully"
                ))
                success_count += 1

        return UserDeleteBatchResponse(
            results=results,
            total_users=len(user_ids),
            success_count=success_count,
            failed_count=failed_count
        )

    except Exception as e:
        raise ServerException(f"Failed to delete users: {str(e)}") from e

339 

async def reset_user_password(
    db: AsyncSession,
    redis_client: redis.Redis,
    user_id: str,
    new_password: str
) -> bool:
    """Set a new password for a user and clear all of that user's sessions.

    The new value is produced by ``hash_password`` and stored on the user,
    ``password_reset_required`` is set to True, the change is committed, and
    ``clear_user_all_sessions`` is then called for the user.

    Returns:
        True once the password is stored and the sessions are cleared.

    Raises:
        NotFoundException: If no user has ``user_id``.
        ServerException: For any other failure.
    """
    try:
        lookup = select(Users).where(Users.id == user_id)
        account = (await db.execute(lookup)).scalar_one_or_none()
        if not account:
            raise NotFoundException("User not found")

        new_hash = await hash_password(new_password)
        account.hash_password = new_hash
        account.password_reset_required = True
        await db.commit()

        await clear_user_all_sessions(db, redis_client, user_id)
    except NotFoundException:
        raise
    except Exception as e:
        raise ServerException(f"Failed to reset password: {str(e)}")
    else:
        return True

367 

async def _assign_user_role(db: AsyncSession, user_id: str, role_name: str) -> None:
    """Map a user to the role named ``role_name`` and commit the mapping.

    Does nothing when the user is already mapped to that role.

    Raises:
        NotFoundException: If no role is named ``role_name``.
        ServerException: For any other failure.
    """
    try:
        role_lookup = select(Roles).where(Roles.name == role_name)
        role = (await db.execute(role_lookup)).scalar_one_or_none()
        if not role:
            raise NotFoundException(f"Role '{role_name}' not found")

        mapping_lookup = select(RoleMapper).where(
            RoleMapper.user_id == user_id,
            RoleMapper.role_id == role.id
        )
        already_mapped = (await db.execute(mapping_lookup)).scalar_one_or_none()

        # Only insert (and commit) when the mapping is not there yet.
        if not already_mapped:
            db.add(RoleMapper(user_id=user_id, role_id=role.id))
            await db.commit()

    except NotFoundException:
        raise
    except Exception as e:
        raise ServerException(f"Failed to assign role: {str(e)}")

398 

async def _update_user_role(db: AsyncSession, user_id: str, role_name: Optional[str]) -> None:
    """Replace a user's role mappings with at most one new role.

    All existing mappings for the user are deleted. When ``role_name`` is
    truthy, that role is then assigned. The change is committed on success
    and rolled back on failure, so a failed assignment does not leave the
    deletion pending in the session.

    Args:
        db: Session used for the delete and the assignment.
        user_id: Identifier of the user whose role is replaced.
        role_name: Name of the new role, or a falsy value to leave the user
            without a role.

    Raises:
        NotFoundException: If ``role_name`` does not match a role.
        ServerException: For any other failure.
    """
    try:
        await db.execute(
            delete(RoleMapper).where(RoleMapper.user_id == user_id)
        )

        if role_name:
            await _assign_user_role(db, user_id, role_name)

        await db.commit()

    except NotFoundException:
        # Keep the not-found error distinct instead of wrapping it as a
        # server error, and undo the pending delete of the old mappings.
        await db.rollback()
        raise
    except Exception as e:
        await db.rollback()
        raise ServerException(f"Failed to update user role: {str(e)}") from e

413 

async def _delete_user_related_records(db: AsyncSession, user_id: str) -> None:
    """Delete the rows that reference a user, without committing.

    Rows are removed, in this order, from login logs, user sessions, role
    mappings and password reset tokens, so the user row itself can be deleted
    afterwards without violating foreign key constraints.

    Raises:
        ServerException: If any of the delete statements fails.
    """
    dependent_models = (LoginLogs, UserSessions, RoleMapper, PasswordResetTokens)
    try:
        for model in dependent_models:
            stmt = delete(model).where(model.user_id == user_id)
            await db.execute(stmt)
    except Exception as e:
        raise ServerException(f"Failed to delete user related records: {str(e)}")