# Source code for milvus.client.stub

# -*- coding: UTF-8 -*-

import collections
import copy
import functools
import logging
import threading

from urllib.parse import urlparse

from . import __version__
from .types import IndexType, MetricType, Status
from .check import check_pass_param, is_legal_host, is_legal_port
from .pool import ConnectionPool
from .grpc_handler import GrpcHandler
from .http_handler import HttpHandler
from .exceptions import ParamError, NotConnectError, DeprecatedError

from ..settings import DefaultConfig as config

# Module-level logger, named after this module.
LOGGER = logging.getLogger(name=__name__)


def deprecated(func):
    """Replace *func* with a stub that always fails.

    Calling the returned wrapper logs an error through ``LOGGER`` and raises
    ``DeprecatedError``; the body of *func* is never executed. The wrapper
    keeps *func*'s metadata (name, docstring) via ``functools.wraps``.
    """
    message_template = "Function {} has been deprecated"

    @functools.wraps(func)
    def _raise_deprecated(*_args, **_kwargs):
        message = message_template.format(func.__name__)
        LOGGER.error(message)
        raise DeprecatedError(message)

    return _raise_deprecated


def check_connect(func):
    """Pass-through decorator for client methods.

    No connection check is performed at present: the wrapper forwards
    ``self`` and every argument to *func* unchanged and returns its result.
    """
    @functools.wraps(func)
    def _forward(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        return result

    return _forward


def _pool_args(**kwargs):
    pool_kwargs = dict()
    for k, v in kwargs.items():
        if k in ("pool_size", "wait_timeout", "handler", "try_connect", "pre_ping"):
            pool_kwargs[k] = v

    return pool_kwargs


def _set_uri(host, port, uri, handler="GRPC"):
    """Build the URI used to create the connection pool.

    The address comes from *host*/*port* when *host* is given (a missing
    *port* falls back to the handler's default port). Otherwise, when *port*
    is also missing, it is parsed from *uri*, or from the handler's default
    URI if *uri* is empty.

    :param host: Server host, or None.
    :param port: Server port, or None. Passing *port* without *host* is an
        error.
    :param uri: URI to parse; only used when both *host* and *port* are None.
    :param handler: "GRPC" selects the gRPC defaults and the ``tcp://``
        prefix; any other value selects the HTTP defaults and ``http://``.
    :return: ``"<prefix><host>:<port>"``.
    :raises ParamError: if *uri* cannot be parsed, if *port* is given without
        *host*, or if the resulting host or port is illegal.
    """
    default_port = config.GRPC_PORT if handler == "GRPC" else config.HTTP_PORT
    default_uri = config.GRPC_URI if handler == "GRPC" else config.HTTP_URI
    uri_prefix = "tcp://" if handler == "GRPC" else "http://"

    if host is not None:
        _port = port if port is not None else default_port
        _host = host
    elif port is None:
        try:
            _uri = urlparse(uri) if uri else urlparse(default_uri)
            _host = _uri.hostname
            # ``.port`` raises ValueError for a non-numeric or out-of-range port.
            _port = _uri.port
        except (AttributeError, ValueError, TypeError) as e:
            # Chain the cause so the underlying parse error stays visible.
            raise ParamError("uri is illegal: {}".format(e)) from e
    else:
        raise ParamError("Param is not complete. Please invoke as follow:\n"
                         "\t(host = ${HOST}, port = ${PORT})\n"
                         "\t(uri = ${URI})\n")

    if not is_legal_host(_host) or not is_legal_port(_port):
        raise ParamError("host {} or port {} is illegal".format(_host, _port))

    return "{}{}:{}".format(uri_prefix, str(_host), str(_port))


class Milvus:
    """Milvus client.

    Every operation fetches a connection from an internal ``ConnectionPool``
    and delegates the request to the handler that connection yields.
    """

    def __init__(self, host=None, port=None, handler="GRPC", **kwargs):
        """Create the client and its connection pool.

        :param host: Server host. When given, a missing *port* falls back to
            the handler's default port.
        :param port: Server port. Must not be given without *host*.
        :param handler: "GRPC" (default) or another handler name; it selects
            the default URI/port and is forwarded to the connection pool.
        :param kwargs: ``uri`` may be given instead of *host*/*port*; the keys
            ``pool_size``, ``wait_timeout``, ``try_connect`` and ``pre_ping``
            are forwarded to the connection pool. All keyword arguments are
            also kept in ``self._kw``.
        :raises ParamError: if the address parameters are incomplete or
            illegal.
        """
        self._uri = None
        self._status = None
        self._connected = False
        self._handler = handler
        # Connection held between __enter__ and __exit__.
        self._conn = None

        _uri = kwargs.get('uri', None)
        pool_uri = _set_uri(host, port, _uri, self._handler)
        pool_kwargs = _pool_args(handler=handler, **kwargs)
        self._pool = ConnectionPool(pool_uri, **pool_kwargs)

        # store extra key-words arguments
        self._kw = kwargs
        self._hooks = collections.defaultdict()

    def __enter__(self):
        """Fetch a pooled connection for the duration of a ``with`` block."""
        self._conn = self._connection()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection fetched by ``__enter__``, if there is one."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __del__(self):
        # Drop the pool reference when the client is garbage-collected.
        return self.close()

    def _connection(self):
        """Fetch a connection from the pool.

        :raises NotConnectError: if the client has already been closed.
        """
        if self._pool is None:
            # Fail with a meaningful error instead of ``None.fetch()``.
            raise NotConnectError("Client has been closed")
        return self._pool.fetch()

    @deprecated
    def set_hook(self, **kwargs):
        """
        Deprecated
        """
        # TODO: may remove it.
        # NOTE: unreachable -- ``@deprecated`` raises before this body runs.
        if self._stub:
            return self._stub.set_hook(**kwargs)

        self._hooks.update(kwargs)

    @property
    def handler(self):
        """Name of the handler this client was created with."""
        return self._handler

    @deprecated
    def connect(self, host=None, port=None, uri=None, timeout=2):
        """
        Deprecated
        """
        # NOTE: unreachable -- ``@deprecated`` raises before this body runs.
        if self.connected() and self._connected:
            return Status(message="You have already connected {} !".format(self._uri),
                          code=Status.CONNECT_FAILED)

        if self._stub is None:
            self._init(host, port, uri, handler=self._handler)

        if self.ping(timeout):
            self._status = Status(message="Connected")
            self._connected = True
        return self._status

    @deprecated
    def connected(self):
        """
        Deprecated
        """
        return True if self._status and self._status.OK() else False

    @deprecated
    def disconnect(self):
        """
        Deprecated
        """
        pass

    def close(self):
        """
        Close client instance.

        Releases the reference to the connection pool; afterwards any
        operation that needs a connection raises ``NotConnectError``.
        """
        self._pool = None

    def client_version(self):
        """
        Returns the version of the client.

        :return: Version of the client.

        :rtype: (str)
        """
        return __version__

    def server_status(self, timeout=10):
        """
        Returns the status of the Milvus server (sends the ``status`` command).

        :return:
            Status: Whether the operation is successful.

            str : Status of the Milvus server.

        :rtype: (Status, str)
        """
        return self._cmd("status", timeout)

    def server_version(self, timeout=10):
        """
        Returns the version of the Milvus server (sends the ``version`` command).

        :return:
           Status: Whether the operation is successful.

           str : Version of the Milvus server.

        :rtype: (Status, str)
        """
        return self._cmd("version", timeout)

    @check_connect
    def _cmd(self, cmd, timeout=10):
        """Validate *cmd* and send it to the server through a pooled handler."""
        check_pass_param(cmd=cmd)

        with self._connection() as handler:
            return handler._cmd(cmd, timeout)

    @check_connect
    def create_collection(self, param, timeout=10):
        """
        Creates a collection.

        :type  param: dict
        :param param: Information needed to create a collection.

                `param={'collection_name': 'name',
                                'dimension': 16,
                                'index_file_size': 1024 (default),
                                'metric_type': MetricType.L2 (default)
                                }`

                Any other keys are passed to the handler as extra parameters.

        :param timeout: Timeout in seconds.
        :type  timeout: double

        :return: Whether the operation is successful.
        :rtype: Status

        :raises ParamError: if *param* is not a dict or lacks
            ``collection_name`` / ``dimension``.
        """
        if not isinstance(param, dict):
            # Report the expected type itself, not ``type(dict)`` (which is ``type``).
            raise ParamError('Param type incorrect, expect {} but get {} instead'
                             .format(dict, type(param)))

        # Work on a copy so the caller's dict is never mutated by the pops below.
        collection_param = copy.deepcopy(param)

        if 'collection_name' not in collection_param:
            raise ParamError('collection_name is required')
        collection_name = collection_param.pop('collection_name')

        if 'dimension' not in collection_param:
            raise ParamError('dimension is required')
        dim = collection_param.pop('dimension')

        index_file_size = collection_param.pop('index_file_size', 1024)
        metric_type = collection_param.pop('metric_type', MetricType.L2)

        check_pass_param(collection_name=collection_name, dimension=dim,
                         index_file_size=index_file_size, metric_type=metric_type)

        with self._connection() as handler:
            # Forward the timeout like every other operation of this client.
            return handler.create_collection(collection_name, dim, index_file_size,
                                             metric_type, collection_param, timeout)

    @check_connect
    def has_collection(self, collection_name, timeout=10):
        """
        Checks whether a collection exists.

        :param collection_name: Name of the collection to check.
        :type  collection_name: str
        :param timeout: Timeout in seconds.
        :type  timeout: int

        :return:
            Status: indicate whether the operation is successful.
            bool if given collection_name exists

        """
        check_pass_param(collection_name=collection_name)
        with self._connection() as handler:
            return handler.has_collection(collection_name, timeout)

    @check_connect
    def get_collection_info(self, collection_name, timeout=10):
        """
        Returns information of a collection.

        :type  collection_name: str
        :param collection_name: Name of the collection to describe.

        :returns: (Status, table_schema)
            Status: indicate if query is successful
            table_schema: return when operation is successful
        :rtype: (Status, TableSchema)
        """
        check_pass_param(collection_name=collection_name)
        with self._connection() as handler:
            return handler.describe_collection(collection_name, timeout)

    @check_connect
    def count_entities(self, collection_name, timeout=10):
        """
        Returns the number of vectors in a collection.

        :type  collection_name: str
        :param collection_name: target collection name.

        :returns:
            Status: indicate if operation is successful

            res: int, collection row count
        """
        check_pass_param(collection_name=collection_name)
        with self._connection() as handler:
            return handler.count_collection(collection_name, timeout)

    @check_connect
    def list_collections(self, timeout=10):
        """
        Returns collection list.

        :return:
            Status: indicate if this operation is successful

            collections: list of collection names, return when operation
                    is successful
        :rtype:
            (Status, list[str])
        """
        with self._connection() as handler:
            return handler.show_collections(timeout)

    @check_connect
    def get_collection_stats(self, collection_name, timeout=10):
        """
        Returns collection statistics information.

        :return:
            Status: indicate if this operation is successful

            statistics: statistics information
        :rtype:
            (Status, dict)
        """
        check_pass_param(collection_name=collection_name)
        with self._connection() as handler:
            return handler.show_collection_info(collection_name, timeout)

    @check_connect
    def load_collection(self, collection_name, timeout=None):
        """
        Loads a collection for caching.

        :type collection_name: str
        :param collection_name: collection to load

        :returns:
            Status:  indicate if invoke is successful
        """
        check_pass_param(collection_name=collection_name)
        with self._connection() as handler:
            return handler.preload_collection(collection_name, timeout)

    @check_connect
    def drop_collection(self, collection_name, timeout=10):
        """
        Deletes a collection by name.

        :type  collection_name: str
        :param collection_name: Name of the collection being deleted

        :return: Status, indicate if operation is successful
        :rtype: Status
        """
        check_pass_param(collection_name=collection_name)
        with self._connection() as handler:
            return handler.drop_collection(collection_name, timeout)

    @check_connect
    def insert(self, collection_name, records, ids=None, partition_tag=None,
               params=None, timeout=None, **kwargs):
        """
        Insert vectors to a collection.

        If ``insert_param`` is passed as a keyword argument (and is not None),
        all validation is skipped and the keyword arguments are handed to the
        handler directly.

        :type  collection_name: str
        :param collection_name: Name of the collection to insert vectors to.
        :type  records: list[list[float]]

                `example records: [[1.2345],[1.2345]]`

                `OR using Prepare.records`

        :param records: List of vectors to insert.
        :param ids: list of id; must have the same length as *records*.
        :type  ids: list[int]
        :type  partition_tag: str or None.
            If partition_tag is None, vectors will be inserted to the collection
            rather than partitions.
        :param partition_tag: Tag of a partition.
        :param params: extra parameters for the handler; None means ``{}``.
        :type  params: dict
        :param timeout: request timeout, forwarded to the handler.

        :returns:
            Status: Whether vectors are inserted successfully.
            ids: IDs of the inserted vectors.
        :rtype: (Status, list(int))

        :raises ParamError: if the ids/records lengths differ or *params* is
            not a dict.
        """
        if kwargs.get("insert_param", None) is not None:
            with self._connection() as handler:
                return handler.insert(None, None, timeout=timeout, **kwargs)

        check_pass_param(collection_name=collection_name, records=records)
        if partition_tag is not None:
            check_pass_param(partition_tag=partition_tag)
        if ids is not None:
            check_pass_param(ids=ids)
            if len(records) != len(ids):
                raise ParamError("length of vectors do not match that of ids")

        params = params or dict()
        if not isinstance(params, dict):
            raise ParamError("Params must be a dictionary type")
        with self._connection() as handler:
            return handler.insert(collection_name, records, ids, partition_tag,
                                  params, timeout, **kwargs)

    @check_connect
    def get_entity_by_id(self, collection_name, ids, timeout=None):
        """
        Returns raw vectors according to ids.

        :param collection_name: Name of the collection
        :type collection_name: str

        :param ids: list of vector id
        :type ids: list

        :returns:
            Status: indicate if invoke is successful

        """
        check_pass_param(collection_name=collection_name, ids=ids)

        with self._connection() as handler:
            return handler.get_vectors_by_ids(collection_name, ids, timeout=timeout)

    @check_connect
    def list_id_in_segment(self, collection_name, segment_name, timeout=None):
        """
        Lists IDs in a segment of a collection (via the handler's
        ``get_vector_ids``).

        :param collection_name: Name of the collection.
        :type collection_name: str
        :param segment_name: Name of the segment.
        :type segment_name: str
        :param timeout: request timeout, forwarded to the handler.
        """
        check_pass_param(collection_name=collection_name)
        # NOTE(review): segment_name is validated with the collection-name rule;
        # confirm that rule is appropriate for segment names.
        check_pass_param(collection_name=segment_name)
        with self._connection() as handler:
            return handler.get_vector_ids(collection_name, segment_name, timeout)

    @check_connect
    def create_index(self, collection_name, index_type=None, params=None, timeout=None, **kwargs):
        """
        Creates index for a collection.

        :param collection_name: Collection used to create index.
        :type collection_name: str
        :param index_type: index type; None means ``IndexType.FLAT``.
        :type index_type: IndexType
        :param params: index parameters, e.g. ``{'nlist': 16384}``; None means ``{}``.
        :type params: dict
        :param timeout: grpc request timeout.

            if `timeout` = -1, method invoke a synchronous call, waiting util grpc response
            else method invoke a asynchronous call, timeout work here

        :type  timeout: int

        :return: Whether the operation is successful.
        """
        _index_type = IndexType.FLAT if index_type is None else index_type
        check_pass_param(collection_name=collection_name, index_type=_index_type)

        params = params or dict()
        if not isinstance(params, dict):
            raise ParamError("Params must be a dictionary type")
        with self._connection() as handler:
            return handler.create_index(collection_name, _index_type, params, timeout, **kwargs)

    @check_connect
    def get_index_info(self, collection_name, timeout=10):
        """
        Show index information of a collection.

        :type collection_name: str
        :param collection_name: collection name been queried

        :returns:
            Status:  Whether the operation is successful.
            IndexSchema:

        """
        check_pass_param(collection_name=collection_name)

        with self._connection() as handler:
            return handler.describe_index(collection_name, timeout)

    @check_connect
    def drop_index(self, collection_name, timeout=10):
        """
        Removes an index.

        :param collection_name: target collection name.
        :type collection_name: str

        :return:
            Status: Whether the operation is successful.

        ::rtype: Status
        """
        check_pass_param(collection_name=collection_name)
        with self._connection() as handler:
            return handler.drop_index(collection_name, timeout)

    @check_connect
    def create_partition(self, collection_name, partition_tag, timeout=10):
        """
        create a partition for a collection.

        :param collection_name: Name of the collection.
        :type  collection_name: str

        :param partition_tag: Name of the partition tag.
        :type  partition_tag: str

        :param timeout: time waiting for response.
        :type  timeout: int

        :return:
            Status: Whether the operation is successful.

        """
        check_pass_param(collection_name=collection_name, partition_tag=partition_tag)
        with self._connection() as handler:
            return handler.create_partition(collection_name, partition_tag, timeout)

    @check_connect
    def has_partition(self, collection_name, partition_tag):
        """
        Check if specified partition exists.

        Unlike the other partition methods, this one takes no timeout and
        calls the handler without one.

        :param collection_name: target collection name.
        :type  collection_name: str
        :param partition_tag: partition tag.
        :type  partition_tag: str

        :return:
            Status: Whether the operation is successful.
            exists: If specified partition exists

        """
        check_pass_param(collection_name=collection_name, partition_tag=partition_tag)
        with self._connection() as handler:
            return handler.has_partition(collection_name, partition_tag)

    @check_connect
    def list_partitions(self, collection_name, timeout=10):
        """
        Show all partitions in a collection.

        :param collection_name: target collection name.
        :type  collection_name: str

        :param timeout: time waiting for response.
        :type  timeout: int

        :return:
            Status: Whether the operation is successful.
            partition_list:

        """
        check_pass_param(collection_name=collection_name)

        with self._connection() as handler:
            return handler.show_partitions(collection_name, timeout)

    @check_connect
    def drop_partition(self, collection_name, partition_tag, timeout=10):
        """
        Deletes a partition in a collection.

        :param collection_name: Collection name.
        :type  collection_name: str

        :param partition_tag: Partition name.
        :type  partition_tag: str

        :param timeout: time waiting for response.
        :type  timeout: int

        :return:
            Status: Whether the operation is successful.

        """
        check_pass_param(collection_name=collection_name, partition_tag=partition_tag)
        with self._connection() as handler:
            return handler.drop_partition(collection_name, partition_tag, timeout)

    @check_connect
    def search(self, collection_name, top_k, query_records, partition_tags=None,
               params=None, timeout=None, **kwargs):
        """
        Search vectors in a collection.

        :param collection_name: Name of the collection.
        :type  collection_name: str

        :param top_k: number of vectors which is most similar with query vectors
        :type  top_k: int

        :param query_records: vectors to query
        :type  query_records: list[list[float32]]

        :param partition_tags: tags to search
        :type  partition_tags: list

        :param params: extra search parameters for the handler; None means ``{}``.
        :type  params: dict

        :param timeout: request timeout, forwarded to the handler.

        :return
            Status: Whether the operation is successful.
            result: query result

        :rtype: (Status, TopKQueryResult)

        """
        check_pass_param(collection_name=collection_name, topk=top_k, records=query_records)
        if partition_tags is not None:
            check_pass_param(partition_tag_array=partition_tags)

        params = dict() if params is None else params
        if not isinstance(params, dict):
            raise ParamError("Params must be a dictionary type")
        with self._connection() as handler:
            return handler.search(collection_name, top_k, query_records,
                                  partition_tags, params, timeout, **kwargs)

    @check_connect
    def search_in_segment(self, collection_name, file_ids, query_records, top_k,
                          params=None, timeout=None, **kwargs):
        """
        Searches for vectors in specific segments of a collection.

        The Milvus server stores vector data into multiple files. Searching for vectors in specific files is a
        method used in Mishards. Obtain more detail about Mishards, see
        <a href="https://github.com/milvus-io/milvus/tree/master/shards">

        :type  collection_name: str
        :param collection_name: collection name been queried

        :type  file_ids: list[str] or list[int]
        :param file_ids: Specified files id array

        :type  query_records: list[list[float]]
        :param query_records: all vectors going to be queried

        :type  top_k: int
        :param top_k: how many similar vectors will be searched

        :type  params: dict
        :param params: extra search parameters for the handler; None means ``{}``.

        :param timeout: request timeout, forwarded to the handler.

        :returns:
            Status:  indicate if query is successful
            results: query result

        :rtype: (Status, TopKQueryResult)
        """
        check_pass_param(collection_name=collection_name, topk=top_k,
                         records=query_records, ids=file_ids)

        params = dict() if params is None else params
        if not isinstance(params, dict):
            raise ParamError("Params must be a dictionary type")
        with self._connection() as handler:
            return handler.search_in_files(collection_name, file_ids,
                                           query_records, top_k, params, timeout, **kwargs)

    @check_connect
    def delete_entity_by_id(self, collection_name, id_array, timeout=None):
        """
        Deletes vectors in a collection by vector ID.

        :param collection_name: Name of the collection.
        :type  collection_name: str

        :param id_array: list of vector id
        :type  id_array: list[int]

        :return:
            Status: Whether the operation is successful.
        """
        check_pass_param(collection_name=collection_name, ids=id_array)
        with self._connection() as handler:
            return handler.delete_by_id(collection_name, id_array, timeout)

    @check_connect
    def flush(self, collection_name_array=None, timeout=None, **kwargs):
        """
        Flushes vector data in one collection or multiple collections to disk.

        :type  collection_name_array: list
        :param collection_name_array: Names of the collections to flush. ``None``
            or an empty list sends an empty name list to the handler
            (presumably meaning "all collections" -- confirm with the handler).
        :param timeout: request timeout, forwarded to the handler.
        :param kwargs: extra options forwarded to the handler on every path.

        :raises ParamError: if *collection_name_array* is not a list, or one of
            its names is illegal.
        """
        # Explicit checks instead of ``in (None, [])`` so arbitrary objects
        # are never compared with ``==``.
        if collection_name_array is None or (
                isinstance(collection_name_array, list) and not collection_name_array):
            with self._connection() as handler:
                # Forward kwargs here too, matching the named-collection path below.
                return handler.flush([], timeout, **kwargs)

        if not isinstance(collection_name_array, list):
            raise ParamError("Collection name array must be type of list")

        for name in collection_name_array:
            check_pass_param(collection_name=name)
        with self._connection() as handler:
            return handler.flush(collection_name_array, timeout, **kwargs)

    @check_connect
    def compact(self, collection_name, timeout=None, **kwargs):
        """
        Compacts segments in a collection. This function is recommended after deleting vectors.

        :type  collection_name: str
        :param collection_name: Name of the collections to compact.
        """
        check_pass_param(collection_name=collection_name)
        with self._connection() as handler:
            return handler.compact(collection_name, timeout, **kwargs)

    def get_config(self, parent_key, child_key):
        """
        Gets Milvus configurations.

        Sends the command ``get_config <parent_key>.<child_key>`` via ``_cmd``.
        """
        cmd = "get_config {}.{}".format(parent_key, child_key)

        return self._cmd(cmd)

    def set_config(self, parent_key, child_key, value):
        """
        Sets Milvus configurations.

        Sends the command ``set_config <parent_key>.<child_key> <value>`` via
        ``_cmd``.
        """
        cmd = "set_config {}.{} {}".format(parent_key, child_key, value)

        return self._cmd(cmd)