Compare commits
2 Commits
v0.7
...
feat_signu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f24f3ceee4 | ||
|
|
49377f2e20 |
@@ -1 +0,0 @@
|
|||||||
TEMP_HMAC_KEY = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
|
|
||||||
@@ -8,7 +8,7 @@ from altcha import Payload as AltchaPayload, verify_solution
|
|||||||
|
|
||||||
from urllib.parse import urlparse, ParseResult
|
from urllib.parse import urlparse, ParseResult
|
||||||
|
|
||||||
from slopserver.common import TEMP_HMAC_KEY
|
from slopserver.settings import settings
|
||||||
|
|
||||||
NAMING_CONVENTION = {
|
NAMING_CONVENTION = {
|
||||||
"ix": "ix_%(column_0_label)s",
|
"ix": "ix_%(column_0_label)s",
|
||||||
@@ -73,7 +73,7 @@ def url_validator(urls: list[str]) -> list[ParseResult]:
|
|||||||
return parsed_urls
|
return parsed_urls
|
||||||
|
|
||||||
def altcha_validator(altcha_response: AltchaPayload):
|
def altcha_validator(altcha_response: AltchaPayload):
|
||||||
verified = verify_solution(altcha_response, TEMP_HMAC_KEY)
|
verified = verify_solution(altcha_response, settings.altcha_secret)
|
||||||
if not verified[0]:
|
if not verified[0]:
|
||||||
raise ValueError(f"altcha verification failed: {verified[1]}")
|
raise ValueError(f"altcha verification failed: {verified[1]}")
|
||||||
return None
|
return None
|
||||||
@@ -85,4 +85,4 @@ class SlopReport(BaseModel):
|
|||||||
class SignupForm(BaseModel):
|
class SignupForm(BaseModel):
|
||||||
email: EmailStr
|
email: EmailStr
|
||||||
password: SecretStr
|
password: SecretStr
|
||||||
altcha: Annotated[Base64Str, AfterValidator(altcha_validator)]
|
altcha: Annotated[str, AfterValidator(altcha_validator)]
|
||||||
@@ -17,7 +17,7 @@ from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
|
||||||
from pydantic import AfterValidator, Base64Str
|
from pydantic import AfterValidator, Base64Str
|
||||||
from pydantic_settings import BaseSettings
|
|
||||||
|
|
||||||
from sqlalchemy import create_engine
|
from sqlalchemy import create_engine
|
||||||
|
|
||||||
@@ -29,23 +29,15 @@ import jwt
|
|||||||
|
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
|
from slopserver.settings import settings
|
||||||
from slopserver.models import Domain, Path, User
|
from slopserver.models import Domain, Path, User
|
||||||
from slopserver.models import SlopReport, SignupForm, altcha_validator
|
from slopserver.models import SlopReport, SignupForm, altcha_validator
|
||||||
from slopserver.db import select_slop, insert_slop, get_user, create_user
|
from slopserver.db import select_slop, insert_slop, get_user, create_user
|
||||||
from slopserver.common import TEMP_HMAC_KEY
|
|
||||||
|
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
|
|
||||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||||
|
|
||||||
class ServerSettings(BaseSettings):
|
|
||||||
db_url: str = "sqlite+pysqlite:///test_db.sqlite"
|
|
||||||
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
|
|
||||||
# altcha_secret: str
|
|
||||||
|
|
||||||
settings = ServerSettings()
|
|
||||||
|
|
||||||
|
|
||||||
DB_ENGINE = create_engine(settings.db_url)
|
DB_ENGINE = create_engine(settings.db_url)
|
||||||
TOKEN_SECRET = settings.token_secret
|
TOKEN_SECRET = settings.token_secret
|
||||||
@@ -141,7 +133,7 @@ async def altcha_challenge():
|
|||||||
options = ChallengeOptions(
|
options = ChallengeOptions(
|
||||||
expires=datetime.now() + timedelta(minutes=10),
|
expires=datetime.now() + timedelta(minutes=10),
|
||||||
max_number=80000,
|
max_number=80000,
|
||||||
hmac_key=TEMP_HMAC_KEY
|
hmac_key=settings.altcha_secret
|
||||||
)
|
)
|
||||||
challenge = create_challenge(options)
|
challenge = create_challenge(options)
|
||||||
return challenge
|
return challenge
|
||||||
@@ -153,27 +145,6 @@ async def simple_login(username: Annotated[str, Form()], password: Annotated[str
|
|||||||
raise HTTPException(status_code=401, detail="Incorrect username or password")
|
raise HTTPException(status_code=401, detail="Incorrect username or password")
|
||||||
token = generate_auth_token(username)
|
token = generate_auth_token(username)
|
||||||
return {"access_token": token, "token_type": "bearer"}
|
return {"access_token": token, "token_type": "bearer"}
|
||||||
|
|
||||||
# @app.post("/altcha-challenge")
|
|
||||||
# async def altcha_verify(payload: Annotated[Base64Str, AfterValidator(altcha_validator)]):
|
|
||||||
# # if verified, return a JWT for anonymous API access
|
|
||||||
# expiration = datetime.now() + timedelta(days=30)
|
|
||||||
# uuid = uuid4()
|
|
||||||
# bearer_token = {
|
|
||||||
# "iss": "slopserver",
|
|
||||||
# "exp": int(expiration.timestamp()),
|
|
||||||
# "aud": "slopserver",
|
|
||||||
# "sub": str(uuid),
|
|
||||||
# "client_id": str(uuid),
|
|
||||||
# "iat": int(datetime.now().timestamp()),
|
|
||||||
# "jti": str(uuid)
|
|
||||||
# }
|
|
||||||
|
|
||||||
# encoded_jwt = jwt.encode(bearer_token, TOKEN_SECRET, ALGO)
|
|
||||||
|
|
||||||
# return encoded_jwt
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
9
slopserver/settings.py
Normal file
9
slopserver/settings.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
from pydantic_settings import BaseSettings
|
||||||
|
|
||||||
|
|
||||||
|
class ServerSettings(BaseSettings):
|
||||||
|
db_url: str = "sqlite+pysqlite:///test_db.sqlite"
|
||||||
|
token_secret: str = "5bcc778a96b090c3ac1d587bb694a060eaf7bdb5832365f91d5078faf1fff210"
|
||||||
|
altcha_secret: str = "0460de065912d0292df1e7422a5ed2dc362ed56d6bab64fe50b89957463061f3"
|
||||||
|
|
||||||
|
settings = ServerSettings()
|
||||||
Reference in New Issue
Block a user