diff --git a/README.md b/README.md
index 11b72b9..84018f9 100644
--- a/README.md
+++ b/README.md
@@ -75,10 +75,9 @@ PantherDB is a Simple, FileBase and Document Oriented datab
```python
users: list[PantherDocument] = db.collection('User').find(last_name='Rn')
```
-
-- #### All documents:
+ or all documents
```python
- users: list[PantherDocument] = db.collection('User').all()
+ users: list[PantherDocument] = db.collection('User').find()
```
- #### Count documents:
diff --git a/pantherdb/__init__.py b/pantherdb/__init__.py
index 115fe74..0d2a4d1 100644
--- a/pantherdb/__init__.py
+++ b/pantherdb/__init__.py
@@ -1,6 +1,3 @@
-from pantherdb.pantherdb import * # noqa: F403
+from pantherdb.pantherdb import PantherDB, PantherCollection, PantherDocument, PantherDBException, Cursor
-__version__ = '1.4.0'
-
-
-__all__ = ('__version__', 'PantherDB', 'PantherCollection', 'PantherDocument', 'PantherDBException')
+__version__ = '2.0.0'
diff --git a/pantherdb/pantherdb.py b/pantherdb/pantherdb.py
index 09d10e4..44732cc 100644
--- a/pantherdb/pantherdb.py
+++ b/pantherdb/pantherdb.py
@@ -1,8 +1,7 @@
from __future__ import annotations
-import typing
from pathlib import Path
-from typing import ClassVar, Iterator
+from typing import ClassVar, Iterator, Any, List, Tuple, Union
import orjson as json
@@ -17,8 +16,9 @@ class PantherDB:
_instances: ClassVar[dict] = {}
db_name: str = 'database.pdb'
__secret_key: bytes | None
- __fernet: typing.Any # type[cryptography.fernet.Fernet | None]
+ __fernet: Any # type[cryptography.fernet.Fernet | None]
__return_dict: bool
+ __return_cursor: bool
__content: dict
__ulid: ULID
@@ -36,6 +36,8 @@ def __new__(cls, *args, **kwargs):
# Replace with .removesuffix('.pdb') after python3.8 compatible support
if db_name.endswith('.pdb'):
db_name = db_name[:-4]
+ elif db_name.endswith('.json'):
+ db_name = db_name[:-5]
if db_name not in cls._instances:
cls._instances[db_name] = super().__new__(cls)
@@ -46,9 +48,11 @@ def __init__(
db_name: str | None = None,
*,
return_dict: bool = False,
+ return_cursor: bool = False,
secret_key: bytes | None = None,
):
self.__return_dict = return_dict
+ self.__return_cursor = return_cursor
self.__secret_key = secret_key
self.__ulid = ULID()
self.__content = {}
@@ -60,8 +64,11 @@ def __init__(
self.__fernet = None
if db_name:
- if not db_name.endswith('pdb'):
- db_name = f'{db_name}.pdb'
+ if not db_name.endswith(('pdb', 'json')):
+ if self.__secret_key:
+ db_name = f'{db_name}.pdb'
+ else:
+ db_name = f'{db_name}.json'
self.db_name = db_name
Path(self.db_name).touch(exist_ok=True)
@@ -76,6 +83,10 @@ def __str__(self) -> str:
def content(self) -> dict:
return self.__content
+ @property
+ def return_cursor(self) -> bool:
+ return self.__return_cursor
+
@property
def return_dict(self) -> bool:
return self.__return_dict
@@ -110,16 +121,18 @@ def _refresh(self) -> None:
else:
try:
decrypted_data: bytes = self.__fernet.decrypt(data)
- self.__content = json.loads(decrypted_data)
except Exception: # type[cryptography.fernet.InvalidToken]
error = '"secret_key" Is Not Valid'
raise PantherDBException(error)
+ self.__content = json.loads(decrypted_data)
+
def collection(self, collection_name: str) -> PantherCollection:
return PantherCollection(
db_name=self.db_name,
collection_name=collection_name,
return_dict=self.return_dict,
+ return_cursor=self.return_cursor,
secret_key=self.secret_key,
)
@@ -136,9 +149,10 @@ def __init__(
*,
collection_name: str,
return_dict: bool,
+ return_cursor: bool,
secret_key: bytes,
):
- super().__init__(db_name=db_name, return_dict=return_dict, secret_key=secret_key)
+ super().__init__(db_name=db_name, return_dict=return_dict, return_cursor=return_cursor, secret_key=secret_key)
self.__collection_name = collection_name
def __str__(self) -> str:
@@ -167,6 +181,7 @@ def __create_result(self, data: dict, /) -> PantherDocument | dict:
db_name=self.db_name,
collection_name=self.collection_name,
return_dict=self.return_dict,
+ return_cursor=self.return_cursor,
secret_key=self.secret_key,
**data,
)
@@ -213,17 +228,14 @@ def find_one(self, **kwargs) -> PantherDocument | dict | None:
# Return the first document
return d
- def find(self, **kwargs) -> list[PantherDocument | dict]:
+ def find(self, **kwargs) -> Cursor | List[PantherDocument | dict]:
documents = self._get_collection()
- # Empty Collection
- if not documents:
- return []
+ result = [d for _, d in self._find(documents, **kwargs) if d is not None]
- if not kwargs:
- return self.all()
-
- return [d for _, d in self._find(documents, **kwargs) if d is not None]
+ if self.return_cursor:
+ return Cursor(result)
+ return result
def first(self, **kwargs) -> PantherDocument | dict | None:
return self.find_one(**kwargs)
@@ -243,12 +255,6 @@ def last(self, **kwargs) -> PantherDocument | dict | None:
# Return the first one
return d
- def all(self) -> list[PantherDocument | dict]:
- if self.return_dict:
- return self._get_collection()
- else:
- return [self.__create_result(r) for r in self._get_collection()]
-
def insert_one(self, **kwargs) -> PantherDocument | dict:
documents = self._get_collection()
kwargs['_id'] = self.ulid.new()
@@ -372,6 +378,7 @@ def __init__(
*,
collection_name: str,
return_dict: bool,
+ return_cursor: bool,
secret_key: bytes,
**kwargs,
):
@@ -380,6 +387,7 @@ def __init__(
db_name=db_name,
collection_name=collection_name,
return_dict=return_dict,
+ return_cursor=return_cursor,
secret_key=secret_key,
)
@@ -399,6 +407,7 @@ def __getattr__(self, item: str):
def __setattr__(self, key, value):
if key not in [
'_PantherDB__return_dict',
+ '_PantherDB__return_cursor',
'_PantherDB__secret_key',
'_PantherDB__content',
'_PantherDB__fernet',
@@ -414,6 +423,10 @@ def __setattr__(self, key, value):
super().__setattr__(key, value)
+ __setitem__ = __setattr__
+
+ __getitem__ = __getattr__
+
@property
def id(self) -> int:
return self.data['_id']
@@ -434,3 +447,27 @@ def save(self) -> None:
def json(self) -> str:
return json.dumps(self.data).decode()
+
+
+class Cursor:
+ def __init__(self, documents: List[dict | PantherDocument]):
+ self.documents = documents
+ self._cursor = 0
+
+ def next(self):
+ try:
+ result = self.documents[self._cursor]
+ except IndexError:
+ raise StopIteration
+ self._cursor += 1
+ return result
+
+ __next__ = next
+
+ def __getitem__(self, index: int | slice) -> Union[Cursor, dict, ...]:
+ return self.documents[index]
+
+ def sort(self, sorts: List[Tuple[str, int]]):
+ for sort in sorts[::-1]:
+ self.documents.sort(key=lambda x: x[sort[0]], reverse=bool(sort[1] == -1))
+ return self
diff --git a/tests/test_normal.py b/tests/test_normal.py
index c2ab298..7f9153a 100644
--- a/tests/test_normal.py
+++ b/tests/test_normal.py
@@ -6,7 +6,7 @@
import orjson as json
from faker import Faker
-from pantherdb import PantherCollection, PantherDB, PantherDocument
+from pantherdb import PantherCollection, PantherDB, PantherDocument, Cursor
f = Faker()
@@ -55,7 +55,7 @@ def test_creation_of_db(self):
def test_creation_of_db_without_extension(self):
db_name = uuid4().hex
db = PantherDB(db_name=db_name)
- final_db_name = f'{db_name}.pdb'
+ final_db_name = f'{db_name}.json'
assert Path(final_db_name).exists()
assert Path(final_db_name).is_file()
@@ -385,48 +385,7 @@ def test_find_without_filter(self):
assert specific_count == _count_2
- def test_find_all(self):
- collection = self.db.collection(f.word())
-
- # Add others
- _count_1 = self.create_junk_document(collection)
-
- # Insert with specific names
- first_name = f.first_name()
- _count_2 = f.random.randint(2, 10)
- for i in range(_count_2):
- collection.insert_one(first_name=first_name, last_name=f.last_name())
-
- # Find
- objs = collection.all()
- _count_all = _count_1 + _count_2
-
- assert isinstance(objs, list)
- assert len(objs) == _count_all
- for i in range(_count_all):
- assert isinstance(objs[i], PantherDocument)
-
- # Check count of specific name
- specific_count = 0
- for i in range(_count_all):
- if objs[i].first_name == first_name:
- specific_count += 1
-
- assert specific_count == _count_2
-
# Count
- def test_count_all(self):
- collection = self.db.collection(f.word())
-
- # Add others
- _count = self.create_junk_document(collection)
-
- # Count them
- count_all = collection.count()
-
- assert count_all == _count
- assert count_all == len(collection.all())
-
def test_count_with_filter(self):
collection = self.db.collection(f.word())
@@ -727,6 +686,157 @@ def test_document_json_method(self):
}
assert obj.json() == json.dumps(_json).decode()
+class TestCursorPantherDB(TestCase):
+
+ @classmethod
+ def setUp(cls):
+ cls.db_name = uuid4().hex
+ cls.db_name = f'{cls.db_name}.pdb'
+ cls.db = PantherDB(db_name=cls.db_name, return_cursor=True)
+
+ @classmethod
+ def tearDown(cls):
+ Path(cls.db_name).unlink()
+
+ @classmethod
+ def create_junk_document(cls, collection) -> int:
+ _count = f.random.randint(2, 10)
+ for i in range(_count):
+ collection.insert_one(first_name=f'{f.first_name()}{i}', last_name=f'{f.last_name()}{i}')
+ return _count
+
+ # Find
+ def test_find_response_type(self):
+ collection = self.db.collection(f.word())
+ first_name = f.first_name()
+ collection.insert_one(first_name=first_name, last_name=f.last_name())
+
+ # Find
+ objs = collection.find(first_name=first_name)
+
+ assert isinstance(objs, Cursor)
+ assert len([o for o in objs]) == 1
+ assert isinstance(objs[0], PantherDocument)
+
+ def test_find_with_filter(self):
+ collection = self.db.collection(f.word())
+
+ # Add others
+ self.create_junk_document(collection)
+
+ # Insert with specific names
+ first_name = f.first_name()
+ _count = f.random.randint(2, 10)
+ last_names = []
+ for i in range(_count):
+ last_name = f.last_name()
+ last_names.append(last_name)
+ collection.insert_one(first_name=first_name, last_name=last_name)
+
+ # Find
+ objs = collection.find(first_name=first_name)
+
+ assert isinstance(objs, Cursor)
+ assert len([o for o in objs]) == _count
+ for i in range(_count):
+ assert objs[i].first_name == first_name
+ assert objs[i].last_name == last_names[i]
+
+ def test_find_without_filter(self):
+ collection = self.db.collection(f.word())
+
+ # Add others
+ _count_1 = self.create_junk_document(collection)
+
+ # Insert with specific names
+ first_name = f.first_name()
+ _count_2 = f.random.randint(2, 10)
+ for i in range(_count_2):
+ collection.insert_one(first_name=first_name, last_name=f.last_name())
+
+ # Find
+ objs = collection.find()
+ _count_all = _count_1 + _count_2
+
+ assert isinstance(objs, Cursor)
+ assert len([o for o in objs]) == _count_all
+ for i in range(_count_all):
+ assert isinstance(objs[i], PantherDocument)
+
+ # Check count of specific name
+ specific_count = 0
+ for i in range(_count_all):
+ if objs[i].first_name == first_name:
+ specific_count += 1
+
+ assert specific_count == _count_2
+
+ def test_find_with_sort(self):
+ collection = self.db.collection(f.word())
+
+ # Insert with specific values
+ collection.insert_one(first_name='A', last_name=0)
+ collection.insert_one(first_name='A', last_name=1)
+ collection.insert_one(first_name='B', last_name=0)
+ collection.insert_one(first_name='B', last_name=1)
+
+ # Find without sort
+ objs = collection.find()
+ assert (objs[0].first_name, objs[0].last_name) == ('A', 0)
+ assert (objs[1].first_name, objs[1].last_name) == ('A', 1)
+ assert (objs[2].first_name, objs[2].last_name) == ('B', 0)
+ assert (objs[3].first_name, objs[3].last_name) == ('B', 1)
+
+ # Find with single sort
+ objs = collection.find().sort([('first_name', 1)])
+ assert (objs[0].first_name, objs[0].last_name) == ('A', 0)
+ assert (objs[1].first_name, objs[1].last_name) == ('A', 1)
+ assert (objs[2].first_name, objs[2].last_name) == ('B', 0)
+ assert (objs[3].first_name, objs[3].last_name) == ('B', 1)
+
+ objs = collection.find().sort([('first_name', -1)])
+ assert (objs[0].first_name, objs[0].last_name) == ('B', 0)
+ assert (objs[1].first_name, objs[1].last_name) == ('B', 1)
+ assert (objs[2].first_name, objs[2].last_name) == ('A', 0)
+ assert (objs[3].first_name, objs[3].last_name) == ('A', 1)
+
+ objs = collection.find().sort([('last_name', 1)])
+ assert (objs[0].first_name, objs[0].last_name) == ('A', 0)
+ assert (objs[1].first_name, objs[1].last_name) == ('B', 0)
+ assert (objs[2].first_name, objs[2].last_name) == ('A', 1)
+ assert (objs[3].first_name, objs[3].last_name) == ('B', 1)
+
+ objs = collection.find().sort([('last_name', -1)])
+ assert (objs[0].first_name, objs[0].last_name) == ('A', 1)
+ assert (objs[1].first_name, objs[1].last_name) == ('B', 1)
+ assert (objs[2].first_name, objs[2].last_name) == ('A', 0)
+ assert (objs[3].first_name, objs[3].last_name) == ('B', 0)
+
+ # Find with multiple sort
+ objs = collection.find().sort([('first_name', 1), ('last_name', 1)])
+ assert (objs[0].first_name, objs[0].last_name) == ('A', 0)
+ assert (objs[1].first_name, objs[1].last_name) == ('A', 1)
+ assert (objs[2].first_name, objs[2].last_name) == ('B', 0)
+ assert (objs[3].first_name, objs[3].last_name) == ('B', 1)
+
+ objs = collection.find().sort([('first_name', 1), ('last_name', -1)])
+ assert (objs[0].first_name, objs[0].last_name) == ('A', 1)
+ assert (objs[1].first_name, objs[1].last_name) == ('A', 0)
+ assert (objs[2].first_name, objs[2].last_name) == ('B', 1)
+ assert (objs[3].first_name, objs[3].last_name) == ('B', 0)
+
+ objs = collection.find().sort([('first_name', -1), ('last_name', 1)])
+ assert (objs[0].first_name, objs[0].last_name) == ('B', 0)
+ assert (objs[1].first_name, objs[1].last_name) == ('B', 1)
+ assert (objs[2].first_name, objs[2].last_name) == ('A', 0)
+ assert (objs[3].first_name, objs[3].last_name) == ('A', 1)
+
+ objs = collection.find().sort([('first_name', -1), ('last_name', -1)])
+ assert (objs[0].first_name, objs[0].last_name) == ('B', 1)
+ assert (objs[1].first_name, objs[1].last_name) == ('B', 0)
+ assert (objs[2].first_name, objs[2].last_name) == ('A', 1)
+ assert (objs[3].first_name, objs[3].last_name) == ('A', 0)
+
# TODO: Test whole scenario with -> secret_key, return_dict
# TODO: Test where exceptions happen