φυβλαςのβλογ
phyblas的博客



[python] การใช้ multiprocessing เพื่อให้โปรแกรมทำงานหลายงานพร้อมกัน
เขียนเมื่อ 2018/03/17 18:31
แก้ไขล่าสุด 2024/02/22 10:55
ปกติเวลาที่รันโปรแกรมในไพธอนโปรแกรมจะทำตามคำสั่งตามลำดับทีละขั้นตอนโดยตลอดโปรแกรมถือว่าเป็นงานงานเดียวที่รันใน CPU ตัวเดียว

การที่โปรแกรมรันไปตามลำดับแบบนี้มีข้อดีคือเป็นระบบระเบียบ แต่ว่าในบางครั้งเราอาจจะมีงานบางอย่างที่ไม่จำเป็นต้องรอให้อีกงานนึงจบก่อนก็ได้ กรณีแบบนี้ถ้าสามารถรันโปรแกรมให้มันทำอะไรหลายๆไปพร้อมๆกันได้ก็จะทำให้การทำงานเร็วขึ้นมาก

ปกติแล้วคอมพิวเตอร์มีความสามารถที่จะทำงานหลายๆอย่างในเวลาเดียวกันได้อยู่แล้ว แต่ปัญหาคือหากเขียนโปรแกรมไพธอนด้วยวิธีทั่วไปยังไงก็รันได้ทีละงานในเวลาเดียว หากอยากรันหลายงานก็ต้องมาสั่งหลายๆครั้งเอง เช่นเปิดหน้าคอมมานด์ไลน์มาหลายๆอันแล้วรันไปทีละอัน แต่แบบนั้นต้องมาคอยเปิดหลายอัน อีกทั้งหากทำแบบนั้นก็จะเป็นการรันคนละโปรแกรม ไม่สามารถเอามาเชื่อมโยงกันได้โดยตรง

ยังดีที่ไพธอนมีเตรียมคำสั่งที่ใช้สำหรับรันหลายงานในเวลาเดียวกันได้โดยอัตโนมัติในโปรแกรมเดียว คำสั่งนั้นอยู่ในมอดูลชื่อ multiprocessing เป็นมอดูลติดตัวที่มีอยู่ในไพธอนอยู่แล้ว สามารถเรียกใช้ได้เลย



การใช้งาน
ขอเริ่มด้วยการยกตัวอย่างการทำงานอย่างง่าย
import multiprocessing as mp

def job():
    print('เริ่มทำงาน')
    for i in range(1000000):
        2**2
    print('งานเสร็จแล้ว')

if(__name__=='__main__'):
    p = mp.Process(target=job)
    p.start()
    print('สั่งงาน')

ผลที่ได้
เริ่มทำงาน
สั่งงาน
งานเสร็จแล้ว

จะเห็นว่าขั้นตอนการใช้ก็คือ ให้นิยามฟังก์ชันของงานที่เราต้องการให้มันทำ จากนั้นให้สร้างอ็อบเจ็กต์ multiprocessing.Process ขึ้น (ในที่นี้ย่อเป็น mp.) โดยใส่ค่า target เป็นตัวฟังก์ชันของงานตัวนั้น จากนั้นก็ใช้เมธอด .start() จะเป็นการเริ่มทำงานในฟังก์ชันนั้นทันที

สำหรับ if(__name__=='__main__'): ในที่นี้เป็นธรรมเนียมปฏิบัติที่มักจะต้องเขียนประจำเวลาใช้งาน multiprocessing เพื่อให้คำสั่งนี้ถูกเรียกใช้เฉพาะเวลาที่ถูกรันโดยตรง ไม่ใช่ถูกเรียกในฐานะมอดูล ซึ่งจะป้องกันปัญหาที่อาจเกิดขึ้นได้

เกี่ยวกับเรื่องรูปแบบการเขียนแบบนี้มีเขียนไว้ใน https://phyblas.hinaboshi.com/tsuchinoko35

ซึ่งผลที่ได้จะเห็นว่าข้อความสุดท้ายใน main ถูกพิมพ์ออกมาก่อนทั้งๆที่สั่ง p.start() ไปแล้ว นั่นเพราะหลังจากสั่งเริ่มงานไปใน job จะเจองานที่ต้องใช้เวลาทำให้คำสั่ง print ตัวหลังมาช้าลงไป แต่ คำสั่ง print ใน main ที่ตามมานั้นต่อมาทันที

นี่แสดงให้เห็นว่าคำสั่งที่ตามต่อมาหลังจากสั่งเริ่มงานไปแล้วนั้นมันไม่ได้รอให้งานเสร็จก่อนจึงทำงาน งานถูกสั่งทิ้งไว้แล้วโปรแกรมหลักก็ไปทำอะไรอย่างอื่นต่อได้ทันที

กรณีที่ฟังก์ชันนั้นต้องการอาร์กิวเมนต์ ให้ใส่ลงในค่า args แทนที่จะเรียก job(1,2) แบบฟังก์ชันทั่วไป คือเขียนแบบนี้
def job(a,b):
    print(a+b)

if(__name__=='__main__'):
    p = mp.Process(target=job,args=(1,2))
    p.start()

จากนั้นลองดูกรณีที่มีการรัน ๒ งานพร้อมกัน เช่นแบบนี้
import time

def job(a,x):
    for i in range(5):
        x += 1
        time.sleep(0.5)
        print('%s=%d'%(a,x))

if(__name__=='__main__'):
    p1 = mp.Process(target=job,args=('x',1))
    p2 = mp.Process(target=job,args=('y',11))
    p1.start()
    p2.start()
    print('สั่งงานไปแล้ว')

รันแล้วจะได้
สั่งงานไปแล้ว
x=2
y=12
x=3
y=13
x=4
y=14
x=5
y=15
x=6
y=16

ในที่นี้มีการใช้คำสั่ง time.sleep นี่เป็นคำสั่งสำหรับทำให้โปรแกรมพักการทำงานตามเวลาที่กำหนด หน่วยเป็นวินาที ที่ใส่ลงไปก็เพื่อให้เห็นความแตกต่างของลำดับการทำงานชัดเจน

ผลที่ได้จะเห็นว่าทั้งสองงานถูกทำไปพร้อมๆกัน จึงเห็นค่าถูกพิมพ์ออกมาสลับกันไปแบบนี้

หากต้องการรันหลายงานพร้อมกันก็อาจเขียนแบบนี้ได้
def job(a):
    for i in range(1,4):
        time.sleep(0.5)
        print('งานที่ %d รอบที่ %d'%(a,i))

if(__name__=='__main__'):
    for j in range(1,6):
        p = mp.Process(target=job,args=(j,))
        p.start()
    print('สั่งงานไปแล้ว')

ผลที่ได้
สั่งงานไปแล้ว
งานที่ 1 รอบที่ 1
งานที่ 2 รอบที่ 1
งานที่ 3 รอบที่ 1
งานที่ 4 รอบที่ 1
งานที่ 5 รอบที่ 1
งานที่ 1 รอบที่ 2
งานที่ 2 รอบที่ 2
งานที่ 3 รอบที่ 2
งานที่ 4 รอบที่ 2
งานที่ 5 รอบที่ 2
งานที่ 2 รอบที่ 3
งานที่ 1 รอบที่ 3
งานที่ 3 รอบที่ 3
งานที่ 4 รอบที่ 3
งานที่ 5 รอบที่ 3


จะเห็นว่าแต่ละรอบของทุกงานทำเสร็จก่อนจึงเริ่มงานของรอบถัดไป แสดงให้เห็นว่าทุกงานทำไปพร้อมๆกัน ไม่ได้รองานนึงเสร็จค่อยทำอีกงาน



การใช้ join
หากมีคำสั่งบางอย่างที่ต้องการรอให้งานที่สั่งไปเสร็จก่อนแล้วค่อยทำ จำเป็นจะต้องใช้เมธอด join

ตัวอย่างการใช้
def job(a):
    for i in range(2):
        time.sleep(0.5)
        print('งานที่ %d รอบที่ %d'%(a,i+1))

if(__name__=='__main__'):
    pp = []
    for j in range(4):
        p = mp.Process(target=job,args=(j+1,))
        p.start()
        pp.append(p)
    print('เริ่มงานได้แล้ว')
    for p in pp:
        p.join()
    print('เก็บกวาดหลังเลิกงาน')

ผลที่ได้
เริ่มงานได้แล้ว
งานที่ 1 รอบที่ 1
งานที่ 2 รอบที่ 1
งานที่ 3 รอบที่ 1
งานที่ 4 รอบที่ 1
งานที่ 1 รอบที่ 2
งานที่ 2 รอบที่ 2
งานที่ 3 รอบที่ 2
งานที่ 4 รอบที่ 2
เก็บกวาดหลังเลิกงาน

จะเห็นว่า print ที่เขียนก่อน join จะทำงานก่อน แต่ print ที่อยู่หลัง join จะรอให้ทำงานจบ

หากทุกงานที่สั่งไปมีการใช้ join โปรแกรมก็จะรอให้ทุกงานเสร็จทั้งหมดก่อน



การใช้ queue
ปกติแล้วค่าตัวแแปรต่างๆที่อยู่ภายในฟังก์ชันของงานนั้นไม่สามารถที่จะคืนกลับมายังโปรแกรมหลักได้ เพราะเราไม่ได้เขียนคำสั่งในรูปแบบของ x = job() แบบนี้ แต่เวลารันเราเริ่มด้วยคำสั่ง p.start() ต่อให้เขียน return ไปก็ไม่ได้อะไรกลับมา เช่น
def job(a):
    return a

if(__name__=='__main__'):
    p = mp.Process(target=job,args=(1,))
    a = p.start()
    print(a)

ได้
None

ดังนั้นต้องใช้วิธีอื่นในการที่จะได้ตัวแปรกลับออกมาจากงานที่ทำ วิธีหนึ่งที่สามารถใช้ได้ก็คือใช้ Queue

สามารถเขียนได้แบบนี้
def job(a,q):
    q.put(a**3)

if(__name__=='__main__'):
    q = mp.Queue()
    p = mp.Process(target=job,args=(2,q))
    p.start()
    a = q.get()
    print(a)

ก่อนอื่นตอนนิยามฟังก์ชันของงานต้องใส่ตัวแปรนึงสำหรับรับค่าไปด้วย จากนั้นใช้เมธอด put เพื่อป้อนค่าที่ต้องการให้คืนกลับไป

แล้วในโปรแกรมหลักให้สร้างตัวแปรนึงขึ้นมาเป็นออบเจ็กต์ของ mp.Queue แล้วป้อนใส่มันลงไปในอาร์กิวเมนต์ของฟังก์ชันด้วย จากนั้นก็สามารถเอาค่าที่คืนกลับมาได้โดยใช้เมธอด get

จะใช้ put กี่ครั้งก็ได้ในฟังก์ชัน แล้วเวลา get ก็จะได้ค่าออกมาทีละค่า เรียงตามลำดับ
def job(a,b,q):
    q.put(a*b)
    q.put(a**b)

if(__name__=='__main__'):
    q = mp.Queue()
    p = mp.Process(target=job,args=(2,3,q))
    p.start()
    print(q.get())
    print(q.get())

ได้
6
8


เพียงแต่ข้อควรระวังคือมีการใช้ put กี่ครั้งก็ต้อง get เป็นจำนวนครั้งเท่านั้น หาก get เกินโปรแกรมจะหยุดทำงานไปเลย

นั่นเพราะถ้าเราใช้ get เมื่อไหร่โปรแกรมหลักจะหยุดรอให้มีการ put เกิดขึ้นในฟังก์ชันจึงรับค่ามาแล้วทำงานต่อ ไม่งั้นโปรแกรมจะไม่เดินต่อไป

กรณีที่ส่งค่า q ไปแล้วแต่ไม่มีการ put สักที เช่นแบบนี้ โปรแกรมจะรอไปตลอดกาล ไม่ขยับไปไหน
def job(q):
    ropaithoe = 'รอไปเถอะ'
    print(ropaithoe)

if(__name__=='__main__'):
    q = mp.Queue()
    p = mp.Process(target=job,args=(q,))
    p.start()
    a = q.get()

ดังนั้นจึงต้องระวังด้วย เวลาใช้วิธีนี้

กรณีที่รันสองงานขึ้นไปพร้อมกันโดยใช้ q ตัวเดียวกัน ค่าที่ได้จะเรียงตามลำดับว่าใคร put ก่อนกัน ไม่ได้อยู่ที่ใครเริ่มต้นก่อน เช่น
def job(s,ro,q):
    time.sleep(ro)
    q.put(s)

if(__name__=='__main__'):
    q = mp.Queue()
    p1 = mp.Process(target=job,args=('p1',3,q))
    p1.start()
    p2 = mp.Process(target=job,args=('p2',2,q))
    p2.start()
    print(q.get())
    print(q.get())


จะได้
p2
p1

นั่นเพราะว่า p2 สั่งให้รอแค่ 2 วินาที แต่ p1 รอ 3 วินาที ดังนั้น p2 จึงเสร็จก่อน ใส่ค่าลงใน q ก่อน

ดังนั้นควรจะเข้าใจลำดับข้อมูลให้ดี ไม่เช่นนั้นจะได้ค่าออกมาผิด



ตัวแปรร่วมสำหรับใช้ในทุกงาน
ปกติแล้วหากเราประกาศตัวแปรไว้นอกฟังก์ชัน ตัวแปรนั้นสามารถเปลี่ยนแปลงค่าได้จากภายในฟังก์ชันโดยใช้คำสั่ง global (ใครไม่แม่นอ่านทวนได้ใน https://phyblas.hinaboshi.com/tsuchinoko19)

เช่นมีฟังก์ชันแบบนี้
c = [1]
def job(a):
    global c
    c += [a]
    print(c)

ถ้าเรียกใช้โดยทั่วไปแบบนี้
job(2)
job(3)

จะได้
[1, 2]
[1, 2, 3]

นั่นคือลิสต์ c ค่อยๆถูกเติม

แต่หากเรียกด้วย Process แบบนี้
c = [1]
if(__name__=='__main__'):
    p1 = mp.Process(target=job,args=(2,))
    p1.start()
    p2 = mp.Process(target=job,args=(3,))
    p2.start()

ผลที่ได้
[1, 2]
[1, 3]

จะเห็นว่าถึง p1 รันก่อนแล้วเติมค่าให้ลิสต์ไป แต่พอ p2 เรียกใช้อีกกลับอยู่ในสภาพก่อนเติมแล้วจึงเติมใหม่อีก แสดงว่าตัวแปรนี้ไม่สามารถใช้ร่วมกันได้

สำหรับกรณีแบบนี้ การกำหนดตัวแปรสากลที่ใช้ร่วมจะต้องทำโดยใช้ mp.Value

ตัวอย่าง
def job(a):
    v.value +=1
    print(v.value)

if(__name__=='__main__'):
    v = mp.Value('i',0)
    p1 = mp.Process(target=job,args=(2,))
    p1.start()
    p2 = mp.Process(target=job,args=(3,))
    p2.start()

ได้
1
2

การใช้ Value แบบนี้ต้องกำหนดชนิดของตัวแปรไปด้วย โดยใส่ชนิดตัวแปรก่อนค่อยตามด้วยค่า

ชนิดของข้อมูลจะใช้ตัวย่อ อ้างอิงตามนี้ https://docs.python.org/3/library/array.html

ในที่นี้ i หมายถึงตัวแปรจำนวนเต็ม

ค่าของตัวแปรจะไม่ได้อยู่ที่ตัวแปรโดยตรงแต่อยู่ที่แอตทริบิวต์ .value ดังนั้นจึงต้องพิมพ์ .value ต่อท้ายเวลาใช้อย่างที่เห็น

หากต้องการเป็นตัวแปรหลายตัวก็ต้องทำเป็นอาเรย์ โดยใช้ mp.Array
def job(a,b):
    v[a] += b
    print(v[:])

if(__name__=='__main__'):
    v = mp.Array('i',[0,0,0,1])
    for i in range(4):
        p = mp.Process(target=job,args=(i,i+2))
        p.start()
        p.join()

ได้
[2, 0, 0, 1]
[2, 3, 0, 1]
[2, 3, 4, 1]
[2, 3, 4, 6]

อาเรย์ในที่นี้นั้นจะคล้ายๆกับอาเรย์ในภาษา C หรือ ndarray ของ numpy คือสามารถแก้ไขค่าภายในได้แต่ไม่สามารถเพิ่มหรือลดสมาชิกได้ นอกจากสร้างใหม่ไปเลย



การใช้ Pool
Pool นั้นเป็นอีกวิธีที่ใช้สั่งงานเช่นเดียวกันกับ Process เพียงแต่สามารถสั่งให้คืนค่าออกมาได้โดยตรง ถือเป็นอีกวิธีหนึ่งที่จะสามารถนำค่าที่คืนกลับจากงานที่สั่งไปมาใช้ได้

การใช้งานจะคล้ายกับ Process โดยมีวิธีการเขียนดังนี้
def job(a):
    return [a,a**2,a**3]

if(__name__=='__main__'):
    pool = mp.Pool(processes=1)
    a = pool.apply_async(job,(2,))
    b = pool.apply_async(job,(3,))
    print(a.get())
    print(b.get())

ได้
[2, 4, 8]
[3, 9, 27]

เริ่มจากสร้างออบเจ็กต์ของ Pool ขึ้นมา จากนั้นก็สั่งให้เริ่มทำงานโดยใช้เมธอด apply_async ใส่ฟังก์ชันแล้วตามด้วยอาร์กิวเมนต์

ถ้าเจอเมธอด get โปรแกรมก็จะทำการรอให้ฟังก์ชันทำงานจนจบแล้วคืนค่ากลับมา ไม่เช่นนั้นจะไม่เดินต่อ

ค่า processes ที่ใส่เข้าไปตอนสร้าง Pool นี้คือจำนวนที่จะให้มันทำงานสูงสุดในเวลาเดียวกัน ถ้าเราสั่งงานให้ pool นั้นไปมากกว่าจำนวนที่กำหนดมันก็จะรอให้งานที่เริ่มทำก่อนนั้นจบลงไปก่อน เช่น
def job(a):
    for i in range(1,4):
        time.sleep(0.5)
        print(100*a+i)

if(__name__=='__main__'):
    pool = mp.Pool(processes=2)
    for j in range(1,6):
        pool.apply_async(job,(j,))

ได้
101
201
102
202
103
203
301
401
302
402
303
403
501
502
503


จะเห็นว่างานที่ 1 และ 2 ทำพร้อมกัน แล้วต่อมางานที่ 3 และ 4 ก็ทำงานพร้อมกัน แล้วจึงเริ่มงานที่ 5 นั่นเพราะสั่งให้ทำงานสูงสุดแค่ 2

นอกจากใช้ apply_async แล้ว ยังมีอีกเมธอดที่สะดวกที่มักใช้ใน Pool คือ map

map ใช้สั่งงานโดยรับค่าเข้าไปหลายตัวพร้อมกันแล้วคืนค่าของแต่ละงานกลับมาเป็นลิสต์
def job(a):
    return a**2/2

if(__name__=='__main__'):
    pool = mp.Pool(processes=5)
    print(pool.map(job,[1,2,3,10,18]))

ได้
[0.5, 2.0, 4.5, 50.0, 162.0]


ดูเผินๆอาจคล้ายกับใช้ฟังก์ชัน map ธรรมดา
print(list(map(job,[1,2,3,10,18])))

ผลที่ได้ก็เหมือนกัน เพียงแต่ว่าการสั่งด้วย Pool แบบนี้งานทั้งหมดจะทำไปพร้อมกันในเวลาเดียวกัน จึงเร็วขึ้นมาก



สถานะการทำงานของโปรแกรม
สุดท้ายนี้ลองมาดูตรงนี้สักหน่อย

หากขณะที่โปรแกรมทำงานอยู่เราไปดูที่สถานะการทำงานของโปรแกรมเช่นผ่าน ตัวจัดการงาน (task manager) ของ windows หรือ ตัวตรวจสอบกิจกรรม (activity monitor) ของ mac ก็จะเห็นได้ว่ามันมีงาน python โผล่มาเต็มไปหมด แยกกันชัดเจน เช่นลองรันโค้ดนี้
def job(a):
    for i in range(10000000):
        a += i

if(__name__=='__main__'):
    pool = mp.Pool(processes=30)
    pool.map(job,range(100))

จากนั้นจะเห็นว่าเครื่องทำงานทั้งหมดตามที่สั่งไปอย่างเต็มที่ แน่นอนว่าไม่ใช่ว่าสั่งไปกี่งานก็จะเร็วขึ้นเท่านั้น แต่ CPU ของแต่ละเครื่องก็มีขีดจำกัดอยู่ ถ้าเครื่องทำงานเต็มที่แล้วก็จะทำให้ทุกอย่างช้าลงตาม



ดังนั้นควรจะตรวจดูสถานะการทำงานของโปรแกรมในเครื่องเพื่อให้ใช้ได้อย่างมีประสิทธิภาพสูงสุดและไม่ให้ทำงานหนักเกินไปโดยเปล่าประโยชน์



อ้างอิง


-----------------------------------------

囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧囧

ดูสถิติของหน้านี้

หมวดหมู่

-- คอมพิวเตอร์ >> เขียนโปรแกรม >> python

ไม่อนุญาตให้นำเนื้อหาของบทความไปลงที่อื่นโดยไม่ได้ขออนุญาตโดยเด็ดขาด หากต้องการนำบางส่วนไปลงสามารถทำได้โดยต้องไม่ใช่การก๊อปแปะแต่ให้เปลี่ยนคำพูดเป็นของตัวเอง หรือไม่ก็เขียนในลักษณะการยกข้อความอ้างอิง และไม่ว่ากรณีไหนก็ตาม ต้องให้เครดิตพร้อมใส่ลิงก์ของทุกบทความที่มีการใช้เนื้อหาเสมอ

目录

从日本来的名言
模块
-- numpy
-- matplotlib

-- pandas
-- manim
-- opencv
-- pyqt
-- pytorch
机器学习
-- 神经网络
javascript
蒙古语
语言学
maya
概率论
与日本相关的日记
与中国相关的日记
-- 与北京相关的日记
-- 与香港相关的日记
-- 与澳门相关的日记
与台湾相关的日记
与北欧相关的日记
与其他国家相关的日记
qiita
其他日志

按类别分日志



ติดตามอัปเดตของบล็อกได้ที่แฟนเพจ

  查看日志

  推荐日志

ตัวอักษรกรีกและเปรียบเทียบการใช้งานในภาษากรีกโบราณและกรีกสมัยใหม่
ที่มาของอักษรไทยและความเกี่ยวพันกับอักษรอื่นๆในตระกูลอักษรพราหมี
การสร้างแบบจำลองสามมิติเป็นไฟล์ .obj วิธีการอย่างง่ายที่ไม่ว่าใครก็ลองทำได้ทันที
รวมรายชื่อนักร้องเพลงกวางตุ้ง
ภาษาจีนแบ่งเป็นสำเนียงอะไรบ้าง มีความแตกต่างกันมากแค่ไหน
ทำความเข้าใจระบอบประชาธิปไตยจากประวัติศาสตร์ความเป็นมา
เรียนรู้วิธีการใช้ regular expression (regex)
การใช้ unix shell เบื้องต้น ใน linux และ mac
g ในภาษาญี่ปุ่นออกเสียง "ก" หรือ "ง" กันแน่
ทำความรู้จักกับปัญญาประดิษฐ์และการเรียนรู้ของเครื่อง
ค้นพบระบบดาวเคราะห์ ๘ ดวง เบื้องหลังความสำเร็จคือปัญญาประดิษฐ์ (AI)
หอดูดาวโบราณปักกิ่ง ตอนที่ ๑: แท่นสังเกตการณ์และสวนดอกไม้
พิพิธภัณฑ์สถาปัตยกรรมโบราณปักกิ่ง
เที่ยวเมืองตานตง ล่องเรือในน่านน้ำเกาหลีเหนือ
ตระเวนเที่ยวตามรอยฉากของอนิเมะในญี่ปุ่น
เที่ยวชมหอดูดาวที่ฐานสังเกตการณ์ซิงหลง
ทำไมจึงไม่ควรเขียนวรรณยุกต์เวลาทับศัพท์ภาษาต่างประเทศ