Skip to content

Measurement

Note that Measurement is not a dedicated service but a part of UserApplicationControl or UserApplicationMonitoring.

Below a sample that demonstrate how use Measurement. It starts an acquisition of 2 channels at 1kHz for 4 seconds. And use pyplot to display the result:

from restmapi.services import MAPIServices
import time
import matplotlib.pyplot as plt

with MAPIServices.instance.get_service("UserApplicationControl") as uac:
    measurement_manager = uac.get_measurement_manager()
    with measurement_manager.create() as measurement:
        measurement.set_period(0.001)
        measurement.set_quantities(["sintime","d_sintime_dt"]) # REPLACE HERE BY EXISTING CHANNELS
        measurement.set_duration(4)     
        measurement.set_measurement_mode(1) #oneshot
        print(f"Measurement status {measurement.get_status()}")
        measurement.activate()        
        print(f"Measurement status {measurement.get_status()}")
        measurement.start()        
        while measurement.get_status() == 2:
            print(f"Measurement status {measurement.get_status()}")
            time.sleep(0.5)     
        print(f"Measurement status {measurement.get_status()}")
        values = measurement.get_values()
        x = [x for index, x in enumerate(values) if index%3 == 0]
        y1 = [y for index, y in enumerate(values) if index%3 == 1]
        y2 = [y for index, y in enumerate(values) if index%3 == 2]
        plt.plot(x, y1, label = "sintime")
        plt.plot(x, y2, label = "d(sintime)/dt")
        plt.xlabel('x - time')
        plt.ylabel('y - sintime')
        plt.title('sintime!')
        plt.legend()
        plt.show() 

Display result:

Measurement Plot Result

Same sample but using SignalR and continous acquisition mode:

from restmapi.services import MAPIServices
import time
import matplotlib.pyplot as plt  

global values

with MAPIServices.instance.get_service("UserApplicationControl") as uac:
    measurement_manager = uac.get_measurement_manager()
    measurement_manager.init_signalr()
    with measurement_manager.create() as measurement:
        values = []
        measurement_manager.register_callback(measurement.id, "StatusChanged", lambda id, old, new:print(f"Measurement status changed from {old} to {new}") )
        def on_buffer_ready (id, new_values):
            global values            
            print("Measurement buffer ready")                            
            values = values + new_values   
        measurement_manager.register_callback(measurement.id, "BufferReady",  on_buffer_ready)
        measurement_manager.register_callback(measurement.id, "BufferFull", lambda id:print(f"Measurement buffer full") )
        measurement_manager.register_callback(measurement.id, "Disposed", lambda id:print(f"Measurement disposed") )
        measurement.set_period(0.001)
        measurement.set_quantities(["sintime","d_sintime_dt"])  # REPLACE HERE BY EXISTING CHANNELS
        measurement.set_duration(4)     
        measurement.set_measurement_mode(0) #Continuous
        measurement.activate()        
        measurement.start()        
        time.sleep(5)        
        measurement.stop()        
        x = [x for index, x in enumerate(values) if index%3 == 0]
        y1 = [y for index, y in enumerate(values) if index%3 == 1]
        y2 = [y for index, y in enumerate(values) if index%3 == 2]
        plt.plot(x, y1, label = "sintime")
        plt.plot(x, y2, label = "d(sintime)/dt")
        plt.xlabel('x - time')
        plt.ylabel('y - sintime')
        plt.title('sintime!')
        plt.legend()
        plt.show()                          

A new sample with full continous acquisition mode management:

from restmapi.services import MAPIServices
import time
import matplotlib.pyplot as plt  

global values, x, y1, y2
global ax, curve1, curve2, fig, initialized
initialized = False

with MAPIServices.instance.get_service("UserApplicationControl") as uac:
    measurement_manager = uac.get_measurement_manager()
    measurement_manager.init_signalr()
    with measurement_manager.create() as measurement:
        values = []
        fig, ax = plt.subplots() 
        measurement_manager.register_callback(measurement.id, "StatusChanged", lambda id, old, new:print(f"Measurement status changed from {old} to {new}") )
        def on_buffer_ready (id, new_values):
            global values, x, y1, y2                                    
            global ax, curve1, curve2, fig            
            print(f"Measurement buffer ready {len(new_values)}")                            
            values = values + new_values 
            x = [x for index, x in enumerate(values) if index%3 == 0]
            y1 = [y for index, y in enumerate(values) if index%3 == 1]
            y2 = [y for index, y in enumerate(values) if index%3 == 2]  
            print(f"Buffer size {len(x)}") 
            if (initialized):                            
                #ax.clear()
                curve1.set_xdata(x)           
                curve1.set_ydata(y1) 
                curve2.set_xdata(x)           
                curve2.set_ydata(y2)           
                ax.relim()
                ax.autoscale_view()            
                fig.canvas.draw_idle()                                         
        measurement_manager.register_callback(measurement.id, "BufferReady",  on_buffer_ready)
        measurement_manager.register_callback(measurement.id, "BufferFull", lambda id:print(f"Measurement buffer full") )
        measurement_manager.register_callback(measurement.id, "Disposed", lambda id:print(f"Measurement disposed") )
        measurement.set_period(0.001)
        measurement.set_quantities(["sintime","d_sintime_dt"])  # REPLACE HERE BY EXISTING CHANNELS
        measurement.set_duration(4)     
        measurement.set_measurement_mode(0) #Continuous
        measurement.activate()        
        measurement.start()        
        time.sleep(1)        
        x = [x for index, x in enumerate(values) if index%3 == 0]
        y1 = [y for index, y in enumerate(values) if index%3 == 1]
        y2 = [y for index, y in enumerate(values) if index%3 == 2]
        curve1, = ax.plot(x, y1, label = "sintime")
        curve2, = ax.plot(x, y2, label = "d(sintime)/dt")
        ax.set_autoscaley_on(True)        
        fig.suptitle('sintime!')
        initialized = True
        plt.show()                                  
        measurement.stop()