"""Email sending helpers, password-reset tokens and simple keyword filters."""
import logging
import smtplib
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Any, Dict, Optional

import emails
import jinja2
from emails.template import JinjaTemplate
from jose import jwt

from app.core.config import settings

# def render_template(template, kwargs):
#     templateLoader = jinja2.FileSystemLoader(searchpath="./templates")
#     templateEnv = jinja2.Environment(loader=templateLoader)
#     templ = templateEnv.get_template(template)
#     return templ.render(kwargs)


def send_email(
    email_to: str,
    subject_template: str = "",
    html_template: str = "",
    environment: Optional[Dict[str, Any]] = None,
) -> None:
    """Render ``html_template`` and send it as an HTML email over SMTP-over-SSL.

    Args:
        email_to: Recipient address.
        subject_template: Used verbatim as the ``Subject`` header (it is not
            rendered through Jinja).
        html_template: Jinja template source for the HTML body.
        environment: Variables passed to the template; ``None`` means no
            variables.

    Raises:
        AssertionError: If ``settings.EMAILS_ENABLED`` is falsy.
        smtplib.SMTPException: On login or delivery failures.
    """
    assert settings.EMAILS_ENABLED, "no provided configuration for email variables"
    # A fresh dict per call avoids sharing one mutable default between calls.
    context = {} if environment is None else environment

    message = MIMEMultipart('mixed')
    message['From'] = settings.EMAILS_FROM_EMAIL
    message['Subject'] = subject_template
    message['To'] = email_to
    body = JinjaTemplate(html_template).render(**context)
    message.attach(MIMEText(body, 'html'))

    # The context manager issues QUIT and closes the socket even when login
    # or sendmail raises, so the connection is never leaked.
    with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as server:
        server.login(settings.SMTP_USER, settings.SMTP_PASSWORD)
        server.sendmail(settings.EMAILS_FROM_EMAIL, email_to, message.as_string())


def send_test_email(email_to: str) -> None:
    """Send the ``test_email.html`` template to ``email_to``."""
    project_name = settings.PROJECT_NAME
    subject = f"{project_name} - Test email"
    template_path = Path(settings.EMAIL_TEMPLATES_DIR) / "test_email.html"
    with open(template_path) as f:
        template_str = f.read()
    send_email(
        email_to=email_to,
        subject_template=subject,
        html_template=template_str,
        environment={"project_name": settings.PROJECT_NAME, "email": email_to},
    )


def send_reset_password_email(email_to: str, email: str, token: str) -> None:
    """Send the password-recovery email containing a reset link for ``token``.

    Args:
        email_to: Recipient address.
        email: Account identifier shown in the subject and passed to the
            template as ``username``.
        token: Reset token appended to the link as the ``token`` query
            parameter.
    """
    project_name = settings.PROJECT_NAME
    subject = f"{project_name} - Password recovery for user {email}"
    # NOTE(review): absolute, machine-specific path. send_test_email reads from
    # settings.EMAIL_TEMPLATES_DIR instead; switch once that directory is
    # confirmed to contain reset_password.html.
    with open("/home/conrad/creator/app/email-templates/reset_password.html") as f:
        template_str = f.read()
    server_host = settings.SERVER_HOST
    link = f"{server_host}/api/v1/reset-password?token={token}"
    send_email(
        email_to=email_to,
        subject_template=subject,
        html_template=template_str,
        environment={
            "project_name": settings.PROJECT_NAME,
            "username": email,
            "email": email_to,
            "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
            "link": link,
        },
    )


def send_new_account_email(email_to: str, username: str, password: str) -> None:
    """Send the new-account email with the user's credentials.

    Args:
        email_to: Recipient address.
        username: Shown in the subject and passed to the template.
        password: Passed to the template as ``password``.
    """
    project_name = settings.PROJECT_NAME
    subject = f"{project_name} - New account for user {username}"
    # NOTE(review): absolute, machine-specific path. send_test_email reads from
    # settings.EMAIL_TEMPLATES_DIR instead; switch once that directory is
    # confirmed to contain new_account.html.
    with open("/home/conrad/creator/app/email-templates/new_account.html") as f:
        template_str = f.read()
    link = settings.SERVER_HOST
    send_email(
        email_to=email_to,
        subject_template=subject,
        html_template=template_str,
        environment={
            "project_name": settings.PROJECT_NAME,
            "username": username,
            "password": password,
            "email": email_to,
            "link": link,
        },
    )


def generate_password_reset_token(email: str) -> str:
    """Return an HS256-signed JWT whose ``sub`` claim is ``email``.

    The token carries ``nbf`` (now) and ``exp`` (now plus
    ``settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS``).
    """
    delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
    # Timezone-aware "now": calling .timestamp() on a naive utcnow() value
    # interprets it as local time and skews ``exp`` on non-UTC hosts.
    now = datetime.now(timezone.utc)
    expires = now + delta
    exp = expires.timestamp()
    encoded_jwt = jwt.encode(
        {"exp": exp, "nbf": now, "sub": email},
        settings.SECRET_KEY,
        algorithm="HS256",
    )
    return encoded_jwt


def verify_password_reset_token(token: str) -> Optional[str]:
    """Return the email stored in a reset token, or ``None`` if it is unusable.

    ``None`` is returned when decoding raises ``jwt.JWTError`` or when the
    token has no ``sub`` claim.
    """
    try:
        decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
    except jwt.JWTError:
        return None
    # generate_password_reset_token stores the address under "sub"; looking up
    # "email" raised KeyError for every valid token.
    return decoded_token.get("sub")


class DFAFilter():
    '''Filter messages by keyword using a character trie (DFA).

    Matching is case-insensitive and the returned message is lowercased.

    >>> f = DFAFilter()
    >>> f.add("sexy")
    >>> f.filter("hello sexy baby")
    'hello **** baby'
    '''

    def __init__(self):
        # Nested dicts keyed by character; a node that ends a keyword also
        # holds the ``delimit`` key.
        self.keyword_chains = {}
        self.delimit = '\x00'

    def add(self, keyword: str) -> None:
        '''Insert ``keyword`` (lowercased and stripped) into the trie.

        Blank keywords are ignored.
        '''
        chars = keyword.lower().strip()
        if not chars:
            return
        node = self.keyword_chains
        for char in chars:
            node = node.setdefault(char, {})
        node[self.delimit] = 0

    def parse(self, path) -> None:
        '''Add every line of the file at ``path`` as a keyword.'''
        with open(path) as f:
            for keyword in f:
                self.add(keyword)

    def filter(self, message: str, repl: str = "*") -> str:
        '''Return lowercased ``message`` with each matched keyword masked.

        Every character of a match is replaced by ``repl``. At each position
        the shortest keyword starting there wins.
        '''
        message = message.lower()
        length = len(message)
        ret = []
        start = 0
        while start < length:
            node = self.keyword_chains
            matched = 0
            # Walk the trie by index rather than re-slicing the message.
            for pos in range(start, length):
                child = node.get(message[pos])
                if child is None:
                    break
                if self.delimit in child:
                    matched = pos - start + 1
                    break
                node = child
            if matched:
                ret.append(repl * matched)
                start += matched
            else:
                ret.append(message[start])
                start += 1
        return ''.join(ret)


class NaiveFilter():
    '''Filter messages by plain substring replacement.

    Each keyword occurrence is replaced by a single ``repl`` (not one per
    character); matching is case-sensitive.

    >>> f = NaiveFilter()
    >>> f.add("sexy")
    >>> f.filter("hello sexy baby")
    'hello * baby'
    '''

    def __init__(self):
        self.keywords = set()

    def add(self, keyword: str) -> None:
        '''Register ``keyword``; empty strings are ignored.'''
        # An empty keyword would make str.replace insert ``repl`` between
        # every character of the message.
        if keyword:
            self.keywords.add(keyword)

    def parse(self, path) -> None:
        '''Add every non-blank line of the file at ``path`` as a keyword.'''
        with open(path) as f:
            for keyword in f:
                self.add(keyword.replace('\n', ""))

    def filter(self, message: str, repl: str = "*") -> str:
        '''Return ``message`` with every keyword occurrence replaced by ``repl``.'''
        for kw in self.keywords:
            message = message.replace(kw, repl)
        return message