Coverage for api/auth/services.py: 96.99%
332 statements
« prev ^ index » next coverage.py v7.9.2, created at 2026-01-25 13:05 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2026-01-25 13:05 +0000
1import ast
2import redis
3from typing import Optional
4from sqlalchemy import select, update, or_
5from urllib.parse import quote
6from models.users import Users
7from core.config import settings
8from extensions.smtp import SMTPMailer
9from models.login_logs import LoginLogs
10from datetime import datetime, timedelta
11from models.user_sessions import UserSessions
12from sqlalchemy.ext.asyncio import AsyncSession
13from utils.email_templates import (
14 PASSWORD_RESET_TEMPLATE,
15 EMAIL_VERIFICATION_TEMPLATE,
16)
17from models.password_reset_tokens import PasswordResetTokens
18from models.email_verification_tokens import EmailVerificationTokens
19from .schema import (
20 UserRegister,
21 UserLogin,
22 LoginResult,
23 SessionResult,
24 TokenValidationResponse,
25 ActionRequiredResponse,
26 PasswordResetRequiredResponse,
27)
28from core.security import (
29 verify_password,
30 create_access_token,
31 hash_password,
32 extend_session_ttl,
33 clear_user_all_sessions,
34 create_password_reset_token,
35 create_email_verification_token
36)
37from utils.custom_exception import (
38 ConflictException,
39 AuthenticationException,
40 PasswordResetRequiredException,
41 NotFoundException,
42 SMTPNotConfiguredException,
43 ServerException,
44 ValidationException,
45 EmailVerificationRequiredException,
46 RegistrationDisabledException,
47)
50async def register(
51 db: AsyncSession,
52 redis_client: redis.Redis,
53 user_data: UserRegister,
54 ip_address: str,
55 user_agent: str,
56 mailer: Optional[SMTPMailer] = None
57) -> LoginResult:
58 """User register"""
59 if not settings.REGISTRATION_ENABLE:
60 raise RegistrationDisabledException("Registration is disabled")
62 user = await _create_user(db, user_data)
64 # Check if email verification is required
65 if settings.EMAIL_VERIFICATION_ENABLE and settings.SMTP_ENABLE and mailer:
66 # Check cooldown
67 cooldown_key = f"email_verification_cooldown:{user.email}"
68 remaining_seconds = await redis_client.ttl(cooldown_key)
70 if remaining_seconds > 0:
71 # In cooldown, return 202 without data
72 await _log_login_attempt(
73 db, email=user.email,
74 ip_address=ip_address,
75 user_agent=user_agent,
76 is_success=True,
77 user_id=user.id
78 )
79 raise EmailVerificationRequiredException(
80 message="Email verification required",
81 details=None
82 )
83 else:
84 # Not in cooldown, send verification email
85 await _send_registration_verification_email(db, mailer, user)
87 # Set cooldown
88 await redis_client.setex(
89 cooldown_key,
90 settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS,
91 "1"
92 )
94 await _log_login_attempt(
95 db, email=user.email,
96 ip_address=ip_address,
97 user_agent=user_agent,
98 is_success=True,
99 user_id=user.id
100 )
101 raise EmailVerificationRequiredException(
102 message="Email verification required",
103 details=None
104 )
106 session_result = await _create_user_session(
107 db, redis_client, user, ip_address, user_agent
108 )
109 await _log_login_attempt(
110 db, email=user.email,
111 ip_address=ip_address,
112 user_agent=user_agent,
113 is_success=True,
114 user_id=user.id
115 )
116 return {
117 "user": user,
118 "session_id": session_result["session_id"],
119 "access_token": session_result["access_token"]
120 }
122async def login(
123 db: AsyncSession,
124 redis_client: redis.Redis,
125 login_data: UserLogin,
126 ip_address: str,
127 user_agent: str,
128 mailer: Optional[SMTPMailer] = None
129) -> LoginResult:
130 """User login"""
131 result = await db.execute(
132 select(Users).where(Users.email == login_data.email)
133 )
134 user = result.scalar_one_or_none()
136 if not user:
137 await _log_login_attempt(
138 db, email=login_data.email,
139 ip_address=ip_address,
140 user_agent=user_agent,
141 is_success=False,
142 failure_reason="User not found"
143 )
144 raise AuthenticationException("Invalid email or password")
146 # Check if user account is disabled
147 if not user.status:
148 await _log_login_attempt(
149 db, email=login_data.email,
150 ip_address=ip_address,
151 user_agent=user_agent,
152 is_success=False,
153 failure_reason="Account disabled"
154 )
155 raise AuthenticationException("Account is disabled")
157 # Now verify password
158 if not await verify_password(login_data.password, user.hash_password):
159 await _log_login_attempt(
160 db, email=login_data.email,
161 ip_address=ip_address,
162 user_agent=user_agent,
163 is_success=False,
164 failure_reason="Invalid password"
165 )
166 raise AuthenticationException("Invalid email or password")
168 # Check if password reset is required
169 if user.password_reset_required:
170 reset_token = await create_password_reset_token(user.id, user.email)
172 reset_token_record = PasswordResetTokens(
173 user_id=user.id,
174 token=reset_token,
175 expires_at=datetime.now().astimezone() + timedelta(minutes=settings.PASSWORD_RESET_TOKEN_EXPIRE_MINUTES)
176 )
177 db.add(reset_token_record)
178 await db.commit()
180 await _log_login_attempt(
181 db, email=user.email,
182 ip_address=ip_address,
183 user_agent=user_agent,
184 is_success=True,
185 user_id=user.id
186 )
188 raise PasswordResetRequiredException(
189 message="Password reset required",
190 details=PasswordResetRequiredResponse(
191 reset_token=reset_token,
192 expires_at=reset_token_record.expires_at.isoformat() if reset_token_record.expires_at else None
193 )
194 )
196 # Check if email verification is required
197 if settings.EMAIL_VERIFICATION_ENABLE and settings.SMTP_ENABLE and mailer:
198 if not user.email_verified:
199 # Check cooldown
200 cooldown_key = f"email_verification_cooldown:{user.email}"
201 remaining_seconds = await redis_client.ttl(cooldown_key)
203 if remaining_seconds > 0:
204 # In cooldown, return 202 with cooldown time
205 await _log_login_attempt(
206 db, email=user.email,
207 ip_address=ip_address,
208 user_agent=user_agent,
209 is_success=True,
210 user_id=user.id
211 )
212 # Calculate expires_at from cooldown
213 expires_at = (datetime.now().astimezone() + timedelta(seconds=remaining_seconds)).isoformat()
214 raise EmailVerificationRequiredException(
215 message="Email verification required",
216 details=ActionRequiredResponse(
217 action_type="email_verification",
218 token=None,
219 expires_at=expires_at
220 )
221 )
222 else:
223 # Not in cooldown, send verification email
224 await _send_registration_verification_email(db, mailer, user)
226 # Set cooldown
227 await redis_client.setex(
228 cooldown_key,
229 settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS,
230 "1"
231 )
233 await _log_login_attempt(
234 db, email=user.email,
235 ip_address=ip_address,
236 user_agent=user_agent,
237 is_success=True,
238 user_id=user.id
239 )
240 # Calculate expires_at from cooldown
241 expires_at = (datetime.now().astimezone() + timedelta(seconds=settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS)).isoformat()
242 raise EmailVerificationRequiredException(
243 message="Email verification required",
244 details=ActionRequiredResponse(
245 action_type="email_verification",
246 token=None,
247 expires_at=expires_at
248 )
249 )
251 session_result = await _create_user_session(
252 db, redis_client, user, ip_address, user_agent
253 )
254 await _log_login_attempt(
255 db, email=user.email,
256 ip_address=ip_address,
257 user_agent=user_agent,
258 is_success=True,
259 user_id=user.id
260 )
261 return {
262 "user": user,
263 "session_id": session_result["session_id"],
264 "access_token": session_result["access_token"]
265 }
267async def logout(
268 db: AsyncSession,
269 redis_client: redis.Redis,
270 user_id: str,
271 session_id: str
272) -> bool:
273 """User logout"""
274 try:
275 redis_key = f"session:{session_id}"
276 await redis_client.delete(redis_key)
278 result = await db.execute(
279 select(UserSessions).where(
280 UserSessions.user_id == user_id,
281 UserSessions.id == session_id
282 )
283 )
284 session = result.scalar_one_or_none()
285 if session:
286 session.is_active = False
287 await db.commit()
289 return True
290 except Exception:
291 raise ServerException("Logout failed")
293async def logout_all_devices(
294 db: AsyncSession,
295 redis_client: redis.Redis,
296 user_id: str
297) -> bool:
298 """Logout user from all devices"""
299 try:
300 return await clear_user_all_sessions(db, redis_client, user_id)
301 except Exception:
302 raise ServerException("Failed to logout all devices")
304async def token(
305 db: AsyncSession,
306 redis_client: redis.Redis,
307 session_id: str
308) -> str:
309 """Use session_id (Cookie) to issue new access_token and refresh session"""
310 raw = await redis_client.get(f"session:{session_id}")
311 if not raw:
312 raise AuthenticationException("Invalid or expired session")
313 try:
314 data = ast.literal_eval(raw)
315 except Exception:
316 raise AuthenticationException("Invalid or expired session")
318 user_id = data.get("user_id")
319 if not user_id:
320 raise AuthenticationException("Invalid or expired session")
322 # Verify user exists
323 user = await _get_user_by_id(db, user_id)
324 if not user:
325 raise NotFoundException("User not found")
327 if not user.status:
328 raise AuthenticationException("Account is disabled")
330 new_access_token = await create_access_token(data={
331 "sub": user_id,
332 "email": user.email,
333 "sid": session_id
334 })
336 data["access_token"] = new_access_token
337 await extend_session_ttl(redis_client, session_id, data)
338 await _update_session_expiry(db, session_id)
340 return new_access_token
342async def reset_password(
343 db: AsyncSession,
344 redis_client: redis.Redis,
345 token: dict,
346 new_password: str,
347 ip_address: str,
348 user_agent: str
349) -> LoginResult:
350 """Reset password using token"""
351 try:
352 user_id = token.get("sub")
353 token_string = token.get("token")
355 result = await db.execute(
356 select(PasswordResetTokens).where(
357 PasswordResetTokens.token == token_string,
358 PasswordResetTokens.user_id == user_id,
359 PasswordResetTokens.is_used == False,
360 PasswordResetTokens.expires_at > datetime.now().astimezone()
361 )
362 )
363 token_record = result.scalar_one_or_none()
365 if not token_record:
366 raise AuthenticationException("Invalid or expired token")
368 result = await db.execute(
369 select(Users).where(Users.id == user_id)
370 )
371 user = result.scalar_one_or_none()
373 if not user:
374 raise NotFoundException("User not found")
376 user.hash_password = await hash_password(new_password)
377 user.password_reset_required = False
379 token_record.is_used = True
381 # Force logout all devices
382 await clear_user_all_sessions(db, redis_client, user_id)
384 # Create new session
385 session_result = await _create_user_session(
386 db, redis_client, user, ip_address, user_agent
387 )
389 await db.commit()
391 return {
392 "user": user,
393 "session_id": session_result["session_id"],
394 "access_token": session_result["access_token"]
395 }
397 except (AuthenticationException, NotFoundException):
398 raise
399 except Exception as e:
400 raise ServerException(f"Failed to reset password: {str(e)}")
402async def validate_password_reset_token(
403 db: AsyncSession,
404 token: dict
405) -> TokenValidationResponse:
406 """Validate password reset token without consuming it"""
407 try:
408 user_id = token.get("sub")
409 token_string = token.get("token")
411 result = await db.execute(
412 select(PasswordResetTokens).where(
413 PasswordResetTokens.token == token_string,
414 PasswordResetTokens.user_id == user_id,
415 PasswordResetTokens.is_used == False,
416 PasswordResetTokens.expires_at > datetime.now().astimezone()
417 )
418 )
419 token_record = result.scalar_one_or_none()
421 if not token_record:
422 raise AuthenticationException("Invalid or expired token")
424 result = await db.execute(
425 select(Users).where(Users.id == user_id)
426 )
427 user = result.scalar_one_or_none()
429 if not user or not user.status:
430 raise AuthenticationException("User not found or account disabled")
432 return TokenValidationResponse(
433 is_valid=True
434 )
436 except AuthenticationException:
437 raise
438 except Exception as e:
439 raise ServerException(f"Token validation failed: {str(e)}")
441async def forgot_password(
442 db: AsyncSession,
443 email: str,
444 mailer: SMTPMailer,
445 redis_client: redis.Redis,
446) -> dict:
447 """
448 Forgot password and send reset password email
449 """
450 try:
451 user = await _get_user_by_email_for_password_reset(db, email)
453 if not getattr(mailer, "enabled", False):
454 raise SMTPNotConfiguredException("SMTP is disabled")
456 cooldown_key = f"password_reset_cooldown:{email}"
457 remaining_seconds = await redis_client.ttl(cooldown_key)
459 if remaining_seconds > 0:
460 raise ValidationException(
461 f"Please wait {remaining_seconds} seconds before requesting another password reset email",
462 details={"cooldown_seconds": remaining_seconds}
463 )
465 token_meta = await _request_password_reset_email(db, user)
466 reset_token = token_meta["reset_token"]
467 reset_url = (
468 f"http{'s' if settings.SSL_ENABLE else ''}://"
469 f"{settings.HOSTNAME}:{settings.FRONTEND_PORT}"
470 f"/auth/reset-password?token={quote(reset_token, safe='')}"
471 )
473 # Render email template with user name and app name
474 user_name = f"{user.first_name} {user.last_name}".strip()
475 app_name = settings.PROJECT_NAME
477 email_content = PASSWORD_RESET_TEMPLATE.render(
478 reset_url=reset_url,
479 user_name=user_name,
480 app_name=app_name,
481 )
483 mailer.send_text(
484 to_emails=[email],
485 subject=email_content["subject"],
486 body=email_content["body"],
487 html_body=email_content.get("html_body"),
488 )
490 # Set cooldown period in Redis
491 await redis_client.setex(
492 cooldown_key,
493 settings.PASSWORD_RESET_EMAIL_COOLDOWN_SECONDS,
494 "1"
495 )
497 return {**token_meta, "reset_url": reset_url}
499 except (NotFoundException, AuthenticationException, SMTPNotConfiguredException, ValidationException):
500 raise
501 except Exception as e:
502 raise ServerException(f"Failed to send password reset email: {str(e)}")
505async def get_password_reset_cooldown(
506 email: str,
507 redis_client: redis.Redis,
508) -> dict:
509 """
510 Get remaining cooldown time for password reset email.
512 Returns:
513 Dict with 'cooldown_seconds' (0 if no cooldown active)
514 """
515 cooldown_key = f"password_reset_cooldown:{email}"
516 remaining_seconds = await redis_client.ttl(cooldown_key)
518 # TTL returns -1 if key exists but has no expiry, -2 if key doesn't exist
519 if remaining_seconds < 0:
520 remaining_seconds = 0
522 return {"cooldown_seconds": remaining_seconds}
524async def _update_session_expiry(db: AsyncSession, session_id: str) -> None:
525 """Update session expiry time in database"""
526 try:
527 result = await db.execute(
528 select(UserSessions).where(UserSessions.id == session_id)
529 )
530 session = result.scalar_one_or_none()
531 if session:
532 session.expires_at = datetime.now().astimezone() + timedelta(minutes=settings.SESSION_EXPIRE_MINUTES)
533 await db.commit()
534 except Exception as e:
535 raise ServerException(f"Failed to update session expiry in database: {e}")
537async def _create_user(db: AsyncSession, user_data: UserRegister) -> Users:
538 try:
539 result = await db.execute(
540 select(Users).where(
541 or_(
542 Users.email == user_data.email,
543 Users.pending_email == user_data.email
544 )
545 )
546 )
547 existing_user = result.scalar_one_or_none()
548 if existing_user:
549 raise ConflictException("Email already exists")
551 user = Users(
552 first_name=user_data.first_name,
553 last_name=user_data.last_name,
554 email=user_data.email,
555 phone=user_data.phone,
556 hash_password=await hash_password(user_data.password)
557 )
558 db.add(user)
559 await db.commit()
560 await db.refresh(user)
561 return user
562 except ConflictException:
563 raise
564 except Exception as e:
565 raise ServerException(f"Failed to create user: {str(e)}")
567async def _get_user_by_id(db: AsyncSession, user_id: str) -> Optional[Users]:
568 result = await db.execute(
569 select(Users).where(Users.id == user_id)
570 )
571 return result.scalar_one_or_none()
573async def _create_user_session(
574 db: AsyncSession,
575 redis_client: redis.Redis,
576 user: Users,
577 ip_address: str,
578 user_agent: str
579) -> SessionResult:
580 try:
581 session = UserSessions(
582 user_id=user.id,
583 jwt_access_token="",
584 ip_address=ip_address,
585 user_agent=user_agent,
586 expires_at=datetime.now().astimezone() + timedelta(minutes=settings.SESSION_EXPIRE_MINUTES)
587 )
588 db.add(session)
589 await db.commit()
590 await db.refresh(session)
592 session_id = session.id
593 access_token = await create_access_token(data={
594 "sub": user.id,
595 "email": user.email,
596 "sid": session_id
597 })
599 session.jwt_access_token = access_token
600 await db.commit()
602 redis_key = f"session:{session_id}"
603 session_data = {
604 "user_id": user.id,
605 "email": user.email,
606 "ip_address": ip_address,
607 "user_agent": user_agent,
608 "access_token": access_token,
609 "created_at": datetime.now().astimezone().isoformat(),
610 "last_activity": datetime.now().astimezone().isoformat(),
611 }
613 await redis_client.setex(
614 redis_key,
615 settings.SESSION_EXPIRE_MINUTES * 60,
616 str(session_data)
617 )
619 return {
620 "session_id": session_id,
621 "access_token": access_token
622 }
623 except Exception as e:
624 raise ServerException(f"Failed to create user session: {str(e)}")
626async def _log_login_attempt(
627 db: AsyncSession,
628 email: str,
629 ip_address: str,
630 user_agent: str,
631 is_success: bool,
632 user_id: Optional[str] = None,
633 failure_reason: Optional[str] = None
634) -> None:
635 log = LoginLogs(
636 user_id=user_id,
637 email=email,
638 ip_address=ip_address,
639 user_agent=user_agent,
640 is_success=is_success,
641 failure_reason=failure_reason
642 )
643 db.add(log)
644 await db.commit()
646async def _get_user_by_email_for_password_reset(db: AsyncSession, email: str) -> Users:
647 result = await db.execute(select(Users).where(Users.email == email))
648 user = result.scalar_one_or_none()
649 if not user:
650 raise NotFoundException("User not registered")
651 if not user.status:
652 raise AuthenticationException("Account is disabled")
653 return user
655async def _request_password_reset_email(
656 db: AsyncSession,
657 user: Users,
658) -> dict:
659 """
660 Create a password reset token record for the user and return token metadata.
661 Invalidates all previous unused tokens for this user before creating a new one.
662 """
663 now = datetime.now().astimezone()
664 expires_at = now + timedelta(minutes=settings.PASSWORD_RESET_TOKEN_EXPIRE_MINUTES)
666 # Invalidate all previous unused tokens for this user
667 await db.execute(
668 update(PasswordResetTokens)
669 .where(
670 PasswordResetTokens.user_id == user.id,
671 PasswordResetTokens.is_used == False
672 )
673 .values(is_used=True)
674 )
676 reset_token = await create_password_reset_token(user.id, user.email)
677 reset_token_record = PasswordResetTokens(
678 user_id=user.id,
679 token=reset_token,
680 expires_at=expires_at,
681 )
682 db.add(reset_token_record)
683 await db.commit()
685 return {
686 "reset_token": reset_token,
687 "expires_at": expires_at,
688 "user_id": user.id,
689 }
691async def verify_email(
692 db: AsyncSession,
693 redis_client: redis.Redis,
694 token: dict,
695 ip_address: str,
696 user_agent: str
697) -> LoginResult:
698 """Verify email using token and create session"""
699 try:
700 user_id = token.get("sub")
701 email = token.get("email")
702 verification_type = token.get("verification_type")
703 token_string = token.get("token")
705 # Verify token record exists and is valid
706 result = await db.execute(
707 select(EmailVerificationTokens).where(
708 EmailVerificationTokens.token == token_string,
709 EmailVerificationTokens.user_id == user_id,
710 EmailVerificationTokens.email == email,
711 EmailVerificationTokens.token_type == verification_type,
712 EmailVerificationTokens.is_used == False,
713 EmailVerificationTokens.expires_at > datetime.now().astimezone()
714 )
715 )
716 token_record = result.scalar_one_or_none()
718 if not token_record:
719 raise AuthenticationException("Invalid or expired token")
721 # Get user
722 result = await db.execute(
723 select(Users).where(Users.id == user_id)
724 )
725 user = result.scalar_one_or_none()
727 if not user:
728 raise NotFoundException("User not found")
730 if not user.status:
731 raise AuthenticationException("Account is disabled")
733 # Mark token as used
734 token_record.is_used = True
736 if verification_type == "registration":
737 # Mark email as verified
738 user.email_verified = True
739 elif verification_type == "email_change":
740 # Update email from pending_email to email
741 if user.pending_email != email:
742 raise AuthenticationException("Email mismatch")
744 # Check if new email already exists
745 result = await db.execute(
746 select(Users).where(Users.email == email, Users.id != user_id)
747 )
748 if result.scalar_one_or_none():
749 raise ConflictException("Email already exists")
751 user.email = email
752 user.pending_email = None
753 user.email_verified = True
755 # Create session for verified user
756 session_result = await _create_user_session(
757 db, redis_client, user, ip_address, user_agent
758 )
760 await db.commit()
762 return {
763 "user": user,
764 "session_id": session_result["session_id"],
765 "access_token": session_result["access_token"]
766 }
768 except (AuthenticationException, NotFoundException, ConflictException):
769 raise
770 except Exception as e:
771 raise ServerException(f"Failed to verify email: {str(e)}")
773async def resend_verification_email(
774 db: AsyncSession,
775 email: str,
776 mailer: SMTPMailer,
777 redis_client: redis.Redis,
778) -> dict:
779 """Resend email verification"""
780 try:
781 user = await _get_user_by_email_for_password_reset(db, email)
783 if not settings.SMTP_ENABLE or not getattr(mailer, "enabled", False):
784 raise SMTPNotConfiguredException("SMTP is disabled")
786 # Check cooldown
787 cooldown_key = f"email_verification_cooldown:{email}"
788 remaining_seconds = await redis_client.ttl(cooldown_key)
790 if remaining_seconds > 0:
791 raise ValidationException(
792 f"Please wait {remaining_seconds} seconds before requesting another verification email",
793 details={"cooldown_seconds": remaining_seconds}
794 )
796 # Determine verification type
797 if user.email_verified:
798 # If email is already verified but there's a pending email, resend email change verification
799 if user.pending_email:
800 token_meta = await _request_email_change_verification_email(db, user, user.pending_email)
801 verification_url = (
802 f"http{'s' if settings.SSL_ENABLE else ''}://"
803 f"{settings.HOSTNAME}:{settings.FRONTEND_PORT}"
804 f"/auth/verify-email?token={quote(token_meta['verification_token'], safe='')}"
805 )
807 user_name = f"{user.first_name} {user.last_name}".strip()
808 app_name = settings.PROJECT_NAME
810 email_content = EMAIL_VERIFICATION_TEMPLATE.render(
811 verification_url=verification_url,
812 user_name=user_name,
813 app_name=app_name,
814 expire_minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES,
815 )
817 mailer.send_text(
818 to_emails=[user.pending_email],
819 subject=email_content["subject"],
820 body=email_content["body"],
821 html_body=email_content.get("html_body"),
822 )
823 else:
824 raise ValidationException("Email is already verified")
825 else:
826 # Resend registration verification
827 token_meta = await _request_registration_verification_email(db, user)
828 verification_url = (
829 f"http{'s' if settings.SSL_ENABLE else ''}://"
830 f"{settings.HOSTNAME}:{settings.FRONTEND_PORT}"
831 f"/auth/verify-email?token={quote(token_meta['verification_token'], safe='')}"
832 )
834 user_name = f"{user.first_name} {user.last_name}".strip()
835 app_name = settings.PROJECT_NAME
837 email_content = EMAIL_VERIFICATION_TEMPLATE.render(
838 verification_url=verification_url,
839 user_name=user_name,
840 app_name=app_name,
841 expire_minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES,
842 )
844 mailer.send_text(
845 to_emails=[email],
846 subject=email_content["subject"],
847 body=email_content["body"],
848 html_body=email_content.get("html_body"),
849 )
851 # Set cooldown
852 await redis_client.setex(
853 cooldown_key,
854 settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS,
855 "1"
856 )
858 return {"message": "Verification email sent"}
860 except (NotFoundException, AuthenticationException, SMTPNotConfiguredException, ValidationException):
861 raise
862 except Exception as e:
863 raise ServerException(f"Failed to send verification email: {str(e)}")
865async def _send_registration_verification_email(
866 db: AsyncSession,
867 mailer: SMTPMailer,
868 user: Users
869) -> None:
870 """Send registration verification email"""
871 if not settings.SMTP_ENABLE or not getattr(mailer, "enabled", False):
872 return
874 token_meta = await _request_registration_verification_email(db, user)
875 verification_url = (
876 f"http{'s' if settings.SSL_ENABLE else ''}://"
877 f"{settings.HOSTNAME}:{settings.FRONTEND_PORT}"
878 f"/auth/verify-email?token={quote(token_meta['verification_token'], safe='')}"
879 )
881 user_name = f"{user.first_name} {user.last_name}".strip()
882 app_name = settings.PROJECT_NAME
884 email_content = EMAIL_VERIFICATION_TEMPLATE.render(
885 verification_url=verification_url,
886 user_name=user_name,
887 app_name=app_name,
888 expire_minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES,
889 )
891 mailer.send_text(
892 to_emails=[user.email],
893 subject=email_content["subject"],
894 body=email_content["body"],
895 html_body=email_content.get("html_body"),
896 )
898async def _request_registration_verification_email(
899 db: AsyncSession,
900 user: Users,
901) -> dict:
902 """Create a registration verification token record"""
903 now = datetime.now().astimezone()
904 expires_at = now + timedelta(minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES)
906 # Invalidate all previous unused registration tokens for this user
907 await db.execute(
908 update(EmailVerificationTokens)
909 .where(
910 EmailVerificationTokens.user_id == user.id,
911 EmailVerificationTokens.token_type == "registration",
912 EmailVerificationTokens.is_used == False
913 )
914 .values(is_used=True)
915 )
917 verification_token = await create_email_verification_token(user.id, user.email, "registration")
918 token_record = EmailVerificationTokens(
919 user_id=user.id,
920 email=user.email,
921 token=verification_token,
922 token_type="registration",
923 expires_at=expires_at,
924 )
925 db.add(token_record)
926 await db.commit()
928 return {
929 "verification_token": verification_token,
930 "expires_at": expires_at,
931 "user_id": user.id,
932 }
934async def _request_email_change_verification_email(
935 db: AsyncSession,
936 user: Users,
937 new_email: str,
938) -> dict:
939 """Create an email change verification token record"""
940 now = datetime.now().astimezone()
941 expires_at = now + timedelta(minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES)
943 # Invalidate all previous unused email_change tokens for this user
944 await db.execute(
945 update(EmailVerificationTokens)
946 .where(
947 EmailVerificationTokens.user_id == user.id,
948 EmailVerificationTokens.token_type == "email_change",
949 EmailVerificationTokens.is_used == False
950 )
951 .values(is_used=True)
952 )
954 verification_token = await create_email_verification_token(user.id, new_email, "email_change")
955 token_record = EmailVerificationTokens(
956 user_id=user.id,
957 email=new_email,
958 token=verification_token,
959 token_type="email_change",
960 expires_at=expires_at,
961 )
962 db.add(token_record)
963 await db.commit()
965 return {
966 "verification_token": verification_token,
967 "expires_at": expires_at,
968 "user_id": user.id,
969 }