From cc8d4e9977b163ad697d4c52e3be5350b8fca9f4 Mon Sep 17 00:00:00 2001 From: Jack Case Date: Mon, 20 Oct 2025 18:25:29 +0000 Subject: [PATCH] user creation flow with altcha verification --- slopserver/common.py | 1 + slopserver/db.py | 7 +++++++ slopserver/models.py | 19 ++++++++++++++++--- slopserver/server.py | 33 ++++++++++++++++++++++++++++++--- 4 files changed, 54 insertions(+), 6 deletions(-) create mode 100644 slopserver/common.py diff --git a/slopserver/common.py b/slopserver/common.py new file mode 100644 index 0000000..3aa6250 --- /dev/null +++ b/slopserver/common.py @@ -0,0 +1 @@ +TEMP_HMAC_KEY = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3" \ No newline at end of file diff --git a/slopserver/db.py b/slopserver/db.py index 2fe8118..4a155c0 100644 --- a/slopserver/db.py +++ b/slopserver/db.py @@ -51,3 +51,10 @@ def get_user(email, engine): with Session(engine) as session: user = session.scalar(query) return user + +def create_user(email, password_hash, engine): + user = User(email=email, password_hash=password_hash, email_verified=False) + + with Session(engine) as session: + session.add(user) + session.commit() diff --git a/slopserver/models.py b/slopserver/models.py index 1d3594c..7a1bcbe 100644 --- a/slopserver/models.py +++ b/slopserver/models.py @@ -1,9 +1,13 @@ from typing import Annotated from sqlmodel import Field, SQLModel, create_engine, Relationship -from pydantic import AfterValidator, BaseModel +from pydantic import AfterValidator, BaseModel, EmailStr, Json + +from altcha import Payload as AltchaPayload, verify_solution from urllib.parse import urlparse, ParseResult +from slopserver.common import TEMP_HMAC_KEY + NAMING_CONVENTION = { "ix": "ix_%(column_0_label)s", "uq": "uq_%(table_name)s_%(column_0_name)s", @@ -36,7 +40,6 @@ class User(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) email: str = Field(index=True, unique=True) password_hash: str - salt: str email_verified: bool = Field(default=False) @@ -56,7 +59,17 @@ def url_validator(urls: list[str]) -> list[ParseResult]: raise ValueError(f"couldn't parse '{url}' as a URL") return parsed_urls +def altcha_validator(altcha_response: AltchaPayload): + verified = verify_solution(altcha_response, TEMP_HMAC_KEY) + if not verified[0]: + raise ValueError(f"altcha verification failed: {verified[1]}") + return None class SlopReport(BaseModel): """Accept reports of one or more slop page URLs""" - slop_urls: Annotated[list[str], AfterValidator(url_validator)] \ No newline at end of file + slop_urls: Annotated[list[str], AfterValidator(url_validator)] + +class SignupForm(BaseModel): + email: EmailStr + password: str + altcha_response: Annotated[Json, AfterValidator(altcha_validator)] \ No newline at end of file diff --git a/slopserver/server.py b/slopserver/server.py index d85cd58..9269e3c 100644 --- a/slopserver/server.py +++ b/slopserver/server.py @@ -8,18 +8,23 @@ - post report """ from typing import Annotated +from datetime import datetime, timedelta + import uvicorn -from fastapi import Depends, FastAPI, HTTPException +from fastapi import Depends, FastAPI, Form, HTTPException from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from sqlalchemy import create_engine from pwdlib import PasswordHash +from altcha import ChallengeOptions, create_challenge, verify_solution + from slopserver.models import Domain, Path, User -from slopserver.models import SlopReport -from slopserver.db import select_slop, insert_slop, get_user +from slopserver.models import SlopReport, SignupForm +from slopserver.db import select_slop, insert_slop, get_user, create_user +from slopserver.common import TEMP_HMAC_KEY app = FastAPI() @@ -30,6 +35,7 @@ TEMP_SECRET = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210" ALGO = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 + password_hash = PasswordHash.recommended() @@ -66,5 +72,26 @@ async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]): if not user: raise HTTPException(status_code=400, detail="Incorrect username or password") +@app.post("/signup") +async def signup_form(form_data: Annotated[SignupForm, Form()]): + # if we're here, form is validated including the altcha + # check for existing user with the given email + if get_user(form_data.email, TEMP_ENGINE): + # user already exists + raise HTTPException(status_code=409, detail="User already exists") + + # create user + create_user(form_data.email, get_password_hash(form_data.password), TEMP_ENGINE) + +@app.get("/challenge") +async def altcha_challenge(): + options = ChallengeOptions( + expires=datetime.now() + timedelta(minutes=10), + max_number=100000, + hmac_key=TEMP_HMAC_KEY + ) + challenge = create_challenge(options) + return challenge + if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file