15 Commits
v0.5 ... v0.10

Author SHA1 Message Date
Jack Case
a36b6e9865 actually send the email 2025-11-15 16:04:45 +00:00
Jack Case
20fffc85c1 check if email is verified when authorizing 2025-11-15 15:53:02 +00:00
Jack Case
6724a903eb email verification responses 2025-11-15 15:45:05 +00:00
Jack Case
cfd54eca5e working on verification email sending 2025-11-15 14:04:03 +00:00
Jack Case
f5f8b5c873 change async path functions from async so that fastapi handles concurrency for non-async apis 2025-11-15 14:02:53 +00:00
Jack Case
f125fc1807 add resend to requirements and create email.py 2025-11-15 13:24:26 +00:00
Jack Case
f24f3ceee4 remove Base64Str type from altcha
This was decoding the string, which the altcha validator expects to do itself. Now it works.
2025-11-09 18:51:22 +00:00
Jack Case
49377f2e20 add a setting for Altcha secret key 2025-11-09 18:35:16 +00:00
Jack Case
e8189ed832 make the altcha challenge a smidge easier 2025-11-09 18:09:29 +00:00
Jack Case
c0600f527f handle password field of signup form properly, and verify altcha solution 2025-11-09 18:05:44 +00:00
Jack Case
ed6065a6ea add sqlite feature to devcontainer for test DB 2025-11-09 18:05:04 +00:00
Jack Case
ea843597aa updates 2025-10-30 01:55:18 +00:00
Jack Case
b35c92b96f ran tests against a local sqlite DB
reports appear to work properly now, will run prod DB migrations and upgrade the server container when I'm home.
2025-10-29 15:30:02 +00:00
Jack Case
e420a31127 new pass at properly handling related report models 2025-10-29 14:33:02 +00:00
Jack Case
9c07870cb7 properly specify the Report model with relationships to User and Path 2025-10-29 13:50:52 +00:00
9 changed files with 150 additions and 60 deletions

View File

@@ -14,7 +14,8 @@
// Features to add to the dev container. More info: https://containers.dev/features.
"features": {
"ghcr.io/devcontainers/features/docker-in-docker:2": {}
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
"ghcr.io/warrenbuckley/codespace-features/sqlite:1": {}
},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
@@ -22,7 +23,7 @@
// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "pip3 install --user -r requirements.txt",
"postCreateCommand": ". ./env/bin/activate; python -m pip install -r requirements.txt"
"postCreateCommand": "python -m venv env && . ./env/bin/activate && python -m pip install -r requirements.txt"
// Configure tool-specific properties.
// "customizations": {},

7
.dockerignore Normal file
View File

@@ -0,0 +1,7 @@
.devcontainer/
.vscode/
test_db.sqlite
env/
**/__pycache__/
slopserver/alembic/

View File

@@ -35,6 +35,7 @@ PyJWT==2.10.1
python-dotenv==1.1.1
python-multipart==0.0.20
PyYAML==6.0.3
resend==2.19.0
rich==14.2.0
rich-toolkit==0.15.1
rignore==0.7.1

View File

@@ -1 +0,0 @@
TEMP_HMAC_KEY = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"

View File

@@ -34,31 +34,44 @@ def insert_slop(urls: list[ParseResult], engine: Engine, user: User | None = Non
if not domain in existing_dict:
# create a new domain object and paths
new_domain = Domain(domain_name=domain, paths=list())
new_domain.paths = [Path(path=path) for path in paths]
session.add(new_domain)
new_paths = list()
for path in paths:
new_path = Path(path=path)
if user:
for path in new_domain.paths:
new_report = Report(path_id=path.id, user_id=user.id)
session.add(new_report)
reports = list()
reports.append(Report(path=new_path, user=user, timestamp=datetime.now()))
new_path.reports = reports
new_paths.append(new_path)
new_domain.paths = new_paths
session.add(new_domain)
else:
existing_domain = existing_dict[domain]
existing_paths = set((path.path for path in existing_domain.paths))
existing_paths = dict({path.path: path for path in existing_domain.paths})
for path in paths:
if not path in existing_paths:
new_path = Path(path=path)
if user:
report_list = list()
report_list.append(Report(path=new_path, user=user, timestamp=datetime.now()))
new_path.reports = report_list
existing_domain.paths.append(new_path)
session.add(new_path)
session.flush([new_path])
session.refresh(new_path)
else:
# domain and path exist, append to the path's reports
if user:
new_report = Report(
path_id=new_path.id, user_id=user.id, timestamp=datetime.now())
session.add(new_report)
existing_path = existing_paths.get(path)
report_dict = {(report.user.id, report.path.id): report for report in existing_path.reports}
existing_report = report_dict.get((user.id, existing_path.id))
if existing_report:
existing_report.timestamp = datetime.now()
else:
existing_path.reports.append(Report(path=existing_path, user=user, timestamp=datetime.now()))
session.commit()
def get_user(email, engine):
def get_user(email, engine) -> User:
query = select(User).where(User.email == email)
with Session(engine) as session:
@@ -71,3 +84,9 @@ def create_user(email, password_hash, engine):
with Session(engine) as session:
session.add(user)
session.commit()
def verify_user_email(user: User, engine):
with Session(engine) as session:
session.add(user)
user.email_verified = True
session.commit()

22
slopserver/email.py Normal file
View File

@@ -0,0 +1,22 @@
"""Send emails for account verification through Resend API"""
from slopserver.settings import settings
import resend
resend.api_key = settings.resend_token
def send_email(to: str, subject, content):
params: resend.Emails.SendParams = {
"from": "slopfarmer@jack-case.pro",
"to": to,
"subject": subject,
"html": content
}
email: resend.Emails.SendResponse = resend.Emails.send(params)
return email
def generate_verification_email(verification_url: str):
return f"""
<p>click here to verify your Slop Farmer account: <a href={verification_url}>{verification_url}</a></p>
"""

View File

@@ -8,7 +8,7 @@ from altcha import Payload as AltchaPayload, verify_solution
from urllib.parse import urlparse, ParseResult
from slopserver.common import TEMP_HMAC_KEY
from slopserver.settings import settings
NAMING_CONVENTION = {
"ix": "ix_%(column_0_label)s",
@@ -37,6 +37,7 @@ class Path(SQLModel, table=True):
domain_id: int | None = Field(foreign_key="domain.id")
domain: Domain = Relationship(back_populates="paths")
reports: list["Report"] = Relationship(back_populates="path")
class User(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
@@ -45,11 +46,16 @@ class User(SQLModel, table=True):
email_verified: bool = Field(default=False)
reports: list["Report"] = Relationship(back_populates="user")
class Report(SQLModel, table=True):
path_id: int | None = Field(default=None, primary_key=True, foreign_key="path.id")
user_id: int | None = Field(default=None, primary_key=True, foreign_key="user.id")
timestamp: datetime | None = Field(default=datetime.now())
path: Path = Relationship(back_populates="reports")
user: User = Relationship(back_populates="reports")
################################################
# API Models
################################################
@@ -67,8 +73,7 @@ def url_validator(urls: list[str]) -> list[ParseResult]:
return parsed_urls
def altcha_validator(altcha_response: AltchaPayload):
# verified = verify_solution(altcha_response, TEMP_HMAC_KEY)
verified = (True, None)
verified = verify_solution(altcha_response, settings.altcha_secret)
if not verified[0]:
raise ValueError(f"altcha verification failed: {verified[1]}")
return None
@@ -80,4 +85,4 @@ class SlopReport(BaseModel):
class SignupForm(BaseModel):
email: EmailStr
password: SecretStr
altcha: Annotated[Base64Str, AfterValidator(altcha_validator)]
altcha: Annotated[str, AfterValidator(altcha_validator)]

View File

@@ -15,9 +15,10 @@ import uvicorn
from fastapi import Body, Depends, FastAPI, Form, HTTPException, Header
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from pydantic import AfterValidator, Base64Str
from pydantic_settings import BaseSettings
from sqlalchemy import create_engine
@@ -29,23 +30,16 @@ import jwt
from uuid import uuid4
from slopserver.settings import settings
from slopserver.models import Domain, Path, User
from slopserver.models import SlopReport, SignupForm, altcha_validator
from slopserver.db import select_slop, insert_slop, get_user, create_user
from slopserver.common import TEMP_HMAC_KEY
from slopserver.db import select_slop, insert_slop, get_user, create_user, verify_user_email
from slopserver.email import generate_verification_email, send_email
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class ServerSettings(BaseSettings):
db_url: str = "postgresql+psycopg2://slop-farmer@192.168.1.163/slop-farmer"
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
# altcha_secret: str
settings = ServerSettings()
DB_ENGINE = create_engine(settings.db_url)
TOKEN_SECRET = settings.token_secret
@@ -95,6 +89,17 @@ def generate_auth_token(username):
encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
return encoded_jwt
def generate_verification_token(username):
expiration = datetime.now() + timedelta(days=2)
verification_token = {
"iss": "slopserver",
"exp": int(expiration.timestamp()),
"sub": username
}
encoded_jwt = jwt.encode(verification_token, TOKEN_SECRET, ALGO)
return encoded_jwt
def get_token_user(decoded_token):
user = get_user(decoded_token["sub"], DB_ENGINE)
return user
@@ -106,13 +111,20 @@ def verify_auth_token(token: str):
except:
raise HTTPException(status_code=401, detail="invalid access token")
def verify_verification_token(token: str):
try:
token = jwt.decode(token, TOKEN_SECRET, ALGO)
return token
except:
raise HTTPException(status_code=404, detail="invalid verification URL")
@app.post("/report")
async def report_slop(report: SlopReport, bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
def report_slop(report: SlopReport, bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
user = get_token_user(bearer)
insert_slop(report.slop_urls, DB_ENGINE, user)
@app.post("/check")
async def check_slop(check: Annotated[SlopReport, Body()], bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
def check_slop(check: Annotated[SlopReport, Body()], bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
slop_results = select_slop(check.slop_urls, DB_ENGINE)
return slop_results
@@ -120,13 +132,13 @@ async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
pass
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = get_user(form_data.username, DB_ENGINE)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
@app.post("/signup")
async def signup_form(form_data: Annotated[SignupForm, Form()]):
def signup_form(form_data: Annotated[SignupForm, Form()]):
# if we're here, form is validated including the altcha
# check for existing user with the given email
if get_user(form_data.email, DB_ENGINE):
@@ -134,46 +146,57 @@ async def signup_form(form_data: Annotated[SignupForm, Form()]):
raise HTTPException(status_code=409, detail="User already exists")
# create user
create_user(form_data.email, get_password_hash(form_data.password), DB_ENGINE)
create_user(form_data.email, get_password_hash(form_data.password.get_secret_value()), DB_ENGINE)
# send verification email
# create a jwt encoding the username and a time limit to be the verification URL
token = generate_verification_token(form_data.email)
email_html = generate_verification_email(settings.api_base + "verify/?token=" + token)
status = send_email(form_data.email, "Slop Farmer Email Verification", email_html)
return status
@app.get("/verify")
def verify_email(token: Annotated[str, AfterValidator(verify_verification_token)]):
user = get_user(token["sub"], DB_ENGINE)
if not user:
raise HTTPException(status_code=404, detail="invalid verification URL")
if user.email_verified:
raise HTTPException(status_code=404, detail="already verified")
verify_user_email(user, DB_ENGINE)
html = f"""
<html>
<head>
</head>
<body>
<p>{token["sub"]} verified. You may log in now.</p>
</body>
</html>
"""
return HTMLResponse(content=html, status_code=200)
@app.get("/altcha-challenge")
async def altcha_challenge():
def altcha_challenge():
options = ChallengeOptions(
expires=datetime.now() + timedelta(minutes=10),
max_number=100000,
hmac_key=TEMP_HMAC_KEY
max_number=80000,
hmac_key=settings.altcha_secret
)
challenge = create_challenge(options)
return challenge
@app.post("/login")
async def simple_login(username: Annotated[str, Form()], password: Annotated[str, Form()]):
def simple_login(username: Annotated[str, Form()], password: Annotated[str, Form()]):
user = auth_user(username, password, DB_ENGINE)
if not user:
raise HTTPException(status_code=401, detail="Incorrect username or password")
if not user.email_verified:
raise HTTPException(status_code=401, detail="Unverified email address")
token = generate_auth_token(username)
return {"access_token": token, "token_type": "bearer"}
@app.post("/altcha-challenge")
async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_validator)]):
# if verified, return a JWT for anonymous API access
expiration = datetime.now() + timedelta(days=30)
uuid = uuid4()
bearer_token = {
"iss": "slopserver",
"exp": int(expiration.timestamp()),
"aud": "slopserver",
"sub": str(uuid),
"client_id": str(uuid),
"iat": int(datetime.now().timestamp()),
"jti": str(uuid)
}
encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
return encoded_jwt
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

13
slopserver/settings.py Normal file
View File

@@ -0,0 +1,13 @@
from pydantic_settings import BaseSettings
class ServerSettings(BaseSettings):
db_url: str = "sqlite+pysqlite:///test_db.sqlite"
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
altcha_secret: str = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
resend_token: str = "re_NXpjzbqR_KgAbu72PKjYHcquX24WvnN3i"
sender_email: str = "slopfarmer@jack-case.pro"
api_base: str = "api.slopfarmer.jack-case.pro/"
settings = ServerSettings()