CoderCastrov logo
CoderCastrov
Парсер

Как автоматизировать продвинутый парсинг (скрапинг) электронной почты видео на YouTube с помощью Python.

Как автоматизировать продвинутый парсинг (скрапинг) электронной почты видео на YouTube с помощью Python.
просмотров
6 мин чтение
#Парсер

YouTube - это место, полное контента и медиа. Это отличное место для сбора и парсинга данных. В этом руководстве я расскажу, как парсить видео на YouTube с использованием дополнительных методов и функций, чтобы сделать эту задачу легкой и увлекательной.

Python - отличное место для начала. Его синтаксис дружелюбен к начинающим, а доступность библиотек позволяет легко парсить данные. Мы будем использовать несколько библиотек, чтобы нам помочь. В нашем случае, некоторые основные импорты, которые мы будем использовать: Selenium, multiprocessing, requests и queue.

Давайте начнем с настройки наших импортов...

# ----- Импорт и библиотеки ----- # 
import requests
import re
import time
import queue
import multiprocessing
from threading import Thread
import sys 
# ----- Библиотеки Selenium ----- # 
import selenium
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import chromedriver_autoinstaller

Теперь, когда мы настроили все наши импорты, мы можем начать с создания наших классов и функций для выполнения задачи. Мы начнем с создания класса "Scrape".

Сначала мы инициализируем наш класс и создаем наши переменные экземпляра. У нас есть поисковый запрос, который представляет собой видео, которые мы ищем, у нас есть переменная link count, которая является минимальным количеством ссылок, которое мы хотим получить (если не истекает время ожидания), у нас есть переменные view и sub count, которые установлены для получения максимального количества подписчиков или просмотров, разрешенных для каждого видео, и, наконец, у нас есть self.url, который создает URL для открытия и добавляет "&sp=CAO%253D", что дает нам наиболее недавно загруженные видео.

# Объявления
self.filtered_links = queue.Queue()  
self.links = set()  
self.timeout = False  
self.links_gathered = False  
self.position = 0  
self.manager = multiprocessing.Manager()  
self.return_dict = self.manager.dict()  
self.channels = [] # Используется для хранения просмотренных каналов и их подписчиков

Теперь добавьте этот фрагмент кода сразу после кода внутри __init__. Здесь мы объявляем некоторые переменные, такие как self.filtered_links = queue.Queue(), которая создает очередь без дублирующихся URL-адресов, self.links = set(), чтобы убедиться, что мы не сохраняем дубликаты, self.timeout - это функция тайм-аута, которую мы создаем в случае, если сбор ссылок занимает слишком много времени, self.position - это наше начальное положение для отображения страницы YouTube. Нам нужно использовать JavaScript, чтобы продолжать отображать страницу, чтобы собрать больше ссылок. self.manager и self.return_dict используются для хранения наших электронных писем. И, наконец, у нас есть self.channels, который используется для отслеживания просмотренных каналов и их количества подписчиков (мы делаем это, чтобы код работал быстрее и эффективнее).

# Настройка драйвера и параметры
chromedriver_autoinstaller.install(cwd=True) # Получить последнюю версию chromedriver // Cwd=True -> Сохраняет драйвер в текущем каталоге
self.options = webdriver.ChromeOptions()
self.options.add_argument("--incognito")
self.options.add_argument("--log-level=OFF")
self.options.add_argument('--disable-gpu')
self.options.add_argument("--headless") # Устанавливает безголовый режим для отключения всплывающего окна
self.__FetchVideos() # Получить URL

Теперь мы настраиваем драйверы и параметры Selenium и вызываем нашу функцию FetchVideos. Мы используем библиотеку chromedriver_autoinstaller, чтобы всегда получать правильную обновленную версию chromedriver автоматически. Мы устанавливаем некоторые аргументы, такие как --incognito, чтобы запустить Chrome в режиме инкогнито. Мы также используем --headless, чтобы Chrome не появлялся на экране и работал незаметно в фоновом режиме.

Теперь давайте создадим нашу функцию FetchVideos, которая собирает URL-адреса, соответствующие нашим параметрам. Мы начинаем с создания функции __FetchVideos(self): и объявляем некоторые переменные. Затем мы запускаем и открываем драйвер Chrome. Мы используем with, который является менеджером контекста. Он гарантирует, что код может открыться и закрыться безопасно. Затем мы вызываем driver.get(self.url), который открывает Chrome и получает URL-адрес страницы с видео. Затем мы создаем поток для нашей функции таймера и запускаем его.

Добавьте это сразу после кода выше. Здесь мы отображаем URL-адреса для сбора. Мы используем try и except для перехвата любых ошибок (обычно YouTube показывает капчу). Если мы поймали ошибку, мы закрываем драйвер на всякий случай и выходим из кода. Вначале у нас есть цикл while. Он выполняется, пока количество ссылок не будет собрано или тайм-аут не будет установлен на True. Затем мы запускаем цикл for, используя WebDriverWait, пока все элементы не станут видимыми. Мы используем селектор XPATH, чтобы получить URL-адреса видео. Затем, если мы хотим проверять параметры количества подписчиков или просмотров, мы переходим в оператор if, находим элемент просмотров. В случае с подписчиками нам нужно получить канал и использовать нашу функцию, которую мы создаем __subFinder(channel), чтобы получить количество подписчиков. Если канал уже сохранен в нашем списке, возвращаем количество подписчиков. Затем мы проверяем его в нашем операторе if, чтобы убедиться, что это правильные требования, и если это так, добавляем URL-адрес в self.links. Наконец, мы добавляем 10000 к self.position и выполняем некоторый JavaScript, чтобы отобразить больше видео. Мы добавляем все наши неповторяющиеся ссылки в очередь и устанавливаем self.links_gathered = True.

Теперь давайте создадим нашу функцию __timeoutFunc и функцию __subFinder. Начнем с функции __subFinder.

Здесь мы создаем нашу функцию и принимаем параметр URL. Мы используем requests, чтобы получить исходный код страницы, а затем, используя регулярные выражения, мы пытаемся найти количество подписчиков. YouTube делает это сложным и запутанным, используя свои собственные пользовательские теги, интерпретируемые их собственным кодом JavaScript, поэтому нам пришлось импровизировать. Если мы получаем ошибку, скорее всего это означает, что количество подписчиков скрыто. Затем мы добавляем канал и подписчиков в наш список и возвращаем количество подписчиков.

Здесь мы объявляем, что время ожидания составляет 60 секунд и говорим, что если количество подписчиков включено, добавляем еще 60 секунд, потому что получение количества подписчиков занимает больше времени. Мы устанавливаем 50 ссылок в минуту и, если количество подписчиков, 50 ссылок за 2 минуты. Таким образом, если у нас есть 100 ссылок для получения и не требуется количество подписчиков, это займет максимум 1 минуту, прежде чем вернуть собранные ссылки.

Теперь ранее вы видели эту функцию, cstn. Это еще одна функция, которую мы создали, чтобы преобразовывать строковые числа, такие как "5k" или "50,000", в целое число. Вот как выглядит функция.

Теперь давайте перейдем к парсингу (скрапингу) электронной почты. Мы будем использовать многопроцессорность, чтобы сделать это быстрее и эффективнее, используя вычислительные возможности нашего компьютера.

Здесь мы создаем нашу рабочую функцию. Эта функция получает страницу URL-адресов из очереди, использует регулярные выражения для поиска электронной почты и вызывает нашу функцию fix_email, чтобы исправить любые неправильные электронные письма, например, techysavage@gmail... станет techysavage@gmail.com. Затем мы сохраняем его в нашем словаре менеджера и освобождаем блокировку. Ниже приведена наша функция fix_email, которая пытается исправить любые незначительные ошибки.

# ----- Функция исправления электронной почты ----- #
def fix_email(email):
    endings = ['.com', '.us', '.ca', '.ge', '.uk', '.it']
    # Для разных типов доменных окончаний
    # Проходим по окончаниям и исправляем электронную почту
    for e in endings:
        if e in email and not email.endswith(e):
            return email.split(e)[0] + e
    return email

Наконец, у нас есть функция run, которая обрабатывает все это. Сначала мы проверяем, является ли введенное количество процессоров цифрой, если нет, то мы проверяем, говорит ли параметр "half" или "full". Half означает, что он использует половину логических процессоров или использует использование ЦП на уровне около 50%, тогда как full использует все процессоры. Мы создаем нашу семафорную блокировку, чтобы она могла использовать только определенное количество логических процессоров. Затем мы создаем наш объект scrape и запускаем его. Как только он закончится, мы вызываем нашу функцию start_processes и запускаем рабочих. Когда все закончится, мы получаем наши сохраненные электронные письма без дубликатов в виде множества и создаем объект, который содержит общее время и сохраненные электронные письма в виде словаря.

Спасибо за чтение, и я надеюсь, что вы узнали что-то новое и этот код поможет вам. Вы можете проверить код на моем GitHub или скопировать полный код ниже. 👍 https://github.com/Ashura-R/Youtube-Email-Scraper/