Compare commits
1 Commits
feat_email
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
971bdd51d2 |
@@ -14,8 +14,7 @@
|
||||
|
||||
// Features to add to the dev container. More info: https://containers.dev/features.
|
||||
"features": {
|
||||
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
|
||||
"ghcr.io/warrenbuckley/codespace-features/sqlite:1": {}
|
||||
"ghcr.io/devcontainers/features/docker-in-docker:2": {}
|
||||
},
|
||||
|
||||
// Use 'forwardPorts' to make a list of ports inside the container available locally.
|
||||
|
||||
@@ -35,7 +35,6 @@ PyJWT==2.10.1
|
||||
python-dotenv==1.1.1
|
||||
python-multipart==0.0.20
|
||||
PyYAML==6.0.3
|
||||
resend==2.19.0
|
||||
rich==14.2.0
|
||||
rich-toolkit==0.15.1
|
||||
rignore==0.7.1
|
||||
@@ -44,7 +43,7 @@ shellingham==1.5.4
|
||||
sniffio==1.3.1
|
||||
SQLAlchemy==2.0.44
|
||||
sqlmodel==0.0.27
|
||||
starlette==0.48.0
|
||||
starlette==0.49.1
|
||||
typer==0.19.2
|
||||
typing-inspection==0.4.2
|
||||
typing_extensions==4.15.0
|
||||
|
||||
1
slopserver/common.py
Normal file
1
slopserver/common.py
Normal file
@@ -0,0 +1 @@
|
||||
TEMP_HMAC_KEY = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
|
||||
@@ -71,7 +71,7 @@ def insert_slop(urls: list[ParseResult], engine: Engine, user: User | None = Non
|
||||
|
||||
session.commit()
|
||||
|
||||
def get_user(email, engine) -> User:
|
||||
def get_user(email, engine):
|
||||
query = select(User).where(User.email == email)
|
||||
|
||||
with Session(engine) as session:
|
||||
@@ -84,9 +84,3 @@ def create_user(email, password_hash, engine):
|
||||
with Session(engine) as session:
|
||||
session.add(user)
|
||||
session.commit()
|
||||
|
||||
def verify_user_email(user: User, engine):
|
||||
with Session(engine) as session:
|
||||
session.add(user)
|
||||
user.email_verified = True
|
||||
session.commit()
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
"""Send emails for account verification through Resend API"""
|
||||
from slopserver.settings import settings
|
||||
import resend
|
||||
|
||||
resend.api_key = settings.resend_token
|
||||
|
||||
def send_email(to: str, subject, content):
|
||||
params: resend.Emails.SendParams = {
|
||||
"from": "slopfarmer@jack-case.pro",
|
||||
"to": to,
|
||||
"subject": subject,
|
||||
"html": content
|
||||
}
|
||||
|
||||
email: resend.Emails.SendResponse = resend.Emails.send(params)
|
||||
|
||||
return email
|
||||
|
||||
def generate_verification_email(verification_url: str):
|
||||
return f"""
|
||||
<p>click here to verify your Slop Farmer account: <a href={verification_url}>{verification_url}</a></p>
|
||||
"""
|
||||
@@ -8,7 +8,7 @@ from altcha import Payload as AltchaPayload, verify_solution
|
||||
|
||||
from urllib.parse import urlparse, ParseResult
|
||||
|
||||
from slopserver.settings import settings
|
||||
from slopserver.common import TEMP_HMAC_KEY
|
||||
|
||||
NAMING_CONVENTION = {
|
||||
"ix": "ix_%(column_0_label)s",
|
||||
@@ -73,7 +73,8 @@ def url_validator(urls: list[str]) -> list[ParseResult]:
|
||||
return parsed_urls
|
||||
|
||||
def altcha_validator(altcha_response: AltchaPayload):
|
||||
verified = verify_solution(altcha_response, settings.altcha_secret)
|
||||
# verified = verify_solution(altcha_response, TEMP_HMAC_KEY)
|
||||
verified = (True, None)
|
||||
if not verified[0]:
|
||||
raise ValueError(f"altcha verification failed: {verified[1]}")
|
||||
return None
|
||||
@@ -85,4 +86,4 @@ class SlopReport(BaseModel):
|
||||
class SignupForm(BaseModel):
|
||||
email: EmailStr
|
||||
password: SecretStr
|
||||
altcha: Annotated[str, AfterValidator(altcha_validator)]
|
||||
altcha: Annotated[Base64Str, AfterValidator(altcha_validator)]
|
||||
@@ -15,10 +15,9 @@ import uvicorn
|
||||
from fastapi import Body, Depends, FastAPI, Form, HTTPException, Header
|
||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import HTMLResponse
|
||||
|
||||
from pydantic import AfterValidator, Base64Str
|
||||
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
@@ -30,15 +29,23 @@ import jwt
|
||||
|
||||
from uuid import uuid4
|
||||
|
||||
from slopserver.settings import settings
|
||||
|
||||
from slopserver.models import Domain, Path, User
|
||||
from slopserver.models import SlopReport, SignupForm, altcha_validator
|
||||
from slopserver.db import select_slop, insert_slop, get_user, create_user, verify_user_email
|
||||
from slopserver.db import select_slop, insert_slop, get_user, create_user
|
||||
from slopserver.common import TEMP_HMAC_KEY
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||
|
||||
class ServerSettings(BaseSettings):
|
||||
db_url: str = "sqlite+pysqlite:///test_db.sqlite"
|
||||
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
|
||||
# altcha_secret: str
|
||||
|
||||
settings = ServerSettings()
|
||||
|
||||
|
||||
DB_ENGINE = create_engine(settings.db_url)
|
||||
TOKEN_SECRET = settings.token_secret
|
||||
@@ -88,17 +95,6 @@ def generate_auth_token(username):
|
||||
encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
|
||||
return encoded_jwt
|
||||
|
||||
def generate_verification_token(username):
|
||||
expiration = datetime.now() + timedelta(days=2)
|
||||
verification_token = {
|
||||
"iss": "slopserver",
|
||||
"exp": int(expiration.timestamp()),
|
||||
"sub": username
|
||||
}
|
||||
|
||||
encoded_jwt = jwt.encode(verification_token, TOKEN_SECRET, ALGO)
|
||||
return encoded_jwt
|
||||
|
||||
def get_token_user(decoded_token):
|
||||
user = get_user(decoded_token["sub"], DB_ENGINE)
|
||||
return user
|
||||
@@ -110,20 +106,13 @@ def verify_auth_token(token: str):
|
||||
except:
|
||||
raise HTTPException(status_code=401, detail="invalid access token")
|
||||
|
||||
def verify_verification_token(token: str):
|
||||
try:
|
||||
token = jwt.decode(token, TOKEN_SECRET, ALGO)
|
||||
return token
|
||||
except:
|
||||
raise HTTPException(status_code=404, detail="invalid verification URL")
|
||||
|
||||
@app.post("/report")
|
||||
def report_slop(report: SlopReport, bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
|
||||
async def report_slop(report: SlopReport, bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
|
||||
user = get_token_user(bearer)
|
||||
insert_slop(report.slop_urls, DB_ENGINE, user)
|
||||
|
||||
@app.post("/check")
|
||||
def check_slop(check: Annotated[SlopReport, Body()], bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
|
||||
async def check_slop(check: Annotated[SlopReport, Body()], bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
|
||||
slop_results = select_slop(check.slop_urls, DB_ENGINE)
|
||||
return slop_results
|
||||
|
||||
@@ -131,13 +120,13 @@ async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
|
||||
pass
|
||||
|
||||
@app.post("/token")
|
||||
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
||||
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
||||
user = get_user(form_data.username, DB_ENGINE)
|
||||
if not user:
|
||||
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
||||
|
||||
@app.post("/signup")
|
||||
def signup_form(form_data: Annotated[SignupForm, Form()]):
|
||||
async def signup_form(form_data: Annotated[SignupForm, Form()]):
|
||||
# if we're here, form is validated including the altcha
|
||||
# check for existing user with the given email
|
||||
if get_user(form_data.email, DB_ENGINE):
|
||||
@@ -145,54 +134,46 @@ def signup_form(form_data: Annotated[SignupForm, Form()]):
|
||||
raise HTTPException(status_code=409, detail="User already exists")
|
||||
|
||||
# create user
|
||||
create_user(form_data.email, get_password_hash(form_data.password.get_secret_value()), DB_ENGINE)
|
||||
|
||||
# send verification email
|
||||
# create a jwt encoding the username and a time limit to be the verification URL
|
||||
token = generate_verification_token(form_data.email)
|
||||
return token
|
||||
|
||||
|
||||
|
||||
@app.get("/verify")
|
||||
def verify_email(token: Annotated[str, AfterValidator(verify_verification_token)]):
|
||||
user = get_user(token["sub"], DB_ENGINE)
|
||||
if not user:
|
||||
raise HTTPException(status_code=404, detail="invalid verification URL")
|
||||
if user.email_verified:
|
||||
raise HTTPException(status_code=404, detail="already verified")
|
||||
|
||||
verify_user_email(user, DB_ENGINE)
|
||||
html = f"""
|
||||
<html>
|
||||
<head>
|
||||
</head>
|
||||
<body>
|
||||
<p>{token["sub"]} verified. You may log in now.</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
return HTMLResponse(content=html, status_code=200)
|
||||
|
||||
create_user(form_data.email, get_password_hash(form_data.password), DB_ENGINE)
|
||||
|
||||
@app.get("/altcha-challenge")
|
||||
def altcha_challenge():
|
||||
async def altcha_challenge():
|
||||
options = ChallengeOptions(
|
||||
expires=datetime.now() + timedelta(minutes=10),
|
||||
max_number=80000,
|
||||
hmac_key=settings.altcha_secret
|
||||
max_number=100000,
|
||||
hmac_key=TEMP_HMAC_KEY
|
||||
)
|
||||
challenge = create_challenge(options)
|
||||
return challenge
|
||||
|
||||
@app.post("/login")
|
||||
def simple_login(username: Annotated[str, Form()], password: Annotated[str, Form()]):
|
||||
async def simple_login(username: Annotated[str, Form()], password: Annotated[str, Form()]):
|
||||
user = auth_user(username, password, DB_ENGINE)
|
||||
if not user:
|
||||
raise HTTPException(status_code=401, detail="Incorrect username or password")
|
||||
token = generate_auth_token(username)
|
||||
return {"access_token": token, "token_type": "bearer"}
|
||||
|
||||
@app.post("/altcha-challenge")
|
||||
async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_validator)]):
|
||||
# if verified, return a JWT for anonymous API access
|
||||
expiration = datetime.now() + timedelta(days=30)
|
||||
uuid = uuid4()
|
||||
bearer_token = {
|
||||
"iss": "slopserver",
|
||||
"exp": int(expiration.timestamp()),
|
||||
"aud": "slopserver",
|
||||
"sub": str(uuid),
|
||||
"client_id": str(uuid),
|
||||
"iat": int(datetime.now().timestamp()),
|
||||
"jti": str(uuid)
|
||||
}
|
||||
|
||||
encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
|
||||
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
@@ -1,12 +0,0 @@
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class ServerSettings(BaseSettings):
|
||||
db_url: str = "sqlite+pysqlite:///test_db.sqlite"
|
||||
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
|
||||
altcha_secret: str = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
|
||||
resend_token: str = "re_NXpjzbqR_KgAbu72PKjYHcquX24WvnN3i"
|
||||
|
||||
sender_email: str = "slopfarmer@jack-case.pro"
|
||||
|
||||
settings = ServerSettings()
|
||||
Reference in New Issue
Block a user