Parse Enterprise Search Queries with GLiNER
Turn natural language search queries into structured filters using GLiNER, a local, open-source named entity recognition (NER) model. No API keys, no external services -- just a model running on your machine.
Overview
Enterprise users search with queries like "Show me John's invoices from last quarter over $5000." Behind that sentence are structured components: a person, a document type, a date range, and a numeric threshold. This cookbook shows how to extract those components locally with GLiNER and convert them into queries for Elasticsearch, Solr, or SQL.
What you will build:
- A query parser that extracts typed entities from free-text search input
- A filter generator that maps entities to search index fields
- Output adapters for Elasticsearch DSL, Solr syntax, and SQL WHERE clauses
- Disambiguation logic for ambiguous entity matches
- A complete pipeline tying it all together
Installation
pip install gliner
GLiNER runs locally. A GPU is recommended for production throughput but not required -- CPU inference works fine for interactive use.
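If a GPU is available, move the model onto it before serving traffic. A minimal sketch, assuming the GLiNER model object supports the standard PyTorch .to(device) call (verify against your installed gliner version):

import torch
from gliner import GLiNER

# Pick a device at load time. Assumption: the GLiNER model object exposes the
# usual PyTorch .to(device) method; check your gliner version if it does not.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = GLiNER.from_pretrained("knowledgator/gliner-multitask-large-v0.5")
model = model.to(device)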
Quick Start
Parse a search query in just a few lines:
from gliner import GLiNER
model = GLiNER.from_pretrained("knowledgator/gliner-multitask-large-v0.5")
query = "Find all contracts with Acme Corp from 2023"
labels = ["person", "organization", "document_type", "date", "location"]
entities = model.predict_entities(query, labels, threshold=0.5)
for e in entities:
print(f" {e['label']:15s} {e['text']:20s} (score: {e['score']:.2f})")
# Output:
# document_type contracts (score: 0.91)
# organization Acme Corp (score: 0.88)
# date 2023 (score: 0.85)
Each item in entities is a dict with keys text, label, start, end, and score.
Define Searchable Entity Types
The labels you pass to predict_entities determine what the model looks for. Tailor them to your domain.
General Enterprise Search
GENERAL_LABELS = [
"person", "organization", "department", "document_type",
"project", "date", "date_range", "location", "product", "status",
]
E-Commerce Product Search
ECOMMERCE_LABELS = [
"product_name", "brand", "category", "color",
"size", "price_range", "material", "feature", "rating_filter",
]
Document Management
DOCUMENT_LABELS = [
"document_type", "author", "owner", "creation_date",
"modification_date", "department", "project",
"classification", "file_format", "keyword",
]
CRM and Customer Data
CRM_LABELS = [
"customer_name", "company", "contact_email", "deal_stage",
"deal_value", "date_range", "sales_rep",
"industry", "region", "product_interest",
]
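Label sets are swappable at call time, so you can quickly compare how each vocabulary carves up the same query. A small sketch reusing the model loaded in Quick Start (exact outputs will vary by model version):

query = "Red Nike running shoes under $100"
for name, labels in [("general", GENERAL_LABELS), ("ecommerce", ECOMMERCE_LABELS)]:
    entities = model.predict_entities(query, labels, threshold=0.5)
    print(name, [(e["label"], e["text"]) for e in entities])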
Basic Query Parsing
Group extracted entities by their label for downstream processing:
from gliner import GLiNER
model = GLiNER.from_pretrained("knowledgator/gliner-multitask-large-v0.5")
def parse_search_query(
query: str,
labels: list[str] | None = None,
threshold: float = 0.5,
) -> dict[str, list[dict]]:
"""Extract entities from a search query and group them by type."""
labels = labels or GENERAL_LABELS
entities = model.predict_entities(query, labels, threshold=threshold)
grouped: dict[str, list[dict]] = {}
for e in entities:
grouped.setdefault(e["label"], []).append({
"value": e["text"],
"score": e["score"],
"start": e["start"],
"end": e["end"],
})
return grouped
# Try it out
result = parse_search_query("Find invoices from Acme Corp over $5000 from last quarter")
for label, mentions in result.items():
for m in mentions:
print(f" [{label}] {m['value']} (score: {m['score']:.2f})")
Build a Search Filter Generator
Data Structures
from dataclasses import dataclass, field
from typing import Any
@dataclass
class SearchFilter:
"""A single structured filter derived from an entity."""
field: str
operator: str # eq, gt, lt, gte, lte, range
value: Any
boost: float = 1.0
@dataclass
class ParsedQuery:
"""The full result of parsing a search query."""
original_query: str
keywords: list[str]
filters: list[SearchFilter]
entities: dict[str, list[dict]]
Entity-to-Filter Mapping
# Maps entity labels to search index field names.
DEFAULT_FIELD_MAP: dict[str, str] = {
"person": "author",
"organization": "company",
"department": "department",
"document_type": "type",
"project": "project_id",
"date": "created_at",
"date_range": "created_at",
"location": "location",
"product": "product_name",
"status": "status",
}
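When your index uses different field names, override the defaults and pass the result to build_parsed_query (defined below) via its field_map argument. The field names here are hypothetical; substitute your own schema:

# Hypothetical field names for a document-management index.
DMS_FIELD_MAP = {
    **DEFAULT_FIELD_MAP,
    "person": "created_by",
    "document_type": "doc_type",
    "date": "modified_at",
}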
Date Parsing
import re
from datetime import datetime, timedelta
def parse_date_value(date_str: str) -> tuple[Any, str]:
"""Convert a date entity into a filter value and operator."""
low = date_str.lower()
relative_days = {
"last week": 7,
"last month": 30,
"last quarter": 90,
"last year": 365,
}
for phrase, days in relative_days.items():
if phrase in low:
start = datetime.now() - timedelta(days=days)
return {"start": start.isoformat(), "end": datetime.now().isoformat()}, "range"
year_match = re.match(r"^(\d{4})$", date_str)
if year_match:
year = int(year_match.group(1))
return {"start": f"{year}-01-01T00:00:00", "end": f"{year}-12-31T23:59:59"}, "range"
return date_str, "eq"
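A few sample calls show the two shapes it returns: a start/end dict with a range operator for relative or year-only dates, and a pass-through with eq for anything unrecognized:

print(parse_date_value("last quarter"))
# ({'start': '...T...', 'end': '...T...'}, 'range')  -- exact dates depend on today
print(parse_date_value("2023"))
# ({'start': '2023-01-01T00:00:00', 'end': '2023-12-31T23:59:59'}, 'range')
print(parse_date_value("March 3rd"))
# ('March 3rd', 'eq')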
Numeric Filter Parsing
def parse_numeric_filter(value: str) -> tuple[Any, str]:
    """Extract a number and comparison operator from text like 'over $5000'."""
    numbers = re.findall(r"\d+\.?\d*", value.replace(",", ""))
    if not numbers:
        return value, "eq"
    num = float(numbers[0])
    low = value.lower()
    # Check the inclusive operators first so ">=" is not swallowed by the ">" check below.
    if any(w in low for w in [">=", "at least"]):
        return num, "gte"
    if any(w in low for w in ["<=", "at most"]):
        return num, "lte"
    if any(w in low for w in [">", "over", "more than", "above"]):
        return num, "gt"
    if any(w in low for w in ["<", "under", "less than", "below"]):
        return num, "lt"
    return num, "eq"

def is_numeric_filter(value: str) -> bool:
    """Heuristic check for whether an entity value expresses a numeric comparison."""
    indicators = ["$", ">", "<", "over", "under", "above", "below",
                  "more than", "less than", "at least", "at most"]
    return any(ind in value.lower() for ind in indicators)
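Sample inputs and the values the parser above produces:

print(parse_numeric_filter("over $5,000"))      # (5000.0, 'gt')
print(parse_numeric_filter("at least 4 stars")) # (4.0, 'gte')
print(parse_numeric_filter("$250"))             # (250.0, 'eq')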
Keyword Extraction
STOPWORDS = {"find", "show", "get", "all", "the", "from", "with", "me", "my", "for", "and", "in", "of"}
def extract_keywords(query: str, entities: list[dict]) -> list[str]:
"""Return words from the query that were not captured as entities."""
remaining = query
for e in sorted(entities, key=lambda x: x["start"], reverse=True):
remaining = remaining[:e["start"]] + remaining[e["end"]:]
return [w for w in remaining.split() if w.lower() not in STOPWORDS and len(w) > 2]
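A quick check of what survives entity removal and stopword filtering (the exact output depends on what the model tags):

query = "Find quarterly budget reports from Finance"
ents = model.predict_entities(query, GENERAL_LABELS, threshold=0.5)
print(extract_keywords(query, ents))
# e.g. ['quarterly', 'budget'] if 'reports' and 'Finance' were captured as entities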
Putting It Together
def build_parsed_query(
query: str,
labels: list[str] | None = None,
field_map: dict[str, str] | None = None,
threshold: float = 0.5,
) -> ParsedQuery:
"""Parse a query into keywords and typed filters."""
labels = labels or GENERAL_LABELS
field_map = field_map or DEFAULT_FIELD_MAP
raw_entities = model.predict_entities(query, labels, threshold=threshold)
# Group by label
grouped: dict[str, list[dict]] = {}
for e in raw_entities:
grouped.setdefault(e["label"], []).append({
"value": e["text"],
"score": e["score"],
"start": e["start"],
"end": e["end"],
})
# Convert to filters
filters: list[SearchFilter] = []
for label, mentions in grouped.items():
index_field = field_map.get(label, label)
for m in mentions:
value = m["value"]
operator = "eq"
if label in ("date", "date_range"):
value, operator = parse_date_value(value)
elif is_numeric_filter(value):
value, operator = parse_numeric_filter(value)
filters.append(SearchFilter(
field=index_field,
operator=operator,
value=value,
boost=m["score"],
))
keywords = extract_keywords(query, raw_entities)
return ParsedQuery(
original_query=query,
keywords=keywords,
filters=filters,
entities=grouped,
)
# Example
parsed = build_parsed_query("Find invoices from Acme Corp over $5000 from last quarter")
print(f"Keywords: {parsed.keywords}")
for f in parsed.filters:
print(f" {f.field} {f.operator} {f.value} (boost: {f.boost:.2f})")
Generate Search Engine Queries
Elasticsearch DSL
def to_elasticsearch(parsed: ParsedQuery) -> dict:
"""Convert a ParsedQuery to an Elasticsearch bool query."""
must: list[dict] = []
filters: list[dict] = []
if parsed.keywords:
must.append({
"multi_match": {
"query": " ".join(parsed.keywords),
"fields": ["title^2", "content", "description"],
"type": "best_fields",
}
})
for f in parsed.filters:
if f.operator == "eq":
filters.append({"term": {f.field: f.value}})
elif f.operator == "range":
filters.append({"range": {f.field: {"gte": f.value["start"], "lte": f.value["end"]}}})
elif f.operator in ("gt", "lt", "gte", "lte"):
filters.append({"range": {f.field: {f.operator: f.value}}})
return {
"query": {
"bool": {
"must": must or [{"match_all": {}}],
"filter": filters,
}
}
}
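To execute the generated body, hand it to your Elasticsearch client. A sketch assuming the 8.x elasticsearch Python client and a hypothetical documents index:

from elasticsearch import Elasticsearch

es_client = Elasticsearch("http://localhost:9200")  # hypothetical local cluster
body = to_elasticsearch(parsed)
response = es_client.search(index="documents", query=body["query"], size=20)
for hit in response["hits"]["hits"]:
    print(hit["_score"], hit["_source"].get("title"))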
Solr Syntax
def to_solr(parsed: ParsedQuery) -> str:
"""Convert a ParsedQuery to a Solr query string."""
parts: list[str] = []
if parsed.keywords:
parts.append(f"({' '.join(parsed.keywords)})")
for f in parsed.filters:
if f.operator == "eq":
parts.append(f'{f.field}:"{f.value}"')
elif f.operator == "range":
parts.append(f'{f.field}:[{f.value["start"]} TO {f.value["end"]}]')
elif f.operator == "gt":
parts.append(f"{f.field}:{{{f.value} TO *}}")
elif f.operator == "lt":
parts.append(f"{f.field}:{{* TO {f.value}}}")
elif f.operator == "gte":
parts.append(f"{f.field}:[{f.value} TO *]")
elif f.operator == "lte":
parts.append(f"{f.field}:[* TO {f.value}]")
return " AND ".join(parts) if parts else "*:*"
SQL WHERE Clause
def to_sql_where(parsed: ParsedQuery) -> tuple[str, list]:
"""Convert a ParsedQuery to a parameterized SQL WHERE clause."""
conditions: list[str] = []
params: list = []
if parsed.keywords:
conditions.append("MATCH(title, content) AGAINST (%s IN NATURAL LANGUAGE MODE)")
params.append(" ".join(parsed.keywords))
op_sql = {"eq": "=", "gt": ">", "lt": "<", "gte": ">=", "lte": "<="}
for f in parsed.filters:
if f.operator == "range":
conditions.append(f"{f.field} BETWEEN %s AND %s")
params.extend([f.value["start"], f.value["end"]])
elif f.operator in op_sql:
conditions.append(f"{f.field} {op_sql[f.operator]} %s")
params.append(f.value)
return (" AND ".join(conditions) if conditions else "1=1"), params
Example
parsed = build_parsed_query("Show contracts with Acme Corp from 2023 over $10000")
es = to_elasticsearch(parsed)
print("Elasticsearch:", es)
solr = to_solr(parsed)
print("Solr:", solr)
where, params = to_sql_where(parsed)
print(f"SQL: WHERE {where}")
print(f" params: {params}")
Handle Ambiguous Queries
Enterprise data often contains multiple matches for a name like "John." A lookup function lets you surface candidates and ask the user to choose.
from typing import Callable
def disambiguate(
parsed: ParsedQuery,
lookup_fn: Callable[[str, str], list[dict]],
) -> dict:
"""
Check each entity against a lookup function.
Returns a dict with 'resolved', 'ambiguous', and 'needs_clarification' keys.
"""
resolved: list[dict] = []
ambiguous: list[dict] = []
for label, mentions in parsed.entities.items():
for m in mentions:
candidates = lookup_fn(m["value"], label)
if len(candidates) > 1:
ambiguous.append({
"original": m["value"],
"type": label,
"candidates": candidates,
})
elif len(candidates) == 1:
resolved.append({
"original": m["value"],
"type": label,
"resolved_to": candidates[0],
})
else:
resolved.append({
"original": m["value"],
"type": label,
"resolved_to": {"id": None, "name": m["value"]},
})
return {
"resolved": resolved,
"ambiguous": ambiguous,
"needs_clarification": len(ambiguous) > 0,
}
# Example lookup function
def mock_entity_lookup(value: str, entity_type: str) -> list[dict]:
if value.lower() == "john" and entity_type == "person":
return [
{"id": "emp_123", "name": "John Smith", "department": "Engineering"},
{"id": "emp_456", "name": "John Davis", "department": "Sales"},
]
if "acme" in value.lower() and entity_type == "organization":
return [{"id": "org_001", "name": "Acme Corporation"}]
return [{"id": None, "name": value}]
# Try it
parsed = build_parsed_query("Find documents created by John for Acme")
result = disambiguate(parsed, mock_entity_lookup)
if result["needs_clarification"]:
for amb in result["ambiguous"]:
print(f"'{amb['original']}' could be:")
for c in amb["candidates"]:
print(f" - {c['name']} ({c.get('department', '')})")
Full Search Pipeline
Combine everything into a single pipeline class with a lightweight analytics tracker.
import json
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class SearchPipelineResult:
"""Output of the search pipeline."""
query: str
parsed: ParsedQuery
search_engine_query: dict
disambiguation_needed: bool
ambiguous_entities: list[dict]
parse_time_ms: float
class SearchPipeline:
"""
End-to-end enterprise search query pipeline.
Parses queries, resolves ambiguity, and generates backend-specific queries.
"""
def __init__(
self,
model_name: str = "knowledgator/gliner-multitask-large-v0.5",
labels: list[str] | None = None,
field_map: dict[str, str] | None = None,
lookup_fn: Callable[[str, str], list[dict]] | None = None,
backend: str = "elasticsearch",
):
self.model = GLiNER.from_pretrained(model_name)
self.labels = labels or GENERAL_LABELS
self.field_map = field_map or DEFAULT_FIELD_MAP
self.lookup_fn = lookup_fn
self.backend = backend
self.history: list[SearchPipelineResult] = []
def process_query(
self,
query: str,
threshold: float = 0.5,
auto_resolve: bool = False,
) -> SearchPipelineResult:
"""Parse a query, disambiguate, and generate a search engine query."""
t0 = datetime.now()
# --- parse ---
raw = self.model.predict_entities(query, self.labels, threshold=threshold)
grouped: dict[str, list[dict]] = {}
for e in raw:
grouped.setdefault(e["label"], []).append({
"value": e["text"], "score": e["score"],
"start": e["start"], "end": e["end"],
})
filters: list[SearchFilter] = []
for label, mentions in grouped.items():
idx_field = self.field_map.get(label, label)
for m in mentions:
value, operator = m["value"], "eq"
if label in ("date", "date_range"):
value, operator = parse_date_value(value)
elif is_numeric_filter(value):
value, operator = parse_numeric_filter(value)
filters.append(SearchFilter(idx_field, operator, value, m["score"]))
keywords = extract_keywords(query, raw)
parsed = ParsedQuery(query, keywords, filters, grouped)
# --- disambiguate ---
ambiguous: list[dict] = []
if self.lookup_fn:
dis = disambiguate(parsed, self.lookup_fn)
ambiguous = dis["ambiguous"]
if auto_resolve and ambiguous:
for amb in ambiguous:
selected_id = amb["candidates"][0]["id"]
for f in parsed.filters:
if str(f.value).lower() == amb["original"].lower():
f.value = selected_id
ambiguous = []
# --- generate backend query ---
if self.backend == "elasticsearch":
engine_query = to_elasticsearch(parsed)
elif self.backend == "solr":
engine_query = {"q": to_solr(parsed)}
elif self.backend == "sql":
where, params = to_sql_where(parsed)
engine_query = {"where": where, "params": params}
else:
engine_query = {}
elapsed = (datetime.now() - t0).total_seconds() * 1000
result = SearchPipelineResult(
query=query,
parsed=parsed,
search_engine_query=engine_query,
disambiguation_needed=len(ambiguous) > 0,
ambiguous_entities=ambiguous,
parse_time_ms=elapsed,
)
self.history.append(result)
return result
# --- analytics ---
def get_analytics(self) -> dict:
"""Return summary statistics over all processed queries."""
if not self.history:
return {"total_queries": 0}
n = len(self.history)
avg_ms = sum(r.parse_time_ms for r in self.history) / n
disamb_pct = sum(1 for r in self.history if r.disambiguation_needed) / n * 100
entity_freq: dict[str, int] = {}
for r in self.history:
for label in r.parsed.entities:
entity_freq[label] = entity_freq.get(label, 0) + 1
return {
"total_queries": n,
"avg_parse_time_ms": round(avg_ms, 2),
"disambiguation_rate_pct": round(disamb_pct, 2),
"entity_type_frequency": entity_freq,
}
# --- export ---
def export_log(self, filepath: str) -> None:
"""Write query history to a JSON file."""
entries = []
for r in self.history:
entries.append({
"query": r.query,
"keywords": r.parsed.keywords,
"filters": [
{"field": f.field, "operator": f.operator, "value": f.value}
for f in r.parsed.filters
],
"disambiguation_needed": r.disambiguation_needed,
"parse_time_ms": r.parse_time_ms,
})
with open(filepath, "w") as fh:
json.dump(entries, fh, indent=2, default=str)
Run the Pipeline
pipeline = SearchPipeline(
labels=DOCUMENT_LABELS,
lookup_fn=mock_entity_lookup,
backend="elasticsearch",
)
queries = [
"Find all contracts with Acme Corp from last quarter",
"Show me John's expense reports over $500",
"Get marketing presentations created this year",
"Documents about product launch in New York",
]
for q in queries:
r = pipeline.process_query(q, auto_resolve=True)
print(f"\nQuery: {q}")
print(f" Keywords: {r.parsed.keywords}")
print(f" Filters: {len(r.parsed.filters)}")
print(f" Time: {r.parse_time_ms:.1f} ms")
if r.disambiguation_needed:
print(" ** Needs disambiguation **")
print("\nAnalytics:", pipeline.get_analytics())
Best Practices
Use domain-specific labels. Generic labels like "person" may miss nuances. For a legal search system, prefer labels like "party_name", "contract_type", "jurisdiction", and "signatory".
Tune the confidence threshold to your use case.
| Scenario | Threshold | Goal |
|---|---|---|
| Automated workflows | 0.7 | Minimize false positives |
| Interactive search | 0.5 | Balanced precision/recall |
| Exploratory search | 0.3 | Surface more candidates |
Validate before executing. Check that the parsed query is not empty, flag low-confidence filters, and detect conflicting values on the same field.
def validate(parsed: ParsedQuery) -> list[str]:
issues = []
if not parsed.keywords and not parsed.filters:
issues.append("Query produced no searchable terms.")
for f in parsed.filters:
if f.boost < 0.4:
issues.append(f"Low confidence on {f.field}={f.value} ({f.boost:.2f}).")
return issues
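Call it right before dispatching to the backend; a sketch with a hypothetical run_search dispatcher:

parsed = build_parsed_query("show me the")
issues = validate(parsed)
if issues:
    print("Not executing:", issues)
else:
    run_search(parsed)  # hypothetical dispatch to your chosen backend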
Cache repeated queries. Wrap predict_entities with an LRU cache keyed on (query, tuple(labels), threshold) to avoid redundant model calls during repeated searches.
from functools import lru_cache
@lru_cache(maxsize=512)
def cached_predict(query: str, labels: tuple[str, ...], threshold: float = 0.5) -> tuple:
entities = model.predict_entities(query, list(labels), threshold=threshold)
return tuple(tuple(e.items()) for e in entities)
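Because lru_cache needs hashable values, cached_predict returns tuples; a small helper rehydrates them into the usual list of dicts:

def cached_entities(query: str, labels: list[str], threshold: float = 0.5) -> list[dict]:
    """Return cached predictions in the familiar list-of-dicts shape."""
    return [dict(items) for items in cached_predict(query, tuple(labels), threshold)]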
Limitations and Considerations
- Accuracy on specialized jargon. The general-purpose model may struggle with highly technical terms. Test with real queries from your users and consider fine-tuning if needed.
- Relative date interpretation. Phrases like "last quarter" are resolved relative to the moment the query is parsed. Account for timezones in global deployments.
- Numeric expression variety. Users write numbers in many ways -- "$5K", "five thousand", "around 5000." The simple regex parser above covers common patterns but may need extension.
- GPU memory. The gliner-multitask-large-v0.5 model loads into GPU memory. For CPU-only environments, expect slower inference. Batch queries when possible.
- No intent classification. This pipeline extracts entities and filters but does not classify intent (e.g., "create" vs. "search" vs. "delete"). Pair with an intent classifier if your application needs it.
Next Steps
- Combine with PII Detection and Redaction to sanitize queries containing sensitive information.
- Use Customer Intent Classification to classify the action behind a search query.
- Build on Building a GraphRAG-Powered Research System for knowledge-graph-enhanced search.