"""
This module defines a Redundancy Agent that can be used in the agentMET4FOF framework. It has two main data processing
types:
- lcs: best estimate calculation using Largest Consistent Subset method
- lcss: best estimate calculation using Largest Consistent Subset of Sensor values method
"""
from typing import Dict, Optional

import numpy as np
from agentMET4FOF.metrological_agents import MetrologicalAgent
from time_series_metadata.scheme import MetaData

from ..MFred.redundancy1 import calc_lcs, calc_lcss
from .metrological_streams_v2 import MetrologicalMultiWaveGenerator
class MetrologicalMultiWaveGeneratorAgent(MetrologicalAgent):
    """An agent streaming a signal composed of various sine and cosine components.

    Takes samples from the :py:mod:`MultiWaveGenerator` and pushes them sample by
    sample (or in batches) to connected agents via its output channel.
    """

    # The datatype of the stream will be MultiWaveGenerator.
    _data_stream: MetrologicalMultiWaveGenerator
    batch_size = 100

    def init_parameters(self, signal: Optional[MetrologicalMultiWaveGenerator] = None, **kwargs):
        """Initialize the input data

        Initialize the input data stream as an instance of the
        :py:mod:`MultiWaveGenerator` class and publish its metadata on the
        ``default`` output channel.

        Parameters
        ----------
        signal : MetrologicalMultiWaveGenerator, optional
            the underlying signal for the generator. When omitted, a new
            ``MetrologicalMultiWaveGenerator()`` is created for this agent.
        **kwargs
            accepted but not used by this method.
        """
        # Build the default stream per call instead of in the signature: a
        # default-argument instance is created once at definition time and would
        # be shared by every agent that relies on the default.
        if signal is None:
            signal = MetrologicalMultiWaveGenerator()
        self._data_stream = signal
        super().init_parameters()
        self.set_output_data(channel="default", metadata=self._data_stream.metadata)

    def agent_loop(self):
        """Model the agent's behaviour

        On state *Running* the agent takes the next sample(s) from the data stream,
        queues them on the ``default`` output channel and lets the parent class'
        ``agent_loop`` handle the rest.
        """
        if self.current_state == "Running":
            self.set_output_data(channel="default", data=self._data_stream._next_sample_generator())
            super().agent_loop()

    @property
    def metadata(self) -> Dict:
        """Return the ``metadata`` attribute of the data stream's metadata object."""
        return self._data_stream.metadata.metadata
class RedundancyAgent(MetrologicalAgent):
    """
    This is the main Redundancy Agent class. Main calculation types are :py:func:`lcs` and :py:func:`lcss`, as defined
    in the module :mod:`redundancy1`.

    Typical set-up order: :py:meth:`init_parameters`, then :py:meth:`init_parameters1`, and (for ``'lcss'`` only)
    :py:meth:`init_parameters2`, which relies on ``n_pr`` set by :py:meth:`init_parameters1`.
    """

    metadata: MetaData
    n_pr: int
    problim: float
    calc_type: str
    sensor_key_list: list
    a_arr: np.ndarray
    a_arr2d: np.ndarray

    # Calculation types understood by agent_loop.
    _CALC_TYPES = ("lcs", "lcss")

    def init_parameters(self, input_data_maxlen=25, output_data_maxlen=25):
        """
        Set the agent's output metadata and initialize the parent agent.

        Parameters
        ----------
        input_data_maxlen: int
            forwarded to the parent's ``init_parameters`` (presumably the maximum input buffer length)
        output_data_maxlen: int
            forwarded to the parent's ``init_parameters`` (presumably the maximum output buffer length)
        """
        self.metadata = MetaData(
            device_id="RedAgent01",
            time_name="time",
            time_unit="s",
            quantity_names="m",
            quantity_units="kg",
            misc="nothing")

        # Forward the caller's values; previously the literal 25 was always passed and the arguments were ignored.
        super().init_parameters(input_data_maxlen=input_data_maxlen, output_data_maxlen=output_data_maxlen)
        self.set_output_data(channel="default", metadata=self.metadata)

    def init_parameters1(self, calc_type, sensor_key_list, n_pr, problim):
        """
        Parameters used for both methods :func:`lcs` and :func:`lcss`.

        Parameters
        ----------
        calc_type: str
            calculation type: 'lcs' or 'lcss'
        sensor_key_list: list of strings
            list containing the names of the sensors that should feed data to the Redundancy Agent
        n_pr: integer
            size of the batch of data that is handled at once by the Redundancy Agent
        problim: float
            limit probability used for consistency evaluation
        """
        self.calc_type = calc_type
        self.sensor_key_list = sensor_key_list
        self.n_pr = n_pr
        self.problim = problim

    def init_parameters2(self, fsam, f1, f2, ampl_ratio, phi1, phi2):
        """
        Additional parameters used for this particular example in combination with the :py:func:`lcss` method.
        It provides the prior knowledge needed to make the information contained in the data redundant.
        This method sets up the vector **a** and matrix *A* for the system **y** = **a** + *A* * **x**.

        Requires ``self.n_pr`` to be set (see :py:meth:`init_parameters1`).

        Parameters
        ----------
        fsam: float
            sampling frequency
        f1: float
            first frequency of interest in signal
        f2 : float
            second frequency of interest in signal
        ampl_ratio: float
            ratio of the amplitudes of the two frequency components
        phi1: float
            initial phase of first frequency component
        phi2: float
            initial phase of second frequency component
        """
        # The FFT of the identity matrix gives the DFT matrix; its real and (negated) imaginary parts,
        # restricted to the first n_pr/2 rows and scaled by n_pr/2, are used as row templates below.
        n_pr2 = int(self.n_pr / 2)
        afft = np.fft.fft(np.identity(self.n_pr))
        bfft = np.real(afft[:n_pr2, :]) / n_pr2
        cfft = -np.imag(afft[:n_pr2, :]) / n_pr2

        # NOTE(review): these scale factors divide by cos/sin of the phases, so a phase that makes
        # the cosine or sine zero (e.g. phi = 0 for the sine terms) yields inf/nan entries - confirm
        # that callers never pass such phases.
        c1 = 1 / np.cos(phi1)
        c2 = 1 / np.sin(-phi1)
        c3 = ampl_ratio / np.cos(phi2)
        c4 = ampl_ratio / np.sin(-phi2)

        # Frequency resolution of a batch of n_pr samples taken at fsam.
        t_max = self.n_pr / fsam
        df = 1 / t_max
        ind_freq1 = self._frequency_bin(f1, df)
        ind_freq2 = self._frequency_bin(f2, df)

        rows = (
            c1 * bfft[ind_freq1, :],
            c2 * cfft[ind_freq1, :],
            c3 * bfft[ind_freq2, :],
            c4 * cfft[ind_freq2, :],
        )
        # Stack the four rows into a (4, n_pr) matrix; the offset vector is all zeros.
        self.a_arr2d = np.concatenate([row.reshape((1, len(row))) for row in rows], axis=0)
        self.a_arr = np.zeros(shape=(4, 1))

    @staticmethod
    def _frequency_bin(freq, df):
        """Return the FFT bin index for ``freq`` given the frequency resolution ``df``.

        A ratio that is an integer up to floating-point round-off (e.g. ``0.3 / 0.1``, which evaluates
        to ``2.999...``) is rounded to that integer; any other ratio is truncated towards zero.
        """
        ratio = freq / df
        nearest = round(ratio)
        if np.isclose(ratio, nearest, rtol=1e-9, atol=1e-9):
            return int(nearest)
        return int(ratio)

    def agent_loop(self):
        """
        Model the agent's behaviour

        On state *Running* the agent checks that the buffer holds at least ``n_pr`` entries for every sensor in
        ``sensor_key_list``, takes ``n_pr`` entries out of the buffer, computes the best estimate with the method
        selected by ``calc_type`` and queues the result on the ``default`` output channel.

        Returns
        -------
        int or None
            0 when the evaluation is skipped because data is missing, otherwise None.

        Raises
        ------
        ValueError
            if enough data is available but ``calc_type`` is neither 'lcs' nor 'lcss'.
        """
        if self.current_state == "Running":
            # sometimes the buffer does not contain values for all sensors
            key_list = [key for key in self.sensor_key_list if key in self.buffer.keys()]
            if len(key_list) != len(self.sensor_key_list):  # expected number of sensors
                print('Not all sensors were present in the buffer. Not evaluating the data.')
                return 0

            for key in key_list:
                if self.buffer[key].shape[0] < self.n_pr:
                    print('Buffer size is ', self.buffer[key].shape[0], ', which is less than ', self.n_pr, '.')
                    print('Not enough data for redundancy agent evaluation.')
                    return 0

            # Validate before consuming the buffer, so a misconfigured agent fails with a clear message
            # instead of dropping data and hitting an undefined variable further down.
            if self.calc_type not in self._CALC_TYPES:
                raise ValueError(f"Unknown calc_type {self.calc_type!r}; expected 'lcs' or 'lcss'.")

            buff = self.buffer.popleft(self.n_pr)  # take n_pr entries out from the buffer
            t_arr2d, ut_arr2d, x_arr2d, ux_arr2d = self._unpack_buffer(buff, key_list)

            if self.calc_type == "lcs":
                data = self._best_estimate_lcs(t_arr2d, ut_arr2d, x_arr2d, ux_arr2d)
            else:
                data = self._best_estimate_lcss(t_arr2d, ut_arr2d, x_arr2d, ux_arr2d)

            # Send the data as a 2-D array (one row per result)
            if len(data.shape) == 1:
                data = data.reshape((1, len(data)))
            self.set_output_data(channel="default", data=data)
            super().agent_loop()

    def _unpack_buffer(self, buff, key_list):
        """Split the popped buffer into four (n_pr, n_sensors) arrays, one column per sensor in ``key_list`` order.

        Columns 0..3 of each sensor's data are returned as (t, ut, x, ux); judging by the names used in this class
        these are time, time uncertainty, value and value uncertainty.
        """
        shape = (self.n_pr, len(key_list))
        t_arr2d = np.full(shape=shape, fill_value=np.nan)
        ut_arr2d = np.full(shape=shape, fill_value=np.nan)
        x_arr2d = np.full(shape=shape, fill_value=np.nan)
        ux_arr2d = np.full(shape=shape, fill_value=np.nan)
        # Iterate key_list rather than buff.keys(): the latter has arbitrary order.
        for i_sensor, key in enumerate(key_list):
            data_arr = buff[key]
            t_arr2d[:, i_sensor] = data_arr[:, 0]
            ut_arr2d[:, i_sensor] = data_arr[:, 1]
            x_arr2d[:, i_sensor] = data_arr[:, 2]
            ux_arr2d[:, i_sensor] = data_arr[:, 3]
        return t_arr2d, ut_arr2d, x_arr2d, ux_arr2d

    def _best_estimate_lcs(self, t_arr2d, ut_arr2d, x_arr2d, ux_arr2d):
        """Apply :func:`calc_lcs` to every sample across sensors; return an (n_pr, 4) array of (t, ut, y, uy)."""
        n_sensors = x_arr2d.shape[1]
        data = np.full(shape=(self.n_pr, 4), fill_value=np.nan)
        for i_pnt in range(self.n_pr):
            y_arr = np.array(x_arr2d[i_pnt, :]).reshape((n_sensors, 1))
            # Diagonal covariance matrix built from the squared uncertainties.
            vy_arr2d = np.diag(np.square(ux_arr2d[i_pnt, :]))
            n_sols, ybest, uybest, _chi2obs, _indkeep = calc_lcs(y_arr, vy_arr2d, self.problim)
            # time stamp is value of first sensor
            t_val = t_arr2d[i_pnt, 0]
            ut_val = ut_arr2d[i_pnt, 0]
            if n_sols == 1:
                if isinstance(ybest, np.ndarray):
                    ybest = ybest[0]
                data[i_pnt, :] = np.array([t_val, ut_val, ybest, uybest])
            else:  # only return the first solution
                data[i_pnt, :] = np.array([t_val, ut_val, ybest[0], uybest[0]])
        return data

    def _best_estimate_lcss(self, t_arr2d, ut_arr2d, x_arr2d, ux_arr2d):
        """Sum the sensor signals into one data vector, apply :func:`calc_lcss`; return a 1-D (t, ut, y, uy) array.

        Uses ``self.a_arr`` and ``self.a_arr2d`` as set by :py:meth:`init_parameters2`.
        """
        # lcss is applied to one data vector (required input): sum the signals to get one signal
        x_data_arr = np.sum(x_arr2d, axis=1)
        x_data_arr = x_data_arr.reshape((len(x_data_arr), 1))
        # Squared uncertainties of the summed terms add up; they form the diagonal of the covariance matrix.
        vx_arr2d = np.diag(np.sum(np.square(ux_arr2d), axis=1))

        n_sols, ybest, uybest, _chi2obs, _indkeep = \
            calc_lcss(self.a_arr, self.a_arr2d, x_data_arr, vx_arr2d, self.problim)
        print('calc lcss finished')
        print('n_sols: ', n_sols)
        print('ybest: ', ybest)
        print('uybest: ', uybest)

        # time stamp is latest value
        t_val = t_arr2d[-1, 0]
        ut_val = ut_arr2d[-1, 0]
        if n_sols == 1:
            if isinstance(ybest, np.ndarray):
                ybest = ybest[0]
            return np.array([t_val, ut_val, ybest, uybest])
        # only return the first solution
        return np.array([t_val, ut_val, ybest[0], uybest[0]])

    def on_received_message(self, message):
        """
        Handles incoming data from 'default' channels.
        Stores 'default' data into an internal buffer

        Parameters
        ----------
        message : dict
            Only acceptable channel value is 'default'. The keys 'channel', 'from' and 'data' are read.

        Returns
        -------
        int
            always 0
        """
        if message['channel'] == 'default':
            self.buffer_store(agent_from=message["from"], data=message["data"])
        return 0