X Автоматизация и скраппинг веб-сайтов с помощью Selenium
Для небольших проектов Selenium мы не препятствуем автоматизации обнаружения, но стараемся имитировать поведение человека.

Когда вы хотите получить данные из Web, вы должны знать, что вы делаете. Не стоит перегружать целевой сервер запросами. Если вы делаете это из одного места, например IP address, вы можете получить (временный) бан.
Если вы хотите скрести большие объемы, рассмотрите возможность использования специализированных сервисов, таких как ZenRows, ScrapFly, WebScrapingAPI, ScrapingAnt и т.д. Они dist распределяют ваши запросы по множеству систем, каждая из которых имеет уникальный IP address, что означает, что целевой сервер будет думать, что к нему обращается множество различных (человеческих) клиентов.
Но иногда мы просто хотим соскрести немного данных, скажем, каждый день просматривать новые сообщения нескольких человек в социальных сетях, например X .
В этом посте мы будем использовать Selenium. Существуют такие решения, как Puppeteer или Playwright, но Selenium существует с 2004 года, это наиболее широко используемый инструмент для автоматизированного тестирования и имеет большое сообщество.
Я начну с некоторых соображений и закончу рабочим кодом. Код автоматизирует вход в X, , а затем поиск и отбраковку некоторых сообщений. Я знаю, что завтра все может измениться, но сегодня это работает.
Как обычно, я использую Ubuntu 22.04.
Скраппинг из X с помощью Selenium
Недавно X решил, что для просмотра сообщений необходимо войти в систему. Мне это не нравится. Большую часть времени я читаю сообщения только нескольких человек, используя расширение для браузера 'Breakthrough Twitter Login Wall'. Это больше не работает.
И иногда я скребу некоторые данные, но не сотни тысяч твитов. Думаю, так поступают многие разработчики. В мире насчитывается почти 30 миллионов инженеров-программистов. Теперь предположим, что каждый тысячный иногда раз в год скребет несколько X постов, которые скребут в среднем по 1000 постов каждый, для небольших проектов, тестирования и развлечения (обучения). То есть 30.000 скреперов в год, 100 скреперов в день, это 100.000 постов в день. Это не так уж и много, учитывая миллионы сообщений каждый день. X все еще имеет бесплатный аккаунт API , но он доступен только для записи. Если мы хотим получить данные, мы можем использовать только веб-сайт X и получить доступ к нему с помощью таких инструментов, как Selenium.
Selenium - это очень мощный инструмент для автоматизированного тестирования, но его также можно использовать для веб-скрепинга. Он был выпущен в 2004 году и имеет большое сообщество. Другие популярные инструменты, которые вы можете рассмотреть, - Puppeteer и Playwright.
Имитация человеческого поведения
Я бы сказал, что соскабливание веб-страниц не является противозаконным, если вы не выпускаете соскобленные данные в узнаваемом виде. Тем не менее, я могу себе представить, что некоторые компании блокируют доступ к своим веб-страницам и сервисам для автоматизированного программного обеспечения.
Что именно представляет собой автоматизированное программное обеспечение и как его можно обнаружить? Самая очевидная проверка - определить, отличается ли поведение посетителя от поведения человека. Обычно это означает, что ваше автоматизированное программное обеспечение должно вести себя как человек. Ввести что-то и подождать несколько секунд, нажать на кнопку и подождать несколько секунд, прокрутить страницу вниз и подождать несколько секунд и т.д. Человек не может заполнить форму за 50 миллисекунд...
Рассмотрим другой сценарий, когда Selenium используется для открытия страниц Web-сайта с помощью голосового управления. Даже если сайт обнаружит, что мы используем Selenium, это выглядит вполне законно, поскольку совершенно очевидно, что действия выполняет человек.
Зайти на сайт и купить дешевый товар - это человеческое поведение, но покупать все дешевые товары в больших количествах - это подозрительно. Особенно если это происходит изо дня в день.
Это означает, что главное, что должно делать наше программное обеспечение, - имитировать поведение человека. Сделать что-то и подождать случайное количество секунд, сделать что-то еще и подождать случайное количество секунд и т.д.
Выбор браузера
Selenium поддерживает множество браузеров. Для данного проекта я использую Chrome. Причина этого не вызывает сомнений: Chrome - самый используемый браузер не только среди пользователей Интернета user, но и среди разработчиков. На своей машине для разработчиков я использую Firefox, Chromium и (иногда) Brave. Это означает, что я могу использовать Chrome исключительно для этого проекта. Если вы используете Chrome также для просмотра веб-страниц, то вы можете создать отдельный профиль для приложения скраппинга.
Обнаружение скрепер-бота
Скрепер-бот - это инструмент или фрагмент кода, используемый для извлечения данных с веб-страниц. Веб-скреппинг - явление не новое, и многие организации предпринимают меры по предотвращению доступа автоматизированных инструментов к своим сайтам. В статье 'How to Avoid Bot Detection with Selenium', см. ссылки ниже, перечислены несколько способов избежать обнаружения бота.
Если мы ничего не будем делать, то обнаружить нашего бота Selenium будет очень легко. Пока мы ведем себя как человек, это может быть не так уж плохо. Мы показываем целевому сайту, что мы либо скрипт-кидди, либо хотим, чтобы они заметили, что мы используем какую-то автоматизацию.
Тем не менее, некоторые компании могут блокировать нас по своим законным причинам. Давайте посмотрим правде в глаза: хороший бот может внезапно превратиться в очень плохого.
Есть несколько сайтов, которые можно использовать для определения того, признан ли наш бот автоматизированным программным обеспечением:
- https://fingerprint.com/products/bot-detection
- https://pixelscan.net
- https://nowsecure.nl/#relax
Ниже приведен пример кода, который можно использовать для тестирования:
# bot_detect_test.py
import os
import time
# selenium
from selenium import webdriver
# chromedriver_binary, see pypi.org
import chromedriver_binary
# configure webdriver
from selenium.webdriver.chrome.options import Options
webdriver_options = Options()
# use a common screen format
webdriver_options.add_argument('window-size=1920,1080')
# profile
user_data_dir = os.path.join(os.getcwd(), 'selenium_data')
webdriver_options.add_argument('user-data-dir=' + user_data_dir)
# allow popups
webdriver_options.add_experimental_option('excludeSwitches', ['disable-popup-blocking']);
def main():
with webdriver.Chrome(options=webdriver_options) as wd:
wd.get('https://fingerprint.com/products/bot-detection')
time.sleep(8)
wd.save_screenshot('fingerprint.png')
print(f'ready')
if __name__ == '__main__':
main()
Будем ли мы заблокированы?
Чем больше мы пытаемся предотвратить обнаружение нашего бота Selenium , тем более подозрительными мы можем показаться бот-детекторам.
Существует проект 'undetected_chromedriver', см. ссылки ниже, который пытается поставить бота, основанного на Selenium, который не может быть обнаружен. Но если он будет обнаружен, то целевой сайт может решить заблокировать вас, потому что... зачем кому-то быть необнаруживаемым?
Компании, разрабатывающие детекторы ботов или предлагающие услуги по их обнаружению, внимательно следят за проектами, подобными 'undetected_chromedriver', и стараются опередить все реализуемые методы предотвращения обнаружения.
Проблему представляют и обновления. Новая версия Selenium или Chrome может нарушить меры по предотвращению обнаружения. Но и длительное отсутствие обновлений Chrome также может стать причиной обнаружения и вызвать подозрения.
Так что же делать? Ничего, немного, или использовать, например, 'undetected_chromedriver'? Если вы хотите наскрести много, то лучше всего воспользоваться сервисами, подобными вышеупомянутым. Для небольших проектов я предлагаю ничего не делать, чтобы предотвратить обнаружение, и посмотреть, что получится.
Действительно ли нам нужно автоматизировать вход в систему?
Вместо того чтобы автоматизировать вход в систему и регистрироваться с помощью нашего скрипта, мы можем войти в систему вручную с помощью браузера, а затем использовать данные из браузера (данные профиля, сессии) при подключении к X. Это сэкономит много кода. Но ... наше приложение в этом случае не будет полностью автоматизировано. А ведь мы используем Selenium, инструмент для автоматизации работы в Интернете (!).
О коде
Приведенный ниже код автоматизирует вход в X , а затем отбирает несколько постов из выбранного user. После этого скрипт завершает работу. Он использует отдельный профиль для веб-браузера, поэтому не должен мешать обычному использованию браузера. Если вход в систему прошел успешно и скрипт существует, то при следующем запуске скрипта он уже не входит в систему, а сразу начинает выбирать user и перебирать сообщения.
Посты выбранного user сбрасываются в файл 'posts.txt'. Это означает, что код не извлекает части сообщений. Для этого можно использовать BeautifulSoup .
Распознавание страниц
Наш скрипт работает со следующими страницами:
Когда вы не вошли в систему:
- Главная страница
- Вход - введите e-mail
- Вход - ввести пароль
- Логин - введите username (при необычной активности)
При входе в систему:
- Страница входа в систему
Страница "Необычная активность" показывается иногда, например, когда вы уже вошли в систему через другой браузер, перед страницей "Введите пароль".
Что мы делаем в цикле:
- Извлекаем язык
- Определить, какая страница показана
- Выполняем действие для страницы
Если мы находимся на главной странице, то просто перенаправляем на страницу входа в систему. Все остальные страницы без входа в систему имеют заголовок, "поле формы" и "кнопку формы". Это означает, что мы можем работать с ними аналогичным образом.
Причем, говоря о страницах, мы имеем в виду не URL, а то, что отображается на экране. X - это все Javascript.
Детектор "на какой странице мы находимся
Это, пожалуй, самый "интересный" фрагмент кода. Сначала мы создаем список взаимоисключающих элементов 'presence_of_element_located':
- Profile-button => logged in
- Кнопка входа в систему с текстом 'Sign in' => 'home_page'
- <h1> текст 'Войти в X' => 'login_page_1_email'
- <h1> text 'Введите пароль' => 'login_page_2_password'
- <h1> text 'Введите номер телефона или username' => 'login_page_3_username'
Затем мы ждем, пока хотя бы один из элементов не станет расположенным:
wait = WebDriverWait(
....
)
elem = wait.until(EC.any_of(*tuple(or_conditions)))
Получив элемент, мы можем определить страницу. А получив страницу, мы можем выполнить соответствующее действие.
Oops: 'Что-то пошло не так. Попробуйте перезагрузить".
При входе в систему иногда появляется это сообщение. Я вижу его в случайное время, поэтому думаю, не было ли оно встроено специально. При этом код выдает ошибку таймаута. Необходимо перезапустить скрипт или реализовать собственную функцию повторной попытки.
Языки
Есть два языка, с которыми нам приходится иметь дело: язык незарегистрированных пользователей и язык учетных записей. Часто они совпадают. Приведенный ниже код работает для английского и голландского языков, даже смешанных. Язык извлекается из страницы.
Чтобы добавить новый язык, добавьте его в:
# languages
self.available_langs = ['en', 'nl']
, и добавьте тексты на страницы:
# pages
self.pages = [
...
]
Конечно, сначала необходимо найти тексты, используя ручной вход в систему.
Чтобы запустить браузер на другом языке, сначала удалите каталог профиля браузера:
rm -R browser_profile
, а затем запустить скрипт после (!) установки новой локали:
bash -c 'LANGUAGE=nl_NL.UTF-8 python x_get_posts.py'
XPath поиск
Если вы хотите искать от корня документа, начните XPath с '//'.
Если вы хотите искать относительно определенного элемента, начните XPath с './/'.
Посты
На данный момент видимые посты записываются в файл HTML в виде posts.txt. Чтобы получить больше постов, реализуйте собственную функцию прокрутки вниз.
Извлечение данных из поста
Здесь все зависит от вас. Я предлагаю использовать BeautifulSoup.
Код
Если вы хотите попробовать, то вот код. Перед запуском убедитесь, что вы добавили данные своего аккаунта и имя поиска.
# x_get_posts.py
import logging
import os
import random
import sys
import time
# selenium
from selenium import webdriver
# using chromedriver_binary
import chromedriver_binary # Adds chromedriver binary to path
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import selenium.common.exceptions as selenium_exceptions
from selenium.webdriver.chrome.options import Options
# configure webdriver: hide gui, window size, full-screen
webdriver_options = Options()
#webdriver_options.add_argument('--headless')
# use a common screen format
webdriver_options.add_argument('window-size=1920,1080')
# use a separate profile
user_data_dir = os.path.join(os.getcwd(), 'browser_profile')
webdriver_options.add_argument('user-data-dir=' + user_data_dir)
# allow popups
webdriver_options.add_experimental_option('excludeSwitches', ['disable-popup-blocking']);
# force language
webdriver_options.add_argument('--lang=nl-NL');
def get_logger(
console_log_level=logging.DEBUG,
file_log_level=logging.DEBUG,
log_file=os.path.splitext(__file__)[0] + '.log',
):
logger_format = '%(asctime)s %(levelname)s [%(filename)-30s%(funcName)30s():%(lineno)03s] %(message)s'
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
if console_log_level:
# console
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(console_log_level)
console_handler.setFormatter(logging.Formatter(logger_format))
logger.addHandler(console_handler)
if file_log_level:
# file
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(file_log_level)
file_handler.setFormatter(logging.Formatter(logger_format))
logger.addHandler(file_handler)
return logger
logger = get_logger(
file_log_level=None,
)
logger.debug('START')
logger.debug(f'user_data_dir = {user_data_dir}')
class SeleniumBase:
def __init__(
self,
logger=None,
wd=None,
):
self.logger = logger
self.wd = wd
self.web_driver_wait_timeout = 10
self.web_driver_wait_poll_frequency = 1
self.min_wait_for_next_page = 3.
def get_elem_attrs(self, elem, dump=False, dump_header=None):
elem_attrs = self.wd.execute_script('var items = {}; for (index = 0; index < arguments[0].attributes.length; ++index) { items[arguments[0].attributes[index].name] = arguments[0].attributes[index].value }; return items;', elem)
if dump:
if dump_header is not None:
self.logger.debug(f'{dump_header}')
for k, v in elem_attrs.items():
self.logger.debug(f'attrs {k}: {v}')
return elem_attrs
def random_wait(self, min_wait_secs=None):
self.logger.debug(f'(min_wait_secs = {min_wait_secs})')
min_wait_secs = min_wait_secs or 2.
wait_secs = min_wait_secs + 2 * random.uniform(0, 1)
self.logger.debug(f'wait_secs = {wait_secs:.1f}')
time.sleep(wait_secs)
def elem_click_wait(self, elem, before_min_wait=None, after_min_wait=None):
self.logger.debug(f'(, before_min_wait = {before_min_wait}, after_min_wait = {after_min_wait})')
if before_min_wait is not None:
self.random_wait(min_wait_secs=before_min_wait)
elem.click()
if after_min_wait is not None:
self.random_wait(min_wait_secs=after_min_wait)
def elem_send_keys_wait(self, elem, value, before_min_wait=None, after_min_wait=None):
self.logger.debug(f'(, value = {value}, before_min_wait = {before_min_wait}, after_min_wait = {after_min_wait})')
if before_min_wait is not None:
self.random_wait(min_wait_secs=before_min_wait)
elem.send_keys(value)
if after_min_wait is not None:
self.random_wait(min_wait_secs=after_min_wait)
def set_form_field_value_and_click(self, form_field_elem=None, form_field_value=None, form_button_elem=None):
self.logger.debug(f'(form_field_elem = {form_field_elem}, form_field_value = {form_field_value}, form_button_elem = {form_button_elem})')
if form_field_elem is not None and form_field_value is not None:
self.random_wait(min_wait_secs=1.)
form_field_elem.send_keys(form_field_value)
if form_button_elem is not None:
self.random_wait(min_wait_secs=.5)
form_button_elem.click()
self.random_wait(min_wait_secs=self.min_wait_for_next_page)
def wait_for_element(
self,
xpath,
parent_xpath=None,
parent_attr_check=None,
visible=True,
):
self.logger.debug(f'xpath = {xpath}, parent_xpath = {parent_xpath}, parent_attr_check = {parent_attr_check}, visible = {visible}')
wait = WebDriverWait(
self.wd,
timeout=self.web_driver_wait_timeout,
poll_frequency=self.web_driver_wait_poll_frequency,
ignored_exceptions=[
selenium_exceptions.ElementNotVisibleException,
selenium_exceptions.ElementNotSelectableException,
],
)
try:
if visible:
elem = wait.until(EC.visibility_of_element_located((By.XPATH, xpath)))
else:
elem = wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
except selenium_exceptions.TimeoutException as e:
self.logger.exception(f'TimeoutException, e = {e}, e.args = {e.args}')
raise
except Exception as e:
self.logger.exception(f'other exception, e = {e}')
raise
self.get_elem_attrs(elem, dump=True, dump_header='elem')
if parent_xpath is None:
self.logger.debug(f'elem.tag_name = {elem.tag_name}')
return elem
parent_elem = elem.find_element(By.XPATH, parent_xpath)
self.get_elem_attrs(parent_elem, dump=True, dump_header='parent_elem')
if parent_attr_check is not None:
for k, v_expected in parent_attr_check.items():
v = parent_elem.get_attribute(k)
if v != v_expected:
raise Exception(f'parent_elem = {parent_elem}, attr k = {k}, v = {v} != v_expected = {v_expected}')
self.logger.debug(f'parent_elem.tag_name = {parent_elem.tag_name}')
return parent_elem
def wait_for_elements(self, xpath):
self.logger.debug(f'xpath = {xpath}')
wait = WebDriverWait(
self.wd,
timeout=self.web_driver_wait_timeout,
poll_frequency=self.web_driver_wait_poll_frequency,
ignored_exceptions=[
selenium_exceptions.ElementNotVisibleException,
selenium_exceptions.ElementNotSelectableException,
],
)
try:
elems = wait.until(EC.presence_of_all_elements_located((By.XPATH, xpath)))
except selenium_exceptions.TimeoutException as e:
self.logger.exception(f'TimeoutException, e = {e}, e.args = {e.args}')
raise
except Exception as e:
self.logger.exception(f'other exception, e = {e}')
raise
elems_len = len(elems)
self.logger.debug(f'elems_len = {elems_len}')
return elems
class XGetPosts(SeleniumBase):
def __init__(
self,
logger=None,
wd=None,
account=None,
search_for=None,
):
super().__init__(
logger=logger,
wd=wd,
)
self.account = account
self.search_for = search_for
# langs are in the <html> tag
self.available_langs = ['en', 'nl']
# pages
self.pages = [
{
'name': 'home_page',
'texts': {
'en': {
'login_button': 'Sign in',
},
'nl': {
'login_button': 'Inloggen',
},
},
'page_processor': self.home_page,
'login_button_text_xpath': '//a[@role="link"]/div/span/span[text()[normalize-space()="' + '{{login_button_text}}' + '"]]'
},
{
'name': 'login_page_1_email',
'texts': {
'en': {
'title': 'Sign in to X',
'form_button': 'Next',
},
'nl': {
'title': 'Registreren bij X',
'form_button': 'Volgende',
},
},
'page_processor': self.login_page_1_email,
'form_field_xpath': '//input[@type="text" and @name="text" and @autocomplete="username"]',
'form_field_value': self.account['email'],
'form_button_text_xpath': '//div[@role="button"]/div/span/span[text()[normalize-space()="' + '{{form_button_text}}' + '"]]'
},
{
'name': 'login_page_2_password',
'texts': {
'en': {
'title': 'Enter your password',
'form_button': 'Log in',
},
'nl': {
'title': 'Voer je wachtwoord in',
'form_button': 'Inloggen',
},
},
'page_processor': self.login_page_2_password,
'form_field_xpath': '//input[@type="password" and @name="password" and @autocomplete="current-password"]',
'form_field_value': self.account['password'],
'form_button_text_xpath': '//div[@role="button"]/div/span/span[text()[normalize-space()="' + '{{form_button_text}}' + '"]]',
},
{
'name': 'login_page_3_username',
'texts': {
'en': {
'title': 'Enter your phone number or username',
'form_button': 'Next',
},
'nl': {
'title': 'Voer je telefoonnummer of gebruikersnaam',
'form_button': 'Volgende',
},
},
'page_processor': self.login_page_3_username,
'form_field_xpath': '//input[@type="text" and @name="text" and @autocomplete="on"]',
'form_field_value': self.account['username'],
'form_button_text_xpath': '//div[@role="button"]/div/span/span[text()[normalize-space()="' + '{{form_button_text}}' + '"]]',
},
{
'name': 'loggedin_page',
'texts': {
'en': {
},
'nl': {
},
},
'page_processor': self.loggedin_page,
},
]
self.page_name_pages = {}
for page in self.pages:
self.page_name_pages[page['name']] = page
def get_page_lang(self, url):
self.logger.debug(f'(url = {url})')
# get lang from html tag
html_tag_xpath = '//html[@dir="ltr"]'
html_tag_elem = self.wait_for_element(
xpath=html_tag_xpath,
)
# lang must be present and available
lang = html_tag_elem.get_attribute('lang')
if lang not in self.available_langs:
raise Exception(f'lang = {lang} not in available_langs = {self.available_langs}')
self.logger.debug(f'lang = {lang}')
return lang
def which_page(self, url, lang):
self.logger.debug(f'(url = {url}, lang = {lang})')
# construct list of items that identify:
# - not logged in pages
# - logged in
or_conditions = []
for page in self.pages:
page_name = page['name']
if page_name == 'home_page':
# check for 'Sign in' button
login_button_text = page['texts'][lang]['login_button']
xpath = '//a[@href="/login" and @role="link" and @data-testid="loginButton"]/div[@dir="ltr"]/span/span[text()[normalize-space()="' + login_button_text + '"]]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
elif page_name == 'login_page_1_email':
# check for <h1> title
title_text = page['texts'][lang]['title']
xpath = '//h1/span/span[text()[normalize-space()="' + title_text + '"]]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
elif page_name == 'login_page_2_password':
# check for <h1> title
title_text = page['texts'][lang]['title']
xpath = '//h1/span/span[text()[normalize-space()="' + title_text + '"]]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
elif page_name == 'login_page_3_username':
# check for <h1> title
title_text = page['texts'][lang]['title']
xpath = '//h1/span/span[text()[normalize-space()="' + title_text + '"]]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
# check if logged in using profile button
xpath = '//a[@href="/' + self.account['screen_name'] + '" and @role="link" and @data-testid="AppTabBar_Profile_Link"]'
self.logger.debug(f'xpath = {xpath}')
or_conditions.append( EC.visibility_of_element_located((By.XPATH, xpath)) )
# or_conditions
self.logger.debug(f'or_conditions = {or_conditions}')
wait = WebDriverWait(
self.wd,
timeout=self.web_driver_wait_timeout,
poll_frequency=self.web_driver_wait_poll_frequency,
ignored_exceptions=[
selenium_exceptions.ElementNotVisibleException,
selenium_exceptions.ElementNotSelectableException,
],
)
try:
elem = wait.until(EC.any_of(*tuple(or_conditions)))
except selenium_exceptions.TimeoutException as e:
self.logger.exception(f'selenium_exceptions.TimeoutException, e = {e}, e.args = {e.args}')
raise
except Exception as e:
self.logger.exception(f'other exception, e = {e}')
raise
# not logged in and a known page, ... or .... logged in
self.logger.debug(f'elem.tag_name = {elem.tag_name}')
self.get_elem_attrs(elem, dump=True)
page = None
if elem.tag_name == 'a':
href = elem.get_attribute('href')
if href == '/login':
page = self.page_name_pages['home_page']
else:
data_testid = elem.get_attribute('data-testid')
if data_testid == 'AppTabBar_Profile_Link':
page = self.page_name_pages['loggedin_page']
elif elem.tag_name == 'span':
elem_text = elem.text
self.logger.debug(f'elem_text = {elem_text}')
if elem_text == self.page_name_pages['home_page']['texts'][lang]['login_button']:
page = self.page_name_pages['home_page']
elif elem_text == self.page_name_pages['login_page_1_email']['texts'][lang]['title']:
page = self.page_name_pages['login_page_1_email']
elif elem_text == self.page_name_pages['login_page_2_password']['texts'][lang]['title']:
page = self.page_name_pages['login_page_2_password']
elif elem_text == self.page_name_pages['login_page_3_username']['texts'][lang]['title']:
page = self.page_name_pages['login_page_3_username']
elif elem.tag_name == 'h1':
pass
self.logger.debug(f'page = {page}')
if page is None:
raise Exception(f'page is None')
return page
def process_page(self, url):
self.logger.debug(f'(url = {url})')
lang = self.get_page_lang(url)
page = self.which_page(url, lang)
page_processor = page['page_processor']
if page_processor is None:
raise Exception(f'page_processor is None')
return page_processor(page, url, lang)
def login_page_123_processor(self, page, url, lang):
self.logger.debug(f'(page = {page}, url = {url}, lang = {lang}')
# get (optional) form_field and form_button
form_field_xpath = page.get('form_field_xpath')
form_field_value = page.get('form_field_value')
form_field_elem = None
if form_field_xpath is not None:
form_field_elem = self.wait_for_element(
xpath=form_field_xpath,
)
form_button_elem = self.wait_for_element(
xpath=page['form_button_text_xpath'].replace('{{form_button_text}}', page['texts'][lang]['form_button']),
parent_xpath='../../..',
parent_attr_check={
'role': 'button',
},
)
# enter form_field_value and click
self.set_form_field_value_and_click(
form_field_elem=form_field_elem,
form_field_value=form_field_value,
form_button_elem=form_button_elem,
)
# return current_url
current_url = self.wd.current_url
self.logger.debug(f'current_url = {current_url}')
return current_url
def home_page(self, page, url, lang):
self.logger.debug(f'page = {page}, url = {url}, lang = {lang}')
login_button_elem = self.wait_for_element(
xpath=page['login_button_text_xpath'].replace('{{login_button_text}}', page['texts'][lang]['login_button']),
parent_xpath='../../..',
parent_attr_check={
'role': 'link',
},
)
href = login_button_elem.get_attribute('href')
self.logger.debug(f'href = {href}')
# redirect
self.wd.get(href)
self.random_wait(min_wait_secs=self.min_wait_for_next_page)
# return current_url
current_url = self.wd.current_url
self.logger.debug(f'current_url = {current_url}')
return current_url
def login_page_1_email(self, page, url, lang):
return self.login_page_123_processor(page, url, lang=lang)
def login_page_2_password(self, page, url, lang):
return self.login_page_123_processor(page, url, lang=lang)
def login_page_3_username(self, page, url, lang):
return self.login_page_123_processor(page, url, lang=lang)
def loggedin_page(self, page, url, lang):
self.logger.debug(f'page = {page}, url = {url}, lang = {lang}')
# locate search box
xpath = '//form[@role="search"]/div/div/div/div/label/div/div/input[@type="text" and @data-testid="SearchBox_Search_Input"]'
search_field_elem = self.wait_for_element(xpath)
# type the search item
self.elem_send_keys_wait(
search_field_elem,
self.search_for,
before_min_wait=2,
after_min_wait=self.min_wait_for_next_page,
)
# locate search result option buttons
xpath = '//div[@role="listbox" and starts-with(@id, "typeaheadDropdown-")]/div[@role="option" and @data-testid="typeaheadResult"]/div[@role="button"]'
button_elems = self.wait_for_elements(xpath)
# find search_for in options
xpath = './/span[text()[normalize-space()="' + self.search_for + '"]]'
found = False
for button_elem in button_elems:
try:
elem = button_elem.find_element(By.XPATH, xpath)
found = True
break
except selenium_exceptions.NoSuchElementException:
continue
if not found:
raise Exception(f'search_for = {search_for} not found in typeaheadDropdown')
# click the found item
self.elem_click_wait(
button_elem,
before_min_wait=2,
after_min_wait=self.min_wait_for_next_page,
)
# slurp posts visibility_of_element_located
xpath = '//article[@data-testid="tweet"]'
posts = self.wait_for_elements(xpath)
# dump posts
post_html_items = []
posts_len = len(posts)
visible_posts_len = 0
for post in posts:
self.logger.debug(f'tag = {post.tag_name}')
self.logger.debug(f'aria-labelledby = {post.get_attribute("aria-labelledby")}')
self.logger.debug(f'data-testid = {post.get_attribute("data-testid")}')
self.logger.debug(f'post.is_displayed() = {post.is_displayed()}')
if not post.is_displayed():
continue
visible_posts_len += 1
# expand to html
post_html = post.get_attribute('outerHTML')
post_html_items.append(post_html)
if posts_len > 0:
with open('posts.txt', 'w') as fo:
fo.write('\n'.join(post_html_items))
self.logger.debug(f'posts_len = {posts_len}')
self.logger.debug(f'visible_posts_len = {visible_posts_len}')
self.random_wait(min_wait_secs=5)
sys.exit()
def main():
# your account
account = {
'email': 'johndoe@example.com',
'password': 'secret',
'username': 'johndoe',
'screen_name': 'JohnDoe',
}
# your search
search_for = '@elonmusk'
with webdriver.Chrome(options=webdriver_options) as wd:
x = XGetPosts(
logger=logger,
wd=wd,
account=account,
search_for=search_for,
)
url = 'https://www.x.com'
wd.get(url)
x.random_wait(min_wait_secs=x.min_wait_for_next_page)
url = wd.current_url
while True:
logger.debug(f'url = {url}')
url = x.process_page(url)
logger.debug(f'ready ')
if __name__ == '__main__':
main()
Резюме
В этой заметке мы использовали Selenium для автоматизации входа в X и извлечения ряда сообщений. Для поиска элементов мы используем в основном (относительно) XPaths. Мы не пытаемся скрыть, что используем автоматизацию, но стараемся вести себя не хуже человека. Код поддерживает несколько языков. Код работает сегодня, но может не работать завтра из-за изменений в текстах и именовании.
Ссылки / кредиты
ChromeDriver - WebDriver for Chrome
https://sites.google.com/a/chromium.org/chromedriver/capabilities
Playwright
https://playwright.dev/python
Puppeteer
https://pptr.dev
ScrapingAnt - Puppeteer vs. Selenium - Which Is Better? + Bonus
https://scrapingant.com/blog/puppeteer-vs-selenium
Selenium with Python
https://selenium-python.readthedocs.io/index.html
Selenium with Python - 5. Waits
https://selenium-python.readthedocs.io/waits.html
Tracking Modified Selenium ChromeDriver
https://datadome.co/bot-management-protection/tracking-modified-selenium-chromedriver
undetected_chromedriver
https://github.com/ultrafunkamsterdam/undetected-chromedriver
WebScrapingAPI
https://www.webscrapingapi.com
WebScrapingAPI
https://www.webscrapingapi.com
ZenRows - How to Avoid Bot Detection with Selenium
https://www.zenrows.com/blog/selenium-avoid-bot-detection
Подробнее
Scraping Selenium Web automation
Недавний
- График временного ряда с Flask, Bootstrap и Chart.js
- Использование IPv6 с Microk8s
- Использование Ingress для доступа к RabbitMQ на кластере Microk8s
- Простая видеогалерея с Flask, Jinja, Bootstrap и JQuery
- Базовое планирование заданий с помощью APScheduler
- Коммутатор базы данных с HAProxy и HAProxy Runtime API
Большинство просмотренных
- Использование PyInstaller и Cython для создания исполняемого файла Python
- Уменьшение времени отклика на запросы на странице Flask SQLAlchemy веб-сайта
- Используя Python pyOpenSSL для проверки SSL-сертификатов, загруженных с хоста
- Подключение к службе на хосте Docker из контейнера Docker
- Использование UUID вместо Integer Autoincrement Primary Keys с SQLAlchemy и MariaDb
- SQLAlchemy: Использование Cascade Deletes для удаления связанных объектов