AI Documentation
In this section, I document the challenges I faced during my AI integration and how I approached solving them, along with some of the technologies I used.
- What is Langchain?
- Gemini vs. Gemma
- Docker user-guide for running Gemma locally
- Creating a dockerfile with Ollama
- Docker issues and challenges
- Ollama with Langchain for running local models
- RAG (Retrieval-Augmented Generation)
- Project AI architecture
- Web Scraping & Web Loaders
- Prompt Engineering
- Output Parsers and its challenges
- Real-life stream of model responses
- Integrating Real-Time Data Streams in Flutter UI
- LangServe and FastAPI for deployment
- Gemini with Langchain flutter package
- Building Gemma AI from source code
LangChain is a powerful framework designed to streamline the development of applications that utilize large language models (LLMs). It offers tools and abstractions that simplify integrating LLMs into a variety of tasks, including natural language processing, conversational AI, and data extraction. By providing components like chains, agents, and prompts, LangChain allows developers to build complex, multi-step workflows and applications with minimal effort.
-
LangChain: This package keeps the previous release so that apps developed against the earlier version do not break.
-
LangChain Community: An ecosystem of third-party integrations encompassing various components for LangChain applications. This ecosystem offers a wide range of options, including model providers, vector stores, agent tooling, and retrievers.
-
LangChain Core: The core building blocks of LangChain. It provides fundamental abstractions for constructing LLM applications.
-
LangSmith: acts as a unified DevOps platform for LangChain applications, streamlining development, collaboration, testing, and deployment. “BEST FOR DEBUGGING!”
-
LangServe: simplifies LangChain application deployment by transforming them into REST APIs. This enables remote user interaction via a standard web interface or integration within mobile apps like Flutter!
-
Models: This component interacts with various LLM models, allowing you to integrate different models into your LangChain apps.
-
Prompts: Prompts are essentially instructions or queries that you provide to the LLM model. LangChain offers functionalities to construct and manage prompts effectively.
-
Output parsers: LangChain's output parsers transform raw LLM responses into structured data, making them easier to integrate and use in downstream applications (e.g. parsing into a JSON or KML structure).
-
Chains: Chains represent sequences of predefined steps that guide the interaction with the LLM model. They offer a structured approach to LLM interaction.
-
Agents: Agents are decision-making components within LangChain. They can analyze the outputs from the LLM model and determine subsequent actions or prompts, including which tools to call (e.g. search engines, web scraping, API integration, a calculator).
-
Embeddings and Vector Stores: This component allows you to incorporate custom data and leverage vector representations for various tasks within LangChain (e.g. Qdrant, FAISS, ChromaDB, Pinecone).
-
Retrieval Strategies: Retrieval strategies deal with how LangChain retrieves relevant information from external sources, such as databases or documents. (So the model can now answer you any question you want about your OWN data!)
There is also a LangChain package for Flutter that allows integration directly inside Flutter apps. However, it is still under development.
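To make the prompt, model, and output parser components more concrete, here is a minimal plain-Python sketch of how a chain passes a value through each step in order. It does not use the LangChain API: `make_chain`, `build_prompt`, `fake_model`, and `parse_places` are hypothetical names, and the model step returns a canned string instead of calling an LLM.

```python
import json
from typing import Callable, List

Step = Callable[[object], object]

def make_chain(*steps: Step) -> Step:
    """Compose steps left to right, similar in spirit to prompt | model | parser."""
    def run(value: object) -> object:
        for step in steps:
            value = step(value)
        return value
    return run

def build_prompt(question: str) -> str:
    # Prompt step: wrap the user question in fixed instructions.
    return f"Answer in JSON with a 'places' list.\nQUESTION: {question}"

def fake_model(prompt: str) -> str:
    # Stand-in for an LLM call (assumption): always returns the same JSON string.
    return '{"places": ["Citadel of Cairo", "Egyptian Museum"]}'

def parse_places(raw: str) -> List[str]:
    # Output-parser step: turn the raw text into a Python list.
    return json.loads(raw)["places"]

chain = make_chain(build_prompt, fake_model, parse_places)
places = chain("Top sights in Cairo")
print(places)
```

In LangChain itself the same idea is expressed with the `|` operator between a prompt, a model, and a parser.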
Gemini and Gemma are two Google large language models that serve different purposes within the realm of AI.
Gemini is a more advanced and versatile model, designed for complex tasks and capable of handling a wide range of queries with improved accuracy and reduced hallucinations. It's optimized for real-time applications, making it suitable for scenarios requiring quick responses and high reliability.
Gemma, on the other hand, is a family of lightweight open-weight models from Google that can be downloaded and run locally. Because it is much smaller than Gemini, it may not match Gemini in speed or accuracy, especially on limited hardware, but it does not depend on a hosted API.
My project initially relied on the Gemma model, which was specifically tailored to run locally. To achieve this, I used Ollama, LangChain, and LangServe in Python, and I also implemented a Docker image to facilitate hosting on any server. However, due to Gemma's very slow response time on a limited-resource GPU, we transitioned to the Gemini API towards the end of the project. For this integration, I also utilized the LangChain Flutter package.
- You should have at least 8GB of VRAM for the Gemma 9b model to run smoothly
- You should have a good GPU, or else responses will take a long time. For example, a GTX 1660 Ti usually took 6-8 minutes to respond.
If you do not have a good GPU, or have no GPU at all, do not try to run this; otherwise your PC will crash!
- Allow the firewall to access port 8085:
sudo firewall-cmd --add-port=8085/tcp --permanent
sudo firewall-cmd --reload
- Check if Nvidia Driver is installed and install if not
Check if Nvidia Driver Exists:
nvidia-smi
Installing Nvidia Driver if not:
sudo dnf install kernel-devel-$(uname -r) kernel-headers-$(uname -r)
sudo dnf install https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
sudo dnf config-manager --add-repo https://developer.download.nvidia.com/compute/cuda/repos/rhel9/x86_64/cuda-rhel9.repo
sudo dnf module install nvidia-driver:latest-dkms
sudo dnf install cuda-toolkit
- Install the NVIDIA Container Toolkit:
curl -s -L https://nvidia.github.io/libnvidia-container/stable/rpm/nvidia-container-toolkit.repo | sudo tee /etc/yum.repos.d/nvidia-container-toolkit.repo
sudo yum install -y nvidia-container-toolkit
sudo nvidia-ctk cdi generate --output=/etc/cdi/nvidia.yaml
- Configure docker to use the GPU
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker
- Pull the image
docker pull mahinour/gemma9b-ai-touristic-tool:latest
- Run the container with the GPU enabled and a volume for the Ollama models
docker run --gpus=all -v ollama:/root/.ollama -p 8085:8085 mahinour/gemma9b-ai-touristic-tool
If you reached this step, Congratulations you can now use Gemma locally on your machine!
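After the container starts, it can help to confirm that the published port is actually reachable before debugging anything else. The following is a small sketch using only the Python standard library; `is_port_open` is a hypothetical helper name, and 8085 is the port from the `docker run` command above.

```python
import socket

def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False

if __name__ == "__main__":
    # 8085 is the port published by the docker run command.
    print("API reachable:", is_port_open("127.0.0.1", 8085))
```

If this prints `False` on the server itself, the container is probably not running; if it is `True` locally but `False` from another machine, the firewall rule is the more likely cause.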
Some of the source links:
- https://docs.nvidia.com/ai-enterprise/deployment-guide-rhel-with-kvm/0.1.0/podman.html
- https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html
- https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/cdi-support.html
To use Ollama inside a Docker image, you have to do the following:
- Add the installation command to the Dockerfile (curl must be available for the install script)
RUN apt-get update \
    && apt-get install -y curl \
    && curl -fsSL https://ollama.com/install.sh | sh \
    && apt-get autoremove -y \
    && apt-get clean -y \
    && rm -rf /var/lib/apt/lists/*
- Add a volume to save your pulled models by setting the Ollama models directory:
ENV OLLAMA_MODELS=/root/.ollama
- Pull the models in the start script:
RUN echo '#!/bin/bash\nollama serve &\nsleep 5\nollama pull gemma2:9b-instruct-q4_K_M\nollama pull nomic-embed-text:v1.5\npython3 /app/app.py' > /app/start.sh \
&& chmod +x /app/start.sh
One of the main challenges I encountered was realizing the need to open firewall access to the port and ensure that both the NVIDIA driver and toolkit were installed for Docker to detect the GPU.
While the container worked well on my local machine, it often didn't perform as expected on the AI server at Lleida Lab. To resolve this and gain better insights for debugging, I attempted to replicate the environment of the Liquid Galaxy AI server on my machine.
The LG server runs on Rocky Linux 9. Having a system similar to this on your computer can help you debug unexpected problems that may occur on their system and fix them.
- Installing WSL
Windows Subsystem for Linux allows developers to access the power of both Windows and Linux at the same time on a Windows machine. The easiest supported way of installing WSL for the first time is using the installation command in the command prompt.
- Type CMD into your search bar.
- Right-click the "Command Prompt" and open it as an Administrator.
- Type in the following command:
wsl --install
This command will enable the features necessary to run WSL and install the Ubuntu distribution of Linux.
- Downloading Rocky Linux
Download the Official Rocky Linux 9 image from the CDN
- Importing Rocky Linux into WSL
Now we need to import Rocky Linux into WSL so that we can run the distro on our Windows machine.
- Type CMD into your search bar.
- Choose the "Command Prompt".
- Type the following command:
wsl --import <Distro> <InstallLocation> <FileName>
where:
- <Distro> is the name you want for your distribution. For example, let's call it "Rocky-9".
- <InstallLocation> is where you want your distribution to be installed. Let's install it in "C:\Rocky-9".
- <FileName> is the path to your Rocky Linux 9 image. Let's assume it was downloaded to the Downloads folder "C:\Users\USERNAME\Downloads".
So the final command would be
wsl --import Rocky-9 "C:\Rocky-9" "C:\Users\USERNAME\Downloads\Rocky-9-Container-Base.latest.x86_64.tar.xz"
- Running your Rocky Linux Instance
You can now run your Rocky Linux distribution at any time.
- Type CMD into your search bar.
- choose the "Command Prompt".
- Type the following command:
wsl -d <Distro>
or in our case:
wsl -d Rocky-9
Please refer to the official Rocky Linux Installation for more details!
To run the Gemma model locally with LangChain and leverage its chaining mechanism, I found that the most effective solution was through Ollama. Initially, I attempted to use TensorFlow and Hugging Face, but LangChain's integration with these platforms wasn't fully supported specifically for Gemma locally, leading to several issues. This is why I ultimately chose Ollama to run my models locally with LangChain. Fortunately, Langchain supported Ollama, which made it much easier for this integration to occur.
To achieve this, download Ollama on your machine and pull your models:
ollama pull `model-name`
Then add these lines to your code:
# Import paths assume the langchain_community package
from langchain_community.llms import Ollama
from langchain_community.embeddings import OllamaEmbeddings

llm = Ollama(model="model-name", num_ctx=8192)
embeddings = OllamaEmbeddings(model="model-name", num_ctx=8192, show_progress=True)
Check out the Ollama official guide for more information!
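Before wiring Ollama into LangChain, it can be useful to check that the local server answers at all. The sketch below builds a request for Ollama's REST endpoint using only the standard library. It assumes Ollama is listening on its default address (`localhost:11434`); `build_generate_request` and `generate` are hypothetical helper names, and only the request construction runs here, so no server is needed to try it.

```python
import json
import urllib.request

OLLAMA_URL = "http://localhost:11434/api/generate"  # Ollama's default local address

def build_generate_request(model: str, prompt: str, num_ctx: int = 8192) -> urllib.request.Request:
    """Build (but do not send) a non-streaming generate request."""
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,                  # ask for one complete JSON reply
        "options": {"num_ctx": num_ctx},  # same context size as in the LangChain snippet
    }
    return urllib.request.Request(
        OLLAMA_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def generate(model: str, prompt: str) -> str:
    """Send the request; requires `ollama serve` to be running locally."""
    request = build_generate_request(model, prompt)
    with urllib.request.urlopen(request, timeout=600) as resp:
        return json.loads(resp.read())["response"]

request = build_generate_request("gemma2:9b-instruct-q4_K_M", "Name three sights in Cairo.")
print(request.full_url, json.loads(request.data)["options"])
```

Calling `generate(...)` with a model that has already been pulled should return the model's text; a connection error usually means the Ollama server is not running.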
Retrieval-Augmented Generation (RAG) is an advanced technique that enhances the capabilities of large language models (LLMs) by combining them with retrieval systems. The primary goal of RAG is to improve the accuracy, relevance, and factuality of generated content by incorporating external knowledge during the generation process.
RAG operates in two main phases:
-
Retrieval Phase: In this phase, relevant documents or pieces of information are retrieved from a pre-existing knowledge base, database, or external source based on the input query. This step ensures that the model has access to up-to-date and contextually relevant information.
-
Generation Phase: The retrieved information is then used as context for the language model to generate a response or output. By grounding the generation in real-world data, RAG helps reduce hallucinations (fabricated or incorrect information) and enhances the overall quality of the output.
-
🚀 Staying Up-to-Date: RAG is particularly beneficial because LLMs like GPT, Gemma, and Gemini are trained on data up to a certain point (e.g., until 2023) and lack knowledge of more recent information. This limitation can lead to outdated responses and hallucinations—where the model generates inaccurate or fabricated information. RAG mitigates these issues by incorporating up-to-date, relevant data during the generation process, ensuring more accurate and reliable outputs.
-
💡 Customization: RAG also lets a model answer questions about your own data without any training or extensive fine-tuning. By retrieving relevant information from your specific data sources, RAG enables the model to provide contextually accurate answers, making it easier to adapt to new information or specific domains.
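The two phases can be illustrated with a toy example in plain Python. This is only a sketch: word counts stand in for a real embedding model, the sample documents are invented for illustration, and `embed`, `cosine`, `retrieve`, and `build_prompt` are hypothetical names rather than LangChain functions.

```python
import math
import re
from collections import Counter
from typing import List

def embed(text: str) -> Counter:
    # Toy "embedding": lowercase word counts (a real system uses an embedding model).
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def cosine(a: Counter, b: Counter) -> float:
    # Cosine similarity between two sparse word-count vectors.
    dot = sum(a[word] * b[word] for word in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def retrieve(query: str, documents: List[str], k: int = 2) -> List[str]:
    # Retrieval phase: rank documents by similarity to the query and keep the top k.
    query_vec = embed(query)
    ranked = sorted(documents, key=lambda d: cosine(query_vec, embed(d)), reverse=True)
    return ranked[:k]

def build_prompt(query: str, context: List[str]) -> str:
    # Input to the generation phase: retrieved text is placed in the prompt as context.
    joined = "\n".join(f"- {c}" for c in context)
    return f"CONTEXT:\n{joined}\n\nQUESTION: {query}\nYOUR ANSWER:"

docs = [
    "The museum opens at 9 am and tickets cost 10 euros.",
    "The old town has many cafes and street markets.",
    "Parking near the museum is free on weekends.",
]
question = "How much do museum tickets cost?"
top = retrieve(question, docs, k=1)
print(build_prompt(question, top))
```

The prompt that is printed would then be sent to the LLM, which answers from the supplied context rather than from its training data alone.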
RAG (Retrieval-Augmented Generation):
- Adaptability: RAG can quickly adapt to new information by retrieving and incorporating relevant data on the fly, without modifying the underlying model.
- Cost-Efficiency: It’s more cost-effective as it doesn’t require the computational resources and time associated with fine-tuning large models.
- Flexibility: RAG can handle a wide range of topics by retrieving domain-specific information as needed, making it versatile for various applications.
Fine-Tuning:
- Customization: Fine-tuning involves updating the model’s parameters with new data, allowing for highly customized behavior and performance on specific tasks.
- Resource Intensive: It requires significant computational resources and time, especially with large models.
- Fixed Knowledge: Once fine-tuned, the model’s knowledge is static until further fine-tuning, making it less adaptable to new or evolving information.
RAG is highly versatile in the types of data it can leverage to enhance the performance of language models. Here are some common data sources that can be used with RAG:
-
Databases: Structured data from SQL or NoSQL databases can be retrieved and utilized by RAG to provide precise, data-driven responses.
-
Documents (PDFs, Word, etc.): RAG can access and retrieve relevant information from various document formats like PDFs, Word documents, and more, making it ideal for knowledge retrieval from existing reports, research papers, or manuals.
-
Spreadsheets (Excel, CSV): Information stored in spreadsheets can be queried to pull out specific data points, trends, or statistics, which the model can then incorporate into its responses.
-
Web Scraping: RAG can pull in the most recent data from websites through web scraping, allowing the model to provide insights based on the latest available information online.
-
APIs: External APIs can serve as a dynamic data source, offering real-time data that RAG can use to enrich its outputs.
-
Knowledge Bases: Pre-existing knowledge bases, whether proprietary or open-source, can be tapped by RAG to add authoritative information to the model's responses.
This project integrates LangChain and LangServe to create a robust system that leverages the Gemma 9b language model (served through Ollama) for processing user queries with real-time, contextually rich responses. The architecture is designed to efficiently gather, process, and respond to user queries by incorporating semantic search, embedding models, and a streamlined interaction flow between various components.
-
Gathering Relevant URLs: The process begins when a user submits a query. The system performs a simple Google search to collect relevant URLs. These URLs represent potential sources of information that will be used to generate customized responses.
-
Web Scraping: Once the URLs are identified, the system scrapes the top 10 web pages using LangChain's AsyncChromiumLoader combined with Playwright. This approach efficiently fetches the web data and converts it from HTML into plain text (Html2TextTransformer).
-
Chunking: After converting the web pages into text, the content is divided into smaller, manageable chunks. This step is crucial for handling large documents and ensures that the data is organized effectively for further processing.
-
Embedding Model: Each chunk is processed through an embedding model, which transforms the text into numerical vectors. These vectors capture the semantic meaning of the text and are stored for later use in semantic searches. This enables the system to match user queries with the most relevant and contextually appropriate information.
- The embeddings generated from the content chunks are stored in a database. When a user submits a query, the system performs a semantic search against these embeddings to identify and retrieve the most relevant pieces of information.
-
User Query Input: Users interact with the system through an interface (e.g., a tablet application). The user query is sent to the Langserve application via a FastAPI endpoint.
-
Langserve Endpoint: The Langserve application manages the interaction between the user and the language model, receiving the user query and preparing it for processing.
-
Pre-Prompting and Prompt Template: The system pre-processes the user query using a prompt template. This step structures the query in a format that the language model can efficiently understand and respond to.
-
Language Model (LLM): The heart of the system is the Gemma 9b model, served through Ollama. It processes the pre-prompted query, utilizing the results from the semantic search and any additional context provided to generate a coherent response.
-
Output Parser: The output from the language model is then parsed and formatted into well-defined JSON to ensure clarity and user-friendliness.
-
Formatted Output: The final, formatted response is sent back to the user through the interface, completing the interaction cycle.
-
Langserve and Langchain Integration: The Langserve application is tightly integrated with Langchain, ensuring seamless communication between user queries, the language model, and the embedding database.
-
Handling Extra Context: The architecture allows for additional context to be included in the language model's processing, enhancing the relevance and accuracy of the generated responses.
-
Docker Implementation: To facilitate deployment, the entire application stack, including Langchain, Langserve, and the Gemma 9b model, is containerized using Docker. This ensures that the system can be easily deployed and scaled on any server environment.
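The chunking step in the flow above can be sketched in a few lines of plain Python. This is a simplified stand-in, not the splitter used in the project: it cuts fixed-size character windows with an overlap, whereas LangChain's RecursiveCharacterTextSplitter also tries to split on natural separators. `chunk_text` is a hypothetical name, and the defaults mirror the chunk size of 1000 and overlap of 200 used in the project code.

```python
from typing import List

def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Split text into fixed-size character windows that overlap."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks

sample = "abcdefghij" * 3  # 30 characters
chunks = chunk_text(sample, chunk_size=12, chunk_overlap=4)
print(chunks)
```

The overlap means the end of one chunk is repeated at the start of the next, so a sentence cut at a boundary still appears whole in at least one chunk.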
Since web scraping was a crucial component of my application, I explored various methods to fetch URLs, preferably at no cost. Here are some of the approaches I considered:
- DuckDuckGo Search: This search engine was available in Langchain integrations and provided good results for free.
from langchain_community.tools import DuckDuckGoSearchResults
search = DuckDuckGoSearchResults()
print(search.run("Best salons in cairo Egypt"))
This code retrieves all links related to your query along with some extra snippets. While DuckDuckGo was a viable option, we ultimately aimed to use Google Search.
- SerpSearchTool: This tool returns Google Search results in real time through the Serper API, but it is not free and comes with some limitations.
import json
import requests

class SerpSearchTool:
    def __init__(self, api_key, query):
        self.api_key = api_key
        self.query = query

    def run(self):
        # Send the query to Serper and return the news results
        url = "https://google.serper.dev/search"
        payload = json.dumps({
            'q': self.query,
            'type': "news",
            'num': 5,
            'hl': 'en',
        })
        headers = {
            'X-API-KEY': self.api_key,
            'Content-Type': 'application/json'
        }
        response = requests.request("POST", url, headers=headers, data=payload)
        response_data = response.json()
        return response_data['news']
- Google Search: This approach proved to be the best, as it was free and utilized the Google Search engine. I implemented it in both Python for Gemma and Flutter for Gemini.
The Python function:
def scrape_urls(user_query):
    """Return up to 10 result URLs for the query, skipping TripAdvisor pages."""
    general_fetched_urls = []
    try:
        from googlesearch import search
    except ImportError:
        # Without the search module there is nothing to fetch, so return early
        print("Module 'googlesearch' not found")
        return general_fetched_urls
    for url in search(user_query, num_results=10, ssl_verify=True, safe="active", lang="en"):
        if 'tripadvisor' not in url:
            general_fetched_urls.append(url)
            print(url)
        if len(general_fetched_urls) >= 10:
            print('Fetched 10 URLs')
            break
    return general_fetched_urls
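The filtering rules can also be kept in a small helper that is easy to test without calling a search engine. This sketch combines checks that appear in the Python and Flutter versions (skip TripAdvisor, keep only https links, drop duplicates, stop at a limit). `filter_urls` and the sample URLs are hypothetical, used for illustration only.

```python
from typing import Iterable, List
from urllib.parse import urlparse

def filter_urls(urls: Iterable[str], blocked: Iterable[str] = ("tripadvisor",), limit: int = 10) -> List[str]:
    """Keep unique https URLs whose host contains none of the blocked terms."""
    blocked_terms = [term.lower() for term in blocked]
    kept: List[str] = []
    for url in urls:
        parsed = urlparse(url)
        host = parsed.netloc.lower()
        if parsed.scheme != "https" or not host:
            continue  # skip plain http and malformed links
        if any(term in host for term in blocked_terms):
            continue  # skip sites that block scraping
        if url in kept:
            continue  # skip duplicates
        kept.append(url)
        if len(kept) >= limit:
            break
    return kept

candidates = [
    "https://example.com/cairo",
    "https://www.tripadvisor.com/Tourism-Cairo",
    "http://example.org/insecure",
    "https://example.com/cairo",
    "https://example.net/guide",
]
filtered = filter_urls(candidates)
print(filtered)
```

Checking the host rather than the whole URL avoids dropping a page that merely mentions a blocked site in its path.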
The Flutter Function:
Future<List<String>> fetchUrls(
String term, {
int numResults = 40,
String lang = 'en',
Duration? sleepInterval,
Duration? timeout,
String? safe = 'active',
String? region,
}) async {
const _userAgentList = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
'Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (Linux; Android 13; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:115.0) Gecko/20100101 Firefox/115.0'
];
final results = <String>[];
int start = 0;
int fetchedResults = 0;
final client = http.Client();
final random = Random();
while (fetchedResults < numResults) {
final uri = Uri.https(
'www.google.com',
'/search',
{
'q': '$term -tripadvisor -inurl:http',
'num': '${numResults + 2}',
'hl': lang,
'start': '$start',
'safe': safe,
'gl': region,
},
);
final response = await client.get(
uri,
headers: {
'User-Agent': _userAgentList[random.nextInt(_userAgentList.length)],
},
).timeout(timeout ?? const Duration(seconds: 60));
print('response code: ${response.statusCode}');
if (response.statusCode != 200) {
if (response.statusCode == 429) {
return [];
} else {
continue;
}
}
final soup = BeautifulSoup(response.body);
soup.findAll('style').forEach((final element) => element.extract());
soup.findAll('script').forEach((final element) => element.extract());
final resultElements = soup.findAll('div', class_: 'g');
int newResults = 0;
for (final resultElement in resultElements) {
final linkElement = resultElement.find('a', attrs: {'href': true});
final link = linkElement?['href'];
if (link != null && link.isNotEmpty) {
final uri = Uri.tryParse(link);
if (uri != null && uri.scheme == 'https') {
if (link.startsWith("https://www.google") ||
link.startsWith("https://accounts")) {
continue;
}
if (results.contains(link)) {
continue;
}
results.add(link);
fetchedResults++;
newResults++;
}
}
if (fetchedResults >= numResults) {
break;
}
}
if (newResults == 0) {
break;
}
start += numResults;
await Future.delayed(sleepInterval ?? Duration.zero);
}
client.close();
return results;
}
Note that I removed all TripAdvisor links. During the web-loading step, scraping TripAdvisor failed with the error: "Please enable JS and disable any ad blocker".
In conclusion, DuckDuckGo and Google Search were the top choices, with Google Search being the final option due to its comprehensive coverage and lack of cost.
After web scraping, I utilized Web Loaders to asynchronously load web content into documents. Once the content was loaded, I transformed the HTML into text, enabling me to split it into chunks and convert it into embeddings. I chose LangChain's AsyncChromiumLoader for this task because of its efficiency in handling asynchronous operations, allowing for faster and more scalable data processing.
AsyncChromiumLoader was particularly well-suited for this project due to its ability to manage multiple concurrent tasks, reducing wait times and improving overall performance. Additionally, I employed Playwright, a powerful web automation tool, to facilitate the loading and interaction with web pages. Playwright's robust features, including multi-browser support and precise control over web elements, made it an excellent choice for ensuring accurate and reliable data extraction. The combination of AsyncChromiumLoader and Playwright provided a highly effective solution for handling large volumes of web content efficiently.
To use AsyncChromiumLoader, Playwright has to be installed with these commands:
pip install playwright
playwright install --with-deps chromium
Then you can add your Python code for loading the pages and turning them into embeddings that are saved in a vector store, ChromaDB in this case:
# Import paths assume recent LangChain packages; detect() is assumed to come from langdetect
import bs4
from langchain_community.document_loaders import AsyncChromiumLoader
from langchain_community.document_transformers import Html2TextTransformer
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langdetect import detect

# Note: the await below has to run inside an async function
general_fetched_urls = scrape_urls(user_query)
# Load Data
loader = AsyncChromiumLoader(general_fetched_urls, user_agent="MyAppUserAgent")
docs = await loader.aload()
html2text = Html2TextTransformer()
docs_transformed = html2text.transform_documents(docs, kwargs={"parse_only": bs4.SoupStrainer(class_=("post-title", "post-header", "post-content"))})
# Filter out non-English documents
def is_english(text):
    try:
        return detect(text) == 'en'
    except Exception:
        # Detection fails on empty or very short text; treat it as non-English
        return False
docs_transformed_english = [doc for doc in docs_transformed if is_english(doc.page_content)]
# Split documents
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs_transformed_english)
# Store embeddings into vectorstore
vectorstore = Chroma.from_documents(documents=splits, embedding=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
Prompt engineering is crucial in working with AI language models and other generative systems. It involves designing and refining prompts to elicit the most accurate, relevant, and useful responses from these models. The approach to crafting prompts needs to be adapted based on the specific characteristics and strengths of the LLM being used. By understanding these differences, users can tailor their prompts to achieve optimal performance and obtain the best results from various language models.
Example of the prompt I used in Gemma:
prompt_template='''
You are an advanced AI model acting as a touristic guide with extensive knowledge of various travel destinations and touristic information.
Use your internal knowledge and the provided context to generate a comprehensive and accurate response.
Your answer must include Top 10 places, not more not less, with a brief description of each place, and what makes it unique.
Your answer must include the accurate address of each of the 10 places too.
Your answer must include the country.
Your answer must include additional information such as: pricing, rating, and amenities.
If any of the information is not available to you, leave empty.
Do not include null in your response.
Do not omit any place from the list for brevity.
Your answer MUST include all 10 places.
Make sure to include a "source" key with the URL from which the information for each place was retrieved.
Your answer must be in a Well-defined JSON format, with correct curly braces, commas, and quotes. Only use double quotes for strings in your JSON format.
Each Place should include the following details: name (string), address (string), city (string), country (string), description (string), pricing (string), rating (float), amenities (string), source (string).
If a string is empty, do not write null, just leave it as an empty string.
If a float is empty, write it as 0.0
The response should be in UTF-8 JSON format, all places enclosed in the 'places' field of the JSON to be returned without any extra comments or quote wrappers.
The response should not be enclosed in a code section.
CONTEXT:
{context}
QUESTION: {question}
YOUR ANSWER:"""
'''
prompt = PromptTemplate(
    template=prompt_template,
    # The template uses both {context} and {question}
    input_variables=["context", "question"],
)
This example demonstrates how a well-engineered prompt can guide the model to generate structured and detailed responses in a specific format, ensuring that the output meets the desired criteria and quality standards.
Including JSON instructions was crucial to prevent parsing errors that could occur later.
Output parsers are essential tools for transforming and validating the responses generated by AI models into structured formats that can be effectively used by downstream systems. In the context of my project, which required JSON parsing, I explored various parser options to ensure that the output was correctly formatted and aligned with the expected schema.
-
Schema Validation: Ensuring that the output adheres to a specific schema is crucial for accurate data processing. Output parsers must validate that the response matches the defined structure, which can be challenging when dealing with complex or nested data formats.
-
Consistency: Different parsers offer varying levels of support for handling complex structures and maintaining consistency in the output. Choosing the right parser is essential for ensuring that the data remains reliable and usable.
-
Error Handling: Parsing errors can occur if the output does not conform to the expected format. Handling these errors gracefully and providing meaningful feedback for debugging is an important aspect of working with output parsers.
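The schema-validation and error-handling concerns above can be illustrated without any parser library. The sketch below is one possible defensive approach, not the parser used in the project: it pulls the first JSON object out of raw model text (even if the model wrapped it in a code fence), checks the `places` list, and fills missing values with the empty defaults the prompt asks for. `extract_json`, `normalize_places`, and the sample output are hypothetical.

```python
import json
from typing import Any, Dict, List

STRING_FIELDS = ("name", "address", "city", "country", "description", "pricing", "amenities", "source")

def extract_json(raw: str) -> Dict[str, Any]:
    """Parse the first JSON object found in raw model output."""
    start = raw.find("{")
    if start == -1:
        raise ValueError("no JSON object found in model output")
    # raw_decode ignores anything after the object, such as a closing code fence.
    obj, _ = json.JSONDecoder().raw_decode(raw[start:])
    return obj

def normalize_places(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Check the 'places' list and fill missing fields with empty defaults."""
    places = data.get("places")
    if not isinstance(places, list):
        raise ValueError("'places' must be a list")
    cleaned = []
    for place in places:
        if not isinstance(place, dict):
            continue  # skip entries that are not JSON objects
        item: Dict[str, Any] = {field: str(place.get(field) or "") for field in STRING_FIELDS}
        try:
            item["rating"] = float(place.get("rating") or 0.0)
        except (TypeError, ValueError):
            item["rating"] = 0.0  # unparseable rating falls back to 0.0
        cleaned.append(item)
    return cleaned

raw_output = '```json\n{"places": [{"name": "Sample Cafe", "rating": "4.5", "city": null}]}\n```'
places = normalize_places(extract_json(raw_output))
print(places)
```

Raising `ValueError` with a specific message for each failure makes it easier to tell a missing object from a malformed one when debugging model output.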
In my project, I experimented with several output parsers to find the most effective solution for JSON parsing:
- JsonOutputParser with pydantic objects:
My pydantic objects:
from typing import List
from pydantic import BaseModel, Field

class PlaceData(BaseModel):
    placeName: str = Field(description="Full name of the place")
    cityName: str = Field(description="Name of the city")
    countryName: str = Field(description="Name of the country")
    address: List[str] = Field(description="a List of available addresses")
    description: str = Field(description="A brief description of the place and what makes it unique")
    ratings: float = Field(description="Average rating based on user reviews")
    amenities: List[str] = Field(description="List of amenities available at the place (e.g., Wi-Fi, parking, restrooms, etc.)")
    prices: str = Field(description="Any pricing available for entry or services")

class Places(BaseModel):
    places: List[PlaceData] = Field(description="List of 10 places")
Then define my parser and use it in the prompt:
parser = JsonOutputParser(pydantic_object=Places)
prompt_template = '''
your prompt here...
{format_instructions}
QUESTION: {question}
YOUR ANSWER:"""
'''
prompt = PromptTemplate(
template=prompt_template,
input_variables=["question"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
This approach was not very efficient and did not parse correctly.
- Structured Output with ResponseSchema
In this approach I tried StructuredOutputParser from Langchain with response schema as shown:
response_schemas = [
ResponseSchema(name="PlacesList", description= """A List of 10 JSON objects matching the structure
{placeName: string, address: string, city: string, country: string, description: string, ratings: float, amenities: [string], prices: string}
""", type="List")
]
output_parser = StructuredOutputParser.from_response_schemas(response_schemas)
format_instructions = output_parser.get_format_instructions()
prompt_template = '''
Type your prompt here ...
{format_instructions}
QUESTION: {question}
YOUR ANSWER:"""
'''
prompt = PromptTemplate(
template=prompt_template,
input_variables=["question"],
partial_variables={"format_instructions": format_instructions},
)
However, this approach also contained some parsing errors and was not well-structured.
- Using Pydantic Output Parsers with Pydantic Objects:
In this approach, I utilized LangChain's PydanticOutputParser with structured Pydantic objects. I found that using get_format_instructions() in earlier methods was insufficient for providing clear guidance on how to generate a well-defined JSON, which led to numerous parsing errors. Consequently, I opted to manually define the JSON instructions directly in the prompt, as demonstrated:
- Defining the pydantic objects:
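from typing import List, Optional
from pydantic import BaseModel, Field

class Place(BaseModel):
    # Field names must match the JSON keys requested in the prompt.
    # Note: the prompt asks for "pricing" and "rating"; keys that do not
    # match a field name are ignored and that field stays None.
    name: Optional[str] = Field(description="Full name of the place", default=None)
    address: Optional[str] = Field(description="Address of the place", default=None)
    city: Optional[str] = Field(description="Name of the city", default=None)
    country: Optional[str] = Field(description="Name of the country", default=None)
    description: Optional[str] = Field(description="A brief description", default=None)
    ratings: Optional[float] = Field(description="Average rating", default=None)
    amenities: Optional[str] = Field(description="Amenities available", default=None)
    price: Optional[str] = Field(description="Pricing info", default=None)

class Places(BaseModel):
    places: Optional[List[Place]] = Field(description="List of Place dictionaries", default=None)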
- Defining the parser:
parser = PydanticOutputParser(pydantic_object=Places)
- Write your normal prompt without format_instructions, but with extra instructions for the model to return a well-defined JSON output:
prompt_template = '''
Your normal prompt here....
Your answer must be in a well-defined JSON format, with correct curly braces, commas, and quotes. Only use double quotes for strings in your JSON format.
Each Place should include the following details: name, address, city, country, description, price, ratings, amenities.
The response should be in UTF-8 JSON format, all places enclosed in the 'places' field of the JSON to be returned without any extra comments or quote wrappers.
QUESTION: {question}
YOUR ANSWER:
'''
prompt = PromptTemplate(
template=prompt_template,
input_variables=["question",],
)
- Then chain everything together:
chain = prompt | llm | parser
For simplicity, the snippets above leave out the RAG components.
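If some of the remaining parsing errors come from text that the model adds around the JSON (an assumption on my side, not something measured in this project), one defensive option is to cut out the JSON object before validating it. The following is a minimal standard-library sketch, not LangChain's parser; `extract_places` and `REQUIRED_FIELDS` are hypothetical names, and the field list is copied from the Place model above.

```python
import json

# Field names copied from the Place model above.
REQUIRED_FIELDS = ("name", "address", "city", "country",
                   "description", "ratings", "amenities", "price")


def extract_places(raw: str) -> list[dict]:
    """Return the list under 'places', tolerating extra text around the JSON object."""
    start = raw.find("{")
    end = raw.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in model output")
    payload = json.loads(raw[start:end + 1])
    places = payload.get("places")
    if not isinstance(places, list):
        raise ValueError("'places' must be a list")
    # Missing keys become None, mirroring default=None in the Place model.
    return [{key: place.get(key) for key in REQUIRED_FIELDS} for place in places]


raw_output = 'Here is the answer:\n{"places": [{"name": "Tivoli", "city": "Copenhagen"}]}\nHope this helps.'
places = extract_places(raw_output)
print(places[0]["name"], places[0]["price"])
```

A helper like this could sit between the model and the parser, but it only handles surrounding text; malformed JSON inside the braces still raises an error.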
The ability to stream responses from AI models in real time is crucial, especially when dealing with large outputs or when immediate feedback is required. Streaming lets the application start processing and displaying data as it becomes available, rather than waiting for the entire response to be generated. This is very beneficial in our application: users can watch the places arrive one at a time instead of waiting for all 10 places to be generated!
LangChain provides robust support for streaming AI model responses, making it easier to implement real-time interaction with models. Two key functions that facilitate this are astream and stream.
-
astream: This asynchronous function enables streaming of model responses in real-time. It’s particularly useful when dealing with large datasets or when the response time is critical. By using astream, you can start processing parts of the model's response as soon as they are generated, without waiting for the entire output to be completed.
-
stream: Similar to astream, the stream function allows for real-time response streaming, but is designed for synchronous execution. This is ideal for applications where you need to maintain a simple synchronous workflow while still benefiting from the efficiency of streaming.
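To make the difference between the two concrete, here is a minimal sketch that uses plain Python generators as stand-ins for a chain. `fake_stream` and `fake_astream` are hypothetical helpers, not LangChain functions; they only illustrate that the synchronous variant is consumed with `for` and the asynchronous one with `async for` inside a coroutine.

```python
import asyncio
from typing import AsyncIterator, Iterator

CHUNKS = ['{"places": [', '{"name": "A"}', ', {"name": "B"}', ']}']


def fake_stream(chunks: list[str]) -> Iterator[str]:
    """Synchronous stand-in for chain.stream(): yields chunks one at a time."""
    for chunk in chunks:
        yield chunk


async def fake_astream(chunks: list[str]) -> AsyncIterator[str]:
    """Asynchronous stand-in for chain.astream()."""
    for chunk in chunks:
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield chunk


def consume_sync() -> str:
    received = []
    for chunk in fake_stream(CHUNKS):  # plain for loop
        received.append(chunk)
    return "".join(received)


async def consume_async() -> str:
    received = []
    async for chunk in fake_astream(CHUNKS):  # async for, only valid inside a coroutine
        received.append(chunk)
    return "".join(received)


sync_result = consume_sync()
async_result = asyncio.run(consume_async())
print(sync_result == async_result)
```

Both loops see the same chunks in the same order; the asynchronous version simply lets other tasks run while it waits for the next chunk.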
When deploying AI models using LangChain's LangServe, streaming support is seamlessly integrated. LangServe supports streaming endpoints, which means that clients can connect to these endpoints and receive real-time data as it is processed by the model. This is particularly advantageous in deployment scenarios where low latency and immediate data availability are critical.
To stream the responses, make sure your functions are async and that you use aload instead of load:
async def handle_request(user_query: str):
    general_fetched_urls = scrape_urls(user_query)
    # Load Data
    loader = AsyncChromiumLoader(general_fetched_urls, user_agent="MyAppUserAgent")
    docs = await loader.aload()
    # rest of your code
- Streaming with Gemma through the code-base
To stream the output and simply print it in a terminal, use LangChain's stream method instead of invoke:
for chunk in rag_chain.stream("question " "Theme Parks World Wide"):
    print(chunk, end="|", flush=True)
- Streaming with Gemma through LangServe deployment
LangServe makes it easy to expose a streaming endpoint, because it generates endpoints such as:
- POST /your_runnable_path/invoke - invoke the runnable on a single input
- POST /your_runnable_path/batch - invoke the runnable on a batch of inputs
- POST /your_runnable_path/stream - invoke on a single input and stream the output
- POST /your_runnable_path/stream_log - invoke on a single input and stream the output, including output of intermediate steps as it's generated
- POST /your_runnable_path/stream_events - invoke on a single input and stream events as they are generated, including from intermediate steps
- GET /your_runnable_path/input_schema - json schema for input to the runnable
- GET /your_runnable_path/output_schema - json schema for output of the runnable
- GET /your_runnable_path/config_schema - json schema for config of the runnable
All you have to do is call add_routes with your runnable chain:
add_routes(
    app,
    runnable=chain,
    path="/rag",
    enable_feedback_endpoint=True,
    enable_public_trace_link_endpoint=True,
    playground_type="default",
)
Please refer to LangServe documentation for more information!
After this, you will need to integrate it with your app, in this case our Flutter application, as a service.
- You have to ensure that the return type of your function is Stream<dynamic>
- You have to add async* to your function
- Make your request to your desired LangServe endpoint, in this case stream_events
final http.Client _client = http.Client();
final rag_url = Uri.parse('$baseUrl/rag/stream_events');
final rag_headers = {
'Content-Type': 'application/json',
'accept': 'text/event-stream',
};
final rag_body = json.encode({'input': input});
final request = http.Request('POST', rag_url)
..headers.addAll(rag_headers)
..body = rag_body;
- Send your request
final rag_response = await _client.send(request);
- Define your stream
final stream = rag_response.stream;
- Start your streaming process, and use yield to emit data as soon as it is generated; do not forget to add await (as in await for)
Full function implemented for streaming Gemma responses through the stream_events endpoint, with different kinds of messages for progress bars and the full result at the end:
Stream<dynamic> postaStreamEventsGemma(BuildContext context,
{required String input}) async* {
final http.Client _client = http.Client();
final rag_url = Uri.parse('$baseUrl/rag/stream_events');
final rag_headers = {
'Content-Type': 'application/json',
'accept': 'text/event-stream',
};
final rag_body = json.encode({'input': input});
final request = http.Request('POST', rag_url)
..headers.addAll(rag_headers)
..body = rag_body;
if (!await isServerAvailable()) {
yield {
'type': 'error',
// 'data': 'The server is currently unavailable. Please try again later.'
'data': AppLocalizations.of(context)!
.aiGenerationAPIGemma_serverNotAvailable
};
return;
}
final rag_response = await _client.send(request);
final stream = rag_response.stream;
Map<String, dynamic> outputStrJson;
List<PlacesModel> pois = [];
try {
await for (var event
in stream.transform(utf8.decoder).transform(LineSplitter())) {
if (event.startsWith('data: ')) {
String jsonData = event.substring(6);
Map<String, dynamic> jsonMap = jsonDecode(jsonData);
String chunk = '';
if (jsonMap.containsKey('event') &&
jsonMap['event'] == 'on_chain_start') {
yield {
'type': 'message',
// 'data': "RAG (Retrieval Augmented Generation) Chain Starting ..."
'data': AppLocalizations.of(context)!
.aiGenerationAPIGemma_progressMessages1
};
} else if (jsonMap.containsKey('event') &&
jsonMap['event'] == 'on_retriever_start') {
yield {
'type': 'message',
// 'data': "Getting Retrieval ready..."
'data': AppLocalizations.of(context)!
.aiGenerationAPIGemma_progressMessages2
};
} else if (jsonMap.containsKey('event') &&
jsonMap['event'] == 'on_retriever_end') {
yield {
'type': 'message',
// 'data': "Retrieval Initialized ..."
'data': AppLocalizations.of(context)!
.aiGenerationAPIGemma_progressMessages3
};
} else if (jsonMap.containsKey('event') &&
jsonMap['event'] == 'on_prompt_start') {
yield {
'type': 'message',
// 'data': "Preparing Prompt ..."
'data': AppLocalizations.of(context)!
.aiGenerationAPIGemma_progressMessages4
};
} else if (jsonMap.containsKey('event') &&
jsonMap['event'] == 'on_prompt_end') {
yield {
'type': 'message',
// 'data': "Prompt Ready ..."
'data': AppLocalizations.of(context)!
.aiGenerationAPIGemma_progressMessages5
};
} else if (jsonMap.containsKey('event') &&
jsonMap['event'] == 'on_llm_start') {
yield {
'type': 'message',
// 'data': "Getting Gemma LLM Model ready..."
'data': AppLocalizations.of(context)!
.aiGenerationAPIGemma_progressMessages6
};
} else if (jsonMap.containsKey('event') &&
jsonMap['event'] == 'on_llm_stream') {
chunk = jsonMap['data']['chunk'];
yield {'type': 'chunk', 'data': chunk};
} else if (jsonMap.containsKey('event') &&
jsonMap['event'] == 'on_llm_end') {
yield {
'type': 'message',
// 'data': "End of Chain ..."
'data': AppLocalizations.of(context)!
.aiGenerationAPIGemma_progressMessages7
};
} else if (jsonMap.containsKey('event') &&
jsonMap['event'] == 'on_chain_end' &&
jsonMap['name'] == '/rag') {
// the output
outputStrJson = jsonMap['data']['output'];
List<dynamic> places = outputStrJson['places'];
for (int i = 0; i < places.length; i++) {
Map<String, dynamic> place = places[i];
String location =
'${place['name']}, ${place['address']}, ${place['city']}, ${place['country']}';
MyLatLng latlng =
await GeocodingService().getCoordinates(location);
PlacesModel poi = PlacesModel(
id: i + 1,
name: place['name'],
description: place['description'],
address: place['address'],
city: place['city'],
country: place['country'],
ratings: place['ratings'],
amenities: place['amenities'],
price: place['price'],
sourceLink: place['sourceLink'],
latitude: latlng.latitude,
longitude: latlng.longitude,
);
pois.add(poi);
}
yield {'type': 'result', 'data': pois};
}
}
}
} catch (e) {
yield {
'type': 'error',
// 'data': 'An error occurred while generating the response: $e'
'data': AppLocalizations.of(context)!
.aiGenerationAPIGemma_errorresponsegeneration(e.toString())
};
} finally {
_client.close();
}
}
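The same dispatch logic can be sketched in Python, which is handy for exercising the endpoint's output without the Flutter app. This is a minimal sketch under assumptions: the event names and payload shape are taken from the Dart function above rather than from the LangServe specification, the progress texts are abbreviated from its commented-out strings, and `dispatch_events` and `sse` are hypothetical helpers.

```python
import json
from typing import Iterable, Iterator

# Event name -> progress text (abbreviated from the comments in the Dart code).
PROGRESS = {
    "on_chain_start": "RAG chain starting ...",
    "on_retriever_start": "Getting retrieval ready ...",
    "on_llm_start": "Getting Gemma LLM model ready ...",
}


def dispatch_events(lines: Iterable[str], chain_name: str = "/rag") -> Iterator[dict]:
    """Turn raw server-sent-event lines into typed messages for the UI."""
    for line in lines:
        if not line.startswith("data: "):
            continue  # skip 'event:' lines and blank separators
        event = json.loads(line[len("data: "):])
        kind = event.get("event")
        if kind in PROGRESS:
            yield {"type": "message", "data": PROGRESS[kind]}
        elif kind == "on_llm_stream":
            yield {"type": "chunk", "data": event["data"]["chunk"]}
        elif kind == "on_chain_end" and event.get("name") == chain_name:
            yield {"type": "result", "data": event["data"]["output"]["places"]}


def sse(event: dict) -> str:
    """Format a dict the way it appears on the wire (illustration only)."""
    return "data: " + json.dumps(event)


lines = [
    "event: data",
    sse({"event": "on_chain_start", "name": "/rag", "data": {}}),
    sse({"event": "on_llm_stream", "name": "llm", "data": {"chunk": "Tiv"}}),
    sse({"event": "on_chain_end", "name": "/rag",
         "data": {"output": {"places": [{"name": "Tivoli"}]}}}),
]
events = list(dispatch_events(lines))
print([e["type"] for e in events])
```

Keeping the event-to-message mapping in a dictionary avoids the long if/else chain; the Dart version could be restructured the same way with a Map.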
While refactoring the code from Gemma to Gemini, streaming was handled differently:
- You have to ensure that the return type of your function is Stream<dynamic>
- You have to add async* to your function
- Define your chain, and call its stream method
final stream = chain.stream(userQuery);
- Yield your data, and do not forget to add await
await for (var result in stream) {
final placeCount = RegExp(r'name').allMatches(result.toString()).length;
if (result.toString().contains('name:')) {
if (currPlaceCount < placeCount) {
currPlaceCount = placeCount;
yield {
'type': 'message',
'data': 'Streaming',
};
}
}
yield {
'type': 'stream',
'data': result,
};
}
yield {
'type': 'message',
'data': 'Preparing visualizations'
};
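The progress logic in this loop can be sketched in Python as follows. Note the swapped-in technique: instead of counting regex matches of "name" in the stringified result (which would also match the word inside a description), this sketch counts the entries in the places list. It assumes each streamed result is a cumulative snapshot of the parsed JSON; `with_progress` is a hypothetical helper.

```python
from typing import Iterable, Iterator


def with_progress(snapshots: Iterable[dict]) -> Iterator[dict]:
    """Yield every snapshot, plus a progress message whenever a new place appears."""
    seen = 0
    for snapshot in snapshots:
        count = len(snapshot.get("places", []))
        if count > seen:
            seen = count
            yield {"type": "message", "data": f"Streaming place {count}"}
        yield {"type": "stream", "data": snapshot}
    yield {"type": "message", "data": "Preparing visualizations"}


snapshots = [
    {"places": []},
    {"places": [{"name": "A"}]},
    {"places": [{"name": "A"}]},
    {"places": [{"name": "A"}, {"name": "B"}]},
]
events = list(with_progress(snapshots))
messages = [e["data"] for e in events if e["type"] == "message"]
print(messages)
```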
In Flutter, you define a StreamController to manage the stream of events. You then use a StreamBuilder to listen to this stream and update the UI in real time based on the stream's data.
-
StreamController: Manages the stream of events or data. You can add data to the stream by using the add method, and close the stream when done.
-
StreamBuilder: Listens to the stream and rebuilds the UI whenever new data is emitted. It acts as a subscriber to the stream and helps update the UI in real-time as the data changes.
- Step1: Define your stream controllers as late:
late StreamController<dynamic> _messageController;
late StreamController<dynamic> _chunkController;
late StreamController<dynamic> _errorController;
- Step2: Initialize the stream controllers in the initState function and call your streaming endpoint
Here I have messageController for the progress-bar messages, chunkController for streaming each chunk, and errorController for showing any errors.
@override
void initState() {
super.initState();
_messageController = StreamController();
_chunkController = StreamController();
_errorController = StreamController();
GemmaApiServices()
.postaStreamEventsGemma(input: widget.query, context)
.listen((event) {
if (mounted) {
setState(() {
if (event['type'] == 'chunk') {
_chunkController.sink.add(event['data']);
} else if (event['type'] == 'message') {
ModelErrorProvider errProvider =
Provider.of<ModelErrorProvider>(context, listen: false);
errProvider.hasStarted = true;
_currProgress++;
_messageController.sink.add(event['data']);
} else if (event['type'] == 'result') {
_isFinished = true;
_pois.addAll(event['data']);
} else if (event['type'] == 'error') {
_isError = true;
ModelErrorProvider errProvider =
Provider.of<ModelErrorProvider>(context, listen: false);
errProvider.isError = true;
_errorController.sink.add(event['data']);
if (event['data'].toString() ==
'The server is currently unavailable. Please try again later.') {
Connectionprovider connection =
Provider.of<Connectionprovider>(context, listen: false);
connection.isAiConnected = false;
}
}
});
}
});
}
- Step3: Remember to close the stream controllers in the dispose method
@override
void dispose() {
_messageController.close();
_chunkController.close();
_errorController.close();
super.dispose();
}
- Step4: Define the widgets that should listen to the stream and update the UI accordingly, and wrap them with StreamBuilder. Example:
StreamBuilder<dynamic>(
stream: _chunkController.stream,
builder: (context, snapshot) { .... } );
LangServe is a library for deploying LangChain runnables and chains as REST APIs. It is built on top of FastAPI, whose support for asynchronous request handling makes it well suited to serving model responses efficiently.
LangServe supports streaming responses, which are essential for applications that need to show output as it is generated rather than after the whole response is complete. Because the result is a regular FastAPI app, you can combine the generated endpoints with your own custom endpoints in the same server.
Refer to the official documentation for more details!
There are two ways to leverage LangServe:
1. Using the LangServe generated endpoints
To utilize the LangServe endpoints in your application, follow these steps:
-
Define the Request Handler Function: Create an asynchronous function, handle_request, that returns the LangChain chain:
async def handle_request(user_query: str):
    # code logic here...
    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | llm
        | parser
    )
    return rag_chain
- Initialize the Runnable Chain: Use the RunnableLambda to initialize the chain with your handle_request function:
chain = RunnableLambda(handle_request)
What is RunnableLambda? RunnableLambda is a component in LangChain that wraps a regular function so it can be executed as part of a LangChain pipeline. It turns your function into a runnable object that integrates smoothly with LangChain's chaining mechanism.
Why use it? It lets you incorporate custom logic into LangChain pipelines easily (seamless integration), it handles asynchronous functions (asynchronous execution), and it promotes reusable, modular code components (modularity).
In the context of our project, we used RunnableLambda to wrap the handle_request function, which encapsulates the core logic for processing user queries and generating responses. Wrapping it this way lets LangServe manage and execute it like any other LangChain runnable.
- Add LangServe App Routes: Add the app routes provided by LangServe:
add_routes(
    app,
    runnable=chain,
    path="/rag",
    enable_feedback_endpoint=True,
    enable_public_trace_link_endpoint=True,
    playground_type="default",
)
- Add Custom Endpoints: You can also define your own endpoints, such as a health check endpoint:
@app.get("/health")
async def health_check():
    return JSONResponse(status_code=200, content={"status": "OK"})
-
Start the Server: Finally, add code to start your server using uvicorn:
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8085)
You can access the endpoints via the following URLs:
$baseUrl/rag/stream_events
$baseUrl/health
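One detail of this first approach is worth illustrating: handle_request does not return an answer, it returns a chain. As far as I understand RunnableLambda, when the wrapped function returns another runnable, that runnable is then invoked with the same input. The toy class below mimics that idea with the standard library only; `Step` is a hypothetical stand-in, not LangChain's implementation, and it is synchronous for brevity whereas the real handle_request is async.

```python
from typing import Any, Callable


class Step:
    """Toy stand-in for a LangChain runnable: wraps a function and supports `|`."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        result = self.func(value)
        # If the function built and returned another Step, run it on the same input.
        if isinstance(result, Step):
            return result.invoke(value)
        return result

    def __or__(self, other: "Step") -> "Step":
        # Piping feeds this step's output into the next step.
        return Step(lambda value: other.invoke(self.invoke(value)))


def handle_request(user_query: str) -> Step:
    """Builds a per-request pipeline, the way handle_request above returns rag_chain."""
    prompt = Step(lambda question: f"QUESTION: {question}")
    llm = Step(lambda text: text.upper())  # pretend model
    parser = Step(lambda text: {"answer": text})
    return prompt | llm | parser


chain = Step(handle_request)
result = chain.invoke("theme parks")
print(result)
```

Building the chain inside the function is what allows per-request work, such as scraping and loading documents for the specific query, to happen before the retriever is created.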
2. Making your own endpoints
For more customization, you can implement your own endpoints by following these steps:
- Define your get_api_handler by using a RunnableLambda on your handle_request function, and define your path:
async def _get_api_handler() -> APIHandler:
    """Prepare a RunnableLambda."""
    return APIHandler(RunnableLambda(handle_request), path="/rag")
- Define your endpoints:
@app.post("/rag/astream")
async def rag_astream(
    request: Request, runnable: Annotated[APIHandler, Depends(_get_api_handler)]
) -> EventSourceResponse:
    """Handle astream request."""
    # The API Handler validates the parts of the request
    # that are used by the runnable (e.g., input, config fields)
    return await runnable.astream_events(request)

@app.post("/rag/streamlog")
async def rag_stream_log(
    request: Request, runnable: Annotated[APIHandler, Depends(_get_api_handler)]
) -> EventSourceResponse:
    """Handle stream log request."""
    # The API Handler validates the parts of the request
    # that are used by the runnable (e.g., input, config fields)
    return await runnable.stream_log(request)
-
Start the Server: Finally, add code to start your server using uvicorn:
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8085)
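Once the server is running, a client can probe it before sending a streaming request, which is what the Flutter service does through isServerAvailable. That function's body is not shown in this document, so the following is only a guess at the idea, written with the Python standard library; `is_server_available` is a hypothetical name, and it assumes the /health endpoint from the first approach is registered on the app.

```python
import urllib.error
import urllib.request


def is_server_available(base_url: str, timeout: float = 3.0) -> bool:
    """Return True if GET {base_url}/health answers with HTTP 200."""
    try:
        with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, DNS failure, or a non-2xx status.
        return False
```

For the server configured above, the call would be is_server_available("http://localhost:8085"), assuming the client runs on the same machine.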
Toward the end of the project, I needed to refactor the code to switch from using the Gemma API to the Gemini API. To make this change quickly and without disrupting the system's architecture, I used the [LangChain Flutter package](https://pub.dev/packages/langchain).
This package is similar to the original Python LangChain framework, with some limitations. Let me walk you through some of the important code changes:
- Defining your model and your embeddings:
final embeddings = GoogleGenerativeAIEmbeddings(
apiKey: apiKey,
);
ChatGoogleGenerativeAI llm = ChatGoogleGenerativeAI(
apiKey: apiKey,
defaultOptions: ChatGoogleGenerativeAIOptions(
model: "gemini-1.0-pro",
),
);
- Defining vectorStore and Retrieval
final vectorStore = MemoryVectorStore(embeddings: embeddings);
final retriever = vectorStore.asRetriever();
final setupAndRetrieval = Runnable.fromMap<String>({
'context': retriever.pipe(
Runnable.mapInput(
(docs) => docs.map((d) => d.pageContent).join('\n')),
),
'question': Runnable.passthrough(),
});
- Defining the parser:
final outputParser = JsonOutputParser<ChatResult>();
- Defining your chain
Set<String> inputVariables = {};
inputVariables.add('question');
final chain = setupAndRetrieval
.pipe(
PromptTemplate(inputVariables: inputVariables, template: prompt))
.pipe(llm)
.pipe(outputParser);
- Stream or Invoke your response
final stream = chain.stream(userQuery);
//or final result = chain.invoke(userQuery);
If you are interested in running the AI-Gemma code yourself, or even contributing to it, you can follow these steps:
-
Go to the project root directory
-
Navigate to AI_Gemma_Model directory
-
Navigate to gemma_app
-
Open a terminal in the current gemma_app directory
-
Create a new python env:
python -m venv env
- Go to your env directory and activate your env (the command below is for Windows; on Linux or macOS, run source bin/activate instead):
Scripts\activate
- Go back to gemma_app directory and install the requirements.txt file
pip install --no-cache-dir -r requirements.txt
- Install playwright
playwright install --with-deps chromium
-
Install Ollama according to your system environment (i.e. Windows, Linux or macOS)
-
Pull the Gemma model gemma2:9b-instruct-q4_K_M and the embeddings model nomic-embed-text:v1.5:
ollama pull gemma2:9b-instruct-q4_K_M
ollama pull nomic-embed-text:v1.5
Note: If you prefer using other models, you can check the Ollama Models list and choose your preferred model, then navigate to the gemma_app directory, open app.py and change the model names:
llm = Ollama(model="gemma2:9b-instruct-q4_K_M", num_ctx=8192)
embeddings = OllamaEmbeddings(model='nomic-embed-text:v1.5', num_ctx=8192, show_progress=True)
You can also change the model parameters to suit your preference. Refer to the documentation here
- Make sure you are in gemma_app directory and run the app file:
python app.py
This will start the server locally from the source code, without Docker.
If you just want to run the Docker container, please follow the Docker manual guide.