formha/main.py
2025-04-25 17:51:34 -06:00

959 lines
35 KiB
Python

from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity, get_jwt, decode_token, verify_jwt_in_request, set_access_cookies, get_csrf_token
from flask_mail import Message, Mail
from threading import Thread
from flask import Flask, render_template, redirect, url_for, flash, current_app, request, jsonify, make_response
from forms_py.cls_form_contact import ContactForm
from forms_py.cls_form_add_usr import AddUser
from forms_py.cls_form_login import LogIn
from forms_py.cls_db import DBContact
from forms_py.cls_db_usr import DBForma
from forms_py.cls_change_pswd import ChangePwsd
from forms_py.functions import db_conf_obj, generar_contrasena, hash_password, v, l_flash_msj, saludo_hr, cur_date, min_read_pst, get_date_n_time, getRandomId
from forms_py.cls_recover_pswd import RecoverPswd
import os
from datetime import datetime, timedelta, timezone
from flask_bcrypt import Bcrypt
# from flask_wtf.csrf import CSRFProtect
from werkzeug.utils import secure_filename
import re
import json
from bs4 import BeautifulSoup
from functools import wraps
app = Flask(__name__)
bcrypt = Bcrypt(app)
# csrf = CSRFProtect(app)
email_sender = os.getenv("email_sender")
email_pswd = os.getenv("pswd_formha")
# lst_email_to = ["davidix1991@gmail.com", "davicho1991@live.com"]
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
# INICIO CONFIGURACIÓN JWT
app.config["JWT_SECRET_KEY"] = os.getenv("jwt_secret_key") # Usa una clave segura en producción -> MOVER A VARIABLE DE ENTORNO
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1.5) # timedelta(hours=1) Token expira en 1 hora | timedelta(minutes=1) ` timedelta(seconds=60)`
# EN LOCALHOST FALSE, EN PRODUCCIÓN TRUE
app.config['JWT_COOKIE_SECURE'] = False # True en producción con HTTPS
app.config['JWT_COOKIE_CSRF_PROTECT'] = False # True: Solo para producción, requiere HTTPS
app.config['JWT_TOKEN_LOCATION'] = ['cookies'] # Ubicación donde buscar el token
app.config['JWT_ACCESS_COOKIE_NAME'] = 'access_token_cookie' # Asegura que use el mismo nombre
# app.config['JWT_ACCESS_CSRF_COOKIE_NAME'] = 'csrf_access_token'
app.config['JWT_COOKIE_SAMESITE'] = 'Lax'
# app.config['SESSION_PERMANENT'] = False
# app.config['SESSION_TYPE'] = 'filesystem' # Asegura que la sesión no se guarde en cookies
jwt = JWTManager(app)
# FINAL CONFIGURACIÓN JWT
# /////////////////////////////////////////////////////////////////////////////////////////
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
# INICIO FLASK EMAIL
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 465
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = email_sender # email en variable de entorno
app.config['MAIL_PASSWORD'] = email_pswd # contraseña en variable de entorno
app.config['SECRET_KEY'] = os.getenv("email_secret_key")
mail = Mail(app)
def send_async_email(app, msg):
with app.app_context():
mail.send(msg)
# FINAL FLASK EMAIL
# /////////////////////////////////////////////////////////////////////////////////////////
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
# CONFIGURACIÓN PARA GUARDAR IMÁGENES EN EL BACKEND
# Configuración para guardar imágenes
# UPLOAD_FOLDER = 'uploads/' # Asegúrate de crear esta carpeta
# ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
# app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# app.add_url_rule('/uploads/<filename>', endpoint='uploaded_file', view_func=send_from_directory(UPLOAD_FOLDER))
# /////////////////////////////////////////////////////////////////////////////////////////
def cur_timestamp():
return int(datetime.now(timezone.utc).timestamp())
jsonDbContact = db_conf_obj("forma_db")
dbContact = DBContact(jsonDbContact)
dbUsers = DBForma(jsonDbContact)
def lst_email_to() -> list:
data = dbUsers.get_all_data('SELECT email FROM users WHERE is_contact_noti = true;')
re = [ele[0] for ele in data]
return re
# decorador para rutas protegidas en caso de que borres al vuelo a un usuario.
def validate_user_exists(f):
@wraps(f)
def decorated_function(*args, **kwargs):
user_id = get_jwt_identity()
q = "SELECT id FROM users WHERE id = %s;"
t = (user_id,)
exists = dbUsers.get_data(q, t) is not None
if not exists:
f_mnsj = l_flash_msj('Error', 'El usuario no existe en la base de datos.', 'error')
flash(f_mnsj)
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
# ##################################################################
#
# /$$$$$$ /$$$$$$$$ /$$ /$$ /$$$$$$$$ /$$$$$$$ /$$$$$$ /$$
# /$$__ $$| $$_____/| $$$ | $$| $$_____/| $$__ $$ /$$__ $$| $$
# | $$ \__/| $$ | $$$$| $$| $$ | $$ \ $$| $$ \ $$| $$
# | $$ /$$$$| $$$$$ | $$ $$ $$| $$$$$ | $$$$$$$/| $$$$$$$$| $$
# | $$|_ $$| $$__/ | $$ $$$$| $$__/ | $$__ $$| $$__ $$| $$
# | $$ \ $$| $$ | $$\ $$$| $$ | $$ \ $$| $$ | $$| $$
# | $$$$$$/| $$$$$$$$| $$ \ $$| $$$$$$$$| $$ | $$| $$ | $$| $$$$$$$$
# \______/ |________/|__/ \__/|________/|__/ |__/|__/ |__/|________/
#
# Font Name: Big Money-ne | https://patorjk.com/software/taag/#p=testall&f=Graffiti&t=USER
# ##################################################################
@app.route('/')
def home():
return render_template(v['home'], active_page='home')
@app.route('/about-us')
def about_us():
return render_template(v['about-us'], active_page='about_us')
@app.route('/solutions')
def solutions():
return render_template(v['solutions'], active_page='solutions')
@app.route('/methodology')
def methodology():
return render_template(v['methodology'], active_page='methodology')
@app.route('/blog')
def blog():
# if any(x in search for x in ["'", '"', " OR ", "--", ";", "1=1"]):
# app.logger.warning(f"Intento de SQL injection detectado: {search}")
# 🛑 IMPORTANTE: Este bloque acepta input del usuario y necesita sanitización adecuada.
# TODO: Reemplazar concatenación de strings por parámetros SQL usando psycopg2.sql.SQL y placeholders (%s)
# Ejemplo de ataque detectado: ' OR '1'='1
# Aunque no ejecuta el ataque, sí lanza error → posible vector de DoS
# Parámetros
page = request.args.get("page", 1, type=int)
search = request.args.get("q", "").strip()
per_page = 9
offset = (page - 1) * per_page
# Armado de condiciones SQL para búsqueda
search_filter = ""
if search:
like = f"'%{search}%'"
search_filter = f"""
WHERE
LOWER(p.title) LIKE LOWER({like}) OR
LOWER(p.body_no_img) LIKE LOWER({like}) OR
LOWER(u.nombre || ' ' || u.apellido) LIKE LOWER({like})
"""
# Conteo total
count_query = f"""
SELECT COUNT(*)
FROM posts p
INNER JOIN users u ON u.id = p.id_usr
{search_filter};
"""
total_posts = dbUsers.get_all_data(count_query)[0][0]
total_pages = (total_posts + per_page - 1) // per_page
# Consulta con paginación
q = fr"""
SELECT
p.id,
u.nombre,
u.apellido,
TO_CHAR(p.created_at, 'DD/MM/YYYY HH24:MI'),
TO_CHAR(p.updated_at, 'DD/MM/YYYY HH24:MI'),
p.title,
LEFT(p.body_no_img, 180) AS preview,
p.lista_imagenes->0 AS primera_imagen,
array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) / 375 as n_words
FROM
posts p
INNER JOIN
users u ON u.id = p.id_usr
{search_filter}
ORDER BY
p.created_at DESC
LIMIT {per_page} OFFSET {offset};
"""
data = dbUsers.get_all_data(q)
return render_template(
v['blog']['all_posts'],
active_page='blog',
data=data,
current_page=page,
total_pages=total_pages,
search=search # pasamos el término de búsqueda a la plantilla
)
@app.route('/blog/<int:post_id>')
def blog_post(post_id):
q_visited = "INSERT INTO posts_visited (id_post, viewed ) VALUES ( %s, %s);"
t_visited = (post_id, cur_date())
dbUsers.update_data(q_visited, t_visited)
# Obtener el post
q = fr"""
SELECT
u.nombre,
u.apellido,
TO_CHAR(p.created_at, 'DD/MM/YYYY HH24:MI') AS fecha_creada,
TO_CHAR(p.updated_at, 'DD/MM/YYYY HH24:MI') AS fecha_updated,
array_length(regexp_split_to_array(TRIM(p.body_no_img), '\s+'), 1) / 375 as read_time_min,
p.title,
p.body
FROM
posts p
INNER JOIN
users u
ON
p.id_usr = u.id
WHERE
p.id = %s;
"""
t = (post_id,)
data = dbUsers.get_data(q, t)
return render_template(v['blog']['post'], data=data)
@app.route("/contact", methods=['GET', 'POST'])
def contact():
form = ContactForm()
# print(cur_date())
# get_date_n_time
if form.validate_on_submit():
# Procesar datos del formulario
f_mnsj = l_flash_msj('Contacto', '¡Gracias por contactarnos! Te responderemos pronto. ☺️', 'success' )
flash(f_mnsj)
c_date = cur_date()
obj_datetime = get_date_n_time(c_date)
hora = obj_datetime['hour']
fecha = obj_datetime['date']
data = ( c_date, form.nombre.data, form.apellido.data, form.email.data, form.estado.data, form.num_tel.data, form.size_co.data, form.rol_contacto.data, form.industry_type.data, form.tipo_req.data )
# Guardar datos en la base de datos
dbContact.carga_contact(data)
# Configurar y enviar email asíncrono
msg = Message( "Subject, una persona busca asesoria", sender=email_sender, recipients=lst_email_to() )
msg.html ="""
<h1>Nuevo contacto recibido</h1>
<p><trong>Fecha: </strong> {}</p><p><strong>Hora:</strong> {}</p> <p><strong>Nombre:</strong> {} {}</p> <p><strong>Email:</strong> {}</p> <p><strong>Teléfono:</strong> {}</p> <p><strong>Mensaje:</strong> {}</p> <a href="http://127.0.0.1:8089/login" target='_blank'>Iniciar Sesión</a>
""".format(fecha, hora, form.nombre.data, form.apellido.data, form.email.data, form.num_tel.data, form.tipo_req.data)
# Enviar en segundo plano
thr = Thread( target=send_async_email, args=(current_app._get_current_object(), msg) )
thr.start()
return redirect(url_for('contact'))
return render_template(v['contact'], form=form, active_page='contact')
@app.route("/login", methods=['GET', 'POST'])
def login():
form = LogIn()
if form.validate_on_submit():
f_email = f"{form.email.data}".lower()
f_pswd = form.password.data
res_pswd_server = dbUsers.login((f_email))
if res_pswd_server is None:
f_mnsj = l_flash_msj('Información', 'No se cueta con información del usuario', 'warning')
flash(f_mnsj)
return redirect(url_for('login'))
res_pswd_server = res_pswd_server[0]
if bcrypt.check_password_hash(res_pswd_server, f_pswd):
id_user = dbUsers.get_id(f_email)[0]
is_admin = dbUsers.get_data('SELECT is_admin FROM users WHERE id = %s;', (id_user,))[0]
# Crear token JWT: access_token = create_access_token(identity=id_user)
# access_token = create_access_token( identity=id_user )
access_token = create_access_token( identity=id_user, additional_claims={"is_admin": is_admin} )
# Redirigir a usr_home o a Admin depende
# din_home = 'admin_home' if is_admin else 'user_home'
response = make_response(redirect(url_for("user_home")))
set_access_cookies(response, access_token)
return response
else:
f_mnsj = l_flash_msj('Credenciales', 'Verificar usuario y/o contraseña.', 'error')
flash(f_mnsj)
return render_template(v['login'], form=form, active_page='login')
@app.route("/recover-pswd", methods=['GET', 'POST'])
def recover_pswd():
form = RecoverPswd()
if form.validate_on_submit():
f_email = f"{form.email.data}".lower()
emailPswdReco = dbUsers.reset_pswd(f_email)
if emailPswdReco is None:
f_mnsj = l_flash_msj("Error", "Verificar el correo electrónico ingresado.", "error")
flash(f_mnsj)
return render_template(v['recover_pswd'], form=form, active_page='login')
emailPswdReco = emailPswdReco[0]
new_tmp_pswd = generar_contrasena()
hashed_new_pswd = hash_password(new_tmp_pswd)
# Configurar y enviar email asíncrono
msg = Message(
"Subject: Recuperación de contraseña",
sender=email_sender,
recipients=[emailPswdReco] # Asegúrate que es una lista
)
# msg.html = render_template( 'email/recover_pswd.html', temp_password=new_tmp_pswd )
msg.html = """
<h1>Nueva contraseña temporal:</h1>
<p><strong>Contraseña:</strong> {}</p>
<p><strong>Una vez iniciada tu sesión debes de cambiar la contraseña a una nueva que puedas recordar</strong></p>
""".format(new_tmp_pswd)
dbUsers.update_pswd(pswd=hashed_new_pswd, email=emailPswdReco)
# Enviar en segundo plano
thr = Thread(
target=send_async_email,
args=(current_app._get_current_object(), msg)
)
thr.start()
f_mnsj = l_flash_msj("Éxito", "Se ha enviado una contraseña temporal a tu correo electrónico.", "success")
flash(f_mnsj)
return redirect(url_for('login')) # Redirige en lugar de renderizar
return render_template(v['recover_pswd'], form=form, active_page='login')
# #################################################################################################################################
#
# $$\ $$\ $$$$$$\ $$$$$$$$\ $$$$$$$\ $$$$$$\ $$$$$$$\ $$\ $$\ $$$$$$\ $$\ $$\
# $$ | $$ |$$ __$$\ $$ _____|$$ __$$\ $$ __$$\ $$ __$$\ $$$\ $$$ |\_$$ _|$$$\ $$ |
# $$ | $$ |$$ / \__|$$ | $$ | $$ | $$ / $$ |$$ | $$ |$$$$\ $$$$ | $$ | $$$$\ $$ |
# $$ | $$ |\$$$$$$\ $$$$$\ $$$$$$$ | $$$$$$\ $$$$$$$$ |$$ | $$ |$$\$$\$$ $$ | $$ | $$ $$\$$ |
# $$ | $$ | \____$$\ $$ __| $$ __$$< \______| $$ __$$ |$$ | $$ |$$ \$$$ $$ | $$ | $$ \$$$$ |
# $$ | $$ |$$\ $$ |$$ | $$ | $$ | $$ | $$ |$$ | $$ |$$ |\$ /$$ | $$ | $$ |\$$$ |
# \$$$$$$ |\$$$$$$ |$$$$$$$$\ $$ | $$ | $$ | $$ |$$$$$$$ |$$ | \_/ $$ |$$$$$$\ $$ | \$$ |
# \______/ \______/ \________|\__| \__| \__| \__|\_______/ \__| \__|\______|\__| \__|
#
# Font Name: Big Money-ne | https://patorjk.com/software/taag/#p=testall&f=Graffiti&t=USER
#
# #################################################################################################################################
@app.context_processor
@jwt_required(optional=True)
def inject_user_role():
try:
token_data = get_jwt()
return {'is_admin': token_data.get('is_admin', False)}
except Exception:
return {'is_admin': False}
def admin_required(view_func):
@wraps(view_func)
def wrapped_view(*args, **kwargs):
token_data = get_jwt()
is_admin = token_data.get('is_admin', False)
if not is_admin:
f_mnsj = l_flash_msj('Error', 'No tienes permisos para acceder a esta sección.', 'error')
flash(f_mnsj)
return redirect(url_for('user_home'))
return view_func(*args, **kwargs)
return wrapped_view
@app.route('/user/home')
@jwt_required()
@validate_user_exists
def user_home():
# get_jwt_identity()
# todo el token
token_data = get_jwt()
is_admin = token_data.get('is_admin', False)
# timestamp expiración de la sesión
exp = token_data['exp']
# id del usuario activo:
usr_id = token_data['sub']
is_pswd_reseted = dbUsers.get_data('SELECT is_pswd_reseted FROM users WHERE id = %s;', (usr_id,))[0]
update_pswd = l_flash_msj('Recomendación', 'Cambia tu contraseña por una que recuerdes.', 'warning') if is_pswd_reseted else None
q = "SELECT nombre, genero, lst_conn FROM users WHERE id = %s;"
data_saludo = dbUsers.get_data(q, (usr_id,))
nombre, gender, lst_conn = data_saludo
lst_conn = '' if lst_conn is None else get_date_n_time(lst_conn)['date']
current_date = get_date_n_time(cur_date())['date']
f_mnsj = None
if lst_conn != current_date:
# q = "UPDATE users SET lst_conn = %s WHERE id = %s;"
# d = (cur_date(), usr_id)
strGreet = 'Bienvenido' if gender == 'M' else 'Bienvenida'
f_mnsj = l_flash_msj(f'{saludo_hr()}', f'{strGreet} {nombre}', 'success')
# dbUsers.update_data(q, d)
flash(f_mnsj)
flash(update_pswd)
q = "UPDATE users SET lst_conn = %s WHERE id = %s;"
d = (cur_date(), usr_id)
dbUsers.update_data(q, d)
q_contact = """
SELECT
ID, TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') AS FECHA_FORMATEADA, NOMBRE, APELLIDO, ESTADO, SIZE_CO, ROL_CONTACTO, INDUSTRY_TYPE, STATUS
FROM
CONTACT
WHERE
STATUS <> 'Archivado'
OR STATUS IS NULL
ORDER BY
FULL_DATE_TIME DESC;
"""
data_contact = dbUsers.get_all_data(q_contact)
# return render_template(v['tmp_user']['home'], token_data=token_data, f_mnsj=f_mnsj, nombre=nombre, exp=exp, data_contact=data_contact)
return render_template(v['tmp_user']['home'], f_mnsj=f_mnsj, nombre=nombre, exp=exp, data_contact=data_contact)
@app.route('/user/manage-record', methods=['POST'])
@jwt_required()
@validate_user_exists
def manage_record():
# print(request.get_json())
# print("Cookies recibida:", request.cookies)
# token = request.cookies.get('access_token_cookie')
# decoded_token = decode_token(token)
data = request.get_json()
id = data['id']
valor = data['value']
q = 'UPDATE contact SET status = %s WHERE id = %s;'
dbUsers.update_data(q, (valor, id))
return jsonify({"type": "success"})
@app.route('/user/get-contact-data', methods=['POST'])
@jwt_required()
@validate_user_exists
def get_contact_data():
data = request.get_json()
id = data['id']
q = "SELECT TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha, TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora, nombre, apellido, email, estado, num_tel, size_co, rol_contacto, industry_type, tipo_req, status FROM contact WHERE id = %s;"
t = (id,)
dbData = dbUsers.get_data(q, t)
return jsonify({"data": dbData})
@app.route('/user/download-db', methods=['GET'])
@jwt_required()
@validate_user_exists
@admin_required
def download_db():
q = """
SELECT
id, TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha, TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora, nombre, apellido, email, estado, num_tel, size_co, rol_contacto, industry_type, tipo_req, status
FROM
contact;
"""
dbData = dbUsers.get_all_data(q)
return jsonify({"data": dbData})
@app.route('/user/txt-editor')
@jwt_required()
@validate_user_exists
def user_txteditor():
template_name = v['tmp_user'].get('txt_editor')
return render_template(template_name)
@app.route('/user/save-post', methods=['POST'])
@jwt_required()
@validate_user_exists
def save_post():
id_usr = get_jwt()['sub']
data = request.get_json()
title = data['title']
body = data['body']
soup = BeautifulSoup(body, 'html.parser')
body_no_img = soup.get_text(separator=' ', strip=True)
etiquetas_img = re.findall(r'<img[^>]+src=["\'](.*?)["\']', body)
imagenes_json = json.dumps(etiquetas_img) if etiquetas_img else None
q = None
t = None
if "id" in data:
q = 'UPDATE posts SET updated_at = %s, title = %s, body = %s, body_no_img = %s, lista_imagenes = %s::jsonb WHERE id = %s AND id_usr = %s;'
t = (cur_date(), title, body, body_no_img, imagenes_json, data['id'], id_usr)
else:
q = "INSERT INTO posts (id_usr, created_at, title, body, body_no_img, lista_imagenes) VALUES (%s, %s, %s, %s, %s, %s::jsonb);"
t = (id_usr, cur_date(), title, body, body_no_img, imagenes_json)
try:
dbUsers.update_data(q, t)
return jsonify({
"status": "success",
"title_post": title,
"redirect_url": url_for('my_posts') # o usa _external=True si se necesita URL completa
# url_for('my_posts', _external=True)
})
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/user/my-posts')
@jwt_required()
@validate_user_exists
def my_posts():
id_usr = get_jwt()['sub']
page = int(request.args.get("page", 1))
per_page = 8 # Número de tarjetas por página
# Obtener todos los posts del usuario (puedes optimizar esto con paginación SQL real después)
q_all = fr"""
SELECT
id,
TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated,
title,
LEFT(body_no_img, 180),
lista_imagenes->0,
array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) / 375 as n_words
FROM
posts
WHERE
id_usr = '{id_usr}'
ORDER BY
COALESCE(updated_at, created_at) DESC,
created_at DESC;
"""
data_all = dbUsers.get_all_data(q_all)
total_posts = len(data_all)
total_pages = (total_posts + per_page - 1) // per_page
start = (page - 1) * per_page
end = start + per_page
data = data_all[start:end]
return render_template(
v['tmp_user']['my_posts'],
data=data,
current_page=page,
total_pages=total_pages
)
@app.route('/user/del-post', methods=['POST'])
@jwt_required()
@validate_user_exists
def del_post():
data = request.get_json()
t = (data['id'],)
q = 'DELETE FROM posts WHERE id = %s;'
dbUsers.update_data(q, t)
res = {'ok': True, 'message': f'El elemento se eliminó, pendiente manejar el error en el frontend', "id": t}
return jsonify(res), 200
@app.route('/user/<int:post_id>')
@jwt_required()
@validate_user_exists
def post(post_id):
# Obtener el post
usr_id = get_jwt()['sub']
q = "select TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada, TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated, title, body from posts where id_usr = %s AND id = %s;"
t = (usr_id, post_id )
data = dbUsers.get_data(q, t)
q_nw = r"SELECT array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) FROM posts WHERE id_usr = %s AND id = %s;"
n_words = dbUsers.get_data(q_nw, t)[0]
time_read = min_read_pst(n_words)
return render_template(v['tmp_user']['read_post'], data=data, time_read=time_read, post_id=post_id)
@app.route('/user/edit-post/<int:id_post>')
@jwt_required()
@validate_user_exists
def edit_post(id_post):
q = 'SELECT title, body FROM posts WHERE id = %s;'
t = (id_post,)
data = dbUsers.get_data(q, t)
return render_template(v['tmp_user']['edit_post'], data=data, id_post=id_post)
@app.route('/user/change-pswd', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
def change_pswd():
form = ChangePwsd()
if form.validate_on_submit():
f_old_pswd = form.cur_pswd.data
f_new_pswd = form.new_pswd.data
usr_id = get_jwt()['sub']
q = "SELECT pswd FROM users WHERE id = %s;"
t = (usr_id,)
data = dbUsers.get_data(q, t)[0]
if bcrypt.check_password_hash(data, f_old_pswd):
new_hash_pswd = hash_password(f_new_pswd)
q = "UPDATE users SET pswd = %s, is_pswd_reseted = false WHERE id = %s;"
t = (new_hash_pswd, usr_id)
dbUsers.update_data(q, t)
f_mnsj = l_flash_msj('Éxito', 'La contraseña ha sido actualizada.', 'success')
flash(f_mnsj)
response = make_response(redirect(url_for('login')))
response.delete_cookie('access_token_cookie')
return response
else:
f_mnsj = l_flash_msj('Error', 'La contraseña a actualizar es incorrecta.', 'error')
flash(f_mnsj)
return render_template(v['tmp_user']['change_pswd'], active_page='change_pswd', form=form)
@app.route('/user/metrics')
@jwt_required()
@validate_user_exists
@admin_required
def metrics():
return render_template(v['tmp_user']['metrics'], active_page='metrics')
@app.route('/user/metrics/data')
@jwt_required()
@validate_user_exists
@admin_required
def data_metrics():
q_contact = r"""
SELECT
CASE EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
WHEN 1 THEN 'Enero'
WHEN 2 THEN 'Febrero'
WHEN 3 THEN 'Marzo'
WHEN 4 THEN 'Abril'
WHEN 5 THEN 'Mayo'
WHEN 6 THEN 'Junio'
WHEN 7 THEN 'Julio'
WHEN 8 THEN 'Agosto'
WHEN 9 THEN 'Septiembre'
WHEN 10 THEN 'Octubre'
WHEN 11 THEN 'Noviembre'
WHEN 12 THEN 'Diciembre'
END AS mes,
COUNT(*) AS cantidad_registros,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
FROM
contact
GROUP BY
EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
ORDER BY
EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City');
"""
q_count_state = "SELECT estado, COUNT(estado) AS conteo_edo, ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje FROM contact GROUP BY estado ORDER BY conteo_edo DESC;"
q_size_co = "SELECT size_co, COUNT(size_co) AS conteo_size FROM contact GROUP BY size_co ORDER BY conteo_size DESC;"
q_rol_contact = """
SELECT
rol_contacto,
COUNT(*) AS conteo,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
FROM contact
GROUP BY rol_contacto
ORDER BY conteo DESC;
"""
q_industry_type = """
SELECT
industry_type,
COUNT(industry_type) AS conteo,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
FROM
contact
GROUP BY
industry_type
ORDER BY
conteo DESC;
"""
q_group_status = """
SELECT
COALESCE(status, 'Sin Estatus') AS status,
COUNT(*) AS conteo,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
FROM
contact
GROUP BY
COALESCE(status, 'Sin Estatus')
ORDER BY
conteo;
"""
data_contact = {
"count_monthly": dbUsers.get_all_data(q_contact),
"count_state": dbUsers.get_all_data(q_count_state),
"size_co": dbUsers.get_all_data(q_size_co),
"rol_contact": dbUsers.get_all_data(q_rol_contact),
"industry_type": dbUsers.get_all_data(q_industry_type),
"group_status": dbUsers.get_all_data(q_group_status)
}
return jsonify(data_contact)
@app.route('/user/manage-profiles', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def manage_profiles():
form = AddUser()
id_usr = get_jwt()['sub']
q_all_users = """
SELECT
u.id, u.nombre, u.apellido, u.email, COALESCE(TO_CHAR(u.lst_conn, 'DD/MM/YYYY HH24:MI'), 'Sin conexión') AS ultima_conexion,
CASE
WHEN u.is_admin = true THEN ''
WHEN u.is_admin = false THEN 'No'
END AS admin,
COALESCE(p.conteo, 0) AS conteo,
CASE
WHEN u.is_contact_noti = true THEN ''
WHEN u.is_contact_noti = false THEN 'No'
END AS isNotificated
FROM users u
LEFT JOIN (
SELECT id_usr, COUNT(*) AS conteo
FROM posts
GROUP BY id_usr
) p ON u.id = p.id_usr
ORDER BY conteo DESC;
"""
data_all_users = dbUsers.get_all_data(q_all_users)
if form.validate_on_submit():
f_nombre = f'{form.nombre.data}'.title().strip()
f_apellido = f'{form.apellido.data}'.title().strip()
f_genero = form.genero.data
f_email = f'{form.email.data}'.lower().strip()
f_isAdmin = f'{form.isAdmin.data}'.upper().strip()
f_isContactNoti = f'{form.isContactNoti.data}'.upper().strip()
f_id = form.id.data.strip()
f_mnsj = None
q = None
t = None
subject = None
html_content = None
# si el f_id no es igual a '' entonces es una actualización de datos
if f_id != '':
q = "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, is_admin = %s, is_contact_noti = %s WHERE id = %s;"
t = (f_nombre, f_apellido, f_genero, f_email, f_isAdmin, f_isContactNoti, f_id)
f_mnsj = l_flash_msj('Éxito', f'Usuario actualizado: {f_nombre}', 'success')
subject = "Usuario actualizado"
html_content = """
<h1>¡Hola!</h1>
<p>Tu cuenta ha sido actualizada con éxito.</p>
<p><strong>Nombre:</strong> {}</p>
<p><strong>Apellido:</strong> {}</p>
<p><strong>Género:</strong> {}</p>
<p><strong>Email:</strong> {}</p>
""".format(f_nombre, f_apellido, f_genero, f_email)
# en caso de que el f_id = '' quiere decir que es un nuevo registro y debo checar que no sea un registro preexiste a partir del email
else:
q_isDuplicated = "SELECT email FROM users WHERE email = %s;"
t_isDuplicated = (f_email,)
isDuplicated = dbUsers.get_data(q_isDuplicated, t_isDuplicated)
if isDuplicated is not None:
f_mnsj = l_flash_msj('Error', 'El correo electrónico ya existe, NO SE SOBREESCRIBE EL USUARIO EXISTENTE.', 'error')
flash(f_mnsj)
return redirect(url_for('manage_profiles'))
random_id = getRandomId()
tmp_pswd = generar_contrasena()
hashed_pswd = hash_password(tmp_pswd)
q = "INSERT INTO users (id, nombre, apellido, genero, email, pswd, is_admin, is_pswd_reseted ) values (%s, %s, %s, %s, %s, %s, %s, %s);"
t = (random_id, f_nombre, f_apellido, f_genero, f_email, hashed_pswd, f_isAdmin, True)
f_mnsj = l_flash_msj('Éxito', f'Usuario creado: {f_nombre}', 'success')
subject = "Nueva cuenta creada"
html_content = """
<h1>¡Bienvenido a Forma!</h1>
<p>Tu cuenta ha sido creada con éxito.</p>
<p><strong>Nombre:</strong> {}</p>
<p><strong>Apellido:</strong> {}</p>
<p><strong>Género:</strong> {}</p>
<p><strong>Email:</strong> {}</p>
<p><strong>Contraseña temporal:</strong> {}</p>
<p><strong>Una vez inicies sesión debes de cambiar la contraseña a una nueva que puedas recordar</strong></p>
""".format(f_nombre, f_apellido, f_genero, f_email, tmp_pswd)
# Crear el mensaje de correo con todo el contenido
msg = Message(
subject=subject,
sender=email_sender,
recipients=[f_email]
)
msg.html = html_content # Asignar el contenido HTML aquí
# Enviar en segundo plano
thr = Thread(target=send_async_email, args=(current_app._get_current_object(), msg))
thr.start()
dbUsers.update_data(q, t)
flash(f_mnsj)
return redirect(url_for('manage_profiles'))
return render_template(v['tmp_user']['manage_profiles'], form=form, data_all_users=data_all_users, active_page='manage_profiles', id_usr=id_usr)
@app.route('/user/manage-profiles/delete-usr', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def delete_user():
data = request.get_json()
id_usr = data['id']
q_del_usr = "DELETE FROM users WHERE id = %s;"
dbUsers.update_data(q_del_usr, (id_usr,))
q_del_posts = "DELETE FROM posts WHERE id_usr = %s;"
dbUsers.update_data(q_del_posts, (id_usr,))
res = {'ok': True, 'message': f'El elemento se eliminó, pendiente manejar el error en el frontend', "id": id_usr}
return jsonify(res), 200
@app.route('/user/manage-profiles/get-user', methods=['POST']) # Cambiado a POST
@jwt_required()
@validate_user_exists
@admin_required
def get_user():
data = request.get_json()
id_usr = data['id']
q = "SELECT id, nombre, apellido, genero, email, is_admin, is_contact_noti FROM users WHERE id = %s;"
t = (id_usr,)
dbData = dbUsers.get_data(q, t)
return jsonify({"data": dbData})
@app.route('/user/manage-profiles/update-user', methods=['POST'])
@jwt_required()
@validate_user_exists
@admin_required
def update_user():
data = request.get_json()
id_usr = data['id']
f_nombre = f'{data["nombre"]}'.title().strip()
f_apellido = f'{data["apellido"]}'.title().strip()
f_genero = data['genero']
f_email = f'{data["email"]}'.lower().strip()
f_isAdmin = data['isAdmin'].strip().lower()
q = "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, is_admin = %s WHERE id = %s;"
t = (f_nombre, f_apellido, f_genero, f_email, f_isAdmin, id_usr)
dbUsers.update_data(q, t)
res = {'ok': True, 'message': 'El elemento se actualizó correctamente', "id": "id_usr"}
return jsonify(res), 200
# -------------------------------------------------------------
# MANEJO DE ERRORES DE JWT
@jwt.unauthorized_loader
def unauthorized_response(callback):
return jsonify({"error": "Token inválido o no proporcionado"}), 401
# Detecta si la petición viene del frontend (fetch)
def is_fetch_request():
return request.headers.get("X-Requested-With") == "XMLHttpRequest"
@jwt.expired_token_loader
def handle_expired_token(jwt_header, jwt_payload):
# if is_fetch_request():
# return jsonify({"error": "El token ha expirado"}), 401
l_flash_msj('Sesión Expirada', 'Por favor inicia sesión nuevamente', 'warning')
# flash(l_flash_msj)
return redirect(url_for('login'))
@jwt.unauthorized_loader
def handle_unauthorized_error(reason):
if is_fetch_request():
return jsonify({"error": "Token inválido o no proporcionado"}), 401
flash(f'Debes iniciar sesión para acceder a esta página: {reason}', 'error')
return redirect(url_for('login'))
@jwt.invalid_token_loader
def handle_invalid_token_error(reason):
if is_fetch_request():
return jsonify({"error": "Sesión inválida"}), 401
flash(f'Sesión inválida: {reason}', 'error')
return redirect(url_for('login'))
# -------------------------------------------------------------
@app.route("/logout")
def logout():
f_mnsj = l_flash_msj('👋🏼Logout', 'Se ha cerrado tu sesión.', 'success')
flash(f_mnsj)
response = make_response(redirect(url_for('login')))
response.delete_cookie('access_token_cookie')
return response
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=8089)