多线程读取大文件,尤其是对日志文件分析很有用。
我在之前的公司里工作的时候,他们要求我做一个能够分析IIS日志的程序,可我做来做去,也只能做到,1个G的文件读取在140秒左右。愁了很久,想到了用多线程读取大文件的方法,又发现文件读取流可以有很多个,于是我就开始编写,写出来,其效率 4核CPU,可以达到14秒,提高了10倍。
而且据我测试发现,我4个CPU,用8个线程读取效率最高,所以我想会不会是一个CPU挂2个文件流效率最高,故基本逻辑是这样的,我根据CPU核数,将文件分成8块,分成8块的时候,必定出现,一行被截断的情况,针对这种情况,我采用一次初查,根据8个位点,倒着查出‘\n‘符号,查到了这个符号,那么,‘\n‘之前(包括‘\n‘)都是完整的行数,这样就能准确的放置出8个位置,然后用多线程和Stream中的Postion定位,就可以分出8块。
之所以没用StreamReader,是因为Stream有1024字节的缓冲区,会使得位置无法读取准确。
下面是代码,因需求不同,故用抽象类来表示,(注:我将初查也用抽象方法表示,以便增强其扩展性)
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using System.Threading; 5 using System.IO; 6 namespace CommonLib.Threading.IO.ReadFile 7 { 8 /// <summary> 9 /// 多线程文件读取器 10 /// </summary> 11 public abstract class FileReader 12 { 13 private string filePath; 14 private List<FileReadPoint> readPoint=new List<FileReadPoint>(); 15 private bool isStart; 16 private int threadCompleteCount; 17 public event EventHandler FileReadEnd;//文件读取完成 18 public bool IsStart 19 { 20 get { return isStart; } 21 } 22 public string FilePath 23 { 24 get { return filePath; } 25 } 26 private FileReader() 27 { 28 } 29 public FileReader(string filePath) 30 { 31 this.filePath = filePath; 32 } 33 /// <summary> 34 /// 获取读取文件的起始点和结束点 35 /// 文件起始点会在参数point中给出 36 /// </summary> 37 /// <param name="point">读取文件的起始点和结束点</param> 38 /// <param name="stream">文件流</param> 39 /// <param name="length">文件长度</param> 40 protected abstract void GetPoint(FileReadPoint point,FileStream stream,long length); 41 /// <summary> 42 /// 设置文件读取起始点 43 /// </summary> 44 /// <param name="stream"></param> 45 /// <returns></returns> 46 protected virtual int SetStartPoint(FileStream stream) 47 { 48 return 0; 49 } 50 /// <summary> 51 /// 对已用多线程分块读取的文件做的处理 52 /// </summary> 53 /// <param name="threadStream"></param> 54 protected abstract void DoFileRead(ThreadStream threadStream); 55 56 /// <summary> 57 /// 初始化分块读取文件的点 58 /// </summary> 59 /// <returns></returns> 60 public bool Create() 61 { 62 FileInfo fileInfo = new FileInfo(filePath); 63 fileInfo.Refresh(); 64 if (fileInfo.Exists) 65 { 66 filePath = fileInfo.FullName; 67 using (FileStream stream = new FileStream(filePath, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite)) 68 { 69 if (readPoint.Count != 0) 70 { 71 readPoint.Clear(); 72 } 73 long startPoint = SetStartPoint(stream); 74 long length = stream.Length; 75 while (startPoint < length) 76 { 77 stream.Position = startPoint; 78 FileReadPoint fPoint = new FileReadPoint(); 79 fPoint.StartPoint = startPoint; 80 GetPoint(fPoint, stream, length); 81 if (fPoint.StartPoint + fPoint.ReadCount > length) 82 { 83 fPoint.ReadCount = length - fPoint.StartPoint; 84 } 85 readPoint.Add(fPoint); 86 startPoint = fPoint.StartPoint + fPoint.ReadCount; 87 } 88 } 89 return true; 90 } 91 else 92 { 93 return false; 94 } 95 } 96 /// <summary> 97 /// 启动多线程文件读取 98 /// </summary> 99 public void StartRead() 100 { 101 if (!isStart) 102 { 103 threadCompleteCount = 0; 104 foreach (FileReadPoint fp in readPoint) 105 { 106 Thread thread = new Thread(OnReadFile); 107 thread.IsBackground = true; 108 thread.SetApartmentState(ApartmentState.MTA); 109 thread.Start(fp); 110 } 111 isStart = true; 112 } 113 } 114 115 116 [MTAThread()] 117 private void OnReadFile(object obj) 118 { 119 FileReadPoint fp = obj as FileReadPoint; 120 if (fp != null) 121 { 122 using (FileStream stream = new FileStream(filePath, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite)) 123 { 124 stream.Position = fp.StartPoint; 125 ThreadStream threadStream = new ThreadStream(stream, fp); 126 DoFileRead(threadStream); 127 } 128 } 129 if (FileReadEnd != null) 130 { 131 lock (readPoint) 132 { 133 threadCompleteCount++; 134 if (threadCompleteCount == readPoint.Count) 135 { 136 FileReadEnd(this, new EventArgs()); 137 } 138 } 139 } 140 } 141 142 } 143 144 public class FileReadPoint 145 { 146 private long startPoint = 0L; 147 148 public long StartPoint 149 { 150 get { return startPoint; } 151 set { startPoint = value; } 152 } 153 154 private long readCount = 1L; 155 156 public long ReadCount 157 { 158 get { return readCount; } 159 set { 160 if (value >= 1) 161 { 162 readCount = value;//readCount必须大于1 163 } 164 } 165 } 166 } 167 168 public sealed class ThreadStream 169 { 170 private int MAXBLOCK = 1024 * 1024 * 4; 171 172 private FileStream fileStream; 173 private FileReadPoint fPoint; 174 private long currentCount = 0L; 175 176 public FileReadPoint FPoint 177 { 178 get { return fPoint; } 179 } 180 181 private ThreadStream() 182 { 183 } 184 185 public ThreadStream(FileStream stream, FileReadPoint point) 186 { 187 this.fileStream = stream; 188 this.fPoint = point; 189 } 190 191 /// <summary> 192 /// 读取剩余的所有字节 193 /// </summary> 194 /// <returns></returns> 195 public byte[] ReadAll() 196 { 197 if (currentCount < fPoint.ReadCount) 198 { 199 long lastCount = fPoint.ReadCount - currentCount; 200 byte[] data = new byte[lastCount]; 201 long currentDataIndex = 0L; 202 while (lastCount > MAXBLOCK) 203 { 204 AddData(MAXBLOCK,data, currentDataIndex); 205 lastCount = lastCount - MAXBLOCK; 206 currentDataIndex += MAXBLOCK; 207 } 208 if (lastCount > 0) 209 { 210 AddData((int)lastCount, data, currentDataIndex); 211 } 212 currentCount = fPoint.ReadCount; 213 return data; 214 } 215 else 216 { 217 return null; 218 } 219 } 220 221 /// <summary> 222 /// 分块读取字节 223 /// </summary> 224 /// <param name="block"></param> 225 /// <returns></returns> 226 public byte[] Read(int block) 227 { 228 if (currentCount < fPoint.ReadCount) 229 { 230 int currentBlock = block; 231 if (currentCount + block > fPoint.ReadCount) 232 { 233 currentBlock = (int)(fPoint.ReadCount - currentCount); 234 } 235 byte[] data = new byte[currentBlock]; 236 fileStream.Read(data, 0, data.Length); 237 currentCount += currentBlock; 238 return data; 239 240 } 241 else 242 { 243 return null; 244 } 245 } 246 247 private void AddData(int block,byte[] data, long currentDataIndex) 248 { 249 byte[] cutData = Read(block); 250 Array.Copy(cutData, 0, data, currentDataIndex, cutData.Length); 251 } 252 253 } 254 }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。