Introduction
You may simply create a easy software that may chat with SQL Database. However right here’s the issue with that. You may’t make it work seamlessly on the subject of dealing with and dealing with massive databases. If the database is big, it’s impractical to incorporate the entire listing of columns and tables within the immediate context. This text explains learn how to bypass this impediment by creating an AI software that may chat with large SQL databases.
Creating an AI Software to Chat with Large SQL Databases
The next code initiates a easy Streamlit software that permits customers to hook up with an SQL database and chat with it.
import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
# Create needed folders
folders_to_create = ['csvs']
for folder_name in folders_to_create:
if not os.path.exists(folder_name):
os.makedirs(folder_name)
print(f"Folder '{folder_name}' created.")
else:
print(f"Folder '{folder_name}' already exists.")
# Load the OpenAI API key
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
def get_basic_table_details(cursor):
question = """
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_name IN (
SELECT tablename FROM pg_tables WHERE schemaname="public"
);"""
cursor.execute(question)
return cursor.fetchall()
def save_db_details(db_uri):
unique_id = str(uuid4()).change("-", "_")
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
tables_and_columns = get_basic_table_details(cursor)
df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
filename_t = f'csvs/tables_{unique_id}.csv'
df.to_csv(filename_t, index=False)
cursor.shut()
connection.shut()
return unique_id
def generate_template_for_sql(question, table_info, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(content=f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri} Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"),
HumanMessagePromptTemplate.from_template("{text}")
])
reply = chat_llm(template.format_messages(textual content=question))
return reply.content material
def get_the_output_from_llm(question, unique_id, db_uri):
filename_t = f'csvs/tables_{unique_id}.csv'
df = pd.read_csv(filename_t)
table_info = ''
for desk in df['table_name'].distinctive():
table_info += f'Details about desk {desk}:n'
table_info += df[df['table_name'] == desk].to_string(index=False) + 'nn'
return generate_template_for_sql(question, table_info, db_uri)
def execute_the_solution(resolution, db_uri):
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
_, final_query, _ = resolution.break up("```")
cursor.execute(final_query.strip())
end result = cursor.fetchall()
return str(end result)
def connect_with_db(uri):
st.session_state.db_uri = uri
st.session_state.unique_id = save_db_details(uri)
return {"message": "Database connection established!"}
def send_message(message):
resolution = get_the_output_from_llm(message, st.session_state.unique_id, st.session_state.db_uri)
end result = execute_the_solution(resolution, st.session_state.db_uri)
return {"message": resolution + "nn" + "End result:n" + end result}
# Streamlit interface setup
st.subheader("Directions")
st.markdown("1. Enter your RDS Database URI under.n2. ")
The elemental technique for simplifying the immediate entails sending solely the related tables and column names that pertain to the person’s question. To realize this, we will generate embeddings for the desk and column names, dynamically retrieve essentially the most pertinent ones based mostly on the person’s enter, and embrace these within the immediate. On this article, we’ll make the most of ChromaDB as our vector database. Nevertheless, options like Pinecone, Milvus, or any appropriate vector database can be utilized.
How one can Simplify the Immediate
Let’s start by putting in ChromaDB.
pip set up chromadb
First, we’ll arrange a further folder named ‘vectors’ alongside the ‘csvs’ folder to retailer embeddings of desk and column names. This folder may comprise different pertinent database particulars, such because the international keys that hyperlink completely different tables, and potential values for the WHERE clause.
def generate_embeddings(filename, storage_folder):
csv_loader = CSVLoader(file_path=filename, encoding="utf8")
dataset = csv_loader.load()
vector_database = Chroma.from_documents(dataset, embedding=embeddings, persist_directory=storage_folder)
vector_database.persist()
We will even first verify whether or not the person’s question wants any details about tables or if the person is as a substitute asking about simply the overall schema of the database.
def check_user_intent_for_database_info_or_sql(question):
# Outline a template for the dialog
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
"Based on the provided text, the user is asking a question about databases. "
"Determine if the user seeks information about the database schema or if they want to write a SQL query. "
"Respond with 'yes' if the user is seeking information about the database schema and 'no' if they intend to write a SQL query."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
# Generate a response utilizing a language mannequin
response = chat_llm(prompt_template.format_messages(textual content=question))
print(response.content material)
return response.content material
The person responds with both ‘sure’ or ‘no’. If the reply is ‘sure’, a immediate is generated.
def generate_sql_query_prompt(question, db_uri):
# Configure the chat template with system and human messages
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
"As an assistant tasked with writing SQL queries, create a SQL query based on the text below. "
"Enclose the SQL query within three backticks '```' for clarity. "
"Aim to use 'SELECT' queries as much as possible. "
f"The connection string for the database is {db_uri}."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
# Generate and print the reply utilizing a language mannequin
response = chat_llm.prompt_template.format_messages(textual content=question))
print(response.content material)
return response.content material
If the reply is ‘no’, it signifies that the person’s question particularly requires the names of tables and columns inside these tables. We’ll then establish essentially the most related tables & columns, and assemble a string from these to incorporate in our immediate.
Subsequent, we’ll confirm that our vectors have been efficiently created and that every one different elements are functioning appropriately. Under is the entire code up thus far.
import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader
# Create needed folders for knowledge storage
folders_to_create = ['csvs', 'vectors']
for folder_name in folders_to_create:
if not os.path.exists(folder_name):
os.makedirs(folder_name)
print(f"Folder '{folder_name}' created.")
else:
print(f"Folder '{folder_name}' already exists.")
# Load API key from surroundings variable
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
# Initialize language fashions and embeddings
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
# Perform to retrieve fundamental desk particulars from the database
def get_basic_table_details(cursor):
cursor.execute("""
SELECT c.table_name, c.column_name, c.data_type
FROM information_schema.columns c
WHERE c.table_name IN (
SELECT tablename
FROM pg_tables
WHERE schemaname="public"
);""")
return cursor.fetchall()
# Perform to create vector databases from CSV information
def create_vectors(filename, persist_directory):
loader = CSVLoader(file_path=filename, encoding="utf8")
knowledge = loader.load()
vectordb = Chroma.from_documents(knowledge, embedding=embeddings, persist_directory=persist_directory)
vectordb.persist()
# Perform to save lots of database particulars and generate vectors
def save_db_details(db_uri):
unique_id = str(uuid4()).change("-", "_")
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
tables_and_columns = get_basic_table_details(cursor)
df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
filename_t="csvs/tables_" + unique_id + '.csv'
df.to_csv(filename_t, index=False)
create_vectors(filename_t, "./vectors/tables_"+ unique_id)
cursor.shut()
connection.shut()
return unique_id
# Perform to generate SQL question templates
def generate_template_for_sql(question, table_info, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri}. Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
return chat_llm(template.format_messages(textual content=question)).content material
# Perform to find out if person's question is about basic schema data or SQL
def check_if_users_query_want_general_schema_information_or_sql(question):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"In the given text, the user is asking a question about the database. Determine whether the user wants information about the database schema or wants to write a SQL query. Answer 'yes' for schema information and 'no' for SQL query."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
reply = chat_llm(template.format_messages(textual content=question))
print(reply.content material)
return reply.content material
# Perform to immediate when person needs basic database data
def prompt_when_user_want_general_db_information(question, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
"You are an assistant who writes SQL queries. Given the text below, write a SQL query that answers the user's question. Prepend and append the SQL query with three backticks '```' Write select query whenever possible Connection string to this database is {db_uri}"
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
reply = chat_llm(template.format_messages(textual content=question))
print(reply.content material)
return reply.content material
# Perform to course of person queries and generate outputs based mostly on whether or not it is about basic schema or particular SQL question
def get_the_output_from_llm(question, unique_id, db_uri):
filename_t="csvs/tables_" + unique_id + '.csv'
df = pd.read_csv(filename_t)
table_info = ''
for desk in df['table_name'].distinctive():
table_info += 'Details about desk ' + desk + ':n'
table_info += df[df['table_name'] == desk].to_string(index=False) + 'nnn'
answer_to_question_general_schema = check_if_users_query_want_general_schema_information_or_sql(question)
if answer_to_question_general_schema == "sure":
return prompt_when_user_want_general_db_information(question, db_uri)
return generate_template_for_sql(question, table_info, db_uri)
# Perform to execute SQL options
def execute_the_solution(resolution, db_uri):
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
_, final_query, _ = resolution.break up("```")
final_query = final_query.strip('sql')
cursor.execute(final_query)
end result = cursor.fetchall()
return str(end result)
# Streamlit app setup and interplay dealing with
if __name__ == "__main__":
st.subheader("Directions")
st.markdown("""
1. Enter the URI of your RDS Database within the textual content field under.
2. Click on the **Begin Chat** button to begin the chat.
3. Enter your message within the textual content field under and press **Enter** to ship the message to the API.
""")
chat_history = []
uri = st.text_input("Enter the RDS Database URI")
if st.button("Begin Chat"):
if not uri:
st.warning("Please enter a sound database URI.")
else:
st.data("Connecting to the API and beginning the chat...")
chat_response = connect_with_db(uri)
if "error" in chat_response:
st.error("Error: Failed to begin the chat. Please verify the URI and check out once more.")
else:
st.success("Chat began efficiently!")
st.subheader("Chat with the API")
if "messages" not in st.session_state:
st.session_state.messages = []
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
if immediate := st.chat_input("What's up?"):
st.chat_message("person").markdown(immediate)
st.session_state.messages.append({"position": "person", "content material": immediate})
response = send_message(immediate)["message"]
with st.chat_message("assistant"):
st.markdown(response)
st.session_state.messages.append({"position": "assistant", "content material": response})
st.write("It is a easy Streamlit app for beginning a chat with an RDS Database.")
Within the subsequent step, we are going to carry out vector retrieval to establish essentially the most related tables. As soon as these tables are chosen, we are going to collect all of the column particulars to supply further context for our immediate. We’ll then compile this data right into a string to incorporate within the immediate.
# Initialize the vector database for storing desk embeddings
vectordb = Chroma(embedding_function=embeddings, persist_directory=f"./vectors/tables_{unique_id}")
retriever = vectordb.as_retriever()
docs = retriever.get_relevant_documents(question)
print(docs)
# Amassing the related tables and their columns
relevant_tables = []
relevant_table_details = []
for doc in docs:
table_info = doc.page_content.break up("n")
table_name = table_info[0].break up(":")[1].strip()
column_name = table_info[1].break up(":")[1].strip()
data_type = table_info[2].break up(":")[1].strip()
relevant_tables.append(table_name)
relevant_table_details.append((table_name, column_name, data_type))
# Load knowledge about all tables from a CSV file
filename_t = f'csvs/tables_{unique_id}.csv'
df = pd.read_csv(filename_t)
# Assemble a descriptive string for every related desk, together with all columns and their knowledge varieties
table_info = ''
for desk in relevant_tables:
table_info += f'Details about desk {desk}:n'
table_info += df[df['table_name'] == desk].to_string(index=False) + 'nnn'
def create_sql_query_template(question, relevant_tables, table_info):
tables_list = ",".be a part of(relevant_tables)
chat_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"As an assistant capable of composing SQL queries, please write a query that resolves the user's inquiry based on the text provided. "
f"Consider SQL tables named '{tables_list}'. "
f"Below is a detailed description of these table(s): "
f"{table_info}"
"Enclose the SQL query within three backticks '```' for proper formatting."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
response = chat_llm(chat_template.format_messages(textual content=question))
print(response.content material)
return response.content material
One remaining factor we will do is to offer details about international keys to the immediate.
import streamlit as st
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader
# Guarantee needed directories exist
folders_to_create = ['csvs', 'vectors']
for folder in folders_to_create:
os.makedirs(folder, exist_ok=True)
print(f"Listing '{folder}' checked or created.")
# Load surroundings and API keys
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
# Initialize language fashions and embeddings
language_model = OpenAI(openai_api_key=openai_api_key)
chat_language_model = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
def fetch_table_details(cursor):
sql = """
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema="public";
"""
cursor.execute(sql)
return cursor.fetchall()
def fetch_foreign_key_details(cursor):
sql = """
SELECT conrelid::regclass AS table_name, conname AS foreign_key,
pg_get_constraintdef(oid) AS constraint_definition
FROM pg_constraint
WHERE contype="f" AND connamespace="public"::regnamespace;
"""
cursor.execute(sql)
return cursor.fetchall()
def create_vector_database(knowledge, listing):
loader = CSVLoader(knowledge=knowledge, encoding="utf8")
document_data = loader.load()
vector_db = Chroma(embeddings, persist_directory=listing)
vector_db.from_documents(document_data)
vector_db.persist()
def save_database_details(uri):
unique_id = str(uuid4()).change("-", "_")
conn = psycopg2.join(uri)
cur = conn.cursor()
particulars = fetch_table_details(cur)
df = pd.DataFrame(particulars, columns=['table_name', 'column_name', 'data_type'])
csv_path = f'csvs/tables_{unique_id}.csv'
df.to_csv(csv_path, index=False)
create_vector_database(df, f"./vectors/tables_{unique_id}")
foreign_keys = fetch_foreign_key_details(cur)
fk_df = pd.DataFrame(foreign_keys, columns=['table_name', 'foreign_key', 'constraint_definition'])
fk_csv_path = f'csvs/foreign_keys_{unique_id}.csv'
fk_df.to_csv(fk_csv_path, index=False)
cur.shut()
conn.shut()
return unique_id
def generate_sql_query_template(question, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"You are an assistant capable of composing SQL queries. Use the details provided to write a relevant SQL query for the question below. DB connection string is {db_uri}."
"Enclose the SQL query with three backticks '```'."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
response = chat_language_model(template.format_messages(textual content=question))
return response.content material
# Streamlit software setup
st.title("Database Interplay Software")
uri = st.text_input("Enter the RDS Database URI")
if st.button("Hook up with Database"):
if uri:
attempt:
unique_id = save_database_details(uri)
st.success(f"Linked to database and knowledge saved with ID: {unique_id}")
besides Exception as e:
st.error(f"Failed to attach: {str(e)}")
else:
st.warning("Please enter a sound database URI.")
In related methods, we will preserve enhancing this software by including fallbacks. In every fallback, we will preserve including further data.
Conclusion
This text presents a novel strategy to growing an AI software able to seamlessly interacting with large SQL databases by chat. Now we have addressed the problem of dealing with massive databases the place it’s impractical to incorporate the entire listing of columns and tables within the immediate. Our proposed resolution dynamically retrieves related desk and column names based mostly on person queries. We make sure the immediate contains solely pertinent data, enhancing person expertise and effectivity. That is executed by leveraging vector databases like ChromaDB for embedding technology and retrieval.
Now we have demonstrated learn how to streamline the interplay course of by step-by-step implementation and code examples. In the meantime, we additionally labored on repeatedly enhancing the applying’s performance. With additional enhancements corresponding to incorporating international keys and extra fallbacks, this software holds promise for various database interplay eventualities.