SimpleLPR Python Quick Start Guide

Table of Contents

  1. Installation
  2. Basic Setup
  3. Your First License Plate Recognition
  4. Working with Images
  5. Processing Video Files
  6. Real-time Video Streams
  7. Multi-threaded Processing
  8. License Plate Tracking
  9. Advanced Configuration
  10. Common Patterns and Best Practices
  11. Error Handling
  12. Performance Optimization

Installation

Install SimpleLPR from PyPI:

pip install simplelpr

Requirements


Basic Setup

Minimal Example

import simplelpr

# Build the engine from a default-initialised parameter object.
setup_params = simplelpr.EngineSetupParms()
engine = simplelpr.SimpleLPR(setup_params)

# Optional: Set license key for production use
# engine.set_productKey("path/to/license.xml")

# The version is exposed as four components: A, B, C and D.
version = engine.versionNumber
print(f"SimpleLPR Version: {version.A}.{version.B}.{version.C}.{version.D}")
print(f"Supported countries: {engine.numSupportedCountries}")

License Key Setup

# Option 1: give the engine the path of the key file.
engine.set_productKey("license.xml")

# Option 2: give the engine the raw key bytes (useful for embedded keys).
with open("license.xml", "rb") as key_file:
    license_data = key_file.read()
engine.set_productKey(license_data)

Your First License Plate Recognition

Analyze a Single Image

import simplelpr


def analyze_image(image_path, country="Spain"):
    """Analyze one image and print every license plate candidate found.

    Args:
        image_path: Image to analyze; passed unchanged to
            ``processor.analyze``.
        country: Name of the only country left enabled (weight 1.0).
    """
    lpr = simplelpr.SimpleLPR(simplelpr.EngineSetupParms())

    # Zero every country weight, then switch the requested country back on.
    for country_index in range(lpr.numSupportedCountries):
        lpr.set_countryWeight(country_index, 0.0)
    lpr.set_countryWeight(country, 1.0)
    lpr.realizeCountryWeights()

    proc = lpr.createProcessor()
    proc.plateRegionDetectionEnabled = True
    proc.cropToPlateRegionEnabled = True

    plates = proc.analyze(image_path)

    print(f"Found {len(plates)} license plate candidates:")
    for plate_no, plate in enumerate(plates, start=1):
        print(f"\nCandidate {plate_no}:")
        print(f" Dark on light background: {plate.darkOnLight}")
        print(f" Plate detection confidence: {plate.plateDetectionConfidence:.3f}")
        print(f" Bounding box: {plate.boundingBox.left}, {plate.boundingBox.top}, "
              f"{plate.boundingBox.width}, {plate.boundingBox.height}")

        for match_no, reading in enumerate(plate.matches, start=1):
            print(f" Match {match_no}: '{reading.text}' (Country: {reading.country}, "
                  f"ISO: {reading.countryISO}, Confidence: {reading.confidence:.3f})")

            # One line per recognised character of this reading.
            for char_no, char in enumerate(reading.elements, start=1):
                print(f" Char {char_no}: '{char.glyph}' (Confidence: {char.confidence:.3f})")


# Usage
analyze_image("car_image.jpg", "Spain")

Working with Images

Different Input Methods

import simplelpr
import numpy as np
from PIL import Image


def setup_engine_and_processor(country="Spain"):
    """Create an engine restricted to *country* plus a processor for it.

    Args:
        country: Name of the only country left enabled (weight 1.0).

    Returns:
        Tuple ``(engine, processor)``. The processor has plate-region
        detection and crop-to-plate-region switched on.
    """
    setup_params = simplelpr.EngineSetupParms()
    engine = simplelpr.SimpleLPR(setup_params)

    # Disable every country, then enable only the requested one.
    for i in range(engine.numSupportedCountries):
        engine.set_countryWeight(i, 0.0)
    engine.set_countryWeight(country, 1.0)
    engine.realizeCountryWeights()

    processor = engine.createProcessor()
    processor.plateRegionDetectionEnabled = True
    processor.cropToPlateRegionEnabled = True
    return engine, processor


# Method 1: File path
def analyze_from_file(image_path):
    """Analyze an image by passing its path straight to the processor."""
    engine, processor = setup_engine_and_processor()
    return processor.analyze(image_path)


# Method 2: Byte array
def analyze_from_bytes(image_path):
    """Read the file into memory and analyze the raw bytes."""
    engine, processor = setup_engine_and_processor()
    with open(image_path, 'rb') as f:
        image_bytes = f.read()
    return processor.analyze(image_bytes)


# Method 3: NumPy array (from PIL)
def analyze_from_pil(image_path):
    """Load the image with PIL, convert it to a NumPy array and analyze it."""
    engine, processor = setup_engine_and_processor()

    # Image.open is lazy and keeps the file handle open; the context manager
    # closes it as soon as the pixel data has been copied into the array.
    with Image.open(image_path) as pil_image:
        numpy_array = np.asarray(pil_image)
    return processor.analyze(numpy_array)


# Method 4: OpenCV (if available)
def analyze_from_opencv(image_path):
    """Load the image with OpenCV and analyze the resulting array.

    Returns:
        The candidates from ``processor.analyze``, or ``[]`` when OpenCV is
        not installed.

    Raises:
        ValueError: If OpenCV cannot read the image.
    """
    # Keep the try body minimal so only a missing cv2 is reported as such.
    try:
        import cv2
    except ImportError:
        print("OpenCV not available")
        return []

    # cv2.imread signals failure by returning None instead of raising.
    cv_image = cv2.imread(image_path)
    if cv_image is None:
        raise ValueError(f"OpenCV could not read image: {image_path}")

    engine, processor = setup_engine_and_processor()
    return processor.analyze(cv_image)

Batch Processing Multiple Images

import os
import simplelpr
from pathlib import Path


def batch_analyze_images(image_folder, country="Spain", image_extensions=('.jpg', '.png', '.tif')):
    """Analyze every matching image directly inside *image_folder*.

    Args:
        image_folder: Directory to scan (not recursive).
        country: Name of the only country left enabled.
        image_extensions: Lower-case suffixes that select the files to analyze.

    Returns:
        Dict mapping each processed file name to its candidates; a file
        whose analysis raised maps to ``[]``.
    """
    # One engine and one processor are shared by all images.
    lpr = simplelpr.SimpleLPR(simplelpr.EngineSetupParms())
    for idx in range(lpr.numSupportedCountries):
        lpr.set_countryWeight(idx, 0.0)
    lpr.set_countryWeight(country, 1.0)
    lpr.realizeCountryWeights()

    proc = lpr.createProcessor()
    proc.plateRegionDetectionEnabled = True

    results = {}
    for entry in Path(image_folder).iterdir():
        if entry.suffix.lower() not in image_extensions:
            continue

        try:
            found = proc.analyze(str(entry))
            results[entry.name] = found

            # One summary line per image: text of the first candidate's first match.
            if not found:
                print(f"{entry.name}: No plates detected")
            else:
                best = found[0].matches[0] if found[0].matches else None
                label = best.text if best else "No text"
                print(f"{entry.name}: {label}")
        except Exception as exc:
            print(f"Error processing {entry.name}: {exc}")
            results[entry.name] = []

    return results


# Usage
results = batch_analyze_images("./car_images", "Germany")

Processing Video Files

Basic Video Processing

import simplelpr


def process_video_file(video_path, country="Spain", max_frames=None):
    """Read a video frame by frame and collect the top match of each candidate.

    Args:
        video_path: Video source passed to ``engine.openVideoSource``.
        country: Name of the only country left enabled.
        max_frames: Stop after this many frames; a falsy value means no limit.

    Returns:
        List of dicts with keys ``frame``, ``sequence``, ``timestamp``,
        ``text``, ``country`` and ``confidence``.
    """
    lpr = simplelpr.SimpleLPR(simplelpr.EngineSetupParms())

    for idx in range(lpr.numSupportedCountries):
        lpr.set_countryWeight(idx, 0.0)
    lpr.set_countryWeight(country, 1.0)
    lpr.realizeCountryWeights()

    proc = lpr.createProcessor()
    proc.plateRegionDetectionEnabled = True
    proc.cropToPlateRegionEnabled = True

    video_source = lpr.openVideoSource(
        video_path,
        simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
        -1, -1  # No size limits
    )

    print(f"Video opened: Live source = {video_source.isLiveSource}")
    print(f"Video state: {video_source.state}")

    frame_count = 0
    detections = []

    while True:
        frame = video_source.nextFrame()
        if frame is None:
            break
        frame_count += 1

        for plate in proc.analyze(frame) or ():
            if not plate.matches:
                continue
            top = plate.matches[0]
            detections.append({
                'frame': frame_count,
                'sequence': frame.sequenceNumber,
                'timestamp': frame.timestamp,
                'text': top.text,
                'country': top.country,
                'confidence': top.confidence
            })
            print(f"Frame {frame_count} ({frame.timestamp:.2f}s): {top.text} "
                  f"({top.country}, conf: {top.confidence:.3f})")

        # Optional early exit once the requested number of frames is reached.
        if max_frames and frame_count >= max_frames:
            break

    print(f"\nProcessed {frame_count} frames, found {len(detections)} plates")
    return detections


# Usage
detections = process_video_file("traffic_video.mp4", "Germany", max_frames=1000)

Saving Frame Thumbnails

import simplelpr
import os

# Characters that are path separators or reserved in file names on common
# platforms; control characters (< 0x20) are treated the same way.
_UNSAFE_FILENAME_CHARS = frozenset('<>:"/\\|?*')


def _safe_filename_part(text):
    """Return *text* with path-hostile characters replaced by ``_``."""
    return "".join(
        "_" if ch in _UNSAFE_FILENAME_CHARS or ord(ch) < 32 else ch
        for ch in text
    )


def process_video_with_thumbnails(video_path, output_dir, country="Spain"):
    """Process a video and save a JPEG of every frame with a recognised plate.

    One file is written per candidate that has at least one match, so a
    frame with several plates is saved several times under different names.

    Args:
        video_path: Video source passed to ``engine.openVideoSource``.
        output_dir: Directory for the JPEG files; created if missing.
        country: Name of the only country left enabled.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Setup engine and processor
    setup_params = simplelpr.EngineSetupParms()
    engine = simplelpr.SimpleLPR(setup_params)

    for i in range(engine.numSupportedCountries):
        engine.set_countryWeight(i, 0.0)
    engine.set_countryWeight(country, 1.0)
    engine.realizeCountryWeights()

    processor = engine.createProcessor()
    processor.plateRegionDetectionEnabled = True
    processor.cropToPlateRegionEnabled = True

    # Open video
    video_source = engine.openVideoSource(
        video_path,
        simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
        -1, -1
    )

    detection_count = 0

    while True:
        frame = video_source.nextFrame()
        if frame is None:
            break

        for candidate in processor.analyze(frame):
            if not candidate.matches:
                continue

            match = candidate.matches[0]
            detection_count += 1

            # The recognised text goes into the file name, so strip anything
            # that could break the path or escape output_dir.
            plate_part = _safe_filename_part(match.text)
            filename = f"detection_{detection_count:04d}_{plate_part}_{frame.timestamp:.2f}s.jpg"
            filepath = os.path.join(output_dir, filename)

            frame.saveAsJPEG(filepath, 95)  # 95% quality
            print(f"Saved: {filename} - {match.text} ({match.confidence:.3f})")

    print(f"Saved {detection_count} thumbnails to {output_dir}")


# Usage
process_video_with_thumbnails("traffic.mp4", "./thumbnails", "Austria")

Real-time Video Streams

RTSP Stream Processing

import simplelpr
import time


def process_rtsp_stream(rtsp_url, country="Spain", duration_seconds=60):
    """Analyze a live RTSP stream for a fixed wall-clock duration.

    Args:
        rtsp_url: Stream URL passed to ``engine.openVideoSource``.
        country: Name of the only country left enabled.
        duration_seconds: How long to keep reading frames.

    Returns:
        List of dicts with keys ``timestamp``, ``frame_time``, ``text``,
        ``country`` and ``confidence``.
    """
    lpr = simplelpr.SimpleLPR(simplelpr.EngineSetupParms())

    for idx in range(lpr.numSupportedCountries):
        lpr.set_countryWeight(idx, 0.0)
    lpr.set_countryWeight(country, 1.0)
    lpr.realizeCountryWeights()

    proc = lpr.createProcessor()
    proc.plateRegionDetectionEnabled = True

    video_source = lpr.openVideoSource(
        rtsp_url,
        simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
        1920, 1080  # Limit resolution for performance
    )

    print(f"Stream opened: {rtsp_url}")
    print(f"Is live source: {video_source.isLiveSource}")

    start_time = time.time()
    frame_count = 0
    detections = []

    try:
        while time.time() - start_time < duration_seconds:
            frame = video_source.nextFrame()

            if frame is None:
                print(f"No frame received, stream state: {video_source.state}")
                # A non-live source that yields no frame ends the loop;
                # a live one gets a reconnect attempt and a short pause.
                if not video_source.isLiveSource:
                    break
                print("Attempting to reconnect...")
                video_source.reconnect()
                time.sleep(1)
                continue

            frame_count += 1

            for plate in proc.analyze(frame):
                if not plate.matches:
                    continue
                top = plate.matches[0]
                detections.append({
                    'timestamp': time.time(),
                    'frame_time': frame.timestamp,
                    'text': top.text,
                    'country': top.country,
                    'confidence': top.confidence
                })
                print(f"[{time.strftime('%H:%M:%S')}] Detected: {top.text} "
                      f"({top.country}, {top.confidence:.3f})")

            # Print stats every 100 frames
            if frame_count % 100 == 0:
                fps = frame_count / (time.time() - start_time)
                print(f"Processed {frame_count} frames, {fps:.1f} FPS, "
                      f"{len(detections)} total detections")

    except KeyboardInterrupt:
        print("Stream processing interrupted by user")

    print(f"\nStream processing complete:")
    print(f" Duration: {time.time() - start_time:.1f} seconds")
    print(f" Frames processed: {frame_count}")
    print(f" Total detections: {len(detections)}")

    return detections


# Usage
detections = process_rtsp_stream("rtsp://camera_ip:554/stream", "Germany", 300)

Multi-threaded Processing

Processor Pool for Concurrent Analysis

import simplelpr
import time
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor


def setup_processor_pool(country="Spain", num_processors=0):
    """Create an engine restricted to *country* and a processor pool for it.

    Args:
        country: Name of the only country left enabled.
        num_processors: Passed unchanged to ``engine.createProcessorPool``.

    Returns:
        Tuple ``(engine, pool)``. The pool has plate-region detection and
        crop-to-plate-region switched on.
    """
    setup_params = simplelpr.EngineSetupParms()
    setup_params.maxConcurrentImageProcessingOps = 4
    engine = simplelpr.SimpleLPR(setup_params)

    # Configure country
    for i in range(engine.numSupportedCountries):
        engine.set_countryWeight(i, 0.0)
    engine.set_countryWeight(country, 1.0)
    engine.realizeCountryWeights()

    # Create processor pool
    pool = engine.createProcessorPool(num_processors)
    pool.plateRegionDetectionEnabled = True
    pool.cropToPlateRegionEnabled = True

    return engine, pool


def process_images_with_pool(image_paths, country="Spain"):
    """Analyze several images concurrently through a processor pool.

    Args:
        image_paths: Sequence of image paths (``str`` or ``Path``). The
            request id of each launch is the path's index in this sequence.
        country: Name of the only country left enabled.

    Returns:
        Dict mapping each completed entry of *image_paths* to its
        candidates, or to ``[]`` when the pool reported an error for it.
    """
    engine, pool = setup_processor_pool(country)

    # Launch all analyses
    launched_requests = []
    for i, image_path in enumerate(image_paths):
        success = pool.launchAnalyze(
            streamId=0,
            requestId=i,
            timestampInSec=float(i),
            timeoutInMs=simplelpr.TIMEOUT_INFINITE,
            imgPath=str(image_path)
        )
        if success:
            launched_requests.append((i, image_path))
        else:
            print(f"Failed to launch analysis for {image_path}")

    # Collect results
    results = {}
    completed = 0
    total = len(launched_requests)

    while completed < total:
        result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
        if result is None:
            break

        completed += 1
        image_path = image_paths[result.requestId]

        if result.errorInfo:
            print(f"Error processing {image_path}: {result.errorInfo.description}")
            results[image_path] = []
            continue

        results[image_path] = result.candidates

        # Path(...) makes the summary work for plain string paths too,
        # which have no .name attribute.
        display_name = Path(image_path).name
        if result.candidates:
            first = result.candidates[0]
            best_match = first.matches[0] if first.matches else None
            plate_text = best_match.text if best_match else "No text"
            print(f"Completed {completed}/{total}: "
                  f"{display_name} -> {plate_text}")
        else:
            print(f"Completed {completed}/{total}: "
                  f"{display_name} -> No plates")

    return results


def _record_detections(result, detections):
    """Append the top match of each candidate in *result* to *detections*."""
    for candidate in result.candidates or ():
        if not candidate.matches:
            continue
        match = candidate.matches[0]
        detections.append({
            'frame_id': result.requestId,
            'timestamp': result.timestamp,
            'text': match.text,
            'confidence': match.confidence
        })
        print(f"Frame {result.requestId}: {match.text} ({match.confidence:.3f})")


def concurrent_video_analysis(video_path, country="Spain", max_frames=1000):
    """Analyze a video through a processor pool while frames are still read.

    Launches are non-blocking, so a frame whose launch is refused is simply
    skipped and does not count towards *max_frames*.

    Args:
        video_path: Video source passed to ``engine.openVideoSource``.
        country: Name of the only country left enabled.
        max_frames: Maximum number of frames to launch.

    Returns:
        List of dicts with keys ``frame_id``, ``timestamp``, ``text`` and
        ``confidence``.
    """
    engine, pool = setup_processor_pool(country, num_processors=4)

    # Open video
    video_source = engine.openVideoSource(
        video_path,
        simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
        -1, -1
    )

    frames_launched = 0
    frames_processed = 0
    detections = []

    while frames_launched < max_frames:
        frame = video_source.nextFrame()
        if frame is None:
            break

        success = pool.launchAnalyze(
            streamId=0,
            requestId=frame.sequenceNumber,
            timeoutInMs=simplelpr.TIMEOUT_IMMEDIATE,  # Non-blocking
            frame=frame
        )
        if success:
            frames_launched += 1

        # Pick up whatever has finished so far without blocking.
        while True:
            result = pool.pollNextResult(0, simplelpr.TIMEOUT_IMMEDIATE)
            if result is None:
                break
            frames_processed += 1
            _record_detections(result, detections)

    # Block for the results still in flight. They are recorded through the
    # same helper as above so that the last frames are not lost.
    while frames_processed < frames_launched:
        result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
        if result is None:
            break
        frames_processed += 1
        _record_detections(result, detections)

    print(f"Processed {frames_processed} frames, found {len(detections)} plates")
    return detections


# Usage examples
if __name__ == "__main__":
    # Example 1: Batch process images
    image_folder = Path("./test_images")
    if image_folder.exists():
        image_paths = list(image_folder.glob("*.jpg"))
        results = process_images_with_pool(image_paths, "Germany")

    # Example 2: Concurrent video processing
    # detections = concurrent_video_analysis("traffic.mp4", "Spain", 500)

License Plate Tracking

Video Tracking with Thumbnails

import simplelpr
import os
import time
from collections import deque


def track_plates_in_video(video_path, output_dir="./tracked_plates", country="Spain"):
    """Track license plates across video frames and save track thumbnails.

    Frames are analyzed through a processor pool. Each result is paired with
    its frame and fed to a plate candidate tracker, whose new and closed
    tracks are printed and, when a thumbnail is available, saved as JPEG.

    Args:
        video_path: Video source passed to ``engine.openVideoSource``.
        output_dir: Directory for the thumbnails; created if missing.
        country: Name of the only country left enabled.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Setup engine
    setup_params = simplelpr.EngineSetupParms()
    setup_params.maxConcurrentImageProcessingOps = 4
    engine = simplelpr.SimpleLPR(setup_params)

    # Configure country
    for i in range(engine.numSupportedCountries):
        engine.set_countryWeight(i, 0.0)
    engine.set_countryWeight(country, 1.0)
    engine.realizeCountryWeights()

    # Create processor pool
    pool = engine.createProcessorPool(4)
    pool.plateRegionDetectionEnabled = True

    # Setup plate tracker
    tracker_params = simplelpr.PlateCandidateTrackerSetupParms(
        triggerWindowInSec=3.0,   # 3 second trigger window
        maxIdleTimeInSec=2.0,     # 2 second max idle time
        minTriggerFrameCount=3,   # Need 3 detections to trigger
        thumbnailWidth=256,       # Thumbnail dimensions
        thumbnailHeight=128
    )
    tracker = engine.createPlateCandidateTracker(tracker_params)

    # Open video
    video_source = engine.openVideoSource(
        video_path,
        simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
        -1, -1
    )

    print(f"Processing video: {video_path}")
    print(f"Output directory: {output_dir}")

    # Frames whose analysis was launched and whose result is still pending,
    # oldest first.
    frame_queue = deque()
    track_count = 0

    def process_tracks(tracks, track_type):
        """Print each track in *tracks* and save its thumbnail if present."""
        nonlocal track_count

        for track in tracks:
            # NOTE(review): this counter advances for NEW, CLOSED and FINAL
            # reports alike, so a track reported more than once is counted
            # more than once in "Total tracks" - confirm this is intended.
            track_count += 1

            candidate = track.representativeCandidate
            if not candidate.matches:
                continue
            match = candidate.matches[0]

            print(f"{track_type} track #{track_count}:")
            print(f" Text: {match.text}")
            print(f" Country: {match.country} ({match.countryISO})")
            print(f" Confidence: {match.confidence:.3f}")
            print(f" First detection: Frame {track.firstDetectionFrameId} "
                  f"at {track.firstDetectionTimestamp:.2f}s")
            print(f" Latest detection: Frame {track.newestDetectionFrameId} "
                  f"at {track.newestDetectionTimestamp:.2f}s")
            print(f" Representative: Frame {track.representativeFrameId} "
                  f"at {track.representativeTimestamp:.2f}s")

            # Save thumbnail if available
            if track.representativeThumbnail:
                filename = (f"track_{track_count:04d}_{track.representativeFrameId:06d}_"
                            f"{track.representativeTimestamp:.2f}s_{match.text}.jpg")
                filepath = os.path.join(output_dir, filename)
                track.representativeThumbnail.saveAsJPEG(filepath, 95)
                print(f" Saved thumbnail: {filename}")

            print()

    def handle_result(result):
        """Pair *result* with the oldest pending frame and run the tracker."""
        # Only a result that belongs to the oldest pending frame is consumed.
        if not frame_queue or result.requestId != frame_queue[0].sequenceNumber:
            return
        process_frame = frame_queue.popleft()

        tracker_result = tracker.processFrameCandidates(result, process_frame)
        if tracker_result.newTracks:
            process_tracks(tracker_result.newTracks, "NEW")
        if tracker_result.closedTracks:
            process_tracks(tracker_result.closedTracks, "CLOSED")

    # Process video
    frame_count = 0
    start_time = time.time()

    try:
        while True:
            frame = video_source.nextFrame()
            if frame is None:
                break

            frame_count += 1

            # Launch analysis
            success = pool.launchAnalyze(
                streamId=0,
                requestId=frame.sequenceNumber,
                timeoutInMs=simplelpr.TIMEOUT_INFINITE,
                frame=frame
            )

            if not success:
                print(f"Failed to launch analysis for frame {frame.sequenceNumber}")
                continue

            # Queue the frame only after a successful launch. A frame that
            # was never launched gets no result, and leaving it at the head
            # of the queue would block every later result from matching.
            frame_queue.append(frame)

            # Process completed results
            while True:
                result = pool.pollNextResult(0, simplelpr.TIMEOUT_IMMEDIATE)
                if result is None:
                    break
                handle_result(result)

            # Progress update
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                fps = frame_count / elapsed
                print(f"Processed {frame_count} frames ({fps:.1f} FPS), "
                      f"{track_count} tracks found")

    except KeyboardInterrupt:
        print("Processing interrupted by user")

    # Process remaining results
    print("Processing remaining frames...")
    while pool.ongoingRequestCount_get(0) > 0:
        result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
        if result:
            handle_result(result)

    # Flush tracker to get remaining tracks
    print("Flushing tracker...")
    final_result = tracker.flush()
    if final_result.closedTracks:
        process_tracks(final_result.closedTracks, "FINAL")

    elapsed = time.time() - start_time
    print(f"\nTracking complete:")
    print(f" Total frames: {frame_count}")
    print(f" Processing time: {elapsed:.1f} seconds")
    print(f" Average FPS: {frame_count/elapsed:.1f}")
    print(f" Total tracks: {track_count}")
    print(f" Thumbnails saved to: {output_dir}")


# Usage
track_plates_in_video("traffic_video.mp4", "./tracked_plates", "Austria")

Real-time Stream Tracking

import simplelpr
import time
from collections import defaultdict, deque


def track_realtime_stream(rtsp_url, country="Spain", duration_minutes=10):
    """Track plates in a live stream and gather running statistics.

    Args:
        rtsp_url: Stream URL passed to ``engine.openVideoSource``.
        country: Name of the only country left enabled.
        duration_minutes: How long to keep reading frames.

    Returns:
        Dict with keys ``frames_processed`` (frames read from the source),
        ``total_detections`` (candidates with at least one match in the
        results handed to the tracker), ``unique_plates`` (set of texts),
        ``plate_counts`` (text -> number of new tracks) and
        ``tracks_created``.
    """
    # Setup
    setup_params = simplelpr.EngineSetupParms()
    engine = simplelpr.SimpleLPR(setup_params)

    for i in range(engine.numSupportedCountries):
        engine.set_countryWeight(i, 0.0)
    engine.set_countryWeight(country, 1.0)
    engine.realizeCountryWeights()

    pool = engine.createProcessorPool(4)
    pool.plateRegionDetectionEnabled = True

    # Tracker setup
    tracker_params = simplelpr.PlateCandidateTrackerSetupParms(
        triggerWindowInSec=2.0,   # Faster for real-time
        maxIdleTimeInSec=3.0,
        minTriggerFrameCount=2,   # Lower threshold for real-time
        thumbnailWidth=128,       # Smaller for performance
        thumbnailHeight=64
    )
    tracker = engine.createPlateCandidateTracker(tracker_params)

    # Open stream
    video_source = engine.openVideoSource(
        rtsp_url,
        simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
        1280, 720  # Limit resolution for performance
    )

    print(f"Real-time tracking started: {rtsp_url}")
    print(f"Duration: {duration_minutes} minutes")

    # Statistics tracking
    stats = {
        'frames_processed': 0,
        'total_detections': 0,
        'unique_plates': set(),
        'plate_counts': defaultdict(int),
        'tracks_created': 0
    }

    start_time = time.time()
    end_time = start_time + (duration_minutes * 60)
    # Frames whose analysis was launched and whose result is still pending.
    frame_queue = deque()

    try:
        while time.time() < end_time:
            frame = video_source.nextFrame()

            if frame is None:
                if not video_source.isLiveSource:
                    break
                video_source.reconnect()
                time.sleep(0.1)
                continue

            stats['frames_processed'] += 1

            # Launch analysis
            success = pool.launchAnalyze(
                streamId=0,
                requestId=frame.sequenceNumber,
                timeoutInMs=simplelpr.TIMEOUT_IMMEDIATE,
                frame=frame
            )

            # The launch is non-blocking and can be refused. Only launched
            # frames are queued: a refused frame gets no result, so queuing
            # it would block every later result from matching and the queue
            # would grow without bound.
            if success:
                frame_queue.append(frame)

            # Process results
            while True:
                result = pool.pollNextResult(0, simplelpr.TIMEOUT_IMMEDIATE)
                if result is None:
                    break

                if not frame_queue or result.requestId != frame_queue[0].sequenceNumber:
                    continue
                process_frame = frame_queue.popleft()

                # A detection is a candidate that carries at least one match.
                stats['total_detections'] += sum(
                    1 for candidate in result.candidates or () if candidate.matches
                )

                # Track plates
                tracker_result = tracker.processFrameCandidates(result, process_frame)

                # Process new tracks
                for track in tracker_result.newTracks:
                    stats['tracks_created'] += 1

                    candidate = track.representativeCandidate
                    if not candidate.matches:
                        continue
                    match = candidate.matches[0]
                    plate_text = match.text

                    stats['unique_plates'].add(plate_text)
                    stats['plate_counts'][plate_text] += 1

                    print(f"[{time.strftime('%H:%M:%S')}] NEW TRACK #{stats['tracks_created']}: "
                          f"{plate_text} ({match.country}, conf: {match.confidence:.3f}) "
                          f"- Frame {track.representativeFrameId}")

            # Print stats every 30 seconds
            if stats['frames_processed'] % 300 == 0:  # Assuming ~10 FPS
                elapsed = time.time() - start_time
                fps = stats['frames_processed'] / elapsed

                print(f"\n--- Stats after {elapsed/60:.1f} minutes ---")
                print(f"Frames processed: {stats['frames_processed']} ({fps:.1f} FPS)")
                print(f"Tracks created: {stats['tracks_created']}")
                print(f"Unique plates: {len(stats['unique_plates'])}")
                print("Most frequent plates:")
                for plate, count in sorted(stats['plate_counts'].items(),
                                           key=lambda x: x[1], reverse=True)[:5]:
                    print(f" {plate}: {count} times")
                print()

    except KeyboardInterrupt:
        print("Real-time tracking stopped by user")

    # Final statistics
    elapsed = time.time() - start_time
    print(f"\n=== Final Statistics ===")
    print(f"Duration: {elapsed/60:.1f} minutes")
    print(f"Frames processed: {stats['frames_processed']}")
    print(f"Average FPS: {stats['frames_processed']/elapsed:.1f}")
    print(f"Total tracks: {stats['tracks_created']}")
    print(f"Unique plates detected: {len(stats['unique_plates'])}")

    return stats


# Usage
# stats = track_realtime_stream("rtsp://camera_ip:554/stream", "Germany", 5)

Advanced Configuration

Country-Specific Configuration

import simplelpr


def configure_engine_for_region(region="europe"):
    """Build an engine with only the countries of one region enabled.

    Args:
        region: Key of the region table below.

    Returns:
        The configured engine, or ``None`` when *region* is not in the table.
    """
    lpr = simplelpr.SimpleLPR(simplelpr.EngineSetupParms())

    # Define regional country groups (based on actually supported countries)
    countries_by_region = {
        "europe": ["Spain", "Germany", "France", "Italy", "Netherlands", "Belgium",
                   "Austria", "Switzerland", "Portugal", "Sweden", "Norway",
                   "Denmark", "Finland", "Poland", "Czech-Republic"],
        "uk": ["UK-GreatBritain"],
        "south_america": ["Argentina", "Brazil", "Chile", "Colombia", "Peru",
                          "Ecuador", "Uruguay", "Venezuela", "Bolivia"],
        "oceania": ["Australia"],
        "north_america": ["Canada"]  # Note: USA and Mexico not currently supported
    }

    # Every country starts disabled.
    for idx in range(lpr.numSupportedCountries):
        lpr.set_countryWeight(idx, 0.0)

    selected = countries_by_region.get(region)
    if selected is None:
        print(f"Unknown region: {region}")
        return None

    # A name the engine rejects is reported and skipped.
    for name in selected:
        try:
            lpr.set_countryWeight(name, 1.0)
            print(f"Enabled: {name}")
        except Exception:
            print(f"Country not found: {name}")

    lpr.realizeCountryWeights()
    return lpr


def advanced_processor_setup(engine, sensitivity="normal"):
    """Create a processor and apply one of three sensitivity presets.

    Args:
        engine: Engine used to create the processor.
        sensitivity: ``"high"``, ``"normal"`` or ``"low"``; any other value
            leaves the new processor unchanged.

    Returns:
        The new processor.
    """
    # preset -> (contrastSensitivityFactor or None to keep the default,
    #            plateRegionDetectionEnabled, cropToPlateRegionEnabled)
    presets = {
        # High sensitivity - detects more characters but may include noise and be slower
        "high": (0.95, True, True),
        # Balanced settings - recommended for most use cases; factor stays at default (~0.9)
        "normal": (None, True, False),
        # Low sensitivity - faster, fewer false positives, good for well-contrasted plates
        "low": (0.3, False, False),
    }

    proc = engine.createProcessor()

    if sensitivity in presets:
        factor, detect_region, crop_region = presets[sensitivity]
        if factor is not None:
            proc.contrastSensitivityFactor = factor
        proc.plateRegionDetectionEnabled = detect_region
        proc.cropToPlateRegionEnabled = crop_region

    return proc


# Usage examples
engine = configure_engine_for_region("europe")
if engine:
    processor = advanced_processor_setup(engine, "normal")  # Use normal instead of high

ROI (Region of Interest) Processing

import simplelpr


def process_with_roi(video_path, roi_configs, country="Spain"):
    """Analyze a video once per region of interest and group the detections.

    Each ROI gets its own stream id (its index in *roi_configs*). Launches
    and polls inside the frame loop are non-blocking, so results arrive
    asynchronously; every detection is therefore labelled with the request
    id and timestamp carried by its own result, not with the frame that is
    current when the result is picked up.

    Args:
        video_path: Video source passed to ``engine.openVideoSource``.
        roi_configs: List of dicts with keys ``left``, ``top``, ``width``,
            ``height`` and optionally ``name``.
        country: Name of the only country left enabled.

    Returns:
        Dict mapping each ROI index to a list of detection dicts with keys
        ``frame`` (request id of the analyzed frame), ``roi``, ``roi_name``,
        ``text``, ``confidence`` and ``timestamp``.
    """
    # Setup engine
    setup_params = simplelpr.EngineSetupParms()
    engine = simplelpr.SimpleLPR(setup_params)

    for i in range(engine.numSupportedCountries):
        engine.set_countryWeight(i, 0.0)
    engine.set_countryWeight(country, 1.0)
    engine.realizeCountryWeights()

    pool = engine.createProcessorPool(len(roi_configs))
    pool.plateRegionDetectionEnabled = True

    # Open video
    video_source = engine.openVideoSource(
        video_path,
        simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
        -1, -1
    )

    print(f"Processing with {len(roi_configs)} ROI regions:")
    for i, roi_config in enumerate(roi_configs):
        print(f" ROI {i+1}: {roi_config}")

    detections_by_roi = {i: [] for i in range(len(roi_configs))}
    frame_count = 0

    def make_roi(roi_config):
        """Build a ``simplelpr.Rectangle`` from one ROI config dict."""
        roi = simplelpr.Rectangle()
        roi.left = roi_config['left']
        roi.top = roi_config['top']
        roi.width = roi_config['width']
        roi.height = roi_config['height']
        return roi

    def record(roi_id, result):
        """Store and print the top match of each candidate in *result*."""
        for candidate in result.candidates or ():
            if not candidate.matches:
                continue
            match = candidate.matches[0]
            detection = {
                # Taken from the result so the detection is attributed to
                # the frame that was actually analyzed.
                'frame': result.requestId,
                'roi': roi_id,
                'roi_name': roi_configs[roi_id].get('name', f'ROI_{roi_id}'),
                'text': match.text,
                'confidence': match.confidence,
                'timestamp': result.timestamp
            }
            detections_by_roi[roi_id].append(detection)
            print(f"Frame {detection['frame']} - {detection['roi_name']}: "
                  f"{match.text} ({match.confidence:.3f})")

    while True:
        frame = video_source.nextFrame()
        if frame is None:
            break

        frame_count += 1

        # Launch one analysis per ROI, each on its own stream id. The launch
        # is non-blocking; a refused launch just skips that ROI this frame.
        for roi_id, roi_config in enumerate(roi_configs):
            pool.launchAnalyze(
                streamId=roi_id,
                requestId=frame.sequenceNumber,
                timeoutInMs=simplelpr.TIMEOUT_IMMEDIATE,
                frame=frame,
                roi=make_roi(roi_config)
            )

        # Collect whatever has finished so far.
        for roi_id in range(len(roi_configs)):
            while True:
                result = pool.pollNextResult(roi_id, simplelpr.TIMEOUT_IMMEDIATE)
                if result is None:
                    break
                record(roi_id, result)

    # Wait for analyses still in flight so the last frames are not lost.
    # NOTE(review): assumes ongoingRequestCount_get takes the stream id -
    # confirm against the SimpleLPR API reference.
    for roi_id in range(len(roi_configs)):
        while pool.ongoingRequestCount_get(roi_id) > 0:
            result = pool.pollNextResult(roi_id, simplelpr.TIMEOUT_INFINITE)
            if result is None:
                break
            record(roi_id, result)

    # Summary
    print(f"\nProcessing complete - {frame_count} frames:")
    for roi_id, detections in detections_by_roi.items():
        roi_name = roi_configs[roi_id].get('name', f'ROI_{roi_id}')
        print(f" {roi_name}: {len(detections)} detections")

    return detections_by_roi


# Usage
roi_configs = [
    {
        'name': 'Lane_1',
        'left': 100, 'top': 200, 'width': 400, 'height': 300
    },
    {
        'name': 'Lane_2',
        'left': 600, 'top': 200, 'width': 400, 'height': 300
    },
    {
        'name': 'Entrance',
        'left': 200, 'top': 500, 'width': 600, 'height': 200
    }
]

# detections = process_with_roi("traffic.mp4", roi_configs, "Germany")

Common Patterns and Best Practices

Resource Management Pattern

import simplelpr
from contextlib import contextmanager


@contextmanager
def simplelpr_engine(country="Spain", license_path=None):
    """Context manager that yields an engine restricted to *country*.

    Args:
        country: Name of the only country left enabled.
        license_path: If given, passed to ``engine.set_productKey`` before
            the country weights are configured.

    Yields:
        The configured engine. Nothing is released explicitly on exit.
    """
    setup_params = simplelpr.EngineSetupParms()
    engine = simplelpr.SimpleLPR(setup_params)

    if license_path:
        engine.set_productKey(license_path)

    # Configure country
    for i in range(engine.numSupportedCountries):
        engine.set_countryWeight(i, 0.0)
    engine.set_countryWeight(country, 1.0)
    engine.realizeCountryWeights()

    yield engine


@contextmanager
def simplelpr_processor(engine, sensitivity="normal"):
    """Context manager that yields a processor with a sensitivity preset.

    The presets use the same values as ``advanced_processor_setup`` in the
    "Advanced Configuration" section of this guide: a contrast sensitivity
    factor close to 1.0 means high sensitivity, a low factor suits
    well-contrasted plates, and "normal" keeps the default factor.

    Args:
        engine: Engine used to create the processor.
        sensitivity: ``"high"``, ``"normal"`` or ``"low"``; any other value
            leaves the new processor unchanged.

    Yields:
        The configured processor. Nothing is released explicitly on exit.
    """
    processor = engine.createProcessor()

    if sensitivity == "high":
        processor.contrastSensitivityFactor = 0.95
        processor.plateRegionDetectionEnabled = True
        processor.cropToPlateRegionEnabled = True
    elif sensitivity == "normal":
        # contrastSensitivityFactor is left at its default value.
        processor.plateRegionDetectionEnabled = True
        processor.cropToPlateRegionEnabled = False
    elif sensitivity == "low":
        processor.contrastSensitivityFactor = 0.3
        processor.plateRegionDetectionEnabled = False
        processor.cropToPlateRegionEnabled = False

    yield processor


# Usage with context managers
def analyze_with_context_manager(image_path, country="Spain"):
    """Analyze one image using the engine and processor context managers."""
    with simplelpr_engine(country) as engine:
        with simplelpr_processor(engine, "high") as processor:
            return processor.analyze(image_path)


# candidates = analyze_with_context_manager("test.jpg", "Germany")

Configuration Management

import simplelpr
import json
from pathlib import Path


class SimpleLPRConfig:
    """Nested-dict configuration for SimpleLPR with JSON load/save helpers.

    ``self.config`` has four sections: ``engine``, ``processing``,
    ``tracking`` and ``video``. Values from a JSON file are merged over the
    built-in defaults.
    """

    def __init__(self, config_file=None):
        self.config = self._load_default_config()
        if config_file:
            self.load_from_file(config_file)

    def _load_default_config(self):
        """Build and return a fresh copy of the default settings."""
        engine_defaults = {
            "cudaDeviceId": -1,
            "enableImageProcessingWithGPU": False,
            "enableClassificationWithGPU": False,
            "maxConcurrentImageProcessingOps": 0
        }
        processing_defaults = {
            "country": "Spain",
            # Note: Default value (~0.9) recommended
            "contrastSensitivityFactor": 0.9,
            "plateRegionDetectionEnabled": True,
            "cropToPlateRegionEnabled": False
        }
        tracking_defaults = {
            "triggerWindowInSec": 3.0,
            "maxIdleTimeInSec": 3.5,
            "minTriggerFrameCount": 3,
            "thumbnailWidth": 256,
            "thumbnailHeight": 128
        }
        video_defaults = {
            "frameFormat": "BGR24",
            "maxWidth": -1,
            "maxHeight": -1
        }
        return {
            "engine": engine_defaults,
            "processing": processing_defaults,
            "tracking": tracking_defaults,
            "video": video_defaults
        }

    def load_from_file(self, config_file):
        """Merge a JSON file into the current config.

        Any failure is reported with a printed warning and otherwise ignored,
        so the existing settings stay in effect.
        """
        try:
            with open(config_file, 'r') as handle:
                overrides = json.load(handle)
            # Merge with defaults
            self._deep_update(self.config, overrides)
        except Exception as e:
            print(f"Warning: Could not load config file {config_file}: {e}")

    def save_to_file(self, config_file):
        """Write the current config to ``config_file`` as indented JSON."""
        with open(config_file, 'w') as handle:
            json.dump(self.config, handle, indent=2)

    def _deep_update(self, base_dict, update_dict):
        """Recursively merge ``update_dict`` into ``base_dict`` in place.

        When both sides hold a dict under the same key the merge recurses;
        otherwise the value from ``update_dict`` replaces the old one.
        """
        for key, value in update_dict.items():
            existing = base_dict.get(key)
            if isinstance(existing, dict) and isinstance(value, dict):
                self._deep_update(existing, value)
                continue
            base_dict[key] = value

    def create_engine(self, license_path=None):
        """Create an engine from the ``engine`` section and select the country."""
        engine_config = self.config["engine"]

        # The config keys are named after the EngineSetupParms attributes
        setup_params = simplelpr.EngineSetupParms()
        for option in (
            "cudaDeviceId",
            "enableImageProcessingWithGPU",
            "enableClassificationWithGPU",
            "maxConcurrentImageProcessingOps",
        ):
            setattr(setup_params, option, engine_config[option])

        engine = simplelpr.SimpleLPR(setup_params)

        if license_path:
            engine.set_productKey(license_path)

        # Enable only the configured country
        country = self.config["processing"]["country"]
        for country_index in range(engine.numSupportedCountries):
            engine.set_countryWeight(country_index, 0.0)
        engine.set_countryWeight(country, 1.0)
        engine.realizeCountryWeights()

        return engine

    def configure_processor(self, processor):
        """Apply the ``processing`` section to ``processor`` and return it."""
        settings = self.config["processing"]

        # Only set contrast sensitivity if explicitly configured (not default)
        contrast = settings["contrastSensitivityFactor"]
        if contrast != 0.9:
            processor.contrastSensitivityFactor = contrast

        processor.plateRegionDetectionEnabled = settings["plateRegionDetectionEnabled"]
        processor.cropToPlateRegionEnabled = settings["cropToPlateRegionEnabled"]

        return processor

    def get_frame_format(self):
        """Return the FrameFormat enum value; anything but "GRAY8" means BGR24."""
        if self.config["video"]["frameFormat"] == "GRAY8":
            return simplelpr.FrameFormat.FRAME_FORMAT_GRAY8
        return simplelpr.FrameFormat.FRAME_FORMAT_BGR24

    def get_tracker_params(self):
        """Build PlateCandidateTrackerSetupParms from the ``tracking`` section."""
        tracking = self.config["tracking"]
        option_names = (
            "triggerWindowInSec",
            "maxIdleTimeInSec",
            "minTriggerFrameCount",
            "thumbnailWidth",
            "thumbnailHeight",
        )
        tracker_kwargs = {name: tracking[name] for name in option_names}
        return simplelpr.PlateCandidateTrackerSetupParms(**tracker_kwargs)


# Usage
def process_with_config(image_path, config_file="simplelpr_config.json"):
    """Process image using configuration file"""
    config = SimpleLPRConfig(config_file)

    processor = config.create_engine().createProcessor()
    config.configure_processor(processor)

    return processor.analyze(image_path)


# Example config file content:
example_config = {
    "processing": {
        "country": "Germany",
        "contrastSensitivityFactor": 0.5,
        "plateRegionDetectionEnabled": True,
        "cropToPlateRegionEnabled": True
    },
    "tracking": {
        "triggerWindowInSec": 2.0,
        "minTriggerFrameCount": 2
    }
}

# Save example config
# with open("example_config.json", "w") as f:
#     json.dump(example_config, f, indent=2)

Logging and Monitoring

import simplelpr
import logging
import time
from datetime import datetime

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('simplelpr.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('SimpleLPR')


class SimpleLPRMonitor:
    """Monitoring and logging wrapper for SimpleLPR.

    Collects per-call processing times, frame/detection counts and error
    counts in ``self.stats`` and logs progress through the module logger.
    ``initialize()`` must be called before any analysis method.
    """

    def __init__(self, country="Spain"):
        self.country = country
        self.stats = {
            'total_frames': 0,
            'total_detections': 0,
            'processing_times': [],
            'start_time': None,  # set by initialize(); None means "not started"
            'errors': 0
        }
        self.engine = None
        self.processor = None

    def initialize(self, license_path=None):
        """Create the engine and processor, logging every step.

        Args:
            license_path: Optional product key passed to ``set_productKey``.

        Raises:
            Exception: Whatever the SimpleLPR calls raise; it is logged and
                re-raised unchanged.
        """
        try:
            logger.info("Initializing SimpleLPR engine...")

            setup_params = simplelpr.EngineSetupParms()
            self.engine = simplelpr.SimpleLPR(setup_params)

            if license_path:
                self.engine.set_productKey(license_path)
                logger.info(f"License key loaded from: {license_path}")

            # Configure country
            for i in range(self.engine.numSupportedCountries):
                self.engine.set_countryWeight(i, 0.0)
            self.engine.set_countryWeight(self.country, 1.0)
            self.engine.realizeCountryWeights()

            logger.info(f"Engine configured for country: {self.country}")

            # Create processor
            self.processor = self.engine.createProcessor()
            self.processor.plateRegionDetectionEnabled = True

            version = self.engine.versionNumber
            logger.info(f"SimpleLPR initialized successfully - Version: "
                        f"{version.A}.{version.B}.{version.C}.{version.D}")

            self.stats['start_time'] = time.time()

        except Exception as e:
            logger.error(f"Failed to initialize SimpleLPR: {e}")
            raise

    def analyze_image(self, image_path):
        """Analyze one image, record timing/counters and return the candidates.

        Raises:
            RuntimeError: If ``initialize()`` has not created a processor.
            Exception: Analysis errors are counted, logged and re-raised.
        """
        if self.processor is None:
            raise RuntimeError("SimpleLPR not initialized")

        start_time = time.time()

        try:
            logger.debug(f"Analyzing image: {image_path}")

            candidates = self.processor.analyze(image_path)

            processing_time = time.time() - start_time
            self.stats['processing_times'].append(processing_time)
            self.stats['total_frames'] += 1
            self.stats['total_detections'] += len(candidates)

            logger.info(f"Image processed in {processing_time:.3f}s - "
                        f"Found {len(candidates)} candidates")

            # Log detailed results
            for i, candidate in enumerate(candidates):
                if candidate.matches:
                    match = candidate.matches[0]
                    logger.info(f"  Candidate {i+1}: '{match.text}' "
                                f"({match.country}, conf: {match.confidence:.3f})")

            return candidates

        except Exception as e:
            self.stats['errors'] += 1
            logger.error(f"Error analyzing {image_path}: {e}")
            raise

    def process_video_with_monitoring(self, video_path, max_frames=None):
        """Process a video frame by frame with detailed monitoring.

        Per-frame errors are counted and logged but do not stop processing;
        errors opening or reading the video are logged and re-raised.

        Args:
            video_path: Path/URL handed to ``engine.openVideoSource``.
            max_frames: Stop after this many frames when truthy.

        Raises:
            RuntimeError: If ``initialize()`` has not completed.
        """
        # The loop below needs the processor as well as the engine.
        if self.engine is None or self.processor is None:
            raise RuntimeError("SimpleLPR not initialized")

        logger.info(f"Starting video processing: {video_path}")

        try:
            video_source = self.engine.openVideoSource(
                video_path,
                simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
                -1, -1
            )

            logger.info(f"Video opened - Live source: {video_source.isLiveSource}")

            frame_count = 0
            total_detections = 0

            while True:
                frame = video_source.nextFrame()
                if frame is None:
                    break

                frame_count += 1
                start_time = time.time()

                try:
                    candidates = self.processor.analyze(frame)

                    processing_time = time.time() - start_time
                    self.stats['processing_times'].append(processing_time)
                    self.stats['total_frames'] += 1
                    self.stats['total_detections'] += len(candidates)
                    total_detections += len(candidates)

                    for candidate in candidates:
                        if candidate.matches:
                            match = candidate.matches[0]
                            logger.info(f"Frame {frame_count} ({frame.timestamp:.2f}s): "
                                        f"'{match.text}' ({match.confidence:.3f})")

                    # Progress logging: average over the last (up to) 100 samples
                    if frame_count % 100 == 0:
                        recent = self.stats['processing_times'][-100:]
                        avg_time = sum(recent) / len(recent)
                        logger.info(f"Processed {frame_count} frames - "
                                    f"Avg processing time: {avg_time:.3f}s")

                except Exception as e:
                    self.stats['errors'] += 1
                    logger.error(f"Error processing frame {frame_count}: {e}")

                if max_frames and frame_count >= max_frames:
                    break

            logger.info(f"Video processing complete - {frame_count} frames, "
                        f"{total_detections} total detections")

        except Exception as e:
            logger.error(f"Video processing failed: {e}")
            raise

    def get_statistics(self):
        """Return aggregate statistics, or ``{}`` if never initialized."""
        if self.stats['start_time'] is None:
            return {}

        elapsed = time.time() - self.stats['start_time']
        times = self.stats['processing_times']
        total_frames = self.stats['total_frames']
        total_detections = self.stats['total_detections']

        return {
            'elapsed_time': elapsed,
            'total_frames': total_frames,
            'total_detections': total_detections,
            'avg_processing_time': sum(times) / len(times) if times else 0,
            'frames_per_second': total_frames / elapsed if elapsed > 0 else 0,
            'detections_per_frame': (total_detections / total_frames) if total_frames > 0 else 0,
            'errors': self.stats['errors']
        }

    def log_final_statistics(self):
        """Log final processing statistics.

        Safe to call from a ``finally`` block: when the monitor was never
        initialized there are no statistics, so a warning is logged instead of
        raising ``KeyError`` (which would hide the original failure).
        """
        stats = self.get_statistics()
        if not stats:
            logger.warning("No statistics available - SimpleLPR was not initialized")
            return

        logger.info("=== Final Processing Statistics ===")
        logger.info(f"Total processing time: {stats['elapsed_time']:.1f} seconds")
        logger.info(f"Total frames processed: {stats['total_frames']}")
        logger.info(f"Total detections: {stats['total_detections']}")
        logger.info(f"Average processing time per frame: {stats['avg_processing_time']:.3f}s")
        logger.info(f"Average frames per second: {stats['frames_per_second']:.1f}")
        logger.info(f"Average detections per frame: {stats['detections_per_frame']:.2f}")
        logger.info(f"Errors encountered: {stats['errors']}")


# Usage
def monitored_processing_example():
    """Initialize a monitor and always log the final statistics."""
    monitor = SimpleLPRMonitor("Germany")

    try:
        monitor.initialize()

        # Process some images
        # candidates = monitor.analyze_image("test.jpg")

        # Process video
        # monitor.process_video_with_monitoring("traffic.mp4", max_frames=500)

    finally:
        monitor.log_final_statistics()

Error Handling

Comprehensive Error Handling Patterns

import simplelpr
import logging
import os
import time

logger = logging.getLogger(__name__)


class SimpleLPRError(Exception):
    """Custom exception for SimpleLPR operations"""
    pass


class SimpleLPRLicenseError(SimpleLPRError):
    """License-related errors"""
    pass


class SimpleLPRProcessingError(SimpleLPRError):
    """Processing-related errors"""
    pass


def safe_engine_creation(license_path=None, country="Spain"):
    """Safely create and configure a SimpleLPR engine.

    Args:
        license_path: Optional product key passed to ``set_productKey``.
        country: Country to enable. If the engine does not list it, the first
            available country is used instead and a warning is logged.

    Returns:
        Tuple ``(engine, country)`` where ``country`` is the one actually
        enabled.

    Raises:
        SimpleLPRLicenseError: If applying the license key fails.
        SimpleLPRError: If country configuration fails.
    """
    try:
        # Create engine
        setup_params = simplelpr.EngineSetupParms()
        engine = simplelpr.SimpleLPR(setup_params)

        # Set license if provided
        if license_path:
            try:
                engine.set_productKey(license_path)
                logger.info("License key applied successfully")
            except Exception as e:
                raise SimpleLPRLicenseError(f"Failed to set license key: {e}") from e

        # Configure countries
        try:
            available_countries = []
            for i in range(engine.numSupportedCountries):
                country_code = engine.get_countryCode(i)
                available_countries.append(country_code)
                engine.set_countryWeight(i, 0.0)

            # Check if requested country is available
            if country not in available_countries:
                logger.warning(f"Country '{country}' not found. Available: {available_countries}")
                # Use first available country as fallback
                country = available_countries[0] if available_countries else None

            if country:
                engine.set_countryWeight(country, 1.0)
                engine.realizeCountryWeights()
                logger.info(f"Engine configured for country: {country}")
            else:
                raise SimpleLPRError("No countries available")

        except Exception as e:
            raise SimpleLPRError(f"Failed to configure countries: {e}") from e

        return engine, country

    except Exception as e:
        logger.error(f"Engine creation failed: {e}")
        raise


def safe_image_analysis(processor, image_input, input_type="file"):
    """Safely analyze an image with input validation.

    Args:
        processor: Object exposing ``analyze(image_input)``.
        image_input: The image, interpreted according to ``input_type``.
        input_type: One of ``"file"`` (path that must exist), ``"bytes"``
            (non-empty encoded image data), ``"buffer"`` (non-empty numpy
            array) or ``"frame"`` (video frame object, must not be None).

    Returns:
        Whatever ``processor.analyze`` returns.

    Raises:
        SimpleLPRProcessingError: On invalid input, unknown ``input_type`` or
            any failure inside ``processor.analyze``.
    """
    try:
        if input_type == "file":
            # Check if file exists
            if not os.path.exists(image_input):
                raise SimpleLPRProcessingError(f"Image file not found: {image_input}")

        elif input_type == "bytes":
            if not image_input:
                raise SimpleLPRProcessingError("Empty image data provided")

        elif input_type == "buffer":
            # numpy is only needed for this input type, so import it lazily
            import numpy as np
            if not isinstance(image_input, np.ndarray):
                raise SimpleLPRProcessingError("Invalid buffer type - expected numpy array")
            if image_input.size == 0:
                raise SimpleLPRProcessingError("Empty image buffer")

        elif input_type == "frame":
            # Used by robust_video_processing for frames read from a video source
            if image_input is None:
                raise SimpleLPRProcessingError("No video frame provided")

        else:
            raise SimpleLPRProcessingError(f"Unknown input type: {input_type}")

        return processor.analyze(image_input)

    except SimpleLPRProcessingError:
        raise  # Re-raise our custom errors
    except Exception as e:
        raise SimpleLPRProcessingError(f"Analysis failed: {e}") from e


def robust_video_processing(video_path, country="Spain", license_path=None):
    """Process a video with retry on open and per-frame error recovery.

    Opening the video is attempted up to three times, one second apart.
    Frame errors are logged and skipped; ten consecutive failures abort.

    Returns:
        List of dicts with ``frame``, ``timestamp``, ``text``, ``country`` and
        ``confidence`` for the best match of every candidate.

    Raises:
        SimpleLPRLicenseError: Propagated from engine creation.
        SimpleLPRProcessingError: If the video cannot be opened or too many
            consecutive frames fail.
        SimpleLPRError: For any other failure.
    """
    video_source = None

    try:
        # Initialize engine
        engine, _ = safe_engine_creation(license_path, country)
        processor = engine.createProcessor()
        processor.plateRegionDetectionEnabled = True

        # Open video with retry logic
        max_retries = 3
        for attempt in range(max_retries):
            try:
                video_source = engine.openVideoSource(
                    video_path,
                    simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
                    -1, -1
                )
                logger.info(f"Video opened successfully on attempt {attempt + 1}")
                break
            except Exception as e:
                logger.warning(f"Video open attempt {attempt + 1} failed: {e}")
                if attempt == max_retries - 1:
                    raise SimpleLPRProcessingError(
                        f"Failed to open video after {max_retries} attempts"
                    ) from e
                time.sleep(1)

        # Process video with error recovery
        frame_count = 0
        consecutive_errors = 0
        max_consecutive_errors = 10
        results = []

        while consecutive_errors < max_consecutive_errors:
            try:
                frame = video_source.nextFrame()
                if frame is None:
                    logger.info("End of video reached")
                    break

                frame_count += 1

                # Analyze frame
                candidates = safe_image_analysis(processor, frame, "frame")
                consecutive_errors = 0  # Reset error counter on success

                # Process results
                for candidate in candidates:
                    if candidate.matches:
                        match = candidate.matches[0]
                        result = {
                            'frame': frame_count,
                            'timestamp': frame.timestamp,
                            'text': match.text,
                            'country': match.country,
                            'confidence': match.confidence
                        }
                        results.append(result)
                        logger.info(f"Frame {frame_count}: {match.text} ({match.confidence:.3f})")

            except Exception as e:
                consecutive_errors += 1
                logger.error(f"Frame {frame_count} processing error ({consecutive_errors}/{max_consecutive_errors}): {e}")

                if consecutive_errors >= max_consecutive_errors:
                    raise SimpleLPRProcessingError(
                        f"Too many consecutive errors ({max_consecutive_errors})"
                    ) from e

        logger.info(f"Video processing completed - {frame_count} frames, {len(results)} detections")
        return results

    except SimpleLPRLicenseError as e:
        logger.error(f"License error: {e}")
        raise
    except SimpleLPRProcessingError as e:
        logger.error(f"Processing error: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise SimpleLPRError(f"Video processing failed: {e}") from e


def safe_processor_pool_operation(engine, tasks, max_workers=4):
    """Run a list of analysis tasks on a processor pool with error handling.

    Args:
        engine: Engine used to create the pool.
        tasks: List of dicts. ``{'type': 'image_file', 'path': ...}`` or
            ``{'type': 'video_frame', 'frame': ...}``; other types are logged
            and skipped.
        max_workers: Number of processors in the pool.

    Returns:
        Dict mapping task index to ``{'candidates': ...}`` on success or
        ``{'error': ...}`` on failure. Collection stops after five consecutive
        5-second poll timeouts.

    Raises:
        SimpleLPRProcessingError: If the pool operation as a whole fails.
    """
    try:
        pool = engine.createProcessorPool(max_workers)
        pool.plateRegionDetectionEnabled = True

        # Launch all tasks
        launched_tasks = []
        failed_launches = []

        for i, task in enumerate(tasks):
            try:
                if task['type'] == 'image_file':
                    success = pool.launchAnalyze(
                        streamId=0,
                        requestId=i,
                        timestampInSec=float(i),
                        timeoutInMs=simplelpr.TIMEOUT_INFINITE,
                        imgPath=task['path']
                    )
                elif task['type'] == 'video_frame':
                    success = pool.launchAnalyze(
                        streamId=0,
                        requestId=i,
                        timeoutInMs=simplelpr.TIMEOUT_INFINITE,
                        frame=task['frame']
                    )
                else:
                    logger.error(f"Unknown task type: {task['type']}")
                    continue

                if success:
                    launched_tasks.append((i, task))
                else:
                    failed_launches.append((i, task))
                    logger.warning(f"Failed to launch task {i}: {task}")

            except Exception as e:
                failed_launches.append((i, task))
                logger.error(f"Error launching task {i}: {e}")

        logger.info(f"Launched {len(launched_tasks)} tasks, {len(failed_launches)} failed")

        # Collect results
        results = {}
        completed = 0
        timeout_count = 0
        max_timeouts = 5

        while completed < len(launched_tasks) and timeout_count < max_timeouts:
            try:
                result = pool.pollNextResult(0, 5000)  # 5 second timeout

                if result is None:
                    timeout_count += 1
                    logger.warning(f"Timeout waiting for results ({timeout_count}/{max_timeouts})")
                    continue

                timeout_count = 0  # Reset timeout counter
                completed += 1

                task_id = result.requestId
                if result.errorInfo:
                    logger.error(f"Task {task_id} failed: {result.errorInfo.description}")
                    results[task_id] = {'error': result.errorInfo.description}
                else:
                    results[task_id] = {'candidates': result.candidates}
                    if result.candidates:
                        logger.info(f"Task {task_id} completed - {len(result.candidates)} candidates")

            except Exception as e:
                logger.error(f"Error collecting results: {e}")
                break

        # Handle failed launches
        for task_id, _task in failed_launches:
            results[task_id] = {'error': 'Failed to launch'}

        return results

    except Exception as e:
        logger.error(f"Processor pool operation failed: {e}")
        raise SimpleLPRProcessingError(f"Pool operation error: {e}") from e


# Usage examples with error handling
def example_safe_processing():
    """Example of safe SimpleLPR processing"""
    try:
        # Initialize with error handling
        engine, country = safe_engine_creation("license.xml", "Germany")
        processor = engine.createProcessor()
        processor.plateRegionDetectionEnabled = True

        # Safe image analysis
        try:
            candidates = safe_image_analysis(processor, "test_image.jpg", "file")
            print(f"Found {len(candidates)} candidates")
        except SimpleLPRProcessingError as e:
            print(f"Image analysis failed: {e}")

        # Safe video processing
        try:
            results = robust_video_processing("test_video.mp4", country)
            print(f"Video processing completed with {len(results)} detections")
        except SimpleLPRError as e:
            print(f"Video processing failed: {e}")

    except SimpleLPRLicenseError as e:
        print(f"License issue: {e}")
        print("Consider running in evaluation mode or checking license file")
    except SimpleLPRError as e:
        print(f"SimpleLPR error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

Performance Optimization

Optimization Strategies

import simplelpr
import gc
import os
import time
import psutil
import threading
from queue import Queue

# NOTE: safe_engine_creation() used below is the helper defined in the
# "Error Handling" section of this guide.


def optimize_engine_setup():
    """Build EngineSetupParms sized to the machine's CPU count and RAM.

    Returns:
        ``simplelpr.EngineSetupParms`` whose
        ``maxConcurrentImageProcessingOps`` is always at least 1.
    """
    # Get system info. psutil.cpu_count() may return None when undetermined.
    cpu_count = psutil.cpu_count() or 1
    memory_gb = psutil.virtual_memory().total / (1024**3)

    print(f"System: {cpu_count} CPUs, {memory_gb:.1f}GB RAM")

    setup_params = simplelpr.EngineSetupParms()

    # Optimize concurrent operations based on system
    if memory_gb >= 16:
        # High memory system
        concurrent_ops = min(cpu_count, 8)
    elif memory_gb >= 8:
        # Medium memory system
        concurrent_ops = min(cpu_count // 2, 4)
    else:
        # Low memory system
        concurrent_ops = 2

    # cpu_count // 2 is 0 on a single-core machine; never configure 0 workers
    # because callers size their processor pool from this value.
    setup_params.maxConcurrentImageProcessingOps = max(1, concurrent_ops)

    print(f"Configured for {setup_params.maxConcurrentImageProcessingOps} concurrent operations")

    return setup_params


def benchmark_processing_methods(image_path, country="Spain", iterations=10):
    """Benchmark a single processor against a 4-processor pool.

    Args:
        image_path: Image analyzed ``iterations`` times by each method.
        country: Country to enable on the engine.
        iterations: Number of analyses per method; must be at least 1.

    Returns:
        Dict with average seconds per image under the keys
        ``single_processor`` and ``processor_pool``.

    Raises:
        ValueError: If ``iterations`` is smaller than 1.
    """
    if iterations < 1:
        raise ValueError("iterations must be at least 1")

    # Setup
    engine, _ = safe_engine_creation(country=country)

    results = {}

    # Method 1: Single processor
    print("Benchmarking single processor...")
    processor = engine.createProcessor()
    processor.plateRegionDetectionEnabled = True

    start_time = time.time()
    for _ in range(iterations):
        processor.analyze(image_path)
    results['single_processor'] = (time.time() - start_time) / iterations

    # Method 2: Processor pool
    print("Benchmarking processor pool...")
    pool = engine.createProcessorPool(4)
    pool.plateRegionDetectionEnabled = True

    start_time = time.time()

    # Launch all analyses
    for i in range(iterations):
        pool.launchAnalyze(0, i, float(i), simplelpr.TIMEOUT_INFINITE, image_path)

    # Collect results
    for _ in range(iterations):
        pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)

    results['processor_pool'] = (time.time() - start_time) / iterations

    # Results
    print(f"\nBenchmark Results ({iterations} iterations):")
    print(f"Single processor: {results['single_processor']:.3f}s per image")
    print(f"Processor pool: {results['processor_pool']:.3f}s per image")
    if results['processor_pool'] > 0:
        print(f"Speedup: {results['single_processor']/results['processor_pool']:.1f}x")

    return results


def _append_detections(result, detections):
    """Append the best match of every candidate in ``result`` to ``detections``."""
    if not result.candidates:
        return
    for candidate in result.candidates:
        if candidate.matches:
            match = candidate.matches[0]
            detections.append({
                'frame': result.requestId,
                'text': match.text,
                'confidence': match.confidence
            })


def optimize_video_processing(video_path, country="Spain"):
    """Optimized video processing pipeline.

    A background thread decodes frames into a bounded queue while the calling
    thread feeds them to a processor pool and collects finished results
    between launches. Every launched request is collected before returning.

    Args:
        video_path: Path/URL handed to ``engine.openVideoSource``.
        country: Country to enable on the engine.

    Returns:
        List of dicts with ``frame`` (request ID, i.e. zero-based frame
        index), ``text`` and ``confidence``.
    """
    # Setup optimized engine
    setup_params = optimize_engine_setup()
    engine = simplelpr.SimpleLPR(setup_params)

    for i in range(engine.numSupportedCountries):
        engine.set_countryWeight(i, 0.0)
    engine.set_countryWeight(country, 1.0)
    engine.realizeCountryWeights()

    # Create optimized processor pool
    num_processors = max(1, setup_params.maxConcurrentImageProcessingOps)
    pool = engine.createProcessorPool(num_processors)

    # Optimize processor settings for speed
    pool.plateRegionDetectionEnabled = True
    pool.cropToPlateRegionEnabled = False  # Disable for speed
    # Note: contrastSensitivityFactor left at default - only adjust if needed
    # for specific image conditions

    # Open video with resolution limits for performance
    video_source = engine.openVideoSource(
        video_path,
        simplelpr.FrameFormat.FRAME_FORMAT_BGR24,
        1920, 1080  # Limit resolution
    )

    print(f"Processing with {num_processors} processors")

    # Bounded so the reader cannot run arbitrarily far ahead of the analysis
    frame_queue = Queue(maxsize=num_processors * 2)

    def frame_reader():
        """Read frames in a separate thread; always post the end marker."""
        frame_index = 0
        try:
            while True:
                frame = video_source.nextFrame()
                if frame is None:
                    break
                frame_queue.put((frame_index, frame))
                frame_index += 1
        finally:
            # Posted even if nextFrame() raises, so the consumer never blocks
            # forever waiting for frames that will not come.
            frame_queue.put(None)

    # Daemon thread: if the main loop raises while the reader is blocked on a
    # full queue, the interpreter can still exit.
    reader_thread = threading.Thread(target=frame_reader, daemon=True)

    pending = 0  # launched but not yet collected
    processed_frames = 0
    detections = []

    start_time = time.time()
    reader_thread.start()

    while True:
        frame_data = frame_queue.get()
        if frame_data is None:
            break  # End of frames

        frame_index, frame = frame_data
        # Wait for a free processor rather than dropping the frame.
        if pool.launchAnalyze(0, frame_index, simplelpr.TIMEOUT_INFINITE, frame):
            pending += 1

        # Collect whatever has finished so far without blocking
        while pending > 0:
            result = pool.pollNextResult(0, simplelpr.TIMEOUT_IMMEDIATE)
            if result is None:
                break
            pending -= 1
            processed_frames += 1
            _append_detections(result, detections)

    reader_thread.join()

    # Process remaining results
    while pending > 0:
        result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
        if result is None:
            break
        pending -= 1
        processed_frames += 1
        _append_detections(result, detections)

    elapsed = time.time() - start_time
    fps = processed_frames / elapsed if elapsed > 0 else 0.0

    print("Optimized processing complete:")
    print(f"  Frames: {processed_frames}")
    print(f"  Time: {elapsed:.1f}s")
    print(f"  FPS: {fps:.1f}")
    print(f"  Detections: {len(detections)}")

    return detections


def memory_efficient_batch_processing(image_paths, country="Spain", batch_size=10):
    """Memory-efficient batch processing for large image sets.

    Images are launched and collected one batch at a time so that at most
    ``batch_size`` results are outstanding.

    Args:
        image_paths: Sequence of image paths (anything ``str()`` accepts).
        country: Country to enable on the engine.
        batch_size: Images per batch; must be at least 1.

    Returns:
        Dict mapping ``str(image_path)`` to the candidates found in it.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    engine, _ = safe_engine_creation(country=country)
    pool = engine.createProcessorPool(4)
    pool.plateRegionDetectionEnabled = True

    total_images = len(image_paths)
    all_results = {}

    print(f"Processing {total_images} images in batches of {batch_size}")

    for batch_start in range(0, total_images, batch_size):
        batch_end = min(batch_start + batch_size, total_images)
        batch_paths = image_paths[batch_start:batch_end]

        print(f"Processing batch {batch_start//batch_size + 1}: "
              f"images {batch_start+1}-{batch_end}")

        # Process batch; the request ID is the index into image_paths
        launched = 0
        for global_id, image_path in enumerate(batch_paths, start=batch_start):
            success = pool.launchAnalyze(
                0, global_id, float(global_id),
                simplelpr.TIMEOUT_INFINITE,
                str(image_path)
            )
            if success:
                launched += 1

        # Collect batch results
        for _ in range(launched):
            result = pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)
            if result is not None:
                image_path = image_paths[result.requestId]
                all_results[str(image_path)] = result.candidates

        # Memory cleanup hint
        gc.collect()

    return all_results


# Performance testing utilities
def performance_test_suite():
    """Run comprehensive performance tests"""
    print("SimpleLPR Performance Test Suite")
    print("=" * 40)

    # System info
    print(f"CPU cores: {psutil.cpu_count()}")
    print(f"Memory: {psutil.virtual_memory().total / (1024**3):.1f}GB")
    print(f"Available memory: {psutil.virtual_memory().available / (1024**3):.1f}GB")

    # Test different configurations
    test_image = "test_image.jpg"  # Replace with actual test image

    if not os.path.exists(test_image):
        print(f"Test image {test_image} not found - skipping benchmarks")
        return

    # Benchmark processing methods
    benchmark_processing_methods(test_image, "Spain", 5)

    # Test different processor counts
    engine, _ = safe_engine_creation(country="Spain")

    print("\nProcessor count optimization:")
    for proc_count in [1, 2, 4, 8]:
        try:
            pool = engine.createProcessorPool(proc_count)

            start_time = time.time()
            for i in range(10):
                pool.launchAnalyze(0, i, float(i), simplelpr.TIMEOUT_INFINITE, test_image)

            for _ in range(10):
                pool.pollNextResult(0, simplelpr.TIMEOUT_INFINITE)

            elapsed = time.time() - start_time
            print(f"  {proc_count} processors: {elapsed:.2f}s ({10/elapsed:.1f} images/sec)")

        except Exception as e:
            print(f"  {proc_count} processors: Failed - {e}")