74 lines
2.8 KiB
Python
74 lines
2.8 KiB
Python
from collections.abc import Iterable
|
|
from datetime import datetime
|
|
from urllib.parse import ParseResult
|
|
from sqlalchemy import select
|
|
from sqlalchemy.engine import Engine
|
|
from sqlalchemy.orm import Session
|
|
from slopserver.models import Domain, Path, User, Report
|
|
|
|
def select_slop(urls: list[ParseResult], engine: Engine) -> Iterable[Domain]:
|
|
query = select(Domain).where(Domain.domain_name.in_(url[1] for url in urls))
|
|
with Session(engine) as session:
|
|
rows = session.scalars(query).all()
|
|
return rows
|
|
|
|
def insert_slop(urls: list[ParseResult], engine: Engine, user: User | None = None):
|
|
domain_dict: dict[str. set[str]] = dict()
|
|
for url in urls:
|
|
if not domain_dict.get(url[1]):
|
|
domain_dict[url[1]] = set()
|
|
|
|
if url.path:
|
|
domain_dict[url[1]].add(url.path)
|
|
|
|
# get existing domains
|
|
query = select(Domain).where(Domain.domain_name.in_(domain_dict.keys()))
|
|
|
|
existing_dict: dict[str, Domain] = dict()
|
|
with Session(engine) as session:
|
|
existing_domains = session.scalars(query).all()
|
|
for domain in existing_domains:
|
|
existing_dict[domain.domain_name] = domain
|
|
|
|
for domain, paths in domain_dict.items():
|
|
if not domain in existing_dict:
|
|
# create a new domain object and paths
|
|
new_domain = Domain(domain_name=domain, paths=list())
|
|
new_domain.paths = [Path(path=path) for path in paths]
|
|
session.add(new_domain)
|
|
if user:
|
|
for path in new_domain.paths:
|
|
new_report = Report(path_id=path.id, user_id=user.id)
|
|
session.add(new_report)
|
|
|
|
else:
|
|
existing_domain = existing_dict[domain]
|
|
existing_paths = set((path.path for path in existing_domain.paths))
|
|
for path in paths:
|
|
if not path in existing_paths:
|
|
new_path = Path(path=path)
|
|
existing_domain.paths.append(new_path)
|
|
session.add(new_path)
|
|
session.flush([new_path])
|
|
session.refresh(new_path)
|
|
if user:
|
|
new_report = Report(
|
|
path_id=new_path.id, user_id=user.id, timestamp=datetime.now())
|
|
session.add(new_report)
|
|
|
|
session.commit()
|
|
|
|
def get_user(email, engine):
|
|
query = select(User).where(User.email == email)
|
|
|
|
with Session(engine) as session:
|
|
user = session.scalar(query)
|
|
return user
|
|
|
|
def create_user(email, password_hash, engine):
|
|
user = User(email=email, password_hash=password_hash, email_verified=False)
|
|
|
|
with Session(engine) as session:
|
|
session.add(user)
|
|
session.commit()
|