Hi troll100!
Ich habe jetzt mal einen Code vorbereitet, der sich den 80 Hz vom AIO mit einer Zykluszeit von 13 Millisekunden annähert. Damit solltest du das beste Ergebnis erzielen.
Der Code hat nun keine Ausgaben mehr auf der Konsole. Wenn er gestartet wird, wartet er auf den I_1 Input. Wenn der kommt, dann sammelt das System 2.5 Sekunden lang (der Wert kann ganz unten bei "measure_time=2.5" eingestellt werden) die Werte vom RDTValue_1. Den Wert und einen einfachen Timestamp übergibt die Zyklusfunktion in eine Queue. Damit ist die Funktion relativ schnell und sollte die Zykluszeit einhalten können, damit die RuntimeWarnings ausbleiben.
Ein anderer Thread in dem Programm (writer_thread) arbeitet parallel zur Zyklusfunktion und ist für das Auslesen der Daten aus der Queue, das Formatieren und das eigentliche schreiben verantwortlich. Er nimmt sich einen Wert aus der Queue, bereitet Datum und Uhrzeit bis zur Mikrosekunde vor und schreibt einen Zeile in die CSV Datei. Diese Vorgänge brauchen "viel" Rechenzeit und würden unsere Zyklusfunktion sehr langsam machen - Dementsprechend kommen RuntimeWarnings und wir "verpassen" im schlimmsten Fall sogar Werte.
Klar kann es nun passieren, dass der Thread nicht "hinterher" kommt, da sich ja Zyklusfunktion und Thread die Rechenzeit teilen müssen, aber das macht in der Hinsicht nichts, da er nach einer Messung ja etwas länger Zeit hat um seine Arbeit zu erledigen
PS: Der Code in der Zyklusfunktion schaut auf die Flanke von I_1, wenn I_1 auf True wechselt, wird die Messung sofort gestartet. Während oder nach der Messung muss I_1 einmal den Status False annehmen, bevor ein weiteres True Signal eine weitere Messung startet.
Code: Select all
# -*- coding: utf-8 -*-
"""Measure values for a defined time and save to CSV."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2020 Sven Sager"
__license__ = "GPLv3"
import csv
from datetime import datetime
from queue import Queue
from threading import Thread
from time import time
import revpimodio2
class RTDDemo():
"""Measure RTD values for a defined time."""
def __init__(self, measure_time: float, file_name: str):
"""
Measure RTD values for a defined time on trigger.
:param measure_time: Time in seconds
"""
self._revpi_cycletime = 13
# Amount of cycles to measure on trigger
self.measure_cycles = int(measure_time * 1000 / self._revpi_cycletime)
# Thread and queue to write CSV file
self.file_name = file_name
self.q = Queue()
self.th = Thread(target=self.writer_thread)
# RevPi with cleanup function
self.rpi = revpimodio2.RevPiModIO(autorefresh=True)
self.rpi.handlesignalend()
def cycle(self, ct):
# Init cycle tools with variables
if ct.first:
ct.var.counter = 0
ct.var.running = False
ct.var.mrk_input = False
# Value changed on input to start measurement
if not ct.var.mrk_input and self.rpi.io.I_1.value:
# Save value to get the edge of input
ct.var.mrk_input = True
# Activate measurement
ct.var.counter = 0
ct.var.running = True
# Start the work in the same cycle as input was triggered
if ct.var.running:
ct.var.counter += 1
# Collect value and put to queue
self.q.put_nowait((time(), self.rpi.io.RTDValue_1.value))
if ct.var.counter == self.measure_cycles:
# End collecting values on next cycle
ct.var.counter = 0
ct.var.running = False
elif not self.rpi.io.I_1.value:
# Reset marker to get the next edge of input after measurement only
ct.var.mrk_input = False
def start(self):
"""Start the cycle work"""
self.th.start()
self.rpi.cycleloop(self.cycle, cycletime=self._revpi_cycletime)
# Tell thread to shutdown and wait
self.q.put_nowait(None)
self.th.join()
def writer_thread(self):
"""Thread to write values to CSV file."""
csv_fh = open(self.file_name, "a")
csv_writer = csv.writer(csv_fh)
# If this is the beginning of csv, write header
if csv_fh.tell() == 0:
csv_writer.writerow([
"year", "month", "day",
"hour", "minute", "second", "microsecond",
"value"
])
# Mainloop of this thread
while True:
value = self.q.get()
if value is not None:
# We got a value from cycle function
# Process date time and the value in this thread, not in cycle function
dt = datetime.fromtimestamp(value[0])
csv_writer.writerow([
dt.year, dt.month, dt.day,
dt.hour, dt.minute, dt.second, dt.microsecond,
value[1]
])
# Optional to write data immediately to disk
csv_fh.flush()
else:
# Cleanup function requested us to stop
break
# Close CSV file
csv_fh.close()
if __name__ == "__main__":
app = RTDDemo(
measure_time=2.5,
file_name="/home/pi/Pt100/sensordaten.csv"
)
app.start()
Gruß, Sven