|
- import logging
- import smtplib
- import jinja2
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Any, Dict, Optional
- import emails
- from emails.template import JinjaTemplate
- from jose import jwt
- from app.core.config import settings
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- # def render_template(template, kwargs):
- # templateLoader = jinja2.FileSystemLoader(searchpath="./templates")
- # templateEnv = jinja2.Environment(loader=templateLoader)
- # templ = templateEnv.get_template(template)
- # return templ.render(kwargs)
- def send_email(
- email_to: str,
- subject_template: str = "",
- html_template: str = "",
- environment: Dict[str, Any] = {},
- ) -> None:
- assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
- m_m = MIMEMultipart('mixed')
- m_m['From'] = settings.EMAILS_FROM_EMAIL
- m_m['Subject'] = (subject_template)
- m_m['To'] = email_to
- body = JinjaTemplate(html_template).render(**environment)
- m_m.attach(MIMEText(body,'html'))
- server = smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT)
- server.login(settings.SMTP_USER, settings.SMTP_PASSWORD)
- server.sendmail(settings.EMAILS_FROM_EMAIL, email_to, m_m.as_string())
- server.quit()
- def send_test_email(email_to: str) -> None:
- project_name = settings.PROJECT_NAME
- subject = f"{project_name} - Test email"
- with open(Path(settings.EMAIL_TEMPLATES_DIR) / "test_email.html") as f:
- template_str = f.read()
- send_email(
- email_to=email_to,
- subject_template=subject,
- html_template=template_str,
- environment={"project_name": settings.PROJECT_NAME, "email": email_to},
- )
- def send_reset_password_email(email_to: str, email: str, token: str) -> None:
- project_name = settings.PROJECT_NAME
- subject = f"{project_name} - Password recovery for user {email}"
- # with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
- with open("/home/conrad/creator/app/email-templates/reset_password.html") as f:
- template_str = f.read()
- server_host = settings.SERVER_HOST
- link = f"{server_host}/api/v1/reset-password?token={token}"
- send_email(
- email_to=email_to,
- subject_template=subject,
- html_template=template_str,
- environment={
- "project_name": settings.PROJECT_NAME,
- "username": email,
- "email": email_to,
- "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
- "link": link,
- },
- )
- def send_new_account_email(email_to: str, username: str, password: str) -> None:
- project_name = settings.PROJECT_NAME
- subject = f"{project_name} - New account for user {username}"
- # with open(Path(settings.EMAIL_TEMPLATES_DIR) / "new_account.html") as f:
- with open("/home/conrad/creator/app/email-templates/new_account.html") as f:
- template_str = f.read()
- link = settings.SERVER_HOST
- send_email(
- email_to=email_to,
- subject_template=subject,
- html_template=template_str,
- environment={
- "project_name": settings.PROJECT_NAME,
- "username": username,
- "password": password,
- "email": email_to,
- "link": link,
- },
- )
- def generate_password_reset_token(email: str) -> str:
- delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
- now = datetime.utcnow()
- expires = now + delta
- exp = expires.timestamp()
- encoded_jwt = jwt.encode(
- {"exp": exp, "nbf": now, "sub": email}, settings.SECRET_KEY, algorithm="HS256",
- )
- return encoded_jwt
- def verify_password_reset_token(token: str) -> Optional[str]:
- try:
- decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
- return decoded_token["email"]
- except jwt.JWTError:
- return None
- class DFAFilter():
- '''Filter Messages from keywords
- Use DFA to keep algorithm perform constantly
- >>> f = DFAFilter()
- >>> f.add("sexy")
- >>> f.filter("hello sexy baby")
- hello **** baby
- '''
- def __init__(self):
- self.keyword_chains = {}
- self.delimit = '\x00'
- def add(self, keyword):
- # if not isinstance(keyword, unicode):
- # keyword = keyword.decode('utf-8')
- keyword = keyword.lower()
- chars = keyword.strip()
- if not chars:
- return
- level = self.keyword_chains
- for i in range(len(chars)):
- if chars[i] in level:
- level = level[chars[i]]
- else:
- if not isinstance(level, dict):
- break
- for j in range(i, len(chars)):
- level[chars[j]] = {}
- last_level, last_char = level, chars[j]
- level = level[chars[j]]
- last_level[last_char] = {self.delimit: 0}
- break
- if i == len(chars) - 1:
- level[self.delimit] = 0
- def parse(self, path):
- with open(path) as f:
- for keyword in f:
- self.add(keyword)
- def filter(self, message, repl="*"):
- # if not isinstance(message, unicode):
- # message = message.decode('utf-8')
- message = message.lower()
- ret = []
- start = 0
- while start < len(message):
- level = self.keyword_chains
- step_ins = 0
- for char in message[start:]:
- if char in level:
- step_ins += 1
- if self.delimit not in level[char]:
- level = level[char]
- else:
- ret.append(repl * step_ins)
- start += step_ins - 1
- break
- else:
- ret.append(message[start])
- break
- else:
- ret.append(message[start])
- start += 1
- return ''.join(ret)
- class NaiveFilter():
- '''Filter Messages from keywords
- very simple filter implementation
- >>> f = NaiveFilter()
- >>> f.add("sexy")
- >>> f.filter("hello sexy baby")
- hello **** baby
- '''
- def __init__(self):
- self.keywords = set([])
- def parse(self, path):
- for keyword in open(path):
- self.keywords.add(keyword.replace('\n',""))
- def filter(self, message, repl="*"):
- # message = unicode(message).lower()
- for kw in self.keywords:
- message = message.replace(kw, repl)
- return message
|