What I would like to achieve is that the capture_continuous function runs in the background, and the graph is only updated when a full scan has been produced. I understand that a simple way to do this would be to only have the graph update when the OFF scan is captured. However, I am trying to separate the capture process from the graph plotting process.
I envision that there will eventually be a GUI where the graph is always visible, and only starts updating when you click a 'LIVE MODE' button.
My initial idea had been to have the capture_continuous process running as a multiprocessing process and set a boolean to True every time it had produced a full scan. On top of this the graph would be continuously checking the state of the boolean so that it would update every time the boolean was set to True, set it back to False and update the graph. I am not having much luck with this as the capture_continuous process doesn't seem to run when it is run as a multiprocessing process, meaning that the boolean always stays as False and the graph never updates. I have confirmed this using a remote_pdb breakpoint. The capture_continuous process seems to run fine when it is just called as a function as I can see that the LEDs rapidly toggle state.
My questions are:
Am I doing something wrong?
Is there a better way to achieve this? I have seen something called the observer pattern that seems like it might be appropriate?
Code: Select all
from gpiozero import LED import numpy as np from time import sleep import picamera import matplotlib.pyplot as plt from scipy import signal import csv import os from multiprocessing import Process, current_process from remote_pdb import RemotePdb #LED Pins LED_STATUS_RED = 18 LED_STATUS_GREEN = 15 LED_STATUS_BLUE = 14 LED_SCAN_0 = 21 LED_SCAN_1 = 20 LED_SCAN_2 = 16 LED_SCAN_TOGGLE = 19 # Scan resolution H_RESOLUTION = 1280 V_RESOLUTION = 32 H_RESOLUTION_STEREO = 2 * H_RESOLUTION # Set plot style plt.style.use('dark_background') sensor = Sensor() live_mode(sensor) def live_mode(sensor): process = Process(target=sensor.grab_scan_continuous, args=(), daemon=True) process.start() while True: print(sensor.scan_available) if sensor.scan_available: self.plot.update_scan_only(self.sensor.current_scan_left, self.sensor.current_scan_right) class Sensor(): def __init__(self): # SHS configuration self.camera = picamera.PiCamera(stereo_mode ='side-by-side', stereo_decimate=False, resolution=(H_RESOLUTION_STEREO, V_RESOLUTION)) self.camera.awb_mode = 'off' self.camera.exposure_mode = 'off' self.camera.shutter_speed = 10000 self.video_port_enable = True # Scan LEDs self.scan_leds = ScanLED() # SensorOutput object self.output = SensorOutput() #Current scan self.current_scan_left = np.zeros(H_RESOLUTION) self.current_scan_right = np.zeros(H_RESOLUTION) #Scan available flag self.scan_available = False def grab_scan_continuous(self): #RemotePdb('127.0.0.1', 4444).set_trace() self.scan_leds.toggle.on() for frame in self.camera.capture_continuous(self.output, format='yuv', use_video_port=self.video_port_enable): RemotePdb('127.0.0.1', 4444).set_trace() if self.scan_leds.toggle.is_lit: scan_leds_on = self.output.scan self.scan_leds.toggle.off() else: scan_leds_off = self.output.scan scan_data = np.transpose(np.subtract(scan_leds_on, scan_leds_off.astype(np.int16)).clip(0, 255).astype(np.uint8)) self.current_scan_left = scan_data[:H_RESOLUTION] self.current_scan_right = scan_data[H_RESOLUTION:] #print(signal.find_peaks(scan_data.flatten(), minimum_peak_height, minimum_peak_width, minimum_peak_distance)) self.scan_available = True self.scan_leds.toggle.on() class Plot: def __init__(self): #Can improve efficieny of plotting function using knowledge at: https://bastibe.de/2013-05-30-speeding-up-matplotlib.html #Define pixel array for plotting purposes self.pixel_number = np.linspace(1, H_RESOLUTION, num=H_RESOLUTION) #Generate figure and axes and set size of figure self.fig, self.axs = plt.subplots(2,1, squeeze=True, sharex=True) self.fig.set_size_inches(12, 4) #Initialise left and right scan lines with random data for scan profile. Left is top and right is bottom, from perspective of sensor. self.scan_data_left, = self.axs.plot(self.pixel_number, np.random.rand(H_RESOLUTION), color='red') self.scan_data_right, = self.axs.plot(self.pixel_number, np.random.rand(H_RESOLUTION), color='red') #Initialise left and right master profile line with random data. Left is top and right is bottom, from perspective of sensor. #self.master_profile_left, = self.axs.plot(self.pixel_number, np.random.rand(H_RESOLUTION), color='silver') #self.master_profile_right, = self.axs.plot(self.pixel_number, np.random.rand(H_RESOLUTION), color='silver') #Initialise left and right peak markers with random data. Left is top and right is bottom, from perspective of sensor. self.peak_markers_left, = self.axs.plot(np.random.rand(1), np.random.rand(1), 'y|') self.peak_markers_right, = self.axs.plot(np.random.rand(1), np.random.rand(1), 'y|') #Set axis limits on the graphs self.axs.set_ylim(0, 265) self.axs.set_xlim(0, H_RESOLUTION) self.axs.set_ylim(0, 265) self.axs.set_xlim(0, H_RESOLUTION) #Show the figure (block=False disables the program break that is default for plt.show()) plt.show(block=False) def update_scan_only(self, scan_data_left, scan_data_right): #Refresh data for scan lines self.scan_data_left.set_ydata(scan_data_left) self.scan_data_right.set_ydata(scan_data_right) #Redraw the current figure with the new data self.fig.canvas.draw() self.fig.canvas.flush_events() class SensorOutput(object): def _init_(self): self.scan = np.empty(1, H_RESOLUTION_STEREO, dtype=np.uint8) def write(self, buf): # write will be called once for each frame of output. buf is a bytes # object containing the frame data in YUV420 format; we can construct a # numpy array on top of the Y plane of this data quite easily: y_data = np.frombuffer(buf, dtype=np.uint8, count=H_RESOLUTION_STEREO*V_RESOLUTION).reshape((V_RESOLUTION, H_RESOLUTION_STEREO)) self.scan = y_data[0, :H_RESOLUTION_STEREO] def flush(self): # this will be called at the end of the recording; do whatever you want # here pass class ScanLED: def __init__(self): # Define scan LED pins self.top = LED(LED_SCAN_0) self.middle = LED(LED_SCAN_1) self.bottom = LED(LED_SCAN_2) self.toggle = LED(LED_SCAN_TOGGLE) self.top.on() self.middle.on() self.bottom.on()