Hello everyone,
I am a newbie in the RevPi universe, so sorry if my problem is obvious.
I would like to iterate the list of inputs of an AIO module in order to check whether an input (identified by its name of type "§ad#Name") is present in piCtory.
I thought that the revpimodio2.io.IOList() method did indeed return the list of inputs and outputs.
Here's my code:
# Check that the analog input parameter is present in the piCtory configuration
_isPresent = False
# Walk every IO and stop at the first one whose name matches the configured input.
for input in revpimodio2.io.IOList():
    if input.name == self.analog_input:  # TODO : Check that the input is an analog input
        _isPresent = True
        break
if not _isPresent:
    # The f prefix must touch the opening quote; `f "..."` is a syntax error.
    print(f"Analog input '{self.analog_input}' not found in RevPi configuration")  # TODO: Log the error instead of printing it
    raise ValueError
Do you think this solution will work?
Thank you for your answers and comments.
PS: this code is extracted from the init method of a class called MeasurementSource
Translated with www.DeepL.com/Translator (free version)
IO iterator
Moderator: RevPiModIO
Use 'revpimodio2.io:' instead of 'revpimodio2.io.IOList():'
Nicolai
Nicolai
thanks it's ok
Here is my code for posterity.
This is a solution for making available generic measurement sources whose origin can vary depending on the platform and measurement requirements.
A web application is associated with this piece of programming to generate a configuration JSON file.
And the JSON structure :
And the piCtory configuration:
Results in WebApp :
Here is my code for posterity.
This is a solution for making available generic measurement sources whose origin can vary depending on the platform and measurement requirements.
A web application is associated with this piece of programming to generate a configuration JSON file.
Code: Select all
from abc import ABC, abstractmethod
from math import cos
import json
import serial
import revpimodio2
class SourceConfigurationError(Exception):
    """
    Exception raised when the connection to a source or its configuration fails.
    """
class MeasurementSource(ABC):
    """
    Abstract class representing a measurement source.

    Attributes:
        name (str): The name of the measurement source.
        uuid (str): The UUID of the measurement source.
        machine (TestMachine): The test machine instance.

    Methods:
        retrieve_measurement: Retrieves the measurement from the measurement source.
    """

    def __init__(self, name, uuid, machine):
        """
        Constructor of the class.

        Args:
            name (str): The name of the measurement source.
            uuid (str): The UUID of the measurement source.
            machine (TestMachine): The test machine instance.
        """
        self.name = name  # The name of the measurement source.
        self.uuid = uuid  # The UUID of the measurement source.
        self.machine = machine  # The test machine instance.

    @abstractmethod
    def retrieve_measurement(self):
        """
        Retrieves the measurement from the measurement source.

        Concrete subclasses must override this method; the class cannot be
        instantiated until they do.

        Returns:
            float: The measurement value.
        """
class MeasurementSourceManager:
    """
    Class for managing the measurement sources.

    Attributes:
        sources (list): List of measurement sources.

    Methods:
        from_json: Builds one measurement source from its JSON description.
    """

    def __init__(self):
        """
        Constructor of the class.
        """
        self.sources = []

    @classmethod
    def from_json(cls, json_data, machine):
        """
        Builds the measurement source described by one JSON entry.

        Args:
            json_data (dict): The JSON data of a single source. The keys
                'type', 'name', 'source_id' and 'config' are read.
            machine (TestMachine): The test machine instance.

        Returns:
            MeasurementSource | None: The created measurement source, or None
            when the type is unknown, when the source raises
            SourceConfigurationError, or when a KeyError is raised while
            building it. Any other exception propagates to the caller.
        """
        print(json_data)  # TODO : Remove this line
        source_type = json_data.get('type')
        source_name = json_data.get('name')
        source_uuid = json_data.get('source_id')
        source_config = json_data.get('config')

        # type -> (label used in the error message, deferred class lookup).
        # The lambdas postpone the name lookup so that only the class of the
        # requested type has to exist when this method runs.
        factories = {
            'revpi': ('RevPi', lambda: RevPIMeasurementSource),
            'database': ('database', lambda: DatabaseMeasurementSource),
            'serial': ('serial', lambda: SerialMeasurementSource),
            'fake': ('fake', lambda: FakeMeasurementSource),
        }
        # Non-string types (possibly unhashable) can never match a known type.
        entry = factories.get(source_type) if isinstance(source_type, str) else None
        if entry is None:
            print(f"Error : Skipping invalid measurement source data: {json_data}")  # TODO : log the error instead of printing it
            return None

        label, resolve_class = entry
        try:
            return resolve_class()(source_name, source_uuid, machine, source_config)
        except SourceConfigurationError as e:
            print(f"Error: Failed to create {label} measurement source: {e}")  # TODO : log the error instead of printing it
        except KeyError as e:
            print(f"Invalid measurement source data: {e}")  # TODO : Manage the case where the source data is invalid - log the error instead of printing it
        return None
class RevPIMeasurementSource(MeasurementSource):
    """
    Measurement source for sensors connected to an AIO module on the RevPI platform.

    Attributes:
        analog_input (str): Name of the IO read by this source, taken from
            config['analog_input'].
        revpi: The object exposed by the machine as `_revpi`; its `io`
            collection is iterated and indexed by IO name.
    """

    def __init__(self, name, uuid, machine, config):
        """
        Constrained measurement source for sensors connected to an AIO module on the RevPI platform

        Args:
            name (str): The name of the measurement source.
            uuid (str): The UUID of the measurement source.
            machine (TestMachine): The test machine instance.
            config (dict): The configuration of the measurement source.

        Raises:
            SourceConfigurationError: Raised when the RevPiModIO is not found or not configured.
            ValueError: Raised when the analog input parameter is empty or not found in the piCtory configuration.
            KeyError: Raised when config has no 'analog_input' key.
        """
        # https://revpimodio.org/en/doc2/io/
        super().__init__(name, uuid, machine)
        self.analog_input = config['analog_input']
        try:
            self.revpi = machine._revpi
        except RuntimeError as err:
            print("RevPiModIO not found or not configured")  # TODO : Log the error instead of printing it
            # Chain the original error so the root cause stays in the traceback.
            raise SourceConfigurationError("RevPiModIO not found or not configured") from err

        # Check that the analogue input parameter is present
        if not self.analog_input:
            message = "Missing parameter 'analog_input' in RevPi Measurement Source configuration"
            print(message)  # TODO : Log the error instead of printing it
            raise ValueError(message)

        # Check that the analog input parameter is present in the piCtory configuration
        # cf suggestion forum https://revolutionpi.com/forum/viewtopic.php?t=4055
        if not any(io.name == self.analog_input for io in self.revpi.io):
            message = f"Analog input '{self.analog_input}' not found in RevPi configuration"
            print(message)  # TODO : Log the error instead of printing it
            raise ValueError(message)

    def retrieve_measurement(self):
        """
        Retrieves the measurement from the measurement source.

        Returns:
            float: The current value of the configured IO, converted to float.
        """
        return float(self.revpi.io[self.analog_input].value)
class FakeMeasurementSource(MeasurementSource):
    """
    Simulated measurement source.

    Each call to retrieve_measurement returns the cosine of an internal call
    counter divided by 10; no hardware is involved.

    Attributes:
        name (str): The name of the measurement source.
        uuid (str): The UUID of the measurement source.
        machine (TestMachine): The test machine instance.

    Methods:
        retrieve_measurement: Returns the next simulated value.
    """

    def __init__(self, name, uuid, machine, config):
        """
        Constructor of the class.

        Args:
            name (str): The name of the measurement source.
            uuid (str): The UUID of the measurement source.
            machine (TestMachine): The test machine instance.
            config (dict): Accepted for signature parity with the other
                sources; not read here.
        """
        super().__init__(name, uuid, machine)
        # Number of simulated values produced since the last counter reset.
        self._msg_count = 0

    def retrieve_measurement(self):
        """
        Returns the next simulated value.

        Returns:
            float: cos(counter / 10) for the counter value before this call.
        """
        value = cos(self._msg_count / 10)
        upcoming = self._msg_count + 1
        # Restart the counter once it would go past 100000.
        self._msg_count = 0 if upcoming > 100000 else upcoming
        return value
Code: Select all
{"sources": [{"source_id": "b588e23a-256b-440c-ad11-632bd796d1a6", "name": "Fake Capteur de temp\u00e9rature", "type": "fake", "config": {}}, {"source_id": "f7bb23e6-4073-425b-95ef-d01aab1c1cbf", "name": "Sonde de temp\u00e9rature REVPI", "type": "revpi", "config": {"analog_input": "Temperature"}}, {"source_id": "9c183a60-9501-4729-833e-4788278fdef5", "name": "Source de mesure de d\u00e9placement RevPI", "type": "revpi", "config": {"analog_input": "Displacement"}}, {"source_id": "c603932a-a52a-4625-8ef6-8425054e43b1", "name": "Source de mesure Pression RevPI", "type": "revpi", "config": {"analog_input": "Pressure"}}]}