Source code for milvus.client.abstract

from ..client.exceptions import ParamError

from .utils import (
    check_pass_param,
    parser_range_date,
    is_legal_date_range
)

from .types import IndexType


[docs]class TableSchema: def __init__(self, table_name, dimension, index_file_size, metric_type): """ Table Schema :type table_name: str :param table_name: (Required) name of table `IndexType`: 0-invalid, 1-flat, 2-ivflat, 3-IVF_SQ8, 4-MIX_NSG :type dimension: int64 :param dimension: (Required) dimension of vector :type index_file_size: int64 :param index_file_size: (Optional) max size of files which store index :type metric_type: MetricType :param metric_type: (Optional) vectors metric type `MetricType`: 1-L2, 2-IP """ check_pass_param(table_name=table_name, dimension=dimension, index_file_size=index_file_size, metric_type=metric_type) self.table_name = table_name self.dimension = dimension self.index_file_size = index_file_size self.metric_type = metric_type def __repr__(self): attr_list = ['%s=%r' % (key, value) for key, value in self.__dict__.items()] return '%s(%s)' % (self.__class__.__name__, ', '.join(attr_list))
[docs]class Range: """ Range information :type start_date: str, date or datetime `str should be YY-MM-DD format, e.g. "2019-07-01"` :param start_date: Range start date :type end_date: str, date or datetime `str should be YY-MM-DD format, e.g. "2019-07-01"` :param end_date: Range end date """ def __init__(self, start_date, end_date): start_date = parser_range_date(start_date) end_date = parser_range_date(end_date) if is_legal_date_range(start_date, end_date): self.start_date = start_date self.end_date = end_date else: raise ParamError("The start-date should be smaller" " than or equal to end-date!")
[docs]class QueryResult: def __init__(self, _id, _distance): self.id = _id self.distance = _distance def __str__(self): return "Result(id={}, distance={})".format(self.id, self.distance)
[docs]class RowQueryResult: def __init__(self, _id_list, _dis_list): self._id_list = _id_list or [] self._dis_list = _dis_list or [] # Iterator index self.__index = 0 def __getitem__(self, item): if isinstance(item, slice): _start = item.start or 0 _end = item.stop or self.__len__() _step = item.step or 1 elements = [] for i in range(_start, _end, _step): elements.append(self.__getitem__(i)) return elements return QueryResult(self._id_list[item], self._dis_list[item]) def __len__(self): return len(self._id_list) def __iter__(self): return self def __next__(self): while self.__index < self.__len__(): self.__index += 1 return self.__getitem__(self.__index - 1) # iterate stop, raise Exception self.__index = 0 raise StopIteration()
[docs]class TopKQueryResult: """ TopK query results, shown as 2-D array This Class unpack response from server, store ids and distances separately. """ def __init__(self, raw_source, **kwargs): self._raw = raw_source self._nq = 0 self._topk = 0 self._id_array = [] self._dis_array = [] ## self.__index = 0 self._unpack(self._raw) def _unpack(self, _raw): """ Args: _raw: Returns: """ self._nq = _raw.row_num if self._nq == 0: return id_list = list(_raw.ids) id_col = len(id_list) // self._nq for i in range(0, len(id_list), id_col): self._id_array.append(id_list[i: i + id_col]) dis_list = list(_raw.distances) dis_col = len(dis_list) // self._nq for j in range(0, len(dis_list), dis_col): self._dis_array.append(dis_list[j: j + dis_col]) if len(self._id_array) != self._nq or \ len(self._dis_array) != self._nq: raise ParamError("Result parse error.") if id_col != dis_col: raise ParamError("Result parse error.") self._topk = id_col @property def id_array(self): """ Id array, it's a 2-D array. """ return self._id_array @property def distance_array(self): """ Distance array, it's a 2-D array """ return self._dis_array @property def shape(self): """ getter. return result shape, format as (row, column). """ return self._nq, self._topk @property def raw(self): """ getter. return the raw result response """ return self._raw def __len__(self): return self._nq def __getitem__(self, item): if isinstance(item, slice): _start = item.start or 0 _end = item.stop or self.__len__() _step = item.step or 1 elements = [] for i in range(_start, _end, _step): elements.append(self.__getitem__(i)) return elements return RowQueryResult(self._id_array[item], self._dis_array[item]) def __iter__(self): return self def __next__(self): while self.__index < self.__len__(): self.__index += 1 return self.__getitem__(self.__index - 1) # iterate stop, raise Exception self.__index = 0 raise StopIteration() def __repr__(self): """ :return: """ lam = lambda x: "(id:{}, distance:{})".format(x.id, x.distance) if self.__len__() > 5: middle = '' ll = self[:3] for topk in ll: middle = middle + " [ %s" % ",\n ".join(map(lam, topk[:3])) middle += ",\n ..." middle += "\n %s ]\n\n" % lam(topk[-1]) spaces = """ ...... ......""" ahead = "[\n%s%s\n]" % (middle, spaces) return ahead # self.__len__() < 5 str_out_list = [] for i in range(self.__len__()): str_out_list.append("[\n%s\n]" % ",\n".join(map(lam, self[i]))) return "[\n%s\n]" % ",\n".join(str_out_list)
[docs]class IndexParam: """ Index Param :type table_name: str :param table_name: (Required) name of table :type index_type: IndexType :param index_type: (Required) index type, default = IndexType.INVALID `IndexType`: 0-invalid, 1-flat, 2-ivflat, 3-IVF_SQ8, 4-MIX_NSG :type nlist: int64 :param nlist: (Required) num of cell """ def __init__(self, table_name, index_type, nlist): if table_name is None: raise ParamError('Table name can\'t be None') table_name = str(table_name) if not isinstance(table_name, str) else table_name if isinstance(index_type, int): index_type = IndexType(index_type) if not isinstance(index_type, IndexType) or index_type == IndexType.INVALID: raise ParamError('Illegal index_type, should be IndexType but not IndexType.INVALID') self._table_name = table_name self._index_type = index_type self._nlist = nlist def __str__(self): attr_list = ['%s=%r' % (key.lstrip('_'), value) for key, value in self.__dict__.items()] return '(%s)' % (', '.join(attr_list)) def __repr__(self): attr_list = ['%s=%r' % (key, value) for key, value in self.__dict__.items()] return '%s(%s)' % (self.__class__.__name__, ', '.join(attr_list))
def _abstract(): raise NotImplementedError('You need to override this function')
[docs]class ConnectIntf: """SDK client abstract class Connection is a abstract class """
[docs] def connect(self, host, port, uri, timeout): """ Connect method should be called before any operations Server will be connected after connect return OK Should be implemented :type host: str :param host: host :type port: str :param port: port :type uri: str :param uri: (Optional) uri :type timeout: int :param timeout: :return: Status, indicate if connect is successful """ _abstract()
[docs] def connected(self): """ connected, connection status Should be implemented :return: Status, indicate if connect is successful """ _abstract()
[docs] def disconnect(self): """ Disconnect, server will be disconnected after disconnect return SUCCESS Should be implemented :return: Status, indicate if connect is successful """ _abstract()
[docs] def create_table(self, param, timeout): """ Create table Should be implemented :type param: TableSchema :param param: provide table information to be created :type timeout: int :param timeout: :return: Status, indicate if connect is successful """ _abstract()
[docs] def has_table(self, table_name, timeout): """ This method is used to test table existence. Should be implemented :type table_name: str :param table_name: table name is going to be tested. :type timeout: int :param timeout: :return: has_table: bool, if given table_name exists """ _abstract()
[docs] def delete_table(self, table_name, timeout): """ Delete table Should be implemented :type table_name: str :param table_name: table_name of the deleting table :type timeout: int :param timeout: :return: Status, indicate if connect is successful """ _abstract()
[docs] def add_vectors(self, table_name, records, ids, timeout, **kwargs): """ Add vectors to table Should be implemented :type table_name: str :param table_name: table name been inserted :type records: list[RowRecord] :param records: list of vectors been inserted :type ids: list[int] :param ids: list of ids :type timeout: int :param timeout: :returns: Status : indicate if vectors inserted successfully ids :list of id, after inserted every vector is given a id """ _abstract()
[docs] def search_vectors(self, table_name, top_k, nprobe, query_records, query_ranges, **kwargs): """ Query vectors in a table Should be implemented :type table_name: str :param table_name: table name been queried :type query_records: list[RowRecord] :param query_records: all vectors going to be queried :type query_ranges: list[Range] :param query_ranges: Optional ranges for conditional search. If not specified, search whole table :type top_k: int :param top_k: how many similar vectors will be searched :returns: Status: indicate if query is successful query_results: list[TopKQueryResult] """ _abstract()
[docs] def search_vectors_in_files(self, table_name, file_ids, query_records, top_k, nprobe, query_ranges, **kwargs): """ Query vectors in a table, query vector in specified files Should be implemented :type table_name: str :param table_name: table name been queried :type file_ids: list[str] :param file_ids: Specified files id array :type query_records: list[RowRecord] :param query_records: all vectors going to be queried :type query_ranges: list[Range] :param query_ranges: Optional ranges for conditional search. If not specified, search whole table :type top_k: int :param top_k: how many similar vectors will be searched :returns: Status: indicate if query is successful query_results: list[TopKQueryResult] """ _abstract()
[docs] def describe_table(self, table_name, timeout): """ Show table information Should be implemented :type table_name: str :param table_name: which table to be shown :type timeout: int :param timeout: :returns: Status: indicate if query is successful table_schema: TableSchema, given when operation is successful """ _abstract()
[docs] def get_table_row_count(self, table_name, timeout): """ Get table row count Should be implemented :type table_name, str :param table_name, target table name. :type timeout: int :param timeout: how many similar vectors will be searched :returns: Status: indicate if operation is successful count: int, table row count """ _abstract()
[docs] def show_tables(self, timeout): """ Show all tables in database should be implemented :type timeout: int :param timeout: how many similar vectors will be searched :return: Status: indicate if this operation is successful tables: list[str], list of table names """ _abstract()
[docs] def create_index(self, table_name, index, timeout): """ Create specified index in a table should be implemented :type table_name: str :param table_name: table name :type index: dict :param index: index information dict example: index = { "index_type": IndexType.FLAT, "nlist": 18384 } :type timeout: int :param timeout: how many similar vectors will be searched :return: Status: indicate if this operation is successful :rtype: Status """ _abstract()
[docs] def client_version(self): """ Provide client version should be implemented :return: Status: indicate if operation is successful str : Client version :rtype: (Status, str) """ _abstract()
[docs] def server_version(self, timeout): """ Provide server version should be implemented :type timeout: int :param timeout: how many similar vectors will be searched :return: Status: indicate if operation is successful str : Server version :rtype: (Status, str) """ _abstract()
[docs] def server_status(self, timeout): """ Provide server status. When cmd !='version', provide 'OK' should be implemented :type timeout: int :param timeout: how many similar vectors will be searched :return: Status: indicate if operation is successful str : Server version :rtype: (Status, str) """ _abstract()
[docs] def preload_table(self, table_name, timeout): """ load table to memory cache in advance should be implemented :param table_name: target table name. :type table_name: str :type timeout: int :param timeout: how many similar vectors will be searched :return: Status: indicate if operation is successful ::rtype: Status """ _abstract()
[docs] def describe_index(self, table_name, timeout): """ Show index information should be implemented :param table_name: target table name. :type table_name: str :type timeout: int :param timeout: how many similar vectors will be searched :return: Status: indicate if operation is successful TableSchema: table detail information :rtype: (Status, TableSchema) """ _abstract()
[docs] def drop_index(self, table_name, timeout): """ Show index information should be implemented :param table_name: target table name. :type table_name: str :type timeout: int :param timeout: how many similar vectors will be searched :return: Status: indicate if operation is successful ::rtype: Status """ _abstract()