Python Automatisering: Van Simpele Scripts tot Complexe Workflows

In de moderne digitale wereld verspillen we dagelijks kostbare uren aan repetitieve taken. Email sorteren, bestanden hernoemen, data tussen systemen verplaatsen, rapporten genereren - allemaal taken die perfect geschikt zijn voor automatisering. Python biedt de perfecte toolset om deze tijdrovende processen te automatiseren en je productiviteit drastisch te verhogen.

Als oprichter van ImmenArchl en voormalig senior developer heb ik de afgelopen 15 jaar talloze automatiseringsprojecten gerealiseerd, van eenvoudige file management scripts tot complexe enterprise workflows. In dit artikel deel ik die ervaring en laat ik zien hoe je Python kunt gebruiken om je dagelijkse werk te transformeren.

Waarom Python voor Automatisering?

1. Simpliciteit en Leesbaarheid

Python's syntax is zo dicht bij natuurlijke taal dat zelfs niet-programmeurs vaak kunnen begrijpen wat een script doet. Dit maakt het ideaal voor automatisering in bedrijfsomgevingen waar scripts door verschillende mensen onderhouden worden.

2. Uitgebreide Standaard Library

Python komt out-of-the-box met libraries voor vrijwel alles:

3. Cross-platform Compatibiliteit

Python scripts draaien op Windows, macOS en Linux zonder aanpassingen, perfect voor heterogene bedrijfsomgevingen.

Level 1: Basis File Automatisering

File Organisatie Script

Een van de meest voorkomende automatiseringstaken: het organiseren van bestanden. Hier is een script dat downloads automatisch sorteert:


import os
import shutil
from pathlib import Path
from datetime import datetime

def organiseer_downloads():
    """
    Organiseert bestanden in de Downloads map op basis van extensie
    """
    downloads_path = Path.home() / "Downloads"
    
    # Definieer categorieën en bijbehorende extensies
    file_categories = {
        'Documenten': ['.pdf', '.docx', '.doc', '.txt', '.rtf', '.odt'],
        'Afbeeldingen': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp'],
        'Video': ['.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm'],
        'Audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'],
        'Archief': ['.zip', '.rar', '.7z', '.tar', '.gz'],
        'Programmas': ['.exe', '.msi', '.dmg', '.pkg', '.deb', '.rpm'],
        'Spreadsheets': ['.xlsx', '.xls', '.csv', '.ods']
    }
    
    # Maak directories aan indien nodig
    for category in file_categories.keys():
        category_path = downloads_path / category
        category_path.mkdir(exist_ok=True)
    
    # Itereer door alle bestanden in Downloads
    for file_path in downloads_path.iterdir():
        if file_path.is_file():
            file_extension = file_path.suffix.lower()
            
            # Zoek de juiste categorie
            for category, extensions in file_categories.items():
                if file_extension in extensions:
                    destination = downloads_path / category / file_path.name
                    
                    # Voorkom overschrijven
                    if destination.exists():
                        timestamp = datetime.now().strftime("_%Y%m%d_%H%M%S")
                        name_without_ext = file_path.stem
                        new_name = f"{name_without_ext}{timestamp}{file_extension}"
                        destination = downloads_path / category / new_name
                    
                    # Verplaats bestand
                    shutil.move(str(file_path), str(destination))
                    print(f"Verplaatst: {file_path.name} → {category}")
                    break
            else:
                # Bestand past in geen categorie
                other_path = downloads_path / "Overig"
                other_path.mkdir(exist_ok=True)
                
                destination = other_path / file_path.name
                if destination.exists():
                    timestamp = datetime.now().strftime("_%Y%m%d_%H%M%S")
                    name_without_ext = file_path.stem
                    new_name = f"{name_without_ext}{timestamp}{file_extension}"
                    destination = other_path / new_name
                
                shutil.move(str(file_path), str(destination))
                print(f"Verplaatst: {file_path.name} → Overig")

if __name__ == "__main__":
    organiseer_downloads()
    print("Downloads georganiseerd!")
            

Batch File Hernoemen


import os
import re
from pathlib import Path

def hernoem_fotos_batch(directory, prefix="vakantie"):
    """
    Hernoemt alle foto's in een directory met een consistente naamgeving
    """
    photo_extensions = ['.jpg', '.jpeg', '.png', '.raw', '.tiff']
    directory_path = Path(directory)
    
    photo_files = []
    for ext in photo_extensions:
        photo_files.extend(directory_path.glob(f"*{ext}"))
        photo_files.extend(directory_path.glob(f"*{ext.upper()}"))
    
    # Sorteer op creation time
    photo_files.sort(key=lambda p: p.stat().st_ctime)
    
    for index, photo_path in enumerate(photo_files, 1):
        # Haal EXIF data op voor datum (optioneel)
        try:
            from PIL import Image
            from PIL.ExifTags import TAGS
            
            with Image.open(photo_path) as img:
                exif_data = img._getexif()
                if exif_data:
                    for tag_id, value in exif_data.items():
                        tag = TAGS.get(tag_id, tag_id)
                        if tag == "DateTime":
                            # Parse datum: 2024:05:15 14:30:25
                            date_taken = value.replace(":", "").replace(" ", "_")
                            new_name = f"{prefix}_{date_taken}_{index:03d}{photo_path.suffix}"
                            break
                    else:
                        new_name = f"{prefix}_{index:03d}{photo_path.suffix}"
                else:
                    new_name = f"{prefix}_{index:03d}{photo_path.suffix}"
        except ImportError:
            # PIL niet beschikbaar, gebruik gewone nummering
            new_name = f"{prefix}_{index:03d}{photo_path.suffix}"
        except Exception:
            # Fallback voor EXIF problemen
            new_name = f"{prefix}_{index:03d}{photo_path.suffix}"
        
        new_path = directory_path / new_name
        
        # Voorkom overschrijven
        counter = 1
        while new_path.exists():
            name_without_ext = new_name.rsplit('.', 1)[0]
            extension = new_name.rsplit('.', 1)[1]
            new_name = f"{name_without_ext}_({counter}).{extension}"
            new_path = directory_path / new_name
            counter += 1
        
        photo_path.rename(new_path)
        print(f"Hernoemd: {photo_path.name} → {new_name}")

# Gebruik
hernoem_fotos_batch("/path/to/photos", "zomer2024")
            

Level 2: Email Automatisering

Automatische Email Rapporten


import smtplib
import ssl
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from datetime import datetime, timedelta
import pandas as pd
import matplotlib.pyplot as plt
import os

class EmailAutomation:
    def __init__(self, smtp_server, port, username, password):
        self.smtp_server = smtp_server
        self.port = port
        self.username = username
        self.password = password
    
    def stuur_rapport_email(self, ontvangers, onderwerp, body, bijlagen=None):
        """
        Stuurt een email met optionele bijlagen
        """
        msg = MIMEMultipart()
        msg['From'] = self.username
        msg['To'] = ', '.join(ontvangers)
        msg['Subject'] = onderwerp
        
        # HTML body
        html_body = f"""
        
        
        
            

Automatisch Gegenereerd Rapport

Gegenereerd op: {datetime.now().strftime('%d-%m-%Y %H:%M')}


{body}

Dit is een automatisch gegenereerd bericht van het ImmenArchl systeem.

""" msg.attach(MIMEText(html_body, 'html')) # Voeg bijlagen toe if bijlagen: for bijlage_path in bijlagen: if os.path.exists(bijlage_path): with open(bijlage_path, 'rb') as bijlage: deel = MIMEApplication(bijlage.read(), Name=os.path.basename(bijlage_path)) deel['Content-Disposition'] = f'attachment; filename="{os.path.basename(bijlage_path)}"' msg.attach(deel) # Verstuur email context = ssl.create_default_context() with smtplib.SMTP(self.smtp_server, self.port) as server: server.starttls(context=context) server.login(self.username, self.password) server.send_message(msg) print(f"Email verzonden naar {', '.join(ontvangers)}") def genereer_verkoop_rapport(self, data_file): """ Genereert een verkoop rapport en stuurt het per email """ # Lees verkoop data df = pd.read_csv(data_file) # Basis statistieken totale_verkoop = df['bedrag'].sum() gemiddelde_verkoop = df['bedrag'].mean() aantal_transacties = len(df) # Verkoop per dag (laatste 7 dagen) df['datum'] = pd.to_datetime(df['datum']) laatste_week = df[df['datum'] >= (datetime.now() - timedelta(days=7))] dagelijkse_verkoop = laatste_week.groupby(laatste_week['datum'].dt.date)['bedrag'].sum() # Maak grafiek plt.figure(figsize=(10, 6)) dagelijkse_verkoop.plot(kind='bar', color='#3776ab') plt.title('Verkoop Laatste 7 Dagen') plt.xlabel('Datum') plt.ylabel('Bedrag (€)') plt.xticks(rotation=45) plt.tight_layout() grafiek_bestand = 'verkoop_grafiek.png' plt.savefig(grafiek_bestand) plt.close() # HTML rapport body rapport_body = f"""

Verkoop Overzicht

Metric Waarde
Totale Verkoop €{totale_verkoop:,.2f}
Gemiddelde Verkoop €{gemiddelde_verkoop:,.2f}
Aantal Transacties {aantal_transacties}

Top 5 Producten

    """ top_producten = df.groupby('product')['bedrag'].sum().sort_values(ascending=False).head(5) for product, bedrag in top_producten.items(): rapport_body += f"
  • {product}: €{bedrag:,.2f}
  • " rapport_body += "
" # Verstuur email ontvangers = ['[email protected]', '[email protected]'] onderwerp = f"Verkoop Rapport - {datetime.now().strftime('%d/%m/%Y')}" self.stuur_rapport_email( ontvangers=ontvangers, onderwerp=onderwerp, body=rapport_body, bijlagen=[grafiek_bestand] ) # Clean up os.remove(grafiek_bestand) # Gebruik email_automation = EmailAutomation( smtp_server="smtp.gmail.com", port=587, username="[email protected]", password="jouw-app-password" # Gebruik app-specific password voor Gmail ) # Genereer en verstuur rapport email_automation.genereer_verkoop_rapport('verkoop_data.csv')

Level 3: Web Scraping en API Automatisering

Automatische Marktprijzen Monitoring


import requests
from bs4 import BeautifulSoup
import json
import time
import logging
from datetime import datetime
import sqlite3

class PrijsMonitor:
    def __init__(self, database_path="prijzen.db"):
        self.database_path = database_path
        self.setup_database()
        self.setup_logging()
    
    def setup_database(self):
        """Maak database tabellen aan"""
        conn = sqlite3.connect(self.database_path)
        cursor = conn.cursor()
        
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS prijzen (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_naam TEXT NOT NULL,
            website TEXT NOT NULL,
            prijs REAL NOT NULL,
            datum TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            url TEXT
        )
        """)
        
        conn.commit()
        conn.close()
    
    def setup_logging(self):
        """Setup logging configuratie"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('prijs_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def scrape_bol_prijs(self, product_url):
        """
        Scrape prijs van Bol.com (voorbeeld - respecteer robots.txt!)
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        
        try:
            response = requests.get(product_url, headers=headers)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.content, 'html.parser')
            
            # Dit is een voorbeeld - werkelijke selectors kunnen verschillen
            prijs_element = soup.find('span', class_='promo-price')
            if not prijs_element:
                prijs_element = soup.find('span', class_='price-now')
            
            if prijs_element:
                prijs_text = prijs_element.get_text().strip()
                # Extract numerieke waarde: "€ 29,99" -> 29.99
                import re
                prijs_match = re.search(r'(\d+,?\d*)', prijs_text.replace(',', '.'))
                if prijs_match:
                    return float(prijs_match.group(1))
            
            return None
            
        except Exception as e:
            self.logger.error(f"Fout bij scrapen van {product_url}: {e}")
            return None
    
    def check_crypto_prijs(self, crypto_symbol='BTC'):
        """
        Haal cryptocurrency prijs op via CoinGecko API
        """
        try:
            url = f"https://api.coingecko.com/api/v3/simple/price?ids={crypto_symbol.lower()}&vs_currencies=eur"
            response = requests.get(url)
            response.raise_for_status()
            
            data = response.json()
            crypto_id = crypto_symbol.lower()
            if crypto_id == 'btc':
                crypto_id = 'bitcoin'
            elif crypto_id == 'eth':
                crypto_id = 'ethereum'
            
            return data.get(crypto_id, {}).get('eur')
            
        except Exception as e:
            self.logger.error(f"Fout bij ophalen crypto prijs: {e}")
            return None
    
    def check_stock_prijs(self, symbol='ASML.AS'):
        """
        Haal aandelenprijs op via Alpha Vantage API
        Let op: vereist API key!
        """
        api_key = os.getenv('ALPHA_VANTAGE_API_KEY', 'demo')
        
        try:
            url = f"https://www.alphavantage.co/query"
            params = {
                'function': 'GLOBAL_QUOTE',
                'symbol': symbol,
                'apikey': api_key
            }
            
            response = requests.get(url, params=params)
            response.raise_for_status()
            
            data = response.json()
            quote = data.get('Global Quote', {})
            prijs = quote.get('05. price')
            
            return float(prijs) if prijs else None
            
        except Exception as e:
            self.logger.error(f"Fout bij ophalen aandelenprijs: {e}")
            return None
    
    def sla_prijs_op(self, product_naam, website, prijs, url=None):
        """Sla prijs op in database"""
        conn = sqlite3.connect(self.database_path)
        cursor = conn.cursor()
        
        cursor.execute("""
        INSERT INTO prijzen (product_naam, website, prijs, url)
        VALUES (?, ?, ?, ?)
        """, (product_naam, website, prijs, url))
        
        conn.commit()
        conn.close()
        
        self.logger.info(f"Prijs opgeslagen: {product_naam} - €{prijs} ({website})")
    
    def check_prijs_alerts(self, product_naam, target_prijs, operator='<='):
        """
        Check of een product een bepaalde prijs heeft bereikt
        """
        conn = sqlite3.connect(self.database_path)
        cursor = conn.cursor()
        
        cursor.execute("""
        SELECT prijs, datum FROM prijzen 
        WHERE product_naam = ? 
        ORDER BY datum DESC LIMIT 1
        """, (product_naam,))
        
        result = cursor.fetchone()
        conn.close()
        
        if result:
            huidige_prijs, datum = result
            
            if operator == '<=' and huidige_prijs <= target_prijs:
                return True, huidige_prijs
            elif operator == '>=' and huidige_prijs >= target_prijs:
                return True, huidige_prijs
        
        return False, None
    
    def run_monitoring_cycle(self):
        """
        Voer een complete monitoring cyclus uit
        """
        self.logger.info("Start prijs monitoring cyclus")
        
        # Monitor verschillende producten
        monitoring_list = [
            {
                'naam': 'Bitcoin',
                'type': 'crypto',
                'symbol': 'bitcoin',
                'alert_prijs': 50000,
                'operator': '<='
            },
            {
                'naam': 'ASML Aandeel',
                'type': 'stock',
                'symbol': 'ASML.AS',
                'alert_prijs': 600,
                'operator': '<='
            }
        ]
        
        for item in monitoring_list:
            try:
                if item['type'] == 'crypto':
                    prijs = self.check_crypto_prijs(item['symbol'])
                    website = 'CoinGecko'
                elif item['type'] == 'stock':
                    prijs = self.check_stock_prijs(item['symbol'])
                    website = 'Alpha Vantage'
                else:
                    continue
                
                if prijs:
                    self.sla_prijs_op(item['naam'], website, prijs)
                    
                    # Check alerts
                    alert_triggered, current_price = self.check_prijs_alerts(
                        item['naam'], 
                        item['alert_prijs'], 
                        item['operator']
                    )
                    
                    if alert_triggered:
                        self.verstuur_prijs_alert(item['naam'], current_price, item['alert_prijs'])
                
                # Wacht tussen requests
                time.sleep(1)
                
            except Exception as e:
                self.logger.error(f"Fout bij monitoren van {item['naam']}: {e}")
        
        self.logger.info("Prijs monitoring cyclus voltooid")
    
    def verstuur_prijs_alert(self, product_naam, huidige_prijs, target_prijs):
        """
        Verstuur email alert voor prijs
        """
        # Gebruik de EmailAutomation class van eerder
        onderwerp = f"🚨 Prijs Alert: {product_naam}"
        body = f"""
        

Prijs Alert Triggered!

Product: {product_naam}

Huidige Prijs: €{huidige_prijs:,.2f}

Target Prijs: €{target_prijs:,.2f}

Tijd: {datetime.now().strftime('%d-%m-%Y %H:%M:%S')}

""" # Hier zou je de email versturen... self.logger.info(f"ALERT: {product_naam} prijs is €{huidige_prijs:.2f}") # Gebruik monitor = PrijsMonitor() monitor.run_monitoring_cycle()

Level 4: Geavanceerde Workflow Automatisering

Enterprise Document Processing Pipeline


import os
import shutil
import pandas as pd
from pathlib import Path
from datetime import datetime
import logging
import json
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class DocumentProcessor:
    def __init__(self, config_file="document_config.json"):
        self.load_config(config_file)
        self.setup_logging()
        self.setup_directories()
    
    def load_config(self, config_file):
        """Laad configuratie uit JSON bestand"""
        with open(config_file, 'r') as f:
            self.config = json.load(f)
    
    def setup_logging(self):
        """Setup logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('document_processor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def setup_directories(self):
        """Maak benodigde directories aan"""
        for directory in self.config['directories'].values():
            Path(directory).mkdir(parents=True, exist_ok=True)
    
    def extract_text_from_pdf(self, pdf_path):
        """Extract tekst uit PDF bestand"""
        try:
            import PyPDF2
            
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                text = ""
                for page in pdf_reader.pages:
                    text += page.extract_text()
                return text
        except ImportError:
            self.logger.warning("PyPDF2 niet geïnstalleerd. PDF tekst extractie overgeslagen.")
            return ""
        except Exception as e:
            self.logger.error(f"Fout bij PDF tekst extractie: {e}")
            return ""
    
    def classify_document(self, file_path):
        """
        Classificeer document op basis van inhoud en naam
        """
        file_name = file_path.name.lower()
        
        # Classificatie op basis van bestandsnaam
        for doc_type, keywords in self.config['classification_rules'].items():
            if any(keyword in file_name for keyword in keywords):
                return doc_type
        
        # Classificatie op basis van inhoud (voor PDF's)
        if file_path.suffix.lower() == '.pdf':
            content = self.extract_text_from_pdf(file_path).lower()
            for doc_type, keywords in self.config['content_keywords'].items():
                if any(keyword in content for keyword in keywords):
                    return doc_type
        
        return 'onbekend'
    
    def process_invoice(self, file_path):
        """
        Specifieke processing voor facturen
        """
        # Extract factuur informatie
        if file_path.suffix.lower() == '.pdf':
            content = self.extract_text_from_pdf(file_path)
            
            # Zoek naar factuur nummer
            import re
            factuur_nummer = re.search(r'factuur[:\s]*(\d+)', content, re.IGNORECASE)
            if factuur_nummer:
                factuur_nr = factuur_nummer.group(1)
                
                # Hernoem bestand met factuur nummer
                new_name = f"factuur_{factuur_nr}_{datetime.now().strftime('%Y%m%d')}{file_path.suffix}"
                new_path = file_path.parent / new_name
                
                if not new_path.exists():
                    file_path.rename(new_path)
                    self.logger.info(f"Factuur hernoemd: {file_path.name} → {new_name}")
                    return new_path
        
        return file_path
    
    def process_contract(self, file_path):
        """
        Specifieke processing voor contracten
        """
        # Voor dit voorbeeld: verplaats naar contracts directory en log
        contracts_dir = Path(self.config['directories']['contracts'])
        destination = contracts_dir / file_path.name
        
        # Voorkom overschrijven
        counter = 1
        while destination.exists():
            name_without_ext = file_path.stem
            extension = file_path.suffix
            new_name = f"{name_without_ext}_({counter}){extension}"
            destination = contracts_dir / new_name
            counter += 1
        
        shutil.move(str(file_path), str(destination))
        self.logger.info(f"Contract verplaatst naar: {destination}")
        
        # Log contract informatie
        self.log_document_info('contract', destination)
        
        return destination
    
    def log_document_info(self, doc_type, file_path):
        """
        Log document informatie naar CSV
        """
        log_file = Path(self.config['directories']['logs']) / 'document_log.csv'
        
        # Maak CSV header aan als bestand niet bestaat
        if not log_file.exists():
            df = pd.DataFrame(columns=['timestamp', 'document_type', 'filename', 'file_path', 'file_size'])
            df.to_csv(log_file, index=False)
        
        # Voeg nieuwe record toe
        new_record = {
            'timestamp': datetime.now().isoformat(),
            'document_type': doc_type,
            'filename': file_path.name,
            'file_path': str(file_path),
            'file_size': file_path.stat().st_size
        }
        
        df = pd.read_csv(log_file)
        df = pd.concat([df, pd.DataFrame([new_record])], ignore_index=True)
        df.to_csv(log_file, index=False)
    
    def process_document(self, file_path):
        """
        Hoofdfunctie voor document processing
        """
        if not file_path.exists() or file_path.is_dir():
            return
        
        self.logger.info(f"Processing document: {file_path.name}")
        
        # Classificeer document
        doc_type = self.classify_document(file_path)
        self.logger.info(f"Document geclassificeerd als: {doc_type}")
        
        # Type-specifieke processing
        if doc_type == 'factuur':
            processed_path = self.process_invoice(file_path)
        elif doc_type == 'contract':
            processed_path = self.process_contract(file_path)
        else:
            # Algemene processing: verplaats naar onbekend
            unknown_dir = Path(self.config['directories']['unknown'])
            destination = unknown_dir / file_path.name
            
            counter = 1
            while destination.exists():
                name_without_ext = file_path.stem
                extension = file_path.suffix
                new_name = f"{name_without_ext}_({counter}){extension}"
                destination = unknown_dir / new_name
                counter += 1
            
            shutil.move(str(file_path), str(destination))
            processed_path = destination
        
        # Log processing
        self.log_document_info(doc_type, processed_path)
        
        # Verstuur notificatie (optioneel)
        self.send_processing_notification(doc_type, processed_path)
    
    def send_processing_notification(self, doc_type, file_path):
        """
        Verstuur notificatie over document processing
        """
        if self.config.get('notifications', {}).get('enabled', False):
            # Hier zou je een email of Slack notificatie kunnen versturen
            self.logger.info(f"Notificatie: {doc_type} document processed: {file_path.name}")

class DocumentWatcher(FileSystemEventHandler):
    """
    Watchdog handler voor realtime document monitoring
    """
    def __init__(self, processor):
        self.processor = processor
    
    def on_created(self, event):
        if not event.is_directory:
            # Wacht even voor file write completion
            import time
            time.sleep(2)
            
            file_path = Path(event.src_path)
            self.processor.process_document(file_path)

def start_document_monitoring():
    """
    Start de document monitoring service
    """
    # Laad configuratie
    config = {
        "directories": {
            "input": "./documents/input",
            "facturen": "./documents/facturen",
            "contracts": "./documents/contracts",
            "unknown": "./documents/unknown",
            "logs": "./documents/logs"
        },
        "classification_rules": {
            "factuur": ["factuur", "invoice", "rekening"],
            "contract": ["contract", "overeenkomst", "agreement"],
            "rapport": ["rapport", "report", "analyse"]
        },
        "content_keywords": {
            "factuur": ["btw", "totaal", "factuurnummer"],
            "contract": ["partijen", "voorwaarden", "geldig tot"]
        },
        "notifications": {
            "enabled": True,
            "email": "[email protected]"
        }
    }
    
    # Sla config op
    with open('document_config.json', 'w') as f:
        json.dump(config, f, indent=2)
    
    # Start processor
    processor = DocumentProcessor()
    
    # Setup file watcher
    event_handler = DocumentWatcher(processor)
    observer = Observer()
    observer.schedule(event_handler, config['directories']['input'], recursive=False)
    
    # Start monitoring
    observer.start()
    print(f"Document monitoring gestart voor: {config['directories']['input']}")
    print("Druk Ctrl+C om te stoppen...")
    
    try:
        import time
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()

if __name__ == "__main__":
    start_document_monitoring()
            

Scheduling en Deployment

Automatische Taak Scheduling


import schedule
import time
import threading
from datetime import datetime

class TaskScheduler:
    def __init__(self):
        self.running = False
        self.scheduler_thread = None
    
    def backup_databases(self):
        """Dagelijkse database backup"""
        backup_script = """
        import shutil
        from datetime import datetime
        
        # Backup SQLite databases
        databases = ['app.db', 'prijzen.db', 'documents.db']
        backup_dir = f"backups/{datetime.now().strftime('%Y%m%d')}"
        
        os.makedirs(backup_dir, exist_ok=True)
        
        for db in databases:
            if os.path.exists(db):
                shutil.copy2(db, f"{backup_dir}/{db}")
                print(f"Database backup: {db}")
        """
        exec(backup_script)
    
    def cleanup_old_files(self):
        """Wekelijkse cleanup van oude bestanden"""
        from datetime import datetime, timedelta
        
        cleanup_dirs = ['./logs', './temp', './downloads/Overig']
        cutoff_date = datetime.now() - timedelta(days=30)
        
        for directory in cleanup_dirs:
            if os.path.exists(directory):
                for file_path in Path(directory).glob('*'):
                    if file_path.is_file():
                        file_time = datetime.fromtimestamp(file_path.stat().st_mtime)
                        if file_time < cutoff_date:
                            file_path.unlink()
                            print(f"Verwijderd oud bestand: {file_path}")
    
    def monthly_report(self):
        """Maandelijks rapport genereren"""
        print(f"Genereren maandelijks rapport voor {datetime.now().strftime('%B %Y')}")
        # Hier zou je een uitgebreid rapport kunnen genereren
    
    def setup_schedule(self):
        """Configureer alle geplande taken"""
        # Dagelijkse taken
        schedule.every().day.at("02:00").do(self.backup_databases)
        schedule.every().day.at("08:00").do(organiseer_downloads)
        
        # Uurlijkse taken
        schedule.every().hour.do(lambda: PrijsMonitor().run_monitoring_cycle())
        
        # Wekelijkse taken
        schedule.every().monday.at("03:00").do(self.cleanup_old_files)
        
        # Maandelijkse taken
        schedule.every().month.do(self.monthly_report)
        
        print("Task scheduler geconfigureerd:")
        for job in schedule.jobs:
            print(f"  - {job}")
    
    def run_scheduler(self):
        """Run de scheduler in een separate thread"""
        while self.running:
            schedule.run_pending()
            time.sleep(60)  # Check elke minuut
    
    def start(self):
        """Start de scheduler"""
        if not self.running:
            self.running = True
            self.setup_schedule()
            self.scheduler_thread = threading.Thread(target=self.run_scheduler)
            self.scheduler_thread.daemon = True
            self.scheduler_thread.start()
            print("Task scheduler gestart")
    
    def stop(self):
        """Stop de scheduler"""
        self.running = False
        if self.scheduler_thread:
            self.scheduler_thread.join()
        print("Task scheduler gestopt")

# Gebruik
if __name__ == "__main__":
    scheduler = TaskScheduler()
    scheduler.start()
    
    try:
        # Houd main thread actief
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        scheduler.stop()
            

Monitoring en Error Handling

Robuuste Error Handling en Logging


import logging
import traceback
import smtplib
from functools import wraps
from datetime import datetime
import json

class AutomationMonitor:
    def __init__(self, log_file="automation.log"):
        self.setup_logging(log_file)
        self.error_count = 0
        self.max_errors = 5
    
    def setup_logging(self, log_file):
        """Setup uitgebreide logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler(),
                logging.handlers.RotatingFileHandler(
                    log_file, maxBytes=10*1024*1024, backupCount=5
                )
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def log_automation_event(self, event_type, details, success=True):
        """Log automation events"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type,
            'success': success,
            'details': details
        }
        
        if success:
            self.logger.info(f"SUCCESS: {event_type} - {details}")
        else:
            self.logger.error(f"FAILED: {event_type} - {details}")
            self.error_count += 1
            
            if self.error_count >= self.max_errors:
                self.send_alert(f"Maximum errors ({self.max_errors}) bereikt!")
    
    def send_alert(self, message):
        """Verstuur alert bij kritieke fouten"""
        alert_msg = f"""
        🚨 AUTOMATION ALERT 🚨
        
        Tijd: {datetime.now().strftime('%d-%m-%Y %H:%M:%S')}
        Bericht: {message}
        
        Controleer de logs voor meer details.
        """
        
        # Log de alert
        self.logger.critical(f"ALERT SENT: {message}")
        
        # Hier zou je een email of Slack bericht kunnen versturen
        print(f"ALERT: {alert_msg}")

def retry_on_failure(max_retries=3, delay=1):
    """Decorator voor automatische retry bij failures"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise e
                    
                    print(f"Poging {attempt + 1} gefaald: {e}")
                    time.sleep(delay * (attempt + 1))
            
        return wrapper
    return decorator

def monitor_automation(event_type):
    """Decorator voor automation monitoring"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            monitor = AutomationMonitor()
            
            try:
                result = func(*args, **kwargs)
                monitor.log_automation_event(event_type, f"Functie {func.__name__} succesvol uitgevoerd")
                return result
                
            except Exception as e:
                error_details = f"Functie {func.__name__} gefaald: {str(e)}"
                monitor.log_automation_event(event_type, error_details, success=False)
                
                # Log volledige traceback
                monitor.logger.error(f"Traceback:\n{traceback.format_exc()}")
                
                raise e
                
        return wrapper
    return decorator

# Voorbeelden van gebruik
@monitor_automation("file_processing")
@retry_on_failure(max_retries=3)
def process_file_with_monitoring(file_path):
    """Voorbeeld functie met monitoring en retry"""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Bestand niet gevonden: {file_path}")
    
    # Simuleer file processing
    print(f"Processing {file_path}")
    
    # Simuleer mogelijke fout
    import random
    if random.random() < 0.3:  # 30% kans op fout
        raise Exception("Simulatie van processing fout")
    
    return "Success"
            

Conclusie en Best Practices

Python automatisering kan je werk leven drastisch verbeteren, maar het is belangrijk om het op de juiste manier aan te pakken. Hier zijn de key takeaways uit dit artikel:

Best Practices voor Python Automatisering

  1. Start Klein: Begin met eenvoudige, herhalende taken
  2. Error Handling: Implementeer altijd robuuste error handling
  3. Logging: Log alles - success en failures
  4. Monitoring: Monitor je scripts en stel alerts in
  5. Documentation: Documenteer wat je scripts doen
  6. Testing: Test je scripts grondig voordat je ze deployed
  7. Security: Bewaar gevoelige informatie veilig
  8. Backup: Maak altijd backups voordat je data manipuleert

ROI van Automatisering

Laten we de impact berekenen. Als je dagelijks 2 uur bespaart door automatisering:

Nederlandse Automatisering Cases

In Nederland zie ik deze trends in automatisering:

Volgende Stappen

Nu je de kracht van Python automatisering hebt gezien, wat zijn je volgende stappen?

  1. Identificeer: Welke repetitieve taken doe je dagelijks?
  2. Prioriteer: Welke taken kosten de meeste tijd?
  3. Prototype: Bouw een simpele versie van je automatisering
  4. Test: Test grondig met een subset van je data
  5. Deploy: Implementeer geleidelijk in productie
  6. Monitor: Houd de performance en errors bij
  7. Itereer: Verbeter en breid uit
Ready om je productiviteit te transformeren? Bij ImmenArchl leren onze studenten niet alleen Python syntax, maar ook hoe ze echte automatiseringsoplossingen bouwen die directe waarde toevoegen aan hun werk en organisatie.

Expert Tip voor Automatisering

"De beste automatisering is die je vergeet dat bestaat. Start met de meest pijnlijke, repetitieve taak die je dagelijks doet. Bouw iets simpels dat werkt, dan pas iets complex dat perfect is. En vergeet niet: elke minuut die je besteedt aan het bouwen van automatisering, levert uren terug in de toekomst."

- Dr. Pieter van Houten, Senior Developer & Automation Expert

Gerelateerde Artikelen