Skip to content

Commit

Permalink
Multi cherry from 2.2 to master (#1453)
Browse files Browse the repository at this point in the history
- RBAC supports Database validation (#1396)
- Support database API (#1401)
- Using database by URI (#1420)

Signed-off-by: yangxuan <[email protected]>
Co-authored-by: SimFG <[email protected]>
Co-authored-by: jaime <[email protected]>
  • Loading branch information
3 people authored May 23, 2023
1 parent 69d9b40 commit 256a523
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 41 deletions.
144 changes: 144 additions & 0 deletions examples/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import random

from pymilvus import (
connections,
FieldSchema, CollectionSchema, DataType,
Collection,
db,
)
from pymilvus.orm import utility

_HOST = '127.0.0.1'
_PORT = '19530'
_ROOT = "root"
_ROOT_PASSWORD = "Milvus"
_METRIC_TYPE = 'IP'
_INDEX_TYPE = 'IVF_FLAT'
_NLIST = 1024
_NPROBE = 16
_TOPK = 3

# Vector parameters
_DIM = 128
_INDEX_FILE_SIZE = 32 # max file size of stored index


def connect_to_milvus(db_name="default"):
print(f"connect to milvus\n")
connections.connect(host=_HOST,
port=_PORT,
user=_ROOT,
password=_ROOT_PASSWORD,
db_name=db_name,
)


def connect_to_milvus_with_uri(db_name="default"):
print(f"connect to milvus\n")
connections.connect(
alias="uri-connection",
uri="http://{}:{}/{}".format(_HOST, _PORT, db_name),
)


def create_collection(collection_name, db_name):
default_fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="double", dtype=DataType.DOUBLE),
FieldSchema(name="fv", dtype=DataType.FLOAT_VECTOR, dim=128)
]
default_schema = CollectionSchema(fields=default_fields)
print(f"Create collection:{collection_name} within db:{db_name}")
return Collection(name=collection_name, schema=default_schema)


def insert(collection, num, dim):
data = [
[i for i in range(num)],
[float(i) for i in range(num)],
[[random.random() for _ in range(dim)] for _ in range(num)],
]
collection.insert(data)
return data[2]


def drop_index(collection):
collection.drop_index()
print("\nDrop index sucessfully")


def search(collection, vector_field, id_field, search_vectors):
search_param = {
"data": search_vectors,
"anns_field": vector_field,
"param": {"metric_type": _METRIC_TYPE, "params": {"nprobe": _NPROBE}},
"limit": _TOPK,
"expr": "id >= 0"}
results = collection.search(**search_param)
for i, result in enumerate(results):
print("\nSearch result for {}th vector: ".format(i))
for j, res in enumerate(result):
print("Top {}: {}".format(j, res))


def collection_read_write(collection, db_name):
col_name = "{}:{}".format(db_name, collection.name)
vectors = insert(collection, 10000, _DIM)
collection.flush()
print("\nInsert {} rows data into collection:{}".format(collection.num_entities, col_name))

# create index
index_param = {
"index_type": _INDEX_TYPE,
"params": {"nlist": _NLIST},
"metric_type": _METRIC_TYPE}
collection.create_index("fv", index_param)
print("\nCreated index:{} for collection:{}".format(collection.index().params, col_name))

# load data to memory
print("\nLoad collection:{}".format(col_name))
collection.load()
# search
print("\nSearch collection:{}".format(col_name))
search(collection, "fv", "id", vectors[:3])

# release memory
collection.release()
# drop collection index
collection.drop_index()
print("\nDrop collection:{}".format(col_name))


if __name__ == '__main__':
# connect to milvus and using database db1
# there will not check db1 already exists during connect
connect_to_milvus(db_name="default")

# create collection within default
col1_db1 = create_collection("col1_db1", "default")

# create db1
if "db1" not in db.list_database():
print("\ncreate database: db1")
db.create_database(db_name="db1")

# use database db1
db.using_database(db_name="db1")
# create collection within default
col2_db1 = create_collection("col1_db1", "db1")

# verify read and write
collection_read_write(col2_db1, "db1")

# list collections within db1
print("\nlist collections of database db1:")
print(utility.list_collections())

print("\ndrop collection: col1_db2 from db1")
col2_db1.drop()
print("\ndrop database: db1")
db.drop_database(db_name="db1")

# list database
print("\nlist databases:")
print(db.list_database())
29 changes: 18 additions & 11 deletions examples/role_and_privilege.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@

_CONNECTION = "demo"
_FOO_CONNECTION = "foo_connection"
_DB_NAME = "foo_db"
_HOST = '127.0.0.1'
_PORT = '19530'
_ROOT = "root"
_ROOT_PASSWORD = "Milvus"
_COLLECTION_NAME = "foocol2"


def connect_to_milvus(connection=_CONNECTION, user=_ROOT, password=_ROOT_PASSWORD):
def connect_to_milvus(connection=_CONNECTION, user=_ROOT, password=_ROOT_PASSWORD, db_name="default"):
print(f"connect to milvus\n")
connections.connect(alias=connection,
host=_HOST,
port=_PORT,
user=user,
password=password,
db_name=db_name,
)


Expand Down Expand Up @@ -133,9 +135,9 @@ def rbac_user(username, password, role_name, connection=_CONNECTION):
is_exception = True
assert is_exception
role = Role(role_name, using=_CONNECTION)
role.grant("User", "*", "SelectUser")
role.grant("User", "*", "SelectUser", db_name=_DB_NAME)
print(select_all_user(connection))
role.revoke("User", "*", "SelectUser")
role.revoke("User", "*", "SelectUser", db_name=_DB_NAME)


def role_example():
Expand Down Expand Up @@ -182,26 +184,31 @@ def privilege_example():
print(f"add user")
role.add_user(username)
print(f"grant privilege")
role.grant("Global", "*", privilege_create)
role.grant("Collection", object_name, privilege_insert)
# role.grant("Collection", object_name, "*")
# role.grant("Collection", "*", privilege_insert)
role.grant("Global", "*", privilege_create, db_name=_DB_NAME)
role.grant("Collection", object_name, privilege_insert, db_name=_DB_NAME)
# role.grant("Collection", object_name, "*", db_name=_DB_NAME)
# role.grant("Collection", "*", privilege_insert, db_name=_DB_NAME)

print(f"list grants")
print(role.list_grants(db_name=_DB_NAME))
print(f"list grant")
print(role.list_grant("Collection", object_name, db_name=_DB_NAME))

print(f"list grants")
print(role.list_grants())
print(f"list grant")
print(role.list_grant("Collection", object_name))

connect_to_milvus(connection=_FOO_CONNECTION, user=username, password=password)
connect_to_milvus(connection=_FOO_CONNECTION, user=username, password=password, db_name=_DB_NAME)
has_collection(_COLLECTION_NAME, connection=_FOO_CONNECTION)
rbac_collection(connection=_FOO_CONNECTION)
rbac_user(username, password, role_name, connection=_FOO_CONNECTION)

print(f"revoke privilege")
role.revoke("Global", "*", privilege_create)
role.revoke("Collection", object_name, privilege_insert)
# role.revoke("Collection", object_name, "*")
# role.revoke("Collection", "*", privilege_insert)
role.revoke("Collection", object_name, privilege_insert, db_name=_DB_NAME)
# role.revoke("Collection", object_name, "*", db_name=_DB_NAME)
# role.revoke("Collection", "*", privilege_insert, db_name=_DB_NAME)
print(f"remove user")
role.remove_user(username)
role.drop()
Expand Down
4 changes: 2 additions & 2 deletions pymilvus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
list_resource_groups, transfer_node, transfer_replica
)

from .orm import utility
from .orm import utility, db

from .orm.search import SearchResult, Hits, Hit
from .orm.schema import FieldSchema, CollectionSchema
Expand All @@ -86,7 +86,7 @@
'SearchResult', 'Hits', 'Hit', 'Replica', 'Group', 'Shard',
'FieldSchema', 'CollectionSchema',
'SearchFuture', 'MutationFuture',
'utility', 'DefaultConfig', 'ExceptionsMessage', 'MilvusUnavailableException', 'BulkInsertState',
'utility', 'db', 'DefaultConfig', 'ExceptionsMessage', 'MilvusUnavailableException', 'BulkInsertState',
'Role',
'create_resource_group', 'drop_resource_group', 'describe_resource_group',
'list_resource_groups', 'transfer_node', 'transfer_replica',
Expand Down
1 change: 1 addition & 0 deletions pymilvus/client/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ def is_legal_operate_privilege_type(operate_privilege_type: Any) -> bool:
class ParamChecker(metaclass=Singleton):
def __init__(self) -> None:
self.check_dict = {
"db_name": is_legal_table_name,
"collection_name": is_legal_table_name,
"field_name": is_legal_field_name,
"dimension": is_legal_dimension,
Expand Down
51 changes: 43 additions & 8 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(self, uri=Config.GRPC_URI, host="", port="", channel=None, **kwargs
self._request_id = None
self._user = kwargs.get("user", None)
self._set_authorization(**kwargs)
self._setup_db_interceptor(kwargs.get("db_name", None))
self._setup_grpc_channel()

def __get_address(self, uri: str, host: str, port: str) -> str:
Expand Down Expand Up @@ -127,12 +128,22 @@ def _wait_for_channel_ready(self, timeout=10):
def close(self):
self._channel.close()

def reset_db_name(self, db_name):
self._setup_db_interceptor(db_name)
self._setup_grpc_channel()

def _setup_authorization_interceptor(self, user, password):
if user and password:
authorization = base64.b64encode(f"{user}:{password}".encode('utf-8'))
key = "authorization"
self._authorization_interceptor = interceptor.header_adder_interceptor(key, authorization)

def _setup_db_interceptor(self, db_name):
if db_name:
self._db_interceptor = interceptor.header_adder_interceptor("dbname", db_name)
else:
self._db_interceptor = None

def _setup_grpc_channel(self):
""" Create a ddl grpc channel """
if self._channel is None:
Expand Down Expand Up @@ -174,6 +185,8 @@ def _setup_grpc_channel(self):
self._final_channel = self._channel
if self._authorization_interceptor:
self._final_channel = grpc.intercept_channel(self._final_channel, self._authorization_interceptor)
if self._db_interceptor:
self._final_channel = grpc.intercept_channel(self._final_channel, self._db_interceptor)
if self._log_level:
log_level_interceptor = interceptor.header_adder_interceptor("log_level", self._log_level)
self._final_channel = grpc.intercept_channel(self._final_channel, log_level_interceptor)
Expand Down Expand Up @@ -809,6 +822,28 @@ def get_loading_progress(self, collection_name, partition_names=None, timeout=No
raise MilvusException(response.status.error_code, response.status.reason)
return response.progress

@retry_on_rpc_failure()
def create_database(self, db_name, timeout=None):
request = Prepare.create_database_req(db_name)
status = self._stub.CreateDatabase(request, timeout=timeout)
if status.error_code != 0:
raise MilvusException(status.error_code, status.reason)

@retry_on_rpc_failure()
def drop_database(self, db_name, timeout=None):
request = Prepare.drop_database_req(db_name)
status = self._stub.DropDatabase(request, timeout=timeout)
if status.error_code != 0:
raise MilvusException(status.error_code, status.reason)

@retry_on_rpc_failure()
def list_database(self, timeout=None):
request = Prepare.list_database_req()
response = self._stub.ListDatabases(request, timeout=timeout)
if response.status.error_code != 0:
raise MilvusException(response.status.error_code, response.status.reason)
return list(response.db_names)

@retry_on_rpc_failure()
def get_load_state(self, collection_name, partition_names=None, timeout=None):
request = Prepare.get_load_state(collection_name, partition_names)
Expand Down Expand Up @@ -1227,33 +1262,33 @@ def select_all_user(self, include_role_info, timeout=None, **kwargs):
return UserInfo(resp.results)

@retry_on_rpc_failure()
def grant_privilege(self, role_name, object, object_name, privilege, timeout=None, **kwargs):
req = Prepare.operate_privilege_request(role_name, object, object_name, privilege,
def grant_privilege(self, role_name, object, object_name, privilege, db_name, timeout=None, **kwargs):
req = Prepare.operate_privilege_request(role_name, object, object_name, privilege, db_name,
milvus_types.OperatePrivilegeType.Grant)
resp = self._stub.OperatePrivilege(req, wait_for_ready=True, timeout=timeout)
if resp.error_code != 0:
raise MilvusException(resp.error_code, resp.reason)

@retry_on_rpc_failure()
def revoke_privilege(self, role_name, object, object_name, privilege, timeout=None, **kwargs):
req = Prepare.operate_privilege_request(role_name, object, object_name, privilege,
def revoke_privilege(self, role_name, object, object_name, privilege, db_name, timeout=None, **kwargs):
req = Prepare.operate_privilege_request(role_name, object, object_name, privilege, db_name,
milvus_types.OperatePrivilegeType.Revoke)
resp = self._stub.OperatePrivilege(req, wait_for_ready=True, timeout=timeout)
if resp.error_code != 0:
raise MilvusException(resp.error_code, resp.reason)

@retry_on_rpc_failure()
def select_grant_for_one_role(self, role_name, timeout=None, **kwargs):
req = Prepare.select_grant_request(role_name, None, None)
def select_grant_for_one_role(self, role_name, db_name, timeout=None, **kwargs):
req = Prepare.select_grant_request(role_name, None, None, db_name)
resp = self._stub.SelectGrant(req, wait_for_ready=True, timeout=timeout)
if resp.status.error_code != 0:
raise MilvusException(resp.status.error_code, resp.status.reason)

return GrantInfo(resp.entities)

@retry_on_rpc_failure()
def select_grant_for_role_and_object(self, role_name, object, object_name, timeout=None, **kwargs):
req = Prepare.select_grant_request(role_name, object, object_name)
def select_grant_for_role_and_object(self, role_name, object, object_name, db_name, timeout=None, **kwargs):
req = Prepare.select_grant_request(role_name, object, object_name, db_name)
resp = self._stub.SelectGrant(req, wait_for_ready=True, timeout=timeout)
if resp.status.error_code != 0:
raise MilvusException(resp.status.error_code, resp.status.reason)
Expand Down
Loading

0 comments on commit 256a523

Please sign in to comment.