579 lines
23 KiB
Python
579 lines
23 KiB
Python
import os
|
|
import yaml
|
|
import random
|
|
import glob
|
|
import logging
|
|
import requests
|
|
import time
|
|
from dotenv import load_dotenv
|
|
from playwright.sync_api import sync_playwright
|
|
from faker import Faker
|
|
|
|
# Configure logging for container environment
|
|
def setup_logging():
    """Configure root logging for a container environment and return this module's logger.

    Records are written to standard output at INFO level, formatted as
    ``timestamp - level - message``.

    Returns:
        logging.Logger: The logger named after this module (``__name__``).
    """
    # Local import: this is the only place that needs ``sys``.
    import sys

    # Hand stdout to basicConfig directly instead of reopening '/dev/stdout'
    # and patching ``handlers[0]`` afterwards: that approach leaked a file
    # handle, only works where '/dev/stdout' exists, and assumed the first root
    # handler was the one basicConfig had just created.  StreamHandler flushes
    # after every record, so log lines still show up immediately.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        stream=sys.stdout,
    )

    return logging.getLogger(__name__)
|
|
|
|
# Module-level logger: logging is configured once, at import time, and the
# functions in this file log through this object.
logger = setup_logging()
|
|
|
|
|
|
def wait_for_tor_connection(proxy_server):
    """Block until a request routed through ``proxy_server`` is reported as using Tor.

    Each attempt launches a headless Chromium through the proxy, loads
    https://check.torproject.org/api/ip and inspects the ``IsTor`` field of the
    JSON reply.  Failed or negative attempts are logged and retried every
    10 seconds; the function only returns once ``IsTor`` is true.

    Args:
        proxy_server: Proxy URL passed to Playwright as the ``server`` proxy option.
    """
    # Local import: this is the only function that parses JSON.
    import json

    logger.info("Waiting for Tor connection to be established...")

    while True:
        try:
            # Use Playwright to check Tor status through the proxy
            with sync_playwright() as p:
                browser = p.chromium.launch(
                    headless=True,
                    proxy={"server": proxy_server},
                )
                try:
                    context = browser.new_context()
                    page = context.new_page()
                    response = page.goto('https://check.torproject.org/api/ip', timeout=10000)

                    if response and response.status == 200:
                        # Parse the raw response body rather than scraping the
                        # JSON back out of the HTML Chromium renders around it.
                        try:
                            data = json.loads(response.text())
                        except ValueError:
                            data = None

                        if not isinstance(data, dict):
                            logger.warning("Could not parse Tor check response")
                        elif data.get("IsTor") is True:
                            logger.info(f"Tor connection established! IP: {data.get('IP', 'Unknown')}")
                            return
                        else:
                            logger.info(f"Not using Tor yet. Current IP: {data.get('IP', 'Unknown')}")
                    else:
                        logger.warning(f"Tor check failed with status: {response.status if response else 'No response'}")

                except Exception as e:
                    logger.warning(f"Error during Tor check: {e}")
                finally:
                    # Single cleanup point: runs on success (return), on a
                    # negative result and on errors alike.
                    browser.close()

        except Exception as e:
            logger.warning(f"Error checking Tor status: {e}")

        logger.info("Waiting 10 seconds before next Tor check...")
        time.sleep(10)
|
|
|
|
|
|
def close_cookie_consent(page):
    """Dismiss the cookie banner through its "reject all" button, if it shows up.

    Waits up to 30 seconds for ``#onetrust-reject-all-handler`` to become
    visible and clicks it.  Any failure, including the timeout, is logged as a
    warning and otherwise ignored so the caller can carry on.

    Args:
        page: Playwright page on which to look for the banner.
    """
    try:
        logger.info("Waiting for cookie consent dialog to appear...")
        reject_all = page.locator("#onetrust-reject-all-handler")

        # The banner may only show up some time after the page has loaded,
        # hence the generous timeout.
        reject_all.wait_for(state="visible", timeout=30000)
        reject_all.click()
        logger.info("Cookie consent dialog closed")

    except Exception as exc:
        # Best effort: a missing banner must not stop the run.
        logger.warning(f"Cookie consent dialog not found or timed out: {exc}")
|
|
|
|
|
|
def close_popup_modal(page):
    """Close the popup modal through its close button, if it shows up.

    Waits up to 15 seconds for ``button.pum-close.popmake-close`` to become
    visible and clicks it.  Any failure, including the timeout, is logged and
    otherwise ignored.

    Args:
        page: Playwright page on which to look for the popup.
    """
    try:
        logger.info("Waiting for popup modal to appear...")
        popup_close_button = page.locator("button.pum-close.popmake-close")
        popup_close_button.wait_for(state="visible", timeout=15000)
        popup_close_button.click()
        logger.info("Popup modal closed")
    except Exception as e:
        # Logged as a warning, like the cookie-consent helper: the popup is
        # optional and execution continues without it.
        logger.warning(f"Could not close popup modal: {e}")
|
|
|
|
|
|
def load_form_data(yaml_file="denuncias.yml"):
    """Load the submissions stored under the ``denuncias`` key of a YAML file.

    Args:
        yaml_file: Path of the YAML file to read (decoded as UTF-8).

    Returns:
        The value of the top-level ``denuncias`` key, or an empty list when the
        file is empty, is not a mapping, or has no (or an empty) ``denuncias``
        entry.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    with open(yaml_file, 'r', encoding='utf-8') as file:
        document = yaml.safe_load(file)

    # safe_load yields None for an empty file and may yield a non-mapping for
    # other content; neither can be indexed by key.
    entries = document.get('denuncias') if isinstance(document, dict) else None
    if not entries:
        logger.warning(f"No 'denuncias' entries found in {yaml_file}")
        return []
    return entries
|
|
|
|
|
|
def generate_mock_data(num_submissions=3):
    """Generate Spanish-locale mock data for form submissions.

    Every submission is assembled from the word lists defined below plus values
    produced by a ``Faker('es_ES')`` instance.

    Args:
        num_submissions: Number of submission dicts to generate.

    Returns:
        A list of dicts, each with the keys ``nombre_local``, ``direccion``,
        ``codigo_postal``, ``municipio``, ``evento_deportivo``, ``descripcion``,
        ``email_contacto`` (may be empty) and ``imagen`` (base name of a file
        found in the ``images`` directory, or empty).
    """
    fake = Faker('es_ES')  # Spanish locale

    # Establishment types used as the first word of the generated name
    establishment_types = [
        "Bar", "Café", "Restaurante", "Taberna", "Cervecería", "Mesón", "Tasca", "Bodega",
        "Pub", "Tapería", "Gastrobar", "Brasería", "Marisquería", "Pizzería", "Hamburguesería",
        "Chiringuito", "Terraza", "Club", "Discoteca", "Karaoke", "Billar", "Recreativo",
        "Hotel", "Hostal", "Pensión", "Parador", "Resort", "Camping"
    ]

    # Articles and themes combined into the rest of the name
    bar_prefixes = ["El", "La", "Los", "Las"]
    bar_themes = [
        "Rincón", "Esquina", "Amigos", "Victoria", "Deportivo", "Goles", "Peña", "Balón",
        "Hinchada", "Campeón", "Final", "Clásico", "Estadio", "Grada", "Cancha", "Liga",
        "Copa", "Trofeo", "Medalla", "Triunfo", "Gloria", "Éxito", "Pasión", "Furia",
        "Atlético", "Sporting", "Racing", "United", "Central", "Real", "Imperial",
        "Madrid", "Barcelona", "Valencia", "Sevilla", "Bilbao", "Atlántico", "Mediterráneo"
    ]

    # Additional name patterns
    place_names = ["del Puerto", "de la Playa", "del Centro", "de la Plaza", "del Barrio"]
    descriptors = ["Dorado", "Negro", "Blanco", "Rojo", "Azul", "Verde", "Nuevo", "Viejo"]

    # Teams and competitions used to build the sporting-event text
    teams = [
        "Real Madrid", "FC Barcelona", "Atlético Madrid", "Sevilla FC", "Valencia CF",
        "Real Betis", "Villarreal CF", "Real Sociedad", "Athletic Bilbao", "Getafe CF",
        "Osasuna", "Celta de Vigo", "Rayo Vallecano", "Espanyol", "Mallorca", "Cádiz CF",
        "Elche CF", "Levante UD", "Alavés", "Granada CF"
    ]

    competitions = [
        "LaLiga Santander", "Copa del Rey", "Champions League", "Europa League",
        "Conference League", "Supercopa de España", "Copa de la Liga"
    ]

    # Description text is built as "<context> <situation>. <ending>"
    complaint_situations = [
        "múltiples pantallas mostrando contenido pirata",
        "señal de televisión claramente no oficial",
        "retransmisión sin los logos oficiales de LaLiga",
        "calidad de imagen sospechosamente baja típica de streams ilegales",
        "el personal admitió no tener licencia para la emisión",
        "publicidad en redes sociales promocionando la retransmisión gratuita",
        "cobro de entrada específico para ver el partido",
        "gran cantidad de espectadores sin consumo proporcional",
        "retransmisión con comentarios en idioma extranjero",
        "interrupciones constantes típicas de señales pirata"
    ]

    complaint_contexts = [
        "Durante mi visita al establecimiento pude comprobar que",
        "Como cliente habitual del local, he observado que",
        "En mi paso por el establecimiento noté que",
        "Mientras cenaba en el restaurante observé que",
        "Como vecino del local, he visto que",
        "Durante el evento deportivo pude verificar que"
    ]

    complaint_endings = [
        "Creo que es importante que LaLiga investigue esta situación.",
        "Espero que tomen las medidas oportunas.",
        "Considero necesario que se actúe contra esta práctica ilegal.",
        "Ruego investiguen este establecimiento.",
        "Solicito que se verifique la legalidad de sus emisiones.",
        "Agradecería que revisaran la situación de este local."
    ]

    # Collect image files from the "images" directory (relative to the current
    # working directory), matching each extension in lower and upper case.
    images = []
    if os.path.exists("images"):
        for ext in ["*.jpg", "*.jpeg", "*.png", "*.gif", "*.bmp"]:
            images.extend(glob.glob(os.path.join("images", ext)))
            images.extend(glob.glob(os.path.join("images", ext.upper())))

    mock_submissions = []

    for i in range(num_submissions):
        # Pick one of four name patterns.  Each ``elif`` is only evaluated when
        # the earlier draws were false, so the 40/30/20 values are conditional
        # chances; presumably this works out to roughly 40% / 18% / 8.4% /
        # 33.6% per pattern - TODO confirm against Faker's ``boolean``.
        if fake.boolean(chance_of_getting_true=40):
            # Pattern: Type + Theme
            est_type = fake.random_element(establishment_types)
            theme = fake.random_element(bar_themes)
            full_name = f"{est_type} {theme}"
        elif fake.boolean(chance_of_getting_true=30):
            # Pattern: Type + Prefix + Theme
            est_type = fake.random_element(establishment_types)
            prefix = fake.random_element(bar_prefixes)
            theme = fake.random_element(bar_themes)
            full_name = f"{est_type} {prefix} {theme}"
        elif fake.boolean(chance_of_getting_true=20):
            # Pattern: Type + Theme + Place
            est_type = fake.random_element(establishment_types)
            theme = fake.random_element(bar_themes)
            place = fake.random_element(place_names)
            full_name = f"{est_type} {theme} {place}"
        else:
            # Pattern: Type + Descriptor + Theme
            est_type = fake.random_element(establishment_types)
            descriptor = fake.random_element(descriptors)
            theme = fake.random_element(bar_themes)
            full_name = f"{est_type} {descriptor} {theme}"

        # Build the address: street type + street name + number
        street_types = ["Calle", "Avenida", "Plaza", "Paseo", "Ronda", "Travesía", "Callejón"]
        street_type = fake.random_element(street_types)

        # Sometimes use a fixed, well-known street name instead of a Faker one
        if fake.boolean(chance_of_getting_true=30):
            famous_streets = [
                "Gran Vía", "Puerta del Sol", "Las Ramblas", "Paseo de Gracia",
                "Calle Mayor", "Plaza Mayor", "Avenida de la Constitución"
            ]
            street_name = fake.random_element(famous_streets)
            street_type = ""  # These already include the type
        else:
            street_name = fake.street_name()

        street_number = fake.building_number()
        if fake.boolean(chance_of_getting_true=20):
            # Add apartment/floor info sometimes
            floor_info = f", {fake.random_int(1, 5)}º"
            street_number += floor_info

        # Build the sporting-event text
        if fake.boolean(chance_of_getting_true=40):
            # Specific match between two different teams
            team1 = fake.random_element(teams)
            team2 = fake.random_element([t for t in teams if t != team1])
            competition = fake.random_element(competitions)
            evento = f"{competition} - {team1} vs {team2}"
        else:
            # General competition round
            competition = fake.random_element(competitions)
            if "LaLiga" in competition:
                jornada = fake.random_int(1, 38)
                evento = f"{competition} - Jornada {jornada}"
            else:
                rounds = ["Fase de Grupos", "Dieciseisavos", "Octavos", "Cuartos", "Semifinal", "Final"]
                round_name = fake.random_element(rounds)
                evento = f"{competition} - {round_name}"

        # Assemble the description from one context, one situation and one ending
        context = fake.random_element(complaint_contexts)
        situation = fake.random_element(complaint_situations)
        ending = fake.random_element(complaint_endings)
        descripcion = f"{context} {situation}. {ending}"

        # Email: values 1-50 give a free-mail address, 51-70 a company
        # address, 71-100 leave the field empty
        email_chance = fake.random_int(1, 100)
        if email_chance <= 50:
            email_contacto = fake.free_email()
        elif email_chance <= 70:
            email_contacto = fake.company_email()
        else:
            email_contacto = ""

        # Image: only when files were found, and then only for values 1-40
        imagen = ""
        if images:
            image_chance = fake.random_int(1, 100)
            if image_chance <= 40:
                imagen = os.path.basename(fake.random_element(images))

        submission = {
            "nombre_local": full_name,
            # strip() removes the leading space left by an empty street_type
            "direccion": f"{street_type} {street_name} {street_number}".strip(),
            "codigo_postal": fake.postcode(),
            "municipio": fake.city(),
            "evento_deportivo": evento,
            "descripcion": descripcion,
            "email_contacto": email_contacto,
            "imagen": imagen
        }

        mock_submissions.append(submission)

    logger.info(f"Generated {num_submissions} mock submissions")
    return mock_submissions
|
|
|
|
|
|
def get_image_path(specified_image=None, images_folder="images"):
    """Resolve an image file name to an absolute path inside ``images_folder``.

    Args:
        specified_image: File name of the image.  ``None``, an empty string or a
            whitespace-only string means "no image".
        images_folder: Directory that holds the images.

    Returns:
        The absolute path of the image, or ``None`` when no image was specified
        or the named file does not exist in ``images_folder``.
    """
    image_name = specified_image.strip() if specified_image else ""

    # If no image specified, don't upload any image
    if not image_name:
        logger.info("No image specified, skipping image upload")
        return None

    # Join the stripped name so that surrounding whitespace in the input does
    # not produce a path that can never exist.
    specified_path = os.path.join(images_folder, image_name)

    # isfile rather than exists: a directory of that name is not an image.
    if not os.path.isfile(specified_path):
        logger.warning(f"Specified image '{specified_image}' not found, no image will be uploaded")
        return None

    logger.info(f"Using specified image: {specified_path}")
    return os.path.abspath(specified_path)
|
|
|
|
|
|
def fill_form_field(page, field_identifiers, value, field_name):
    """Fill a form field using the first selector that matches an element.

    Args:
        page: Playwright page containing the form.
        field_identifiers: Selectors to try, in order of preference.
        value: Value to enter; converted with ``str``.  A falsy value leaves the
            field untouched.
        field_name: Label used in the log messages.
    """
    if not value:
        return

    for identifier in field_identifiers:
        try:
            # Try to locate the field
            field = page.locator(identifier)
            if field.count() > 0:
                field.fill(str(value))
                logger.info(f"Successfully filled {field_name} using {identifier}")
                return
        except Exception as exc:
            # Still best effort (fall through to the next selector), but keep a
            # trace of why this one failed instead of discarding the error.
            logger.debug(f"Selector {identifier} failed for {field_name}: {exc}")
            continue

    logger.warning(f"Could not find field for {field_name}")
|
|
|
|
|
|
def upload_image_field(page, file_identifiers, image_path):
    """Attach an image using the first file-input selector that matches an element.

    Args:
        page: Playwright page containing the form.
        file_identifiers: Selectors to try, in order of preference.
        image_path: Path of the file to attach.  A falsy value skips the upload.
    """
    if not image_path:
        return

    for identifier in file_identifiers:
        try:
            field = page.locator(identifier)
            if field.count() > 0:
                field.set_input_files(image_path)
                logger.info(f"Successfully uploaded image using {identifier}")
                return
        except Exception as exc:
            # Still best effort (fall through to the next selector), but keep a
            # trace of why this one failed instead of discarding the error.
            logger.debug(f"Upload selector {identifier} failed: {exc}")
            continue

    logger.warning("Could not find file upload field")
|
|
|
|
|
|
def fill_form(page, form_data):
    """Fill every known form field from ``form_data`` and attach its image, if any.

    Each field has a list of candidate selectors that are tried in order.  Keys
    missing from ``form_data`` are skipped.  Any exception raised along the way
    is logged as an error and not propagated.

    Args:
        page: Playwright page showing the form.
        form_data: Mapping of field keys (``nombre_local``, ``direccion``, ...)
            to values; the optional ``imagen`` entry names the image to upload.
    """
    try:
        logger.info("Filling form with data...")

        # Candidate selectors per field, most preferred first
        selectors_by_field = {
            "nombre_local": [
                "#form-field-local__name",  # New format
                "#form-field-local_name",  # Old format
                "input[placeholder*='Nombre del local']",
                "input[name*='local'][name*='name']",
            ],
            "direccion": [
                "#form-field-local__street",
                "#form-field-local_street",
                "input[placeholder*='Dirección del local']",
                "input[name*='local'][name*='street']",
            ],
            "codigo_postal": [
                "#form-field-local__postal",
                "#form-field-local_postal",
                "input[placeholder*='Código Postal']",
                "input[name*='local'][name*='postal']",
            ],
            "municipio": [
                "#form-field-local__localy",
                "#form-field-local_localy",
                "input[placeholder*='Municipio']",
                # NOTE(review): both attribute filters are identical, so this
                # matches any input whose name contains 'local' - confirm the
                # intended selector.
                "input[name*='local'][name*='local']",
            ],
            "evento_deportivo": [
                "#form-field-field__evento",
                "#form-field-field_evento",
                "textarea[placeholder*='evento deportivo']",
                "textarea[name*='evento']",
            ],
            "descripcion": [
                "#form-field-field__message",
                "#form-field-field_message",
                "textarea[placeholder*='situación a denunciar']",
                "textarea[name*='message']",
            ],
            "email_contacto": [
                "#form-field-field_email",
                "input[type='email']",
                "input[placeholder*='Email']",
            ],
        }

        # Walk the fields in declaration order, skipping absent keys
        for key in selectors_by_field:
            if key not in form_data:
                continue
            fill_form_field(page, selectors_by_field[key], form_data[key], key)

        # Image upload, again with several candidate selectors
        resolved_image = get_image_path(form_data.get("imagen"))
        if resolved_image:
            logger.info("Uploading image...")
            upload_selectors = [
                "#form-field-local_files",
                "input[type='file']",
                "input[name*='local_files']",
                ".elementor-upload-field",
            ]
            upload_image_field(page, upload_selectors, resolved_image)
            logger.info("Image upload attempted")

        logger.info("Form filled successfully")

    except Exception as exc:
        logger.error(f"Error filling form: {exc}")
|
|
|
|
|
|
def submit_form(page):
    """Click the submit button and report whether the confirmation heading appeared.

    Args:
        page: Playwright page showing the filled form.

    Returns:
        ``True`` when the ``FORMULARIO ENVIADO CORRECTAMENTE`` heading becomes
        visible within 10 seconds; ``False`` when anything raises, after
        logging the error.
    """
    try:
        logger.info("Submitting form...")
        page.click("#form-denuncias-btn")

        # The confirmation heading is the success signal being waited for
        confirmation = page.locator("h2:has-text('FORMULARIO ENVIADO CORRECTAMENTE')")
        confirmation.wait_for(
            state="visible",
            timeout=10000,
        )

        logger.info("Form submitted successfully")
        outcome = True
    except Exception as exc:
        logger.error(f"Error submitting form: {exc}")
        outcome = False

    return outcome
|
|
|
|
|
|
def return_to_form(page, form_url=None):
    """Go back to the form page after a submission.

    Clicks the "Volver" link when it becomes visible within 5 seconds;
    otherwise navigates straight to the form URL.  Errors are logged and not
    propagated.

    Args:
        page: Playwright page currently showing the post-submission view.
        form_url: URL used by the fallback navigation.  When omitted, the
            ``TARGET_URL`` environment variable is used, and the built-in form
            URL when that is unset or empty.
    """
    # Honour TARGET_URL the same way main() does, so the fallback does not
    # jump to a different site than the one the run was started against.
    fallback_url = form_url or os.getenv("TARGET_URL") or "https://laligabares.com/denuncias/"

    try:
        logger.info("Returning to form...")

        # Try clicking the return button first
        try:
            return_button = page.locator("a:has-text('Volver')")
            return_button.wait_for(state="visible", timeout=5000)
            return_button.click()
            logger.info("Clicked return button")
        except Exception:
            # Fallback: navigate directly to the form URL
            logger.info("Return button not found, navigating directly to form URL")
            page.goto(fallback_url)

        logger.info("Returned to form page")

    except Exception as e:
        logger.error(f"Error returning to form: {e}")
|
|
|
|
|
|
def main():
    """Run the form automation, configured through environment variables.

    Variables read (after ``load_dotenv()``):
        TARGET_URL: Page to open (default ``https://laligabares.com/denuncias/``).
        HEADLESS: ``"true"`` runs the browser headless (default ``"true"``).
        USE_MOCK_DATA: ``"true"`` submits generated data in a loop instead of
            the YAML file contents (default ``"false"``).
        USE_TOR: ``"true"`` routes the browser through a proxy and waits for
            the Tor check to pass first (default ``"false"``).
        TOR_PROXY: Proxy URL (default ``socks5://127.0.0.1:9050``).

    Both modes stop after three consecutive failed submissions.
    """
    load_dotenv()

    url = os.getenv("TARGET_URL", "https://laligabares.com/denuncias/")
    # Only reachable when TARGET_URL is set to an empty string, since a default
    # is supplied above.
    if not url:
        logger.error("TARGET_URL environment variable is not set")
        return

    # Boolean options are true only for the (case-insensitive) string "true"
    headless = os.getenv("HEADLESS", "true").lower() == "true"
    use_mock_data = os.getenv("USE_MOCK_DATA", "false").lower() == "true"
    use_tor = os.getenv("USE_TOR", "false").lower() == "true"
    proxy_address = os.getenv("TOR_PROXY", "socks5://127.0.0.1:9050")

    logger.info(f"Opening browser to: {url}")
    logger.info(f"Headless mode: {headless}")
    logger.info(f"Use mock data: {use_mock_data}")
    logger.info(f"Use Tor: {use_tor}")
    if use_tor:
        logger.info(f"Tor proxy: {proxy_address}")

    # Wait for Tor connection if enabled
    if use_tor:
        wait_for_tor_connection(proxy_address)

    with sync_playwright() as p:
        # Configure browser with or without proxy
        browser_options = {"headless": headless}
        if use_tor:
            browser_options["proxy"] = {"server": proxy_address}

        browser = p.chromium.launch(**browser_options)
        page = browser.new_page()
        page.goto(url)

        # Dismiss overlays before touching the form
        close_cookie_consent(page)
        close_popup_modal(page)

        if use_mock_data:
            # Continuous mock data mode - generate and submit until stopped
            submission_count = 0
            failure_count = 0
            max_failures = 3
            logger.info("Starting continuous mock data mode - will run until stopped")

            while True:
                # Generate one submission at a time for variety
                form_data = generate_mock_data(num_submissions=1)[0]
                submission_count += 1

                logger.info(f"--- Mock submission #{submission_count} ---")
                logger.info(f"Submitting for: {form_data['nombre_local']}")

                fill_form(page, form_data)
                success = submit_form(page)

                if success:
                    failure_count = 0  # Reset failure counter on success
                    return_to_form(page)

                    # Wait between submissions
                    wait_time = random.randint(3, 5)  # Random delay 3-5 seconds
                    logger.info(f"Waiting {wait_time} seconds before next submission...")
                    page.wait_for_timeout(wait_time * 1000)
                else:
                    failure_count += 1
                    logger.warning(f"Submission failed. Failure count: {failure_count}/{max_failures}")

                    if failure_count >= max_failures:
                        logger.error(f"Reached maximum failures ({max_failures}). Exiting application.")
                        break

                    # Wait a bit before retrying
                    logger.info("Waiting 5 seconds before next attempt...")
                    page.wait_for_timeout(5000)
        else:
            # YAML file mode - process all submissions once
            form_data_list = load_form_data()
            failure_count = 0
            max_failures = 3
            successful_submissions = 0

            for i, form_data in enumerate(form_data_list, 1):
                logger.info(f"--- Processing submission {i}/{len(form_data_list)} ---")
                logger.info(f"Submitting for: {form_data['nombre_local']}")

                fill_form(page, form_data)
                success = submit_form(page)

                if success:
                    failure_count = 0  # Reset failure counter on success
                    successful_submissions += 1

                    # Return to form for next submission (except on last one)
                    if i < len(form_data_list):
                        return_to_form(page)
                        logger.info("Waiting 2 seconds before next submission...")
                        page.wait_for_timeout(2000)
                else:
                    failure_count += 1
                    logger.warning(f"Submission failed. Failure count: {failure_count}/{max_failures}")

                    if failure_count >= max_failures:
                        logger.error(f"Reached maximum failures ({max_failures}). Exiting application.")
                        break

                    # Return to form to retry or continue
                    if i < len(form_data_list):
                        return_to_form(page)
                        logger.info("Waiting 5 seconds before next attempt...")
                        page.wait_for_timeout(5000)

            logger.info(f"Completed {successful_submissions}/{len(form_data_list)} form submissions successfully")

        # With a visible browser, keep it open until the user confirms
        if not headless:
            input("Press Enter to close the browser...")
        else:
            print("Browser opened in headless mode - closing automatically")

        browser.close()
|
|
|
|
|
|
# Run the automation only when this file is executed as a script.
if __name__ == "__main__":
    main()
|