BulkWriter

BulkWriter APIs generate Milvus-compatible JSON or Parquet files on the client side. The generated file groups can be uploaded to object storage and imported with MilvusClient#bulkInsert or the HTTP import APIs.

import {
  BulkWriter,
  ColumnBuffer,
  JsonFormatter,
  LocalStorage,
  ParquetFormatter,
  VolumeManager,
} from '@zilliz/milvus2-sdk-node';

The package also exports the related types:

import type {
  BulkWriterOptions,
  BulkWriterSchema,
  FlushEvent,
  Formatter,
  Storage,
  VolumeManagerConfig,
  VolumeCreateReq,
  VolumeListReq,
  VolumeNameReq,
  VolumeApplyReq,
  VolumeResponse,
} from '@zilliz/milvus2-sdk-node';

new BulkWriter(options: BulkWriterOptions)

interface BulkWriterOptions {
  schema: BulkWriterSchema;
  storage?: Storage;
  format?: 'json' | 'parquet';
  chunkSize?: number;
  localPath?: string;
}

interface BulkWriterSchema {
  fields: FieldType[];
  enable_dynamic_field?: boolean;
}

format defaults to json, chunkSize defaults to 128 MB, and storage defaults to LocalStorage.
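
The defaults above can be pictured as a small option-resolving step. The sketch below is not the SDK's implementation: `WriterOptionsSketch`, `resolveOptions`, and the string label used for storage are hypothetical, and it assumes that 128 MB means 128 × 1024 × 1024 bytes.

```typescript
// Hypothetical stand-in for the documented options (schema and localPath omitted).
interface WriterOptionsSketch {
  format?: 'json' | 'parquet';
  chunkSize?: number; // bytes
  storage?: string; // placeholder label; the real option is a Storage object
}

// Assumption: "128 MB" is read as 128 * 1024 * 1024 bytes.
const DEFAULT_CHUNK_SIZE = 128 * 1024 * 1024;

// Fill in every option the caller left out with the documented default.
function resolveOptions(
  options: WriterOptionsSketch
): Required<WriterOptionsSketch> {
  return {
    format: options.format ?? 'json',
    chunkSize: options.chunkSize ?? DEFAULT_CHUNK_SIZE,
    storage: options.storage ?? 'LocalStorage',
  };
}

const defaults = resolveOptions({});
const custom = resolveOptions({ format: 'parquet', chunkSize: 16 * 1024 * 1024 });
```

Here `defaults` carries all three documented defaults, while `custom` overrides the format and chunk size and still falls back to local storage.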

| Method | Description |
| --- | --- |
| append(row) | Validate and append one row. Auto-flushes when the buffered data reaches chunkSize. |
| commit() | Flush the current buffer. Does nothing when the buffer is empty. |
| close() | Flush remaining rows and return generated file groups as string[][]. |
| writeFrom(source) | Append all rows from an AsyncIterable and then close the writer. |

| Property | Type | Description |
| --- | --- | --- |
| totalRowCount | number | Total appended rows. |
| bufferRowCount | number | Rows currently waiting in the buffer. |
| batchFiles | string[][] | Generated file groups so far. |

| Event | Payload | Description |
| --- | --- | --- |
| flush | { files, rowCount, chunkIndex } | Emitted after one chunk is persisted. |
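
To make the interplay between append, commit, close, and the flush event concrete, here is a toy in-memory model. It is a sketch under assumptions, not the SDK's code: `ToyChunkWriter`, `onFlush`, the invented file names, and the use of JSON text length as a size estimate are all hypothetical.

```typescript
// Payload shape taken from the flush event described above.
interface FlushEventSketch {
  files: string[];
  rowCount: number;
  chunkIndex: number;
}

type ToyRow = Record<string, unknown>;

// Toy model of the buffering behaviour. Nothing is written to disk.
class ToyChunkWriter {
  private buffer: ToyRow[] = [];
  private bufferBytes = 0;
  private listeners: Array<(event: FlushEventSketch) => void> = [];
  private chunkSize: number;
  totalRowCount = 0;
  batchFiles: string[][] = [];

  constructor(chunkSize: number) {
    this.chunkSize = chunkSize;
  }

  get bufferRowCount(): number {
    return this.buffer.length;
  }

  onFlush(listener: (event: FlushEventSketch) => void): void {
    this.listeners.push(listener);
  }

  append(row: ToyRow): void {
    this.buffer.push(row);
    // Assumption: row size is approximated by its JSON text length.
    this.bufferBytes += JSON.stringify(row).length;
    this.totalRowCount += 1;
    if (this.bufferBytes >= this.chunkSize) {
      this.commit(); // auto-flush once the threshold is reached
    }
  }

  commit(): void {
    if (this.buffer.length === 0) return; // empty buffer: nothing to do
    const chunkIndex = this.batchFiles.length;
    const files = [`chunk-${chunkIndex}.json`]; // invented file name
    const rowCount = this.buffer.length;
    this.batchFiles.push(files);
    this.buffer = [];
    this.bufferBytes = 0;
    for (const listener of this.listeners) {
      listener({ files, rowCount, chunkIndex });
    }
  }

  close(): string[][] {
    this.commit(); // flush whatever is left
    return this.batchFiles;
  }
}

// A tiny chunk size (40 bytes) so that five 22-byte rows trigger flushes.
const toy = new ToyChunkWriter(40);
const events: FlushEventSketch[] = [];
toy.onFlush((event) => events.push(event));
for (let id = 0; id < 5; id++) {
  toy.append({ id, label: 'row' });
}
const fileGroups = toy.close();
```

With these numbers the toy flushes after the second and fourth rows, and close() flushes the remaining row, so `fileGroups` holds three single-file groups.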

Example:

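import { BulkWriter, DataType } from '@zilliz/milvus2-sdk-node';

const writer = new BulkWriter({
  schema: {
    fields: [
      { name: 'id', data_type: DataType.Int64, is_primary_key: true },
      { name: 'vector', data_type: DataType.FloatVector, dim: 128 },
    ],
  },
  format: 'parquet',
  localPath: './bulk-output',
});

// Log each chunk as soon as it has been persisted.
writer.on('flush', (event) => {
  console.log(event.chunkIndex, event.files);
});

// `embedding` is a 128-element number[] produced elsewhere in the application.
await writer.append({ id: 1, vector: embedding });

// close() flushes the remaining rows and returns the file groups (string[][]).
const files = await writer.close();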
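
For larger inputs, writeFrom(source) takes an AsyncIterable instead of individual append calls. The sketch below shows the behaviour the method table describes (append every row, then close) against a minimal stand-in interface. `RowSink`, `writeAll`, and `generateRows` are hypothetical names, and the random vectors are placeholder data.

```typescript
// Minimal stand-in for the two writer methods this sketch relies on.
interface RowSink {
  append(row: Record<string, unknown>): Promise<void>;
  close(): Promise<string[][]>;
}

// Append every row from the source in order, then close the sink
// and hand back the generated file groups.
async function writeAll(
  sink: RowSink,
  source: AsyncIterable<Record<string, unknown>>
): Promise<string[][]> {
  for await (const row of source) {
    await sink.append(row);
  }
  return sink.close();
}

// Hypothetical row source: an async generator that yields rows one at a
// time, so the full data set never has to sit in memory at once.
async function* generateRows(
  count: number
): AsyncGenerator<Record<string, unknown>> {
  for (let id = 0; id < count; id++) {
    const vector = Array.from({ length: 128 }, () => Math.random());
    yield { id, vector };
  }
}
```

Going by the method table, a generator like this could presumably be passed straight to the real writer as `await writer.writeFrom(generateRows(1000))`.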

ColumnBuffer is exported for advanced formatter or storage integrations. Most applications should use BulkWriter directly.

new ColumnBuffer(schema: BulkWriterSchema)

| Method or property | Description |
| --- | --- |
| append(row) | Append one row and return the estimated byte size added to the buffer. |
| getColumn(name) | Return one buffered field column. |
| getColumns() | Return all buffered columns as a Map<string, any[]>. |
| getRow(index) | Reconstruct one row from buffered column data. |
| rowCount | Number of buffered rows. |
| dynamicRows | Dynamic $meta rows when enable_dynamic_field is enabled. |
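
The column-oriented layout behind these methods can be illustrated with a toy buffer. This is a sketch, not the exported class: `ToyColumnBuffer` takes plain field names instead of a schema, skips type validation and dynamic fields, and assumes JSON text length as its size estimate.

```typescript
// Toy column-oriented buffer: one array per field, all of equal length.
class ToyColumnBuffer {
  private columns = new Map<string, unknown[]>();
  rowCount = 0;

  constructor(fieldNames: string[]) {
    for (const name of fieldNames) {
      this.columns.set(name, []);
    }
  }

  // Append one row and return a rough size estimate.
  // Assumption: JSON text length stands in for the real byte estimate.
  append(row: Record<string, unknown>): number {
    // Check every field first so a bad row cannot leave columns uneven.
    this.columns.forEach((_column, name) => {
      if (!(name in row)) {
        throw new Error(`missing field: ${name}`);
      }
    });
    let bytes = 0;
    this.columns.forEach((column, name) => {
      column.push(row[name]);
      bytes += JSON.stringify(row[name] ?? null).length;
    });
    this.rowCount += 1;
    return bytes;
  }

  getColumn(name: string): unknown[] {
    const column = this.columns.get(name);
    if (!column) {
      throw new Error(`unknown field: ${name}`);
    }
    return column;
  }

  // Rebuild a row by reading the same index from every column.
  getRow(index: number): Record<string, unknown> {
    const row: Record<string, unknown> = {};
    this.columns.forEach((column, name) => {
      row[name] = column[index];
    });
    return row;
  }
}

const buffer = new ToyColumnBuffer(['id', 'vector']);
buffer.append({ id: 1, vector: [0.1, 0.2] });
buffer.append({ id: 2, vector: [0.3, 0.4] });
const ids = buffer.getColumn('id'); // [1, 2]
const second = buffer.getRow(1); // { id: 2, vector: [0.3, 0.4] }
```

Storing values per column suits columnar formats such as Parquet, while getRow covers the cases where a row-shaped view is still needed.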

JsonFormatter and ParquetFormatter implement the Formatter interface.

interface Formatter {
  readonly extension: string;
  persist(
    columns: Map<string, any[]>,
    dynamicCol: Record<string, any>[],
    rowCount: number,
    dir: string,
    schema: BulkWriterSchema
  ): Promise<string[]>;
}
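
A row-oriented formatter has to pivot the `columns` and `dynamicCol` arguments back into row objects before serializing them. The helper below sketches that step only. `columnsToRows` is a hypothetical name, placing dynamic fields under a `$meta` key is an assumption, and the exact file layout the built-in formatters write is not shown here.

```typescript
// Pivot column data into row objects, the shape a row-oriented
// format such as JSON needs before serialization.
function columnsToRows(
  columns: Map<string, unknown[]>,
  dynamicCol: Record<string, unknown>[],
  rowCount: number
): Record<string, unknown>[] {
  const rows: Record<string, unknown>[] = [];
  for (let i = 0; i < rowCount; i++) {
    const row: Record<string, unknown> = {};
    columns.forEach((values, name) => {
      row[name] = values[i];
    });
    // Assumption: dynamic fields travel under a "$meta" key and are
    // omitted when a row has none.
    const dynamic = dynamicCol[i];
    if (dynamic && Object.keys(dynamic).length > 0) {
      row['$meta'] = dynamic;
    }
    rows.push(row);
  }
  return rows;
}

const sampleColumns = new Map<string, unknown[]>([
  ['id', [1, 2]],
  ['title', ['a', 'b']],
]);
const sampleRows = columnsToRows(sampleColumns, [{ color: 'red' }, {}], 2);
const serialized = JSON.stringify(sampleRows);
```

A custom persist implementation would write a string like `serialized` into `dir` and resolve with the paths of the files it created.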

LocalStorage implements the Storage interface and returns local file paths unchanged.

interface Storage {
  write(localPath: string, remotePath: string): Promise<string>;
}

Custom storage implementations can upload files and return their remote import paths.

const writer = new BulkWriter({
  schema,
  storage: {
    // uploadToS3 is an application-provided helper, not part of the SDK.
    async write(localPath, remotePath) {
      await uploadToS3(localPath, remotePath);
      return remotePath;
    },
  },
});
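
A custom storage can also be exercised without a real object store. The sketch below redeclares the interface so it is self-contained and records uploads in memory. `StorageSketch`, `RecordingStorage`, and `toRemoteKey` are hypothetical, and returning a prefix-relative key is an assumption: whether the import API expects a relative key or a full URI depends on the deployment.

```typescript
// Same shape as the Storage interface above, redeclared for self-containment.
interface StorageSketch {
  write(localPath: string, remotePath: string): Promise<string>;
}

// Join a key prefix and the writer-provided remote path into one
// object key, trimming stray leading and trailing slashes.
function toRemoteKey(prefix: string, remotePath: string): string {
  return [prefix, remotePath]
    .map((part) => part.replace(/^\/+|\/+$/g, ''))
    .filter((part) => part.length > 0)
    .join('/');
}

// Records what would be uploaded instead of calling a real object store.
class RecordingStorage implements StorageSketch {
  readonly uploads: Array<{ localPath: string; key: string }> = [];
  private readonly prefix: string;

  constructor(prefix: string) {
    this.prefix = prefix;
  }

  async write(localPath: string, remotePath: string): Promise<string> {
    const key = toRemoteKey(this.prefix, remotePath);
    // A real implementation would upload the file at localPath here.
    this.uploads.push({ localPath, key });
    return key;
  }
}

const recordingStorage = new RecordingStorage('/imports/');
const exampleKey = toRemoteKey('/imports/', 'batch-0/1.parquet');
// exampleKey === 'imports/batch-0/1.parquet'
```

An object like `recordingStorage` satisfies the documented interface, so in a test it could stand in for the `storage` option while the upload logic is still under development.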

VolumeManager manages Zilliz Cloud volumes used by cloud import workflows.

new VolumeManager(config: VolumeManagerConfig)
new VolumeManager(cloudEndpoint: string, apiKey: string, fetchImpl?: typeof fetch)

interface VolumeManagerConfig {
  cloudEndpoint: string;
  apiKey: string;
  fetch?: typeof fetch;
  timeout?: number;
}

| Method | Alias | Description |
| --- | --- | --- |
| createVolume(data, options?) | create_volume | Create a managed or external volume. |
| listVolumes(data, options?) | list_volumes | List volumes in a project. |
| describeVolume(data, options?) | describe_volume | Describe a volume by name. |
| deleteVolume(data, options?) | delete_volume | Delete a volume by name. |
| applyVolume(data, options?) | apply_volume | Apply a path to a volume. |

const volumes = new VolumeManager({
  cloudEndpoint: 'https://api.cloud.zilliz.com',
  apiKey: process.env.ZILLIZ_CLOUD_API_KEY,
});

await volumes.createVolume({
  projectId: 'proj-123',
  regionId: 'aws-us-west-2',
  volumeName: 'imports',
  type: 'MANAGED',
});

const list = await volumes.listVolumes({
  projectId: 'proj-123',
  pageSize: 20,
});
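
An environment variable such as `ZILLIZ_CLOUD_API_KEY` is undefined when it has not been set, so it can help to validate the configuration before constructing the manager. The helper below is a sketch that also mirrors the two constructor forms. `normalizeVolumeConfig`, `VolumeConfigSketch`, and `FetchLike` are hypothetical, and trimming trailing slashes from the endpoint is an assumption rather than documented SDK behaviour.

```typescript
// Local stand-in for a fetch-like function; the real option is `typeof fetch`.
type FetchLike = (
  input: string,
  init?: Record<string, unknown>
) => Promise<unknown>;

interface VolumeConfigSketch {
  cloudEndpoint: string;
  apiKey: string;
  fetch?: FetchLike;
  timeout?: number;
}

// Accept either a config object or positional arguments and
// return one validated config shape.
function normalizeVolumeConfig(
  configOrEndpoint: VolumeConfigSketch | string,
  apiKey?: string,
  fetchImpl?: FetchLike
): VolumeConfigSketch {
  let config: VolumeConfigSketch;
  if (typeof configOrEndpoint === 'string') {
    config = { cloudEndpoint: configOrEndpoint, apiKey: apiKey ?? '' };
    if (fetchImpl) {
      config.fetch = fetchImpl;
    }
  } else {
    config = { ...configOrEndpoint };
  }
  // Fail fast on missing values, for example an unset environment variable.
  if (!config.cloudEndpoint) {
    throw new Error('cloudEndpoint is required');
  }
  if (!config.apiKey) {
    throw new Error('apiKey is required');
  }
  // Assumption: trailing slashes are trimmed so paths can be appended uniformly.
  config.cloudEndpoint = config.cloudEndpoint.replace(/\/+$/, '');
  return config;
}

const fromObject = normalizeVolumeConfig({
  cloudEndpoint: 'https://api.cloud.zilliz.com/',
  apiKey: 'example-key', // placeholder, not a real key
});
const fromArgs = normalizeVolumeConfig('https://api.cloud.zilliz.com', 'example-key');
```

Both calls yield the same endpoint and key, and an empty or missing key raises an error before any request is attempted.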