user creation flow with altcha verification
This commit is contained in:
1
slopserver/common.py
Normal file
1
slopserver/common.py
Normal file
@@ -0,0 +1 @@
|
||||
TEMP_HMAC_KEY = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
|
||||
@@ -51,3 +51,10 @@ def get_user(email, engine):
|
||||
with Session(engine) as session:
|
||||
user = session.scalar(query)
|
||||
return user
|
||||
|
||||
def create_user(email, password_hash, engine):
|
||||
user = User(email=email, password_hash=password_hash, email_verified=False)
|
||||
|
||||
with Session(engine) as session:
|
||||
session.add(user)
|
||||
session.commit()
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
from typing import Annotated
|
||||
from sqlmodel import Field, SQLModel, create_engine, Relationship
|
||||
from pydantic import AfterValidator, BaseModel
|
||||
from pydantic import AfterValidator, BaseModel, EmailStr, Json
|
||||
|
||||
from altcha import Payload as AltchaPayload, verify_solution
|
||||
|
||||
from urllib.parse import urlparse, ParseResult
|
||||
|
||||
from slopserver.common import TEMP_HMAC_KEY
|
||||
|
||||
NAMING_CONVENTION = {
|
||||
"ix": "ix_%(column_0_label)s",
|
||||
"uq": "uq_%(table_name)s_%(column_0_name)s",
|
||||
@@ -36,7 +40,6 @@ class User(SQLModel, table=True):
|
||||
id: int | None = Field(default=None, primary_key=True)
|
||||
email: str = Field(index=True, unique=True)
|
||||
password_hash: str
|
||||
salt: str
|
||||
|
||||
email_verified: bool = Field(default=False)
|
||||
|
||||
@@ -56,7 +59,17 @@ def url_validator(urls: list[str]) -> list[ParseResult]:
|
||||
raise ValueError(f"couldn't parse '{url}' as a URL")
|
||||
return parsed_urls
|
||||
|
||||
def altcha_validator(altcha_response: AltchaPayload):
|
||||
verified = verify_solution(altcha_response, TEMP_HMAC_KEY)
|
||||
if not verified[0]:
|
||||
raise ValueError(f"altcha verification failed: {verified[1]}")
|
||||
return None
|
||||
|
||||
class SlopReport(BaseModel):
|
||||
"""Accept reports of one or more slop page URLs"""
|
||||
slop_urls: Annotated[list[str], AfterValidator(url_validator)]
|
||||
slop_urls: Annotated[list[str], AfterValidator(url_validator)]
|
||||
|
||||
class SignupForm(BaseModel):
|
||||
email: EmailStr
|
||||
password: str
|
||||
altcha_response: Annotated[Json, AfterValidator(altcha_validator)]
|
||||
@@ -8,18 +8,23 @@
|
||||
- post report
|
||||
"""
|
||||
from typing import Annotated
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import uvicorn
|
||||
|
||||
from fastapi import Depends, FastAPI, HTTPException
|
||||
from fastapi import Depends, FastAPI, Form, HTTPException
|
||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
from pwdlib import PasswordHash
|
||||
|
||||
from altcha import ChallengeOptions, create_challenge, verify_solution
|
||||
|
||||
from slopserver.models import Domain, Path, User
|
||||
from slopserver.models import SlopReport
|
||||
from slopserver.db import select_slop, insert_slop, get_user
|
||||
from slopserver.models import SlopReport, SignupForm
|
||||
from slopserver.db import select_slop, insert_slop, get_user, create_user
|
||||
from slopserver.common import TEMP_HMAC_KEY
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
@@ -30,6 +35,7 @@ TEMP_SECRET = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
|
||||
ALGO = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
||||
|
||||
|
||||
password_hash = PasswordHash.recommended()
|
||||
|
||||
|
||||
@@ -66,5 +72,26 @@ async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
||||
if not user:
|
||||
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
||||
|
||||
@app.post("/signup")
|
||||
async def signup_form(form_data: Annotated[SignupForm, Form()]):
|
||||
# if we're here, form is validated including the altcha
|
||||
# check for existing user with the given email
|
||||
if get_user(form_data.email, TEMP_ENGINE):
|
||||
# user already exists
|
||||
raise HTTPException(status_code=409, detail="User already exists")
|
||||
|
||||
# create user
|
||||
create_user(form_data.email, get_password_hash(form_data.password), TEMP_ENGINE)
|
||||
|
||||
@app.get("/challenge")
|
||||
async def altcha_challenge():
|
||||
options = ChallengeOptions(
|
||||
expires=datetime.now() + timedelta(minutes=10),
|
||||
max_number=100000,
|
||||
hmac_key=TEMP_HMAC_KEY
|
||||
)
|
||||
challenge = create_challenge(options)
|
||||
return challenge
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
Reference in New Issue
Block a user