Main Page | Directories | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

vtkMultiProcessController.h

Go to the documentation of this file.
00001 /*=========================================================================
00002 
00003   Program:   Visualization Toolkit
00004   Module:    $RCSfile: vtkMultiProcessController.h,v $
00005 
00006   Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
00007   All rights reserved.
00008   See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
00009 
00010      This software is distributed WITHOUT ANY WARRANTY; without even
00011      the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
00012      PURPOSE.  See the above copyright notice for more information.
00013 
00014 =========================================================================*/
00062 #ifndef __vtkMultiProcessController_h
00063 #define __vtkMultiProcessController_h
00064 
00065 #include "vtkObject.h"
00066 
00067 #include "vtkCommunicator.h" // Needed for direct access to communicator
00068 
00069 class vtkDataSet;
00070 class vtkImageData;
00071 class vtkCollection;
00072 class vtkOutputWindow;
00073 class vtkDataObject;
00074 class vtkMultiProcessController;
00075 
00076 //BTX
00077 // The type of function that gets called when new processes are initiated.
00078 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller, 
00079                                        void *userData);
00080 
00081 // The type of function that gets called when an RMI is triggered.
00082 typedef void (*vtkRMIFunctionType)(void *localArg, 
00083                                    void *remoteArg, int remoteArgLength, 
00084                                    int remoteProcessId);
00085 //ETX
00086 
00087 
00088 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00089 {
00090 public:
00091   static vtkMultiProcessController *New();
00092   vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00093   void PrintSelf(ostream& os, vtkIndent indent);
00094 
00098   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00099 
00101 
00104   virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00105                           int initializedExternally)=0;
00107 
00110   virtual void Finalize()=0;
00111 
00115   virtual void Finalize(int finalizedExternally)=0;
00116 
00118 
00121   virtual void SetNumberOfProcesses(int num);
00122   vtkGetMacro( NumberOfProcesses, int );
00124 
00125   //BTX
00127 
00130   void SetSingleMethod(vtkProcessFunctionType, void *data);
00131   //ETX
00133 
00137   virtual void SingleMethodExecute() = 0;
00138   
00139   //BTX
00141 
00145   void SetMultipleMethod(int index, vtkProcessFunctionType, void *data); 
00146   //ETX
00148 
00152   virtual void MultipleMethodExecute() = 0;
00153 
00155 
00156   vtkGetMacro(LocalProcessId, int);
00158 
00163   static vtkMultiProcessController *GetGlobalController();
00164 
00167   virtual void CreateOutputWindow() = 0;
00168   
00170 
00175   vtkSetMacro(ForceDeepCopy, int);
00176   vtkGetMacro(ForceDeepCopy, int);
00177   vtkBooleanMacro(ForceDeepCopy, int);
00179 
00180   
00181   //------------------ RMIs --------------------
00182   //BTX
00188   void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00189   
00191   int RemoveFirstRMI(int tag);
00192 
00194 
00195   void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00196     {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00197   //ETX
00199   
00201   void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00202 
00205   void TriggerBreakRMIs();
00206 
00208 
00209   void TriggerRMI(int remoteProcessId, const char *arg, int tag) 
00210     { this->TriggerRMI(remoteProcessId, (void*)arg, 
00211                        static_cast<int>(strlen(arg))+1, tag); }
00213 
00215 
00216   void TriggerRMI(int remoteProcessId, int tag)
00217     { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00219 
00222   void ProcessRMIs();
00223 
00225 
00228   vtkSetMacro(BreakFlag, int);
00229   vtkGetMacro(BreakFlag, int);
00231 
00233   vtkGetObjectMacro(Communicator, vtkCommunicator);
00235   
00236 //BTX
00237 
00238   enum Consts {
00239     MAX_PROCESSES  = 8192,
00240     ANY_SOURCE     = -1,
00241     INVALID_SOURCE = -2,
00242     RMI_TAG        = 315167,
00243     RMI_ARG_TAG    = 315168,
00244     BREAK_RMI_TAG  = 239954
00245   };
00246 
00247 //ETX
00248 
00250   virtual void Barrier() = 0;
00251 
00252   static void SetGlobalController(vtkMultiProcessController *controller);
00253 
00254   //------------------ Communication --------------------
00255   
00257 
00259   int Send(int* data, int length, int remoteProcessId, int tag);
00260   int Send(unsigned long* data, int length, int remoteProcessId, 
00261            int tag);
00262   int Send(char* data, int length, int remoteProcessId, int tag);
00263   int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00264   int Send(float* data, int length, int remoteProcessId, int tag);
00265   int Send(double* data, int length, int remoteProcessId, int tag);
00266 #ifdef VTK_USE_64BIT_IDS
00267   int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00269 #endif
00270   int Send(vtkDataObject *data, int remoteId, int tag);
00271   int Send(vtkDataArray *data, int remoteId, int tag);
00272 
00274 
00277   int Receive(int* data, int length, int remoteProcessId, int tag);
00278   int Receive(unsigned long* data, int length, int remoteProcessId, 
00279               int tag);
00280   int Receive(char* data, int length, int remoteProcessId, int tag);
00281   int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00282   int Receive(float* data, int length, int remoteProcessId, int tag);
00283   int Receive(double* data, int length, int remoteProcessId, int tag);
00284 #ifdef VTK_USE_64BIT_IDS
00285   int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00287 #endif
00288   int Receive(vtkDataObject* data, int remoteId, int tag);
00289   int Receive(vtkDataArray* data, int remoteId, int tag);
00290 
00291 // Internally implemented RMI to break the process loop.
00292 
00293 protected:
00294   vtkMultiProcessController();
00295   ~vtkMultiProcessController();
00296   
00297   int MaximumNumberOfProcesses;
00298   int NumberOfProcesses;
00299 
00300   int LocalProcessId;
00301   
00302   vtkProcessFunctionType      SingleMethod;
00303   void                       *SingleData;
00304   vtkProcessFunctionType      MultipleMethod[MAX_PROCESSES];
00305   void                       *MultipleData[MAX_PROCESSES];  
00306   
00307   vtkCollection *RMIs;
00308   
00309   // This is a flag that can be used by the ports to break
00310   // their update loop. (same as ProcessRMIs)
00311   int BreakFlag;
00312 
00313   void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00314 
00315   // This method implements "GetGlobalController".  
00316   // It needs to be virtual and static.
00317   virtual vtkMultiProcessController *GetLocalController();
00318 
00319   
00320   // This flag can force deep copies during send.
00321   int ForceDeepCopy;
00322 
00323   vtkOutputWindow* OutputWindow;
00324 
00325   // Note that since the communicators can be created differently
00326   // depending on the type of controller, the subclasses are
00327   // responsible of deleting them.
00328   vtkCommunicator* Communicator;
00329 
00330   // Communicator which is a copy of the current user
00331   // level communicator except the context; i.e. even if the tags 
00332   // are the same, the RMI messages will not interfere with user 
00333   // level messages. (This only works with MPI. When using threads,
00334   // the tags have to be unique.)
00335   // Note that since the communicators can be created differently
00336   // depending on the type of controller, the subclasses are
00337   // responsible of deleting them.
00338   vtkCommunicator* RMICommunicator;
00339 
00340 private:
00341   vtkMultiProcessController(const vtkMultiProcessController&);  // Not implemented.
00342   void operator=(const vtkMultiProcessController&);  // Not implemented.
00343 };
00344 
00345 
00346 inline int vtkMultiProcessController::Send(vtkDataObject *data, 
00347                                            int remoteThreadId, int tag)
00348 {
00349   if (this->Communicator)
00350     {
00351     return this->Communicator->Send(data, remoteThreadId, tag);
00352     }
00353   else
00354     {
00355     return 0;
00356     }
00357 }
00358 
00359 inline int vtkMultiProcessController::Send(vtkDataArray *data, 
00360                                            int remoteThreadId, int tag)
00361 {
00362   if (this->Communicator)
00363     {
00364     return this->Communicator->Send(data, remoteThreadId, tag);
00365     }
00366   else
00367     {
00368     return 0;
00369     }
00370 }
00371 
00372 inline int vtkMultiProcessController::Send(int* data, int length, 
00373                                            int remoteThreadId, int tag)
00374 {
00375   if (this->Communicator)
00376     {
00377     return this->Communicator->Send(data, length, remoteThreadId, tag);
00378     }
00379   else
00380     {
00381     return 0;
00382     }
00383 }
00384 
00385 inline int vtkMultiProcessController::Send(unsigned long* data, 
00386                                            int length, int remoteThreadId, 
00387                                            int tag)
00388 {
00389   if (this->Communicator)
00390     {
00391     return this->Communicator->Send(data, length, remoteThreadId, tag);
00392     }
00393   else
00394     {
00395     return 0;
00396     }
00397 }
00398 
00399 inline int vtkMultiProcessController::Send(char* data, int length, 
00400                                            int remoteThreadId, int tag)
00401 {
00402   if (this->Communicator)
00403     {
00404     return this->Communicator->Send(data, length, remoteThreadId, tag);
00405     }
00406   else
00407     {
00408     return 0;
00409     }
00410 }
00411 
00412 inline int vtkMultiProcessController::Send(unsigned char* data, int length, 
00413                                            int remoteThreadId, int tag)
00414 {
00415   if (this->Communicator)
00416     {
00417     return this->Communicator->Send(data, length, remoteThreadId, tag);
00418     }
00419   else
00420     {
00421     return 0;
00422     }
00423 }
00424 
00425 inline int vtkMultiProcessController::Send(float* data, int length, 
00426                                            int remoteThreadId, int tag)
00427 {
00428   if (this->Communicator)
00429     {
00430     return this->Communicator->Send(data, length, remoteThreadId, tag);
00431     }
00432   else
00433     {
00434     return 0;
00435     }
00436 }
00437 
00438 inline int vtkMultiProcessController::Send(double* data, int length, 
00439                                            int remoteThreadId, int tag)
00440 {
00441   if (this->Communicator)
00442     {
00443     return this->Communicator->Send(data, length, remoteThreadId, tag);
00444     }
00445   else
00446     {
00447     return 0;
00448     }
00449 }
00450 
00451 #ifdef VTK_USE_64BIT_IDS
00452 inline int vtkMultiProcessController::Send(vtkIdType* data, int length, 
00453                                            int remoteThreadId, int tag)
00454 {
00455   if (this->Communicator)
00456     {
00457     return this->Communicator->Send(data, length, remoteThreadId, tag);
00458     }
00459   else
00460     {
00461     return 0;
00462     }
00463 }
00464 #endif
00465 
00466 inline int vtkMultiProcessController::Receive(vtkDataObject* data, 
00467                                               int remoteThreadId, int tag)
00468 {
00469   if (this->Communicator)
00470     {
00471     return this->Communicator->Receive(data, remoteThreadId, tag);
00472     }
00473   else
00474     {
00475     return 0;
00476     }
00477 }
00478 
00479 inline int vtkMultiProcessController::Receive(vtkDataArray* data, 
00480                                               int remoteThreadId, int tag)
00481 {
00482   if (this->Communicator)
00483     {
00484     return this->Communicator->Receive(data, remoteThreadId, tag);
00485     }
00486   else
00487     {
00488     return 0;
00489     }
00490 }
00491 
00492 inline int vtkMultiProcessController::Receive(int* data, int length, 
00493                                               int remoteThreadId, int tag)
00494 {
00495   if (this->Communicator)
00496     {
00497     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00498     }
00499   else
00500     {
00501     return 0;
00502     }
00503 }
00504 
00505 inline int vtkMultiProcessController::Receive(unsigned long* data, 
00506                                               int length,int remoteThreadId, 
00507                                               int tag)
00508 {
00509   if (this->Communicator)
00510     {
00511     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00512     }
00513   else
00514     {
00515     return 0;
00516     }
00517 }
00518 
00519 inline int vtkMultiProcessController::Receive(char* data, int length, 
00520                                               int remoteThreadId, int tag)
00521 {
00522   if (this->Communicator)
00523     {
00524     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00525     }
00526   else
00527     {
00528     return 0;
00529     }
00530 }
00531 
00532 inline int vtkMultiProcessController::Receive(unsigned char* data, int length, 
00533                                               int remoteThreadId, int tag)
00534 {
00535   if (this->Communicator)
00536     {
00537     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00538     }
00539   else
00540     {
00541     return 0;
00542     }
00543 }
00544 
00545 inline int vtkMultiProcessController::Receive(float* data, int length, 
00546                                               int remoteThreadId, int tag)
00547 {
00548   if (this->Communicator)
00549     {
00550     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00551     }
00552   else
00553     {
00554     return 0;
00555     }
00556 }
00557 
00558 inline int vtkMultiProcessController::Receive(double* data, int length, 
00559                                               int remoteThreadId, int tag)
00560 {
00561   if (this->Communicator)
00562     {
00563     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00564     }
00565   else
00566     {
00567     return 0;
00568     }
00569 }
00570 
00571 #ifdef VTK_USE_64BIT_IDS
00572 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length, 
00573                                               int remoteThreadId, int tag)
00574 {
00575   if (this->Communicator)
00576     {
00577     return this->Communicator->Receive(data, length, remoteThreadId, tag);
00578     }
00579   else
00580     {
00581     return 0;
00582     }
00583 }
00584 #endif
00585 
00586 #endif