7 Commits
v0.5 ... v0.7

Author SHA1 Message Date
Jack Case
e8189ed832 make the altcha challenge a smidge easier 2025-11-09 18:09:29 +00:00
Jack Case
c0600f527f handle password field of signup form properly, and verify altcha solution 2025-11-09 18:05:44 +00:00
Jack Case
ed6065a6ea add sqlite feature to devcontainer for test DB 2025-11-09 18:05:04 +00:00
Jack Case
ea843597aa updates 2025-10-30 01:55:18 +00:00
Jack Case
b35c92b96f ran tests against a local sqlite DB
reports appear to work properly now, will run prod DB migrations and upgrade the server container when I'm home.
2025-10-29 15:30:02 +00:00
Jack Case
e420a31127 new pass at properly handling related report models 2025-10-29 14:33:02 +00:00
Jack Case
9c07870cb7 properly specify the Report model with relationships to User and Path 2025-10-29 13:50:52 +00:00
5 changed files with 60 additions and 34 deletions

View File

@@ -14,7 +14,8 @@
// Features to add to the dev container. More info: https://containers.dev/features.
"features": {
"ghcr.io/devcontainers/features/docker-in-docker:2": {}
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
"ghcr.io/warrenbuckley/codespace-features/sqlite:1": {}
},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
@@ -22,7 +23,7 @@
// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "pip3 install --user -r requirements.txt",
"postCreateCommand": ". ./env/bin/activate; python -m pip install -r requirements.txt"
"postCreateCommand": "python -m venv env && . ./env/bin/activate && python -m pip install -r requirements.txt"
// Configure tool-specific properties.
// "customizations": {},

7
.dockerignore Normal file
View File

@@ -0,0 +1,7 @@
.devcontainer/
.vscode/
test_db.sqlite
env/
**/__pycache__/
slopserver/alembic/

View File

@@ -34,27 +34,40 @@ def insert_slop(urls: list[ParseResult], engine: Engine, user: User | None = Non
if not domain in existing_dict:
# create a new domain object and paths
new_domain = Domain(domain_name=domain, paths=list())
new_domain.paths = [Path(path=path) for path in paths]
new_paths = list()
for path in paths:
new_path = Path(path=path)
if user:
reports = list()
reports.append(Report(path=new_path, user=user, timestamp=datetime.now()))
new_path.reports = reports
new_paths.append(new_path)
new_domain.paths = new_paths
session.add(new_domain)
if user:
for path in new_domain.paths:
new_report = Report(path_id=path.id, user_id=user.id)
session.add(new_report)
else:
existing_domain = existing_dict[domain]
existing_paths = set((path.path for path in existing_domain.paths))
existing_paths = dict({path.path: path for path in existing_domain.paths})
for path in paths:
if not path in existing_paths:
new_path = Path(path=path)
if user:
report_list = list()
report_list.append(Report(path=new_path, user=user, timestamp=datetime.now()))
new_path.reports = report_list
existing_domain.paths.append(new_path)
session.add(new_path)
session.flush([new_path])
session.refresh(new_path)
else:
# domain and path exist, append to the path's reports
if user:
new_report = Report(
path_id=new_path.id, user_id=user.id, timestamp=datetime.now())
session.add(new_report)
existing_path = existing_paths.get(path)
report_dict = {(report.user.id, report.path.id): report for report in existing_path.reports}
existing_report = report_dict.get((user.id, existing_path.id))
if existing_report:
existing_report.timestamp = datetime.now()
else:
existing_path.reports.append(Report(path=existing_path, user=user, timestamp=datetime.now()))
session.commit()

View File

@@ -37,6 +37,7 @@ class Path(SQLModel, table=True):
domain_id: int | None = Field(foreign_key="domain.id")
domain: Domain = Relationship(back_populates="paths")
reports: list["Report"] = Relationship(back_populates="path")
class User(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
@@ -45,11 +46,16 @@ class User(SQLModel, table=True):
email_verified: bool = Field(default=False)
reports: list["Report"] = Relationship(back_populates="user")
class Report(SQLModel, table=True):
path_id: int | None = Field(default=None, primary_key=True, foreign_key="path.id")
user_id: int | None = Field(default=None, primary_key=True, foreign_key="user.id")
timestamp: datetime | None = Field(default=datetime.now())
path: Path = Relationship(back_populates="reports")
user: User = Relationship(back_populates="reports")
################################################
# API Models
################################################
@@ -67,8 +73,7 @@ def url_validator(urls: list[str]) -> list[ParseResult]:
return parsed_urls
def altcha_validator(altcha_response: AltchaPayload):
# verified = verify_solution(altcha_response, TEMP_HMAC_KEY)
verified = (True, None)
verified = verify_solution(altcha_response, TEMP_HMAC_KEY)
if not verified[0]:
raise ValueError(f"altcha verification failed: {verified[1]}")
return None

View File

@@ -40,7 +40,7 @@ app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class ServerSettings(BaseSettings):
db_url: str = "postgresql+psycopg2://slop-farmer@192.168.1.163/slop-farmer"
db_url: str = "sqlite+pysqlite:///test_db.sqlite"
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
# altcha_secret: str
@@ -134,13 +134,13 @@ async def signup_form(form_data: Annotated[SignupForm, Form()]):
raise HTTPException(status_code=409, detail="User already exists")
# create user
create_user(form_data.email, get_password_hash(form_data.password), DB_ENGINE)
create_user(form_data.email, get_password_hash(form_data.password.get_secret_value()), DB_ENGINE)
@app.get("/altcha-challenge")
async def altcha_challenge():
options = ChallengeOptions(
expires=datetime.now() + timedelta(minutes=10),
max_number=100000,
max_number=80000,
hmac_key=TEMP_HMAC_KEY
)
challenge = create_challenge(options)
@@ -154,24 +154,24 @@ async def simple_login(username: Annotated[str, Form()], password: Annotated[str
token = generate_auth_token(username)
return {"access_token": token, "token_type": "bearer"}
@app.post("/altcha-challenge")
async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_validator)]):
# if verified, return a JWT for anonymous API access
expiration = datetime.now() + timedelta(days=30)
uuid = uuid4()
bearer_token = {
"iss": "slopserver",
"exp": int(expiration.timestamp()),
"aud": "slopserver",
"sub": str(uuid),
"client_id": str(uuid),
"iat": int(datetime.now().timestamp()),
"jti": str(uuid)
}
# @app.post("/altcha-challenge")
# async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_validator)]):
# # if verified, return a JWT for anonymous API access
# expiration = datetime.now() + timedelta(days=30)
# uuid = uuid4()
# bearer_token = {
# "iss": "slopserver",
# "exp": int(expiration.timestamp()),
# "aud": "slopserver",
# "sub": str(uuid),
# "client_id": str(uuid),
# "iat": int(datetime.now().timestamp()),
# "jti": str(uuid)
# }
encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
# encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
return encoded_jwt
# return encoded_jwt