BulkWriter & Bulk Import
Generate Milvus-compatible files (JSON/Parquet) on the client side and import them via bulkInsert. This bypasses the WAL and real-time indexing pipeline, making it ideal for large-scale offline data loading.
When to Use
| Scenario | Use insert() | Use BulkWriter + bulkInsert |
|---|---|---|
| Real-time writes (<10k rows) | Yes | No |
| Batch import (100k+ rows) | Possible | Yes |
| Offline data pipeline | No | Yes |
| Minimize Milvus write pressure | No | Yes |
| Data migration between clusters | No | Yes |
| File-based import (CSV/JSON) | No | Yes |
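The thresholds above can be codified in a small routing helper. This is a hypothetical sketch, not part of the SDK; `chooseIngestPath` and its cutoffs are illustrative only:

```typescript
// Hypothetical helper: pick an ingestion path based on batch size and context.
// The 100k cutoff mirrors the rule of thumb in the table above.
function chooseIngestPath(
  rowCount: number,
  offlinePipeline: boolean
): 'insert' | 'bulkInsert' {
  // Offline pipelines and large batches avoid WAL/indexing pressure
  // by going through BulkWriter + bulkInsert.
  if (offlinePipeline || rowCount >= 100_000) return 'bulkInsert';
  return 'insert';
}
```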
Quick Start
import { MilvusClient, BulkWriter, DataType } from '@zilliz/milvus2-sdk-node';
// 1. Define schema (same as createCollection)
const schema = {
fields: [
{
name: 'id',
data_type: DataType.Int64,
is_primary_key: true,
autoID: true,
},
{ name: 'vector', data_type: DataType.FloatVector, dim: 128 },
{ name: 'text', data_type: DataType.VarChar, max_length: 512 },
{ name: 'metadata', data_type: DataType.JSON },
],
};
// 2. Create writer
const writer = new BulkWriter({
schema,
localPath: './bulk_output',
format: 'parquet', // or 'json'
});
// 3. Append rows
for (const item of data) {
await writer.append({
vector: item.embedding,
text: item.content,
metadata: { source: item.source },
});
}
// 4. Close and get file paths
const files = await writer.close();
// files = [['./bulk_output/<uuid>/chunk_0/data.parquet']]
Options
const writer = new BulkWriter({
schema: BulkWriterSchema, // required — field definitions
format: 'json' | 'parquet', // default: 'json'
localPath: string, // output directory, default: process.cwd()
chunkSize: number, // auto-flush threshold in bytes, default: 128MB
storage: Storage, // pluggable storage backend, default: LocalStorage
});
JSON vs Parquet
| | JSON | Parquet |
|---|---|---|
| File size | Large | ~77% smaller |
| Write speed | Fast | Slower |
| Upload speed | Slow | ~4x faster |
| Server import | Slow | ~2x faster |
| Dependencies | None | @dsnp/parquetjs |
| Human readable | Yes | No |
Recommendation: Use Parquet for production, JSON for debugging.
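The size gap is easy to build intuition for with vector data: JSON stores each float as decimal text, while Parquet stores 4-byte binary values and adds columnar compression on top. A rough self-contained illustration (not a benchmark):

```typescript
// Compare the JSON text size of a 128-dim float vector against its
// raw Float32 payload size. Parquet's on-disk size is close to the
// binary payload (often smaller after compression).
const dim = 128;
const vec = Array.from({ length: dim }, (_, i) => Math.fround(Math.sin(i)));

const jsonBytes = JSON.stringify(vec).length; // decimal text, commas, brackets
const binaryBytes = dim * 4;                  // raw Float32 payload

console.log({ jsonBytes, binaryBytes }); // jsonBytes is several times larger
```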
Full Workflow: Write, Upload, Import
import * as Minio from 'minio';
import * as path from 'path';
import { MilvusClient, BulkWriter } from '@zilliz/milvus2-sdk-node';
// Step 1: Write files
const writer = new BulkWriter({
schema: { fields: FIELDS },
format: 'parquet',
localPath: './bulk_data',
});
for (const row of data) {
await writer.append(row);
}
const batchFiles = await writer.close();
// Step 2: Upload to MinIO/S3
const minio = new Minio.Client({
endPoint: '127.0.0.1',
port: 9000,
useSSL: false,
accessKey: 'minioadmin',
secretKey: 'minioadmin',
});
const remotePaths = [];
for (const chunk of batchFiles) {
for (const localFile of chunk) {
const remote = `imports/${path.basename(localFile)}`;
await minio.fPutObject('a-bucket', remote, localFile);
remotePaths.push(remote);
}
}
// Step 3: Trigger bulk import
const client = new MilvusClient({ address: 'localhost:19530' });
const importRes = await client.bulkInsert({
collection_name: 'my_collection',
files: remotePaths,
});
// Step 4: Wait for completion
let state;
do {
state = await client.getImportState({ task: importRes.tasks[0] });
await new Promise(r => setTimeout(r, 1000));
} while (state.state !== 'ImportCompleted' && state.state !== 'ImportFailed');
console.log('Import completed, rows:', state.row_count);
Streaming from Any Data Source
BulkWriter accepts any AsyncIterable via writeFrom(), enabling streaming from arbitrary sources with constant memory usage:
import { parse } from 'csv-parse';
import * as fs from 'fs';
// Stream CSV → BulkWriter → Parquet files
const csvStream = fs
.createReadStream('data.csv')
.pipe(parse({ columns: true }));
async function* transform(source) {
for await (const row of source) {
yield {
vector: JSON.parse(row.embedding),
text: row.content,
metadata: { source: row.source },
};
}
}
const writer = new BulkWriter({
schema,
format: 'parquet',
localPath: './output',
});
const files = await writer.writeFrom(transform(csvStream));
Collection Clone via Query Iterator
Pull data from an existing collection and write to files for import into another:
// Source: query all data
const iterator = await sourceClient.queryIterator({
collection_name: 'source_collection',
batchSize: 1000,
output_fields: ['vector', 'text', 'metadata'],
expr: 'id > 0',
});
// Write to BulkWriter
const writer = new BulkWriter({
schema: { fields: targetFields },
format: 'parquet',
localPath: './migration',
});
for await (const batch of iterator) {
for (const row of batch) {
await writer.append(row);
}
}
const files = await writer.close();
// Upload + bulkInsert into target collection...
Note on Int64: Query returns Int64 values as strings (a gRPC limitation). BulkWriter handles this automatically; there is no precision loss even for values beyond Number.MAX_SAFE_INTEGER (2^53 - 1).
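The precision issue is easy to reproduce in plain JavaScript, which is why the string representation matters. A standalone demonstration (no SDK involved):

```typescript
// 2^53 + 1 cannot be represented exactly as a JS number: it rounds
// to the nearest representable value. A string (or BigInt) round-trips exactly.
const asString = '9007199254740993';   // 2^53 + 1
const asNumber = Number(asString);     // rounds to 9007199254740992
const asBigInt = BigInt(asString);     // exact

console.log(asNumber === 9007199254740992);    // true — precision lost
console.log(asBigInt.toString() === asString); // true — exact round-trip
```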
Progress Events
writer.on('flush', event => {
console.log(
`Chunk ${event.chunkIndex}: ${event.rowCount} rows → ${event.files}`
);
});
Dynamic Fields
When enable_dynamic_field: true, extra keys not in the schema are collected into $meta:
const writer = new BulkWriter({
schema: { fields: FIELDS, enable_dynamic_field: true },
format: 'json',
localPath: './output',
});
await writer.append({
vector: [0.1, 0.2, 0.3, 0.4],
text: 'hello',
color: 'red', // dynamic field → stored in $meta
score: 42, // dynamic field → stored in $meta
});
You can also pass dynamic fields explicitly via $meta:
await writer.append({
vector: [0.1, 0.2, 0.3, 0.4],
text: 'hello',
$meta: { color: 'red', score: 42 },
});
Supported Types
| Category | Types |
|---|---|
| Scalar | Bool, Int8, Int16, Int32, Int64, Float, Double, VarChar, JSON, Geometry, Timestamptz |
| Vector | FloatVector, BinaryVector, Float16Vector, BFloat16Vector, SparseFloatVector, Int8Vector |
| Complex | Array<scalar>, Array<Struct> (with nested vector/Int64 sub-fields) |
| Special | Dynamic fields ($meta), nullable fields, default values, autoID |
Sparse Vector Formats
BulkWriter accepts all SDK sparse formats and normalizes automatically:
// All of these work:
await writer.append({ sparse: { 2: 0.5, 5: 0.3 } }); // dict
await writer.append({ sparse: { indices: [2, 5], values: [0.5, 0.3] } }); // CSR
await writer.append({ sparse: [{ index: 2, value: 0.5 }] }); // COO
await writer.append({ sparse: [undefined, 0.5, undefined, 0.3] }); // array
Timestamptz
Timestamptz fields accept both ISO 8601 strings and Date objects:
await writer.append({ ts: '2025-01-15T12:00:00+08:00' });
await writer.append({ ts: new Date() }); // auto-converted to ISO string
API Reference
BulkWriter
| Method | Description |
|---|---|
| append(row) | Append a single row. Auto-flushes when the buffer exceeds chunkSize. |
| commit() | Manually flush the current buffer to disk. |
| close() | Flush remaining data and return all file paths (string[][]). |
| writeFrom(source) | Write all rows from an AsyncIterable, then close. |
| Property | Type | Description |
|---|---|---|
| totalRowCount | number | Total rows appended so far. |
| bufferRowCount | number | Rows in the current (unflushed) buffer. |
| batchFiles | string[][] | File paths generated so far. |
| Event | Payload | Description |
|---|---|---|
| flush | { files, rowCount, chunkIndex } | Emitted after each chunk is flushed. |
bulkInsert / Import APIs
// Trigger import
const res = await client.bulkInsert({
collection_name: 'my_collection',
files: ['path/to/data.parquet'],
});
// Check progress
const state = await client.getImportState({ task: res.tasks[0] });
// List all import jobs
const jobs = await client.listImportTasks({
collection_name: 'my_collection',
});