import traceback | |||||
import fastapi | import fastapi | ||||
from fastapi import Cookie, File, Form, Request, UploadFile, WebSocket, WebSocketDisconnect | from fastapi import Cookie, File, Form, Request, UploadFile, WebSocket, WebSocketDisconnect | ||||
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse | |||||
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, PlainTextResponse | |||||
from fastapi.staticfiles import StaticFiles | from fastapi.staticfiles import StaticFiles | ||||
from fastapi.templating import Jinja2Templates | from fastapi.templating import Jinja2Templates | ||||
from fastapi.middleware.cors import CORSMiddleware | from fastapi.middleware.cors import CORSMiddleware | ||||
import reporthook | |||||
################################ | ################################ | ||||
# Core configuration | # Core configuration | ||||
################################ | ################################ | ||||
tmplts = Jinja2Templates(directory='templates') # TODO Get the path correctly. | tmplts = Jinja2Templates(directory='templates') # TODO Get the path correctly. | ||||
################################ | |||||
# Core configuration | |||||
################################ | |||||
@app.exception_handler(Exception) | |||||
async def handle_exception(req: Request, exc: Exception): | |||||
tb = traceback.format_exc() | |||||
await reporthook.send_report(tb) | |||||
return PlainTextResponse('error', status_code=500) | |||||
@app.get('/') | @app.get('/') | ||||
def render_main(req: Request): | def render_main(req: Request): | ||||
p = { | p = { |
import httpx | |||||
import sbenv | |||||
async def send_report(msg): | |||||
hook_url = sbenv.get_report_webhook() | |||||
if hook_url is None: | |||||
raise RuntimeError('webhook envvar not specified') | |||||
chunks = chunk_message(msg, wrap_chunks='```') | |||||
chunks.insert(0, '**New Report:**') | |||||
async with httpx.AsyncClient() as client: | |||||
for c in chunks: | |||||
r = await client.post(hook_url, data={'content': c}) | |||||
r.raise_for_status() | |||||
MAX_MSG_SIZE = 1950 | |||||
def chunk_message(msg, wrap_chunks=None): | |||||
lines = msg.splitlines() | |||||
startwrap = wrap_chunks + '\n' if wrap_chunks is not None else '' | |||||
endwrap = '\n' + wrap_chunks if wrap_chunks is not None else '' | |||||
msgs = [] | |||||
cur_line = startwrap | |||||
for l in lines: | |||||
if len(cur_line) + len(l) >= MAX_MSG_SIZE: | |||||
msgs.append(cur_line + endwrap) | |||||
cur_line = startwrap | |||||
cur_line += '\n' + l | |||||
msgs.append(cur_line + endwrap) | |||||
return msgs |
return cj | return cj | ||||
def get_google_abuse_token(): | def get_google_abuse_token(): | ||||
return os.getenv('SB_GOOGABUSE') | |||||
return os.getenv('SB_GOOGABUSE').trim() | |||||
def get_report_webhook(): | |||||
hu = os.getenv('SB_REPORT_WH') | |||||
if hu is not None: | |||||
hu = hu.strip() | |||||
return hu |
export SB_ADMIN_KEY=$(cat $workdir/adminkey) | export SB_ADMIN_KEY=$(cat $workdir/adminkey) | ||||
export SB_COOKIES_TXT=$workdir/cookies.txt | export SB_COOKIES_TXT=$workdir/cookies.txt | ||||
export SB_GOOGABUSE=$(cat $workdir/googabuse.txt) | export SB_GOOGABUSE=$(cat $workdir/googabuse.txt) | ||||
export SB_REPORT_WH=$(cat $workdir/webhook.txt) | |||||
source $(pipenv --venv)/bin/activate | source $(pipenv --venv)/bin/activate | ||||
uvicorn app:app --host 0.0.0.0 --port 5000 | uvicorn app:app --host 0.0.0.0 --port 5000 |