Programming
python multiprocessing queue
Updated Fri, 26 Aug 2022 09:37:20 GMT

Python Multiprocessing Controlled Bidirectional Queue


I was trying to look for a Bidirectional/Omnidirectional Queue to send jobs back and forth between processes.

the best solution I could come up with was to use two multiprocessing queues that are filled from one process and read through the other (or a Pipe which is apparently faster, still haven't tried it yet).

I came across this answer that describes the difference between a Pipe and a Queue, it states that

A Queue() can have multiple producers and consumers.

I know a queue can be shared between multiple processes( > 2 processes ), but how should I organize the communication between the processes so that a message has a targeted process, or at least the process does not read the jobs it inserted to the queue, and how I scale it to more than 2 processes.

EX: I have 2 (or more) Processes (A, B) they they share the same Queue, A needs to send a job to B and B sends a job to A, if I simply use queue.put(job), the job might be read from either processes depending on who called queue.get() first, so the job that was put by A intended to B might be read by A, which is not the targeted process, if I added a flag of which process it should be executed by, it would destroy the sequentiality of the queue.




Solution

For those facing the same problem, I have found the solution, it is multiprocessing.Pipe() it is faster than queues but it only works if you have 2 processes.

Here is a simple example to help

import multiprocessing as mp
from time import time
def process1_function(conn, events):
    for event in events:
        # send jobs to the process_2
        conn.send((event, time()))
        print(f"Event Sent: {event}")
        # check if there are any messages in the pipe from process_2
        if conn.poll():
            # read the message from process_2
            print(conn.recv())
    # continue checking the messages in the pipe from process_2
    while conn.poll():
        print(conn.recv())
def process2_function(conn):
    while True:
        # check if there are any messages in the pipe from process_1
        if conn.poll():
            # read messages in the pipe from process_1
            event, sent = conn.recv()
            # send messages to process_1
            conn.send(f"{event} complete, {time() - sent}")
            if event == "eod":
                break
    conn.send("all events finished")
def run():
    events = ["get up", "brush your teeth", "shower", "work", "eod"]
    conn1, conn2 = mp.Pipe()
    process_1 = mp.Process(target=process1_function, args=(conn1, events))
    process_2 = mp.Process(target=process2_function, args=(conn2,))
    process_1.start()
    process_2.start()
    process_1.join()
    process_2.join()
if __name__ == "__main__":
    run()