diff --git a/proof/doc/confman/TDataSetManagerAliEn.md b/proof/doc/confman/TDataSetManagerAliEn.md index 71f587d266034..80a4998a59273 100644 --- a/proof/doc/confman/TDataSetManagerAliEn.md +++ b/proof/doc/confman/TDataSetManagerAliEn.md @@ -179,7 +179,7 @@ and official Monte Carlo productions: Period : The LHC period. - Example of valid values: *LHC10h, LHC11h\_2, LHC11f\_Technical* + Example of valid values: `LHC10h`, `LHC11h_2`, `LHC11f_Technical` Variant : Data variant, which might be `ESDs` (or `ESD`) for ESDs and `AODXXX` @@ -351,6 +351,14 @@ cache > to stage data asynchronously using the [stager > daemon](http://afdsmgrd.googlecode.com/). +#### Force cache refresh + +If the cached information for a certain AliEn file catalog query is wrong, +it is possible to force querying the catalog again by using the keyword +`ForceUpdate`: + + Data;ForceUpdate;Period=LHC10h;Variant=AOD086;Run=130831-130833;Pass=pass1 + ### Staging requests Issuing staging requests and keeping track of them requires an auxiliary diff --git a/proof/proof/inc/TProof.h b/proof/proof/inc/TProof.h index 84fb5dd6b47ed..77ecfdb028d70 100644 --- a/proof/proof/inc/TProof.h +++ b/proof/proof/inc/TProof.h @@ -159,6 +159,7 @@ const char* const kPROOF_TerminateWorker = "+++ terminating +++"; // signal work const char* const kPROOF_WorkerIdleTO = "+++ idle-timeout +++"; // signal worker idle timeout in MarkBad const char* const kPROOF_InputDataFile = "inputdata.root"; // Default input data file name const char* const kPROOF_MissingFiles = "MissingFiles"; // Missingfile list name +const Long64_t kPROOF_DynWrkPollInt_s = 10; // minimum number of seconds between two polls for dyn wrks #ifndef R__WIN32 const char* const kCP = "/bin/cp -fp"; @@ -507,6 +508,8 @@ friend class TXProofServ; // to access EUrgent TList *fRecvMessages; //Messages received during collect not yet processed TList *fSlaveInfo; //!list returned by kPROOF_GETSLAVEINFO Bool_t fSendGroupView; //if true send new group view + Bool_t fIsPollingWorkers; //will be set to kFALSE to prevent recursive dyn workers check in dyn mode + Long64_t fLastPollWorkers_s; //timestamp (in seconds) of last poll for workers, -1 if never checked TList *fActiveSlaves; //list of active slaves (subset of all slaves) TString fActiveSlavesSaved;// comma-separated list of active slaves (before last call to // SetParallel or Activate/DeactivateWorkers) @@ -644,12 +647,13 @@ friend class TXProofServ; // to access EUrgent void AskStatistics(); void AskParallel(); Int_t GoParallel(Int_t nodes, Bool_t accept = kFALSE, Bool_t random = kFALSE); + Int_t GoMoreParallel(Int_t nWorkersToAdd); Int_t SetParallelSilent(Int_t nodes, Bool_t random = kFALSE); void RecvLogFile(TSocket *s, Int_t size); void NotifyLogMsg(const char *msg, const char *sfx = "\n"); Int_t BuildPackage(const char *package, EBuildPackageOpt opt = kBuildAll, Int_t chkveropt = 2); Int_t BuildPackageOnClient(const char *package, Int_t opt = 0, TString *path = 0, Int_t chkveropt = 2); - Int_t LoadPackage(const char *package, Bool_t notOnClient = kFALSE, TList *loadopts = 0); + Int_t LoadPackage(const char *package, Bool_t notOnClient = kFALSE, TList *loadopts = 0, TList *workers = 0); Int_t LoadPackageOnClient(const char *package, TList *loadopts = 0); Int_t UnloadPackage(const char *package); Int_t UnloadPackageOnClient(const char *package); @@ -680,6 +684,7 @@ friend class TXProofServ; // to access EUrgent Int_t HandleInputMessage(TSlave *wrk, TMessage *m, Bool_t deactonfail = kFALSE); void HandleSubmerger(TMessage *mess, TSlave *sl); void SetMonitor(TMonitor *mon = 0, Bool_t on = kTRUE); + Bool_t PollForNewWorkers(); void ReleaseMonitor(TMonitor *mon); @@ -883,17 +888,17 @@ friend class TXProofServ; // to access EUrgent Int_t ClearPackages(); Int_t ClearPackage(const char *package); Int_t DownloadPackage(const char *par, const char *dstdir = 0); - Int_t EnablePackage(const char *package, Bool_t notOnClient = kFALSE); + Int_t EnablePackage(const char *package, Bool_t notOnClient = kFALSE, TList *workers = 0); Int_t EnablePackage(const char *package, const char *loadopts, - Bool_t notOnClient = kFALSE); + Bool_t notOnClient = kFALSE, TList *workers = 0); Int_t EnablePackage(const char *package, TList *loadopts, - Bool_t notOnClient = kFALSE); - Int_t UploadPackage(const char *par, EUploadPackageOpt opt = kUntar); + Bool_t notOnClient = kFALSE, TList *workers = 0); + Int_t UploadPackage(const char *par, EUploadPackageOpt opt = kUntar, TList *workers = 0); virtual Int_t Load(const char *macro, Bool_t notOnClient = kFALSE, Bool_t uniqueOnly = kTRUE, TList *wrks = 0); - Int_t AddDynamicPath(const char *libpath, Bool_t onClient = kFALSE, TList *wrks = 0); - Int_t AddIncludePath(const char *incpath, Bool_t onClient = kFALSE, TList *wrks = 0); + Int_t AddDynamicPath(const char *libpath, Bool_t onClient = kFALSE, TList *wrks = 0, Bool_t doCollect = kTRUE); + Int_t AddIncludePath(const char *incpath, Bool_t onClient = kFALSE, TList *wrks = 0, Bool_t doCollect = kTRUE); Int_t RemoveDynamicPath(const char *libpath, Bool_t onClient = kFALSE); Int_t RemoveIncludePath(const char *incpath, Bool_t onClient = kFALSE); diff --git a/proof/proof/inc/TVirtualProofPlayer.h b/proof/proof/inc/TVirtualProofPlayer.h index a7aa3252bfd16..8ba1d787a3712 100644 --- a/proof/proof/inc/TVirtualProofPlayer.h +++ b/proof/proof/inc/TVirtualProofPlayer.h @@ -60,6 +60,7 @@ class TVirtualProofPlayer : public TObject, public TQObject { virtual Long64_t Process(TDSet *set, TSelector *selector, Option_t *option = "", Long64_t nentries = -1, Long64_t firstentry = 0) = 0; + virtual Bool_t JoinProcess(TList *workers) = 0; virtual Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE) = 0; virtual Long64_t Finalize(TQueryResult *qr) = 0; virtual Long64_t DrawSelect(TDSet *set, const char *varexp, diff --git a/proof/proof/src/TProof.cxx b/proof/proof/src/TProof.cxx index e8984026b1540..943fc277584aa 100644 --- a/proof/proof/src/TProof.cxx +++ b/proof/proof/src/TProof.cxx @@ -516,6 +516,8 @@ void TProof::InitMembers() fSlaveInfo = 0; fMasterServ = kFALSE; fSendGroupView = kFALSE; + fIsPollingWorkers = kFALSE; + fLastPollWorkers_s = -1; fActiveSlaves = 0; fInactiveSlaves = 0; fUniqueSlaves = 0; @@ -1227,7 +1229,8 @@ Int_t TProof::AddWorkers(TList *workerList) UInt_t nSlavesDone = 0; Int_t ord = 0; - // Loop over all workers and start them + // Loop over all new workers and start them + Bool_t goMoreParallel = (fSlaves->GetEntries() > 0); // a list of TSlave objects for workers that are being added TList *addedWorkers = new TList(); @@ -1350,8 +1353,23 @@ Int_t TProof::AddWorkers(TList *workerList) TString s(n->GetTitle()); if (s.IsDigit()) nwrk = s.Atoi(); } - GoParallel(nwrk, kFALSE, 0); - + + if (fDynamicStartup && goMoreParallel) { + + PDB(kGlobal, 3) + Info("AddWorkers", "Will invoke GoMoreParallel()"); + Int_t nw = GoMoreParallel(nwrk); + PDB(kGlobal, 3) + Info("AddWorkers", "GoMoreParallel()=%d", nw); + + } + else { + // Not in Dynamic Workers mode + PDB(kGlobal, 3) + Info("AddWorkers", "Will invoke GoParallel()"); + GoParallel(nwrk, kFALSE, 0); + } + if (gProofServ && gProofServ->GetEnabledPackages() && gProofServ->GetEnabledPackages()->GetSize() > 0) { TIter nxp(gProofServ->GetEnabledPackages()); @@ -1359,16 +1377,28 @@ Int_t TProof::AddWorkers(TList *workerList) while ((pck = (TPair *) nxp())) { // Upload and Enable methods are intelligent and avoid // re-uploading or re-enabling of a package to a node that has it. - if (UploadPackage(pck->GetName()) >= 0) - EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE); + if (fDynamicStartup && goMoreParallel) { + // Upload only on added workers + PDB(kGlobal, 3) + Info("AddWorkers", "Will invoke UploadPackage() and EnablePackage() on added workers"); + if (UploadPackage(pck->GetName(), kUntar, addedWorkers) >= 0) + EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE, addedWorkers); + } + else { + PDB(kGlobal, 3) + Info("AddWorkers", "Will invoke UploadPackage() and EnablePackage() on all workers"); + if (UploadPackage(pck->GetName()) >= 0) + EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE); + } } } - if (fLoadedMacros) { TIter nxp(fLoadedMacros); TObjString *os = 0; while ((os = (TObjString *) nxp())) { + PDB(kGlobal, 3) + Info("AddWorkers", "Will invoke Load() on selected workers"); Printf("Loading a macro : %s", os->GetName()); Load(os->GetName(), kTRUE, kTRUE, addedWorkers); } @@ -1377,22 +1407,41 @@ Int_t TProof::AddWorkers(TList *workerList) TString dyn = gSystem->GetDynamicPath(); dyn.ReplaceAll(":", " "); dyn.ReplaceAll("\"", " "); - AddDynamicPath(dyn, kFALSE, addedWorkers); + PDB(kGlobal, 3) + Info("AddWorkers", "Will invoke AddDynamicPath() on selected workers"); + AddDynamicPath(dyn, kFALSE, addedWorkers, fDynamicStartup); + TString inc = gSystem->GetIncludePath(); inc.ReplaceAll("-I", " "); inc.ReplaceAll("\"", " "); - AddIncludePath(inc, kFALSE, addedWorkers); - - // Cleanup - delete addedWorkers; + PDB(kGlobal, 3) + Info("AddWorkers", "Will invoke AddIncludePath() on selected workers"); + AddIncludePath(inc, kFALSE, addedWorkers, fDynamicStartup); // Update list of current workers + PDB(kGlobal, 3) + Info("AddWorkers", "Will invoke SaveWorkerInfo()"); SaveWorkerInfo(); - // Inform the client that the number of workers is changed - if (fDynamicStartup && gProofServ) + // Inform the client that the number of workers has changed + if (fDynamicStartup && gProofServ) { + PDB(kGlobal, 3) + Info("AddWorkers", "Will invoke SendParallel()"); gProofServ->SendParallel(kTRUE); + if (goMoreParallel && fPlayer) { + // In case we are adding workers dynamically to an existing process, we + // should invoke a special player's Process() to set only added workers + // to the proper state + PDB(kGlobal, 3) + Info("AddWorkers", "Will send the PROCESS message to selected workers"); + fPlayer->JoinProcess(addedWorkers); + } + } + + // Cleanup + delete addedWorkers; + return 0; } @@ -2560,6 +2609,13 @@ Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deact // If timeout >= 0, wait at most timeout seconds (timeout = -1 by default, // which means wait forever). // If defined (>= 0) endtype is the message that stops this collection. + // Collect also stops its execution from time to time to check for new + // workers in Dynamic Startup mode. + + Int_t collectId = gRandom->Integer(9999); + + PDB(kCollect, 3) + Info("Collect", ">>>>>> Entering collect responses #%04d", collectId); // Reset the status flag and clear the messages in the list, if any fStatus = 0; @@ -2627,7 +2683,19 @@ Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deact } } + // Preemptive poll for new workers on the master only in Dynamic Mode and only + // during processing (TODO: should work on Top Master only) + if (TestBit(TProof::kIsMaster) && !IsIdle() && fDynamicStartup && !fIsPollingWorkers && + ((fLastPollWorkers_s == -1) || (time(0)-fLastPollWorkers_s >= kPROOF_DynWrkPollInt_s))) { + fIsPollingWorkers = kTRUE; + PollForNewWorkers(); + fLastPollWorkers_s = time(0); + fIsPollingWorkers = kFALSE; + } + // Wait for a ready socket + PDB(kCollect, 3) + Info("Collect", "Will invoke Select() #%04d", collectId); TSocket *s = mon->Select(1000); if (s && s != (TSocket *)(-1)) { @@ -2672,7 +2740,8 @@ Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deact sto = (Long_t) actto; nsto = 60; } - } + + } // end loop over active monitors // If timed-out, deactivate the remaining sockets if (nto == 0) { @@ -2709,9 +2778,77 @@ Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deact ActivateAsyncInput(); + PDB(kCollect, 3) + Info("Collect", "<<<<<< Exiting collect responses #%04d", collectId); + return cnt; } +//______________________________________________________________________________ +Bool_t TProof::PollForNewWorkers() +{ + // Asks the PROOF Serv for new workers in Dynamic Startup mode and activates + // them. Returns the number of new workers found. + + // Requests for worker updates + Int_t dummy = 0; + TList *reqWorkers = new TList(); + reqWorkers->SetOwner(kFALSE); + + R__ASSERT(gProofServ); + gProofServ->GetWorkers(reqWorkers, dummy, kTRUE); // last 2 are dummy + + // List of new workers only (TProofNodeInfo) + TList *newWorkers = new TList(); + newWorkers->SetOwner(kTRUE); + + TIter next(reqWorkers); + TProofNodeInfo *ni; + TString fullOrd; + while (( ni = dynamic_cast(next()) )) { + + // Form the full ordinal + fullOrd.Form("%s.%s", gProofServ->GetOrdinal(), ni->GetOrdinal().Data()); + + TIter nextInner(fSlaves); + TSlave *sl; + Bool_t found = kFALSE; + while (( sl = dynamic_cast(nextInner()) )) { + if ( strcmp(sl->GetOrdinal(), fullOrd.Data()) == 0 ) { + found = kTRUE; + break; + } + } + + if (found) delete ni; + else { + newWorkers->Add(ni); + PDB(kGlobal, 1) + Info("PollForNewWorkers", "New worker found: %s:%s", + ni->GetNodeName().Data(), fullOrd.Data()); + } + } + + delete reqWorkers; // not owner + + Int_t nNewWorkers = newWorkers->GetEntries(); + + // Add the new workers + if (newWorkers->GetEntries() > 0) { + PDB(kGlobal, 1) + Info("PollForNewWorkers", "Requesting to add %d new worker(s)", newWorkers->GetEntries()); + AddWorkers(newWorkers); + // Don't delete newWorkers: AddWorkers() will do that + } + else { + PDB(kGlobal, 2) + Info("PollForNewWorkers", "No new worker found"); + delete newWorkers; + } + + return nNewWorkers; +} + //______________________________________________________________________________ void TProof::CleanGDirectory(TList *ol) { @@ -6716,7 +6853,7 @@ Int_t TProof::SetParallel(Int_t nodes, Bool_t random) if (fDynamicStartup && nodes < 0) { if (gSystem->Getenv("PROOF_NWORKERS")) gSystem->Unsetenv("PROOF_NWORKERS"); } - + Int_t n = SetParallelSilent(nodes, random); if (TestBit(TProof::kIsClient)) { if (n < 1) { @@ -6734,6 +6871,102 @@ Int_t TProof::SetParallel(Int_t nodes, Bool_t random) return n; } +//______________________________________________________________________________ +Int_t TProof::GoMoreParallel(Int_t nWorkersToAdd) +{ + // Add nWorkersToAdd workers to current list of workers. This function is + // works on the master only, and only when an analysis is ongoing. A message + // is sent back to the client when we go "more" parallel. + // Returns -1 on error, number of total (not added!) workers on success. + + if (!IsValid() || !IsMaster() || IsIdle()) + return -1; + + TSlave *sl = 0x0; + TIter next( fSlaves ); + Int_t nAddedWorkers = 0; + + while (((nAddedWorkers < nWorkersToAdd) || (nWorkersToAdd == -1)) && + (( sl = dynamic_cast( next() ) ))) { + + // If worker is of an invalid type, break everything: it should not happen! + if ((sl->GetSlaveType() != TSlave::kSlave) && + (sl->GetSlaveType() != TSlave::kMaster)) { + Error("GoMoreParallel", "TSlave is neither a Master nor a Slave: %s:%s", + sl->GetName(), sl->GetOrdinal()); + R__ASSERT(0); + } + + // Skip current worker if it is not a good candidate + if ((!sl->IsValid()) || (fBadSlaves->FindObject(sl)) || + (strcmp("IGNORE", sl->GetImage()) == 0)) { + PDB(kGlobal, 2) + Info("GoMoreParallel", "Worker %s:%s won't be considered", + sl->GetName(), sl->GetOrdinal()); + continue; + } + + // Worker is good but it is already active: skip it + if (fActiveSlaves->FindObject(sl)) { + Info("GoMoreParallel", "Worker %s:%s is already active: skipping", + sl->GetName(), sl->GetOrdinal()); + continue; + } + + // + // From here on: worker is a good candidate + // + + if (sl->GetSlaveType() == TSlave::kSlave) { + sl->SetStatus(TSlave::kActive); + fActiveSlaves->Add(sl); + fInactiveSlaves->Remove(sl); + fActiveMonitor->Add(sl->GetSocket()); + nAddedWorkers++; + PDB(kGlobal, 2) + Info("GoMoreParallel", "Worker %s:%s marked as active!", + sl->GetName(), sl->GetOrdinal()); + } + else { + // Can't add masters dynamically: this should not happen! + Error("GoMoreParallel", "Dynamic addition of master is not supported"); + R__ASSERT(0); + } + + } // end loop over all slaves + + // Get slave status (will set the slaves fWorkDir correctly) + PDB(kGlobal, 3) + Info("GoMoreParallel", "Will invoke AskStatistics() -- implies a Collect()"); + AskStatistics(); + + // Find active slaves with unique image + PDB(kGlobal, 3) + Info("GoMoreParallel", "Will invoke FindUniqueSlaves()"); + FindUniqueSlaves(); + + // Send new group-view to slaves + PDB(kGlobal, 3) + Info("GoMoreParallel", "Will invoke SendGroupView()"); + SendGroupView(); + + PDB(kGlobal, 3) + Info("GoMoreParallel", "Will invoke GetParallel()"); + Int_t nTotalWorkers = GetParallel(); + + // Notify the client that we've got more workers, and print info on + // Master's log as well + R__ASSERT(gProofServ); + TString s; + s.Form("PROOF just went more parallel (%d additional worker%s, %d worker%s total)", + nAddedWorkers, (nAddedWorkers == 1) ? "" : "s", + nTotalWorkers, (nTotalWorkers == 1) ? "" : "s"); + gProofServ->SendAsynMessage(s); + Info("GoMoreParallel", s.Data()); + + return nTotalWorkers; +} + //______________________________________________________________________________ Int_t TProof::GoParallel(Int_t nodes, Bool_t attach, Bool_t random) { @@ -7776,7 +8009,8 @@ Int_t TProof::BuildPackageOnClient(const char *pack, Int_t opt, TString *path, I } //______________________________________________________________________________ -Int_t TProof::LoadPackage(const char *package, Bool_t notOnClient, TList *loadopts) +Int_t TProof::LoadPackage(const char *package, Bool_t notOnClient, + TList *loadopts, TList *workers) { // Load specified package. Executes the PROOF-INF/SETUP.C script // on all active nodes. If notOnClient = true, don't load package @@ -7806,10 +8040,20 @@ Int_t TProof::LoadPackage(const char *package, Bool_t notOnClient, TList *loadop TMessage mess(kPROOF_CACHE); mess << Int_t(kLoadPackage) << pac; if (loadopts) mess << loadopts; - Broadcast(mess); + // On the master, workers that fail are deactivated Bool_t deactivateOnFailure = (IsMaster()) ? kTRUE : kFALSE; - Collect(kActive, -1, -1, deactivateOnFailure); + + if (workers) { + PDB(kPackage, 3) + Info("LoadPackage", "Sending load message to selected workers only"); + Broadcast(mess, workers); + Collect(workers, -1, -1, deactivateOnFailure); + } + else { + Broadcast(mess); + Collect(kActive, -1, -1, deactivateOnFailure); + } return fStatus; } @@ -8082,21 +8326,23 @@ Int_t TProof::UnloadPackages() } //______________________________________________________________________________ -Int_t TProof::EnablePackage(const char *package, Bool_t notOnClient) +Int_t TProof::EnablePackage(const char *package, Bool_t notOnClient, + TList *workers) { // Enable specified package. Executes the PROOF-INF/BUILD.sh // script if it exists followed by the PROOF-INF/SETUP.C script. // In case notOnClient = true, don't enable the package on the client. // The default is to enable packages also on the client. + // If specified, enables packages only on the specified workers. // Returns 0 in case of success and -1 in case of error. // Provided for backward compatibility. - return EnablePackage(package, (TList *)0, notOnClient); + return EnablePackage(package, (TList *)0, notOnClient, workers); } //______________________________________________________________________________ Int_t TProof::EnablePackage(const char *package, const char *loadopts, - Bool_t notOnClient) + Bool_t notOnClient, TList *workers) { // Enable specified package. Executes the PROOF-INF/BUILD.sh // script if it exists followed by the PROOF-INF/SETUP.C script. @@ -8109,7 +8355,8 @@ Int_t TProof::EnablePackage(const char *package, const char *loadopts, // off no check; failure may occur at loading // on check ROOT version [default] // svn check ROOT version and SVN revision number. - // (Use ';', ' ' or '|' to separate 'chkv=' from the rest.) + // (Use ';', ' ' or '|' to separate 'chkv=' from the rest.) + // If specified, enables packages only on the specified workers. // Returns 0 in case of success and -1 in case of error. TList *optls = 0; @@ -8156,7 +8403,7 @@ Int_t TProof::EnablePackage(const char *package, const char *loadopts, } } // Run - Int_t rc = EnablePackage(package, optls, notOnClient); + Int_t rc = EnablePackage(package, optls, notOnClient, workers); // Clean up SafeDelete(optls); // Done @@ -8165,7 +8412,7 @@ Int_t TProof::EnablePackage(const char *package, const char *loadopts, //______________________________________________________________________________ Int_t TProof::EnablePackage(const char *package, TList *loadopts, - Bool_t notOnClient) + Bool_t notOnClient, TList *workers) { // Enable specified package. Executes the PROOF-INF/BUILD.sh // script if it exists followed by the PROOF-INF/SETUP.C script. @@ -8225,7 +8472,7 @@ Int_t TProof::EnablePackage(const char *package, TList *loadopts, optls = 0; } - if (LoadPackage(pac, notOnClient, optls) == -1) + if (LoadPackage(pac, notOnClient, optls, workers) == -1) return -1; return 0; @@ -8326,7 +8573,8 @@ Int_t TProof::DownloadPackage(const char *pack, const char *dstdir) } //______________________________________________________________________________ -Int_t TProof::UploadPackage(const char *pack, EUploadPackageOpt opt) +Int_t TProof::UploadPackage(const char *pack, EUploadPackageOpt opt, + TList *workers) { // Upload a PROOF archive (PAR file). A PAR file is a compressed // tar file with one special additional directory, PROOF-INF @@ -8442,8 +8690,11 @@ Int_t TProof::UploadPackage(const char *pack, EUploadPackageOpt opt) mess3 << (UInt_t) opt; } - // loop over all selected nodes - TIter next(fUniqueSlaves); + // Loop over all slaves with unique fs image, or to a selected + // list of workers, if specified + if (!workers) + workers = fUniqueSlaves; + TIter next(workers); TSlave *sl = 0; while ((sl = (TSlave *) next())) { if (!sl->IsValid()) @@ -8841,7 +9092,8 @@ Int_t TProof::Load(const char *macro, Bool_t notOnClient, Bool_t uniqueWorkers, } //______________________________________________________________________________ -Int_t TProof::AddDynamicPath(const char *libpath, Bool_t onClient, TList *wrks) +Int_t TProof::AddDynamicPath(const char *libpath, Bool_t onClient, TList *wrks, + Bool_t doCollect) { // Add 'libpath' to the lib path search. // Multiple paths can be specified at once separating them with a comma or @@ -8868,17 +9120,22 @@ Int_t TProof::AddDynamicPath(const char *libpath, Bool_t onClient, TList *wrks) m << TString("-"); // Forward the request - if (wrks) + if (wrks) { Broadcast(m, wrks); - else + if (doCollect) + Collect(wrks, fCollectTimeout); + } + else { Broadcast(m); - Collect(kActive, fCollectTimeout); + Collect(kActive, fCollectTimeout); + } return 0; } //______________________________________________________________________________ -Int_t TProof::AddIncludePath(const char *incpath, Bool_t onClient, TList *wrks) +Int_t TProof::AddIncludePath(const char *incpath, Bool_t onClient, TList *wrks, + Bool_t doCollect) { // Add 'incpath' to the inc path search. // Multiple paths can be specified at once separating them with a comma or @@ -8905,11 +9162,15 @@ Int_t TProof::AddIncludePath(const char *incpath, Bool_t onClient, TList *wrks) m << TString("-"); // Forward the request - if (wrks) + if (wrks) { Broadcast(m, wrks); - else + if (doCollect) + Collect(wrks, fCollectTimeout); + } + else { Broadcast(m); - Collect(kActive, fCollectTimeout); + Collect(kActive, fCollectTimeout); + } return 0; } diff --git a/proof/proof/src/TProofServ.cxx b/proof/proof/src/TProofServ.cxx index a3797966f75d8..f5dfda266dbe2 100644 --- a/proof/proof/src/TProofServ.cxx +++ b/proof/proof/src/TProofServ.cxx @@ -2762,7 +2762,11 @@ void TProofServ::SendParallel(Bool_t async) Int_t nparallel = 0; if (IsMaster()) { + PDB(kGlobal, 2) + Info("SendParallel", "Will invoke AskParallel()"); fProof->AskParallel(); + PDB(kGlobal, 2) + Info("SendParallel", "Will invoke GetParallel()"); nparallel = fProof->GetParallel(); } else { nparallel = 1; diff --git a/proof/proofd/src/XrdProofSched.cxx b/proof/proofd/src/XrdProofSched.cxx index 7aeb4dfa20592..b61ec9cc01e94 100644 --- a/proof/proofd/src/XrdProofSched.cxx +++ b/proof/proofd/src/XrdProofSched.cxx @@ -516,7 +516,7 @@ int XrdProofSched::GetWorkers(XrdProofdProofServ *xps, } } else { if (maxnum > 0) { - // This is over-conservative for sub-selectiob (random, or round-robin options) + // This is over-conservative for sub-selection (random, or round-robin options) // A better solution should be implemented for that. int nactsess = mst->GetNActiveSessions(); TRACE(REQ, "act sess ... " << nactsess); @@ -549,8 +549,8 @@ int XrdProofSched::GetWorkers(XrdProofdProofServ *xps, } } - // If the session has already assigned workers just return - if (xps->Workers()->Num() > 0) { + // If a non-dynamic session already has assigned workers just return + if (!isDynamic && (xps->Workers()->Num() > 0)) { // Current assignement is valid SafeDel(acwseff); return 1; diff --git a/proof/proofplayer/inc/TPacketizerUnit.h b/proof/proofplayer/inc/TPacketizerUnit.h index 6bda19796fc38..fa95a0f34d451 100644 --- a/proof/proofplayer/inc/TPacketizerUnit.h +++ b/proof/proofplayer/inc/TPacketizerUnit.h @@ -55,12 +55,13 @@ class TPacketizerUnit : public TVirtualPacketizer { Double_t fCalibFrac; // Size of the calibrating packet as fraction of Ntot/Nwrk Long64_t fNumPerWorker; // Number of cycles per worker, if this option // is chosen - + Bool_t fFixedNum; // Fixed num of packets per worker Long64_t fPacketSeq; // Sequential number of the last packet assigned + TList *fInput; // Input list TPacketizerUnit(); TPacketizerUnit(const TPacketizerUnit&); // no implementation, will generate - void operator=(const TPacketizerUnit&); // error on accidental usage + void operator=(const TPacketizerUnit&); // error on accidental usage public: TPacketizerUnit(TList *slaves, Long64_t num, TList *input, TProofProgressStatus *st = 0); @@ -73,6 +74,8 @@ class TPacketizerUnit : public TVirtualPacketizer { Float_t GetCurrentRate(Bool_t &all); Int_t GetActiveWorkers() { return fWrkStats->GetSize(); } + Int_t AddWorkers(TList *workers); + ClassDef(TPacketizerUnit,0) //Generate work packets for parallel processing }; diff --git a/proof/proofplayer/inc/TProofPlayer.h b/proof/proofplayer/inc/TProofPlayer.h index 9ea3b9e88caa0..12dc34328dd53 100644 --- a/proof/proofplayer/inc/TProofPlayer.h +++ b/proof/proofplayer/inc/TProofPlayer.h @@ -157,6 +157,7 @@ class TProofPlayer : public TVirtualProofPlayer { Long64_t Process(TDSet *set, TSelector *selector, Option_t *option = "", Long64_t nentries = -1, Long64_t firstentry = 0); + virtual Bool_t JoinProcess(TList *workers); TVirtualPacketizer *GetPacketizer() const { return 0; } Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE); Long64_t Finalize(TQueryResult *qr); @@ -302,6 +303,8 @@ class TProofPlayerRemote : public TProofPlayer { ErrorHandlerFunc_t fErrorHandler; // Store previous handler when redirecting output Bool_t fMergeTH1OneByOne; // If kTRUE forces TH1 merge one-by-one [kTRUE] TH1 *fProcPackets; //!Histogram with packets being processed (owned by TPerfStats) + TMessage *fProcessMessage; // Process message to replay when adding new workers dynamically + TString fSelectorFileName; // Current Selector's name, set by Process() virtual Bool_t HandleTimer(TTimer *timer); Int_t InitPacketizer(TDSet *dset, Long64_t nentries, @@ -321,7 +324,8 @@ class TProofPlayerRemote : public TProofPlayer { TProofPlayerRemote(TProof *proof = 0) : fProof(proof), fOutputLists(0), fFeedback(0), fFeedbackLists(0), fPacketizer(0), fMergeFiles(kFALSE), fDSet(0), fErrorHandler(0), - fMergeTH1OneByOne(kTRUE), fProcPackets(0) + fMergeTH1OneByOne(kTRUE), fProcPackets(0), + fProcessMessage(0) { fProgressStatus = new TProofProgressStatus(); } virtual ~TProofPlayerRemote(); // Owns the fOutput list virtual Long64_t Process(TDSet *set, const char *selector, @@ -330,6 +334,7 @@ class TProofPlayerRemote : public TProofPlayer { virtual Long64_t Process(TDSet *set, TSelector *selector, Option_t *option = "", Long64_t nentries = -1, Long64_t firstentry = 0); + virtual Bool_t JoinProcess(TList *workers); virtual Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE); virtual Long64_t Finalize(TQueryResult *qr); Long64_t DrawSelect(TDSet *set, const char *varexp, diff --git a/proof/proofplayer/inc/TVirtualPacketizer.h b/proof/proofplayer/inc/TVirtualPacketizer.h index 5e45945df6a30..a9c72f8a24298 100644 --- a/proof/proofplayer/inc/TVirtualPacketizer.h +++ b/proof/proofplayer/inc/TVirtualPacketizer.h @@ -132,6 +132,7 @@ class TVirtualPacketizer : public TObject { virtual void StopProcess(Bool_t abort, Bool_t stoptimer = kFALSE); TList *GetFailedPackets() { return fFailedPackets; } void SetFailedPackets(TList *list) { fFailedPackets = list; } + virtual Int_t AddWorkers(TList *workers); Long64_t GetBytesRead() const { return (fProgressStatus? fProgressStatus->GetBytesRead() : 0); } Long64_t GetReadCalls() const { return (fProgressStatus? fProgressStatus->GetReadCalls() : 0); } diff --git a/proof/proofplayer/src/TPacketizerUnit.cxx b/proof/proofplayer/src/TPacketizerUnit.cxx index 0d6f8238621c1..855c1edf721b6 100644 --- a/proof/proofplayer/src/TPacketizerUnit.cxx +++ b/proof/proofplayer/src/TPacketizerUnit.cxx @@ -179,12 +179,16 @@ TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input, // Init pointer members fWrkStats = 0; fPackets = 0; + fInput = input; Int_t fixednum = -1; - if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0) - fixednum = 0; - if (fixednum == 1) + if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0) { + fFixedNum = kFALSE; + } + else { Info("TPacketizerUnit", "forcing the same cycles on each worker"); + fFixedNum = kTRUE; + } fCalibFrac = 0.01; if (TProof::GetParameter(input, "PROOF_PacketizerCalibFrac", fCalibFrac) != 0 || fCalibFrac <= 0) @@ -231,7 +235,7 @@ TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input, fTotalEntries = num; fNumPerWorker = -1; - if (fixednum == 1 && fWrkStats->GetSize() > 0) { + if (fFixedNum && fWrkStats->GetSize() > 0) { // Approximate number: the exact number is determined in GetNextPacket fNumPerWorker = fTotalEntries / fWrkStats->GetSize(); if (fNumPerWorker == 0) fNumPerWorker = 1; @@ -304,7 +308,11 @@ TDSetElement *TPacketizerUnit::GetNextPacket(TSlave *sl, TMessage *r) // Find slave TSlaveStat *slstat = (TSlaveStat*) fWrkStats->GetValue(sl); - R__ASSERT(slstat != 0); + if (!slstat) { + Warning("GetNextPacket", "Received a packet request from an unknown slave: %s:%s", + sl->GetName(), sl->GetOrdinal()); + return 0; + } PDB(kPacketizer,2) Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned); @@ -512,3 +520,32 @@ TDSetElement *TPacketizerUnit::GetNextPacket(TSlave *sl, TMessage *r) return elem; } + +//______________________________________________________________________________ +Int_t TPacketizerUnit::AddWorkers(TList *workers) +{ + // Adds new workers. Returns the number of workers added, or -1 on failure. + + if (!workers) { + Error("AddWorkers", "Null list of new workers!"); + return -1; + } + + Int_t curNumOfWrks = fWrkStats->GetEntries(); + + TSlave *sl; + TIter next(workers); + while (( sl = dynamic_cast(next()) )) + fWrkStats->Add(sl, new TSlaveStat(sl, fInput)); + + fNumPerWorker = -1; + if (fFixedNum && fWrkStats->GetSize() > 0) { + // Approximate number: the exact number is determined in GetNextPacket + fNumPerWorker = (fNumPerWorker * curNumOfWrks) / fWrkStats->GetSize(); + if (fNumPerWorker == 0) fNumPerWorker = 1; + } + + fConfigParams->Add(new TParameter("PROOF_PacketizerFixedNum", fNumPerWorker)); + + return fWrkStats->GetEntries(); +} diff --git a/proof/proofplayer/src/TProofPlayer.cxx b/proof/proofplayer/src/TProofPlayer.cxx index 1e5ffed7563b9..94197faa3b3eb 100644 --- a/proof/proofplayer/src/TProofPlayer.cxx +++ b/proof/proofplayer/src/TProofPlayer.cxx @@ -1502,6 +1502,14 @@ Long64_t TProofPlayer::Process(TDSet *dset, TSelector *selector, return Process(dset, (const char *)0, option, nentries, first); } +//______________________________________________________________________________ +Bool_t TProofPlayer::JoinProcess(TList *) +{ + // Not implemented: meaningful only in the remote player. Returns kFALSE. + + return kFALSE; +} + //______________________________________________________________________________ Bool_t TProofPlayer::CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg) @@ -1871,6 +1879,9 @@ TProofPlayerRemote::~TProofPlayerRemote() // Objects stored in maps are already deleted when merging the feedback SafeDelete(fFeedbackLists); SafeDelete(fPacketizer); + + if (fProcessMessage) + SafeDelete(fProcessMessage); } //______________________________________________________________________________ @@ -2140,7 +2151,8 @@ Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file, // The return value is -1 in case of an error and TSelector::GetStatus() in // in case of success. - PDB(kGlobal,1) Info("Process","Enter"); + PDB(kGlobal,1) Info("Process", "Enter"); + fDSet = dset; fExitStatus = kFinished; @@ -2166,6 +2178,7 @@ Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file, // Define filename TString fn; + fSelectorFileName = selector_file; if (fCreateSelObj) { if(!SendSelector(selector_file)) return -1; @@ -2325,7 +2338,10 @@ Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file, TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast(set->GetEntryList()) : (TEventList *)0; if (fProof->fProtocol > 14) { + if (fProcessMessage) delete fProcessMessage; + fProcessMessage = new TMessage(kPROOF_PROCESS); mesg << set << fn << fInput << opt << num << fst << evl << sync << enl; + (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl; } else { mesg << set << fn << fInput << opt << num << fst << evl << sync; if (enl) @@ -2468,6 +2484,55 @@ Long64_t TProofPlayerRemote::Process(TDSet *dset, TSelector *selector, return Process(dset, selector->ClassName(), option, nentries, first); } + +//______________________________________________________________________________ +Bool_t TProofPlayerRemote::JoinProcess(TList *workers) +{ + // Prepares the given list of new workers to join a progressing process. + // Returns kTRUE on success, kFALSE otherwise. + + if (!fProcessMessage || !fProof || !fPacketizer) { + Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p", + fProcessMessage, fProof, fPacketizer); + return kFALSE; + } + + if (!workers || !fProof->IsMaster()) { + Error("Process", "Invalid call"); + return kFALSE; + } + + PDB(kGlobal, 1) + Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries()); + + // Sends the file associated to the TSelector, if necessary + if (fCreateSelObj) { + PDB(kGlobal, 2) + Info("Process", "Sending selector file %s", fSelectorFileName.Data()); + if(!SendSelector(fSelectorFileName.Data())) { + Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data()); + return kFALSE; + } + } + + PDB(kGlobal, 2) + Info("Process", "Adding new workers to the packetizer"); + if (fPacketizer->AddWorkers(workers) == -1) { + Error("Process", "Cannot add new workers to the packetizer!"); + return kFALSE; // TODO: make new wrks inactive + } + + PDB(kGlobal, 2) + Info("Process", "Broadcasting process message to new workers"); + fProof->Broadcast(*fProcessMessage, workers); + + // Don't call Collect(): we came here from a global Collect() already which + // will take care of new workers as well + + return kTRUE; + +} + //______________________________________________________________________________ Bool_t TProofPlayerRemote::MergeOutputFiles() { diff --git a/proof/proofplayer/src/TVirtualPacketizer.cxx b/proof/proofplayer/src/TVirtualPacketizer.cxx index b93bf1c9eb8ba..5a2529deb314c 100644 --- a/proof/proofplayer/src/TVirtualPacketizer.cxx +++ b/proof/proofplayer/src/TVirtualPacketizer.cxx @@ -434,3 +434,14 @@ void TVirtualPacketizer::SetInitTime() Info("SetInitTime","fInitTime set to %f s", fInitTime); } } + +//______________________________________________________________________________ +Int_t TVirtualPacketizer::AddWorkers(TList *) +{ + // Adds new workers. Must be implemented by each real packetizer properly. + // Returns the number of workers added, or -1 on failure. + + Warning("AddWorkers", "Not implemented for this packetizer"); + + return -1; +} diff --git a/proof/proofx/src/TXProofServ.cxx b/proof/proofx/src/TXProofServ.cxx index 9bdf35add2a97..4bc1316739870 100644 --- a/proof/proofx/src/TXProofServ.cxx +++ b/proof/proofx/src/TXProofServ.cxx @@ -806,8 +806,8 @@ TProofServ::EQueryAction TXProofServ::GetWorkers(TList *workers, } if (s.IsDigit()) { nwrks = s.Atoi(); - if (nwrks > 0) { - // Notify + if (!dynamicStartup && (nwrks > 0)) { + // Notify, except in dynamic workers mode to avoid flooding TString msg; if (pernode) { msg.Form("+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);