CoderCastrov logo
CoderCastrov
Питон

Как парсить Reddit с использованием pushshift.io через Python

Как парсить Reddit с использованием pushshift.io через Python
просмотров
4 мин чтение
#Питон

В начале 2018 года Reddit внес некоторые изменения в свое API, что привело к закрытию предыдущего метода получения всего Subreddit. К счастью, существует pushshift.io. Для своих нужд я решил использовать pushshift для получения всех доступных сообщений за определенный период времени. После получения я хотел вернуться на Reddit и получить отдельную публикацию и все ее комментарии. Последний шаг получения с Reddit может не всегда быть необходимым. Все зависит от ваших индивидуальных потребностей.

Извлечение социальных сетей из сообщества Subreddit r/Siacoin

Пошаговое руководство по парсингу и созданию социальных сетей в разрезе месяцев для сообщества Subreddit r/Siacoin.

medium.com

Предупреждение:

import math
import json
import requests
import itertools
import numpy as np
import time
from datetime import datetime, timedelta

Шаг 1: Отправка запроса на pushshift.

[https://api.pushshift.io/reddit/search/submission?subreddit={}&after={}&before={}&size={](https://api.pushshift.io/reddit/search/submission?subreddit=%7B%7D&after=%7B%7D&before=%7B%7D&size=%7B)}
  • Шаблон URI, который принимает исследуемый Sub-Reddit, два временных штампа EPOCH (без миллисекунд) и максимальное количество записей, которые мы хотим получить. На момент написания этого текста, максимальное значение для параметра size равно 500.

Как мы можем быть уверены, что получаем все за указанный период времени?

Решение проблемы 1: "Добавление логики для запроса дополнительных сообщений..."

Сначала нам понадобится метод, который принимает URI и обрабатывает HTTP-запрос/ответ.

def make_request(uri, max_retries=5):
    def fire_away(uri):
        response = requests.get(uri)
        assert response.status_code == 200
        return json.loads(response.content)
    
    current_tries = 1
    while current_tries < max_retries:
        try:
            time.sleep(1)
            response = fire_away(uri)
            return response
        except:
            time.sleep(1)
            current_tries += 1
    return fire_away(uri)
  • Если по какой-то причине наш запрос не выполнится, подождите секунду и повторите попытку. Мы попробуем это 5 раз, прежде чем сдаться.

Второе, свяжите наш предыдущий метод и получите наш интервал. Если мы берем 500, убедитесь, что проверяем, есть ли еще записи.

def pull_posts_for(subreddit, start_at, end_at):
    
    def map_posts(posts):
        return list(map(lambda post: {
            'id': post['id'],
            'created_utc': post['created_utc'],
            'prefix': 't4_'
        }, posts))
    
    SIZE = 500
    URI_TEMPLATE = r'[https://api.pushshift.io/reddit/search/submission?subreddit={}&after={}&before={}&size={}'](https://api.pushshift.io/reddit/search/submission?subreddit=%7B%7D&after=%7B%7D&before=%7B%7D&size=%7B%7D%27)
    
    post_collections = map_posts( \
        make_request( \
            URI_TEMPLATE.format( \
                subreddit, start_at, end_at, SIZE))['data'])
    n = len(post_collections)
    while n == SIZE:
        last = post_collections[-1]
        new_start_at = last['created_utc'] - (10)
        
        more_posts = map_posts( \
            make_request( \
                URI_TEMPLATE.format( \
                    subreddit, new_start_at, end_at, SIZE))['data'])
        
        n = len(more_posts)
        post_collections.extend(more_posts)
    return post_collections

Предупреждение: мы получим дубликаты из-за того, как это написано.

Наконец, объединим все вместе, чтобы получить сообщения за последний год из r/Siacoin. Имейте в виду, что ваши результаты будут отличаться в зависимости от времени суток, когда вы запускаете это.

subreddit = 'Siacoin'
end_at = math.ceil(datetime.utcnow().timestamp())
start_at = math.floor((datetime.utcnow() - \
                       timedelta(days=365)).timestamp())
posts = pull_posts_for(subreddit, start_at, end_at)
## ~ 4314
print(len(posts))
## ~ 4306
print(len(np.unique([ post['id'] for post in posts ])))

Решение проблемы 2: "Создание метода для создания интервалов поиска по времени..."

def give_me_intervals(start_at, number_of_days_per_interval=3):
    
    end_at = math.ceil(datetime.utcnow().timestamp())
        
    ## 1 day = 86400,
    period = (86400 * number_of_days_per_interval)
    end = start_at + period
    yield (int(start_at), int(end))
    padding = 1
    while end <= end_at:
        start_at = end + padding
        end = (start_at - padding) + period
        yield int(start_at), int(end)
## test out the solution,
start_at = math.floor(\
     (datetime.utcnow() - timedelta(days=365)).timestamp())
list(give_me_intervals(start_at, 7))[
 (1509816061, 1510420861),  
 (1510420862, 1511025661),  
 (1511025662, 1511630461),  
 (1511630462, 1512235261),
 ...
]

Наконец, объединим все вместе и создадим окончательное решение.

subreddit = 'Siacoin'
start_at = math.floor(\
    (datetime.utcnow() - timedelta(days=365)).timestamp())
posts = []
for interval in give_me_intervals(start_at, 7):
    pulled_posts = pull_posts_for(
        subreddit, interval[0], interval[1])
    
    posts.extend(pulled_posts)
    time.sleep(.500)
## ~ 4306
print(len(posts))
## ~ 4306
print(len(np.unique([ post['id'] for post in posts ])))
  • Предупреждение: мы все равно можем получить дубликаты. Это происходит из-за того, как мы запрашиваем дополнительные записи, когда наш интервал дает 500 записей.

Надеюсь, все это понятно. Теперь мы перейдем к тому, чтобы взять эти идентификаторы публикаций и получить сообщение и комментарии с Reddit. На этом этапе настройте учетную запись разработчика, если вы еще не сделали этого. (Гугл: "учетная запись разработчика Reddit")

import praw
config = {
    "username": "*",
    "client_id": "*",
    "client_secret": "*",
    "user_agent": "*"
}
reddit = praw.Reddit(client_id=config['client_id'], \
                     client_secret=config['client_secret'], \
                     user_agent=config['user_agent'])
  • Предупреждение: Если вы не настроены правильно, вы получите код состояния 401.

Наконец, вызовите Reddit и сохраните наши сообщения и комментарии. Здесь можно найти записную книжку, в которой описан весь этот процесс here.

**## ПРЕДУПРЕЖДЕНИЕ: REDDIT БУДЕТ ОГРАНИЧИВАТЬ ВАС, ЕСЛИ ВЫ БУДЕТЕ НАПРЯЖНЫМИ! БУДЬТЕ ДОБРЫМИ!**
TIMEOUT_AFTER_COMMENT_IN_SECS = .350
posts_from_reddit = []
comments_from_reddit = []
for submission_id in np.unique([ post['id'] for post in posts ]):
    submission = reddit.submission(id=submission_id)
    posts_from_reddit.append(submission)
    submission.comments.replace_more(limit=None)
    for comment in submission.comments.list():
        comments_from_reddit.append(comment)
        
        if TIMEOUT_AFTER_COMMENT_IN_SECS > 0:
            time.sleep(TIMEOUT_AFTER_COMMENT_IN_SECS)
## ~ 4306
print(len(posts_from_reddit))
## ~ 35216
print(len(comments_from_reddit))

Бонус: Задачи Luigi для обработки под-реддита и разделения сообщений и комментариев на файлы по дням. Практически, активность за каждый день. См. репозитории на Github scrappers и pipelines.