123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- #!/usr/bin/env python3
-
- import os
- import sys
-
- import json
- import re
- import tempfile
- from urllib.parse import urlparse
-
- import requests
- from mastodon import Mastodon
-
def load_config():
    """Load the bot configuration from a JSON file.

    The path is read from the ``MR_CONFIG`` environment variable and
    defaults to ``config.json`` (relative to the current directory).

    Returns:
        The decoded JSON document. Other functions in this file index it
        with keys such as ``hostname``, ``token`` and ``subreddit``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    path = os.getenv('MR_CONFIG', 'config.json')
    # JSON text is UTF-8; do not depend on the locale's default encoding,
    # which would mis-decode non-ASCII values on some platforms.
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)
-
def load_recents():
    """Load the list of recently handled reddit post ids.

    The path is read from the ``MR_RECENTS`` environment variable and
    defaults to ``recents.json``.

    Returns:
        The decoded JSON document, or an empty list when the file does
        not exist yet (e.g. on the first run).

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    path = os.getenv('MR_RECENTS', 'recents.json')
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        # Nothing has been recorded yet; start with an empty history.
        return []
-
def save_recents(recents):
    """Serialize *recents* as JSON into the recents file.

    The destination is taken from the ``MR_RECENTS`` environment variable
    (default ``recents.json``); any existing content is overwritten.
    """
    target = os.getenv('MR_RECENTS', 'recents.json')
    with open(target, 'w') as out:
        json.dump(recents, out)
-
def make_masto(config):
    """Construct a ``Mastodon`` client from *config*.

    ``config['hostname']`` is turned into an ``https://`` base URL and
    ``config['token']`` is passed as the access token.
    """
    api_base = 'https://{}'.format(config['hostname'])
    token = config['token']
    return Mastodon(api_base_url=api_base, access_token=token)
-
def download_file(url, destdir):
    """Download *url* into *destdir* and return the path of the saved file.

    The file name comes from the response's ``Content-Disposition`` header
    when it carries a plain ``filename=`` parameter, otherwise from the
    last path segment of *url*. Whatever its origin, the name is reduced
    to a bare file name so the file is always written inside *destdir*.

    Args:
        url: Address of the resource to fetch.
        destdir: Existing directory the file is written into.

    Returns:
        The full path of the written file.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status (via ``raise_for_status``).
        OSError: If the destination file cannot be written.
    """
    # Without a timeout a stalled server would hang the whole run.
    resp = requests.get(url, timeout=60)
    resp.raise_for_status()

    fname = ''
    disposition = resp.headers.get('content-disposition')
    if disposition:
        # The header may be a bare "inline"/"attachment" with no filename,
        # and the value may be quoted and followed by further parameters.
        match = re.search(r'filename="?([^";]+)"?', disposition)
        if match:
            fname = match.group(1).strip()
    if not fname:
        fname = urlparse(url).path.rsplit('/', 1)[-1]

    # The name is controlled by the remote side: drop any directory parts
    # so something like "../../x" cannot escape destdir.
    fname = os.path.basename(fname.replace('\\', '/'))
    if fname in ('', '.', '..'):
        # e.g. a URL ending in "/" yields no usable name at all.
        fname = 'download'

    destpath = os.path.join(destdir, fname)

    # resp.content holds the entire body in memory; it is written in one go.
    with open(destpath, 'wb') as f:
        f.write(resp.content)

    return destpath
-
def make_post(masto, data):
    """Publish one processed reddit post as a Mastodon status.

    Each media URL is downloaded into a temporary directory, uploaded with
    ``masto.media_post`` and attached to the status. The temporary
    directory is removed when the function returns.

    Args:
        masto: Client object providing ``media_post(path)`` and
            ``status_post(body, media_ids=...)``.
        data: Dict with the keys ``title``, ``username``, ``rid`` and
            ``media`` (a list of URLs, possibly empty or ``None``). The
            dict is not modified.
    """
    # Handle a missing/None media list up front, and work on a slice so
    # the caller's dict is left untouched. Only the first four items are
    # attached (presumably the server's per-status attachment limit --
    # TODO confirm).
    media_urls = (data['media'] or [])[:4]

    # TemporaryDirectory creates the directory itself; no makedirs needed.
    with tempfile.TemporaryDirectory(prefix='mastoreddit.', dir='/tmp') as td:
        uploads = []
        for media_url in media_urls:
            dlpath = download_file(media_url, td)
            uploads.append(masto.media_post(dlpath))

        body = '%s\n\nby /u/%s at https://old.reddit.com/%s' % (
            data['title'], data['username'], data['rid'])
        # Pass None rather than an empty list when nothing is attached.
        masto.status_post(body, media_ids=uploads or None)
-
def extract_media_urls(data):
    """Return the list of media URLs for one reddit post's ``data`` dict.

    Rules, in order:

    * a post with non-empty ``selftext`` yields no media;
    * a post without ``post_hint`` must carry ``gallery_data`` and yields
      one ``i.redd.it`` URL per gallery item, with the extension taken
      from the item's mime type in ``media_metadata``;
    * ``link``: the first crosspost parent is resolved recursively, any
      other link yields no media;
    * ``hosted:video``: the reddit video fallback URL;
    * ``image``: the post URL;
    * ``rich:video``: reported on stdout and ignored (no media).

    Raises:
        ValueError: If there is neither a hint nor a gallery, or the hint
            type is not one of the above.
    """
    if data.get('selftext'):
        # Text posts are never treated as media posts.
        return []

    if 'post_hint' not in data:
        if 'gallery_data' not in data:
            raise ValueError('no hint type and missing gallery')

        metadata = data['media_metadata']
        urls = []
        for item in data['gallery_data']['items']:
            media_id = item['media_id']
            # 'm' holds the mime type, e.g. "image/png" -> extension "png".
            _, ext = metadata[media_id]['m'].split('/')
            urls.append('https://i.redd.it/%s.%s' % (media_id, ext))
        return urls

    hint = data['post_hint']

    if hint == 'link':
        if 'crosspost_parent_list' in data:
            return extract_media_urls(data['crosspost_parent_list'][0])
        # Not a reddit crosspost.
        return []

    if hint == 'hosted:video':
        return [data['secure_media']['reddit_video']['fallback_url']]

    if hint == 'image':
        return [data['url']]

    if hint == 'rich:video':
        # TODO maybe use thumbnail?
        print('got a video at', data['url'], 'but ignoring')
        # Deliberately ignored: return here instead of falling through to
        # the "unknown hint type" error below.
        return []

    raise ValueError('unknown hint type ' + hint)
-
def process_post(post):
    """Reduce a raw listing entry to the fields the bot needs.

    Media extraction is best-effort: if ``extract_media_urls`` raises, the
    error is printed and the post is returned with an empty media list.

    Returns:
        A dict with the keys ``rid``, ``title``, ``username``, ``score``
        and ``media``.
    """
    info = post['data']

    try:
        urls = extract_media_urls(info)
    except Exception as err:
        print('error processing ' + info['id'] + ': ' + str(err))
        urls = []

    return dict(
        rid=info['id'],
        title=info['title'],
        username=info['author'],
        score=info['ups'],
        media=urls,
    )
-
def query_tops(config):
    """Fetch the day's top posts of the configured subreddit.

    Args:
        config: Mapping providing ``subreddit`` and ``skip_titles`` (a
            collection of substrings; a post whose title contains any of
            them, compared case-insensitively, is dropped).

    Returns:
        A list of dicts as produced by ``process_post``, in listing order.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
    """
    url = 'https://www.reddit.com/r/%s/top.json?sort=top&t=day' % config['subreddit']
    # Without a timeout a stalled connection would hang the run forever.
    resp = requests.get(url, headers={'User-agent': 'mastoreddit'}, timeout=30)
    resp.raise_for_status()
    listing = resp.json()

    # Titles are lowercased before matching, so lowercase the filters too;
    # otherwise a filter containing an uppercase letter could never match.
    skip = [s.lower() for s in config['skip_titles']]

    posts = []
    for post in listing['data']['children']:
        if post['kind'] != 't3':
            # Only link/submission entries are handled below; anything
            # else is reported and skipped rather than processed.
            print('found post of kind', post['kind'])
            continue

        # Filter on the title first so skipped posts are never processed.
        title = post['data']['title'].lower()
        if any(s in title for s in skip):
            continue

        posts.append(process_post(post))

    return posts
-
def filter_posts(config, posts, filt):
    """Select the posts to publish and compute the new "recents" list.

    Args:
        config: Mapping providing ``top_posts`` (maximum number of posts
            to select) and ``min_score`` (minimum score to qualify).
        posts: Candidate posts (dicts with ``rid`` and ``score``), in
            priority order.
        filt: Iterable of post ids that were already handled.

    Returns:
        A tuple ``(ok, recents)``. ``ok`` holds at most ``top_posts``
        not-yet-handled posts with a sufficient score, in input order.
        ``recents`` is a sorted list of the ids selected now plus the
        previously handled ids that still appear in *posts*; handled ids
        no longer listed are dropped.
    """
    limit = config['top_posts']
    min_score = config['min_score']
    seen = set(filt)
    ok = []
    newfilt = set()

    for p in posts:
        rid = p['rid']
        if rid in seen:
            # Still listed: keep remembering it so it is never reposted.
            newfilt.add(rid)
        elif len(ok) < limit and p['score'] >= min_score:
            # Strict "<" so that no more than `limit` posts are selected.
            ok.append(p)
            newfilt.add(rid)

    # Sorted so the persisted list is stable from run to run.
    return ok, sorted(newfilt)
-
def just_toot(config, msg):
    """Build a client from *config* and pass *msg* to its ``toot`` method."""
    make_masto(config).toot(msg)
-
def main():
    """Run one pass: fetch top posts, publish the new ones, save recents.

    The recents file is written even when publishing fails part-way, so
    posts that already went out are not published again on the next run.
    Ids of selected posts that were not published are left out so they can
    be retried. An exception raised while publishing still propagates.
    """
    config = load_config()
    masto = make_masto(config)

    recents = load_recents()

    res = query_tops(config)
    okps, newrecents = filter_posts(config, res, recents)

    # Ids selected for this run that have not been published yet.
    pending = {p['rid'] for p in okps}
    try:
        for p in okps:
            print('posting', p['rid'], '-', p['title'])
            make_post(masto, p)
            pending.discard(p['rid'])
    finally:
        # When everything succeeded `pending` is empty and this saves
        # `newrecents` unchanged.
        save_recents([rid for rid in newrecents if rid not in pending])


if __name__ == '__main__':
    main()
|