Sensör Veri Okuma Kodları
Bu bölümde, akıllı sulama sistemimizde kullanacağımız sensörlerden veri okumak için gerekli Python kodlarını bulacaksınız. Her sensör için ayrı bir kod örneği ve açıklaması verilmiştir.1. Toprak Nem Sensörü Okuma (MCP3008 ADC ile)
Raspberry Pi’nin analog giriş pini olmadığı için, toprak nem sensörlerinin analog çıkışlarını okumak için MCP3008 ADC (Analog-Dijital Dönüştürücü) kullanacağız.Copy
# soil_moisture_sensor.py
import time
import busio
import digitalio
import board
import adafruit_mcp3xxx.mcp3008 as MCP
from adafruit_mcp3xxx.analog_in import AnalogIn
class SoilMoistureSensor:
def __init__(self, channel):
# SPI bağlantısını oluştur
spi = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI)
# CS pini için chip select oluştur
cs = digitalio.DigitalInOut(board.CE0)
# MCP3008 ADC'yi oluştur
mcp = MCP.MCP3008(spi, cs)
# Analog giriş kanalını belirle
self.channel = AnalogIn(mcp, channel)
# Kalibrasyon değerleri (bu değerler sensörünüze göre ayarlanmalıdır)
self.dry_value = 65000 # Kuru toprakta ölçülen değer
self.wet_value = 20000 # Islak toprakta ölçülen değer
def read_raw(self):
"""Ham ADC değerini okur"""
return self.channel.value
def read_voltage(self):
"""Voltaj değerini okur"""
return self.channel.voltage
def read_percentage(self):
"""Nem yüzdesini hesaplar (0-100)"""
raw_value = self.read_raw()
# Değeri yüzdeye dönüştür (kuru: 0%, ıslak: 100%)
moisture_percentage = max(0, min(100, ((self.dry_value - raw_value) /
(self.dry_value - self.wet_value) * 100)))
return round(moisture_percentage, 1)
def calibrate_dry(self):
"""Kuru toprak kalibrasyonu"""
self.dry_value = self.read_raw()
print(f"Kuru toprak değeri: {self.dry_value}")
return self.dry_value
def calibrate_wet(self):
"""Islak toprak kalibrasyonu"""
self.wet_value = self.read_raw()
print(f"Islak toprak değeri: {self.wet_value}")
return self.wet_value
# Örnek kullanım
if __name__ == "__main__":
# MCP3008'in 0. kanalına bağlı toprak nem sensörü
sensor = SoilMoistureSensor(0)
try:
while True:
raw = sensor.read_raw()
voltage = sensor.read_voltage()
percentage = sensor.read_percentage()
print(f"Ham Değer: {raw}")
print(f"Voltaj: {voltage:.2f}V")
print(f"Nem Yüzdesi: {percentage}%")
print("-" * 30)
time.sleep(2)
except KeyboardInterrupt:
print("Program sonlandırıldı")
2. DHT22 Sıcaklık ve Nem Sensörü Okuma
DHT22 sensörü, ortam sıcaklığı ve nem değerlerini ölçmek için kullanılır.Copy
# dht22_sensor.py
import time
import board
import adafruit_dht
class DHT22Sensor:
def __init__(self, pin=board.D4):
# DHT22 sensörünü belirtilen pine bağla
self.dht_device = adafruit_dht.DHT22(pin)
self.last_temperature = None
self.last_humidity = None
self.error_count = 0
def read(self):
"""Sıcaklık ve nem değerlerini okur"""
try:
# Sensörden değerleri oku
temperature = self.dht_device.temperature
humidity = self.dht_device.humidity
# Başarılı okuma, değerleri güncelle
self.last_temperature = temperature
self.last_humidity = humidity
self.error_count = 0
return {
'temperature': temperature,
'humidity': humidity,
'status': 'success'
}
except RuntimeError as e:
# DHT sensörleri bazen okuma hatası verebilir
self.error_count += 1
# Son başarılı değerleri döndür (eğer varsa)
if self.last_temperature is not None and self.last_humidity is not None:
return {
'temperature': self.last_temperature,
'humidity': self.last_humidity,
'status': 'using_last_values'
}
else:
return {
'temperature': None,
'humidity': None,
'status': 'error',
'message': str(e)
}
except Exception as e:
self.error_count += 1
return {
'temperature': None,
'humidity': None,
'status': 'error',
'message': str(e)
}
def close(self):
"""Sensör bağlantısını kapat"""
self.dht_device.exit()
# Örnek kullanım
if __name__ == "__main__":
# DHT22 sensörünü GPIO 4 pinine bağla
sensor = DHT22Sensor(board.D4)
try:
while True:
result = sensor.read()
if result['status'] == 'success':
print(f"Sıcaklık: {result['temperature']}°C")
print(f"Nem: {result['humidity']}%")
elif result['status'] == 'using_last_values':
print(f"Sıcaklık (son değer): {result['temperature']}°C")
print(f"Nem (son değer): {result['humidity']}%")
else:
print(f"Hata: {result['message']}")
print("-" * 30)
time.sleep(2)
except KeyboardInterrupt:
print("Program sonlandırıldı")
sensor.close()
3. Yağmur Sensörü Okuma (Opsiyonel)
Yağmur sensörü, yağmur yağıp yağmadığını tespit etmek için kullanılır.Copy
# rain_sensor.py
import time
import busio
import digitalio
import board
import adafruit_mcp3xxx.mcp3008 as MCP
from adafruit_mcp3xxx.analog_in import AnalogIn
import RPi.GPIO as GPIO
class RainSensor:
def __init__(self, analog_channel, digital_pin=24):
# SPI bağlantısını oluştur (MCP3008 için)
spi = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI)
cs = digitalio.DigitalInOut(board.CE0)
mcp = MCP.MCP3008(spi, cs)
# Analog giriş kanalını belirle
self.analog_channel = AnalogIn(mcp, analog_channel)
# Dijital pin ayarları
self.digital_pin = digital_pin
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.digital_pin, GPIO.IN)
# Kalibrasyon değerleri
self.dry_value = 65000 # Kuru sensör değeri
self.wet_value = 20000 # Islak sensör değeri
def read_analog(self):
"""Analog değeri okur (hassas ölçüm)"""
return self.analog_channel.value
def read_digital(self):
"""Dijital değeri okur (var/yok tespiti)"""
return GPIO.input(self.digital_pin)
def is_raining(self):
"""Yağmur yağıp yağmadığını kontrol eder"""
analog_value = self.read_analog()
digital_value = self.read_digital()
# Analog değer eşik değerinin altındaysa veya dijital pin LOW ise yağmur var
rain_threshold = (self.dry_value + self.wet_value) / 2
is_raining_analog = analog_value < rain_threshold
is_raining_digital = digital_value == 0 # LOW = yağmur var
return {
'is_raining': is_raining_analog or is_raining_digital,
'analog_value': analog_value,
'digital_value': digital_value,
'intensity': self.calculate_intensity(analog_value)
}
def calculate_intensity(self, analog_value):
"""Yağmur şiddetini hesaplar (0-100)"""
# Değeri yüzdeye dönüştür (kuru: 0%, ıslak: 100%)
intensity = max(0, min(100, ((self.dry_value - analog_value) /
(self.dry_value - self.wet_value) * 100)))
return round(intensity, 1)
def cleanup(self):
"""GPIO pinlerini temizle"""
GPIO.cleanup(self.digital_pin)
# Örnek kullanım
if __name__ == "__main__":
# MCP3008'in 3. kanalına ve GPIO 24 pinine bağlı yağmur sensörü
sensor = RainSensor(3, 24)
try:
while True:
result = sensor.is_raining()
if result['is_raining']:
print(f"Yağmur tespit edildi! Şiddet: {result['intensity']}%")
else:
print("Yağmur yok")
print(f"Analog Değer: {result['analog_value']}")
print(f"Dijital Değer: {result['digital_value']}")
print("-" * 30)
time.sleep(2)
except KeyboardInterrupt:
print("Program sonlandırıldı")
sensor.cleanup()
4. Işık Sensörü (LDR) Okuma (Opsiyonel)
Işık sensörü, ortam ışık seviyesini ölçmek için kullanılır.Copy
# light_sensor.py
import time
import busio
import digitalio
import board
import adafruit_mcp3xxx.mcp3008 as MCP
from adafruit_mcp3xxx.analog_in import AnalogIn
class LightSensor:
def __init__(self, channel):
# SPI bağlantısını oluştur
spi = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI)
cs = digitalio.DigitalInOut(board.CE0)
mcp = MCP.MCP3008(spi, cs)
# Analog giriş kanalını belirle
self.channel = AnalogIn(mcp, channel)
# Kalibrasyon değerleri
self.dark_value = 65000 # Karanlıkta ölçülen değer
self.bright_value = 5000 # Parlak ışıkta ölçülen değer
def read_raw(self):
"""Ham ADC değerini okur"""
return self.channel.value
def read_voltage(self):
"""Voltaj değerini okur"""
return self.channel.voltage
def read_percentage(self):
"""Işık seviyesi yüzdesini hesaplar (0-100)"""
raw_value = self.read_raw()
# Değeri yüzdeye dönüştür (karanlık: 0%, parlak: 100%)
light_percentage = max(0, min(100, ((self.dark_value - raw_value) /
(self.dark_value - self.bright_value) * 100)))
return round(light_percentage, 1)
def is_daylight(self, threshold=50):
"""Gündüz mü gece mi olduğunu kontrol eder"""
return self.read_percentage() > threshold
# Örnek kullanım
if __name__ == "__main__":
# MCP3008'in 4. kanalına bağlı ışık sensörü
sensor = LightSensor(4)
try:
while True:
raw = sensor.read_raw()
voltage = sensor.read_voltage()
percentage = sensor.read_percentage()
is_day = sensor.is_daylight()
print(f"Ham Değer: {raw}")
print(f"Voltaj: {voltage:.2f}V")
print(f"Işık Seviyesi: {percentage}%")
print(f"Durum: {'Gündüz' if is_day else 'Gece'}")
print("-" * 30)
time.sleep(2)
except KeyboardInterrupt:
print("Program sonlandırıldı")
5. Su Seviye Sensörü Okuma (Opsiyonel)
Su seviye sensörü, su deposundaki su seviyesini ölçmek için kullanılır.Copy
# water_level_sensor.py
import time
import busio
import digitalio
import board
import adafruit_mcp3xxx.mcp3008 as MCP
from adafruit_mcp3xxx.analog_in import AnalogIn
class WaterLevelSensor:
def __init__(self, channel):
# SPI bağlantısını oluştur
spi = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI)
cs = digitalio.DigitalInOut(board.CE0)
mcp = MCP.MCP3008(spi, cs)
# Analog giriş kanalını belirle
self.channel = AnalogIn(mcp, channel)
# Kalibrasyon değerleri
self.empty_value = 100 # Boş depoda ölçülen değer
self.full_value = 65000 # Dolu depoda ölçülen değer
def read_raw(self):
"""Ham ADC değerini okur"""
return self.channel.value
def read_voltage(self):
"""Voltaj değerini okur"""
return self.channel.voltage
def read_percentage(self):
"""Su seviyesi yüzdesini hesaplar (0-100)"""
raw_value = self.read_raw()
# Değeri yüzdeye dönüştür (boş: 0%, dolu: 100%)
level_percentage = max(0, min(100, ((raw_value - self.empty_value) /
(self.full_value - self.empty_value) * 100)))
return round(level_percentage, 1)
def is_low_level(self, threshold=20):
"""Su seviyesinin düşük olup olmadığını kontrol eder"""
return self.read_percentage() < threshold
def calibrate_empty(self):
"""Boş depo kalibrasyonu"""
self.empty_value = self.read_raw()
print(f"Boş depo değeri: {self.empty_value}")
return self.empty_value
def calibrate_full(self):
"""Dolu depo kalibrasyonu"""
self.full_value = self.read_raw()
print(f"Dolu depo değeri: {self.full_value}")
return self.full_value
# Örnek kullanım
if __name__ == "__main__":
# MCP3008'in 5. kanalına bağlı su seviye sensörü
sensor = WaterLevelSensor(5)
try:
while True:
raw = sensor.read_raw()
voltage = sensor.read_voltage()
percentage = sensor.read_percentage()
is_low = sensor.is_low_level()
print(f"Ham Değer: {raw}")
print(f"Voltaj: {voltage:.2f}V")
print(f"Su Seviyesi: {percentage}%")
print(f"Durum: {'Düşük Seviye!' if is_low else 'Normal'}")
print("-" * 30)
time.sleep(2)
except KeyboardInterrupt:
print("Program sonlandırıldı")
6. Tüm Sensörleri Birleştiren Ana Sınıf
Tüm sensörleri tek bir sınıfta birleştirerek kullanımı kolaylaştırabiliriz.Copy
# sensors_manager.py
import time
import board
from soil_moisture_sensor import SoilMoistureSensor
from dht22_sensor import DHT22Sensor
from rain_sensor import RainSensor
from light_sensor import LightSensor
from water_level_sensor import WaterLevelSensor
class SensorsManager:
def __init__(self):
# Sensörleri başlat
self.soil_sensors = [
SoilMoistureSensor(0), # MCP3008 kanal 0
SoilMoistureSensor(1), # MCP3008 kanal 1
SoilMoistureSensor(2) # MCP3008 kanal 2
]
self.dht_sensor = DHT22Sensor(board.D4) # GPIO 4
# Opsiyonel sensörler
self.rain_sensor = RainSensor(3, 24) # MCP3008 kanal 3, GPIO 24
self.light_sensor = LightSensor(4) # MCP3008 kanal 4
self.water_level_sensor = WaterLevelSensor(5) # MCP3008 kanal 5
def read_all(self):
"""Tüm sensörlerden veri okur"""
# Toprak nem sensörleri
soil_data = []
for i, sensor in enumerate(self.soil_sensors):
soil_data.append({
'sensor_id': i + 1,
'moisture_raw': sensor.read_raw(),
'moisture_percentage': sensor.read_percentage()
})
# DHT22 sıcaklık ve nem sensörü
dht_data = self.dht_sensor.read()
# Opsiyonel sensörler
rain_data = self.rain_sensor.is_raining()
light_data = {
'light_raw': self.light_sensor.read_raw(),
'light_percentage': self.light_sensor.read_percentage(),
'is_daylight': self.light_sensor.is_daylight()
}
water_level_data = {
'level_raw': self.water_level_sensor.read_raw(),
'level_percentage': self.water_level_sensor.read_percentage(),
'is_low_level': self.water_level_sensor.is_low_level()
}
# Tüm verileri birleştir
timestamp = time.time()
return {
'timestamp': timestamp,
'soil_moisture': soil_data,
'environment': dht_data,
'rain': rain_data,
'light': light_data,
'water_level': water_level_data
}