Coverage for api/auth/controller.py: 97.42%

194 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-01-25 13:05 +0000

1import logging 

2from core.redis import get_redis 

3from core.config import settings 

4from .schema import UserResponse 

5from core.dependencies import get_db 

6from datetime import datetime, timedelta 

7from utils.get_real_ip import get_real_ip 

8from sqlalchemy.ext.asyncio import AsyncSession 

9from core.security import verify_password_reset_token, verify_token, verify_email_verification_token 

10from fastapi import APIRouter, Depends, HTTPException, Request, Response, Query 

11from utils.response import APIResponse, parse_responses, common_responses 

12from extensions.smtp import get_mailer, SMTPMailer 

13from utils.custom_exception import SMTPNotConfiguredException 

14from .schema import ( 

15 UserRegister, 

16 UserLogin, 

17 UserLoginResponse, 

18 TokenResponse, 

19 ResetPasswordRequest, 

20 TokenValidationResponse, 

21 LogoutRequest, 

22 ForgotPasswordRequest, 

23 PasswordResetCooldownResponse, 

24 ResendVerificationRequest, 

25 ActionRequiredResponse, 

26 action_required_response_examples 

27) 

28from .services import ( 

29 register, 

30 login, 

31 logout, 

32 token, 

33 logout_all_devices, 

34 reset_password, 

35 validate_password_reset_token, 

36 forgot_password, 

37 get_password_reset_cooldown, 

38 verify_email, 

39 resend_verification_email, 

40) 

41from utils.custom_exception import ( 

42 ConflictException, 

43 AuthenticationException, 

44 PasswordResetRequiredException, 

45 NotFoundException, 

46 ValidationException, 

47 EmailVerificationRequiredException, 

48 RegistrationDisabledException, 

49) 

50 

51logger = logging.getLogger(__name__) 

52router = APIRouter(tags=["Auth"]) 

53 

54@router.post( 

55 "/register", 

56 response_model=APIResponse[UserLoginResponse], 

57 response_model_exclude_none=True, 

58 summary="Register account", 

59 responses=parse_responses({ 

60 200: ("User registered successfully", UserLoginResponse), 

61 202: ("Email verification required", None), 

62 409: ("Email already exists", None), 

63 503: ("Registration is disabled", None) 

64 }, common_responses) 

65) 

66async def register_api( 

67 user_data: UserRegister, 

68 request: Request, 

69 response: Response, 

70 db: AsyncSession = Depends(get_db), 

71 redis_client = Depends(get_redis), 

72 mailer: SMTPMailer = Depends(get_mailer) 

73): 

74 try: 

75 client_ip = get_real_ip(request) 

76 user_agent = request.headers.get("user-agent", "Registration") 

77 

78 result = await register(db, redis_client, user_data, client_ip, user_agent, mailer) 

79 

80 user = result["user"] 

81 session_id = result["session_id"] 

82 access_token = result["access_token"] 

83 

84 response.set_cookie( 

85 key="session_id", 

86 value=session_id, 

87 httponly=settings.COOKIE_HTTPONLY, 

88 secure=settings.COOKIE_SECURE, 

89 samesite=settings.COOKIE_SAMESITE, 

90 max_age=settings.SESSION_EXPIRE_MINUTES * 60 

91 ) 

92 

93 user_response = UserResponse( 

94 id=user.id, 

95 first_name=user.first_name, 

96 last_name=user.last_name, 

97 email=user.email, 

98 phone=user.phone 

99 ) 

100 

101 response_data = UserLoginResponse( 

102 access_token=access_token, 

103 expires_at=datetime.now().astimezone() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), 

104 user=user_response 

105 ) 

106 return APIResponse(code=200, message="User registered successfully", data=response_data) 

107 except EmailVerificationRequiredException as e: 

108 resp = APIResponse(code=202, message="Email verification required") 

109 raise HTTPException(status_code=202, detail=resp.dict(exclude_none=True)) 

110 except ConflictException: 

111 raise HTTPException(status_code=409, detail="Email already exists") 

112 except RegistrationDisabledException: 

113 raise HTTPException(status_code=503, detail="Registration is disabled") 

114 except Exception: 

115 raise HTTPException(status_code=500) 

116 

117@router.post( 

118 "/login", 

119 response_model=APIResponse[UserLoginResponse], 

120 response_model_exclude_none=True, 

121 summary="Login account", 

122 responses=parse_responses({ 

123 200: ("User logged in successfully", UserLoginResponse), 

124 202: ("Password reset required / Email verification required", ActionRequiredResponse, action_required_response_examples), 

125 401: ("Invalid email or password", None) 

126 }, common_responses) 

127) 

128async def login_api( 

129 user_data: UserLogin, 

130 request: Request, 

131 response: Response, 

132 db: AsyncSession = Depends(get_db), 

133 redis_client = Depends(get_redis), 

134 mailer: SMTPMailer = Depends(get_mailer) 

135): 

136 try: 

137 client_ip = get_real_ip(request) 

138 user_agent = request.headers.get("user-agent", "") 

139 

140 result = await login(db, redis_client, user_data, client_ip, user_agent, mailer) 

141 

142 user = result["user"] 

143 session_id = result["session_id"] 

144 access_token = result["access_token"] 

145 

146 user_response = UserResponse( 

147 id=user.id, 

148 first_name=user.first_name, 

149 last_name=user.last_name, 

150 email=user.email, 

151 phone=user.phone 

152 ) 

153 

154 response.set_cookie( 

155 key="session_id", 

156 value=session_id, 

157 httponly=settings.COOKIE_HTTPONLY, 

158 secure=settings.COOKIE_SECURE, 

159 samesite=settings.COOKIE_SAMESITE, 

160 max_age=settings.SESSION_EXPIRE_MINUTES * 60 

161 ) 

162 

163 response_data = UserLoginResponse( 

164 access_token=access_token, 

165 expires_at=datetime.now().astimezone() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), 

166 user=user_response 

167 ) 

168 return APIResponse(code=200, message="User logged in successfully", data=response_data) 

169 except PasswordResetRequiredException as e: 

170 resp = APIResponse(code=202, message="Password reset required", data=e.details) 

171 raise HTTPException(status_code=202, detail=resp.dict(exclude_none=True)) 

172 except EmailVerificationRequiredException as e: 

173 resp = APIResponse(code=202, message="Email verification required", data=e.details) 

174 raise HTTPException(status_code=202, detail=resp.dict(exclude_none=True)) 

175 except AuthenticationException as e: 

176 raise HTTPException(status_code=401, detail="Invalid email or password") 

177 except Exception: 

178 raise HTTPException(status_code=500) 

179 

180@router.post( 

181 "/logout", 

182 response_model=APIResponse[None], 

183 response_model_exclude_none=True, 

184 summary="Logout account", 

185 responses=parse_responses({ 

186 200: ("User logged out successfully", None), 

187 401: ("Invalid or expired session / Invalid or expired token", None) 

188 }, common_responses) 

189) 

190async def logout_api( 

191 logout_data: LogoutRequest, 

192 token: dict = Depends(verify_token), 

193 response: Response = None, 

194 db: AsyncSession = Depends(get_db), 

195 redis_client = Depends(get_redis) 

196): 

197 """ 

198 Logout user from current device or all devices 

199  

200 Args: 

201 logout_data: Contains logout_all flag to determine logout scope 

202 """ 

203 try: 

204 user_id = token.get("sub") 

205 session_id = token.get("sid") 

206 

207 if logout_data.logout_all: 

208 # Logout from all devices 

209 if await logout_all_devices(db, redis_client, user_id): 

210 if response: 

211 response.delete_cookie("session_id") 

212 return APIResponse(code=200, message="User logged out successfully") 

213 else: 

214 # Logout from current device only 

215 if not session_id: 

216 raise AuthenticationException("Invalid or expired session") 

217 

218 if await logout(db, redis_client, user_id, session_id): 

219 if response: 

220 response.delete_cookie("session_id") 

221 return APIResponse(code=200, message="User logged out successfully") 

222 except AuthenticationException: 

223 raise HTTPException(status_code=401, detail="Invalid or expired session") 

224 except Exception: 

225 raise HTTPException(status_code=500) 

226 

227@router.post( 

228 "/token", 

229 response_model=APIResponse[TokenResponse], 

230 response_model_exclude_unset=True, 

231 summary="Refresh token", 

232 responses=parse_responses({ 

233 200: ("Token refreshed successfully", TokenResponse), 

234 401: ("Invalid or expired session", None) 

235 }, common_responses) 

236) 

237async def token_api( 

238 request: Request, 

239 db: AsyncSession = Depends(get_db), 

240 redis_client = Depends(get_redis) 

241): 

242 """Use session_id Cookie to get new access_token""" 

243 try: 

244 session_id = request.cookies.get("session_id") 

245 if not session_id: 

246 raise AuthenticationException("Invalid or expired session") 

247 new_access_token = await token(db, redis_client, session_id) 

248 response_data = TokenResponse( 

249 access_token=new_access_token, 

250 expires_at=datetime.now().astimezone() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) 

251 ) 

252 return APIResponse(code=200, message="Token refreshed successfully", data=response_data) 

253 except (AuthenticationException, NotFoundException): 

254 raise HTTPException(status_code=401, detail="Invalid or expired session") 

255 except Exception: 

256 raise HTTPException(status_code=500) 

257 

258@router.post( 

259 "/reset-password", 

260 response_model=APIResponse[UserLoginResponse], 

261 response_model_exclude_none=True, 

262 summary="Reset password with token", 

263 responses=parse_responses({ 

264 200: ("Password reset successfully", UserLoginResponse), 

265 401: ("Invalid or expired token", None), 

266 404: ("User not found", None) 

267 }, common_responses) 

268) 

269async def reset_password_api( 

270 request: Request, 

271 response: Response, 

272 request_data: ResetPasswordRequest, 

273 token: dict = Depends(verify_password_reset_token), 

274 db: AsyncSession = Depends(get_db), 

275 redis_client = Depends(get_redis) 

276): 

277 """Reset password using token""" 

278 try: 

279 client_ip = get_real_ip(request) 

280 user_agent = request.headers.get("user-agent", "") 

281 

282 result = await reset_password(db, redis_client, token, request_data.new_password, client_ip, user_agent) 

283 

284 user = result["user"] 

285 session_id = result["session_id"] 

286 access_token = result["access_token"] 

287 

288 user_response = UserResponse( 

289 id=user.id, 

290 first_name=user.first_name, 

291 last_name=user.last_name, 

292 email=user.email, 

293 phone=user.phone 

294 ) 

295 

296 response_data = UserLoginResponse( 

297 access_token=access_token, 

298 expires_at=datetime.now().astimezone() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), 

299 user=user_response 

300 ) 

301 

302 response.set_cookie( 

303 key="session_id", 

304 value=session_id, 

305 httponly=settings.COOKIE_HTTPONLY, 

306 secure=settings.COOKIE_SECURE, 

307 samesite=settings.COOKIE_SAMESITE, 

308 max_age=settings.SESSION_EXPIRE_MINUTES * 60 

309 ) 

310 

311 return APIResponse(code=200, message="Password reset successfully", data=response_data) 

312 except AuthenticationException: 

313 raise HTTPException(status_code=401, detail="Invalid or expired token") 

314 except NotFoundException: 

315 raise HTTPException(status_code=404, detail="User not found") 

316 except Exception: 

317 raise HTTPException(status_code=500) 

318 

319@router.get( 

320 "/validate-reset-token", 

321 response_model=APIResponse[TokenValidationResponse], 

322 response_model_exclude_none=True, 

323 summary="Validate password reset token", 

324 responses=parse_responses({ 

325 200: ("Token is valid", TokenValidationResponse) 

326 }, common_responses) 

327) 

328async def validate_reset_token_api( 

329 token: dict = Depends(verify_password_reset_token), 

330 db: AsyncSession = Depends(get_db) 

331): 

332 """Validate password reset token without consuming it""" 

333 try: 

334 result = await validate_password_reset_token(db, token) 

335 return APIResponse(code=200, message="Token is valid", data=result) 

336 except AuthenticationException: 

337 raise HTTPException(status_code=401, detail="Invalid or expired token") 

338 except Exception: 

339 raise HTTPException(status_code=500) 

340 

341 

342@router.post( 

343 "/forgot-password", 

344 response_model=APIResponse[None], 

345 response_model_exclude_none=True, 

346 summary="Send reset password email", 

347 responses=parse_responses({ 

348 200: ("Reset password email sent", None), 

349 400: ("Please wait before requesting another password reset email", None), 

350 403: ("Account is disabled", None), 

351 404: ("User not registered", None), 

352 503: ("SMTP is disabled", None), 

353 }, common_responses), 

354) 

355async def forgot_password_api( 

356 request: Request, 

357 request_data: ForgotPasswordRequest, 

358 db: AsyncSession = Depends(get_db), 

359 mailer: SMTPMailer = Depends(get_mailer), 

360 redis_client = Depends(get_redis), 

361): 

362 """ 

363 Send password reset email based on input email. 

364 """ 

365 try: 

366 await forgot_password(db, request_data.email, mailer, redis_client) 

367 return APIResponse(code=200, message="Reset password email sent") 

368 except ValidationException: 

369 raise HTTPException(status_code=400, detail="Please wait before requesting another password reset email") 

370 except AuthenticationException: 

371 raise HTTPException(status_code=403, detail="Account is disabled") 

372 except NotFoundException: 

373 raise HTTPException(status_code=404, detail="User not registered") 

374 except SMTPNotConfiguredException: 

375 raise HTTPException(status_code=503, detail="SMTP is disabled") 

376 except Exception: 

377 raise HTTPException(status_code=500) 

378 

379@router.get( 

380 "/forgot-password/cooldown", 

381 response_model=APIResponse[PasswordResetCooldownResponse], 

382 response_model_exclude_none=True, 

383 summary="Get password reset email cooldown status", 

384 responses=parse_responses({ 

385 200: ("Cooldown status retrieved", PasswordResetCooldownResponse), 

386 }, common_responses), 

387) 

388async def get_password_reset_cooldown_api( 

389 email: str = Query(..., description="Email address to check cooldown for"), 

390 redis_client = Depends(get_redis), 

391): 

392 """ 

393 Get remaining cooldown time for password reset email. 

394 Returns 0 if no cooldown is active. 

395 """ 

396 try: 

397 result = await get_password_reset_cooldown(email, redis_client) 

398 response_data = PasswordResetCooldownResponse( 

399 cooldown_seconds=result["cooldown_seconds"] 

400 ) 

401 return APIResponse(code=200, message="Cooldown status retrieved", data=response_data) 

402 except Exception: 

403 raise HTTPException(status_code=500) 

404 

405@router.get( 

406 "/verify-email", 

407 response_model=APIResponse[UserLoginResponse], 

408 response_model_exclude_none=True, 

409 summary="Verify email address", 

410 responses=parse_responses({ 

411 200: ("Email verified successfully", UserLoginResponse), 

412 401: ("Invalid or expired token", None), 

413 404: ("User not found", None), 

414 409: ("Email already exists", None) 

415 }, common_responses) 

416) 

417async def verify_email_api( 

418 request: Request, 

419 response: Response, 

420 token: dict = Depends(verify_email_verification_token), 

421 db: AsyncSession = Depends(get_db), 

422 redis_client = Depends(get_redis) 

423): 

424 """Verify email address using token and create session""" 

425 try: 

426 client_ip = get_real_ip(request) 

427 user_agent = request.headers.get("user-agent", "") 

428 

429 result = await verify_email(db, redis_client, token, client_ip, user_agent) 

430 

431 user = result["user"] 

432 session_id = result["session_id"] 

433 access_token = result["access_token"] 

434 

435 user_response = UserResponse( 

436 id=user.id, 

437 first_name=user.first_name, 

438 last_name=user.last_name, 

439 email=user.email, 

440 phone=user.phone 

441 ) 

442 

443 response_data = UserLoginResponse( 

444 access_token=access_token, 

445 expires_at=datetime.now().astimezone() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), 

446 user=user_response 

447 ) 

448 

449 response.set_cookie( 

450 key="session_id", 

451 value=session_id, 

452 httponly=settings.COOKIE_HTTPONLY, 

453 secure=settings.COOKIE_SECURE, 

454 samesite=settings.COOKIE_SAMESITE, 

455 max_age=settings.SESSION_EXPIRE_MINUTES * 60 

456 ) 

457 

458 return APIResponse(code=200, message="Email verified successfully", data=response_data) 

459 except AuthenticationException: 

460 raise HTTPException(status_code=401, detail="Invalid or expired token") 

461 except NotFoundException: 

462 raise HTTPException(status_code=404, detail="User not found") 

463 except ConflictException: 

464 raise HTTPException(status_code=409, detail="Email already exists") 

465 except Exception: 

466 raise HTTPException(status_code=500) 

467 

468@router.post( 

469 "/resend-verification", 

470 response_model=APIResponse[None], 

471 response_model_exclude_none=True, 

472 summary="Resend email verification", 

473 responses=parse_responses({ 

474 200: ("Verification email sent", None), 

475 400: ("Please wait before requesting another verification email", None), 

476 403: ("Account is disabled", None), 

477 404: ("User not registered", None), 

478 503: ("SMTP is disabled", None), 

479 }, common_responses) 

480) 

481async def resend_verification_api( 

482 request: Request, 

483 request_data: ResendVerificationRequest, 

484 db: AsyncSession = Depends(get_db), 

485 mailer: SMTPMailer = Depends(get_mailer), 

486 redis_client = Depends(get_redis), 

487): 

488 """Resend email verification email""" 

489 try: 

490 await resend_verification_email(db, request_data.email, mailer, redis_client) 

491 return APIResponse(code=200, message="Verification email sent") 

492 except ValidationException: 

493 raise HTTPException(status_code=400, detail="Please wait before requesting another verification email") 

494 except AuthenticationException: 

495 raise HTTPException(status_code=403, detail="Account is disabled") 

496 except NotFoundException: 

497 raise HTTPException(status_code=404, detail="User not registered") 

498 except SMTPNotConfiguredException: 

499 raise HTTPException(status_code=503, detail="SMTP is disabled") 

500 except Exception: 

501 raise HTTPException(status_code=500) 

502 

503@router.get( 

504 "/resend-verification/cooldown", 

505 response_model=APIResponse[PasswordResetCooldownResponse], 

506 response_model_exclude_none=True, 

507 summary="Get email verification cooldown status", 

508 responses=parse_responses({ 

509 200: ("Cooldown status retrieved", PasswordResetCooldownResponse), 

510 }, common_responses), 

511) 

512async def get_email_verification_cooldown_api( 

513 email: str = Query(..., description="Email address to check cooldown for"), 

514 redis_client = Depends(get_redis), 

515): 

516 """ 

517 Get remaining cooldown time for email verification email. 

518 Returns 0 if no cooldown is active. 

519 """ 

520 try: 

521 cooldown_key = f"email_verification_cooldown:{email}" 

522 remaining_seconds = await redis_client.ttl(cooldown_key) 

523 

524 # TTL returns -1 if key exists but has no expiry, -2 if key doesn't exist 

525 if remaining_seconds < 0: 

526 remaining_seconds = 0 

527 

528 response_data = PasswordResetCooldownResponse( 

529 cooldown_seconds=remaining_seconds 

530 ) 

531 return APIResponse(code=200, message="Cooldown status retrieved", data=response_data) 

532 except Exception: 

533 raise HTTPException(status_code=500)