Compare commits
9 Commits
v0.5
...
feat_signu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f24f3ceee4 | ||
|
|
49377f2e20 | ||
|
|
e8189ed832 | ||
|
|
c0600f527f | ||
|
|
ed6065a6ea | ||
|
|
ea843597aa | ||
|
|
b35c92b96f | ||
|
|
e420a31127 | ||
|
|
9c07870cb7 |
@@ -14,7 +14,8 @@
|
||||
|
||||
// Features to add to the dev container. More info: https://containers.dev/features.
|
||||
"features": {
|
||||
"ghcr.io/devcontainers/features/docker-in-docker:2": {}
|
||||
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
|
||||
"ghcr.io/warrenbuckley/codespace-features/sqlite:1": {}
|
||||
},
|
||||
|
||||
// Use 'forwardPorts' to make a list of ports inside the container available locally.
|
||||
@@ -22,7 +23,7 @@
|
||||
|
||||
// Use 'postCreateCommand' to run commands after the container is created.
|
||||
// "postCreateCommand": "pip3 install --user -r requirements.txt",
|
||||
"postCreateCommand": ". ./env/bin/activate; python -m pip install -r requirements.txt"
|
||||
"postCreateCommand": "python -m venv env && . ./env/bin/activate && python -m pip install -r requirements.txt"
|
||||
|
||||
// Configure tool-specific properties.
|
||||
// "customizations": {},
|
||||
|
||||
7
.dockerignore
Normal file
7
.dockerignore
Normal file
@@ -0,0 +1,7 @@
|
||||
.devcontainer/
|
||||
.vscode/
|
||||
|
||||
test_db.sqlite
|
||||
env/
|
||||
**/__pycache__/
|
||||
slopserver/alembic/
|
||||
@@ -1 +0,0 @@
|
||||
TEMP_HMAC_KEY = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
|
||||
@@ -34,27 +34,40 @@ def insert_slop(urls: list[ParseResult], engine: Engine, user: User | None = Non
|
||||
if not domain in existing_dict:
|
||||
# create a new domain object and paths
|
||||
new_domain = Domain(domain_name=domain, paths=list())
|
||||
new_domain.paths = [Path(path=path) for path in paths]
|
||||
new_paths = list()
|
||||
for path in paths:
|
||||
new_path = Path(path=path)
|
||||
if user:
|
||||
reports = list()
|
||||
reports.append(Report(path=new_path, user=user, timestamp=datetime.now()))
|
||||
new_path.reports = reports
|
||||
new_paths.append(new_path)
|
||||
new_domain.paths = new_paths
|
||||
session.add(new_domain)
|
||||
if user:
|
||||
for path in new_domain.paths:
|
||||
new_report = Report(path_id=path.id, user_id=user.id)
|
||||
session.add(new_report)
|
||||
|
||||
else:
|
||||
existing_domain = existing_dict[domain]
|
||||
existing_paths = set((path.path for path in existing_domain.paths))
|
||||
existing_paths = dict({path.path: path for path in existing_domain.paths})
|
||||
for path in paths:
|
||||
if not path in existing_paths:
|
||||
new_path = Path(path=path)
|
||||
if user:
|
||||
report_list = list()
|
||||
report_list.append(Report(path=new_path, user=user, timestamp=datetime.now()))
|
||||
new_path.reports = report_list
|
||||
existing_domain.paths.append(new_path)
|
||||
session.add(new_path)
|
||||
session.flush([new_path])
|
||||
session.refresh(new_path)
|
||||
|
||||
else:
|
||||
# domain and path exist, append to the path's reports
|
||||
if user:
|
||||
new_report = Report(
|
||||
path_id=new_path.id, user_id=user.id, timestamp=datetime.now())
|
||||
session.add(new_report)
|
||||
existing_path = existing_paths.get(path)
|
||||
report_dict = {(report.user.id, report.path.id): report for report in existing_path.reports}
|
||||
existing_report = report_dict.get((user.id, existing_path.id))
|
||||
if existing_report:
|
||||
existing_report.timestamp = datetime.now()
|
||||
else:
|
||||
existing_path.reports.append(Report(path=existing_path, user=user, timestamp=datetime.now()))
|
||||
|
||||
session.commit()
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ from altcha import Payload as AltchaPayload, verify_solution
|
||||
|
||||
from urllib.parse import urlparse, ParseResult
|
||||
|
||||
from slopserver.common import TEMP_HMAC_KEY
|
||||
from slopserver.settings import settings
|
||||
|
||||
NAMING_CONVENTION = {
|
||||
"ix": "ix_%(column_0_label)s",
|
||||
@@ -37,6 +37,7 @@ class Path(SQLModel, table=True):
|
||||
|
||||
domain_id: int | None = Field(foreign_key="domain.id")
|
||||
domain: Domain = Relationship(back_populates="paths")
|
||||
reports: list["Report"] = Relationship(back_populates="path")
|
||||
|
||||
class User(SQLModel, table=True):
|
||||
id: int | None = Field(default=None, primary_key=True)
|
||||
@@ -45,11 +46,16 @@ class User(SQLModel, table=True):
|
||||
|
||||
email_verified: bool = Field(default=False)
|
||||
|
||||
reports: list["Report"] = Relationship(back_populates="user")
|
||||
|
||||
class Report(SQLModel, table=True):
|
||||
path_id: int | None = Field(default=None, primary_key=True, foreign_key="path.id")
|
||||
user_id: int | None = Field(default=None, primary_key=True, foreign_key="user.id")
|
||||
timestamp: datetime | None = Field(default=datetime.now())
|
||||
|
||||
path: Path = Relationship(back_populates="reports")
|
||||
user: User = Relationship(back_populates="reports")
|
||||
|
||||
################################################
|
||||
# API Models
|
||||
################################################
|
||||
@@ -67,8 +73,7 @@ def url_validator(urls: list[str]) -> list[ParseResult]:
|
||||
return parsed_urls
|
||||
|
||||
def altcha_validator(altcha_response: AltchaPayload):
|
||||
# verified = verify_solution(altcha_response, TEMP_HMAC_KEY)
|
||||
verified = (True, None)
|
||||
verified = verify_solution(altcha_response, settings.altcha_secret)
|
||||
if not verified[0]:
|
||||
raise ValueError(f"altcha verification failed: {verified[1]}")
|
||||
return None
|
||||
@@ -80,4 +85,4 @@ class SlopReport(BaseModel):
|
||||
class SignupForm(BaseModel):
|
||||
email: EmailStr
|
||||
password: SecretStr
|
||||
altcha: Annotated[Base64Str, AfterValidator(altcha_validator)]
|
||||
altcha: Annotated[str, AfterValidator(altcha_validator)]
|
||||
@@ -17,7 +17,7 @@ from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from pydantic import AfterValidator, Base64Str
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
@@ -29,23 +29,15 @@ import jwt
|
||||
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
from slopserver.settings import settings
|
||||
from slopserver.models import Domain, Path, User
|
||||
from slopserver.models import SlopReport, SignupForm, altcha_validator
|
||||
from slopserver.db import select_slop, insert_slop, get_user, create_user
|
||||
from slopserver.common import TEMP_HMAC_KEY
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||
|
||||
class ServerSettings(BaseSettings):
|
||||
db_url: str = "postgresql+psycopg2://slop-farmer@192.168.1.163/slop-farmer"
|
||||
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
|
||||
# altcha_secret: str
|
||||
|
||||
settings = ServerSettings()
|
||||
|
||||
|
||||
DB_ENGINE = create_engine(settings.db_url)
|
||||
TOKEN_SECRET = settings.token_secret
|
||||
@@ -134,14 +126,14 @@ async def signup_form(form_data: Annotated[SignupForm, Form()]):
|
||||
raise HTTPException(status_code=409, detail="User already exists")
|
||||
|
||||
# create user
|
||||
create_user(form_data.email, get_password_hash(form_data.password), DB_ENGINE)
|
||||
create_user(form_data.email, get_password_hash(form_data.password.get_secret_value()), DB_ENGINE)
|
||||
|
||||
@app.get("/altcha-challenge")
|
||||
async def altcha_challenge():
|
||||
options = ChallengeOptions(
|
||||
expires=datetime.now() + timedelta(minutes=10),
|
||||
max_number=100000,
|
||||
hmac_key=TEMP_HMAC_KEY
|
||||
max_number=80000,
|
||||
hmac_key=settings.altcha_secret
|
||||
)
|
||||
challenge = create_challenge(options)
|
||||
return challenge
|
||||
@@ -153,27 +145,6 @@ async def simple_login(username: Annotated[str, Form()], password: Annotated[str
|
||||
raise HTTPException(status_code=401, detail="Incorrect username or password")
|
||||
token = generate_auth_token(username)
|
||||
return {"access_token": token, "token_type": "bearer"}
|
||||
|
||||
@app.post("/altcha-challenge")
|
||||
async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_validator)]):
|
||||
# if verified, return a JWT for anonymous API access
|
||||
expiration = datetime.now() + timedelta(days=30)
|
||||
uuid = uuid4()
|
||||
bearer_token = {
|
||||
"iss": "slopserver",
|
||||
"exp": int(expiration.timestamp()),
|
||||
"aud": "slopserver",
|
||||
"sub": str(uuid),
|
||||
"client_id": str(uuid),
|
||||
"iat": int(datetime.now().timestamp()),
|
||||
"jti": str(uuid)
|
||||
}
|
||||
|
||||
encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
|
||||
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
9
slopserver/settings.py
Normal file
9
slopserver/settings.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class ServerSettings(BaseSettings):
|
||||
db_url: str = "sqlite+pysqlite:///test_db.sqlite"
|
||||
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
|
||||
altcha_secret: str = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
|
||||
|
||||
settings = ServerSettings()
|
||||
Reference in New Issue
Block a user