Coverage for api/account/services.py: 100.00%
77 statements
« prev ^ index » next coverage.py v7.9.2, created at 2026-01-25 13:05 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2026-01-25 13:05 +0000
1import redis
2import logging
3from typing import Optional, Tuple
4from urllib.parse import quote
5from sqlalchemy import select, or_
6from models.users import Users
7from core.config import settings
8from extensions.smtp import SMTPMailer
9from .schema import UserUpdate, PasswordChange
10from sqlalchemy.ext.asyncio import AsyncSession
11from utils.custom_exception import AuthenticationException, ServerException, SMTPNotConfiguredException
12from core.security import hash_password, verify_password, clear_user_all_sessions
13from utils.email_templates import EMAIL_VERIFICATION_TEMPLATE
14from api.auth.services import _request_email_change_verification_email
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
async def get_user_by_id(db: AsyncSession, user_id: str) -> Optional[Users]:
    """Fetch the user whose ``id`` equals ``user_id``.

    Args:
        db: Async database session used to run the query.
        user_id: Value compared against ``Users.id``.

    Returns:
        The matching ``Users`` instance, or ``None`` when no row matches.
    """
    lookup = select(Users).where(Users.id == user_id)
    rows = await db.execute(lookup)
    return rows.scalar_one_or_none()
async def update_user_profile(
    db: AsyncSession,
    user_id: str,
    user_update: UserUpdate,
    mailer: Optional[SMTPMailer] = None,
    redis_client: Optional[redis.Redis] = None
) -> Optional[Tuple[Users, bool]]:
    """Apply profile changes to a user; the password is not handled here.

    A changed email address is never written to ``email`` directly. It is
    stored in ``pending_email`` and, when mail delivery is available and no
    cooldown is active, a verification message is sent to the new address.

    Args:
        db: Async database session used for lookups and the final commit.
        user_id: Id of the user being updated.
        user_update: Payload; only fields that were explicitly set are applied.
        mailer: Optional mailer. Mail is sent only when it is truthy, its
            ``enabled`` attribute is truthy and ``settings.SMTP_ENABLE`` is on.
        redis_client: Optional client used to read and set the per-address
            send cooldown key.

    Returns:
        ``None`` when the user does not exist, otherwise a tuple of the
        refreshed user and a flag telling whether an email change was
        requested.

    Raises:
        ValueError: If another user already has the new address as ``email``
            or ``pending_email``.
    """
    account = await get_user_by_id(db, user_id)
    if not account:
        return None

    requested_email = user_update.email
    email_changed = False
    if requested_email and requested_email != account.email:
        # Any *other* user holding this address (confirmed or pending) blocks it.
        conflict_query = select(Users).where(
            or_(
                Users.email == requested_email,
                Users.pending_email == requested_email,
            ),
            Users.id != user_id,
        )
        conflict = (await db.execute(conflict_query)).scalar_one_or_none()
        if conflict:
            raise ValueError("Email already exists")

        # The real email column is left untouched until verification completes.
        account.pending_email = requested_email
        email_changed = True

    # Copy every explicitly provided field except the email onto the user.
    changes = {
        name: value
        for name, value in user_update.model_dump(exclude_unset=True).items()
        if name != "email"
    }
    for name, value in changes.items():
        setattr(account, name, value)

    mail_available = (
        email_changed
        and settings.SMTP_ENABLE
        and mailer
        and getattr(mailer, "enabled", False)
    )
    if mail_available:
        on_cooldown = False
        if redis_client:
            cooldown_key = f"email_verification_cooldown:{requested_email}"
            ttl_left = await redis_client.ttl(cooldown_key)
            try:
                ttl_left = int(ttl_left)
            except (TypeError, ValueError):
                # An unusable TTL value is treated as "no cooldown".
                ttl_left = 0
            on_cooldown = ttl_left > 0

        if not on_cooldown:
            try:
                token_meta = await _request_email_change_verification_email(
                    db, account, requested_email
                )

                scheme = "https" if settings.SSL_ENABLE else "http"
                origin = f"{scheme}://{settings.HOSTNAME}:{settings.FRONTEND_PORT}"
                encoded_token = quote(token_meta["verification_token"], safe="")
                verification_url = f"{origin}/auth/verify-email?token={encoded_token}"

                display_name = f"{account.first_name} {account.last_name}".strip()

                message = EMAIL_VERIFICATION_TEMPLATE.render(
                    verification_url=verification_url,
                    user_name=display_name,
                    app_name=settings.PROJECT_NAME,
                    expire_minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES,
                )

                mailer.send_text(
                    to_emails=[requested_email],
                    subject=message["subject"],
                    body=message["body"],
                    html_body=message.get("html_body"),
                )

                # Start the cooldown only after the message was handed off.
                if redis_client:
                    await redis_client.setex(
                        cooldown_key,
                        settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS,
                        "1",
                    )
            except SMTPNotConfiguredException as exc:
                # Missing SMTP configuration must not fail the profile update.
                logger.warning("Skip email change verification send: %s", exc)

    await db.commit()
    await db.refresh(account)
    return account, email_changed
async def change_password(db: AsyncSession, user_id: str, password_change: PasswordChange, redis_client=None) -> bool:
    """Change a user's password after verifying the current one.

    Args:
        db: Async database session used to load the user and commit the change.
        user_id: Id of the user whose password is being changed.
        password_change: Payload providing ``current_password``,
            ``new_password`` and the ``logout_all_devices`` flag.
        redis_client: Optional client; when it is truthy and
            ``logout_all_devices`` is set, it is passed to
            ``clear_user_all_sessions``.

    Returns:
        ``True`` once the new password hash has been committed, ``False`` when
        no user with ``user_id`` exists.

    Raises:
        AuthenticationException: If ``current_password`` does not match the
            stored hash.
        ServerException: If any other error occurs. The underlying error is
            logged with its traceback and attached as ``__cause__``.
    """
    try:
        user = await get_user_by_id(db, user_id)
        if not user:
            return False

        if not await verify_password(password_change.current_password, user.hash_password):
            raise AuthenticationException("Current password is incorrect")

        user.hash_password = await hash_password(password_change.new_password)
        user.password_reset_required = False

        # Sessions are cleared before the commit, so a failure here aborts
        # the password change as well.
        if password_change.logout_all_devices and redis_client:
            await clear_user_all_sessions(db, redis_client, user_id)

        await db.commit()
        return True
    except AuthenticationException:
        # A wrong current password is an expected outcome; pass it through as-is.
        raise
    except Exception as exc:
        # Keep the generic error for callers, but record the real cause so the
        # failure can be diagnosed from the logs.
        logger.exception("Failed to change password for user %s", user_id)
        raise ServerException("Failed to change password") from exc