Compare commits
20 Commits
v0.5
...
feat_domai
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cbabd5da3c | ||
|
|
283e9c6b7f | ||
|
|
1cb9ae8606 | ||
|
|
6a1837762e | ||
|
|
fcc5d9d7bc | ||
|
|
a36b6e9865 | ||
|
|
20fffc85c1 | ||
|
|
6724a903eb | ||
|
|
cfd54eca5e | ||
|
|
f5f8b5c873 | ||
|
|
f125fc1807 | ||
|
|
f24f3ceee4 | ||
|
|
49377f2e20 | ||
|
|
e8189ed832 | ||
|
|
c0600f527f | ||
|
|
ed6065a6ea | ||
|
|
ea843597aa | ||
|
|
b35c92b96f | ||
|
|
e420a31127 | ||
|
|
9c07870cb7 |
@@ -14,7 +14,8 @@
|
|||||||
|
|
||||||
// Features to add to the dev container. More info: https://containers.dev/features.
|
// Features to add to the dev container. More info: https://containers.dev/features.
|
||||||
"features": {
|
"features": {
|
||||||
"ghcr.io/devcontainers/features/docker-in-docker:2": {}
|
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
|
||||||
|
"ghcr.io/warrenbuckley/codespace-features/sqlite:1": {}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Use 'forwardPorts' to make a list of ports inside the container available locally.
|
// Use 'forwardPorts' to make a list of ports inside the container available locally.
|
||||||
@@ -22,7 +23,7 @@
|
|||||||
|
|
||||||
// Use 'postCreateCommand' to run commands after the container is created.
|
// Use 'postCreateCommand' to run commands after the container is created.
|
||||||
// "postCreateCommand": "pip3 install --user -r requirements.txt",
|
// "postCreateCommand": "pip3 install --user -r requirements.txt",
|
||||||
"postCreateCommand": ". ./env/bin/activate; python -m pip install -r requirements.txt"
|
"postCreateCommand": "python -m venv env && . ./env/bin/activate && python -m pip install -r requirements.txt"
|
||||||
|
|
||||||
// Configure tool-specific properties.
|
// Configure tool-specific properties.
|
||||||
// "customizations": {},
|
// "customizations": {},
|
||||||
|
|||||||
7
.dockerignore
Normal file
7
.dockerignore
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
.devcontainer/
|
||||||
|
.vscode/
|
||||||
|
|
||||||
|
test_db.sqlite
|
||||||
|
env/
|
||||||
|
**/__pycache__/
|
||||||
|
slopserver/alembic/
|
||||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -214,4 +214,7 @@ __marimo__/
|
|||||||
|
|
||||||
# Streamlit
|
# Streamlit
|
||||||
.streamlit/secrets.toml
|
.streamlit/secrets.toml
|
||||||
|
|
||||||
|
|
||||||
slopserver/server_config.env
|
slopserver/server_config.env
|
||||||
|
test_db.sqlite
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ PyJWT==2.10.1
|
|||||||
python-dotenv==1.1.1
|
python-dotenv==1.1.1
|
||||||
python-multipart==0.0.20
|
python-multipart==0.0.20
|
||||||
PyYAML==6.0.3
|
PyYAML==6.0.3
|
||||||
|
resend==2.19.0
|
||||||
rich==14.2.0
|
rich==14.2.0
|
||||||
rich-toolkit==0.15.1
|
rich-toolkit==0.15.1
|
||||||
rignore==0.7.1
|
rignore==0.7.1
|
||||||
|
|||||||
@@ -1 +0,0 @@
|
|||||||
TEMP_HMAC_KEY = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
|
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
from collections.abc import Iterable
|
from collections.abc import Iterable
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from urllib.parse import ParseResult
|
from urllib.parse import ParseResult
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select, func
|
||||||
from sqlalchemy.engine import Engine
|
from sqlalchemy.engine import Engine
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
from slopserver.models import Domain, Path, User, Report
|
from slopserver.models import Domain, Path, User, Report
|
||||||
@@ -12,6 +12,13 @@ def select_slop(urls: list[ParseResult], engine: Engine) -> Iterable[Domain]:
|
|||||||
rows = session.scalars(query).all()
|
rows = session.scalars(query).all()
|
||||||
return rows
|
return rows
|
||||||
|
|
||||||
|
def top_offenders(engine: Engine, limit: int|None = None) -> Iterable[Domain]:
|
||||||
|
query = select(Domain.domain_name, func.count(Path.id)).join(Path).group_by(Domain.id).order_by(func.count(Path.id).desc())
|
||||||
|
if limit: query = query.limit(limit)
|
||||||
|
with Session(engine) as session:
|
||||||
|
top_offenders = session.execute(query).all()
|
||||||
|
return top_offenders
|
||||||
|
|
||||||
def insert_slop(urls: list[ParseResult], engine: Engine, user: User | None = None):
|
def insert_slop(urls: list[ParseResult], engine: Engine, user: User | None = None):
|
||||||
domain_dict: dict[str. set[str]] = dict()
|
domain_dict: dict[str. set[str]] = dict()
|
||||||
for url in urls:
|
for url in urls:
|
||||||
@@ -34,31 +41,44 @@ def insert_slop(urls: list[ParseResult], engine: Engine, user: User | None = Non
|
|||||||
if not domain in existing_dict:
|
if not domain in existing_dict:
|
||||||
# create a new domain object and paths
|
# create a new domain object and paths
|
||||||
new_domain = Domain(domain_name=domain, paths=list())
|
new_domain = Domain(domain_name=domain, paths=list())
|
||||||
new_domain.paths = [Path(path=path) for path in paths]
|
new_paths = list()
|
||||||
session.add(new_domain)
|
for path in paths:
|
||||||
|
new_path = Path(path=path)
|
||||||
if user:
|
if user:
|
||||||
for path in new_domain.paths:
|
reports = list()
|
||||||
new_report = Report(path_id=path.id, user_id=user.id)
|
reports.append(Report(path=new_path, user=user, timestamp=datetime.now()))
|
||||||
session.add(new_report)
|
new_path.reports = reports
|
||||||
|
new_paths.append(new_path)
|
||||||
|
new_domain.paths = new_paths
|
||||||
|
session.add(new_domain)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
existing_domain = existing_dict[domain]
|
existing_domain = existing_dict[domain]
|
||||||
existing_paths = set((path.path for path in existing_domain.paths))
|
existing_paths = dict({path.path: path for path in existing_domain.paths})
|
||||||
for path in paths:
|
for path in paths:
|
||||||
if not path in existing_paths:
|
if not path in existing_paths:
|
||||||
new_path = Path(path=path)
|
new_path = Path(path=path)
|
||||||
|
if user:
|
||||||
|
report_list = list()
|
||||||
|
report_list.append(Report(path=new_path, user=user, timestamp=datetime.now()))
|
||||||
|
new_path.reports = report_list
|
||||||
existing_domain.paths.append(new_path)
|
existing_domain.paths.append(new_path)
|
||||||
session.add(new_path)
|
session.add(new_path)
|
||||||
session.flush([new_path])
|
|
||||||
session.refresh(new_path)
|
else:
|
||||||
|
# domain and path exist, append to the path's reports
|
||||||
if user:
|
if user:
|
||||||
new_report = Report(
|
existing_path = existing_paths.get(path)
|
||||||
path_id=new_path.id, user_id=user.id, timestamp=datetime.now())
|
report_dict = {(report.user.id, report.path.id): report for report in existing_path.reports}
|
||||||
session.add(new_report)
|
existing_report = report_dict.get((user.id, existing_path.id))
|
||||||
|
if existing_report:
|
||||||
|
existing_report.timestamp = datetime.now()
|
||||||
|
else:
|
||||||
|
existing_path.reports.append(Report(path=existing_path, user=user, timestamp=datetime.now()))
|
||||||
|
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
def get_user(email, engine):
|
def get_user(email, engine) -> User:
|
||||||
query = select(User).where(User.email == email)
|
query = select(User).where(User.email == email)
|
||||||
|
|
||||||
with Session(engine) as session:
|
with Session(engine) as session:
|
||||||
@@ -71,3 +91,9 @@ def create_user(email, password_hash, engine):
|
|||||||
with Session(engine) as session:
|
with Session(engine) as session:
|
||||||
session.add(user)
|
session.add(user)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
|
def verify_user_email(user: User, engine):
|
||||||
|
with Session(engine) as session:
|
||||||
|
session.add(user)
|
||||||
|
user.email_verified = True
|
||||||
|
session.commit()
|
||||||
|
|||||||
22
slopserver/email.py
Normal file
22
slopserver/email.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
"""Send emails for account verification through Resend API"""
|
||||||
|
from slopserver.settings import settings
|
||||||
|
import resend
|
||||||
|
|
||||||
|
resend.api_key = settings.resend_token
|
||||||
|
|
||||||
|
def send_email(to: str, subject, content):
|
||||||
|
params: resend.Emails.SendParams = {
|
||||||
|
"from": "slopfarmer@jack-case.pro",
|
||||||
|
"to": to,
|
||||||
|
"subject": subject,
|
||||||
|
"html": content
|
||||||
|
}
|
||||||
|
|
||||||
|
email: resend.Emails.SendResponse = resend.Emails.send(params)
|
||||||
|
|
||||||
|
return email
|
||||||
|
|
||||||
|
def generate_verification_email(verification_url: str):
|
||||||
|
return f"""
|
||||||
|
<p>click here to verify your Slop Farmer account: <a href={verification_url}>{verification_url}</a></p>
|
||||||
|
"""
|
||||||
@@ -8,7 +8,7 @@ from altcha import Payload as AltchaPayload, verify_solution
|
|||||||
|
|
||||||
from urllib.parse import urlparse, ParseResult
|
from urllib.parse import urlparse, ParseResult
|
||||||
|
|
||||||
from slopserver.common import TEMP_HMAC_KEY
|
from slopserver.settings import settings
|
||||||
|
|
||||||
NAMING_CONVENTION = {
|
NAMING_CONVENTION = {
|
||||||
"ix": "ix_%(column_0_label)s",
|
"ix": "ix_%(column_0_label)s",
|
||||||
@@ -37,6 +37,7 @@ class Path(SQLModel, table=True):
|
|||||||
|
|
||||||
domain_id: int | None = Field(foreign_key="domain.id")
|
domain_id: int | None = Field(foreign_key="domain.id")
|
||||||
domain: Domain = Relationship(back_populates="paths")
|
domain: Domain = Relationship(back_populates="paths")
|
||||||
|
reports: list["Report"] = Relationship(back_populates="path")
|
||||||
|
|
||||||
class User(SQLModel, table=True):
|
class User(SQLModel, table=True):
|
||||||
id: int | None = Field(default=None, primary_key=True)
|
id: int | None = Field(default=None, primary_key=True)
|
||||||
@@ -45,11 +46,16 @@ class User(SQLModel, table=True):
|
|||||||
|
|
||||||
email_verified: bool = Field(default=False)
|
email_verified: bool = Field(default=False)
|
||||||
|
|
||||||
|
reports: list["Report"] = Relationship(back_populates="user")
|
||||||
|
|
||||||
class Report(SQLModel, table=True):
|
class Report(SQLModel, table=True):
|
||||||
path_id: int | None = Field(default=None, primary_key=True, foreign_key="path.id")
|
path_id: int | None = Field(default=None, primary_key=True, foreign_key="path.id")
|
||||||
user_id: int | None = Field(default=None, primary_key=True, foreign_key="user.id")
|
user_id: int | None = Field(default=None, primary_key=True, foreign_key="user.id")
|
||||||
timestamp: datetime | None = Field(default=datetime.now())
|
timestamp: datetime | None = Field(default=datetime.now())
|
||||||
|
|
||||||
|
path: Path = Relationship(back_populates="reports")
|
||||||
|
user: User = Relationship(back_populates="reports")
|
||||||
|
|
||||||
################################################
|
################################################
|
||||||
# API Models
|
# API Models
|
||||||
################################################
|
################################################
|
||||||
@@ -67,8 +73,7 @@ def url_validator(urls: list[str]) -> list[ParseResult]:
|
|||||||
return parsed_urls
|
return parsed_urls
|
||||||
|
|
||||||
def altcha_validator(altcha_response: AltchaPayload):
|
def altcha_validator(altcha_response: AltchaPayload):
|
||||||
# verified = verify_solution(altcha_response, TEMP_HMAC_KEY)
|
verified = verify_solution(altcha_response, settings.altcha_secret)
|
||||||
verified = (True, None)
|
|
||||||
if not verified[0]:
|
if not verified[0]:
|
||||||
raise ValueError(f"altcha verification failed: {verified[1]}")
|
raise ValueError(f"altcha verification failed: {verified[1]}")
|
||||||
return None
|
return None
|
||||||
@@ -80,4 +85,4 @@ class SlopReport(BaseModel):
|
|||||||
class SignupForm(BaseModel):
|
class SignupForm(BaseModel):
|
||||||
email: EmailStr
|
email: EmailStr
|
||||||
password: SecretStr
|
password: SecretStr
|
||||||
altcha: Annotated[Base64Str, AfterValidator(altcha_validator)]
|
altcha: Annotated[str, AfterValidator(altcha_validator)]
|
||||||
@@ -15,9 +15,10 @@ import uvicorn
|
|||||||
from fastapi import Body, Depends, FastAPI, Form, HTTPException, Header
|
from fastapi import Body, Depends, FastAPI, Form, HTTPException, Header
|
||||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
from fastapi.responses import HTMLResponse
|
||||||
|
|
||||||
from pydantic import AfterValidator, Base64Str
|
from pydantic import AfterValidator, Base64Str
|
||||||
from pydantic_settings import BaseSettings
|
|
||||||
|
|
||||||
from sqlalchemy import create_engine
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
@@ -29,23 +30,16 @@ import jwt
|
|||||||
|
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
|
from slopserver.settings import settings
|
||||||
from slopserver.models import Domain, Path, User
|
from slopserver.models import Domain, Path, User
|
||||||
from slopserver.models import SlopReport, SignupForm, altcha_validator
|
from slopserver.models import SlopReport, SignupForm, altcha_validator
|
||||||
from slopserver.db import select_slop, insert_slop, get_user, create_user
|
from slopserver.db import select_slop, insert_slop, get_user, create_user, verify_user_email
|
||||||
from slopserver.common import TEMP_HMAC_KEY
|
from slopserver.email import generate_verification_email, send_email
|
||||||
|
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
|
|
||||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||||
|
|
||||||
class ServerSettings(BaseSettings):
|
|
||||||
db_url: str = "postgresql+psycopg2://slop-farmer@192.168.1.163/slop-farmer"
|
|
||||||
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
|
|
||||||
# altcha_secret: str
|
|
||||||
|
|
||||||
settings = ServerSettings()
|
|
||||||
|
|
||||||
|
|
||||||
DB_ENGINE = create_engine(settings.db_url)
|
DB_ENGINE = create_engine(settings.db_url)
|
||||||
TOKEN_SECRET = settings.token_secret
|
TOKEN_SECRET = settings.token_secret
|
||||||
@@ -95,6 +89,17 @@ def generate_auth_token(username):
|
|||||||
encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
|
encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
|
||||||
return encoded_jwt
|
return encoded_jwt
|
||||||
|
|
||||||
|
def generate_verification_token(username):
|
||||||
|
expiration = datetime.now() + timedelta(days=2)
|
||||||
|
verification_token = {
|
||||||
|
"iss": "slopserver",
|
||||||
|
"exp": int(expiration.timestamp()),
|
||||||
|
"sub": username
|
||||||
|
}
|
||||||
|
|
||||||
|
encoded_jwt = jwt.encode(verification_token, TOKEN_SECRET, ALGO)
|
||||||
|
return encoded_jwt
|
||||||
|
|
||||||
def get_token_user(decoded_token):
|
def get_token_user(decoded_token):
|
||||||
user = get_user(decoded_token["sub"], DB_ENGINE)
|
user = get_user(decoded_token["sub"], DB_ENGINE)
|
||||||
return user
|
return user
|
||||||
@@ -106,13 +111,20 @@ def verify_auth_token(token: str):
|
|||||||
except:
|
except:
|
||||||
raise HTTPException(status_code=401, detail="invalid access token")
|
raise HTTPException(status_code=401, detail="invalid access token")
|
||||||
|
|
||||||
|
def verify_verification_token(token: str):
|
||||||
|
try:
|
||||||
|
token = jwt.decode(token, TOKEN_SECRET, ALGO)
|
||||||
|
return token
|
||||||
|
except:
|
||||||
|
raise HTTPException(status_code=404, detail="invalid verification URL")
|
||||||
|
|
||||||
@app.post("/report")
|
@app.post("/report")
|
||||||
async def report_slop(report: SlopReport, bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
|
def report_slop(report: SlopReport, bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
|
||||||
user = get_token_user(bearer)
|
user = get_token_user(bearer)
|
||||||
insert_slop(report.slop_urls, DB_ENGINE, user)
|
insert_slop(report.slop_urls, DB_ENGINE, user)
|
||||||
|
|
||||||
@app.post("/check")
|
@app.post("/check")
|
||||||
async def check_slop(check: Annotated[SlopReport, Body()], bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
|
def check_slop(check: Annotated[SlopReport, Body()], bearer: Annotated[str, AfterValidator(verify_auth_token), Header()]):
|
||||||
slop_results = select_slop(check.slop_urls, DB_ENGINE)
|
slop_results = select_slop(check.slop_urls, DB_ENGINE)
|
||||||
return slop_results
|
return slop_results
|
||||||
|
|
||||||
@@ -120,13 +132,13 @@ async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
@app.post("/token")
|
@app.post("/token")
|
||||||
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
||||||
user = get_user(form_data.username, DB_ENGINE)
|
user = get_user(form_data.username, DB_ENGINE)
|
||||||
if not user:
|
if not user:
|
||||||
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
||||||
|
|
||||||
@app.post("/signup")
|
@app.post("/signup")
|
||||||
async def signup_form(form_data: Annotated[SignupForm, Form()]):
|
def signup_form(form_data: Annotated[SignupForm, Form()]):
|
||||||
# if we're here, form is validated including the altcha
|
# if we're here, form is validated including the altcha
|
||||||
# check for existing user with the given email
|
# check for existing user with the given email
|
||||||
if get_user(form_data.email, DB_ENGINE):
|
if get_user(form_data.email, DB_ENGINE):
|
||||||
@@ -134,46 +146,57 @@ async def signup_form(form_data: Annotated[SignupForm, Form()]):
|
|||||||
raise HTTPException(status_code=409, detail="User already exists")
|
raise HTTPException(status_code=409, detail="User already exists")
|
||||||
|
|
||||||
# create user
|
# create user
|
||||||
create_user(form_data.email, get_password_hash(form_data.password), DB_ENGINE)
|
create_user(form_data.email, get_password_hash(form_data.password.get_secret_value()), DB_ENGINE)
|
||||||
|
|
||||||
|
# send verification email
|
||||||
|
# create a jwt encoding the username and a time limit to be the verification URL
|
||||||
|
token = generate_verification_token(form_data.email)
|
||||||
|
|
||||||
|
email_html = generate_verification_email(settings.api_base + "verify/?token=" + token)
|
||||||
|
status = send_email(form_data.email, "Slop Farmer Email Verification", email_html)
|
||||||
|
return status
|
||||||
|
|
||||||
|
@app.get("/verify")
|
||||||
|
def verify_email(token: Annotated[str, AfterValidator(verify_verification_token)]):
|
||||||
|
user = get_user(token["sub"], DB_ENGINE)
|
||||||
|
if not user:
|
||||||
|
raise HTTPException(status_code=404, detail="invalid verification URL")
|
||||||
|
if user.email_verified:
|
||||||
|
raise HTTPException(status_code=404, detail="already verified")
|
||||||
|
|
||||||
|
verify_user_email(user, DB_ENGINE)
|
||||||
|
html = f"""
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<p>{token["sub"]} verified. You may log in now.</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
return HTMLResponse(content=html, status_code=200)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/altcha-challenge")
|
@app.get("/altcha-challenge")
|
||||||
async def altcha_challenge():
|
def altcha_challenge():
|
||||||
options = ChallengeOptions(
|
options = ChallengeOptions(
|
||||||
expires=datetime.now() + timedelta(minutes=10),
|
expires=datetime.now() + timedelta(minutes=10),
|
||||||
max_number=100000,
|
max_number=80000,
|
||||||
hmac_key=TEMP_HMAC_KEY
|
hmac_key=settings.altcha_secret
|
||||||
)
|
)
|
||||||
challenge = create_challenge(options)
|
challenge = create_challenge(options)
|
||||||
return challenge
|
return challenge
|
||||||
|
|
||||||
@app.post("/login")
|
@app.post("/login")
|
||||||
async def simple_login(username: Annotated[str, Form()], password: Annotated[str, Form()]):
|
def simple_login(username: Annotated[str, Form()], password: Annotated[str, Form()]):
|
||||||
user = auth_user(username, password, DB_ENGINE)
|
user = auth_user(username, password, DB_ENGINE)
|
||||||
if not user:
|
if not user:
|
||||||
raise HTTPException(status_code=401, detail="Incorrect username or password")
|
raise HTTPException(status_code=401, detail="Incorrect username or password")
|
||||||
|
if not user.email_verified:
|
||||||
|
raise HTTPException(status_code=401, detail="Unverified email address")
|
||||||
token = generate_auth_token(username)
|
token = generate_auth_token(username)
|
||||||
return {"access_token": token, "token_type": "bearer"}
|
return {"access_token": token, "token_type": "bearer"}
|
||||||
|
|
||||||
@app.post("/altcha-challenge")
|
|
||||||
async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_validator)]):
|
|
||||||
# if verified, return a JWT for anonymous API access
|
|
||||||
expiration = datetime.now() + timedelta(days=30)
|
|
||||||
uuid = uuid4()
|
|
||||||
bearer_token = {
|
|
||||||
"iss": "slopserver",
|
|
||||||
"exp": int(expiration.timestamp()),
|
|
||||||
"aud": "slopserver",
|
|
||||||
"sub": str(uuid),
|
|
||||||
"client_id": str(uuid),
|
|
||||||
"iat": int(datetime.now().timestamp()),
|
|
||||||
"jti": str(uuid)
|
|
||||||
}
|
|
||||||
|
|
||||||
encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
|
|
||||||
|
|
||||||
return encoded_jwt
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
13
slopserver/settings.py
Normal file
13
slopserver/settings.py
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
from pydantic_settings import BaseSettings
|
||||||
|
|
||||||
|
|
||||||
|
class ServerSettings(BaseSettings):
|
||||||
|
db_url: str = "sqlite+pysqlite:///slopserver/test/test_db.sqlite"
|
||||||
|
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
|
||||||
|
altcha_secret: str = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
|
||||||
|
resend_token: str = "re_NXpjzbqR_KgAbu72PKjYHcquX24WvnN3i"
|
||||||
|
|
||||||
|
sender_email: str = "slopfarmer@jack-case.pro"
|
||||||
|
api_base: str = "api.slopfarmer.jack-case.pro/"
|
||||||
|
|
||||||
|
settings = ServerSettings()
|
||||||
0
slopserver/test/__init__.py
Normal file
0
slopserver/test/__init__.py
Normal file
5
slopserver/test/backup_test_db.sh
Executable file
5
slopserver/test/backup_test_db.sh
Executable file
@@ -0,0 +1,5 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
test_dir=slopserver/test
|
||||||
|
|
||||||
|
sqlite3 $test_dir/test_db.sqlite .dump > $1
|
||||||
5
slopserver/test/create_test_db.sh
Executable file
5
slopserver/test/create_test_db.sh
Executable file
@@ -0,0 +1,5 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
test_dir=slopserver/test
|
||||||
|
|
||||||
|
cat $test_dir/test_db.sql | sqlite3 $test_dir/test_db.sqlite
|
||||||
5
slopserver/test/test_api.py
Normal file
5
slopserver/test/test_api.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
from slopserver.server import app as slopserver_app
|
||||||
|
|
||||||
|
client = TestClient(slopserver_app)
|
||||||
18
slopserver/test/test_db.py
Normal file
18
slopserver/test/test_db.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
from slopserver.db import *
|
||||||
|
from slopserver.models import *
|
||||||
|
from slopserver.settings import settings
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
class TestDBFuncs(unittest.TestCase):
|
||||||
|
|
||||||
|
test_db_url = settings.db_url
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.engine = create_engine(self.test_db_url)
|
||||||
|
|
||||||
|
def test_get_top_offenders(self):
|
||||||
|
items = top_offenders(self.engine)
|
||||||
|
print(items)
|
||||||
|
self.assertEqual(items[0][0], "moogle.com")
|
||||||
|
|
||||||
45
slopserver/test/test_db.sql
Normal file
45
slopserver/test/test_db.sql
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
PRAGMA foreign_keys=OFF;
|
||||||
|
BEGIN TRANSACTION;
|
||||||
|
CREATE TABLE domain (
|
||||||
|
id INTEGER NOT NULL,
|
||||||
|
domain_name VARCHAR NOT NULL,
|
||||||
|
CONSTRAINT pk_domain PRIMARY KEY (id)
|
||||||
|
);
|
||||||
|
INSERT INTO domain VALUES(1,'google.com');
|
||||||
|
INSERT INTO domain VALUES(2,'moogle.com');
|
||||||
|
CREATE TABLE user (
|
||||||
|
id INTEGER NOT NULL,
|
||||||
|
email VARCHAR NOT NULL,
|
||||||
|
password_hash VARCHAR NOT NULL,
|
||||||
|
email_verified BOOLEAN NOT NULL,
|
||||||
|
CONSTRAINT pk_user PRIMARY KEY (id)
|
||||||
|
);
|
||||||
|
INSERT INTO user VALUES(1,'alphauser01','$argon2id$v=19$m=65536,t=3,p=4$3z7uxa3NHl/dKG07RGEvBA$0NOBftJpP+HiR7wfgdwBk2UR9F12YBjrqeqLSyDl47o','True');
|
||||||
|
CREATE TABLE path (
|
||||||
|
id INTEGER NOT NULL,
|
||||||
|
path VARCHAR NOT NULL,
|
||||||
|
domain_id INTEGER,
|
||||||
|
CONSTRAINT pk_path PRIMARY KEY (id),
|
||||||
|
CONSTRAINT fk_path_domain_id_domain FOREIGN KEY(domain_id) REFERENCES domain (id)
|
||||||
|
);
|
||||||
|
INSERT INTO path VALUES(1,'/',0);
|
||||||
|
INSERT INTO path VALUES(2,'/path1',0);
|
||||||
|
INSERT INTO path VALUES(3,'/path2',1);
|
||||||
|
INSERT INTO path VALUES(4,'/path3/a',1);
|
||||||
|
INSERT INTO path VALUES(5,'/path4',1);
|
||||||
|
INSERT INTO path VALUES(6,'/path2',2);
|
||||||
|
INSERT INTO path VALUES(7,'/',2);
|
||||||
|
INSERT INTO path VALUES(8,'/path3',2);
|
||||||
|
INSERT INTO path VALUES(9,'/path4',2);
|
||||||
|
CREATE TABLE report (
|
||||||
|
path_id INTEGER NOT NULL,
|
||||||
|
user_id INTEGER NOT NULL,
|
||||||
|
timestamp DATETIME,
|
||||||
|
CONSTRAINT pk_report PRIMARY KEY (path_id, user_id),
|
||||||
|
CONSTRAINT fk_report_path_id_path FOREIGN KEY(path_id) REFERENCES path (id),
|
||||||
|
CONSTRAINT fk_report_user_id_user FOREIGN KEY(user_id) REFERENCES user (id)
|
||||||
|
);
|
||||||
|
INSERT INTO report VALUES(2,1,'11-26-2025');
|
||||||
|
CREATE UNIQUE INDEX ix_domain_domain_name ON domain (domain_name);
|
||||||
|
CREATE UNIQUE INDEX ix_user_email ON user (email);
|
||||||
|
COMMIT;
|
||||||
Reference in New Issue
Block a user