Multiprocessing

Multiprocessing refers to the ability of a system to support more than one processor at the same time.

Applications in a multiprocessing system are broken into smaller routines that run independently. The operating system schedules these routines on the available processors so they can run in parallel, which can improve the overall performance of the system.

Python provides the built-in package multiprocessing, which supports spawning processes.

import multiprocessing as mp
print(mp.cpu_count())  # number of CPUs in the system
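The opening paragraph describes splitting an application into independent routines. A minimal sketch of that idea, assuming a CPU-bound sum of squares as the workload: the data is cut into one chunk per CPU reported by cpu_count(), each chunk is summed in a separate worker, and the partial sums are combined. It uses Pool, which is covered below; the function names are illustrative only.

```python
import multiprocessing as mp


def sum_of_squares(chunk):
    # One independent routine: it only touches its own slice of the data
    return sum(x * x for x in chunk)


def parallel_sum_of_squares(numbers):
    workers = mp.cpu_count()
    # Ceiling division so every item lands in some chunk (at least 1 item per chunk)
    size = max(1, -(-len(numbers) // workers))
    chunks = [numbers[i:i + size] for i in range(0, len(numbers), size)]
    with mp.Pool(processes=workers) as pool:
        # Each chunk is summed in a worker process; results come back in chunk order
        partial_sums = pool.map(sum_of_squares, chunks)
    return sum(partial_sums)


if __name__ == '__main__':
    data = list(range(100_000))
    # Same answer as the single-process version
    print(parallel_sum_of_squares(data) == sum_of_squares(data))  # True
```

For a workload this small, the cost of starting processes and sending data to them can outweigh the gain; the split pays off when each chunk does substantial work.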

The multiprocessing package provides several building blocks for this, including Process, Lock, Queue, Pipe, and Pool.
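Of the tools listed, Lock is not demonstrated elsewhere in this section. A minimal sketch, assuming a shared integer counter created with mp.Value and the hypothetical helpers add_many() and run(): several processes increment the same counter, and the lock keeps their read-modify-write updates from interleaving.

```python
import multiprocessing as mp


def add_many(counter, lock, times):
    for _ in range(times):
        # counter.value += 1 is a read-modify-write, so two processes could
        # interleave and lose updates; the lock makes each update exclusive
        with lock:
            counter.value += 1


def run(n_procs=4, times=1000):
    counter = mp.Value('i', 0, lock=False)  # raw shared int, protected by our own Lock
    lock = mp.Lock()
    procs = [mp.Process(target=add_many, args=(counter, lock, times))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == '__main__':
    print(run())  # 4000: no increment is lost
```

Without the `with lock:` line, the final value can come out below 4000 because concurrent updates overwrite each other. mp.Value also carries a built-in lock by default (lock=True), reachable through get_lock(); an explicit Lock is used here only to show the class itself.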

Process

A process is an independent program in execution. Every process has its own copy of the Python interpreter and its own memory space, so multiple jobs can run simultaneously without interfering with each other's data.

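When a Process is started, roughly the following happens:

  1. the current process is copied with fork(), or a fresh interpreter is launched, depending on the start method in use;
  2. the new process receives its own process identifier (PID);
  3. the target task runs as an independent child process.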
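These steps can be observed directly. A minimal sketch, with an illustrative report_pid() helper: the child sends its own PID back through a Queue (described later in this section), and the parent compares it with its own PID. The start method it prints depends on the platform and the Python version.

```python
import multiprocessing as mp
import os


def report_pid(q):
    # Runs in the child process: send its own PID back to the parent
    q.put(os.getpid())


def main():
    # The start method ('fork', 'spawn' or 'forkserver') depends on platform and Python version
    print("start method:", mp.get_start_method())
    q = mp.Queue()
    p = mp.Process(target=report_pid, args=(q,))
    p.start()
    child_pid = q.get()  # blocks until the child has reported
    p.join()
    print(f"parent PID: {os.getpid()}, child PID: {child_pid}")
    return os.getpid(), child_pid, p.pid


if __name__ == '__main__':
    main()
```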

As the following examples show, a Process is controlled mainly through two methods: start(), which launches it, and join(), which blocks until it has finished.

# start()
import time
import multiprocessing as mp

def task1(sleep_sec):
    print(f'Sleeping for {sleep_sec} seconds')
    time.sleep(sleep_sec)
    print(f'Finished sleeping: {sleep_sec}')


if __name__ == "__main__":
    start_time = time.perf_counter()
    p1 = mp.Process(target=task1, args=(0.5,))
    p2 = mp.Process(target=task1, args=(0.5,))
    p1.start()
    p2.start()
    # measured immediately: start() returns without waiting for the children
    finish_time = time.perf_counter()
    print(f"Program finished in {(finish_time - start_time):.3f} seconds")


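Notice that although two 0.5-second sleeps were executed, the program reports far less than 0.5 + 0.5 = 1 second, in fact less than 0.5 seconds. This is because start() returns immediately: without join(), the parent records finish_time before the children have finished sleeping, and their "Finished sleeping" messages appear only after the timing line.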

# start() + join()
import time
import multiprocessing as mp

def task1(sleep_sec):
    print(f'Sleeping for {sleep_sec} seconds')
    time.sleep(sleep_sec)
    print(f'Finished sleeping: {sleep_sec}')


if __name__ == "__main__":
    start_time = time.perf_counter()
    p1 = mp.Process(target=task1, args=(3,))
    p2 = mp.Process(target=task1, args=(3,))
    p1.start()
    p2.start()
    p1.join()  # wait for p1 to finish
    p2.join()  # wait for p2 to finish
    finish_time = time.perf_counter()
    print(f"Program finished in {(finish_time - start_time):.3f} seconds")

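With join(), the parent waits for both children to finish, so the reported time is just over 3 seconds: the two 3-second sleeps overlap instead of adding up to 3 + 3 = 6 seconds.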
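Besides blocking, join() leaves the Process object in a state that can be inspected. A short sketch, assuming a hypothetical nap() task, showing the is_alive() method and the exitcode attribute before and after join():

```python
import multiprocessing as mp
import time


def nap(seconds):
    time.sleep(seconds)


def main():
    p = mp.Process(target=nap, args=(0.2,))
    print(p.exitcode)                 # None: the process has not run yet
    p.start()
    print(p.is_alive())               # normally True: the child is still sleeping
    p.join()                          # wait for the child to exit
    print(p.is_alive(), p.exitcode)   # False 0: finished and returned normally
    return p.is_alive(), p.exitcode


if __name__ == '__main__':
    main()
```

exitcode stays None until the process has terminated; 0 means the target function returned normally.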

# 10 processes that each sleep 2 s: about 2 s in total instead of 10 * 2 = 20 s

import time
import multiprocessing as mp

def square(index, value):
    time.sleep(2)
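    value[index] = value[index] ** 2  # writes into shared memory


if __name__ == '__main__':
    # 'i' = signed int; mp.Array lives in shared memory, so the children's writes are visible here
    arr = mp.Array('i', [2, 3, 6, 7, 8, 8, 9, 3, 3, 3])
    processes = []
    for i in range(10):
        m = mp.Process(target=square, args=(i, arr))
        processes.append(m)
        m.start()
    for m in processes:
        m.join()
    print(list(arr))  # [4, 9, 36, 49, 64, 64, 81, 9, 9, 9]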
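To see why the example uses mp.Array rather than an ordinary list, the sketch below runs the same per-index squaring twice: once on a plain list and once on a shared array. Because each process has its own memory space, children that modify a plain list only change their private copy. The helper names run_children() and compare() are illustrative, not part of the multiprocessing API.

```python
import multiprocessing as mp


def square_in_place(index, values):
    values[index] = values[index] ** 2


def run_children(values):
    # One child per index, like the mp.Array example
    procs = [mp.Process(target=square_in_place, args=(i, values))
             for i in range(len(values))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


def compare(data):
    plain = list(data)             # ordinary list: each child works on its own copy
    shared = mp.Array('i', data)   # shared memory: every child writes to the same buffer
    run_children(plain)
    run_children(shared)
    return plain, list(shared)


if __name__ == '__main__':
    plain, shared = compare([2, 3, 6])
    print(plain)   # [2, 3, 6]: the parent's list is unchanged
    print(shared)  # [4, 9, 36]: the squares are visible in the parent
```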

Pool

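A Pool is a fixed set of worker processes to which tasks are handed out. Pool.map() splits an iterable into chunks, distributes them across the workers, and returns the results in the same order as the input, which makes it easy to divide a large task into smaller parts that several processors can handle.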

import multiprocessing as mp

def square(n):
    return n**2

if __name__ == '__main__':
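    with mp.Pool(processes=5) as pool:
        out = pool.map(square, [3, 4, 5, 6, 6, 7, 87, 8, 8])
        print(out)
# [9, 16, 25, 36, 36, 49, 7569, 64, 64]

In the next example each task sleeps for a different number of seconds, which makes the parallel execution visible:
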
import multiprocessing as mp
import time

def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])


if __name__ == '__main__':
    work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])
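    # map() blocks until every item has been processed;
    # the with-block then shuts the pool down cleanly
    with mp.Pool(4) as p:
        p.map(work_log, work)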

With the pool set to 4 workers and the work list holding 4 elements, each element gets its own worker, so all four tasks run simultaneously.

As a result, the "Finished" messages appear in order of sleep time (C after 1 s, B after 2 s, D after 3 s, A after 5 s), and the whole run takes about 5 seconds rather than 5 + 2 + 1 + 3 = 11 seconds.
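map() still returns its results in the order the items were submitted, even though the tasks finish in a different order. A sketch with shorter, illustrative sleep times contrasting map() with Pool.imap_unordered(), which yields each result as soon as its task finishes:

```python
import multiprocessing as mp
import time


def work(item):
    name, seconds = item
    time.sleep(seconds)
    return name


def main():
    items = [("A", 0.6), ("B", 0.3), ("C", 0.1)]
    with mp.Pool(3) as pool:
        # map(): results always come back in submission order
        in_order = pool.map(work, items)
        # imap_unordered(): results are yielded as soon as each task finishes
        as_finished = list(pool.imap_unordered(work, items))
    print(in_order)      # ['A', 'B', 'C']
    print(as_finished)   # usually ['C', 'B', 'A'], following the sleep times
    return in_order, as_finished


if __name__ == '__main__':
    main()
```

imap_unordered() is useful when results can be handled independently and the caller should not wait for the slowest early item.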

Queue

A Queue allows processes to communicate with each other by passing messages.

For example, one process can put data on a queue while another process takes it off and works on it as soon as it becomes available, which lets a task be split into smaller parts that are processed simultaneously.

  • put() - inserts an item into the queue
  • get() - removes and returns an item from the queue, blocking until one is available

import multiprocessing as mp
import time

# pushing items to queue
def producer(q):
    for i in ["sudh", "kumar", "pwskills", "krish", "naik"]:
        time.sleep(1.5)
        print(f'Entry: {i}')
        q.put(i)


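# popping items from queue until the None sentinel arrives
def consume(q):
    while True:
        time.sleep(0.5)
        item = q.get()  # blocks until an item is available
        if item is None:  # sentinel: the producer is done
            break
        print(f'Exit: {item}')


if __name__ == '__main__':
    queue = mp.Queue()
    m1 = mp.Process(target=producer, args=(queue,))
    m2 = mp.Process(target=consume, args=(queue,))
    m1.start()
    m2.start()
    queue.put("xyz")  # the parent can put items on the queue too
    m1.join()         # wait until the producer has queued everything
    queue.put(None)   # then tell the consumer to stop
    m2.join()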
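Instead of checking empty(), which the Python documentation describes as not reliable for multiprocessing queues, a consumer can wait with a timeout and stop when nothing arrives. A minimal sketch, assuming a hypothetical drain() helper; a timed-out get() raises the standard queue.Empty exception.

```python
import multiprocessing as mp
import queue  # multiprocessing.Queue raises the standard queue.Empty exception


def drain(q, timeout=0.5):
    """Collect items until none arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


if __name__ == '__main__':
    q = mp.Queue()
    for name in ["sudh", "kumar"]:
        q.put(name)
    print(drain(q))  # ['sudh', 'kumar']
```

A timeout only guesses that the producer is finished; when the producer may pause for longer than the timeout, an explicit sentinel such as None is the safer signal.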

Pipes

  • A pipe has only two endpoints, so it is a simpler choice than a queue when exactly two processes need to talk to each other.

The multiprocessing module provides the Pipe() function, which returns a pair of connection objects connected by a pipe. The two connection objects represent the two ends of the pipe, and each has send() and recv() methods (among others). By default the pipe is duplex, so both ends can send and receive.

import multiprocessing as mp
import time

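def sender(conn, msg):
    for i in msg:
        time.sleep(1)
        conn.send(i)
    conn.send(None)  # sentinel: tells the receiver there is nothing more to read
    conn.close()


def receive(conn):
    while True:
        try:
            msg = conn.recv()
        except EOFError:  # the other end was closed and no data is left
            break
        if msg is None:  # sentinel from sender()
            break
        print(msg)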


if __name__ == '__main__':
    msg = ["my name is sudh", "this is my msg  to my students ", "i am taking class for dsm ",
           "try to practice all the code "]
    conn1, conn2 = mp.Pipe()
    m1 = mp.Process(target=sender, args=(conn2, msg))
    m2 = mp.Process(target=receive, args=(conn1,))
    m1.start()
    m2.start()
    m1.join()
    conn2.close()
    m2.join()
    conn1.close()
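The example above sends data in one direction only. Because Pipe() is duplex by default, the same pair of connections can also carry a reply. A minimal sketch, with the illustrative helpers shout_back() and ask():

```python
import multiprocessing as mp


def shout_back(conn):
    # Child side: receive one message and reply on the same connection
    text = conn.recv()
    conn.send(text.upper())
    conn.close()


def ask(question):
    parent_end, child_end = mp.Pipe()  # duplex=True by default: both ends can send() and recv()
    p = mp.Process(target=shout_back, args=(child_end,))
    p.start()
    parent_end.send(question)
    reply = parent_end.recv()  # blocks until the child answers
    p.join()
    parent_end.close()
    return reply


if __name__ == '__main__':
    print(ask("try to practice all the code"))  # TRY TO PRACTICE ALL THE CODE
```

When only one direction is needed, Pipe(duplex=False) returns a pair in which the first connection can only receive and the second can only send.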
