Python3多线程

向量数据库大模型机器学习

一.进程和线程

进程:是程序的一次执行,每个进程都有自己的地址空间、内存、数据栈及其他记录运行轨迹的辅助数据。

线程:所有的线程都运行在同一个进程当中,共享相同的运行环境。线程有开始、顺序执行和结束三个部分, 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

由于单线程效率低,程序中往往要引入多线程编程。计算机的核心是CPU,它承担了所有的计算任务,它就像一座工厂,时刻运行着。假定工厂的电力有限,一次只能供给一个车间使用。也就是说,一个车间开工的时候,其他车间都必须停工。背后的含义就是,单个CPU一次只能运行一个任务。

进程就好比工厂的车间,它代表CPU所能处理的单个任务。任一时刻,CPU总是运行一个进程,其他进程处于非运行状态。一个车间里,可以有很多工人。他们协同完成一个任务。线程就好比车间里的工人,一个进程可以包括多个线程。

在Python3中实现的大部分运行任务里,不同的线程实际上并没有同时运行:它们只是看起来像是同时运行的。

大家很容易认为线程化是在程序上运行两个(或多个)不同的处理器,每个处理器同时执行一个独立的任务。这种理解并不完全正确,线程可能会在不同的处理器上运行,但一次只能运行一个线程。

同时执行多个任务需要使用非标准的Python运行方式:用不同的语言编写一部分代码,或者使用多进程模块multiprocessing,但这么做会带来一些额外的开销。

由于Python默认的运行环境是CPython(C语言开发的Python),所以线程化可能不会提升所有任务的运行速度。这是因为和GIL(Global Interpreter Lock)的交互形成了限制:一次只能运行一个Python线程。

线程化的一般替代方法是:让各项任务花费大量时间等待外部事件。但问题是,如果想缩短等待时间,会需要大量的CPU计算,结果是程序的运行速度可能并不会提升。

当代码是用Python语言编写并在默认执行环境CPython上运行时,会出现这种情况。如果线程代码是用C语言写的,那它们就能够释放GIL并同时运行。如果是在别的Python执行环境(如IPython, PyPy,Jython, IronPython)上运行,请参考相关文档了解它们是如何处理线程的。

如果只用Python语言在默认的Python执行环境下运行,并且遇到CPU受限的问题,那就应该用多进程模块multiprocessing来解决。

Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

二、thread模块

**
Python thread模块可以调用下述函数实现多线程开启。它将产生一个新线程,在新的线程中用指定的参数和可选的kwargs来调用这个函数。

  
start_new_thread(function, args kwargs=None)  

注意:使用这个方法时,一定要加time.sleep()函数,否则每个线程都可能不执行。此方法还有一个缺点,遇到较复杂的问题时,线程数不易控制。

  
# -*- coding: utf-8 -*-  
import _thread  
import time  
  
def fun1():  
    print("hello world %s" % time.ctime())  
      
#多线程  
def main():  
    _thread.start_new_thread(fun1, ()) #无参数  
    _thread.start_new_thread(fun1, ())  
    time.sleep(2)  
    print("over")  
  
#程序成功在同一时刻运行两个函数  
if __name__ == '\_\_main\_\_':  
    main()  
  

输出结果如下图所示:

  
hello world Sat Jan 16 10:50:39 2021  
hello world Sat Jan 16 10:50:39 2021  
over  
  

三、threading模块

**
thread模块存在一些缺点,尤其是线程数不能被控制。下面使用threading解决线程数可控制的问题。

  
threading模块  
简述:  
threading模块  
threading.currentThread(): 返回当前的线程变量。  
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。  
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。  
  
   
  
  thread类  
  
    run(): 用以表示线程活动的方法。  
    start():启动线程活动。  
    join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。  
    isAlive(): 返回线程是否活动的。  
    getName(): 返回线程名。  
    setName(): 设置线程名。  
  

有两种方法来创建多线程:一种是继承Thread类,并重写它的run()方法;另一种是实例化threading.Thread对象时,将线程要执行的任务函数作为参数传入线程。

1、第一种是继承Thread类,并重写它的run()方法

  
#!/usr/bin/python3  
# -*- coding: utf-8 -*-  
  
"""  
@Datetime: 2021/1/16  
@Author: god\_mellon  
"""  
import threading  
import time  
  
''' \_\_init\_\_()方法总结:  
\_\_init\_\_方法:  
 使用方式:  
 def 类名:  
 #初始化函数,用来完成一些默认的设定  
 def \_\_init\_\_():  
 pass  
  
 \_\_init\_\_()方法,在创建一个对象时默认被调用,不需要手动调用  
 \_\_init\_\_(self)中,默认有1个参数名字为self,如果在创建对象时传递了2个实参,那么\_\_init\_\_(self)中出了self作为第一个形参外还需要2个形 参,例如\_\_init\_\_(self,x,y)  
 \_\_init\_\_(self)中的self参数,不需要开发者传递,python解释器会自动把当前的对象引用传递进去  
  
 '''  
class MyThread(threading.Thread):  
    def __init__(self, name):  
        threading.Thread.__init__(self)  
        self.name = name  
  
    def run(self):  # 定义每个线程要运行的函数  
  
        print("name:%s" % self.name)  
  
        time.sleep(3)  
  
  
if __name__ == '\_\_main\_\_':  
    t1 = MyThread('云云')  
    t2 = MyThread('憨憨')  
    t1.start()  
    t2.start()  
  

输出:

  
name:云云  
name:憨憨  
  

线程同步:

使用 Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。

  
#!/usr/bin/python3  
#-*- coding: utf-8 -*-  
  
"""  
@Datetime: 2021/1/16  
@Author: god\_mellon  
"""  
import threading  
import time  
  
  
class MyThread(threading.Thread):  
    def __init__(self, name,threadID,counter):  
        threading.Thread.__init__(self)  
        self.name = name  
        self.threadID = threadID  
        self.counter = counter  
    def run(self):  # 定义每个线程要运行的函数  
        print("开启线程:" + self.name)  
        # 获取锁,用于线程同步  
        threadLock.acquire()  
        print_time(self.name, self.counter, 5)  
        #print("name:%s" % self.name)  
        # 释放锁,开启下一个线程  
        threadLock.release()  
  
  
  
        time.sleep(2)  
  
  
def print_time(threadName, delay, counter):  
    while counter:  
        time.sleep(delay)  
        print ("%s: %s" % (threadName, time.ctime(time.time())))  
        counter -= 1  
threadLock = threading.Lock()  
threads = []  
  
if __name__ == '\_\_main\_\_':  
    # 创建新线程  
    t1 = MyThread('云云',1,1)  
    t2 = MyThread('憨憨',2,2)  
    # 开启新线程  
    t1.start()  
    t2.start()  
    # 添加线程到线程列表  
    threads.append(t1)  
    threads.append(t2)  
    # 等待所有线程完成  
    for t in threads:  
        t.join()  
    print("退出主线程")  
  

运行结果:

  
开启线程:云云  
开启线程:憨憨  
云云: Sun Jan 17 01:07:24 2021  
云云: Sun Jan 17 01:07:25 2021  
云云: Sun Jan 17 01:07:26 2021  
云云: Sun Jan 17 01:07:27 2021  
云云: Sun Jan 17 01:07:28 2021  
憨憨: Sun Jan 17 01:07:30 2021  
憨憨: Sun Jan 17 01:07:32 2021  
憨憨: Sun Jan 17 01:07:34 2021  
憨憨: Sun Jan 17 01:07:36 2021  
憨憨: Sun Jan 17 01:07:38 2021  
退出主线程  
  

Queue 模块中的常用方法:

  
  
    Queue.qsize() 返回队列的大小  
    Queue.empty() 如果队列为空,返回True,反之False  
    Queue.full() 如果队列满了,返回True,反之False  
    Queue.full 与 maxsize 大小对应  
    Queue.get([block[, timeout]])获取队列,timeout等待时间  
    Queue.get_nowait() 相当Queue.get(False)  
    Queue.put(item) 写入队列,timeout等待时间  
    Queue.put_nowait(item) 相当Queue.put(item, False)  
    Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号  
    Queue.join() 实际上意味着等到队列为空,再执行别的操作  
  

队列queue公共方法

  
  
import queue  
#创建基本队列  
#queue.Queue(maxsize=0)创建一个队列对象(队列容量),若maxsize小于或者等于0,队列大小没有限制  
Q=queue.Queue(10)  
print(Q)  
print(type(Q))  
  
#1.基本方法  
print(Q.queue)#查看队列中所有元素  
print(Q.qsize())#返回队列的大小  
print(Q.empty())#判断队空  
print(Q.full())#判断队满  
  
#2.获取队列,0--5  
#Queue.put(item,block = True,timeout = None )将对象放入队列,阻塞调用(block=False抛异常),无等待时间  
for i in range(5):  
    Q.put(i)  
 # Queue.put\_nowait(item)相当于 put(item, False).  
  
  
 #3.读队列,0--5  
 #Queue.get(block=True, timeout=None)读出队列的一个元素,阻塞调用,无等待时间  
while not Q.empty():  
    print(Q.get())  
 # Queue.get\_nowait()相当于get(False).取数据,如果没数据抛queue.Empty异常  
  
  
 #4.另两种涉及等待排队任务的方法  
 # Queue.task\_done()在完成一项工作后,向任务已经完成的队列发送一个信号  
 # Queue.join()阻止直到队列中的所有项目都被获取并处理。即等到队列为空再执行别的操作  
  
  
  

输出:

  
<queue.Queue object at 0x0000027A0A585408>  
<class 'queue.Queue'>  
deque([])  
0  
True  
False  
0  
1  
2  
3  
4  
  
Process finished with exit code 0  
  

线程优先级队列( Queue)

Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。

这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。

  
import queue  
import threading  
import time  
  
exitFlag = 0  
  
class myThread (threading.Thread):  
    def __init__(self, threadID, name, q):  
        threading.Thread.__init__(self)  
        self.threadID = threadID  
        self.name = name  
        self.q = q  
    def run(self):  
        print ("开启线程:" + self.name)  
        process_data(self.name, self.q)  
        print ("退出线程:" + self.name)  
  
def process_data(threadName, q):  
    while not exitFlag:  
        queueLock.acquire()  
        if not workQueue.empty():  
            data = q.get()  
            queueLock.release()  
            print ("%s processing %s" % (threadName, data))  
        else:  
            queueLock.release()  
        time.sleep(1)  
  
threadList = ["Thread-1", "Thread-2", "Thread-3"]  
nameList = ["One", "Two", "Three", "Four", "Five"]  
queueLock = threading.Lock()  
workQueue = queue.Queue(10)  
threads = []  
threadID = 1  
  
# 创建新线程  
for tName in threadList:  
    thread = myThread(threadID, tName, workQueue)  
    thread.start()  
    threads.append(thread)  
    threadID += 1  
  
# 填充队列  
#queueLock.acquire()  
for word in nameList:  
    workQueue.put(word)  
#queueLock.release()  
  
# 等待队列清空  
while not workQueue.empty():  
    pass  
  
# 通知线程是时候退出  
exitFlag = 1  
  
# 等待所有线程完成  
for t in threads:  
    t.join()  
print ("退出主线程")  

2、另一种是实例化threading.Thread对象时,将线程要执行的任务函数作为参数传入线程。

  
  
  
# -*- coding: utf-8 -*-  
  
# -*- coding: utf-8 -*-  
  
import threading  
import time  
from threading import Thread  
  
num = 10000  
  
  
def sub():  
    global num  
  
  
    while num:  
        # num -= 1 #没有IO操作,计算型任务,多线程不起作用,cpu会按照时间轮询切换,而这个操作时间太短,打不倒轮询的时间  
  
        temp = num  
        lock.acquire()  # 开启同步锁  
        num = temp-1  
        print(num)  
        #time.sleep(0.01) # IO阻塞,cpu会去执行其他线程,1秒时间足够长,所有线程拿到的temp都是100  
        lock.release() #释放同步锁  
  
l = []  
lock = threading.Lock()  
for i in range(100):  
    t = threading.Thread(target=sub)  
    t.start()  
    l.append(t)  
  
for t in l:  
    t.join()  
  
  

最近打算写一个C段扫描工具,学习了一下线程,喜欢网络安全和Python的兄弟可以关注一下我得公众号。
picture.image

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
云原生可观测性技术的落地实践
云原生技术和理念在近几年成为了备受关注的话题。应用通过云原生改造,变得更动态、弹性,可以更好地利用云的弹性能力。但是动态、弹性的环境也给应用以及基础设施的观测带来了更大的挑战。本次分享主要介绍了云原生社区中可观测性相关的技术和工具,以及如何使用这些工具来完成对云原生环境的观测。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论