Migrating Vector Data from Milvus to TiDB

This article was written by caiyfc, a dedicated TiDB Cloud Serverless user and TiDB Community Moderator.

Introduction

Recently, I’ve been exploring the use of vector databases to build Retrieval-Augmented Generation (RAG) applications, successfully implementing a setup with Milvus, Llama 3, Ollama, and LangChain. After obtaining a TiDB Cloud Serverless credit through an event, I decided to migrate the vector data from Milvus to TiDB Cloud Serverless.

Upon researching, I discovered that no existing migration tool supports transferring data from Milvus to TiDB. That doesn't make migration impossible, though: a manual migration is entirely feasible, and this article outlines my approach.

For more information about obtaining free TiDB Cloud Serverless credit, visit the event page.

Migration Plan

To perform a data migration, the first step is to determine the migration plan. The simplest migration consists of just two steps: export the data from the source database, then import it into the target database.

However, this case is different: the RAG application is built on LangChain, and the structure LangChain creates in Milvus differs from the one it creates in TiDB.

In Milvus, LangChain creates a collection named LangChainCollection, whose fields include pk, source, page, text, and vector.

In TiDB, however, the table is named langchain_vector, with the columns id, embedding, document, meta, create_time, and update_time (the full CREATE TABLE statement is shown below).

The LangChain documentation also describes this table structure.

Therefore, the data migration will require two additional steps: data preparation and table structure adjustment. Since these are heterogeneous databases, the exported data format chosen is the more universal CSV.

The overall plan is as follows:

1. Export the data from Milvus to a CSV file.
2. Prepare the data to match the table structure LangChain uses in TiDB.
3. Import the prepared CSV into TiDB Cloud Serverless.
4. Validate the results by querying both databases through the RAG application.

Exporting Data from Milvus

According to the official Milvus documentation, there is no tool available to directly export data as a CSV file. However, we can use the Python SDK to read the data and then save it as a CSV file.

import csv
from pymilvus import connections, Collection

# Connect to Milvus
connections.connect("default", host="10.3.xx.xx", port="19530")

# Get the collection and load it into memory (required before querying)
collection = Collection("LangChainCollection")
collection.load()

# Paginate through all the data
limit = 1000
offset = 0
all_results = []

while True:
    # An empty expr matches all entities. Note that Milvus caps offset + limit
    # (16384 by default), so very large collections may need a query iterator instead
    results = collection.query(expr="", output_fields=["pk", "source", "page", "text", "vector"], limit=limit, offset=offset)
    if not results:
        break
    all_results.extend(results)
    offset += limit

# Open the CSV file, prepare to write data
with open("milvus_data.csv", "w", newline="", encoding='utf-8') as csvfile:
    # Define the CSV column names
    fieldnames = ["pk", "source", "page", "text", "vector"]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    # Write the header
    writer.writeheader()

    # Write each record
    for result in all_results:
        # Parse JSON data and extract fields
        vector_str = ','.join(map(str, result.get("vector", [])))  # Convert the vector array to a string
        writer.writerow({
            "pk": result.get("pk"),          # Get the primary key
            "source": result.get("source"),  # Get the source file
            "page": result.get("page"),      # Get the page number
            "text": result.get("text"),      # Get the text
            "vector": vector_str             # Write the vector data
        })

print(f"Total records written to CSV: {len(all_results)}")

The exported CSV file contains one row per entity.
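Its header row, taken directly from the fieldnames in the export script, is:

pk,source,page,text,vector

Note that each vector is written as bare comma-separated floats; the preparation step below wraps them in square brackets to match TiDB's format.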

Data Preparation and Table Structure Adjustment

I converted a small amount of test data into vectors and used LangChain to load it into TiDB Cloud Serverless; this made it easy to inspect the table structure and data format LangChain creates there. A sketch of that load follows.
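This is a minimal sketch of such a test load using the TiDBVectorStore.from_texts helper from langchain_community; the connection string, API key, and sample text are placeholders:

from langchain_community.embeddings.jina import JinaEmbeddings
from langchain_community.vectorstores import TiDBVectorStore

embeddings = JinaEmbeddings(jina_api_key="xxxx", model_name="jina-embeddings-v2-small-en")

# Loading a couple of test texts makes LangChain create the langchain_vector
# table, whose schema we can then inspect
TiDBVectorStore.from_texts(
    texts=["test document"],
    metadatas=[{"page": 0, "source": "test.pdf"}],
    embedding=embeddings,
    table_name="langchain_vector",
    connection_string="mysql+pymysql://user:password@host:4000/test",
)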

The table structure is as follows:

CREATE TABLE `langchain_vector` (
  `id` varchar(36) NOT NULL,
  `embedding` vector(512) NOT NULL COMMENT 'hnsw(distance=cosine)',
  `document` text DEFAULT NULL,
  `meta` json DEFAULT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin

The exported data format in CSV is as follows (partial content omitted):

"id","embedding","document","meta","create_time","update_time"
"081574ec-b3e4-481b-b0c7-9a789080d160","[-0.4020949,-0.6850993,******,0.16776393,-0.049104385]","forecasting and disaster mitigation. The organization is committed to advancing scienti\0c\nknowledge and improving public safety and well-being through its work.\nFor further information, please contact:","{\"page\": 5, \"source\": \"./WMO report highlights growing shortfalls and stress in global water resources.pdf\"}","2024-10-22 02:41:49","2024-10-22 02:41:49"

Comparing this with the CSV exported from Milvus, the mapping is clear: embedding corresponds to vector, document corresponds to text, and meta combines page and source. A data preparation script based on this mapping is as follows:

import pandas as pd
import json
from uuid import uuid4
from datetime import datetime

# Read the CSV file
input_csv = 'milvus_data.csv'  # Replace with your CSV file name
df = pd.read_csv(input_csv)

# Create a new DataFrame
output_data = []

for _, row in df.iterrows():
    # Extract required fields
    id_value = str(uuid4())  # Generate a unique ID
    embedding = f"[{row['vector']}]"  # Wrap the comma-separated floats in brackets to match TiDB's vector format
    document = row['text']

    # Generate meta information; cast page to a plain int so json.dumps
    # can serialize it (pandas yields numpy integer types, which json rejects)
    meta_dict = {"page": int(row['page']), "source": row['source']}
    meta = json.dumps(meta_dict, ensure_ascii=False)

    create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    update_time = create_time  # Same time for update

    # Add to output data
    output_data.append({
        "id": id_value,
        "embedding": embedding,
        "document": document,
        "meta": meta,
        "create_time": create_time,
        "update_time": update_time
    })

# Convert to DataFrame
output_df = pd.DataFrame(output_data)

# Save as CSV file
output_csv = 'output.csv'  # Output file name
output_df.to_csv(output_csv, index=False, quoting=1)  # quoting=1 (csv.QUOTE_ALL) wraps every field in double quotes

print(f"Conversion completed, saved as {output_csv}")

Once the data preparation is complete, the data can be imported into TiDB Cloud Serverless.

Importing Data to TiDB Cloud Serverless

TiDB Cloud Serverless provides three import methods; in this case, we will use the "Upload a local file" method.

CSV files smaller than 50 MiB can be uploaded directly with this option. If a file exceeds 50 MiB, it can be split into smaller chunks before uploading, as in the sketch below.
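Here is a minimal splitting script (the file names and rows-per-chunk value are illustrative; it uses the csv module so that quoted fields containing newlines are never broken across chunks):

import csv

input_csv = 'output.csv'   # file produced by the preparation script
rows_per_chunk = 2000      # tune so each chunk stays under 50 MiB

with open(input_csv, 'r', newline='', encoding='utf-8') as src:
    reader = csv.reader(src)
    header = next(reader)
    part = 0
    buffer = []
    for row in reader:
        buffer.append(row)
        if len(buffer) >= rows_per_chunk:
            part += 1
            with open(f'output_part_{part}.csv', 'w', newline='', encoding='utf-8') as dst:
                writer = csv.writer(dst, quoting=csv.QUOTE_ALL)
                writer.writerow(header)  # repeat the header in every chunk
                writer.writerows(buffer)
            buffer = []
    if buffer:
        part += 1
        with open(f'output_part_{part}.csv', 'w', newline='', encoding='utf-8') as dst:
            writer = csv.writer(dst, quoting=csv.QUOTE_ALL)
            writer.writerow(header)
            writer.writerows(buffer)

print(f"Split {input_csv} into {part} chunks")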

After uploading the file, select the previously created database and table, and click Define Table.

Adjust the column mappings as needed, then click Start Import.

For more import methods, you can refer to the documentation: Migration and Import Overview.

Validation of Results

After successfully importing the data, the next step is validation. I modified the RAG application to read vector data from both Milvus and TiDB, asked the same question against each, and checked whether the model's answers were similar.

from langchain_community.llms import Ollama
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain import hub
from langchain.chains import RetrievalQA
from langchain_community.vectorstores import Milvus, TiDBVectorStore
from langchain_community.embeddings.jina import JinaEmbeddings
import os

llm = Ollama(
    model="llama3",
    callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
    stop=["<|eot_id|>"],
)


embeddings = JinaEmbeddings(jina_api_key="xxxx", model_name="jina-embeddings-v2-small-en")

vector_store_milvus = Milvus(
    embedding_function=embeddings,
    connection_args={"uri": "http://10.3.xx.xx:19530"},
    collection_name="LangChainCollection",  # the collection LangChain created earlier
)


TIDB_CONN_STR="mysql+pymysql://xxxx.root:password@host:4000/test?ssl_ca=/Downloads/isrgrootx1.pem&ssl_verify_cert=true&ssl_verify_identity=true"
vector_store_tidb = TiDBVectorStore(
    connection_string=TIDB_CONN_STR,
    embedding_function=embeddings,
    table_name="langchain_vector",
)


os.environ["LANGCHAIN_API_KEY"] = "xxxx"
query = input("\nQuery: ")
prompt = hub.pull("rlm/rag-prompt")   

qa_chain = RetrievalQA.from_chain_type(
    llm, retriever=vector_store_milvus.as_retriever(), chain_type_kwargs={"prompt": prompt}
)
print("milvus")
result = qa_chain({"query": query})

print("\n--------------------------------------")
print("tidb")
qa_chain = RetrievalQA.from_chain_type(
    llm, retriever=vector_store_tidb.as_retriever(), chain_type_kwargs={"prompt": prompt}
)
result = qa_chain({"query": query})

The connection string for TiDB can be copied directly from the TiDB Cloud Serverless console.

After posing the question to both versions of the RAG application and reviewing the responses, I found that the answers backed by Milvus and by TiDB were essentially consistent, indicating the vector migration was successful. It is also worth comparing the number of entries in the Milvus collection and the TiDB table; if the counts match, the migration can be considered complete. A sketch of such a check follows.
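A minimal count-comparison sketch, reusing the placeholder hosts and credentials from earlier (pymysql and pymilvus are assumed to be installed):

import pymysql
from pymilvus import connections, Collection

# Milvus side: num_entities reports the number of inserted entities
connections.connect("default", host="10.3.xx.xx", port="19530")
milvus_count = Collection("LangChainCollection").num_entities

# TiDB side: count the rows in the imported table
conn = pymysql.connect(
    host="host", port=4000, user="xxxx.root", password="password",
    database="test", ssl={"ca": "/Downloads/isrgrootx1.pem"},
)
with conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM langchain_vector")
    tidb_count = cur.fetchone()[0]
conn.close()

print(f"Milvus: {milvus_count} entities, TiDB: {tidb_count} rows")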