首页  编辑  

理解I/O完成端口模型

Tags: /超级猛料/Network.网络通讯/TCP_IP/   Date Created:

理解I/O Completion Port

[原创]理解I/O Completion Port    nonocast(原作)

欢迎阅读此篇IOCP教程。我将先给出IOCP的定义然后给出它的实现方法,最后剖析一个Echo程序来为您拨开IOCP的谜云,除去你心中对IOCP的烦恼。OK,但我不能保证你明白IOCP的一切,但我会尽我最大的努力。以下是我会在这篇文章中提到的相关技术:

I/O端口

同步/异步

堵塞/非堵塞

服务端/客户端

多线程程序设计

Winsock API 2.0

在这之前,我曾经开发过一个项目,其中一块需要网络支持,当时还考虑到了代码的可移植性,只要使用select,connect,accept,listen,send还有recv,再加上几个#ifdef的封装以用来处理Winsock和BSD套接字[socket]中间的不兼容性,一个网络子系统只用了几个小时很少的代码就写出来了,至今还让我很回味。那以后很长时间也就没再碰了。

前些日子,我们策划做一个网络游戏,我主动承担下网络这一块,想想这还不是小case,心里偷着乐啊。网络游戏好啊,网络游戏为成百上千的玩家提供了乐趣和令人着秘的游戏体验,他们在线上互相战斗或是加入队伍去战胜共同的敌人。我信心满满的准备开写我的网络,于是乎,发现过去的阻塞同步模式模式根本不能拿到一个巨量多玩家[MMP]的架构中去,直接被否定掉了。于是乎,就有了IOCP,如果能过很轻易而举的搞掂IOCP,也就不会有这篇教程了。下面请诸位跟随我进入正题。

什么是IOCP

先让我们看看对IOCP的评价

I/O完成端口可能是Win32提供的最复杂的内核对象。

      [Advanced Windows 3rd] Jeffrey Richter

这是[IOCP]实现高容量网络服务器的最佳方法。

[Windows Sockets2.0:Write Scalable Winsock Apps Using Completion Ports]  

Microsoft Corporation

完成端口模型提供了最好的伸缩性。这个模型非常适用来处理数百乃至上千个套接字。

      [Windows网络编程2nd] Anthony Jones & Jim Ohlund

I/O completion ports特别显得重要,因为它们是唯一适用于高负载服务器[必须同时维护许多连接线路]的一个技术。Completion ports利用一些线程,帮助平衡由I/O请求所引起的负载。这样的架构特别适合用在SMP系统中产生的"scalable"服务器。

      [Win32多线程程序设计] Jim Beveridge & Robert Wiener

看来我们完全有理由相信IOCP是大型网络架构的首选。那IOCP到底是什么呢?

微软在Winsock2中引入了IOCP这一概念 。IOCP全称I/O Completion Port,中文译为I/O完成端口。IOCP是一个异步I/O的API,它可以高效地将I/O事件通知给应用程序。与使用select()或是其它异步方法不同的是,一个套接字[socket]与一个完成端口关联了起来,然后就可继续进行正常的Winsock操作了。然而,当一个事件发生的时候,此完成端口就将被操作系统加入一个队列中。然后应用程序可以对核心层进行查询以得到此完成端口。

这里我要对上面的一些概念略作补充,在解释[完成]两字之前,我想先简单的提一下同步和异步这两个概念,逻辑上来讲做完一件事后再去做另一件事就是同步,而同时一起做两件或两件以上事的话就是异步了。你也可以拿单线程和多线程来作比喻。但是我们一定要将同步和堵塞,异步和非堵塞区分开来,所谓的堵塞函数诸如accept(…),当调用此函数后,此时线程将挂起,直到操作系统来通知它,"HEY兄弟,有人连进来了",那个挂起的线程将继续进行工作,也就符合"生产者-消费者"模型。堵塞和同步看上去有两分相似,但却是完全不同的概念。大家都知道I/O设备是个相对慢速的设备,不论打印机,调制解调器,甚至硬盘,与CPU相比都是奇慢无比的,坐下来等I/O的完成是一件不甚明智的事情,有时候数据的流动率非常惊人,把数据从你的文件服务器中以Ethernet速度搬走,其速度可能高达每秒一百万字节,如果你尝试从文件服务器中读取100KB,在用户的眼光来看几乎是瞬间完成,但是,要知道,你的线程执行这个命令,已经浪费了10个一百万次CPU周期。所以说,我们一般使用另一个线程来进行I/O。重叠IO[overlapped I/O]是Win32的一项技术,你可以要求操作系统为你传送数据,并且在传送完毕时通知你。这也就是[完成]的含义。这项技术使你的程序在I/O进行过程中仍然能够继续处理事务。事实上,操作系统内部正是以线程来完成overlapped I/O。你可以获得线程所有利益,而不需要付出什么痛苦的代价。

完成端口中所谓的[端口]并不是我们在TCP/IP中所提到的端口,可以说是完全没有关系。我到现在也没想通一个I/O设备[I/O Device]和端口[IOCP中的Port]有什么关系。估计这个端口也迷惑了不少人。IOCP只不过是用来进行读写操作,和文件I/O倒是有些类似。既然是一个读写设备,我们所能要求它的只是在处理读与写上的高效。在文章的第三部分你会轻而易举的发现IOCP设计的真正用意。

IOCP和网络又有什么关系?

int main()

{

WSAStartup(MAKEWORD(2, 2), &wsaData);

ListeningSocket = socket(AF_INET, SOCK_STREAM, 0);

bind(ListeningSocket, (SOCKADDR*)&ServerAddr, sizeof(ServerAddr));

listen(ListeningSocket, 5);

int nlistenAddrLen = sizeof(ClientAddr);

while(TRUE)

{

NewConnection = accept(ListeningSocket, (SOCKADDR*)&ClientAddr, &nlistenAddrLen);

HANDLE hThread = CreateThread(NULL, 0, ThreadFunc, (void*) NewConnection, 0, &dwTreadId);

 CloseHandle(hThread);

}

return 0;

}

相信只要写过网络的朋友,应该对这样的结构在熟悉不过了。accept后线程被挂起,等待一个客户发出请求,而后创建新线程来处理请求。当新线程处理客户请求时,起初的线程循环回去等待另一个客户请求。处理客户请求的线程处理完毕后终结。

在上述的并发模型中,对每个客户请求都创建了一个线程。其优点在于等待请求的线程只需做很少的工作。大多数时间中,该线程在休眠[因为recv处于堵塞状态]。

但是当并发模型应用在服务器端[基于Windows NT],Windows NT小组注意到这些应用程序的性能没有预料的那么高。特别的,处理很多同时的客户请求意味着很多线程并发地运行在系统中。因为所有这些线程都是可运行的[没有被挂起和等待发生什么事],Microsoft意识到NT内核花费了太多的时间来转换运行线程的上下文[Context],线程就没有得到很多CPU时间来做它们的工作。

大家可能也都感觉到并行模型的瓶颈在于它为每一个客户请求都创建了一个新线程。创建线程比起创建进程开销要小,但也远不是没有开销的。

我们不妨设想一下:如果事先开好N个线程,让它们在那hold[堵塞],然后可以将所有用户的请求都投递到一个消息队列中去。然后那N个线程逐一从消息队列中去取出消息并加以处理。就可以避免针对每一个用户请求都开线程。不仅减少了线程的资源,也提高了线程的利用率。理论上很不错,你想我等泛泛之辈都能想出来的问题,Microsoft又怎会没有考虑到呢?!

这个问题的解决方法就是一个称为I/O完成端口的内核对象,他首次在Windows NT3.5中被引入。

其实我们上面的构想应该就差不多是IOCP的设计机理。其实说穿了IOCP不就是一个消息队列嘛!你说这和[端口]这两字有何联系。我的理解就是IOCP最多是应用程序和操作系统沟通的一个接口罢了。

至于IOCP的具体设计那我也很难说得上来,毕竟我没看过实现的代码,但你完全可以进行模拟,只不过性能可能…,如果想深入理解IOCP, Jeffrey Ritchter的Advanced Windows 3rd其中第13章和第14张有很多宝贵的内容,你可以拿来窥视一下系统是如何完成这一切的。

实现方法

Microsoft为IOCP提供了相应的API函数,主要的就两个,我们逐一的来看一下:

HANDLE CreateIoCompletionPort (

 HANDLE FileHandle,                // handle to file

 HANDLE ExistingCompletionPort,     // handle to I/O completion port

 ULONG_PTR CompletionKey,        // completion key

 DWORD NumberOfConcurrentThreads // number of threads to execute concurrently

);

在讨论各参数之前,首先要注意该函数实际用于两个截然不同的目的:

 1.用于创建一个完成端口对象

 2.将一个句柄[HANDLE]和完成端口关联到一起

在创建一个完成一个端口的时候,我们只需要填写一下NumberOfConcurrentThreads这个参数就可以了。它告诉系统一个完成端口上同时允许运行的线程最大数。在默认情况下,所开线程数和CPU数量相同,但经验给我们一个公式:

 线程数 = CPU数 * 2 + 2

要使完成端口有用,你必须把它同一个或多个设备相关联。这也是调用CreateIoCompletionPort完成的。你要向该函数传递一个已有的完成端口的句柄,我们既然要处理网络事件,那也就是将客户的socket作为HANDLE传进去。和一个完成键[对你有意义的一个32位值,也就是一个指针,操作系统并不关心你传什么]。每当你向端口关联一个设备时,系统向该完成端口的设备列表中加入一条信息纪录。

另一个API就是

BOOL GetQueuedCompletionStatus(

 HANDLE CompletionPort,       // handle to completion port

 LPDWORD lpNumberOfBytes,     // bytes transferred

 PULONG_PTR lpCompletionKey,  // file completion key

 LPOVERLAPPED *lpOverlapped,  // buffer

 DWORD dwMilliseconds         // optional timeout value

);

第一个参数指出了线程要监视哪一个完成端口。很多服务应用程序只是使用一个I/O完成端口,所有的I/O请求完成以后的通知都将发给该端口。简单的说,GetQueuedCompletionStatus使调用线程挂起,直到指定的端口的I/O完成队列中出现了一项或直到超时。同I/O完成端口相关联的第3个数据结构是使线程得到完成I/O项中的信息:传输的字节数,完成键和OVERLAPPED结构的地址。该信息是通过传递给GetQueuedCompletionSatatus的lpdwNumberOfBytesTransferred,lpdwCompletionKey和lpOverlapped参数返回给线程的。

根据到目前为止已经讲到的东西,首先来构建一个frame。下面为您说明了如何使用完成端口来开发一个echo服务器。大致如下:

1.初始化Winsock

2.创建一个完成端口

3.根据服务器线程数创建一定量的线程数

4.准备好一个socket进行bind然后listen

5.进入循环accept等待客户请求

6.创建一个数据结构容纳socket和其他相关信息

7.将连进来的socket同完成端口相关联

8.投递一个准备接受的请求

以后就不断的重复5至8的过程

那好,我们用具体的代码来展示一下细节的操作。

WOW,程序的代码若是贴在此处,实在大煞风景,又不能CTRL+V还不能F7,如果大家需要源代码可以发信给我o_nono@163.net

至此文章也该告一段落了,我带着您做了一趟旋风般的旅游,游览了所谓的完成端口。

很多细节由于篇幅的关系无法细细道来。但希望这篇文章能带给您更多的思考。如有任何问题,可以发信至o_nono@163.net。

也可至此发表讨论 http://expert.csdn.net/Expert/topic/2659/2659726.xml?temp=.3871271


【【【【上面的是windows网络编程英文原版,中文是我收集的。】】】】

"完成端口"模型是迄今为止最为复杂的-种I/O模型。然而。假若-个应用程序同时需要管理为数众多的套接字,那么采用这种模型。往往可以达到最佳的系统性能,然而不幸的是,该模型只适用于以下操作系统(微软的):Windows NT和Windows 2000操作系统。因其设计的复杂性,只有在你的应用程序需要同时管理数百乃至上千个套接字的时候、而且希望随着系统内安装的CPU数量的增多、应用程序的性能也可以线性提升,才应考虑采用"完成端口"模型。要记住的一个基本准则是,假如要为Windows NT或windows 2000开发高性能的服务器应用,同时希望为大量套接字I/O请求提供服务(Web服务器便是这方面的典型例子),那么I/O完成端口模型便是最佳选择.

   从本质上说,完成端口模型要求我们创建一个Win32完成端口对象,通过指定数量的线程对重叠I/O请求进行管理。以便为已经完成的重叠I/O请求提供服务。要注意的是。所谓"完成端口",实际是Win32、Windows NT以及windows 2000采用的一种I/O构造机制,除套接字句柄之外,实际上还可接受其他东西。然而,本节只打算讲述如何使用套接字句柄,来发挥完成端口模型的巨大威力。使用这种模型之前,首先要创建一个I/O完成端口对象,用它面向任意数量的套接字句柄。管理多个I/O请求。要做到这-点,需要调用CreateIoCompletionPort函数。该函数定义如下:

HANDLE CreateIoCompletionPort(

   HANDLE FileHandle,

   HANDLE ExistingCompletionPort,

   DWORD CompletionKey,

   DWORD  NumberOfConcurrentThreads

);

在我们深入探讨其中的各个参数之前,首先要注意意该函数实际用于两个明显有别的目的:

   ■用于创建-个完成端口对象。

   ■将一个句柄同完成端口关联到一起。

   最开始创建-个完成端口的时候,唯一感兴趣的参数便是NumberOfConcurrentThreads 并发线程的数量);前面三个参数都会被忽略。NumberOfConcurrentThreads 参数的特殊之处在于.它定义了在一个完成端口上,同时允许执行的线程数量。理想情况下我们希望每个处理器各自负责-个线程的运行,为完成端口提供服务,避免过于频繁的线程"场景"切换。若将该参数设为0,说明系统内安装了多少个处理器,便允许同时运行多少个线程!可用下述代码创建一个I/O完成端口:

   CompletionPort  =  CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,0, 0);

   该语加的作用是返问一个句柄.在为完成端口分配了-个套接字句柄后,用来对那个端口进行标定(引用)。

   1.工作者线程与完成端口

   成功创建一个完成端口后,便可开始将套接字句柄与对象关联到一起。但在关联套接字之前、首先必须创建-个或多个"工作者线程",以便在I/O请求投递给完成端口对象后。为完成端口提供服务。在这个时候,大家或许会觉得奇怪、到底应创建多少个线程。以便为完成端口提供服务呢?这实际正是完成端口模型显得颇为"复杂"的-个方面, 因为服务I/O请求所需的数量取决于应用程序的总体设计情况。在此要记住的-个重点在于,在我们调用CreateIoComletionPort时指定的并发线程数量,与打算创建的工作者线程数量相比,它们代表的并非同-件事情。早些时候,我们曾建议大家用CreateIoCompletionPort函数为每个处理器都指定一个线程(处理器的数量有多少,便指定多少线程)以避免由于频繁的线程"场景"交换活动,从而影响系统的整体性能。

   CreateIoCompletionPort函数的NumberofConcurrentThreads参数明确指示系统:  在一个完成端口上,一次只允许n个工作者线程运行。假如在完成端门上创建的工作者线程数量超出n个.那么在同一时刻,最多只允许n个线程运行。但实际上,在-段较短的时间内,系统有可能超过这个值。但很快便会把它减少至事先在CreateIoCompletionPort函数中设定的值。那么,为何实际创建的工作者线程数最有时要比CreateIoCompletionPort函数设定的多-些呢?这样做有必要吗?如先前所述。这主要取决于应用程序的总体设计情况,假设我们的工作者线程调用了一个函数,比如Sleep()或者WaitForSingleobject(),但却进入了暂停(锁定或挂起)状态、那么允许另-个线程代替它的位置。换行之,我们希望随时都能执行尽可能多的线程;当然,最大的线程数量是事先在CreateIoCompletonPort调用里设定好的。这样-来。假如事先预料到自己的线程有可能暂时处于停顿状态,那么最好能够创建比CreateIoCompletionPort的NumberofConcurrentThreads参数的值多的线程.以便到时候充分发挥系统的潜力。-旦在完成端口上拥有足够多的工作者线程来为I/O请求提供服务,便可着手将套接字句柄同完成端口关联到一起。这要求我们在-个现有的完成端口上调用CreateIoCompletionPort函数,同时为前三个参数: FileHandle,ExistingCompletionPort和CompletionKey--提供套接字的信息。其中,FileHandle参数指定-个要同完成端口关联在-一起的套接字句柄。

   ExistingCompletionPort参数指定的是一个现有的完成端口。CompletionKey(完成键)参数则指定要与某个特定套接字句柄关联在-起的"单句柄数据",在这个参数中,应用程序可保存与-个套接字对应的任意类型的信息。之所以把它叫作"单句柄数据",是由于它只对应着与那个套接字句柄关联在-起的数据。可将其作为指向一个数据结构的指针、来保存套接字句柄;在那个结构中,同时包含了套接字的句柄,以及与那个套接字有关的其他信息。就象本章稍后还会讲述的那样,为完成端口提供服务的线程例程可通过这个参数。取得与其套字句柄有关的信息。

   根据我们到目前为止学到的东西。首先来构建-个基本的应用程序框架。程序清单8-9向人家阐述了如何使用完成端口模型。来开发-个回应(或"反射')服务器应用。在这个程序中。我们基本上按下述步骤行事:

   1)创建一个完成端口。第四个参数保持为0,指定在完成端口上,每个处理器一次只允许执行一个工作者线程。

   2)判断系统内到底安装了多少个处理器。

   3)创建工作者线程,根据步骤2)得到的处理器信息,在完成端口上,为已完成的I/O请求提供服务。在这个简单的例子中,我们为每个处理器都只创建-个工作者线程。这是出于事先已经预计到,到时候不会有任何线程进入"挂起"状态,造成由于线程数量的不足,而使处理器空闲的局面(没有足够的线程可供执行)。调用CreateThread函数时,必须同时提供-个工作者线程,由线程在创建好执行。本节稍后还会详细讨论线程的职责。

   4)准备好-个监听套接字。在端口5150上监听进入的连接请求。

   5)使用accept函数,接受进入的连接请求。

   6)创建-个数据结构,用于容纳"单句柄数据"。  同时在结构中存入接受的套接字句柄。

   7)调用CreateIoCompletionPort将自accept返回的新套接字句柄向完成端口关联到  一起,通过完成键(CompletionKey)参数,将但句柄数据结构传递给CreateIoCompletionPort。

   8)开始在已接受的连接上进行I/O操作。在此,我们希望通过重叠I/O机制,在新建的套接字上投递一个或多个异步WSARecv或WSASend请求。这些I/O请求完成后,一个工作者线程会为I/O请求提供服务,同时继续处理未来的I/O请求,稍后便会在步骤3)指定的工作者例程中。体验到这一点。

   9)重复步骤5)-8)。直到服务器终止。

程序清单8。9  完成端口的建立

#include <winsock2.h>

#include <windows.h>

#include <stdio.h>

#define PORT 5150

#define DATA_BUFSIZE 8192

typedef struct

{

  OVERLAPPED Overlapped;

  WSABUF DataBuf;

  CHAR Buffer[DATA_BUFSIZE];

  DWORD BytesSEND;

  DWORD BytesRECV;

} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;

typedef struct  

{

  SOCKET Socket;

} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID);

void main(void)

{

  SOCKADDR_IN InternetAddr;

  SOCKET Listen;

  SOCKET Accept;

  HANDLE CompletionPort;

  SYSTEM_INFO SystemInfo;

  LPPER_HANDLE_DATA PerHandleData;

  LPPER_IO_OPERATION_DATA PerIoData;

  int i;

  DWORD RecvBytes;

  DWORD Flags;

  DWORD ThreadID;

  WSADATA wsaData;

  DWORD Ret;

  if ((Ret = WSAStartup(0x0202, &wsaData)) != 0)

  {

     printf("WSAStartup failed with error %d\n", Ret);

     return;

  }

  // Setup an I/O completion port.

  if ((CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL)

  {

     printf( "CreateIoCompletionPort failed with error: %d\n", GetLastError());

     return;

  }

  // Determine how many processors are on the system.

  GetSystemInfo(&SystemInfo);

  // Create worker threads based on the number of processors available on the

  // system. Create two worker threads for each processor.

  for(i = 0; i < SystemInfo.dwNumberOfProcessors * 2; i++)

  {

     HANDLE ThreadHandle;

     // Create a server worker thread and pass the completion port to the thread.

     if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort,

        0, &ThreadID)) == NULL)

     {

        printf("CreateThread() failed with error %d\n", GetLastError());

        return;

     }

     // Close the thread handle

     CloseHandle(ThreadHandle);

  }

  // Create a listening socket

  if ((Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0,

     WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET)

  {

     printf("WSASocket() failed with error %d\n", WSAGetLastError());

     return;

  }  

  InternetAddr.sin_family = AF_INET;

  InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);

  InternetAddr.sin_port = htons(PORT);

  if (bind(Listen, (PSOCKADDR) &InternetAddr, sizeof(InternetAddr)) == SOCKET_ERROR)

  {

     printf("bind() failed with error %d\n", WSAGetLastError());

     return;

  }

  // Prepare socket for listening

  if (listen(Listen, 5) == SOCKET_ERROR)

  {

     printf("listen() failed with error %d\n", WSAGetLastError());

     return;

  }

  // Accept connections and assign to the completion port.

  while(TRUE)

  {

     if ((Accept = WSAAccept(Listen, NULL, NULL, NULL, 0)) == SOCKET_ERROR)

     {

        printf("WSAAccept() failed with error %d\n", WSAGetLastError());

        return;

     }

     // Create a socket information structure to associate with the socket

     if ((PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR,  

        sizeof(PER_HANDLE_DATA))) == NULL)

     {

        printf("GlobalAlloc() failed with error %d\n", GetLastError());

        return;

     }

     // Associate the accepted socket with the original completion port.

     printf("Socket number %d connected\n", Accept);

     PerHandleData->Socket = Accept;

     if (CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (DWORD) PerHandleData,

        0) == NULL)

     {

        printf("CreateIoCompletionPort failed with error %d\n", GetLastError());

        return;

     }

     // Create per I/O socket information structure to associate with the  

     // WSARecv call below.

     if ((PerIoData = (LPPER_IO_OPERATION_DATA) GlobalAlloc(GPTR,          sizeof(PER_IO_OPERATION_DATA))) == NULL)

     {

        printf("GlobalAlloc() failed with error %d\n", GetLastError());

        return;

     }

     ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));

     PerIoData->BytesSEND = 0;

     PerIoData->BytesRECV = 0;

     PerIoData->DataBuf.len = DATA_BUFSIZE;

     PerIoData->DataBuf.buf = PerIoData->Buffer;

     Flags = 0;

     if (WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,

        &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)

     {

        if (WSAGetLastError() != ERROR_IO_PENDING)

        {

           printf("WSARecv() failed with error %d\n", WSAGetLastError());

           return;

        }

     }

  }

}

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)

{

  HANDLE CompletionPort = (HANDLE) CompletionPortID;

  DWORD BytesTransferred;

  LPOVERLAPPED Overlapped;

  LPPER_HANDLE_DATA PerHandleData;

  LPPER_IO_OPERATION_DATA PerIoData;

  DWORD SendBytes, RecvBytes;

  DWORD Flags;

   

  while(TRUE)

  {

       

     if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,

        (LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0)

     {

        printf("GetQueuedCompletionStatus failed with error %d\n", GetLastError());

        return 0;

     }

     // First check to see if an error has occured on the socket and if so

     // then close the socket and cleanup the SOCKET_INFORMATION structure

     // associated with the socket.

     if (BytesTransferred == 0)

     {

        printf("Closing socket %d\n", PerHandleData->Socket);

        if (closesocket(PerHandleData->Socket) == SOCKET_ERROR)

        {

           printf("closesocket() failed with error %d\n", WSAGetLastError());

           return 0;

        }

        GlobalFree(PerHandleData);

        GlobalFree(PerIoData);

        continue;

     }

     // Check to see if the BytesRECV field equals zero. If this is so, then

     // this means a WSARecv call just completed so update the BytesRECV field

     // with the BytesTransferred value from the completed WSARecv() call.

     if (PerIoData->BytesRECV == 0)

     {

        PerIoData->BytesRECV = BytesTransferred;

        PerIoData->BytesSEND = 0;

     }

     else

     {

        PerIoData->BytesSEND += BytesTransferred;

     }

     if (PerIoData->BytesRECV > PerIoData->BytesSEND)

     {

        // Post another WSASend() request.

        // Since WSASend() is not gauranteed to send all of the bytes requested,

        // continue posting WSASend() calls until all received bytes are sent.

        ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));

        PerIoData->DataBuf.buf = PerIoData->Buffer + PerIoData->BytesSEND;

        PerIoData->DataBuf.len = PerIoData->BytesRECV - PerIoData->BytesSEND;

        if (WSASend(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &SendBytes, 0,

           &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)

        {

           if (WSAGetLastError() != ERROR_IO_PENDING)

           {

              printf("WSASend() failed with error %d\n", WSAGetLastError());

              return 0;

           }

        }

     }

     else

     {

        PerIoData->BytesRECV = 0;

        // Now that there are no more bytes to send post another WSARecv() request.

        Flags = 0;

        ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED));

        PerIoData->DataBuf.len = DATA_BUFSIZE;

        PerIoData->DataBuf.buf = PerIoData->Buffer;

        if (WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,

           &(PerIoData->Overlapped), NULL) == SOCKET_ERROR)

        {

           if (WSAGetLastError() != ERROR_IO_PENDING)

           {

              printf("WSARecv() failed with error %d\n", WSAGetLastError());

              return 0;

           }

        }

     }

  }

}

StartWinsock()

//步骤一,创建一个完成端口

CompletionPort=CreateIoCompletionPort(INVALI_HANDLE_VALUE,NULL,0,0);

//步骤二判断有多少个处理器

GetSystemInfo(&SystemInfo);

//步骤三:根据处理器的数量创建工作线程,本例当中,工作线程的数目和处理器数目是相同的

for(i=0; i < SystemInfo.dwNumberOfProcessers,i++)

{

HANDLE ThreadHandle;

//创建工作者线程,并把完成端口作为参数传给线程

ThreadHandle=CreateThread(NULL,0,ServerWorkerThread, CompletionPort, 0, &ThreadID);

//关闭线程句柄(仅仅关闭句柄,并非关闭线程本身)

CloseHandle(ThreadHandle);

}

//步骤四:创建监听套接字

Listen=WSASocket(AF_INET,S0CK_STREAM,0,NULL,WSA_FLAG_OVERLAPPED);

InternetAddr.sin_famlly=AF_INET;

InternetAddr.sin_addr.s_addr =  htonl(INADDR_ANY);

InternetAddr.sln_port  =  htons(5150);

bind(Listen,(PSOCKADDR)&InternetAddr,sizeof(InternetAddr));

//准备监听套接字

listen(Listen,5);

while(TRUE)

{

//步骤五,接入Socket,并和完成端口关联

Accept = WSAAccept(Listen,NULL,NULL,NULL,0);

//步骤六 创建一个perhandle结构,并和端口关联

PerHandleData=(LPPER_HANDLE_DATA)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA));

printf("Socket  number  %d  connected\n",Accept);

PerHandleData->Socket=Accept;

//步骤七,接入套接字和完成端口关联

CreateIoCompletionPort((HANDLE)Accept,CompletionPort,(DWORD)PerHandleData,0);

//步骤八

//开始进行I/O操作,用重叠I/O发送一些WSASend()和WSARecv()

WSARecv(...);

}

 2.完成端口和重叠I/O

   将套接字句柄与一个完成端口关联在一起后,便可以套接字句柄为基础。投递发送或接

收请求。开始I/O请求的处理。接下来,可开始依赖完成端口,来接收有关I/O操作完成情况的通知。从本质上说、完成瑞口模型利用了Win32重叠I/O机制。在这种机制中。象WSASend和WSARecv这样的Winsock API调用会立即返回。此时, 需要由我们的应用程序负责在以后的某个时间。通过一个OVERLAPPED结构,来接收调用的结果。在完成端口模型中。要想做到这一点,需要使用GetQueuedCompletionStatus(获取排队完成状态)函数。让一个或者多个工作者线程在完成端口上等待。该函数的定义如下:

B00L GetQueuedCompletionStatus(

   HANDLE CompletionPort,

   LPDWORD  lpNumber0fBytesTransferred,

   LPDWORD  lpCompletionKey,

   LPOVERLAPPED  *lpOverlapped,

   DWORD dwMilliseconds

};

其中,CompletionPort参数对应与要在上面等待的完成端口.lpNumberOfBytesTransferred参数负责在完成了-次I/O操作后(如WSASend或WSARecv)、接收实际传输的字节数。lpCompletionKey参数为原先传递进入CreateCompletionPort函数的套接字返回"单句柄数据"。如我们早先所述,大家最好将套接字句柄保存在这个"键"(Key)中。lpOverlapped参数用于接收完成的I/O操作的重叠结果。这实际是一个相当重要的参数,因为要用它获取每个I/O操作的数据。而最后一个参数。DwMilliseconds用于指定调用者希望等待一个完成数据包在完成端口上出现的时间。假如将其设为INFINITE。调用会无休止地等持下去。

   3.单句柄数据和单I/O操作数据

   -个工作者线程从GetQueuedCompletionStatus这个API调用接收到I/O完成通知后。在lpCompletionKey和lpOverlapped参数中,会包含-些必要的套接字信息。利用这些信息,可通过完成端口,继续在一个套接字上的I/O处理,通过这些参数。可获得两方面重要的套接字数据: 单句柄数据,以及单I/O操作数据。

   其中,lpCompletionKey参数包含了"单句柄数据",因为在-个套接字首次与完成端口关联到-起的时候。那些数据便与一个特定的套接字句柄对应起来了。这些数据正是我们在进行CreateIoCompletionPort  API调用的时候,通过CompletionKey参数传递的。  如早先所述。应用程序可通过该参数传递任意类型的数据。通常情况下,应用程序会将与I/O请求有关的套接字句柄保存在这里。

   lpOVerlapped参数则包含了-个OVERLAPPED结构,在它后边跟随"单I/O操作数据"。

我们的工作者线程处理-个完成数据包时(将数据原封不动打转回去,接受连接,投递另-个线程,等等). 这些信息是它必须要知道的. 单I/O操作数据可以是追加到一个OVERLAPPED结构末尾的任意数量的字节。假如一个函数要求用到一个OVERLAPPED结构,我们便必须将这样的-个结构传递进去,以满足它的耍求。要想做到这一点,一个简单的方法是定义-个结构。然后将OVERLAPPED结构作为新结构的第一个元素使用。举个例子来说。  可定义上述数据结构,实现对单I/O操作数据的管理:

typedef  struct  {

  OVERLAPPED Overlapped;

  WSABUF    DataBuf;

  CHAR      Bufferl[DATA-BUFSIZE];

  BOOL      OperationType;

}PER_IO_OPERATION_DATA;

该结构演示了通常要与I/O操作关联在-起的某些重要数据元素,比如刚才完成的那个I/O操作的类型(发送或接收请求).在这个结构中。我们认为用于已完成I/O操作的数据缓冲区是非常有用的。要想调用-个Winsock API函数,同时为其分配一个OVERLAPPED结构,既可将自己的结构"造型"为一个OVERLAPPED指针,亦可简单地撤消对结构中的OVBRLAPPED元素的引用。如下例所示:

PER_IO_OPERATION_DATA PerIoData;

可以象下边这样调用一个函数

 WSARecv(socket,…,(OVERLAPPED *)&PerIoData;

或者象下边这样

 WSARecv(socket,…,&( PerIoData.Overlapped));

   在工作线程的后面部分。等GetQueuedCompletionStatus函数返回了-个重叠结构(和完成键)后。便可通过撤消对OperationType成员的引用。调查到底是哪个操作投递到了这个句柄之上(只需将返回的重叠结构造型为自的PER_IO_OPERATlON_DATA结构)。对单I/O操作数据来说,它最大的-个优点便是允许我们在同一个句柄上。同时管理多个I/O操作(读/写、多个读、多个写,等等)。大家此时或许会产生这样的疑问:在同-个套接字上,真的有必要同时投递多个I/O操作吗?答案在于系统的"伸缩性",或者说"扩展能力"。例如,假定我们的机器安装了多个中央处理器。每个处理器都在远行一个工作者线程,那么在同一个时候、完全可能有几个不同的处理器在同一个套接字上,进行数据的收发操作。

   为了完成前述的简单回应服务器示例,我们需要提供一个ServerWorkerThread(服务器工作者线程)函数。在程序消单8.10中,我们展示了如何设计一个工作者线程例程,令其使用单句柄数据以及单I/O操作数据,为I/O请求提供服务。

   程序清单8-10  完成端口工作者线程

DWORD  WINAPI  ServerWokerThread(LPVOID CompletionPortID)

{

HANDLE  CompletionPort=(HANDLE)ComleTionPortID;

DWORD  BytesTransferred;

LPOVERLAPPED Overlapped;

LPPER_HANDLE_DATA PerHandleData;

LPPER_IO_OPERATION_DATA PerIoData;

DWORD  SendBytes,RecvBytes;

DWORD Flages;

while(TRUE)

{

 //wait for I/O to Complete on any socket

 // associated with the completionport

 GetQueuedCompletionStatus(CompletionPort,

  &BytesTransferred,

  (LPDWORD)&PerHandleData,

  (LPOVERLAPPED *)&PerIoData,INFINITE);

 //first check o see whether an error  has occurr

 // on the socket ,if so ,close the socke and clearup the

 //per-handle and Per-I/O operation data associated with

 //socket

 if (BytesTransferred == 0)&&

  (PerIoData->OperationType == RECV_POSTED)&&

  (PerIoData->OperationType == SEND_POSTED)

 {

  //A Zero BytesTransferred indicates that the

  //socke has been closed by the peer,so you should

  //close the socket

  //Note: Per-handle Data was used to refresence the

  // socket associated with the I/O operation;

  closesocket(PerHandleData->Socket);

  GlobalFree(PerHandleData);

  GlobalFree(PerIoData);

  continue;

 }

  //service the completed I/O request;You

  //detemine which I/O request has just completed

  //by looking as the operationType field contained

  // the per-I/O operation data

 if (PerIoData->OperationType == RECV_POSTED)

 {

  //do someting with the received data

  //in PerIoData->Buffer

 }

 //Post another WSASend or WSARecv operation

 //as a example we will post another WSARecv()

 Flags = 0;

 //Set up the Per-I/O Operation Data for a next

 //overlapped call

 ZeroMemory(&(PerIoData->Overlapped),sizeof(OVERLAPPED));

 PerIoData->DataBuf.len = DATA_BUFFER_LEN;

 PerIoData->DataBuf.buf = PerIoData->Buffer;

 PerIoData->OperationType = RECV_POSTED;

 WSARecv(PerHandleData->Socket,

  &(PerIoData->DataBuf),1,&RecvBytes,

  &Flags,&(PerIoData->Overlapped),NULL);

}

}

在程序清单8-9和程序清单8-10列出的简单服务器示例中(配套光盘也有),最后要注意的处理细节是如何正确地关闭I/O完成端口一-特别是同时运行了一个或多个线程,在几个不同的套接字上执行I/O操作的时候。要避免的一个重要问题是在进行重叠I/O操作的同时,强行释放-个OVERLAPPED结构。要想避免出现这种情况,最好的办法是针对每个套接字句柄,调用closesocket函数。任何尚未进行的重叠I/O操作都会完成。-旦所有套接字句柄都已关闭。便需在完成端口上,终止所有工作者线程的运行。要想做到这一点,需要使用

PostQueuedCompletionStatus函数,向每个工作者线程都发送-个特殊的完成数据包。该函数会指示每个线程都"立即结束并退出".下面是PostQueuedCompletionStatus函数的定义:

BOOL PostQueuedCompletionStatus(

   HANDLE CompletlonPort,

   DW0RD  dwNumberOfBytesTrlansferred,

   DWORD  dwCompletlonKey,

LPOVERLAPPED lpoverlapped,

);

   其中,CompletionPort参数指定想向其发送一个完成数据包的完成端口对象。而就dwNumberOfBytesTransferred,dwCompletionKey和lpOverlapped这三个参数来说.每-个都允许我们指定-个值,直接传递给GetQueuedCompletionStatus函数中对应的参数。这样-来。-个工作者线程收到传递过来的三个GetQueuedCompletionStatus函数参数后,便可根据由这三个参数的某一个设置的特殊值,决定何时应该退出。例如,可用dwCompletionPort参数传递0值,而-个工作者线程会将其解释成中止指令。一旦所有工作者线程都已关闭,便可使用CloseHandle函数,关闭完成端口。最终安全退出程序。

   4.其他问题

另外还有几种颇有价值的技术。可用来进-步改善套接字应用程序的总体I/O性能。值得考虑的一项技术是试验不同的套接字缓冲区大小,以改善I/O性能和应用程序的扩展能力。例如,假如某个程序只采用了-个比较大的缓冲区,仅能支持-个wSARecv请求,而不是同时设置了三个较小的缓冲区。提供对三个WSARecv请求的支持,那么该程序的扩展能力并不是很好,特别是在转移到安装了多个处理器的机器上之后。这是由于单独一个缓冲区每次只能处理一个线程!除此以外,单缓冲区设计还会对性能造成一定的干扰,假如-次仅能进行-次接收操作。网络协议驱动程序的潜力使不能得到充分发挥(它经常都会很"闲")。换言之,假如在接收更多的数据前、需要等待-次WSARecv操作的完成,那么在WSARecv完成和下一次接收之间,整个协议实际上处于"休息"状态。

   另-个值得考虑的性能改进措施是用套接宇选项SO_SNDBUF和SO_RCVBUF对内部套接字缓冲区的大小进行控制。利用这些选项,应用程序可更改-个套接字的内部数据缓冲区的大小。如将该设为0,Winsock便会在重叠I/O调用中直接使用应用程序的缓冲区、进行数据在协议堆栈里的传人,传出。这样一来,在应用程序与Winsock之间,便避免了进行-次缓冲区复制的必要。下述代码片断阐释了如何使用SO_SNDBUF选项,来进行setsockopt函数的调用:

   setsockopt(socket,SOL_S0CKET,SO_SNDBUF,(char *)&nZero,sizeof(nZero));

   要注意的是,将这些缓冲区的大小设为0后,只有在一段给定的时间内,存在着多个I/O请求的前提下才会产生积极作用。等到第9章,我们会向大家更深入地讲述套接字选项的知识。提升性能的最后一项措施是使用AcceptEx这个API调用。

Trackback: http://tb.blog.csdn.net/TrackBack.aspx?PostId=32367