PYTHON多线程处理文件

一个几十G的文件想用Python多线程读取提高处理效率,得到的结果总是不如预期。在毛帅的提醒下才发现一个进程启动的线程将共享文件句柄,A线程对文件的操作(即使是读)也将影响到B线程。如图,图片来自毛帅:

    

测试代码如下:

# -*- coding: UTF-8 -*-

def threadFunc1(demo, threadnum, startlinenum, deallinenum):
    # 行数计数器
    line = 0
    # skip若干行
    while line < startlinenum:
        lineStr = demo.readline()
        line += 1
    # deal指定行数
    while line < deallinenum:
        lineStr = demo.readline()
        line += 1
        print "Thread-%s:%s" % (threadnum, lineStr)

def threadFunc2(demo, threadnum, startlinenum, deallinenum):
    linenum = 1
    for line in demo:
        if linenum >= startlinenum and linenum < (startlinenum+deallinenum):
            print "Thread-%s:%s" % (threadnum, line)
        elif (linenum >= (startlinenum+deallinenum)):
            break;
        linenum = linenum+1

import threading

if __name__ == "__main__":
    # 初始化demo文件
    fileLoc = ‘demo.txt‘
    demo = open(fileLoc,‘w‘)
    for i in range(100000):
            demo.write("Line:"+str(i)+‘\n‘)
    demo.close()


    # 预读文件总行数
    demo = open(fileLoc,‘r‘)
    filetotalnum = 0
    for line in demo:
            filetotalnum = filetotalnum + 1
    demo.close()


    # 设定线程数
    TOTAL_THREAD = 6
    # 在主进程总打开文件,将文件句柄传至线程启动的函数中
    demo = open(fileLoc,‘r‘)
    for threadnum in range(TOTAL_THREAD):
        # 每个线程处理的行数
        deallinenum = filetotalnum / (TOTAL_THREAD-1)
        # 最后一个线程处理剩余部分
        leftnum = filetotalnum % (TOTAL_THREAD-1)
        if(threadnum != (TOTAL_THREAD-1)):
            # 实例化线程
            t = threading.Thread(target=threadFunc2,args=(demo, threadnum, threadnum*deallinenum, threadnum*deallinenum+deallinenum))
            # 启动
            t.start()
            # 等待线程结束后主进程退出
            t.join()
        else :
            t = threading.Thread(target=threadFunc2,args=(demo, threadnum, threadnum*deallinenum, threadnum*deallinenum+leftnum))
            t.start()
            t.join()

threadFunc2函数中利用句柄参数读取文件,但结果是有些线程有正常输出,其它一些线程则无输出。目测原因是线程切换的随机性影响了本来应正常进行读取的线程(例如一些线程正好定位到属于其处理范围的行时发生线程切换,而其它线程获得时间片后在这个位置基础上继续处理)。

threadFunc1使用readLine进行读取,但本质同threadFunc2一样。

如果传给threadFuncx的并不是在主线程中打开的文件句柄而是一个文件路径,并在threadFuncx内部打开文件,结果表现为各个线程都有输出,但是输出的内容并非分配给其的文件片段的内容,有干涉。

细节上未进行深入研究,总之不加锁的情况下,python启动多线程读取文件会得到非预期结果。

至于最后的解决方案,是利用awk对文件进行“模数切割”,Mod的不同余数分别对应一个处理线程。

targetFile=xxx
threadNum=5    # 设定五个线程
for((i=0; i<threadNum; i++))
do
    awk ‘NR%n==t {print $0}‘ n=$threadNum t=$i $targetFile | python doTask.py &
done


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。