Coverage for api/account/services.py: 100.00%

77 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-01-25 13:05 +0000

1import redis 

2import logging 

3from typing import Optional, Tuple 

4from urllib.parse import quote 

5from sqlalchemy import select, or_ 

6from models.users import Users 

7from core.config import settings 

8from extensions.smtp import SMTPMailer 

9from .schema import UserUpdate, PasswordChange 

10from sqlalchemy.ext.asyncio import AsyncSession 

11from utils.custom_exception import AuthenticationException, ServerException, SMTPNotConfiguredException 

12from core.security import hash_password, verify_password, clear_user_all_sessions 

13from utils.email_templates import EMAIL_VERIFICATION_TEMPLATE 

14from api.auth.services import _request_email_change_verification_email 

15 

16logger = logging.getLogger(__name__) 

17 

18async def get_user_by_id(db: AsyncSession, user_id: str) -> Optional[Users]: 

19 """Get user info by id""" 

20 result = await db.execute( 

21 select(Users).where(Users.id == user_id) 

22 ) 

23 return result.scalar_one_or_none() 

24 

25async def update_user_profile( 

26 db: AsyncSession, 

27 user_id: str, 

28 user_update: UserUpdate, 

29 mailer: Optional[SMTPMailer] = None, 

30 redis_client: Optional[redis.Redis] = None 

31) -> Optional[Tuple[Users, bool]]: 

32 """Update user info (excluding password)""" 

33 user = await get_user_by_id(db, user_id) 

34 if not user: 

35 return None 

36 

37 email_change_requested = False 

38 new_email = user_update.email 

39 if new_email and new_email != user.email: 

40 result = await db.execute( 

41 select(Users).where( 

42 or_( 

43 Users.email == new_email, 

44 Users.pending_email == new_email 

45 ), 

46 Users.id != user_id 

47 ) 

48 ) 

49 if result.scalar_one_or_none(): 

50 raise ValueError("Email already exists") 

51 

52 # Defer email change until verification completes. 

53 user.pending_email = new_email 

54 email_change_requested = True 

55 

56 update_data = user_update.model_dump(exclude_unset=True) 

57 update_data.pop("email", None) 

58 for field, value in update_data.items(): 

59 setattr(user, field, value) 

60 

61 if email_change_requested and settings.SMTP_ENABLE and mailer and getattr(mailer, "enabled", False): 

62 should_send = True 

63 if redis_client: 

64 cooldown_key = f"email_verification_cooldown:{new_email}" 

65 remaining_seconds = await redis_client.ttl(cooldown_key) 

66 try: 

67 remaining_seconds = int(remaining_seconds) 

68 except (TypeError, ValueError): 

69 remaining_seconds = 0 

70 if remaining_seconds > 0: 

71 should_send = False 

72 

73 if should_send: 

74 try: 

75 token_meta = await _request_email_change_verification_email(db, user, new_email) 

76 verification_url = ( 

77 f"http{'s' if settings.SSL_ENABLE else ''}://" 

78 f"{settings.HOSTNAME}:{settings.FRONTEND_PORT}" 

79 f"/auth/verify-email?token={quote(token_meta['verification_token'], safe='')}" 

80 ) 

81 

82 user_name = f"{user.first_name} {user.last_name}".strip() 

83 app_name = settings.PROJECT_NAME 

84 

85 email_content = EMAIL_VERIFICATION_TEMPLATE.render( 

86 verification_url=verification_url, 

87 user_name=user_name, 

88 app_name=app_name, 

89 expire_minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES, 

90 ) 

91 

92 mailer.send_text( 

93 to_emails=[new_email], 

94 subject=email_content["subject"], 

95 body=email_content["body"], 

96 html_body=email_content.get("html_body"), 

97 ) 

98 

99 if redis_client: 

100 await redis_client.setex( 

101 cooldown_key, 

102 settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS, 

103 "1" 

104 ) 

105 except SMTPNotConfiguredException as exc: 

106 logger.warning("Skip email change verification send: %s", exc) 

107 

108 await db.commit() 

109 await db.refresh(user) 

110 return user, email_change_requested 

111 

112async def change_password(db: AsyncSession, user_id: str, password_change: PasswordChange, redis_client=None) -> bool: 

113 """Change user password""" 

114 try: 

115 user = await get_user_by_id(db, user_id) 

116 if not user: 

117 return False 

118 

119 if not await verify_password(password_change.current_password, user.hash_password): 

120 raise AuthenticationException("Current password is incorrect") 

121 

122 user.hash_password = await hash_password(password_change.new_password) 

123 user.password_reset_required = False 

124 

125 if password_change.logout_all_devices and redis_client: 

126 await clear_user_all_sessions(db, redis_client, user_id) 

127 

128 await db.commit() 

129 

130 return True 

131 except AuthenticationException: 

132 raise 

133 except Exception: 

134 raise ServerException("Failed to change password")