多线程
Python里,多个cpu考虑使用多线程,io(磁盘,网络通讯)多线程
线程执行是无序的,线程是不安全的,共同操作数据时没有检查机制,所有要加锁
默认情况下主线程结束子线程不结束,除非设置setDaemon(True)
import threading
def worker(a_tid,a_account):
print a_tid,a_account
for i in range(20):
th = threading.Thread(name='name',target=worker,args=(‘key’,), kwargs={'a_tid':'a', 'a_account':2} )
产生一个线程对象
#参数name线程名字,可以不写,参数 target 是要执行的函数, args 里面的是参数列表(一个参数需要加个逗号,代表是,列表),
kwargs是字典式的参数列表
th.start() 启动这个线程
# 多线程+队列版本:
import socket
import threading
from Queue import Queue
def scan(port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex(('localhost', port)) == 0:
print port, 'open'
s.close()
def worker():
while not q.empty():
port = q.get() 获取数据队列分配的值
try:
scan(port)
finally:
q.task_done()
if __name__ == '__main__':
q = Queue()
map(q.put,xrange(1,65535)) 数据放入队列
threads = [threading.Thread(target=worker) for i in xrange(500)] 定义500个线程
map(lambda x:x.start(),threads) 启动线程
# q.join() 开启会慢
threading.Thread类的使用:
1,在自己的线程类的 __init__ 里调用 threading.Thread.__init__(self, name = threadname) # threadname 为线程的名字
2, run(),通常需要重写,编写代码实现做需要的功能。
3,getName(),获得线程对象名称
4,setName(),设置线程对象名称
5,start(),启动线程
6,join(timeout=None),等待另一线程结束后再运行。如果给出timeout,则最多阻塞timeout秒
7,setDaemon(bool),设置子线程是否随主线程一起结束,必须在start()之前调用。默认为 False
8,isDaemon(),判断线程是否随主线程一起结束
9,isAlive(),检查线程是否在运行中。
此外threading模块本身也提供了很多方法和其他的类,可以帮助我们更好的使用和管理线程。
可以参看https://docs.python.org/2/library/threading.html?highlight=threading#module-threading。
线程锁
import threading
lock = threading.Lock() 产生一个锁对象
share = [0,1]
num = 2
def AddNum():
global num
lock.acquire() 获得锁
share.append(num)
print 'share add:', num
print 'now:', share
num += 1
lock.release() 释放锁
Condition(锁上锁)
•对于消费者,在消费前检查队列是否为空。
•如果为空,调用condition实例的wait()方法。
•消费者进入wait(),同时释放所持有的lock(隐形释放,处于wait状态就会释放)。
•除非被notify,否则它不会运行。
• 生产者可以acquire这个lock,因为它已经被消费者release。
•当调用了condition的notify()方法后,消费者被唤醒,但唤醒不意味着它可以开始运行。
•notify()并不释放lock,调用notify()后,lock依然被生产者所持有。
•生产者通过condition.release()显式释放lock。
•消费者再次开始运行,同时申请锁,现在它可以得到队列中的数据而不会出现IndexError异常。
from threading import Condition, Thread
import time
import random
queue = []
condition = Condition()
class ConsumerThread(Thread):
def run(self):
global queue
while True:
condition.acquire()
if not queue:
print "Nothing in queue, consumer is waiting"
condition.wait() 如果queue为空,进入wait,同时释放锁,唤醒时同时获得锁
print "Producer added something to queue and notified the consumer"
num = queue.pop(0)
print "Consumed", num
condition.release()
time.sleep(random.random())
class ProducerThread(Thread):
def run(self):
nums = range(5)
global queue
while True:
condition.acquire() 获得锁
num = random.choice(nums)
queue.append(num)
print "Produced", num
condition.notify() 唤醒wait
condition.release() 释放锁
time.sleep(random.random())
ProducerThread().start()
ConsumerThread().start()
队列(Queue)
队列是线程安全
•在原来使用list的位置,改为使用Queue实例(下称队列)。
•这个队列有一个condition,它有自己的lock。如果你使用Queue,你不需要为condition和lock而烦恼。
•生生产者调用队列的put方法来插入数据。
•put(—) 在插入数据前有一个获取lock的逻辑。
•同时,put()也会检查队列是否已满。如果已满,它会在内部调用wait(),生产者开始等待。
• 消费者使用get方法。
•get()从队列中移出数据前会获取lock。
•get()会检查队列是否为空,如果为空,消费者进入等待状态。
•get()和put()都有适当的notify()。现在就去看Queue的源码吧。
from threading import Thread
import time
import random
from Queue import Queue
queue = Queue(10)
class ProducerThread(Thread):
def run(self):
nums = range(5)
global queue
while True:
num = random.choice(nums)
queue.put(num)
print "Produced", num
time.sleep(random.random())
class ConsumerThread(Thread):
def run(self):
global queue
while True:
num = queue.get()
#queue.task_done()
print "Consumed", num
time.sleep(random.random())
ProducerThread().start()
ConsumerThread().start()