1

In my current project, I am trying to stop a video stream (the program is running on Raspberry PI), when I send a stop signal ( the program is running on my computer).

To achieve this objective, I have written a multiprocessing python code, which is running two processes.

  • Process 1 is reading data from a camera attached with the raspberry PI and send it to amazon Kinesis. It is using OpenCV library for raspberry PI to send images to Kinesis.

  • Process 2 is a subscriber (through AWS IoT), which waits for the stop signal from AWS IoT. When it receives the stop signal, it stops the streaming.

I have used commandFlag as a Flag. It is initialized with 0. Whenever, thread 2 receives the stop signal, the commandFlag is turned to 1.

In the current program, def on_message(client, userdata, msg): is not called. Thus, it is not stopping the video stream ( because commandFlag flag is not set to 1)

Moreover, it does not releases the attached camera resource.

Could you please suggest any other methods to resolve this issue? Do I need to make any other changes in the program to work it correctly?

  import sys
import cPickle
import datetime
import cv2
import boto3
import time
import cPickle
from multiprocessing import Pool
import pytz
import threading
import multiprocessing

#Subscriber
import paho.mqtt.client as paho
import os
import socket
import ssl

commandFlag = 0



# The unique hostname that &IoT; generated for this device.
awshost = "AAAAAA-ats.iot.us-east-1.amazonaws.com"
awsport = 8883

# A programmatic shadow handler name prefix.
clientId = "IoTDevice"
thingName = "IoTDevice"

# The relative path to the correct root CA file for &IoT;, which you have already saved onto this device.
caPath = "AmazonRootCA1.pem"

# The relative path to your certificate file that 
# &IoT; generated for this device, which you 
# have already saved onto this device.
certPath = "AAAAA-certificate.pem.crt"


# The relative path to your private key file that 
# &IoT; generated for this device, which you 
# have already saved onto this device.
keyPath = "AAAAAA-private.pem.key"


kinesis_client = boto3.client("kinesis", region_name='us-east-1')
rekog_client = boto3.client("rekognition",region_name='us-east-1')

camera_index = 0 # 0 is usually the built-in webcam
capture_rate = 30 # Frame capture rate.. every X frames. Positive integer.
rekog_max_labels = 123
rekog_min_conf = 50.0

def on_connect(client, userdata, flags, rc):
    global commandFlag
    print("I am ready to receive control message...." )
    client.subscribe("#" , 1 )

def on_message(client, userdata, msg):
    global commandFlag
    print("Received Switch Off message from AWS IoT....")
    commandFlag = 1


def subscriber():
    global commandFlag
    mqttc = paho.Client()
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.tls_set(caPath, certfile=certPath, keyfile=keyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
    mqttc.connect(awshost, awsport, keepalive=60)
    mqttc.loop_forever()

#Send frame to Kinesis stream
def encode_and_send_frame(frame, frame_count, enable_kinesis=True, enable_rekog=False, write_file=False):
    try:
        #convert opencv Mat to jpg image
        #print "----FRAME---"
        retval, buff = cv2.imencode(".jpg", frame)

        img_bytes = bytearray(buff)

        utc_dt = pytz.utc.localize(datetime.datetime.now())
        now_ts_utc = (utc_dt - datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)).total_seconds()

        frame_package = {
            'ApproximateCaptureTime' : now_ts_utc,
        'FrameCount' : frame_count,
            'ImageBytes' : img_bytes
        }

        if write_file:
            print("Writing file img_{}.jpg".format(frame_count))
            target = open("img_{}.jpg".format(frame_count), 'w')
            target.write(img_bytes)
            target.close()

        #put encoded image in kinesis stream
        if enable_kinesis:
            print "Sending image to Kinesis"
            response = kinesis_client.put_record(
                StreamName="FrameStream",
                Data=cPickle.dumps(frame_package),
                PartitionKey="partitionkey"
            )
            print response

        if enable_rekog:
            response = rekog_client.detect_labels(
                Image={
                    'Bytes': img_bytes
                },
                MaxLabels=rekog_max_labels,
                MinConfidence=rekog_min_conf
            )
            print response

    except Exception as e:
        print e


def main():
    global commandFlag
    #capture_rate   
    argv_len = len(sys.argv)

    if argv_len > 1 and sys.argv[1].isdigit():
        capture_rate = int(sys.argv[1])

    cap = cv2.VideoCapture(0) #Use 0 for built-in camera. Use 1, 2, etc. for attached cameras.
    pool = Pool(processes=3)

    frame_count = 0
    while True:
        # Capture frame-by-frame
        ret, frame = cap.read()
        #cv2.resize(frame, (640, 360));

        if ret is False:
            break

        if frame_count % 30 == 0:
        result = pool.apply_async(encode_and_send_frame, (frame, frame_count, True, False, False,))

        frame_count += 1

        # Display the resulting frame
        cv2.imshow('frame', frame)

        #if cv2.waitKey(1) & 0xFF == ord('q'):
        if commandFlag == 1:
          break;

    # When everything done, release the capture
    cap.release()
    cv2.destroyAllWindows()
    return


if __name__ == '__main__':

    t1 = multiprocessing.Process(target=main)
    t1.start()
    t2 = multiprocessing.Process(target=subscriber)
    t2.start()

    while True:
        if commandFlag == 1:
           t1.terminate()
           t2.terminate()
           sys.exit(1)
user-517752
  • 1,188
  • 5
  • 21
  • 54
  • try setting the threads as `daemon`, you can do so by using `t1.daemon = True` and `t2.daemon = True`, this way the threads will end once the program ends. – Axois Aug 15 '19 at 12:43
  • @Axois: Thanks for the response! I tried it before. But I may not have set correctly. Therefore, let me request you -- Where should I set these two setting? Do I need to make any other changes in the program to work it correctly? – user-517752 Aug 15 '19 at 12:47
  • `t1 = threading.Thread(target=main , daemon = True)` `t2 = threading.Thread(target=subscriber, daemon = True)` you also wont have to call `t1.join()` and `t2.join()` because the threads will end once your script ends. – Axois Aug 15 '19 at 12:51
  • @Axois : When I use the `t1 = threading.Thread(target=main , daemon = True)` syntax, I am getting error `TypeError: __init()___ got an unexpected keyword argument 'daemon'` – user-517752 Aug 15 '19 at 13:05
  • what is your python version that you are using? IIRC this command is only possible for the later versions of python. If you are using an earlier version, you can set it by using `t1.daemon = True` but did that work for you ? – Axois Aug 15 '19 at 13:08
  • The python version is Python 2.7.13. However `t1.daemon = True` and `t2.daemon=True` syntax work. – user-517752 Aug 15 '19 at 13:11
  • when I apply `t1.daemon = True` and `t2.daemon=True` syntax, the python program terminate promptly without executing the thread code. – user-517752 Aug 15 '19 at 13:13
  • I realised that this is not a problem of daemon threads, you have multiprocessing in your code. You should see [this](https://stackoverflow.com/questions/47903791/how-to-terminate-a-multiprocess-in-python-when-a-given-condition-is-met) to terminate it – Axois Aug 15 '19 at 13:23
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/197986/discussion-between-pankesh-and-axois). – user-517752 Aug 15 '19 at 13:28

0 Answers0