Coverage for api/users/services.py: 90.59%
202 statements
« prev ^ index » next coverage.py v7.9.2, created at 2026-01-25 13:05 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2026-01-25 13:05 +0000
1import redis
2import logging
3from models.users import Users
4from models.roles import Roles
5from models.role_mapper import RoleMapper
6from models.login_logs import LoginLogs
7from models.user_sessions import UserSessions
8from models.password_reset_tokens import PasswordResetTokens
9from typing import Optional, List
10from sqlalchemy.ext.asyncio import AsyncSession
11from sqlalchemy import select, func, or_, delete, case
12from core.security import hash_password, clear_user_all_sessions
13from .schema import UserResponse, UserPagination, UserCreate, UserUpdate, UserDeleteBatchResponse, UserDeleteResult
14from utils.custom_exception import ServerException, ConflictException, NotFoundException
16logger = logging.getLogger(__name__)
18async def get_all_users(
19 db: AsyncSession,
20 keyword: Optional[str] = None,
21 status: Optional[str] = None,
22 role: Optional[str] = None,
23 page: int = 1,
24 per_page: int = 10,
25 sort_by: Optional[str] = None,
26 desc: bool = False
27) -> UserPagination:
28 """Get all users list"""
29 try:
30 query = select(Users)
32 if keyword:
33 query = query.where(
34 or_(
35 Users.first_name.ilike(f"%{keyword}%"),
36 Users.last_name.ilike(f"%{keyword}%"),
37 Users.email.ilike(f"%{keyword}%")
38 )
39 )
41 if status:
42 status_list = [s.strip().lower() == 'true' for s in status.split(',')]
43 if len(status_list) == 1:
44 query = query.where(Users.status == status_list[0])
45 else:
46 query = query.where(Users.status.in_(status_list))
48 has_role_join = False
50 if role:
51 role_list = [r.strip() for r in role.split(',')]
52 query = query.join(RoleMapper, Users.id == RoleMapper.user_id)
53 query = query.join(Roles, RoleMapper.role_id == Roles.id)
54 query = query.where(Roles.name.in_(role_list))
55 has_role_join = True
57 if sort_by:
58 if sort_by == "role":
59 # For role sorting, use LEFT JOIN with distinct to avoid duplicates
60 if not has_role_join:
61 query = query.outerjoin(RoleMapper, Users.id == RoleMapper.user_id)
62 query = query.outerjoin(Roles, RoleMapper.role_id == Roles.id)
63 # Use distinct to avoid duplicate rows when a user has multiple roles
64 query = query.distinct()
65 null_order = case((Roles.name.is_(None), 1), else_=0)
66 if desc:
67 query = query.order_by(null_order.asc(), Roles.name.desc(), Users.id)
68 else:
69 query = query.order_by(null_order.asc(), Roles.name.asc(), Users.id)
70 else:
71 # For other fields, use direct column access
72 sort_column = getattr(Users, sort_by, None)
73 if sort_column:
74 if desc:
75 query = query.order_by(sort_column.desc())
76 else:
77 query = query.order_by(sort_column.asc())
78 else:
79 query = query.order_by(Users.created_at.desc())
80 else:
81 query = query.order_by(Users.id.asc())
83 count_query = select(func.count(Users.id))
84 if keyword:
85 count_query = count_query.where(
86 or_(
87 Users.first_name.ilike(f"%{keyword}%"),
88 Users.last_name.ilike(f"%{keyword}%"),
89 Users.email.ilike(f"%{keyword}%")
90 )
91 )
92 if status:
93 status_list = [s.strip().lower() == 'true' for s in status.split(',')]
94 if len(status_list) == 1:
95 count_query = count_query.where(Users.status == status_list[0])
96 else:
97 count_query = count_query.where(Users.status.in_(status_list))
98 if role:
99 role_list = [r.strip() for r in role.split(',')]
100 count_query = count_query.join(RoleMapper, Users.id == RoleMapper.user_id)
101 count_query = count_query.join(Roles, RoleMapper.role_id == Roles.id)
102 count_query = count_query.where(Roles.name.in_(role_list))
104 total_result = await db.execute(count_query)
105 total = total_result.scalar()
107 offset = (page - 1) * per_page
108 query = query.offset(offset).limit(per_page)
110 result = await db.execute(query)
111 users = result.scalars().all()
113 if not users:
114 return UserPagination(
115 users=[],
116 total=0,
117 page=page,
118 per_page=per_page,
119 total_pages=0
120 )
122 user_responses = []
123 for user in users:
124 role_query = select(Roles.name).join(
125 RoleMapper, Roles.id == RoleMapper.role_id
126 ).where(RoleMapper.user_id == user.id).limit(1)
128 role_result = await db.execute(role_query)
129 user_role = role_result.scalar()
131 user_response = UserResponse(
132 id=user.id,
133 email=user.email,
134 first_name=user.first_name,
135 last_name=user.last_name,
136 phone=user.phone,
137 status=user.status,
138 created_at=user.created_at,
139 role=user_role
140 )
141 user_responses.append(user_response)
143 total_pages = (total + per_page - 1) // per_page
145 return UserPagination(
146 users=user_responses,
147 total=total,
148 page=page,
149 per_page=per_page,
150 total_pages=total_pages
151 )
153 except Exception as e:
154 raise ServerException(f"Failed to retrieve users: {str(e)}")
156async def create_user(db: AsyncSession, user_data: UserCreate) -> UserResponse:
157 """Create a new user"""
158 try:
159 # Check if the email already exists
160 result = await db.execute(
161 select(Users).where(
162 or_(
163 Users.email == user_data.email,
164 Users.pending_email == user_data.email
165 )
166 )
167 )
168 existing_user = result.scalar_one_or_none()
169 if existing_user:
170 raise ConflictException("Email already exists")
172 user = Users(
173 first_name=user_data.first_name,
174 last_name=user_data.last_name,
175 email=user_data.email,
176 phone=user_data.phone,
177 hash_password=await hash_password(user_data.password),
178 status=user_data.status
179 )
181 db.add(user)
182 await db.commit()
183 await db.refresh(user)
185 user_role = None
186 if user_data.role:
187 await _assign_user_role(db, user.id, user_data.role)
188 user_role = user_data.role
190 return UserResponse(
191 id=user.id,
192 email=user.email,
193 first_name=user.first_name,
194 last_name=user.last_name,
195 phone=user.phone,
196 status=user.status,
197 created_at=user.created_at,
198 role=user_role
199 )
201 except ConflictException:
202 raise
203 except Exception as e:
204 raise ServerException(f"Failed to create user: {str(e)}")
206async def update_user(db: AsyncSession, user_id: str, user_data: UserUpdate) -> UserResponse:
207 """Update user information"""
208 try:
209 result = await db.execute(
210 select(Users).where(Users.id == user_id)
211 )
212 user = result.scalar_one_or_none()
213 if not user:
214 raise NotFoundException("User not found")
216 # Check if the email is already used by another user
217 if user_data.email and user_data.email != user.email:
218 result = await db.execute(
219 select(Users).where(
220 or_(
221 Users.email == user_data.email,
222 Users.pending_email == user_data.email
223 ),
224 Users.id != user_id
225 )
226 )
227 if result.scalar_one_or_none():
228 raise ConflictException("Email already exists")
230 update_data = user_data.model_dump(exclude_unset=True, exclude={'role'})
231 for field, value in update_data.items():
232 setattr(user, field, value)
234 await db.commit()
235 await db.refresh(user)
237 if 'role' in user_data.model_dump(exclude_unset=True):
238 await _update_user_role(db, user_id, user_data.role)
240 role_query = select(Roles.name).join(
241 RoleMapper, Roles.id == RoleMapper.role_id
242 ).where(RoleMapper.user_id == user.id).limit(1)
244 role_result = await db.execute(role_query)
245 user_role = role_result.scalar()
247 return UserResponse(
248 id=user.id,
249 email=user.email,
250 first_name=user.first_name,
251 last_name=user.last_name,
252 phone=user.phone,
253 status=user.status,
254 created_at=user.created_at,
255 role=user_role
256 )
258 except (ConflictException, NotFoundException):
259 raise
260 except Exception as e:
261 raise ServerException(f"Failed to update user: {str(e)}")
263async def delete_users(db: AsyncSession, redis_client: redis.Redis, user_ids: List[str], token: Optional[dict] = None) -> UserDeleteBatchResponse:
264 """Delete multiple users with detailed batch processing results"""
265 try:
266 results = []
267 success_count = 0
268 failed_count = 0
270 # Get current user ID from token
271 current_user_id = token.get("sub") if token else None
273 # Check which users exist
274 result = await db.execute(
275 select(Users.id).where(Users.id.in_(user_ids))
276 )
277 existing_ids = set(result.scalars().all())
279 # Process each user ID
280 for user_id in user_ids:
281 try:
282 # Skip if trying to delete own account
283 if current_user_id and user_id == current_user_id:
284 results.append(UserDeleteResult(
285 user_id=user_id,
286 status="failed",
287 message="Cannot delete your own account"
288 ))
289 failed_count += 1
290 continue
292 if user_id in existing_ids:
293 # Clear user sessions and tokens before deletion
294 await clear_user_all_sessions(db, redis_client, user_id)
296 # Delete related records first to avoid foreign key constraints
297 await _delete_user_related_records(db, user_id)
299 # Delete the user
300 await db.execute(
301 delete(Users).where(Users.id == user_id)
302 )
304 results.append(UserDeleteResult(
305 user_id=user_id,
306 status="success",
307 message="User deleted successfully"
308 ))
309 success_count += 1
310 else:
311 results.append(UserDeleteResult(
312 user_id=user_id,
313 status="failed",
314 message="User not found"
315 ))
316 failed_count += 1
318 except Exception as e:
319 results.append(UserDeleteResult(
320 user_id=user_id,
321 status="failed",
322 message=f"Failed to delete user: {str(e)}"
323 ))
324 failed_count += 1
326 # Commit all successful deletions
327 if success_count > 0:
328 await db.commit()
330 return UserDeleteBatchResponse(
331 results=results,
332 total_users=len(user_ids),
333 success_count=success_count,
334 failed_count=failed_count
335 )
337 except Exception as e:
338 raise ServerException(f"Failed to delete users: {str(e)}")
340async def reset_user_password(
341 db: AsyncSession,
342 redis_client: redis.Redis,
343 user_id: str,
344 new_password: str
345) -> bool:
346 """Reset user password and logout all devices"""
347 try:
348 result = await db.execute(
349 select(Users).where(Users.id == user_id)
350 )
351 user = result.scalar_one_or_none()
352 if not user:
353 raise NotFoundException("User not found")
355 user.hash_password = await hash_password(new_password)
356 user.password_reset_required = True
357 await db.commit()
359 await clear_user_all_sessions(db, redis_client, user_id)
361 return True
363 except NotFoundException:
364 raise
365 except Exception as e:
366 raise ServerException(f"Failed to reset password: {str(e)}")
368async def _assign_user_role(db: AsyncSession, user_id: str, role_name: str) -> None:
369 """Assign a role to a user"""
370 try:
371 role_result = await db.execute(
372 select(Roles).where(Roles.name == role_name)
373 )
374 role = role_result.scalar_one_or_none()
375 if not role:
376 raise NotFoundException(f"Role '{role_name}' not found")
378 existing_mapping = await db.execute(
379 select(RoleMapper).where(
380 RoleMapper.user_id == user_id,
381 RoleMapper.role_id == role.id
382 )
383 )
384 if existing_mapping.scalar_one_or_none():
385 return
387 role_mapping = RoleMapper(
388 user_id=user_id,
389 role_id=role.id
390 )
391 db.add(role_mapping)
392 await db.commit()
394 except NotFoundException:
395 raise
396 except Exception as e:
397 raise ServerException(f"Failed to assign role: {str(e)}")
399async def _update_user_role(db: AsyncSession, user_id: str, role_name: Optional[str]) -> None:
400 """Update user role (remove existing and assign new one)"""
401 try:
402 await db.execute(
403 delete(RoleMapper).where(RoleMapper.user_id == user_id)
404 )
406 if role_name:
407 await _assign_user_role(db, user_id, role_name)
409 await db.commit()
411 except Exception as e:
412 raise ServerException(f"Failed to update user role: {str(e)}")
414async def _delete_user_related_records(db: AsyncSession, user_id: str) -> None:
415 """Delete all records related to a user to avoid foreign key constraints"""
416 try:
417 # Delete login logs
418 await db.execute(
419 delete(LoginLogs).where(LoginLogs.user_id == user_id)
420 )
422 # Delete user sessions
423 await db.execute(
424 delete(UserSessions).where(UserSessions.user_id == user_id)
425 )
427 # Delete role mappings
428 await db.execute(
429 delete(RoleMapper).where(RoleMapper.user_id == user_id)
430 )
432 # Delete password reset tokens
433 await db.execute(
434 delete(PasswordResetTokens).where(PasswordResetTokens.user_id == user_id)
435 )
437 except Exception as e:
438 raise ServerException(f"Failed to delete user related records: {str(e)}")