-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #16 from wafflestudio/fix/Movie_and_Collection
Fix/movie and collection
- Loading branch information
Showing
49 changed files
with
1,647 additions
and
65 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
from pydantic import BaseModel | ||
from watchapedia.common.errors import InvalidFieldFormatError | ||
from typing import Annotated | ||
from pydantic.functional_validators import AfterValidator | ||
|
||
def validate_title(value: str | None) -> str | None: | ||
# title는 100자 이내여야 함 | ||
if value is None: | ||
return value | ||
if len(value) > 100: | ||
raise InvalidFieldFormatError("title") | ||
return value | ||
|
||
def validate_overview(value: str | None) -> str | None: | ||
# overview는 500자 이내여야 함 | ||
if value is None: | ||
return value | ||
if len(value) > 500: | ||
raise InvalidFieldFormatError("overview") | ||
return value | ||
|
||
|
||
class CollectionCreateRequest(BaseModel): | ||
title: Annotated[str, AfterValidator(validate_title)] | ||
overview: Annotated[str | None, AfterValidator(validate_overview)] = None | ||
movie_ids: list[int] | None = None | ||
|
||
class CollectionUpdateRequest(BaseModel): | ||
title: Annotated[str | None, AfterValidator(validate_title)] = None | ||
overview: Annotated[str | None, AfterValidator(validate_overview)] = None | ||
add_movie_ids: list[int] | None = None | ||
delete_movie_ids: list[int] | None = None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from pydantic import BaseModel | ||
from datetime import datetime | ||
|
||
class MovieCompactResponse(BaseModel): | ||
id: int | ||
title: str | ||
poster_url: str | None | ||
average_rating: float | None | ||
|
||
class CollectionResponse(BaseModel): | ||
id: int | ||
user_id: int | ||
title: str | ||
overview: str | None | ||
likes_count: int | ||
comments_count: int | ||
created_at: datetime | ||
movies: list[MovieCompactResponse] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from fastapi import HTTPException | ||
|
||
class CollectionNotFoundError(HTTPException): | ||
def __init__(self): | ||
super().__init__(status_code=404, detail="Collection not found") | ||
|
||
class InvalidFormatError(HTTPException): | ||
def __init__(self): | ||
super().__init__(status_code=400, detail="Collection Invalid Format") | ||
|
||
class NoSuchMovieError(HTTPException): | ||
def __init__(self): | ||
super().__init__(status_code=400, detail="No Such Movie in the Collection") | ||
|
||
class MovieAlreadyExistsError(HTTPException): | ||
def __init__(self): | ||
super().__init__(status_code=409, detail="Movie already exists in the Collection") | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from sqlalchemy import Integer, String, Float, ForeignKey, Boolean | ||
from sqlalchemy.orm import relationship, Mapped, mapped_column | ||
from datetime import datetime | ||
from watchapedia.database.common import Base | ||
from sqlalchemy import DateTime | ||
from typing import TYPE_CHECKING | ||
if TYPE_CHECKING: | ||
from watchapedia.app.movie.models import Movie | ||
from watchapedia.app.user.models import User | ||
from watchapedia.app.collection_comment.models import CollectionComment | ||
|
||
class MovieCollection(Base): | ||
__tablename__ = 'movie_collection' | ||
|
||
movie_id: Mapped[int] = mapped_column( | ||
Integer, ForeignKey("movie.id", ondelete="CASCADE"), nullable=False, primary_key=True | ||
) | ||
collection_id: Mapped[int] = mapped_column( | ||
Integer, ForeignKey("collection.id", ondelete="CASCADE"), nullable=False, primary_key=True | ||
) | ||
|
||
class Collection(Base): | ||
__tablename__ = 'collection' | ||
|
||
id: Mapped[int] = mapped_column(Integer, primary_key=True) | ||
title: Mapped[str] = mapped_column(String(100), nullable=False) | ||
overview: Mapped[str | None] = mapped_column(String(500)) | ||
likes_count: Mapped[int] = mapped_column(Integer, nullable=False) | ||
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) | ||
|
||
user_id: Mapped[int] = mapped_column( | ||
Integer, ForeignKey("user.id", ondelete="CASCADE"), nullable=False | ||
) | ||
user: Mapped["User"] = relationship("User", back_populates="collections") | ||
|
||
movies: Mapped[list["Movie"]] = relationship( | ||
secondary="movie_collection", back_populates="collections" | ||
) | ||
comments: Mapped[list["CollectionComment"]] = relationship( | ||
"CollectionComment", back_populates="collection", cascade="all, delete, delete-orphan" | ||
) | ||
|
||
class UserLikesCollection(Base): | ||
__tablename__ = 'user_likes_collection' | ||
|
||
id: Mapped[int] = mapped_column(Integer, primary_key=True) | ||
user_id: Mapped[int] = mapped_column( | ||
Integer, ForeignKey("user.id", ondelete="CASCADE"), nullable=False | ||
) | ||
collection_id: Mapped[int] = mapped_column( | ||
Integer, ForeignKey("collection.id", ondelete="CASCADE"), nullable=False | ||
) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
from sqlalchemy import select | ||
from sqlalchemy.orm import Session | ||
from fastapi import Depends | ||
from watchapedia.database.connection import get_db_session | ||
from typing import Annotated, Sequence | ||
from watchapedia.app.movie.models import Movie | ||
from watchapedia.app.collection.models import Collection, UserLikesCollection | ||
from datetime import datetime | ||
|
||
class CollectionRepository(): | ||
def __init__(self, session: Annotated[Session, Depends(get_db_session)]) -> None: | ||
self.session = session | ||
|
||
def create_collection( | ||
self, user_id: int, title: str, overview: str | None, created_at: datetime | ||
) -> Collection: | ||
collection = Collection( | ||
user_id=user_id, | ||
title=title, | ||
overview=overview, | ||
likes_count=0, | ||
created_at=created_at | ||
) | ||
self.session.add(collection) | ||
self.session.flush() | ||
|
||
return collection | ||
|
||
def add_collection_movie( | ||
self, collection: Collection, movies: list[Movie] | ||
) -> None: | ||
for movie in movies: | ||
collection.movies.append(movie) | ||
|
||
self.session.flush() | ||
|
||
def update_collection( | ||
self, collection: Collection, title: str | None, overview: str | None, add_movie_ids: list[int] | None, delete_movie_ids: list[int] | None | ||
) -> None: | ||
if title: | ||
collection.title = title | ||
|
||
if overview: | ||
collection.overview = overview | ||
|
||
if delete_movie_ids: | ||
for delete_movie_id in delete_movie_ids: | ||
delete_movie = self.get_movie_by_movie_id(delete_movie_id) | ||
collection.movies.remove(delete_movie) | ||
|
||
if add_movie_ids: | ||
for add_movie_id in add_movie_ids: | ||
add_movie = self.get_movie_by_movie_id(add_movie_id) | ||
collection.movies.append(add_movie) | ||
|
||
self.session.flush() | ||
|
||
def like_collection(self, user_id: int, collection: Collection) -> Collection: | ||
get_like_query = select(UserLikesCollection).filter( | ||
(UserLikesCollection.user_id == user_id) | ||
& (UserLikesCollection.collection_id == collection.id) | ||
) | ||
user_likes_collection = self.session.scalar(get_like_query) | ||
|
||
if user_likes_collection is None: | ||
user_likes_collection = UserLikesCollection( | ||
user_id=user_id, | ||
collection_id=collection.id | ||
) | ||
self.session.add(user_likes_collection) | ||
collection.likes_count += 1 | ||
else: | ||
self.session.delete(user_likes_collection) | ||
collection.likes_count -= 1 | ||
self.session.flush() | ||
return collection | ||
|
||
def get_collections_by_user_id(self, user_id: int) -> Sequence[Collection]: | ||
collections_list_query = select(Collection).where(Collection.user_id == user_id) | ||
return self.session.scalars(collections_list_query).all() | ||
|
||
def get_collection_by_collection_id(self, collection_id: int) -> Collection | None: | ||
get_collection_query = select(Collection).filter(Collection.id == collection_id) | ||
return self.session.scalar(get_collection_query) | ||
|
||
# title으로 복수의 collection get. 부분집합 허용. | ||
def search_collection_list(self, title: str) -> list[Collection] | None: | ||
get_collection_query = select(Collection).filter(Collection.title.ilike(f"%{title}%")) | ||
return self.session.execute(get_collection_query).scalars().all() | ||
|
||
def get_movie_by_movie_id(self, movie_id: int) -> Movie | None: | ||
get_movie_query = select(Movie).filter(Movie.id==movie_id) | ||
return self.session.scalar(get_movie_query) | ||
|
||
def delete_collection_by_id(self, collection: Collection) -> None: | ||
self.session.delete(collection) | ||
self.session.flush() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
from typing import Annotated | ||
from fastapi import Depends | ||
from datetime import datetime | ||
from watchapedia.app.collection.repository import CollectionRepository | ||
from watchapedia.app.collection.models import Collection | ||
from watchapedia.app.user.models import User | ||
from watchapedia.app.collection.dto.responses import CollectionResponse, MovieCompactResponse | ||
from watchapedia.app.movie.repository import MovieRepository | ||
from watchapedia.app.movie.errors import MovieNotFoundError | ||
from watchapedia.app.collection.errors import * | ||
from watchapedia.common.errors import PermissionDeniedError | ||
|
||
class CollectionService: | ||
def __init__( | ||
self, | ||
movie_repository: Annotated[MovieRepository, Depends()], | ||
collection_repository: Annotated[CollectionRepository, Depends()] | ||
) -> None: | ||
self.movie_repository = movie_repository | ||
self.collection_repository = collection_repository | ||
|
||
def create_collection( | ||
self, user_id: int, movie_ids: list[int] | None, title: str, overview: str | None | ||
) -> CollectionResponse: | ||
collection = self.collection_repository.create_collection( | ||
user_id=user_id, title=title, overview=overview, created_at=datetime.now() | ||
) | ||
if movie_ids: | ||
movies = [] | ||
for movie_id in movie_ids: | ||
movie = self.movie_repository.get_movie_by_movie_id(movie_id=movie_id) | ||
if movie is None: | ||
raise MovieNotFoundError() | ||
movies.append(movie) | ||
self.collection_repository.add_collection_movie(collection=collection, movies=movies) | ||
|
||
return self._process_collection_response(collection) | ||
|
||
def update_collection( | ||
self, collection_id: int, user_id: int, title: int | None, overview: int | None, add_movie_ids: list[int] | None, delete_movie_ids: list[int] | None | ||
) -> None: | ||
if not any([title, overview, add_movie_ids, delete_movie_ids]): | ||
raise InvalidFormatError() | ||
collection = self.collection_repository.get_collection_by_collection_id(collection_id) | ||
if not collection.user_id == user_id: | ||
raise PermissionDeniedError() | ||
|
||
movie_id_list = self.get_movie_ids_from_collection(collection_id) | ||
if delete_movie_ids: | ||
for delete_movie_id in delete_movie_ids: | ||
if delete_movie_id not in movie_id_list: | ||
raise NoSuchMovieError() | ||
if add_movie_ids: | ||
for add_movie_id in add_movie_ids: | ||
if self.movie_repository.get_movie_by_movie_id(add_movie_id) is None: | ||
raise MovieNotFoundError() | ||
if add_movie_id in movie_id_list: | ||
raise MovieAlreadyExistsError() | ||
|
||
self.collection_repository.update_collection( | ||
collection=collection, title=title, overview=overview, add_movie_ids=add_movie_ids, delete_movie_ids=delete_movie_ids | ||
) | ||
|
||
def get_collection_by_collection_id(self, collection_id: int) -> CollectionResponse: | ||
collection = self.collection_repository.get_collection_by_collection_id(collection_id=collection_id) | ||
if collection is None: | ||
raise CollectionNotFoundError() | ||
return self._process_collection_response(collection) | ||
|
||
def get_movie_ids_from_collection(self, collection_id: int) -> list[int] | None: | ||
collection = self.collection_repository.get_collection_by_collection_id(collection_id=collection_id) | ||
if collection is None: | ||
raise CollectionNotFoundError() | ||
return [ movie.id for movie in collection.movies ] | ||
|
||
def like_collection(self, user_id: int, collection_id: int) -> CollectionResponse: | ||
collection = self.collection_repository.get_collection_by_collection_id(collection_id) | ||
if collection is None: | ||
raise CollectionNotFoundError() | ||
updated_collection = self.collection_repository.like_collection(user_id, collection) | ||
return self._process_collection_response(updated_collection) | ||
|
||
def get_user_collections(self, user: User) -> list[CollectionResponse]: | ||
collections = self.collection_repository.get_collections_by_user_id(user.id) | ||
return [ self._process_collection_response(collection) for collection in collections ] | ||
|
||
def search_collection_list(self, title: str) -> list[CollectionResponse] | None: | ||
collections = self.collection_repository.search_collection_list(title) | ||
return [ self._process_collection_response(collection) for collection in collections ] | ||
|
||
def delete_collection_by_id(self, collection_id: int, user: User) -> None: | ||
collection = self.collection_repository.get_collection_by_collection_id(collection_id) | ||
if collection is None: | ||
raise CollectionNotFoundError() | ||
if collection.user_id != user.id: | ||
raise PermissionDeniedError() | ||
self.collection_repository.delete_collection_by_id(collection) | ||
|
||
def _process_collection_response(self, collection: Collection) -> CollectionResponse: | ||
return CollectionResponse( | ||
id=collection.id, | ||
user_id=collection.user_id, | ||
title=collection.title, | ||
overview=collection.overview, | ||
likes_count=collection.likes_count, | ||
comments_count=len(collection.comments), | ||
created_at=collection.created_at, | ||
movies=[ | ||
MovieCompactResponse( | ||
id=movie.id, | ||
title=movie.title, | ||
poster_url=movie.poster_url, | ||
average_rating=movie.average_rating | ||
) | ||
for movie in collection.movies | ||
] | ||
) |
Oops, something went wrong.