首页  编辑  

提高分布式多层系统的效率之二(Pooler)

Tags: /超级猛料/Friends.网友专栏/ihihonline(小小)/   Date Created:

提高分布式多层系统的效率之二(Pooler)

原创 (小小->爱被下载中……)

提高分布式多层系统的效率是一个永远不变的主题,有很多制约分布式系统的效率,如:RDBMS、数据库引擎、代码效率、各种Pooling、开发设计等……

在此,我就Pooler说一说,希望对大家有用;

有那些Pooler?Pooler是什么?那儿可以用Pooler?Pooler有什么用?怎么用Pooler?当一系例的Pooler问题摆在你的面前时,你如何下手?从那儿下手?但这些又是我们不得不面对的待解决的问题;唯一的办法只能是理解、解决它些问题;

Pooler是池子的意思,可以这样理解,无论是那种Pooler,它都为用户提供了一个相当于容器的作用的连接,比如一个用户可能需要连接远程数据库中特定的表,当他花费了一定时间进行连接、使用完毕之后,他是要释放这个连接呢?还是将这个连接放在某一个容器里边呢?不可否认,在多层中,连接操作是一个费时操作,特别是对于大型数据库,要进行各种检查等操作,那么,是不是每一个用户都需要这个步骤呢?这儿就提到了Pooler这个概念;当用户A将操作都执行完毕之后,他如果释放了他的连接,那么这个连接就只是被用户A一个人用过,其它用户没有机会再次用这个连接,还要继续进行连接操作,继续的浪费时间;而如果用户A能将这个连接保存下来,供以后可以用的着这个连接的用户使用的话,岂不是很好吗?此是Pooler就充当了这个角色,所以,我们可以将Pooler在这儿理解成存放连接的池子或容器;

Pooler又可以分为那些?分类所然没有被完全的规格化,但是可以将它罗列出来对我们有用的一些,比如: Session Pooling,Object Pooling,Connection Pooling(前边我们说过),ResourcePooling,DataPooling等,每种不同的Pooling有不同的作用,但它们实现的功能大至相同,它们可以提供方便的连接以节省用户时间,而且可以用我们曾说过的COM or DCOM的TimeOut来判断用户是否超时等;

只要有三层的地方,都应该用Pooling,甚至在C/S里也提倡用Pooling,Connection Pooling就是最好的例子;当然,需要合理的应用Pooling,不然会造成接口混乱等现象;

Pooler 有什么作用?在说什么是Pooler时,我们已经可以感觉的出来它的作用了;那么如何用Pooler呢?提到如何用Pooler时,我想大家如果有时间应该看一看我关边的关于COM DCOM方面的介绍,因为用Pooler 或者叫pooling就必须要用到接口,而前边我曾对接口给过很详细的说明,如果你不能理解接口的话,你在这儿也将无存下手;用Pooling提高分布式系统的效率的确是一个直接的办法,特别是当用户增加时;不管是MTS or DCOM or CORBA都需要用到Pooling技术来使效率提高,我将实现原理介绍一下,后边我会对一个例子进行分析以便于我们更好的交流;一般而言,用Pooling都需要用到协调接口类CoClass(前边的贴子,我曾说过这个类);我们可以将它理解为一个类,一个实现接口的类,而Pooling就要用到它!当客户端需要数据时,那以应用程序服务器就会应用CoClass所实现的接口来和应用服务器本身的接口进行交互;而且所有连接、线程控制等操作都由你新建的CoClass所负责,之后才通过应用服务本身的接口进行数据的存取,步骤我理解是这样的:当然用户想建立某一个连接或是某一项费时操作时,而正好它现在没有和应用服务器进行通讯,这样的话,它就希望以最快的速度和应用服务器进行通讯,但步骤往往不是它直接和应用服务器进行通讯,而是它需要和CoClass的实现的接口进行作用,当这个辅助类(CoClass)给这个用户分配了线程、建立了连接等一些资源时,它才可以和应用服务进行通讯,你会不会觉的这是多此一举呢?不,应用这个辅助类可以检测是否有空闲的连接来让这个用户进行连接,是否有可用的其它资源让这个用户进行操作,这样才会正真的节省客户的时间,同时也大大的降底了服务器的担子;这便是它的作用;

而至于如何建立进程、如果分配资源,又如何回收,下边有例子可供参考;

我们来用一个例子来说;(你可以参考Delphi/Demo/Midas/Pooler这个例子 );

//****************************************************************************//

//                         Pool Code(池子代码)                                //

//                                                                            //

// 作    者:   小小                                                          //

// 调试时间:2002-10-31 15:41                                                 //

// 调试环境:Windows 2000 + Delphi6.0;                                        //

// 参考资料:暂无                                                             //

// 交流地点:www.nxit.net/bbs  or  www.nxit.net/bbs                           //

//****************************************************************************//

服务端窗体///只是为了让操作者更明确一些;

unit srvrfrm;

interface

uses

 Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs;

type

 TForm1 = class(TForm)

 private

   { Private declarations }

 public

   { Public declarations }

 end;

var

 Form1: TForm1;

implementation

{$R *.dfm}

end.

这不用解释吧,呵呵;只是例出来更有利用各位的调试

远程数据模块:

//****************************************************************************//

//                         Pool Code(池子代码)                                //

//                                                                            //

// 作    者:   小小                                                          //

// 调试时间:2002-10-31 15:41                                                 //

// 调试环境:Windows 2000 + Delphi6.0;                                        //

// 参考资料:暂无                                                             //

// 交流地点:www.nxit.net/bbs  or  www.nxit.net/bbs                           //

//****************************************************************************//

unit srvrdm;

interface

uses

 Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,

 ComServ, ComObj, VCLCom, StdVcl, ActiveX, DataBkr, Server_TLB,

 Db, DBTables, Provider, Variants;

type

 TPooledRDM = class(TRemoteDataModule, IPooledRDM)

   Session1: TSession;

   Database1: TDatabase;

   Query1: TQuery;

   DataSetProvider1: TDataSetProvider;

   procedure DataSetProvider1BeforeGetRecords(Sender: TObject;

     var OwnerData: OleVariant);

 protected

   class procedure UpdateRegistry(Register: Boolean; const ClassID, ProgID: string); override;

 private

   { Private declarations }

 public

   { Public declarations }

 end;

var

{ Need a reference to the ClassFactory so the pooler can create instances of the

 class. }

 RDMFactory: TComponentFactory;

/////这儿定义了一个类工厂,如果你对它不明白,请看一看COM DCOM里边有;

////我觉的这个单元文件没有什么可解释的;

implementation

{$R *.dfm}

procedure TPooledRDM.DataSetProvider1BeforeGetRecords(Sender: TObject;

 var OwnerData: OleVariant);

begin

 try

   Query1.Close;

   Query1.SQL.Text := OwnerData[0];

   if not VarIsNull(OwnerData[1]) and not VarIsEmpty(OwnerData[1]) then

   begin

     Query1.Open;

     if not Query1.Locate(Query1.Fields[0].FieldName, OwnerData[1], []) then

       raise Exception.Create('Record not found');

     Query1.Next;

   end;

 finally

   OwnerData := NULL;

 end;

end;

class procedure TPooledRDM.UpdateRegistry(Register: Boolean; const ClassID, ProgID: string);

begin

 if Register then

 begin

   inherited UpdateRegistry(Register, ClassID, ProgID);

   EnableSocketTransport(ClassID);

   EnableWebTransport(ClassID);

 end else

 begin

   DisableSocketTransport(ClassID);

   DisableWebTransport(ClassID);

   inherited UpdateRegistry(Register, ClassID, ProgID);

 end;

end;

initialization

 RDMFactory := TComponentFactory.Create(ComServer, TPooledRDM,

   Class_PooledRDM, ciInternal, tmApartment);///////////希望你可以明白这些代码

end.

//****************************************************************************//

//                         Pool Code(池子代码)                                //

//                                                                            //

// 作    者:   小小                                                          //

// 调试时间:2002-10-31 15:41                                                 //

// 调试环境:Windows 2000 + Delphi6.0;                                        //

// 参考资料:暂无                                                             //

// 交流地点:www.nxit.net/bbs  or  www.nxit.net/bbs                           //

//****************************************************************************//

unit Server_TLB;

// ************************************************************************ //

// 如些之类的代码在回调机制中,我详细的说明过,这儿,我不准备再说;你们

// 可以自己分析                                                                    

************************************************************************ //

{$TYPEDADDRESS OFF} // Unit must be compiled without type-checked pointers.

{$WARN SYMBOL_PLATFORM OFF}

{$WRITEABLECONST ON}

interface

uses ActiveX, Classes, Graphics, Midas, StdVCL, Variants, Windows;

 

const

 // TypeLibrary Major and minor versions

 ServerMajorVersion = 1;

 ServerMinorVersion = 0;

 LIBID_Server: TGUID = '{0CE99800-9F28-11D1-8944-00A0248E5091}';

 IID_IPooledRDM: TGUID = '{0CE99801-9F28-11D1-8944-00A0248E5091}';

 CLASS_PooledRDM: TGUID = '{0CE99802-9F28-11D1-8944-00A0248E5091}';

 CLASS_Pooler: TGUID = '{0CE99804-9F28-11D1-8944-00A0248E5091}';

type

 IPooledRDM = interface;

 IPooledRDMDisp = dispinterface;

 PooledRDM = IPooledRDM;

 Pooler = IPooledRDM;

IPooledRDM = interface(IAppServer)

   ['{0CE99801-9F28-11D1-8944-00A0248E5091}']

 end;

 IPooledRDMDisp = dispinterface

   ['{0CE99801-9F28-11D1-8944-00A0248E5091}']

   function  AS_ApplyUpdates(const ProviderName: WideString; Delta: OleVariant;

                             MaxErrors: Integer; out ErrorCount: Integer; var OwnerData: OleVariant): OleVariant; dispid 20000000;

   function  AS_GetRecords(const ProviderName: WideString; Count: Integer; out RecsOut: Integer;

                           Options: Integer; const CommandText: WideString;

                           var Params: OleVariant; var OwnerData: OleVariant): OleVariant; dispid 20000001;

   function  AS_DataRequest(const ProviderName: WideString; Data: OleVariant): OleVariant; dispid 20000002;

   function  AS_GetProviderNames: OleVariant; dispid 20000003;

   function  AS_GetParams(const ProviderName: WideString; var OwnerData: OleVariant): OleVariant; dispid 20000004;

   function  AS_RowRequest(const ProviderName: WideString; Row: OleVariant; RequestType: Integer;

                           var OwnerData: OleVariant): OleVariant; dispid 20000005;

   procedure AS_Execute(const ProviderName: WideString; const CommandText: WideString;

                        var Params: OleVariant; var OwnerData: OleVariant); dispid 20000006;

 end;

 CoPooledRDM = class

   class function Create: IPooledRDM;

   class function CreateRemote(const MachineName: string): IPooledRDM;

 end;

 CoPooler = class

   class function Create: IPooledRDM;

   class function CreateRemote(const MachineName: string): IPooledRDM;

 end;

implementation

uses ComObj;

class function CoPooledRDM.Create: IPooledRDM;

begin

 Result := CreateComObject(CLASS_PooledRDM) as IPooledRDM;

end;

class function CoPooledRDM.CreateRemote(const MachineName: string): IPooledRDM;

begin

 Result := CreateRemoteComObject(MachineName, CLASS_PooledRDM) as IPooledRDM;

end;

class function CoPooler.Create: IPooledRDM;

begin

 Result := CreateComObject(CLASS_Pooler) as IPooledRDM;

end;

class function CoPooler.CreateRemote(const MachineName: string): IPooledRDM;

begin

 Result := CreateRemoteComObject(MachineName, CLASS_Pooler) as IPooledRDM;

end;

end.

//****************************************************************************//

//                         Pool Code(池子代码)                                //

//                                                                            //

// 作    者:   小小                                                          //

// 调试时间:2002-10-31 15:41                                                 //

// 调试环境:Windows 2000 + Delphi6.0;                                        //

// 参考资料:暂无                                                             //

// 交流地点:www.nxit.net/bbs  or  www.nxit.net/bbs                           //

//****************************************************************************//

unit ServerS_Unit;

interface

uses

   ComObj, ActiveX, ServerS_TLB, Classes, SyncObjs, Windows;/////单元引用;

type

 TPoolS = Class(TAutoObject,IPooler_LX)//接口的实现,前边说COM and DCOM曾说过;

 Private

   Function GetInterface : IPooler_LX; //获得一个接口,进而获得对像;

   Procedure SetInterface(InterfaceValue : IPooler_LX);//释放一个接口,是放回去;

   Function GetLock(Index : Integer) : Boolean;

   Procedure ReleaseLock(Index : Integer;Var InterfaceValue : IPooler_LX);

 Protected

   { IAppServer }

   {**************************************************************************}

   {在Protected里边,所有的声明都是基于IppAerver接口的,如计数加1,获得接口, }

   {                  释放接口操作,而且还引用了协调接口类                    }

   {**************************************************************************}

   function  AS_ApplyUpdates(const ProviderName: WideString; Delta: OleVariant;

                             MaxErrors: Integer; out ErrorCount: Integer; var OwnerData: OleVariant): OleVariant; safecall;

   function  AS_GetRecords(const ProviderName: WideString; Count: Integer; out RecsOut: Integer;

                           Options: Integer; const CommandText: WideString;

                           var Params: OleVariant; var OwnerData: OleVariant): OleVariant; safecall;

   function  AS_DataRequest(const ProviderName: WideString; Data: OleVariant): OleVariant; safecall;

   function  AS_GetProviderNames: OleVariant; safecall;

   function  AS_GetParams(const ProviderName: WideString; var OwnerData: OleVariant): OleVariant; safecall;

   function  AS_RowRequest(const ProviderName: WideString; Row: OleVariant; RequestType: Integer;

                           var OwnerData: OleVariant): OleVariant; safecall;

   procedure AS_Execute(const ProviderName: WideString; const CommandText: WideString;

                        var Params: OleVariant; var OwnerData: OleVariant); safecall;

 end;

 TPoolManager = Class(TObject)///定义一个管理接口对像类,必不可少,不然无法管理

 Private

   FMaxCount : Integer;////最大连接数;

   FTimeOut  : Integer;////连接超时;

   FCriticalSectionS : TCriticalSection;////可以理解为进行协调;

   FHandle : THandle ;

   FRDMList:TList;

   /////////////Function GetLock(Index : Integer) : Boolean;

   ///////////////////Procedure RePlease(Index : Integer ; InterfaceValue : IPooler_LX);

   Function CreateNewInstance : IPooler_LX;

 Public

   Constructor Create;

   Destructor Destroy;Override;

   Function GetInterface : IPooler_LX; //获得一个接口,进而获得对像;

   Procedure SetInterface(InterfaceValue : IPooler_LX);//释放一个接口,是放回去; s

   Property  TimeOut : Integer read FTimeOut;

   Property  MaxCount : Integer read FMaxCOunt;

 end;

 PRDM = ^TRDM;

 TRDM = Record

   InValue : IPooler_LX;

   InUser : Boolean;

 end;

var

 PoolManager : TPoolManager;

   

implementation

uses

 ComServ, SysUtils , SRDMUnit;

{ TPoolManager }

constructor TPoolManager.Create;

begin

 FRDMList := TList.Create;

 FCriticalSectionS := TCriticalSection.Create;

 FTimeOut := 1000;

 FMaxCount := 5;

 FHandle := CreateSemaphore(Nil,FTimeOut,FMaxCount,Nil);

end;

{******************************************************************************}

{   CriticalSection是指一段程序码,同一时间只能有一个Thread执行它,此VCL包装了 }

{       Win32API中关於CriticalSection的4个函数InitializeCriticalSection、      }

{      EnterCriticalSection、LeaveCriticalSection及DeleteCriticalSection)      }

{                                                                              }

{       Win 32 API提供了一组同步对象,如:信号灯(Semaphore)、互斥(Mutex)、   }

{          临界区(CriticalSection)和事件(Event)等,用来解决这个问题          }

{******************************************************************************}

{******************************************************************************}

{   TPoolManager.Create是为远程数据模快创建一个缓冲池,并且进行了规定,禄始化 }

{                                              }

{                                            }

{                                                                              }

{                                            }

{                                                }

{******************************************************************************}

function TPoolManager.CreateNewInstance: IPooler_LX;

var

 PValue : PRDM;

begin

 FCriticalSectionS.Enter;

 Try

   New(PValue);

   PValue.InValue := RDMFactory.CreateComObject(Nil) as IPooler_LX;//类工厂的应用

   PValue.InUser := True;//置为可用;

   FRDMList.Add(PValue);

   Result := PValue.InValue;

 Finally

   FCriticalSectionS.Leave;

 end;

end;

destructor TPoolManager.Destroy;

var

 I : Integer;

begin

 FCriticalSectionS.Free;

 for I := 0 to FRDMList.Count - 1 do

 begin

   PRDM(FRDMList[I]).InValue := Nil;

   //////////PRDM(FRDMList[I]).InUser := False;

   FreeMem(PRDM(FRDMList[I]));  //////释放了PRDM资源;

 end;

 FRDMList.Free;

 CloseHandle(FHandle);

 inherited Destroy;

end;

function TPoolManager.GetInterface: IPooler_LX;  //// 获得连接;

var

 I : Integer;

begin

 Result := False;

 if WaitForSingleObject(FHandle,TimeOut) = WAIT_FAILED then

   Raise Exception.Create('对不起,没有成功获得资源');

 for I := 0 to FRDMList.Count - 1 do

 begin

   if GetLock(I) then

   begin

     Result := PRDM(FRDMList[I]).InValue;///判断是否有可用的资源可以利用,或接口

     Exit;

   end;

 end;

 if FRDMList.Count < MaxCount then

   Resule := CreateNewInstance;///新建资源

 if Result = Nil then

   Raise Exception.Create('对不起,不能获得资源');

end;

{******************************************************************************}

{         WaitForSingleObject()说明:                                          }

{ WaitForSingleObject(ByVal hHandle As Long,                                   }

{                      ByVal dwMilliseconds As Long);                         }

{ hHandle:等待对象的句柄;                                                    }

{ dwMilliseconds:等待时间。                                                   }

{ 该函数可以实现对一个可等待对象的等待操作,获取操作执行权。当等待的对象被释放时}

{ 函数成功返回,同时使等待对象变为有信号状态,或者超时返回。该函数用于等待     }

{ Semaphore信号时,若Semaphore信号不为0,则函数成功返回,同时使Semaphore信号记 }

{ 数减1。我们可以利用它建立一个记数信号,每次当相同的程序加载时使记数器减1,退 }

{ 出的时候使计数器加1,这样就可以限制相同程序的多份拷贝同时运行。              }

{******************************************************************************}

function TPoolManager.GetLock(Index: Integer): Boolean;

begin

 FCriticalSectionS.Enter;///进锁

 Try

   Result := Not PRDM(FRDMList[Index]).InUser;//是否可用,是否可以获得连接;

   if Not Result then

     PRDM(FRDMList[Index]).InUser := True;//置连接为不在可用(对于其它用户而言)

 Finally

   FCriticalSectionS.Leave;

 end;

end;

procedure TPoolManager.RePlease(Index: Integer;

 InterfaceValue: IPooler_LX);

begin

 FCritiCalSectionS.Enter;

 Try

   PRDM(FRDMList[Index]).InUser := False;

   InterfaceValue := Nil;

   ReleaseSemaphore(FHandle,1,Nil);

 Finally

   FCritiCalSectionS.Leave;

 end;

end;

procedure TPoolManager.SetInterface(InterfaceValue: IPooler_LX);

var

 I : Integer;

begin

 for I := 0 to FRDMList.Count do

 begin

   if InterfaceValue = PRDM(FRDM[I]).InValue then

     RePlease(I,InterfaceValue);

   Break;

 end;

end;

{ TPoolS }

function TPoolS.AS_ApplyUpdates(const ProviderName: WideString;

 Delta: OleVariant; MaxErrors: Integer; out ErrorCount: Integer;

 var OwnerData: OleVariant): OleVariant;

begin

 ....//书上有详细的说明,每本多层系统的书上都有;

end;

function TPoolS.AS_DataRequest(const ProviderName: WideString;

 Data: OleVariant): OleVariant;

begin

 ....//书上有详细的说明,每本多层系统的书上都有;

end;

procedure TPoolS.AS_Execute(const ProviderName, CommandText: WideString;

 var Params, OwnerData: OleVariant);

begin

 ....//书上有详细的说明,每本多层系统的书上都有;

end;

function TPoolS.AS_GetParams(const ProviderName: WideString;

 var OwnerData: OleVariant): OleVariant;

begin

 ....//书上有详细的说明,每本多层系统的书上都有;

end;

function TPoolS.AS_GetProviderNames: OleVariant;

begin

 ....//书上有详细的说明,每本多层系统的书上都有;

end;

function TPoolS.AS_GetRecords(const ProviderName: WideString;

 Count: Integer; out RecsOut: Integer; Options: Integer;

 const CommandText: WideString; var Params,

 OwnerData: OleVariant): OleVariant;

begin

 ....//书上有详细的说明,每本多层系统的书上都有;

end;

function TPoolS.AS_RowRequest(const ProviderName: WideString;

 Row: OleVariant; RequestType: Integer;

 var OwnerData: OleVariant): OleVariant;

begin

end;

function TPoolS.GetInterface: IPooler_LX;

begin

 Result := PoolManager.GetInterface;/////获得接口

end;

procedure TPoolS.SetInterface(InterfaceValue: IPooler_LX);

begin

 PoolManager.SetInterface(InterfaceValue);////这是当释放一个接口是用到的代码;

end;

end.

希望对大家有用,但是需要说明一点的是,你应该理解锁定、线程、类工厂等内容,不然,理解这些,有些勉强;

小小希望你是最棒的一个;