Multiprocessing
Multiprocessing refers to the ability of a system to support more than one processor
at the same time.
Applications in a multiprocessing system are broken into smaller routines
that run independently. The operating system allocates these routines to the processors, improving the performance of the system.
Python provides the built-in package called multiprocessing, which supports spawning processes.
import multiprocessing as mp
print(mp.cpu_count())
Python provides several tools for implementing multiprocessing via the package of that name, including Process, Lock, Queue, and Pool.
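Lock is named in the list above but is not used in the examples of this section. The following is a minimal sketch of what it is for, assuming a shared integer held in mp.Value; the names add_many and counter are illustrative and not part of the package.

```python
import multiprocessing as mp

def add_many(counter, lock, times):
    # Increment the shared counter `times` times.
    # The lock makes each read-modify-write step exclusive to one process.
    for _ in range(times):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    counter = mp.Value('i', 0)   # shared integer, starts at 0
    lock = mp.Lock()
    workers = [mp.Process(target=add_many, args=(counter, lock, 1000))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # 4000
```

Without the lock, two processes could read the same old value and one increment would be lost, so the final count could come out lower.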
Process
A process is the basic unit of execution in the multiprocessing package. Every process has its own copy of the Python interpreter and its own memory space, allowing multiple jobs to be executed simultaneously without conflicts.
A Process is handled as follows:
- the current process is copied via fork (on platforms where fork is the start method);
- a new process identifier is created;
- the task runs as an independent child process.
Specifically, as the following example shows, a Process is managed via two methods, start() and join().
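The steps above can be observed by printing process identifiers. This is a small sketch, not a definitive recipe; report and child_task are illustrative names, while os.getpid(), Process.pid, Process.exitcode and mp.get_start_method() come from the standard library.

```python
import os
import multiprocessing as mp

def report(label):
    # Build a line naming the process that executes this call.
    return f"{label}: pid={os.getpid()}"

def child_task():
    print(report("child"))

if __name__ == "__main__":
    print(f"start method: {mp.get_start_method()}")
    print(report("parent"))
    p = mp.Process(target=child_task)
    p.start()
    print(f"child pid seen by parent: {p.pid}")
    p.join()
    print(f"child exit code: {p.exitcode}")
```

The parent and the child print different identifiers, and the value in p.pid matches the one the child reports.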
# start()
import time
import multiprocessing as mp

def task1(sleep_sec):
    print(f'Sleeping for {sleep_sec} seconds')
    time.sleep(sleep_sec)
    print(f'Finished sleeping: {sleep_sec}')

if __name__ == "__main__":
    start_time = time.perf_counter()
    p1 = mp.Process(target=task1, args=(0.5,))
    p2 = mp.Process(target=task1, args=(0.5,))
    p1.start()
    p2.start()
    finish_time = time.perf_counter()
    print(f"Program finished in {(finish_time - start_time):.3f} seconds")
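Notice that two calls to sleep were executed, so one might expect the program to take at least 0.5 + 0.5 = 1 sec, yet the reported time is far less. start() returns as soon as the child process has been launched, so the main process reaches the timing line without waiting for either child to finish.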
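One way to check this behaviour is to ask whether the child is still running right after start() returns. The sketch below uses Process.is_alive(); the function name nap is illustrative.

```python
import time
import multiprocessing as mp

def nap(seconds):
    # Sleep for the given time and hand the value back.
    time.sleep(seconds)
    return seconds

if __name__ == "__main__":
    p = mp.Process(target=nap, args=(0.5,))
    p.start()
    print(p.is_alive())   # the child is typically still sleeping at this point
    p.join()              # wait for the child to finish
    print(p.is_alive())   # False
```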
# start() + join()
import time
import multiprocessing as mp

def task1(sleep_sec):
    print(f'Sleeping for {sleep_sec} seconds')
    time.sleep(sleep_sec)
    print(f'Finished sleeping: {sleep_sec}')

if __name__ == "__main__":
    start_time = time.perf_counter()
    p1 = mp.Process(target=task1, args=(3,))
    p2 = mp.Process(target=task1, args=(3,))
    p1.start()
    p2.start()
    # join() blocks until the child has finished
    p1.join()
    p2.join()
    finish_time = time.perf_counter()
    print(f"Program finished in {(finish_time - start_time):.3f} seconds")
# Instead of (10 * 2) sec it takes roughly 2 sec
import time
import multiprocessing as mp

def square(index, value):
    time.sleep(2)
    value[index] = value[index] ** 2

if __name__ == '__main__':
    # Shared array of C ints, visible to every child process
    arr = mp.Array('i', [2, 3, 6, 7, 8, 8, 9, 3, 3, 3])
    process = []
    for i in range(10):
        m = mp.Process(target=square, args=(i, arr))
        process.append(m)
        m.start()
    for m in process:
        m.join()
    print(list(arr))
    # [4, 9, 36, 49, 64, 64, 81, 9, 9, 9]
Pool
A pool is a collection of processes used to execute tasks in parallel. Pools help divide an enormous task into smaller parts that multiple processors can handle.
import multiprocessing as mp

def square(n):
    return n ** 2

if __name__ == '__main__':
    with mp.Pool(processes=5) as pool:
        out = pool.map(square, [3, 4, 5, 6, 6, 7, 87, 8, 8])
        print(out)
        # [9, 16, 25, 36, 36, 49, 7569, 64, 64]
import multiprocessing as mp
import time

def work_log(work_data):
    print(" Process %s waiting %s seconds" % (work_data[0], work_data[1]))
    time.sleep(int(work_data[1]))
    print(" Process %s Finished." % work_data[0])

if __name__ == '__main__':
    work = (["A", 5], ["B", 2], ["C", 1], ["D", 3])
    # The with-block shuts the pool down once map() has returned
    with mp.Pool(4) as p:
        p.map(work_log, work)
The Pool has 4 worker processes (processes=4) and the list has 4 elements, so all four elements are processed simultaneously.
Hence, the "Finished" messages appear in order of increasing sleep time: C, B, D, A.
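The difference between the order in which tasks finish and the order in which results are returned can be sketched with Pool.map and Pool.imap_unordered. The function name sleep_and_name and the job list are illustrative assumptions.

```python
import time
import multiprocessing as mp

def sleep_and_name(job):
    # job is a (name, seconds) pair: sleep, then return the name.
    name, seconds = job
    time.sleep(seconds)
    return name

if __name__ == "__main__":
    jobs = [("A", 0.8), ("B", 0.4), ("C", 0.2), ("D", 0.6)]
    with mp.Pool(processes=4) as pool:
        # map() returns results in input order, whatever the finishing order
        print(pool.map(sleep_and_name, jobs))   # ['A', 'B', 'C', 'D']
        # imap_unordered() yields each result as soon as it is ready,
        # so shorter sleeps usually come out first
        print(list(pool.imap_unordered(sleep_and_name, jobs)))
```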
Queue
Queues allow processes to communicate with each other.
For example, data can be placed on a queue and processed by another process when it becomes available, allowing us to break up a task into smaller parts that can be processed simultaneously.
- put(): inserts an element into the queue.
- get(): retrieves an element and removes it from the queue.
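The two methods above can be tried inside a single process before any child processes are involved. This is a minimal sketch; the helper name drain is illustrative.

```python
import multiprocessing as mp

def drain(q, count):
    # Remove `count` items from the queue, in the order they arrive.
    # get() blocks until an item is available.
    return [q.get() for _ in range(count)]

if __name__ == "__main__":
    q = mp.Queue()
    for item in ["a", "b", "c"]:
        q.put(item)
    print(drain(q, 3))  # ['a', 'b', 'c']
```

Items put by a single producer come out in first-in, first-out order.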
import multiprocessing as mp
import time

# pushing items to queue
def producer(q):
    for i in ["sudh", "kumar", "pwskills", "krish", "naik"]:
        time.sleep(1.5)
        print(f'Entry: {i}')
        q.put(i)
    q.put(None)  # sentinel: tells the consumer that nothing more is coming

# popping items from queue
def consume(q):
    # Loop until the sentinel arrives. get() blocks while the queue is empty,
    # so the consumer does not exit early when the producer is slower.
    while True:
        time.sleep(0.5)
        print('- ', end='')
        item = q.get()
        if item is None:
            break
        print(f'Exit: {item}')

if __name__ == '__main__':
    queue = mp.Queue()
    m1 = mp.Process(target=producer, args=(queue,))
    m2 = mp.Process(target=consume, args=(queue,))
    m1.start()
    m2.start()
    queue.put("xyz")
    m1.join()
    m2.join()
Pipes
- A pipe can have only two endpoints. Hence, it is preferred over a queue when communication is needed between just two processes.
The multiprocessing module provides the Pipe() function, which returns a pair of connection objects connected by a pipe. The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others).
import multiprocessing as mp
import time

def sender(conn, msg):
    for i in msg:
        time.sleep(1)
        conn.send(i)
    conn.send(None)  # sentinel: no more messages
    conn.close()

def receive(conn):
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            # Raised when the other end is closed and nothing is left to read
            break
        if msg is None:
            break
        print(msg)

if __name__ == '__main__':
    msg = ["my name is sudh", "this is my msg to my students ", "i am taking class for dsm ",
           "try to practice all the code "]
    conn1, conn2 = mp.Pipe()
    m1 = mp.Process(target=sender, args=(conn2, msg))
    m2 = mp.Process(target=receive, args=(conn1,))
    m1.start()
    m2.start()
    m1.join()
    conn2.close()
    m2.join()
    conn1.close()
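Pipe() is duplex by default, so each end can both send and receive. The following sketch shows a request and a reply travelling over the same pipe; the function name echo_upper and the None stop signal are illustrative choices.

```python
import multiprocessing as mp

def echo_upper(conn):
    # Reply to each message in upper case until None arrives.
    while True:
        msg = conn.recv()
        if msg is None:
            break
        conn.send(msg.upper())
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()   # both ends can send and receive
    p = mp.Process(target=echo_upper, args=(child_conn,))
    p.start()
    parent_conn.send("hello")
    print(parent_conn.recv())   # HELLO
    parent_conn.send(None)      # ask the child to stop
    p.join()
```

Passing duplex=False to Pipe() gives a one-way pipe instead, where the first connection can only receive and the second can only send.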