# laliga-denuncias/main.py (Python, 579 lines, 23 KiB)

import os
import yaml
import random
import glob
import logging
import requests
import time
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright
from faker import Faker
# Configure logging for container environment
def setup_logging():
    """Configure root logging for a container and return this module's logger.

    Records are emitted at INFO level and above, with a timestamped
    ``asctime - levelname - message`` format, to standard output.

    Returns:
        logging.Logger: The logger named after this module (``__name__``).
    """
    # Function-scope import so this block does not depend on the file header.
    import sys

    # Pass the stream to basicConfig instead of patching handlers[0]
    # afterwards. Indexing the root handlers assumes basicConfig just
    # installed a StreamHandler, which is not true when logging was already
    # configured, and opening '/dev/stdout' by path is POSIX-only and leaves
    # an unclosed file object behind. StreamHandler flushes after every
    # record, so output still appears immediately in container logs.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        stream=sys.stdout,
    )
    return logging.getLogger(__name__)
# Module-level logger used by every function below; logging is configured
# as a side effect of importing this module.
logger = setup_logging()
def wait_for_tor_connection(proxy_server):
    """Block until the Tor check endpoint reports that traffic is routed via Tor.

    Each attempt launches a headless Chromium through ``proxy_server``, loads
    https://check.torproject.org/api/ip and parses the first ``{...}`` span of
    the rendered page as JSON. The function returns once the parsed object's
    ``IsTor`` field is ``True``; otherwise it sleeps 10 seconds and retries.

    Args:
        proxy_server: Proxy URL handed to Playwright as the ``server`` option.

    Returns:
        None. Every exception is logged as a warning and triggers a retry.

    NOTE(review): there is no attempt limit or overall deadline, so this never
    returns if the proxy is unreachable.
    """
    logger.info("Waiting for Tor connection to be established...")
    while True:
        try:
            # Use Playwright to check Tor status through the proxy
            with sync_playwright() as p:
                browser = p.chromium.launch(
                    headless=True,
                    proxy={
                        "server": proxy_server
                    }
                )
                context = browser.new_context()
                page = context.new_page()
                try:
                    response = page.goto('https://check.torproject.org/api/ip', timeout=10000)
                    if response and response.status == 200:
                        content = page.content()
                        # Extract JSON from the page content (page.content()
                        # returns markup, so the JSON is pulled out by regex)
                        import re
                        json_match = re.search(r'\{.*\}', content)
                        if json_match:
                            import json
                            data = json.loads(json_match.group())
                            if data.get("IsTor") is True:
                                logger.info(f"Tor connection established! IP: {data.get('IP', 'Unknown')}")
                                # NOTE(review): the finally clause below closes
                                # the browser again on this path.
                                browser.close()
                                return
                            else:
                                logger.info(f"Not using Tor yet. Current IP: {data.get('IP', 'Unknown')}")
                        else:
                            logger.warning("Could not parse Tor check response")
                    else:
                        logger.warning(f"Tor check failed with status: {response.status if response else 'No response'}")
                except Exception as e:
                    logger.warning(f"Error during Tor check: {e}")
                finally:
                    browser.close()
        except Exception as e:
            # Covers failures outside the inner try, e.g. launching the browser.
            logger.warning(f"Error checking Tor status: {e}")
        logger.info("Waiting 10 seconds before next Tor check...")
        time.sleep(10)
def close_cookie_consent(page):
    """Click the cookie banner's "reject all" button if it shows up.

    Waits up to 30 seconds for ``#onetrust-reject-all-handler`` to become
    visible. Best effort: any failure is logged as a warning and swallowed so
    the caller carries on.

    Args:
        page: Playwright page on which the banner may appear.
    """
    banner_timeout_ms = 30000  # the banner can appear well after page load
    try:
        logger.info("Waiting for cookie consent dialog to appear...")
        reject_all = page.locator("#onetrust-reject-all-handler")
        reject_all.wait_for(state="visible", timeout=banner_timeout_ms)
        reject_all.click()
        logger.info("Cookie consent dialog closed")
    except Exception as exc:
        # A missing banner is not fatal; execution continues without it.
        logger.warning(f"Cookie consent dialog not found or timed out: {exc}")
def close_popup_modal(page):
    """Dismiss the popup modal via its close button, if one becomes visible.

    Waits up to 15 seconds for ``button.pum-close.popmake-close``. Failures are
    logged at error level and not re-raised.

    Args:
        page: Playwright page on which the modal may appear.
    """
    modal_timeout_ms = 15000
    try:
        logger.info("Waiting for popup modal to appear...")
        close_button = page.locator("button.pum-close.popmake-close")
        close_button.wait_for(state="visible", timeout=modal_timeout_ms)
        close_button.click()
        logger.info("Popup modal closed")
    except Exception as exc:
        logger.error(f"Could not close popup modal: {exc}")
def load_form_data(yaml_file="denuncias.yml"):
    """Parse ``yaml_file`` and return the value stored under ``'denuncias'``.

    Args:
        yaml_file: Path of the UTF-8 YAML file to read.

    Returns:
        Whatever the document holds under its top-level ``denuncias`` key.
    """
    with open(yaml_file, encoding='utf-8') as handle:
        document = yaml.safe_load(handle)
    return document['denuncias']
def generate_mock_data(num_submissions=3):
    """Generate Spanish-locale mock data for form submissions.

    Args:
        num_submissions: Number of submission dicts to build (default 3).

    Returns:
        list[dict]: One dict per submission with the keys ``nombre_local``,
        ``direccion``, ``codigo_postal``, ``municipio``, ``evento_deportivo``,
        ``descripcion``, ``email_contacto`` and ``imagen``. The last two may
        be empty strings.

    NOTE(review): every value is fabricated (random names, addresses and
    complaint text). Random combinations could coincide with a real
    establishment; confirm this output is only used against a test
    environment and never filed as a genuine report.
    """
    fake = Faker('es_ES')  # Spanish locale
    # Much larger variety of establishment types
    establishment_types = [
        "Bar", "Café", "Restaurante", "Taberna", "Cervecería", "Mesón", "Tasca", "Bodega",
        "Pub", "Tapería", "Gastrobar", "Brasería", "Marisquería", "Pizzería", "Hamburguesería",
        "Chiringuito", "Terraza", "Club", "Discoteca", "Karaoke", "Billar", "Recreativo",
        "Hotel", "Hostal", "Pensión", "Parador", "Resort", "Camping"
    ]
    # Expanded bar names with more creativity
    bar_prefixes = ["El", "La", "Los", "Las"]
    bar_themes = [
        "Rincón", "Esquina", "Amigos", "Victoria", "Deportivo", "Goles", "Peña", "Balón",
        "Hinchada", "Campeón", "Final", "Clásico", "Estadio", "Grada", "Cancha", "Liga",
        "Copa", "Trofeo", "Medalla", "Triunfo", "Gloria", "Éxito", "Pasión", "Furia",
        "Atlético", "Sporting", "Racing", "United", "Central", "Real", "Imperial",
        "Madrid", "Barcelona", "Valencia", "Sevilla", "Bilbao", "Atlántico", "Mediterráneo"
    ]
    # Additional name patterns
    place_names = ["del Puerto", "de la Playa", "del Centro", "de la Plaza", "del Barrio"]
    descriptors = ["Dorado", "Negro", "Blanco", "Rojo", "Azul", "Verde", "Nuevo", "Viejo"]
    # Expanded football events with more detail
    teams = [
        "Real Madrid", "FC Barcelona", "Atlético Madrid", "Sevilla FC", "Valencia CF",
        "Real Betis", "Villarreal CF", "Real Sociedad", "Athletic Bilbao", "Getafe CF",
        "Osasuna", "Celta de Vigo", "Rayo Vallecano", "Espanyol", "Mallorca", "Cádiz CF",
        "Elche CF", "Levante UD", "Alavés", "Granada CF"
    ]
    competitions = [
        "LaLiga Santander", "Copa del Rey", "Champions League", "Europa League",
        "Conference League", "Supercopa de España", "Copa de la Liga"
    ]
    # Building blocks for the description: context + situation + ending
    complaint_situations = [
        "múltiples pantallas mostrando contenido pirata",
        "señal de televisión claramente no oficial",
        "retransmisión sin los logos oficiales de LaLiga",
        "calidad de imagen sospechosamente baja típica de streams ilegales",
        "el personal admitió no tener licencia para la emisión",
        "publicidad en redes sociales promocionando la retransmisión gratuita",
        "cobro de entrada específico para ver el partido",
        "gran cantidad de espectadores sin consumo proporcional",
        "retransmisión con comentarios en idioma extranjero",
        "interrupciones constantes típicas de señales pirata"
    ]
    complaint_contexts = [
        "Durante mi visita al establecimiento pude comprobar que",
        "Como cliente habitual del local, he observado que",
        "En mi paso por el establecimiento noté que",
        "Mientras cenaba en el restaurante observé que",
        "Como vecino del local, he visto que",
        "Durante el evento deportivo pude verificar que"
    ]
    complaint_endings = [
        "Creo que es importante que LaLiga investigue esta situación.",
        "Espero que tomen las medidas oportunas.",
        "Considero necesario que se actúe contra esta práctica ilegal.",
        "Ruego investiguen este establecimiento.",
        "Solicito que se verifique la legalidad de sus emisiones.",
        "Agradecería que revisaran la situación de este local."
    ]
    # Get available images from the local "images" directory
    images = []
    if os.path.exists("images"):
        for ext in ["*.jpg", "*.jpeg", "*.png", "*.gif", "*.bmp"]:
            # Each pattern is globbed in lower and upper case.
            # NOTE(review): on a case-insensitive file system this presumably
            # lists every file twice - TODO confirm.
            images.extend(glob.glob(os.path.join("images", ext)))
            images.extend(glob.glob(os.path.join("images", ext.upper())))
    mock_submissions = []
    for i in range(num_submissions):  # NOTE(review): i is never used
        # Generate more varied establishment names.
        # NOTE(review): each branch makes its own independent draw, so the
        # effective shares are presumably 40% / 18% / 8.4% / 33.6% rather
        # than 40 / 30 / 20 / 10 - TODO confirm the intent.
        if fake.boolean(chance_of_getting_true=40):
            # Pattern: Type + Theme
            est_type = fake.random_element(establishment_types)
            theme = fake.random_element(bar_themes)
            full_name = f"{est_type} {theme}"
        elif fake.boolean(chance_of_getting_true=30):
            # Pattern: Type + Prefix + Theme
            est_type = fake.random_element(establishment_types)
            prefix = fake.random_element(bar_prefixes)
            theme = fake.random_element(bar_themes)
            full_name = f"{est_type} {prefix} {theme}"
        elif fake.boolean(chance_of_getting_true=20):
            # Pattern: Type + Theme + Place
            est_type = fake.random_element(establishment_types)
            theme = fake.random_element(bar_themes)
            place = fake.random_element(place_names)
            full_name = f"{est_type} {theme} {place}"
        else:
            # Pattern: Type + Descriptor + Theme
            est_type = fake.random_element(establishment_types)
            descriptor = fake.random_element(descriptors)
            theme = fake.random_element(bar_themes)
            full_name = f"{est_type} {descriptor} {theme}"
        # Generate more varied addresses
        street_types = ["Calle", "Avenida", "Plaza", "Paseo", "Ronda", "Travesía", "Callejón"]
        street_type = fake.random_element(street_types)
        # Sometimes use well-known Spanish street names
        if fake.boolean(chance_of_getting_true=30):
            famous_streets = [
                "Gran Vía", "Puerta del Sol", "Las Ramblas", "Paseo de Gracia",
                "Calle Mayor", "Plaza Mayor", "Avenida de la Constitución"
            ]
            street_name = fake.random_element(famous_streets)
            # These already include the type; the empty prefix leaves a leading
            # space that .strip() removes when "direccion" is assembled below.
            street_type = ""
        else:
            street_name = fake.street_name()
        street_number = fake.building_number()
        if fake.boolean(chance_of_getting_true=20):
            # Add apartment/floor info sometimes
            floor_info = f", {fake.random_int(1, 5)}º"
            street_number += floor_info
        # Generate varied football events
        if fake.boolean(chance_of_getting_true=40):
            # Specific match between two different teams
            team1 = fake.random_element(teams)
            team2 = fake.random_element([t for t in teams if t != team1])
            competition = fake.random_element(competitions)
            evento = f"{competition} - {team1} vs {team2}"
        else:
            # General competition round
            competition = fake.random_element(competitions)
            if "LaLiga" in competition:
                jornada = fake.random_int(1, 38)
                evento = f"{competition} - Jornada {jornada}"
            else:
                rounds = ["Fase de Grupos", "Dieciseisavos", "Octavos", "Cuartos", "Semifinal", "Final"]
                round_name = fake.random_element(rounds)
                evento = f"{competition} - {round_name}"
        # Assemble the description from one context, situation and ending
        context = fake.random_element(complaint_contexts)
        situation = fake.random_element(complaint_situations)
        ending = fake.random_element(complaint_endings)
        descripcion = f"{context} {situation}. {ending}"
        # Email: draws 1-50 give a free-mail address, 51-70 a company address,
        # anything higher leaves the field empty
        email_chance = fake.random_int(1, 100)
        if email_chance <= 50:
            email_contacto = fake.free_email()
        elif email_chance <= 70:
            email_contacto = fake.company_email()
        else:
            email_contacto = ""
        # Image: attached only when local images exist and the draw is <= 40;
        # only the base file name is stored, not the full path
        imagen = ""
        if images:
            image_chance = fake.random_int(1, 100)
            if image_chance <= 40:
                imagen = os.path.basename(fake.random_element(images))
        submission = {
            "nombre_local": full_name,
            "direccion": f"{street_type} {street_name} {street_number}".strip(),
            "codigo_postal": fake.postcode(),
            "municipio": fake.city(),
            "evento_deportivo": evento,
            "descripcion": descripcion,
            "email_contacto": email_contacto,
            "imagen": imagen
        }
        mock_submissions.append(submission)
    logger.info(f"Generated {num_submissions} mock submissions")
    return mock_submissions
def get_image_path(specified_image=None, images_folder="images"):
    """Resolve an optional image file name to an absolute path.

    Args:
        specified_image: File name relative to ``images_folder``. ``None``, an
            empty string or a whitespace-only string means "no image".
        images_folder: Directory the file name is resolved against.

    Returns:
        The absolute path as a string when the name refers to an existing
        regular file, otherwise ``None``.
    """
    if not specified_image or not specified_image.strip():
        logger.info("No image specified, skipping image upload")
        return None

    candidate = os.path.join(images_folder, specified_image)
    # isfile rather than exists: a directory (for example "." or a sub-folder
    # name) also "exists" but cannot be uploaded, and would otherwise only
    # fail later at the file-input step.
    if not os.path.isfile(candidate):
        logger.warning(f"Specified image '{specified_image}' not found, no image will be uploaded")
        return None

    logger.info(f"Using specified image: {candidate}")
    return os.path.abspath(candidate)
def fill_form_field(page, field_identifiers, value, field_name):
    """Fill one form field using the first selector that matches anything.

    Args:
        page: Playwright page containing the form.
        field_identifiers: Candidate selectors, tried in order.
        value: Value to enter; converted with ``str``. Falsy values are skipped.
        field_name: Label used only in log messages.
    """
    if not value:
        return

    for selector in field_identifiers:
        try:
            target = page.locator(selector)
            if target.count() <= 0:
                continue
            target.fill(str(value))
            logger.info(f"Successfully filled {field_name} using {selector}")
            return
        except Exception:
            # Any failure with this selector just moves on to the next one.
            pass

    logger.warning(f"Could not find field for {field_name}")
def upload_image_field(page, file_identifiers, image_path):
    """Attach ``image_path`` to the first file input that matches a selector.

    Args:
        page: Playwright page containing the form.
        file_identifiers: Candidate selectors for the file input, in order.
        image_path: Path of the file to attach; falsy values are skipped.
    """
    if not image_path:
        return

    for selector in file_identifiers:
        try:
            file_input = page.locator(selector)
            if file_input.count() <= 0:
                continue
            file_input.set_input_files(image_path)
            logger.info(f"Successfully uploaded image using {selector}")
            return
        except Exception:
            # Any failure with this selector just moves on to the next one.
            pass

    logger.warning("Could not find file upload field")
def fill_form(page, form_data):
    """Populate the form on ``page`` from ``form_data``.

    Every known field key that is present in ``form_data`` is filled via
    ``fill_form_field`` with a list of fallback selectors. The optional
    ``imagen`` entry is resolved with ``get_image_path`` and uploaded through
    ``upload_image_field``. Any exception is logged and swallowed.

    Args:
        page: Playwright page showing the form.
        form_data: Mapping of field keys to values.
    """
    try:
        logger.info("Filling form with data...")

        # (field key, selectors in order of preference). Most entries list the
        # double-underscore id first, then the single-underscore id, then
        # looser placeholder/name matches.
        selector_table = (
            ("nombre_local", [
                "#form-field-local__name",
                "#form-field-local_name",
                "input[placeholder*='Nombre del local']",
                "input[name*='local'][name*='name']",
            ]),
            ("direccion", [
                "#form-field-local__street",
                "#form-field-local_street",
                "input[placeholder*='Dirección del local']",
                "input[name*='local'][name*='street']",
            ]),
            ("codigo_postal", [
                "#form-field-local__postal",
                "#form-field-local_postal",
                "input[placeholder*='Código Postal']",
                "input[name*='local'][name*='postal']",
            ]),
            ("municipio", [
                "#form-field-local__localy",
                "#form-field-local_localy",
                "input[placeholder*='Municipio']",
                "input[name*='local'][name*='local']",
            ]),
            ("evento_deportivo", [
                "#form-field-field__evento",
                "#form-field-field_evento",
                "textarea[placeholder*='evento deportivo']",
                "textarea[name*='evento']",
            ]),
            ("descripcion", [
                "#form-field-field__message",
                "#form-field-field_message",
                "textarea[placeholder*='situación a denunciar']",
                "textarea[name*='message']",
            ]),
            ("email_contacto", [
                "#form-field-field_email",
                "input[type='email']",
                "input[placeholder*='Email']",
            ]),
        )

        for key, candidates in selector_table:
            if key not in form_data:
                continue
            fill_form_field(page, candidates, form_data[key], key)

        # Image upload is optional and uses its own fallback selectors.
        image_path = get_image_path(form_data.get("imagen"))
        if image_path:
            logger.info("Uploading image...")
            upload_image_field(
                page,
                [
                    "#form-field-local_files",
                    "input[type='file']",
                    "input[name*='local_files']",
                    ".elementor-upload-field",
                ],
                image_path,
            )
            logger.info("Image upload attempted")

        logger.info("Form filled successfully")
    except Exception as exc:
        logger.error(f"Error filling form: {exc}")
def submit_form(page):
    """Click the submit button and wait for the confirmation heading.

    Args:
        page: Playwright page showing the filled form.

    Returns:
        bool: ``True`` when the confirmation ``h2`` became visible within
        10 seconds, ``False`` when anything raised (the error is logged).
    """
    try:
        logger.info("Submitting form...")
        page.click("#form-denuncias-btn")
        # The confirmation heading is the success signal.
        page.locator("h2:has-text('FORMULARIO ENVIADO CORRECTAMENTE')").wait_for(
            state="visible", timeout=10000
        )
        logger.info("Form submitted successfully")
    except Exception as exc:
        logger.error(f"Error submitting form: {exc}")
        return False
    return True
def return_to_form(page):
    """Navigate from the confirmation view back to the form.

    Prefers clicking the "Volver" link (waiting up to 5 seconds for it); if
    that fails for any reason, loads the form URL directly. Errors from the
    fallback itself are logged and swallowed.

    Args:
        page: Playwright page currently showing the confirmation view.
    """
    form_url = "https://laligabares.com/denuncias/"
    try:
        logger.info("Returning to form...")
        try:
            back_link = page.locator("a:has-text('Volver')")
            back_link.wait_for(state="visible", timeout=5000)
            back_link.click()
            logger.info("Clicked return button")
        except Exception:
            # Fallback: go straight to the form URL.
            logger.info("Return button not found, navigating directly to form URL")
            page.goto(form_url)
        logger.info("Returned to form page")
    except Exception as exc:
        logger.error(f"Error returning to form: {exc}")
def main():
    """Entry point: read settings from the environment and drive the browser.

    Environment variables (after ``load_dotenv()``):
        TARGET_URL: Page to open (default "https://laligabares.com/denuncias/").
        HEADLESS: "true" (default) runs Chromium without a window.
        USE_MOCK_DATA: "true" loops over generated data; otherwise the entries
            returned by ``load_form_data()`` are processed once.
        USE_TOR: "true" waits for the Tor check and launches with a proxy.
        TOR_PROXY: Proxy URL (default "socks5://127.0.0.1:9050").

    Both modes stop after three consecutive failed submissions.

    NOTE(review): mock mode keeps submitting fabricated entries to TARGET_URL
    until it fails three times in a row or the process is stopped. Confirm it
    is only ever pointed at a test environment you are authorised to use.
    """
    load_dotenv()
    url = os.getenv("TARGET_URL", "https://laligabares.com/denuncias/")
    # Because of the default above, this only triggers when TARGET_URL is set
    # to an empty string.
    if not url:
        logger.error("TARGET_URL environment variable is not set")
        return
    headless = os.getenv("HEADLESS", "true").lower() == "true"
    use_mock_data = os.getenv("USE_MOCK_DATA", "false").lower() == "true"
    use_tor = os.getenv("USE_TOR", "false").lower() == "true"
    proxy_address = os.getenv("TOR_PROXY", "socks5://127.0.0.1:9050")
    logger.info(f"Opening browser to: {url}")
    logger.info(f"Headless mode: {headless}")
    logger.info(f"Use mock data: {use_mock_data}")
    logger.info(f"Use Tor: {use_tor}")
    if use_tor:
        logger.info(f"Tor proxy: {proxy_address}")
    # Wait for Tor connection if enabled (blocks until the check succeeds)
    if use_tor:
        wait_for_tor_connection(proxy_address)
    with sync_playwright() as p:
        # Configure browser with or without proxy
        browser_options = {"headless": headless}
        if use_tor:
            browser_options["proxy"] = {"server": proxy_address}
        browser = p.chromium.launch(**browser_options)
        page = browser.new_page()
        page.goto(url)
        close_cookie_consent(page)
        close_popup_modal(page)
        if use_mock_data:
            # Continuous mock data mode - generate and submit until stopped
            submission_count = 0
            failure_count = 0  # consecutive failures; reset on every success
            max_failures = 3
            logger.info("Starting continuous mock data mode - will run until stopped")
            while True:
                # Generate one submission at a time for variety
                form_data = generate_mock_data(num_submissions=1)[0]
                submission_count += 1
                logger.info(f"--- Mock submission #{submission_count} ---")
                logger.info(f"Submitting for: {form_data['nombre_local']}")
                fill_form(page, form_data)
                success = submit_form(page)
                if success:
                    failure_count = 0  # Reset failure counter on success
                    return_to_form(page)
                    # Wait between submissions
                    wait_time = random.randint(3, 5)  # Random delay 3-5 seconds
                    logger.info(f"Waiting {wait_time} seconds before next submission...")
                    page.wait_for_timeout(wait_time * 1000)
                else:
                    failure_count += 1
                    logger.warning(f"Submission failed. Failure count: {failure_count}/{max_failures}")
                    if failure_count >= max_failures:
                        logger.error(f"Reached maximum failures ({max_failures}). Exiting application.")
                        break
                    # Wait a bit before retrying; unlike the YAML branch, the
                    # page is not reloaded here before the next attempt
                    logger.info("Waiting 5 seconds before next attempt...")
                    page.wait_for_timeout(5000)
        else:
            # YAML file mode - process all submissions once
            form_data_list = load_form_data()
            failure_count = 0  # consecutive failures; reset on every success
            max_failures = 3
            successful_submissions = 0
            for i, form_data in enumerate(form_data_list, 1):
                logger.info(f"--- Processing submission {i}/{len(form_data_list)} ---")
                logger.info(f"Submitting for: {form_data['nombre_local']}")
                fill_form(page, form_data)
                success = submit_form(page)
                if success:
                    failure_count = 0  # Reset failure counter on success
                    successful_submissions += 1
                    # Return to form for next submission (except on last one)
                    if i < len(form_data_list):
                        return_to_form(page)
                        logger.info("Waiting 2 seconds before next submission...")
                        page.wait_for_timeout(2000)
                else:
                    failure_count += 1
                    logger.warning(f"Submission failed. Failure count: {failure_count}/{max_failures}")
                    if failure_count >= max_failures:
                        logger.error(f"Reached maximum failures ({max_failures}). Exiting application.")
                        break
                    # Return to form and move on to the next entry; the failed
                    # entry itself is not retried
                    if i < len(form_data_list):
                        return_to_form(page)
                        logger.info("Waiting 5 seconds before next attempt...")
                        page.wait_for_timeout(5000)
            logger.info(f"Completed {successful_submissions}/{len(form_data_list)} form submissions successfully")
        if not headless:
            # Keep the window open for inspection until the operator confirms
            input("Press Enter to close the browser...")
        else:
            print("Browser opened in headless mode - closing automatically")
        # NOTE(review): not inside a finally, so an exception raised above
        # skips this explicit close.
        browser.close()
# Run the automation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()