"""Flask application for the FORMHä web site.

Serves the public pages (home, about, solutions, methodology, blog, contact),
the login / password-recovery flow, and the JWT-protected back office
(contacts, posts, metrics, user management and the home-page carousel).

Authentication uses a JWT stored in the ``access_token_cookie`` cookie.
"""

import json
import os
import re
from datetime import datetime, timedelta, timezone
from functools import wraps
from html import escape
from threading import Thread

from bs4 import BeautifulSoup
from flask import (
    Flask,
    abort,
    current_app,
    flash,
    jsonify,
    make_response,
    redirect,
    render_template,
    request,
    send_from_directory,
    url_for,
)
from flask_bcrypt import Bcrypt
from flask_caching import Cache
from flask_jwt_extended import (
    JWTManager,
    create_access_token,
    decode_token,
    get_csrf_token,
    get_jwt,
    get_jwt_identity,
    jwt_required,
    set_access_cookies,
    verify_jwt_in_request,
)
from flask_mail import Mail, Message
from werkzeug.utils import secure_filename

from forms_py.cls_change_pswd import ChangePwsd
from forms_py.cls_db import DBContact
from forms_py.cls_db_usr import DBForma
from forms_py.cls_form_add_usr import AddUser
from forms_py.cls_form_carousel import Carousel
from forms_py.cls_form_contact import ContactForm
from forms_py.cls_form_login import LogIn
from forms_py.cls_recover_pswd import RecoverPswd
from forms_py.functions import (
    cur_date,
    db_conf_obj,
    generar_contrasena,
    get_date_n_time,
    getRandomId,
    hash_password,
    hex_to_rgb,
    l_flash_msj,
    min_read_pst,
    rgba_to_string,
    saludo_hr,
    v,
)

# ---------------------------------------------------------------------------
# Application objects
# ---------------------------------------------------------------------------
app = Flask(__name__)
cache = Cache(app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp/flask-formha'})
bcrypt = Bcrypt(app)

url_login = os.getenv("login_url")
email_sender = os.getenv("email_sender")
email_pswd = os.getenv("pswd_formha")

# Name of the cookie that carries the access token (also used when logging out).
ACCESS_COOKIE_NAME = 'access_token_cookie'

# Single location for carousel images: saving, deleting and downloading must
# all agree on it, independently of the process working directory.
UPLOADS_DIR = os.path.join(app.root_path, 'static', 'uploads')

# Link placed in the "new contact" notification e-mail.
CONTACT_LOGIN_URL = "https://formha.temporal.work/login"

# ---------------------------------------------------------------------------
# JWT configuration
# ---------------------------------------------------------------------------
app.config["JWT_SECRET_KEY"] = os.getenv("jwt_secret_key")
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1.5)
# False on localhost; must be True in production (requires HTTPS).
app.config['JWT_COOKIE_SECURE'] = False
# True only in production (requires HTTPS).
app.config['JWT_COOKIE_CSRF_PROTECT'] = False
app.config['JWT_TOKEN_LOCATION'] = ['cookies']
app.config['JWT_ACCESS_COOKIE_NAME'] = ACCESS_COOKIE_NAME
app.config['JWT_COOKIE_SAMESITE'] = 'Lax'
jwt = JWTManager(app)

# ---------------------------------------------------------------------------
# Flask-Mail configuration
# ---------------------------------------------------------------------------
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 465
app.config['MAIL_USE_SSL'] = True
app.config['MAIL_USERNAME'] = email_sender
app.config['MAIL_PASSWORD'] = email_pswd
app.config['SECRET_KEY'] = os.getenv("email_secret_key")
mail = Mail(app)


def send_async_email(app, msg):
    """Send ``msg`` inside ``app``'s application context (thread target)."""
    with app.app_context():
        mail.send(msg)


def _send_in_background(msg) -> None:
    """Send ``msg`` on a worker thread so the request is not blocked by SMTP."""
    worker = Thread(
        target=send_async_email,
        args=(current_app._get_current_object(), msg),
    )
    worker.start()


def _email_fields_html(fields) -> str:
    """Render ``(label, value)`` pairs as an HTML list, escaping every value."""
    rows = "".join(
        f"<li><strong>{label}:</strong> {escape(str(value))}</li>"
        for label, value in fields
    )
    return f"<ul>{rows}</ul>"


def cur_timestamp():
    """Return the current UTC time as an integer Unix timestamp."""
    return int(datetime.now(timezone.utc).timestamp())


# ---------------------------------------------------------------------------
# Database access objects
# ---------------------------------------------------------------------------
jsonDbContact = db_conf_obj("forma_db")
dbContact = DBContact(jsonDbContact)
dbUsers = DBForma(jsonDbContact)

# Query shared by the public home page and the carousel admin page.
CAROUSEL_SLIDES_QUERY = """
    SELECT id, img_name, bg_color, txt_color, txt,
           TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
           url, is_new_tab
    FROM carousel
    ORDER BY id DESC;
"""

POST_NOT_FOUND_TEXT = 'La publicación que intentas ver fue eliminada o no existe. 😅'


def lst_email_to() -> list:
    """Return the e-mail addresses of users flagged for contact notifications."""
    rows = dbUsers.get_all_data(
        'SELECT email FROM users WHERE is_contact_noti = true;', ()
    )
    return [row[0] for row in rows]


def _json_body() -> dict:
    """Return the request's JSON object, or ``{}`` when it is absent or invalid."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _bad_request(message: str):
    """Build a JSON 400 response carrying ``message``."""
    return jsonify({"success": False, "message": message}), 400


def _logged_out_redirect():
    """Return a redirect to the login page that also drops the access cookie."""
    response = make_response(redirect(url_for('login')))
    response.delete_cookie(ACCESS_COOKIE_NAME)
    return response


def validate_user_exists(f):
    """Reject the request when the token's user no longer exists in the DB.

    Covers the case where a user is deleted while still holding a valid token.
    Must be applied *below* ``@jwt_required()`` so the identity is available.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        user_id = get_jwt_identity()
        row = dbUsers.get_data("SELECT id FROM users WHERE id = %s;", (user_id,))
        if row is None:
            flash(l_flash_msj('Error', 'El usuario no existe en la base de datos.', 'error'))
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


# ===========================================================================
# Public section
# ===========================================================================
@app.errorhandler(404)
def page_not_found(e):
    """Flash an error and send the visitor to the home page on any 404."""
    flash(l_flash_msj("Error Sección", "La sección a la que quieres entrar no existe", "error"))
    return redirect(url_for('home'))


@app.route('/')
def home():
    """Render the home page with every carousel slide (newest first)."""
    data = dbUsers.get_all_data(CAROUSEL_SLIDES_QUERY, ())
    return render_template(v['home'], active_page='home', data=data)


# NOTE(review): the cached pages below are rendered with the ``is_admin`` value
# injected by ``inject_user_role``; if the templates branch on it, the cached
# HTML is shared between roles — confirm against the templates.
@app.route('/about-us')
@cache.cached(timeout=43200)
def about_us():
    """Render the static "about us" page (cached for 12 hours)."""
    return render_template(v['about-us'], active_page='about_us')


@app.route('/solutions')
@cache.cached(timeout=43200)
def solutions():
    """Render the static "solutions" page (cached for 12 hours)."""
    return render_template(v['solutions'], active_page='solutions')


@app.route('/methodology')
@cache.cached(timeout=43200)
def methodology():
    """Render the static "methodology" page (cached for 12 hours)."""
    return render_template(v['methodology'], active_page='methodology')


@app.route('/blog')
def blog():
    """Render the blog index; the post list itself is fetched from the API."""
    return render_template(v['blog']['all_posts'], active_page='blog')


@app.route('/blog/api/posts')
def api_posts():
    """Return every post (author, dates, preview, first image, read time) as JSON."""
    q = r"""
        SELECT p.id,
               u.nombre || ' ' || u.apellido AS autor,
               TO_CHAR(p.created_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_creation,
               TO_CHAR(p.updated_at AT TIME ZONE 'America/Mexico_City', 'YYYY-MM-DD HH24:MI') AS author_updated,
               p.title,
               SUBSTRING(p.body_no_img FROM 1 FOR 180) AS preview,
               (SELECT value FROM jsonb_array_elements_text(p.lista_imagenes) LIMIT 1) AS first_img,
               ROUND(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375, 1) AS read_time,
               GREATEST(1, CEIL(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375)) AS read_time_min
        FROM posts p
        JOIN users u ON p.id_usr = u.id
        ORDER BY p.created_at DESC;
    """
    return jsonify(dbUsers.get_all_data_dict(q))


# NOTE(review): the URL variables were missing from the rules although the view
# requires them; they are restored without a converter because the id type is
# not visible here — tighten to ``<int:post_id>`` if ids are integers.
@app.route('/blog/<post_id>')
@app.route('/blog/<post_id>/src/<source_name>')
def blog_post(post_id, source_name=None):
    """Render one public post and, when given, record the traffic source.

    Args:
        post_id: Identifier of the post to show.
        source_name: Optional label of the referring channel.
    """
    q = r"""
        SELECT u.nombre, u.apellido,
               TO_CHAR(p.created_at, 'DD/MM/YYYY') AS fecha_creada,
               TO_CHAR(p.updated_at, 'DD/MM/YYYY') AS fecha_updated,
               GREATEST(1, CEIL(length(regexp_replace(p.body_no_img, '\s+', ' ', 'g')) / 5.0 / 375)) AS read_time_min,
               p.title, p.body,
               COUNT(pv.viewed) AS total_views
        FROM posts p
        INNER JOIN users u ON p.id_usr = u.id
        LEFT JOIN posts_visited pv ON pv.id_post = p.id
        WHERE p.id = %s
        GROUP BY u.nombre, u.apellido, p.created_at, p.updated_at, p.title, p.body, p.body_no_img;
    """
    data = dbUsers.get_data(q, (post_id,))
    if data is None:
        flash(l_flash_msj('Error Publicación', POST_NOT_FOUND_TEXT, 'warning'))
        return redirect(url_for('blog'))

    # Record the source only for posts that exist, so unknown ids do not
    # produce orphan rows.
    if source_name is not None:
        dbUsers.update_data(
            "INSERT INTO visited_from (post_id, source_name) VALUES (%s, %s);",
            (post_id, source_name),
        )
    return render_template(v['blog']['post'], data=data)


@app.route('/blog/api/count-post-viewed')
def count_posts_viewed():
    """Return the number of recorded views per post as JSON."""
    q = "SELECT id_post, COUNT(id_post) AS count FROM posts_visited GROUP BY id_post ORDER BY id_post;"
    return jsonify(dbUsers.get_all_data_dict(q))


@app.route('/blog/get-data', methods=['POST'])
def get_data():
    """Register one view of the post whose id arrives as JSON ``post_id``."""
    payload = _json_body()
    if not payload:
        return _bad_request("No data received")
    post_id = payload.get("post_id")
    if post_id is None:
        return _bad_request("post_id is required")

    dbUsers.update_data(
        "INSERT INTO posts_visited (id_post, viewed) VALUES (%s, %s);",
        (post_id, cur_date()),
    )
    return jsonify({"success": True}), 200


def _count(query: str, params: tuple) -> int:
    """Return the first column of the first row of ``query``, or 0 if no row."""
    rows = dbUsers.get_all_data(query, params)
    return rows[0][0] if rows else 0


def _notify_new_contact(c_date, nombre, apellido, email, num_tel, tipo_req) -> None:
    """E-mail the notification recipients about a newly stored contact."""
    recipients = lst_email_to()
    if not recipients:
        # Flask-Mail refuses messages without recipients; nothing to notify.
        app.logger.warning("New contact stored but no notification recipients are configured")
        return

    stamp = get_date_n_time(c_date)
    msg = Message(
        "Subject, una persona busca asesoria",
        sender=email_sender,
        recipients=recipients,
    )
    # Every value comes from a public form, so it is escaped before being
    # placed in the HTML body.
    msg.html = (
        "<p>Escudo FORMHä</p>"
        "<h2>📩 Nuevo contacto recibido</h2>"
        + _email_fields_html((
            ("Fecha", stamp['date']),
            ("Hora", stamp['hour']),
            ("Nombre", f"{nombre} {apellido}"),
            ("Email", email),
            ("Teléfono", num_tel),
            ("Mensaje", tipo_req),
        ))
        + f'<p><a href="{escape(CONTACT_LOGIN_URL, quote=True)}">👉 Iniciar Sesión</a></p>'
    )
    _send_in_background(msg)


@app.route("/contact", methods=['GET', 'POST'])
def contact():
    """Show the contact form; on a valid submit store it and notify by e-mail.

    A submission whose e-mail or phone number is already stored is not saved
    again; the visitor is only told the information is already on file.
    """
    form = ContactForm()
    if not form.validate_on_submit():
        return render_template(v['contact'], form=form, active_page='contact')

    c_date = cur_date()
    nombre = form.nombre.data.strip()
    apellido = form.apellido.data.strip()
    email = form.email.data.strip()
    estado = form.estado.data.strip()
    num_tel = form.num_tel.data.strip()
    size_co = form.size_co.data.strip()
    rol_contacto = form.rol_contacto.data.strip()
    industry_type = form.industry_type.data.strip()
    tipo_req = form.tipo_req.data.strip()

    dup_mail = _count("SELECT COUNT(email) FROM contact WHERE email = %s;", (email,))
    dup_phone = _count("SELECT COUNT(num_tel) FROM contact WHERE num_tel = %s;", (num_tel,))

    if dup_mail > 0 or dup_phone > 0:
        f_mnsj = l_flash_msj(
            "Información Precargada",
            "Ya contamos con tu información, pronto nos pondremos en contacto",
            "success",
        )
    else:
        dbContact.carga_contact((
            c_date, nombre, apellido, email, estado, num_tel,
            size_co, rol_contacto, industry_type, tipo_req,
        ))
        _notify_new_contact(c_date, nombre, apellido, email, num_tel, tipo_req)
        f_mnsj = l_flash_msj(
            'Contacto', '¡Gracias por contactarnos! Te responderemos pronto. ☺️', 'success'
        )

    flash(f_mnsj)
    return redirect(url_for('contact'))


@app.route("/login", methods=['GET', 'POST'])
def login():
    """Authenticate a user and store the access token in a cookie."""
    form = LogIn()
    if form.validate_on_submit():
        f_email = f"{form.email.data}".lower()
        f_pswd = form.password.data

        stored = dbUsers.login(f_email)
        if stored is None:
            flash(l_flash_msj('Información', 'No se cuenta con información del usuario', 'warning'))
            return redirect(url_for('login'))

        if bcrypt.check_password_hash(stored[0], f_pswd):
            id_user = dbUsers.get_id(f_email)[0]
            is_admin = dbUsers.get_data(
                'SELECT is_admin FROM users WHERE id = %s;', (id_user,)
            )[0]
            access_token = create_access_token(
                identity=id_user,
                additional_claims={"is_admin": is_admin},
            )
            response = make_response(redirect(url_for("user_home")))
            set_access_cookies(response, access_token)
            return response

        flash(l_flash_msj('Credenciales', 'Verificar usuario y/o contraseña.', 'error'))

    return render_template(v['login'], form=form, active_page='login')


@app.route("/recover-pswd", methods=['GET', 'POST'])
def recover_pswd():
    """Replace the user's password with a temporary one and e-mail it."""
    form = RecoverPswd()
    if form.validate_on_submit():
        f_email = f"{form.email.data}".lower()
        found = dbUsers.reset_pswd(f_email)
        if found is None:
            flash(l_flash_msj("Error", "Verificar el correo electrónico ingresado.", "error"))
            return render_template(v['recover_pswd'], form=form, active_page='login')

        recipient = found[0]
        new_tmp_pswd = generar_contrasena()

        msg = Message(
            "Subject: Recuperación de contraseña",
            sender=email_sender,
            recipients=[recipient],
        )
        # The password is escaped so characters such as "<" or "&" are shown
        # literally instead of being swallowed by the HTML renderer.
        msg.html = (
            "<h2>Nueva contraseña temporal:</h2>"
            f"<p>Contraseña: <strong>{escape(new_tmp_pswd)}</strong></p>"
            "<p>Una vez iniciada tu sesión debes de cambiar la contraseña "
            "a una nueva que puedas recordar</p>"
        )

        dbUsers.update_pswd(pswd=hash_password(new_tmp_pswd), email=recipient)
        _send_in_background(msg)

        flash(l_flash_msj(
            "Éxito", "Se ha enviado una contraseña temporal a tu correo electrónico.", "success"
        ))
        return redirect(url_for('login'))

    return render_template(v['recover_pswd'], form=form, active_page='login')


# ===========================================================================
# Authenticated section (users and admins)
# ===========================================================================
@app.context_processor
def inject_user_role():
    """Expose ``is_admin`` to every template; ``False`` without a valid token.

    The token is verified *inside* the ``try`` block: an expired or malformed
    cookie must not raise while a template is being rendered, otherwise even
    the login page becomes unreachable.
    """
    try:
        verify_jwt_in_request(optional=True)
        return {'is_admin': get_jwt().get('is_admin', False)}
    except Exception:
        # Deliberate best effort: any token problem means "not an admin".
        return {'is_admin': False}


def admin_required(view_func):
    """Allow the view only when the token carries a truthy ``is_admin`` claim.

    Must be applied *below* ``@jwt_required()``.
    """
    @wraps(view_func)
    def wrapped_view(*args, **kwargs):
        if not get_jwt().get('is_admin', False):
            flash(l_flash_msj('Error', 'No tienes permisos para acceder a esta sección.', 'error'))
            return redirect(url_for('user_home'))
        return view_func(*args, **kwargs)
    return wrapped_view


@app.route('/user/home')
@jwt_required()
@validate_user_exists
def user_home():
    """Render the back-office home with the non-archived contact records.

    On the first visit of the day the user is greeted, and reminded to change
    the password if it was reset. The last-connection date is updated on every
    visit.
    """
    token_data = get_jwt()
    exp = token_data['exp']      # session expiry timestamp, used by the template
    usr_id = token_data['sub']   # id of the active user

    is_pswd_reseted = dbUsers.get_data(
        'SELECT is_pswd_reseted FROM users WHERE id = %s;', (usr_id,)
    )[0]
    nombre, gender, lst_conn = dbUsers.get_data(
        "SELECT nombre, genero, lst_conn FROM users WHERE id = %s;", (usr_id,)
    )

    last_date = '' if lst_conn is None else get_date_n_time(lst_conn)['date']
    today = get_date_n_time(cur_date())['date']

    f_mnsj = None
    if last_date != today:
        greeting = 'Bienvenido' if gender == 'M' else 'Bienvenida'
        f_mnsj = l_flash_msj(f'{saludo_hr()}', f'{greeting} {nombre}', 'success')
        flash(f_mnsj)
        # NOTE(review): the reminder is assumed to belong to the once-a-day
        # branch (original nesting is not recoverable); it is never flashed
        # as ``None``.
        if is_pswd_reseted:
            flash(l_flash_msj(
                'Recomendación', 'Cambia tu contraseña por una que recuerdes.', 'warning'
            ))

    dbUsers.update_data(
        "UPDATE users SET lst_conn = %s WHERE id = %s;", (cur_date(), usr_id)
    )

    q_contact = """
        SELECT ID, TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') AS FECHA_FORMATEADA,
               NOMBRE, APELLIDO, ESTADO, SIZE_CO, ROL_CONTACTO, INDUSTRY_TYPE, STATUS
        FROM CONTACT
        WHERE STATUS <> 'Archivado' OR STATUS IS NULL
        ORDER BY FULL_DATE_TIME DESC;
    """
    data_contact = dbUsers.get_all_data(q_contact, ())
    return render_template(
        v['tmp_user']['home'],
        f_mnsj=f_mnsj,
        nombre=nombre,
        exp=exp,
        data_contact=data_contact,
        active_page='user_home',
    )


@app.route('/user/manage-record', methods=['POST'])
@jwt_required()
@validate_user_exists
def manage_record():
    """Set the status of a contact record from JSON ``{"id", "value"}``."""
    payload = _json_body()
    if 'id' not in payload or 'value' not in payload:
        return _bad_request("id and value are required")

    dbUsers.update_data(
        'UPDATE contact SET status = %s WHERE id = %s;',
        (payload['value'], payload['id']),
    )
    return jsonify({"type": "success"})


@app.route('/user/get-contact-data', methods=['POST'])
@jwt_required()
@validate_user_exists
def get_contact_data():
    """Return the full contact record whose id arrives as JSON ``id``."""
    payload = _json_body()
    if 'id' not in payload:
        return _bad_request("id is required")

    q = (
        "SELECT TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha, "
        "TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora, nombre, apellido, email, "
        "estado, num_tel, size_co, rol_contacto, industry_type, tipo_req, status "
        "FROM contact WHERE id = %s;"
    )
    return jsonify({"data": dbUsers.get_data(q, (payload['id'],))})


@app.route('/user/download-db', methods=['GET'])
@jwt_required()
@validate_user_exists
@admin_required
def download_db():
    """Return every contact record as JSON (admins only)."""
    q = """
        SELECT id, TO_CHAR(FULL_DATE_TIME, 'DD/MM/YYYY') as fecha,
               TO_CHAR(FULL_DATE_TIME, 'HH24:MI') as hora,
               nombre, apellido, email, estado, num_tel, size_co,
               rol_contacto, industry_type, tipo_req, status
        FROM contact;
    """
    return jsonify({"data": dbUsers.get_all_data(q, ())})


@app.route('/user/txt-editor')
@jwt_required()
@validate_user_exists
@cache.cached(timeout=43200)
def user_txteditor():
    """Render the post editor (cached for 12 hours)."""
    template_name = v['tmp_user'].get('txt_editor')
    return render_template(template_name, active_page='user_txteditor')


@app.route('/user/save-post', methods=['POST'])
@jwt_required()
@validate_user_exists
def save_post():
    """Create a post, or update one of the caller's own posts.

    Expects JSON with ``title`` and ``body`` (HTML); an ``id`` key switches to
    update mode. The plain text and the list of image sources are derived from
    the body and stored alongside it.
    """
    id_usr = get_jwt()['sub']
    payload = _json_body()
    if 'title' not in payload or 'body' not in payload:
        return jsonify({"status": "error", "message": "title and body are required"}), 400

    title = payload['title']
    body = payload['body']
    now = cur_date()

    soup = BeautifulSoup(body, 'html.parser')
    body_no_img = soup.get_text(separator=' ', strip=True)
    # Image sources are read from the parsed document, in document order.
    image_sources = [img['src'] for img in soup.find_all('img') if img.get('src')]
    imagenes_json = json.dumps(image_sources) if image_sources else None

    if "id" in payload:
        q = (
            'UPDATE posts SET updated_at = %s, title = %s, body = %s, '
            'body_no_img = %s, lista_imagenes = %s::jsonb '
            'WHERE id = %s AND id_usr = %s;'
        )
        t = (now, title, body, body_no_img, imagenes_json, payload['id'], id_usr)
    else:
        q = (
            "INSERT INTO posts (id_usr, created_at, title, body, body_no_img, lista_imagenes) "
            "VALUES (%s, %s, %s, %s, %s, %s::jsonb);"
        )
        t = (id_usr, now, title, body, body_no_img, imagenes_json)

    try:
        dbUsers.update_data(q, t)
    except Exception as e:
        app.logger.exception("Could not save post")
        return jsonify({"status": "error", "message": str(e)}), 500

    return jsonify({
        "status": "success",
        "title_post": title,
        "redirect_url": url_for('my_posts'),
    })


@app.route('/user/my-posts')
@jwt_required()
@validate_user_exists
def my_posts():
    """List the caller's posts, 8 per page, selected with ``?page=N``."""
    id_usr = get_jwt()['sub']
    # A missing, non-numeric or non-positive page falls back to the first page.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = 8

    # All posts are fetched and sliced in Python (no SQL pagination yet).
    q_all = r"""
        SELECT id,
               TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
               TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated,
               title,
               LEFT(body_no_img, 180),
               lista_imagenes->0,
               GREATEST(array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) / 375, 1) as n_words
        FROM posts
        WHERE id_usr = %s
        ORDER BY COALESCE(updated_at, created_at) DESC, created_at DESC;
    """
    data_all = dbUsers.get_all_data(q_all, (id_usr,))
    total_pages = (len(data_all) + per_page - 1) // per_page
    start = (page - 1) * per_page

    return render_template(
        v['tmp_user']['my_posts'],
        data=data_all[start:start + per_page],
        current_page=page,
        total_pages=total_pages,
        active_page='my_posts',
    )


@app.route('/user/del-post', methods=['POST'])
@jwt_required()
@validate_user_exists
def del_post():
    """Delete one of the caller's own posts, identified by JSON ``id``."""
    payload = _json_body()
    if 'id' not in payload:
        return _bad_request("id is required")

    post_id = payload['id']
    # Restricted to the owner, consistent with save_post and post.
    dbUsers.update_data(
        'DELETE FROM posts WHERE id = %s AND id_usr = %s;',
        (post_id, get_jwt()['sub']),
    )
    res = {
        'ok': True,
        'message': 'El elemento se eliminó, pendiente manejar el error en el frontend',
        "id": [post_id],
    }
    return jsonify(res), 200


# NOTE(review): URL variable restored (the rule had none although the view
# requires ``post_id``); converter type not visible here.
@app.route('/user/<post_id>')
@jwt_required()
@validate_user_exists
def post(post_id):
    """Render one of the caller's own posts with its estimated reading time."""
    usr_id = get_jwt()['sub']
    t = (usr_id, post_id)
    q = """
        SELECT TO_CHAR(created_at, 'DD/MM/YYYY HH24:MI') as fecha_creada,
               TO_CHAR(updated_at, 'DD/MM/YYYY HH24:MI') as fecha_updated,
               title, body
        FROM posts
        WHERE id_usr = %s AND id = %s;
    """
    data = dbUsers.get_data(q, t)
    if data is None:
        flash(l_flash_msj('Error Publicación', POST_NOT_FOUND_TEXT, 'warning'))
        return redirect(url_for('my_posts'))

    q_nw = (
        r"SELECT array_length(regexp_split_to_array(TRIM(body_no_img), '\s+'), 1) "
        r"FROM posts WHERE id_usr = %s AND id = %s;"
    )
    n_words = dbUsers.get_data(q_nw, t)[0]
    return render_template(
        v['tmp_user']['read_post'],
        data=data,
        time_read=min_read_pst(n_words),
        post_id=post_id,
    )


# NOTE(review): URL variable restored; converter type not visible here.
@app.route('/user/edit-post/<id_post>')
@jwt_required()
@validate_user_exists
def edit_post(id_post):
    """Open one of the caller's own posts in the editor."""
    data = dbUsers.get_data(
        'SELECT title, body FROM posts WHERE id = %s AND id_usr = %s;',
        (id_post, get_jwt()['sub']),
    )
    if data is None:
        flash(l_flash_msj('Error Publicación', POST_NOT_FOUND_TEXT, 'warning'))
        return redirect(url_for('my_posts'))
    return render_template(v['tmp_user']['edit_post'], data=data, id_post=id_post)


@app.route('/user/change-pswd', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
def change_pswd():
    """Change the caller's password and force a new login on success."""
    form = ChangePwsd()
    if form.validate_on_submit():
        usr_id = get_jwt()['sub']
        current_hash = dbUsers.get_data(
            "SELECT pswd FROM users WHERE id = %s;", (usr_id,)
        )[0]

        if bcrypt.check_password_hash(current_hash, form.cur_pswd.data):
            dbUsers.update_data(
                "UPDATE users SET pswd = %s, is_pswd_reseted = false WHERE id = %s;",
                (hash_password(form.new_pswd.data), usr_id),
            )
            flash(l_flash_msj('Éxito', 'La contraseña ha sido actualizada.', 'success'))
            return _logged_out_redirect()

        flash(l_flash_msj('Error', 'La contraseña a actualizar es incorrecta.', 'error'))

    return render_template(v['tmp_user']['change_pswd'], active_page='change_pswd', form=form)


@app.route('/user/metrics')
@jwt_required()
@validate_user_exists
@admin_required
def metrics():
    """Render the metrics dashboard (admins only)."""
    return render_template(v['tmp_user']['metrics'], active_page='metrics')


@app.route('/user/metrics/data')
@jwt_required()
@validate_user_exists
@admin_required
def data_metrics():
    """Return the aggregated contact and blog statistics as JSON (admins only)."""
    q_contact = r"""
        SELECT CASE EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
                   WHEN 1 THEN 'Enero' WHEN 2 THEN 'Febrero' WHEN 3 THEN 'Marzo'
                   WHEN 4 THEN 'Abril' WHEN 5 THEN 'Mayo' WHEN 6 THEN 'Junio'
                   WHEN 7 THEN 'Julio' WHEN 8 THEN 'Agosto' WHEN 9 THEN 'Septiembre'
                   WHEN 10 THEN 'Octubre' WHEN 11 THEN 'Noviembre' WHEN 12 THEN 'Diciembre'
               END AS mes,
               COUNT(*) AS cantidad_registros,
               ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
        FROM contact
        GROUP BY EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City')
        ORDER BY EXTRACT(MONTH FROM full_date_time AT TIME ZONE 'America/Mexico_City');
    """
    q_count_state = (
        "SELECT estado, COUNT(estado) AS conteo_edo, "
        "ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje "
        "FROM contact GROUP BY estado ORDER BY conteo_edo DESC;"
    )
    q_size_co = (
        "SELECT size_co, COUNT(size_co) AS conteo_size "
        "FROM contact GROUP BY size_co ORDER BY conteo_size DESC;"
    )
    q_rol_contact = """
        SELECT rol_contacto, COUNT(*) AS conteo,
               ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
        FROM contact
        GROUP BY rol_contacto
        ORDER BY conteo DESC;
    """
    q_industry_type = """
        SELECT industry_type, COUNT(industry_type) AS conteo,
               ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
        FROM contact
        GROUP BY industry_type
        ORDER BY conteo DESC;
    """
    q_group_status = """
        SELECT COALESCE(status, 'Sin Estatus') AS status, COUNT(*) AS conteo,
               ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS porcentaje
        FROM contact
        GROUP BY COALESCE(status, 'Sin Estatus')
        ORDER BY conteo;
    """
    q_posts_mas_vistos = (
        "SELECT id_post, COUNT(id_post) AS conteo FROM posts_visited "
        "GROUP BY id_post ORDER BY conteo DESC LIMIT 10;"
    )
    q_top_three_authors = """
        SELECT CONCAT(u.nombre, ' ', u.apellido) AS autor, COUNT(*) AS total_posts
        FROM posts_visited pv
        INNER JOIN posts p ON p.id = pv.id_post
        INNER JOIN users u ON u.id = p.id_usr
        GROUP BY u.id, u.nombre, u.apellido
        ORDER BY total_posts DESC
        LIMIT 3;
    """
    queries = {
        "count_monthly": q_contact,
        "count_state": q_count_state,
        "size_co": q_size_co,
        "rol_contact": q_rol_contact,
        "industry_type": q_industry_type,
        "group_status": q_group_status,
        "top_ten": q_posts_mas_vistos,
        "top_three_authors": q_top_three_authors,
    }
    return jsonify({key: dbUsers.get_all_data(q, ()) for key, q in queries.items()})


@app.route('/user/manage-profiles', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def manage_profiles():
    """List users and create or update one from the ``AddUser`` form (admins only).

    An empty form ``id`` means "create": the e-mail must be new, a temporary
    password is generated and mailed. A non-empty ``id`` updates that user.
    The database is written before the e-mail is sent, so no message goes out
    for a change that failed.
    """
    form = AddUser()
    id_usr = get_jwt()['sub']

    if form.validate_on_submit():
        f_nombre = f'{form.nombre.data}'.title().strip()
        f_apellido = f'{form.apellido.data}'.title().strip()
        f_genero = form.genero.data
        f_email = f'{form.email.data}'.lower().strip()
        f_isAdmin = f'{form.isAdmin.data}'.upper().strip()
        f_isContactNoti = f'{form.isContactNoti.data}'.upper().strip()
        f_id = (form.id.data or '').strip()

        profile_fields = [
            ("Nombre", f_nombre),
            ("Apellido", f_apellido),
            ("Género", f_genero),
            ("Email", f_email),
        ]

        if f_id != '':
            q = (
                "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, "
                "is_admin = %s, is_contact_noti = %s WHERE id = %s;"
            )
            t = (f_nombre, f_apellido, f_genero, f_email, f_isAdmin, f_isContactNoti, f_id)
            f_mnsj = l_flash_msj('Éxito', f'Usuario actualizado: {f_nombre}', 'success')
            subject = "Usuario actualizado"
            html_content = (
                "<h2>¡Hola!</h2>"
                "<p>Tu cuenta ha sido actualizada con éxito.</p>"
                + _email_fields_html(profile_fields)
            )
        else:
            duplicated = dbUsers.get_data(
                "SELECT email FROM users WHERE email = %s;", (f_email,)
            )
            if duplicated is not None:
                flash(l_flash_msj(
                    'Error',
                    'El correo electrónico ya existe, NO SE SOBREESCRIBE EL USUARIO EXISTENTE.',
                    'error',
                ))
                return redirect(url_for('manage_profiles'))

            tmp_pswd = generar_contrasena()
            q = (
                "INSERT INTO users (id, nombre, apellido, genero, email, pswd, "
                "is_admin, is_contact_noti, is_pswd_reseted) "
                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);"
            )
            t = (
                getRandomId(), f_nombre, f_apellido, f_genero, f_email,
                hash_password(tmp_pswd), f_isAdmin, f_isContactNoti, True,
            )
            f_mnsj = l_flash_msj('Éxito', f'Usuario creado: {f_nombre}', 'success')
            subject = "Nueva cuenta creada"
            html_content = (
                "<h2>¡Bienvenido a Forma!</h2>"
                "<p>Tu cuenta ha sido creada con éxito.</p>"
                + _email_fields_html(profile_fields + [("Contraseña temporal", tmp_pswd)])
                + "<p>Una vez inicies sesión debes de cambiar la contraseña "
                  "a una nueva que puedas recordar</p>"
            )

        dbUsers.update_data(q, t)

        msg = Message(subject=subject, sender=email_sender, recipients=[f_email])
        msg.html = html_content
        _send_in_background(msg)

        flash(f_mnsj)
        return redirect(url_for('manage_profiles'))

    q_all_users = """
        SELECT u.id, u.nombre, u.apellido, u.email,
               COALESCE(TO_CHAR(u.lst_conn, 'DD/MM/YYYY HH24:MI'), 'Sin conexión') AS ultima_conexion,
               CASE WHEN u.is_admin = true THEN 'Sí' WHEN u.is_admin = false THEN 'No' END AS admin,
               COALESCE(p.conteo, 0) AS conteo,
               CASE WHEN u.is_contact_noti = true THEN 'Sí'
                    WHEN u.is_contact_noti = false THEN 'No' END AS isNotificated
        FROM users u
        LEFT JOIN (
            SELECT id_usr, COUNT(*) AS conteo FROM posts GROUP BY id_usr
        ) p ON u.id = p.id_usr
        ORDER BY conteo DESC;
    """
    data_all_users = dbUsers.get_all_data(q_all_users, ())
    return render_template(
        v['tmp_user']['manage_profiles'],
        form=form,
        data_all_users=data_all_users,
        active_page='manage_profiles',
        id_usr=id_usr,
    )


@app.route('/user/manage-profiles/delete-usr', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def delete_user():
    """Delete a user and all of their posts, given JSON ``id`` (admins only)."""
    payload = _json_body()
    if 'id' not in payload:
        return _bad_request("id is required")

    id_usr = payload['id']
    # Posts go first so a foreign key from posts to users cannot block the
    # removal of the user row.
    dbUsers.update_data("DELETE FROM posts WHERE id_usr = %s;", (id_usr,))
    dbUsers.update_data("DELETE FROM users WHERE id = %s;", (id_usr,))

    res = {
        'ok': True,
        'message': 'El elemento se eliminó, pendiente manejar el error en el frontend',
        "id": id_usr,
    }
    return jsonify(res), 200


@app.route('/user/manage-profiles/get-user', methods=['POST'])
@jwt_required()
@validate_user_exists
@admin_required
def get_user():
    """Return one user's editable fields, given JSON ``id`` (admins only)."""
    payload = _json_body()
    if 'id' not in payload:
        return _bad_request("id is required")

    q = (
        "SELECT id, nombre, apellido, genero, email, is_admin, is_contact_noti "
        "FROM users WHERE id = %s;"
    )
    return jsonify({"data": dbUsers.get_data(q, (payload['id'],))})


@app.route('/user/manage-profiles/update-user', methods=['POST'])
@jwt_required()
@validate_user_exists
@admin_required
def update_user():
    """Update a user's profile from a JSON payload (admins only).

    Required keys: ``id``, ``nombre``, ``apellido``, ``genero``, ``email``,
    ``isAdmin``.
    """
    payload = _json_body()
    required = ('id', 'nombre', 'apellido', 'genero', 'email', 'isAdmin')
    missing = [key for key in required if key not in payload]
    if missing:
        return _bad_request(f"Missing fields: {', '.join(missing)}")

    id_usr = payload['id']
    t = (
        f'{payload["nombre"]}'.title().strip(),
        f'{payload["apellido"]}'.title().strip(),
        payload['genero'],
        f'{payload["email"]}'.lower().strip(),
        f'{payload["isAdmin"]}'.strip().lower(),
        id_usr,
    )
    q = (
        "UPDATE users SET nombre = %s, apellido = %s, genero = %s, email = %s, "
        "is_admin = %s WHERE id = %s;"
    )
    dbUsers.update_data(q, t)

    res = {'ok': True, 'message': 'El elemento se actualizó correctamente', "id": id_usr}
    return jsonify(res), 200


@app.route('/user/carousel', methods=['GET', 'POST'])
@jwt_required()
@validate_user_exists
@admin_required
def carousel():
    """List carousel slides and add a new one from the upload form (admins only).

    An image whose sanitized file name is already registered is rejected.
    """
    form = Carousel()

    if request.method == 'POST' and form.validate_on_submit():
        image_file = form.img.data
        filename = secure_filename(image_file.filename)
        url = None if form.url.data == '' else form.url.data
        isNewTab = form.isNewTab.data

        already_loaded = filename and dbUsers.get_data(
            "SELECT img_name FROM carousel WHERE img_name = %s;", (filename,)
        ) is not None

        if not filename:
            # secure_filename returns '' for names made only of unsafe characters.
            mnsj_flash = l_flash_msj('Error Archivo', 'El nombre del archivo no es válido.', 'error')
        elif already_loaded:
            mnsj_flash = l_flash_msj(
                'Error Archivo', 'La imagen ya ha sido cargada previamente.', 'error'
            )
        else:
            os.makedirs(UPLOADS_DIR, exist_ok=True)
            image_file.save(os.path.join(UPLOADS_DIR, filename))

            bg_color = rgba_to_string((*hex_to_rgb(form.bg_color.data), 0.5))
            txt_color = rgba_to_string((*hex_to_rgb(form.txt_color.data), 1))
            q = (
                "INSERT INTO carousel (img_name, bg_color, txt_color, txt, url, is_new_tab) "
                "VALUES (%s, %s, %s, %s, %s, %s);"
            )
            dbUsers.update_data(
                q, (filename, bg_color, txt_color, form.txt.data, url, isNewTab)
            )
            mnsj_flash = l_flash_msj(
                'Datos Slide', f'Datos agregados para imagen: {filename}', 'success'
            )

        flash(mnsj_flash)
        return redirect(url_for('carousel'))

    data = dbUsers.get_all_data(CAROUSEL_SLIDES_QUERY, ())
    return render_template(v['tmp_user']['carousel'], form=form, data=data, active_page='metrics')


# NOTE(review): URL variable restored; the parameter keeps the name ``id``
# (shadowing the builtin) because ``url_for`` callers pass it by that name.
@app.route('/user/carousel/delete-slide/<id>', methods=["POST", "DELETE"])
@jwt_required()
@validate_user_exists
@admin_required
def delete_slide(id):
    """Delete a slide row and its image file (admins only)."""
    try:
        result = dbUsers.get_data("SELECT img_name FROM carousel WHERE id = %s;", (id,))
        if not result:
            return jsonify({'error': 'No se encontró el registro'}), 404

        filepath = os.path.join(UPLOADS_DIR, result[0])
        dbUsers.update_data("DELETE FROM carousel WHERE id = %s;", (id,))

        if os.path.isfile(filepath):
            os.remove(filepath)
        else:
            app.logger.warning("Slide image not found on disk: %s", filepath)

        return jsonify({'msg': 'Slide eliminado correctamente'}), 200
    except Exception:
        app.logger.exception("Could not delete slide %s", id)
        return jsonify({'error': 'Error interno del servidor'}), 500


# NOTE(review): URL variable restored; converter type not visible here.
@app.route('/user/carousel/download/<filename>')
@jwt_required()
@validate_user_exists
@admin_required
def download_file(filename):
    """Send a carousel image as an attachment (admins only)."""
    try:
        return send_from_directory(UPLOADS_DIR, filename, as_attachment=True)
    except FileNotFoundError:
        abort(404)


# ===========================================================================
# JWT error handling
# ===========================================================================
def unauthorized_response(callback):
    """Return the JSON 401 used for requests without a usable token."""
    return jsonify({"error": "Token inválido o no proporcionado"}), 401


def is_fetch_request():
    """Tell whether the request was sent by the frontend via fetch/XHR."""
    return request.headers.get("X-Requested-With") == "XMLHttpRequest"


@jwt.expired_token_loader
def handle_expired_token(jwt_header, jwt_payload):
    """Tell the user the session expired and drop the stale cookie.

    Removing the cookie matters: keeping an expired token would make every
    later request fail the same way.
    """
    flash(l_flash_msj('Sesión Expirada', 'Por favor inicia sesión nuevamente', 'warning'))
    return _logged_out_redirect()


@jwt.unauthorized_loader
def handle_unauthorized_error(reason):
    """Answer requests that carry no token: JSON for fetch, redirect otherwise."""
    if is_fetch_request():
        return unauthorized_response(reason)
    flash(f'Debes iniciar sesión para acceder a esta página: {reason}', 'error')
    return redirect(url_for('login'))


@jwt.invalid_token_loader
def handle_invalid_token_error(reason):
    """Answer requests with a malformed token: JSON for fetch, redirect otherwise."""
    if is_fetch_request():
        return jsonify({"error": "Sesión inválida"}), 401
    flash(f'Sesión inválida: {reason}', 'error')
    return redirect(url_for('login'))


@app.route("/logout")
def logout():
    """Close the session by deleting the access cookie."""
    flash(l_flash_msj('👋🏼Logout', 'Se ha cerrado tu sesión.', 'success'))
    return _logged_out_redirect()


if __name__ == '__main__':
    # The interactive debugger allows code execution, so it is opt-in
    # (FLASK_DEBUG=1) rather than always on while listening on all interfaces.
    app.run(debug=os.getenv("FLASK_DEBUG", "0") == "1", host='0.0.0.0', port=8089)