IO iterator

Moderator: RevPiModIO

JPeGfr
Posts: 5
Joined: 26 Apr 2023, 13:30

IO iterator

Post by JPeGfr »

Hello everyone,
I'm new to the RevPi universe, so sorry if my problem is obvious.
I would like to iterate over the inputs of an AIO module to check whether an input (identified by its name, of type "§ad#Name") is present in the piCtory configuration.
I thought that revpimodio2.io.IOList() returned the list of inputs and outputs.
Here's my code:
# Check that the analog input parameter is present in the piCtory configuration
_isPresent = False
for input in revpimodio2.io.IOList():
    if input.name == self.analog_input:  # TODO: Check that the input is an analog input
        _isPresent = True
        break
if not _isPresent:
    print(f"Analog input '{self.analog_input}' not found in RevPi configuration")  # TODO: Log the error instead of printing it
    raise ValueError

Do you think this solution will work?
Thank you for your answers and comments.
PS: This code is taken from the __init__ method of a class called MeasurementSource.

nicolaiB
KUNBUS
Posts: 901
Joined: 21 Jun 2018, 10:33
Location: Berlin

Re: IO iterator

Post by nicolaiB »

Use 'revpimodio2.io:' instead of 'revpimodio2.io.IOList():'
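
To illustrate the idea, here is a minimal sketch of the name check written as a small helper. It only assumes that the collection is iterable and that each item has a `name` attribute, which is how the loop in the question uses it. The helper name `io_name_exists` and the `fake_io` stand-in list are hypothetical and exist only so the example runs without RevPi hardware.

```python
from types import SimpleNamespace


def io_name_exists(io_collection, name):
    """Return True if any IO object in the iterable has the given name."""
    return any(io.name == name for io in io_collection)


# Stand-in for the `io` attribute of a RevPiModIO instance (hypothetical data).
fake_io = [SimpleNamespace(name="Temperature"), SimpleNamespace(name="Pressure")]

print(io_name_exists(fake_io, "Temperature"))   # True
print(io_name_exists(fake_io, "Displacement"))  # False
```

On a real device the same helper would presumably be called with the `io` attribute of an instance, for example `rpi = revpimodio2.RevPiModIO(autorefresh=True)` followed by `io_name_exists(rpi.io, "Temperature")`.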

Nicolai
JPeGfr
Posts: 5
Joined: 26 Apr 2023, 13:30

Re: IO iterator

Post by JPeGfr »

Thanks, it works.
Here is my code, for posterity.
It provides generic measurement sources whose origin can vary depending on the platform and the measurement requirements.
A companion web application generates the JSON configuration file.

Code: Select all

from abc import ABC, abstractmethod
from math import cos
import json
import serial
import revpimodio2

class SourceConfigurationError(Exception):
    """
    Exception raised when the connection or source configuration fail.
    """
    pass

class MeasurementSource(ABC):
    """
    Abstract class representing a measurement source.

    Attributes:
        name (str): The name of the measurement source.
        uuid (str): The UUID of the measurement source.

    Methods:
        retrieve_measurement: Retrieves the measurement from the measurement source.
    """
    def __init__(self, name, uuid, machine):
        """
        Constructor of the class.

        Args:
            name (str): The name of the measurement source.
            uuid (str): The UUID of the measurement source.
            machine (TestMachine): The test machine instance.
        """
        self.name = name        # The name of the measurement source.
        self.uuid = uuid        # The UUID of the measurement source.
        self.machine = machine  # The test machine instance.  

    @abstractmethod
    def retrieve_measurement(self):
        """
        Retrieves the measurement from the measurement source.

        Returns:
            float: The measurement value.
        """
        pass
        
 
class MeasurementSourceManager:
    """
    Class for managing the measurement sources.

    Attributes:
        sources (list): List of measurement sources.

    Methods:
        from_json: Builds a measurement source from JSON data.
    """
    def __init__(self):
        """
        Constructor of the class.
        """
        self.sources = []

    @classmethod
    def from_json(cls, json_data, machine):
        """
        Builds a single measurement source from one JSON source entry.

        Args:
            json_data (dict): The JSON data of one source entry.
            machine (TestMachine): The test machine instance.

        Returns:
            MeasurementSource or None: The created measurement source,
            or None if the entry is invalid or the source could not be created.
        """
        
        _source = None

        try:
            print(json_data) # TODO : Remove this line
            source_type = json_data.get('type')
            source_name = json_data.get('name')
            source_uuid = json_data.get('source_id')
            source_config = json_data.get('config')

            if source_type == 'revpi':
                
                try:
                    _source = RevPIMeasurementSource(source_name, source_uuid, machine, source_config)
                except SourceConfigurationError as e:
                    print(f"Error: Failed to create RevPi measurement source: {e}") # TODO : log the error instead of printing it
                    _source = None

            elif source_type == 'database':
                
                try:
                    _source = DatabaseMeasurementSource(source_name, source_uuid, machine, source_config)
                except SourceConfigurationError as e:
                    print(f"Error: Failed to create database measurement source: {e}") # TODO : log the error instead of printing it
                    _source = None

            elif source_type == 'serial':

                try:
                    _source = SerialMeasurementSource(source_name, source_uuid, machine, source_config)
                except SourceConfigurationError as e:
                    print(f"Error: Failed to create serial measurement source: {e}") # TODO : log the error instead of printing it
                    _source = None

            elif source_type == 'fake':
                
                try:
                    _source = FakeMeasurementSource(source_name, source_uuid, machine, source_config)
                except SourceConfigurationError as e:
                    print(f"Error: Failed to create fake measurement source: {e}") # TODO : log the error instead of printing it
                    _source = None

            else:
                _source = None
                print(f"Error : Skipping invalid measurement source data: {json_data}") # TODO : log the error instead of printing it

        except KeyError as e:
            print(f"Invalid measurement source data: {e}")  # TODO : Manage the case where the source data is invalid - log the error instead of printing it

        return _source

 
class RevPIMeasurementSource(MeasurementSource):
    def __init__(self, name, uuid, machine, config):
        """
        Constrained measurement source for sensors connected to an AIO module on the RevPI platform

        Args:
            name (str): The name of the measurement source.
            uuid (str): The UUID of the measurement source.
            machine (TestMachine): The test machine instance.
            config (dict): The configuration of the measurement source.

        Raises:
            SourceConfigurationError: Raised when the RevPiModIO is not found or not configured.
            ValueError: Raised when the analog input parameter is missing or not found in the piCtory configuration.
        
        """
        # https://revpimodio.org/en/doc2/io/
        super().__init__(name, uuid, machine)
        self.analog_input = config['analog_input']
        try:
            self.revpi = machine._revpi
        except RuntimeError:
            print(f"RevPiModIO not found or not configured")  # TODO : Log the error instead of printing it
            raise SourceConfigurationError("RevPiModIO not found or not configured")
        
        # Check that the analog input parameter is present
        if not self.analog_input:
            print("Missing parameter 'analog_input' in RevPi Measurement Source configuration") # TODO : Log the error instead of printing it
            raise ValueError("Missing parameter 'analog_input'")
        # Check that the analog input parameter is present in the piCtory configuration
        _isPresent = False
        # cf suggestion forum https://revolutionpi.com/forum/viewtopic.php?t=4055
        for io in self.revpi.io:  # 'io' avoids shadowing the built-in input()
            if io.name == self.analog_input:
                _isPresent = True
                break
        if not _isPresent:
            print(f"Analog input '{self.analog_input}' not found in RevPi configuration") # TODO : Log the error instead of printing it
            raise ValueError(f"Analog input '{self.analog_input}' not found")

    def retrieve_measurement(self):
        """
        Retrieves the measurement from the measurement source.
        
        Returns:
            float: The measurement value.
        """
        return float(self.revpi.io[self.analog_input].value)
        
class FakeMeasurementSource(MeasurementSource):
    """
    Class representing a fake measurement source.
    The measurements are simulated by a cosine function.

    Attributes:
        name (str): The name of the measurement source.
        uuid (str): The UUID of the measurement source.
        machine (TestMachine): The test machine instance.
        config (dict): The configuration of the measurement source.

    Methods:
        retrieve_measurement: Retrieves the measurement from the measurement source.
    """
    def __init__(self, name, uuid, machine, config):
        super().__init__(name, uuid, machine)
        self._msg_count = 0     # Number of messages received from the sensor.

    def retrieve_measurement(self):
        measurement = cos(self._msg_count/10)
        self._msg_count += 1
        if self._msg_count > 100000:
            self._msg_count = 0
        return measurement
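
The point of the common base class is that calling code can read every source the same way, whatever its origin. As a minimal sketch of that idea, the example below polls a list of sources through `retrieve_measurement()` only. The classes `Source`, `CosineSource` and `ConstantSource` and the helper `poll` are hypothetical, trimmed-down stand-ins written for this illustration; they are not part of the code above.

```python
from abc import ABC, abstractmethod
from math import cos


class Source(ABC):
    """Hypothetical, trimmed-down stand-in for MeasurementSource."""

    def __init__(self, name):
        self.name = name

    @abstractmethod
    def retrieve_measurement(self):
        """Return the current measurement as a float."""


class CosineSource(Source):
    """Simulated source, same cosine idea as FakeMeasurementSource."""

    def __init__(self, name):
        super().__init__(name)
        self._count = 0

    def retrieve_measurement(self):
        value = cos(self._count / 10)
        self._count += 1
        return value


class ConstantSource(Source):
    """Source that always returns the same value (illustration only)."""

    def __init__(self, name, value):
        super().__init__(name)
        self._value = value

    def retrieve_measurement(self):
        return float(self._value)


def poll(sources):
    """Read every source once and return a {name: value} mapping."""
    return {source.name: source.retrieve_measurement() for source in sources}


sources = [CosineSource("fake"), ConstantSource("bench", 21.5)]
first = poll(sources)
second = poll(sources)
print(first, second)
```

The polling code never needs to know which concrete class it is talking to, so adding another kind of source should not require changing it.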
And the JSON structure:

Code: Select all

{"sources": [
  {"source_id": "b588e23a-256b-440c-ad11-632bd796d1a6", "name": "Fake Capteur de temp\u00e9rature", "type": "fake", "config": {}},
  {"source_id": "f7bb23e6-4073-425b-95ef-d01aab1c1cbf", "name": "Sonde de temp\u00e9rature REVPI", "type": "revpi", "config": {"analog_input": "Temperature"}},
  {"source_id": "9c183a60-9501-4729-833e-4788278fdef5", "name": "Source de mesure de d\u00e9placement RevPI", "type": "revpi", "config": {"analog_input": "Displacement"}},
  {"source_id": "c603932a-a52a-4625-8ef6-8425054e43b1", "name": "Source de mesure Pression RevPI", "type": "revpi", "config": {"analog_input": "Pressure"}}
]}
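
As a rough sketch of how a file with this structure could be consumed, the example below parses the JSON text and builds one object per entry by looking up the `type` field in a dictionary instead of an if/elif chain. Everything named here (`CONFIG_TEXT`, `StubSource`, `REGISTRY`, `build_sources`) is a hypothetical stand-in for illustration, and the sample data is shortened; in the real code the registry would presumably map to the measurement source classes.

```python
import json
from dataclasses import dataclass

# Shortened sample in the same shape as the configuration file (hypothetical data).
CONFIG_TEXT = """
{"sources": [
  {"source_id": "id-1", "name": "Fake sensor", "type": "fake", "config": {}},
  {"source_id": "id-2", "name": "Unknown sensor", "type": "mystery", "config": {}}
]}
"""


@dataclass
class StubSource:
    """Stand-in for a concrete measurement source class."""
    name: str
    uuid: str
    config: dict


# Maps the "type" field to a factory; only "fake" is registered in this sketch.
REGISTRY = {"fake": StubSource}


def build_sources(config_text):
    """Parse the JSON text and build one source per known entry."""
    data = json.loads(config_text)
    sources = []
    for entry in data.get("sources", []):
        factory = REGISTRY.get(entry.get("type"))
        if factory is None:
            # Unknown type: skip the entry instead of failing the whole load.
            continue
        sources.append(
            factory(entry["name"], entry["source_id"], entry.get("config", {}))
        )
    return sources


built = build_sources(CONFIG_TEXT)
print([source.name for source in built])
```

Note that `json.loads` rejects trailing commas, so the file written by the web application has to be strictly valid JSON.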
And the piCtory configuration:
Attachment: PicToryconf1.png
Attachment: PicToryconf2.png
Results in the web app:
Attachment: Clipboard - 26 juillet 2023 20_27.png