基于生产-消费模式,使用Channel进行文件传输(Tcp方式)

06-01 1485阅读

Client端:

#region 多文件传输
public class FileMetadata
{
    public string FileName { get; set; }
    public long FileSize { get; set; }
}
class Program
{
    const int PORT = 8888;
    const int BUFFER_SIZE = 60 * 1024 * 1024;//15s-50  25s-64 33s-32 27s-50 31s-40 25s-60
    const int MAX_CHANNEL_CAPACITY = 1000;
    static async Task Main()
    {
        Console.WriteLine($"Client ready to send file ...");
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();
        var folderPath = @"D:\cuda";//"D:\TestImage\imagesbaiyou";
        await SendFolderAsync(folderPath, "192.168.10.147");
        stopwatch.Stop();
        Console.WriteLine($"Client Transfer file need {TimeSpan.FromMilliseconds(stopwatch.ElapsedMilliseconds)} Milliseconds");
        Console.ReadKey();
    }
    static async Task SendFolderAsync(string folderPath, string server)
    {
        using var client = new TcpClient();
        await client.ConnectAsync(server, PORT);
        using var stream = client.GetStream();
        int i = 1;
        foreach (var filePath in Directory.GetFiles(folderPath))
        {
            await SendFileAsync(filePath, stream);
            Console.WriteLine($"Send file {i++} ...");
        }
    }
    static async Task SendFileAsync(string filePath, NetworkStream stream)
    {
        var fileInfo = new FileInfo(filePath);
        var metadata = new FileMetadata
        {
            FileName = fileInfo.Name,
            FileSize = fileInfo.Length
        };
        // 发送元数据
        var metaJson = JsonSerializer.Serialize(metadata);
        var metaBytes = Encoding.UTF8.GetBytes(metaJson);
        await stream.WriteAsync(BitConverter.GetBytes(metaBytes.Length));
        await stream.WriteAsync(metaBytes);
        // 创建传输通道
        var channel = Channel.CreateBounded(MAX_CHANNEL_CAPACITY);
        var readTask = FileToChannelAsync(filePath, channel.Writer);
        var sendTask = ChannelToNetworkAsync(channel.Reader, stream);
        await Task.WhenAll(readTask, sendTask);
    }
    static async Task FileToChannelAsync(string path, ChannelWriter writer)
    {
        await using var fs = new FileStream(path, FileMode.Open);
        var buffer = new byte[BUFFER_SIZE];
        int bytesRead;
        while ((bytesRead = await fs.ReadAsync(buffer)) > 0)
        {
            var chunk = new byte[bytesRead];
            Buffer.BlockCopy(buffer, 0, chunk, 0, bytesRead);
            await writer.WriteAsync(chunk);
        }
        writer.Complete();
    }
    static async Task ChannelToNetworkAsync(ChannelReader reader, NetworkStream stream)
    {
        await foreach (var chunk in reader.ReadAllAsync())
        {
            await stream.WriteAsync(BitConverter.GetBytes(chunk.Length));
            await stream.WriteAsync(chunk);
        }
    }
}
#endregion

Server端:

 #region 多文件传输2
 /*
  优化性能 7.7GB 文件 平均时间约15s完成接受和传送
 传输时间和硬盘读写速度以及网络硬件成正比关系
 测试该电脑本地传输速度约15s,固态硬盘
 将其传输到2.0的U盘当中,传输573MB的图像约46s时间, 和单图直接拷贝的时间差不多±1s的时间
  */
 public class FileMetadata
 {
     public string FileName { get; set; }
     public long FileSize { get; set; }
 }
 class Program
 {
     const int PORT = 8888;
     const string SAVE_DIR = @"C:\Users\Leio\Desktop\ServerDownloads";
     const int BUFFER_SIZE = 1024 * 1024;
     const int MAX_CHANNEL_CAPACITY = 1000;
     static Stopwatch stopwatch = new Stopwatch();
     static async Task Main()
     {
         Directory.CreateDirectory(SAVE_DIR);
         var listener = new TcpListener(IPAddress.Any, PORT);
         listener.Start();
         Console.WriteLine("Server started,waiting client connect ...");
        
         while (true)
         {
             var client = await listener.AcceptTcpClientAsync();
             _ = ProcessClientAsync(client);
         }
     }
     static async Task ProcessClientAsync(TcpClient client)
     {
         stopwatch.Restart();
         using (client)
         using (var stream = client.GetStream())
         {
             try
             {
                 while (true)
                 {
                    
                     // 读取元数据
                     var metaSize = BitConverter.ToInt32(await ReadExactlyAsync(stream, 4));
                     var metadata = JsonSerializer.Deserialize(
                         Encoding.UTF8.GetString(await ReadExactlyAsync(stream, metaSize)));
                     // 创建传输通道
                     var channel = Channel.CreateBounded(MAX_CHANNEL_CAPACITY);
                     var savePath = Path.Combine(SAVE_DIR, metadata.FileName);
                     // 启动并行任务
                     var receiveTask = ReceiveFileDataAsync(stream, channel.Writer, metadata.FileSize);
                     var saveTask = SaveFileAsync(channel.Reader, savePath);
                     await Task.WhenAll(receiveTask, saveTask);
                     Console.WriteLine($"File saved: {savePath}");
                     //totalTime += stopwatch.ElapsedMilliseconds;
                 }
             }
             catch (EndOfStreamException)
             {
                 Console.WriteLine("Connection closed by client");
             }
         }
         stopwatch.Stop();
         Console.WriteLine($"Client Transfer file need {TimeSpan.FromMilliseconds(stopwatch.ElapsedMilliseconds)} s");
     }
     static async Task ReceiveFileDataAsync(
         NetworkStream stream,
         ChannelWriter writer,
         long totalSize)
     {
         try
         {
             long remaining = totalSize;
             while (remaining > 0)
             {
                 var chunkSize = BitConverter.ToInt32(await ReadExactlyAsync(stream, 4));
                 var chunkData = await ReadExactlyAsync(stream, chunkSize);
                 await writer.WriteAsync(chunkData);
                 remaining -= chunkSize;
             }
         }
         finally
         {
             writer.Complete();
         }
     }
     static async Task SaveFileAsync(ChannelReader reader, string savePath)
     {
         await using var fs = new FileStream(savePath, FileMode.Create);
         await foreach (var chunk in reader.ReadAllAsync())
         {
             await fs.WriteAsync(chunk);
         }
     }
     static async Task ReadExactlyAsync(NetworkStream stream, int count)
     {
         var buffer = new byte[count];
         int totalRead = 0;
         while (totalRead  
基于生产-消费模式,使用Channel进行文件传输(Tcp方式)
(图片来源网络,侵删)
基于生产-消费模式,使用Channel进行文件传输(Tcp方式)
(图片来源网络,侵删)
基于生产-消费模式,使用Channel进行文件传输(Tcp方式)
(图片来源网络,侵删)
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码