多线程读取大文件,尤其是对日志文件分析很有用。

  我在之前的公司工作的时候,他们要求我做一个能够分析IIS日志的程序,可我做来做去,也只能做到读取1个G的文件耗时140秒左右。愁了很久,想到了用多线程读取大文件的方法,又发现同一个文件可以同时打开多个读取流,于是我就开始编写。写出来之后,在4核CPU上读取时间缩短到14秒左右,效率提高了10倍。

  而且据我测试发现,在4核CPU上用8个线程读取效率最高,所以我猜想可能是每个CPU核心挂2个文件流效率最高。故基本逻辑是这样的:我根据CPU核数,将文件分成8块;分成8块的时候,必定会出现一行被截断的情况。针对这种情况,我先做一次初查,从8个分割位点开始倒着查找'\n'符号,查到了这个符号,那么'\n'之前(包括'\n')都是完整的行,这样就能准确地定出8个位置,然后用多线程和Stream中的Position定位,就可以分出8块。

  之所以没用StreamReader,是因为StreamReader内部有1024字节的缓冲区,会提前预读数据,使得底层流的Position无法准确反映实际已读取到的位置。

  下面是代码,因需求不同,故用抽象类来表示,(注:我将初查也用抽象方法表示,以便增强其扩展性)

 

技术分享
  1  using System;
  2 using System.Collections.Generic;
  3 using System.Text;
  4 using System.Threading;
  5 using System.IO;
  6 namespace CommonLib.Threading.IO.ReadFile
  7 {
  8     /// <summary>
  9     /// 多线程文件读取器
 10     /// </summary>
 11     public abstract class FileReader
 12     {
 13         private string filePath;
 14         private List<FileReadPoint> readPoint=new List<FileReadPoint>();
 15         private bool isStart;
 16         private int threadCompleteCount;
 17         public event EventHandler FileReadEnd;//文件读取完成
 18         public bool IsStart
 19         {
 20             get { return isStart; }
 21         }
 22         public string FilePath
 23         {
 24             get { return filePath; }
 25         }
 26         private FileReader()
 27         { 
 28         }
 29         public FileReader(string filePath)
 30         {
 31             this.filePath = filePath;
 32         }
 33         /// <summary>
 34         /// 获取读取文件的起始点和结束点
 35         /// 文件起始点会在参数point中给出
 36         /// </summary>
 37         /// <param name="point">读取文件的起始点和结束点</param>
 38         /// <param name="stream">文件流</param>
 39         /// <param name="length">文件长度</param>
 40         protected abstract void GetPoint(FileReadPoint point,FileStream stream,long length);
 41         /// <summary>
 42         /// 设置文件读取起始点
 43         /// </summary>
 44         /// <param name="stream"></param>
 45         /// <returns></returns>
 46         protected virtual int SetStartPoint(FileStream stream)
 47         {
 48             return 0;
 49         }
 50         /// <summary>
 51         /// 对已用多线程分块读取的文件做的处理
 52         /// </summary>
 53         /// <param name="threadStream"></param>
 54         protected abstract void DoFileRead(ThreadStream threadStream);
 55 
 56         /// <summary>
 57         /// 初始化分块读取文件的点
 58         /// </summary>
 59         /// <returns></returns>
 60         public bool Create()
 61         {
 62             FileInfo fileInfo = new FileInfo(filePath);
 63             fileInfo.Refresh();
 64             if (fileInfo.Exists)
 65             {
 66                 filePath = fileInfo.FullName;
 67                 using (FileStream stream = new FileStream(filePath, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite))
 68                 {
 69                     if (readPoint.Count != 0)
 70                     {
 71                         readPoint.Clear();
 72                     }
 73                     long startPoint = SetStartPoint(stream);
 74                     long length = stream.Length;
 75                     while (startPoint < length)
 76                     {
 77                         stream.Position = startPoint;
 78                         FileReadPoint fPoint = new FileReadPoint();
 79                         fPoint.StartPoint = startPoint;
 80                         GetPoint(fPoint, stream, length);
 81                         if (fPoint.StartPoint + fPoint.ReadCount > length)
 82                         {
 83                             fPoint.ReadCount = length - fPoint.StartPoint;
 84                         }
 85                         readPoint.Add(fPoint);
 86                         startPoint = fPoint.StartPoint + fPoint.ReadCount;
 87                     }
 88                 }
 89                 return true;
 90             }
 91             else
 92             {
 93                 return false;
 94             }
 95         }
 96         /// <summary>
 97         /// 启动多线程文件读取
 98         /// </summary>
 99         public void StartRead()
100         {
101             if (!isStart)
102             {
103                 threadCompleteCount = 0;
104                 foreach (FileReadPoint fp in readPoint)
105                 {
106                     Thread thread = new Thread(OnReadFile);
107                     thread.IsBackground = true;
108                     thread.SetApartmentState(ApartmentState.MTA);
109                     thread.Start(fp);
110                 }
111                 isStart = true;
112             }
113         }
114  
115 
116         [MTAThread()]
117         private void OnReadFile(object obj)
118         {
119             FileReadPoint fp = obj as FileReadPoint;
120             if (fp != null)
121             {
122                 using (FileStream stream = new FileStream(filePath, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite))
123                 {
124                     stream.Position = fp.StartPoint;
125                     ThreadStream threadStream = new ThreadStream(stream, fp);
126                     DoFileRead(threadStream);
127                 }
128             }
129             if (FileReadEnd != null)
130             {
131                 lock (readPoint)
132                 {
133                     threadCompleteCount++;
134                     if (threadCompleteCount == readPoint.Count)
135                     {
136                         FileReadEnd(this, new EventArgs());
137                     }
138                 }
139             }
140         }
141 
142     }
143 
144     public class FileReadPoint
145     {
146         private long startPoint = 0L;
147 
148         public long StartPoint
149         {
150             get { return startPoint; }
151             set { startPoint = value; }
152         }
153 
154         private long readCount = 1L;
155 
156         public long ReadCount
157         {
158             get { return readCount; }
159             set {
160                 if (value >= 1)
161                 {
162                     readCount = value;//readCount必须大于1
163                 }
164             }
165         }
166     }
167 
168     public sealed class ThreadStream
169     {
170         private int MAXBLOCK = 1024 * 1024 * 4;
171 
172         private FileStream fileStream;
173         private FileReadPoint fPoint;
174         private long currentCount = 0L;
175 
176         public FileReadPoint FPoint
177         {
178             get { return fPoint; }
179         }
180 
181         private ThreadStream()
182         { 
183         }
184 
185         public ThreadStream(FileStream stream, FileReadPoint point)
186         {
187             this.fileStream = stream;
188             this.fPoint = point;
189         }
190 
191         /// <summary>
192         /// 读取剩余的所有字节
193         /// </summary>
194         /// <returns></returns>
195         public byte[] ReadAll()
196         {
197             if (currentCount < fPoint.ReadCount)
198             {
199                 long lastCount = fPoint.ReadCount - currentCount;
200                 byte[] data = new byte[lastCount];
201                 long currentDataIndex = 0L;
202                 while (lastCount > MAXBLOCK)
203                 {
204                     AddData(MAXBLOCK,data, currentDataIndex);
205                     lastCount = lastCount - MAXBLOCK;
206                     currentDataIndex += MAXBLOCK;
207                 }
208                 if (lastCount > 0)
209                 {
210                     AddData((int)lastCount, data, currentDataIndex);
211                 }
212                 currentCount = fPoint.ReadCount;
213                 return data;
214             }
215             else
216             {
217                 return null;
218             }
219         }
220 
221         /// <summary>
222         /// 分块读取字节
223         /// </summary>
224         /// <param name="block"></param>
225         /// <returns></returns>
226         public byte[] Read(int block)
227         {
228             if (currentCount < fPoint.ReadCount)
229             {
230                 int currentBlock = block;
231                 if (currentCount + block > fPoint.ReadCount)
232                 {
233                     currentBlock = (int)(fPoint.ReadCount - currentCount);
234                 }
235                 byte[] data = new byte[currentBlock];
236                 fileStream.Read(data, 0, data.Length);
237                 currentCount += currentBlock;
238                 return data;
239 
240             }
241             else
242             {
243                 return null;
244             }
245         }
246 
247         private void AddData(int block,byte[] data, long currentDataIndex)
248         {
249             byte[] cutData = Read(block);
250             Array.Copy(cutData, 0, data, currentDataIndex, cutData.Length);
251         }
252 
253     }
254 }
View Code

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。