Continuous Capture in the Background
Posted: Sat Oct 24, 2020 6:51 pm
In my application I take a horizontal intensity profile of an array of sewing pins. My setup has a set of LEDs that are toggled on and off between frame captures. The OFF scan is subtracted from the ON scan to reduce background noise, so a full scan is only produced every two capture cycles. The resulting scan is plotted on a Matplotlib graph.
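A minimal sketch of the ON/OFF subtraction described above, using made-up pixel values rather than real camera data. Both scans are widened to a signed type before subtracting so that pixels where OFF exceeds ON do not wrap around in uint8. The function name `subtract_background` and the sample values are illustrative assumptions, not part of the actual setup.

```python
import numpy as np

def subtract_background(scan_on, scan_off):
    """Return ON minus OFF, clipped to the valid 8-bit range."""
    # Widen to int16 so negative differences do not wrap around in uint8.
    diff = scan_on.astype(np.int16) - scan_off.astype(np.int16)
    return diff.clip(0, 255).astype(np.uint8)

# Made-up example values: an LED-lit scan and an ambient-only scan.
scan_on = np.array([10, 200, 50, 255], dtype=np.uint8)
scan_off = np.array([12, 40, 50, 5], dtype=np.uint8)
result = subtract_background(scan_on, scan_off)
```

Pixels that are brighter in the OFF scan than in the ON scan come out as 0 rather than as a large wrapped value.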
I would like capture_continuous to run in the background, with the graph updated only when a full scan has been produced. I understand that a simple way to do this would be to update the graph only when the OFF scan is captured. However, I am trying to separate the capture process from the graph plotting process.
I envision that there will eventually be a GUI where the graph is always visible, and only starts updating when you click a 'LIVE MODE' button.
My initial idea was to run capture_continuous in a multiprocessing process and have it set a boolean to True every time it produced a full scan. Meanwhile, the graph would continuously check the boolean and, whenever it was True, set it back to False and update itself. I am not having much luck with this: capture_continuous doesn't seem to run when it is started as a multiprocessing process, so the boolean always stays False and the graph never updates. I have confirmed this using a remote_pdb breakpoint. capture_continuous seems to run fine when it is just called as a function, as I can see the LEDs rapidly toggling state.
My questions are:
Am I doing something wrong?
Is there a better way to achieve this? I have seen something called the observer pattern that seems like it might be appropriate?
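One possible way to decouple capture from plotting, sketched here without any camera hardware, is a background thread that pushes each completed scan onto a `queue.Queue`, with the plotting side reading from the queue. This uses a thread rather than a separate process, so both sides share memory. By contrast, attributes set on an object inside a multiprocessing child are not visible in the parent unless they are shared explicitly. The names `fake_frames` and `capture_worker` are hypothetical, and `fake_frames` only stands in for capture_continuous.

```python
import queue
import threading

def fake_frames(n):
    """Stand-in for camera.capture_continuous: yields n dummy frames."""
    for i in range(n):
        yield i

def capture_worker(frames, scans, stop):
    """Pair ON/OFF frames and publish one full scan per pair."""
    led_on = True                 # mirrors the LED toggle state
    frame_on = None
    for frame in frames:
        if stop.is_set():         # lets the consumer stop the capture early
            break
        if led_on:
            frame_on = frame      # first half of the pair
        else:
            scans.put((frame_on, frame))   # a full scan is ready
        led_on = not led_on
    scans.put(None)               # sentinel: no more scans will arrive

scans = queue.Queue()
stop = threading.Event()
worker = threading.Thread(
    target=capture_worker, args=(fake_frames(6), scans, stop), daemon=True
)
worker.start()

received = []
while True:
    scan = scans.get(timeout=5)   # blocks until a full scan arrives
    if scan is None:
        break
    received.append(scan)         # a real consumer would update the plot here
worker.join()
```

The consumer only wakes up when a complete ON/OFF pair is available, so no polling flag is needed. A 'LIVE MODE' button could start the thread and set `stop` to end it.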
Code:
from gpiozero import LED
import numpy as np
from time import sleep
import picamera
import matplotlib.pyplot as plt
from scipy import signal
import csv
import os
from multiprocessing import Process, current_process
from remote_pdb import RemotePdb
#LED Pins
LED_STATUS_RED = 18
LED_STATUS_GREEN = 15
LED_STATUS_BLUE = 14
LED_SCAN_0 = 21
LED_SCAN_1 = 20
LED_SCAN_2 = 16
LED_SCAN_TOGGLE = 19
# Scan resolution
H_RESOLUTION = 1280
V_RESOLUTION = 32
H_RESOLUTION_STEREO = 2 * H_RESOLUTION
# Set plot style
plt.style.use('dark_background')
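def live_mode(sensor):
    # A Plot instance is assumed here; the pasted snippet referenced
    # self.plot and self.sensor outside of any class.
    plot = Plot()
    process = Process(target=sensor.grab_scan_continuous, args=(), daemon=True)
    process.start()
    while True:
        print(sensor.scan_available)
        if sensor.scan_available:
            plot.update_scan_only(sensor.current_scan_left, sensor.current_scan_right)

# Entry point. These two calls must come after the class definitions below to run.
sensor = Sensor()
live_mode(sensor)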
class Sensor():
    def __init__(self):
        # SHS configuration
        self.camera = picamera.PiCamera(stereo_mode='side-by-side', stereo_decimate=False, resolution=(H_RESOLUTION_STEREO, V_RESOLUTION))
        self.camera.awb_mode = 'off'
        self.camera.exposure_mode = 'off'
        self.camera.shutter_speed = 10000
        self.video_port_enable = True
        # Scan LEDs
        self.scan_leds = ScanLED()
        # SensorOutput object
        self.output = SensorOutput()
        # Current scan
        self.current_scan_left = np.zeros(H_RESOLUTION)
        self.current_scan_right = np.zeros(H_RESOLUTION)
        # Scan available flag
        self.scan_available = False

    def grab_scan_continuous(self):
        #RemotePdb('127.0.0.1', 4444).set_trace()
        self.scan_leds.toggle.on()
        for frame in self.camera.capture_continuous(self.output, format='yuv', use_video_port=self.video_port_enable):
            RemotePdb('127.0.0.1', 4444).set_trace()
            if self.scan_leds.toggle.is_lit:
                scan_leds_on = self.output.scan
                self.scan_leds.toggle.off()
            else:
                scan_leds_off = self.output.scan
                scan_data = np.transpose(np.subtract(scan_leds_on, scan_leds_off.astype(np.int16)).clip(0, 255).astype(np.uint8))
                self.current_scan_left = scan_data[:H_RESOLUTION]
                self.current_scan_right = scan_data[H_RESOLUTION:]
                #print(signal.find_peaks(scan_data.flatten(), minimum_peak_height, minimum_peak_width, minimum_peak_distance))
                self.scan_available = True
                self.scan_leds.toggle.on()
class Plot:
    def __init__(self):
        # Can improve efficiency of plotting function using knowledge at: https://bastibe.de/2013-05-30-speeding-up-matplotlib.html
        # Define pixel array for plotting purposes
        self.pixel_number = np.linspace(1, H_RESOLUTION, num=H_RESOLUTION)
        # Generate figure and axes and set size of figure
        self.fig, self.axs = plt.subplots(2, 1, squeeze=True, sharex=True)
        self.fig.set_size_inches(12, 4)
        # Initialise left and right scan lines with random data for scan profile. Left is top and right is bottom, from perspective of sensor.
        self.scan_data_left, = self.axs[0].plot(self.pixel_number, np.random.rand(H_RESOLUTION), color='red')
        self.scan_data_right, = self.axs[1].plot(self.pixel_number, np.random.rand(H_RESOLUTION), color='red')
        # Initialise left and right master profile line with random data. Left is top and right is bottom, from perspective of sensor.
        #self.master_profile_left, = self.axs[0].plot(self.pixel_number, np.random.rand(H_RESOLUTION), color='silver')
        #self.master_profile_right, = self.axs[1].plot(self.pixel_number, np.random.rand(H_RESOLUTION), color='silver')
        # Initialise left and right peak markers with random data. Left is top and right is bottom, from perspective of sensor.
        self.peak_markers_left, = self.axs[0].plot(np.random.rand(1), np.random.rand(1), 'y|')
        self.peak_markers_right, = self.axs[1].plot(np.random.rand(1), np.random.rand(1), 'y|')
        # Set axis limits on the graphs
        self.axs[0].set_ylim(0, 265)
        self.axs[0].set_xlim(0, H_RESOLUTION)
        self.axs[1].set_ylim(0, 265)
        self.axs[1].set_xlim(0, H_RESOLUTION)
        # Show the figure (block=False disables the program break that is default for plt.show())
        plt.show(block=False)

    def update_scan_only(self, scan_data_left, scan_data_right):
        # Refresh data for scan lines
        self.scan_data_left.set_ydata(scan_data_left)
        self.scan_data_right.set_ydata(scan_data_right)
        # Redraw the current figure with the new data
        self.fig.canvas.draw()
        self.fig.canvas.flush_events()
class SensorOutput(object):
    def __init__(self):
        # The shape must be passed as a tuple
        self.scan = np.empty((1, H_RESOLUTION_STEREO), dtype=np.uint8)

    def write(self, buf):
        # write will be called once for each frame of output. buf is a bytes
        # object containing the frame data in YUV420 format; we can construct a
        # numpy array on top of the Y plane of this data quite easily:
        y_data = np.frombuffer(buf, dtype=np.uint8, count=H_RESOLUTION_STEREO*V_RESOLUTION).reshape((V_RESOLUTION, H_RESOLUTION_STEREO))
        # Keep only the first row of the Y plane as the scan line
        self.scan = y_data[0, :H_RESOLUTION_STEREO]

    def flush(self):
        # this will be called at the end of the recording; do whatever you want
        # here
        pass
class ScanLED:
    def __init__(self):
        # Define scan LED pins
        self.top = LED(LED_SCAN_0)
        self.middle = LED(LED_SCAN_1)
        self.bottom = LED(LED_SCAN_2)
        self.toggle = LED(LED_SCAN_TOGGLE)
        self.top.on()
        self.middle.on()
        self.bottom.on()
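The Y-plane extraction used in SensorOutput.write can be tried without a camera. This is a minimal sketch, assuming a planar YUV420 buffer with no row padding, in which the Y plane comes first and the U and V planes follow at a quarter of its size each. The small frame size, the `y_plane` helper, and the fake data are illustrative assumptions.

```python
import numpy as np

WIDTH, HEIGHT = 64, 16   # small made-up frame size for illustration

def y_plane(buf, width, height):
    """View the luma (Y) plane at the start of a YUV420 frame buffer."""
    # count limits the view to the first width*height bytes (the Y plane).
    flat = np.frombuffer(buf, dtype=np.uint8, count=width * height)
    return flat.reshape((height, width))

# Fake frame: a ramp in the Y plane, followed by constant U and V planes.
y = (np.arange(WIDTH * HEIGHT) % 256).astype(np.uint8)
uv = np.full(WIDTH * HEIGHT // 2, 128, dtype=np.uint8)
buf = y.tobytes() + uv.tobytes()

plane = y_plane(buf, WIDTH, HEIGHT)
top_row = plane[0, :]    # the same single-row slice SensorOutput.write keeps
```

Because `np.frombuffer` returns a view rather than a copy, the slice stays tied to the original frame buffer. Taking `.copy()` of the row would detach it if the scan needs to outlive the frame.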