# formha/main.py
#
# 230 lines
# 8.2 KiB
# Python

import html
import os
from datetime import datetime, timedelta
from threading import Thread

from flask import Flask, render_template, redirect, url_for, flash, current_app
from flask import jsonify, make_response
from flask_bcrypt import Bcrypt
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity, get_jwt
from flask_mail import Message, Mail

from forms_py.cls_form_contact import ContactForm
from forms_py.cls_form_login import LogIn
from forms_py.cls_db import DBContact
from forms_py.cls_db_usr import DBForma
from forms_py.functions import db_conf_obj, generar_contrasena, hash_password, v
from forms_py.cls_recover_pswd import RecoverPswd
# Application object and the bcrypt helper used to verify password hashes.
app = Flask(__name__)
bcrypt = Bcrypt(app)

# Outgoing-mail credentials are read from the environment (None when unset).
email_sender = os.getenv("email_sender")
email_pswd = os.getenv("pswd_formha")
# Recipients of the notification sent for each contact-form submission.
lst_email_to = ["davidix1991@gmail.com", "davicho1991@live.com"]

# --- JWT configuration ---
# NOTE(review): the fallback signing key is only acceptable for local
# development; define JWT_SECRET_KEY in the environment for real deployments.
app.config['JWT_SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'super-secret-fallback-key')
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(minutes=30)  # access tokens expire after 30 minutes
app.config['JWT_COOKIE_SECURE'] = True  # must stay True in production
app.config['JWT_COOKIE_CSRF_PROTECT'] = True  # recommended for security
app.config['JWT_TOKEN_LOCATION'] = ['cookies']

# --- Flask-Mail configuration ---
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 465
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = email_sender  # sender address from the environment
app.config['MAIL_PASSWORD'] = email_pswd  # sender password from the environment

# Key required for CSRF protection and flash messages. It is read from the
# SECRET_KEY environment variable so the secret no longer has to live in
# source control; the previous literal is kept only as a fallback so existing
# deployments keep working.
# NOTE(review): rotate this value and drop the fallback once SECRET_KEY is set
# everywhere.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'FoRmHä$2025')

mail = Mail(app)
jwt = JWTManager(app)

# Both data-access objects are built from the same "forma_db" configuration.
jsonDbContact = db_conf_obj("forma_db")
dbContact = DBContact(jsonDbContact)
dbUsers = DBForma(jsonDbContact)
def send_async_email(app, msg):
    """Send ``msg`` with the module-level ``mail`` object inside ``app``'s context.

    The views in this module use this function as the target of a
    ``threading.Thread`` so the request does not wait for the mail to be sent.

    Args:
        app: Application object whose ``app_context()`` is entered while sending.
        msg: Message passed to ``mail.send``.
    """
    context = app.app_context()
    with context:
        mail.send(msg)
@app.route('/')
def home():
    """Render the template registered under ``v['home']``."""
    template_name = v['home']
    return render_template(template_name, active_page='home')
@app.route('/about-us')
def about_us():
    """Render the template registered under ``v['about-us']``."""
    template_name = v['about-us']
    return render_template(template_name, active_page='about_us')
@app.route('/solutions')
def solutions():
    """Render the template registered under ``v['solutions']``."""
    template_name = v['solutions']
    return render_template(template_name, active_page='solutions')
@app.route('/methodology')
def methodology():
    """Render the template registered under ``v['methodology']``."""
    template_name = v['methodology']
    return render_template(template_name, active_page='methodology')
@app.route("/contact", methods=['GET', 'POST'])
def contact():
    """Render the contact form and process valid submissions.

    When the submitted form validates, the submission is stored through
    ``dbContact.carga_contact``, a notification e-mail is sent to
    ``lst_email_to`` from a background thread, and the client is redirected
    back to this page. In every other case the form template is rendered.

    Returns:
        A redirect to this view after a successful submission, otherwise the
        rendered contact template.
    """
    form = ContactForm()
    if not form.validate_on_submit():
        return render_template(v['contact'], form=form, active_page='contact')

    # Take one timestamp so the stored date and hour can never disagree
    # (two separate now() calls could straddle a minute or day boundary).
    now = datetime.now()
    data = (
        now.strftime("%d/%m/%Y"),
        now.strftime("%H:%M"),
        form.nombre.data,
        form.apellido.data,
        form.email.data,
        form.num_tel.data,
        form.size_co.data,
        form.rol_contacto.data,
        form.industry_type.data,
        form.tipo_req.data,
    )
    dbContact.carga_contact(data)
    # Queue the confirmation only after the record has been stored, so a
    # failed insert does not leave a misleading success message behind.
    flash('¡Gracias por contactarnos! Te responderemos pronto.', 'success')

    msg = Message(
        "Subject, una persona busca asesoria",
        sender=email_sender,
        recipients=lst_email_to,
    )
    # Form values are user-controlled: escape them before interpolating into
    # the HTML body so markup typed into the form is shown as text.
    escaped_fields = [
        html.escape(str(value))
        for value in (
            form.nombre.data,
            form.apellido.data,
            form.email.data,
            form.num_tel.data,
            form.tipo_req.data,
        )
    ]
    msg.html = """
    <h1>Nuevo contacto recibido</h1>
    <p><strong>Nombre:</strong> {} {}</p>
    <p><strong>Email:</strong> {}</p>
    <p><strong>Teléfono:</strong> {}</p>
    <p><strong>Mensaje:</strong> {}</p>
    """.format(*escaped_fields)

    # Send in the background; the thread needs the real application object
    # because the current_app proxy is only bound inside this request.
    thr = Thread(
        target=send_async_email,
        args=(current_app._get_current_object(), msg),
    )
    thr.start()
    return redirect(url_for('contact'))
@app.route("/login", methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate valid submissions.

    The lower-cased e-mail is looked up with ``dbUsers.login``; when a stored
    hash is found and matches the submitted password, a JWT access token is
    created for the user's id and stored in the ``access_token_cookie``
    cookie before redirecting to ``usr_home``.

    Returns:
        A redirect to ``usr_home`` (with the token cookie) on success, a
        redirect back to this view when the e-mail is unknown, otherwise the
        rendered login template.
    """
    form = LogIn()
    if form.validate_on_submit():
        f_email = f"{form.email.data}".lower()
        f_pswd = form.password.data

        stored_row = dbUsers.login(f_email)
        if stored_row is None:
            flash('Usuario no registrado en la db', 'error')
            return redirect(url_for('login'))

        stored_hash = stored_row[0]
        if bcrypt.check_password_hash(stored_hash, f_pswd):
            id_user = dbUsers.get_id(f_email)[0]
            # The JWT subject must be a string: recent Flask-JWT-Extended /
            # PyJWT releases reject tokens whose identity is e.g. an int.
            access_token = create_access_token(identity=str(id_user))
            response = make_response(redirect(url_for('usr_home')))
            response.set_cookie(
                'access_token_cookie',
                access_token,
                httponly=True,
                secure=True,
                samesite='Lax',
            )
            flash('Inicio de sesión exitoso', 'success')
            return response

        flash('Credenciales incorrectas', 'error')
    return render_template(v['login'], form=form, active_page='login')
@app.route("/recover-pswd", methods=['GET', 'POST'])
def recover_pswd():
    """Render the password-recovery form and issue a temporary password.

    For a valid submission whose e-mail is found by ``dbUsers.reset_pswd``,
    a temporary password is generated, its hash is stored with
    ``dbUsers.update_pswd``, and the plain temporary password is e-mailed to
    the account address from a background thread.

    Returns:
        A redirect to ``login`` after a temporary password was issued,
        otherwise the rendered recovery template.
    """
    form = RecoverPswd()
    if not form.validate_on_submit():
        return render_template(v['recover_pswd'], form=form, active_page='login')

    f_email = f"{form.email.data}".lower()
    account_row = dbUsers.reset_pswd(f_email)
    if account_row is None:
        flash('Email no válido', 'error')
        return render_template(v['recover_pswd'], form=form, active_page='login')

    account_email = account_row[0]
    new_tmp_pswd = generar_contrasena()
    hashed_new_pswd = hash_password(new_tmp_pswd)

    msg = Message(
        "Subject: Recuperación de contraseña",
        sender=email_sender,
        recipients=[account_email],  # recipients must be a list
    )
    # Escape the password: characters such as '<' or '&' would otherwise be
    # interpreted as markup and the user would read a different password.
    msg.html = """
    <h1>Nueva contraseña temporal:</h1>
    <p><strong>Contraseña:</strong> {}</p>
    <p><strong>Una vez iniciada tu sesión debes de cambiar la contraseña a una nueva que puedas recordar</strong></p>
    """.format(html.escape(str(new_tmp_pswd)))

    dbUsers.update_pswd(pswd=hashed_new_pswd, email=account_email)

    # Send in the background; the thread needs the real application object
    # because the current_app proxy is only bound inside this request.
    thr = Thread(
        target=send_async_email,
        args=(current_app._get_current_object(), msg),
    )
    thr.start()
    flash("Se ha enviado una contraseña temporal a tu correo electrónico", "success")
    # Redirect instead of rendering.
    return redirect(url_for('login'))
@app.route('/user/home')
@jwt_required()  # this route requires a valid JWT
def usr_home():
    """Render the logged-in user's home page.

    The template receives ``current_user`` (the identity stored in the token,
    which ``login`` sets to the user's id) and ``token_data`` (the full
    decoded token).
    """
    context = {
        'current_user': get_jwt_identity(),
        'token_data': get_jwt(),
    }
    return render_template(v['usr_home'], **context)
# -------------------------------------------------------------
# JWT error handling
@jwt.expired_token_loader
def handle_expired_token(jwt_header, jwt_payload):
    """Flash a session-expired warning and redirect to the login page.

    Registered as the ``expired_token_loader`` callback; both arguments are
    accepted to match the callback signature and are not used.
    """
    login_url = url_for('login')
    flash('Tu sesión ha expirado. Por favor inicia sesión nuevamente', 'warning')
    return redirect(login_url)
@jwt.unauthorized_loader
def handle_unauthorized_error(reason):
    """Flash a login-required error containing ``reason`` and redirect to login.

    Registered as the ``unauthorized_loader`` callback.
    """
    login_url = url_for('login')
    message = f'Debes iniciar sesión para acceder a esta página: {reason}'
    flash(message, 'error')
    return redirect(login_url)
@jwt.invalid_token_loader
def handle_invalid_token_error(reason):
    """Flash an invalid-session error containing ``reason`` and redirect to login.

    Registered as the ``invalid_token_loader`` callback.
    """
    login_url = url_for('login')
    message = f'Sesión inválida: {reason}'
    flash(message, 'error')
    return redirect(login_url)
@app.route("/logout")
def logout():
    """Delete the access-token cookie and redirect to the login page.

    A success message is flashed so the login page can confirm the logout.
    """
    login_redirect = redirect(url_for('login'))
    response = make_response(login_redirect)
    response.delete_cookie('access_token_cookie')
    flash('Sesión cerrada correctamente', 'success')
    return response
# -------------------------------------------------------------
if __name__ == '__main__':
    # The interactive debugger lets anyone who can reach the server execute
    # arbitrary code, and this server binds to every interface (0.0.0.0).
    # Debug mode is therefore opt-in: set FLASK_DEBUG=1 for local development.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=8089)