
Consider:

from multiprocessing import Queue
from multiprocessing.managers import SyncManager

def make_manager(q_names, port):
    qs = [Queue() for _ in q_names]
    class MyManager(SyncManager):
        pass
    for q_name, q in zip(q_names, qs):
        MyManager.register(q_name, callable=lambda: q)

    auth = b'myauth'
    try:
        manager = MyManager(address=('', port), authkey=auth)
        manager.start()
    except Exception:
        print('Could not start manager')
        raise  # re-raise: otherwise `manager` may be unbound or not started

    return manager

manager = make_manager(['aa', 'bb'], 8000)
print(manager.aa())
print(manager.bb())

The output is:

<multiprocessing.queues.Queue object at 0x7f7ed1c83eb0>
<multiprocessing.queues.Queue object at 0x7f7ed1c83eb0>

Why do both methods return the same queue?

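UPDATE: I figured out that the issue is how a lambda captures variables: it closes over the variable `q` itself, not the value `q` had when the lambda was created, so every lambda ends up returning the last queue from the loop. Here is an example to illustrate: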

ll = []
for i in range(2):
    ll.append(lambda: i)
for l in ll:
    print(l())  # prints 1 both times

I am still not sure how to get around this, but will search again as it's not related to multiprocessing.

UPDATE: Found it: Lambda in a loop
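
For reference, here is a minimal sketch of the late-binding behavior and the usual default-argument fix, using plain integers instead of queues. The function names `build_late` and `build_bound` are made up for this illustration.

```python
def build_late():
    funcs = []
    for i in range(2):
        # The lambda closes over the variable `i` and reads it when called.
        funcs.append(lambda: i)
    return funcs


def build_bound():
    funcs = []
    for i in range(2):
        # The default value is evaluated when the lambda is defined,
        # so each lambda keeps the value `i` had in its own iteration.
        funcs.append(lambda i=i: i)
    return funcs


late_results = [f() for f in build_late()]
bound_results = [f() for f in build_bound()]
print(late_results)   # [1, 1]
print(bound_results)  # [0, 1]
```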

AlwaysLearning
  • What is your os? – Charchit Agarwal Jul 15 '22 at 13:24
  • @Charchit Ubuntu under WSL – AlwaysLearning Jul 15 '22 at 13:27
  • I am voting to close my own question :) – AlwaysLearning Jul 15 '22 at 13:43
  • Any reason why you are trying to do it like this anyway? You can use `manager.Queue` to create a managed queue for you instead. Also there is no reason to use `multiprocessing.Queue` inside a managed process, you can just use `queue.Queue` for that. – Charchit Agarwal Jul 15 '22 at 13:45
  • @Charchit Given that several clients can connect to the same queue, is `queue.Queue` safe? – AlwaysLearning Jul 18 '22 at 08:52
  • If you use it with a manager, it is. This is because access to managed objects is handled through pipes internally anyway. In fact, calling `manager.Queue` actually creates a `queue.Queue` rather than a `multiprocessing.Queue`. With that said, I am a little unsure whether it would be safe with the method you are using (which, imo, is perhaps not the best), because you are sending over the entire queue instead of, for example, calling certain methods on the queue and passing their return values from the manager process to main. Best to test before committing. – Charchit Agarwal Jul 18 '22 at 12:51

2 Answers


Regarding a workaround, instead of leaving it up to the lambda to capture the queue for you, you can change your `qs` to this:

qs = [Queue for _ in q_names]

And change the lambda to this:

lambda: q()

Since I do not have access to a UNIX device, I couldn't test this, so take it with a grain of salt.

Charchit Agarwal
  • Won't you get a different queue each time you call `manager.aa()`? – AlwaysLearning Jul 15 '22 at 13:51
  • @AlwaysLearning was your goal to map a queue to a `typeid`, and extract it from different processes to get the same queue? If so then I think you might be reinventing the wheel here, you can use [Namespace](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.managers.Namespace) for that. It was built for these kinds of things – Charchit Agarwal Jul 15 '22 at 14:09
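
The concern in the first comment can be sketched in plain Python, without a manager. This assumes `queue.Queue` as a stand-in for the queue class, and the names `get_shared` and `make_new` are hypothetical. A callable that returns an existing instance hands back the same object every time, while a callable that calls the class builds a new queue on each call.

```python
import queue

shared = queue.Queue()

# Returns the one existing queue on every call.
get_shared = lambda: shared

# Calls the class, so every call constructs a new, empty queue.
make_new = lambda: queue.Queue()

same_object = get_shared() is get_shared()
fresh_object = make_new() is make_new()
print(same_object)   # True
print(fresh_object)  # False
```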

My own solution is as per the linked post:

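from multiprocessing import Queue
from multiprocessing.managers import SyncManager

def make_manager(q_names, port):
    class MyManager(SyncManager):
        pass
    for q_name in q_names:
        q = Queue()
        # q=q binds the current queue as a default argument, so each
        # lambda returns its own queue rather than the last one created.
        MyManager.register(q_name, callable=lambda q=q: q)

    auth = b'myauth'
    try:
        manager = MyManager(address=('', port), authkey=auth)
        manager.start()
    except Exception:
        print('Could not start manager')
        raise  # re-raise: otherwise `manager` may be unbound or not started

    return manager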
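
If the default-argument trick feels too implicit, `functools.partial` is another way to bind each queue at the moment the callable is created. The sketch below is only an illustration of the binding, not the manager setup itself: it uses `queue.Queue` as a stand-in, and `identity` and `make_getters` are hypothetical helper names.

```python
import queue
from functools import partial


def identity(obj):
    """Return the object it was given."""
    return obj


def make_getters(q_names):
    """Map each name to a zero-argument callable returning its own queue."""
    getters = {}
    for name in q_names:
        q = queue.Queue()
        # partial stores `q` right away, so later loop iterations
        # cannot change which queue this callable returns.
        getters[name] = partial(identity, q)
    return getters


getters = make_getters(['aa', 'bb'])
print(getters['aa']() is getters['aa']())  # True: same queue on every call
print(getters['aa']() is getters['bb']())  # False: one queue per name
```

Each value in `getters` could then be passed as the `callable` argument in place of the lambda.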
AlwaysLearning