Source code for milvus.client.grpc_client

from urllib.parse import urlparse
import logging

import grpc
from grpc._cython import cygrpc

from ..grpc_gen import milvus_pb2_grpc
from ..grpc_gen import milvus_pb2 as grpc_types
from .abstract import (
    ConnectIntf,
    TableSchema,
    IndexParam
)
from .prepare import Prepare
from .types import IndexType, MetricType, Status
from .utils import (
    int_or_str,
    is_legal_host,
    is_legal_port,
)

from .hooks import BaseaSearchHook
from .client_hooks import (
    SearchHook
)

from .exceptions import ParamError, NotConnectError
from ..settings import DefaultConfig as config
from . import __version__

LOGGER = logging.getLogger(__name__)


class GrpcMilvus(ConnectIntf):
    """Milvus client that talks to the server over an insecure gRPC channel.

    Every public operation requires a prior successful :meth:`connect`;
    otherwise ``NotConnectError`` is raised. Server-side failures, request
    timeouts and gRPC errors are reported through the returned ``Status``
    object rather than raised.
    """

    def __init__(self):
        self._channel = None
        self._stub = None
        self._uri = None
        self.status = None

        # Hooks that customise how search / search_in_files responses are handled.
        self._search_hook = SearchHook()
        self._search_file_hook = SearchHook()

    def __str__(self):
        # Only public attributes (no leading underscore) are shown.
        attr_list = ['%s=%r' % (key, value)
                     for key, value in self.__dict__.items()
                     if not key.startswith('_')]
        return '<Milvus: {}>'.format(', '.join(attr_list))

    def _set_uri(self, host=None, port=None, uri=None):
        """
        Set server network address

        Accepted combinations are ``host`` (with optional ``port``, falling
        back to ``config.GRPC_PORT``) or ``uri`` (falling back to
        ``config.GRPC_URI``). Passing ``port`` without ``host`` is rejected.

        :raises ParamError: if the combination is incomplete, the uri scheme
            is not ``tcp``, the uri cannot be parsed, or host/port is illegal
        """
        if host is not None:
            _port = port if port is not None else config.GRPC_PORT
            _host = host
        elif port is None:
            try:
                _uri = urlparse(uri if uri else config.GRPC_URI)

                if _uri.scheme != 'tcp':
                    raise ParamError(
                        'Invalid parameter uri: `{}`. Scheme `{}` '
                        'is not supported'.format(_uri.geturl(), _uri.scheme))

                _host = _uri.hostname
                # Accessing `.port` raises ValueError on a malformed port.
                _port = _uri.port
            except ParamError:
                # Keep the precise "scheme not supported" message instead of
                # letting the generic handler below overwrite it.
                raise
            except Exception as e:
                raise ParamError("`{}` is illegal".format(uri)) from e
        else:
            raise ParamError("Param is not complete. Please invoke as follow:\n"
                             "\t(host = ${HOST}, port = ${PORT})\n"
                             "\t(uri = ${URI})\n")

        if not is_legal_host(_host) or not is_legal_port(_port):
            raise ParamError("host or port is illegal")

        self._uri = "{}:{}".format(str(_host), str(_port))

    def _set_channel(self, host=None, port=None, uri=None):
        """
        set grpc channel

        Resolves the address via :meth:`_set_uri` and opens an insecure
        channel with send/receive message size limits disabled (-1).
        """
        self._set_uri(host, port, uri)

        # set transport unlimited
        self._channel = grpc.insecure_channel(
            self._uri,
            options=[(cygrpc.ChannelArgKey.max_send_message_length, -1),
                     (cygrpc.ChannelArgKey.max_receive_message_length, -1)]
        )

    def _set_hook(self, **kwargs):
        """
        Install custom search hooks.

        :param search: (Optional) hook used by :meth:`search`
        :param search_in_file: (Optional) hook used by :meth:`search_in_files`
        :raises ParamError: if a supplied hook is not a ``BaseaSearchHook`` instance
        """
        _search_hook = kwargs.get('search', None)
        if _search_hook:
            if not isinstance(_search_hook, BaseaSearchHook):
                raise ParamError("search hook must be a subclass of `BaseSearchHook`")

            self._search_hook = _search_hook

        _search_file_hook = kwargs.get('search_in_file', None)
        if _search_file_hook:
            if not isinstance(_search_file_hook, BaseaSearchHook):
                raise ParamError("search hook must be a subclass of `BaseSearchHook`")

            self._search_file_hook = _search_file_hook

    @property
    def server_address(self):
        """
        Server network address
        """
        return self._uri

    def connect(self, host=None, port=None, uri=None, timeout=3):
        """
        Connect method should be called before any operations.
        Server will be connected after connect return OK

        :type  host: str
        :type  port: str
        :type  uri: str
        :type  timeout: float
        :param host: (Optional) host of the server, default host is 127.0.0.1
        :param port: (Optional) port of the server, default port is 19530
        :param uri: (Optional) only support tcp proto now, default uri is

                `tcp://127.0.0.1:19530`

        :param timeout: (Optional) connection timeout in seconds, default is 3

        :return: Status, indicate if connect is successful
        :rtype: Status
        :raises NotConnectError: if the server is not ready within `timeout`
            or any error occurs while waiting for the channel
        """
        if self.connected():
            return Status(message="You have already connected!", code=Status.CONNECT_FAILED)

        # A channel left over from a failed attempt is reused only when the
        # caller gives no address; an explicit address always gets a fresh
        # channel so it is not silently ignored.
        address_given = host is not None or port is not None or uri is not None
        if not self._channel or address_given:
            self._set_channel(host, port, uri)

        try:
            # check if server is ready
            grpc.channel_ready_future(self._channel).result(timeout=timeout)
        except grpc.FutureTimeoutError as e:
            raise NotConnectError(
                'Fail connecting to server on {}. Timeout'.format(self._uri)) from e
        except grpc.RpcError as e:
            raise NotConnectError("Connect error: <{}>".format(e)) from e
        # Unexpected error
        except Exception as e:
            raise NotConnectError("Error occurred when trying to connect server:\n"
                                  "\t<{}>".format(str(e))) from e

        self._stub = milvus_pb2_grpc.MilvusServiceStub(self._channel)
        self.status = Status()
        return self.status

    def connected(self):
        """
        Check if client is connected to the server

        Waits up to 2 seconds for the channel to become ready.

        :return: if client is connected
        :rtype: bool
        """
        if not self._stub or not self._channel:
            return False

        try:
            grpc.channel_ready_future(self._channel).result(timeout=2)
            return True
        except (grpc.FutureTimeoutError, grpc.RpcError):
            return False

    def disconnect(self):
        """
        Disconnect with the server and destroy the channel

        :return: Status, indicate if disconnect is successful
        :rtype: Status
        :raises NotConnectError: if the client is not connected
        """
        # After closing, an exception stack trace is printed from a background
        # thread and no exception is thrown in the main thread; see
        # https://github.com/grpc/grpc/issues/18995 and
        # "Properly Specify Channel.close Behavior in Python":
        # https://github.com/grpc/grpc/issues/19235
        if not self.connected():
            raise NotConnectError('Please connect to the server first!')

        # Calling self._channel.close() results in a grpc internal error, so
        # the reference is dropped instead of closing the channel explicitly.
        del self._channel

        self.status = None
        self._channel = None
        self._stub = None
        return Status(message='Disconnect successfully')

    def client_version(self):
        """
        Provide client version

        :return:
            version: Client version

        :rtype: (str)
        """
        return __version__

    def server_version(self, timeout=10):
        """
        Provide server version

        :param timeout: time in seconds to wait for the server response

        :return:
            Status: indicate if operation is successful

            str : Server version

        :rtype: (Status, str)
        """
        return self._cmd(cmd='version', timeout=timeout)

    def server_status(self, timeout=10):
        """
        Provide server status

        :param timeout: time in seconds to wait for the server response

        :return:
            Status: indicate if operation is successful

            str : Server status

        :rtype: (Status, str)
        """
        return self._cmd(cmd='OK', timeout=timeout)

    def _cmd(self, cmd, timeout=10):
        """
        Send a command string to the server and return its string reply.

        :return: (Status, reply) where reply is None on any failure
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        cmd = Prepare.cmd(cmd)
        try:
            response = self._stub.Cmd.future(cmd).result(timeout=timeout)
            if response.status.error_code == 0:
                return Status(message='Success!'), response.string_reply

            return Status(code=response.status.error_code,
                          message=response.status.reason), None
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(Status.UNEXPECTED_ERROR, message='Request timeout'), None
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred. {}'.format(e.details())), None

    def create_table(self, param, timeout=10):
        """
        Create table

        :type  param: dict or TableSchema
        :param param: Provide table information to be created

                `example param={'table_name': 'name',
                                'dimension': 16,
                                'index_file_size': 1024 (default),
                                'metric_type': Metric_type.L2 (default)
                                }`

                `OR using Prepare.table_schema to create param`

        :param timeout: timeout, The unit is seconds
        :type  timeout: double

        :return: Status, indicate if operation is successful
        :rtype: Status
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        table_schema = Prepare.table_schema(param)

        try:
            status = self._stub.CreateTable.future(table_schema).result(timeout=timeout)
            if status.error_code == 0:
                return Status(message='Create table successfully!')

            LOGGER.error(status)
            return Status(code=status.error_code, message=status.reason)
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(Status.UNEXPECTED_ERROR, message='Request timeout')
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred: {}'.format(e.details()))

    def has_table(self, table_name, timeout=10):
        """
        This method is used to test table existence.

        :param table_name: table name is going to be tested.
        :type  table_name: str
        :param timeout: time waiting for server response
        :type  timeout: int

        :return:
            Status: indicate if the query is successful

            bool: if given table_name exists (False on any failure)
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        table_name = Prepare.table_name(table_name)

        try:
            reply = self._stub.HasTable.future(table_name).result(timeout=timeout)
            if reply.status.error_code == 0:
                return Status(), reply.bool_reply

            return Status(code=reply.status.error_code,
                          message=reply.status.reason), False
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(code=Status.UNEXPECTED_ERROR, message="request timeout"), False
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(code=e.code(), message=e.details()), False

    def describe_table(self, table_name, timeout=10):
        """
        Show table information

        :type  table_name: str
        :param table_name: which table to be shown
        :param timeout: time in seconds to wait for the server response

        :returns: (Status, table_schema)
            Status: indicate if query is successful
            table_schema: return when operation is successful, otherwise None
        :rtype: (Status, TableSchema)
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        table_name = Prepare.table_name(table_name)

        try:
            response = self._stub.DescribeTable.future(table_name).result(timeout=timeout)

            if response.status.error_code == 0:
                table = TableSchema(
                    table_name=response.table_name,
                    dimension=response.dimension,
                    index_file_size=response.index_file_size,
                    metric_type=MetricType(response.metric_type)
                )

                return Status(message='Describe table successfully!'), table

            LOGGER.error(response.status)
            return Status(code=response.status.error_code,
                          message=response.status.reason), None
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(Status.UNEXPECTED_ERROR, message='Request timeout'), None
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred. {}'.format(e.details())), None

    def count_table(self, table_name, timeout=30):
        """
        obtain vector number in table

        :type  table_name: str
        :param table_name: target table name.
        :param timeout: time in seconds to wait for the server response

        :returns:
            Status: indicate if operation is successful

            res: int, table row count (None on any failure)
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        table_name = Prepare.table_name(table_name)

        try:
            trc = self._stub.CountTable.future(table_name).result(timeout=timeout)
            if trc.status.error_code == 0:
                return Status(message='Success!'), trc.table_row_count

            return Status(code=trc.status.error_code, message=trc.status.reason), None
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(Status.UNEXPECTED_ERROR, message='Request timeout'), None
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred. {}'.format(e.details())), None

    def show_tables(self, timeout=10):
        """
        Show all tables information in database

        :param timeout: time in seconds to wait for the server response

        :return:
            Status: indicate if this operation is successful

            tables: list of table names (empty names are filtered out);
            an empty list on any failure

        :rtype:
            (Status, list[str])
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        cmd = Prepare.cmd('show_tables')
        try:
            response = self._stub.ShowTables.future(cmd).result(timeout=timeout)
            if response.status.error_code == 0:
                return Status(message='Show tables successfully!'), \
                    [name for name in response.table_names if len(name) > 0]

            return Status(response.status.error_code, message=response.status.reason), []
        except grpc.FutureTimeoutError as e:
            # Logged like every other request method in this class.
            LOGGER.error(e)
            return Status(Status.UNEXPECTED_ERROR, message="Request timeout"), []
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred. {}'.format(e.details())), []

    def preload_table(self, table_name, timeout=300):
        """
        Load table to cache in advance

        :type table_name: str
        :param table_name: table to preload
        :param timeout: time in seconds to wait for the server response

        :returns:
            Status:  indicate if invoke is successful
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        table_name = Prepare.table_name(table_name)

        try:
            status = self._stub.PreloadTable.future(table_name).result(timeout=timeout)
            return Status(code=status.error_code, message=status.reason)
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(Status.UNEXPECTED_ERROR, message='Request timeout')
        except grpc.RpcError as e:
            # Logged like every other request method in this class.
            LOGGER.error(e)
            return Status(code=e.code(), message='Error occurred. {}'.format(e.details()))

    def drop_table(self, table_name, timeout=20):
        """
        Delete table with table_name

        :type  table_name: str
        :param table_name: Name of the table being deleted
        :param timeout: time in seconds to wait for the server response

        :return: Status, indicate if operation is successful
        :rtype: Status
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        table_name = Prepare.table_name(table_name)

        try:
            status = self._stub.DropTable.future(table_name).result(timeout=timeout)
            if status.error_code == 0:
                return Status(message='Delete table successfully!')

            return Status(code=status.error_code, message=status.reason)
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(Status.UNEXPECTED_ERROR, message='Request timeout')
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred: {}'.format(e.details()))

    def insert(self, table_name, records, ids=None, timeout=-1, **kwargs):
        """
        Add vectors to table

        :param ids: list of id
        :type  ids: list[int]

        :type  table_name: str
        :param table_name: table name been inserted

        :type  records: list[list[float]]

                `example records: [[1.2345],[1.2345]]`

                `OR using Prepare.records`

        :param records: list of vectors been inserted

        :type  timeout: int
        :param timeout: time waiting for server response; -1 issues a
            blocking call without a timeout

        :param insert_param: (keyword, optional) a ready-made
            ``InsertParam`` message; when given, `table_name`, `records`
            and `ids` are not used to build the request

        :returns:
            Status: indicate if vectors inserted successfully

            ids: list of id, after inserted every vector is given a id
        :rtype: (Status, list(int))
        :raises NotConnectError: if the client is not connected
        :raises ParamError: if `insert_param` is not an ``InsertParam``
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        insert_param = kwargs.get('insert_param', None)

        if not insert_param:
            insert_param = Prepare.insert_param(table_name, records, ids)
        elif not isinstance(insert_param, grpc_types.InsertParam):
            raise ParamError("The value of key 'insert_param' is invalid")

        try:
            if timeout == -1:
                vector_ids = self._stub.Insert(insert_param)
            else:
                vector_ids = self._stub.Insert.future(insert_param).result(timeout=timeout)

            if vector_ids.status.error_code == 0:
                inserted_ids = list(vector_ids.vector_id_array)
                return Status(message='Add vectors successfully!'), inserted_ids

            return Status(code=vector_ids.status.error_code,
                          message=vector_ids.status.reason), []
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred. {}'.format(e.details())), []
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(code=Status.UNEXPECTED_ERROR, message="Request timeout"), []

    def create_index(self, table_name, index=None, timeout=-1):
        """
        build vectors of specific table and create vector index

        :param table_name: table used to create index.
        :type table_name: str
        :param index: index params; missing or None entries are filled with
            the defaults. The dictionary passed in is not modified.
        :type index: dict

            index_param can be None

            `example (default) param={'index_type': IndexType.FLAT,
                            'nlist': 16384}`

        :param timeout: grpc request timeout.

            if `timeout` = -1, method invoke a synchronous call, waiting until grpc response
            else method invoke a asynchronous call, timeout work here

        :type  timeout: int

        :return: Status, indicate if operation is successful
        :raises ParamError: if `index` is not a dict, or `timeout` is
            negative and not -1
        :raises NotConnectError: if the client is not connected
        """
        if not index:
            _index = {'index_type': IndexType.FLAT, 'nlist': 16384}
        elif not isinstance(index, dict):
            raise ParamError("param `index` should be a dictionary")
        else:
            # Work on a copy so filling in defaults never mutates the
            # caller's dictionary.
            _index = dict(index)
            if _index.get('index_type', None) is None:
                _index['index_type'] = IndexType.FLAT
            if _index.get('nlist', None) is None:
                _index['nlist'] = 16384

        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        index_param = Prepare.index_param(table_name, _index)
        try:
            if timeout == -1:
                status = self._stub.CreateIndex(index_param)
            elif timeout < 0:
                raise ParamError("Param `timeout` should be a positive number or -1")
            else:
                try:
                    status = self._stub.CreateIndex.future(index_param).result(timeout=timeout)
                except grpc.FutureTimeoutError as e:
                    LOGGER.error(e)
                    return Status(Status.UNEXPECTED_ERROR, message='Request timeout')

            if status.error_code == 0:
                return Status(message='Build index successfully!')

            return Status(code=status.error_code, message=status.reason)
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred. {}'.format(e.details()))

    def describe_index(self, table_name, timeout=10):
        """
        Show index information of designated table

        :type table_name: str
        :param table_name: table name been queried
        :param timeout: time in seconds to wait for the server response

        :returns:
            Status:  indicate if query is successful

            IndexParam: index information, None on any failure
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        table_name = Prepare.table_name(table_name)

        try:
            index_param = self._stub.DescribeIndex.future(table_name).result(timeout=timeout)

            status = index_param.status

            if status.error_code == 0:
                return Status(message="Successfully"), \
                    IndexParam(index_param.table_name,
                               index_param.index.index_type,
                               index_param.index.nlist)

            return Status(code=status.error_code, message=status.reason), None
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(code=Status.UNEXPECTED_ERROR, message='Request timeout'), None
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred. {}'.format(e.details())), None

    def drop_index(self, table_name, timeout=10):
        """
        drop index from index file

        :param table_name: target table name.
        :type table_name: str
        :param timeout: time in seconds to wait for the server response

        :return:
            Status: indicate if operation is successful

        :rtype: Status
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        table_name = Prepare.table_name(table_name)

        try:
            status = self._stub.DropIndex.future(table_name).result(timeout=timeout)
            return Status(code=status.error_code, message=status.reason)
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(Status.UNEXPECTED_ERROR, message='Request timeout')
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred. {}'.format(e.details()))

    def search(self, table_name, top_k, nprobe, query_records, query_ranges=None, **kwargs):
        """
        Search similar vectors in designated table

        :param table_name: target table name
        :type  table_name: str
        :param top_k: number of vectors which is most similar with query vectors
        :type  top_k: int
        :param nprobe: cell number of probe
        :type  nprobe: int
        :param query_records: vectors to query
        :type  query_records: list[list[float32]]
        :param query_ranges: query data range

        :return
            Status: indicate if search successfully

            result: query result, as produced by the search hook's
            ``handle_response``; an empty list on failure.

            If the search hook's ``on_response()`` is truthy, the raw gRPC
            response is returned instead of a (Status, result) tuple.

        :rtype: (Status, TopKQueryResult)
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        request = Prepare.search_param(
            table_name, query_records, query_ranges, top_k, nprobe
        )

        try:
            self._search_hook.pre_search()
            response = self._stub.Search(request)
            self._search_hook.aft_search()

            if self._search_hook.on_response():
                return response

            if response.status.error_code != 0:
                return Status(code=response.status.error_code,
                              message=response.status.reason), []

            results = self._search_hook.handle_response(response)
            return Status(message='Search vectors successfully!'), results

        except grpc.RpcError as e:
            LOGGER.error(e)
            status = Status(code=e.code(), message='Error occurred: {}'.format(e.details()))
            return status, []

    def search_in_files(self, table_name, file_ids, query_records, top_k,
                        nprobe=16, query_ranges=None, **kwargs):
        """
        Query vectors in a table, in specified files

        :type  nprobe: int
        :param nprobe: cell number of probe

        :type  table_name: str
        :param table_name: table name been queried

        :type  file_ids: list[str] or list[int]
        :param file_ids: Specified files id array

        :type  query_records: list[list[float]]
        :param query_records: all vectors going to be queried

        :param query_ranges: Optional ranges for conditional search.
            If not specified, search in the whole table

        :type  top_k: int
        :param top_k: how many similar vectors will be searched

        :returns:
            Status:  indicate if query is successful

            results: query result, as produced by the file-search hook's
            ``handle_response``; an empty list on failure.

            If the file-search hook's ``on_response()`` is truthy, the raw
            gRPC response is returned instead of a (Status, results) tuple.

        :rtype: (Status, TopKQueryResult)
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        file_ids = list(map(int_or_str, file_ids))

        infos = Prepare.search_vector_in_files_param(
            table_name, query_records, query_ranges, top_k, nprobe, file_ids
        )

        try:
            self._search_file_hook.pre_search()
            response = self._stub.SearchInFiles(infos)
            self._search_file_hook.aft_search()

            if self._search_file_hook.on_response():
                return response

            if response.status.error_code != 0:
                return Status(code=response.status.error_code,
                              message=response.status.reason), []

            return Status(message='Search vectors successfully!'), \
                self._search_file_hook.handle_response(response)
        except grpc.RpcError as e:
            LOGGER.error(e)
            status = Status(code=e.code(), message='Error occurred. {}'.format(e.details()))
            return status, []

    def __delete_vectors_by_range(self, table_name, start_date=None, end_date=None, timeout=10):
        """
        Delete vectors by range. The data range contains start_time but not end_time
        This method is deprecated, not recommended for users

        :type  table_name: str
        :param table_name: target table name

        :type  start_date: str, date, datetime
        :param start_date: start of the range (inclusive)

        :type  end_date: str, date, datetime
        :param end_date: end of the range (exclusive)

        :param timeout: time in seconds to wait for the server response

        :return:
            Status:  indicate if invoke is successful
        :raises NotConnectError: if the client is not connected
        """
        if not self.connected():
            raise NotConnectError('Please connect to the server first')

        delete_range = Prepare.delete_param(table_name, start_date, end_date)

        try:
            status = self._stub.DeleteByRange.future(delete_range).result(timeout=timeout)
            return Status(code=status.error_code, message=status.reason)
        except grpc.FutureTimeoutError as e:
            LOGGER.error(e)
            return Status(Status.UNEXPECTED_ERROR, message='Request timeout')
        except grpc.RpcError as e:
            LOGGER.error(e)
            return Status(e.code(), message='Error occurred. {}'.format(e.details()))

    # In old version of pymilvus, some methods are different from the new.
    # apply alternative method name for compatibility
    get_table_row_count = count_table
    delete_table = drop_table
    add_vectors = insert
    search_vectors = search
    search_vectors_in_files = search_in_files