// Only the core code is published here; the full source and the test program
// are offered as a separate download. Please report bugs or questions by email
// to ilovehaley.kid@gmail.com. Thanks. :D
//
// Build switches (they must precede every other token in the file). Swap the
// #define / #undef of a symbol to toggle it:
//   Sleep         - pause 200 ms after every transferred chunk.
//   TransmitLog   - log per-chunk transfer progress.
//   BreakpointLog - log the breakpoint (resume) bookkeeping.
#define Sleep
#undef TransmitLog
#undef BreakpointLog
using System;
using System.Net;
using System.Net.Sockets;
using System.IO;
using System.Text;
using System.Threading;
using System.Collections.Generic;
using System.Diagnostics;

namespace Rocky
{
    /// <summary>
    /// TCP file transfer with resume ("breakpoint") support.
    /// <para>
    /// Single-connection mode (<see cref="Send"/> / <see cref="Receive"/>): the sender transmits an
    /// 8-byte file length followed by the file name; the receiver answers with the 8-byte length of the
    /// data it already holds and the sender streams the file from that position.
    /// </para>
    /// <para>
    /// Multi-connection mode (<see cref="SupperSend"/> / <see cref="SupperReceive"/>): the file is cut
    /// into <see cref="GetThreadCount"/> segments, each moved over its own connection by its own thread.
    /// The receiver persists a table of (worked, total) pairs, one pair per segment, every two seconds
    /// and sends that table back to the sender when a transfer is resumed.
    /// </para>
    /// </summary>
    public static class FileTransmiter
    {
        #region NestedType
        /// <summary>Sends one segment of a file over its own connection on a background thread.</summary>
        private class SendWorker : IWorker
        {
            // totalSent is updated by the worker thread and read by reporting threads,
            // hence the Interlocked access; totalSend is fixed by Initialize.
            private long totalSent, totalSend;
            private readonly byte[] buffer;
            private readonly Socket sock;
            private FileStream reader;
            private Thread thread;
            private volatile bool isFinished;

            public long TotalSent
            {
                get { return Interlocked.Read(ref totalSent); }
            }
            public long TotalSend
            {
                get { return totalSend; }
            }
            public byte[] Buffer
            {
                get { return buffer; }
            }
            public Socket Client
            {
                get { return sock; }
            }
            public bool IsFinished
            {
                get { return isFinished; }
            }

            /// <summary>Creates the worker and connects its socket to <paramref name="ip"/>.</summary>
            public SendWorker(IPEndPoint ip)
            {
                sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                try
                {
                    sock.Connect(ip);
                }
                catch
                {
                    // Do not leak the socket handle when the connection attempt fails.
                    sock.Close();
                    throw;
                }
                buffer = new byte[BufferSize];
            }

            /// <summary>Prepares a fresh transfer of the segment [position, position + length).</summary>
            public void Initialize(string path, long position, long length)
            {
                Initialize(path, position, length, 0L, length);
            }

            /// <summary>
            /// Prepares a transfer of the segment starting at <paramref name="position"/>, skipping the
            /// first <paramref name="worked"/> bytes; <paramref name="total"/> is the byte count at which
            /// the worker stops.
            /// </summary>
            public void Initialize(string path, long position, long length, long worked, long total)
            {
                reader = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read);
                reader.Position = position + worked;
                totalSent = worked;
                totalSend = total;
                thread = new Thread(new ParameterizedThreadStart(Work));
                thread.IsBackground = true;
#if TransmitLog
                thread.Name = position.ToString() + length.ToString();
                AppendTransmitLog(LogType.Transmit, thread.Name + " Initialized:" + totalSent + "/" + totalSend + ".");
#endif
            }

            /// <summary>Thread body: streams the segment, then releases resources and signals completion.</summary>
            private void Work(object obj)
            {
                try
                {
                    while (totalSent < totalSend)
                    {
                        // Clamp in 64-bit space: the remaining byte count may exceed int.MaxValue.
                        int wanted = (int)Math.Min((long)BufferSize, totalSend - totalSent);
                        int read = reader.Read(buffer, 0, wanted);
                        if (read == 0)
                        {
                            // The file ended before the announced total; stop instead of spinning forever.
                            break;
                        }
                        SendAll(sock, buffer, read, ref totalSent);
#if TransmitLog
                        AppendTransmitLog(LogType.Transmit, thread.Name + ":" + totalSent + "/" + totalSend + ".");
#endif
#if Sleep
                        Thread.Sleep(200);
#endif
                    }
                    sock.Shutdown(SocketShutdown.Both);
                }
                finally
                {
                    reader.Dispose();
                    sock.Close();
                    EventWaitHandle waitHandle = obj as EventWaitHandle;
                    if (waitHandle != null)
                    {
                        waitHandle.Set();
                    }
                    // Set last: observers treat IsFinished as "the wait handle is no longer in use".
                    isFinished = true;
                }
            }

            public void ReportProgress(out long worked, out long total)
            {
                worked = Interlocked.Read(ref totalSent);
                total = totalSend;
            }

            /// <summary>Starts the worker thread; <paramref name="waitHandle"/> (may be null) is set when it ends.</summary>
            public void RunWork(EventWaitHandle waitHandle)
            {
                thread.Start(waitHandle);
            }
        }

        /// <summary>Receives one segment of a file over its own connection on a background thread.</summary>
        private class ReceiveWorker : IWorker
        {
            // totalReceived is updated by the worker thread and read by reporting threads.
            private long offset, lockLength, totalReceived, totalReceive;
            private readonly byte[] buffer;
            private readonly Socket sock;
            private FileStream writer;
            private Thread thread;
            private volatile bool isFinished;

            public long TotalReceived
            {
                get { return Interlocked.Read(ref totalReceived); }
            }
            public long TotalReceive
            {
                get { return totalReceive; }
            }
            public byte[] Buffer
            {
                get { return buffer; }
            }
            public Socket Client
            {
                get { return sock; }
            }
            public bool IsFinished
            {
                get { return isFinished; }
            }

            /// <summary>Wraps an already accepted connection.</summary>
            public ReceiveWorker(Socket client)
            {
                sock = client;
                buffer = new byte[BufferSize];
            }

            /// <summary>Prepares a fresh transfer of the segment [position, position + length).</summary>
            public void Initialize(string path, long position, long length)
            {
                Initialize(path, position, length, 0L, length);
            }

            /// <summary>
            /// Prepares a transfer of the segment starting at <paramref name="position"/>, resuming after
            /// the first <paramref name="worked"/> bytes; <paramref name="total"/> is the byte count at
            /// which the worker stops. The region [position, position + length) of the file is locked.
            /// </summary>
            public void Initialize(string path, long position, long length, long worked, long total)
            {
                writer = new FileStream(path, FileMode.Open, FileAccess.Write, FileShare.Write);
                writer.Position = position + worked;
                writer.Lock(position, length);
                offset = position;
                // Remember the exact locked length so Unlock releases the very same range.
                lockLength = length;
                totalReceived = worked;
                totalReceive = total;
                thread = new Thread(new ParameterizedThreadStart(Work));
                thread.IsBackground = true;
#if TransmitLog
                thread.Name = position.ToString() + length.ToString();
                AppendTransmitLog(LogType.Transmit, thread.Name + " Initialized:" + totalReceived + "/" + totalReceive + ".");
#endif
            }

            /// <summary>Thread body: writes incoming data into the segment, then releases resources and signals completion.</summary>
            private void Work(object obj)
            {
                try
                {
                    while (totalReceived < totalReceive)
                    {
                        // Never accept more than what is left of this segment, so a misbehaving
                        // peer cannot make this worker write into a neighbouring segment.
                        int wanted = (int)Math.Min((long)buffer.Length, totalReceive - totalReceived);
                        int received = sock.Receive(buffer, 0, wanted, SocketFlags.None);
                        if (received == 0)
                        {
                            // The peer closed the connection.
                            break;
                        }
                        writer.Write(buffer, 0, received);
                        writer.Flush();
                        Interlocked.Add(ref totalReceived, received);
#if TransmitLog
                        AppendTransmitLog(LogType.Transmit, thread.Name + ":" + totalReceived + "/" + totalReceive + ".");
#endif
#if Sleep
                        Thread.Sleep(200);
#endif
                    }
                    writer.Unlock(offset, lockLength);
                    sock.Shutdown(SocketShutdown.Both);
                }
                finally
                {
                    writer.Dispose();
                    sock.Close();
                    EventWaitHandle waitHandle = obj as EventWaitHandle;
                    if (waitHandle != null)
                    {
                        waitHandle.Set();
                    }
                    // Set last: observers treat IsFinished as "the wait handle is no longer in use".
                    isFinished = true;
                }
            }

            public void ReportProgress(out long worked, out long total)
            {
                worked = Interlocked.Read(ref totalReceived);
                total = totalReceive;
            }

            /// <summary>Starts the worker thread; <paramref name="waitHandle"/> (may be null) is set when it ends.</summary>
            public void RunWork(EventWaitHandle waitHandle)
            {
                thread.Start(waitHandle);
            }
        }

        /// <summary>Common surface of the send and receive workers.</summary>
        private interface IWorker
        {
            bool IsFinished { get; }
            void Initialize(string path, long position, long length);
            void Initialize(string path, long position, long length, long worked, long total);
            void ReportProgress(out long worked, out long total);
            void RunWork(EventWaitHandle waitHandle);
        }
        #endregion

        #region Field
        /// <summary>Size in bytes of every transfer buffer.</summary>
        public const int BufferSize = 1024;
        /// <summary>Number of bytes used to encode one <see cref="long"/>.</summary>
        public const int PerLongCount = sizeof(long);
        public const int MinThreadCount = 1;
        public const int MaxThreadCount = 9;
        /// <summary>Extension of the file that stores the breakpoint table.</summary>
        public const string PointExtension = ".dat";
        /// <summary>Extension of the partially received file.</summary>
        public const string TempExtension = ".temp";
        // One worker per 100 MiB of file, see GetThreadCount.
        private const long SplitSize = 1024L * 1024L * 100L;
        // Length of the reply the receiver sends when there is no breakpoint to resume from.
        private const int FreshStartReplyLength = 4;
        public static readonly IPEndPoint TestIP;
#if TransmitLog
        private static StreamWriter transmitLoger;
#endif
#if BreakpointLog
        private static StreamWriter breakpointLoger;
#endif
        #endregion

        #region Constructor
        static FileTransmiter()
        {
            AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(CurrentDomain_UnhandledException);
            TestIP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 520);
#if TransmitLog
            transmitLoger = new StreamWriter(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "transmit.log"), true, Encoding.Default);
#endif
#if BreakpointLog
            breakpointLoger = new StreamWriter(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "breakpoint.log"), true, Encoding.Default);
#endif
        }

        /// <summary>Appends the unhandled exception to "exec.log" in the application base directory.</summary>
        static void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
        {
            string logPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "exec.log");
            using (StreamWriter writer = new StreamWriter(logPath, true, Encoding.Default))
            {
                writer.Write("Time:");
                writer.Write(DateTime.Now.ToShortTimeString());
                writer.Write(". ");
                writer.WriteLine(e.ExceptionObject);
            }
        }

        #region Log
#if TransmitLog || BreakpointLog
        public enum LogType
        {
            Transmit,
            Breakpoint
        }

        /// <summary>Appends a time-stamped line to the log selected by <paramref name="type"/>.</summary>
        public static void AppendTransmitLog(LogType type, string msg)
        {
            switch (type)
            {
                case LogType.Transmit:
#if TransmitLog
                    // Several worker threads log concurrently; keep each entry on one line.
                    lock (transmitLoger)
                    {
                        transmitLoger.Write(DateTime.Now.ToShortTimeString());
                        transmitLoger.Write('\t');
                        transmitLoger.WriteLine(msg);
                        transmitLoger.Flush();
                    }
#endif
                    break;
                case LogType.Breakpoint:
#if BreakpointLog
                    lock (breakpointLoger)
                    {
                        breakpointLoger.Write(DateTime.Now.ToShortTimeString());
                        breakpointLoger.Write('\t');
                        breakpointLoger.WriteLine(msg);
                        breakpointLoger.Flush();
                    }
#endif
                    break;
            }
        }
#endif
        #endregion
        #endregion

        #region Single
        /// <summary>
        /// Sends the file at <paramref name="path"/> to <paramref name="ip"/> over a single connection,
        /// starting at the position the receiver reports it already holds.
        /// </summary>
        public static void Send(IPEndPoint ip, string path)
        {
            Stopwatch watcher = Stopwatch.StartNew();
            Socket sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            try
            {
                sock.Connect(ip);
                byte[] buffer = new byte[BufferSize];
                using (FileStream reader = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.None))
                {
                    long length = reader.Length;
                    string fileName = Path.GetFileName(path);
                    SendHeader(sock, buffer, length, fileName);
                    Console.WriteLine("Sending file:" + fileName + ".Plz wait...");
                    // The receiver answers with the number of bytes it already has.
                    sock.Receive(buffer);
                    long send = BitConverter.ToInt64(buffer, 0);
                    reader.Position = send;
#if BreakpointLog
                    Console.WriteLine("Breakpoint " + reader.Position);
#endif
                    int read;
                    while ((read = reader.Read(buffer, 0, BufferSize)) != 0)
                    {
                        SendAll(sock, buffer, read, ref send);
#if TransmitLog
                        Console.WriteLine("Sent " + send + "/" + length + ".");
#endif
#if Sleep
                        Thread.Sleep(200);
#endif
                    }
                }
                sock.Shutdown(SocketShutdown.Both);
            }
            finally
            {
                sock.Close();
            }
            watcher.Stop();
            Console.WriteLine("Send finish.Span Time:" + watcher.Elapsed.TotalMilliseconds + " ms.");
        }

        /// <summary>
        /// Listens on <paramref name="ip"/>, accepts one sender and stores the received file in the
        /// directory <paramref name="path"/>, appending to an existing file of the same name.
        /// </summary>
        public static void Receive(IPEndPoint ip, string path)
        {
            Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            Socket client = null;
            try
            {
                listener.Bind(ip);
                listener.Listen(MinThreadCount);
                client = listener.Accept();
                Stopwatch watcher = Stopwatch.StartNew();
                byte[] buffer = new byte[BufferSize];
                long length;
                string fileName = ReceiveHeader(client, buffer, out length);
                Console.WriteLine("Receiveing file:" + fileName + ".Plz wait...");
                FileInfo file = new FileInfo(Path.Combine(path, fileName));
                using (FileStream writer = file.Open(file.Exists ? FileMode.Append : FileMode.CreateNew, FileAccess.Write, FileShare.None))
                {
                    // Tell the sender how much is already on disk so it can resume from there.
                    long receive = writer.Length;
                    client.Send(BitConverter.GetBytes(receive));
#if BreakpointLog
                    Console.WriteLine("Breakpoint " + receive);
#endif
                    while (receive < length)
                    {
                        int received = client.Receive(buffer);
                        if (received == 0)
                        {
                            // The sockets are released by the finally block below.
                            Console.WriteLine("Send Stop.");
                            return;
                        }
                        writer.Write(buffer, 0, received);
                        writer.Flush();
                        receive += (long)received;
#if TransmitLog
                        Console.WriteLine("Received " + receive + "/" + length + ".");
#endif
#if Sleep
                        Thread.Sleep(200);
#endif
                    }
                }
                client.Shutdown(SocketShutdown.Both);
                watcher.Stop();
                Console.WriteLine("Receive finish.Span Time:" + watcher.Elapsed.TotalMilliseconds + " ms.");
            }
            finally
            {
                if (client != null)
                {
                    client.Close();
                }
                listener.Close();
            }
        }
        #endregion

        #region Supper
        #region Extensions
        /// <summary>
        /// Sums the progress of all workers and returns the completed percentage (0-100).
        /// An empty transfer (total of zero) is reported as 100.
        /// </summary>
        private static int ReportProgress(this IWorker[] workers, out long worked, out long total)
        {
            worked = total = 0L;
            long w, t;
            foreach (IWorker worker in workers)
            {
                worker.ReportProgress(out w, out t);
                worked += w;
                total += t;
            }
            if (total == 0L)
            {
                return 100;
            }
            // Multiply before dividing; worked / total alone is 0 until the transfer completes.
            return (int)(worked * 100L / total);
        }

        /// <summary>
        /// Returns the progress made since <paramref name="oldValue"/> divided by 8 and stores the
        /// current progress back into <paramref name="oldValue"/>.
        /// </summary>
        private static int ReportSpeed(this IWorker[] workers, ref long oldValue)
        {
            long w, t;
            workers.ReportProgress(out w, out t);
            // NOTE(review): callers label this value "kb/s", but bytes / 8 is neither kilobits
            // nor kilobytes; kept as-is because the intended unit is unclear - confirm.
            int speed = (int)((w - oldValue) / 8L);
            oldValue = w;
            return speed;
        }

        private static bool IsAllFinished(this IWorker[] workers)
        {
            foreach (IWorker worker in workers)
            {
                if (!worker.IsFinished)
                {
                    return false;
                }
            }
            return true;
        }
        #endregion

        #region Helper
        /// <summary>Writes <paramref name="value"/> as 8 little-endian bytes starting at <paramref name="offset"/>.</summary>
        public static void Write(long value, byte[] buffer, int offset)
        {
            for (int i = 0; i < PerLongCount; i++)
            {
                buffer[offset + i] = unchecked((byte)(value >> (8 * i)));
            }
        }

        /// <summary>Reads 8 little-endian bytes starting at <paramref name="offset"/> into <paramref name="value"/>.</summary>
        public static void Read(out long value, byte[] buffer, int offset)
        {
            // Accumulate in 64 bits: shifting a 32-bit operand by 32 is a no-op in C#
            // (the shift count is masked), which would drop the upper four bytes.
            ulong bits = 0UL;
            for (int i = PerLongCount - 1; i >= 0; i--)
            {
                bits = (bits << 8) | buffer[offset + i];
            }
            value = unchecked((long)bits);
        }

        /// <summary>
        /// Sends exactly <paramref name="count"/> bytes from the start of <paramref name="buffer"/>,
        /// retrying after partial sends, and adds every sent byte to <paramref name="progress"/>.
        /// </summary>
        private static void SendAll(Socket sock, byte[] buffer, int count, ref long progress)
        {
            int sent = 0;
            while (sent < count)
            {
                // Only the unsent tail is passed on a retry.
                int n = sock.Send(buffer, sent, count - sent, SocketFlags.None);
                sent += n;
                Interlocked.Add(ref progress, n);
            }
        }

        /// <summary>Sends the transfer header: the 8-byte file length followed by the encoded file name.</summary>
        private static void SendHeader(Socket sock, byte[] buffer, long fileLength, string fileName)
        {
            Buffer.BlockCopy(BitConverter.GetBytes(fileLength), 0, buffer, 0, PerLongCount);
            int nameBytes = Encoding.Default.GetBytes(fileName, 0, fileName.Length, buffer, PerLongCount);
            sock.Send(buffer, 0, PerLongCount + nameBytes, SocketFlags.None);
        }

        /// <summary>
        /// Receives the transfer header and returns the file name with any directory part removed.
        /// </summary>
        /// <exception cref="IOException">The header is too short or carries no usable file name.</exception>
        private static string ReceiveHeader(Socket sock, byte[] buffer, out long fileLength)
        {
            int received = sock.Receive(buffer);
            if (received <= PerLongCount)
            {
                throw new IOException("Incomplete transfer header received.");
            }
            fileLength = BitConverter.ToInt64(buffer, 0);
            string announced = Encoding.Default.GetString(buffer, PerLongCount, received - PerLongCount);
            // The name comes from the network: keep only the file-name part so the peer
            // cannot direct the output outside of the target directory.
            string fileName = Path.GetFileName(announced);
            if (string.IsNullOrEmpty(fileName))
            {
                throw new IOException("Transfer header does not contain a file name.");
            }
            return fileName;
        }

        /// <summary>Starts every worker and prints progress once per second until all of them have finished.</summary>
        private static void RunAndReport(IWorker[] workers, long fileLength)
        {
            // NOTE(review): the event starts signalled, so WaitOne returns immediately and the
            // polling loop below is what actually waits; kept because it yields progress output
            // for the whole transfer - confirm whether an unsignalled start was intended.
            using (AutoResetEvent reset = new AutoResetEvent(true))
            {
                int last = workers.Length - 1;
                for (int i = 0; i < workers.Length; i++)
                {
                    workers[i].RunWork(i == last ? reset : null);
                }
                reset.WaitOne();
                int speed;
                long value = 0L;
                do
                {
                    speed = workers.ReportSpeed(ref value);
                    Console.WriteLine("waiting for other threads. Progress:" + value + "/" + fileLength + ";Speed:" + speed + "kb/s.");
                    Thread.Sleep(1000);
                }
                while (!workers.IsAllFinished());
                speed = workers.ReportSpeed(ref value);
                Console.WriteLine("waiting for other threads. Progress:" + value + "/" + fileLength + ";Speed:" + speed + "kb/s.");
            }
        }

        /// <summary>Disposes <paramref name="timer"/> and waits until callbacks already running have completed.</summary>
        private static void StopTimer(Timer timer)
        {
            using (ManualResetEvent stopped = new ManualResetEvent(false))
            {
                if (timer.Dispose(stopped))
                {
                    stopped.WaitOne();
                }
            }
        }
        #endregion

        /// <summary>
        /// Returns the number of workers for a file of <paramref name="fileSize"/> bytes: one per
        /// 100 MiB, clamped to [<see cref="MinThreadCount"/>, <see cref="MaxThreadCount"/>].
        /// </summary>
        public static int GetThreadCount(long fileSize)
        {
            // Clamp before narrowing so very large sizes cannot wrap around.
            long count = fileSize / SplitSize;
            if (count < MinThreadCount)
            {
                count = MinThreadCount;
            }
            else if (count > MaxThreadCount)
            {
                count = MaxThreadCount;
            }
            return (int)count;
        }

        /// <summary>
        /// Sends the file at <paramref name="path"/> to <paramref name="ip"/> using one connection and
        /// one thread per segment, resuming from the progress table the receiver reports, if any.
        /// </summary>
        public static void SupperSend(IPEndPoint ip, string path)
        {
            Stopwatch watcher = Stopwatch.StartNew();
            FileInfo file = new FileInfo(path);
#if DEBUG
            if (!file.Exists)
            {
                throw new FileNotFoundException();
            }
#endif
            // The first worker's connection also carries the header and the breakpoint reply.
            SendWorker worker = new SendWorker(ip);
            long fileLength = file.Length;
            string fileName = file.Name;
            SendHeader(worker.Client, worker.Buffer, fileLength, fileName);
            Console.WriteLine("Sending file:" + fileName + ".Plz wait...");
            int threadCount = GetThreadCount(fileLength);
            SendWorker[] workers = new SendWorker[threadCount];
            workers[0] = worker;
            for (int i = 1; i < threadCount; i++)
            {
                workers[i] = new SendWorker(ip);
            }
            #region Breakpoint
            int perPairCount = PerLongCount * 2, count = perPairCount * threadCount;
            byte[] bufferInfo = new byte[count];
            // Every segment is avgSize bytes; the last one also takes the remainder.
            long oddSize, avgSize = Math.DivRem(fileLength, (long)threadCount, out oddSize);
            if (worker.Client.Receive(bufferInfo) == FreshStartReplyLength)
            {
                // Short reply: the receiver has nothing to resume from.
                for (int i = 0; i < threadCount; i++)
                {
                    workers[i].Initialize(path, i * avgSize, i == threadCount - 1 ? avgSize + oddSize : avgSize);
                }
            }
            else
            {
                // The reply is the receiver's table of (worked, total) pairs, one per segment.
                long w, t;
                for (int i = 0; i < threadCount; i++)
                {
                    Read(out w, bufferInfo, i * perPairCount);
                    Read(out t, bufferInfo, i * perPairCount + PerLongCount);
                    workers[i].Initialize(path, i * avgSize, i == threadCount - 1 ? avgSize + oddSize : avgSize, w, t);
#if BreakpointLog
                    AppendTransmitLog(LogType.Breakpoint, i + " read:" + w + "/" + t + ".");
#endif
                }
            }
            // NOTE(review): presumably gives the receiver time to set up its workers - confirm.
            Thread.Sleep(200);
            #endregion
            RunAndReport(workers, fileLength);
            watcher.Stop();
            Console.WriteLine("Send finish.Span Time:" + watcher.Elapsed.TotalMilliseconds + " ms.");
        }

        /// <summary>
        /// Listens on <paramref name="ip"/> and receives one file into the directory
        /// <paramref name="path"/> using one connection and one thread per segment. Data is written to
        /// a ".temp" file while a ".dat" file records the per-segment progress every two seconds; when
        /// every byte has arrived the ".dat" file is deleted and the ".temp" file gets its final name.
        /// If the transfer stops early both files are kept so a later call can resume.
        /// </summary>
        public static void SupperReceive(IPEndPoint ip, string path)
        {
            ReceiveWorker worker;
            ReceiveWorker[] workers;
            Stopwatch watcher;
            long fileLength;
            string fileName;
            int threadCount;
            Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            try
            {
                listener.Bind(ip);
                listener.Listen(MaxThreadCount);
                // The first connection carries the header and later the breakpoint reply.
                worker = new ReceiveWorker(listener.Accept());
                watcher = Stopwatch.StartNew();
                fileName = ReceiveHeader(worker.Client, worker.Buffer, out fileLength);
                Console.WriteLine("Receiveing file:" + fileName + ".Plz wait...");
                threadCount = GetThreadCount(fileLength);
                workers = new ReceiveWorker[threadCount];
                workers[0] = worker;
                for (int i = 1; i < threadCount; i++)
                {
                    workers[i] = new ReceiveWorker(listener.Accept());
                }
            }
            finally
            {
                // All connections are accepted (or setup failed): the listening socket is no longer needed.
                listener.Close();
            }
            #region Breakpoint
            int perPairCount = PerLongCount * 2, count = perPairCount * threadCount;
            byte[] bufferInfo = new byte[count];
            string filePath = Path.Combine(path, fileName), pointFilePath = Path.ChangeExtension(filePath, PointExtension), tempFilePath = Path.ChangeExtension(filePath, TempExtension);
            // Every segment is avgSize bytes; the last one also takes the remainder.
            long oddSize, avgSize = Math.DivRem(fileLength, (long)threadCount, out oddSize);
            bool resuming = File.Exists(pointFilePath) && File.Exists(tempFilePath);
            bool completed;
            FileStream pointStream = new FileStream(pointFilePath, resuming ? FileMode.Open : FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None);
            try
            {
                if (resuming)
                {
                    // Load the saved (worked, total) table and forward it to the sender.
                    pointStream.Read(bufferInfo, 0, count);
                    long w, t;
                    for (int i = 0; i < threadCount; i++)
                    {
                        Read(out w, bufferInfo, i * perPairCount);
                        Read(out t, bufferInfo, i * perPairCount + PerLongCount);
                        workers[i].Initialize(tempFilePath, i * avgSize, i == threadCount - 1 ? avgSize + oddSize : avgSize, w, t);
#if BreakpointLog
                        AppendTransmitLog(LogType.Breakpoint, i + " read:" + w + "/" + t + ".");
#endif
                    }
                    worker.Client.Send(bufferInfo);
                }
                else
                {
                    // Pre-size the temp file so every worker can write into its own region.
                    using (FileStream stream = new FileStream(tempFilePath, FileMode.CreateNew, FileAccess.Write, FileShare.Write))
                    {
                        stream.SetLength(fileLength);
                        stream.Flush();
                    }
                    for (int i = 0; i < threadCount; i++)
                    {
                        workers[i].Initialize(tempFilePath, i * avgSize, i == threadCount - 1 ? avgSize + oddSize : avgSize);
                    }
                    // A short reply tells the sender to start from scratch.
                    worker.Client.Send(bufferInfo, 0, FreshStartReplyLength, SocketFlags.None);
                }
                // Persists the current (worked, total) pair of every worker to the breakpoint file.
                TimerCallback saveBreakpoint = delegate(object state)
                {
                    long w, t;
                    for (int i = 0; i < threadCount; i++)
                    {
                        workers[i].ReportProgress(out w, out t);
                        Write(w, bufferInfo, i * perPairCount);
                        Write(t, bufferInfo, i * perPairCount + PerLongCount);
#if BreakpointLog
                        AppendTransmitLog(LogType.Breakpoint, i + " write:" + w + "/" + t + ".");
#endif
                    }
                    pointStream.Position = 0L;
                    pointStream.Write(bufferInfo, 0, count);
                    pointStream.Flush();
                };
                Timer timer = new Timer(saveBreakpoint, null, TimeSpan.Zero, TimeSpan.FromSeconds(2));
                try
                {
                    RunAndReport(workers, fileLength);
                }
                finally
                {
                    // Wait for a running callback so it cannot touch pointStream after disposal.
                    StopTimer(timer);
                }
                long worked, total;
                workers.ReportProgress(out worked, out total);
                completed = worked >= total;
                if (!completed)
                {
                    // The sender went away early: record the final progress for the next attempt.
                    saveBreakpoint(null);
                }
            }
            finally
            {
                pointStream.Dispose();
            }
            #endregion
            watcher.Stop();
            if (completed)
            {
                File.Delete(pointFilePath);
                File.Move(tempFilePath, filePath);
                Console.WriteLine("Receive finish.Span Time:" + watcher.Elapsed.TotalMilliseconds + " ms.");
            }
            else
            {
                Console.WriteLine("Receive stopped before completion; breakpoint saved for resume.");
            }
        }
        #endregion
    }
}
PS:
1、测试发现,多线程下调用 stream.Flush() 的地方会发生阻塞;可以尝试增大 stream 的缓冲区,或手动控制调用 stream.Flush() 的时机。
2、断点续传测试方法:先启动 Server 端,再启动 Client 端开始发送;中途任意关闭其中一端,然后重复上述步骤即可。
|