utils.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. import logging
  2. import smtplib
  3. import jinja2
  4. from datetime import datetime, timedelta
  5. from pathlib import Path
  6. from typing import Any, Dict, Optional
  7. import emails
  8. from emails.template import JinjaTemplate
  9. from jose import jwt
  10. from app.core.config import settings
  11. from email.mime.multipart import MIMEMultipart
  12. from email.mime.text import MIMEText
  13. # def render_template(template, kwargs):
  14. # templateLoader = jinja2.FileSystemLoader(searchpath="./templates")
  15. # templateEnv = jinja2.Environment(loader=templateLoader)
  16. # templ = templateEnv.get_template(template)
  17. # return templ.render(kwargs)
  18. def send_email(
  19. email_to: str,
  20. subject_template: str = "",
  21. html_template: str = "",
  22. environment: Dict[str, Any] = {},
  23. ) -> None:
  24. assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
  25. m_m = MIMEMultipart('mixed')
  26. m_m['From'] = settings.EMAILS_FROM_EMAIL
  27. m_m['Subject'] = (subject_template)
  28. m_m['To'] = email_to
  29. body = JinjaTemplate(html_template).render(**environment)
  30. m_m.attach(MIMEText(body,'html'))
  31. server = smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT)
  32. server.login(settings.SMTP_USER, settings.SMTP_PASSWORD)
  33. server.sendmail(settings.EMAILS_FROM_EMAIL, email_to, m_m.as_string())
  34. server.quit()
  35. def send_test_email(email_to: str) -> None:
  36. project_name = settings.PROJECT_NAME
  37. subject = f"{project_name} - Test email"
  38. with open(Path(settings.EMAIL_TEMPLATES_DIR) / "test_email.html") as f:
  39. template_str = f.read()
  40. send_email(
  41. email_to=email_to,
  42. subject_template=subject,
  43. html_template=template_str,
  44. environment={"project_name": settings.PROJECT_NAME, "email": email_to},
  45. )
  46. def send_reset_password_email(email_to: str, email: str, token: str) -> None:
  47. project_name = settings.PROJECT_NAME
  48. subject = f"{project_name} - Password recovery for user {email}"
  49. # with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
  50. with open("/home/conrad/creator/app/email-templates/reset_password.html") as f:
  51. template_str = f.read()
  52. server_host = settings.SERVER_HOST
  53. link = f"{server_host}/api/v1/reset-password?token={token}"
  54. send_email(
  55. email_to=email_to,
  56. subject_template=subject,
  57. html_template=template_str,
  58. environment={
  59. "project_name": settings.PROJECT_NAME,
  60. "username": email,
  61. "email": email_to,
  62. "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
  63. "link": link,
  64. },
  65. )
  66. def send_new_account_email(email_to: str, username: str, password: str) -> None:
  67. project_name = settings.PROJECT_NAME
  68. subject = f"{project_name} - New account for user {username}"
  69. # with open(Path(settings.EMAIL_TEMPLATES_DIR) / "new_account.html") as f:
  70. with open("/home/conrad/creator/app/email-templates/new_account.html") as f:
  71. template_str = f.read()
  72. link = settings.SERVER_HOST
  73. send_email(
  74. email_to=email_to,
  75. subject_template=subject,
  76. html_template=template_str,
  77. environment={
  78. "project_name": settings.PROJECT_NAME,
  79. "username": username,
  80. "password": password,
  81. "email": email_to,
  82. "link": link,
  83. },
  84. )
  85. def generate_password_reset_token(email: str) -> str:
  86. delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
  87. now = datetime.utcnow()
  88. expires = now + delta
  89. exp = expires.timestamp()
  90. encoded_jwt = jwt.encode(
  91. {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
  92. )
  93. return encoded_jwt
  94. def verify_password_reset_token(token: str) -> Optional[str]:
  95. try:
  96. decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
  97. return decoded_token["email"]
  98. except jwt.JWTError:
  99. return None
  100. class DFAFilter():
  101. '''Filter Messages from keywords
  102. Use DFA to keep algorithm perform constantly
  103. >>> f = DFAFilter()
  104. >>> f.add("sexy")
  105. >>> f.filter("hello sexy baby")
  106. hello **** baby
  107. '''
  108. def __init__(self):
  109. self.keyword_chains = {}
  110. self.delimit = '\x00'
  111. def add(self, keyword):
  112. # if not isinstance(keyword, unicode):
  113. # keyword = keyword.decode('utf-8')
  114. keyword = keyword.lower()
  115. chars = keyword.strip()
  116. if not chars:
  117. return
  118. level = self.keyword_chains
  119. for i in range(len(chars)):
  120. if chars[i] in level:
  121. level = level[chars[i]]
  122. else:
  123. if not isinstance(level, dict):
  124. break
  125. for j in range(i, len(chars)):
  126. level[chars[j]] = {}
  127. last_level, last_char = level, chars[j]
  128. level = level[chars[j]]
  129. last_level[last_char] = {self.delimit: 0}
  130. break
  131. if i == len(chars) - 1:
  132. level[self.delimit] = 0
  133. def parse(self, path):
  134. with open(path) as f:
  135. for keyword in f:
  136. self.add(keyword)
  137. def filter(self, message, repl="*"):
  138. # if not isinstance(message, unicode):
  139. # message = message.decode('utf-8')
  140. message = message.lower()
  141. ret = []
  142. start = 0
  143. while start < len(message):
  144. level = self.keyword_chains
  145. step_ins = 0
  146. for char in message[start:]:
  147. if char in level:
  148. step_ins += 1
  149. if self.delimit not in level[char]:
  150. level = level[char]
  151. else:
  152. ret.append(repl * step_ins)
  153. start += step_ins - 1
  154. break
  155. else:
  156. ret.append(message[start])
  157. break
  158. else:
  159. ret.append(message[start])
  160. start += 1
  161. return ''.join(ret)
  162. class NaiveFilter():
  163. '''Filter Messages from keywords
  164. very simple filter implementation
  165. >>> f = NaiveFilter()
  166. >>> f.add("sexy")
  167. >>> f.filter("hello sexy baby")
  168. hello **** baby
  169. '''
  170. def __init__(self):
  171. self.keywords = set([])
  172. def parse(self, path):
  173. for keyword in open(path):
  174. self.keywords.add(keyword.replace('\n',""))
  175. def filter(self, message, repl="*"):
  176. # message = unicode(message).lower()
  177. for kw in self.keywords:
  178. message = message.replace(kw, repl)
  179. return message