Build a GraphRAG System with GLiNER
Build a knowledge graph from your documents using a single local model, then use graph structure to improve retrieval-augmented generation -- no API keys required.
Overview
Traditional RAG retrieves document chunks by vector similarity alone, which can miss connections between concepts spread across multiple documents. GraphRAG adds a knowledge graph layer: entities and relations are extracted from text, stored as a graph, and used at query time to pull in structurally relevant context that vector search would miss.
| Component | Traditional RAG | GraphRAG |
|---|---|---|
| Indexing | Text chunks to embeddings | Text to entities + relations to knowledge graph |
| Retrieval | Vector similarity search | Graph traversal + vector similarity |
| Context | Top-k similar chunks | Connected entity neighborhoods + relevant chunks |
| Reasoning | Single-hop | Multi-hop through entity relationships |
This cookbook uses a single GLiNER model that extracts both entities and relations in one pass:
- Model: `knowledgator/gliner-relex-large-v0.5`
- API: `model.inference()` with `return_relations=True`
Installation
pip install gliner networkx numpy
For optional Neo4j integration:
pip install neo4j
Quick Start
Extract entities and relations from a text passage in a single call:
from gliner import GLiNER
model = GLiNER.from_pretrained("knowledgator/gliner-relex-large-v0.5")
text = """
Acme Corporation was founded by John Smith in 2010. The company is
headquartered in San Francisco and specializes in AI technology.
"""
entity_labels = ["person", "organization", "technology", "location", "date"]
relation_labels = ["founded_by", "founded_in", "located_in", "specializes_in"]
entities, relations = model.inference(
texts=[text],
labels=entity_labels,
relations=relation_labels,
threshold=0.5,
relation_threshold=0.5,
return_relations=True,
flat_ner=False,
)
print("Entities:")
for e in entities[0]:
print(f" [{e['label']}] {e['text']} (score: {e['score']:.2f})")
print("\nRelations:")
for r in relations[0]:
print(f" {r['head']['text']} --[{r['relation']}]--> {r['tail']['text']} (score: {r['score']:.2f})")
Define Your Knowledge Schema
Entity and relation types determine what knowledge gets captured. Define them based on your domain.
General Knowledge
ENTITY_TYPES = [
"person", "organization", "location",
"event", "concept", "product", "technology", "date"
]
RELATION_TYPES = [
"works_for", "founded_by", "located_in",
"part_of", "related_to", "created_by", "uses"
]
Technical Documentation
ENTITY_TYPES = [
"software_component", "api_endpoint", "configuration",
"error_type", "feature", "version", "framework"
]
RELATION_TYPES = [
"depends_on", "implements", "extends",
"calls", "configured_by", "deprecated_in"
]
Research Literature
ENTITY_TYPES = [
"researcher", "institution", "methodology",
"finding", "dataset", "metric", "theory"
]
RELATION_TYPES = [
"authored_by", "affiliated_with", "cites",
"supports", "contradicts", "uses_method", "proposes"
]
Extract Entities and Relations
Build a processor that extracts entities and relations in a single pass, and deduplicates entities across a corpus:
from gliner import GLiNER
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set
import hashlib
@dataclass
class Entity:
"""An extracted entity."""
id: str
text: str
entity_type: str
score: float
source_doc: str
mentions: List[dict] = field(default_factory=list)
@staticmethod
def generate_id(text: str, entity_type: str) -> str:
normalized = text.lower().strip()
return hashlib.md5(f"{entity_type}:{normalized}".encode()).hexdigest()[:12]
@dataclass
class Relation:
"""A relation extracted between entities."""
head_text: str
relation_type: str
tail_text: str
score: float
source_doc: str
@dataclass
class Document:
"""A source document."""
id: str
content: str
metadata: Dict = field(default_factory=dict)
class GraphExtractor:
"""Extract entities and relations from documents using GLiNER-relex."""
def __init__(
self,
entity_types: List[str],
relation_types: List[str],
entity_threshold: float = 0.5,
relation_threshold: float = 0.5,
):
self.model = GLiNER.from_pretrained("knowledgator/gliner-relex-large-v0.5")
self.entity_types = entity_types
self.relation_types = relation_types
self.entity_threshold = entity_threshold
self.relation_threshold = relation_threshold
self.entity_index: Dict[str, Entity] = {}
def extract_from_document(self, doc: Document) -> tuple[List[Entity], List[Relation]]:
"""Extract entities and relations from a single document in one pass."""
entities_raw, relations_raw = self.model.inference(
texts=[doc.content],
labels=self.entity_types,
relations=self.relation_types,
threshold=self.entity_threshold,
relation_threshold=self.relation_threshold,
return_relations=True,
flat_ner=False,
)
# Process entities with deduplication
entities = []
for item in entities_raw[0]:
entity_id = Entity.generate_id(item["text"], item["label"])
mention = {"doc_id": doc.id, "score": item["score"]}
if entity_id in self.entity_index:
existing = self.entity_index[entity_id]
existing.mentions.append(mention)
entities.append(existing)
else:
entity = Entity(
id=entity_id,
text=item["text"],
entity_type=item["label"],
score=item["score"],
source_doc=doc.id,
mentions=[mention],
)
self.entity_index[entity_id] = entity
entities.append(entity)
# Process relations
relations = []
for item in relations_raw[0]:
relation = Relation(
head_text=item["head"]["text"],
relation_type=item["relation"],
tail_text=item["tail"]["text"],
score=item["score"],
source_doc=doc.id,
)
relations.append(relation)
return entities, relations
def extract_from_corpus(
self, documents: List[Document]
) -> tuple[Dict[str, List[Entity]], List[Relation]]:
"""Extract entities and relations from multiple documents."""
doc_entities = {}
all_relations = []
for doc in documents:
entities, relations = self.extract_from_document(doc)
doc_entities[doc.id] = entities
all_relations.extend(relations)
print(
f"Extracted {len(entities)} entities and "
f"{len(relations)} relations from {doc.id}"
)
return doc_entities, all_relations
def get_all_entities(self) -> List[Entity]:
return list(self.entity_index.values())
Usage:
ENTITY_TYPES = ["person", "organization", "location", "technology", "date"]
RELATION_TYPES = ["works_for", "founded_by", "located_in", "part_of", "created_by", "uses"]
extractor = GraphExtractor(
entity_types=ENTITY_TYPES,
relation_types=RELATION_TYPES,
entity_threshold=0.5,
relation_threshold=0.5,
)
documents = [
Document(
id="doc_001",
content="Acme Corporation was founded by John Smith in 2010. "
"The company is headquartered in San Francisco and specializes in AI technology. "
"Sarah Johnson joined as CTO in 2015 and led the development of AcmeAI.",
),
Document(
id="doc_002",
content="In 2023, Acme Corporation announced a partnership with TechGiant Inc. "
"John Smith and TechGiant's CEO, Michael Chen, signed the agreement in New York. "
"The partnership focuses on enterprise AI solutions.",
),
]
doc_entities, relations = extractor.extract_from_corpus(documents)
print(f"\nTotal unique entities: {len(extractor.get_all_entities())}")
for entity in extractor.get_all_entities():
print(f" [{entity.entity_type}] {entity.text} (mentions: {len(entity.mentions)})")
print(f"\nTotal relations: {len(relations)}")
for rel in relations:
print(f" {rel.head_text} --[{rel.relation_type}]--> {rel.tail_text}")
Build the Knowledge Graph
Construct a graph from extracted entities and relations using NetworkX:
import networkx as nx
import json
class KnowledgeGraph:
"""In-memory knowledge graph using NetworkX."""
def __init__(self):
self.graph = nx.MultiDiGraph()
self.entity_text_index: Dict[str, List[str]] = {}
self.doc_index: Dict[str, Set[str]] = {}
def add_entity(self, entity: Entity):
"""Add an entity as a node."""
self.graph.add_node(
entity.id,
text=entity.text,
entity_type=entity.entity_type,
score=entity.score,
mentions=entity.mentions,
)
normalized = entity.text.lower().strip()
self.entity_text_index.setdefault(normalized, []).append(entity.id)
for mention in entity.mentions:
self.doc_index.setdefault(mention["doc_id"], set()).add(entity.id)
def add_relation(self, relation: Relation):
"""Add a relation as an edge between two entities."""
source_ids = self.find_entity(relation.head_text)
target_ids = self.find_entity(relation.tail_text)
if source_ids and target_ids:
self.graph.add_edge(
source_ids[0],
target_ids[0],
relation_type=relation.relation_type,
score=relation.score,
source_doc=relation.source_doc,
)
def find_entity(self, text: str) -> List[str]:
"""Find entity IDs by text (case-insensitive)."""
return self.entity_text_index.get(text.lower().strip(), [])
def get_neighbors(
        self, entity_id: str, depth: int = 1, relation_types: Optional[List[str]] = None
) -> Set[str]:
"""Get entity IDs reachable within N hops."""
if entity_id not in self.graph:
return set()
neighbors = set()
current_level = {entity_id}
for _ in range(depth):
next_level = set()
for node in current_level:
for _, target, data in self.graph.out_edges(node, data=True):
if relation_types is None or data["relation_type"] in relation_types:
next_level.add(target)
for source, _, data in self.graph.in_edges(node, data=True):
if relation_types is None or data["relation_type"] in relation_types:
next_level.add(source)
neighbors.update(next_level)
current_level = next_level - neighbors
neighbors.discard(entity_id)
return neighbors
def get_subgraph(self, entity_ids: Set[str]) -> nx.MultiDiGraph:
"""Extract a subgraph containing the specified entities."""
return self.graph.subgraph(entity_ids).copy()
def find_paths(
self, source_id: str, target_id: str, max_length: int = 3
) -> List[List[str]]:
"""Find all simple paths between two entities."""
if source_id not in self.graph or target_id not in self.graph:
return []
        # all_simple_paths yields nothing when no path exists,
        # so no exception handling is needed here
        return list(
            nx.all_simple_paths(self.graph, source_id, target_id, cutoff=max_length)
        )
    def get_entity_context(self, entity_id: str) -> Optional[dict]:
"""Get full context for an entity including its relations."""
if entity_id not in self.graph:
return None
node = self.graph.nodes[entity_id]
outgoing = []
for _, target, data in self.graph.out_edges(entity_id, data=True):
t = self.graph.nodes[target]
outgoing.append({
"relation": data["relation_type"],
"target": t["text"],
"target_type": t["entity_type"],
})
incoming = []
for source, _, data in self.graph.in_edges(entity_id, data=True):
s = self.graph.nodes[source]
incoming.append({
"relation": data["relation_type"],
"source": s["text"],
"source_type": s["entity_type"],
})
return {
"id": entity_id,
"text": node["text"],
"type": node["entity_type"],
"outgoing_relations": outgoing,
"incoming_relations": incoming,
}
def get_stats(self) -> dict:
return {
"num_entities": self.graph.number_of_nodes(),
"num_relations": self.graph.number_of_edges(),
"avg_degree": (
sum(dict(self.graph.degree()).values())
/ max(1, self.graph.number_of_nodes())
),
}
def export_to_json(self, filepath: str):
data = {
"nodes": [{"id": n, **d} for n, d in self.graph.nodes(data=True)],
"edges": [
{"source": u, "target": v, **d}
for u, v, d in self.graph.edges(data=True)
],
}
with open(filepath, "w") as f:
json.dump(data, f, indent=2, default=str)
Build the graph from extraction results:
knowledge_graph = KnowledgeGraph()
for entity in extractor.get_all_entities():
knowledge_graph.add_entity(entity)
for relation in relations:
knowledge_graph.add_relation(relation)
stats = knowledge_graph.get_stats()
print(f"Graph: {stats['num_entities']} entities, {stats['num_relations']} relations")
Graph-Based Retrieval
Create a retriever that uses graph structure to expand context beyond what vector search alone would find:
from gliner import GLiNER
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class RetrievalResult:
"""Result from graph-based retrieval."""
query_entities: List[dict]
expanded_entities: List[dict]
relevant_relations: List[dict]
context_documents: List[str]
class GraphRetriever:
"""Retrieve relevant context using knowledge graph traversal."""
def __init__(
self,
knowledge_graph: KnowledgeGraph,
entity_types: List[str],
relation_types: List[str],
expansion_depth: int = 2,
):
self.model = GLiNER.from_pretrained("knowledgator/gliner-relex-large-v0.5")
self.knowledge_graph = knowledge_graph
self.entity_types = entity_types
self.relation_types = relation_types
self.expansion_depth = expansion_depth
def retrieve(
        self, query: str, top_k: int = 5, expansion_depth: Optional[int] = None
) -> RetrievalResult:
"""
Retrieve context for a query:
1. Extract entities from query
2. Match them in the graph
3. Expand to neighbors
4. Collect relations and source docs
"""
depth = expansion_depth or self.expansion_depth
# Extract entities from query (lower threshold to catch more)
query_entities_raw, _ = self.model.inference(
texts=[query],
labels=self.entity_types,
relations=self.relation_types,
threshold=0.3,
relation_threshold=0.3,
return_relations=True,
flat_ner=False,
)
query_entities = []
matched_ids = set()
for item in query_entities_raw[0]:
matches = self.knowledge_graph.find_entity(item["text"])
query_entities.append({
"text": item["text"],
"type": item["label"],
"graph_matches": matches,
})
matched_ids.update(matches)
# Expand neighborhood
expanded_ids = set(matched_ids)
for entity_id in matched_ids:
neighbors = self.knowledge_graph.get_neighbors(entity_id, depth=depth)
expanded_ids.update(neighbors)
# Gather entity details
expanded_entities = []
for entity_id in expanded_ids:
ctx = self.knowledge_graph.get_entity_context(entity_id)
if ctx:
ctx["is_query_match"] = entity_id in matched_ids
expanded_entities.append(ctx)
expanded_entities.sort(
key=lambda e: (
-int(e["is_query_match"]),
-(len(e["outgoing_relations"]) + len(e["incoming_relations"])),
)
)
# Collect relations from the subgraph
subgraph = self.knowledge_graph.get_subgraph(expanded_ids)
relevant_relations = []
for source, target, data in subgraph.edges(data=True):
s = subgraph.nodes[source]
t = subgraph.nodes[target]
relevant_relations.append({
"source": s["text"],
"relation": data["relation_type"],
"target": t["text"],
})
# Find source documents
context_docs = set()
for entity_id in expanded_ids:
node = self.knowledge_graph.graph.nodes.get(entity_id, {})
for mention in node.get("mentions", []):
context_docs.add(mention["doc_id"])
return RetrievalResult(
query_entities=query_entities,
expanded_entities=expanded_entities[:top_k],
relevant_relations=relevant_relations,
context_documents=list(context_docs),
)
def format_context(self, result: RetrievalResult) -> str:
"""Format retrieval result as context for an LLM."""
parts = []
if result.expanded_entities:
parts.append("## Relevant Entities\n")
for entity in result.expanded_entities:
marker = "-> " if entity.get("is_query_match") else " "
parts.append(f"{marker}**{entity['text']}** ({entity['type']})")
if result.relevant_relations:
parts.append("\n## Known Relationships\n")
for rel in result.relevant_relations:
parts.append(f"- {rel['source']} --[{rel['relation']}]--> {rel['target']}")
return "\n".join(parts)
Usage:
retriever = GraphRetriever(
knowledge_graph=knowledge_graph,
entity_types=ENTITY_TYPES,
relation_types=RELATION_TYPES,
expansion_depth=2,
)
result = retriever.retrieve("Who founded Acme Corporation?")
print("Query entities:")
for e in result.query_entities:
print(f" [{e['type']}] {e['text']} -> graph matches: {e['graph_matches']}")
print("\nExpanded entities:")
for e in result.expanded_entities:
marker = "(direct match)" if e.get("is_query_match") else "(expanded)"
print(f" {e['text']} ({e['type']}) {marker}")
print("\nRelations:")
for rel in result.relevant_relations:
print(f" {rel['source']} --[{rel['relation']}]--> {rel['target']}")
print("\nSource documents:", result.context_documents)
# Format as LLM context
context = retriever.format_context(result)
print("\n" + context)
Complete GraphRAG Pipeline
Combine document indexing, graph construction, and retrieval into a single pipeline:
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
from datetime import datetime
@dataclass
class GraphRAGResponse:
"""Response from the GraphRAG pipeline."""
query: str
answer: str
entities_found: List[dict]
relations_used: List[dict]
source_documents: List[dict]
reasoning_path: List[str]
confidence: float
processing_time_ms: float
class GraphRAGPipeline:
"""Complete GraphRAG pipeline for knowledge-grounded Q&A."""
def __init__(
self,
entity_types: List[str],
relation_types: List[str],
llm_fn: Optional[Callable[[str, str], str]] = None,
entity_threshold: float = 0.5,
relation_threshold: float = 0.5,
):
self.entity_types = entity_types
self.relation_types = relation_types
self.llm_fn = llm_fn
# Initialize extractor (single model for entities + relations)
self.extractor = GraphExtractor(
entity_types=entity_types,
relation_types=relation_types,
entity_threshold=entity_threshold,
relation_threshold=relation_threshold,
)
# Initialize graph and retriever
self.knowledge_graph = KnowledgeGraph()
self.retriever = GraphRetriever(
knowledge_graph=self.knowledge_graph,
entity_types=entity_types,
relation_types=relation_types,
)
self.document_store: Dict[str, Document] = {}
def index_document(self, doc: Document) -> dict:
"""Index a single document: extract entities and relations, add to graph."""
self.document_store[doc.id] = doc
entities, relations = self.extractor.extract_from_document(doc)
for entity in entities:
self.knowledge_graph.add_entity(entity)
for relation in relations:
self.knowledge_graph.add_relation(relation)
return {
"doc_id": doc.id,
"entities_extracted": len(entities),
"relations_extracted": len(relations),
}
def index_corpus(self, documents: List[Document]) -> dict:
"""Index multiple documents."""
results = []
for doc in documents:
result = self.index_document(doc)
results.append(result)
print(
f"Indexed {doc.id}: {result['entities_extracted']} entities, "
f"{result['relations_extracted']} relations"
)
return {
"documents_indexed": len(documents),
"total_entities": self.knowledge_graph.graph.number_of_nodes(),
"total_relations": self.knowledge_graph.graph.number_of_edges(),
}
def query(
self,
question: str,
max_entities: int = 10,
expansion_depth: int = 2,
) -> GraphRAGResponse:
"""Answer a question using the GraphRAG system."""
start_time = datetime.now()
# Retrieve graph context
retrieval = self.retriever.retrieve(
question, top_k=max_entities, expansion_depth=expansion_depth
)
# Build reasoning path
reasoning_path = []
if retrieval.query_entities:
reasoning_path.append(
f"Identified entities in question: "
f"{[e['text'] for e in retrieval.query_entities]}"
)
matched = sum(1 for e in retrieval.expanded_entities if e.get("is_query_match"))
if matched:
reasoning_path.append(f"Found {matched} matching entities in knowledge graph")
if len(retrieval.expanded_entities) > matched:
reasoning_path.append(
f"Expanded to {len(retrieval.expanded_entities)} related entities"
)
if retrieval.relevant_relations:
reasoning_path.append(
f"Found {len(retrieval.relevant_relations)} relevant relationships"
)
# Generate answer
context = self.retriever.format_context(retrieval)
answer = self._generate_answer(question, context)
# Confidence based on graph coverage
entity_count = len(retrieval.query_entities) or 1
match_rate = matched / entity_count
relation_score = min(1.0, len(retrieval.relevant_relations) / 5)
confidence = round(0.5 * match_rate + 0.5 * relation_score, 2)
processing_time = (datetime.now() - start_time).total_seconds() * 1000
# Gather source docs
source_docs = []
for doc_id in retrieval.context_documents:
if doc_id in self.document_store:
content = self.document_store[doc_id].content
source_docs.append({
"id": doc_id,
"excerpt": content[:300].strip(),
})
return GraphRAGResponse(
query=question,
answer=answer,
entities_found=[
{"text": e["text"], "type": e["type"]}
for e in retrieval.expanded_entities
],
relations_used=retrieval.relevant_relations,
source_documents=source_docs,
reasoning_path=reasoning_path,
confidence=confidence,
processing_time_ms=processing_time,
)
def _generate_answer(self, question: str, context: str) -> str:
if self.llm_fn:
system = (
"You are a helpful assistant that answers questions based on "
"the provided knowledge graph context. Use only the information "
"given. If the context is insufficient, say so."
)
user = f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"
return self.llm_fn(system, user)
return f"[No LLM configured] Graph context:\n{context}"
Usage:
ENTITY_TYPES = ["person", "organization", "location", "technology", "date"]
RELATION_TYPES = ["works_for", "founded_by", "located_in", "part_of", "created_by", "uses"]
pipeline = GraphRAGPipeline(
entity_types=ENTITY_TYPES,
relation_types=RELATION_TYPES,
# llm_fn=your_llm_function, # (system_prompt, user_prompt) -> str
)
# Index documents
documents = [
Document(
id="doc_001",
content="Acme Corporation was founded by John Smith in 2010. "
"The company is headquartered in San Francisco and specializes in AI technology. "
"Sarah Johnson joined as CTO in 2015 and led the development of AcmeAI.",
),
Document(
id="doc_002",
content="In 2023, Acme Corporation announced a partnership with TechGiant Inc. "
"John Smith and TechGiant's CEO, Michael Chen, signed the agreement in New York. "
"The partnership focuses on enterprise AI solutions.",
),
Document(
id="doc_003",
content="TechGiant Inc., based in Seattle, is a global leader in cloud computing. "
"Michael Chen became CEO in 2018 after serving as VP of Engineering. "
"The company's flagship product, CloudScale, powers over 10,000 enterprises.",
),
]
index_stats = pipeline.index_corpus(documents)
print(f"Indexed: {index_stats}")
# Query the graph
response = pipeline.query("Who founded Acme Corporation?")
print(f"\nQ: {response.query}")
print(f"A: {response.answer}")
print(f"Confidence: {response.confidence}")
print(f"Time: {response.processing_time_ms:.0f}ms")
print(f"\nReasoning path:")
for step in response.reasoning_path:
print(f" - {step}")
print(f"\nEntities found:")
for e in response.entities_found:
print(f" [{e['type']}] {e['text']}")
print(f"\nRelations used:")
for rel in response.relations_used:
print(f" {rel['source']} --[{rel['relation']}]--> {rel['target']}")
print(f"\nSource documents:")
for doc in response.source_documents:
print(f" {doc['id']}: {doc['excerpt'][:80]}...")
# Multi-hop query
response2 = pipeline.query(
"What is the relationship between John Smith and Michael Chen?"
)
print(f"\nQ: {response2.query}")
print(f"A: {response2.answer}")
print(f"Confidence: {response2.confidence}")
# Export the knowledge graph
pipeline.knowledge_graph.export_to_json("knowledge_graph.json")
print("\nGraph exported to knowledge_graph.json")
print(f"Graph stats: {pipeline.knowledge_graph.get_stats()}")
Neo4j Integration (Optional)
For production-scale graphs, swap NetworkX for Neo4j:
from neo4j import GraphDatabase
class Neo4jKnowledgeGraph:
"""Knowledge graph backed by Neo4j."""
def __init__(self, uri: str, user: str, password: str):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
self.driver.close()
def add_entity(self, entity: Entity):
with self.driver.session() as session:
session.execute_write(
lambda tx: tx.run(
"""
MERGE (e:Entity {id: $id})
SET e.text = $text,
e.entity_type = $entity_type,
e.score = $score
""",
id=entity.id,
text=entity.text,
entity_type=entity.entity_type,
score=entity.score,
)
)
def add_relation(self, relation: Relation):
with self.driver.session() as session:
session.execute_write(
lambda tx: tx.run(
"""
MATCH (source:Entity), (target:Entity)
WHERE toLower(source.text) = toLower($head)
AND toLower(target.text) = toLower($tail)
MERGE (source)-[r:RELATES_TO {type: $rel_type}]->(target)
SET r.score = $score
""",
head=relation.head_text,
tail=relation.tail_text,
rel_type=relation.relation_type,
score=relation.score,
)
)
def get_neighbors(self, entity_id: str, depth: int = 2) -> List[dict]:
with self.driver.session() as session:
result = session.execute_read(
lambda tx: tx.run(
f"""
MATCH (start:Entity {{id: $entity_id}})
MATCH path = (start)-[*1..{depth}]-(neighbor:Entity)
WHERE neighbor.id <> start.id
RETURN DISTINCT neighbor.id AS id,
neighbor.text AS text,
neighbor.entity_type AS type,
length(path) AS distance
ORDER BY distance
""",
entity_id=entity_id,
).data()
)
return result
def find_paths(self, source_text: str, target_text: str, max_length: int = 3):
with self.driver.session() as session:
result = session.execute_read(
lambda tx: tx.run(
f"""
MATCH (source:Entity), (target:Entity)
WHERE toLower(source.text) = toLower($source)
AND toLower(target.text) = toLower($target)
MATCH path = shortestPath((source)-[*..{max_length}]-(target))
RETURN [n IN nodes(path) | n.text] AS path
""",
source=source_text,
target=target_text,
).data()
)
return result
Usage:
neo4j_graph = Neo4jKnowledgeGraph(
uri="bolt://localhost:7687", user="neo4j", password="password"
)
for entity in extractor.get_all_entities():
neo4j_graph.add_entity(entity)
for relation in relations:
neo4j_graph.add_relation(relation)
neighbors = neo4j_graph.get_neighbors("entity_id_here", depth=2)
paths = neo4j_graph.find_paths("John Smith", "Acme Corporation")
neo4j_graph.close()
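Because `add_entity` uses `MERGE (e:Entity {id: $id})`, a uniqueness constraint on `Entity.id` keeps repeated merges fast on large graphs. A one-time setup sketch to run before the loading loop above (the Cypher assumes Neo4j 4.4+ constraint syntax):

```python
# Create a uniqueness constraint once before bulk loading (idempotent).
with neo4j_graph.driver.session() as session:
    session.run(
        "CREATE CONSTRAINT entity_id IF NOT EXISTS "
        "FOR (e:Entity) REQUIRE e.id IS UNIQUE"
    )
```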
Best Practices
Design specific schemas. Use precise, non-overlapping entity types like `software_library` and `api_function` rather than vague labels like `thing` or `code`. Better schemas produce cleaner graphs.
Tune thresholds by stage. Use a higher threshold (0.5-0.7) during indexing to keep the graph clean, and a lower threshold (0.3) during query parsing to catch more entities from short user questions.
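A sketch of the two profiles (the values are illustrative starting points, not tuned):

```python
# Strict thresholds while indexing, loose thresholds when parsing queries.
INDEX_THRESHOLDS = {"threshold": 0.6, "relation_threshold": 0.6}
QUERY_THRESHOLDS = {"threshold": 0.3, "relation_threshold": 0.3}

def extract(model, texts, labels, relations, profile):
    return model.inference(
        texts=texts, labels=labels, relations=relations,
        return_relations=True, flat_ner=False, **profile,
    )
```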
Implement entity resolution. The same entity may appear with different surface forms ("John", "John Smith", "J. Smith"). Use normalization and fuzzy matching to merge them:
from difflib import get_close_matches
def resolve_entity(text: str, known_entities: dict, cutoff: float = 0.8) -> str:
normalized = text.lower().strip()
if normalized in known_entities:
return known_entities[normalized]
matches = get_close_matches(normalized, known_entities.keys(), n=1, cutoff=cutoff)
return known_entities[matches[0]] if matches else text
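For example, with a small alias table mapping normalized surface forms to canonical names (hypothetical values), near-duplicates collapse while unknown strings pass through unchanged:

```python
known = {"john smith": "John Smith", "acme corporation": "Acme Corporation"}

print(resolve_entity("Jon Smith", known))   # close match -> "John Smith"
print(resolve_entity("TechGiant", known))   # no match -> "TechGiant" (unchanged)
```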
Monitor graph quality. Track the ratio of isolated nodes (entities with no relations). A high isolation rate suggests your relation extraction threshold is too high or your relation types are too narrow.
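A quick check against the in-memory graph (a sketch using the `KnowledgeGraph` class above):

```python
import networkx as nx

def isolation_rate(kg: KnowledgeGraph) -> float:
    """Fraction of entities with no relations at all."""
    isolated = sum(1 for _ in nx.isolates(kg.graph))
    return isolated / max(1, kg.graph.number_of_nodes())

print(f"Isolated entities: {isolation_rate(knowledge_graph):.1%}")
```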
Batch processing for large corpora. Process documents in batches to manage memory. The GLiNER-relex model loads once and handles both entity and relation extraction, so memory usage is lower than running two separate models.
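Since `model.inference()` accepts a list of texts (as in the Quick Start), a simple batching loop keeps memory bounded; the batch size here is an arbitrary starting point:

```python
def extract_batched(model, docs, labels, relations, batch_size=8):
    """Yield (doc_id, entities, relations), passing batch_size texts per call."""
    for i in range(0, len(docs), batch_size):
        batch = docs[i : i + batch_size]
        ents, rels = model.inference(
            texts=[d.content for d in batch],
            labels=labels, relations=relations,
            threshold=0.5, relation_threshold=0.5,
            return_relations=True, flat_ner=False,
        )
        for doc, e, r in zip(batch, ents, rels):
            yield doc.id, e, r
```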
Limitations and Considerations
- Model resource usage: The GLiNER-relex model runs locally and requires approximately 1-2 GB of RAM/VRAM. Since a single model handles both entities and relations, memory usage is lower than a two-model approach.
- Extraction accuracy: Entity and relation extraction quality depends on text clarity and on how well your schema matches the domain. Domain-specific fine-tuning may improve results.
- Graph scalability: In-memory graphs (NetworkX) work well up to roughly 100K entities. For larger graphs, use Neo4j or a similar graph database.
- Context window limits: When assembling context for LLMs, the subgraph context can grow large. Implement truncation or summarization for dense graph neighborhoods; see the sketch after this list.
- Relation sparsity: Not all entity pairs have explicit textual evidence for relations. Consider relation inference techniques for denser graphs.
- Incremental updates: Adding new documents requires extraction and potential entity resolution against existing graph nodes. Design your pipeline for incremental indexing from the start.
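For the context-window point above, a minimal truncation sketch (the line budget is an arbitrary assumption; summarization is the heavier alternative):

```python
def truncate_context(context: str, max_lines: int = 50) -> str:
    """Cap the formatted graph context before sending it to an LLM."""
    lines = context.splitlines()
    if len(lines) <= max_lines:
        return context
    return "\n".join(lines[:max_lines]) + f"\n... ({len(lines) - max_lines} lines omitted)"
```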
Next Steps
- Explore GLiNER model fine-tuning to improve extraction accuracy on your specific domain
- Add vector-based retrieval alongside graph retrieval for hybrid search
- Implement graph visualization using libraries like `pyvis` or `matplotlib` to inspect your knowledge graph; a sketch follows below
- Scale to production with Neo4j and batch processing pipelines
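A minimal visualization sketch with `pyvis` (assumes `pip install pyvis`; writes a standalone HTML file you can open in a browser):

```python
from pyvis.network import Network

net = Network(directed=True)
for node_id, data in knowledge_graph.graph.nodes(data=True):
    net.add_node(node_id, label=data["text"], title=data["entity_type"])
for u, v, data in knowledge_graph.graph.edges(data=True):
    net.add_edge(u, v, label=data["relation_type"])
net.write_html("knowledge_graph.html")
```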