# formha/main.py
# 1330 lines · 51 KiB · Python

from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity, get_jwt, decode_token, verify_jwt_in_request, set_access_cookies, get_csrf_token
from flask_mail import Message, Mail
from threading import Thread
from flask import Flask, render_template, redirect, url_for, flash, current_app, request, jsonify, make_response, send_from_directory, abort
from forms_py.cls_form_contact import ContactForm
from forms_py.cls_form_add_usr import AddUser
from forms_py.cls_form_login import LogIn
from forms_py.cls_db import DBContact
from forms_py.cls_db_usr import DBForma
from forms_py.cls_change_pswd import ChangePwsd
from forms_py.cls_form_carousel import Carousel
from forms_py.functions import db_conf_obj, generar_contrasena, hash_password, v, l_flash_msj, saludo_hr, cur_date, min_read_pst, get_date_n_time, getRandomId, hex_to_rgb, rgba_to_string
from forms_py.cls_recover_pswd import RecoverPswd
import os
from datetime import datetime, timedelta, timezone
from flask_bcrypt import Bcrypt
# from flask_wtf.csrf import CSRFProtect
from werkzeug.utils import secure_filename
import re
import json
from bs4 import BeautifulSoup
from functools import wraps
from flask_caching import Cache
import platform
# Resolve the upload and cache directories for the current host.
#
# WSL also reports platform.system() == "Linux", so it is told apart by the
# "microsoft" marker in its kernel release string; there the project runs
# from the working directory.  A regular Linux host uses the deployed
# /var/www tree.  Any other OS used to leave both names as None (which hands
# the filesystem cache a None directory); it now falls back to the
# working-directory layout as well.
_running_on_wsl = 'microsoft' in platform.uname().release.lower()
if platform.system() == "Linux" and not _running_on_wsl:
    folder_upload = "/var/www/formha/static/uploads/"
    folder_cache = "/var/www/formha/cache/"
else:
    folder_upload = os.path.join(os.getcwd(), "static", "uploads")
    folder_cache = os.path.join(os.getcwd(), "cache")
# Flask application object; the routes and extensions below attach to it.
app = Flask(__name__)
# Filesystem-backed cache whose files live under folder_cache.
cache = Cache(app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': folder_cache})
# cache = Cache(app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp/flask-formha'})
# Bcrypt helper, used by the views to check password hashes.
bcrypt = Bcrypt(app)
# csrf = CSRFProtect(app)
# Deployment settings read from environment variables (None when unset).
url_login = os.getenv("login_url")
email_sender = os.getenv("email_sender")
email_pswd = os.getenv("pswd_formha")
# lst_email_to = ["davidix1991@gmail.com", "davicho1991@live.com"]
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
# INICIO CONFIGURACIÓN JWT
def _env_flag(name: str, default: bool = False) -> bool:
    """Return True when environment variable *name* holds a truthy value.

    Accepted truthy spellings are 1/true/yes/on (case-insensitive).  An unset
    variable yields *default*.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


# JWT configuration (flask-jwt-extended).
# The signing key comes from the environment.
app.config["JWT_SECRET_KEY"] = os.getenv("jwt_secret_key")
# Access tokens expire after 1.5 hours.
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1.5)
# Cookie hardening used to be flipped by hand between localhost (False) and
# production (True).  It is now read from the environment so a deployment
# cannot forget it; both flags still default to False when unset.
app.config['JWT_COOKIE_SECURE'] = _env_flag("jwt_cookie_secure")
app.config['JWT_COOKIE_CSRF_PROTECT'] = _env_flag("jwt_cookie_csrf_protect")
# Tokens are read from cookies only, under a fixed cookie name.
app.config['JWT_TOKEN_LOCATION'] = ['cookies']
app.config['JWT_ACCESS_COOKIE_NAME'] = 'access_token_cookie'
app.config['JWT_COOKIE_SAMESITE'] = 'Lax'
jwt = JWTManager(app)
# FINAL CONFIGURACIÓN JWT
# /////////////////////////////////////////////////////////////////////////////////////////
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
# INICIO FLASK EMAIL
# Flask-Mail configuration: Gmail SMTP over SSL, with credentials taken from
# the environment-backed module variables.
app.config.update(
    MAIL_SERVER='smtp.gmail.com',
    MAIL_PORT=465,
    MAIL_USE_SSL=True,
    MAIL_USERNAME=email_sender,
    MAIL_PASSWORD=email_pswd,
    SECRET_KEY=os.getenv("email_secret_key"),
)
mail = Mail(app)
def send_async_email(app, msg):
    """Send *msg* through the module-level ``mail`` extension.

    The send happens inside an application context pushed from *app*; the
    views in this file use this function as a ``Thread`` target.
    """
    context = app.app_context()
    with context:
        mail.send(msg)
# FINAL FLASK EMAIL
# /////////////////////////////////////////////////////////////////////////////////////////
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
# CONFIGURACIÓN PARA GUARDAR IMÁGENES EN EL BACKEND
# Configuración para guardar imágenes
# UPLOAD_FOLDER = 'uploads/' # Asegúrate de crear esta carpeta
# ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
# app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# app.add_url_rule('/uploads/<filename>', endpoint='uploaded_file', view_func=send_from_directory(UPLOAD_FOLDER))
# /////////////////////////////////////////////////////////////////////////////////////////
def cur_timestamp():
    """Return the current time as whole seconds since the Unix epoch."""
    now_utc = datetime.now(tz=timezone.utc)
    return int(now_utc.timestamp())
# Configuration object returned by db_conf_obj for "forma_db"
# (presumably the database connection settings -- confirm in forms_py.functions).
jsonDbContact = db_conf_obj("forma_db")
# Data-access helpers used by the views; both are built from the same object.
dbContact = DBContact(jsonDbContact)
dbUsers = DBForma(jsonDbContact)
@app.route('/favicon.ico')
def favicon():
    """Serve ``favicon.ico`` from the ``y_img/favicon`` static sub-folder."""
    icon_dir = os.path.join(app.static_folder, 'y_img/favicon')
    return send_from_directory(
        icon_dir,
        'favicon.ico',
        mimetype='image/vnd.microsoft.icon',
    )
def lst_email_to() -> list:
    """Return the e-mail addresses of users flagged with ``is_contact_noti``."""
    rows = dbUsers.get_all_data('SELECT email FROM users WHERE is_contact_noti = true;', ())
    addresses = []
    for row in rows:
        # Each row carries a single column: the e-mail address.
        addresses.append(row[0])
    return addresses
# Decorator for protected routes: covers the case where a user is deleted while their session is still active.
def validate_user_exists(f):
    """Decorator that rejects requests whose JWT identity is no longer in ``users``.

    When the lookup finds no row, an error message is flashed and the request
    is redirected to the login view; otherwise the wrapped view runs
    unchanged.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        identity = get_jwt_identity()
        row = dbUsers.get_data("SELECT id FROM users WHERE id = %s;", (identity,))
        if row is None:
            flash(l_flash_msj('Error', 'El usuario no existe en la base de datos.', 'error'))
            return redirect(url_for('login'))
        return f(*args, **kwargs)

    return guarded_view
# ##################################################################
#
# /$$$$$$ /$$$$$$$$ /$$ /$$ /$$$$$$$$ /$$$$$$$ /$$$$$$ /$$
# /$$__ $$| $$_____/| $$$ | $$| $$_____/| $$__ $$ /$$__ $$| $$
# | $$ \__/| $$ | $$$$| $$| $$ | $$ \ $$| $$ \ $$| $$
# | $$ /$$$$| $$$$$ | $$ $$ $$| $$$$$ | $$$$$$$/| $$$$$$$$| $$
# | $$|_ $$| $$__/ | $$ $$$$| $$__/ | $$__ $$| $$__ $$| $$
# | $$ \ $$| $$ | $$\ $$$| $$ | $$ \ $$| $$ | $$| $$
# | $$$$$$/| $$$$$$$$| $$ \ $$| $$$$$$$$| $$ | $$| $$ | $$| $$$$$$$$
# \______/ |________/|__/ \__/|________/|__/ |__/|__/ |__/|________/
#
# Font Name: Big Money-ne | https://patorjk.com/software/taag/#p=testall&f=Graffiti&t=USER
# ##################################################################
@app.errorhandler(404)
def page_not_found(e):
    """Handle 404s by flashing an error notice and redirecting to the home view."""
    flash(l_flash_msj("Error Sección", "La sección a la que quieres entrar no existe", "error"))
    return redirect(url_for('home'))
@app.route('/')
@cache.cached(timeout=3600)  # 1 hour
def home():
    """Render the landing page with every carousel row, newest id first."""
    carousel_sql = """
SELECT
id,
img_name,
bg_color,
txt_color,
txt,
TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
url,
is_new_tab
FROM
carousel
ORDER BY
id DESC;
"""
    slides = dbUsers.get_all_data(carousel_sql, ())
    return render_template(v['home'], active_page='home', data=slides)
@app.route('/about-us')
@cache.cached(timeout=43200)  # 12 hours
def about_us():
    """Render the "about us" page."""
    template_name = v['about-us']
    return render_template(template_name, active_page='about_us')
@app.route('/solutions')
@cache.cached(timeout=43200)  # 12 hours
def solutions():
    """Render the "solutions" page."""
    template_name = v['solutions']
    return render_template(template_name, active_page='solutions')
@app.route('/methodology')
@cache.cached(timeout=43200)  # 12 hours
def methodology():
    """Render the "methodology" page."""
    template_name = v['methodology']
    return render_template(template_name, active_page='methodology')
@app.route('/blog')
def blog():
    """Render the public blog index.

    One row per post, newest first: author, creation/update stamps in the
    America/Mexico_City zone, a 180-character preview, the first image and
    the read-time estimates computed in SQL.
    """
    posts_sql = r"""
SELECT
p.id,
u.nombre || ' ' || u.apellido AS autor,
TO_CHAR(p.created_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_creation,
TO_CHAR(p.updated_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_updated,
p.title,
SUBSTRING(p.body_no_img FROM 1 FOR 180) AS preview,
(SELECT value FROM jsonb_array_elements_text(p.lista_imagenes) LIMIT 1) AS first_img,
ROUND(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375, 1) AS read_time,
GREATEST(1, CEIL(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375)) AS read_time_min
FROM posts p
JOIN users u ON p.id_usr = u.id
ORDER BY p.created_at DESC;
"""
    all_posts = dbUsers.get_all_data_dict(posts_sql)
    return render_template(v['blog']['all_posts'], active_page='blog', posts=all_posts)
# @app.route('/blog/api/posts')
# @cache.cached(timeout=3600) # 1 hora
# def api_posts():
# q = r"""
# SELECT
# p.id,
# u.nombre || ' ' || u.apellido AS autor,
# TO_CHAR(p.created_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_creation,
# TO_CHAR(p.updated_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_updated,
# p.title,
# SUBSTRING(p.body_no_img FROM 1 FOR 180) AS preview,
# (SELECT value FROM jsonb_array_elements_text(p.lista_imagenes) LIMIT 1) AS first_img,
# ROUND(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375, 1) AS read_time,
# GREATEST(1, CEIL(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375)) AS read_time_min
# FROM posts p
# JOIN users u ON p.id_usr = u.id
# ORDER BY p.created_at DESC;
# """
# data = dbUsers.get_all_data_dict(q)
# return jsonify(data)
@app.route('/blog/<int:post_id>')
@app.route('/blog/<int:post_id>/src/<string:source_name>')
def blog_post(post_id, source_name=None):
    """Render a single public post and record where the visit came from.

    Args:
        post_id: Primary key of the post (from the URL).
        source_name: Optional referral tag from the ``/src/<name>`` URL form.

    Returns:
        The rendered post page, or a redirect to the blog index (with a
        flashed warning) when the post does not exist.

    The view used to be wrapped in ``@cache.cached``.  That cached the whole
    response, so on a cache hit the ``visited_from`` INSERT never ran and a
    "not found" redirect was served from cache without its flash message.
    Only the database row is cached now (still for 30 minutes); the side
    effects run on every request.
    """
    cache_key = f"blog_post_row:{post_id}"
    data = cache.get(cache_key)
    if data is None:
        q = r"""
SELECT
u.nombre,
u.apellido,
TO_CHAR(p.created_at, 'DD/MM/YYYY') AS fecha_creada,
TO_CHAR(p.updated_at, 'DD/MM/YYYY') AS fecha_updated,
GREATEST(1, CEIL(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375)) AS read_time_min,
p.title,
p.body,
COUNT(pv.viewed) AS total_views
FROM
posts p
INNER JOIN
users u ON p.id_usr = u.id
LEFT JOIN
posts_visited pv ON pv.id_post = p.id
WHERE
p.id = %s
GROUP BY
u.nombre, u.apellido, p.created_at, p.updated_at, p.title, p.body, p.body_no_img;
"""
        data = dbUsers.get_data(q, (post_id,))
        if data is None:
            # Unknown or deleted post: warn and send the visitor to the index.
            msg = l_flash_msj('Error Publicación', 'La publicación que intentas ver fue eliminada o no existe. 😅', 'warning')
            flash(msg)
            return redirect(url_for('blog'))
        cache.set(cache_key, data, timeout=1800)  # 30 minutes

    if source_name is not None:
        # Recorded only after the post is known to exist.
        q_vf = "INSERT INTO visited_from (post_id, source_name) VALUES (%s, %s);"
        dbUsers.update_data(q_vf, (post_id, source_name))

    return render_template(v['blog']['post'], data=data)
@app.route('/blog/api/count-post-viewed')
@cache.cached(timeout=1800)  # 30 minutes
def count_posts_viewed():
    """Return, as JSON, the number of ``posts_visited`` rows per post id."""
    counts_sql = "SELECT id_post, COUNT(id_post) AS count FROM posts_visited GROUP BY id_post ORDER BY id_post;"
    return jsonify(dbUsers.get_all_data_dict(counts_sql))
@app.route('/blog/get-data', methods=['POST'])
def get_data():
    """Record one view of a post in ``posts_visited``.

    Expects a JSON object with an integer ``post_id``.  The endpoint is
    public, so the payload is validated before it reaches the database.

    Returns:
        ``{"success": True}`` with 200 on success, or ``{"success": False,
        "message": ...}`` with 400 when the body is missing, is not a JSON
        object, or does not carry a positive integer ``post_id``.
    """
    # silent=True: a malformed or non-JSON body yields None, so this view can
    # answer with its own 400 payload.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({"success": False, "message": "No data received"}), 400

    post_id = data.get("post_id")
    if isinstance(post_id, str) and post_id.strip().isdigit():
        post_id = int(post_id)
    # bool is a subclass of int, so it is rejected explicitly.
    if isinstance(post_id, bool) or not isinstance(post_id, int) or post_id <= 0:
        return jsonify({"success": False, "message": "Invalid post_id"}), 400

    q_update = "INSERT INTO posts_visited (id_post, viewed) VALUES (%s, %s);"
    dbUsers.update_data(q_update, (post_id, cur_date()))
    return jsonify({"success": True}), 200
@app.route("/contact", methods=['GET', 'POST'])
def contact():
    """Show the contact form and process a valid submission.

    On a valid POST the e-mail and phone number are checked against the
    ``contact`` table.  If either is already there, the visitor only gets a
    confirmation message.  Otherwise the record is stored and a notification
    e-mail is sent in a background thread to the addresses returned by
    ``lst_email_to()``.  Either way the request ends with a redirect back to
    this view; an invalid or GET request renders the form.
    """
    # Standard-library helper, imported locally so the block is self-contained.
    from html import escape

    form = ContactForm()
    if not form.validate_on_submit():
        return render_template(v['contact'], form=form, active_page='contact')

    c_date = cur_date()
    nombre = form.nombre.data.strip()
    apellido = form.apellido.data.strip()
    email = form.email.data.strip()
    estado = form.estado.data.strip()
    num_tel = form.num_tel.data.strip()
    size_co = form.size_co.data.strip()
    rol_contacto = form.rol_contacto.data.strip()
    industry_type = form.industry_type.data.strip()
    tipo_req = form.tipo_req.data.strip()

    # Has this e-mail or phone number already been received?
    q_val_mail = "SELECT COUNT(email) FROM contact WHERE email = %s;"
    q_val_tel = "SELECT COUNT(num_tel) FROM contact WHERE num_tel = %s;"
    res_mail = dbUsers.get_all_data(q_val_mail, (email,))
    is_dup_mail = res_mail[0][0] if res_mail else 0
    res_phone = dbUsers.get_all_data(q_val_tel, (num_tel,))
    is_dup_phone = res_phone[0][0] if res_phone else 0

    if is_dup_mail > 0 or is_dup_phone > 0:
        flash(l_flash_msj("Información Precargada", "Ya contamos con tu información, pronto nos pondremos en contacto", "success"))
        return redirect(url_for('contact'))

    obj_datetime = get_date_n_time(c_date)
    hora = obj_datetime['hour']
    fecha = obj_datetime['date']
    data = (c_date, nombre, apellido, email, estado, num_tel, size_co, rol_contacto, industry_type, tipo_req)
    dbContact.carga_contact(data)

    recipients = lst_email_to()
    if recipients:
        msg = Message("Subject, una persona busca asesoria", sender=email_sender, recipients=recipients)
        login_link = "https://formha.temporal.work/login"
        # Every visitor-supplied value is HTML-escaped: the form is public, and
        # raw interpolation would let a submitter inject markup into the mail.
        msg.html = f"""
<div style="font-family: Arial, sans-serif;">
<img src="https://formha.temporal.work/static/y_img/logos/formha_blanco_vertical.png" alt="Escudo FORMHä" height="150" style="margin-bottom: 10px;"><br>
<h2 style="color: #333;">📩 Nuevo contacto recibido</h2>
<ul>
<li><strong>Fecha:</strong> {fecha}</li>
<li><strong>Hora:</strong> {hora}</li>
<li><strong>Nombre:</strong> {escape(nombre)} {escape(apellido)}</li>
<li><strong>Email:</strong> {escape(email)}</li>
<li><strong>Teléfono:</strong> {escape(num_tel)}</li>
<li><strong>Mensaje:</strong> {escape(tipo_req)}</li>
</ul>
<p><a href="{login_link}" target="_blank">👉 Iniciar Sesión</a></p>
</div>
"""
        # Send in the background so the visitor is not kept waiting on SMTP.
        Thread(target=send_async_email, args=(current_app._get_current_object(), msg)).start()
    else:
        # With no recipients the send would only fail inside the thread.
        app.logger.warning("contact: no users flagged for contact notifications; e-mail not sent")

    flash(l_flash_msj('Contacto', '¡Gracias por contactarnos! Te responderemos pronto. ☺️', 'success'))
    return redirect(url_for('contact'))
@app.route("/login", methods=['GET', 'POST'])
def login():
    """Authenticate a user and start a cookie-based JWT session.

    A valid submission whose password matches the stored hash gets an access
    token (carrying the ``is_admin`` claim) set as a cookie on a redirect to
    the user home.  An unknown e-mail redirects back here with a warning; a
    wrong password re-renders the form with an error.
    """
    form = LogIn()
    if not form.validate_on_submit():
        return render_template(v['login'], form=form, active_page='login')

    email = f"{form.email.data}".lower()
    password = form.password.data

    stored = dbUsers.login(email)
    if stored is None:
        flash(l_flash_msj('Información', 'No se cueta con información del usuario', 'warning'))
        return redirect(url_for('login'))

    stored_hash = stored[0]
    if not bcrypt.check_password_hash(stored_hash, password):
        flash(l_flash_msj('Credenciales', 'Verificar usuario y/o contraseña.', 'error'))
        return render_template(v['login'], form=form, active_page='login')

    id_user = dbUsers.get_id(email)[0]
    is_admin = dbUsers.get_data('SELECT is_admin FROM users WHERE id = %s;', (id_user,))[0]
    access_token = create_access_token(identity=id_user, additional_claims={"is_admin": is_admin})

    response = make_response(redirect(url_for("user_home")))
    set_access_cookies(response, access_token)
    return response
@app.route("/recover-pswd", methods=['GET', 'POST'])
def recover_pswd():
    """Issue a temporary password for the submitted e-mail address.

    When ``dbUsers.reset_pswd`` returns a row for the address, a new password
    is generated, its hash is stored, the plain value is e-mailed in a
    background thread, and the user is redirected to the login view.
    Otherwise the form is re-rendered with an error.
    """
    form = RecoverPswd()
    if not form.validate_on_submit():
        return render_template(v['recover_pswd'], form=form, active_page='login')

    requested_email = f"{form.email.data}".lower()
    found = dbUsers.reset_pswd(requested_email)
    if found is None:
        flash(l_flash_msj("Error", "Verificar el correo electrónico ingresado.", "error"))
        return render_template(v['recover_pswd'], form=form, active_page='login')

    account_email = found[0]
    new_tmp_pswd = generar_contrasena()
    hashed_new_pswd = hash_password(new_tmp_pswd)

    msg = Message(
        "Subject: Recuperación de contraseña",
        sender=email_sender,
        recipients=[account_email],  # must be a list
    )
    msg.html = f"""
<!DOCTYPE html>
<html lang="es">
<head>
<meta charset="UTF-8">
<title>Recuperación de contraseña</title>
</head>
<body style="font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px;">
<div style="max-width: 600px; margin: auto; background-color: white; padding: 30px; border-radius: 10px; box-shadow: 0 0 10px rgba(0,0,0,0.1);">
<h2 style="color: #2c3e50;">🔒 Recuperación de contraseña</h2>
<p>Hola,</p>
<p>Hemos recibido una solicitud para restablecer tu contraseña. A continuación te proporcionamos una contraseña temporal que podrás usar para iniciar sesión:</p>
<p style="font-size: 18px;"><strong>🔑 Contraseña temporal:</strong> <span style="color: #e74c3c;">{new_tmp_pswd}</span></p>
<p><strong>Importante:</strong> Una vez que inicies sesión, te recomendamos cambiar tu contraseña por una que solo tú conozcas y puedas recordar fácilmente.</p>
<hr>
<p style="font-size: 12px; color: #888;">Este es un mensaje automático, por favor no respondas a este correo.</p>
</div>
</body>
</html>
"""
    # Store the new hash first, then hand the e-mail to a background thread.
    dbUsers.update_pswd(pswd=hashed_new_pswd, email=account_email)
    mail_thread = Thread(
        target=send_async_email,
        args=(current_app._get_current_object(), msg),
    )
    mail_thread.start()

    flash(l_flash_msj("Éxito", "Se ha enviado una contraseña temporal a tu correo electrónico.", "success"))
    return redirect(url_for('login'))  # redirect instead of rendering
# #################################################################################################################################
#
# $$\ $$\ $$$$$$\ $$$$$$$$\ $$$$$$$\ $$$$$$\ $$$$$$$\ $$\ $$\ $$$$$$\ $$\ $$\
# $$ | $$ |$$ __$$\ $$ _____|$$ __$$\ $$ __$$\ $$ __$$\ $$$\ $$$ |\_$$ _|$$$\ $$ |
# $$ | $$ |$$ / \__|$$ | $$ | $$ | $$ / $$ |$$ | $$ |$$$$\ $$$$ | $$ | $$$$\ $$ |
# $$ | $$ |\$$$$$$\ $$$$$\ $$$$$$$ | $$$$$$\ $$$$$$$$ |$$ | $$ |$$\$$\$$ $$ | $$ | $$ $$\$$ |
# $$ | $$ | \____$$\ $$ __| $$ __$$< \______| $$ __$$ |$$ | $$ |$$ \$$$ $$ | $$ | $$ \$$$$ |
# $$ | $$ |$$\ $$ |$$ | $$ | $$ | $$ | $$ |$$ | $$ |$$ |\$ /$$ | $$ | $$ |\$$$ |
# \$$$$$$ |\$$$$$$ |$$$$$$$$\ $$ | $$ | $$ | $$ |$$$$$$$ |$$ | \_/ $$ |$$$$$$\ $$ | \$$ |
# \______/ \______/ \________|\__| \__| \__| \__|\_______/ \__| \__|\______|\__| \__|
#
# Font Name: Big Money-ne | https://patorjk.com/software/taag/#p=testall&f=Graffiti&t=USER
#
# #################################################################################################################################
@app.context_processor
def inject_user_role():
    """Expose ``is_admin`` to every template.

    Returns:
        ``{'is_admin': <claim>}`` taken from the request's JWT, or
        ``{'is_admin': False}`` when there is no usable token.

    The token is verified inside the ``try`` block.  The previous
    ``@jwt_required(optional=True)`` decorator ran outside it, and
    ``optional=True`` only tolerates a *missing* token.  An expired or
    invalid cookie still raised, so rendering any page failed for visitors
    holding a stale cookie.
    """
    try:
        verify_jwt_in_request(optional=True)
        return {'is_admin': get_jwt().get('is_admin', False)}
    except Exception:
        # Deliberately broad: a bad token must never break page rendering.
        return {'is_admin': False}
def admin_required(view_func):
    """Decorator that restricts a view to tokens whose ``is_admin`` claim is truthy.

    Other callers get an error message flashed and are redirected to the
    user home.  It reads the current JWT via ``get_jwt()``, so the routes in
    this file apply it beneath ``@jwt_required()``.
    """
    @wraps(view_func)
    def wrapped_view(*args, **kwargs):
        claims = get_jwt()
        if claims.get('is_admin', False):
            return view_func(*args, **kwargs)
        flash(l_flash_msj('Error', 'No tienes permisos para acceder a esta sección.', 'error'))
        return redirect(url_for('user_home'))

    return wrapped_view
@app.route('/user/home')
@jwt_required()
@validate_user_exists
def user_home():
    """Render the dashboard with every non-archived contact.

    Side effects:
        * Flashes a greeting when the stored last-connection date differs
          from today's date.
        * Flashes a change-your-password reminder while ``is_pswd_reseted``
          is set for the user.
        * Updates ``users.lst_conn`` to the current date/time.

    Messages are flashed only when they exist; ``flash(None)`` was previously
    called whenever a message was absent.
    """
    token_data = get_jwt()
    exp = token_data['exp']      # session expiry timestamp, passed to the template
    usr_id = token_data['sub']   # id of the active user

    # One query for everything needed about the user.
    q_user = "SELECT nombre, genero, lst_conn, is_pswd_reseted FROM users WHERE id = %s;"
    nombre, gender, lst_conn, is_pswd_reseted = dbUsers.get_data(q_user, (usr_id,))

    lst_conn = '' if lst_conn is None else get_date_n_time(lst_conn)['date']
    current_date = get_date_n_time(cur_date())['date']

    f_mnsj = None
    if lst_conn != current_date:
        strGreet = 'Bienvenido' if gender == 'M' else 'Bienvenida'
        f_mnsj = l_flash_msj(f'{saludo_hr()}', f'{strGreet} {nombre}', 'success')
        flash(f_mnsj)
    if is_pswd_reseted:
        flash(l_flash_msj('Recomendación', 'Cambia tu contraseña por una que recuerdes.', 'warning'))

    dbUsers.update_data("UPDATE users SET lst_conn = %s WHERE id = %s;", (cur_date(), usr_id))

    q_contact = """
SELECT
ID,
TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') AS FECHA_FORMATEADA,
NOMBRE,
APELLIDO,
ESTADO,
SIZE_CO,
ROL_CONTACTO,
INDUSTRY_TYPE,
STATUS
FROM
CONTACT
WHERE
STATUS <> 'Archivado'
OR STATUS IS NULL
ORDER BY
FULL_DATE_TIME DESC;
"""
    data_contact = dbUsers.get_all_data(q_contact, ())
    return render_template(
        v['tmp_user']['home'],
        f_mnsj=f_mnsj,
        nombre=nombre,
        exp=exp,
        data_contact=data_contact,
        active_page='user_home',
    )
@app.route('/user/manage-record', methods=['POST'])
@jwt_required()
@validate_user_exists
def manage_record():
    """Set the ``status`` of one contact record.

    Expects a JSON object with ``id`` (contact id) and ``value`` (new status).

    Returns:
        ``{"type": "success"}`` on success, or a 400 error payload when the
        body is not a JSON object or lacks either key (this used to surface
        as an unhandled ``KeyError``/``TypeError``).
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'id' not in data or 'value' not in data:
        return jsonify({"type": "error", "message": "Missing 'id' or 'value'."}), 400

    record_id = data['id']
    new_status = data['value']
    q = 'UPDATE contact SET status = %s WHERE id = %s;'
    dbUsers.update_data(q, (new_status, record_id))
    return jsonify({"type": "success"})
@app.route('/user/get-contact-data', methods=['POST'])
@jwt_required()
@validate_user_exists
def get_contact_data():
    """Return the full detail of one contact record as JSON.

    Expects a JSON object with ``id``.  Responds with ``{"data": <row>}``,
    where the row is whatever ``dbUsers.get_data`` returns for that id, or
    with a 400 error payload when the body is not a JSON object carrying
    ``id``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'id' not in data:
        return jsonify({"type": "error", "message": "Missing 'id'."}), 400

    contact_id = data['id']
    q = "SELECT TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha, TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora, nombre, apellido, email, estado, num_tel, size_co, rol_contacto, industry_type, tipo_req, status FROM contact WHERE id = %s;"
    dbData = dbUsers.get_data(q, (contact_id,))
    return jsonify({"data": dbData})
@app.route('/user/download-db', methods=['GET'])
@jwt_required()
@validate_user_exists
@admin_required
def download_db():
    """Return every row of the ``contact`` table as JSON (admin only)."""
    export_sql = """
SELECT
id,
TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha,
TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora,
nombre,
apellido,
email,
estado,
num_tel,
size_co,
rol_contacto,
industry_type,
tipo_req,
status
FROM
contact;
"""
    rows = dbUsers.get_all_data(export_sql, ())
    return jsonify({"data": rows})
@app.route('/user/txt-editor')
@jwt_required()
@validate_user_exists
@cache.cached(timeout=43200)
def user_txteditor():
    """Render the post editor page."""
    return render_template(v['tmp_user'].get('txt_editor'), active_page='user_txteditor')
@app.route('/user/save-post', methods=['POST'])
@jwt_required()
@validate_user_exists
def save_post():
    """Create a post, or update one owned by the current user.

    Expects a JSON object with string ``title`` and ``body`` (HTML).  When
    ``id`` is present the matching post is updated, restricted to rows whose
    ``id_usr`` is the current user; otherwise a new post is inserted.  The
    plain text of the body and the list of ``<img src>`` values are stored
    next to the HTML.

    Returns:
        A success payload with the redirect URL, a 400 payload for a
        malformed request, or a 500 payload when the database call raises.
    """
    id_usr = get_jwt()['sub']
    data = request.get_json(silent=True)
    if (
        not isinstance(data, dict)
        or not isinstance(data.get('title'), str)
        or not isinstance(data.get('body'), str)
    ):
        return jsonify({"status": "error", "message": "Missing 'title' or 'body'."}), 400

    title = data['title']
    body = data['body']
    saved_at = cur_date()

    body_no_img = BeautifulSoup(body, 'html.parser').get_text(separator=' ', strip=True)
    img_sources = re.findall(r'<img[^>]+src=["\'](.*?)["\']', body)
    imagenes_json = json.dumps(img_sources) if img_sources else None

    if "id" in data:
        q = 'UPDATE posts SET updated_at = %s, title = %s, body = %s, body_no_img = %s, lista_imagenes = %s::jsonb WHERE id = %s AND id_usr = %s;'
        t = (saved_at, title, body, body_no_img, imagenes_json, data['id'], id_usr)
    else:
        q = "INSERT INTO posts (id_usr, created_at, title, body, body_no_img, lista_imagenes) VALUES (%s, %s, %s, %s, %s, %s::jsonb);"
        t = (id_usr, saved_at, title, body, body_no_img, imagenes_json)

    try:
        dbUsers.update_data(q, t)
    except Exception as e:
        # Keep a server-side trace; the response shape is unchanged.
        app.logger.exception("save_post failed for user %s", id_usr)
        return jsonify({"status": "error", "message": str(e)}), 500

    return jsonify({
        "status": "success",
        "title_post": title,
        "redirect_url": url_for('my_posts'),  # use _external=True for an absolute URL
    })
@app.route('/user/my-posts')
@jwt_required()
@validate_user_exists
def my_posts():
    """Render one page (8 cards) of the current user's posts.

    The ``page`` query argument is parsed leniently: a missing or non-numeric
    value falls back to 1, and the result is clamped to the valid page range.
    Before, ``?page=abc`` raised ``ValueError`` and ``?page=0`` or a negative
    page produced a wrong slice.
    """
    id_usr = get_jwt()['sub']
    per_page = 8  # cards per page

    # All of the user's posts are fetched and sliced in Python
    # (SQL-level pagination is a possible later optimisation).
    q_all = r"""
SELECT
id,
TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated,
title,
LEFT(body_no_img, 180),
lista_imagenes->0,
GREATEST(array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) / 375, 1) as n_words
FROM
posts
WHERE
id_usr = %s
ORDER BY
COALESCE(updated_at, created_at) DESC,
created_at DESC;
"""
    data_all = dbUsers.get_all_data(q_all, (id_usr,))
    total_posts = len(data_all)
    total_pages = (total_posts + per_page - 1) // per_page

    # type=int makes Werkzeug return the default when conversion fails.
    page = request.args.get("page", default=1, type=int)
    page = min(max(page, 1), max(total_pages, 1))

    start = (page - 1) * per_page
    data = data_all[start:start + per_page]
    return render_template(
        v['tmp_user']['my_posts'],
        data=data,
        current_page=page,
        total_pages=total_pages,
        active_page='my_posts',
    )
@app.route('/user/del-post', methods=['POST'])
@jwt_required()
@validate_user_exists
def del_post():
    """Delete a post by id.

    Expects a JSON object with ``id``.  Non-admin users can only delete their
    own posts, using the same ``id_usr`` filter that ``save_post`` applies to
    updates.  Before, any authenticated user could delete any post.  Tokens
    carrying the ``is_admin`` claim keep the unrestricted behaviour.

    Returns:
        The same success payload as before (``id`` is a one-element list), or
        a 400 payload when the body is not a JSON object carrying ``id``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'id' not in data:
        return jsonify({'ok': False, 'message': "Missing 'id'."}), 400

    claims = get_jwt()
    post_id = data['id']
    if claims.get('is_admin', False):
        q = 'DELETE FROM posts WHERE id = %s;'
        t = (post_id,)
    else:
        q = 'DELETE FROM posts WHERE id = %s AND id_usr = %s;'
        t = (post_id, claims['sub'])
    dbUsers.update_data(q, t)

    res = {'ok': True, 'message': 'El elemento se eliminó, pendiente manejar el error en el frontend', "id": (post_id,)}
    return jsonify(res), 200
@app.route('/user/<int:post_id>')
@jwt_required()
@validate_user_exists
def post(post_id):
    """Render one of the current user's own posts in the private area.

    Args:
        post_id: Primary key of the post (from the URL).

    Returns:
        The rendered post, or a redirect to ``my_posts`` (with a flashed
        warning) when no post with that id belongs to the user.  Before, the
        missing row made ``get_data(...)[0]`` raise ``TypeError``.
    """
    usr_id = get_jwt()['sub']
    q = """
SELECT
TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated,
title,
body
FROM
posts
WHERE
id_usr = %s AND id = %s;
"""
    t = (usr_id, post_id)
    data = dbUsers.get_data(q, t)
    if data is None:
        flash(l_flash_msj('Error Publicación', 'La publicación que intentas ver fue eliminada o no existe. 😅', 'warning'))
        return redirect(url_for('my_posts'))

    q_nw = r"SELECT array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) FROM posts WHERE id_usr = %s AND id = %s;"
    n_words = dbUsers.get_data(q_nw, t)[0]
    time_read = min_read_pst(n_words)
    return render_template(v['tmp_user']['read_post'], data=data, time_read=time_read, post_id=post_id)
@app.route('/user/edit-post/<int:id_post>')
@jwt_required()
@validate_user_exists
def edit_post(id_post):
    """Open one of the current user's posts in the editor.

    The lookup is restricted to the user's own posts, matching ``post`` and
    the UPDATE in ``save_post``.  Before, any authenticated user could load
    any post's content here.  A missing or foreign post redirects to
    ``my_posts`` with a flashed warning.
    """
    usr_id = get_jwt()['sub']
    q = 'SELECT title, body FROM posts WHERE id = %s AND id_usr = %s;'
    data = dbUsers.get_data(q, (id_post, usr_id))
    if data is None:
        flash(l_flash_msj('Error Publicación', 'La publicación que intentas ver fue eliminada o no existe. 😅', 'warning'))
        return redirect(url_for('my_posts'))
    return render_template(v['tmp_user']['edit_post'], data=data, id_post=id_post)
@app.route('/user/change-pswd', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
def change_pswd():
    """Let the logged-in user replace their password.

    When the current password matches the stored hash, the new hash is
    saved, ``is_pswd_reseted`` is cleared, the access cookie is deleted and
    the user is redirected to the login view.  A mismatch flashes an error
    and re-renders the form.
    """
    form = ChangePwsd()
    if not form.validate_on_submit():
        return render_template(v['tmp_user']['change_pswd'], active_page='change_pswd', form=form)

    current_input = form.cur_pswd.data
    new_input = form.new_pswd.data
    usr_id = get_jwt()['sub']
    stored_hash = dbUsers.get_data("SELECT pswd FROM users WHERE id = %s;", (usr_id,))[0]

    if not bcrypt.check_password_hash(stored_hash, current_input):
        flash(l_flash_msj('Error', 'La contraseña a actualizar es incorrecta.', 'error'))
        return render_template(v['tmp_user']['change_pswd'], active_page='change_pswd', form=form)

    dbUsers.update_data(
        "UPDATE users SET pswd = %s, is_pswd_reseted = false WHERE id = %s;",
        (hash_password(new_input), usr_id),
    )
    flash(l_flash_msj('Éxito', 'La contraseña ha sido actualizada.', 'success'))

    # Drop the session cookie so the user signs in again with the new password.
    response = make_response(redirect(url_for('login')))
    response.delete_cookie('access_token_cookie')
    return response
@app.route('/user/metrics')
@jwt_required()
@validate_user_exists
@admin_required
def metrics():
    """Render the metrics dashboard page (admin only)."""
    template_name = v['tmp_user']['metrics']
    return render_template(template_name, active_page='metrics')
@app.route('/user/metrics/data')
@jwt_required()
@validate_user_exists
@admin_required
def data_metrics():
    """Return the aggregated dashboard metrics as JSON (admin only).

    Each key of the response holds the rows of one aggregate query over
    ``contact``, ``posts_visited`` or ``visited_from``.

    ``visited_from`` is now grouped by the readable label rather than the raw
    ``source_name``.  Before, every unrecognised source produced its own
    'Otro' row, so the label appeared several times in the result.
    """
    q_contact = r"""
SELECT
CASE EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
WHEN 1 THEN 'Enero'
WHEN 2 THEN 'Febrero'
WHEN 3 THEN 'Marzo'
WHEN 4 THEN 'Abril'
WHEN 5 THEN 'Mayo'
WHEN 6 THEN 'Junio'
WHEN 7 THEN 'Julio'
WHEN 8 THEN 'Agosto'
WHEN 9 THEN 'Septiembre'
WHEN 10 THEN 'Octubre'
WHEN 11 THEN 'Noviembre'
WHEN 12 THEN 'Diciembre'
END AS mes,
COUNT(*) AS cantidad_registros,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
FROM
contact
GROUP BY
EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
ORDER BY
EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City');
"""
    q_count_state = "SELECT estado, COUNT(estado) AS conteo_edo, ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje FROM contact GROUP BY estado ORDER BY conteo_edo DESC;"
    q_size_co = "SELECT size_co, COUNT(size_co) AS conteo_size FROM contact GROUP BY size_co ORDER BY conteo_size DESC;"
    q_rol_contact = """
SELECT
rol_contacto,
COUNT(*) AS conteo,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
FROM contact
GROUP BY rol_contacto
ORDER BY conteo DESC;
"""
    q_industry_type = """
SELECT
industry_type,
COUNT(industry_type) AS conteo,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
FROM
contact
GROUP BY
industry_type
ORDER BY
conteo DESC;
"""
    q_group_status = """
SELECT
COALESCE(status, 'Sin Estatus') AS status,
COUNT(*) AS conteo,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
FROM
contact
GROUP BY
COALESCE(status, 'Sin Estatus')
ORDER BY
conteo;
"""
    q_posts_mas_vistos = "SELECT id_post, COUNT(id_post) AS conteo FROM posts_visited GROUP BY id_post ORDER BY conteo DESC LIMIT 10;"
    q_top_three_authors = """
SELECT
CONCAT(u.nombre, ' ', u.apellido) AS autor,
COUNT(*) AS total_posts
FROM
posts_visited pv
INNER JOIN
posts p ON p.id = pv.id_post
INNER JOIN
users u ON u.id = p.id_usr
GROUP BY
u.id, u.nombre, u.apellido
ORDER BY
total_posts DESC
LIMIT 3;
"""
    # GROUP BY 1 groups on the CASE label (first output column), so all
    # unrecognised sources collapse into a single 'Otro' row.
    q_visited_from = """
SELECT
CASE source_name
WHEN 'li' THEN 'LinkedIn'
WHEN 'wa' THEN 'WhatsApp'
WHEN 'fb' THEN 'Facebook'
WHEN 'x' THEN 'X'
ELSE 'Otro'
END AS nombre_legible,
COUNT(source_name) AS total
FROM visited_from
GROUP BY 1
ORDER BY total DESC;
"""
    # Response key -> query; dict order keeps the original execution order.
    queries = {
        "count_monthly": q_contact,
        "count_state": q_count_state,
        "size_co": q_size_co,
        "rol_contact": q_rol_contact,
        "industry_type": q_industry_type,
        "group_status": q_group_status,
        "top_ten": q_posts_mas_vistos,
        "top_three_authors": q_top_three_authors,
        "visited_from": q_visited_from,
    }
    data_contact = {key: dbUsers.get_all_data(sql, ()) for key, sql in queries.items()}
    return jsonify(data_contact)
@app.route('/user/manage-profiles', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def manage_profiles():
    """Render the user-management page and process the add/edit user form.

    On a valid form submission the hidden ``id`` field selects the action:
    a non-empty id updates that user; an empty id creates a new user with a
    generated temporary password (rejected when the e-mail already exists).
    The row is written first and only then is the notification e-mail handed
    to a background thread, so a write that raises never produces an e-mail
    about a change that was not stored.

    Returns:
        A redirect back to this view after a handled submission, otherwise
        the rendered management template listing every user.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    form = AddUser()
    id_usr = get_jwt()['sub']

    if form.validate_on_submit():
        f_nombre = f'{form.nombre.data}'.title().strip()
        f_apellido = f'{form.apellido.data}'.title().strip()
        f_genero = form.genero.data
        f_email = f'{form.email.data}'.lower().strip()
        f_isAdmin = f'{form.isAdmin.data}'.upper().strip()
        f_isContactNoti = f'{form.isContactNoti.data}'.upper().strip()
        # A hidden field that was not submitted yields None; treat it as "no id".
        f_id = (form.id.data or '').strip()

        # NOTE(review): the "true" label is an empty string here and in the
        # listing query below — confirm that a glyph was not lost.
        r_str_isAdmin = '' if f_isAdmin == 'TRUE' else 'No'
        r_str_isContactNoti = '' if f_isContactNoti == 'TRUE' else 'No'

        # Everything interpolated into the HTML body is escaped so that
        # characters such as "<" or "&" in a name (or in the generated
        # password) are displayed literally instead of being parsed as markup.
        e_nombre = escape(f_nombre)
        e_apellido = escape(f_apellido)
        e_genero = escape(str(f_genero))
        e_email = escape(f_email)

        if f_id != '':
            # A non-empty id means an existing user is being edited.
            q = "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, is_admin = %s, is_contact_noti = %s WHERE id = %s;"
            t = (f_nombre, f_apellido, f_genero, f_email, f_isAdmin, f_isContactNoti, f_id)
            f_mnsj = l_flash_msj('Éxito', f'Usuario actualizado: {f_nombre}', 'success')
            subject = "Usuario actualizado"
            html_content = """
<table width="100%" cellpadding="0" cellspacing="0" style="font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px;">
<tr>
<td align="center">
<table width="600" cellpadding="0" cellspacing="0" style="background-color: #ffffff; border-radius: 8px; padding: 40px;">
<tr>
<td align="center" style="padding-bottom: 20px;">
<img src="https://formha.temporal.work/static/y_img/logos/formha_blanco_vertical.png" alt="Logo Formha" style="width: 150px; height: auto;">
<p style="color: #4a5568; font-size: 16px; margin-top: 10px;"><strong>Tu cuenta ha sido actualizada con éxito.</strong></p>
</td>
</tr>
<tr>
<td style="color: #2d3748; font-size: 15px; line-height: 1.6;">
<p>
<strong>Nombre:</strong> {} <br>
<strong>Apellido:</strong> {}<br>
<strong>Género:</strong> {} <br>
<strong>Email:</strong> {} <br>
<strong>Permisos de Admin:</strong> {}<br>
<strong>Notificaciones por email:</strong> {}
</p>
</td>
</tr>
<tr>
<td align="center" style="padding-top: 30px;">
<a href="https://formha.temporal.work/login" style="background-color: #4299e1; color: #ffffff; text-decoration: none; padding: 12px 24px; border-radius: 5px; font-weight: bold; display: inline-block;">
Ir al sitio Formha
</a>
</td>
</tr>
<tr>
<td align="center" style="padding-top: 30px; font-size: 12px; color: #a0aec0;">
Si no realizaste esta actualización, por favor contacta con el soporte técnico.
</td>
</tr>
</table>
</td>
</tr>
</table>
""".format(e_nombre, e_apellido, e_genero, e_email, r_str_isAdmin, r_str_isContactNoti)
        else:
            # An empty id means a new user; refuse to create a second account
            # for an e-mail address that is already registered.
            q_isDuplicated = "SELECT email FROM users WHERE email = %s;"
            isDuplicated = dbUsers.get_data(q_isDuplicated, (f_email,))
            if isDuplicated is not None:
                f_mnsj = l_flash_msj('Error', 'El correo electrónico ya existe, NO SE SOBREESCRIBE EL USUARIO EXISTENTE.', 'error')
                flash(f_mnsj)
                return redirect(url_for('manage_profiles'))

            random_id = getRandomId()
            tmp_pswd = generar_contrasena()
            hashed_pswd = hash_password(tmp_pswd)
            q = "INSERT INTO users (id, nombre, apellido, genero, email, pswd, is_admin, is_pswd_reseted, is_contact_noti ) values (%s, %s, %s, %s, %s, %s, %s, %s, %s);"
            t = (random_id, f_nombre, f_apellido, f_genero, f_email, hashed_pswd, f_isAdmin, True, f_isContactNoti)
            f_mnsj = l_flash_msj('Éxito', f'Usuario creado: {f_nombre}', 'success')
            subject = "Nueva cuenta creada"
            html_content = """
<table width="100%" cellpadding="0" cellspacing="0" style="font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px;">
<tr>
<td align="center">
<table width="600" cellpadding="0" cellspacing="0" style="background-color: #ffffff; border-radius: 8px; padding: 40px;">
<tr>
<td align="center" style="padding-bottom: 20px;">
<img src="https://formha.temporal.work/static/y_img/logos/formha_blanco_vertical.png" alt="Logo Formha" style="width: 150px; height: auto;">
<h1 style="color: #2c5282; margin: 0;">¡Bienvenido a <span style="color:#4299e1;">Formha</span>!</h1>
<p style="color: #4a5568; font-size: 16px; margin-top: 10px;">Tu cuenta ha sido creada con éxito.</p>
</td>
</tr>
<tr>
<td style="color: #2d3748; font-size: 15px; line-height: 1.6;">
<p>
<strong>Nombre:</strong> {}<br>
<strong>Apellido:</strong> {}<br>
<strong>Género:</strong> {}<br>
<strong>Email:</strong> {}<br>
<strong>Contraseña temporal:</strong> <code style="background-color:#edf2f7; padding: 4px 8px; border-radius: 4px;">{}</code><br>
<strong>Permisos de Admin:</strong> {}<br>
<strong>Notificaciones por email:</strong> {}
</p>
<p style="margin-top: 20px;"><strong style="color: #e53e3e;">Importante:</strong> Una vez inicies sesión, debes cambiar la contraseña por una nueva que recuerdes.</p>
</td>
</tr>
<tr>
<td align="center" style="padding-top: 30px;">
<a href="https://formha.temporal.work/login" style="background-color: #4299e1; color: #ffffff; text-decoration: none; padding: 12px 24px; border-radius: 5px; font-weight: bold; display: inline-block;">
Ir al sitio Formha
</a>
</td>
</tr>
</table>
</td>
</tr>
</table>
""".format(e_nombre, e_apellido, e_genero, e_email, escape(tmp_pswd), r_str_isAdmin, r_str_isContactNoti)

        # Persist before notifying: if the write raises, no e-mail (and no
        # temporary password) is sent for a change that was never stored.
        dbUsers.update_data(q, t)

        msg = Message(
            subject=subject,
            sender=email_sender,
            recipients=[f_email]
        )
        msg.html = html_content
        # Hand the message to a background thread for delivery.
        thr = Thread(target=send_async_email, args=(current_app._get_current_object(), msg))
        thr.start()

        flash(f_mnsj)
        return redirect(url_for('manage_profiles'))

    # The listing is only needed when the page is rendered, so it is queried
    # after the submission branch (which always redirects).
    q_all_users = """
SELECT
u.id, u.nombre, u.apellido, u.email, COALESCE(TO_CHAR(u.lst_conn, 'DD/MM/YYYY HH24:MI'), 'Sin conexión') AS ultima_conexion,
CASE
WHEN u.is_admin = true THEN ''
WHEN u.is_admin = false THEN 'No'
END AS admin,
COALESCE(p.conteo, 0) AS conteo,
CASE
WHEN u.is_contact_noti = true THEN ''
WHEN u.is_contact_noti = false THEN 'No'
END AS isNotificated
FROM users u
LEFT JOIN (
SELECT id_usr, COUNT(*) AS conteo
FROM posts
GROUP BY id_usr
) p ON u.id = p.id_usr
ORDER BY conteo DESC;
"""
    data_all_users = dbUsers.get_all_data(q_all_users, ())
    return render_template(v['tmp_user']['manage_profiles'], form=form, data_all_users=data_all_users, active_page='manage_profiles', id_usr=id_usr)
@app.route('/user/manage-profiles/delete-usr', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def delete_user():
    """Delete a user together with the posts that user authored.

    Expects a JSON object body containing an ``id`` key.

    Returns:
        ``({'ok': True, 'message': ..., 'id': <id>}, 200)`` once both DELETE
        statements have been issued, or ``({'ok': False, ...}, 400)`` when
        the body is not a JSON object or carries no id.
    """
    data = request.get_json()
    # Reject a null/array body or a missing id instead of failing with a 500.
    if not isinstance(data, dict) or data.get('id') in (None, ''):
        return jsonify({'ok': False, 'message': 'Falta el id del usuario a eliminar'}), 400
    id_usr = data['id']

    # Remove the dependent posts before the user row, so the user delete
    # cannot be blocked by rows that still reference it.
    # NOTE(review): assumes posts.id_usr references users.id — confirm schema.
    q_del_posts = "DELETE FROM posts WHERE id_usr = %s;"
    dbUsers.update_data(q_del_posts, (id_usr,))
    q_del_usr = "DELETE FROM users WHERE id = %s;"
    dbUsers.update_data(q_del_usr, (id_usr,))

    res = {'ok': True, 'message': 'El elemento se eliminó, pendiente manejar el error en el frontend', "id": id_usr}
    return jsonify(res), 200
@app.route('/user/manage-profiles/get-user', methods=['POST'])
@jwt_required()
@validate_user_exists
@admin_required
def get_user():
    """Return the editable fields of a single user as JSON.

    Expects a JSON object body containing an ``id`` key.

    Returns:
        ``{"data": <result of the lookup>}`` for a well-formed request, or
        ``({"data": None, "error": ...}, 400)`` when the body is not a JSON
        object or carries no id.
    """
    data = request.get_json()
    # Reject a null/array body or a missing id instead of failing with a 500.
    if not isinstance(data, dict) or data.get('id') in (None, ''):
        return jsonify({"data": None, "error": 'Falta el id del usuario'}), 400

    q = "SELECT id, nombre, apellido, genero, email, is_admin, is_contact_noti FROM users WHERE id = %s;"
    dbData = dbUsers.get_data(q, (data['id'],))
    return jsonify({"data": dbData})
@app.route('/user/manage-profiles/update-user', methods=['POST'])
@jwt_required()
@validate_user_exists
@admin_required
def update_user():
    """Update a user's name, gender, e-mail and admin flag from a JSON body.

    Expects a JSON object with the keys ``id``, ``nombre``, ``apellido``,
    ``genero``, ``email`` and ``isAdmin``.

    Returns:
        ``({'ok': True, 'message': ..., 'id': <id>}, 200)`` after the UPDATE
        has been issued, or ``({'ok': False, ...}, 400)`` listing the missing
        keys when the body is incomplete or not a JSON object.
    """
    data = request.get_json()
    required = ('id', 'nombre', 'apellido', 'genero', 'email', 'isAdmin')
    if isinstance(data, dict):
        missing = [key for key in required if key not in data]
    else:
        missing = list(required)
    if missing:
        return jsonify({'ok': False, 'message': f'Faltan campos requeridos: {", ".join(missing)}'}), 400

    id_usr = data['id']
    f_nombre = f'{data["nombre"]}'.title().strip()
    f_apellido = f'{data["apellido"]}'.title().strip()
    f_genero = data['genero']
    f_email = f'{data["email"]}'.lower().strip()
    # str() lets the flag arrive either as text ("true") or as a JSON boolean.
    f_isAdmin = str(data['isAdmin']).strip().lower()

    q = "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, is_admin = %s WHERE id = %s;"
    t = (f_nombre, f_apellido, f_genero, f_email, f_isAdmin, id_usr)
    dbUsers.update_data(q, t)

    # Echo the real id back (the literal string "id_usr" was returned before).
    res = {'ok': True, 'message': 'El elemento se actualizó correctamente', "id": id_usr}
    return jsonify(res), 200
@app.route('/user/carousel', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def carousel():
    """Render the carousel admin page and handle new-slide uploads.

    On a valid POST the uploaded image name is sanitised with
    ``secure_filename``. A name that sanitises to nothing, or one already
    present in the ``carousel`` table, is rejected with an error flash.
    Otherwise the file is saved under ``folder_upload`` and a row is inserted
    with the colours converted through ``hex_to_rgb`` / ``rgba_to_string``
    (alpha 0.5 for the background, 1 for the text). Every handled POST ends
    in a redirect back to this view.

    Returns:
        A redirect after a handled submission, otherwise the rendered
        carousel template with all slides, newest id first.
    """
    form = Carousel()

    if request.method == 'POST' and form.validate_on_submit():
        image_file = form.img.data
        filename = secure_filename(image_file.filename or '')
        url = form.url.data or None  # an empty URL is stored as NULL
        isNewTab = form.isNewTab.data

        if not filename:
            # secure_filename() can strip a name down to an empty string;
            # saving under it would target the upload directory itself.
            mnsj_flash = l_flash_msj('Error Archivo', 'El nombre del archivo no es válido.', 'error')
        elif dbUsers.get_data("SELECT img_name FROM carousel WHERE img_name = %s;", (filename,)):
            # Look up only this name rather than loading every stored name.
            mnsj_flash = l_flash_msj('Error Archivo', 'La imagen ya ha sido cargada previamente.', 'error')
        else:
            image_file.save(os.path.join(folder_upload, filename))
            bg_color = rgba_to_string((*hex_to_rgb(form.bg_color.data), 0.5))
            txt_color = rgba_to_string((*hex_to_rgb(form.txt_color.data), 1))
            txt = form.txt.data
            q = "INSERT INTO carousel (img_name, bg_color, txt_color, txt, url, is_new_tab) VALUES (%s, %s, %s, %s, %s, %s);"
            t = (filename, bg_color, txt_color, txt, url, isNewTab)
            dbUsers.update_data(q, t)
            mnsj_flash = l_flash_msj('Datos Slide', f'Datos agregados para imagen: {filename}', 'success')

        flash(mnsj_flash)
        return redirect(url_for('carousel'))

    # The listing is only needed when the page is rendered.
    q_all_slides = """
SELECT
id,
img_name,
bg_color,
txt_color,
txt,
TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
url,
is_new_tab
FROM
carousel
ORDER BY
id DESC;
"""
    data = dbUsers.get_all_data(q_all_slides, ())
    return render_template(v['tmp_user']['carousel'], form=form, data=data, active_page='carousel')
@app.route('/user/carousel/delete-slide/<int:id>', methods=["POST", "DELETE"])
@jwt_required()
@validate_user_exists
@admin_required
def delete_slide(id):
    """Delete a carousel row and, when present, its image file.

    Args:
        id: Primary key of the ``carousel`` row, taken from the URL.

    Returns:
        A JSON response with status 200 on success, 404 when no row matches
        ``id``, or 500 when any exception is raised while processing.
    """
    try:
        params = (id,)
        row = dbUsers.get_data("SELECT img_name FROM carousel WHERE id = %s;", params)
        if not row:
            return jsonify({'error': 'No se encontró el registro'}), 404

        # The image is looked up relative to this module's directory.
        module_dir = os.path.abspath(os.path.dirname(__file__))
        image_path = os.path.join(module_dir, 'static', 'uploads', row[0])

        # The database row goes first; the file on disk is cleaned up after.
        dbUsers.update_data("DELETE FROM carousel WHERE id = %s;", params)

        if not os.path.isfile(image_path):
            print(f"[!] Archivo no encontrado: {image_path}")
        else:
            os.remove(image_path)

        return jsonify({'msg': 'Slide eliminado correctamente'}), 200
    except Exception as err:
        print(f'Error al eliminar el slide: {err}')
        return jsonify({'error': 'Error interno del servidor'}), 500
@app.route('/user/carousel/download/<path:filename>')
@jwt_required()
@validate_user_exists
@admin_required
def download_file(filename):
    """Send a file from ``<app root>/static/uploads`` as an attachment.

    Args:
        filename: Path of the requested file, taken from the URL.

    Returns:
        The attachment response; aborts with 404 if ``FileNotFoundError``
        is raised while sending.
    """
    try:
        return send_from_directory(
            os.path.join(app.root_path, 'static', 'uploads'),
            filename,
            as_attachment=True,
        )
    except FileNotFoundError:
        abort(404)
# -------------------------------------------------------------
# MANEJO DE ERRORES DE JWT
def unauthorized_response(callback):
    """Return the JSON 401 payload for a request without a usable token.

    This function is intentionally not registered with
    ``@jwt.unauthorized_loader``: ``handle_unauthorized_error`` further down
    in this file registers itself with the same decorator, and a later
    registration replaces an earlier one, so decorating this function too had
    no effect. It is kept as a plain helper so the name stays importable.

    Args:
        callback: Unused; kept so the signature is unchanged.

    Returns:
        ``(json_response, 401)``.
    """
    return jsonify({"error": "Token inválido o no proporcionado"}), 401
def is_fetch_request():
    """Tell whether the current request comes from the frontend (fetch).

    Returns:
        True when the ``X-Requested-With`` header equals ``XMLHttpRequest``.
    """
    requested_with = request.headers.get("X-Requested-With")
    return requested_with == "XMLHttpRequest"
@jwt.expired_token_loader
def handle_expired_token(jwt_header, jwt_payload):
    """Tell the user the session expired and send them to the login page.

    Registered through ``jwt.expired_token_loader``. Every request, including
    frontend fetch calls, receives the redirect.

    Args:
        jwt_header: Header of the expired token (unused).
        jwt_payload: Payload of the expired token (unused).

    Returns:
        A redirect to the ``login`` view.
    """
    # The message used to be built and then thrown away, so it was never
    # shown; pass it to flash() the same way logout() does.
    flash(l_flash_msj('Sesión Expirada', 'Por favor inicia sesión nuevamente', 'warning'))
    return redirect(url_for('login'))
@jwt.unauthorized_loader
def handle_unauthorized_error(reason):
    """Answer a request rejected through ``jwt.unauthorized_loader``.

    Args:
        reason: Text describing the rejection; included in the flash message.

    Returns:
        ``(json_response, 401)`` for frontend fetch requests, otherwise a
        redirect to the ``login`` view after flashing an error.
    """
    if not is_fetch_request():
        # NOTE(review): other flashes in this file are built with
        # l_flash_msj(); this one passes a plain string plus a category —
        # confirm the templates render both forms.
        flash(f'Debes iniciar sesión para acceder a esta página: {reason}', 'error')
        return redirect(url_for('login'))
    return jsonify({"error": "Token inválido o no proporcionado"}), 401
@jwt.invalid_token_loader
def handle_invalid_token_error(reason):
    """Answer a request rejected through ``jwt.invalid_token_loader``.

    Args:
        reason: Text describing why the token was rejected; included in the
            flash message.

    Returns:
        ``(json_response, 401)`` for frontend fetch requests, otherwise a
        redirect to the ``login`` view after flashing an error.
    """
    if not is_fetch_request():
        # NOTE(review): other flashes in this file are built with
        # l_flash_msj(); this one passes a plain string plus a category —
        # confirm the templates render both forms.
        flash(f'Sesión inválida: {reason}', 'error')
        return redirect(url_for('login'))
    return jsonify({"error": "Sesión inválida"}), 401
# -------------------------------------------------------------
@app.route("/logout")
def logout():
    """Flash a goodbye message, delete the access-token cookie and redirect.

    Returns:
        A redirect response to the ``login`` view with the
        ``access_token_cookie`` cookie deleted.
    """
    flash(l_flash_msj('👋🏼Logout', 'Se ha cerrado tu sesión.', 'success'))
    login_redirect = make_response(redirect(url_for('login')))
    login_redirect.delete_cookie('access_token_cookie')
    return login_redirect
if __name__ == '__main__':
    # Debug mode enables the Werkzeug interactive debugger, which lets anyone
    # who can reach the server execute code. Because the server binds to all
    # interfaces, debug is now opt-in: set FLASK_DEBUG=1 (or true/yes/on) to
    # enable it for local development.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=8089)