BulkWriter & Bulk Import
Generate Milvus-compatible files (JSON/Parquet) on the client side and import them via bulkInsert. This bypasses the WAL and real-time indexing pipeline, making it ideal for large-scale offline data loading.
When to Use
| Scenario | Use insert() | Use BulkWriter + bulkInsert |
|---|---|---|
| Real-time writes (<10k rows) | Yes | No |
| Batch import (100k+ rows) | Possible | Yes |
| Offline data pipeline | No | Yes |
| Minimize Milvus write pressure | No | Yes |
| Data migration between clusters | No | Yes |
| File-based import (CSV/JSON) | No | Yes |
Quick Start
```typescript
import { BulkWriter, DataType } from '@zilliz/milvus2-sdk-node';

// Your source records (placeholder; replace with your own data)
declare const data: Array<{ embedding: number[]; content: string; source: string }>;

// 1. Define the schema (same shape as for createCollection)
const schema = {
  fields: [
    {
      name: 'id',
      data_type: DataType.Int64,
      is_primary_key: true,
      autoID: true,
    },
    { name: 'vector', data_type: DataType.FloatVector, dim: 128 },
    { name: 'text', data_type: DataType.VarChar, max_length: 512 },
    { name: 'metadata', data_type: DataType.JSON },
  ],
};

// 2. Create the writer
const writer = new BulkWriter({
  schema,
  localPath: './bulk_output',
  format: 'parquet', // or 'json'
});

// 3. Append rows (id is omitted because autoID is enabled)
for (const item of data) {
  await writer.append({
    vector: item.embedding,
    text: item.content,
    metadata: { source: item.source },
  });
}

// 4. Close the writer and get the generated file paths
const files = await writer.close();
// files = [['./bulk_output/<uuid>/chunk_0/data.parquet']]
```
Options
```typescript
const writer = new BulkWriter({
  schema: BulkWriterSchema, // required; field definitions
  format: 'json' | 'parquet', // default: 'json'
  localPath: string, // output directory, default: process.cwd()
  chunkSize: number, // auto-flush threshold in bytes, default: 128MB
  storage: Storage, // pluggable storage backend, default: LocalStorage
});
```
JSON vs Parquet
| | JSON | Parquet |
|---|---|---|
| File size | Large | ~77% smaller |
| Write speed | Fast | Slower |
| Upload speed | Slow | ~4x faster |
| Server import | Slow | ~2x faster |
| Dependencies | None | @dsnp/parquetjs |
| Human readable | Yes | No |
Recommendation: Use Parquet for production, JSON for debugging.
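Part of the size gap comes from how vectors are encoded. The rough illustration below does not reproduce the benchmark figures above, and Parquet's compression is not modeled. It compares the length of one 128-dim vector written as JSON text with its raw size as 32-bit floats, which is how each FloatVector dimension is stored. The function name and the deterministic sample values are illustrative assumptions.

```typescript
// Illustration only: JSON text size vs raw float32 size for one vector.
function jsonVsFloat32Bytes(vector: number[]): { jsonBytes: number; float32Bytes: number } {
  // JSON writes each float as decimal text, e.g. "0.8414709848078965,".
  // The output is ASCII-only, so string length equals byte count.
  const jsonBytes = JSON.stringify(vector).length;
  // A FloatVector dimension is a 32-bit float: 4 bytes each.
  const float32Bytes = new Float32Array(vector).byteLength;
  return { jsonBytes, float32Bytes };
}

// Deterministic stand-in for a real embedding.
const sample = Array.from({ length: 128 }, (_, i) => Math.sin(i + 1));
const { jsonBytes, float32Bytes } = jsonVsFloat32Bytes(sample);
console.log(`JSON: ${jsonBytes} bytes, float32: ${float32Bytes} bytes`);
```

Typical embedding values print with 16 or more significant digits in JSON, so the text form is several times larger than the binary form before any compression is applied.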
Full Workflow: Write, Upload, Import
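```typescript
import * as path from 'path';
import * as Minio from 'minio';
import { BulkWriter, MilvusClient } from '@zilliz/milvus2-sdk-node';

// FIELDS (field definitions) and data (your rows) are assumed to be defined, e.g. as in Quick Start.

// Step 1: Write files
const localPath = './bulk_data';
const writer = new BulkWriter({
  schema: { fields: FIELDS },
  format: 'parquet',
  localPath,
});

for (const row of data) {
  await writer.append(row);
}
const batchFiles = await writer.close();

// Step 2: Upload to MinIO/S3
const minio = new Minio.Client({
  endPoint: '127.0.0.1',
  port: 9000,
  useSSL: false,
  accessKey: 'minioadmin',
  secretKey: 'minioadmin',
});

// Upload into the bucket your Milvus instance reads from ('a-bucket' is Milvus's default).
// Use the path relative to localPath as the object key: chunk files can share a base name
// (data.parquet), so path.basename() alone would make later chunks overwrite earlier ones.
const remotePaths: string[] = [];
for (const chunk of batchFiles) {
  for (const localFile of chunk) {
    const relative = path.relative(localPath, localFile).split(path.sep).join('/');
    const remote = `imports/${relative}`;
    await minio.fPutObject('a-bucket', remote, localFile);
    remotePaths.push(remote);
  }
}

// Step 3: Trigger bulk import
const client = new MilvusClient({ address: 'localhost:19530' });
const importRes = await client.bulkInsert({
  collection_name: 'my_collection',
  files: remotePaths,
});

// Step 4: Poll until the task reaches a terminal state
const task = importRes.tasks[0];
let state = await client.getImportState({ task });
while (state.state !== 'ImportCompleted' && state.state !== 'ImportFailed') {
  await new Promise(r => setTimeout(r, 1000));
  state = await client.getImportState({ task });
}

if (state.state === 'ImportCompleted') {
  console.log('Import completed, rows:', state.row_count);
} else {
  console.error('Import failed:', state);
}
```
Streaming from Any Data Source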
BulkWriter also accepts an AsyncIterable via writeFrom(), so you can stream rows from any source without loading the whole dataset into memory:
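```typescript
import * as fs from 'fs';
import { parse } from 'csv-parse';
import { BulkWriter } from '@zilliz/milvus2-sdk-node';

// schema is assumed to be defined as in Quick Start.

// Stream CSV → BulkWriter → Parquet files
const csvStream = fs
  .createReadStream('data.csv')
  .pipe(parse({ columns: true }));

// Map each CSV record to a row.
// Assumes the embedding column holds a JSON array such as "[0.1, 0.2, ...]".
async function* transform(source: AsyncIterable<Record<string, string>>) {
  for await (const row of source) {
    yield {
      vector: JSON.parse(row.embedding),
      text: row.content,
      metadata: { source: row.source },
    };
  }
}

const writer = new BulkWriter({
  schema,
  format: 'parquet',
  localPath: './output',
});
// writeFrom() consumes the iterable, closes the writer, and returns the file paths
const files = await writer.writeFrom(transform(csvStream));
```
Collection Clone via Query Iterator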
Pull data from an existing collection and write it to files for import into another collection:
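```typescript
// sourceClient: a MilvusClient connected to the source cluster (assumed)
// targetFields: the target collection's field definitions (assumed)

// Source: iterate over all rows in batches
const iterator = await sourceClient.queryIterator({
  collection_name: 'source_collection',
  batchSize: 1000,
  output_fields: ['vector', 'text', 'metadata'],
  expr: 'id > 0', // matches every row only if all ids are positive
});

// Write to BulkWriter
const writer = new BulkWriter({
  schema: { fields: targetFields },
  format: 'parquet',
  localPath: './migration',
});

for await (const batch of iterator) {
  for (const row of batch) {
    await writer.append(row);
  }
}
const files = await writer.close();

// Then upload the files and call bulkInsert on the target collection, as in Full Workflow
```
Note on Int64: query returns Int64 values as strings, because a JavaScript number cannot represent every 64-bit integer exactly. BulkWriter handles these strings automatically, so there is no precision loss even for values beyond Number.MAX_SAFE_INTEGER (2^53-1).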
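To see why the strings matter, the sketch below (independent of the SDK) converts the same Int64 value with Number() and with BigInt(). If your own code post-processes query results before calling append(), a helper along these lines keeps ids exact. toInt64 is a hypothetical name, and rejecting unsafe numbers is a design choice of this sketch.

```typescript
// Hypothetical helper (not part of the SDK): parse an Int64 value returned by query
// without passing it through a lossy JavaScript number.
function toInt64(value: string | number | bigint): bigint {
  if (typeof value === 'number' && !Number.isSafeInteger(value)) {
    // A number this large may already have been rounded; refuse rather than guess.
    throw new RangeError(`unsafe integer: ${value}`);
  }
  return BigInt(value);
}

const fromQuery = '9007199254740993'; // 2^53 + 1, as a string from query
console.log(Number(fromQuery)); // 9007199254740992 (rounded)
console.log(toInt64(fromQuery).toString()); // 9007199254740993 (exact)
```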
Progress Events
```typescript
writer.on('flush', event => {
  console.log(
    `Chunk ${event.chunkIndex}: ${event.rowCount} rows → ${event.files}`
  );
});
```
Dynamic Fields
When the schema sets enable_dynamic_field: true, keys that are not defined in the schema are collected into the $meta field:
```typescript
// Assumes FIELDS defines a 4-dim 'vector' field and a VarChar 'text' field
const writer = new BulkWriter({
  schema: { fields: FIELDS, enable_dynamic_field: true },
  format: 'json',
  localPath: './output',
});

await writer.append({
  vector: [0.1, 0.2, 0.3, 0.4],
  text: 'hello',
  color: 'red', // dynamic field → stored in $meta
  score: 42, // dynamic field → stored in $meta
});
```
You can also pass dynamic fields explicitly via $meta:
```typescript
await writer.append({
  vector: [0.1, 0.2, 0.3, 0.4],
  text: 'hello',
  $meta: { color: 'red', score: 42 },
});
```
Supported Types
| Category | Types |
|---|---|
| Scalar | Bool, Int8, Int16, Int32, Int64, Float, Double, VarChar, JSON, Geometry, Timestamptz |
| Vector | FloatVector, BinaryVector, Float16Vector, BFloat16Vector, SparseFloatVector, Int8Vector |
| Complex | Array<scalar>, Array<Struct> (with nested vector/Int64 sub-fields) |
| Special | Dynamic fields ($meta), nullable fields, default values, autoID |
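To make the dynamic-field rule from the Dynamic Fields section concrete, here is a minimal sketch of the splitting step: keys defined in the schema stay top-level and every other key moves into $meta. It is an illustration, not BulkWriter's implementation. splitDynamicFields is a hypothetical name, and merging an explicit $meta object with the extra keys (extra keys winning on conflict) is an assumption.

```typescript
type Row = Record<string, unknown>;

// Hypothetical sketch: separate schema fields from dynamic keys.
function splitDynamicFields(row: Row, schemaFields: string[]): Row {
  const known = new Set(schemaFields);
  const out: Row = {};
  // Assumption: an explicit $meta object is the starting point for dynamic keys.
  const meta: Row = { ...((row.$meta as Row | undefined) ?? {}) };
  for (const [key, value] of Object.entries(row)) {
    if (key === '$meta') continue;
    if (known.has(key)) out[key] = value; // defined in the schema
    else meta[key] = value; // not in the schema → dynamic field
  }
  if (Object.keys(meta).length > 0) out.$meta = meta;
  return out;
}

const row = splitDynamicFields(
  { vector: [0.1, 0.2, 0.3, 0.4], text: 'hello', color: 'red', score: 42 },
  ['id', 'vector', 'text'],
);
console.log(JSON.stringify(row));
// {"vector":[0.1,0.2,0.3,0.4],"text":"hello","$meta":{"color":"red","score":42}}
```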
Sparse Vector Formats
BulkWriter accepts all of the SDK's sparse vector formats and normalizes them automatically:
```typescript
// All of these work (other fields omitted for brevity):
await writer.append({ sparse: { 2: 0.5, 5: 0.3 } }); // dict
await writer.append({ sparse: { indices: [2, 5], values: [0.5, 0.3] } }); // CSR
await writer.append({ sparse: [{ index: 2, value: 0.5 }] }); // COO
await writer.append({ sparse: [undefined, 0.5, undefined, 0.3] }); // array (position = index)
```
Timestamptz
Timestamptz fields accept both ISO 8601 strings and Date objects:
```typescript
await writer.append({ ts: '2025-01-15T12:00:00+08:00' });
await writer.append({ ts: new Date() }); // auto-converted to ISO string
```
API Reference
BulkWriter
| Method | Description |
|---|---|
| append(row) | Append a single row. Auto-flushes when the buffer exceeds chunkSize. |
| commit() | Manually flush the current buffer to disk. |
| close() | Flush remaining data and return all file paths (string[][]). |
| writeFrom(source) | Write all rows from an AsyncIterable, then close. |

| Property | Type | Description |
|---|---|---|
| totalRowCount | number | Total rows appended so far. |
| bufferRowCount | number | Rows in the current (unflushed) buffer. |
| batchFiles | string[][] | File paths generated so far. |

| Event | Payload | Description |
|---|---|---|
| flush | { files, rowCount, chunkIndex } | Emitted after each chunk is flushed. |
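close() and batchFiles return one inner array per flushed chunk. When uploading these files, using each file's path relative to localPath as the object key avoids collisions, since the Quick Start output suggests every chunk is written as chunk_N/data.parquet. toRemoteKeys is a hypothetical helper, and the imports/ prefix and sample paths are arbitrary examples.

```typescript
import * as path from 'node:path';

// Hypothetical helper: turn BulkWriter's string[][] output into object-storage keys.
// Keeps the path relative to localPath so chunk_0/data.parquet and
// chunk_1/data.parquet stay distinct, and always uses '/' as the key separator.
function toRemoteKeys(batchFiles: string[][], localPath: string, prefix = 'imports'): string[][] {
  return batchFiles.map(chunk =>
    chunk.map(file => {
      const rel = path.relative(localPath, file).split(path.sep).join('/');
      return `${prefix}/${rel}`;
    }),
  );
}

const keys = toRemoteKeys(
  [['./bulk_output/job-1/chunk_0/data.parquet'], ['./bulk_output/job-1/chunk_1/data.parquet']],
  './bulk_output',
);
console.log(keys);
```

Preserving the chunk structure (string[][] in, string[][] out) keeps the mapping between local and remote files obvious; flatten the result if you pass it to bulkInsert as a single list.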
bulkInsert / Import APIs
```typescript
// Trigger import
const res = await client.bulkInsert({
  collection_name: 'my_collection',
  files: ['path/to/data.parquet'],
});

// Check progress
const state = await client.getImportState({ task: res.tasks[0] });

// List all import jobs
const jobs = await client.listImportTasks({
  collection_name: 'my_collection',
});
```
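getImportState() reports a single snapshot, so callers usually poll it until the task finishes. Below is a minimal sketch of a poll loop with an interval and an overall timeout. pollUntil is a hypothetical helper, not an SDK function. Its fetchState callback stands in for a call such as () => client.getImportState({ task }), and the terminal state names are the ones used in the Full Workflow example.

```typescript
// Hypothetical helper: poll until a terminal state is reached or timeoutMs elapses.
async function pollUntil<T extends { state: string }>(
  fetchState: () => Promise<T>,
  terminal: string[],
  intervalMs = 1000,
  timeoutMs = 10 * 60 * 1000,
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const current = await fetchState();
    if (terminal.includes(current.state)) return current;
    if (Date.now() >= deadline) {
      throw new Error(`timed out in state ${current.state}`);
    }
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
}

// Usage against a live cluster (not run here):
// const final = await pollUntil(
//   () => client.getImportState({ task: res.tasks[0] }),
//   ['ImportCompleted', 'ImportFailed'],
// );

// Self-contained demo: a fake task that completes on the third poll.
const fakeStates = ['ImportPending', 'ImportStarted', 'ImportCompleted'];
let calls = 0;
const demo = pollUntil(
  async () => ({ state: fakeStates[Math.min(calls++, fakeStates.length - 1)] }),
  ['ImportCompleted', 'ImportFailed'],
  10,
);
demo.then(s => console.log(s.state)); // ImportCompleted
```

Checking the state before sleeping avoids an extra wait once the task is already done, and the deadline keeps a stuck task from blocking the caller indefinitely.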