PDF RAG
This is a PDF-based RAG application. When answering a question, it retrieves the relevant information from the PDF and displays the corresponding paragraphs as images.
APPLY = True   # If True, apply the components to the database as the notebook runs
EAGER = False  # If True, also apply the intermediate listeners individually
COLLECTION_NAME = '<var:table_name>' if not APPLY else 'sample_pdf_rag'
from superduper import superduper, CFG
db = superduper('mongomock://test')
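The demo uses an in-memory mongomock backend. To persist the components you could point superduper at a real database instead; the URI below is only a hypothetical example.

# Hypothetical alternative: connect to a persistent MongoDB instance instead of mongomock.
# db = superduper('mongodb://localhost:27017/pdf_rag')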
def getter():
    import subprocess
    import os

    # Download and unpack the demo PDFs, then build one record per file.
    subprocess.run(['curl', '-O', 'https://superduperdb-public-demo.s3.amazonaws.com/pdfs.zip'])
    subprocess.run(['unzip', '-o', 'pdfs.zip'])
    subprocess.run(['rm', 'pdfs.zip'])
    pdf_folder = "pdfs"
    pdf_names = [pdf for pdf in os.listdir(pdf_folder) if pdf.endswith(".pdf")]
    pdf_paths = [os.path.join(pdf_folder, pdf) for pdf in pdf_names]
    data = [{"url": pdf_path, "file": pdf_path} for pdf_path in pdf_paths]
    return data

if APPLY:
    data = getter()
Create a table to store PDFs.
import os
from superduper import Table
table = Table(identifier=COLLECTION_NAME, fields={'url': 'str', 'file': 'file'})
if APPLY:
    db.apply(table, force=True)
    db[COLLECTION_NAME].insert(data)
db.show()
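To sanity-check the ingest you can read a few rows back; this is a small sketch assuming the standard select().execute() query interface.

if APPLY:
    # Each row should contain the PDF url and a reference to the stored file.
    rows = list(db[COLLECTION_NAME].select().execute())
    print(len(rows), rows[:1])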
Split each PDF into page images so that results can be displayed later.
!pip install pdf2image
from superduper import ObjectModel, Listener, logging
from pdf2image import convert_from_path
import os
def split_image(pdf_path):
    if hasattr(pdf_path, 'unpack'):
        pdf_path = pdf_path.unpack()
    logging.info(f"Splitting images from {pdf_path}")

    image_folders = "data/pdf-images"
    pdf_name = os.path.basename(pdf_path)
    images = convert_from_path(pdf_path)
    logging.info(f"Number of images: {len(images)}")

    image_folder = os.path.join(image_folders, pdf_name)
    if not os.path.exists(image_folder):
        os.makedirs(image_folder)

    data = []
    for i, image in enumerate(images):
        path = os.path.join(image_folder, f"{i}.jpg")
        image.save(path)
        data.append(path)
    return data
model_split_image = ObjectModel(
    identifier="split_image",
    object=split_image,
    datatype='file',
)

listener_split_image = Listener(
    'split_image',
    model=model_split_image,
    key="file",
    select=db[COLLECTION_NAME],
    flatten=True,
)

if EAGER and APPLY:
    db.apply(listener_split_image, force=True)
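Before relying on the listener, the splitting function can be tried directly on one of the downloaded files; a quick check assuming data from the getter above (and a working poppler install for pdf2image).

if APPLY:
    # Split the first PDF locally and report how many page images were written.
    pages = split_image(data[0]['file'])
    print(f"{data[0]['file']} -> {len(pages)} page images")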
Build a chunking model that returns text chunks together with coordinate information.
def remove_sidebars(elements):
    import re
    from collections import defaultdict
    from unstructured.documents.elements import ElementType

    if not elements:
        return elements

    points_groups = defaultdict(list)
    min_x = 99999999
    max_x = 0
    e2index = {e.id: i for i, e in enumerate(elements)}
    for e in elements:
        x_l = int(e.metadata.coordinates.points[0][0])
        x_r = int(e.metadata.coordinates.points[2][0])
        points_groups[(x_l, x_r)].append(e)
        min_x = min(min_x, x_l)
        max_x = max(max_x, x_r)

    sidebars_elements = set()
    for (x_l, x_r), es in points_groups.items():
        first_id = e2index[es[0].id]
        last_id = e2index[es[-1].id]
        on_left = first_id == 0 and x_l == min_x
        on_right = (last_id == len(elements) - 2) and x_r == max_x
        loc_match = [on_left, on_right]
        total_text = "".join(map(str, es))
        condition = [
            any(loc_match),
            len(es) >= 3,
            re.findall(r"^[A-Z\s\d,]+$", total_text),
        ]
        if not all(condition):
            continue
        sidebars_elements.update(map(lambda x: x.id, es))
        if on_left:
            check_page_num_e = elements[last_id + 1]
        else:
            check_page_num_e = elements[-1]
        if (
            check_page_num_e.category == ElementType.UNCATEGORIZED_TEXT
            and check_page_num_e.text.strip().isalnum()
        ):
            sidebars_elements.add(check_page_num_e.id)

    elements = [e for e in elements if e.id not in sidebars_elements]
    return elements
def remove_annotation(elements):
    from collections import Counter
    from unstructured.documents.elements import ElementType

    page_num = max(e.metadata.page_number for e in elements)
    un_texts_counter = Counter(
        [e.text for e in elements if e.category == ElementType.UNCATEGORIZED_TEXT]
    )
    rm_text = set()
    for text, count in un_texts_counter.items():
        if count / page_num >= 0.5:
            rm_text.add(text)
    elements = [e for e in elements if e.text not in rm_text]
    return elements
def create_chunk_and_metadatas(page_elements, stride=3, window=10):
    page_elements = remove_sidebars(page_elements)
    for index, page_element in enumerate(page_elements):
        page_element.metadata.num = index
    datas = []
    for i in range(0, len(page_elements), stride):
        window_elements = page_elements[i : i + window]
        chunk = "\n".join([e.text for e in window_elements])
        source_elements = [e.to_dict() for e in window_elements]
        datas.append(
            {
                "txt": chunk,
                "source_elements": source_elements,
            }
        )
    return datas
def get_chunks(pdf):
    from collections import defaultdict
    from unstructured.documents.coordinates import RelativeCoordinateSystem
    from unstructured.partition.pdf import partition_pdf

    if hasattr(pdf, 'unpack'):
        pdf = pdf.unpack()

    elements = partition_pdf(pdf)
    elements = remove_annotation(elements)

    pages_elements = defaultdict(list)
    for element in elements:
        element.convert_coordinates_to_new_system(
            RelativeCoordinateSystem(), in_place=True
        )
        pages_elements[element.metadata.page_number].append(element)

    all_chunks_and_links = sum(
        [
            create_chunk_and_metadatas(page_elements)
            for _, page_elements in pages_elements.items()
        ],
        [],
    )
    return all_chunks_and_links
model_chunk = ObjectModel(
    identifier="chunk",
    object=get_chunks,
    datatype='json',
)

listener_chunk = Listener(
    'chunker',
    key='file',
    model=model_chunk,
    select=db[COLLECTION_NAME],
    flatten=True,
)

if EAGER and APPLY:
    db.apply(listener_chunk, force=True)
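As with the image splitter, the chunker can be exercised directly before it runs inside the listener; a sketch assuming unstructured with PDF support is installed.

if EAGER and APPLY:
    # Chunk the first PDF and inspect the beginning of the first chunk's text.
    chunks = get_chunks(data[0]['file'])
    print(len(chunks), chunks[0]['txt'][:200])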
Build a vector index for vector search
from superduper import VectorIndex
from superduper_openai import OpenAIEmbedding

openai_embedding = OpenAIEmbedding(
    identifier='embedding',
    model='text-embedding-ada-002',
    datatype='vector[float:1536]',
)
listener_embedding = Listener(
    'embedding',
    model=openai_embedding,
    key=f"{listener_chunk.outputs}.txt",
    select=db[listener_chunk.outputs].select(),
)

vector_index = VectorIndex(
    identifier="vector-index",
    indexing_listener=listener_embedding,
)

if EAGER and APPLY:
    db.apply(vector_index, force=True)
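Once the index is applied, a standalone similarity query looks roughly like this; a sketch assuming the .like() query interface, with the key and index names matching the components above.

if EAGER and APPLY:
    # Retrieve the three chunks most similar to the query text.
    results = db[listener_chunk.outputs].like(
        {f"{listener_chunk.outputs}.txt": "What is an attention mechanism?"},
        vector_index="vector-index",
        n=3,
    ).select().execute()
    for r in results:
        print(r[f"{listener_chunk.outputs}.txt"][:120])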
Create a plugin
When the processor is applied, the plugin is saved in the database, so its related dependencies are saved as well.
The processor combines the returned chunk information with the page images and returns a visualization of the relevant passages.
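The actual Processor lives in utils.py; conceptually it maps each retrieved chunk back onto its page image via the relative coordinates stored in source_elements. A minimal illustrative sketch of that idea (not the code in utils.py):

from PIL import Image, ImageDraw

def highlight_chunk_on_page(image_path, source_elements):
    # source_elements carry coordinates in the relative (0..1) system set up in get_chunks.
    img = Image.open(image_path)
    draw = ImageDraw.Draw(img)
    w, h = img.size
    for element in source_elements:
        points = element["metadata"]["coordinates"]["points"]
        xs = [p[0] * w for p in points]
        ys = [(1 - p[1]) * h for p in points]  # assumes relative y is measured from the bottom
        draw.rectangle([min(xs), min(ys), max(xs), max(ys)], outline="red", width=3)
    return img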
from superduper import Plugin
from utils import Processor
processor = Processor(
    identifier="processor",
    chunk_key=listener_chunk.outputs,
    split_image_key=listener_split_image.outputs,
    upstream=[Plugin(path="./utils.py")],
)
Create a RAG model
Build the RAG model, which performs retrieval-augmented generation (RAG) over the indexed chunks and returns the answer together with the supporting page images.
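The Rag component itself is defined in utils.py; at prediction time it roughly performs a vector search, stitches the retrieved chunks into the prompt, and calls the LLM. A simplified sketch of that flow (not the actual utils.Rag implementation):

def rag_predict_sketch(db, query, llm, prompt_template, chunk_key, top_k=5):
    # 1. Retrieve the chunks most similar to the query via the vector index.
    hits = db[chunk_key].like(
        {f"{chunk_key}.txt": query}, vector_index="vector-index", n=top_k
    ).select().execute()
    # 2. Concatenate the retrieved chunk texts into a single context string.
    context = "\n\n".join(hit[f"{chunk_key}.txt"] for hit in hits)
    # 3. Fill the prompt template and ask the LLM for a concise answer.
    prompt = prompt_template.format(context=context, query=query)
    return llm.predict(prompt)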
from superduper_openai import OpenAIChatCompletion

llm_openai = OpenAIChatCompletion(identifier='llm-openai', model='gpt-3.5-turbo')
from utils import Rag
prompt_template = (
    "The following is a document and question\n"
    "Only provide a very concise answer\n"
    "Context:\n\n"
    "{context}\n\n"
    "Here's the question:{query}\n"
    "answer:"
)
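For reference, the template is an ordinary format string with context and query slots; filling it looks like this.

example_prompt = prompt_template.format(
    context="GPT is a family of transformer-based language models.",
    query="What is GPT?",
)
print(example_prompt)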
rag = Rag(
    identifier="rag",
    llm_model=llm_openai,
    vector_index=vector_index,
    prompt_template=prompt_template,
    processor=processor,
    upstream=[vector_index],
)
Create template
from superduper import Application
app = Application(
    'pdf-rag',
    components=[
        table,
        listener_split_image,
        listener_chunk,
        vector_index,
        rag,
    ],
)

if APPLY:
    db.apply(app, force=True)
from IPython.display import Image, Markdown, display
if APPLY:
    db.apply(rag, force=True)
    result = rag.predict("Tell me about GPT on the basis of these data.", format_result=True)
    display(Markdown(result["answer"]))

    for message, img in result["images"]:
        display(Markdown(message))
        display(img)
from superduper import Template, CFG, Table
from superduper.components.dataset import RemoteData
template = Template(
    'pdf-rag',
    db=db,
    template=app,
    substitutions={
        prompt_template: 'prompt_template',
        COLLECTION_NAME: 'table_name',
        'gpt-3.5-turbo': 'llm_model',
        'text-embedding-ada-002': 'embedding_model',
    },
    template_variables=['table_name', 'prompt_template', 'llm_model', 'embedding_model'],
    default_tables=[
        Table(
            'sample_pdf_rag',
            fields={"url": "str", "file": 'file'},
            data=RemoteData('sample_pdfs', getter=getter),
        )
    ],
    types={
        'prompt_template': {
            'type': 'str',
            'default': prompt_template,
        },
        'table_name': {
            'type': 'str',
            'default': 'sample_pdf_rag',
        },
        'llm_model': {
            'type': 'str',
            'default': 'gpt-3.5-turbo',
        },
        'embedding_model': {
            'type': 'str',
            'default': 'text-embedding-ada-002',
        },
    },
)
template.export(".")