00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00062 #ifndef __vtkMultiProcessController_h
00063 #define __vtkMultiProcessController_h
00064
00065 #include "vtkObject.h"
00066
00067 #include "vtkCommunicator.h"
00068
// Forward declarations of VTK types that this header refers to by pointer
// only, so their full definitions are not needed here.
// vtkDataArray is declared explicitly because the Send()/Receive()
// overloads below take vtkDataArray*; without it the header relies on an
// included file happening to declare the type.
// NOTE(review): vtkDataSet and vtkImageData are not referenced in the
// visible part of this header; they are kept in case including code
// relies on them.
class vtkDataSet;
class vtkImageData;
class vtkCollection;
class vtkOutputWindow;
class vtkDataObject;
class vtkDataArray;
class vtkMultiProcessController;
00075
00076
00077
00078 typedef void (*vtkProcessFunctionType)(vtkMultiProcessController *controller,
00079 void *userData);
00080
00081
// Signature of the callbacks registered with AddRMI().
//   localArg        - opaque pointer supplied when the callback is registered
//   remoteArg       - argument buffer accompanying the invocation
//   remoteArgLength - length associated with remoteArg
//                     (presumably in bytes -- TODO confirm in the .cxx file)
//   remoteProcessId - id of the process on the other side of the invocation
typedef void (*vtkRMIFunctionType)(
  void *localArg,
  void *remoteArg,
  int remoteArgLength,
  int remoteProcessId);
00085
00086
00087
00088 class VTK_PARALLEL_EXPORT vtkMultiProcessController : public vtkObject
00089 {
00090 public:
00091 static vtkMultiProcessController *New();
00092 vtkTypeRevisionMacro(vtkMultiProcessController,vtkObject);
00093 void PrintSelf(ostream& os, vtkIndent indent);
00094
00098 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv))=0;
00099
00101
00104 virtual void Initialize(int* vtkNotUsed(argc), char*** vtkNotUsed(argv),
00105 int initializedExternally)=0;
00107
00110 virtual void Finalize()=0;
00111
00115 virtual void Finalize(int finalizedExternally)=0;
00116
00118
00121 virtual void SetNumberOfProcesses(int num);
00122 vtkGetMacro( NumberOfProcesses, int );
00124
00125
00127
00130 void SetSingleMethod(vtkProcessFunctionType, void *data);
00131
00133
00137 virtual void SingleMethodExecute() = 0;
00138
00139
00141
00145 void SetMultipleMethod(int index, vtkProcessFunctionType, void *data);
00146
00148
00152 virtual void MultipleMethodExecute() = 0;
00153
00155
00156 vtkGetMacro(LocalProcessId, int);
00158
00163 static vtkMultiProcessController *GetGlobalController();
00164
00167 virtual void CreateOutputWindow() = 0;
00168
00170
00175 vtkSetMacro(ForceDeepCopy, int);
00176 vtkGetMacro(ForceDeepCopy, int);
00177 vtkBooleanMacro(ForceDeepCopy, int);
00179
00180
00181
00182
00188 void AddRMI(vtkRMIFunctionType, void *localArg, int tag);
00189
00191 int RemoveFirstRMI(int tag);
00192
00194
00195 void RemoveRMI(vtkRMIFunctionType f, void *arg, int tag)
00196 {f = f; arg = arg; tag = tag; vtkErrorMacro("RemoveRMI Not Implemented Yet");};
00197
00199
00201 void TriggerRMI(int remoteProcessId, void *arg, int argLength, int tag);
00202
00205 void TriggerBreakRMIs();
00206
00208
00209 void TriggerRMI(int remoteProcessId, const char *arg, int tag)
00210 { this->TriggerRMI(remoteProcessId, (void*)arg,
00211 static_cast<int>(strlen(arg))+1, tag); }
00213
00215
00216 void TriggerRMI(int remoteProcessId, int tag)
00217 { this->TriggerRMI(remoteProcessId, NULL, 0, tag); }
00219
00222 void ProcessRMIs();
00223
00225
00228 vtkSetMacro(BreakFlag, int);
00229 vtkGetMacro(BreakFlag, int);
00231
00233 vtkGetObjectMacro(Communicator, vtkCommunicator);
00235
00236
00237
00238 enum Consts {
00239 MAX_PROCESSES = 8192,
00240 ANY_SOURCE = -1,
00241 INVALID_SOURCE = -2,
00242 RMI_TAG = 315167,
00243 RMI_ARG_TAG = 315168,
00244 BREAK_RMI_TAG = 239954
00245 };
00246
00247
00248
00250 virtual void Barrier() = 0;
00251
00252 static void SetGlobalController(vtkMultiProcessController *controller);
00253
00254
00255
00257
00259 int Send(int* data, int length, int remoteProcessId, int tag);
00260 int Send(unsigned long* data, int length, int remoteProcessId,
00261 int tag);
00262 int Send(char* data, int length, int remoteProcessId, int tag);
00263 int Send(unsigned char* data, int length, int remoteProcessId, int tag);
00264 int Send(float* data, int length, int remoteProcessId, int tag);
00265 int Send(double* data, int length, int remoteProcessId, int tag);
00266 #ifdef VTK_USE_64BIT_IDS
00267 int Send(vtkIdType* data, int length, int remoteProcessId, int tag);
00269 #endif
00270 int Send(vtkDataObject *data, int remoteId, int tag);
00271 int Send(vtkDataArray *data, int remoteId, int tag);
00272
00274
00277 int Receive(int* data, int length, int remoteProcessId, int tag);
00278 int Receive(unsigned long* data, int length, int remoteProcessId,
00279 int tag);
00280 int Receive(char* data, int length, int remoteProcessId, int tag);
00281 int Receive(unsigned char* data, int length, int remoteProcessId, int tag);
00282 int Receive(float* data, int length, int remoteProcessId, int tag);
00283 int Receive(double* data, int length, int remoteProcessId, int tag);
00284 #ifdef VTK_USE_64BIT_IDS
00285 int Receive(vtkIdType* data, int length, int remoteProcessId, int tag);
00287 #endif
00288 int Receive(vtkDataObject* data, int remoteId, int tag);
00289 int Receive(vtkDataArray* data, int remoteId, int tag);
00290
00291
00292
00293 protected:
00294 vtkMultiProcessController();
00295 ~vtkMultiProcessController();
00296
00297 int MaximumNumberOfProcesses;
00298 int NumberOfProcesses;
00299
00300 int LocalProcessId;
00301
00302 vtkProcessFunctionType SingleMethod;
00303 void *SingleData;
00304 vtkProcessFunctionType MultipleMethod[MAX_PROCESSES];
00305 void *MultipleData[MAX_PROCESSES];
00306
00307 vtkCollection *RMIs;
00308
00309
00310
00311 int BreakFlag;
00312
00313 void ProcessRMI(int remoteProcessId, void *arg, int argLength, int rmiTag);
00314
00315
00316
00317 virtual vtkMultiProcessController *GetLocalController();
00318
00319
00320
00321 int ForceDeepCopy;
00322
00323 vtkOutputWindow* OutputWindow;
00324
00325
00326
00327
00328 vtkCommunicator* Communicator;
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338 vtkCommunicator* RMICommunicator;
00339
00340 private:
00341 vtkMultiProcessController(const vtkMultiProcessController&);
00342 void operator=(const vtkMultiProcessController&);
00343 };
00344
00345
00346 inline int vtkMultiProcessController::Send(vtkDataObject *data,
00347 int remoteThreadId, int tag)
00348 {
00349 if (this->Communicator)
00350 {
00351 return this->Communicator->Send(data, remoteThreadId, tag);
00352 }
00353 else
00354 {
00355 return 0;
00356 }
00357 }
00358
00359 inline int vtkMultiProcessController::Send(vtkDataArray *data,
00360 int remoteThreadId, int tag)
00361 {
00362 if (this->Communicator)
00363 {
00364 return this->Communicator->Send(data, remoteThreadId, tag);
00365 }
00366 else
00367 {
00368 return 0;
00369 }
00370 }
00371
00372 inline int vtkMultiProcessController::Send(int* data, int length,
00373 int remoteThreadId, int tag)
00374 {
00375 if (this->Communicator)
00376 {
00377 return this->Communicator->Send(data, length, remoteThreadId, tag);
00378 }
00379 else
00380 {
00381 return 0;
00382 }
00383 }
00384
00385 inline int vtkMultiProcessController::Send(unsigned long* data,
00386 int length, int remoteThreadId,
00387 int tag)
00388 {
00389 if (this->Communicator)
00390 {
00391 return this->Communicator->Send(data, length, remoteThreadId, tag);
00392 }
00393 else
00394 {
00395 return 0;
00396 }
00397 }
00398
00399 inline int vtkMultiProcessController::Send(char* data, int length,
00400 int remoteThreadId, int tag)
00401 {
00402 if (this->Communicator)
00403 {
00404 return this->Communicator->Send(data, length, remoteThreadId, tag);
00405 }
00406 else
00407 {
00408 return 0;
00409 }
00410 }
00411
00412 inline int vtkMultiProcessController::Send(unsigned char* data, int length,
00413 int remoteThreadId, int tag)
00414 {
00415 if (this->Communicator)
00416 {
00417 return this->Communicator->Send(data, length, remoteThreadId, tag);
00418 }
00419 else
00420 {
00421 return 0;
00422 }
00423 }
00424
00425 inline int vtkMultiProcessController::Send(float* data, int length,
00426 int remoteThreadId, int tag)
00427 {
00428 if (this->Communicator)
00429 {
00430 return this->Communicator->Send(data, length, remoteThreadId, tag);
00431 }
00432 else
00433 {
00434 return 0;
00435 }
00436 }
00437
00438 inline int vtkMultiProcessController::Send(double* data, int length,
00439 int remoteThreadId, int tag)
00440 {
00441 if (this->Communicator)
00442 {
00443 return this->Communicator->Send(data, length, remoteThreadId, tag);
00444 }
00445 else
00446 {
00447 return 0;
00448 }
00449 }
00450
00451 #ifdef VTK_USE_64BIT_IDS
00452 inline int vtkMultiProcessController::Send(vtkIdType* data, int length,
00453 int remoteThreadId, int tag)
00454 {
00455 if (this->Communicator)
00456 {
00457 return this->Communicator->Send(data, length, remoteThreadId, tag);
00458 }
00459 else
00460 {
00461 return 0;
00462 }
00463 }
00464 #endif
00465
00466 inline int vtkMultiProcessController::Receive(vtkDataObject* data,
00467 int remoteThreadId, int tag)
00468 {
00469 if (this->Communicator)
00470 {
00471 return this->Communicator->Receive(data, remoteThreadId, tag);
00472 }
00473 else
00474 {
00475 return 0;
00476 }
00477 }
00478
00479 inline int vtkMultiProcessController::Receive(vtkDataArray* data,
00480 int remoteThreadId, int tag)
00481 {
00482 if (this->Communicator)
00483 {
00484 return this->Communicator->Receive(data, remoteThreadId, tag);
00485 }
00486 else
00487 {
00488 return 0;
00489 }
00490 }
00491
00492 inline int vtkMultiProcessController::Receive(int* data, int length,
00493 int remoteThreadId, int tag)
00494 {
00495 if (this->Communicator)
00496 {
00497 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00498 }
00499 else
00500 {
00501 return 0;
00502 }
00503 }
00504
00505 inline int vtkMultiProcessController::Receive(unsigned long* data,
00506 int length,int remoteThreadId,
00507 int tag)
00508 {
00509 if (this->Communicator)
00510 {
00511 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00512 }
00513 else
00514 {
00515 return 0;
00516 }
00517 }
00518
00519 inline int vtkMultiProcessController::Receive(char* data, int length,
00520 int remoteThreadId, int tag)
00521 {
00522 if (this->Communicator)
00523 {
00524 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00525 }
00526 else
00527 {
00528 return 0;
00529 }
00530 }
00531
00532 inline int vtkMultiProcessController::Receive(unsigned char* data, int length,
00533 int remoteThreadId, int tag)
00534 {
00535 if (this->Communicator)
00536 {
00537 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00538 }
00539 else
00540 {
00541 return 0;
00542 }
00543 }
00544
00545 inline int vtkMultiProcessController::Receive(float* data, int length,
00546 int remoteThreadId, int tag)
00547 {
00548 if (this->Communicator)
00549 {
00550 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00551 }
00552 else
00553 {
00554 return 0;
00555 }
00556 }
00557
00558 inline int vtkMultiProcessController::Receive(double* data, int length,
00559 int remoteThreadId, int tag)
00560 {
00561 if (this->Communicator)
00562 {
00563 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00564 }
00565 else
00566 {
00567 return 0;
00568 }
00569 }
00570
00571 #ifdef VTK_USE_64BIT_IDS
00572 inline int vtkMultiProcessController::Receive(vtkIdType* data, int length,
00573 int remoteThreadId, int tag)
00574 {
00575 if (this->Communicator)
00576 {
00577 return this->Communicator->Receive(data, length, remoteThreadId, tag);
00578 }
00579 else
00580 {
00581 return 0;
00582 }
00583 }
00584 #endif
00585
00586 #endif