Su Pompası Kontrol Kodları
Bu bölümde, akıllı sulama sistemimizde su pompasını kontrol etmek için gerekli Python kodlarını bulacaksınız. Röle modülü aracılığıyla su pompasını açıp kapatma, zamanlayıcı ile otomatik sulama ve manuel kontrol işlemleri için kod örnekleri verilmiştir.1. Temel Su Pompası Kontrolü
Copy
# pump_controller.py
import RPi.GPIO as GPIO
import time
import logging
# Loglama ayarları
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("pump_controller.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("PumpController")
class PumpController:
def __init__(self, relay_pin=17, active_low=True):
"""
Su pompası kontrol sınıfı
Args:
relay_pin: Röle kontrol pini (BCM numaralandırması)
active_low: Rölenin aktif düşük mantıkla çalışıp çalışmadığı
True: LOW sinyali röleyi aktifleştirir
False: HIGH sinyali röleyi aktifleştirir
"""
self.relay_pin = relay_pin
self.active_low = active_low
# Röle durumu
self.is_running = False
self.start_time = None
self.total_runtime = 0 # Toplam çalışma süresi (saniye)
self.daily_runtime = 0 # Günlük çalışma süresi (saniye)
self.last_run_date = None
# GPIO ayarları
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.relay_pin, GPIO.OUT)
# Başlangıçta pompayı kapat
self.stop()
logger.info(f"Pompa kontrol sistemi başlatıldı (Pin: {relay_pin}, Aktif Düşük: {active_low})")
def start(self, duration=None):
"""
Pompayı çalıştır
Args:
duration: Çalışma süresi (saniye), None ise manuel durdurulana kadar çalışır
Returns:
bool: İşlem başarılı mı
"""
if self.is_running:
logger.warning("Pompa zaten çalışıyor!")
return False
try:
# Röleyi aktifleştir
GPIO.output(self.relay_pin, GPIO.LOW if self.active_low else GPIO.HIGH)
self.is_running = True
self.start_time = time.time()
# Günlük çalışma süresini kontrol et ve gerekirse sıfırla
current_date = time.strftime("%Y-%m-%d")
if self.last_run_date != current_date:
self.daily_runtime = 0
self.last_run_date = current_date
logger.info(f"Pompa çalıştırıldı" + (f" ({duration} saniye)" if duration else ""))
# Belirli bir süre çalıştırma
if duration:
time.sleep(duration)
self.stop()
return True
except Exception as e:
logger.error(f"Pompa çalıştırma hatası: {str(e)}")
return False
def stop(self):
"""
Pompayı durdur
Returns:
bool: İşlem başarılı mı
"""
try:
# Röleyi deaktifleştir
GPIO.output(self.relay_pin, GPIO.HIGH if self.active_low else GPIO.LOW)
# Çalışma süresini güncelle
if self.is_running and self.start_time:
runtime = time.time() - self.start_time
self.total_runtime += runtime
self.daily_runtime += runtime
logger.info(f"Pompa durduruldu (Çalışma süresi: {runtime:.2f} saniye)")
else:
logger.info("Pompa durduruldu")
self.is_running = False
self.start_time = None
return True
except Exception as e:
logger.error(f"Pompa durdurma hatası: {str(e)}")
return False
def get_status(self):
"""
Pompa durumunu döndür
Returns:
dict: Pompa durum bilgileri
"""
current_runtime = 0
if self.is_running and self.start_time:
current_runtime = time.time() - self.start_time
return {
'is_running': self.is_running,
'current_runtime': current_runtime,
'total_runtime': self.total_runtime + current_runtime,
'daily_runtime': self.daily_runtime + current_runtime,
'last_run_date': self.last_run_date
}
def cleanup(self):
"""
GPIO pinlerini temizle
"""
self.stop()
GPIO.cleanup(self.relay_pin)
logger.info("Pompa kontrol sistemi kapatıldı")
# Örnek kullanım
if __name__ == "__main__":
pump = PumpController(relay_pin=17)
try:
# Pompayı 5 saniye çalıştır
print("Pompa 5 saniye çalıştırılıyor...")
pump.start(duration=5)
# Durum bilgisini göster
status = pump.get_status()
print(f"Pompa Durumu: {'Çalışıyor' if status['is_running'] else 'Durdu'}")
print(f"Toplam Çalışma Süresi: {status['total_runtime']:.2f} saniye")
# Biraz bekle
time.sleep(2)
# Pompayı manuel olarak çalıştır
print("\nPompa manuel olarak çalıştırılıyor...")
pump.start()
# 3 saniye bekle
time.sleep(3)
# Pompayı manuel olarak durdur
print("Pompa manuel olarak durduruluyor...")
pump.stop()
# Son durum bilgisini göster
status = pump.get_status()
print(f"Pompa Durumu: {'Çalışıyor' if status['is_running'] else 'Durdu'}")
print(f"Toplam Çalışma Süresi: {status['total_runtime']:.2f} saniye")
print(f"Günlük Çalışma Süresi: {status['daily_runtime']:.2f} saniye")
except KeyboardInterrupt:
print("\nProgram sonlandırıldı")
finally:
pump.cleanup()
2. Zamanlayıcı ile Otomatik Sulama
Copy
# irrigation_scheduler.py
import time
import threading
import schedule
import json
import os
from datetime import datetime
from pump_controller import PumpController
import logging
# Loglama ayarları
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("irrigation_scheduler.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("IrrigationScheduler")
class IrrigationScheduler:
def __init__(self, pump_controller, config_file="irrigation_schedule.json"):
"""
Sulama zamanlayıcı sınıfı
Args:
pump_controller: PumpController nesnesi
config_file: Zamanlama yapılandırma dosyası
"""
self.pump = pump_controller
self.config_file = config_file
self.schedule_config = self.load_config()
self.running = False
self.scheduler_thread = None
logger.info("Sulama zamanlayıcı başlatıldı")
def load_config(self):
"""
Zamanlama yapılandırmasını yükle
Returns:
dict: Zamanlama yapılandırması
"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r') as f:
config = json.load(f)
logger.info(f"Zamanlama yapılandırması yüklendi: {self.config_file}")
return config
except Exception as e:
logger.error(f"Yapılandırma dosyası okuma hatası: {str(e)}")
return self.get_default_config()
else:
logger.warning(f"Yapılandırma dosyası bulunamadı, varsayılan yapılandırma kullanılıyor")
return self.get_default_config()
def save_config(self):
"""
Zamanlama yapılandırmasını kaydet
"""
try:
with open(self.config_file, 'w') as f:
json.dump(self.schedule_config, f, indent=4)
logger.info(f"Zamanlama yapılandırması kaydedildi: {self.config_file}")
except Exception as e:
logger.error(f"Yapılandırma dosyası yazma hatası: {str(e)}")
def get_default_config(self):
"""
Varsayılan zamanlama yapılandırmasını döndür
Returns:
dict: Varsayılan zamanlama yapılandırması
"""
return {
"enabled": True,
"schedules": [
{
"name": "Sabah Sulaması",
"time": "06:30",
"days": ["monday", "wednesday", "friday"],
"duration": 300, # 5 dakika
"enabled": True
},
{
"name": "Akşam Sulaması",
"time": "18:30",
"days": ["monday", "wednesday", "friday"],
"duration": 300, # 5 dakika
"enabled": True
}
],
"max_daily_runtime": 1800, # 30 dakika
"skip_if_raining": True
}
def add_schedule(self, name, time_str, days, duration, enabled=True):
"""
Yeni bir sulama zamanı ekle
Args:
name: Zamanlama adı
time_str: Saat (HH:MM formatında)
days: Günler listesi (monday, tuesday, ...)
duration: Sulama süresi (saniye)
enabled: Etkin mi
Returns:
bool: İşlem başarılı mı
"""
try:
# Saat formatını kontrol et
datetime.strptime(time_str, "%H:%M")
# Yeni zamanlamayı ekle
new_schedule = {
"name": name,
"time": time_str,
"days": days,
"duration": duration,
"enabled": enabled
}
self.schedule_config["schedules"].append(new_schedule)
self.save_config()
# Zamanlamayı yeniden yükle
if self.running:
self.stop()
self.start()
logger.info(f"Yeni sulama zamanı eklendi: {name} - {time_str}")
return True
except Exception as e:
logger.error(f"Sulama zamanı ekleme hatası: {str(e)}")
return False
def remove_schedule(self, index):
"""
Sulama zamanını kaldır
Args:
index: Kaldırılacak zamanlamanın indeksi
Returns:
bool: İşlem başarılı mı
"""
try:
if 0 <= index < len(self.schedule_config["schedules"]):
removed = self.schedule_config["schedules"].pop(index)
self.save_config()
# Zamanlamayı yeniden yükle
if self.running:
self.stop()
self.start()
logger.info(f"Sulama zamanı kaldırıldı: {removed['name']}")
return True
else:
logger.warning(f"Geçersiz zamanlama indeksi: {index}")
return False
except Exception as e:
logger.error(f"Sulama zamanı kaldırma hatası: {str(e)}")
return False
def update_schedule(self, index, name=None, time_str=None, days=None, duration=None, enabled=None):
"""
Sulama zamanını güncelle
Args:
index: Güncellenecek zamanlamanın indeksi
name, time_str, days, duration, enabled: Güncellenecek değerler (None ise değiştirilmez)
Returns:
bool: İşlem başarılı mı
"""
try:
if 0 <= index < len(self.schedule_config["schedules"]):
schedule = self.schedule_config["schedules"][index]
if name is not None:
schedule["name"] = name
if time_str is not None:
# Saat formatını kontrol et
datetime.strptime(time_str, "%H:%M")
schedule["time"] = time_str
if days is not None:
schedule["days"] = days
if duration is not None:
schedule["duration"] = duration
if enabled is not None:
schedule["enabled"] = enabled
self.save_config()
# Zamanlamayı yeniden yükle
if self.running:
self.stop()
self.start()
logger.info(f"Sulama zamanı güncellendi: {schedule['name']}")
return True
else:
logger.warning(f"Geçersiz zamanlama indeksi: {index}")
return False
except Exception as e:
logger.error(f"Sulama zamanı güncelleme hatası: {str(e)}")
return False
def irrigate(self, schedule_name, duration):
"""
Sulama işlemini gerçekleştir
Args:
schedule_name: Zamanlama adı
duration: Sulama süresi (saniye)
"""
# Günlük maksimum çalışma süresini kontrol et
pump_status = self.pump.get_status()
if pump_status['daily_runtime'] >= self.schedule_config["max_daily_runtime"]:
logger.warning(f"Günlük maksimum sulama süresi aşıldı, sulama iptal edildi: {schedule_name}")
return
# Yağmur durumunu kontrol et (sensör verisi gerekir)
if self.schedule_config["skip_if_raining"] and self.is_raining():
logger.info(f"Yağmur yağıyor, sulama iptal edildi: {schedule_name}")
return
logger.info(f"Zamanlı sulama başlatılıyor: {schedule_name} ({duration} saniye)")
self.pump.start(duration=duration)
def is_raining(self):
"""
Yağmur yağıp yağmadığını kontrol et
Bu fonksiyon, yağmur sensörü verilerine göre güncellenmelidir
Returns:
bool: Yağmur yağıyor mu
"""
# Bu kısım, yağmur sensörü entegrasyonu ile güncellenmelidir
# Şimdilik varsayılan olarak False döndürüyoruz
return False
def setup_schedule(self):
"""
Zamanlama görevlerini ayarla
"""
# Mevcut zamanlamaları temizle
schedule.clear()
# Zamanlamaları etkinleştir
if not self.schedule_config["enabled"]:
logger.info("Sulama zamanlaması devre dışı")
return
# Her bir zamanlama için görev oluştur
for idx, s in enumerate(self.schedule_config["schedules"]):
if not s["enabled"]:
continue
# Günlere göre zamanlama oluştur
for day in s["days"]:
# Schedule kütüphanesi için gün adını al
day_method = getattr(schedule.every(), day)
# Zamanlamayı ayarla
day_method.at(s["time"]).do(
self.irrigate,
schedule_name=s["name"],
duration=s["duration"]
)
logger.info(f"Zamanlama eklendi: {s['name']} - {s['time']} - {', '.join(s['days'])}")
def run_scheduler(self):
"""
Zamanlayıcıyı çalıştır (ayrı bir thread'de)
"""
logger.info("Zamanlayıcı thread başlatıldı")
while self.running:
schedule.run_pending()
time.sleep(1)
logger.info("Zamanlayıcı thread durduruldu")
def start(self):
"""
Zamanlayıcıyı başlat
Returns:
bool: İşlem başarılı mı
"""
(Content truncated due to size limit. Use line ranges to read in chunks)