Page 87
from cmath import sqrt
import threading
import time
myCounter = 0
def count():
global myCounter
for i in range(100000):
temp = myCounter+1
x = sqrt(2)
myCounter = temp
thread1 = threading.Thread(target=count)
thread2 = threading.Thread(target=count)
t1 = time.perf_counter()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter)
Page 90 (top)
from cmath import sqrt
import threading
import time
myCounter = 0
countlock = threading.Lock()
def count():
global myCounter
for i in range(100000):
countlock.acquire()
temp = myCounter+1
x = sqrt(2)
myCounter = temp
countlock.release()
thread1 = threading.Thread(target=count)
thread2 = threading.Thread(target=count)
t1 = time.perf_counter()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter)
Page 90 (Bottom)
from cmath import sqrt
import threading
import time
myCounter = 0
countlock = threading.Lock()
def count():
global myCounter
countlock.acquire()
for i in range(100000):
temp = myCounter+1
x = sqrt(2)
myCounter = temp
countlock.release()
thread1 = threading.Thread(target=count)
thread2 = threading.Thread(target=count)
t1 = time.perf_counter()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter)
Page 91
import multiprocessing
import time
def myProcess(lock):
lock.acquire()
time.sleep(4)
lock.release()
if __name__ == '__main__':
myLock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=myProcess, args=(myLock,))
p2 = multiprocessing.Process(target=myProcess, args=(myLock,))
t1 = time.perf_counter()
p1.start()
p2.start()
p1.join()
p2.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
Page 92-93
import threading
import time
countlock1 = threading.Lock()
countlock2 = threading.Lock()
myCounter1 = 0
myCounter2 = 0
def count1():
global myCounter1
global myCounter2
for i in range(100000):
countlock1.acquire()
myCounter1 += 1
countlock2.acquire()
myCounter2 += 1
countlock2.release()
countlock1.release()
def count2():
global myCounter1
global myCounter2
for i in range(100000):
countlock2.acquire()
myCounter2 += 1
countlock1.acquire()
myCounter1 += 1
countlock1.release()
countlock2.release()
thread1 = threading.Thread(target=count1)
thread2 = threading.Thread(target=count2)
t1 = time.perf_counter()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter1)
Page 95
import threading
import time
countlock1 = threading.Lock()
countlock2 = threading.Lock()
myCounter1 = 0
myCounter2 = 0
def count1():
global myCounter1
global myCounter2
for i in range(100000):
countlock1.acquire()
myCounter1 += 1
if countlock2.acquire(timeout=0.01):
myCounter2 += 1
countlock2.release()
countlock1.release()
def count2():
global myCounter1
global myCounter2
for i in range(100000):
countlock2.acquire()
myCounter2 += 1
if countlock1.acquire(timeout=0.01):
myCounter1 += 1
countlock1.release()
countlock2.release()
thread1 = threading.Thread(target=count1)
thread2 = threading.Thread(target=count2)
t1 = time.perf_counter()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter1)
Page 105
import threading
import time
def myPi(m, n):
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
global PI
with myLock:
PI += pi*4
PI = 0
N = 10000000
myLock = threading.Lock()
thread1 = threading.Thread(target=myPi, args=(N//2+1, N))
t1 = time.perf_counter()
thread1.start()
myPi(1, N//2)
thread1.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(PI)
time.perf_counter()
print((t2-t1)*1000)
print(PI)
Page 107
import threading
import time
def myThread1():
time.sleep(1)
thread2.join()
def myThread2():
time.sleep(1)
thread1.join()
thread1 = threading.Thread(target=myThread1)
thread2 = threading.Thread(target=myThread2)
thread1.start()
thread2.start()
Page 108
import threading
import time
joinLock = threading.Lock()
def myThread1():
time.sleep(1)
joinLock.release()
thread1 = threading.Thread(target=myThread1)
joinLock.acquire()
thread1.start()
joinLock.acquire()
Page 109
import urllib.request
import threading
def download(html, lock):
html.append(f.read().decode('utf-8'))
lock.release()
html1 = []
html2 = []
mySem = threading.Semaphore()
thread1 = threading.Thread(target=download, args=(html1, mySem))
thread2 = threading.Thread(target=download, args=(html2, mySem))
mySem.acquire()
thread1.start()
thread2.start()
mySem.acquire()
if (html1 != []):
print("thread1")
if (html2 != []):
print("thread2")
Page 111
import urllib.request
import threading
def download(html, event):
html.append(f.read().decode('utf-8'))
event.set()
html1 = []
html2 = []
myEvent = threading.Event()
thread1 = threading.Thread(target=download, args=(html1, myEvent))
thread2 = threading.Thread(target=download, args=(html2, myEvent))
myEvent.clear()
thread1.start()
thread2.start()
myEvent.wait()
if (html1 != []):
print("thread1")
if (html2 != []):
print("thread2")
Page 112
import urllib.request
import threading
def download(html, barrier):
html.append(f.read().decode('utf-8'))
barrier.wait()
html1 = []
html2 = []
myBarrier = threading.Barrier(3)
thread1 = threading.Thread(target=download, args=(html1, myBarrier))
thread2 = threading.Thread(target=download, args=(html2, myBarrier))
thread1.start()
thread2.start()
myBarrier.wait()
if (html1 != []):
print("thread1")
if (html2 != []):
print("thread2")
Page 113
import urllib.request
import threading
def download(html, barrier):
html.append(f.read().decode('utf-8'))
try:
barrier.wait()
except:
pass
html1 = []
html2 = []
myBarrier = threading.Barrier(2)
thread1 = threading.Thread(target=download, args=(html1, myBarrier))
thread2 = threading.Thread(target=download, args=(html2, myBarrier))
thread1.start()
thread2.start()
myBarrier.wait()
myBarrier.abort()
if (html1 != []):
print("thread1")
if (html2 != []):
print("thread2")
Page 116
import urllib.request
import threading
def download(html, condition):
html.append(f.read().decode('utf-8'))
with condition:
condition.notify()
html1 = []
html2 = []
myCondition = threading.Condition()
thread1 = threading.Thread(target=download, args=(html1, myCondition))
thread2 = threading.Thread(target=download, args=(html2, myCondition))
thread1.start()
thread2.start()
with myCondition:
myCondition.wait()
if (html1 != []):
print("thread1")
if (html2 != []):
print("thread2")
Page 118
import urllib.request
import threading
def download(html, condition):
html.append(f.read().decode('utf-8'))
global myCount
with condition:
myCount += 1
condition.notify()
myCount = 0
html1 = []
html2 = []
html3 = []
myCondition = threading.Condition(threading.Lock())
thread1 = threading.Thread(target=download, args=(html1, myCondition))
thread2 = threading.Thread(target=download, args=(html2, myCondition))
thread3 = threading.Thread(target=download, args=(html3, myCondition))
thread1.start()
thread2.start()
thread3.start()
with myCondition:
myCondition.wait_for(lambda: myCount >= 2)
if (html1 != []):
print("thread1")
if (html2 != []):
print("thread2")
if (html3 != []):
print("thread3")
Page 125
import multiprocessing
def addCount(q):
for i in range(1000):
q.put(i)
def getCount(q, l):
while True:
i = q.get()
with l:
print(i)
if __name__ == '__main__':
q = multiprocessing.Queue()
pLock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=addCount, args=(q,))
p2 = multiprocessing.Process(target=getCount, args=(q, pLock))
p3 = multiprocessing.Process(target=getCount, args=(q, pLock))
p1.start()
p2.start()
p3.start()
p3.join()
Page 128-129
import multiprocessing
from time import sleep
def addCount(con):
for i in range(500):
con.send(i)
def getCount(con, l):
while True:
sleep(0.005)
i = con.recv()
with l:
print(i, multiprocessing.current_process().name)
if __name__ == '__main__':
con1, con2 = multiprocessing.Pipe()
pLock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=addCount, args=(con1,))
p2 = multiprocessing.Process(target=getCount, args=(con2, pLock))
p3 = multiprocessing.Process(target=getCount, args=(con2, pLock))
p1.start()
p2.start()
p3.start()
p2.join()
Page 131
import time
import multiprocessing
import ctypes
def myUpdate(val):
for i in range(1000):
time.sleep(0.005)
val.value += 1
if __name__ == '__main__':
myValue = multiprocessing.Value(ctypes.c_int, 0)
p1 = multiprocessing.Process(target=myUpdate, args=(myValue,))
p2 = multiprocessing.Process(target=myUpdate, args=(myValue,))
p2.start()
p1.start()
p1.join()
p2.join()
print(myValue.value)
Page 135
import time
import multiprocessing
import multiprocessing.shared_memory
def myUpdate(name, lock):
mySharedMem = multiprocessing.shared_memory.SharedMemory(
create=False, name=name)
for i in range(127):
time.sleep(0.005)
with lock:
mySharedMem.buf[0] = mySharedMem.buf[0]+1
mySharedMem.close()
if __name__ == '__main__':
mylock = multiprocessing.Lock()
mySharedMem = multiprocessing.shared_memory.SharedMemory(
create=True, size=10)
mySharedMem.buf[0] = 1
p1 = multiprocessing.Process(target=myUpdate,
args=(mySharedMem.name, mylock))
p2 = multiprocessing.Process(target=myUpdate,
args=(mySharedMem.name, mylock))
p2.start()
p1.start()
p1.join()
p2.join()
print(mySharedMem.buf[0])
mySharedMem.close()
mySharedMem.unlink()
Page 136
import time
import multiprocessing
import multiprocessing.shared_memory
import sys
def myUpdate(mySharedMem, lock):
for i in range(1000):
time.sleep(0.005)
with lock:
count = int.from_bytes(
mySharedMem.buf[0:8], byteorder=sys.byteorder)
count = count+1
mySharedMem.buf[0:8] = int.to_bytes(
count, 8, byteorder=sys.byteorder)
mySharedMem.close()
if __name__ == '__main__':
mylock = multiprocessing.Lock()
mySharedMem = multiprocessing.shared_memory.SharedMemory(
create=True, size=10)
mySharedMem.buf[0:8] = int.to_bytes(1, 8,
byteorder=sys.byteorder)
p1 = multiprocessing.Process(target=myUpdate,
args=(mySharedMem, mylock))
p2 = multiprocessing.Process(target=myUpdate,
args=(mySharedMem, mylock))
p2.start()
p1.start()
p1.join()
p2.join()
count = count = int.from_bytes(mySharedMem.buf[0:8],
byteorder=sys.byteorder)
print(count)
mySharedMem.close()
mySharedMem.unlink()
Page 138
import time
import multiprocessing
import multiprocessing.shared_memory
def myUpdate(mySharedList, lock):
for i in range(1000):
time.sleep(0.005)
with lock:
mySharedList[0] = mySharedList[0]+1
mySharedList.shm.close()
if __name__ == '__main__':
mylock = multiprocessing.Lock()
mySharedList = multiprocessing.shared_memory.ShareableList(
sequence=[1])
p1 = multiprocessing.Process(target=myUpdate,
args=(mySharedList, mylock))
p2 = multiprocessing.Process(target=myUpdate,
args=(mySharedList, mylock))
p2.start()
p1.start()
p1.join()
p2.join()
print(mySharedList[0])
mySharedList.shm.close()
mySharedList.shm.unlink()
Page 139
import time
import multiprocessing
import multiprocessing.shared_memory
import multiprocessing.managers
def myUpdate(mySharedList, lock):
for i in range(1000):
time.sleep(0.005)
with lock:
mySharedList[0] = mySharedList[0]+1
mySharedList.shm.close()
if __name__ == '__main__':
mylock = multiprocessing.Lock()
with multiprocessing.managers.SharedMemoryManager() as smm:
mySharedList = smm.ShareableList([1])
p1 = multiprocessing.Process(target=myUpdate,
args=(mySharedList, mylock))
p2 = multiprocessing.Process(target=myUpdate,
args=(mySharedList, mylock))
p2.start()
p1.start()
p1.join()
p2.join()
print(mySharedList[0])
Page 139
import ctypes
import multiprocessing
import time
def myPi(m, n, PI):
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
with PI:
PI.value += pi*4
if __name__ == '__main__':
N = 10000000
PI = multiprocessing.Value(ctypes.c_double, 0.0)
p1 = multiprocessing.Process(target=myPi, args=(N//2+1, N, PI))
t1 = time.perf_counter()
p1.start()
myPi(1, N//2, PI)
p1.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(PI.value)
Page 144
import multiprocessing.pool
import multiprocessing
import random
import time
def myProcess():
time.sleep(random.randrange(1, 4))
print(multiprocessing.current_process().name)
if __name__ == '__main__':
p = multiprocessing.pool.Pool(2)
p.apply_async(myProcess)
p.apply_async(myProcess)
p.close()
p.join()
Page 145
import multiprocessing.pool
import multiprocessing
import random
import time
def myProcess():
time.sleep(random.randrange(1, 4))
return multiprocessing.current_process().name
def myCallback(result):
print(result)
if __name__ == '__main__':
p = multiprocessing.pool.Pool(2)
p.apply_async(myProcess, callback=myCallback)
p.apply_async(myProcess, callback=myCallback)
p.close()
p.join()
Page 146
import multiprocessing.pool
import multiprocessing
import random
import time
def myProcess():
time.sleep(random.randrange(1, 4))
return multiprocessing.current_process().name
if __name__ == '__main__':
with multiprocessing.pool.Pool(2) as p:
asr1 = p.apply_async(myProcess)
asr2 = p.apply_async(myProcess)
result1 = asr1.get()
result2 = asr2.get()
print(result1, result2)
Page 147
import multiprocessing.pool
import multiprocessing
import random
import time
def myProcess():
time.sleep(random.randrange(1, 6))
return multiprocessing.current_process().name
if __name__ == '__main__':
with multiprocessing.pool.Pool(2) as p:
asr1 = p.apply_async(myProcess)
asr2 = p.apply_async(myProcess)
waiting = True
while waiting:
time.sleep(0.01)
if (asr1.ready()):
print(asr1.get())
break
if (asr2.ready()):
print(asr2.get())
break
Page 147-148
import multiprocessing
import multiprocessing.pool
import time
def myPi(m, n):
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
return pi*4
if __name__ == '__main__':
N = 10000000
with multiprocessing.pool.Pool(2) as p:
t1 = time.perf_counter()
asr1 = p.apply_async(myPi, args=(1, N//2))
asr2 = p.apply_async(myPi, args=(N//2+1, N))
PI = asr1.get()
PI += asr2.get()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(PI)
Page 149 (top)
import multiprocessing
import multiprocessing.pool
def myFunc(x):
return x**2
if __name__ == '__main__':
with multiprocessing.pool.Pool(10) as p:
asr = p.map_async(myFunc, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
res = asr.get()
print(res)
Page 149 (bottom)
import multiprocessing
import multiprocessing.pool
import time
def myPi(r):
m, n = r
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
return pi*4
if __name__ == '__main__':
N = 10000000
with multiprocessing.pool.Pool(2) as p:
t1 = time.perf_counter()
asr = p.map_async(myPi, [(1, N//2), (N//2+1, N)])
res = asr.get()
t2 = time.perf_counter()
print((t2-t1)*1000)
PI = res[0]+res[1]
print(PI)
Page 150
import multiprocessing
import multiprocessing.pool
import time
def myPi(m, n):
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
return pi*4
if __name__ == '__main__':
N = 10000000
with multiprocessing.pool.Pool(2) as p:
t1 = time.perf_counter()
asr = p.starmap_async(myPi, [(1, N//2), (N//2+1, N)])
res = asr.get()
t2 = time.perf_counter()
print((t2-t1)*1000)
PI = res[0]+res[1]
print(PI)
Page 151
import multiprocessing
import multiprocessing.pool
def myFunc(x):
return x**2
if __name__ == '__main__':
with multiprocessing.pool.Pool(10) as p:
results = p.imap(myFunc, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
for result in results:
print(result)
Page 152
import multiprocessing
import multiprocessing.pool
def myFunc(x):
return x**2
if __name__ == '__main__':
with multiprocessing.pool.Pool(10) as p:
results = p.imap_unordered(myFunc, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
for result in results:
print(result)
Page 153
import functools
import multiprocessing
import multiprocessing.pool
def myFunc(x):
return x**2
def mySum(value, item):
return value+item
if __name__ == '__main__':
with multiprocessing.pool.Pool(10) as p:
asr = p.map_async(myFunc, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
res = asr.get()
sumsquares = functools.reduce(mySum, res)
print(sumsquares)
Page 154
import multiprocessing
import multiprocessing.pool
import time
import ctypes
def counter():
global count
for i in range(10000):
with count:
temp = count.value+1
count.value = temp
def setup(var):
global count
count = var
if __name__ == '__main__':
myCounter = multiprocessing.Value(ctypes.c_int, 0)
with multiprocessing.Pool(2, initializer=setup,
initargs=(myCounter,)) as p:
t1 = time.perf_counter()
asr1 = p.apply_async(counter)
asr2 = p.apply_async(counter)
asr1.wait()
asr2.wait()
t2 = time.perf_counter()
print(myCounter.value)
print((t2-t1)*1000)
Page 160
import multiprocessing
import multiprocessing.pool
import time
def myFunc(x):
for i in range(len(x)):
x[i] = x[i]**2
if __name__ == '__main__':
man = multiprocessing.Manager()
myList = man.list([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
p = multiprocessing.Process(target=myFunc, args=(myList,))
t1 = time.perf_counter()
p.start()
p.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(myList)
Page 161
import multiprocessing
def count(myCounter):
for i in range(1000):
myCounter.value = myCounter.value+1
if __name__ == '__main__':
man = multiprocessing.Manager()
counter = man.Value(int, 0)
p1 = multiprocessing.Process(target=count, args=(counter,))
p2 = multiprocessing.Process(target=count, args=(counter,))
p1.start()
p2.start()
p1.join()
p2.join()
print(counter.value)
Page 162
import multiprocessing
def count(myCounter):
for i in range(1000):
myCounter.value = myCounter.value+1
if __name__ == '__main__':
man = multiprocessing.Manager()
counter = man.Value(int, 0)
p1 = multiprocessing.Process(target=count, args=(counter,))
p2 = multiprocessing.Process(target=count, args=(counter,))
p1.start()
p2.start()
p1.join()
p2.join()
print(counter.value)
Page 162
import multiprocessing
import time
def myPi(m, n, PI, lock):
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
with lock:
PI.value += pi*4
if __name__ == '__main__':
N = 10000000
man = multiprocessing.Manager()
PI = man.Value(float, 0.0)
myLock = man.Lock()
p1 = multiprocessing.Process(target=myPi,
args=(N//2+1, N, PI, myLock))
t1 = time.perf_counter()
p1.start()
myPi(1, N//2, PI, myLock)
p1.join()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(PI.value)
Page 165-166
import multiprocessing
import multiprocessing.managers
class Point():
def __init__(self):
self._x = 0
self._y = 0
def setxy(self, x, y):
self._x = x
self._y = y
def getxy(self):
return (self._x, self._y)
def myFunc(point):
point.setxy(42, 43)
class myManager(multiprocessing.managers.BaseManager):
pass
myManager.register("point", Point)
if __name__ == '__main__':
man = myManager()
man.start()
point = man.point()
p1 = multiprocessing.Process(target=myFunc, args=(point,))
p1.start()
p1.join()
print(point.getxy())
Page 168-169
import multiprocessing
import multiprocessing.managers
class Counter():
def __init__(self):
self.myCounter = 0
def getCount(self):
return self.myCounter
def setCount(self, value):
self.myCounter = value
count = property(getCount, setCount)
class CounterProxy(multiprocessing.managers.BaseProxy):
_exposed_ = ('getCount', 'setCount')
def get(self):
return self._callmethod('getCount')
def set(self, value):
return self._callmethod('setCount', (value,))
value = property(get, set)
class myManager(multiprocessing.managers.BaseManager):
pass
myManager.register("counter", callable=Counter,
proxytype=CounterProxy)
def count(myCounter):
for i in range(1000):
myCounter.value = myCounter.value+1
if __name__ == '__main__':
man = myManager()
man.start()
counter = man.counter()
p1 = multiprocessing.Process(target=count, args=(counter,))
p2 = multiprocessing.Process(target=count, args=(counter,))
p1.start()
p2.start()
p1.join()
p2.join()
print(counter.value)
Page 172
import multiprocessing
import multiprocessing.managers
class SharedList():
def __init__(self):
self.list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def getData(self):
return self.list
def setData(self, value, index):
self.list[index] = value
class SharedListProxy(multiprocessing.managers.BaseProxy):
_exposed_ = ('getData', 'setData')
def getData(self):
return self._callmethod('getData')
def setData(self, value, index):
self._callmethod('setData', (value, index))
_sharedList = SharedList()
def SharedList():
return _sharedList
class myManager(multiprocessing.managers.BaseManager):
pass
myManager.register("SharedList", callable=SharedList,
proxytype=SharedListProxy)
if __name__ == '__main__':
man = myManager(address=("192.168.253.14", 50000), authkey=b"password")
s = man.get_server()
s.serve_forever()
Page 174
import multiprocessing
import multiprocessing.managers
class SharedListProxy(multiprocessing.managers.BaseProxy):
_exposed_ = ('getData', 'setData')
def getData(self):
return self._callmethod('getData')
def setData(self, value, index):
self._callmethod('setData', (value, index))
class myManager(multiprocessing.managers.BaseManager):
pass
myManager.register("SharedList", proxytype=SharedListProxy)
if __name__ == '__main__':
man = myManager(address=("192.168.253.14", 50000),
authkey=b"password")
man.connect()
myList = man.SharedList()
print(myList.getData())
Page 174-175
import multiprocessing
import multiprocessing.managers
class Math:
def myPi(self, m, n):
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
return pi*4
_math = Math()
def getMath():
return _math
class MathProxy(multiprocessing.managers.BaseProxy):
_exposed_ = ("myPi",)
def myPi(self, m, n):
return self._callmethod('myPi', m, n)
class myManager(multiprocessing.managers.BaseManager):
pass
myManager.register("math", callable=getMath, proxytype=MathProxy)
if __name__ == '__main__':
man = myManager(address=("192.168.253.14", 50000),
authkey=b"password")
s = man.get_server()
s.serve_forever()
Page 175
import multiprocessing
import multiprocessing.managers
class MathProxy(multiprocessing.managers.BaseProxy):
_exposed_ = ('myPi')
def myPi(self, m, n):
return self._callmethod('myPi', (m, n))
class myManager(multiprocessing.managers.BaseManager):
pass
myManager.register("math", proxytype=MathProxy)
if __name__ == '__main__':
man = myManager(address=("192.168.253.14", 50000), authkey=b"password")
man.connect()
math = man.math()
print(math.myPi(1, 100))
Page 185
for Linux:
#!/bin/sh
echo "user name: "
read name
echo $name
for Windows:
@echo off
echo "User name:"
set /p id=
echo %id%
Top of Page
import subprocess
p = subprocess.Popen(["myscript.bat"],
stdout=subprocess.PIPE, stdin=subprocess.PIPE, text=True)
ans = p.stdout.readline()
print("message", ans)
p.stdin.write("mike\n")
ans = p.stdout.readline()
print("message", ans)
as modified at bottom of page
import subprocess
p = subprocess.Popen(["myscript.bat"],
stdout=subprocess.PIPE, stdin=subprocess.PIPE, text=True)
ans = p.stdout.readline()
print("message",ans)
p.stdin.write("mike\n")
p.stdin.flush()
ans = p.stdout.readline()
print("message",ans)
Page 186
import subprocess
p = subprocess.Popen(["myscript.bat"], stdout=subprocess.PIPE,
stdin=subprocess.PIPE, bufsize=0,
universal_newlines=True, text=True)
ans = p.stdout.readline()
print("message", ans)
p.stdin.write("mike\n")
ans = p.stdout.readline()
print("message", ans)
Page 189
import subprocess
import threading
class AsyncReader():
def __init__(self, stream):
self._stream = stream
self._char = ""
self.t = threading.Thread(target=self._get, daemon=True)
self.t.start()
def get(self):
self.t.join(0.04)
if self.t.is_alive():
return ""
else:
result = self._char
self.t = threading.Thread(target=self._get, daemon=True)
self.t.start()
return result
def _get(self):
self._char = self._stream.read(1)
def readmessage(self):
ans = ""
while True:
char = self.get()
if char == "":
break
ans += char
return ans
p = subprocess.Popen(["myscript.bat"],
stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=1, universal_newlines=True, text=True)
aRead = AsyncReader(p.stdout)
ans = aRead.readmessage()
print("message", ans)
p.stdin.write("mike\n")
ans = aRead.readmessage()
print("message", ans)
Page 197
import concurrent.futures
import time
import urllib.request
def download():
html = f.read().decode('utf-8')
return html
with concurrent.futures.ThreadPoolExecutor() as executor:
t1 = time.perf_counter()
f1 = executor.submit(download)
f2 = executor.submit(download)
t2 = time.perf_counter()
print((t2-t1)*1000)
print(f1.result()[:25])
print(f2.result()[:25])
Page 198 modified by top
import concurrent.futures
import time
import urllib.request
def download():
html = f.read().decode('utf-8')
return html
t1 = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() as executor:
f1 = executor.submit(download)
f2 = executor.submit(download)
t2 = time.perf_counter()
print((t2-t1)*1000)
res = concurrent.futures.wait([f1, f2],
return_when=concurrent.futures.FIRST_COMPLETED)
for f in res.done:
print(f.result()[:25])
Page 198 modified by bottom
import concurrent.futures
import time
import urllib.request
def download():
html = f.read().decode('utf-8')
return html
with concurrent.futures.ThreadPoolExecutor() as executor:
f1 = executor.submit(download)
f2 = executor.submit(download)
for f in concurrent.futures.as_completed([f1, f2]):
print(f.result()[:25])
Page 199
import concurrent.futures
import urllib.request
def download():
html = f.read().decode('utf-8')
return html
def processDownload(f):
print(f.result()[:25])
with concurrent.futures.ThreadPoolExecutor() as executor:
f1 = executor.submit(download)
f2 = executor.submit(download)
f1.add_done_callback(processDownload)
f2.add_done_callback(processDownload)
print("waiting")
Page 200
import concurrent.futures
import urllib.request
def download():
html = f.read().decode('utf-8')
return html
def processDownloadA(f):
print(f.result()[:25])
f2 = executor.submit(download)
f2.add_done_callback(processDownloadB)
def processDownloadB(f):
print(f.result()[:25])
with concurrent.futures.ThreadPoolExecutor() as executor:
f1 = executor.submit(download)
f1.add_done_callback(processDownloadA)
print("waiting")
while True:
pass
Page 203-204
import concurrent.futures
import threading
import time
from cmath import sqrt
myCounter = 0
countlock = threading.Lock()
def count():
global myCounter
for i in range(100000):
with countlock:
temp = myCounter+1
x = sqrt(2)
myCounter = temp
with concurrent.futures.ThreadPoolExecutor() as execute:
t1 = time.perf_counter()
f1 = execute.submit(count)
f2 = execute.submit(count)
concurrent.futures.wait(
[f1, f2], return_when=concurrent.futures.ALL_COMPLETED)
t2 = time.perf_counter()
print((t2-t1)*1000)
print(myCounter)
Page 205
import concurrent.futures
import multiprocessing
import time
import ctypes
def counter():
global count
for i in range(10000):
with count:
temp = count.value+1
count.value = temp
def setup(var):
global count
count = var
if __name__ == '__main__':
myCounter = multiprocessing.Value(ctypes.c_int, 0)
with concurrent.futures.ProcessPoolExecutor(2,
initializer=setup, initargs=(myCounter,)) as execute:
t1 = time.perf_counter()
f1 = execute.submit(counter)
f2 = execute.submit(counter)
concurrent.futures.wait([f1, f2],
return_when=concurrent.futures.ALL_COMPLETED)
t2 = time.perf_counter()
print(myCounter.value)
print((t2-t1)*1000)
Page 206
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import time
import ctypes
def counter(count, lock):
for i in range(10000):
with lock:
temp = count.value+1
count.value = temp
if __name__ == '__main__':
with multiprocessing.Manager() as man:
with concurrent.futures.ProcessPoolExecutor(2) as execute:
myCounter = man.Value(ctypes.c_int, 0)
myLock = man.Lock()
t1 = time.perf_counter()
f1 = execute.submit(counter, myCounter, myLock)
f2 = execute.submit(counter, myCounter, myLock)
concurrent.futures.wait([f1, f2],
return_when=concurrent.futures.ALL_COMPLETED)
t2 = time.perf_counter()
print(myCounter.value)
print((t2-t1)*1000)
Page 207
import concurrent.futures
import time
def taskA():
time.sleep(1)
ans = f2.result()
return ans
def taskB():
time.sleep(1)
ans = f1.result()
return ans
with concurrent.futures.ThreadPoolExecutor(2) as execute:
f1 = execute.submit(taskA)
f2 = execute.submit(taskB)
concurrent.futures.wait([f1, f2],
return_when=concurrent.futures.ALL_COMPLETED)
print(f1.result())
Page 208
import concurrent.futures
import time
def myPi(m, n):
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
return pi*4
if __name__ == '__main__':
N = 10000000
with concurrent.futures.ProcessPoolExecutor(2) as execute:
t1 = time.perf_counter()
f1 = execute.submit(myPi, 1, N//2)
f2 = execute.submit(myPi, N//2+1, N)
PI = f1.result()
PI += f2.result()
t2 = time.perf_counter()
print((t2-t1)*1000)
print(PI)
Page 216
import asyncio
async def test1(msg):
print(msg)
async def main():
await test1("Hello Coroutine World")
asyncio.run(main())
Page 219
import asyncio
async def count(n):
for i in range(n):
print(i)
return n
async def main(myValue):
t1 = asyncio.create_task(count(10))
print("Hello Coroutine World")
await asyncio.sleep(5)
return myValue
result = asyncio.run(main(42))
print(result)
Page 221
import asyncio
async def test1(msg):
print(msg)
async def main():
asyncio.create_task(test1("one"))
asyncio.create_task(test1("two"))
await test1("three")
print("Hello Coroutine World")
await asyncio.sleep(0)
asyncio.run(main())
Page 225
import asyncio
async def test1(msg):
print(msg)
return msg
async def main():
t1 = asyncio.create_task(test1("one"))
t2 = asyncio.create_task(test1("two"))
done, pending = await asyncio.wait([t1, t2],
return_when=asyncio.ALL_COMPLETED)
for t in done:
print(t.result())
print("Hello Coroutine World")
asyncio.run(main())
Page 226
import asyncio
async def test1(msg):
print(msg)
return msg
async def main():
result = await asyncio.gather(test1("one"), test1("two"))
print(result)
print("Hello Coroutine World")
asyncio.run(main())
Page 227
import asyncio
async def test1(msg):
try:
await asyncio.sleep(0)
except:
pass
print(msg)
return msg
async def main():
t1 = asyncio.create_task(test1("one"))
await asyncio.sleep(0)
t1.cancel()
print("Hello Coroutine World")
await asyncio.sleep(0)
asyncio.run(main())
Page 228 top
import asyncio
async def test(msg):
print(msg)
raise Exception("Test exception")
async def main():
t1 = asyncio.create_task(test("one"))
try:
await t1
except:
print("an exception has occurred")
print("Hello Coroutine World")
await asyncio.sleep(0)
asyncio.run(main())
Page 228 bottom
import asyncio
async def test(msg):
print(msg)
raise Exception("Test exception")
async def main():
t1 = asyncio.create_task(test("one"))
done, pending = await asyncio.wait([t1])
print(repr(done.pop().exception()))
print("Hello Coroutine World")
await asyncio.sleep(0)
asyncio.run(main())
Page 229
import asyncio
async def count():
global myCounter
for i in range(1000):
temp = myCounter+1
myCounter = temp
async def main():
t1 = asyncio.create_task(count())
t2 = asyncio.create_task(count())
await asyncio.wait([t1, t2])
print(myCounter)
myCounter = 0
asyncio.run(main())
Page 230
import asyncio
import asyncio.locks
async def count():
global myCounter
global myLock
for i in range(1000):
async with myLock:
temp = myCounter+1
await asyncio.sleep(0)
myCounter = temp
async def main():
t1 = asyncio.create_task(count())
t2 = asyncio.create_task(count())
await asyncio.wait([t1, t2])
print(myCounter)
myCounter = 0
myLock = asyncio.locks.Lock()
asyncio.run(main())
Page 232
import asyncio
import random
idGlobal = 0
async def inner(idparam):
global idGlobal
await asyncio.sleep(0)
if idparam != idGlobal:
print("error")
async def outer():
global idGlobal
id = random.randint(0, 10000)
idGlobal = id
await inner(id)
async def main():
await asyncio.gather(outer(), outer())
asyncio.run(main())
Page 233
import asyncio
import random
import contextvars
idGlobalCtx = contextvars.ContextVar("id")
async def inner(idparam):
await asyncio.sleep(0)
print(idparam, idGlobalCtx.get(), end=" ")
if idparam != idGlobalCtx.get():
print("error", end="")
print()
async def outer():
global idGlobal
id = random.randint(0, 10000)
idGlobalCtx.set(id)
await inner(id)
async def main():
await asyncio.gather(outer(), outer())
asyncio.run(main())
Page 234
import asyncio
import asyncio.queues
async def addCount(q):
for i in range(1000):
await q.put(i)
async def getCount(q, id):
while True:
i = await q.get()
print(id, i)
await asyncio.sleep(0)
async def main():
q = asyncio.queues.Queue()
t1 = asyncio.create_task(addCount(q))
t2 = asyncio.create_task(getCount(q, "t2"))
t3 = asyncio.create_task(getCount(q, "t3"))
await asyncio.wait([t1, t2, t3])
asyncio.run(main())
Page 243
import asyncio
import urllib.parse
import time
import email
def parseHeaders(headers):
message = email.message_from_string(headers)
return dict(message.items())
async def download(url):
url = urllib.parse.urlsplit(url)
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
request = (
f"GET /index.html HTTP/1.1\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(request.encode('ascii'))
headers = ""
line = await reader.readline()
while True:
line = await reader.readline()
line = line.decode('ascii')
if line == "\r\n":
break
headers += line
headers = parseHeaders(headers)
length = int(headers["Content-Length"])
line = await reader.read(length)
line = line.decode('utf8')
writer.close()
await writer.wait_closed()
return line
async def main():
start = time.perf_counter()
end = time.perf_counter()
print((end-start)*1000)
print(results[0][:25])
print(results[1][:25])
asyncio.run(main())
Page 247
import asyncio
import email
import email.utils
async def handleRequest(reader, writer):
headers = ""
while True:
line = await reader.readline()
line = line.decode('ascii')
if line == "\r\n":
break
headers += line
print(headers)
html = ("<html><head><title>Test Page</title></head><body>"
"page content"
"</p></body></html>\r\n")
headers = ("HTTP/1.1 200 OK\r\n"
"Content-Type: text/html; charset=UTF-8\r\n"
"Server:PythonAcyncio\r\n"
f"Date: {email.utils.formatdate(timeval=None,localtime=False, usegmt=True)}\r\n"
f"Content-Length:{len(html)}\r\n\r\n"
)
data = headers.encode("ascii")+html.encode("utf8")
writer.write(data)
await writer.drain()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handleRequest, "", 8080)
async with server:
await server.serve_forever()
asyncio.run(main())
Page 250
import urllib.request
import asyncio
import time
async def download():
html = f.read().decode('utf-8')
return html
async def main():
t1=time.perf_counter()
results=await asyncio.gather(download(),download())
t2=time.perf_counter()
print((t2-t1)*1000)
print(results[0][:25])
asyncio.run(main())
Page 251
import urllib.request
import asyncio
import time
def _download():
html = f.read().decode('utf-8')
return html
async def download():
return await asyncio.to_thread(_download)
async def main():
n = 1000
t1 = time.perf_counter()
results = await asyncio.gather(*[download() for i in range(n)])
t2 = time.perf_counter()
print((t2-t1)*1000)
print(results[0][:25])
asyncio.run(main())
Page 253
import asyncio
import time
async def tick():
i = 0
while True:
i += 1
print("TICK", i)
await asyncio.sleep(0.5)
def _myPi(m, n):
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
return 4*pi
async def myPi(m, n):
return await asyncio.to_thread(_myPi, m, n)
async def main():
n = 100
t1 = time.perf_counter()
T1 = asyncio.create_task(myPi(1, 10000000))
T2 = asyncio.create_task(tick())
done, pending = await asyncio.wait([T1, T2],
return_when=asyncio.FIRST_COMPLETED)
t2 = time.perf_counter()
print((t2-t1)*1000)
print(done.pop().result())
asyncio.run(main())
Page 255
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
print("Status:", response.status)
print("Content-type:", response.headers['content-type'])
html = await response.text()
print("Body:", html[:25], "...")
asyncio.run(main())
Page 256
import tkinter
count = 0
root = None
label1 = None
def stopProg(e):
global root
root.destroy()
def transfertext(e):
global root, label1
global count
count = count+1
label1.configure(text=count)
def runGUI():
global root, label1
root = tkinter.Tk()
button1 = tkinter.Button(root, text="Exit")
button1.pack()
button1.bind('<Button-1>', stopProg)
button2 = tkinter.Button(root, text="Click Me")
button2.pack()
button2.bind('<Button-1>', transfertext)
label1 = tkinter.Label(root, text="nothing to say")
label1.pack()
root.mainloop()
runGUI()
Page 257
import asyncio
import tkinter
count = 0
root = None
label1 = None
T1 = None
def stopProg(e):
global T1
global root
T1.cancel()
root.quit()
def transfertext(e):
global root, label1
global count
count = count+1
label1.configure(text=count)
async def updater(interval):
global root
while True:
root.update()
await asyncio.sleep(interval)
def setupGUI():
global root, label1
root = tkinter.Tk()
button1 = tkinter.Button(root, text="Exit")
button1.pack()
button1.bind('<Button-1>', stopProg)
button2 = tkinter.Button(root, text="Click Me")
button2.pack()
button2.bind('<Button-1>', transfertext)
label1 = tkinter.Label(root, text="nothing to say")
label1.pack()
async def tick():
i = 0
while True:
i += 1
print("TICK", i)
await asyncio.sleep(0.5)
async def main():
global T1, root
root.tk.willdispatch()
T1 = asyncio.create_task(updater(0.01))
T2 = asyncio.create_task(tick())
await asyncio.wait((T1,))
setupGUI()
asyncio.run(main())
Page 259
import asyncio
import tkinter
import threading
count = 0
root = None
label1 = None
def stopProg(e):
global root
root.quit()
def transfertext(e):
global label1
global count
count = count+1
label1.configure(text=count)
def setupGUI():
global root, label1
root = tkinter.Tk()
button1 = tkinter.Button(root, text="Exit")
button1.pack()
button1.bind('<Button-1>', stopProg)
button2 = tkinter.Button(root, text="Click Me")
button2.pack()
button2.bind('<Button-1>', transfertext)
label1 = tkinter.Label(root, text="nothing to say")
label1.pack()
async def tick():
i = 0
while True:
i += 1
print("TICK", i)
await asyncio.sleep(0.5)
def runasyncio():
asyncio.run(main())
async def main():
T2 = asyncio.create_task(tick())
await asyncio.wait((T2,))
setupGUI()
t1 = threading.Thread(target=runasyncio, daemon=True)
t1.start()
root.mainloop()
Page 262
import asyncio
async def main():
p = await asyncio.create_subprocess_exec("dir", "/",
stdout=asyncio.subprocess.PIPE)
stdout_data, stderr_data = await p.communicate()
print(stdout_data)
print("finished")
asyncio.run(main())
Page 263
import asyncio
async def main():
p = await asyncio.create_subprocess_exec("./myscript.sh",
stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
ans = await p.stdout.readline()
print("message", ans)
p.stdin.write(b"mike\n")
ans = await p.stdout.readline()
print("message", ans)
asyncio.run(main())
Page 264
import asyncio
async def main():
p = await asyncio.create_subprocess_exec("./myscript.sh",stdout=asyncio.subprocess.PIPE,stdin=asyncio.subprocess.PIPE)
T=asyncio.create_task(p.stdout.read(100))
try:
done,pending=await asyncio.wait([T],timeout= 0.1)
except asyncio.TimeoutError:
pass
print("message",T.result())
p.stdin.write(b"mike\n")
ans= await p.stdout.readline()
print("message",ans)
await p.wait()
asyncio.run(main())
Page 268
import asyncio
def myFunction(value):
print(value)
myLoop = asyncio.new_event_loop()
myLoop.call_later(3, myFunction, 43)
t = myLoop.time()+2
myLoop.call_at(t, myFunction, 44)
myLoop.call_soon(myFunction, 42)
myLoop.call_soon(myFunction, 45)
myLoop.run_forever()
Page 269
import asyncio
async def myFunction1(value):
await asyncio.sleep(0)
print(value)
async def myFunction2(value):
print(value)
await asyncio.sleep(0)
myLoop = asyncio.new_event_loop()
T1 = myLoop.create_task(myFunction1(42))
T2 = myLoop.create_task(myFunction2(43))
myLoop.run_forever()
Page 271
import asyncio
import threading
import time
def myMakeTask(loop, cor, value):
loop.call_soon_threadsafe(lambda: loop.create_task(cor(value)))
async def myFunction1(value):
await asyncio.sleep(0)
print(value)
async def myFunction2(value):
print(value)
await asyncio.sleep(0)
myLoop = None
def myThreadLoop():
global myLoop
myLoop = asyncio.new_event_loop()
T1 = myLoop.create_task(myFunction1(42))
T2 = myLoop.create_task(myFunction2(43))
myLoop.run_forever()
t = threading.Thread(target=myThreadLoop)
t.start()
time.sleep(0)
while not myLoop:
pass
myMakeTask(myLoop, myFunction1, 44)
t.join()
Page 272
import asyncio
import concurrent.futures
def myFunction1(value):
print(value)
return value
if __name__ == '__main__':
pool = concurrent.futures.ProcessPoolExecutor()
myLoop = asyncio.new_event_loop()
T1 = myLoop.run_in_executor(pool, myFunction1, 42)
T2 = myLoop.run_in_executor(pool, myFunction1, 43)
myLoop.run_until_complete(asyncio.wait([T1, T2]))
print(T1.result())
print(T2.result())
pool.shutdown()
Page 273
import asyncio
import concurrent.futures
import time
def myPi(m, n):
pi = 0
for k in range(m, n+1):
s = 1 if k % 2 else -1
pi += s / (2 * k - 1)
return pi*4
if __name__ == '__main__':
pool = concurrent.futures.ProcessPoolExecutor()
myLoop = asyncio.new_event_loop()
N = 10000000
with concurrent.futures.ProcessPoolExecutor() as pool:
t1 = time.perf_counter()
T1 = myLoop.run_in_executor(pool, myPi, 1, N//2)
T2 = myLoop.run_in_executor(pool, myPi, N//2+1, N)
myLoop.run_until_complete(asyncio.wait([T1, T2]))
t2 = time.perf_counter()
PI = T1.result()
PI += T2.result()
print((t2-t1)*1000)
print(PI)
Page 276
import asyncio
async def main():
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
lambda: asyncio.DatagramProtocol(),
local_addr=("192.168.253.20", 8080))
data = b"Hello UDP World"
transport.sendto(data, addr=("192.168.253.14", 8080))
transport.close()
asyncio.run(main())
Page 277
import asyncio
class ClientDatagramProtocol(asyncio.DatagramProtocol):
def datagram_received(self, data, addr):
message = data.decode("utf8")
print("Received", message, "from", addr)
async def main():
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
lambda: ClientDatagramProtocol(),
local_addr=('192.168.253.14', 8080))
await asyncio.sleep(1000)
transport.close()
asyncio.run(main())
Page 279
import asyncio
async def main():
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
lambda: asyncio.DatagramProtocol(),
local_addr=("192.168.253.20", 8080), allow_broadcast=True)
sock = transport.get_extra_info('socket')
print(sock)
data = b"Hello UDP World"
await loop.sock_connect(sock, ("192.168.253.255", 8080))
await loop.sock_sendall(sock, data)
transport.close()
asyncio.run(main())
Page 280
import asyncio
import socket
async def main():
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
data = b"Hello UDP World"
await loop.sock_connect(sock, ("192.168.253.255", 8080))
await loop.sock_sendall(sock, data)
asyncio.run(main())