
Train a Financial Spam Detector with GLiClass

Fine-tune a GLiClass model to detect financial spam, phishing, and fraudulent content — entirely locally with no API keys or cloud dependencies.

Overview

This cookbook walks through the full cycle of building a financial spam classifier using GLiClass:

  1. Zero-shot baseline — Immediate spam detection without any training data
  2. Fine-tuning — Train a GLiClass model on your labeled examples for higher accuracy
  3. Production pipeline — Deploy the trained model for batch classification

GLiClass uses a natural language inference (NLI) approach where classification labels are treated as text, making it inherently zero-shot capable and highly data-efficient when fine-tuned.
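The label-as-text framing can be sketched in a few lines. This is an illustration of the idea only — the hypothesis template below is hypothetical and is not GLiClass's actual internal input format:

```python
# Conceptual sketch: each candidate label is just a string paired with the
# input text, so the label set can change freely at inference time with no
# retraining. The real scoring happens inside the GLiClass model.
def build_nli_pairs(text: str, labels: list[str]) -> list[tuple[str, str]]:
    """Pair the input text (premise) with one hypothesis per label."""
    return [(text, f"This text is about {label}.") for label in labels]

pairs = build_nli_pairs(
    "Wire the funds today.",
    ["invoice fraud", "legitimate"],
)
for premise, hypothesis in pairs:
    print(hypothesis)
```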

Installation

pip install gliclass torch transformers scikit-learn

For GPU acceleration (optional):

# CUDA 11.8
pip install torch --index-url https://download.pytorch.org/whl/cu118

# CUDA 12.1
pip install torch --index-url https://download.pytorch.org/whl/cu121

Quick Start: Zero-Shot Detection

Detect financial spam immediately — no training required:

from gliclass import GLiClassModel, ZeroShotClassificationPipeline
from transformers import AutoTokenizer

model = GLiClassModel.from_pretrained("knowledgator/gliclass-base-v3.0")
tokenizer = AutoTokenizer.from_pretrained("knowledgator/gliclass-base-v3.0")

pipeline = ZeroShotClassificationPipeline(
    model, tokenizer,
    classification_type='multi-label',
    device='cpu'
)

SPAM_LABELS = [
    "phishing attempt",
    "investment scam",
    "invoice fraud",
    "social engineering",
    "legitimate financial communication"
]

text = "URGENT: Your account has been compromised. Click here to verify your identity immediately."
results = pipeline(text, SPAM_LABELS, threshold=0.5)[0]

for r in results:
    print(f"{r['label']} => {r['score']:.3f}")
# phishing attempt => 0.921
# legitimate financial communication => 0.034

This zero-shot baseline works well for common spam patterns. For higher accuracy on your specific data, fine-tune the model as shown below.


Prepare Training Data

Data Format

GLiClass training data uses three fields — text, all_labels, and true_labels:

{"text": "URGENT: Your account has been compromised. Click here to verify your identity immediately.", "all_labels": ["phishing", "investment_scam", "invoice_fraud", "social_engineering", "legitimate"], "true_labels": ["phishing"]}
{"text": "Your monthly statement for account ending in 4521 is now available in online banking.", "all_labels": ["phishing", "investment_scam", "invoice_fraud", "social_engineering", "legitimate"], "true_labels": ["legitimate"]}
{"text": "Hi John, per our call, please wire $50,000 to the new vendor account. This is urgent - CEO", "all_labels": ["phishing", "investment_scam", "invoice_fraud", "social_engineering", "legitimate"], "true_labels": ["social_engineering", "invoice_fraud"]}

Build the Training Dataset

import json

ALL_LABELS = ["phishing", "investment_scam", "invoice_fraud", "social_engineering", "legitimate"]

training_data = [
    # Phishing
    {
        "text": "Your bank account has been locked due to suspicious activity. Click here to unlock: http://bank-secure-verify.com",
        "all_labels": ALL_LABELS,
        "true_labels": ["phishing"]
    },
    {
        "text": "Action Required: Verify your identity within 24 hours or your account will be permanently closed.",
        "all_labels": ALL_LABELS,
        "true_labels": ["phishing"]
    },

    # Investment scams
    {
        "text": "Exclusive offer: Join our VIP trading group and earn $10,000 daily with our proven forex signals!",
        "all_labels": ALL_LABELS,
        "true_labels": ["investment_scam"]
    },
    {
        "text": "Limited spots available! Our AI trading bot guarantees 300% monthly returns. No experience needed.",
        "all_labels": ALL_LABELS,
        "true_labels": ["investment_scam"]
    },

    # Invoice fraud
    {
        "text": "Please update our banking details for future payments. New account: 12345678, routing: 987654321",
        "all_labels": ALL_LABELS,
        "true_labels": ["invoice_fraud"]
    },
    {
        "text": "URGENT: Outstanding invoice #INV-2024-001. Payment overdue. Wire transfer required immediately.",
        "all_labels": ALL_LABELS,
        "true_labels": ["invoice_fraud"]
    },

    # Social engineering
    {
        "text": "Hi, this is Mike from IT. We need your login credentials to complete the security upgrade.",
        "all_labels": ALL_LABELS,
        "true_labels": ["social_engineering"]
    },
    {
        "text": "I'm stuck in a meeting. Can you purchase gift cards for client appreciation? Will reimburse. -CEO",
        "all_labels": ALL_LABELS,
        "true_labels": ["social_engineering"]
    },

    # Legitimate
    {
        "text": "Your automatic payment of $150.00 to Electric Company has been processed successfully.",
        "all_labels": ALL_LABELS,
        "true_labels": ["legitimate"]
    },
    {
        "text": "Reminder: Your quarterly investment statement is available in your secure portal.",
        "all_labels": ALL_LABELS,
        "true_labels": ["legitimate"]
    },
    {
        "text": "Invoice #12345 for consulting services - Net 30 payment terms. Thank you for your business.",
        "all_labels": ALL_LABELS,
        "true_labels": ["legitimate"]
    }
]

# Save as JSON
with open("spam_training_data.json", "w") as f:
    json.dump(training_data, f, indent=2)

print(f"Saved {len(training_data)} training examples")

Data Quality Guidelines

  1. Balance your dataset: Include roughly equal examples of each category
  2. Include edge cases: Add ambiguous or challenging examples
  3. Use real-world data: Train on actual spam for better real-world performance
  4. Minimum size: At least 20-50 examples per category for effective fine-tuning
  5. Clean your data: Remove duplicates and fix labeling errors
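Guideline 5 can be automated with a small cleaning pass. The helper below is a hypothetical sketch (not part of GLiClass): it drops exact duplicate texts and any example whose true_labels are not a subset of its all_labels:

```python
# Minimal cleaning pass: remove duplicates and examples with unknown labels.
def clean_dataset(data: list[dict]) -> list[dict]:
    seen = set()
    cleaned = []
    for item in data:
        text = item["text"].strip()
        if text in seen:
            continue  # exact duplicate
        if not set(item["true_labels"]) <= set(item["all_labels"]):
            continue  # labeling error: true label not in the label set
        seen.add(text)
        cleaned.append(item)
    return cleaned

raw = [
    {"text": "Win big now!", "all_labels": ["spam", "ham"], "true_labels": ["spam"]},
    {"text": "Win big now!", "all_labels": ["spam", "ham"], "true_labels": ["spam"]},
    {"text": "Meeting at 3pm", "all_labels": ["spam", "ham"], "true_labels": ["other"]},
]
print(len(clean_dataset(raw)))  # 1
```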

Validate the Dataset

from collections import Counter

def validate_dataset(data: list[dict]) -> dict:
    """Check training dataset quality and balance."""
    label_counts = Counter()

    for item in data:
        for label in item["true_labels"]:
            label_counts[label] += 1

    total = sum(label_counts.values())

    stats = {
        "total_examples": len(data),
        "label_distribution": dict(label_counts),
        "label_percentages": {
            label: f"{count / total * 100:.1f}%"
            for label, count in label_counts.items()
        }
    }

    min_count = min(label_counts.values())
    max_count = max(label_counts.values())

    if max_count / min_count > 5:
        stats["warning"] = "Dataset is imbalanced - consider adding more examples for minority classes"

    return stats


stats = validate_dataset(training_data)
print(json.dumps(stats, indent=2))

Fine-Tune with GLiClass

Step 1: Load the Pretrained Model

import torch
from gliclass import GLiClassModel
from transformers import AutoTokenizer

device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

model_name = "knowledgator/gliclass-base-v3.0"
model = GLiClassModel.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

model.to(device)

Step 2: Prepare the Dataset

import json
from gliclass.data_processing import GLiClassDataset, DataCollatorWithPadding, AugmentationConfig

# Load training data
with open("spam_training_data.json", "r") as f:
    data = json.load(f)

# Configure data augmentation (disable for basic fine-tuning)
augment_config = AugmentationConfig(enabled=False)

train_dataset = GLiClassDataset(
    data,
    tokenizer,
    augment_config,
    label_to_description={},
    max_length=512,
    problem_type='multi_label_classification',
)

data_collator = DataCollatorWithPadding(device=device)

Tip

Enable augmentation for larger datasets to improve generalization:

augment_config = AugmentationConfig(
    enabled=True,
    random_label_removal_prob=0.1,
    random_label_addition_prob=0.1,
    random_add_description_prob=0.1,
)
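This cookbook trains on the full toy dataset for brevity, but for real projects it is worth holding out an evaluation split so metrics reflect unseen messages. A simple random split with scikit-learn (the example data here is synthetic; stratification is trickier for multi-label records):

```python
# Hold out 20% of examples for evaluation before building GLiClassDataset.
from sklearn.model_selection import train_test_split

examples = [
    {"text": f"msg {i}", "all_labels": ["spam", "legit"],
     "true_labels": ["spam" if i % 2 else "legit"]}
    for i in range(20)
]

train_examples, eval_examples = train_test_split(
    examples, test_size=0.2, random_state=42
)
print(len(train_examples), len(eval_examples))  # 16 4
```

The held-out portion can then be wrapped in its own dataset and passed to the trainer's eval arguments, assuming the GLiClass Trainer accepts an eval dataset as the Hugging Face Trainer it mirrors does.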

Step 3: Define Metrics

import numpy as np
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

def compute_metrics(p):
    predictions, labels = p
    labels = labels.reshape(-1)
    predictions = predictions.reshape(-1)
    preds = (predictions > 0.5).astype(int)
    labels = np.where(labels > 0.5, 1, 0)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='weighted')
    accuracy = accuracy_score(labels, preds)
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
    }

Step 4: Train

from gliclass.training import TrainingArguments, Trainer

training_args = TrainingArguments(
    output_dir="spam-detector",
    learning_rate=1e-5,
    weight_decay=0.01,
    others_lr=1e-5,
    others_weight_decay=0.01,
    lr_scheduler_type="cosine",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    logging_steps=50,
    use_cpu=not torch.cuda.is_available(),
    report_to="none",
    fp16=torch.cuda.is_available(),
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

trainer.train()

Step 5: Save the Model

# Save model and tokenizer
model.save_pretrained("./spam-detector")
tokenizer.save_pretrained("./spam-detector")
print("Model saved to ./spam-detector")

Training is fast — typically a few minutes on CPU for small datasets, or under a minute with a GPU.


Evaluate the Model

Run Inference with the Trained Model

from gliclass import GLiClassModel, ZeroShotClassificationPipeline
from transformers import AutoTokenizer

# Load the fine-tuned model
model = GLiClassModel.from_pretrained("./spam-detector")
tokenizer = AutoTokenizer.from_pretrained("./spam-detector")

pipeline = ZeroShotClassificationPipeline(
    model, tokenizer,
    classification_type='multi-label',
    device='cpu'
)

ALL_LABELS = ["phishing", "investment_scam", "invoice_fraud", "social_engineering", "legitimate"]

test_messages = [
    ("URGENT: Verify your bank account NOW or face suspension!", ["phishing"]),
    ("Congratulations! You won $1,000,000. Send $500 to claim your prize.", ["investment_scam"]),
    ("Your direct deposit of $2,500.00 has been processed successfully.", ["legitimate"]),
    ("CEO here - need you to buy $500 in Amazon gift cards ASAP.", ["social_engineering"]),
    ("Invoice attached for your review. Payment due NET 30.", ["legitimate"]),
    ("Click here NOW to secure your crypto investment!", ["investment_scam"]),
    ("Please update our banking details for future payments.", ["invoice_fraud"]),
    ("Hi team, please review the attached quarterly report.", ["legitimate"]),
]

for text, true_labels in test_messages:
    results = pipeline(text, ALL_LABELS, threshold=0.3)[0]
    top = max(results, key=lambda r: r['score'])
    print(f"Text: {text[:60]}...")
    print(f"  True: {true_labels[0]} | Predicted: {top['label']} ({top['score']:.3f})")
    print()

Classification Report

from sklearn.metrics import classification_report

y_true = []
y_pred = []

for text, true_labels in test_messages:
    results = pipeline(text, ALL_LABELS, threshold=0.3)[0]
    top = max(results, key=lambda r: r['score'])
    y_true.append(true_labels[0])
    y_pred.append(top['label'])

print(classification_report(y_true, y_pred, labels=ALL_LABELS))
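A confusion matrix complements the report by showing which categories the model mixes up. The sketch below uses stand-in y_true / y_pred values so it runs on its own; in practice, reuse the lists built above:

```python
# Rows are true labels, columns are predicted labels.
from sklearn.metrics import confusion_matrix

labels = ["phishing", "legitimate"]
y_true = ["phishing", "legitimate", "phishing", "legitimate"]
y_pred = ["phishing", "legitimate", "legitimate", "legitimate"]

cm = confusion_matrix(y_true, y_pred, labels=labels)
for label, row in zip(labels, cm):
    print(label, row.tolist())
# phishing [1, 1]
# legitimate [0, 2]
```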

Production Spam Detection Pipeline

A complete spam detection system using the fine-tuned GLiClass model.

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class SpamClassification:
    """Classification result for a single message."""
    message_id: str
    text: str
    is_spam: bool
    spam_score: float
    categories: dict
    primary_category: str
    confidence: float
    action: str  # "block", "quarantine", "flag", "allow"
    classified_at: str

    def to_dict(self) -> dict:
        return {
            "message_id": self.message_id,
            "is_spam": self.is_spam,
            "spam_score": round(self.spam_score, 4),
            "categories": {k: round(v, 4) for k, v in self.categories.items()},
            "primary_category": self.primary_category,
            "confidence": round(self.confidence, 4),
            "action": self.action,
            "classified_at": self.classified_at
        }


class FinancialSpamDetector:
    """
    Production spam detection pipeline using a fine-tuned GLiClass model.

    No API keys or cloud dependencies required.
    """

    BLOCK_THRESHOLD = 0.9
    QUARANTINE_THRESHOLD = 0.7
    FLAG_THRESHOLD = 0.5

    DEFAULT_LABELS = [
        "phishing", "investment_scam", "invoice_fraud",
        "social_engineering", "legitimate"
    ]

    def __init__(
        self,
        model_path: str,
        labels: Optional[list[str]] = None,
        device: str = "cpu"
    ):
        from gliclass import GLiClassModel, ZeroShotClassificationPipeline
        from transformers import AutoTokenizer

        model = GLiClassModel.from_pretrained(model_path)
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.pipeline = ZeroShotClassificationPipeline(
            model, tokenizer,
            classification_type='multi-label',
            device=device
        )
        self.labels = labels or self.DEFAULT_LABELS
        self.classifications: list[SpamClassification] = []

        logger.info(f"Spam detector loaded from {model_path}")

    def classify(
        self,
        message_id: str,
        text: str,
        threshold: float = 0.3
    ) -> SpamClassification:
        """Classify a single message."""
        results = self.pipeline(text, self.labels, threshold=threshold)[0]
        label_scores = {r['label']: r['score'] for r in results}

        # Fill in any labels below threshold (not returned by pipeline)
        for label in self.labels:
            if label not in label_scores:
                label_scores[label] = 0.0

        # Calculate spam score
        spam_scores = {k: v for k, v in label_scores.items() if k != "legitimate"}
        spam_score = max(spam_scores.values()) if spam_scores else 0.0
        legit_score = label_scores.get("legitimate", 0.0)

        is_spam = spam_score > legit_score and spam_score > threshold
        primary = max(label_scores.items(), key=lambda x: x[1])
        action = self._determine_action(spam_score, is_spam)

        classification = SpamClassification(
            message_id=message_id,
            text=text,
            is_spam=is_spam,
            spam_score=spam_score,
            categories=label_scores,
            primary_category=primary[0],
            confidence=primary[1],
            action=action,
            classified_at=datetime.utcnow().isoformat()
        )

        self.classifications.append(classification)
        return classification

    def _determine_action(self, spam_score: float, is_spam: bool) -> str:
        """Map spam score to an action."""
        if not is_spam:
            return "allow"
        if spam_score >= self.BLOCK_THRESHOLD:
            return "block"
        elif spam_score >= self.QUARANTINE_THRESHOLD:
            return "quarantine"
        elif spam_score >= self.FLAG_THRESHOLD:
            return "flag"
        return "allow"

    def classify_batch(
        self,
        messages: list[dict],
        threshold: float = 0.3
    ) -> list[SpamClassification]:
        """
        Classify multiple messages.

        Args:
            messages: List of dicts with 'id' and 'text' keys
            threshold: Classification threshold
        """
        return [
            self.classify(msg["id"], msg["text"], threshold)
            for msg in messages
        ]

    def get_statistics(self) -> dict:
        """Get classification statistics."""
        if not self.classifications:
            return {"total": 0}

        total = len(self.classifications)
        spam_count = sum(1 for c in self.classifications if c.is_spam)

        action_counts: dict[str, int] = {}
        category_counts: dict[str, int] = {}

        for c in self.classifications:
            action_counts[c.action] = action_counts.get(c.action, 0) + 1
            if c.is_spam:
                category_counts[c.primary_category] = (
                    category_counts.get(c.primary_category, 0) + 1
                )

        return {
            "total": total,
            "spam_count": spam_count,
            "legitimate_count": total - spam_count,
            "spam_rate": f"{spam_count / total * 100:.1f}%",
            "actions": action_counts,
            "spam_categories": category_counts
        }

    def export_results(self, filepath: str):
        """Export classifications to JSON."""
        data = [c.to_dict() for c in self.classifications]
        with open(filepath, "w") as f:
            json.dump(data, f, indent=2)
        logger.info(f"Exported {len(data)} classifications to {filepath}")


# Usage
if __name__ == "__main__":
    detector = FinancialSpamDetector(model_path="./spam-detector")

    messages = [
        {"id": "MSG001", "text": "Your package is on the way! Track at: fedex.com/track/123"},
        {"id": "MSG002", "text": "URGENT: Verify your bank account NOW or face suspension!"},
        {"id": "MSG003", "text": "Hi, can we schedule a call to discuss Q4 projections?"},
        {"id": "MSG004", "text": "Make $5000/day from home! Limited spots available!"},
        {"id": "MSG005", "text": "Invoice #4521 attached. Payment due: 30 days."},
        {"id": "MSG006", "text": "CEO here - wire $25,000 to this account immediately."}
    ]

    results = detector.classify_batch(messages)

    for result in results:
        status = "SPAM" if result.is_spam else "OK"
        print(f"[{status}] [{result.action.upper()}] {result.message_id}")
        print(f"  Category: {result.primary_category} ({result.confidence:.2f})")
        print(f"  Text: {result.text[:60]}...")
        print()

    stats = detector.get_statistics()
    print(json.dumps(stats, indent=2))

    detector.export_results("spam_classifications.json")

Best Practices

Regular Retraining

Spam tactics evolve. Monitor model confidence and retrain when performance degrades:

def should_retrain(detector: FinancialSpamDetector) -> bool:
    """Check if model needs retraining based on confidence drift."""
    if not detector.classifications:
        return False

    low_confidence = sum(
        1 for c in detector.classifications if c.confidence < 0.6
    )
    low_confidence_rate = low_confidence / len(detector.classifications)

    return low_confidence_rate > 0.2  # More than 20% low confidence

Feedback Collection

Log predictions and corrections to build better training sets:

def log_feedback(
    message_id: str,
    predicted_spam: bool,
    actual_spam: bool,
    user_feedback: str = ""
) -> dict:
    """Log classification feedback for future retraining."""
    feedback = {
        "message_id": message_id,
        "predicted": predicted_spam,
        "actual": actual_spam,
        "correct": predicted_spam == actual_spam,
        "user_feedback": user_feedback,
        "timestamp": datetime.utcnow().isoformat()
    }

    with open("feedback_log.jsonl", "a") as f:
        f.write(json.dumps(feedback) + "\n")

    return feedback
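To close the loop, misclassified messages can be folded back into GLiClass training records. The converter below is a hypothetical sketch: it assumes each feedback row has been extended with the message text and human-corrected labels (fields not stored by the logger above):

```python
# Turn corrected feedback rows into GLiClass-format training examples.
ALL_LABELS = ["phishing", "investment_scam", "invoice_fraud",
              "social_engineering", "legitimate"]

def feedback_to_training(rows: list[dict]) -> list[dict]:
    """Keep only misclassified messages, re-labeled for retraining."""
    return [
        {"text": r["text"], "all_labels": ALL_LABELS,
         "true_labels": r["corrected_labels"]}
        for r in rows
        if not r["correct"]
    ]

rows = [
    {"text": "Wire money now", "correct": False, "corrected_labels": ["invoice_fraud"]},
    {"text": "Lunch at noon?", "correct": True, "corrected_labels": ["legitimate"]},
]
print(len(feedback_to_training(rows)))  # 1
```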

Threshold Tuning

Find the optimal threshold for your precision/recall requirements:

Threshold   Behavior                                                    Use Case
0.3         Aggressive — catches more spam, higher false positives      High-security environments
0.5         Balanced — good default for most use cases                  General email filtering
0.7         Conservative — fewer false positives, may miss some spam    Customer-facing communications
0.9         Very conservative — only blocks high-confidence spam        Low-risk environments
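Rather than guessing, you can pick a threshold empirically by sweeping candidates over held-out (score, label) pairs and keeping the one that maximizes F1. The scores below are made up for illustration; in practice, use pipeline outputs on a labeled dev set:

```python
# Sweep candidate thresholds and return the F1-maximizing one.
def sweep_thresholds(scores, labels, candidates=(0.3, 0.5, 0.7, 0.9)):
    best_t, best_f1 = None, -1.0
    for t in candidates:
        preds = [s >= t for s in scores]
        tp = sum(p and l for p, l in zip(preds, labels))
        fp = sum(p and not l for p, l in zip(preds, labels))
        fn = sum((not p) and l for p, l in zip(preds, labels))
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        if f1 > best_f1:
            best_t, best_f1 = t, f1
    return best_t, best_f1

scores = [0.95, 0.82, 0.40, 0.15, 0.60, 0.33]  # model spam scores
labels = [True, True, False, False, True, False]  # ground truth (spam?)
print(sweep_thresholds(scores, labels))  # (0.5, 1.0)
```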

Limitations and Considerations

  1. Language support: Models trained on English may not generalize to other languages. Train separate models for each language you need to support.

  2. Evolving threats: Spammers adapt their tactics. Schedule periodic retraining with fresh samples to maintain accuracy.

  3. False positives: Aggressive thresholds may block legitimate financial communications. Use the quarantine action for borderline cases.

  4. Context limitations: The model classifies message text only. Sender reputation, email headers, and attachments require additional analysis.

  5. Compute requirements: Fine-tuning and inference run locally. Ensure you have sufficient RAM (8GB+) and optionally a GPU for faster processing.

  6. Compliance: Ensure your spam detection system complies with relevant regulations (GDPR, CAN-SPAM, etc.).

  7. Human oversight: Always provide a mechanism for users to report false positives and recover quarantined messages.
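On point 4 above, out-of-band signals can be combined with the text score by a simple policy layer. The allowlist approach below is an illustrative sketch (domain names and discount factor are made up, and this is not part of GLiClass):

```python
# Discount the model's spam score for senders on a trusted-domain allowlist.
TRUSTED_DOMAINS = {"mybank.com", "payroll.example.com"}

def adjusted_spam_score(spam_score: float, sender: str) -> float:
    """Halve the score for allowlisted sender domains."""
    domain = sender.rsplit("@", 1)[-1].lower()
    if domain in TRUSTED_DOMAINS:
        return spam_score * 0.5  # still flaggable, but less likely to block
    return spam_score

print(adjusted_spam_score(0.8, "alerts@mybank.com"))  # 0.4
print(adjusted_spam_score(0.8, "evil@scam.biz"))      # 0.8
```

Note that sender addresses can be spoofed, so this should supplement, not replace, authentication checks such as SPF/DKIM.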


Next Steps