diff --git a/dockerfile b/dockerfile index 2912b26..f38daf2 100644 --- a/dockerfile +++ b/dockerfile @@ -3,4 +3,8 @@ FROM python:3.12 COPY slopserver/ requirements.txt /slopserver/ WORKDIR /slopserver/ -RUN python3 -m pip install -r requirements.txt \ No newline at end of file +RUN python3 -m pip install -r requirements.txt + +EXPOSE 8000 + +ENTRYPOINT [ "fastapi", "run", "--workers", "4", "/slopserver/server.py" ] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3886f7c..d05de03 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,6 +28,7 @@ psycopg==3.2.10 pwdlib==0.2.1 pycparser==2.23 pydantic==2.12.3 +pydantic-settings==2.11.0 pydantic_core==2.41.4 Pygments==2.19.2 PyJWT==2.10.1 diff --git a/slopserver/server.py b/slopserver/server.py index 1ac041e..7ae468c 100644 --- a/slopserver/server.py +++ b/slopserver/server.py @@ -12,11 +12,12 @@ from datetime import datetime, timedelta import uvicorn -from fastapi import Body, Depends, FastAPI, Form, HTTPException +from fastapi import Body, Depends, FastAPI, Form, HTTPException, Header from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.middleware.cors import CORSMiddleware from pydantic import AfterValidator, Base64Str +from pydantic_settings import BaseSettings from sqlalchemy import create_engine @@ -38,8 +39,16 @@ app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") -TEMP_ENGINE = create_engine("postgresql+psycopg://slop-farmer@192.168.1.163/slop-farmer") -TEMP_SECRET = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210" +class ServerSettings(BaseSettings): + db_url: str = "postgresql+psycopg://slop-farmer@192.168.1.163/slop-farmer" + token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210" + # altcha_secret: str + +settings = ServerSettings() + + +DB_ENGINE = create_engine(settings.db_url) +TOKEN_SECRET = settings.token_secret ALGO = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 @@ -70,13 +79,33 @@ def auth_user(email: str, password: str, db_engine): return False return user +def generate_auth_token(username): + expiration = datetime.now() + timedelta(days=30) + uuid = username + bearer_token = { + "iss": "slopserver", + "exp": int(expiration.timestamp()), + "aud": "slopserver", + "sub": str(uuid), + "client_id": str(uuid), + "iat": int(datetime.now().timestamp()), + "jti": str(uuid) + } + + encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO) + return encoded_jwt + +def verify_auth_token(token: str): + token = jwt.decode(token, TOKEN_SECRET, ALGO, verify=True) + + @app.post("/report") -async def report_slop(report: SlopReport): - insert_slop(report.slop_urls, TEMP_ENGINE) +async def report_slop(report: SlopReport, bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]): + insert_slop(report.slop_urls, DB_ENGINE) @app.post("/check") -async def check_slop(check: Annotated[SlopReport, Body()]): - slop_results = select_slop(check.slop_urls, TEMP_ENGINE) +async def check_slop(check: Annotated[SlopReport, Body()], bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]): + slop_results = select_slop(check.slop_urls, DB_ENGINE) return slop_results async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): @@ -84,7 +113,7 @@ async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): @app.post("/token") async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]): - user = get_user(form_data.username, TEMP_ENGINE) + user = get_user(form_data.username, DB_ENGINE) if not user: raise HTTPException(status_code=400, detail="Incorrect username or password") @@ -92,12 +121,12 @@ async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]): async def signup_form(form_data: Annotated[SignupForm, Form()]): # if we're here, form is validated including the altcha # check for existing user with the given email - if get_user(form_data.email, TEMP_ENGINE): + if get_user(form_data.email, DB_ENGINE): # user already exists raise HTTPException(status_code=409, detail="User already exists") # create user - create_user(form_data.email, get_password_hash(form_data.password), TEMP_ENGINE) + create_user(form_data.email, get_password_hash(form_data.password), DB_ENGINE) @app.get("/altcha-challenge") async def altcha_challenge(): @@ -109,6 +138,14 @@ async def altcha_challenge(): challenge = create_challenge(options) return challenge +@app.post("/login") +async def simple_login(username: Annotated[str, Form()], password: Annotated[str, Form()]): + user = auth_user(username, password, DB_ENGINE) + if not user: + raise HTTPException(status_code=400, detail="Incorrect username or password") + token = generate_auth_token(username) + return {"access_token": token, "token_type": "bearer"} + @app.post("/altcha-challenge") async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_validator)]): # if verified, return a JWT for anonymous API access @@ -124,7 +161,7 @@ async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_vali "jti": str(uuid) } - encoded_jwt = jwt.encode(bearer_token, TEMP_SECRET, ALGO) + encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO) return encoded_jwt