1330 lines
51 KiB
Python
1330 lines
51 KiB
Python
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity, get_jwt, decode_token, verify_jwt_in_request, set_access_cookies, get_csrf_token
|
|
from flask_mail import Message, Mail
|
|
from threading import Thread
|
|
from flask import Flask, render_template, redirect, url_for, flash, current_app, request, jsonify, make_response, send_from_directory, abort
|
|
from forms_py.cls_form_contact import ContactForm
|
|
from forms_py.cls_form_add_usr import AddUser
|
|
from forms_py.cls_form_login import LogIn
|
|
from forms_py.cls_db import DBContact
|
|
from forms_py.cls_db_usr import DBForma
|
|
from forms_py.cls_change_pswd import ChangePwsd
|
|
from forms_py.cls_form_carousel import Carousel
|
|
from forms_py.functions import db_conf_obj, generar_contrasena, hash_password, v, l_flash_msj, saludo_hr, cur_date, min_read_pst, get_date_n_time, getRandomId, hex_to_rgb, rgba_to_string
|
|
from forms_py.cls_recover_pswd import RecoverPswd
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from flask_bcrypt import Bcrypt
|
|
# from flask_wtf.csrf import CSRFProtect
|
|
from werkzeug.utils import secure_filename
|
|
import re
|
|
import json
|
|
from bs4 import BeautifulSoup
|
|
from functools import wraps
|
|
from flask_caching import Cache
|
|
|
|
import platform
|
|
|
|
folder_upload = None
|
|
folder_cache = None
|
|
|
|
if 'microsoft' in platform.uname().release.lower():
|
|
# folder_upload = os.path.join(os.getcwd(), "/static/uploads/")
|
|
folder_upload = os.path.join(os.getcwd(), "static", "uploads")
|
|
folder_cache = os.path.join(os.getcwd(), "cache")
|
|
|
|
elif platform.system() == "Linux":
|
|
# folder_upload = "/var/www/uploads/formha"
|
|
folder_upload = "/var/www/formha/static/uploads/"
|
|
folder_cache = "/var/www/formha/cache/"
|
|
|
|
|
|
app = Flask(__name__)
|
|
cache = Cache(app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': folder_cache})
|
|
# cache = Cache(app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp/flask-formha'})
|
|
|
|
bcrypt = Bcrypt(app)
|
|
# csrf = CSRFProtect(app)
|
|
|
|
url_login = os.getenv("login_url")
|
|
|
|
|
|
email_sender = os.getenv("email_sender")
|
|
email_pswd = os.getenv("pswd_formha")
|
|
# lst_email_to = ["davidix1991@gmail.com", "davicho1991@live.com"]
|
|
|
|
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
|
|
# INICIO CONFIGURACIÓN JWT
|
|
app.config["JWT_SECRET_KEY"] = os.getenv("jwt_secret_key") # Usa una clave segura en producción -> MOVER A VARIABLE DE ENTORNO
|
|
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1.5) # timedelta(hours=1) Token expira en 1 hora | timedelta(minutes=1) ` timedelta(seconds=60)`
|
|
# EN LOCALHOST FALSE, EN PRODUCCIÓN TRUE
|
|
app.config['JWT_COOKIE_SECURE'] = False # True en producción con HTTPS
|
|
app.config['JWT_COOKIE_CSRF_PROTECT'] = False # True: Solo para producción, requiere HTTPS
|
|
app.config['JWT_TOKEN_LOCATION'] = ['cookies'] # Ubicación donde buscar el token
|
|
app.config['JWT_ACCESS_COOKIE_NAME'] = 'access_token_cookie' # Asegura que use el mismo nombre
|
|
# app.config['JWT_ACCESS_CSRF_COOKIE_NAME'] = 'csrf_access_token'
|
|
app.config['JWT_COOKIE_SAMESITE'] = 'Lax'
|
|
|
|
# app.config['SESSION_PERMANENT'] = False
|
|
# app.config['SESSION_TYPE'] = 'filesystem' # Asegura que la sesión no se guarde en cookies
|
|
jwt = JWTManager(app)
|
|
# FINAL CONFIGURACIÓN JWT
|
|
# /////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
|
|
# INICIO FLASK EMAIL
|
|
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
|
|
app.config['MAIL_PORT'] = 465
|
|
app.config['MAIL_USE_SSL'] = True
|
|
app.config['MAIL_USERNAME'] = email_sender # email en variable de entorno
|
|
app.config['MAIL_PASSWORD'] = email_pswd # contraseña en variable de entorno
|
|
app.config['SECRET_KEY'] = os.getenv("email_secret_key")
|
|
mail = Mail(app)
|
|
|
|
def send_async_email(app, msg):
|
|
with app.app_context():
|
|
mail.send(msg)
|
|
|
|
# FINAL FLASK EMAIL
|
|
# /////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
# \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
|
|
# CONFIGURACIÓN PARA GUARDAR IMÁGENES EN EL BACKEND
|
|
# Configuración para guardar imágenes
|
|
# UPLOAD_FOLDER = 'uploads/' # Asegúrate de crear esta carpeta
|
|
# ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
|
|
# app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
|
|
# app.add_url_rule('/uploads/<filename>', endpoint='uploaded_file', view_func=send_from_directory(UPLOAD_FOLDER))
|
|
|
|
# /////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
def cur_timestamp():
|
|
return int(datetime.now(timezone.utc).timestamp())
|
|
|
|
|
|
jsonDbContact = db_conf_obj("forma_db")
|
|
dbContact = DBContact(jsonDbContact)
|
|
dbUsers = DBForma(jsonDbContact)
|
|
|
|
@app.route('/favicon.ico')
|
|
def favicon():
|
|
return send_from_directory(os.path.join(app.static_folder, 'y_img/favicon'), 'favicon.ico', mimetype='image/vnd.microsoft.icon')
|
|
|
|
def lst_email_to() -> list:
|
|
data = dbUsers.get_all_data('SELECT email FROM users WHERE is_contact_noti = true;', ())
|
|
respuesta = [ele[0] for ele in data]
|
|
return respuesta
|
|
|
|
# decorador para rutas protegidas en caso de que borres al vuelo a un usuario.
|
|
def validate_user_exists(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_jwt_identity()
|
|
q = "SELECT id FROM users WHERE id = %s;"
|
|
t = (user_id,)
|
|
exists = dbUsers.get_data(q, t) is not None
|
|
|
|
if not exists:
|
|
f_mnsj = l_flash_msj('Error', 'El usuario no existe en la base de datos.', 'error')
|
|
flash(f_mnsj)
|
|
return redirect(url_for('login'))
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
|
|
# ##################################################################
|
|
#
|
|
# /$$$$$$ /$$$$$$$$ /$$ /$$ /$$$$$$$$ /$$$$$$$ /$$$$$$ /$$
|
|
# /$$__ $$| $$_____/| $$$ | $$| $$_____/| $$__ $$ /$$__ $$| $$
|
|
# | $$ \__/| $$ | $$$$| $$| $$ | $$ \ $$| $$ \ $$| $$
|
|
# | $$ /$$$$| $$$$$ | $$ $$ $$| $$$$$ | $$$$$$$/| $$$$$$$$| $$
|
|
# | $$|_ $$| $$__/ | $$ $$$$| $$__/ | $$__ $$| $$__ $$| $$
|
|
# | $$ \ $$| $$ | $$\ $$$| $$ | $$ \ $$| $$ | $$| $$
|
|
# | $$$$$$/| $$$$$$$$| $$ \ $$| $$$$$$$$| $$ | $$| $$ | $$| $$$$$$$$
|
|
# \______/ |________/|__/ \__/|________/|__/ |__/|__/ |__/|________/
|
|
#
|
|
# Font Name: Big Money-ne | https://patorjk.com/software/taag/#p=testall&f=Graffiti&t=USER
|
|
# ##################################################################
|
|
|
|
@app.errorhandler(404)
|
|
def page_not_found(e):
|
|
# puedes usar una plantilla HTML personalizada si quieres
|
|
# return render_template('404.html'), 404
|
|
mnsj = l_flash_msj("Error Sección", "La sección a la que quieres entrar no existe", "error")
|
|
flash(mnsj)
|
|
return redirect(url_for('home'))
|
|
|
|
@app.route('/')
|
|
@cache.cached(timeout=3600) # 1 hora
|
|
def home():
|
|
q = """
|
|
SELECT
|
|
id,
|
|
img_name,
|
|
bg_color,
|
|
txt_color,
|
|
txt,
|
|
TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
|
|
url,
|
|
is_new_tab
|
|
FROM
|
|
carousel
|
|
ORDER BY
|
|
id DESC;
|
|
"""
|
|
data = dbUsers.get_all_data(q, ())
|
|
return render_template(v['home'], active_page='home', data=data)
|
|
|
|
@app.route('/about-us')
|
|
@cache.cached(timeout=43200) # 12 horas
|
|
def about_us():
|
|
return render_template(v['about-us'], active_page='about_us')
|
|
|
|
@app.route('/solutions')
|
|
@cache.cached(timeout=43200) # 12 horas
|
|
def solutions():
|
|
return render_template(v['solutions'], active_page='solutions')
|
|
|
|
@app.route('/methodology')
|
|
@cache.cached(timeout=43200) # 12 horas
|
|
def methodology():
|
|
return render_template(v['methodology'], active_page='methodology')
|
|
|
|
@app.route('/blog')
|
|
# @cache.cached(timeout=3600) # 1 hora
|
|
def blog():
|
|
q = r"""
|
|
SELECT
|
|
p.id,
|
|
u.nombre || ' ' || u.apellido AS autor,
|
|
TO_CHAR(p.created_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_creation,
|
|
TO_CHAR(p.updated_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_updated,
|
|
p.title,
|
|
SUBSTRING(p.body_no_img FROM 1 FOR 180) AS preview,
|
|
(SELECT value FROM jsonb_array_elements_text(p.lista_imagenes) LIMIT 1) AS first_img,
|
|
ROUND(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375, 1) AS read_time,
|
|
GREATEST(1, CEIL(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375)) AS read_time_min
|
|
FROM posts p
|
|
JOIN users u ON p.id_usr = u.id
|
|
ORDER BY p.created_at DESC;
|
|
"""
|
|
data = dbUsers.get_all_data_dict(q)
|
|
return render_template( v['blog']['all_posts'], active_page='blog', posts=data )
|
|
|
|
# @app.route('/blog/api/posts')
|
|
# @cache.cached(timeout=3600) # 1 hora
|
|
# def api_posts():
|
|
# q = r"""
|
|
# SELECT
|
|
# p.id,
|
|
# u.nombre || ' ' || u.apellido AS autor,
|
|
# TO_CHAR(p.created_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_creation,
|
|
# TO_CHAR(p.updated_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_updated,
|
|
# p.title,
|
|
# SUBSTRING(p.body_no_img FROM 1 FOR 180) AS preview,
|
|
# (SELECT value FROM jsonb_array_elements_text(p.lista_imagenes) LIMIT 1) AS first_img,
|
|
# ROUND(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375, 1) AS read_time,
|
|
# GREATEST(1, CEIL(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375)) AS read_time_min
|
|
# FROM posts p
|
|
# JOIN users u ON p.id_usr = u.id
|
|
# ORDER BY p.created_at DESC;
|
|
# """
|
|
# data = dbUsers.get_all_data_dict(q)
|
|
# return jsonify(data)
|
|
|
|
|
|
@app.route('/blog/<int:post_id>')
|
|
@app.route('/blog/<int:post_id>/src/<string:source_name>')
|
|
@cache.cached(timeout=1800) # 30 minutos
|
|
def blog_post(post_id, source_name=None):
|
|
# source_name = source_name or 'direct'
|
|
# source_name = source_name.lower()
|
|
# if source_name not in ['facebook', 'linkedin', 'x', 'li', 'fb', 'tw', 'direct']:
|
|
# source_name = 'None'
|
|
|
|
if source_name != None:
|
|
q_vf = "INSERT INTO visited_from (post_id, source_name) VALUES (%s, %s);"
|
|
t_vf = (post_id, source_name)
|
|
dbUsers.update_data(q_vf, t_vf)
|
|
|
|
q = fr"""
|
|
SELECT
|
|
u.nombre,
|
|
u.apellido,
|
|
TO_CHAR(p.created_at, 'DD/MM/YYYY') AS fecha_creada,
|
|
TO_CHAR(p.updated_at, 'DD/MM/YYYY') AS fecha_updated,
|
|
GREATEST(1, CEIL(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375)) AS read_time_min,
|
|
p.title,
|
|
p.body,
|
|
COUNT(pv.viewed) AS total_views
|
|
FROM
|
|
posts p
|
|
INNER JOIN
|
|
users u ON p.id_usr = u.id
|
|
LEFT JOIN
|
|
posts_visited pv ON pv.id_post = p.id
|
|
WHERE
|
|
p.id = %s
|
|
GROUP BY
|
|
u.nombre, u.apellido, p.created_at, p.updated_at, p.title, p.body, p.body_no_img;
|
|
"""
|
|
t = (post_id,)
|
|
data = dbUsers.get_data(q, t)
|
|
|
|
|
|
if data is None:
|
|
# Si no se encuentra el post, redirigir a la página de error 404
|
|
# return render_template('404.html'), 404
|
|
msg = l_flash_msj('Error Publicación','La publicación que intentas ver fue eliminada o no existe. 😅', 'warning')
|
|
flash(msg)
|
|
return redirect(url_for('blog'))
|
|
|
|
|
|
return render_template(v['blog']['post'], data=data)
|
|
|
|
@app.route('/blog/api/count-post-viewed')
|
|
@cache.cached(timeout=1800) # 30 minutos
|
|
def count_posts_viewed():
|
|
q = "SELECT id_post, COUNT(id_post) AS count FROM posts_visited GROUP BY id_post ORDER BY id_post;"
|
|
data = dbUsers.get_all_data_dict(q)
|
|
return jsonify(data)
|
|
|
|
@app.route('/blog/get-data', methods=['POST'])
|
|
def get_data():
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return jsonify({"success": False, "message": "No data received"}), 400
|
|
# Ejemplo: extraer y mostrar los campos esperados
|
|
post_id = data.get("post_id")
|
|
|
|
# actualizar la base de datos
|
|
q_update = "INSERT INTO posts_visited (id_post, viewed) VALUES (%s, %s);"
|
|
d_tuple = (post_id, cur_date())
|
|
dbUsers.update_data(q_update, d_tuple)
|
|
|
|
return jsonify({"success": True}), 200
|
|
|
|
@app.route("/contact", methods=['GET', 'POST'])
|
|
def contact():
|
|
form = ContactForm()
|
|
# print(cur_date())
|
|
# get_date_n_time
|
|
|
|
if form.validate_on_submit():
|
|
# Procesar datos del formulario
|
|
|
|
c_date = cur_date()
|
|
nombre = form.nombre.data.strip()
|
|
apellido = form.apellido.data.strip()
|
|
email = form.email.data.strip()
|
|
estado = form.estado.data.strip()
|
|
num_tel = form.num_tel.data.strip()
|
|
size_co = form.size_co.data.strip()
|
|
rol_contacto = form.rol_contacto.data.strip()
|
|
industry_type = form.industry_type.data.strip()
|
|
tipo_req = form.tipo_req.data.strip()
|
|
|
|
# validar si ya se recibio info de ese email
|
|
q_val_mail = "SELECT COUNT(email) FROM contact WHERE email = %s;"
|
|
|
|
q_val_tel = "SELECT COUNT(num_tel) FROM contact WHERE num_tel = %s;"
|
|
|
|
res_mail = dbUsers.get_all_data(q_val_mail, (email,))
|
|
is_dup_mail = res_mail[0][0] if res_mail else 0
|
|
|
|
res_phone = dbUsers.get_all_data(q_val_tel, (num_tel,))
|
|
is_dup_phone = res_phone[0][0] if res_phone else 0
|
|
|
|
f_mnsj = None
|
|
|
|
if is_dup_mail > 0 or is_dup_phone > 0:
|
|
f_mnsj = l_flash_msj("Información Precargada", "Ya contamos con tu información, pronto nos pondremos en contacto", "success")
|
|
else:
|
|
|
|
obj_datetime = get_date_n_time(c_date)
|
|
hora = obj_datetime['hour']
|
|
fecha = obj_datetime['date']
|
|
data = ( c_date, nombre, apellido, email, estado, num_tel, size_co, rol_contacto, industry_type, tipo_req )
|
|
|
|
# Guardar datos en la base de datos
|
|
dbContact.carga_contact(data)
|
|
|
|
# Configurar y enviar email asíncrono
|
|
msg = Message( "Subject, una persona busca asesoria", sender=email_sender, recipients=lst_email_to() )
|
|
url_login = "https://formha.temporal.work/login"
|
|
|
|
msg.html = f"""
|
|
<div style="font-family: Arial, sans-serif;">
|
|
<img src="https://formha.temporal.work/static/y_img/logos/formha_blanco_vertical.png" alt="Escudo FORMHä" height="150" style="margin-bottom: 10px;"><br>
|
|
<h2 style="color: #333;">📩 Nuevo contacto recibido</h2>
|
|
<ul>
|
|
<li><strong>Fecha:</strong> {fecha}</li>
|
|
<li><strong>Hora:</strong> {hora}</li>
|
|
<li><strong>Nombre:</strong> {form.nombre.data} {form.apellido.data}</li>
|
|
<li><strong>Email:</strong> {form.email.data}</li>
|
|
<li><strong>Teléfono:</strong> {form.num_tel.data}</li>
|
|
<li><strong>Mensaje:</strong> {form.tipo_req.data}</li>
|
|
</ul>
|
|
<p><a href="{url_login}" target="_blank">👉 Iniciar Sesión</a></p>
|
|
</div>
|
|
"""
|
|
f_mnsj = l_flash_msj('Contacto', '¡Gracias por contactarnos! Te responderemos pronto. ☺️', 'success' )
|
|
|
|
# Enviar en segundo plano
|
|
thr = Thread( target=send_async_email, args=(current_app._get_current_object(), msg) )
|
|
thr.start()
|
|
|
|
flash(f_mnsj)
|
|
return redirect(url_for('contact'))
|
|
|
|
return render_template(v['contact'], form=form, active_page='contact')
|
|
|
|
@app.route("/login", methods=['GET', 'POST'])
|
|
def login():
|
|
form = LogIn()
|
|
if form.validate_on_submit():
|
|
f_email = f"{form.email.data}".lower()
|
|
f_pswd = form.password.data
|
|
res_pswd_server = dbUsers.login((f_email))
|
|
|
|
if res_pswd_server is None:
|
|
f_mnsj = l_flash_msj('Información', 'No se cueta con información del usuario', 'warning')
|
|
flash(f_mnsj)
|
|
return redirect(url_for('login'))
|
|
|
|
res_pswd_server = res_pswd_server[0]
|
|
if bcrypt.check_password_hash(res_pswd_server, f_pswd):
|
|
|
|
id_user = dbUsers.get_id(f_email)[0]
|
|
|
|
|
|
is_admin = dbUsers.get_data('SELECT is_admin FROM users WHERE id = %s;', (id_user,))[0]
|
|
|
|
# Crear token JWT: access_token = create_access_token(identity=id_user)
|
|
# access_token = create_access_token( identity=id_user )
|
|
access_token = create_access_token( identity=id_user, additional_claims={"is_admin": is_admin} )
|
|
|
|
# Redirigir a usr_home o a Admin depende
|
|
# din_home = 'admin_home' if is_admin else 'user_home'
|
|
|
|
response = make_response(redirect(url_for("user_home")))
|
|
|
|
set_access_cookies(response, access_token)
|
|
return response
|
|
else:
|
|
f_mnsj = l_flash_msj('Credenciales', 'Verificar usuario y/o contraseña.', 'error')
|
|
flash(f_mnsj)
|
|
return render_template(v['login'], form=form, active_page='login')
|
|
|
|
|
|
@app.route("/recover-pswd", methods=['GET', 'POST'])
|
|
def recover_pswd():
|
|
form = RecoverPswd()
|
|
if form.validate_on_submit():
|
|
f_email = f"{form.email.data}".lower()
|
|
emailPswdReco = dbUsers.reset_pswd(f_email)
|
|
|
|
if emailPswdReco is None:
|
|
f_mnsj = l_flash_msj("Error", "Verificar el correo electrónico ingresado.", "error")
|
|
flash(f_mnsj)
|
|
return render_template(v['recover_pswd'], form=form, active_page='login')
|
|
|
|
emailPswdReco = emailPswdReco[0]
|
|
new_tmp_pswd = generar_contrasena()
|
|
hashed_new_pswd = hash_password(new_tmp_pswd)
|
|
|
|
# Configurar y enviar email asíncrono
|
|
msg = Message(
|
|
"Subject: Recuperación de contraseña",
|
|
sender=email_sender,
|
|
recipients=[emailPswdReco] # Asegúrate que es una lista
|
|
)
|
|
|
|
|
|
msg.html = f"""
|
|
<!DOCTYPE html>
|
|
<html lang="es">
|
|
<head>
|
|
<meta charset="UTF-8">
|
|
<title>Recuperación de contraseña</title>
|
|
</head>
|
|
<body style="font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px;">
|
|
<div style="max-width: 600px; margin: auto; background-color: white; padding: 30px; border-radius: 10px; box-shadow: 0 0 10px rgba(0,0,0,0.1);">
|
|
<h2 style="color: #2c3e50;">🔒 Recuperación de contraseña</h2>
|
|
<p>Hola,</p>
|
|
<p>Hemos recibido una solicitud para restablecer tu contraseña. A continuación te proporcionamos una contraseña temporal que podrás usar para iniciar sesión:</p>
|
|
|
|
<p style="font-size: 18px;"><strong>🔑 Contraseña temporal:</strong> <span style="color: #e74c3c;">{new_tmp_pswd}</span></p>
|
|
|
|
<p><strong>Importante:</strong> Una vez que inicies sesión, te recomendamos cambiar tu contraseña por una que solo tú conozcas y puedas recordar fácilmente.</p>
|
|
|
|
<hr>
|
|
<p style="font-size: 12px; color: #888;">Este es un mensaje automático, por favor no respondas a este correo.</p>
|
|
</div>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
|
|
|
|
dbUsers.update_pswd(pswd=hashed_new_pswd, email=emailPswdReco)
|
|
|
|
# Enviar en segundo plano
|
|
thr = Thread(
|
|
target=send_async_email,
|
|
args=(current_app._get_current_object(), msg)
|
|
)
|
|
thr.start()
|
|
f_mnsj = l_flash_msj("Éxito", "Se ha enviado una contraseña temporal a tu correo electrónico.", "success")
|
|
flash(f_mnsj)
|
|
return redirect(url_for('login')) # Redirige en lugar de renderizar
|
|
|
|
return render_template(v['recover_pswd'], form=form, active_page='login')
|
|
|
|
|
|
# #################################################################################################################################
|
|
#
|
|
# $$\ $$\ $$$$$$\ $$$$$$$$\ $$$$$$$\ $$$$$$\ $$$$$$$\ $$\ $$\ $$$$$$\ $$\ $$\
|
|
# $$ | $$ |$$ __$$\ $$ _____|$$ __$$\ $$ __$$\ $$ __$$\ $$$\ $$$ |\_$$ _|$$$\ $$ |
|
|
# $$ | $$ |$$ / \__|$$ | $$ | $$ | $$ / $$ |$$ | $$ |$$$$\ $$$$ | $$ | $$$$\ $$ |
|
|
# $$ | $$ |\$$$$$$\ $$$$$\ $$$$$$$ | $$$$$$\ $$$$$$$$ |$$ | $$ |$$\$$\$$ $$ | $$ | $$ $$\$$ |
|
|
# $$ | $$ | \____$$\ $$ __| $$ __$$< \______| $$ __$$ |$$ | $$ |$$ \$$$ $$ | $$ | $$ \$$$$ |
|
|
# $$ | $$ |$$\ $$ |$$ | $$ | $$ | $$ | $$ |$$ | $$ |$$ |\$ /$$ | $$ | $$ |\$$$ |
|
|
# \$$$$$$ |\$$$$$$ |$$$$$$$$\ $$ | $$ | $$ | $$ |$$$$$$$ |$$ | \_/ $$ |$$$$$$\ $$ | \$$ |
|
|
# \______/ \______/ \________|\__| \__| \__| \__|\_______/ \__| \__|\______|\__| \__|
|
|
#
|
|
# Font Name: Big Money-ne | https://patorjk.com/software/taag/#p=testall&f=Graffiti&t=USER
|
|
#
|
|
# #################################################################################################################################
|
|
|
|
@app.context_processor
|
|
@jwt_required(optional=True)
|
|
def inject_user_role():
|
|
try:
|
|
token_data = get_jwt()
|
|
return {'is_admin': token_data.get('is_admin', False)}
|
|
except Exception:
|
|
return {'is_admin': False}
|
|
|
|
|
|
def admin_required(view_func):
|
|
@wraps(view_func)
|
|
def wrapped_view(*args, **kwargs):
|
|
token_data = get_jwt()
|
|
is_admin = token_data.get('is_admin', False)
|
|
|
|
if not is_admin:
|
|
f_mnsj = l_flash_msj('Error', 'No tienes permisos para acceder a esta sección.', 'error')
|
|
flash(f_mnsj)
|
|
return redirect(url_for('user_home'))
|
|
|
|
return view_func(*args, **kwargs)
|
|
return wrapped_view
|
|
|
|
|
|
@app.route('/user/home')
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
def user_home():
|
|
# get_jwt_identity()
|
|
# todo el token
|
|
token_data = get_jwt()
|
|
is_admin = token_data.get('is_admin', False)
|
|
# timestamp expiración de la sesión
|
|
exp = token_data['exp']
|
|
|
|
|
|
# id del usuario activo:
|
|
usr_id = token_data['sub']
|
|
|
|
is_pswd_reseted = dbUsers.get_data('SELECT is_pswd_reseted FROM users WHERE id = %s;', (usr_id,))[0]
|
|
|
|
|
|
update_pswd = l_flash_msj('Recomendación', 'Cambia tu contraseña por una que recuerdes.', 'warning') if is_pswd_reseted else None
|
|
|
|
q = "SELECT nombre, genero, lst_conn FROM users WHERE id = %s;"
|
|
data_saludo = dbUsers.get_data(q, (usr_id,))
|
|
nombre, gender, lst_conn = data_saludo
|
|
|
|
lst_conn = '' if lst_conn is None else get_date_n_time(lst_conn)['date']
|
|
current_date = get_date_n_time(cur_date())['date']
|
|
|
|
f_mnsj = None
|
|
|
|
if lst_conn != current_date:
|
|
strGreet = 'Bienvenido' if gender == 'M' else 'Bienvenida'
|
|
f_mnsj = l_flash_msj(f'{saludo_hr()}', f'{strGreet} {nombre}', 'success')
|
|
# dbUsers.update_data(q, d)
|
|
flash(f_mnsj)
|
|
|
|
flash(update_pswd)
|
|
|
|
q = "UPDATE users SET lst_conn = %s WHERE id = %s;"
|
|
d = (cur_date(), usr_id)
|
|
dbUsers.update_data(q, d)
|
|
|
|
|
|
q_contact = """
|
|
SELECT
|
|
ID,
|
|
TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') AS FECHA_FORMATEADA,
|
|
NOMBRE,
|
|
APELLIDO,
|
|
ESTADO,
|
|
SIZE_CO,
|
|
ROL_CONTACTO,
|
|
INDUSTRY_TYPE,
|
|
STATUS
|
|
FROM
|
|
CONTACT
|
|
WHERE
|
|
STATUS <> 'Archivado'
|
|
OR STATUS IS NULL
|
|
ORDER BY
|
|
FULL_DATE_TIME DESC;
|
|
"""
|
|
data_contact = dbUsers.get_all_data(q_contact, ())
|
|
|
|
return render_template(v['tmp_user']['home'], f_mnsj=f_mnsj, nombre=nombre, exp=exp, data_contact=data_contact, active_page='user_home')
|
|
|
|
@app.route('/user/manage-record', methods=['POST'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
def manage_record():
|
|
# print(request.get_json())
|
|
# print("Cookies recibida:", request.cookies)
|
|
# token = request.cookies.get('access_token_cookie')
|
|
# decoded_token = decode_token(token)
|
|
data = request.get_json()
|
|
id = data['id']
|
|
valor = data['value']
|
|
|
|
q = 'UPDATE contact SET status = %s WHERE id = %s;'
|
|
dbUsers.update_data(q, (valor, id))
|
|
|
|
return jsonify({"type": "success"})
|
|
|
|
@app.route('/user/get-contact-data', methods=['POST'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
def get_contact_data():
|
|
data = request.get_json()
|
|
id = data['id']
|
|
q = "SELECT TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha, TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora, nombre, apellido, email, estado, num_tel, size_co, rol_contacto, industry_type, tipo_req, status FROM contact WHERE id = %s;"
|
|
t = (id,)
|
|
dbData = dbUsers.get_data(q, t)
|
|
return jsonify({"data": dbData})
|
|
|
|
|
|
|
|
@app.route('/user/download-db', methods=['GET'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def download_db():
|
|
q = """
|
|
SELECT
|
|
id,
|
|
TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha,
|
|
TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora,
|
|
nombre,
|
|
apellido,
|
|
email,
|
|
estado,
|
|
num_tel,
|
|
size_co,
|
|
rol_contacto,
|
|
industry_type,
|
|
tipo_req,
|
|
status
|
|
FROM
|
|
contact;
|
|
"""
|
|
dbData = dbUsers.get_all_data(q, ())
|
|
return jsonify({"data": dbData})
|
|
|
|
@app.route('/user/txt-editor')
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@cache.cached(timeout=43200)
|
|
def user_txteditor():
|
|
template_name = v['tmp_user'].get('txt_editor')
|
|
return render_template(template_name, active_page='user_txteditor')
|
|
|
|
@app.route('/user/save-post', methods=['POST'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
def save_post():
|
|
id_usr = get_jwt()['sub']
|
|
data = request.get_json()
|
|
title = data['title']
|
|
body = data['body']
|
|
time = cur_date()
|
|
|
|
soup = BeautifulSoup(body, 'html.parser')
|
|
body_no_img = soup.get_text(separator=' ', strip=True)
|
|
etiquetas_img = re.findall(r'<img[^>]+src=["\'](.*?)["\']', body)
|
|
imagenes_json = json.dumps(etiquetas_img) if etiquetas_img else None
|
|
|
|
q = None
|
|
t = None
|
|
if "id" in data:
|
|
q = 'UPDATE posts SET updated_at = %s, title = %s, body = %s, body_no_img = %s, lista_imagenes = %s::jsonb WHERE id = %s AND id_usr = %s;'
|
|
t = (time, title, body, body_no_img, imagenes_json, data['id'], id_usr)
|
|
else:
|
|
q = "INSERT INTO posts (id_usr, created_at, title, body, body_no_img, lista_imagenes) VALUES (%s, %s, %s, %s, %s, %s::jsonb);"
|
|
t = (id_usr, time, title, body, body_no_img, imagenes_json)
|
|
|
|
try:
|
|
dbUsers.update_data(q, t)
|
|
return jsonify({
|
|
"status": "success",
|
|
"title_post": title,
|
|
"redirect_url": url_for('my_posts') # o usa _external=True si se necesita URL completa
|
|
# url_for('my_posts', _external=True)
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({"status": "error", "message": str(e)}), 500
|
|
|
|
|
|
|
|
@app.route('/user/my-posts')
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
def my_posts():
|
|
id_usr = get_jwt()['sub']
|
|
page = int(request.args.get("page", 1))
|
|
per_page = 8 # Número de tarjetas por página
|
|
|
|
# Obtener todos los posts del usuario (puedes optimizar esto con paginación SQL real después)
|
|
q_all = r"""
|
|
SELECT
|
|
id,
|
|
TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
|
|
TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated,
|
|
title,
|
|
LEFT(body_no_img, 180),
|
|
lista_imagenes->0,
|
|
GREATEST(array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) / 375, 1) as n_words
|
|
FROM
|
|
posts
|
|
WHERE
|
|
id_usr = %s
|
|
ORDER BY
|
|
COALESCE(updated_at, created_at) DESC,
|
|
created_at DESC;
|
|
"""
|
|
|
|
data_all = dbUsers.get_all_data(q_all, (id_usr,))
|
|
total_posts = len(data_all)
|
|
total_pages = (total_posts + per_page - 1) // per_page
|
|
|
|
start = (page - 1) * per_page
|
|
end = start + per_page
|
|
data = data_all[start:end]
|
|
|
|
return render_template(
|
|
v['tmp_user']['my_posts'],
|
|
data=data,
|
|
current_page=page,
|
|
total_pages=total_pages,
|
|
active_page='my_posts'
|
|
)
|
|
|
|
|
|
@app.route('/user/del-post', methods=['POST'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
def del_post():
|
|
data = request.get_json()
|
|
t = (data['id'],)
|
|
q = 'DELETE FROM posts WHERE id = %s;'
|
|
dbUsers.update_data(q, t)
|
|
res = {'ok': True, 'message': f'El elemento se eliminó, pendiente manejar el error en el frontend', "id": t}
|
|
return jsonify(res), 200
|
|
|
|
@app.route('/user/<int:post_id>')
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
def post(post_id):
|
|
# Obtener el post
|
|
usr_id = get_jwt()['sub']
|
|
q = """
|
|
SELECT
|
|
TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
|
|
TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated,
|
|
title,
|
|
body
|
|
FROM
|
|
posts
|
|
WHERE
|
|
id_usr = %s AND id = %s;
|
|
"""
|
|
t = (usr_id, post_id )
|
|
data = dbUsers.get_data(q, t)
|
|
q_nw = r"SELECT array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) FROM posts WHERE id_usr = %s AND id = %s;"
|
|
n_words = dbUsers.get_data(q_nw, t)[0]
|
|
time_read = min_read_pst(n_words)
|
|
return render_template(v['tmp_user']['read_post'], data=data, time_read=time_read, post_id=post_id)
|
|
|
|
@app.route('/user/edit-post/<int:id_post>')
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
def edit_post(id_post):
|
|
q = 'SELECT title, body FROM posts WHERE id = %s;'
|
|
t = (id_post,)
|
|
data = dbUsers.get_data(q, t)
|
|
return render_template(v['tmp_user']['edit_post'], data=data, id_post=id_post)
|
|
|
|
|
|
@app.route('/user/change-pswd', methods=['GET', 'POST'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
def change_pswd():
|
|
form = ChangePwsd()
|
|
if form.validate_on_submit():
|
|
f_old_pswd = form.cur_pswd.data
|
|
f_new_pswd = form.new_pswd.data
|
|
usr_id = get_jwt()['sub']
|
|
q = "SELECT pswd FROM users WHERE id = %s;"
|
|
t = (usr_id,)
|
|
data = dbUsers.get_data(q, t)[0]
|
|
|
|
if bcrypt.check_password_hash(data, f_old_pswd):
|
|
new_hash_pswd = hash_password(f_new_pswd)
|
|
q = "UPDATE users SET pswd = %s, is_pswd_reseted = false WHERE id = %s;"
|
|
t = (new_hash_pswd, usr_id)
|
|
dbUsers.update_data(q, t)
|
|
|
|
f_mnsj = l_flash_msj('Éxito', 'La contraseña ha sido actualizada.', 'success')
|
|
flash(f_mnsj)
|
|
|
|
response = make_response(redirect(url_for('login')))
|
|
response.delete_cookie('access_token_cookie')
|
|
return response
|
|
else:
|
|
f_mnsj = l_flash_msj('Error', 'La contraseña a actualizar es incorrecta.', 'error')
|
|
flash(f_mnsj)
|
|
|
|
return render_template(v['tmp_user']['change_pswd'], active_page='change_pswd', form=form)
|
|
|
|
|
|
@app.route('/user/metrics')
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def metrics():
|
|
return render_template(v['tmp_user']['metrics'], active_page='metrics')
|
|
|
|
@app.route('/user/metrics/data')
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def data_metrics():
|
|
# SELECT pv.id_post, pv.viewed, u.id, u.nombre, u.apellido
|
|
# FROM posts_visited pv
|
|
# INNER JOIN posts p ON p.id = pv.id_post
|
|
# INNER JOIN users u ON u.id = p.id_usr;
|
|
|
|
# -- SELECT pv.id_post, pv.viewed, u.nombre, u.apellido
|
|
# -- FROM posts_visited pv
|
|
# -- INNER JOIN posts p ON p.id = pv.id_post
|
|
# -- INNER JOIN users u ON u.id = p.id_usr;
|
|
|
|
q_contact = r"""
|
|
SELECT
|
|
CASE EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
|
|
WHEN 1 THEN 'Enero'
|
|
WHEN 2 THEN 'Febrero'
|
|
WHEN 3 THEN 'Marzo'
|
|
WHEN 4 THEN 'Abril'
|
|
WHEN 5 THEN 'Mayo'
|
|
WHEN 6 THEN 'Junio'
|
|
WHEN 7 THEN 'Julio'
|
|
WHEN 8 THEN 'Agosto'
|
|
WHEN 9 THEN 'Septiembre'
|
|
WHEN 10 THEN 'Octubre'
|
|
WHEN 11 THEN 'Noviembre'
|
|
WHEN 12 THEN 'Diciembre'
|
|
END AS mes,
|
|
COUNT(*) AS cantidad_registros,
|
|
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
|
|
FROM
|
|
contact
|
|
GROUP BY
|
|
EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
|
|
ORDER BY
|
|
EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City');
|
|
"""
|
|
q_count_state = "SELECT estado, COUNT(estado) AS conteo_edo, ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje FROM contact GROUP BY estado ORDER BY conteo_edo DESC;"
|
|
q_size_co = "SELECT size_co, COUNT(size_co) AS conteo_size FROM contact GROUP BY size_co ORDER BY conteo_size DESC;"
|
|
q_rol_contact = """
|
|
SELECT
|
|
rol_contacto,
|
|
COUNT(*) AS conteo,
|
|
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
|
|
FROM contact
|
|
GROUP BY rol_contacto
|
|
ORDER BY conteo DESC;
|
|
"""
|
|
|
|
q_industry_type = """
|
|
SELECT
|
|
industry_type,
|
|
COUNT(industry_type) AS conteo,
|
|
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
|
|
FROM
|
|
contact
|
|
GROUP BY
|
|
industry_type
|
|
ORDER BY
|
|
conteo DESC;
|
|
"""
|
|
|
|
q_group_status = """
|
|
SELECT
|
|
COALESCE(status, 'Sin Estatus') AS status,
|
|
COUNT(*) AS conteo,
|
|
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
|
|
FROM
|
|
contact
|
|
GROUP BY
|
|
COALESCE(status, 'Sin Estatus')
|
|
ORDER BY
|
|
conteo;
|
|
"""
|
|
|
|
q_posts_mas_vistos = "SELECT id_post, COUNT(id_post) AS conteo FROM posts_visited GROUP BY id_post ORDER BY conteo DESC LIMIT 10;"
|
|
|
|
q_top_three_authors = """
|
|
SELECT
|
|
CONCAT(u.nombre, ' ', u.apellido) AS autor,
|
|
COUNT(*) AS total_posts
|
|
FROM
|
|
posts_visited pv
|
|
INNER JOIN
|
|
posts p ON p.id = pv.id_post
|
|
INNER JOIN
|
|
users u ON u.id = p.id_usr
|
|
GROUP BY
|
|
u.id, u.nombre, u.apellido
|
|
ORDER BY
|
|
total_posts DESC
|
|
LIMIT 3;
|
|
"""
|
|
|
|
q_visited_from = """
|
|
SELECT
|
|
CASE source_name
|
|
WHEN 'li' THEN 'LinkedIn'
|
|
WHEN 'wa' THEN 'WhatsApp'
|
|
WHEN 'fb' THEN 'Facebook'
|
|
WHEN 'x' THEN 'X'
|
|
ELSE 'Otro'
|
|
END AS nombre_legible,
|
|
COUNT(source_name) AS total
|
|
FROM visited_from
|
|
GROUP BY source_name
|
|
ORDER BY total DESC;
|
|
"""
|
|
|
|
|
|
data_contact = {
|
|
"count_monthly": dbUsers.get_all_data(q_contact, ()),
|
|
"count_state": dbUsers.get_all_data(q_count_state, ()),
|
|
"size_co": dbUsers.get_all_data(q_size_co, ()),
|
|
"rol_contact": dbUsers.get_all_data(q_rol_contact, ()),
|
|
"industry_type": dbUsers.get_all_data(q_industry_type, ()),
|
|
"group_status": dbUsers.get_all_data(q_group_status, ()),
|
|
"top_ten": dbUsers.get_all_data(q_posts_mas_vistos, ()),
|
|
"top_three_authors": dbUsers.get_all_data(q_top_three_authors, ()),
|
|
"visited_from": dbUsers.get_all_data(q_visited_from, ())
|
|
}
|
|
|
|
return jsonify(data_contact)
|
|
|
|
|
|
@app.route('/user/manage-profiles', methods=['GET', 'POST'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def manage_profiles():
|
|
form = AddUser()
|
|
|
|
id_usr = get_jwt()['sub']
|
|
|
|
q_all_users = """
|
|
SELECT
|
|
u.id, u.nombre, u.apellido, u.email, COALESCE(TO_CHAR(u.lst_conn, 'DD/MM/YYYY HH24:MI'), 'Sin conexión') AS ultima_conexion,
|
|
CASE
|
|
WHEN u.is_admin = true THEN 'Sí'
|
|
WHEN u.is_admin = false THEN 'No'
|
|
END AS admin,
|
|
COALESCE(p.conteo, 0) AS conteo,
|
|
CASE
|
|
WHEN u.is_contact_noti = true THEN 'Sí'
|
|
WHEN u.is_contact_noti = false THEN 'No'
|
|
END AS isNotificated
|
|
FROM users u
|
|
LEFT JOIN (
|
|
SELECT id_usr, COUNT(*) AS conteo
|
|
FROM posts
|
|
GROUP BY id_usr
|
|
) p ON u.id = p.id_usr
|
|
ORDER BY conteo DESC;
|
|
"""
|
|
data_all_users = dbUsers.get_all_data(q_all_users, ())
|
|
|
|
if form.validate_on_submit():
|
|
f_nombre = f'{form.nombre.data}'.title().strip()
|
|
f_apellido = f'{form.apellido.data}'.title().strip()
|
|
f_genero = form.genero.data
|
|
f_email = f'{form.email.data}'.lower().strip()
|
|
f_isAdmin = f'{form.isAdmin.data}'.upper().strip()
|
|
f_isContactNoti = f'{form.isContactNoti.data}'.upper().strip()
|
|
f_id = form.id.data.strip()
|
|
f_mnsj = None
|
|
q = None
|
|
t = None
|
|
subject = None
|
|
html_content = None
|
|
|
|
r_str_isAdmin = 'Sí' if f_isAdmin == 'TRUE' else 'No'
|
|
r_str_isContactNoti = 'Sí' if f_isContactNoti == 'TRUE' else 'No'
|
|
|
|
# si el f_id no es igual a '' entonces es una actualización de datos
|
|
if f_id != '':
|
|
q = "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, is_admin = %s, is_contact_noti = %s WHERE id = %s;"
|
|
t = (f_nombre, f_apellido, f_genero, f_email, f_isAdmin, f_isContactNoti, f_id)
|
|
f_mnsj = l_flash_msj('Éxito', f'Usuario actualizado: {f_nombre}', 'success')
|
|
subject = "Usuario actualizado"
|
|
html_content = """
|
|
<table width="100%" cellpadding="0" cellspacing="0" style="font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px;">
|
|
<tr>
|
|
<td align="center">
|
|
<table width="600" cellpadding="0" cellspacing="0" style="background-color: #ffffff; border-radius: 8px; padding: 40px;">
|
|
<tr>
|
|
<td align="center" style="padding-bottom: 20px;">
|
|
<img src="https://formha.temporal.work/static/y_img/logos/formha_blanco_vertical.png" alt="Logo Formha" style="width: 150px; height: auto;">
|
|
<p style="color: #4a5568; font-size: 16px; margin-top: 10px;"><strong>Tu cuenta ha sido actualizada con éxito.</strong></p>
|
|
</td>
|
|
</tr>
|
|
<tr>
|
|
<td style="color: #2d3748; font-size: 15px; line-height: 1.6;">
|
|
<p>
|
|
<strong>Nombre:</strong> {} <br>
|
|
<strong>Apellido:</strong> {}<br>
|
|
<strong>Género:</strong> {} <br>
|
|
<strong>Email:</strong> {} <br>
|
|
<strong>Permisos de Admin:</strong> {}<br>
|
|
<strong>Notificaciones por email:</strong> {}
|
|
</p>
|
|
</td>
|
|
</tr>
|
|
<tr>
|
|
<td align="center" style="padding-top: 30px;">
|
|
<a href="https://formha.temporal.work/login" style="background-color: #4299e1; color: #ffffff; text-decoration: none; padding: 12px 24px; border-radius: 5px; font-weight: bold; display: inline-block;">
|
|
Ir al sitio Formha
|
|
</a>
|
|
</td>
|
|
</tr>
|
|
<tr>
|
|
<td align="center" style="padding-top: 30px; font-size: 12px; color: #a0aec0;">
|
|
Si no realizaste esta actualización, por favor contacta con el soporte técnico.
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
""".format(f_nombre, f_apellido, f_genero, f_email, r_str_isAdmin, r_str_isContactNoti)
|
|
|
|
|
|
# en caso de que el f_id = '' quiere decir que es un nuevo registro y debo checar que no sea un registro preexiste a partir del email
|
|
else:
|
|
q_isDuplicated = "SELECT email FROM users WHERE email = %s;"
|
|
t_isDuplicated = (f_email,)
|
|
isDuplicated = dbUsers.get_data(q_isDuplicated, t_isDuplicated)
|
|
|
|
if isDuplicated is not None:
|
|
f_mnsj = l_flash_msj('Error', 'El correo electrónico ya existe, NO SE SOBREESCRIBE EL USUARIO EXISTENTE.', 'error')
|
|
flash(f_mnsj)
|
|
return redirect(url_for('manage_profiles'))
|
|
|
|
|
|
random_id = getRandomId()
|
|
tmp_pswd = generar_contrasena()
|
|
hashed_pswd = hash_password(tmp_pswd)
|
|
q = "INSERT INTO users (id, nombre, apellido, genero, email, pswd, is_admin, is_pswd_reseted, is_contact_noti ) values (%s, %s, %s, %s, %s, %s, %s, %s, %s);"
|
|
t = (random_id, f_nombre, f_apellido, f_genero, f_email, hashed_pswd, f_isAdmin, True, f_isContactNoti)
|
|
f_mnsj = l_flash_msj('Éxito', f'Usuario creado: {f_nombre}', 'success')
|
|
|
|
subject = "Nueva cuenta creada"
|
|
html_content = """
|
|
<table width="100%" cellpadding="0" cellspacing="0" style="font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px;">
|
|
<tr>
|
|
<td align="center">
|
|
<table width="600" cellpadding="0" cellspacing="0" style="background-color: #ffffff; border-radius: 8px; padding: 40px;">
|
|
<tr>
|
|
<td align="center" style="padding-bottom: 20px;">
|
|
<img src="https://formha.temporal.work/static/y_img/logos/formha_blanco_vertical.png" alt="Logo Formha" style="width: 150px; height: auto;">
|
|
<h1 style="color: #2c5282; margin: 0;">¡Bienvenido a <span style="color:#4299e1;">Formha</span>!</h1>
|
|
<p style="color: #4a5568; font-size: 16px; margin-top: 10px;">Tu cuenta ha sido creada con éxito.</p>
|
|
</td>
|
|
</tr>
|
|
<tr>
|
|
<td style="color: #2d3748; font-size: 15px; line-height: 1.6;">
|
|
<p>
|
|
<strong>Nombre:</strong> {}<br>
|
|
<strong>Apellido:</strong> {}<br>
|
|
<strong>Género:</strong> {}<br>
|
|
<strong>Email:</strong> {}<br>
|
|
<strong>Contraseña temporal:</strong> <code style="background-color:#edf2f7; padding: 4px 8px; border-radius: 4px;">{}</code><br>
|
|
<strong>Permisos de Admin:</strong> {}<br>
|
|
<strong>Notificaciones por email:</strong> {}
|
|
</p>
|
|
<p style="margin-top: 20px;"><strong style="color: #e53e3e;">Importante:</strong> Una vez inicies sesión, debes cambiar la contraseña por una nueva que recuerdes.</p>
|
|
</td>
|
|
</tr>
|
|
<tr>
|
|
<td align="center" style="padding-top: 30px;">
|
|
<a href="https://formha.temporal.work/login" style="background-color: #4299e1; color: #ffffff; text-decoration: none; padding: 12px 24px; border-radius: 5px; font-weight: bold; display: inline-block;">
|
|
Ir al sitio Formha
|
|
</a>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
</td>
|
|
</tr>
|
|
</table>
|
|
""".format(f_nombre, f_apellido, f_genero, f_email, tmp_pswd, r_str_isAdmin, r_str_isContactNoti)
|
|
|
|
# Crear el mensaje de correo con todo el contenido
|
|
msg = Message(
|
|
subject=subject,
|
|
sender=email_sender,
|
|
recipients=[f_email]
|
|
)
|
|
msg.html = html_content # Asignar el contenido HTML aquí
|
|
|
|
# Enviar en segundo plano
|
|
thr = Thread(target=send_async_email, args=(current_app._get_current_object(), msg))
|
|
thr.start()
|
|
|
|
dbUsers.update_data(q, t)
|
|
flash(f_mnsj)
|
|
return redirect(url_for('manage_profiles'))
|
|
|
|
return render_template(v['tmp_user']['manage_profiles'], form=form, data_all_users=data_all_users, active_page='manage_profiles', id_usr=id_usr)
|
|
|
|
|
|
@app.route('/user/manage-profiles/delete-usr', methods=['GET', 'POST'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def delete_user():
|
|
data = request.get_json()
|
|
id_usr = data['id']
|
|
q_del_usr = "DELETE FROM users WHERE id = %s;"
|
|
dbUsers.update_data(q_del_usr, (id_usr,))
|
|
|
|
q_del_posts = "DELETE FROM posts WHERE id_usr = %s;"
|
|
dbUsers.update_data(q_del_posts, (id_usr,))
|
|
|
|
res = {'ok': True, 'message': f'El elemento se eliminó, pendiente manejar el error en el frontend', "id": id_usr}
|
|
return jsonify(res), 200
|
|
|
|
@app.route('/user/manage-profiles/get-user', methods=['POST']) # Cambiado a POST
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def get_user():
|
|
data = request.get_json()
|
|
id_usr = data['id']
|
|
q = "SELECT id, nombre, apellido, genero, email, is_admin, is_contact_noti FROM users WHERE id = %s;"
|
|
t = (id_usr,)
|
|
dbData = dbUsers.get_data(q, t)
|
|
return jsonify({"data": dbData})
|
|
|
|
|
|
@app.route('/user/manage-profiles/update-user', methods=['POST'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def update_user():
|
|
data = request.get_json()
|
|
id_usr = data['id']
|
|
f_nombre = f'{data["nombre"]}'.title().strip()
|
|
f_apellido = f'{data["apellido"]}'.title().strip()
|
|
f_genero = data['genero']
|
|
f_email = f'{data["email"]}'.lower().strip()
|
|
f_isAdmin = data['isAdmin'].strip().lower()
|
|
|
|
q = "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, is_admin = %s WHERE id = %s;"
|
|
|
|
t = (f_nombre, f_apellido, f_genero, f_email, f_isAdmin, id_usr)
|
|
|
|
dbUsers.update_data(q, t)
|
|
|
|
res = {'ok': True, 'message': 'El elemento se actualizó correctamente', "id": "id_usr"}
|
|
return jsonify(res), 200
|
|
|
|
|
|
|
|
@app.route('/user/carousel', methods=['GET', 'POST'])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def carousel():
|
|
form = Carousel()
|
|
mnsj_flash = None
|
|
|
|
q_all_slides = """
|
|
SELECT
|
|
id,
|
|
img_name,
|
|
bg_color,
|
|
txt_color,
|
|
txt,
|
|
TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
|
|
url,
|
|
is_new_tab
|
|
FROM
|
|
carousel
|
|
ORDER BY
|
|
id DESC;
|
|
"""
|
|
|
|
data = dbUsers.get_all_data(q_all_slides, ())
|
|
|
|
if request.method == 'POST' and form.validate_on_submit():
|
|
# img
|
|
image_file = form.img.data
|
|
filename = secure_filename(image_file.filename)
|
|
|
|
prev_img = dbUsers.get_all_data("SELECT img_name FROM carousel;", ())
|
|
lst_img = [ele[0] for ele in prev_img]
|
|
|
|
url = None if form.url.data == '' else form.url.data
|
|
|
|
isNewTab = form.isNewTab.data
|
|
|
|
# validar que el archivo no se haya cargado previamente
|
|
if filename in lst_img:
|
|
mnsj_flash = l_flash_msj('Error Archivo', 'La imagen ya ha sido cargada previamente.', 'error')
|
|
else:
|
|
# image_file.save(f'./static/uploads/{filename}')
|
|
image_file.save(os.path.join(folder_upload, filename))
|
|
|
|
bg_color = rgba_to_string((*hex_to_rgb(form.bg_color.data), 0.5))
|
|
txt_color = rgba_to_string((*hex_to_rgb(form.txt_color.data), 1))
|
|
txt = form.txt.data
|
|
|
|
q = "INSERT INTO carousel (img_name, bg_color, txt_color, txt, url, is_new_tab) VALUES (%s, %s, %s, %s, %s, %s);"
|
|
t = (filename, bg_color, txt_color, txt, url, isNewTab)
|
|
dbUsers.update_data(q, t)
|
|
mnsj_flash = l_flash_msj('Datos Slide', f'Datos agregados para imagen: {filename}', 'success')
|
|
|
|
flash(mnsj_flash)
|
|
return redirect(url_for('carousel'))
|
|
|
|
return render_template(v['tmp_user']['carousel'], form=form, data=data, active_page='carousel')
|
|
|
|
|
|
@app.route('/user/carousel/delete-slide/<int:id>', methods=["POST", "DELETE"])
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def delete_slide(id):
|
|
try:
|
|
q_img_file = "SELECT img_name FROM carousel WHERE id = %s;"
|
|
t_img_file = (id,)
|
|
result = dbUsers.get_data(q_img_file, t_img_file)
|
|
if not result:
|
|
return jsonify({'error': 'No se encontró el registro'}), 404
|
|
|
|
data_img = result[0]
|
|
|
|
# filepath = os.path.join(os.getcwd(), 'static', 'uploads', data_img)
|
|
basedir = os.path.abspath(os.path.dirname(__file__))
|
|
filepath = os.path.join(basedir, 'static', 'uploads', data_img)
|
|
|
|
|
|
|
|
q_del_record = "DELETE FROM carousel WHERE id = %s;"
|
|
dbUsers.update_data(q_del_record, t_img_file)
|
|
|
|
if os.path.isfile(filepath):
|
|
os.remove(filepath)
|
|
else:
|
|
print(f"[!] Archivo no encontrado: {filepath}")
|
|
|
|
return jsonify({'msg': 'Slide eliminado correctamente'}), 200
|
|
except Exception as e:
|
|
print(f'Error al eliminar el slide: {e}')
|
|
return jsonify({'error': 'Error interno del servidor'}), 500
|
|
|
|
|
|
|
|
|
|
@app.route('/user/carousel/download/<path:filename>')
|
|
@jwt_required()
|
|
@validate_user_exists
|
|
@admin_required
|
|
def download_file(filename):
|
|
uploads_dir = os.path.join(app.root_path, 'static', 'uploads')
|
|
try:
|
|
return send_from_directory(uploads_dir, filename, as_attachment=True)
|
|
except FileNotFoundError:
|
|
abort(404)
|
|
|
|
# -------------------------------------------------------------
|
|
# MANEJO DE ERRORES DE JWT
|
|
|
|
@jwt.unauthorized_loader
|
|
def unauthorized_response(callback):
|
|
return jsonify({"error": "Token inválido o no proporcionado"}), 401
|
|
|
|
|
|
# Detecta si la petición viene del frontend (fetch)
|
|
def is_fetch_request():
|
|
return request.headers.get("X-Requested-With") == "XMLHttpRequest"
|
|
|
|
@jwt.expired_token_loader
|
|
def handle_expired_token(jwt_header, jwt_payload):
|
|
# if is_fetch_request():
|
|
# return jsonify({"error": "El token ha expirado"}), 401
|
|
l_flash_msj('Sesión Expirada', 'Por favor inicia sesión nuevamente', 'warning')
|
|
|
|
# flash(l_flash_msj)
|
|
return redirect(url_for('login'))
|
|
|
|
@jwt.unauthorized_loader
|
|
def handle_unauthorized_error(reason):
|
|
if is_fetch_request():
|
|
return jsonify({"error": "Token inválido o no proporcionado"}), 401
|
|
flash(f'Debes iniciar sesión para acceder a esta página: {reason}', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
@jwt.invalid_token_loader
|
|
def handle_invalid_token_error(reason):
|
|
if is_fetch_request():
|
|
return jsonify({"error": "Sesión inválida"}), 401
|
|
flash(f'Sesión inválida: {reason}', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
# -------------------------------------------------------------
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
f_mnsj = l_flash_msj('👋🏼Logout', 'Se ha cerrado tu sesión.', 'success')
|
|
flash(f_mnsj)
|
|
response = make_response(redirect(url_for('login')))
|
|
response.delete_cookie('access_token_cookie')
|
|
return response
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True, host='0.0.0.0', port=8089) |