AI based document processing with Camunda 8
This tutorial guides you through setting up a local, privacy-compliant Intelligent Document Processing (IDP) pipeline using Camunda 8 (Desktop/Run), Ollama (for the LLM), and a Python External Task Worker (for OCR and file operations).
The pipeline reads PDF documents using OCR, analyzes the content with an Agentic AI (local LLM) to determine the sender and document type, and automatically reorganizes and renames the files within your local file system.
Architecture Overview
[ Input Folder ] ➔ (Python Worker: OCR) ➔ [ Camunda 8 Engine ]
│
[ Local Directory ] ⮌ (Python Worker) ⮀ [ AI Agent Connector (Ollama) ]
1. Prerequisites & Local Infrastructure
Install Ollama & LLM
- Download and install Ollama.
- Open your terminal and download an LLM well-suited for structured JSON output and tool calling (e.g.,
Llama 3orMistral):ollama run llama3Note: Ollama exposes an OpenAI-compatible API endpoint by default athttp://localhost:11434/v1.
Set Up Python Environment
Create a project directory and install the required dependencies, including the official Camunda 8 Python SDK:
pip install camunda-orchestration-sdk pypdf pytesseract pdf2image OpenAI
(Ensure that Tesseract OCR is installed on your system and added to your system environment variables/PATH).
2. Camunda BPMN Process Modeling
Create a new BPMN diagram in the Camunda Desktop Modeler or Web Modeler.
Process Steps Definition
- Service Task 1: “Extract Document Text via OCR”
- Type (Job Worker Key):
ocr-extract
- Type (Job Worker Key):
- AI Agent Task (Connector): “Analyze Document Content”
- Use the AI Agent Connector available in Camunda 8.
- Configuration:
- Provider:
OpenAI-compatible(or Custom REST depending on your connector version) - Endpoint:
http://localhost:11434/v1 - Model ID:
llama3 - System Prompt:
You are a precise document classifier. Analyze the text and extract the sender (company or person) and the document type (e.g., Invoice, Contract, Cancellation, Cover Letter). Respond exclusively in valid JSON format: {"sender": "Name", "type": "Type"}. - Prompt:
Context Text: {{ocr_text}} - Result Variable:
analysis_result
- Provider:
- Service Task 3: “Move and Rename File”
- Type (Job Worker Key):
file-organizer
- Type (Job Worker Key):
3. Implementing the Python Job Worker
Create a file named app.py. This worker handles the extraction of the PDF content and the final local file system operations.
import os
import json
import shutil
from pathlib import Path
from pypdf import PdfReader
import pytesseract
from pdf2image import convert_from_path
from camunda.client import CamundaClient
# Configuration
INPUT_DIR = Path("./input")
STORAGE_ROOT = Path("./archive")
# Initialize local directory structures
INPUT_DIR.mkdir(exist_ok=True)
STORAGE_ROOT.mkdir(exist_ok=True)
# Initialize Camunda Client (Default configurations for Self-Managed / Local Run)
client = CamundaClient()
def extract_text_from_pdf(pdf_path):
"""Extracts native text, falls back to OCR if the PDF is a scan."""
text = ""
reader = PdfReader(pdf_path)
for page in reader.pages:
text += page.extract_text() or ""
# Fallback to OCR if no native text is found
if not text.strip():
images = convert_from_path(pdf_path)
for img in images:
text += pytesseract.image_to_string(img)
return text
# 1. WORKER: OCR & Text Extraction
@client.worker(type="ocr-extract")
def handle_ocr(job):
file_name = job.variables.get("file_name")
file_path = INPUT_DIR / file_name
if not file_path.exists():
return {"error": "File not found"}
extracted_text = extract_text_from_pdf(file_path)
# Restrict context length to fit local LLM context windows smoothly
return {"ocr_text": extracted_text[:4000]}
# 2. WORKER: File Organization & Renaming
@client.worker(type="file-organizer")
def handle_organization(job):
file_name = job.variables.get("file_name")
analysis_raw = job.variables.get("analysis_result")
# Parse the JSON string returned by the LLM
try:
data = json.loads(analysis_raw)
sender = data.get("sender", "Unknown").replace(" ", "_")
doc_type = data.get("type", "Other").replace(" ", "_")
except Exception:
sender, doc_type = "Unknown", "Unclassified"
# Define target directory structure (e.g., ./archive/Acme_Corp/Invoice/)
target_dir = STORAGE_ROOT / sender / doc_type
target_dir.mkdir(parents=True, exist_ok=True)
# Generate new standardized filename (e.g., Invoice_Acme_Corp.pdf)
file_extension = Path(file_name).suffix
new_file_name = f"{doc_type}_{sender}{file_extension}"
target_path = target_dir / new_file_name
# Execute file movement
source_path = INPUT_DIR / file_name
shutil.move(str(source_path), str(target_path))
print(f"File successfully archived: {target_path}")
return {"status": "success", "archived_path": str(target_path)}
if __name__ == "__main__":
print("Python Camunda Workers active. Awaiting jobs...")
client.start_workers()
4. Triggering and Testing the Pipeline
To feed documents into the system, you can use a secondary helper script that scans your input directory and instantiates a process instance in Camunda for each discovered file:
# trigger_process.py
import os
from camunda.client import CamundaClient
client = CamundaClient()
def check_for_new_files():
for file in os.listdir("./input"):
if file.endswith(".pdf"):
# Pass the filename into the process variables
client.start_process_instance(
process_definition_key="Process_Document_Pipeline",
variables={"file_name": file}
)
print(f"Process instance started for file: {file}")
if __name__ == "__main__":
check_for_new_files()
- Place a sample PDF document inside the
./inputfolder. - Run
python app.pyin your terminal to keep your workers listening. - In a separate terminal window, run
python trigger_process.py. - Your file will process through Camunda, get parsed by Ollama, and land perfectly organized under
./archive/[Sender]/[Type]/.
Leave a Reply