967 lines
35 KiB
Python
967 lines
35 KiB
Python
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity, get_jwt, decode_token, verify_jwt_in_request, set_access_cookies, get_csrf_token
|
|
from flask_mail import Message, Mail
|
|
from threading import Thread
|
|
from flask import Flask, render_template, redirect, url_for, flash, current_app, request, jsonify, make_response
|
|
from forms_py.cls_form_contact import ContactForm
|
|
from forms_py.cls_form_add_usr import AddUser
|
|
from forms_py.cls_form_login import LogIn
|
|
from forms_py.cls_db import DBContact
|
|
from forms_py.cls_db_usr import DBForma
|
|
from forms_py.cls_change_pswd import ChangePwsd
|
|
from forms_py.functions import db_conf_obj, generar_contrasena, hash_password, v, l_flash_msj, saludo_hr, cur_date, min_read_pst, get_date_n_time, getRandomId
|
|
from forms_py.cls_recover_pswd import RecoverPswd
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from flask_bcrypt import Bcrypt
|
|
# from flask_wtf.csrf import CSRFProtect
|
|
from werkzeug.utils import secure_filename
|
|
import re
|
|
import json
|
|
from bs4 import BeautifulSoup
|
|
from functools import wraps
|
|
|
|
# Application object plus the bcrypt extension used for password hash checks.
app = Flask(__name__)
bcrypt = Bcrypt(app)
# csrf = CSRFProtect(app)

# Deployment-specific values are read from environment variables
# (each one is None when the variable is not set).
url_login = os.environ.get("login_url")
email_sender = os.environ.get("email_sender")
email_pswd = os.environ.get("pswd_formha")
|
|
# lst_email_to = ["davidix1991@gmail.com", "davicho1991@live.com"]
|
|
|
|
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
|
|
# INICIO CONFIGURACIÓN JWT
|
|
# JWT configuration: the access token is carried in a cookie.
app.config.update(
    # Signing key is read from the environment.
    JWT_SECRET_KEY=os.getenv("jwt_secret_key"),
    # Access tokens expire after 1.5 hours.
    JWT_ACCESS_TOKEN_EXPIRES=timedelta(hours=1.5),
    # False on localhost; set to True in production behind HTTPS.
    JWT_COOKIE_SECURE=False,
    # Set to True in production (requires HTTPS).
    JWT_COOKIE_CSRF_PROTECT=False,
    # Only look for the token in cookies.
    JWT_TOKEN_LOCATION=['cookies'],
    # Cookie name; other handlers in this file delete the cookie by this name.
    JWT_ACCESS_COOKIE_NAME='access_token_cookie',
    JWT_COOKIE_SAMESITE='Lax',
)
# app.config['JWT_ACCESS_CSRF_COOKIE_NAME'] = 'csrf_access_token'
# app.config['SESSION_PERMANENT'] = False
# app.config['SESSION_TYPE'] = 'filesystem'  # keeps the session out of cookies

jwt = JWTManager(app)
|
|
# FINAL CONFIGURACIÓN JWT
|
|
# /////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
|
|
# INICIO FLASK EMAIL
|
|
# Flask-Mail configuration: Gmail SMTP over implicit SSL.
app.config.update(
    MAIL_SERVER='smtp.gmail.com',
    MAIL_PORT=465,
    MAIL_USE_SSL=True,
    # Sender account and its password come from environment variables.
    MAIL_USERNAME=email_sender,
    MAIL_PASSWORD=email_pswd,
    SECRET_KEY=os.getenv("email_secret_key"),
)

# Must be created after the MAIL_* settings above are in place.
mail = Mail(app)
|
|
|
|
def send_async_email(app, msg):
    """Send ``msg`` with the module-level ``mail`` object inside ``app``'s context.

    Intended as a ``Thread`` target: the worker thread has no application
    context of its own, so one is pushed explicitly for the duration of the send.
    """
    context = app.app_context()
    with context:
        mail.send(msg)
|
|
|
|
# FINAL FLASK EMAIL
|
|
# /////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
|
|
# CONFIGURACIÓN PARA GUARDAR IMÁGENES EN EL BACKEND
|
|
# Configuración para guardar imágenes
|
|
# UPLOAD_FOLDER = 'uploads/' # Asegúrate de crear esta carpeta
|
|
# ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
|
|
# app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
|
|
# app.add_url_rule('/uploads/<filename>', endpoint='uploaded_file', view_func=send_from_directory(UPLOAD_FOLDER))
|
|
|
|
# /////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
def cur_timestamp():
    """Return the current UTC time as whole seconds since the Unix epoch."""
    now_utc = datetime.now(tz=timezone.utc)
    return int(now_utc.timestamp())
|
|
|
|
|
|
# Connection settings for the "forma_db" database, shared by both helpers below.
jsonDbContact = db_conf_obj("forma_db")
# Helper used by the contact view to store submissions (see `carga_contact`).
dbContact = DBContact(jsonDbContact)
# Helper used everywhere else for user, post and contact queries.
dbUsers = DBForma(jsonDbContact)
|
|
|
|
|
|
def lst_email_to() -> list:
    """Return the e-mail addresses of users flagged to receive contact notifications.

    Each row returned by the query has a single column; its value is collected
    into a flat list.
    """
    rows = dbUsers.get_all_data('SELECT email FROM users WHERE is_contact_noti = true;')
    return [row[0] for row in rows]
|
|
|
|
# decorador para rutas protegidas en caso de que borres al vuelo a un usuario.
|
|
def validate_user_exists(f):
    """Decorator that rejects requests whose JWT identity is no longer in ``users``.

    Covers the case where a user is deleted while still holding a valid token:
    an error is flashed and the client is redirected to the login page instead
    of running the wrapped view.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        identity = get_jwt_identity()
        row = dbUsers.get_data("SELECT id FROM users WHERE id = %s;", (identity,))

        if row is None:
            flash(l_flash_msj('Error', 'El usuario no existe en la base de datos.', 'error'))
            return redirect(url_for('login'))

        return f(*args, **kwargs)

    return decorated_function
|
|
|
|
|
|
|
|
# ##################################################################
|
|
#
|
|
# /$$$$$$ /$$$$$$$$ /$$ /$$ /$$$$$$$$ /$$$$$$$ /$$$$$$ /$$
|
|
# /$$__ $$| $$_____/| $$$ | $$| $$_____/| $$__ $$ /$$__ $$| $$
|
|
# | $$ \__/| $$ | $$$$| $$| $$ | $$ \ $$| $$ \ $$| $$
|
|
# | $$ /$$$$| $$$$$ | $$ $$ $$| $$$$$ | $$$$$$$/| $$$$$$$$| $$
|
|
# | $$|_ $$| $$__/ | $$ $$$$| $$__/ | $$__ $$| $$__ $$| $$
|
|
# | $$ \ $$| $$ | $$\ $$$| $$ | $$ \ $$| $$ | $$| $$
|
|
# | $$$$$$/| $$$$$$$$| $$ \ $$| $$$$$$$$| $$ | $$| $$ | $$| $$$$$$$$
|
|
# \______/ |________/|__/ \__/|________/|__/ |__/|__/ |__/|________/
|
|
#
|
|
# Font Name: Big Money-ne | https://patorjk.com/software/taag/#p=testall&f=Graffiti&t=USER
|
|
# ##################################################################
|
|
|
|
@app.route('/')
def home():
    """Render the public landing page."""
    template = v['home']
    return render_template(template, active_page='home')
|
|
|
|
@app.route('/about-us')
def about_us():
    """Render the public "about us" page."""
    template = v['about-us']
    return render_template(template, active_page='about_us')
|
|
|
|
@app.route('/solutions')
def solutions():
    """Render the public "solutions" page."""
    template = v['solutions']
    return render_template(template, active_page='solutions')
|
|
|
|
@app.route('/methodology')
def methodology():
    """Render the public "methodology" page."""
    template = v['methodology']
    return render_template(template, active_page='methodology')
|
|
|
|
|
|
def _blog_like_literal(term):
    """Return ``%term%`` as a quoted PostgreSQL escape-string literal.

    The ``E'...'`` form is used and every backslash and single quote in
    ``term`` is doubled, so user text cannot terminate the literal. NUL
    characters are removed because PostgreSQL text values cannot hold them.
    """
    cleaned = term.replace("\x00", "")
    escaped = cleaned.replace("\\", "\\\\").replace("'", "''")
    return f"E'%{escaped}%'"


@app.route('/blog')
def blog():
    """List published posts, nine per page, optionally filtered by a search term.

    Query-string parameters:
        page: 1-based page number; missing, non-numeric or non-positive
            values fall back to the first page.
        q: free-text filter matched case-insensitively against the post
            title, the image-free body and the author's full name.
    """
    # Clamp to 1 so the computed OFFSET can never be negative.
    page = max(request.args.get("page", 1, type=int), 1)
    search = request.args.get("q", "").strip()
    per_page = 9
    offset = (page - 1) * per_page

    # The search term is user input: it is quoted by _blog_like_literal before
    # being placed in the SQL text.
    # NOTE(review): every visible call passes get_all_data a single SQL string;
    # if it also accepts a parameter tuple, switch to %s placeholders.
    search_filter = ""
    if search:
        like = _blog_like_literal(search)
        search_filter = f"""
        WHERE
            LOWER(p.title) LIKE LOWER({like}) OR
            LOWER(p.body_no_img) LIKE LOWER({like}) OR
            LOWER(u.nombre || ' ' || u.apellido) LIKE LOWER({like})
        """

    # Total number of matching posts, used to compute the page count.
    count_query = f"""
        SELECT COUNT(*)
        FROM posts p
        INNER JOIN users u ON u.id = p.id_usr
        {search_filter};
    """
    total_posts = dbUsers.get_all_data(count_query)[0][0]
    total_pages = (total_posts + per_page - 1) // per_page

    # One page of posts, newest first.
    q = fr"""
        SELECT
            p.id,
            u.nombre,
            u.apellido,
            TO_CHAR(p.created_at, 'DD/MM/YYYY HH24:MI'),
            TO_CHAR(p.updated_at, 'DD/MM/YYYY HH24:MI'),
            p.title,
            LEFT(p.body_no_img, 180) AS preview,
            p.lista_imagenes->0 AS primera_imagen,
            array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) / 375 as n_words
        FROM
            posts p
        INNER JOIN
            users u ON u.id = p.id_usr
        {search_filter}
        ORDER BY
            p.created_at DESC
        LIMIT {per_page} OFFSET {offset};
    """
    data = dbUsers.get_all_data(q)

    return render_template(
        v['blog']['all_posts'],
        active_page='blog',
        data=data,
        current_page=page,
        total_pages=total_pages,
        search=search,  # echoed back so the template can keep the search box filled
    )
|
|
|
|
|
|
@app.route('/blog/<int:post_id>')
def blog_post(post_id):
    """Render a single public post and record the visit.

    The post is looked up first; a visit row is only written when the post
    exists. Unknown ids flash an error and redirect to the post list instead
    of rendering the template with no data.
    """
    q = r"""
        SELECT
            u.nombre,
            u.apellido,
            TO_CHAR(p.created_at, 'DD/MM/YYYY HH24:MI') AS fecha_creada,
            TO_CHAR(p.updated_at, 'DD/MM/YYYY HH24:MI') AS fecha_updated,
            array_length(regexp_split_to_array(TRIM(p.body_no_img), '\s+'), 1) / 375 as read_time_min,
            p.title,
            p.body
        FROM
            posts p
        INNER JOIN
            users u
        ON
            p.id_usr = u.id
        WHERE
            p.id = %s;
    """
    data = dbUsers.get_data(q, (post_id,))

    # get_data yields None when no row matches (other views rely on this too).
    if data is None:
        flash(l_flash_msj('Error', 'La publicación no existe.', 'error'))
        return redirect(url_for('blog'))

    q_visited = "INSERT INTO posts_visited (id_post, viewed ) VALUES ( %s, %s);"
    dbUsers.update_data(q_visited, (post_id, cur_date()))

    return render_template(v['blog']['post'], data=data)
|
|
|
|
|
|
@app.route("/contact", methods=['GET', 'POST'])
def contact():
    """Show the contact form; on a valid submission store it and notify staff.

    A valid POST is saved through ``dbContact.carga_contact``, an HTML
    notification is e-mailed in a background thread to the addresses returned
    by ``lst_email_to()``, and the client is redirected back to the form.
    """
    from html import escape  # local import: only needed to build the e-mail body

    form = ContactForm()
    if not form.validate_on_submit():
        return render_template(v['contact'], form=form, active_page='contact')

    def esc(value):
        """HTML-escape a submitted value so it cannot inject markup into the e-mail."""
        return escape(str(value))

    c_date = cur_date()
    obj_datetime = get_date_n_time(c_date)
    hora = obj_datetime['hour']
    fecha = obj_datetime['date']

    data = (
        c_date, form.nombre.data, form.apellido.data, form.email.data,
        form.estado.data, form.num_tel.data, form.size_co.data,
        form.rol_contacto.data, form.industry_type.data, form.tipo_req.data,
    )

    # Persist first so the success message is only shown for stored submissions.
    dbContact.carga_contact(data)

    recipients = lst_email_to()
    # Skip the notification when nobody is subscribed to it.
    if recipients:
        msg = Message("Subject, una persona busca asesoria", sender=email_sender, recipients=recipients)
        msg.html = f"""
            <h1>Nuevo contacto recibido</h1>
            <p><strong>Fecha: </strong> {fecha}</p>
            <p><strong>Hora:</strong> {hora}</p>
            <p><strong>Nombre:</strong> {esc(form.nombre.data)} {esc(form.apellido.data)}</p>
            <p><strong>Email:</strong> {esc(form.email.data)}</p>
            <p><strong>Teléfono:</strong> {esc(form.num_tel.data)}</p>
            <p><strong>Mensaje:</strong> {esc(form.tipo_req.data)}</p>
            <p><a href="{url_login}" target='_blank'>Iniciar Sesión</a></p>
        """

        # Send in the background so the request is not blocked by SMTP.
        thr = Thread(target=send_async_email, args=(current_app._get_current_object(), msg))
        thr.start()

    flash(l_flash_msj('Contacto', '¡Gracias por contactarnos! Te responderemos pronto. ☺️', 'success'))
    return redirect(url_for('contact'))
|
|
|
|
@app.route("/login", methods=['GET', 'POST'])
def login():
    """Authenticate a user and, on success, set the JWT access cookie.

    The submitted e-mail is lower-cased and its stored hash is checked with
    bcrypt. On success a token carrying the ``is_admin`` claim is issued and
    the client is redirected to ``user_home``.
    """
    form = LogIn()
    login_page = lambda: render_template(v['login'], form=form, active_page='login')

    if not form.validate_on_submit():
        return login_page()

    email = f"{form.email.data}".lower()
    submitted_password = form.password.data
    stored = dbUsers.login((email))

    # No row for this e-mail.
    if stored is None:
        flash(l_flash_msj('Información', 'No se cueta con información del usuario', 'warning'))
        return redirect(url_for('login'))

    stored_hash = stored[0]
    if not bcrypt.check_password_hash(stored_hash, submitted_password):
        flash(l_flash_msj('Credenciales', 'Verificar usuario y/o contraseña.', 'error'))
        return login_page()

    user_id = dbUsers.get_id(email)[0]
    admin_flag = dbUsers.get_data('SELECT is_admin FROM users WHERE id = %s;', (user_id,))[0]

    # The admin flag travels inside the token as an additional claim.
    token = create_access_token(identity=user_id, additional_claims={"is_admin": admin_flag})

    response = make_response(redirect(url_for("user_home")))
    set_access_cookies(response, token)
    return response
|
|
|
|
|
|
@app.route("/recover-pswd", methods=['GET', 'POST'])
def recover_pswd():
    """Issue a temporary password and e-mail it to the account owner.

    When the submitted e-mail is found, a new password is generated, its hash
    is stored via ``dbUsers.update_pswd`` and the plain value is mailed in a
    background thread. The client is then redirected to the login page.
    """
    form = RecoverPswd()
    if not form.validate_on_submit():
        return render_template(v['recover_pswd'], form=form, active_page='login')

    requested_email = f"{form.email.data}".lower()
    row = dbUsers.reset_pswd(requested_email)

    if row is None:
        flash(l_flash_msj("Error", "Verificar el correo electrónico ingresado.", "error"))
        return render_template(v['recover_pswd'], form=form, active_page='login')

    account_email = row[0]
    temp_password = generar_contrasena()
    password_hash = hash_password(temp_password)

    # Build the notification e-mail (recipients must be a list).
    msg = Message("Subject: Recuperación de contraseña", sender=email_sender, recipients=[account_email])
    # msg.html = render_template( 'email/recover_pswd.html', temp_password=new_tmp_pswd )
    msg.html = """
        <h1>Nueva contraseña temporal:</h1>
        <p><strong>Contraseña:</strong> {}</p>
        <p><strong>Una vez iniciada tu sesión debes de cambiar la contraseña a una nueva que puedas recordar</strong></p>
    """.format(temp_password)

    dbUsers.update_pswd(pswd=password_hash, email=account_email)

    # Send in the background so the request is not blocked by SMTP.
    worker = Thread(target=send_async_email, args=(current_app._get_current_object(), msg))
    worker.start()

    flash(l_flash_msj("Éxito", "Se ha enviado una contraseña temporal a tu correo electrónico.", "success"))
    # Redirect instead of rendering so a refresh does not resubmit the form.
    return redirect(url_for('login'))
|
|
|
|
|
|
# #################################################################################################################################
|
|
#
|
|
# $$\ $$\ $$$$$$\ $$$$$$$$\ $$$$$$$\ $$$$$$\ $$$$$$$\ $$\ $$\ $$$$$$\ $$\ $$\
|
|
# $$ | $$ |$$ __$$\ $$ _____|$$ __$$\ $$ __$$\ $$ __$$\ $$$\ $$$ |\_$$ _|$$$\ $$ |
|
|
# $$ | $$ |$$ / \__|$$ | $$ | $$ | $$ / $$ |$$ | $$ |$$$$\ $$$$ | $$ | $$$$\ $$ |
|
|
# $$ | $$ |\$$$$$$\ $$$$$\ $$$$$$$ | $$$$$$\ $$$$$$$$ |$$ | $$ |$$\$$\$$ $$ | $$ | $$ $$\$$ |
|
|
# $$ | $$ | \____$$\ $$ __| $$ __$$< \______| $$ __$$ |$$ | $$ |$$ \$$$ $$ | $$ | $$ \$$$$ |
|
|
# $$ | $$ |$$\ $$ |$$ | $$ | $$ | $$ | $$ |$$ | $$ |$$ |\$ /$$ | $$ | $$ |\$$$ |
|
|
# \$$$$$$ |\$$$$$$ |$$$$$$$$\ $$ | $$ | $$ | $$ |$$$$$$$ |$$ | \_/ $$ |$$$$$$\ $$ | \$$ |
|
|
# \______/ \______/ \________|\__| \__| \__| \__|\_______/ \__| \__|\______|\__| \__|
|
|
#
|
|
# Font Name: Big Money-ne | https://patorjk.com/software/taag/#p=testall&f=Graffiti&t=USER
|
|
#
|
|
# #################################################################################################################################
|
|
|
|
@app.context_processor
def inject_user_role():
    """Expose ``is_admin`` to every template, defaulting to ``False``.

    The JWT is verified inside the ``try`` block rather than through a
    ``@jwt_required(optional=True)`` decorator. With the decorator, an expired
    or invalid cookie raised outside this function, so rendering any page,
    including the login page the error handlers redirect to, failed again.
    Here any verification problem simply yields the non-admin default.
    """
    try:
        verify_jwt_in_request(optional=True)
        token_data = get_jwt()
        return {'is_admin': token_data.get('is_admin', False)}
    except Exception:
        # Deliberate best-effort: template rendering must never fail because
        # of a missing, expired or malformed token.
        return {'is_admin': False}
|
|
|
|
|
|
def admin_required(view_func):
    """Decorator that only lets tokens with a truthy ``is_admin`` claim through.

    Must be applied beneath ``@jwt_required()`` so a verified token is
    available. Non-admin users get an error flash and are redirected to
    ``user_home``.
    """
    @wraps(view_func)
    def wrapped_view(*args, **kwargs):
        claims = get_jwt()
        if claims.get('is_admin', False):
            return view_func(*args, **kwargs)

        flash(l_flash_msj('Error', 'No tienes permisos para acceder a esta sección.', 'error'))
        return redirect(url_for('user_home'))

    return wrapped_view
|
|
|
|
|
|
@app.route('/user/home')
@jwt_required()
@validate_user_exists
def user_home():
    """Render the logged-in landing page with the non-archived contact records.

    On the first visit of the day a greeting is flashed. When the account is
    flagged ``is_pswd_reseted`` a reminder to change the password is flashed.
    The user's ``lst_conn`` is then updated to the current time.
    """
    token_data = get_jwt()
    exp = token_data['exp']      # session expiry timestamp, handed to the template
    usr_id = token_data['sub']   # id of the active user

    # One round-trip for everything needed about the user.
    q_user = "SELECT nombre, genero, lst_conn, is_pswd_reseted FROM users WHERE id = %s;"
    nombre, gender, lst_conn, is_pswd_reseted = dbUsers.get_data(q_user, (usr_id,))

    now = cur_date()
    last_seen_date = '' if lst_conn is None else get_date_n_time(lst_conn)['date']
    current_date = get_date_n_time(now)['date']

    f_mnsj = None
    if last_seen_date != current_date:
        strGreet = 'Bienvenido' if gender == 'M' else 'Bienvenida'
        f_mnsj = l_flash_msj(f'{saludo_hr()}', f'{strGreet} {nombre}', 'success')
        flash(f_mnsj)

    # Only flash the reminder when there is one; previously None was flashed
    # for every user whose password had not been reset.
    if is_pswd_reseted:
        flash(l_flash_msj('Recomendación', 'Cambia tu contraseña por una que recuerdes.', 'warning'))

    dbUsers.update_data("UPDATE users SET lst_conn = %s WHERE id = %s;", (now, usr_id))

    q_contact = """
        SELECT
            ID, TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') AS FECHA_FORMATEADA, NOMBRE, APELLIDO, ESTADO, SIZE_CO, ROL_CONTACTO, INDUSTRY_TYPE, STATUS
        FROM
            CONTACT
        WHERE
            STATUS <> 'Archivado'
            OR STATUS IS NULL
        ORDER BY
            FULL_DATE_TIME DESC;
    """
    data_contact = dbUsers.get_all_data(q_contact)

    return render_template(v['tmp_user']['home'], f_mnsj=f_mnsj, nombre=nombre, exp=exp, data_contact=data_contact)
|
|
|
|
@app.route('/user/manage-record', methods=['POST'])
@jwt_required()
@validate_user_exists
def manage_record():
    """Set the status of one contact record.

    Expects a JSON body with ``id`` (the contact row) and ``value`` (the new
    status) and answers ``{"type": "success"}``.
    """
    payload = request.get_json()
    record_id, new_status = payload['id'], payload['value']

    dbUsers.update_data('UPDATE contact SET status = %s WHERE id = %s;', (new_status, record_id))

    return jsonify({"type": "success"})
|
|
|
|
@app.route('/user/get-contact-data', methods=['POST'])
@jwt_required()
@validate_user_exists
def get_contact_data():
    """Return every stored field of one contact record as JSON.

    Expects a JSON body with ``id``; the row is returned under ``data``.
    """
    contact_id = request.get_json()['id']
    query = "SELECT TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha, TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora, nombre, apellido, email, estado, num_tel, size_co, rol_contacto, industry_type, tipo_req, status FROM contact WHERE id = %s;"
    row = dbUsers.get_data(query, (contact_id,))
    return jsonify({"data": row})
|
|
|
|
|
|
|
|
@app.route('/user/download-db', methods=['GET'])
@jwt_required()
@validate_user_exists
@admin_required
def download_db():
    """Return every contact record as JSON under ``data`` (admins only)."""
    all_contacts_sql = """
        SELECT
            id, TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha, TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora, nombre, apellido, email, estado, num_tel, size_co, rol_contacto, industry_type, tipo_req, status
        FROM
            contact;
    """
    rows = dbUsers.get_all_data(all_contacts_sql)
    return jsonify({"data": rows})
|
|
|
|
@app.route('/user/txt-editor')
@jwt_required()
@validate_user_exists
def user_txteditor():
    """Render the post text editor for the logged-in user."""
    return render_template(v['tmp_user'].get('txt_editor'))
|
|
|
|
|
|
@app.route('/user/save-post', methods=['POST'])
@jwt_required()
@validate_user_exists
def save_post():
    """Create a post, or update one owned by the current user.

    Expects a JSON body with ``title`` and ``body`` (HTML). When ``id`` is
    present the matching post is updated, restricted to rows whose ``id_usr``
    is the caller; otherwise a new post is inserted. The plain-text body and
    the list of ``<img>`` sources are derived from ``body`` and stored too.
    """
    author_id = get_jwt()['sub']
    payload = request.get_json()
    title = payload['title']
    body = payload['body']

    # Text-only version of the body, used for previews and word counts.
    plain_text = BeautifulSoup(body, 'html.parser').get_text(separator=' ', strip=True)
    # Every <img src="..."> value, stored as a JSON array (NULL when there are none).
    image_sources = re.findall(r'<img[^>]+src=["\'](.*?)["\']', body)
    images_json = json.dumps(image_sources) if image_sources else None

    if "id" in payload:
        statement = 'UPDATE posts SET updated_at = %s, title = %s, body = %s, body_no_img = %s, lista_imagenes = %s::jsonb WHERE id = %s AND id_usr = %s;'
        values = (cur_date(), title, body, plain_text, images_json, payload['id'], author_id)
    else:
        statement = "INSERT INTO posts (id_usr, created_at, title, body, body_no_img, lista_imagenes) VALUES (%s, %s, %s, %s, %s, %s::jsonb);"
        values = (author_id, cur_date(), title, body, plain_text, images_json)

    try:
        dbUsers.update_data(statement, values)
        return jsonify({
            "status": "success",
            "title_post": title,
            # Relative URL; pass _external=True to url_for for an absolute one.
            "redirect_url": url_for('my_posts'),
        })
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
|
|
|
|
|
|
|
|
@app.route('/user/my-posts')
@jwt_required()
@validate_user_exists
def my_posts():
    """List the current user's posts, eight per page.

    The ``page`` query parameter is 1-based; missing, non-numeric or
    non-positive values fall back to the first page (a non-numeric value used
    to raise ``ValueError`` and produce a 500).
    """
    id_usr = get_jwt()['sub']
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = 8  # cards per page

    # All of the user's posts are fetched and paginated in Python.
    # NOTE(review): id_usr comes from the signed JWT, but it is still
    # interpolated into the SQL text; use a %s placeholder if get_all_data
    # accepts a parameter tuple, and consider LIMIT/OFFSET pagination.
    q_all = fr"""
        SELECT
            id,
            TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
            TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated,
            title,
            LEFT(body_no_img, 180),
            lista_imagenes->0,
            array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) / 375 as n_words
        FROM
            posts
        WHERE
            id_usr = '{id_usr}'
        ORDER BY
            COALESCE(updated_at, created_at) DESC,
            created_at DESC;
    """

    data_all = dbUsers.get_all_data(q_all)
    total_posts = len(data_all)
    total_pages = (total_posts + per_page - 1) // per_page

    start = (page - 1) * per_page
    data = data_all[start:start + per_page]

    return render_template(
        v['tmp_user']['my_posts'],
        data=data,
        current_page=page,
        total_pages=total_pages,
    )
|
|
|
|
|
|
@app.route('/user/del-post', methods=['POST'])
@jwt_required()
@validate_user_exists
def del_post():
    """Delete a post identified by the ``id`` field of the JSON body.

    Regular users may only delete their own posts: the DELETE is filtered by
    ``id_usr``, matching the ownership check used when saving a post.
    Administrators, who can already remove users together with their posts,
    may delete any post.
    """
    claims = get_jwt()
    data = request.get_json()
    t = (data['id'],)

    if claims.get('is_admin', False):
        dbUsers.update_data('DELETE FROM posts WHERE id = %s;', t)
    else:
        q = 'DELETE FROM posts WHERE id = %s AND id_usr = %s;'
        dbUsers.update_data(q, (data['id'], claims['sub']))

    # "id" keeps its original one-element shape so existing clients see the
    # same payload.
    res = {'ok': True, 'message': f'El elemento se eliminó, pendiente manejar el error en el frontend', "id": t}
    return jsonify(res), 200
|
|
|
|
@app.route('/user/<int:post_id>')
@jwt_required()
@validate_user_exists
def post(post_id):
    """Render one of the current user's own posts with its reading time.

    When the post does not exist or belongs to another user, an error is
    flashed and the client is redirected to the post list; previously this
    case failed with a ``TypeError`` while indexing the empty result.
    """
    usr_id = get_jwt()['sub']
    t = (usr_id, post_id)

    q = "select TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada, TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated, title, body from posts where id_usr = %s AND id = %s;"
    data = dbUsers.get_data(q, t)

    if data is None:
        flash(l_flash_msj('Error', 'La publicación no existe.', 'error'))
        return redirect(url_for('my_posts'))

    # Word count of the image-free body, converted to a reading time.
    q_nw = r"SELECT array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) FROM posts WHERE id_usr = %s AND id = %s;"
    n_words = dbUsers.get_data(q_nw, t)[0]
    time_read = min_read_pst(n_words)

    return render_template(v['tmp_user']['read_post'], data=data, time_read=time_read, post_id=post_id)
|
|
|
|
@app.route('/user/edit-post/<int:id_post>')
@jwt_required()
@validate_user_exists
def edit_post(id_post):
    """Render the editor pre-filled with one of the current user's posts.

    The lookup is restricted to posts owned by the caller, consistent with
    the ownership filter applied when a post is saved. A missing or foreign
    post flashes an error and redirects to the post list.
    """
    usr_id = get_jwt()['sub']
    q = 'SELECT title, body FROM posts WHERE id = %s AND id_usr = %s;'
    data = dbUsers.get_data(q, (id_post, usr_id))

    if data is None:
        flash(l_flash_msj('Error', 'La publicación no existe.', 'error'))
        return redirect(url_for('my_posts'))

    return render_template(v['tmp_user']['edit_post'], data=data, id_post=id_post)
|
|
|
|
|
|
@app.route('/user/change-pswd', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
def change_pswd():
    """Let the logged-in user replace their password.

    The current password must match the stored hash. On success the new hash
    is saved, ``is_pswd_reseted`` is cleared, the access cookie is deleted and
    the client is redirected to the login page.
    """
    form = ChangePwsd()
    change_page = lambda: render_template(v['tmp_user']['change_pswd'], active_page='change_pswd', form=form)

    if not form.validate_on_submit():
        return change_page()

    current_password = form.cur_pswd.data
    new_password = form.new_pswd.data
    user_id = get_jwt()['sub']
    stored_hash = dbUsers.get_data("SELECT pswd FROM users WHERE id = %s;", (user_id,))[0]

    if not bcrypt.check_password_hash(stored_hash, current_password):
        flash(l_flash_msj('Error', 'La contraseña a actualizar es incorrecta.', 'error'))
        return change_page()

    dbUsers.update_data(
        "UPDATE users SET pswd = %s, is_pswd_reseted = false WHERE id = %s;",
        (hash_password(new_password), user_id),
    )
    flash(l_flash_msj('Éxito', 'La contraseña ha sido actualizada.', 'success'))

    # Force a fresh login with the new password.
    response = make_response(redirect(url_for('login')))
    response.delete_cookie('access_token_cookie')
    return response
|
|
|
|
|
|
@app.route('/user/metrics')
@jwt_required()
@validate_user_exists
@admin_required
def metrics():
    """Render the metrics dashboard page (admins only)."""
    template = v['tmp_user']['metrics']
    return render_template(template, active_page='metrics')
|
|
|
|
@app.route('/user/metrics/data')
@jwt_required()
@validate_user_exists
@admin_required
def data_metrics():
    """Return aggregated statistics over the ``contact`` table as JSON (admins only).

    The response maps each metric name to the rows of its query: contacts per
    month, per state, per company size, per contact role, per industry type
    and per status.
    """
    queries = {
        # Contacts per calendar month (Mexico City time) with share of total.
        "count_monthly": r"""
            SELECT
                CASE EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
                    WHEN 1 THEN 'Enero'
                    WHEN 2 THEN 'Febrero'
                    WHEN 3 THEN 'Marzo'
                    WHEN 4 THEN 'Abril'
                    WHEN 5 THEN 'Mayo'
                    WHEN 6 THEN 'Junio'
                    WHEN 7 THEN 'Julio'
                    WHEN 8 THEN 'Agosto'
                    WHEN 9 THEN 'Septiembre'
                    WHEN 10 THEN 'Octubre'
                    WHEN 11 THEN 'Noviembre'
                    WHEN 12 THEN 'Diciembre'
                END AS mes,
                COUNT(*) AS cantidad_registros,
                ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
            FROM
                contact
            GROUP BY
                EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
            ORDER BY
                EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City');
        """,
        "count_state": "SELECT estado, COUNT(estado) AS conteo_edo, ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje FROM contact GROUP BY estado ORDER BY conteo_edo DESC;",
        "size_co": "SELECT size_co, COUNT(size_co) AS conteo_size FROM contact GROUP BY size_co ORDER BY conteo_size DESC;",
        "rol_contact": """
            SELECT
                rol_contacto,
                COUNT(*) AS conteo,
                ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
            FROM contact
            GROUP BY rol_contacto
            ORDER BY conteo DESC;
        """,
        "industry_type": """
            SELECT
                industry_type,
                COUNT(industry_type) AS conteo,
                ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
            FROM
                contact
            GROUP BY
                industry_type
            ORDER BY
                conteo DESC;
        """,
        # Rows without a status are reported as 'Sin Estatus'.
        "group_status": """
            SELECT
                COALESCE(status, 'Sin Estatus') AS status,
                COUNT(*) AS conteo,
                ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
            FROM
                contact
            GROUP BY
                COALESCE(status, 'Sin Estatus')
            ORDER BY
                conteo;
        """,
    }

    # Queries run in the order they are declared above.
    data_contact = {metric: dbUsers.get_all_data(sql) for metric, sql in queries.items()}
    return jsonify(data_contact)
|
|
|
|
|
|
@app.route('/user/manage-profiles', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def manage_profiles():
    """List all users and create or update one from the submitted form (admins only).

    A non-empty ``id`` field means "update this user"; an empty one means
    "create a new user", which is refused when the e-mail already exists.
    New users get a generated temporary password. In both cases the affected
    user is notified by e-mail.

    Changes from the earlier version:
    - The database write happens before the e-mail is queued, so no message
      (with a temporary password) goes out for a write that failed.
    - ``is_contact_noti`` is stored when creating a user, not only on update.
    - A missing ``id`` field is treated as empty instead of raising.
    - The user listing is only queried when the page is actually rendered.
    """
    form = AddUser()
    id_usr = get_jwt()['sub']

    if form.validate_on_submit():
        f_nombre = f'{form.nombre.data}'.title().strip()
        f_apellido = f'{form.apellido.data}'.title().strip()
        f_genero = form.genero.data
        f_email = f'{form.email.data}'.lower().strip()
        f_isAdmin = f'{form.isAdmin.data}'.upper().strip()
        f_isContactNoti = f'{form.isContactNoti.data}'.upper().strip()
        f_id = (form.id.data or '').strip()

        if f_id != '':
            # Existing user: update the profile fields.
            q = "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, is_admin = %s, is_contact_noti = %s WHERE id = %s;"
            t = (f_nombre, f_apellido, f_genero, f_email, f_isAdmin, f_isContactNoti, f_id)
            f_mnsj = l_flash_msj('Éxito', f'Usuario actualizado: {f_nombre}', 'success')
            subject = "Usuario actualizado"
            html_content = """
                <h1>¡Hola!</h1>
                <p>Tu cuenta ha sido actualizada con éxito.</p>
                <p><strong>Nombre:</strong> {}</p>
                <p><strong>Apellido:</strong> {}</p>
                <p><strong>Género:</strong> {}</p>
                <p><strong>Email:</strong> {}</p>
            """.format(f_nombre, f_apellido, f_genero, f_email)
        else:
            # New user: refuse to overwrite an account with the same e-mail.
            isDuplicated = dbUsers.get_data("SELECT email FROM users WHERE email = %s;", (f_email,))
            if isDuplicated is not None:
                flash(l_flash_msj('Error', 'El correo electrónico ya existe, NO SE SOBREESCRIBE EL USUARIO EXISTENTE.', 'error'))
                return redirect(url_for('manage_profiles'))

            random_id = getRandomId()
            tmp_pswd = generar_contrasena()
            hashed_pswd = hash_password(tmp_pswd)
            q = "INSERT INTO users (id, nombre, apellido, genero, email, pswd, is_admin, is_contact_noti, is_pswd_reseted ) values (%s, %s, %s, %s, %s, %s, %s, %s, %s);"
            t = (random_id, f_nombre, f_apellido, f_genero, f_email, hashed_pswd, f_isAdmin, f_isContactNoti, True)
            f_mnsj = l_flash_msj('Éxito', f'Usuario creado: {f_nombre}', 'success')
            subject = "Nueva cuenta creada"
            html_content = """
                <h1>¡Bienvenido a Forma!</h1>
                <p>Tu cuenta ha sido creada con éxito.</p>
                <p><strong>Nombre:</strong> {}</p>
                <p><strong>Apellido:</strong> {}</p>
                <p><strong>Género:</strong> {}</p>
                <p><strong>Email:</strong> {}</p>
                <p><strong>Contraseña temporal:</strong> {}</p>
                <p><strong>Una vez inicies sesión debes de cambiar la contraseña a una nueva que puedas recordar</strong></p>
            """.format(f_nombre, f_apellido, f_genero, f_email, tmp_pswd)

        # Persist first; only notify the user once the write went through.
        dbUsers.update_data(q, t)

        msg = Message(subject=subject, sender=email_sender, recipients=[f_email])
        msg.html = html_content

        # Send in the background so the request is not blocked by SMTP.
        thr = Thread(target=send_async_email, args=(current_app._get_current_object(), msg))
        thr.start()

        flash(f_mnsj)
        return redirect(url_for('manage_profiles'))

    # Every user with last connection, admin/notification flags and post count.
    q_all_users = """
        SELECT
            u.id, u.nombre, u.apellido, u.email, COALESCE(TO_CHAR(u.lst_conn, 'DD/MM/YYYY HH24:MI'), 'Sin conexión') AS ultima_conexion,
            CASE
                WHEN u.is_admin = true THEN 'Sí'
                WHEN u.is_admin = false THEN 'No'
            END AS admin,
            COALESCE(p.conteo, 0) AS conteo,
            CASE
                WHEN u.is_contact_noti = true THEN 'Sí'
                WHEN u.is_contact_noti = false THEN 'No'
            END AS isNotificated
        FROM users u
        LEFT JOIN (
            SELECT id_usr, COUNT(*) AS conteo
            FROM posts
            GROUP BY id_usr
        ) p ON u.id = p.id_usr
        ORDER BY conteo DESC;
    """
    data_all_users = dbUsers.get_all_data(q_all_users)

    return render_template(v['tmp_user']['manage_profiles'], form=form, data_all_users=data_all_users, active_page='manage_profiles', id_usr=id_usr)
|
|
|
|
|
|
@app.route('/user/manage-profiles/delete-usr', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def delete_user():
    """Delete a user and all of their posts (admins only).

    Expects a JSON body with ``id``. The posts are removed before the user
    row, so the deletion also works when ``posts.id_usr`` is a foreign key
    to ``users`` without cascading deletes.
    """
    data = request.get_json()
    id_usr = data['id']

    # Dependent rows first, then the user itself.
    dbUsers.update_data("DELETE FROM posts WHERE id_usr = %s;", (id_usr,))
    dbUsers.update_data("DELETE FROM users WHERE id = %s;", (id_usr,))

    res = {'ok': True, 'message': f'El elemento se eliminó, pendiente manejar el error en el frontend', "id": id_usr}
    return jsonify(res), 200
|
|
|
|
@app.route('/user/manage-profiles/get-user', methods=['POST'])
@jwt_required()
@validate_user_exists
@admin_required
def get_user():
    """Return one user's editable profile fields as JSON (admins only).

    Expects a JSON body with ``id``; the row is returned under ``data``.
    """
    requested_id = request.get_json()['id']
    query = "SELECT id, nombre, apellido, genero, email, is_admin, is_contact_noti FROM users WHERE id = %s;"
    row = dbUsers.get_data(query, (requested_id,))
    return jsonify({"data": row})
|
|
|
|
|
|
@app.route('/user/manage-profiles/update-user', methods=['POST'])
@jwt_required()
@validate_user_exists
@admin_required
def update_user():
    """Update a user's profile fields from a JSON body (admins only).

    Expected keys: ``id``, ``nombre``, ``apellido``, ``genero``, ``email``
    and ``isAdmin``. Names are title-cased and the e-mail is lower-cased
    before storing. The response echoes the id of the updated user.
    """
    data = request.get_json()
    id_usr = data['id']
    f_nombre = f'{data["nombre"]}'.title().strip()
    f_apellido = f'{data["apellido"]}'.title().strip()
    f_genero = data['genero']
    f_email = f'{data["email"]}'.lower().strip()
    f_isAdmin = data['isAdmin'].strip().lower()

    q = "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, is_admin = %s WHERE id = %s;"
    t = (f_nombre, f_apellido, f_genero, f_email, f_isAdmin, id_usr)
    dbUsers.update_data(q, t)

    # Return the actual id; the literal string "id_usr" used to be sent instead.
    res = {'ok': True, 'message': 'El elemento se actualizó correctamente', "id": id_usr}
    return jsonify(res), 200
|
|
|
|
|
|
# -------------------------------------------------------------
|
|
# MANEJO DE ERRORES DE JWT
|
|
|
|
@jwt.unauthorized_loader
def unauthorized_response(callback):
    """Answer requests that carry no usable token with a JSON 401.

    NOTE(review): ``handle_unauthorized_error`` further down in this file is
    registered with the same ``unauthorized_loader`` hook; the later
    registration presumably replaces this one. Confirm and drop the duplicate.
    """
    payload = {"error": "Token inválido o no proporcionado"}
    return jsonify(payload), 401
|
|
|
|
|
|
# Detecta si la petición viene del frontend (fetch)
|
|
def is_fetch_request():
    """Return True when the request's X-Requested-With header is "XMLHttpRequest"."""
    requested_with = request.headers.get("X-Requested-With")
    return requested_with == "XMLHttpRequest"
|
|
|
|
@jwt.expired_token_loader
def handle_expired_token(jwt_header, jwt_payload):
    """Send users with an expired token back to the login page.

    The "session expired" message is now actually flashed (it used to be
    built and discarded). The expired access cookie is also deleted so the
    browser stops re-sending it on the following requests.
    """
    # if is_fetch_request():
    #     return jsonify({"error": "El token ha expirado"}), 401
    flash(l_flash_msj('Sesión Expirada', 'Por favor inicia sesión nuevamente', 'warning'))

    response = make_response(redirect(url_for('login')))
    response.delete_cookie('access_token_cookie')
    return response
|
|
|
|
@jwt.unauthorized_loader
def handle_unauthorized_error(reason):
    """Handle requests to protected routes that carry no token.

    Fetch/XHR callers get a JSON 401; browser navigation gets an error flash
    and a redirect to the login page.
    """
    if not is_fetch_request():
        flash(f'Debes iniciar sesión para acceder a esta página: {reason}', 'error')
        return redirect(url_for('login'))

    return jsonify({"error": "Token inválido o no proporcionado"}), 401
|
|
|
|
@jwt.invalid_token_loader
def handle_invalid_token_error(reason):
    """Handle requests whose token cannot be decoded or verified.

    Fetch/XHR callers get a JSON 401. Browser navigation gets an error flash
    and a redirect to the login page; the invalid access cookie is deleted on
    that response so it is not presented again on the next request.
    """
    if is_fetch_request():
        return jsonify({"error": "Sesión inválida"}), 401

    flash(f'Sesión inválida: {reason}', 'error')
    response = make_response(redirect(url_for('login')))
    response.delete_cookie('access_token_cookie')
    return response
|
|
|
|
# -------------------------------------------------------------
|
|
|
|
@app.route("/logout")
def logout():
    """Close the session: delete the access cookie and go back to the login page."""
    flash(l_flash_msj('👋🏼Logout', 'Se ha cerrado tu sesión.', 'success'))

    login_redirect = make_response(redirect(url_for('login')))
    login_redirect.delete_cookie('access_token_cookie')
    return login_redirect
|
|
|
|
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default. Opt in locally with FLASK_DEBUG=1.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=debug_enabled, host='0.0.0.0', port=8089)