Coverage for api/auth/services.py: 96.99%

332 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-01-25 13:05 +0000

1import ast 

2import redis 

3from typing import Optional 

4from sqlalchemy import select, update, or_ 

5from urllib.parse import quote 

6from models.users import Users 

7from core.config import settings 

8from extensions.smtp import SMTPMailer 

9from models.login_logs import LoginLogs 

10from datetime import datetime, timedelta 

11from models.user_sessions import UserSessions 

12from sqlalchemy.ext.asyncio import AsyncSession 

13from utils.email_templates import ( 

14 PASSWORD_RESET_TEMPLATE, 

15 EMAIL_VERIFICATION_TEMPLATE, 

16) 

17from models.password_reset_tokens import PasswordResetTokens 

18from models.email_verification_tokens import EmailVerificationTokens 

19from .schema import ( 

20 UserRegister, 

21 UserLogin, 

22 LoginResult, 

23 SessionResult, 

24 TokenValidationResponse, 

25 ActionRequiredResponse, 

26 PasswordResetRequiredResponse, 

27) 

28from core.security import ( 

29 verify_password, 

30 create_access_token, 

31 hash_password, 

32 extend_session_ttl, 

33 clear_user_all_sessions, 

34 create_password_reset_token, 

35 create_email_verification_token 

36) 

37from utils.custom_exception import ( 

38 ConflictException, 

39 AuthenticationException, 

40 PasswordResetRequiredException, 

41 NotFoundException, 

42 SMTPNotConfiguredException, 

43 ServerException, 

44 ValidationException, 

45 EmailVerificationRequiredException, 

46 RegistrationDisabledException, 

47) 

48 

49 

50async def register( 

51 db: AsyncSession, 

52 redis_client: redis.Redis, 

53 user_data: UserRegister, 

54 ip_address: str, 

55 user_agent: str, 

56 mailer: Optional[SMTPMailer] = None 

57) -> LoginResult: 

58 """User register""" 

59 if not settings.REGISTRATION_ENABLE: 

60 raise RegistrationDisabledException("Registration is disabled") 

61 

62 user = await _create_user(db, user_data) 

63 

64 # Check if email verification is required 

65 if settings.EMAIL_VERIFICATION_ENABLE and settings.SMTP_ENABLE and mailer: 

66 # Check cooldown 

67 cooldown_key = f"email_verification_cooldown:{user.email}" 

68 remaining_seconds = await redis_client.ttl(cooldown_key) 

69 

70 if remaining_seconds > 0: 

71 # In cooldown, return 202 without data 

72 await _log_login_attempt( 

73 db, email=user.email, 

74 ip_address=ip_address, 

75 user_agent=user_agent, 

76 is_success=True, 

77 user_id=user.id 

78 ) 

79 raise EmailVerificationRequiredException( 

80 message="Email verification required", 

81 details=None 

82 ) 

83 else: 

84 # Not in cooldown, send verification email 

85 await _send_registration_verification_email(db, mailer, user) 

86 

87 # Set cooldown 

88 await redis_client.setex( 

89 cooldown_key, 

90 settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS, 

91 "1" 

92 ) 

93 

94 await _log_login_attempt( 

95 db, email=user.email, 

96 ip_address=ip_address, 

97 user_agent=user_agent, 

98 is_success=True, 

99 user_id=user.id 

100 ) 

101 raise EmailVerificationRequiredException( 

102 message="Email verification required", 

103 details=None 

104 ) 

105 

106 session_result = await _create_user_session( 

107 db, redis_client, user, ip_address, user_agent 

108 ) 

109 await _log_login_attempt( 

110 db, email=user.email, 

111 ip_address=ip_address, 

112 user_agent=user_agent, 

113 is_success=True, 

114 user_id=user.id 

115 ) 

116 return { 

117 "user": user, 

118 "session_id": session_result["session_id"], 

119 "access_token": session_result["access_token"] 

120 } 

121 

122async def login( 

123 db: AsyncSession, 

124 redis_client: redis.Redis, 

125 login_data: UserLogin, 

126 ip_address: str, 

127 user_agent: str, 

128 mailer: Optional[SMTPMailer] = None 

129) -> LoginResult: 

130 """User login""" 

131 result = await db.execute( 

132 select(Users).where(Users.email == login_data.email) 

133 ) 

134 user = result.scalar_one_or_none() 

135 

136 if not user: 

137 await _log_login_attempt( 

138 db, email=login_data.email, 

139 ip_address=ip_address, 

140 user_agent=user_agent, 

141 is_success=False, 

142 failure_reason="User not found" 

143 ) 

144 raise AuthenticationException("Invalid email or password") 

145 

146 # Check if user account is disabled 

147 if not user.status: 

148 await _log_login_attempt( 

149 db, email=login_data.email, 

150 ip_address=ip_address, 

151 user_agent=user_agent, 

152 is_success=False, 

153 failure_reason="Account disabled" 

154 ) 

155 raise AuthenticationException("Account is disabled") 

156 

157 # Now verify password 

158 if not await verify_password(login_data.password, user.hash_password): 

159 await _log_login_attempt( 

160 db, email=login_data.email, 

161 ip_address=ip_address, 

162 user_agent=user_agent, 

163 is_success=False, 

164 failure_reason="Invalid password" 

165 ) 

166 raise AuthenticationException("Invalid email or password") 

167 

168 # Check if password reset is required 

169 if user.password_reset_required: 

170 reset_token = await create_password_reset_token(user.id, user.email) 

171 

172 reset_token_record = PasswordResetTokens( 

173 user_id=user.id, 

174 token=reset_token, 

175 expires_at=datetime.now().astimezone() + timedelta(minutes=settings.PASSWORD_RESET_TOKEN_EXPIRE_MINUTES) 

176 ) 

177 db.add(reset_token_record) 

178 await db.commit() 

179 

180 await _log_login_attempt( 

181 db, email=user.email, 

182 ip_address=ip_address, 

183 user_agent=user_agent, 

184 is_success=True, 

185 user_id=user.id 

186 ) 

187 

188 raise PasswordResetRequiredException( 

189 message="Password reset required", 

190 details=PasswordResetRequiredResponse( 

191 reset_token=reset_token, 

192 expires_at=reset_token_record.expires_at.isoformat() if reset_token_record.expires_at else None 

193 ) 

194 ) 

195 

196 # Check if email verification is required 

197 if settings.EMAIL_VERIFICATION_ENABLE and settings.SMTP_ENABLE and mailer: 

198 if not user.email_verified: 

199 # Check cooldown 

200 cooldown_key = f"email_verification_cooldown:{user.email}" 

201 remaining_seconds = await redis_client.ttl(cooldown_key) 

202 

203 if remaining_seconds > 0: 

204 # In cooldown, return 202 with cooldown time 

205 await _log_login_attempt( 

206 db, email=user.email, 

207 ip_address=ip_address, 

208 user_agent=user_agent, 

209 is_success=True, 

210 user_id=user.id 

211 ) 

212 # Calculate expires_at from cooldown 

213 expires_at = (datetime.now().astimezone() + timedelta(seconds=remaining_seconds)).isoformat() 

214 raise EmailVerificationRequiredException( 

215 message="Email verification required", 

216 details=ActionRequiredResponse( 

217 action_type="email_verification", 

218 token=None, 

219 expires_at=expires_at 

220 ) 

221 ) 

222 else: 

223 # Not in cooldown, send verification email 

224 await _send_registration_verification_email(db, mailer, user) 

225 

226 # Set cooldown 

227 await redis_client.setex( 

228 cooldown_key, 

229 settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS, 

230 "1" 

231 ) 

232 

233 await _log_login_attempt( 

234 db, email=user.email, 

235 ip_address=ip_address, 

236 user_agent=user_agent, 

237 is_success=True, 

238 user_id=user.id 

239 ) 

240 # Calculate expires_at from cooldown 

241 expires_at = (datetime.now().astimezone() + timedelta(seconds=settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS)).isoformat() 

242 raise EmailVerificationRequiredException( 

243 message="Email verification required", 

244 details=ActionRequiredResponse( 

245 action_type="email_verification", 

246 token=None, 

247 expires_at=expires_at 

248 ) 

249 ) 

250 

251 session_result = await _create_user_session( 

252 db, redis_client, user, ip_address, user_agent 

253 ) 

254 await _log_login_attempt( 

255 db, email=user.email, 

256 ip_address=ip_address, 

257 user_agent=user_agent, 

258 is_success=True, 

259 user_id=user.id 

260 ) 

261 return { 

262 "user": user, 

263 "session_id": session_result["session_id"], 

264 "access_token": session_result["access_token"] 

265 } 

266 

267async def logout( 

268 db: AsyncSession, 

269 redis_client: redis.Redis, 

270 user_id: str, 

271 session_id: str 

272) -> bool: 

273 """User logout""" 

274 try: 

275 redis_key = f"session:{session_id}" 

276 await redis_client.delete(redis_key) 

277 

278 result = await db.execute( 

279 select(UserSessions).where( 

280 UserSessions.user_id == user_id, 

281 UserSessions.id == session_id 

282 ) 

283 ) 

284 session = result.scalar_one_or_none() 

285 if session: 

286 session.is_active = False 

287 await db.commit() 

288 

289 return True 

290 except Exception: 

291 raise ServerException("Logout failed") 

292 

293async def logout_all_devices( 

294 db: AsyncSession, 

295 redis_client: redis.Redis, 

296 user_id: str 

297) -> bool: 

298 """Logout user from all devices""" 

299 try: 

300 return await clear_user_all_sessions(db, redis_client, user_id) 

301 except Exception: 

302 raise ServerException("Failed to logout all devices") 

303 

304async def token( 

305 db: AsyncSession, 

306 redis_client: redis.Redis, 

307 session_id: str 

308) -> str: 

309 """Use session_id (Cookie) to issue new access_token and refresh session""" 

310 raw = await redis_client.get(f"session:{session_id}") 

311 if not raw: 

312 raise AuthenticationException("Invalid or expired session") 

313 try: 

314 data = ast.literal_eval(raw) 

315 except Exception: 

316 raise AuthenticationException("Invalid or expired session") 

317 

318 user_id = data.get("user_id") 

319 if not user_id: 

320 raise AuthenticationException("Invalid or expired session") 

321 

322 # Verify user exists 

323 user = await _get_user_by_id(db, user_id) 

324 if not user: 

325 raise NotFoundException("User not found") 

326 

327 if not user.status: 

328 raise AuthenticationException("Account is disabled") 

329 

330 new_access_token = await create_access_token(data={ 

331 "sub": user_id, 

332 "email": user.email, 

333 "sid": session_id 

334 }) 

335 

336 data["access_token"] = new_access_token 

337 await extend_session_ttl(redis_client, session_id, data) 

338 await _update_session_expiry(db, session_id) 

339 

340 return new_access_token 

341 

342async def reset_password( 

343 db: AsyncSession, 

344 redis_client: redis.Redis, 

345 token: dict, 

346 new_password: str, 

347 ip_address: str, 

348 user_agent: str 

349) -> LoginResult: 

350 """Reset password using token""" 

351 try: 

352 user_id = token.get("sub") 

353 token_string = token.get("token") 

354 

355 result = await db.execute( 

356 select(PasswordResetTokens).where( 

357 PasswordResetTokens.token == token_string, 

358 PasswordResetTokens.user_id == user_id, 

359 PasswordResetTokens.is_used == False, 

360 PasswordResetTokens.expires_at > datetime.now().astimezone() 

361 ) 

362 ) 

363 token_record = result.scalar_one_or_none() 

364 

365 if not token_record: 

366 raise AuthenticationException("Invalid or expired token") 

367 

368 result = await db.execute( 

369 select(Users).where(Users.id == user_id) 

370 ) 

371 user = result.scalar_one_or_none() 

372 

373 if not user: 

374 raise NotFoundException("User not found") 

375 

376 user.hash_password = await hash_password(new_password) 

377 user.password_reset_required = False 

378 

379 token_record.is_used = True 

380 

381 # Force logout all devices 

382 await clear_user_all_sessions(db, redis_client, user_id) 

383 

384 # Create new session 

385 session_result = await _create_user_session( 

386 db, redis_client, user, ip_address, user_agent 

387 ) 

388 

389 await db.commit() 

390 

391 return { 

392 "user": user, 

393 "session_id": session_result["session_id"], 

394 "access_token": session_result["access_token"] 

395 } 

396 

397 except (AuthenticationException, NotFoundException): 

398 raise 

399 except Exception as e: 

400 raise ServerException(f"Failed to reset password: {str(e)}") 

401 

402async def validate_password_reset_token( 

403 db: AsyncSession, 

404 token: dict 

405) -> TokenValidationResponse: 

406 """Validate password reset token without consuming it""" 

407 try: 

408 user_id = token.get("sub") 

409 token_string = token.get("token") 

410 

411 result = await db.execute( 

412 select(PasswordResetTokens).where( 

413 PasswordResetTokens.token == token_string, 

414 PasswordResetTokens.user_id == user_id, 

415 PasswordResetTokens.is_used == False, 

416 PasswordResetTokens.expires_at > datetime.now().astimezone() 

417 ) 

418 ) 

419 token_record = result.scalar_one_or_none() 

420 

421 if not token_record: 

422 raise AuthenticationException("Invalid or expired token") 

423 

424 result = await db.execute( 

425 select(Users).where(Users.id == user_id) 

426 ) 

427 user = result.scalar_one_or_none() 

428 

429 if not user or not user.status: 

430 raise AuthenticationException("User not found or account disabled") 

431 

432 return TokenValidationResponse( 

433 is_valid=True 

434 ) 

435 

436 except AuthenticationException: 

437 raise 

438 except Exception as e: 

439 raise ServerException(f"Token validation failed: {str(e)}") 

440 

441async def forgot_password( 

442 db: AsyncSession, 

443 email: str, 

444 mailer: SMTPMailer, 

445 redis_client: redis.Redis, 

446) -> dict: 

447 """ 

448 Forgot password and send reset password email 

449 """ 

450 try: 

451 user = await _get_user_by_email_for_password_reset(db, email) 

452 

453 if not getattr(mailer, "enabled", False): 

454 raise SMTPNotConfiguredException("SMTP is disabled") 

455 

456 cooldown_key = f"password_reset_cooldown:{email}" 

457 remaining_seconds = await redis_client.ttl(cooldown_key) 

458 

459 if remaining_seconds > 0: 

460 raise ValidationException( 

461 f"Please wait {remaining_seconds} seconds before requesting another password reset email", 

462 details={"cooldown_seconds": remaining_seconds} 

463 ) 

464 

465 token_meta = await _request_password_reset_email(db, user) 

466 reset_token = token_meta["reset_token"] 

467 reset_url = ( 

468 f"http{'s' if settings.SSL_ENABLE else ''}://" 

469 f"{settings.HOSTNAME}:{settings.FRONTEND_PORT}" 

470 f"/auth/reset-password?token={quote(reset_token, safe='')}" 

471 ) 

472 

473 # Render email template with user name and app name 

474 user_name = f"{user.first_name} {user.last_name}".strip() 

475 app_name = settings.PROJECT_NAME 

476 

477 email_content = PASSWORD_RESET_TEMPLATE.render( 

478 reset_url=reset_url, 

479 user_name=user_name, 

480 app_name=app_name, 

481 ) 

482 

483 mailer.send_text( 

484 to_emails=[email], 

485 subject=email_content["subject"], 

486 body=email_content["body"], 

487 html_body=email_content.get("html_body"), 

488 ) 

489 

490 # Set cooldown period in Redis 

491 await redis_client.setex( 

492 cooldown_key, 

493 settings.PASSWORD_RESET_EMAIL_COOLDOWN_SECONDS, 

494 "1" 

495 ) 

496 

497 return {**token_meta, "reset_url": reset_url} 

498 

499 except (NotFoundException, AuthenticationException, SMTPNotConfiguredException, ValidationException): 

500 raise 

501 except Exception as e: 

502 raise ServerException(f"Failed to send password reset email: {str(e)}") 

503 

504 

505async def get_password_reset_cooldown( 

506 email: str, 

507 redis_client: redis.Redis, 

508) -> dict: 

509 """ 

510 Get remaining cooldown time for password reset email. 

511  

512 Returns: 

513 Dict with 'cooldown_seconds' (0 if no cooldown active) 

514 """ 

515 cooldown_key = f"password_reset_cooldown:{email}" 

516 remaining_seconds = await redis_client.ttl(cooldown_key) 

517 

518 # TTL returns -1 if key exists but has no expiry, -2 if key doesn't exist 

519 if remaining_seconds < 0: 

520 remaining_seconds = 0 

521 

522 return {"cooldown_seconds": remaining_seconds} 

523 

524async def _update_session_expiry(db: AsyncSession, session_id: str) -> None: 

525 """Update session expiry time in database""" 

526 try: 

527 result = await db.execute( 

528 select(UserSessions).where(UserSessions.id == session_id) 

529 ) 

530 session = result.scalar_one_or_none() 

531 if session: 

532 session.expires_at = datetime.now().astimezone() + timedelta(minutes=settings.SESSION_EXPIRE_MINUTES) 

533 await db.commit() 

534 except Exception as e: 

535 raise ServerException(f"Failed to update session expiry in database: {e}") 

536 

537async def _create_user(db: AsyncSession, user_data: UserRegister) -> Users: 

538 try: 

539 result = await db.execute( 

540 select(Users).where( 

541 or_( 

542 Users.email == user_data.email, 

543 Users.pending_email == user_data.email 

544 ) 

545 ) 

546 ) 

547 existing_user = result.scalar_one_or_none() 

548 if existing_user: 

549 raise ConflictException("Email already exists") 

550 

551 user = Users( 

552 first_name=user_data.first_name, 

553 last_name=user_data.last_name, 

554 email=user_data.email, 

555 phone=user_data.phone, 

556 hash_password=await hash_password(user_data.password) 

557 ) 

558 db.add(user) 

559 await db.commit() 

560 await db.refresh(user) 

561 return user 

562 except ConflictException: 

563 raise 

564 except Exception as e: 

565 raise ServerException(f"Failed to create user: {str(e)}") 

566 

567async def _get_user_by_id(db: AsyncSession, user_id: str) -> Optional[Users]: 

568 result = await db.execute( 

569 select(Users).where(Users.id == user_id) 

570 ) 

571 return result.scalar_one_or_none() 

572 

573async def _create_user_session( 

574 db: AsyncSession, 

575 redis_client: redis.Redis, 

576 user: Users, 

577 ip_address: str, 

578 user_agent: str 

579) -> SessionResult: 

580 try: 

581 session = UserSessions( 

582 user_id=user.id, 

583 jwt_access_token="", 

584 ip_address=ip_address, 

585 user_agent=user_agent, 

586 expires_at=datetime.now().astimezone() + timedelta(minutes=settings.SESSION_EXPIRE_MINUTES) 

587 ) 

588 db.add(session) 

589 await db.commit() 

590 await db.refresh(session) 

591 

592 session_id = session.id 

593 access_token = await create_access_token(data={ 

594 "sub": user.id, 

595 "email": user.email, 

596 "sid": session_id 

597 }) 

598 

599 session.jwt_access_token = access_token 

600 await db.commit() 

601 

602 redis_key = f"session:{session_id}" 

603 session_data = { 

604 "user_id": user.id, 

605 "email": user.email, 

606 "ip_address": ip_address, 

607 "user_agent": user_agent, 

608 "access_token": access_token, 

609 "created_at": datetime.now().astimezone().isoformat(), 

610 "last_activity": datetime.now().astimezone().isoformat(), 

611 } 

612 

613 await redis_client.setex( 

614 redis_key, 

615 settings.SESSION_EXPIRE_MINUTES * 60, 

616 str(session_data) 

617 ) 

618 

619 return { 

620 "session_id": session_id, 

621 "access_token": access_token 

622 } 

623 except Exception as e: 

624 raise ServerException(f"Failed to create user session: {str(e)}") 

625 

626async def _log_login_attempt( 

627 db: AsyncSession, 

628 email: str, 

629 ip_address: str, 

630 user_agent: str, 

631 is_success: bool, 

632 user_id: Optional[str] = None, 

633 failure_reason: Optional[str] = None 

634) -> None: 

635 log = LoginLogs( 

636 user_id=user_id, 

637 email=email, 

638 ip_address=ip_address, 

639 user_agent=user_agent, 

640 is_success=is_success, 

641 failure_reason=failure_reason 

642 ) 

643 db.add(log) 

644 await db.commit() 

645 

646async def _get_user_by_email_for_password_reset(db: AsyncSession, email: str) -> Users: 

647 result = await db.execute(select(Users).where(Users.email == email)) 

648 user = result.scalar_one_or_none() 

649 if not user: 

650 raise NotFoundException("User not registered") 

651 if not user.status: 

652 raise AuthenticationException("Account is disabled") 

653 return user 

654 

655async def _request_password_reset_email( 

656 db: AsyncSession, 

657 user: Users, 

658) -> dict: 

659 """ 

660 Create a password reset token record for the user and return token metadata. 

661 Invalidates all previous unused tokens for this user before creating a new one. 

662 """ 

663 now = datetime.now().astimezone() 

664 expires_at = now + timedelta(minutes=settings.PASSWORD_RESET_TOKEN_EXPIRE_MINUTES) 

665 

666 # Invalidate all previous unused tokens for this user 

667 await db.execute( 

668 update(PasswordResetTokens) 

669 .where( 

670 PasswordResetTokens.user_id == user.id, 

671 PasswordResetTokens.is_used == False 

672 ) 

673 .values(is_used=True) 

674 ) 

675 

676 reset_token = await create_password_reset_token(user.id, user.email) 

677 reset_token_record = PasswordResetTokens( 

678 user_id=user.id, 

679 token=reset_token, 

680 expires_at=expires_at, 

681 ) 

682 db.add(reset_token_record) 

683 await db.commit() 

684 

685 return { 

686 "reset_token": reset_token, 

687 "expires_at": expires_at, 

688 "user_id": user.id, 

689 } 

690 

691async def verify_email( 

692 db: AsyncSession, 

693 redis_client: redis.Redis, 

694 token: dict, 

695 ip_address: str, 

696 user_agent: str 

697) -> LoginResult: 

698 """Verify email using token and create session""" 

699 try: 

700 user_id = token.get("sub") 

701 email = token.get("email") 

702 verification_type = token.get("verification_type") 

703 token_string = token.get("token") 

704 

705 # Verify token record exists and is valid 

706 result = await db.execute( 

707 select(EmailVerificationTokens).where( 

708 EmailVerificationTokens.token == token_string, 

709 EmailVerificationTokens.user_id == user_id, 

710 EmailVerificationTokens.email == email, 

711 EmailVerificationTokens.token_type == verification_type, 

712 EmailVerificationTokens.is_used == False, 

713 EmailVerificationTokens.expires_at > datetime.now().astimezone() 

714 ) 

715 ) 

716 token_record = result.scalar_one_or_none() 

717 

718 if not token_record: 

719 raise AuthenticationException("Invalid or expired token") 

720 

721 # Get user 

722 result = await db.execute( 

723 select(Users).where(Users.id == user_id) 

724 ) 

725 user = result.scalar_one_or_none() 

726 

727 if not user: 

728 raise NotFoundException("User not found") 

729 

730 if not user.status: 

731 raise AuthenticationException("Account is disabled") 

732 

733 # Mark token as used 

734 token_record.is_used = True 

735 

736 if verification_type == "registration": 

737 # Mark email as verified 

738 user.email_verified = True 

739 elif verification_type == "email_change": 

740 # Update email from pending_email to email 

741 if user.pending_email != email: 

742 raise AuthenticationException("Email mismatch") 

743 

744 # Check if new email already exists 

745 result = await db.execute( 

746 select(Users).where(Users.email == email, Users.id != user_id) 

747 ) 

748 if result.scalar_one_or_none(): 

749 raise ConflictException("Email already exists") 

750 

751 user.email = email 

752 user.pending_email = None 

753 user.email_verified = True 

754 

755 # Create session for verified user 

756 session_result = await _create_user_session( 

757 db, redis_client, user, ip_address, user_agent 

758 ) 

759 

760 await db.commit() 

761 

762 return { 

763 "user": user, 

764 "session_id": session_result["session_id"], 

765 "access_token": session_result["access_token"] 

766 } 

767 

768 except (AuthenticationException, NotFoundException, ConflictException): 

769 raise 

770 except Exception as e: 

771 raise ServerException(f"Failed to verify email: {str(e)}") 

772 

773async def resend_verification_email( 

774 db: AsyncSession, 

775 email: str, 

776 mailer: SMTPMailer, 

777 redis_client: redis.Redis, 

778) -> dict: 

779 """Resend email verification""" 

780 try: 

781 user = await _get_user_by_email_for_password_reset(db, email) 

782 

783 if not settings.SMTP_ENABLE or not getattr(mailer, "enabled", False): 

784 raise SMTPNotConfiguredException("SMTP is disabled") 

785 

786 # Check cooldown 

787 cooldown_key = f"email_verification_cooldown:{email}" 

788 remaining_seconds = await redis_client.ttl(cooldown_key) 

789 

790 if remaining_seconds > 0: 

791 raise ValidationException( 

792 f"Please wait {remaining_seconds} seconds before requesting another verification email", 

793 details={"cooldown_seconds": remaining_seconds} 

794 ) 

795 

796 # Determine verification type 

797 if user.email_verified: 

798 # If email is already verified but there's a pending email, resend email change verification 

799 if user.pending_email: 

800 token_meta = await _request_email_change_verification_email(db, user, user.pending_email) 

801 verification_url = ( 

802 f"http{'s' if settings.SSL_ENABLE else ''}://" 

803 f"{settings.HOSTNAME}:{settings.FRONTEND_PORT}" 

804 f"/auth/verify-email?token={quote(token_meta['verification_token'], safe='')}" 

805 ) 

806 

807 user_name = f"{user.first_name} {user.last_name}".strip() 

808 app_name = settings.PROJECT_NAME 

809 

810 email_content = EMAIL_VERIFICATION_TEMPLATE.render( 

811 verification_url=verification_url, 

812 user_name=user_name, 

813 app_name=app_name, 

814 expire_minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES, 

815 ) 

816 

817 mailer.send_text( 

818 to_emails=[user.pending_email], 

819 subject=email_content["subject"], 

820 body=email_content["body"], 

821 html_body=email_content.get("html_body"), 

822 ) 

823 else: 

824 raise ValidationException("Email is already verified") 

825 else: 

826 # Resend registration verification 

827 token_meta = await _request_registration_verification_email(db, user) 

828 verification_url = ( 

829 f"http{'s' if settings.SSL_ENABLE else ''}://" 

830 f"{settings.HOSTNAME}:{settings.FRONTEND_PORT}" 

831 f"/auth/verify-email?token={quote(token_meta['verification_token'], safe='')}" 

832 ) 

833 

834 user_name = f"{user.first_name} {user.last_name}".strip() 

835 app_name = settings.PROJECT_NAME 

836 

837 email_content = EMAIL_VERIFICATION_TEMPLATE.render( 

838 verification_url=verification_url, 

839 user_name=user_name, 

840 app_name=app_name, 

841 expire_minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES, 

842 ) 

843 

844 mailer.send_text( 

845 to_emails=[email], 

846 subject=email_content["subject"], 

847 body=email_content["body"], 

848 html_body=email_content.get("html_body"), 

849 ) 

850 

851 # Set cooldown 

852 await redis_client.setex( 

853 cooldown_key, 

854 settings.EMAIL_VERIFICATION_COOLDOWN_SECONDS, 

855 "1" 

856 ) 

857 

858 return {"message": "Verification email sent"} 

859 

860 except (NotFoundException, AuthenticationException, SMTPNotConfiguredException, ValidationException): 

861 raise 

862 except Exception as e: 

863 raise ServerException(f"Failed to send verification email: {str(e)}") 

864 

865async def _send_registration_verification_email( 

866 db: AsyncSession, 

867 mailer: SMTPMailer, 

868 user: Users 

869) -> None: 

870 """Send registration verification email""" 

871 if not settings.SMTP_ENABLE or not getattr(mailer, "enabled", False): 

872 return 

873 

874 token_meta = await _request_registration_verification_email(db, user) 

875 verification_url = ( 

876 f"http{'s' if settings.SSL_ENABLE else ''}://" 

877 f"{settings.HOSTNAME}:{settings.FRONTEND_PORT}" 

878 f"/auth/verify-email?token={quote(token_meta['verification_token'], safe='')}" 

879 ) 

880 

881 user_name = f"{user.first_name} {user.last_name}".strip() 

882 app_name = settings.PROJECT_NAME 

883 

884 email_content = EMAIL_VERIFICATION_TEMPLATE.render( 

885 verification_url=verification_url, 

886 user_name=user_name, 

887 app_name=app_name, 

888 expire_minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES, 

889 ) 

890 

891 mailer.send_text( 

892 to_emails=[user.email], 

893 subject=email_content["subject"], 

894 body=email_content["body"], 

895 html_body=email_content.get("html_body"), 

896 ) 

897 

898async def _request_registration_verification_email( 

899 db: AsyncSession, 

900 user: Users, 

901) -> dict: 

902 """Create a registration verification token record""" 

903 now = datetime.now().astimezone() 

904 expires_at = now + timedelta(minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES) 

905 

906 # Invalidate all previous unused registration tokens for this user 

907 await db.execute( 

908 update(EmailVerificationTokens) 

909 .where( 

910 EmailVerificationTokens.user_id == user.id, 

911 EmailVerificationTokens.token_type == "registration", 

912 EmailVerificationTokens.is_used == False 

913 ) 

914 .values(is_used=True) 

915 ) 

916 

917 verification_token = await create_email_verification_token(user.id, user.email, "registration") 

918 token_record = EmailVerificationTokens( 

919 user_id=user.id, 

920 email=user.email, 

921 token=verification_token, 

922 token_type="registration", 

923 expires_at=expires_at, 

924 ) 

925 db.add(token_record) 

926 await db.commit() 

927 

928 return { 

929 "verification_token": verification_token, 

930 "expires_at": expires_at, 

931 "user_id": user.id, 

932 } 

933 

934async def _request_email_change_verification_email( 

935 db: AsyncSession, 

936 user: Users, 

937 new_email: str, 

938) -> dict: 

939 """Create an email change verification token record""" 

940 now = datetime.now().astimezone() 

941 expires_at = now + timedelta(minutes=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES) 

942 

943 # Invalidate all previous unused email_change tokens for this user 

944 await db.execute( 

945 update(EmailVerificationTokens) 

946 .where( 

947 EmailVerificationTokens.user_id == user.id, 

948 EmailVerificationTokens.token_type == "email_change", 

949 EmailVerificationTokens.is_used == False 

950 ) 

951 .values(is_used=True) 

952 ) 

953 

954 verification_token = await create_email_verification_token(user.id, new_email, "email_change") 

955 token_record = EmailVerificationTokens( 

956 user_id=user.id, 

957 email=new_email, 

958 token=verification_token, 

959 token_type="email_change", 

960 expires_at=expires_at, 

961 ) 

962 db.add(token_record) 

963 await db.commit() 

964 

965 return { 

966 "verification_token": verification_token, 

967 "expires_at": expires_at, 

968 "user_id": user.id, 

969 }