Install SimpleLPR from PyPI:
pip install simplelpr
import simplelpr
# Create engine with default settings
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
# Optional: Set license key for production use
# engine.set_productKey("path/to/license.xml")
print(f"SimpleLPR Version: {engine.versionNumber.A}.{engine.versionNumber.B}.{engine.versionNumber.C}.{engine.versionNumber.D}")
print(f"Supported countries: {engine.numSupportedCountries}")
# From file
engine.set_productKey("license.xml")
# From bytes (useful for embedded keys)
with open("license.xml", "rb") as f:
license_data = f.read()
engine.set_productKey(license_data)
import simplelpr
def analyze_image(image_path, country="Spain"):
# Setup engine
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
# Configure for specific country
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0) # Disable all countries
engine.set_countryWeight(country, 1.0) # Enable target country
engine.realizeCountryWeights()
# Create processor
processor = engine.createProcessor()
processor.plateRegionDetectionEnabled = True
processor.cropToPlateRegionEnabled = True
# Analyze image
candidates = processor.analyze(image_path)
# Display results
print(f"Found {len(candidates)} license plate candidates:")
for i, candidate in enumerate(candidates):
print(f"\nCandidate {i+1}:")
print(f" Dark on light background: {candidate.darkOnLight}")
print(f" Plate detection confidence: {candidate.plateDetectionConfidence:.3f}")
print(f" Bounding box: {candidate.boundingBox.left}, {candidate.boundingBox.top}, "
f"{candidate.boundingBox.width}, {candidate.boundingBox.height}")
for j, match in enumerate(candidate.matches):
print(f" Match {j+1}: '{match.text}' (Country: {match.country}, "
f"ISO: {match.countryISO}, Confidence: {match.confidence:.3f})")
# Show individual characters
for k, element in enumerate(match.elements):
print(f" Char {k+1}: '{element.glyph}' (Confidence: {element.confidence:.3f})")
# Usage
analyze_image("car_image.jpg", "Spain")
import simplelpr
import numpy as np
from PIL import Image
def setup_engine_and_processor(country="Spain"):
"""Helper function to setup engine and processor"""
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
# Configure country
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
processor = engine.createProcessor()
processor.plateRegionDetectionEnabled = True
processor.cropToPlateRegionEnabled = True
return engine, processor
# Method 1: File path
def analyze_from_file(image_path):
engine, processor = setup_engine_and_processor()
return processor.analyze(image_path)
# Method 2: Byte array
def analyze_from_bytes(image_path):
engine, processor = setup_engine_and_processor()
with open(image_path, 'rb') as f:
image_bytes = f.read()
return processor.analyze(image_bytes)
# Method 3: NumPy array (from PIL)
def analyze_from_pil(image_path):
engine, processor = setup_engine_and_processor()
# Load with PIL and convert to numpy
pil_image = Image.open(image_path)
numpy_array = np.asarray(pil_image)
return processor.analyze(numpy_array)
# Method 4: OpenCV (if available)
def analyze_from_opencv(image_path):
try:
import cv2
engine, processor = setup_engine_and_processor()
# Load with OpenCV
cv_image = cv2.imread(image_path)
return processor.analyze(cv_image)
except ImportError:
print("OpenCV not available")
return []
import os
import simplelpr
from pathlib import Path
def batch_analyze_images(image_folder, country="Spain", image_extensions=('.jpg', '.png', '.tif')):
"""Analyze all images in a folder"""
# Setup engine once
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
# Create processor
processor = engine.createProcessor()
processor.plateRegionDetectionEnabled = True
results = {}
# Process all images
image_folder = Path(image_folder)
for image_path in image_folder.iterdir():
if image_path.suffix.lower() in image_extensions:
try:
candidates = processor.analyze(str(image_path))
results[image_path.name] = candidates
# Print summary
if candidates:
best_match = candidates[0].matches[0] if candidates[0].matches else None
plate_text = best_match.text if best_match else "No text"
print(f"{image_path.name}: {plate_text}")
else:
print(f"{image_path.name}: No plates detected")
except Exception as e:
print(f"Error processing {image_path.name}: {e}")
results[image_path.name] = []
return results
# Usage
results = batch_analyze_images("./car_images", "Germany")
import simplelpr
def process_video_file(video_path, country="Spain", max_frames=None):
"""Process a video file frame by frame"""
# Setup engine
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
# Configure country
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
# Create processor
processor = engine.createProcessor()
processor.plateRegionDetectionEnabled = True
processor.cropToPlateRegionEnabled = True
# Open video source
video_source = engine.openVideoSource(
video_path,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
-1, -1 # No size limits
)
print(f"Video opened: Live source = {video_source.isLiveSource}")
print(f"Video state: {video_source.state}")
frame_count = 0
detections = []
# Process frames
while True:
frame = video_source.nextFrame()
if frame is None:
break
frame_count += 1
# Analyze frame
candidates = processor.analyze(frame)
if candidates:
for candidate in candidates:
if candidate.matches:
match = candidate.matches[0]
detection = {
'frame': frame_count,
'sequence': frame.sequenceNumber,
'timestamp': frame.timestamp,
'text': match.text,
'country': match.country,
'confidence': match.confidence
}
detections.append(detection)
print(f"Frame {frame_count} ({frame.timestamp:.2f}s): {match.text} "
f"({match.country}, conf: {match.confidence:.3f})")
# Optional: limit processing
if max_frames and frame_count >= max_frames:
break
print(f"\nProcessed {frame_count} frames, found {len(detections)} plates")
return detections
# Usage
detections = process_video_file("traffic_video.mp4", "Germany", max_frames=1000)
import simplelpr
import os
def process_video_with_thumbnails(video_path, output_dir, country="Spain"):
"""Process video and save thumbnails of detected plates"""
# Create output directory
os.makedirs(output_dir, exist_ok=True)
# Setup engine and processor
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
processor = engine.createProcessor()
processor.plateRegionDetectionEnabled = True
processor.cropToPlateRegionEnabled = True
# Open video
video_source = engine.openVideoSource(
video_path,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
-1, -1
)
detection_count = 0
while True:
frame = video_source.nextFrame()
if frame is None:
break
candidates = processor.analyze(frame)
for candidate in candidates:
if candidate.matches:
match = candidate.matches[0]
detection_count += 1
# Save frame as thumbnail
filename = f"detection_{detection_count:04d}_{match.text}_{frame.timestamp:.2f}s.jpg"
filepath = os.path.join(output_dir, filename)
frame.saveAsJPEG(filepath, 95) # 95% quality
print(f"Saved: {filename} - {match.text} ({match.confidence:.3f})")
print(f"Saved {detection_count} thumbnails to {output_dir}")
# Usage
process_video_with_thumbnails("traffic.mp4", "./thumbnails", "Austria")
import simplelpr
import time
def process_rtsp_stream(rtsp_url, country="Spain", duration_seconds=60):
"""Process real-time RTSP video stream"""
# Setup engine
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
processor = engine.createProcessor()
processor.plateRegionDetectionEnabled = True
# Open RTSP stream
video_source = engine.openVideoSource(
rtsp_url,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
1920, 1080 # Limit resolution for performance
)
print(f"Stream opened: {rtsp_url}")
print(f"Is live source: {video_source.isLiveSource}")
start_time = time.time()
frame_count = 0
detections = []
try:
while time.time() - start_time < duration_seconds:
frame = video_source.nextFrame()
if frame is None:
print(f"No frame received, stream state: {video_source.state}")
# Try to reconnect if it's a live source
if video_source.isLiveSource:
print("Attempting to reconnect...")
video_source.reconnect()
time.sleep(1)
continue
else:
break
frame_count += 1
# Process frame
candidates = processor.analyze(frame)
for candidate in candidates:
if candidate.matches:
match = candidate.matches[0]
detection = {
'timestamp': time.time(),
'frame_time': frame.timestamp,
'text': match.text,
'country': match.country,
'confidence': match.confidence
}
detections.append(detection)
print(f"[{time.strftime('%H:%M:%S')}] Detected: {match.text} "
f"({match.country}, {match.confidence:.3f})")
# Print stats every 100 frames
if frame_count % 100 == 0:
fps = frame_count / (time.time() - start_time)
print(f"Processed {frame_count} frames, {fps:.1f} FPS, "
f"{len(detections)} total detections")
except KeyboardInterrupt:
print("Stream processing interrupted by user")
print(f"\nStream processing complete:")
print(f" Duration: {time.time() - start_time:.1f} seconds")
print(f" Frames processed: {frame_count}")
print(f" Total detections: {len(detections)}")
return detections
# Usage
detections = process_rtsp_stream("rtsp://camera_ip:554/stream", "Germany", 300)
import simplelpr
import time
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
def setup_processor_pool(country="Spain", num_processors=0):
"""Create and configure a processor pool"""
setup_params = simplelpr.EngineSetupParms()
setup_params.maxConcurrentImageProcessingOps = 4
engine = simplelpr.SimpleLPR(setup_params)
# Configure country
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
# Create processor pool
pool = engine.createProcessorPool(num_processors)
pool.plateRegionDetectionEnabled = True
pool.cropToPlateRegionEnabled = True
return engine, pool
def process_images_with_pool(image_paths, country="Spain"):
"""Process multiple images concurrently using processor pool"""
engine, pool = setup_processor_pool(country)
# Launch all analyses
launched_requests = []
for i, image_path in enumerate(image_paths):
success = pool.launchAnalyze(
streamId=0,
requestId=i,
timestampInSec=float(i),
timeoutInMs=simplelpr.TIMEOUT_INFINITE,
imgPath=str(image_path)
)
if success:
launched_requests.append((i, image_path))
else:
print(f"Failed to launch analysis for {image_path}")
# Collect results
results = {}
completed = 0
while completed < len(launched_requests):
result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
if result is None:
break
completed += 1
request_id = result.requestId
image_path = image_paths[request_id]
if result.errorInfo:
print(f"Error processing {image_path}: {result.errorInfo.description}")
results[image_path] = []
else:
results[image_path] = result.candidates
# Print summary
if result.candidates:
best_match = result.candidates[0].matches[0] if result.candidates[0].matches else None
plate_text = best_match.text if best_match else "No text"
print(f"Completed {completed}/{len(launched_requests)}: "
f"{image_path.name} -> {plate_text}")
else:
print(f"Completed {completed}/{len(launched_requests)}: "
f"{image_path.name} -> No plates")
return results
def concurrent_video_analysis(video_path, country="Spain", max_frames=1000):
"""Process video using processor pool for better performance"""
engine, pool = setup_processor_pool(country, num_processors=4)
# Open video
video_source = engine.openVideoSource(
video_path,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
-1, -1
)
frames_launched = 0
frames_processed = 0
detections = []
# Process video with concurrent analysis
while frames_launched < max_frames:
frame = video_source.nextFrame()
if frame is None:
break
# Launch analysis
success = pool.launchAnalyze(
streamId=0,
requestId=frame.sequenceNumber,
timeoutInMs=simplelpr.TIMEOUT_IMMEDIATE, # Non-blocking
frame=frame
)
if success:
frames_launched += 1
# Check for completed results (non-blocking)
while True:
result = pool.pollNextResult(0, simplelpr.TIMEOUT_IMMEDIATE)
if result is None:
break
frames_processed += 1
if result.candidates:
for candidate in result.candidates:
if candidate.matches:
match = candidate.matches[0]
detection = {
'frame_id': result.requestId,
'timestamp': result.timestamp,
'text': match.text,
'confidence': match.confidence
}
detections.append(detection)
print(f"Frame {result.requestId}: {match.text} ({match.confidence:.3f})")
# Wait for remaining results
while frames_processed < frames_launched:
result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
if result is None:
break
frames_processed += 1
# Process result same as above...
print(f"Processed {frames_processed} frames, found {len(detections)} plates")
return detections
# Usage examples
if __name__ == "__main__":
# Example 1: Batch process images
image_folder = Path("./test_images")
if image_folder.exists():
image_paths = list(image_folder.glob("*.jpg"))
results = process_images_with_pool(image_paths, "Germany")
# Example 2: Concurrent video processing
# detections = concurrent_video_analysis("traffic.mp4", "Spain", 500)
import simplelpr
import os
import time
def track_plates_in_video(video_path, output_dir="./tracked_plates", country="Spain"):
"""Track license plates across video frames with thumbnail generation"""
# Create output directory
os.makedirs(output_dir, exist_ok=True)
# Setup engine
setup_params = simplelpr.EngineSetupParms()
setup_params.maxConcurrentImageProcessingOps = 4
engine = simplelpr.SimpleLPR(setup_params)
# Configure country
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
# Create processor pool
pool = engine.createProcessorPool(4)
pool.plateRegionDetectionEnabled = True
# Setup plate tracker
tracker_params = simplelpr.PlateCandidateTrackerSetupParms(
triggerWindowInSec=3.0, # 3 second trigger window
maxIdleTimeInSec=2.0, # 2 second max idle time
minTriggerFrameCount=3, # Need 3 detections to trigger
thumbnailWidth=256, # Thumbnail dimensions
thumbnailHeight=128
)
tracker = engine.createPlateCandidateTracker(tracker_params)
# Open video
video_source = engine.openVideoSource(
video_path,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
-1, -1
)
print(f"Processing video: {video_path}")
print(f"Output directory: {output_dir}")
frame_queue = []
track_count = 0
def process_tracks(tracks, track_type):
"""Process new or closed tracks"""
nonlocal track_count
for track in tracks:
track_count += 1
# Get best match
candidate = track.representativeCandidate
if candidate.matches:
match = candidate.matches[0]
print(f"{track_type} track #{track_count}:")
print(f" Text: {match.text}")
print(f" Country: {match.country} ({match.countryISO})")
print(f" Confidence: {match.confidence:.3f}")
print(f" First detection: Frame {track.firstDetectionFrameId} "
f"at {track.firstDetectionTimestamp:.2f}s")
print(f" Latest detection: Frame {track.newestDetectionFrameId} "
f"at {track.newestDetectionTimestamp:.2f}s")
print(f" Representative: Frame {track.representativeFrameId} "
f"at {track.representativeTimestamp:.2f}s")
# Save thumbnail if available
if track.representativeThumbnail:
filename = (f"track_{track_count:04d}_{track.representativeFrameId:06d}_"
f"{track.representativeTimestamp:.2f}s_{match.text}.jpg")
filepath = os.path.join(output_dir, filename)
track.representativeThumbnail.saveAsJPEG(filepath, 95)
print(f" Saved thumbnail: {filename}")
print()
# Process video
frame_count = 0
start_time = time.time()
try:
while True:
frame = video_source.nextFrame()
if frame is None:
break
frame_queue.append(frame)
frame_count += 1
# Launch analysis
success = pool.launchAnalyze(
streamId=0,
requestId=frame.sequenceNumber,
timeoutInMs=simplelpr.TIMEOUT_INFINITE,
frame=frame
)
if not success:
print(f"Failed to launch analysis for frame {frame.sequenceNumber}")
continue
# Process completed results
while True:
result = pool.pollNextResult(0, simplelpr.TIMEOUT_IMMEDIATE)
if result is None:
break
# Find corresponding frame
if frame_queue and result.requestId == frame_queue[0].sequenceNumber:
process_frame = frame_queue.pop(0)
# Process with tracker
tracker_result = tracker.processFrameCandidates(result, process_frame)
# Handle new tracks
if tracker_result.newTracks:
process_tracks(tracker_result.newTracks, "NEW")
# Handle closed tracks
if tracker_result.closedTracks:
process_tracks(tracker_result.closedTracks, "CLOSED")
# Progress update
if frame_count % 100 == 0:
elapsed = time.time() - start_time
fps = frame_count / elapsed
print(f"Processed {frame_count} frames ({fps:.1f} FPS), "
f"{track_count} tracks found")
except KeyboardInterrupt:
print("Processing interrupted by user")
# Process remaining results
print("Processing remaining frames...")
while pool.ongoingRequestCount_get(0) > 0:
result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
if result and frame_queue and result.requestId == frame_queue[0].sequenceNumber:
process_frame = frame_queue.pop(0)
tracker_result = tracker.processFrameCandidates(result, process_frame)
if tracker_result.newTracks:
process_tracks(tracker_result.newTracks, "NEW")
if tracker_result.closedTracks:
process_tracks(tracker_result.closedTracks, "CLOSED")
# Flush tracker to get remaining tracks
print("Flushing tracker...")
final_result = tracker.flush()
if final_result.closedTracks:
process_tracks(final_result.closedTracks, "FINAL")
elapsed = time.time() - start_time
print(f"\nTracking complete:")
print(f" Total frames: {frame_count}")
print(f" Processing time: {elapsed:.1f} seconds")
print(f" Average FPS: {frame_count/elapsed:.1f}")
print(f" Total tracks: {track_count}")
print(f" Thumbnails saved to: {output_dir}")
# Usage
track_plates_in_video("traffic_video.mp4", "./tracked_plates", "Austria")
import simplelpr
import time
from collections import defaultdict
def track_realtime_stream(rtsp_url, country="Spain", duration_minutes=10):
"""Track plates in real-time stream with statistics"""
# Setup
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
pool = engine.createProcessorPool(4)
pool.plateRegionDetectionEnabled = True
# Tracker setup
tracker_params = simplelpr.PlateCandidateTrackerSetupParms(
triggerWindowInSec=2.0, # Faster for real-time
maxIdleTimeInSec=3.0,
minTriggerFrameCount=2, # Lower threshold for real-time
thumbnailWidth=128, # Smaller for performance
thumbnailHeight=64
)
tracker = engine.createPlateCandidateTracker(tracker_params)
# Open stream
video_source = engine.openVideoSource(
rtsp_url,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
1280, 720 # Limit resolution for performance
)
print(f"Real-time tracking started: {rtsp_url}")
print(f"Duration: {duration_minutes} minutes")
# Statistics tracking
stats = {
'frames_processed': 0,
'total_detections': 0,
'unique_plates': set(),
'plate_counts': defaultdict(int),
'tracks_created': 0
}
start_time = time.time()
end_time = start_time + (duration_minutes * 60)
frame_queue = []
try:
while time.time() < end_time:
frame = video_source.nextFrame()
if frame is None:
if video_source.isLiveSource:
video_source.reconnect()
time.sleep(0.1)
continue
else:
break
frame_queue.append(frame)
stats['frames_processed'] += 1
# Launch analysis
success = pool.launchAnalyze(
streamId=0,
requestId=frame.sequenceNumber,
timeoutInMs=simplelpr.TIMEOUT_IMMEDIATE,
frame=frame
)
# Process results
while True:
result = pool.pollNextResult(0, simplelpr.TIMEOUT_IMMEDIATE)
if result is None:
break
if frame_queue and result.requestId == frame_queue[0].sequenceNumber:
process_frame = frame_queue.pop(0)
# Track plates
tracker_result = tracker.processFrameCandidates(result, process_frame)
# Process new tracks
for track in tracker_result.newTracks:
stats['tracks_created'] += 1
candidate = track.representativeCandidate
if candidate.matches:
match = candidate.matches[0]
plate_text = match.text
stats['unique_plates'].add(plate_text)
stats['plate_counts'][plate_text] += 1
print(f"[{time.strftime('%H:%M:%S')}] NEW TRACK #{stats['tracks_created']}: "
f"{plate_text} ({match.country}, conf: {match.confidence:.3f}) "
f"- Frame {track.representativeFrameId}")
# Print stats every 30 seconds
if stats['frames_processed'] % 300 == 0: # Assuming ~10 FPS
elapsed = time.time() - start_time
fps = stats['frames_processed'] / elapsed
print(f"\n--- Stats after {elapsed/60:.1f} minutes ---")
print(f"Frames processed: {stats['frames_processed']} ({fps:.1f} FPS)")
print(f"Tracks created: {stats['tracks_created']}")
print(f"Unique plates: {len(stats['unique_plates'])}")
print("Most frequent plates:")
for plate, count in sorted(stats['plate_counts'].items(),
key=lambda x: x[1], reverse=True)[:5]:
print(f" {plate}: {count} times")
print()
except KeyboardInterrupt:
print("Real-time tracking stopped by user")
# Final statistics
elapsed = time.time() - start_time
print(f"\n=== Final Statistics ===")
print(f"Duration: {elapsed/60:.1f} minutes")
print(f"Frames processed: {stats['frames_processed']}")
print(f"Average FPS: {stats['frames_processed']/elapsed:.1f}")
print(f"Total tracks: {stats['tracks_created']}")
print(f"Unique plates detected: {len(stats['unique_plates'])}")
return stats
# Usage
# stats = track_realtime_stream("rtsp://camera_ip:554/stream", "Germany", 5)
import simplelpr
def configure_engine_for_region(region="europe"):
"""Configure engine for specific geographical regions"""
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
# Define regional country groups (based on actually supported countries)
regions = {
"europe": ["Spain", "Germany", "France", "Italy", "Netherlands",
"Belgium", "Austria", "Switzerland", "Portugal", "Sweden",
"Norway", "Denmark", "Finland", "Poland", "Czech-Republic"],
"uk": ["UK-GreatBritain"],
"south_america": ["Argentina", "Brazil", "Chile", "Colombia", "Peru",
"Ecuador", "Uruguay", "Venezuela", "Bolivia"],
"oceania": ["Australia"],
"north_america": ["Canada"] # Note: USA and Mexico not currently supported
}
# Disable all countries first
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
# Enable countries for the selected region
if region in regions:
for country in regions[region]:
try:
engine.set_countryWeight(country, 1.0)
print(f"Enabled: {country}")
except Exception:
print(f"Country not found: {country}")
else:
print(f"Unknown region: {region}")
return None
engine.realizeCountryWeights()
return engine
def advanced_processor_setup(engine, sensitivity="normal"):
"""Advanced processor configuration"""
processor = engine.createProcessor()
# Configure based on expected conditions
if sensitivity == "high":
# High sensitivity - detects more characters but may include noise and be slower
processor.contrastSensitivityFactor = 0.95 # Close to 1.0 for high sensitivity
processor.plateRegionDetectionEnabled = True
processor.cropToPlateRegionEnabled = True
elif sensitivity == "normal":
# Balanced settings - recommended for most use cases
# processor.contrastSensitivityFactor uses default value (~0.9)
processor.plateRegionDetectionEnabled = True
processor.cropToPlateRegionEnabled = False
elif sensitivity == "low":
# Low sensitivity - faster, fewer false positives, good for well-contrasted plates
processor.contrastSensitivityFactor = 0.3 # Lower for well-contrasted plates
processor.plateRegionDetectionEnabled = False
processor.cropToPlateRegionEnabled = False
return processor
# Usage examples
engine = configure_engine_for_region("europe")
if engine:
processor = advanced_processor_setup(engine, "normal") # Use normal instead of high
import simplelpr
def process_with_roi(video_path, roi_configs, country="Spain"):
"""Process video with multiple regions of interest"""
# Setup engine
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
pool = engine.createProcessorPool(len(roi_configs))
pool.plateRegionDetectionEnabled = True
# Open video
video_source = engine.openVideoSource(
video_path,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
-1, -1
)
print(f"Processing with {len(roi_configs)} ROI regions:")
for i, roi_config in enumerate(roi_configs):
print(f" ROI {i+1}: {roi_config}")
detections_by_roi = {i: [] for i in range(len(roi_configs))}
frame_count = 0
while True:
frame = video_source.nextFrame()
if frame is None:
break
frame_count += 1
# Launch analysis for each ROI
for roi_id, roi_config in enumerate(roi_configs):
roi = simplelpr.Rectangle()
roi.left = roi_config['left']
roi.top = roi_config['top']
roi.width = roi_config['width']
roi.height = roi_config['height']
# Use different stream IDs for different ROIs
success = pool.launchAnalyze(
streamId=roi_id,
requestId=frame.sequenceNumber,
timeoutInMs=simplelpr.TIMEOUT_IMMEDIATE,
frame=frame,
roi=roi
)
# Collect results
for roi_id in range(len(roi_configs)):
while True:
result = pool.pollNextResult(roi_id, simplelpr.TIMEOUT_IMMEDIATE)
if result is None:
break
if result.candidates:
for candidate in result.candidates:
if candidate.matches:
match = candidate.matches[0]
detection = {
'frame': frame_count,
'roi': roi_id,
'roi_name': roi_configs[roi_id].get('name', f'ROI_{roi_id}'),
'text': match.text,
'confidence': match.confidence,
'timestamp': frame.timestamp
}
detections_by_roi[roi_id].append(detection)
print(f"Frame {frame_count} - {detection['roi_name']}: "
f"{match.text} ({match.confidence:.3f})")
# Summary
print(f"\nProcessing complete - {frame_count} frames:")
for roi_id, detections in detections_by_roi.items():
roi_name = roi_configs[roi_id].get('name', f'ROI_{roi_id}')
print(f" {roi_name}: {len(detections)} detections")
return detections_by_roi
# Usage
roi_configs = [
{
'name': 'Lane_1',
'left': 100, 'top': 200, 'width': 400, 'height': 300
},
{
'name': 'Lane_2',
'left': 600, 'top': 200, 'width': 400, 'height': 300
},
{
'name': 'Entrance',
'left': 200, 'top': 500, 'width': 600, 'height': 200
}
]
# detections = process_with_roi("traffic.mp4", roi_configs, "Germany")
import simplelpr
from contextlib import contextmanager
@contextmanager
def simplelpr_engine(country="Spain", license_path=None):
"""Context manager for SimpleLPR engine"""
# Setup
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
if license_path:
engine.set_productKey(license_path)
# Configure country
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
try:
yield engine
finally:
# Cleanup is automatic in Python
pass
@contextmanager
def simplelpr_processor(engine, sensitivity="normal"):
"""Context manager for processor"""
processor = engine.createProcessor()
# Configure processor
if sensitivity == "high":
processor.contrastSensitivityFactor = 0.3
processor.plateRegionDetectionEnabled = True
processor.cropToPlateRegionEnabled = True
elif sensitivity == "normal":
processor.contrastSensitivityFactor = 0.7
processor.plateRegionDetectionEnabled = True
processor.cropToPlateRegionEnabled = False
try:
yield processor
finally:
# Cleanup is automatic
pass
# Usage with context managers
def analyze_with_context_manager(image_path, country="Spain"):
with simplelpr_engine(country) as engine:
with simplelpr_processor(engine, "high") as processor:
return processor.analyze(image_path)
# candidates = analyze_with_context_manager("test.jpg", "Germany")
import simplelpr
import json
from pathlib import Path
class SimpleLPRConfig:
"""Configuration management for SimpleLPR"""
def __init__(self, config_file=None):
self.config = self._load_default_config()
if config_file:
self.load_from_file(config_file)
def _load_default_config(self):
return {
"engine": {
"cudaDeviceId": -1,
"enableImageProcessingWithGPU": False,
"enableClassificationWithGPU": False,
"maxConcurrentImageProcessingOps": 0
},
"processing": {
"country": "Spain",
"contrastSensitivityFactor": 0.9, # Note: Default value (~0.9) recommended
"plateRegionDetectionEnabled": True,
"cropToPlateRegionEnabled": False
},
"tracking": {
"triggerWindowInSec": 3.0,
"maxIdleTimeInSec": 3.5,
"minTriggerFrameCount": 3,
"thumbnailWidth": 256,
"thumbnailHeight": 128
},
"video": {
"frameFormat": "BGR24",
"maxWidth": -1,
"maxHeight": -1
}
}
def load_from_file(self, config_file):
"""Load configuration from JSON file"""
try:
with open(config_file, 'r') as f:
file_config = json.load(f)
# Merge with defaults
self._deep_update(self.config, file_config)
except Exception as e:
print(f"Warning: Could not load config file {config_file}: {e}")
def save_to_file(self, config_file):
"""Save current configuration to JSON file"""
with open(config_file, 'w') as f:
json.dump(self.config, f, indent=2)
def _deep_update(self, base_dict, update_dict):
"""Deep update dictionary"""
for key, value in update_dict.items():
if key in base_dict and isinstance(base_dict[key], dict) and isinstance(value, dict):
self._deep_update(base_dict[key], value)
else:
base_dict[key] = value
def create_engine(self, license_path=None):
"""Create engine with current configuration"""
# Setup parameters
setup_params = simplelpr.EngineSetupParms()
engine_config = self.config["engine"]
setup_params.cudaDeviceId = engine_config["cudaDeviceId"]
setup_params.enableImageProcessingWithGPU = engine_config["enableImageProcessingWithGPU"]
setup_params.enableClassificationWithGPU = engine_config["enableClassificationWithGPU"]
setup_params.maxConcurrentImageProcessingOps = engine_config["maxConcurrentImageProcessingOps"]
# Create engine
engine = simplelpr.SimpleLPR(setup_params)
if license_path:
engine.set_productKey(license_path)
# Configure country
country = self.config["processing"]["country"]
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
return engine
def configure_processor(self, processor):
"""Configure processor with current settings"""
proc_config = self.config["processing"]
# Only set contrast sensitivity if explicitly configured (not default)
if proc_config["contrastSensitivityFactor"] != 0.9:
processor.contrastSensitivityFactor = proc_config["contrastSensitivityFactor"]
processor.plateRegionDetectionEnabled = proc_config["plateRegionDetectionEnabled"]
processor.cropToPlateRegionEnabled = proc_config["cropToPlateRegionEnabled"]
return processor
def get_frame_format(self):
"""Get frame format enum"""
format_str = self.config["video"]["frameFormat"]
if format_str == "GRAY8":
return simplelpr.FrameFormat.FRAME_FORMAT_GRAY8
else:
return simplelpr.FrameFormat.FRAME_FORMAT_BGR24
def get_tracker_params(self):
"""Get tracker parameters"""
track_config = self.config["tracking"]
return simplelpr.PlateCandidateTrackerSetupParms(
triggerWindowInSec=track_config["triggerWindowInSec"],
maxIdleTimeInSec=track_config["maxIdleTimeInSec"],
minTriggerFrameCount=track_config["minTriggerFrameCount"],
thumbnailWidth=track_config["thumbnailWidth"],
thumbnailHeight=track_config["thumbnailHeight"]
)
# Usage
def process_with_config(image_path, config_file="simplelpr_config.json"):
"""Process image using configuration file"""
config = SimpleLPRConfig(config_file)
engine = config.create_engine()
processor = engine.createProcessor()
config.configure_processor(processor)
return processor.analyze(image_path)
# Example config file content:
example_config = {
"processing": {
"country": "Germany",
"contrastSensitivityFactor": 0.5,
"plateRegionDetectionEnabled": True,
"cropToPlateRegionEnabled": True
},
"tracking": {
"triggerWindowInSec": 2.0,
"minTriggerFrameCount": 2
}
}
# Save example config
# with open("example_config.json", "w") as f:
# json.dump(example_config, f, indent=2)
import simplelpr
import logging
import time
from datetime import datetime
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('simplelpr.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('SimpleLPR')
class SimpleLPRMonitor:
"""Monitoring and logging wrapper for SimpleLPR"""
def __init__(self, country="Spain"):
self.country = country
self.stats = {
'total_frames': 0,
'total_detections': 0,
'processing_times': [],
'start_time': None,
'errors': 0
}
self.engine = None
self.processor = None
def initialize(self, license_path=None):
"""Initialize engine and processor with logging"""
try:
logger.info("Initializing SimpleLPR engine...")
setup_params = simplelpr.EngineSetupParms()
self.engine = simplelpr.SimpleLPR(setup_params)
if license_path:
self.engine.set_productKey(license_path)
logger.info(f"License key loaded from: {license_path}")
# Configure country
for i in range(self.engine.numSupportedCountries):
self.engine.set_countryWeight(i, 0.0)
self.engine.set_countryWeight(self.country, 1.0)
self.engine.realizeCountryWeights()
logger.info(f"Engine configured for country: {self.country}")
# Create processor
self.processor = self.engine.createProcessor()
self.processor.plateRegionDetectionEnabled = True
version = self.engine.versionNumber
logger.info(f"SimpleLPR initialized successfully - Version: "
f"{version.A}.{version.B}.{version.C}.{version.D}")
self.stats['start_time'] = time.time()
except Exception as e:
logger.error(f"Failed to initialize SimpleLPR: {e}")
raise
def analyze_image(self, image_path):
"""Analyze image with monitoring"""
if not self.processor:
raise RuntimeError("SimpleLPR not initialized")
start_time = time.time()
try:
logger.debug(f"Analyzing image: {image_path}")
candidates = self.processor.analyze(image_path)
processing_time = time.time() - start_time
self.stats['processing_times'].append(processing_time)
self.stats['total_frames'] += 1
self.stats['total_detections'] += len(candidates)
logger.info(f"Image processed in {processing_time:.3f}s - "
f"Found {len(candidates)} candidates")
# Log detailed results
for i, candidate in enumerate(candidates):
if candidate.matches:
match = candidate.matches[0]
logger.info(f" Candidate {i+1}: '{match.text}' "
f"({match.country}, conf: {match.confidence:.3f})")
return candidates
except Exception as e:
self.stats['errors'] += 1
logger.error(f"Error analyzing {image_path}: {e}")
raise
def process_video_with_monitoring(self, video_path, max_frames=None):
"""Process video with detailed monitoring"""
if not self.engine:
raise RuntimeError("SimpleLPR not initialized")
logger.info(f"Starting video processing: {video_path}")
try:
video_source = self.engine.openVideoSource(
video_path,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
-1, -1
)
logger.info(f"Video opened - Live source: {video_source.isLiveSource}")
frame_count = 0
total_detections = 0
while True:
frame = video_source.nextFrame()
if frame is None:
break
frame_count += 1
start_time = time.time()
try:
candidates = self.processor.analyze(frame)
processing_time = time.time() - start_time
self.stats['processing_times'].append(processing_time)
self.stats['total_frames'] += 1
self.stats['total_detections'] += len(candidates)
total_detections += len(candidates)
if candidates:
for candidate in candidates:
if candidate.matches:
match = candidate.matches[0]
logger.info(f"Frame {frame_count} ({frame.timestamp:.2f}s): "
f"'{match.text}' ({match.confidence:.3f})")
# Progress logging
if frame_count % 100 == 0:
avg_time = sum(self.stats['processing_times'][-100:]) / min(100, len(self.stats['processing_times']))
logger.info(f"Processed {frame_count} frames - "
f"Avg processing time: {avg_time:.3f}s")
except Exception as e:
self.stats['errors'] += 1
logger.error(f"Error processing frame {frame_count}: {e}")
if max_frames and frame_count >= max_frames:
break
logger.info(f"Video processing complete - {frame_count} frames, "
f"{total_detections} total detections")
except Exception as e:
logger.error(f"Video processing failed: {e}")
raise
def get_statistics(self):
"""Get processing statistics"""
if not self.stats['start_time']:
return {}
elapsed = time.time() - self.stats['start_time']
avg_processing_time = (sum(self.stats['processing_times']) /
len(self.stats['processing_times'])) if self.stats['processing_times'] else 0
return {
'elapsed_time': elapsed,
'total_frames': self.stats['total_frames'],
'total_detections': self.stats['total_detections'],
'avg_processing_time': avg_processing_time,
'frames_per_second': self.stats['total_frames'] / elapsed if elapsed > 0 else 0,
'detections_per_frame': (self.stats['total_detections'] /
self.stats['total_frames']) if self.stats['total_frames'] > 0 else 0,
'errors': self.stats['errors']
}
def log_final_statistics(self):
"""Log final processing statistics"""
stats = self.get_statistics()
logger.info("=== Final Processing Statistics ===")
logger.info(f"Total processing time: {stats['elapsed_time']:.1f} seconds")
logger.info(f"Total frames processed: {stats['total_frames']}")
logger.info(f"Total detections: {stats['total_detections']}")
logger.info(f"Average processing time per frame: {stats['avg_processing_time']:.3f}s")
logger.info(f"Average frames per second: {stats['frames_per_second']:.1f}")
logger.info(f"Average detections per frame: {stats['detections_per_frame']:.2f}")
logger.info(f"Errors encountered: {stats['errors']}")
# Usage
def monitored_processing_example():
monitor = SimpleLPRMonitor("Germany")
try:
monitor.initialize()
# Process some images
# candidates = monitor.analyze_image("test.jpg")
# Process video
# monitor.process_video_with_monitoring("traffic.mp4", max_frames=500)
finally:
monitor.log_final_statistics()
import simplelpr
import logging
logger = logging.getLogger(__name__)
class SimpleLPRError(Exception):
"""Custom exception for SimpleLPR operations"""
pass
class SimpleLPRLicenseError(SimpleLPRError):
"""License-related errors"""
pass
class SimpleLPRProcessingError(SimpleLPRError):
"""Processing-related errors"""
pass
def safe_engine_creation(license_path=None, country="Spain"):
"""Safely create and configure SimpleLPR engine"""
try:
# Create engine
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)
# Set license if provided
if license_path:
try:
engine.set_productKey(license_path)
logger.info("License key applied successfully")
except Exception as e:
raise SimpleLPRLicenseError(f"Failed to set license key: {e}")
# Configure countries
try:
available_countries = []
for i in range(engine.numSupportedCountries):
country_code = engine.get_countryCode(i)
available_countries.append(country_code)
engine.set_countryWeight(i, 0.0)
# Check if requested country is available
if country not in available_countries:
logger.warning(f"Country '{country}' not found. Available: {available_countries}")
# Use first available country as fallback
country = available_countries[0] if available_countries else None
if country:
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
logger.info(f"Engine configured for country: {country}")
else:
raise SimpleLPRError("No countries available")
except Exception as e:
raise SimpleLPRError(f"Failed to configure countries: {e}")
return engine, country
except Exception as e:
logger.error(f"Engine creation failed: {e}")
raise
def safe_image_analysis(processor, image_input, input_type="file"):
"""Safely analyze image with proper error handling"""
try:
if input_type == "file":
# Check if file exists
import os
if not os.path.exists(image_input):
raise SimpleLPRProcessingError(f"Image file not found: {image_input}")
candidates = processor.analyze(image_input)
elif input_type == "bytes":
if not image_input:
raise SimpleLPRProcessingError("Empty image data provided")
candidates = processor.analyze(image_input)
elif input_type == "buffer":
import numpy as np
if not isinstance(image_input, np.ndarray):
raise SimpleLPRProcessingError("Invalid buffer type - expected numpy array")
if image_input.size == 0:
raise SimpleLPRProcessingError("Empty image buffer")
candidates = processor.analyze(image_input)
else:
raise SimpleLPRProcessingError(f"Unknown input type: {input_type}")
return candidates
except SimpleLPRProcessingError:
raise # Re-raise our custom errors
except Exception as e:
raise SimpleLPRProcessingError(f"Analysis failed: {e}")
def robust_video_processing(video_path, country="Spain", license_path=None):
"""Robust video processing with error handling and recovery"""
engine = None
processor = None
video_source = None
try:
# Initialize engine
engine, actual_country = safe_engine_creation(license_path, country)
processor = engine.createProcessor()
processor.plateRegionDetectionEnabled = True
# Open video with retry logic
max_retries = 3
for attempt in range(max_retries):
try:
video_source = engine.openVideoSource(
video_path,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
-1, -1
)
logger.info(f"Video opened successfully on attempt {attempt + 1}")
break
except Exception as e:
logger.warning(f"Video open attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
raise SimpleLPRProcessingError(f"Failed to open video after {max_retries} attempts")
time.sleep(1)
# Process video with error recovery
frame_count = 0
consecutive_errors = 0
max_consecutive_errors = 10
results = []
while consecutive_errors < max_consecutive_errors:
try:
frame = video_source.nextFrame()
if frame is None:
logger.info("End of video reached")
break
frame_count += 1
# Analyze frame
candidates = safe_image_analysis(processor, frame, "frame")
consecutive_errors = 0 # Reset error counter on success
# Process results
for candidate in candidates:
if candidate.matches:
match = candidate.matches[0]
result = {
'frame': frame_count,
'timestamp': frame.timestamp,
'text': match.text,
'country': match.country,
'confidence': match.confidence
}
results.append(result)
logger.info(f"Frame {frame_count}: {match.text} ({match.confidence:.3f})")
except Exception as e:
consecutive_errors += 1
logger.error(f"Frame {frame_count} processing error ({consecutive_errors}/{max_consecutive_errors}): {e}")
if consecutive_errors >= max_consecutive_errors:
raise SimpleLPRProcessingError(f"Too many consecutive errors ({max_consecutive_errors})")
logger.info(f"Video processing completed - {frame_count} frames, {len(results)} detections")
return results
except SimpleLPRLicenseError as e:
logger.error(f"License error: {e}")
raise
except SimpleLPRProcessingError as e:
logger.error(f"Processing error: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise SimpleLPRError(f"Video processing failed: {e}")
def safe_processor_pool_operation(engine, tasks, max_workers=4):
"""Safely execute processor pool operations with error handling"""
pool = None
try:
pool = engine.createProcessorPool(max_workers)
pool.plateRegionDetectionEnabled = True
# Launch all tasks
launched_tasks = []
failed_launches = []
for i, task in enumerate(tasks):
try:
if task['type'] == 'image_file':
success = pool.launchAnalyze(
streamId=0,
requestId=i,
timestampInSec=float(i),
timeoutInMs=simplelpr.TIMEOUT_INFINITE,
imgPath=task['path']
)
elif task['type'] == 'video_frame':
success = pool.launchAnalyze(
streamId=0,
requestId=i,
timeoutInMs=simplelpr.TIMEOUT_INFINITE,
frame=task['frame']
)
else:
logger.error(f"Unknown task type: {task['type']}")
continue
if success:
launched_tasks.append((i, task))
else:
failed_launches.append((i, task))
logger.warning(f"Failed to launch task {i}: {task}")
except Exception as e:
failed_launches.append((i, task))
logger.error(f"Error launching task {i}: {e}")
logger.info(f"Launched {len(launched_tasks)} tasks, {len(failed_launches)} failed")
# Collect results
results = {}
completed = 0
timeout_count = 0
max_timeouts = 5
while completed < len(launched_tasks) and timeout_count < max_timeouts:
try:
result = pool.pollNextResult(0, 5000) # 5 second timeout
if result is None:
timeout_count += 1
logger.warning(f"Timeout waiting for results ({timeout_count}/{max_timeouts})")
continue
timeout_count = 0 # Reset timeout counter
completed += 1
task_id = result.requestId
if result.errorInfo:
logger.error(f"Task {task_id} failed: {result.errorInfo.description}")
results[task_id] = {'error': result.errorInfo.description}
else:
results[task_id] = {'candidates': result.candidates}
if result.candidates:
logger.info(f"Task {task_id} completed - {len(result.candidates)} candidates")
except Exception as e:
logger.error(f"Error collecting results: {e}")
break
# Handle failed launches
for task_id, task in failed_launches:
results[task_id] = {'error': 'Failed to launch'}
return results
except Exception as e:
logger.error(f"Processor pool operation failed: {e}")
raise SimpleLPRProcessingError(f"Pool operation error: {e}")
# Usage examples with error handling
def example_safe_processing():
"""Example of safe SimpleLPR processing"""
try:
# Initialize with error handling
engine, country = safe_engine_creation("license.xml", "Germany")
processor = engine.createProcessor()
processor.plateRegionDetectionEnabled = True
# Safe image analysis
try:
candidates = safe_image_analysis(processor, "test_image.jpg", "file")
print(f"Found {len(candidates)} candidates")
except SimpleLPRProcessingError as e:
print(f"Image analysis failed: {e}")
# Safe video processing
try:
results = robust_video_processing("test_video.mp4", country)
print(f"Video processing completed with {len(results)} detections")
except SimpleLPRError as e:
print(f"Video processing failed: {e}")
except SimpleLPRLicenseError as e:
print(f"License issue: {e}")
print("Consider running in evaluation mode or checking license file")
except SimpleLPRError as e:
print(f"SimpleLPR error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
import simplelpr
import time
import psutil
import threading
from queue import Queue
def optimize_engine_setup():
"""Optimized engine setup for performance"""
# Get system info
cpu_count = psutil.cpu_count()
memory_gb = psutil.virtual_memory().total / (1024**3)
print(f"System: {cpu_count} CPUs, {memory_gb:.1f}GB RAM")
setup_params = simplelpr.EngineSetupParms()
# Optimize concurrent operations based on system
if memory_gb >= 16:
# High memory system
setup_params.maxConcurrentImageProcessingOps = min(cpu_count, 8)
elif memory_gb >= 8:
# Medium memory system
setup_params.maxConcurrentImageProcessingOps = min(cpu_count // 2, 4)
else:
# Low memory system
setup_params.maxConcurrentImageProcessingOps = 2
print(f"Configured for {setup_params.maxConcurrentImageProcessingOps} concurrent operations")
return setup_params
def benchmark_processing_methods(image_path, country="Spain", iterations=10):
"""Benchmark different processing approaches"""
# Setup
engine, _ = safe_engine_creation(country=country)
results = {}
# Method 1: Single processor
print("Benchmarking single processor...")
processor = engine.createProcessor()
processor.plateRegionDetectionEnabled = True
start_time = time.time()
for i in range(iterations):
candidates = processor.analyze(image_path)
single_time = time.time() - start_time
results['single_processor'] = single_time / iterations
# Method 2: Processor pool
print("Benchmarking processor pool...")
pool = engine.createProcessorPool(4)
pool.plateRegionDetectionEnabled = True
start_time = time.time()
# Launch all analyses
for i in range(iterations):
pool.launchAnalyze(0, i, float(i), simplelpr.TIMEOUT_INFINITE, image_path)
# Collect results
for i in range(iterations):
result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
pool_time = time.time() - start_time
results['processor_pool'] = pool_time / iterations
# Results
print(f"\nBenchmark Results ({iterations} iterations):")
print(f"Single processor: {results['single_processor']:.3f}s per image")
print(f"Processor pool: {results['processor_pool']:.3f}s per image")
print(f"Speedup: {results['single_processor']/results['processor_pool']:.1f}x")
return results
def optimize_video_processing(video_path, country="Spain"):
"""Optimized video processing pipeline"""
# Setup optimized engine
setup_params = optimize_engine_setup()
engine = simplelpr.SimpleLPR(setup_params)
for i in range(engine.numSupportedCountries):
engine.set_countryWeight(i, 0.0)
engine.set_countryWeight(country, 1.0)
engine.realizeCountryWeights()
# Create optimized processor pool
num_processors = setup_params.maxConcurrentImageProcessingOps
pool = engine.createProcessorPool(num_processors)
# Optimize processor settings for speed
pool.plateRegionDetectionEnabled = True
pool.cropToPlateRegionEnabled = False # Disable for speed
# Note: contrastSensitivityFactor left at default - only adjust if needed for specific image conditions
# Open video with resolution limits for performance
video_source = engine.openVideoSource(
video_path,
simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
1920, 1080 # Limit resolution
)
print(f"Processing with {num_processors} processors")
# Processing pipeline
frame_queue = Queue(maxsize=num_processors * 2)
result_queue = Queue()
def frame_reader():
"""Read frames in separate thread"""
frame_count = 0
while True:
frame = video_source.nextFrame()
if frame is None:
break
frame_queue.put((frame_count, frame))
frame_count += 1
frame_queue.put(None) # End marker
def result_collector():
"""Collect results in separate thread"""
results = []
while True:
result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
if result is None:
break
result_queue.put(result)
result_queue.put(None) # End marker
# Start threads
reader_thread = threading.Thread(target=frame_reader)
collector_thread = threading.Thread(target=result_collector)
reader_thread.start()
collector_thread.start()
# Main processing loop
active_requests = 0
processed_frames = 0
detections = []
start_time = time.time()
while True:
# Launch new analyses
try:
frame_data = frame_queue.get(timeout=1.0)
if frame_data is None:
break # End of frames
frame_count, frame = frame_data
success = pool.launchAnalyze(
0, frame_count, simplelpr.TIMEOUT_IMMEDIATE, frame
)
if success:
active_requests += 1
except:
pass # Timeout, continue
# Process results
try:
result = result_queue.get(timeout=0.1)
if result is None:
break # End of results
active_requests -= 1
processed_frames += 1
if result.candidates:
for candidate in result.candidates:
if candidate.matches:
match = candidate.matches[0]
detections.append({
'frame': result.requestId,
'text': match.text,
'confidence': match.confidence
})
except:
pass # No result available
# Wait for completion
reader_thread.join()
# Process remaining results
while active_requests > 0:
result = result_queue.get()
if result is None:
break
active_requests -= 1
processed_frames += 1
# Process result...
collector_thread.join()
elapsed = time.time() - start_time
fps = processed_frames / elapsed
print(f"Optimized processing complete:")
print(f" Frames: {processed_frames}")
print(f" Time: {elapsed:.1f}s")
print(f" FPS: {fps:.1f}")
print(f" Detections: {len(detections)}")
return detections
def memory_efficient_batch_processing(image_paths, country="Spain", batch_size=10):
"""Memory-efficient batch processing for large image sets"""
engine, _ = safe_engine_creation(country=country)
pool = engine.createProcessorPool(4)
pool.plateRegionDetectionEnabled = True
total_images = len(image_paths)
all_results = {}
print(f"Processing {total_images} images in batches of {batch_size}")
for batch_start in range(0, total_images, batch_size):
batch_end = min(batch_start + batch_size, total_images)
batch_paths = image_paths[batch_start:batch_end]
print(f"Processing batch {batch_start//batch_size + 1}: "
f"images {batch_start+1}-{batch_end}")
# Process batch
launched = 0
for i, image_path in enumerate(batch_paths):
global_id = batch_start + i
success = pool.launchAnalyze(
0, global_id, float(global_id),
simplelpr.TIMEOUT_INFINITE, str(image_path)
)
if success:
launched += 1
# Collect batch results
for _ in range(launched):
result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
if result:
image_path = image_paths[result.requestId]
all_results[str(image_path)] = result.candidates
# Memory cleanup hint
import gc
gc.collect()
return all_results
# Performance testing utilities
def performance_test_suite():
"""Run comprehensive performance tests"""
print("SimpleLPR Performance Test Suite")
print("=" * 40)
# System info
print(f"CPU cores: {psutil.cpu_count()}")
print(f"Memory: {psutil.virtual_memory().total / (1024**3):.1f}GB")
print(f"Available memory: {psutil.virtual_memory().available / (1024**3):.1f}GB")
# Test different configurations
test_image = "test_image.jpg" # Replace with actual test image
if os.path.exists(test_image):
# Benchmark processing methods
benchmark_results = benchmark_processing_methods(test_image, "Spain", 5)
# Test different processor counts
engine, _ = safe_engine_creation(country="Spain")
print("\nProcessor count optimization:")
for proc_count in [1, 2, 4, 8]:
try:
pool = engine.createProcessorPool(proc_count)
start_time = time.time()
for i in range(10):
pool.launchAnalyze(0, i, float(i), simplelpr.TIMEOUT_INFINITE, test_image)
for i in range(10):
pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
elapsed = time.time() - start_time
print(f" {proc_count} processors: {elapsed:.2f}s ({10/elapsed:.1f} images/sec)")
except Exception as e:
print(f" {proc_count} processors: Failed - {e}")
else:
print(f"Test image {test_image} not found - skipping benchmarks")