from datetime import datetime, timedelta
import json
import traceback
import urllib

import aiofiles
import fastapi
from fastapi import (
    Cookie,
    File,
    Form,
    Request,
    UploadFile,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import (
    FileResponse,
    HTMLResponse,
    JSONResponse,
    RedirectResponse,
    PlainTextResponse,
)
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware

import reporthook
import inventory
import sbenv
import searchlib

# Number of past days scanned when collecting stored reports.
MAX_SEARCH_DAYS = 180
# Cap on how many days are shown on the main page.
MAX_SHOW_DAYS = 20
# Oldest article date (in days before today) that may still be flagged.
REPORT_HORIZON = 180
# Per-address limit on flags filed against a single date.
MAX_USER_REPORTS_PER_DAY = 3

################################
# Core configuration
################################

# docs_url=None switches off the interactive API documentation page.
app = fastapi.FastAPI(docs_url=None)

# Origins allowed to make credentialed cross-origin requests.
origins = [
    'https://prograde.gg',
    'http://localhost',
    'http://localhost:5000',
    'http://localhost:8080',
    'http://localhost:8000'
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*']
)

# Static assets are served from ./static under the /static URL prefix.
app.mount('/static', StaticFiles(directory='static'), name='static')

# TODO Get the path correctly.
tmplts = Jinja2Templates(directory='templates')
################################
# User-facing endpoints
################################

@app.exception_handler(Exception)
async def handle_exception(req: Request, exc: Exception):
    """Report an unhandled exception and answer with a generic 500.

    The formatted traceback is forwarded through ``reporthook.send_report``;
    the client only ever sees the plain text ``error``.
    """
    # Format from the exception object we were handed instead of depending on
    # sys.exc_info() still describing it when this handler runs.
    tb = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    await reporthook.send_report(tb)
    return PlainTextResponse('error', status_code=500)


@app.get('/')
async def render_main(req: Request):
    """Render ``main.htm`` from the recently stored articles."""
    raw_articles = await load_recent_articles()
    converted = convert_days_from_articles(raw_articles)
    num_days = calc_num_days(converted)

    p = {
        'sb': {
            'num_days': num_days,
            'days': converted
        },
        'notices': [
            {
                'style': 'primary',
                'text': 'There were so many incidents in August 2021 that news sites stopped reporting on it, so there\'s some missing data here.',
            }
        ],
        'request': req,
    }

    return tmplts.TemplateResponse('main.htm', p)


@app.post('/action/flag')
async def handle_flag(req: Request, date: str = Form(...), article: str = Form(...)):
    """Record a user flag against an article stored under ``date``.

    Responses:
      * 400 JSON ``{'status': 'error'}`` when ``date`` does not parse, lies in
        the future, or is more than ``REPORT_HORIZON`` days old.
      * ``{'status': 'OK'}`` when this address already flagged this article.
      * 429 JSON ``{'status': 'error'}`` when this address already holds
        ``MAX_USER_REPORTS_PER_DAY`` flags for that date.
      * otherwise the flag is saved and the HTML redirect to ``/`` is returned.
    """
    # NOTE(review): X-Forwarded-For is client-supplied unless a trusted proxy
    # overwrites it, so the per-address limit below can be sidestepped.
    xff = req.headers.get('X-Forwarded-For')
    ipaddr = xff if xff is not None else req.client.host

    try:
        pdate = datetime.strptime(date, inventory.DATE_FORMAT)
    except ValueError:
        return JSONResponse({'status': 'error'}, status_code=400)

    today = datetime.now()
    if pdate > today or (today - pdate).days > REPORT_HORIZON:
        return JSONResponse({'status': 'error'}, status_code=400)

    flags = await inventory.load_date_flags_async(pdate)

    # Make sure it's not a duplicate and limit the number of reports.
    nreporter = 0
    for entry in flags:
        if entry['src'] != ipaddr:
            continue
        if entry['url'] == article:
            # Same address, same article: nothing new to store.
            return {'status': 'OK'}
        nreporter += 1

    # The address may hold up to MAX_USER_REPORTS_PER_DAY flags for this date;
    # only a report beyond that cap is refused.
    if nreporter >= MAX_USER_REPORTS_PER_DAY:
        print('user', ipaddr, 'looking sussy')
        await reporthook.send_report('address %s made more reports for %s than allowed' % (ipaddr, date))
        return JSONResponse({'status': 'error'}, status_code=429)

    await reporthook.send_report('address %s reported url %s' % (ipaddr, article))

    flags.append({
        'src': ipaddr,
        'url': article,
    })

    await inventory.save_date_flags_async(pdate, flags)
    return make_html_redirect_response('/')


@app.post('/action/submit')
async def handle_submit(req: Request, article: str = Form(...)):
    """Fetch a user-submitted article URL and store it if it is a hard match.

    Every outcome answers with the HTML redirect to ``/``; the article is only
    stored (and reported) when ``searchlib.process_day_results`` puts it in
    the ``'pass'`` bucket.
    """
    # NOTE(review): unlike handle_flag this ignores X-Forwarded-For, so behind
    # a proxy the stored 'srcip' may be the proxy's address -- confirm intent.
    ipaddr = req.client.host
    today_str = datetime.now().strftime(inventory.DATE_FORMAT)

    fetched_art = searchlib.fetch_article(article)
    if fetched_art is None:
        return make_html_redirect_response('/')

    # Use the article's own 'nd' value when present, otherwise file it under today.
    eff_date = fetched_art['nd'] if 'nd' in fetched_art else today_str

    # Now process it so we can tell that it's a definite match.
    proced_art = searchlib.process_day_results(eff_date, [fetched_art])
    print(proced_art)
    if len(proced_art['pass']) == 0:
        return make_html_redirect_response('/')

    # If it all looks good then store it and report it.
    fetched_art['srcip'] = ipaddr
    await add_article(eff_date, fetched_art)
    await reporthook.send_report('address %s submitted good-looking article %s' % (ipaddr, article))

    return make_html_redirect_response('/')


################################
# API endpoints
################################

@app.post('/api/addarticle')
async def handle_addarticle(req: Request):
    """Admin-only endpoint: append the JSON body's ``desc`` to the ``date`` report.

    Returns 403 JSON when ``check_admin_token`` rejects the request.
    """
    if not check_admin_token(req):
        return JSONResponse(status_code=403, content={'error': 'forbidden'})

    body = await req.json()
    await add_article(body['date'], body['desc'])
    return {'status': 'OK'}


async def add_article(datestr, adesc):
    """Append ``adesc`` to the report stored for ``datestr`` and save it back.

    ``datestr`` must match ``inventory.DATE_FORMAT``; ``datetime.strptime``
    raises ``ValueError`` otherwise.
    """
    date = datetime.strptime(datestr, inventory.DATE_FORMAT)
    articles = await inventory.load_date_report_async(date)
    articles.append(adesc)
    await inventory.save_date_report_async(date, articles)


################################
# Utilities
################################

def check_admin_token(req: Request):
    """Return True when ``req`` carries the admin key as a Bearer token.

    Raises ``RuntimeError`` if no admin key is loaded. The special key value
    ``'UNSAFE_TESTING'`` accepts every request.
    """
    import hmac

    ak = sbenv.get_admin_key()
    if ak is None:
        raise RuntimeError('checked api endpoint without key loaded')

    if ak == 'UNSAFE_TESTING':
        return True

    auth = req.headers.get('Authorization')
    if auth is None or not auth.startswith('Bearer '):
        return False

    tok = auth[len('Bearer '):]
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compare bytes: compare_digest rejects non-ASCII str.
    return hmac.compare_digest(tok.encode('utf-8'), ak.encode('utf-8'))


async def load_days_from_file(path):
    """Read the file at ``path`` and return its parsed JSON content."""
    async with aiofiles.open(path, mode='r') as f:
        contents = await f.read()
    return json.loads(contents)


async def load_recent_articles():
    """Collect the stored reports and flags for the last ``MAX_SEARCH_DAYS`` days.

    Returns a dict keyed by date string (``inventory.DATE_FORMAT``), newest
    first, each value being ``{'articles': ..., 'flags': ...}``. Days whose
    report is empty are left out.
    """
    today = datetime.now()
    day_dur = timedelta(days=1)
    reports = {}
    for i in range(MAX_SEARCH_DAYS):
        that_day = today - i * day_dur
        report = await inventory.load_date_report_async(that_day)
        flags = await inventory.load_date_flags_async(that_day)
        if len(report) > 0:
            reports[that_day.strftime(inventory.DATE_FORMAT)] = {
                'articles': report,
                'flags': flags,
            }

    return reports


def convert_days_from_articles(days):
    """Turn the ``load_recent_articles`` mapping into the list the template shows.

    Each output entry is ``{'date', 'links', 'maybe_links'}``. Flagged hard
    passes are demoted to ``maybe_links``; flagged weak articles are dropped.
    Days without any remaining hard link are skipped, and at most
    ``MAX_SHOW_DAYS`` days are returned.
    """
    output = []
    for dstr, parts in days.items():
        dr = searchlib.process_day_results(dstr, parts['articles'])
        flags = {e['url'] for e in parts['flags']}
        day = {
            'date': dstr,
            'links': [],
            'maybe_links': []
        }

        # Process hard passes.
        for a in dr['pass']:
            ca = convert_article(a)
            if a['url'] not in flags:
                day['links'].append(ca)
            else:
                day['maybe_links'].append(ca)

        # Process weak articles.
        for a in dr['maybe']:
            ca = convert_article(a)
            if a['url'] not in flags:
                day['maybe_links'].append(ca)

        if len(day['links']) > 0:
            output.append(day)

        # Stop once the cap is reached, so exactly MAX_SHOW_DAYS days at most
        # are returned rather than one extra.
        if len(output) >= MAX_SHOW_DAYS:
            break

    return output


def convert_article(a):
    """Build the template view of a stored article (url, title, slug, domain)."""
    # A bare `import urllib` does not guarantee the `parse` submodule is
    # loaded, so import what is needed explicitly.
    from urllib.parse import urlparse

    u = a['url']
    return {
        'url': u,
        'title': a['gtitle'],
        'slug': a['slug'],
        'domain': urlparse(u).netloc,
    }


def calc_num_days(dayslist):
    """Return the smallest whole-day gap between now and any listed day.

    Returns -1 when ``dayslist`` is empty.
    """
    today = datetime.now()
    return min(
        (
            (today - datetime.strptime(d['date'], inventory.DATE_FORMAT)).days
            for d in dayslist
        ),
        default=-1,
    )


def make_html_redirect_response(url):
    """Return an HTML page that sends the browser on to ``url``.

    The page uses a zero-delay meta refresh plus a fallback link; ``url`` is
    HTML-escaped before being placed in the markup.
    """
    import html

    target = html.escape(url, quote=True)
    body = (
        '<!DOCTYPE html><html><head>'
        '<meta http-equiv="refresh" content="0; url=%s">'
        '</head><body><a href="%s">Continue</a></body></html>'
    ) % (target, target)
    return HTMLResponse(body)