utils.py 920 B

1234567891011121314151617181920212223242526272829303132333435
  1. from typing import Any
  2. from fastapi import APIRouter, Depends
  3. from pydantic.networks import EmailStr
  4. from app import models, schemas
  5. from app.api import deps
  6. from app.core.celery_app import celery_app
  7. from app.utils import send_test_email
  8. router = APIRouter()
  9. @router.post("/test-celery/", response_model=schemas.Msg, status_code=201)
  10. def test_celery(
  11. msg: schemas.Msg,
  12. current_user: models.User = Depends(deps.get_current_active_superuser),
  13. ) -> Any:
  14. """
  15. Test Celery worker.
  16. """
  17. celery_app.send_task("app.worker.test_celery", args=[msg.msg])
  18. return {"msg": "Word received"}
  19. @router.post("/test-email/", response_model=schemas.Msg, status_code=201)
  20. def test_email(
  21. email_to: EmailStr,
  22. current_user: models.User = Depends(deps.get_current_active_superuser),
  23. ) -> Any:
  24. """
  25. Test emails.
  26. """
  27. send_test_email(email_to=email_to)
  28. return {"msg": "Test email sent"}