
BulkWriter & Bulk Import

Generate Milvus-compatible files (JSON/Parquet) on the client side and import them via bulkInsert. This bypasses the WAL and real-time indexing pipeline, making it ideal for large-scale offline data loading.

When to Use

| Scenario | Use `insert()` | Use BulkWriter + `bulkInsert` |
| --- | --- | --- |
| Real-time writes (<10k rows) | Yes | No |
| Batch import (100k+ rows) | Possible | Yes |
| Offline data pipeline | No | Yes |
| Minimize Milvus write pressure | No | Yes |
| Data migration between clusters | No | Yes |
| File-based import (CSV/JSON) | No | Yes |

Quick Start

```typescript
import { MilvusClient, BulkWriter, DataType } from '@zilliz/milvus2-sdk-node';

// 1. Define schema (same as createCollection)
const schema = {
  fields: [
    {
      name: 'id',
      data_type: DataType.Int64,
      is_primary_key: true,
      autoID: true,
    },
    { name: 'vector', data_type: DataType.FloatVector, dim: 128 },
    { name: 'text', data_type: DataType.VarChar, max_length: 512 },
    { name: 'metadata', data_type: DataType.JSON },
  ],
};

// 2. Create writer
const writer = new BulkWriter({
  schema,
  localPath: './bulk_output',
  format: 'parquet', // or 'json'
});

// 3. Append rows
for (const item of data) {
  await writer.append({
    vector: item.embedding,
    text: item.content,
    metadata: { source: item.source },
  });
}

// 4. Close and get file paths
const files = await writer.close();
// files = [['./bulk_output/<uuid>/chunk_0/data.parquet']]
```

Options

```typescript
const writer = new BulkWriter({
  schema: BulkWriterSchema, // required — field definitions
  format: 'json' | 'parquet', // default: 'json'
  localPath: string, // output directory, default: process.cwd()
  chunkSize: number, // auto-flush threshold in bytes, default: 128MB
  storage: Storage, // pluggable storage backend, default: LocalStorage
});
```
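Conceptually, `chunkSize` acts as a byte budget on an in-memory buffer: each appended row grows the buffer, and once the estimated size crosses the threshold, the buffer is cut into a new chunk. The sketch below illustrates that buffering behavior only; it uses a crude JSON-length size estimate and is not the SDK's internal implementation.

```typescript
// Minimal sketch of chunk-threshold buffering (illustrative, not the SDK's internals).
// Rows accumulate in memory; once the estimated byte size crosses the
// threshold, the buffer is flushed as a new chunk.
type Row = Record<string, unknown>;

class ChunkBuffer {
  private buffer: Row[] = [];
  private bytes = 0;
  readonly chunks: Row[][] = [];

  constructor(private chunkSize: number) {}

  append(row: Row): void {
    this.buffer.push(row);
    // Crude size estimate: serialized JSON length.
    this.bytes += JSON.stringify(row).length;
    if (this.bytes >= this.chunkSize) this.flush();
  }

  flush(): void {
    if (this.buffer.length === 0) return;
    this.chunks.push(this.buffer);
    this.buffer = [];
    this.bytes = 0;
  }
}

const buf = new ChunkBuffer(64); // tiny threshold, just for demonstration
for (let i = 0; i < 10; i++) buf.append({ id: i, text: 'row' });
buf.flush(); // flush the final partial chunk
```

With the real writer, each flushed chunk becomes a file under `localPath`, and `close()` flushes whatever remains.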

JSON vs Parquet

| | JSON | Parquet |
| --- | --- | --- |
| File size | Large | ~77% smaller |
| Write speed | Fast | Slower |
| Upload speed | Slow | ~4x faster |
| Server import | Slow | ~2x faster |
| Dependencies | None | @dsnp/parquetjs |
| Human readable | Yes | No |

Recommendation: Use Parquet for production, JSON for debugging.

Full Workflow: Write, Upload, Import

```typescript
import * as Minio from 'minio';
import * as path from 'path';
import { MilvusClient, BulkWriter } from '@zilliz/milvus2-sdk-node';

// Step 1: Write files
const writer = new BulkWriter({
  schema: { fields: FIELDS },
  format: 'parquet',
  localPath: './bulk_data',
});
for (const row of data) {
  await writer.append(row);
}
const batchFiles = await writer.close();

// Step 2: Upload to MinIO/S3
const minio = new Minio.Client({
  endPoint: '127.0.0.1',
  port: 9000,
  useSSL: false,
  accessKey: 'minioadmin',
  secretKey: 'minioadmin',
});
const remotePaths = [];
for (const chunk of batchFiles) {
  for (const localFile of chunk) {
    const remote = `imports/${path.basename(localFile)}`;
    await minio.fPutObject('a-bucket', remote, localFile);
    remotePaths.push(remote);
  }
}

// Step 3: Trigger bulk import
const client = new MilvusClient({ address: 'localhost:19530' });
const importRes = await client.bulkInsert({
  collection_name: 'my_collection',
  files: remotePaths,
});

// Step 4: Wait for completion
let state;
do {
  state = await client.getImportState({ task: importRes.tasks[0] });
  await new Promise(r => setTimeout(r, 1000));
} while (state.state !== 'ImportCompleted' && state.state !== 'ImportFailed');
console.log('Import completed, rows:', state.row_count);
```

Streaming from Any Data Source

BulkWriter accepts any AsyncIterable via writeFrom(), so you can stream from any data source with constant memory usage:

```typescript
import { parse } from 'csv-parse';
import * as fs from 'fs';

// Stream CSV → BulkWriter → Parquet files
const csvStream = fs
  .createReadStream('data.csv')
  .pipe(parse({ columns: true }));

async function* transform(source) {
  for await (const row of source) {
    yield {
      vector: JSON.parse(row.embedding),
      text: row.content,
      metadata: { source: row.source },
    };
  }
}

const writer = new BulkWriter({
  schema,
  format: 'parquet',
  localPath: './output',
});
const files = await writer.writeFrom(transform(csvStream));
```

Collection Clone via Query Iterator

Pull data from an existing collection and write to files for import into another:

```typescript
// Source: query all data
const iterator = await sourceClient.queryIterator({
  collection_name: 'source_collection',
  batchSize: 1000,
  output_fields: ['vector', 'text', 'metadata'],
  expr: 'id > 0',
});

// Write to BulkWriter
const writer = new BulkWriter({
  schema: { fields: targetFields },
  format: 'parquet',
  localPath: './migration',
});

for await (const batch of iterator) {
  for (const row of batch) {
    await writer.append(row);
  }
}
const files = await writer.close();

// Upload + bulkInsert into target collection...
```

Note on Int64: Query returns Int64 values as strings (gRPC limitation). BulkWriter handles this automatically — no precision loss even for values beyond Number.MAX_SAFE_INTEGER (2^53-1).
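The precision issue is easy to reproduce in plain JavaScript, which is why string (or BigInt) transport matters for Int64 values:

```typescript
// Int64 values above Number.MAX_SAFE_INTEGER cannot round-trip through a
// JS number, which is why the SDK keeps them as strings.
const id = '9007199254740993'; // 2^53 + 1, beyond the safe integer range

// Coercing to Number silently loses precision:
console.log(Number(id).toString()); // no longer equal to the original string

// BigInt (and string pass-through) preserves the exact value:
console.log(BigInt(id).toString() === id); // true
```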

Progress Events

```typescript
writer.on('flush', event => {
  console.log(
    `Chunk ${event.chunkIndex}: ${event.rowCount} rows → ${event.files}`
  );
});
```

Dynamic Fields

When enable_dynamic_field: true, extra keys not in the schema are collected into $meta:

```typescript
const writer = new BulkWriter({
  schema: { fields: FIELDS, enable_dynamic_field: true },
  format: 'json',
  localPath: './output',
});

await writer.append({
  vector: [0.1, 0.2, 0.3, 0.4],
  text: 'hello',
  color: 'red', // dynamic field → stored in $meta
  score: 42, // dynamic field → stored in $meta
});
```

You can also pass dynamic fields explicitly via $meta:

```typescript
await writer.append({
  vector: [0.1, 0.2, 0.3, 0.4],
  text: 'hello',
  $meta: { color: 'red', score: 42 },
});
```

Supported Types

| Category | Types |
| --- | --- |
| Scalar | Bool, Int8, Int16, Int32, Int64, Float, Double, VarChar, JSON, Geometry, Timestamptz |
| Vector | FloatVector, BinaryVector, Float16Vector, BFloat16Vector, SparseFloatVector, Int8Vector |
| Complex | `Array<scalar>`, `Array<Struct>` (with nested vector/Int64 sub-fields) |
| Special | Dynamic fields (`$meta`), nullable fields, default values, autoID |

Sparse Vector Formats

BulkWriter accepts all SDK sparse formats and normalizes them automatically:

```typescript
// All of these work:
await writer.append({ sparse: { 2: 0.5, 5: 0.3 } }); // dict
await writer.append({ sparse: { indices: [2, 5], values: [0.5, 0.3] } }); // CSR
await writer.append({ sparse: [{ index: 2, value: 0.5 }] }); // COO
await writer.append({ sparse: [undefined, 0.5, undefined, 0.3] }); // array
```
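All four inputs describe the same index-to-value pairs. The sketch below shows one way such a normalizer can work; it is illustrative only, not the SDK's actual implementation.

```typescript
// Illustrative normalizer: converts any accepted sparse format into a
// plain { index: value } map (not the SDK's internal code).
type SparseInput =
  | Record<number, number>                  // dict
  | { indices: number[]; values: number[] } // CSR
  | Array<{ index: number; value: number }> // COO
  | Array<number | undefined>;              // array with gaps

function normalizeSparse(input: SparseInput): Record<number, number> {
  const out: Record<number, number> = {};
  if (Array.isArray(input)) {
    for (let i = 0; i < input.length; i++) {
      const entry = input[i];
      if (entry === undefined) continue;
      if (typeof entry === 'number') out[i] = entry; // array form: position is the index
      else out[entry.index] = entry.value;           // COO form
    }
  } else if ('indices' in input) {
    input.indices.forEach((idx, i) => (out[idx] = input.values[i])); // CSR form
  } else {
    Object.assign(out, input); // dict form
  }
  return out;
}
```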

Timestamptz

Accepts both ISO strings and Date objects:

```typescript
await writer.append({ ts: '2025-01-15T12:00:00+08:00' });
await writer.append({ ts: new Date() }); // auto-converted to ISO string
```
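For Date inputs, the conversion presumably behaves like toISOString(), which always renders the same instant in UTC; an explicit offset such as +08:00 is preserved only when you pass the string form:

```typescript
// A Date carries an instant, not an offset: serializing it yields UTC.
const d = new Date('2025-01-15T12:00:00+08:00');
console.log(d.toISOString()); // '2025-01-15T04:00:00.000Z' (same instant, in UTC)
```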

API Reference

BulkWriter

| Method | Description |
| --- | --- |
| `append(row)` | Append a single row. Auto-flushes when the buffer exceeds `chunkSize`. |
| `commit()` | Manually flush the current buffer to disk. |
| `close()` | Flush remaining data and return all file paths (`string[][]`). |
| `writeFrom(source)` | Write all rows from an `AsyncIterable`, then close. |

| Property | Type | Description |
| --- | --- | --- |
| `totalRowCount` | `number` | Total rows appended so far. |
| `bufferRowCount` | `number` | Rows in the current (unflushed) buffer. |
| `batchFiles` | `string[][]` | File paths generated so far. |

| Event | Payload | Description |
| --- | --- | --- |
| `flush` | `{ files, rowCount, chunkIndex }` | Emitted after each chunk is flushed. |

bulkInsert / Import APIs

```typescript
// Trigger import
const res = await client.bulkInsert({
  collection_name: 'my_collection',
  files: ['path/to/data.parquet'],
});

// Check progress
const state = await client.getImportState({ task: res.tasks[0] });

// List all import jobs
const jobs = await client.listImportTasks({
  collection_name: 'my_collection',
});
```