feat: basic implementation of a DOI resolver using an LLM + the Crossref API

2025-07-22 15:40:11 -03:00
parent e52045bf76
commit a05cabdf54
17 changed files with 570 additions and 22 deletions

View File

@@ -1,6 +1,3 @@
-# python-template
+# librarian
-Python Project Template
-## Running:
-`python3 -m src.python_template`
+Librarian is a WIP indexer for scientific articles

View File

@@ -3,7 +3,7 @@ requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "python-template"
name = "librarian"
version = "0.0.0"
dynamic = ["dependencies"]
@@ -11,7 +11,7 @@ dynamic = ["dependencies"]
path = "requirements.txt"
[tool.hatch.build.targets.wheel]
packages = ["src/python_template"]
packages = ["src/librarian"]
[tool.hatch.build]
skip-excluded-dirs = true

View File

@@ -0,0 +1,5 @@
PyMuPDF
openai==1.97.1
pydantic==2.11.7
python-dotenv==1.1.1
Requests==2.32.4

View File

@@ -1,5 +1,5 @@
[tool.ruff]
-line-length = 120
+line-length = 160
target-version = "py312"
select = [

View File

@@ -3,6 +3,6 @@ from pathlib import Path
# Add 'src/' to sys.path manually
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
-from python_template.main import main
+from librarian.main import main
main()

src/librarian/config.py Normal file
View File

@@ -0,0 +1,35 @@
from dotenv import load_dotenv
from os import getenv

DEFAULT_LIBRARY_DB_TYPE = "sqlite"


class LibrarianConfig:
    def __init__(self):
        load_dotenv()

    @property
    def database_type(self):
        return getenv("LIBRARIAN_DB_TYPE", DEFAULT_LIBRARY_DB_TYPE)

    @property
    def llm_model(self):
        llm_model = getenv("LIBRARIAN_LLM_MODEL", None)
        if llm_model is None:
            raise RuntimeError("Environment variable 'LIBRARIAN_LLM_MODEL' must be set")
        return llm_model

    @property
    def llm_base_path(self):
        llm_base_path = getenv("LIBRARIAN_LLM_BASE_PATH", None)
        if llm_base_path is None:
            raise RuntimeError("Environment variable 'LIBRARIAN_LLM_BASE_PATH' must be set")
        return llm_base_path

    @property
    def llm_api_key(self):
        llm_api_key = getenv("LIBRARIAN_LLM_API_KEY", None)
        if llm_api_key is None:
            raise RuntimeError("Environment variable 'LIBRARIAN_LLM_API_KEY' must be set")
        return llm_api_key
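For reference, a minimal `.env` this config would read on `load_dotenv()` — the variable names come from the properties above; the values are placeholders, not from the commit:

```
LIBRARIAN_DB_TYPE=sqlite
LIBRARIAN_LLM_MODEL=<your-model-name>
LIBRARIAN_LLM_BASE_PATH=<openai-compatible-base-url>
LIBRARIAN_LLM_API_KEY=<your-api-key>
```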

View File

@@ -0,0 +1,36 @@
import requests

from librarian.models import PaperInfo


def search_doi(title: str, author: str) -> PaperInfo | None:
    url = "https://api.crossref.org/works"
    params = {
        "query.title": title,
        "query.author": author,
        "rows": 1,
    }
    response = requests.get(url, params=params, timeout=30)
    if not response.ok:
        return None
    items = response.json()["message"]["items"]
    if not items:
        return None
    item = items[0]
    paper = PaperInfo(
        title=item["title"][0],
        authors=[f"{a['given']} {a['family']}" for a in item.get("author", [])],
        # "date-parts" is [[year, month, day]]; keep only the year
        publish_date=item["issued"]["date-parts"][0][0],
        published_in=item["container-title"][0] if item.get("container-title") else None,
        references=[
            ref.get("DOI", ref.get("article-title", "unknown reference"))
            for ref in item.get("reference", [])
        ],
        doi=item.get("DOI"),
    )
    return paper
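A quick smoke test for the resolver — the title/author pair is illustrative, and the result depends on whatever Crossref ranks first:

```python
from librarian.doi_resolver import search_doi

# Hypothetical query; any real title/author pair works the same way.
paper = search_doi("Attention Is All You Need", "Ashish Vaswani")
if paper is not None:
    print(paper.doi, paper.publish_date)
```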

View File

@@ -0,0 +1,59 @@
import json
import re
from pathlib import Path
from typing import Union

from openai import OpenAI

from librarian.readers.pdf import PDFReader


class LLMExtractor:
    def __init__(self, api_key: str, model: str, base_path: str):
        self._client = OpenAI(base_url=base_path, api_key=api_key)
        self._model = model

    def _extract_json_from_markdown(self, text: str) -> str:
        # Extract content between ```json ... ```
        match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
        if match:
            return match.group(1).strip()
        return text.strip()  # fallback if not wrapped

    def extract(self, path: Union[str, Path]) -> tuple[str, list[str]]:
        SYSTEM_MSG = """You are an AI assistant that extracts metadata from scientific papers. Given the full text or metadata section of a paper, extract the following fields in JSON format:
* title: Full title of the paper.
* authors: A list of all authors (e.g., "John Doe, Jane Smith, Alan Turing")."""
        if isinstance(path, str):
            path = Path(path)
        if not path.is_file():
            raise RuntimeError(f"{path.absolute()} is not a file")
        file_content_list: list[str] = PDFReader(path).extract_texts()
        paper_text = "\n".join(file_content_list)
        response = self._client.chat.completions.create(
            model=self._model,
            messages=[
                {"role": "system", "content": SYSTEM_MSG},
                {"role": "user", "content": paper_text},
            ],
            temperature=0.0,
        )
        content = response.choices[0].message.content
        content = self._extract_json_from_markdown(content)
        data = json.loads(content)
        authors_raw: Union[str, list[str]] = data["authors"]
        if isinstance(authors_raw, str):
            authors = [author.strip() for author in authors_raw.split(",")]
        elif isinstance(authors_raw, list):
            authors = [author.strip() for author in authors_raw]
        else:
            raise TypeError(f"Unexpected authors format: {type(authors_raw)}")
        return data["title"], authors

src/librarian/main.py Normal file
View File

@@ -0,0 +1,13 @@
from librarian.llm_extractor import LLMExtractor
from librarian.config import LibrarianConfig
from librarian.doi_resolver import search_doi

config = LibrarianConfig()


def main():
    extractor = LLMExtractor(config.llm_api_key, config.llm_model, config.llm_base_path)
    title, authors = extractor.extract("/home/brenozd/Documents/Artigos/QoS/chang2012.pdf")
    print(f"Title: {title}")
    print(f"Authors: {authors}")
    paper = search_doi(title, authors[0])
    print(f"Resolved: {paper}")

src/librarian/models.py Normal file
View File

@@ -0,0 +1,16 @@
from pydantic import BaseModel, field_validator
from typing import Optional


class PaperInfo(BaseModel):
    title: str
    authors: list[str]
    publish_date: Optional[int]
    published_in: Optional[str]
    references: list[str] = []  # populated by doi_resolver.search_doi
    doi: Optional[str]

    @field_validator("authors", mode="before")
    def split_authors(cls, v):
        if isinstance(v, str):
            return [a.strip() for a in v.split(",")]
        return v
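Because the validator runs in `before` mode, both input shapes below normalize to the same list (the values are illustrative):

```python
from librarian.models import PaperInfo

a = PaperInfo(title="T", authors="A One, B Two", publish_date=None, published_in=None, doi=None)
b = PaperInfo(title="T", authors=["A One", "B Two"], publish_date=None, published_in=None, doi=None)
assert a.authors == b.authors == ["A One", "B Two"]
```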

View File

View File

@@ -0,0 +1,40 @@
import abc
from pathlib import Path
from typing import List
from urllib.parse import urlparse


class IReader(metaclass=abc.ABCMeta):
    def __init__(self, source: str):
        self._source = source

    @classmethod
    def __subclasshook__(cls, subclass):
        _attributes = ['extract_texts']
        for attr in _attributes:
            # reject classes that lack the attribute or expose it as a non-callable
            if not hasattr(subclass, attr) or not callable(getattr(subclass, attr)):
                return False
        return True

    def _is_source_a_file(self) -> bool:
        if isinstance(self._source, str):
            self._source: Path = Path(self._source).absolute()
        elif isinstance(self._source, Path):
            self._source: Path = self._source.absolute()
        if not self._source.is_file():
            return False
        return True

    def _is_source_a_url(self) -> bool:
        try:
            if not isinstance(self._source, str):
                self._source: str = str(self._source)
            _res = urlparse(self._source)
            return all([_res.scheme, _res.netloc])
        except (AttributeError, UnicodeDecodeError, UnicodeEncodeError):
            return False

    @abc.abstractmethod
    def extract_texts(self) -> List[str]:
        """Extract text contents from file"""
        raise NotImplementedError
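The hook makes `issubclass` treat any class exposing a callable `extract_texts` as an `IReader`, inheritance or not — a minimal sketch (`DuckReader` is hypothetical, not part of the commit):

```python
from librarian.readers.ireader import IReader

class DuckReader:
    def extract_texts(self):
        return ["some text"]

print(issubclass(DuckReader, IReader))  # True, via the structural __subclasshook__
```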

View File

@@ -0,0 +1,284 @@
import fitz

# Source: https://github.com/pymupdf/PyMuPDF-Utilities/blob/master/text-extraction/multi_column.py
"""
This is an advanced PyMuPDF utility for detecting multi-column pages.
It can be used in a shell script, or its main function can be imported and
invoked as described below.

Features
---------
- Identify text belonging to (a variable number of) columns on the page.
- Text with different background color is handled separately, allowing for
  easier treatment of side remarks, comment boxes, etc.
- Uses text block detection capability to identify text blocks and
  uses the block bboxes as primary structuring principle.
- Supports ignoring footers via a footer margin parameter.
- Returns re-created text boundary boxes (integer coordinates), sorted ascending
  by the top, then by the left coordinates.

Restrictions
-------------
- Only supporting horizontal, left-to-right text
- Returns a list of text boundary boxes - not the text itself. The caller is
  expected to extract text from within the returned boxes.
- Text written above images is ignored altogether (option).
- This utility works as expected in most cases. The following situations cannot
  be handled correctly:
    * overlapping (non-disjoint) text blocks
    * image captions are not recognized and are handled like normal text

Usage
------
- As a CLI shell command use

  python multi_column.py input.pdf footer_margin

  Where footer_margin is the height of the bottom stripe to ignore on each page.
  This code is intended to be modified according to your need.

- Use in a Python script as follows:

  ----------------------------------------------------------------------------------
  from multi_column import column_boxes

  # for each page execute
  bboxes = column_boxes(page, footer_margin=50, no_image_text=True)

  # bboxes is a list of fitz.IRect objects, sorted ascending by their y0,
  # then x0 coordinates. Their text content can be extracted by all PyMuPDF
  # get_text() variants, like for instance the following:
  for rect in bboxes:
      print(page.get_text(clip=rect, sort=True))
  ----------------------------------------------------------------------------------
"""


def column_boxes(page, footer_margin=50, header_margin=50, no_image_text=True):
    """Determine bboxes which wrap a column."""
    paths = page.get_drawings()
    bboxes = []

    # path rectangles
    path_rects = []

    # image bboxes
    img_bboxes = []

    # bboxes of non-horizontal text
    # avoid when expanding horizontal text boxes
    vert_bboxes = []

    # compute relevant page area
    clip = +page.rect
    clip.y1 -= footer_margin  # Remove footer area
    clip.y0 += header_margin  # Remove header area

    def can_extend(temp, bb, bboxlist):
        """Determines whether rectangle 'temp' can be extended by 'bb'
        without intersecting any of the rectangles contained in 'bboxlist'.

        Items of bboxlist may be None if they have been removed.

        Returns:
            True if 'temp' has no intersections with items of 'bboxlist'.
        """
        for b in bboxlist:
            if not intersects_bboxes(temp, vert_bboxes) and (
                b is None or b == bb or (temp & b).is_empty
            ):
                continue
            return False
        return True

    def in_bbox(bb, bboxes):
        """Return 1-based number if a bbox contains bb, else return 0."""
        for i, bbox in enumerate(bboxes):
            if bb in bbox:
                return i + 1
        return 0

    def intersects_bboxes(bb, bboxes):
        """Return True if a bbox intersects bb, else return False."""
        for bbox in bboxes:
            if not (bb & bbox).is_empty:
                return True
        return False

    def extend_right(bboxes, width, path_bboxes, vert_bboxes, img_bboxes):
        """Extend a bbox to the right page border.

        Whenever there is no text to the right of a bbox, enlarge it up
        to the right page border.

        Args:
            bboxes: (list[IRect]) bboxes to check
            width: (int) page width
            path_bboxes: (list[IRect]) bboxes with a background color
            vert_bboxes: (list[IRect]) bboxes with vertical text
            img_bboxes: (list[IRect]) bboxes of images

        Returns:
            Potentially modified bboxes.
        """
        for i, bb in enumerate(bboxes):
            # do not extend text with background color
            if in_bbox(bb, path_bboxes):
                continue

            # do not extend text in images
            if in_bbox(bb, img_bboxes):
                continue

            # temp extends bb to the right page border
            temp = +bb
            temp.x1 = width

            # do not cut through colored background or images
            if intersects_bboxes(temp, path_bboxes + vert_bboxes + img_bboxes):
                continue

            # also, do not intersect other text bboxes
            check = can_extend(temp, bb, bboxes)
            if check:
                bboxes[i] = temp  # replace with enlarged bbox

        return [b for b in bboxes if b is not None]

    def clean_nblocks(nblocks):
        """Do some elementary cleaning."""

        # 1. remove any duplicate blocks.
        blen = len(nblocks)
        if blen < 2:
            return nblocks
        start = blen - 1
        for i in range(start, -1, -1):
            bb1 = nblocks[i]
            bb0 = nblocks[i - 1]
            if bb0 == bb1:
                del nblocks[i]

        # 2. repair sequence in special cases:
        # consecutive bboxes with almost same bottom value are sorted ascending
        # by x-coordinate.
        y1 = nblocks[0].y1  # first bottom coordinate
        i0 = 0  # its index
        i1 = -1  # index of last bbox with same bottom

        # Iterate over bboxes, identifying segments with approx. same bottom value.
        # Replace every segment by its sorted version.
        for i in range(1, len(nblocks)):
            b1 = nblocks[i]
            if abs(b1.y1 - y1) > 10:  # different bottom
                if i1 > i0:  # segment length > 1? Sort it!
                    nblocks[i0 : i1 + 1] = sorted(
                        nblocks[i0 : i1 + 1], key=lambda b: b.x0
                    )
                y1 = b1.y1  # store new bottom value
                i0 = i  # store its start index
            i1 = i  # store current index
        if i1 > i0:  # segment waiting to be sorted
            nblocks[i0 : i1 + 1] = sorted(nblocks[i0 : i1 + 1], key=lambda b: b.x0)
        return nblocks

    # extract vector graphics
    for p in paths:
        path_rects.append(p["rect"].irect)
    path_bboxes = path_rects

    # sort path bboxes by ascending top, then left coordinates
    path_bboxes.sort(key=lambda b: (b.y0, b.x0))

    # bboxes of images on page, no need to sort them
    for item in page.get_images():
        img_bboxes.extend(page.get_image_rects(item[0]))

    # blocks of text on page
    blocks = page.get_text(
        "dict",
        flags=fitz.TEXTFLAGS_TEXT,
        clip=clip,
    )["blocks"]

    # Make block rectangles, ignoring non-horizontal text
    for b in blocks:
        bbox = fitz.IRect(b["bbox"])  # bbox of the block

        # ignore text written upon images
        if no_image_text and in_bbox(bbox, img_bboxes):
            continue

        # confirm first line to be horizontal
        if not b["lines"]:
            continue
        line0 = b["lines"][0]  # get first line
        if line0["dir"] != (1, 0):  # only accept horizontal text
            vert_bboxes.append(bbox)
            continue

        srect = fitz.EMPTY_IRECT()
        for line in b["lines"]:
            lbbox = fitz.IRect(line["bbox"])
            text = "".join([s["text"].strip() for s in line["spans"]])
            if len(text) > 1:
                srect |= lbbox
        bbox = +srect

        if not bbox.is_empty:
            bboxes.append(bbox)

    # Sort text bboxes by ascending background, top, then left coordinates
    bboxes.sort(key=lambda k: (in_bbox(k, path_bboxes), k.y0, k.x0))

    # Extend bboxes to the right where possible
    bboxes = extend_right(
        bboxes, int(page.rect.width), path_bboxes, vert_bboxes, img_bboxes
    )

    # immediately return if no text found
    if not bboxes:
        return []

    # --------------------------------------------------------------------
    # Join bboxes to establish some column structure
    # --------------------------------------------------------------------
    # the final block bboxes on page
    nblocks = [bboxes[0]]  # pre-fill with first bbox
    bboxes = bboxes[1:]  # remaining old bboxes

    for i, bb in enumerate(bboxes):  # iterate old bboxes
        check = False  # indicates unwanted joins

        # check if bb can extend one of the new blocks
        for j in range(len(nblocks)):
            nbb = nblocks[j]  # a new block

            # never join across columns
            if bb is None or nbb.x1 < bb.x0 or bb.x1 < nbb.x0:
                continue

            # never join across different background colors
            if in_bbox(nbb, path_bboxes) != in_bbox(bb, path_bboxes):
                continue

            temp = bb | nbb  # temporary extension of new block
            check = can_extend(temp, nbb, nblocks)
            if check:
                break

        if not check:  # bb cannot be used to extend any of the new bboxes
            nblocks.append(bb)  # so add it to the list
            j = len(nblocks) - 1  # index of it
            temp = nblocks[j]  # new bbox added

        # check if some remaining bbox is contained in temp
        check = can_extend(temp, bb, bboxes)
        if not check:
            nblocks.append(bb)
        else:
            nblocks[j] = temp
        bboxes[i] = None

    # do some elementary cleaning
    nblocks = clean_nblocks(nblocks)

    # return identified text bboxes
    return nblocks

View File

@@ -0,0 +1,76 @@
from librarian.readers.ireader import IReader
from librarian.readers.multi_column_pdf import column_boxes
from typing import List

import fitz


class PDFReader(IReader):
    """Read text from a PDF."""

    def __init__(self, source: str) -> None:
        super().__init__(source)
        if not self._is_source_a_file():
            raise RuntimeError(f"Source {self._source} is not a valid file")

    def extract_texts(self) -> List[str]:
        """Parses only text in the PDF"""
        _extracted_text = []
        with fitz.open(self._source) as _document:
            for page in _document:
                bboxes = column_boxes(page, footer_margin=50, no_image_text=True)
                for rect in bboxes:
                    _text: str = page.get_text(clip=rect, sort=True)
                    _text = self._replace_ligatures(_text)
                    _text = self._remove_hyphens(_text)
                    _extracted_text.append(_text)
        return _extracted_text

    # Source: https://pypdf.readthedocs.io/en/latest/user/post-processing-in-text-extraction.html
    def _replace_ligatures(self, text: str) -> str:
        ligatures = {
            "ﬀ": "ff",
            "ﬁ": "fi",
            "ﬂ": "fl",
            "ﬃ": "ffi",
            "ﬄ": "ffl",
            "ﬅ": "ft",
            "ﬆ": "st",
            "Ꜳ": "AA",
            "Æ": "AE",
            "ꜳ": "aa",
        }
        for search, replace in ligatures.items():
            text = text.replace(search, replace)
        return text

    # Source: https://pypdf.readthedocs.io/en/latest/user/post-processing-in-text-extraction.html
    def _remove_hyphens(self, text: str) -> str:
        """
        This fails for:
        * Natural dashes: well-known, self-replication, use-cases, non-semantic,
          Post-processing, Window-wise, viewpoint-dependent
        * Trailing math operands: 2 - 4
        * Names: Lopez-Ferreras, VGG-19, CIFAR-100
        """
        lines = [line.rstrip() for line in text.split("\n")]

        # Find dashes
        line_numbers = []
        for line_no, line in enumerate(lines[:-1]):
            if line.endswith("-"):
                line_numbers.append(line_no)

        # Replace
        for line_no in line_numbers:
            lines = self._dehyphenate(lines, line_no)

        return "\n".join(lines)

    def _dehyphenate(self, lines: List[str], line_no: int) -> List[str]:
        next_line = lines[line_no + 1]
        word_suffix = next_line.split(" ")[0]

        lines[line_no] = lines[line_no][:-1] + word_suffix
        lines[line_no + 1] = lines[line_no + 1][len(word_suffix):]
        return lines
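The dehyphenation pass is easy to sanity-check in isolation; this standalone sketch mirrors `_remove_hyphens`/`_dehyphenate` on a toy string (not from the repo):

```python
def dehyphenate(text: str) -> str:
    # Merge a trailing "-" with the first word of the following line.
    lines = [line.rstrip() for line in text.split("\n")]
    for i in range(len(lines) - 1):
        if lines[i].endswith("-"):
            suffix = lines[i + 1].split(" ")[0]
            lines[i] = lines[i][:-1] + suffix
            lines[i + 1] = lines[i + 1][len(suffix):]
    return "\n".join(lines)

print(dehyphenate("This is an exam-\nple of PDF line wrapping."))
# -> "This is an example\n of PDF line wrapping."
# (the leftover leading space matches the behavior of the method above)
```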

View File

@@ -1,3 +0,0 @@
-class AwesomeClass:
-    def __init__(self):
-        print("This is an Awesome Class!")

View File

@@ -1,10 +0,0 @@
-from python_template.awesome_include import AwesomeClass
-def main():
-    print("Python Example")
-    AwesomeClass()
-if __name__ == "__main__":
-    main()