123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
-
- from datetime import datetime, timedelta
- import json
- import traceback
-
- import aiofiles
- import fastapi
- from fastapi import Cookie, File, Form, Request, UploadFile, WebSocket, WebSocketDisconnect
- from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, PlainTextResponse
- from fastapi.staticfiles import StaticFiles
- from fastapi.templating import Jinja2Templates
- from fastapi.middleware.cors import CORSMiddleware
-
- import reporthook
- import inventory
- import sbenv
- import searchlib
-
MAX_SEARCH_DAYS = 180  # how many days back load_recent_articles scans
MAX_SHOW_DAYS = 20  # day-count limit checked by convert_days_from_articles

################################
# Core configuration
################################

app = fastapi.FastAPI(docs_url=None)

# Origins handed to the CORS middleware registered below.
origins = [
    'https://prograde.gg',
    'http://localhost',
    'http://localhost:5000',
    'http://localhost:8080',
    'http://localhost:8000',
]

app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=['*'],
    allow_methods=['*'],
    allow_origins=origins,
)

# Static assets are served from the 'static' directory under /static.
app.mount('/static', StaticFiles(directory='static'), name='static')

tmplts = Jinja2Templates(directory='templates')  # TODO Get the path correctly.
-
- ################################
- # User-facing endpoints
- ################################
-
@app.exception_handler(Exception)
async def handle_exception(req: Request, exc: Exception):
    """Report an unhandled exception and answer with a generic 500.

    Args:
        req: The request that was being served (not used here).
        exc: The exception handed to this handler.

    Returns:
        A plain-text ``error`` response with status code 500.
    """
    # Build the traceback from the exception object we were given instead of
    # traceback.format_exc(): the latter reads the interpreter's "currently
    # handled" exception and yields "NoneType: None" when there is none.
    tb = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    await reporthook.send_report(tb)
    return PlainTextResponse('error', status_code=500)
-
@app.get('/')
async def render_main(req: Request):
    """Render the main page from the recently stored articles.

    Loads the recent articles, converts them into per-day entries, and
    renders the 'main.htm' template with those entries, the value returned
    by calc_num_days for them, and a single fixed notice.
    """
    days = convert_days_from_articles(await load_recent_articles())

    missing_data_notice = {
        'style': 'primary',
        'text': 'There were so many incidents in August 2021 that news sites stopped reporting on it, so there\'s some missing data here.',
    }

    context = {
        'sb': {
            'num_days': calc_num_days(days),
            'days': days,
        },
        'notices': [missing_data_notice],
        'request': req,
    }

    return tmplts.TemplateResponse('main.htm', context)
-
- ################################
- # API endpoints
- ################################
-
@app.post('/api/addarticle')
async def handle_addarticle(req: Request):
    """Add an article description to the report for a given date.

    The request must pass check_admin_token. Its JSON body must be an object
    with a 'date' entry and a 'desc' entry, which are forwarded to
    add_article.

    Returns:
        ``{'status': 'OK'}`` on success; a JSON error with status 403 when
        the token check fails; a JSON error with status 400 when the body is
        not valid JSON or lacks the required entries.
    """
    if not check_admin_token(req):
        return JSONResponse(status_code=403, content={'error': 'forbidden'})

    # A malformed body is the client's mistake, not a server fault: answer
    # 400 instead of letting the lookup blow up into a 500 + crash report.
    # ValueError covers JSON decode errors, TypeError a non-object body.
    try:
        body = await req.json()
        datestr, adesc = body['date'], body['desc']
    except (ValueError, KeyError, TypeError):
        return JSONResponse(status_code=400, content={'error': 'bad request'})

    await add_article(datestr, adesc)

    return {'status': 'OK'}
-
async def add_article(datestr, adesc):
    """Append adesc to the stored report for the date named by datestr.

    datestr is parsed with inventory.DATE_FORMAT; the report for that date
    is loaded, extended with adesc, and saved back.
    """
    when = datetime.strptime(datestr, inventory.DATE_FORMAT)

    report = await inventory.load_date_report_async(when)
    report.append(adesc)

    await inventory.save_date_report_async(when, report)
-
- ################################
- # Utilities
- ################################
-
def check_admin_token(req: Request):
    """Tell whether the request carries the configured admin key.

    The key is expected as ``Authorization: Bearer <key>``. When the
    configured key is the literal 'UNSAFE_TESTING', every request is
    accepted.

    Args:
        req: The incoming request whose headers are inspected.

    Returns:
        True if the request is authorized, False otherwise.

    Raises:
        RuntimeError: If no admin key has been loaded.
    """
    import hmac

    ak = sbenv.get_admin_key()
    if ak is None:
        raise RuntimeError('checked api endpoint without key loaded')

    if ak == 'UNSAFE_TESTING':
        return True

    auth = req.headers.get('Authorization')
    if auth is None or not auth.startswith('Bearer '):
        return False

    tok = auth[len('Bearer '):]
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compare bytes: compare_digest rejects non-ASCII str.
    # NOTE(review): assumes the admin key is a str - confirm against sbenv.
    return hmac.compare_digest(tok.encode('utf-8'), ak.encode('utf-8'))
-
async def load_days_from_file(path):
    """Read the file at path asynchronously and return its parsed JSON."""
    async with aiofiles.open(path, mode='r') as handle:
        raw = await handle.read()
    return json.loads(raw)
-
async def load_recent_articles():
    """Collect the non-empty daily reports of the last MAX_SEARCH_DAYS days.

    Starting from today and walking backwards one day at a time, each day's
    report is loaded through inventory; days with an empty report are left
    out.

    Returns:
        A dict mapping the day formatted with inventory.DATE_FORMAT to that
        day's report, inserted from the most recent day backwards.
    """
    now = datetime.now()
    found = {}

    for offset in range(MAX_SEARCH_DAYS):
        day = now - timedelta(days=offset)
        report = await inventory.load_date_report_async(day)
        if len(report) == 0:
            continue
        found[day.strftime(inventory.DATE_FORMAT)] = report

    return found
-
def convert_days_from_articles(rarts):
    """Convert raw per-day articles into the day entries used for display.

    Args:
        rarts: Raw articles; handed unchanged to searchlib.process_results.

    Returns:
        A list of at most MAX_SHOW_DAYS dicts, each with 'date', 'links'
        (from the day's 'pass' articles) and 'maybe_links' (from its 'maybe'
        articles). Days without any 'pass' article are skipped.
    """
    # presumably a mapping of date string -> {'pass': [...], 'maybe': [...]};
    # verify against searchlib.
    processed = searchlib.process_results(rarts)
    output = []

    for dstr, arts in processed.items():
        day = {
            'date': dstr,
            'links': [convert_article(a) for a in arts['pass']],
            'maybe_links': [convert_article(a) for a in arts['maybe']],
        }

        if len(day['links']) > 0:
            output.append(day)

        # Stop as soon as the cap is reached. The previous '>' comparison
        # only stopped after one entry too many had been collected.
        if len(output) >= MAX_SHOW_DAYS:
            break

    return output
-
def convert_article(a):
    """Pick the display fields out of an article record.

    Returns a new dict with 'url' and 'slug' copied from the article and
    'title' taken from the article's 'gtitle' entry.
    """
    # (output key, source key) pairs, in output order.
    field_map = (
        ('url', 'url'),
        ('title', 'gtitle'),
        ('slug', 'slug'),
    )
    return {out_key: a[src_key] for out_key, src_key in field_map}
-
def calc_num_days(dayslist):
    """Return the smallest whole-day age among the given day entries.

    Each entry's 'date' is parsed with inventory.DATE_FORMAT and compared
    against the current local time. Returns -1 when dayslist is empty; -1
    also serves as the "nothing seen yet" marker while scanning.
    """
    now = datetime.now()
    smallest = -1

    for entry in dayslist:
        parsed = datetime.strptime(entry['date'], inventory.DATE_FORMAT)
        age = (now - parsed).days
        if smallest == -1 or age < smallest:
            smallest = age

    return smallest
|